Skip to content

Commit b566f9e

Browse files
authored
[hotfix] Fix unstable test ReplicaFetcherITCase#testFlushForPutKvNeedAck (#682)
1 parent b8d614a commit b566f9e

File tree

3 files changed

+64
-44
lines changed

3 files changed

+64
-44
lines changed

fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrITCase.java

+4-40
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,14 @@
1818

1919
import com.alibaba.fluss.config.ConfigOptions;
2020
import com.alibaba.fluss.config.Configuration;
21-
import com.alibaba.fluss.metadata.PhysicalTablePath;
2221
import com.alibaba.fluss.metadata.TableBucket;
2322
import com.alibaba.fluss.metadata.TableDescriptor;
24-
import com.alibaba.fluss.metadata.TablePath;
2523
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
26-
import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
27-
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket;
2824
import com.alibaba.fluss.rpc.messages.PbProduceLogRespForBucket;
2925
import com.alibaba.fluss.rpc.messages.ProduceLogResponse;
30-
import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
3126
import com.alibaba.fluss.rpc.protocol.Errors;
32-
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData;
3327
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
3428
import com.alibaba.fluss.server.testutils.RpcMessageTestUtils;
35-
import com.alibaba.fluss.server.utils.RpcMessageUtils;
3629
import com.alibaba.fluss.server.zk.ZooKeeperClient;
3730
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
3831

@@ -41,15 +34,12 @@
4134
import org.junit.jupiter.api.extension.RegisterExtension;
4235

4336
import java.time.Duration;
44-
import java.util.Collections;
4537
import java.util.List;
4638
import java.util.stream.Collectors;
4739

4840
import static com.alibaba.fluss.record.TestData.DATA1;
4941
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
5042
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
51-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeNotifyBucketLeaderAndIsr;
52-
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeStopBucketReplica;
5343
import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
5444
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
5545
import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
@@ -95,18 +85,8 @@ void testIsrShrinkAndExpand() throws Exception {
9585
FLUSS_CLUSTER_EXTENSION.waitAndGetFollowerReplica(tb, stopFollower);
9686
TabletServerGateway followerGateway =
9787
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(stopFollower);
98-
// send stop replica request to the follower
99-
followerGateway
100-
.stopReplica(
101-
new StopReplicaRequest()
102-
.setCoordinatorEpoch(currentLeaderAndIsr.coordinatorEpoch())
103-
.addAllStopReplicasReqs(
104-
Collections.singleton(
105-
makeStopBucketReplica(
106-
tb,
107-
false,
108-
currentLeaderAndIsr.leaderEpoch()))))
109-
.get();
88+
// stop follower replica for the bucket
89+
FLUSS_CLUSTER_EXTENSION.stopReplica(stopFollower, tb, leader);
11090

11191
isr.remove(stopFollower);
11292

@@ -159,8 +139,8 @@ void testIsrShrinkAndExpand() throws Exception {
159139
currentLeaderAndIsr.coordinatorEpoch(),
160140
currentLeaderAndIsr.bucketEpoch());
161141
isr.add(stopFollower);
162-
followerGateway.notifyLeaderAndIsr(
163-
makeNotifyLeaderAndIsrRequest(DATA1_TABLE_PATH, tb, newLeaderAndIsr, isr));
142+
FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
143+
stopFollower, DATA1_TABLE_PATH, tb, newLeaderAndIsr, isr);
164144
// retry until the stop follower add back to ISR.
165145
retry(
166146
Duration.ofMinutes(1),
@@ -276,20 +256,4 @@ private static Configuration initConfig() {
276256
conf.setInt(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER, 2);
277257
return conf;
278258
}
279-
280-
private NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest(
281-
TablePath tablePath,
282-
TableBucket tableBucket,
283-
LeaderAndIsr leaderAndIsr,
284-
List<Integer> replicas) {
285-
PbNotifyLeaderAndIsrReqForBucket reqForBucket =
286-
makeNotifyBucketLeaderAndIsr(
287-
new NotifyLeaderAndIsrData(
288-
PhysicalTablePath.of(tablePath),
289-
tableBucket,
290-
replicas,
291-
leaderAndIsr));
292-
return RpcMessageUtils.makeNotifyLeaderAndIsrRequest(
293-
0, Collections.singletonList(reqForBucket));
294-
}
295259
}

fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.junit.jupiter.api.extension.RegisterExtension;
4242

4343
import java.time.Duration;
44+
import java.util.Arrays;
4445
import java.util.Collections;
4546
import java.util.List;
4647
import java.util.Map;
@@ -257,14 +258,16 @@ void testFlushForPutKvNeedAck() throws Exception {
257258
// let's kill a non leader server
258259
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
259260

260-
int serverToKill =
261+
int followerToStop =
261262
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().stream()
262263
.filter(node -> node.id() != leader)
263264
.findFirst()
264265
.get()
265266
.id();
266267

267-
FLUSS_CLUSTER_EXTENSION.stopTabletServer(serverToKill);
268+
int leaderEpoch = 0;
269+
// stop the follower replica for the bucket
270+
FLUSS_CLUSTER_EXTENSION.stopReplica(followerToStop, tb, leaderEpoch);
268271

269272
// put kv record batch to the leader,
270273
// but as one server is killed, the put won't be ack
@@ -301,8 +304,18 @@ void testFlushForPutKvNeedAck() throws Exception {
301304
null);
302305
}
303306

