Skip to content

Commit e8dff9d

Browse files
committed
fix error
1 parent 0bddfd5 commit e8dff9d

5 files changed

Lines changed: 12 additions & 32 deletions

File tree

fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public void registerCoordinatorServer(CoordinatorAddress coordinatorAddress) thr
214214
* registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator
215215
* epoch and coordinator epoch zk version.
216216
*/
217-
public Optional<ZkEpoch> fenceBecomeCoordinatorLeader(int coordinatorId) throws Exception {
217+
public Optional<ZkEpoch> fenceBecomeCoordinatorLeader(String coordinatorId) throws Exception {
218218
ensureEpochZnodeExists();
219219

220220
try {

fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerde.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class CoordinatorAddressJsonSerde
3535

3636
public static final CoordinatorAddressJsonSerde INSTANCE = new CoordinatorAddressJsonSerde();
3737
private static final String VERSION_KEY = "version";
38-
private static final int VERSION = 3;
38+
private static final int VERSION = 2;
3939

4040
private static final String ID = "id";
4141
private static final String HOST = "host";
@@ -51,7 +51,7 @@ public void serialize(CoordinatorAddress coordinatorAddress, JsonGenerator gener
5151
throws IOException {
5252
generator.writeStartObject();
5353
writeVersion(generator);
54-
generator.writeNumberField(ID, coordinatorAddress.getId());
54+
generator.writeStringField(ID, coordinatorAddress.getId());
5555
generator.writeStringField(
5656
LISTENERS, Endpoint.toListenersString(coordinatorAddress.getEndpoints()));
5757
generator.writeEndObject();
@@ -60,18 +60,13 @@ public void serialize(CoordinatorAddress coordinatorAddress, JsonGenerator gener
6060
@Override
6161
public CoordinatorAddress deserialize(JsonNode node) {
6262
int version = node.get(VERSION_KEY).asInt();
63-
int id;
63+
String id = node.get(ID).asText();
6464
List<Endpoint> endpoints;
6565
if (version == 1) {
66-
id = Integer.parseInt(node.get(ID).asText());
6766
String host = node.get(HOST).asText();
6867
int port = node.get(PORT).asInt();
6968
endpoints = Collections.singletonList(new Endpoint(host, port, "CLIENT"));
70-
} else if (version == 2) {
71-
id = Integer.parseInt(node.get(ID).asText());
72-
endpoints = Endpoint.fromListenersString(node.get(LISTENERS).asText());
7369
} else {
74-
id = node.get(ID).asInt();
7570
endpoints = Endpoint.fromListenersString(node.get(LISTENERS).asText());
7671
}
7772
return new CoordinatorAddress(id, endpoints);

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ static void baseBeforeAll() throws Exception {
174174
// register coordinator server
175175
zookeeperClient.registerCoordinatorLeader(
176176
new CoordinatorAddress(
177-
2, Endpoint.fromListenersString("CLIENT://localhost:10012")));
177+
"2", Endpoint.fromListenersString("CLIENT://localhost:10012")));
178178

179179
// register 3 tablet servers
180180
for (int i = 0; i < 3; i++) {

fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ void testCoordinatorLeader() throws Exception {
113113
assertThat(zookeeperClient.getCoordinatorLeaderAddress()).isEmpty();
114114
CoordinatorAddress coordinatorAddress =
115115
new CoordinatorAddress(
116-
2, Endpoint.fromListenersString("CLIENT://localhost1:10012"));
116+
"2", Endpoint.fromListenersString("CLIENT://localhost1:10012"));
117117
// register leader address
118118
zookeeperClient.registerCoordinatorLeader(coordinatorAddress);
119119
// check get leader address

fluss-server/src/test/java/org/apache/fluss/server/zk/data/CoordinatorAddressJsonSerdeTest.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class CoordinatorAddressJsonSerdeTest extends JsonSerdeTestBase<Coordinat
3939
protected CoordinatorAddress[] createObjects() {
4040
CoordinatorAddress coordinatorAddress =
4141
new CoordinatorAddress(
42-
1,
42+
"1",
4343
Arrays.asList(
4444
new Endpoint("localhost", 1001, "CLIENT"),
4545
new Endpoint("127.0.0.1", 9124, "FLUSS")));
@@ -49,38 +49,23 @@ protected CoordinatorAddress[] createObjects() {
4949
@Override
5050
protected String[] expectedJsons() {
5151
return new String[] {
52-
"{\"version\":3,\"id\":1,\"listeners\":\"CLIENT://localhost:1001,FLUSS://127.0.0.1:9124\"}"
52+
"{\"version\":2,\"id\":\"1\",\"listeners\":\"CLIENT://localhost:1001,FLUSS://127.0.0.1:9124\"}"
5353
};
5454
}
5555

5656
@Test
5757
void testCompatibility() throws IOException {
58-
// version 1
5958
JsonNode jsonInVersion1 =
6059
new ObjectMapper()
6160
.readTree(
6261
"{\"version\":1,\"id\":\"1\",\"host\":\"localhost\",\"port\":1001}"
6362
.getBytes(StandardCharsets.UTF_8));
6463

65-
CoordinatorAddress coordinatorAddress1 =
64+
CoordinatorAddress coordinatorAddress =
6665
CoordinatorAddressJsonSerde.INSTANCE.deserialize(jsonInVersion1);
67-
CoordinatorAddress expectedCoordinator1 =
68-
new CoordinatorAddress(1, Endpoint.fromListenersString("CLIENT://localhost:1001"));
69-
assertEquals(coordinatorAddress1, expectedCoordinator1);
70-
71-
// version 2
72-
JsonNode jsonInVersion2 =
73-
new ObjectMapper()
74-
.readTree(
75-
"{\"version\":2,\"id\":\"2\",\"listeners\":\"CLIENT://localhost:1001,FLUSS://127.0.0.1:9124\"}"
76-
.getBytes(StandardCharsets.UTF_8));
77-
CoordinatorAddress coordinatorAddress2 =
78-
CoordinatorAddressJsonSerde.INSTANCE.deserialize(jsonInVersion2);
79-
CoordinatorAddress expectedCoordinator2 =
66+
CoordinatorAddress expectedCoordinator =
8067
new CoordinatorAddress(
81-
2,
82-
Endpoint.fromListenersString(
83-
"CLIENT://localhost:1001,FLUSS://127.0.0.1:9124"));
84-
assertEquals(coordinatorAddress2, expectedCoordinator2);
68+
"1", Endpoint.fromListenersString("CLIENT://localhost:1001"));
69+
assertEquals(coordinatorAddress, expectedCoordinator);
8570
}
8671
}

0 commit comments

Comments
 (0)