diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 3bba8ff8da5e..ecdb8e4ba45e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -21,7 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TFlushReq; @@ -362,11 +361,8 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) { resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART); resp.setRuntimeConfiguration(getRuntimeConfiguration()); - List consensusGroupIds = - getPartitionManager().getAllReplicaSets(nodeId).stream() - .map(TRegionReplicaSet::getRegionId) - .collect(Collectors.toList()); - resp.setConsensusGroupIds(consensusGroupIds); + + resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId)); return resp; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 70d23a47cc7e..372209bfd741 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -78,6 +78,8 @@ import java.util.stream.StreamSupport; import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR; +import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG; +import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG; public class PipeTaskInfo implements SnapshotProcessor { @@ -179,8 +181,8 @@ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeReq final String exceptionMessage = String.format( - "Failed to create pipe %s, the pipe with the same name has been created", - createPipeRequest.getPipeName()); + "Failed to create pipe %s, %s", + createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } @@ -204,7 +206,7 @@ private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq final String exceptionMessage = String.format( - "Failed to alter pipe %s, the pipe does not exist", alterPipeRequest.getPipeName()); + "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), PIPE_NOT_EXIST_MSG); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } @@ -281,7 +283,7 @@ public void checkBeforeStartPipe(final String pipeName) throws PipeException { private void checkBeforeStartPipeInternal(final String pipeName) throws PipeException { if (!isPipeExisted(pipeName)) { final String exceptionMessage = - String.format("Failed to start pipe %s, the pipe does not exist", pipeName); + String.format("Failed to start pipe %s, %s", pipeName, PIPE_NOT_EXIST_MSG); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } @@ -307,7 +309,7 @@ public void checkBeforeStopPipe(final String pipeName) throws PipeException { private void checkBeforeStopPipeInternal(final String pipeName) throws PipeException { if (!isPipeExisted(pipeName)) { final String exceptionMessage = - String.format("Failed to stop pipe %s, the pipe does not exist", pipeName); + String.format("Failed to stop pipe %s, %s", pipeName, PIPE_NOT_EXIST_MSG); LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index 0e3af42d8021..8f49af524bc0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; /** Consensus module base interface. */ @ThreadSafe @@ -145,6 +146,15 @@ public interface IConsensus { */ void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException; + /** + * Record the correct peer list (likely got from the ConfigNode) for future use in resetPeerList. + * Only use this method if necessary. If it is called, it should be called before {@link + * #start()}. + * + * @param correctPeerList The correct consensus group member list + */ + void recordCorrectPeerListBeforeStarting(Map> correctPeerList); + /** * Reset the peer list of the corresponding consensus group. Currently only used in the automatic * cleanup of region migration as a rollback for {@link #addRemotePeer(ConsensusGroupId, Peer)}, @@ -226,17 +236,6 @@ public interface IConsensus { */ List getAllConsensusGroupIds(); - /** - * Return all consensus group ids from disk. - * - *

