From 89d911119dd0bd7de47644031257047cb505e482 Mon Sep 17 00:00:00 2001 From: Vincent P Date: Thu, 26 Oct 2023 10:58:35 +0200 Subject: [PATCH] SOLR-17054: Remove unused and duplicate code in DistributedZkUpdateProcessor (#2038) Co-authored-by: Vincent Primault --- .../DistributedZkUpdateProcessor.java | 98 ++++--------------- 1 file changed, 17 insertions(+), 81 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index a1e491c82bb..35d96a7c14a 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -179,15 +179,12 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { } isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName()); - nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT), true); + nodes = getCollectionUrls(collection); if (nodes == null) { // This could happen if there are only pull replicas throw new SolrException( SolrException.ErrorCode.SERVER_ERROR, - "Unable to distribute commit operation. No replicas available of types " - + Replica.Type.TLOG - + " or " - + Replica.Type.NRT); + "Unable to distribute commit operation. No leader replicas available."); } nodes.removeIf( @@ -212,13 +209,11 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { useNodes = nodes; params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); params.set(COMMIT_END_POINT, "leaders"); - if (useNodes != null) { - params.set( - DISTRIB_FROM, - ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName())); - cmdDistrib.distribCommit(cmd, useNodes, params); - issuedDistribCommit = true; - } + params.set( + DISTRIB_FROM, + ZkCoreNodeProps.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName())); + cmdDistrib.distribCommit(cmd, useNodes, params); + issuedDistribCommit = true; } if (isLeader) { @@ -232,7 +227,7 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { params.set(COMMIT_END_POINT, "replicas"); - useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica); + useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica, 0); if (useNodes != null) { params.set( @@ -786,52 +781,7 @@ protected List setupRequest( // that means I want to forward onto my replicas... // so get the replicas... forwardToLeader = false; - String leaderCoreNodeName = leaderReplica.getName(); - List replicas = - clusterState - .getCollection(collection) - .getSlice(shardId) - .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)); - replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName)); - if (replicas.isEmpty()) { - return null; - } - - // check for test param that lets us miss replicas - String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS); - Set skipListSet = null; - if (skipList != null) { - skipListSet = CollectionUtil.newHashSet(skipList.length); - skipListSet.addAll(Arrays.asList(skipList)); - log.info("test.distrib.skip.servers was found and contains:{}", skipListSet); - } - - List nodes = new ArrayList<>(replicas.size()); - skippedCoreNodeNames = new HashSet<>(); - ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId); - for (Replica replica : replicas) { - String coreNodeName = replica.getName(); - if (skipList != null && skipListSet.contains(replica.getCoreUrl())) { - if (log.isInfoEnabled()) { - log.info("check url:{} against:{} result:true", replica.getCoreUrl(), skipListSet); - } - } else if (zkShardTerms.registered(coreNodeName) - && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) { - if (log.isDebugEnabled()) { - log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl()); - } - skippedCoreNodeNames.add(replica.getName()); - } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) - || replica.getState() == Replica.State.DOWN) { - skippedCoreNodeNames.add(replica.getName()); - } else { - nodes.add( - new SolrCmdDistributor.StdNode( - new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers)); - } - } - return nodes; - + return getReplicaNodesForLeader(shardId, leaderReplica, maxRetriesToFollowers); } else { // I need to forward on to the leader... forwardToLeader = true; @@ -883,8 +833,7 @@ private void checkReplicationTracker(UpdateCommand cmd) { } } - private List getCollectionUrls( - String collection, EnumSet types, boolean onlyLeaders) { + private List getCollectionUrls(String collection) { final DocCollection docCollection = clusterState.getCollectionOrNull(collection); if (docCollection == null || docCollection.getSlicesMap() == null) { throw new ZooKeeperException( @@ -894,24 +843,10 @@ private List getCollectionUrls( final List urls = new ArrayList<>(slices.size()); for (Map.Entry sliceEntry : slices.entrySet()) { Slice replicas = slices.get(sliceEntry.getKey()); - if (onlyLeaders) { - Replica replica = docCollection.getLeader(replicas.getName()); - if (replica != null) { - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica); - urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName())); - } - continue; - } - Map shardMap = replicas.getReplicasMap(); - - for (Map.Entry entry : shardMap.entrySet()) { - if (!types.contains(entry.getValue().getType())) { - continue; - } - ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue()); - if (clusterState.liveNodesContain(nodeProps.getNodeName())) { - urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName())); - } + Replica replica = docCollection.getLeader(replicas.getName()); + if (replica != null) { + ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica); + urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName())); } } if (urls.isEmpty()) { @@ -959,7 +894,7 @@ protected boolean amISubShardLeader( } protected List getReplicaNodesForLeader( - String shardId, Replica leaderReplica) { + String shardId, Replica leaderReplica, int maxRetries) { String leaderCoreNodeName = leaderReplica.getName(); List replicas = clusterState @@ -1000,7 +935,8 @@ protected List getReplicaNodesForLeader( skippedCoreNodeNames.add(replica.getName()); } else { nodes.add( - new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId)); + new SolrCmdDistributor.StdNode( + new ZkCoreNodeProps(replica), collection, shardId, maxRetries)); } } return nodes;