304-
// start the server again, then the kv should be flushed finally
305-
FLUSS_CLUSTER_EXTENSION.startTabletServer(serverToKill);
307+
// start the follower replica by notify leaderAndIsr,
308+
// then the kv should be flushed finally
309+
LeaderAndIsr currentLeaderAndIsr = zkClient.getLeaderAndIsr(tb).get();
310+
LeaderAndIsr newLeaderAndIsr =
311+
new LeaderAndIsr(
312+
currentLeaderAndIsr.leader(),
313+
currentLeaderAndIsr.leaderEpoch() + 1,
314+
currentLeaderAndIsr.isr(),
315+
currentLeaderAndIsr.coordinatorEpoch(),
316+
currentLeaderAndIsr.bucketEpoch());
317+
FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr(
318+
followerToStop, DATA1_TABLE_PATH, tb, newLeaderAndIsr, Arrays.asList(0, 1, 2));
306319

307320
// wait util the put future is done
308321
putResponse.get();

fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java

+43
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alibaba.fluss.config.Configuration;
2424
import com.alibaba.fluss.config.MemorySize;
2525
import com.alibaba.fluss.fs.local.LocalFileSystem;
26+
import com.alibaba.fluss.metadata.PhysicalTablePath;
2627
import com.alibaba.fluss.metadata.TableBucket;
2728
import com.alibaba.fluss.metadata.TablePath;
2829
import com.alibaba.fluss.metrics.registry.MetricRegistry;
@@ -33,15 +34,20 @@
3334
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
3435
import com.alibaba.fluss.rpc.messages.MetadataRequest;
3536
import com.alibaba.fluss.rpc.messages.MetadataResponse;
37+
import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
38+
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket;
39+
import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
3640
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
3741
import com.alibaba.fluss.server.coordinator.CoordinatorServer;
3842
import com.alibaba.fluss.server.coordinator.MetadataManager;
43+
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData;
3944
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
4045
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotHandle;
4146
import com.alibaba.fluss.server.metadata.ServerInfo;
4247
import com.alibaba.fluss.server.replica.Replica;
4348
import com.alibaba.fluss.server.replica.ReplicaManager;
4449
import com.alibaba.fluss.server.tablet.TabletServer;
50+
import com.alibaba.fluss.server.utils.RpcMessageUtils;
4551
import com.alibaba.fluss.server.zk.NOPErrorHandler;
4652
import com.alibaba.fluss.server.zk.ZooKeeperClient;
4753
import com.alibaba.fluss.server.zk.ZooKeeperTestUtils;
@@ -76,6 +82,8 @@
7682
import java.util.Set;
7783
import java.util.stream.Collectors;
7884

85+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeNotifyBucketLeaderAndIsr;
86+
import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeStopBucketReplica;
7987
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toServerNode;
8088
import static com.alibaba.fluss.server.zk.ZooKeeperTestUtils.createZooKeeperClient;
8189
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
@@ -585,6 +593,41 @@ public Replica waitAndGetFollowerReplica(TableBucket tableBucket, int replica) {
585593
"Fail to wait " + replica + " ready");
586594
}
587595

596+
public void stopReplica(int tabletServerId, TableBucket tableBucket, int leaderEpoch)
597+
throws Exception {
598+
TabletServerGateway followerGateway = newTabletServerClientForNode(tabletServerId);
599+
// send stop replica request to the follower
600+
followerGateway
601+
.stopReplica(
602+
new StopReplicaRequest()
603+
.setCoordinatorEpoch(0)
604+
.addAllStopReplicasReqs(
605+
Collections.singleton(
606+
makeStopBucketReplica(
607+
tableBucket, false, leaderEpoch))))
608+
.get();
609+
}
610+
611+
public void notifyLeaderAndIsr(
612+
int tabletServerId,
613+
TablePath tablePath,
614+
TableBucket tableBucket,
615+
LeaderAndIsr leaderAndIsr,
616+
List<Integer> replicas) {
617+
TabletServerGateway followerGateway = newTabletServerClientForNode(tabletServerId);
618+
PbNotifyLeaderAndIsrReqForBucket reqForBucket =
619+
makeNotifyBucketLeaderAndIsr(
620+
new NotifyLeaderAndIsrData(
621+
PhysicalTablePath.of(tablePath),
622+
tableBucket,
623+
replicas,
624+
leaderAndIsr));
625+
NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest =
626+
RpcMessageUtils.makeNotifyLeaderAndIsrRequest(
627+
0, Collections.singletonList(reqForBucket));
628+
followerGateway.notifyLeaderAndIsr(notifyLeaderAndIsrRequest);
629+
}
630+
588631
private Optional<Replica> getReplica(TableBucket tableBucket, int replica, boolean isLeader) {
589632
ReplicaManager replicaManager = getTabletServerById(replica).getReplicaManager();
590633
if (replicaManager.getReplica(tableBucket) instanceof ReplicaManager.OnlineReplica) {

0 commit comments

Comments
 (0)