We need to parse all the RegionGroupIds from the disk directory before starting the - * consensus layer, and {@link #getAllConsensusGroupIds()} returns an empty list, so we need to - * add a new interface. - * - * @return consensusGroupId list - */ - List getAllConsensusGroupIdsWithoutStarting(); - /** * Return the region directory of the corresponding consensus group. * diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 7f1222e5c678..987ba6585382 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -80,6 +80,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; public class IoTConsensus implements IConsensus { @@ -99,6 +101,7 @@ public class IoTConsensus implements IConsensus { private final IClientManager syncClientManager; private final ScheduledExecutorService backgroundTaskService; private Future updateReaderFuture; + private Map> correctPeerListBeforeStart = null; public IoTConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); @@ -178,10 +181,32 @@ private void initAndRecover() throws IOException { syncClientManager, config); stateMachineMap.put(consensusGroupId, consensus); - consensus.start(); } } } + if (correctPeerListBeforeStart != null) { + BiConsumer> resetPeerListWithoutThrow = + (consensusGroupId, peers) -> { + try { + resetPeerList(consensusGroupId, peers); + } catch (ConsensusGroupNotExistException ignore) { + + } catch (Exception e) { + logger.warn("Failed to reset peer list while start", e); + } + }; + // make peers which are in list correct + correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow); + // clear peers which are not in the list + stateMachineMap.keySet().stream() + .filter(consensusGroupId -> !correctPeerListBeforeStart.containsKey(consensusGroupId)) + // copy to a new list to avoid concurrent modification + .collect(Collectors.toList()) + .forEach( + consensusGroupId -> + resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList())); + } + stateMachineMap.values().forEach(IoTConsensusServerImpl::start); } @Override @@ -435,36 +460,6 @@ public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); } - @Override - public List getAllConsensusGroupIdsWithoutStarting() { - return getConsensusGroupIdsFromDir(storageDir, logger); - } - - public static List getConsensusGroupIdsFromDir(File storageDir, Logger logger) { - if (!storageDir.exists()) { - return Collections.emptyList(); - } - List consensusGroupIds = new ArrayList<>(); - try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { - for (Path path : stream) { - try { - String[] items = path.getFileName().toString().split("_"); - ConsensusGroupId consensusGroupId = - ConsensusGroupId.Factory.create( - Integer.parseInt(items[0]), Integer.parseInt(items[1])); - consensusGroupIds.add(consensusGroupId); - } catch (Exception e) { - logger.info( - "The directory {} is not a group directory;" + " ignoring it. ", - path.getFileName().toString()); - } - } - } catch (IOException e) { - logger.error("Failed to get all consensus group ids from disk", e); - } - return consensusGroupIds; - } - @Override public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) { return buildPeerDir(storageDir, groupId); @@ -483,10 +478,16 @@ public void reloadConsensusConfig(ConsensusConfig consensusConfig) { .init(config.getReplication().getRegionMigrationSpeedLimitBytesPerSecond()); } + @Override + public void recordCorrectPeerListBeforeStarting( + Map> correctPeerList) { + logger.info("Record correct peer list: {}", correctPeerList); + this.correctPeerListBeforeStart = correctPeerList; + } + @Override public void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException { - logger.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers); IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); @@ -501,27 +502,37 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) } synchronized (impl) { + // remove invalid peer ImmutableList currentMembers = ImmutableList.copyOf(impl.getConfiguration()); String previousPeerListStr = currentMembers.toString(); for (Peer peer : currentMembers) { if (!correctPeers.contains(peer)) { if (!impl.removeSyncLogChannel(peer)) { - logger.error( - "[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}", - peer, - groupId); + logger.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer); + } else { + logger.info("[RESET PEER LIST] Remove sync channel with: {}", peer); } } } - logger.info( - "[RESET PEER LIST] Local peer list has been reset: {} -> {}", - previousPeerListStr, - impl.getConfiguration()); + // add correct peer for (Peer peer : correctPeers) { if (!impl.getConfiguration().contains(peer)) { - logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer); + impl.buildSyncLogChannel(peer); + logger.info("[RESET PEER LIST] Build sync channel with: {}", peer); } } + // show result + String newPeerListStr = impl.getConfiguration().toString(); + if (!previousPeerListStr.equals(newPeerListStr)) { + logger.info( + "[RESET PEER LIST] Local peer list has been reset: {} -> {}", + previousPeerListStr, + newPeerListStr); + } else { + logger.info( + "[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}", + previousPeerListStr); + } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 8ea522aea27e..1a32248853a0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -645,17 +645,12 @@ private boolean isSuccess(TSStatus status) { return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); } - /** - * build SyncLog channel with safeIndex as the default initial sync index. - * - * @throws ConsensusGroupModifyPeerException - */ - public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException { + /** build SyncLog channel with safeIndex as the default initial sync index. */ + public void buildSyncLogChannel(Peer targetPeer) { buildSyncLogChannel(targetPeer, getMinSyncIndex()); } - public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) - throws ConsensusGroupModifyPeerException { + public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) { KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE); // step 1, build sync channel in LogDispatcher logger.info( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index a76b0e97b066..dc88d8eb98b7 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -185,13 +185,8 @@ public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req) return new TBuildSyncLogChannelRes(status); } TSStatus responseStatus; - try { - impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint)); - responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } catch (ConsensusGroupModifyPeerException e) { - responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); - responseStatus.setMessage(e.getMessage()); - } + impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint)); + responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); return new TBuildSyncLogChannelRes(responseStatus); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index ce5acf73cc3a..5328a25f9dcf 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -69,6 +69,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,10 +78,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; import java.util.stream.Collectors; -import static org.apache.iotdb.consensus.iot.IoTConsensus.getConsensusGroupIdsFromDir; - public class PipeConsensus implements IConsensus { private static final String CONSENSUS_PIPE_GUARDIAN_TASK_ID = "consensus_pipe_guardian"; private static final String CLASS_NAME = PipeConsensus.class.getSimpleName(); @@ -102,6 +102,7 @@ public class PipeConsensus implements IConsensus { private final ConsensusPipeGuardian consensusPipeGuardian; private final IClientManager asyncClientManager; private final IClientManager syncClientManager; + private Map> correctPeerListBeforeStart = null; public PipeConsensus(ConsensusConfig config, IStateMachine.Registry registry) { this.thisNode = config.getThisNodeEndPoint(); @@ -146,33 +147,64 @@ private void initAndRecover() throws IOException { } } else { // asynchronously recover, retry logic is implemented at PipeConsensusImpl - CompletableFuture.runAsync( - () -> { - try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { - for (Path path : stream) { - ConsensusGroupId consensusGroupId = - parsePeerFileName(path.getFileName().toString()); - PipeConsensusServerImpl consensus = - new PipeConsensusServerImpl( - new Peer(consensusGroupId, thisNodeId, thisNode), - registry.apply(consensusGroupId), - path.toString(), - new ArrayList<>(), - config, - consensusPipeManager, - syncClientManager); - stateMachineMap.put(consensusGroupId, consensus); - consensus.start(true); - } - } catch (Exception e) { - LOGGER.error("Failed to recover consensus from {}", storageDir, e); - } - }) - .exceptionally( - e -> { - LOGGER.error("Failed to recover consensus from {}", storageDir, e); - return null; - }); + CompletableFuture future = + CompletableFuture.runAsync( + () -> { + try (DirectoryStream stream = + Files.newDirectoryStream(storageDir.toPath())) { + for (Path path : stream) { + ConsensusGroupId consensusGroupId = + parsePeerFileName(path.getFileName().toString()); + PipeConsensusServerImpl consensus = + new PipeConsensusServerImpl( + new Peer(consensusGroupId, thisNodeId, thisNode), + registry.apply(consensusGroupId), + path.toString(), + new ArrayList<>(), + config, + consensusPipeManager, + syncClientManager); + stateMachineMap.put(consensusGroupId, consensus); + checkPeerListAndStartIfEligible(consensusGroupId, consensus); + } + } catch (Exception e) { + LOGGER.error("Failed to recover consensus from {}", storageDir, e); + } + }) + .exceptionally( + e -> { + LOGGER.error("Failed to recover consensus from {}", storageDir, e); + return null; + }); + } + } + + private void checkPeerListAndStartIfEligible( + ConsensusGroupId consensusGroupId, PipeConsensusServerImpl consensus) throws IOException { + BiConsumer> resetPeerListWithoutThrow = + (dataRegionId, peers) -> { + try { + resetPeerList(dataRegionId, peers); + } catch (ConsensusGroupNotExistException ignore) { + + } catch (Exception e) { + LOGGER.warn("Failed to reset peer list while start", e); + } + }; + + if (correctPeerListBeforeStart != null) { + if (correctPeerListBeforeStart.containsKey(consensusGroupId)) { + // make peers which are in list correct + resetPeerListWithoutThrow.accept( + consensusGroupId, correctPeerListBeforeStart.get(consensusGroupId)); + consensus.start(true); + } else { + // clear peers which are not in the list + resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList()); + } + + } else { + consensus.start(true); } } @@ -412,13 +444,20 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH); } + @Override + public void recordCorrectPeerListBeforeStarting( + Map> correctPeerList) { + LOGGER.info("Record correct peer list: {}", correctPeerList); + this.correctPeerListBeforeStart = correctPeerList; + } + @Override public void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException { - LOGGER.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers); PipeConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + if (!correctPeers.contains(new Peer(groupId, thisNodeId, thisNode))) { LOGGER.warn( "[RESET PEER LIST] Local peer is not in the correct configuration, delete local peer {}", @@ -426,30 +465,43 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) deleteLocalPeer(groupId); return; } + ImmutableList currentPeers = ImmutableList.copyOf(impl.getPeers()); String previousPeerListStr = impl.getPeers().toString(); + // remove invalid peer for (Peer peer : currentPeers) { if (!correctPeers.contains(peer)) { try { impl.dropConsensusPipeToTargetPeer(peer); + LOGGER.info("[RESET PEER LIST] Remove sync channel with: {}", peer); } catch (ConsensusGroupModifyPeerException e) { - LOGGER.error( - "[RESET PEER LIST] Failed to remove peer {}'s consensus pipe from group {}", - peer, - groupId, - e); + LOGGER.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer, e); } } } - LOGGER.info( - "[RESET PEER LIST] Local peer list has been reset: {} -> {}", - previousPeerListStr, - impl.getPeers()); + // add correct peer for (Peer peer : correctPeers) { - if (!impl.containsPeer(peer)) { - LOGGER.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer); + if (!impl.containsPeer(peer) && peer.getNodeId() != this.thisNodeId) { + try { + impl.createConsensusPipeToTargetPeer(peer, false); + LOGGER.info("[RESET PEER LIST] Build sync channel with: {}", peer); + } catch (ConsensusGroupModifyPeerException e) { + LOGGER.warn("[RESET PEER LIST] Failed to build sync channel with: {}", peer, e); + } } } + // show result + String currentPeerListStr = impl.getPeers().toString(); + if (!previousPeerListStr.equals(currentPeerListStr)) { + LOGGER.info( + "[RESET PEER LIST] Local peer list has been reset: {} -> {}", + previousPeerListStr, + impl.getPeers()); + } else { + LOGGER.info( + "[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}", + previousPeerListStr); + } } @Override @@ -500,11 +552,6 @@ public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); } - @Override - public List getAllConsensusGroupIdsWithoutStarting() { - return getConsensusGroupIdsFromDir(storageDir, LOGGER); - } - @Override public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) { return getPeerDir(groupId); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 9b4c3274e6a7..9ae8dcd6a267 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -94,19 +94,18 @@ import java.io.File; import java.io.IOException; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; @@ -146,6 +145,8 @@ class RatisConsensus implements IConsensus { private final RatisMetricSet ratisMetricSet; private final TConsensusGroupType consensusGroupType; + private Map> correctPeerListBeforeStart = null; + private final ConcurrentHashMap canServeStaleRead; public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) { @@ -235,6 +236,27 @@ public synchronized void start() throws IOException { MetricService.getInstance().addMetricSet(this.ratisMetricSet); server.get().start(); registerAndStartDiskGuardian(); + + if (correctPeerListBeforeStart != null) { + BiConsumer> resetPeerListWithoutThrow = + (consensusGroupId, peers) -> { + try { + resetPeerList(consensusGroupId, peers); + } catch (ConsensusGroupNotExistException ignore) { + + } catch (Exception e) { + logger.warn("Failed to reset peer list while start", e); + } + }; + // make peers which are in list correct + correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow); + // clear peers which are not in the list + getAllConsensusGroupIds().stream() + .filter(consensusGroupId -> !correctPeerListBeforeStart.containsKey(consensusGroupId)) + .forEach( + consensusGroupId -> + resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList())); + } } @Override @@ -591,10 +613,16 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig)); } + @Override + public void recordCorrectPeerListBeforeStarting( + Map> correctPeerList) { + logger.info("Record correct peer list: {}", correctPeerList); + this.correctPeerListBeforeStart = correctPeerList; + } + @Override public void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException { - logger.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers); final RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); final RaftGroup group = getGroupInfo(raftGroupId); @@ -610,7 +638,7 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) peer.getNodeId(), peer.getEndpoint(), DEFAULT_PRIORITY)) .anyMatch( raftPeer -> - myself.getId() == raftPeer.getId() + myself.getId().equals(raftPeer.getId()) && myself.getAddress().equals(raftPeer.getAddress())); if (!myselfInCorrectPeers) { logger.info( @@ -624,6 +652,20 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) Utils.fromPeersAndPriorityToRaftPeers(correctPeers, DEFAULT_PRIORITY); final RaftGroup newGroup = RaftGroup.valueOf(raftGroupId, newGroupPeers); + Set localRaftPeerSet = new HashSet<>(group.getPeers()); + Set correctRaftPeerSet = new HashSet<>(newGroupPeers); + if (localRaftPeerSet.equals(correctRaftPeerSet)) { + // configurations are the same + logger.info( + "[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}", + localRaftPeerSet); + return; + } + + logger.info( + "[RESET PEER LIST] Peer list will be reset from {} to {}", + localRaftPeerSet, + correctRaftPeerSet); RaftClientReply reply = sendReconfiguration(newGroup); if (reply.isSuccess()) { logger.info("[RESET PEER LIST] Peer list has been reset to {}", newGroupPeers); @@ -791,30 +833,6 @@ public List getAllConsensusGroupIds() { } } - @Override - public List getAllConsensusGroupIdsWithoutStarting() { - if (!storageDir.exists()) { - return Collections.emptyList(); - } - List consensusGroupIds = new ArrayList<>(); - try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { - for (Path path : stream) { - try { - RaftGroupId raftGroupId = - RaftGroupId.valueOf(UUID.fromString(path.getFileName().toString())); - consensusGroupIds.add(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)); - } catch (Exception e) { - logger.info( - "The directory {} is not a group directory;" + " ignoring it. ", - path.getFileName().toString()); - } - } - } catch (IOException e) { - logger.error("Failed to get all consensus group ids from disk", e); - } - return consensusGroupIds; - } - @Override public String getRegionDirFromConsensusGroupId(ConsensusGroupId consensusGroupId) { RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index 19258309028d..5800e76b0081 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -38,7 +38,6 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; -import org.apache.iotdb.consensus.iot.IoTConsensus; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -211,6 +210,12 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens throw new ConsensusException("SimpleConsensus does not support membership changes"); } + @Override + public void recordCorrectPeerListBeforeStarting( + Map> correctPeerList) { + logger.info("SimpleConsensus will do nothing when calling recordCorrectPeerListBeforeStarting"); + } + @Override public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException { throw new ConsensusException("SimpleConsensus does not support leader transfer"); @@ -254,11 +259,6 @@ public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); } - @Override - public List getAllConsensusGroupIdsWithoutStarting() { - return IoTConsensus.getConsensusGroupIdsFromDir(storageDir, logger); - } - @Override public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) { return buildPeerDir(groupId); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index 8072ab100660..3ec7769f2ac5 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -324,10 +324,6 @@ public void parsingAndConstructIDTest() throws Exception { for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); } - List ids = servers.get(0).getAllConsensusGroupIdsWithoutStarting(); - - Assert.assertEquals(1, ids.size()); - Assert.assertEquals(gid, ids.get(0)); String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid); try { diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index d7675084680c..9fcbf9e03f16 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.consensus.ConsensusFactory; -import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -42,7 +41,11 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertTrue; @@ -52,32 +55,34 @@ public class StabilityTest { private final File storageDir = new File("target" + java.io.File.separator + "stability"); - private IConsensus consensusImpl; + private IoTConsensus consensusImpl; private final int basePort = 6667; public void constructConsensus() throws IOException { consensusImpl = - ConsensusFactory.getConsensusImpl( - ConsensusFactory.IOT_CONSENSUS, - ConsensusConfig.newBuilder() - .setThisNodeId(1) - .setThisNode(new TEndPoint("0.0.0.0", basePort)) - .setStorageDir(storageDir.getAbsolutePath()) - .setConsensusGroupType(TConsensusGroupType.DataRegion) - .build(), - gid -> new TestStateMachine()) - .orElseThrow( - () -> - new IllegalArgumentException( - String.format( - ConsensusFactory.CONSTRUCT_FAILED_MSG, - ConsensusFactory.IOT_CONSENSUS))); + (IoTConsensus) + ConsensusFactory.getConsensusImpl( + ConsensusFactory.IOT_CONSENSUS, + ConsensusConfig.newBuilder() + .setThisNodeId(1) + .setThisNode(new TEndPoint("0.0.0.0", basePort)) + .setStorageDir(storageDir.getAbsolutePath()) + .setConsensusGroupType(TConsensusGroupType.DataRegion) + .build(), + gid -> new TestStateMachine()) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + ConsensusFactory.CONSTRUCT_FAILED_MSG, + ConsensusFactory.IOT_CONSENSUS))); consensusImpl.start(); } @Before public void setUp() throws Exception { + FileUtils.deleteFully(storageDir); constructConsensus(); } @@ -210,4 +215,47 @@ public void snapshotTest() throws ConsensusException { Assert.assertNotEquals(versionFiles1[0].getName(), versionFiles2[0].getName()); consensusImpl.deleteLocalPeer(dataRegionId); } + + @Test + public void recordAndResetPeerListTest() throws Exception { + try { + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); + consensusImpl.createLocalPeer( + dataRegionId, + Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); + Assert.assertEquals(1, consensusImpl.getImpl(dataRegionId).getConfiguration().size()); + } catch (ConsensusException e) { + Assert.fail(); + } + consensusImpl.stop(); + + // test add sync channel + Map> correctPeers = new HashMap<>(); + List peerList1And2 = new ArrayList<>(); + peerList1And2.add(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))); + peerList1And2.add(new Peer(dataRegionId, 2, new TEndPoint("0.0.0.0", basePort))); + correctPeers.put(dataRegionId, peerList1And2); + consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers); + consensusImpl.start(); + Assert.assertEquals(2, consensusImpl.getImpl(dataRegionId).getConfiguration().size()); + consensusImpl.stop(); + + // test remove sync channel + List peerList1 = new ArrayList<>(); + peerList1.add(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))); + correctPeers.put(dataRegionId, peerList1); + consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers); + consensusImpl.start(); + Assert.assertEquals(1, consensusImpl.getImpl(dataRegionId).getConfiguration().size()); + consensusImpl.stop(); + + // test remove invalid peer + List peerList2 = new ArrayList<>(); + peerList2.add(new Peer(dataRegionId, 2, new TEndPoint("0.0.0.0", basePort))); + correctPeers.put(dataRegionId, peerList2); + consensusImpl.recordCorrectPeerListBeforeStarting(correctPeers); + consensusImpl.start(); + Assert.assertNull(consensusImpl.getImpl(dataRegionId)); + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index 685f580e4bae..842ea78af723 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -302,10 +302,6 @@ public void parsingAndConstructIDs() throws Exception { servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); doConsensus(0, 10, 10); - List ids = servers.get(0).getAllConsensusGroupIdsWithoutStarting(); - Assert.assertEquals(1, ids.size()); - Assert.assertEquals(gid, ids.get(0)); - String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid); try { File regionDirFile = new File(regionDir); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java index 23db3caa0d73..3e56a57e3ca7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java @@ -37,6 +37,9 @@ import java.util.Map; +import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG; +import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG; + public class ConsensusPipeDataNodeDispatcher implements ConsensusPipeDispatcher { private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusPipeDataNodeDispatcher.class); @@ -64,6 +67,10 @@ public void createPipe( TSStatus status = configNodeClient.createPipe(req); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) { LOGGER.warn("Failed to create consensus pipe-{}, status: {}", pipeName, status); + // ignore idempotence logic + if (status.getMessage().contains(PIPE_ALREADY_EXIST_MSG)) { + return; + } throw new PipeException(status.getMessage()); } } catch (Exception e) { @@ -111,6 +118,10 @@ public void dropPipe(ConsensusPipeName pipeName) throws Exception { final TSStatus status = configNodeClient.dropPipe(pipeName.toString()); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) { LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", pipeName, status); + // ignore idempotence logic + if (status.getMessage().contains(PIPE_NOT_EXIST_MSG)) { + return; + } throw new PipeException(status.getMessage()); } } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index fc2100d45d1a..9a2ff29502a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -21,12 +21,12 @@ import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeResource; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.ServerCommandLine; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler; @@ -65,6 +65,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration; import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp; import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.db.conf.DataNodeStartupCheck; import org.apache.iotdb.db.conf.DataNodeSystemPropertiesHandler; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -126,6 +127,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -560,11 +562,15 @@ private void sendRegisterRequestToConfigNode(boolean isPreCheck) } } - private void removeInvalidRegions(List dataNodeConsensusGroupIds) { - removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds); + private void makeRegionsCorrect(List correctRegions) { + List dataNodeConsensusGroupIds = + correctRegions.stream() + .map(TRegionReplicaSet::getRegionId) + .map(ConsensusGroupId.Factory::createFromTConsensusGroupId) + .collect(Collectors.toList()); removeInvalidDataRegions(dataNodeConsensusGroupIds); - removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds); removeInvalidSchemaRegions(dataNodeConsensusGroupIds); + prepareToResetDataRegionPeerList(correctRegions); } private void removeInvalidDataRegions(List dataNodeConsensusGroupIds) { @@ -581,24 +587,6 @@ private void removeInvalidDataRegions(List dataNodeConsensusGr }); } - private void removeInvalidConsensusDataRegions(List dataNodeConsensusGroupIds) { - List invalidDataRegionConsensusGroupIds = - DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream() - .filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId)) - .collect(Collectors.toList()); - if (invalidDataRegionConsensusGroupIds.isEmpty()) { - return; - } - logger.info("Remove invalid dataRegion directories... {}", invalidDataRegionConsensusGroupIds); - for (ConsensusGroupId consensusGroupId : invalidDataRegionConsensusGroupIds) { - File oldDir = - new File( - DataRegionConsensusImpl.getInstance() - .getRegionDirFromConsensusGroupId(consensusGroupId)); - removeDir(oldDir); - } - } - private void removeInvalidSchemaRegions(List schemaConsensusGroupIds) { Map> localSchemaRegionInfo = SchemaEngine.getLocalSchemaRegionInfo(); @@ -622,26 +610,6 @@ private void removeDataDirRegion( }); } - private void removeInvalidConsensusSchemaRegions( - List dataNodeConsensusGroupIds) { - List invalidSchemaRegionConsensusGroupIds = - SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream() - .filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId)) - .collect(Collectors.toList()); - if (invalidSchemaRegionConsensusGroupIds.isEmpty()) { - return; - } - logger.info( - "Remove invalid schemaRegion directories... {}", invalidSchemaRegionConsensusGroupIds); - for (ConsensusGroupId consensusGroupId : invalidSchemaRegionConsensusGroupIds) { - File oldDir = - new File( - SchemaRegionConsensusImpl.getInstance() - .getRegionDirFromConsensusGroupId(consensusGroupId)); - removeDir(oldDir); - } - } - private void removeInvalidSchemaDir(String database, SchemaRegionId schemaRegionId) { String systemSchemaDir = config.getSystemDir() + File.separator + database + File.separator + schemaRegionId.getId(); @@ -657,6 +625,39 @@ private void removeDir(File regionDir) { } } + private void prepareToResetDataRegionPeerList(List correctedRegions) { + Map> correctPeerListForDataRegion = new HashMap<>(); + Map> correctPeerListForSchemaRegion = new HashMap<>(); + for (TRegionReplicaSet regionReplicaSet : correctedRegions) { + ConsensusGroupId consensusGroupId = + ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.regionId); + List peerList = new ArrayList<>(); + if (consensusGroupId.getType() == TConsensusGroupType.DataRegion) { + for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + peerList.add( + new Peer( + consensusGroupId, + dataNodeLocation.getDataNodeId(), + dataNodeLocation.getDataRegionConsensusEndPoint())); + } + correctPeerListForDataRegion.put(consensusGroupId, peerList); + } else if (consensusGroupId.getType() == TConsensusGroupType.SchemaRegion) { + for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + peerList.add( + new Peer( + consensusGroupId, + dataNodeLocation.getDataNodeId(), + dataNodeLocation.getSchemaRegionConsensusEndPoint())); + } + correctPeerListForSchemaRegion.put(consensusGroupId, peerList); + } + } + DataRegionConsensusImpl.getInstance() + .recordCorrectPeerListBeforeStarting(correctPeerListForDataRegion); + SchemaRegionConsensusImpl.getInstance() + .recordCorrectPeerListBeforeStarting(correctPeerListForSchemaRegion); + } + private void sendRestartRequestToConfigNode() throws StartupException { logger.info("Sending restart request to ConfigNode-leader..."); long startTime = System.currentTimeMillis(); @@ -709,11 +710,7 @@ private void sendRestartRequestToConfigNode() throws StartupException { config.getClusterName(), (endTime - startTime)); - List consensusGroupIds = dataNodeRestartResp.getConsensusGroupIds(); - removeInvalidRegions( - consensusGroupIds.stream() - .map(ConsensusGroupId.Factory::createFromTConsensusGroupId) - .collect(Collectors.toList())); + makeRegionsCorrect(dataNodeRestartResp.getCorrectConsensusGroups()); } else { /* Throw exception when restart is rejected */ throw new StartupException(dataNodeRestartResp.getStatus().getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java new file mode 100644 index 000000000000..7e9646e5a864 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeRPCMessageConstant.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.config.constant; + +public class PipeRPCMessageConstant { + // These two message are used in multi-modules such as pipe and IoTV2 + public static final String PIPE_ALREADY_EXIST_MSG = + "the pipe with the same name has been created"; + public static final String PIPE_NOT_EXIST_MSG = "the pipe does not exist"; + + private PipeRPCMessageConstant() { + throw new IllegalStateException("Utility class"); + } +} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 5f090003119d..f2e6c1400916 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -134,7 +134,7 @@ struct TDataNodeRestartResp { 1: required common.TSStatus status 2: required list configNodeList 3: optional TRuntimeConfiguration runtimeConfiguration - 4: optional list consensusGroupIds + 4: optional list correctConsensusGroups } struct TDataNodeRemoveReq {