Skip to content

Commit

Permalink
[region migration] Make peer list correct when IoTConsensus & IoTCons…
Browse files Browse the repository at this point in the history
…ensusV2 & Ratis starting (#14505)

* done

* for IoTV2 ?

* test conf

* fix iotv2

* ˜ø

* still to handle ratis

* all done

* rename start to starting

* use string var

* spotless

* tan review

* tan review

* add UT for iotv1

* fix review

* fix RatisConsensus

* RatisConsensus only sendConfiguration if necessary

* done

* add peer list

---------

Co-authored-by: Peng Junzhi <[email protected]>
Co-authored-by: Peng Junzhi <[email protected]>
  • Loading branch information
3 people authored Dec 23, 2024
1 parent 5fe299a commit 7d92f47
Show file tree
Hide file tree
Showing 16 changed files with 375 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
Expand Down Expand Up @@ -362,11 +361,8 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {

resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
resp.setRuntimeConfiguration(getRuntimeConfiguration());
List<TConsensusGroupId> consensusGroupIds =
getPartitionManager().getAllReplicaSets(nodeId).stream()
.map(TRegionReplicaSet::getRegionId)
.collect(Collectors.toList());
resp.setConsensusGroupIds(consensusGroupIds);

resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
return resp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import java.util.stream.StreamSupport;

import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;

public class PipeTaskInfo implements SnapshotProcessor {

Expand Down Expand Up @@ -179,8 +181,8 @@ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeReq

final String exceptionMessage =
String.format(
"Failed to create pipe %s, the pipe with the same name has been created",
createPipeRequest.getPipeName());
"Failed to create pipe %s, %s",
createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand All @@ -204,7 +206,7 @@ private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq

final String exceptionMessage =
String.format(
"Failed to alter pipe %s, the pipe does not exist", alterPipeRequest.getPipeName());
"Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand Down Expand Up @@ -281,7 +283,7 @@ public void checkBeforeStartPipe(final String pipeName) throws PipeException {
private void checkBeforeStartPipeInternal(final String pipeName) throws PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe does not exist", pipeName);
String.format("Failed to start pipe %s, %s", pipeName, PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand All @@ -307,7 +309,7 @@ public void checkBeforeStopPipe(final String pipeName) throws PipeException {
private void checkBeforeStopPipeInternal(final String pipeName) throws PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe does not exist", pipeName);
String.format("Failed to stop pipe %s, %s", pipeName, PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

/** Consensus module base interface. */
@ThreadSafe
Expand Down Expand Up @@ -145,6 +146,15 @@ public interface IConsensus {
*/
void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException;

/**
* Record the correct peer list (typically obtained from the ConfigNode) for later use by
* resetPeerList. Call this method only when necessary; if it is called, it must be called
* before {@link #start()}.
*
* @param correctPeerList The correct consensus group member list
*/
void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId, List<Peer>> correctPeerList);

/**
* Reset the peer list of the corresponding consensus group. Currently only used in the automatic
* cleanup of region migration as a rollback for {@link #addRemotePeer(ConsensusGroupId, Peer)},
Expand Down Expand Up @@ -226,17 +236,6 @@ public interface IConsensus {
*/
List<ConsensusGroupId> getAllConsensusGroupIds();

/**
* Return all consensus group ids from disk.
*
* <p>We need to parse all the RegionGroupIds from the disk directory before starting the
* consensus layer, and {@link #getAllConsensusGroupIds()} returns an empty list, so we need to
* add a new interface.
*
* @return consensusGroupId list
*/
List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting();

/**
* Return the region directory of the corresponding consensus group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class IoTConsensus implements IConsensus {

Expand All @@ -99,6 +101,7 @@ public class IoTConsensus implements IConsensus {
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final ScheduledExecutorService backgroundTaskService;
private Future<?> updateReaderFuture;
private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;

public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
Expand Down Expand Up @@ -178,10 +181,32 @@ private void initAndRecover() throws IOException {
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
consensus.start();
}
}
}
if (correctPeerListBeforeStart != null) {
BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
(consensusGroupId, peers) -> {
try {
resetPeerList(consensusGroupId, peers);
} catch (ConsensusGroupNotExistException ignore) {

} catch (Exception e) {
logger.warn("Failed to reset peer list while start", e);
}
};
// make peers which are in list correct
correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
// clear peers which are not in the list
stateMachineMap.keySet().stream()
.filter(consensusGroupId -> !correctPeerListBeforeStart.containsKey(consensusGroupId))
// copy to a new list to avoid concurrent modification
.collect(Collectors.toList())
.forEach(
consensusGroupId ->
resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList()));
}
stateMachineMap.values().forEach(IoTConsensusServerImpl::start);
}

@Override
Expand Down Expand Up @@ -435,36 +460,6 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
}

@Override
public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
return getConsensusGroupIdsFromDir(storageDir, logger);
}

public static List<ConsensusGroupId> getConsensusGroupIdsFromDir(File storageDir, Logger logger) {
if (!storageDir.exists()) {
return Collections.emptyList();
}
List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
for (Path path : stream) {
try {
String[] items = path.getFileName().toString().split("_");
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.create(
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
consensusGroupIds.add(consensusGroupId);
} catch (Exception e) {
logger.info(
"The directory {} is not a group directory;" + " ignoring it. ",
path.getFileName().toString());
}
}
} catch (IOException e) {
logger.error("Failed to get all consensus group ids from disk", e);
}
return consensusGroupIds;
}

@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
return buildPeerDir(storageDir, groupId);
Expand All @@ -483,10 +478,16 @@ public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
.init(config.getReplication().getRegionMigrationSpeedLimitBytesPerSecond());
}

@Override
public void recordCorrectPeerListBeforeStarting(
Map<ConsensusGroupId, List<Peer>> correctPeerList) {
logger.info("Record correct peer list: {}", correctPeerList);
this.correctPeerListBeforeStart = correctPeerList;
}

@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
logger.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers);
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
Expand All @@ -501,27 +502,37 @@ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
}

synchronized (impl) {
// remove invalid peer
ImmutableList<Peer> currentMembers = ImmutableList.copyOf(impl.getConfiguration());
String previousPeerListStr = currentMembers.toString();
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
logger.error(
"[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}",
peer,
groupId);
logger.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer);
} else {
logger.info("[RESET PEER LIST] Remove sync channel with: {}", peer);
}
}
}
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
previousPeerListStr,
impl.getConfiguration());
// add correct peer
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer);
impl.buildSyncLogChannel(peer);
logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
}
}
// show result
String newPeerListStr = impl.getConfiguration().toString();
if (!previousPeerListStr.equals(newPeerListStr)) {
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
previousPeerListStr,
newPeerListStr);
} else {
logger.info(
"[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}",
previousPeerListStr);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,17 +645,12 @@ private boolean isSuccess(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}

/**
* build SyncLog channel with safeIndex as the default initial sync index.
*
* @throws ConsensusGroupModifyPeerException
*/
public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
/** build SyncLog channel with safeIndex as the default initial sync index. */
public void buildSyncLogChannel(Peer targetPeer) {
buildSyncLogChannel(targetPeer, getMinSyncIndex());
}

public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
throws ConsensusGroupModifyPeerException {
public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
// step 1, build sync channel in LogDispatcher
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,8 @@ public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
return new TBuildSyncLogChannelRes(status);
}
TSStatus responseStatus;
try {
impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
return new TBuildSyncLogChannelRes(responseStatus);
}

Expand Down
Loading

0 comments on commit 7d92f47

Please sign in to comment.