diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 60ff77aa03c52..07b7364d0042d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -56,7 +56,10 @@ import org.apache.iotdb.session.Session; import org.apache.iotdb.session.pool.SessionPool; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import java.io.File; @@ -1119,7 +1122,7 @@ public void shutdownForciblyAllDataNodes() { @Override public void ensureNodeStatus( - final List nodes, final List targetStatus) + final List nodes, final List targetStatusList) throws IllegalStateException { Throwable lastException = null; for (int i = 0; i < retryCount; i++) { @@ -1147,7 +1150,9 @@ public void ensureNodeStatus( + node.getClientRpcEndPoint().getPort(), node.getDataNodeId())); for (int j = 0; j < nodes.size(); j++) { - final String endpoint = nodes.get(j).getIpAndPortString(); + BaseNodeWrapper nodeWrapper = nodes.get(j); + String ipAndPortString = nodeWrapper.getIpAndPortString(); + final String endpoint = ipAndPortString; if (!nodeIds.containsKey(endpoint)) { // Node not exist // Notice: Never modify this line, since the NodeLocation might be modified in IT @@ -1155,12 +1160,27 @@ public void ensureNodeStatus( continue; } final String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint)); - if (!targetStatus.get(j).getStatus().equals(status)) { + final NodeStatus targetStatus = targetStatusList.get(j); + if (!targetStatus.getStatus().equals(status)) { // Error status errorMessages.add( String.format( "Node %s is in status %s, but expected %s", - endpoint, status, targetStatus.get(j))); + endpoint, status, targetStatusList.get(j))); + continue; + } + if (nodeWrapper instanceof DataNodeWrapper && targetStatus.equals(NodeStatus.Running)) { + final String[] ipPort = nodeWrapper.getIpAndPortString().split(":"); + final String ip = ipPort[0]; + final int port = Integer.parseInt(ipPort[1]); + try (TSocket socket = new TSocket(new TConfiguration(), ip, port, 1000)) { + socket.open(); + } catch (final TTransportException e) { + errorMessages.add( + String.format( + "DataNode %s is not reachable: %s", + nodeWrapper.getIpAndPortString(), e.getMessage())); + } } } if (errorMessages.isEmpty()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 2e0ac35d5df98..81cad2f3befdf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -930,7 +930,7 @@ public List setConfiguration(TSetConfigurationReq req) { if (!targetDataNodes.isEmpty()) { DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap); + CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes); CnToDnInternalServiceAsyncRequestManager.getInstance() .sendAsyncRequestWithRetry(clientHandler); responseList.addAll(clientHandler.getResponseList()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java index 8348b87813768..516b1489a206d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java @@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService { ConcurrentHashMap.newKeySet(); private ReentrantLock lock = new ReentrantLock(); private volatile boolean init = false; + private volatile boolean isStoppingAllScheduleTask = false; @Override public void start() throws StartupException { @@ -76,8 +77,13 @@ public void start() throws StartupException { logger.info("Compaction schedule task manager started."); } + public boolean isStoppingAllScheduleTask() { + return isStoppingAllScheduleTask; + } + public void stopCompactionScheduleTasks() throws InterruptedException { lock.lock(); + isStoppingAllScheduleTask = true; try { for (Future task : submitCompactionScheduleTaskFutures) { task.cancel(true); @@ -121,6 +127,7 @@ public void checkAndMayApplyConfigurationChange() throws InterruptedException { public void startScheduleTasks() { lock.lock(); + isStoppingAllScheduleTask = false; try { // compaction selector for (int workerId = 0; workerId < compactionSelectorNum; workerId++) { @@ -144,6 +151,7 @@ public void startScheduleTasks() { @Override public void stop() { lock.lock(); + isStoppingAllScheduleTask = true; try { if (!init) { return; @@ -160,6 +168,7 @@ public void stop() { @Override public void waitAndStop(long milliseconds) { lock.lock(); + isStoppingAllScheduleTask = true; try { if (!init) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java index 17ad0dd4334e6..714d232d62538 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java @@ -72,9 +72,26 @@ public Void call() { dataRegion.executeCompaction(); } } catch (InterruptedException ignored) { + boolean isStoppedByUser = + CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask(); logger.info( - "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId); - return null; + "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}", + workerId, + isStoppedByUser); + if (isStoppedByUser) { + return null; + } + } catch (Exception e) { + logger.error( + "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task", + workerId, + e); + } catch (Throwable t) { + logger.error( + "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task and cannot recover", + workerId, + t); + throw t; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java index 393a9f6d2dc68..c8cba2b52927b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java @@ -66,8 +66,21 @@ public Void call() throws Exception { } } } catch (InterruptedException ignored) { - logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId); - return null; + boolean isStoppedByUser = + CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask(); + logger.info( + "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}", + workerId, + isStoppedByUser); + if (isStoppedByUser) { + return null; + } + } catch (Exception e) { + logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e); + } catch (Throwable t) { + logger.error( + "[TTLCheckTask-{}] Failed to execute ttl check and cannot recover", workerId, t); + throw t; } } }