Skip to content

Commit

Permalink
SOLR-17054: Remove unused and duplicate code in DistributedZkUpdatePr…
Browse files Browse the repository at this point in the history
…ocessor (#2038)

Co-authored-by: Vincent Primault <[email protected]>
  • Loading branch information
2 people authored and bruno-roustant committed Oct 26, 2023
1 parent 542cf0b commit 89d9111
Showing 1 changed file with 17 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,12 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
}
isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());

nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true);
nodes = getCollectionUrls(collection);
if (nodes == null) {
// This could happen if there are only pull replicas
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Unable to distribute commit operation. No replicas available of types "
+ Replica.Type.TLOG
+ " or "
+ Replica.Type.NRT);
"Unable to distribute commit operation. No leader replicas available.");
}

nodes.removeIf(
Expand All @@ -212,13 +209,11 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
useNodes = nodes;
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
params.set(COMMIT_END_POINT, "leaders");
if (useNodes != null) {
params.set(
DISTRIB_FROM,
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribCommit(cmd, useNodes, params);
issuedDistribCommit = true;
}
params.set(
DISTRIB_FROM,
ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribCommit(cmd, useNodes, params);
issuedDistribCommit = true;
}

if (isLeader) {
Expand All @@ -232,7 +227,7 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {

params.set(COMMIT_END_POINT, "replicas");

useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica, 0);

if (useNodes != null) {
params.set(
Expand Down Expand Up @@ -786,52 +781,7 @@ protected List<SolrCmdDistributor.Node> setupRequest(
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas =
clusterState
.getCollection(collection)
.getSlice(shardId)
.getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
if (replicas.isEmpty()) {
return null;
}

// check for test param that lets us miss replicas
String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
Set<String> skipListSet = null;
if (skipList != null) {
skipListSet = CollectionUtil.newHashSet(skipList.length);
skipListSet.addAll(Arrays.asList(skipList));
log.info("test.distrib.skip.servers was found and contains:{}", skipListSet);
}

List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
skippedCoreNodeNames = new HashSet<>();
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
for (Replica replica : replicas) {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
if (log.isInfoEnabled()) {
log.info("check url:{} against:{} result:true", replica.getCoreUrl(), skipListSet);
}
} else if (zkShardTerms.registered(coreNodeName)
&& zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
if (log.isDebugEnabled()) {
log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
}
skippedCoreNodeNames.add(replica.getName());
} else if (!clusterState.getLiveNodes().contains(replica.getNodeName())
|| replica.getState() == Replica.State.DOWN) {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(
new SolrCmdDistributor.StdNode(
new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
}
}
return nodes;

return getReplicaNodesForLeader(shardId, leaderReplica, maxRetriesToFollowers);
} else {
// I need to forward on to the leader...
forwardToLeader = true;
Expand Down Expand Up @@ -883,8 +833,7 @@ private void checkReplicationTracker(UpdateCommand cmd) {
}
}

private List<SolrCmdDistributor.Node> getCollectionUrls(
String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
private List<SolrCmdDistributor.Node> getCollectionUrls(String collection) {
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (docCollection == null || docCollection.getSlicesMap() == null) {
throw new ZooKeeperException(
Expand All @@ -894,24 +843,10 @@ private List<SolrCmdDistributor.Node> getCollectionUrls(
final List<SolrCmdDistributor.Node> urls = new ArrayList<>(slices.size());
for (Map.Entry<String, Slice> sliceEntry : slices.entrySet()) {
Slice replicas = slices.get(sliceEntry.getKey());
if (onlyLeaders) {
Replica replica = docCollection.getLeader(replicas.getName());
if (replica != null) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName()));
}
continue;
}
Map<String, Replica> shardMap = replicas.getReplicasMap();

for (Map.Entry<String, Replica> entry : shardMap.entrySet()) {
if (!types.contains(entry.getValue().getType())) {
continue;
}
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName()));
}
Replica replica = docCollection.getLeader(replicas.getName());
if (replica != null) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName()));
}
}
if (urls.isEmpty()) {
Expand Down Expand Up @@ -959,7 +894,7 @@ protected boolean amISubShardLeader(
}

protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(
String shardId, Replica leaderReplica) {
String shardId, Replica leaderReplica, int maxRetries) {
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas =
clusterState
Expand Down Expand Up @@ -1000,7 +935,8 @@ protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(
new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId));
new SolrCmdDistributor.StdNode(
new ZkCoreNodeProps(replica), collection, shardId, maxRetries));
}
}
return nodes;
Expand Down

0 comments on commit 89d9111

Please sign in to comment.