From 830f0b9df4fd1620317a7fb0d3d5fa4328d336da Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Sat, 18 Apr 2026 01:50:04 +0800 Subject: [PATCH] Fix region migration reliability regressions --- .../procedure/env/RegionMaintainHandler.java | 37 +++++++-- ...egionMaintainHandlerConsensusPipeTest.java | 60 +++++++++++++- .../iotdb/consensus/iot/IoTConsensus.java | 2 +- .../iotdb/consensus/iot/StabilityTest.java | 13 +++ .../PipeRealtimeDataRegionHybridSource.java | 3 +- .../PipeRealtimeDataRegionLogSource.java | 1 + .../PipeRealtimeDataRegionSource.java | 34 ++++++++ .../PipeRealtimeDataRegionTsFileSource.java | 1 + .../assigner/PipeDataRegionAssigner.java | 16 ---- ...eRealtimeReplicateIndexAssignmentTest.java | 79 +++++++++++++++++++ 10 files changed, 218 insertions(+), 28 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java index 5043becc1fad5..47105dda1c582 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java @@ -299,15 +299,22 @@ public TSStatus submitDeleteOldRegionPeerTask( TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode, procedureId); - // Always use full retries regardless of node status, because after a cluster crash the - // target DataNode may be Unknown but still in the process of restarting. + final NodeStatus nodeStatus = getDataNodeStatus(originalDataNode.getDataNodeId()); + final boolean useFullRetry = !NodeStatus.Unknown.equals(nodeStatus); + if (!useFullRetry) { + LOGGER.info( + "{}, DataNode {} is {}, submit DELETE_OLD_REGION_PEER with a single RPC attempt and let RemoveRegionPeerProcedure handle retries.", + REGION_MIGRATE_PROCESS, + simplifiedLocation(originalDataNode), + nodeStatus); + } + status = - (TSStatus) - SyncDataNodeClientPool.getInstance() - .sendSyncRequestToDataNodeWithRetry( - originalDataNode.getInternalEndPoint(), - maintainPeerReq, - CnToDnSyncRequestType.DELETE_OLD_REGION_PEER); + submitDataNodeSyncRequest( + originalDataNode.getInternalEndPoint(), + maintainPeerReq, + CnToDnSyncRequestType.DELETE_OLD_REGION_PEER, + useFullRetry); LOGGER.info( "{}, Send action deleteOldRegionPeer finished, regionId: {}, dataNodeId: {}", REGION_MIGRATE_PROCESS, @@ -316,6 +323,20 @@ public TSStatus submitDeleteOldRegionPeerTask( return status; } + protected NodeStatus getDataNodeStatus(int dataNodeId) { + return configManager.getLoadManager().getNodeStatus(dataNodeId); + } + + protected TSStatus submitDataNodeSyncRequest( + TEndPoint endPoint, Object request, CnToDnSyncRequestType requestType, boolean useFullRetry) { + return (TSStatus) + (useFullRetry + ? SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithRetry(endPoint, request, requestType) + : SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry(endPoint, request, requestType, 1)); + } + public Map resetPeerList( TConsensusGroupId regionId, List correctDataNodeLocations, diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java index b384b9e0f0acd..ce51f52dfab9a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java @@ -24,10 +24,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager; @@ -59,7 +63,8 @@ public class RegionMaintainHandlerConsensusPipeTest { private PipeManager pipeManager; private PipeTaskCoordinator pipeTaskCoordinator; private ProcedureManager procedureManager; - private RegionMaintainHandler handler; + private LoadManager loadManager; + private TestRegionMaintainHandler handler; private String originalConsensusProtocol; @@ -76,13 +81,15 @@ public void setUp() { pipeManager = mock(PipeManager.class); pipeTaskCoordinator = mock(PipeTaskCoordinator.class); procedureManager = mock(ProcedureManager.class); + loadManager = mock(LoadManager.class); when(configManager.getPartitionManager()).thenReturn(partitionManager); when(configManager.getPipeManager()).thenReturn(pipeManager); when(configManager.getProcedureManager()).thenReturn(procedureManager); + when(configManager.getLoadManager()).thenReturn(loadManager); when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator); - handler = new RegionMaintainHandler(configManager); + handler = new TestRegionMaintainHandler(configManager); } @After @@ -254,6 +261,36 @@ public void testMixedScenario() { verify(procedureManager, times(1)).startConsensusPipe(pipe2to1); } + @Test + public void testDeleteOldRegionPeerUsesSingleAttemptWhenNodeUnknown() { + TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010); + when(loadManager.getNodeStatus(loc1.getDataNodeId())).thenReturn(NodeStatus.Unknown); + + handler.submitDeleteOldRegionPeerTask( + 1L, loc1, new TConsensusGroupId(TConsensusGroupType.DataRegion, 100)); + + verify(loadManager, times(1)).getNodeStatus(loc1.getDataNodeId()); + org.junit.Assert.assertFalse(handler.lastUseFullRetry); + org.junit.Assert.assertEquals( + CnToDnSyncRequestType.DELETE_OLD_REGION_PEER, handler.lastRequestType); + org.junit.Assert.assertEquals(loc1.getInternalEndPoint(), handler.lastEndPoint); + } + + @Test + public void testDeleteOldRegionPeerKeepsFullRetryWhenNodeRunning() { + TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010); + when(loadManager.getNodeStatus(loc1.getDataNodeId())).thenReturn(NodeStatus.Running); + + handler.submitDeleteOldRegionPeerTask( + 1L, loc1, new TConsensusGroupId(TConsensusGroupType.DataRegion, 100)); + + verify(loadManager, times(1)).getNodeStatus(loc1.getDataNodeId()); + org.junit.Assert.assertTrue(handler.lastUseFullRetry); + org.junit.Assert.assertEquals( + CnToDnSyncRequestType.DELETE_OLD_REGION_PEER, handler.lastRequestType); + org.junit.Assert.assertEquals(loc1.getInternalEndPoint(), handler.lastEndPoint); + } + @Test public void testThreeNodeReplicaSetCreatesAllSixPipes() { TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010); @@ -287,4 +324,23 @@ public void testEmptyReplicaSetsAndEmptyPipes() { verify(procedureManager, never()).dropConsensusPipeAsync(any()); verify(procedureManager, never()).startConsensusPipe(any()); } + + private static class TestRegionMaintainHandler extends RegionMaintainHandler { + private boolean lastUseFullRetry; + private TEndPoint lastEndPoint; + private CnToDnSyncRequestType lastRequestType; + + private TestRegionMaintainHandler(ConfigManager configManager) { + super(configManager); + } + + @Override + protected TSStatus submitDataNodeSyncRequest( + TEndPoint endPoint, Object request, CnToDnSyncRequestType requestType, boolean useFullRetry) { + lastEndPoint = endPoint; + lastRequestType = requestType; + lastUseFullRetry = useFullRetry; + return new TSStatus(200); + } + } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 959191ca2d6d3..d187f283d70d9 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -273,7 +273,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) String path = buildPeerDir(storageDir, groupId); File file = new File(path); - if (!file.mkdirs()) { + if (!file.exists() && !file.mkdirs()) { logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); return null; } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 5147632431f22..1351b6743120e 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -186,6 +186,19 @@ public void peerTest() throws Exception { consensusImpl.deleteLocalPeer(dataRegionId); } + @Test + public void createLocalPeerShouldAllowExistingConsensusDir() throws Exception { + File existingPeerDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId)); + Assert.assertTrue(existingPeerDir.mkdirs()); + + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); + + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); + consensusImpl.deleteLocalPeer(dataRegionId); + } + public void transferLeader() { try { consensusImpl.transferLeader( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java index 97b6d54fde55f..2d0ee0dc6b21b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java @@ -217,7 +217,7 @@ public Event supply() { PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll(); while (realtimeEvent != null) { - final Event suppliedEvent; + Event suppliedEvent; // Used to judge the type of the event, not directly for supplying. final Event eventToSupply = realtimeEvent.getEvent(); @@ -241,6 +241,7 @@ public Event supply() { PipeRealtimeDataRegionHybridSource.class.getName(), false); if (suppliedEvent != null) { + suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent, suppliedEvent); maySkipIndex4Event(realtimeEvent); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java index 579310b3f15f7..3d9c81bcb0cff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java @@ -123,6 +123,7 @@ public Event supply() { realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(), false); if (suppliedEvent != null) { + suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent, suppliedEvent); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index 2ab2061ce7e73..0cf2271e58cae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -32,12 +32,16 @@ import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.consensus.pipe.IoTConsensusV2; +import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor; import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener; @@ -460,6 +464,36 @@ protected void maySkipIndex4Event(final PipeRealtimeEvent event) { } } + protected Event assignReplicateIndexIfNeeded( + final PipeRealtimeEvent realtimeEvent, final Event suppliedEvent) { + if (!(suppliedEvent instanceof EnrichedEvent) || !shouldAssignReplicateIndex(suppliedEvent)) { + return suppliedEvent; + } + + final EnrichedEvent enrichedEvent = (EnrichedEvent) suppliedEvent; + if (enrichedEvent.getReplicateIndexForIoTV2() != EnrichedEvent.NO_COMMIT_ID) { + return suppliedEvent; + } + + enrichedEvent.setReplicateIndexForIoTV2(assignReplicateIndexForRealtimeEvent()); + LOGGER.debug( + "[{}]Set {} for realtime event {}", + pipeName, + enrichedEvent.getReplicateIndexForIoTV2(), + realtimeEvent.coreReportMessage()); + return suppliedEvent; + } + + protected boolean shouldAssignReplicateIndex(final Event suppliedEvent) { + return !(suppliedEvent instanceof ProgressReportEvent) + && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 + && IoTConsensusV2Processor.isShouldReplicate((EnrichedEvent) suppliedEvent); + } + + protected long assignReplicateIndexForRealtimeEvent() { + return ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName); + } + protected Event supplyHeartbeat(final PipeRealtimeEvent event) { if (event.increaseReferenceCount(PipeRealtimeDataRegionSource.class.getName())) { return event.getEvent(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java index 97c3138de7c7d..d70d93db548c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java @@ -114,6 +114,7 @@ public Event supply() { PipeRealtimeDataRegionTsFileSource.class.getName(), false); if (suppliedEvent != null) { + suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent, suppliedEvent); maySkipIndex4Event(realtimeEvent); return suppliedEvent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index bdeebde8938e4..4e5514078de2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -24,9 +24,6 @@ import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.commons.utils.PathUtils; -import org.apache.iotdb.consensus.pipe.IoTConsensusV2; -import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; -import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; @@ -36,7 +33,6 @@ import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory; import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; -import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher.CachedSchemaPatternMatcher; import org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher.PipeDataRegionMatcher; @@ -174,18 +170,6 @@ private void assignToSource( source.getRealtimeDataExtractionStartTime(), source.getRealtimeDataExtractionEndTime()); final EnrichedEvent innerEvent = copiedEvent.getEvent(); - // if using IoTV2, assign a replicateIndex for this realtime event - if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 - && IoTConsensusV2Processor.isShouldReplicate(innerEvent)) { - innerEvent.setReplicateIndexForIoTV2( - ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2( - source.getPipeName())); - LOGGER.debug( - "[{}]Set {} for realtime event {}", - source.getPipeName(), - innerEvent.getReplicateIndexForIoTV2(), - innerEvent); - } if (innerEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent tsFileInsertionEvent = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java new file mode 100644 index 0000000000000..b58886afb3b9b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime; + +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.pipe.api.event.Event; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PipeRealtimeReplicateIndexAssignmentTest { + + @Test + public void assignReplicateIndexShouldBeLazyAndIdempotent() { + final TestPipeRealtimeDataRegionSource source = new TestPipeRealtimeDataRegionSource(); + final PipeDeleteDataNodeEvent event = new PipeDeleteDataNodeEvent(); + + Assert.assertEquals(EnrichedEvent.NO_COMMIT_ID, event.getReplicateIndexForIoTV2()); + + final Event suppliedEvent = source.assign(event); + Assert.assertSame(event, suppliedEvent); + Assert.assertEquals(1L, event.getReplicateIndexForIoTV2()); + Assert.assertEquals(1L, source.assignedCount.get()); + + source.assign(event); + Assert.assertEquals(1L, event.getReplicateIndexForIoTV2()); + Assert.assertEquals(1L, source.assignedCount.get()); + } + + private static class TestPipeRealtimeDataRegionSource extends PipeRealtimeDataRegionLogSource { + private final AtomicLong nextReplicateIndex = new AtomicLong(1); + private final AtomicLong assignedCount = new AtomicLong(0); + + private Event assign(final PipeDeleteDataNodeEvent event) { + final TsFileResource resource = mock(TsFileResource.class); + when(resource.getTsFilePath()).thenReturn("target/test.tsfile"); + final PipeRealtimeEvent realtimeEvent = + new PipeRealtimeEvent(event, new TsFileEpoch(resource), null); + return assignReplicateIndexIfNeeded(realtimeEvent, event); + } + + @Override + protected boolean shouldAssignReplicateIndex(final Event suppliedEvent) { + return true; + } + + @Override + protected long assignReplicateIndexForRealtimeEvent() { + assignedCount.incrementAndGet(); + return nextReplicateIndex.getAndIncrement(); + } + } +}