From ee8657438799c07baadd8efa0f272bb57380d772 Mon Sep 17 00:00:00 2001 From: Eric Pugh Date: Fri, 15 Mar 2024 09:02:34 -0400 Subject: [PATCH] To prove it works, passing around compression flag and associated compressor However, I think the compression choice COULD be a builder option when creating SolrZkClient, which would simplify the methods and avoid duplication. Wanted to get POC up. --- .../java/org/apache/solr/cli/ZkCpTool.java | 56 +++++++- .../handler/designer/SchemaDesignerAPI.java | 8 +- .../apache/solr/cli/ZkSubcommandsTest.java | 34 ++++- .../solr/common/cloud/SolrZkClient.java | 52 +++++--- .../solr/common/cloud/ZkMaintenanceUtils.java | 120 +++++++++++++++++- .../org/apache/solr/cloud/ZkTestServer.java | 2 +- 6 files changed, 248 insertions(+), 24 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cli/ZkCpTool.java b/solr/core/src/java/org/apache/solr/cli/ZkCpTool.java index 02379d5748a..c3808820e78 100644 --- a/solr/core/src/java/org/apache/solr/cli/ZkCpTool.java +++ b/solr/core/src/java/org/apache/solr/cli/ZkCpTool.java @@ -18,13 +18,23 @@ import java.io.PrintStream; import java.lang.invoke.MethodHandles; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Locale; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.solr.client.solrj.impl.SolrZkClientTimeout; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.util.Compressor; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.common.util.ZLibCompressor; +import org.apache.solr.core.NodeConfig; +import org.apache.solr.core.SolrXmlConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +115,51 @@ public void runImpl(CommandLine cli) throws Exception { dstName = dstName.substring(5); } } - zkClient.zkTransfer(srcName, srcIsZk, dstName, dstIsZk, recurse); + + int minStateByteLenForCompression = -1; + Compressor compressor = new ZLibCompressor(); + + if (dstIsZk) { + String solrHome = cli.getOptionValue("solr.home"); + if (StrUtils.isNullOrEmpty(solrHome)) { + solrHome = System.getProperty("solr.home"); + } + + if (solrHome != null) { + try { + Path solrHomePath = Paths.get(solrHome); + Properties props = new Properties(); + props.put(SolrXmlConfig.ZK_HOST, zkHost); + NodeConfig nodeConfig = NodeConfig.loadNodeConfig(solrHomePath, props); + minStateByteLenForCompression = + nodeConfig.getCloudConfig().getMinStateByteLenForCompression(); + String stateCompressorClass = nodeConfig.getCloudConfig().getStateCompressorClass(); + if (StrUtils.isNotNullOrEmpty(stateCompressorClass)) { + Class compressionClass = + Class.forName(stateCompressorClass).asSubclass(Compressor.class); + compressor = compressionClass.getDeclaredConstructor().newInstance(); + } + } catch (SolrException e) { + // Failed to load solr.xml + throw new IllegalStateException( + "Failed to load solr.xml from ZK or SolrHome, put/get operations on compressed data will use data as is. If you intention is to read and de-compress data or compress and write data, then solr.xml must be accessible."); + } catch (ClassNotFoundException + | NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException e) { + throw new IllegalStateException( + "Unable to find or instantiate compression class: " + e.getMessage()); + } + } + } + + // I *think* that we should have builder methods on SolrZkClient that sets + // minStateByteLenForCompression and the Compressor! + zkClient.zkTransfer( + srcName, srcIsZk, dstName, dstIsZk, recurse, minStateByteLenForCompression, compressor); + // ZkMaintenanceUtils.zkTransfer(zkClient, src, srcIsZk, dst, dstIsZk, recurse, + // minStateByteLenForCompression, compressor); } catch (Exception e) { log.error("Could not complete the zk operation for reason: ", e); throw (e); diff --git a/solr/core/src/java/org/apache/solr/handler/designer/SchemaDesignerAPI.java b/solr/core/src/java/org/apache/solr/handler/designer/SchemaDesignerAPI.java index a8a4b5091d4..227c33fa345 100644 --- a/solr/core/src/java/org/apache/solr/handler/designer/SchemaDesignerAPI.java +++ b/solr/core/src/java/org/apache/solr/handler/designer/SchemaDesignerAPI.java @@ -551,7 +551,13 @@ && zkStateReader().getClusterState().hasCollection(newCollection)) { SolrZkClient zkClient = coreContainer.getZkController().getZkClient(); try { zkClient.zkTransfer( - getConfigSetZkPath(mutableId), true, getConfigSetZkPath(configSet), true, true); + getConfigSetZkPath(mutableId), + true, + getConfigSetZkPath(configSet), + true, + true, + -1, + null); } catch (KeeperException | InterruptedException e) { throw new IOException( "Failed to copy config set: " + mutableId, SolrZkClient.checkInterrupted(e)); diff --git a/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java b/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java index 3ca35f9953f..a6f84258486 100644 --- a/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java +++ b/solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java @@ -37,7 +37,6 @@ import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.AbstractDistribZkTestBase; import org.apache.solr.cloud.AbstractZkTestCase; -import org.apache.solr.cloud.ZkCLI; import org.apache.solr.cloud.ZkConfigSetService; import org.apache.solr.cloud.ZkTestServer; import org.apache.solr.common.cloud.ClusterProperties; @@ -196,11 +195,27 @@ public void testPutCompressed() throws Exception { // test re-put to existing data = "my data deux"; + localFile = File.createTempFile("state", ".json"); + writer = new FileWriter(localFile, StandardCharsets.UTF_8); + writer.write(data); + writer.close(); + dataBytes = data.getBytes(StandardCharsets.UTF_8); expected = random().nextBoolean() ? zLibCompressor.compressBytes(dataBytes) : zLibCompressor.compressBytes(dataBytes, dataBytes.length / 10); + args2 = + new String[] { + "cp", + "-src", + localFile.getAbsolutePath(), + "-dst", + "zk:/state.json", + "-z", + zkServer.getZkAddress() + }; + assertEquals(0, runTool(args2, tool)); // args = new String[] {"-zkhost", zkServer.getZkAddress(), "-cmd", "put", "/state.json", data}; // ZkCLI.main(args); assertArrayEquals(zkClient.getZooKeeper().getData("/state.json", null, null), expected); @@ -245,7 +260,21 @@ public void testPutFileCompressed() throws Exception { "/state.json", SOLR_HOME + File.separator + "solr-stress-new.xml" }; - ZkCLI.main(args); + // ZkCLI.main(args); + + args = + new String[] { + "cp", + "-src", + SOLR_HOME + File.separator + "solr-stress-new.xml", + "-dst", + "zk:/state.json", + "-z", + zkServer.getZkAddress() + }; + + ZkCpTool tool = new ZkCpTool(); + assertEquals(0, runTool(args, tool)); byte[] fromZk = zkClient.getZooKeeper().getData("/state.json", null, null); Path locFile = Path.of(SOLR_HOME, "solr-stress-new.xml"); @@ -545,6 +574,7 @@ public void testGetFileNotExists() throws Exception { assertEquals(1, runTool(args, tool)); } + @Test public void testInvalidZKAddress() throws Exception { String[] args = new String[] {"ls", "-path", "/", "-r", "true", "-z", "----------:33332"}; diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 25b16e18fc4..88a30947c6d 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -580,16 +580,9 @@ public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss) makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss, 0); } - public void makePath(String path, Path data, boolean failOnExists, boolean retryOnConnLoss) + public void makePath(String path, byte[] data, boolean failOnExists, boolean retryOnConnLoss) throws IOException, KeeperException, InterruptedException { - makePath( - path, - Files.readAllBytes(data), - CreateMode.PERSISTENT, - null, - failOnExists, - retryOnConnLoss, - 0); + makePath(path, data, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss, 0); } public void makePath(String path, Path data, boolean retryOnConnLoss) @@ -764,14 +757,25 @@ public Stat setData(String path, byte[] data, boolean retryOnConnLoss) * Write file to ZooKeeper - default system encoding used. * * @param path path to upload file to e.g. /solr/conf/solrconfig.xml - * @param data a filepath to read data from + * @param source a filepath to read data from */ - public Stat setData(String path, Path data, boolean retryOnConnLoss) + public Stat setData( + String path, + Path source, + boolean retryOnConnLoss, + int minStateByteLenForCompression, + Compressor compressor) throws IOException, KeeperException, InterruptedException { if (log.isDebugEnabled()) { - log.debug("Write to ZooKeeper: {} to {}", data.toAbsolutePath(), path); + log.debug("Write to ZooKeeper: {} to {}", source.toAbsolutePath(), path); + } + byte[] data = Files.readAllBytes(source); + if (shouldCompressData(data, path, minStateByteLenForCompression)) { + // state.json should be compressed before being put to ZK + data = compressor.compressBytes(data, data.length / 10); } - return setData(path, Files.readAllBytes(data), retryOnConnLoss); + + return setData(path, data, retryOnConnLoss); } public List multi(final Iterable ops, boolean retryOnConnLoss) @@ -1029,9 +1033,17 @@ public void downConfig(String confName, Path confPath) throws IOException { this, ZkMaintenanceUtils.CONFIGS_ZKNODE + "/" + confName, confPath); } - public void zkTransfer(String src, Boolean srcIsZk, String dst, Boolean dstIsZk, Boolean recurse) + public void zkTransfer( + String src, + Boolean srcIsZk, + String dst, + Boolean dstIsZk, + Boolean recurse, + int minStateByteLenForCompression, + Compressor compressor) throws SolrServerException, KeeperException, InterruptedException, IOException { - ZkMaintenanceUtils.zkTransfer(this, src, srcIsZk, dst, dstIsZk, recurse); + ZkMaintenanceUtils.zkTransfer( + this, src, srcIsZk, dst, dstIsZk, recurse, minStateByteLenForCompression, compressor); } public void moveZnode(String src, String dst) @@ -1238,4 +1250,14 @@ public SolrZkClient build() { return new SolrZkClient(this); } } + + static boolean shouldCompressData(byte[] data, String path, int minStateByteLenForCompression) { + if (path.endsWith("state.json") + && minStateByteLenForCompression > -1 + && data.length > minStateByteLenForCompression) { + // state.json should be compressed before being put to ZK + return true; + } + return false; + } } diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java index e40294a6683..c88e374384a 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java @@ -34,6 +34,7 @@ import java.util.function.Predicate; import java.util.regex.Pattern; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.util.Compressor; import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -130,7 +131,9 @@ public static void zkTransfer( Boolean srcIsZk, String dst, Boolean dstIsZk, - Boolean recurse) + Boolean recurse, + int minStateByteLenForCompression, + Compressor compressor) throws SolrServerException, KeeperException, InterruptedException, IOException { if (srcIsZk == false && dstIsZk == false) { @@ -165,7 +168,8 @@ public static void zkTransfer( // local -> ZK copy if (dstIsZk) { - uploadToZK(zkClient, Paths.get(src), dst, null); + uploadToZKWithCompression( + zkClient, Paths.get(src), dst, null, minStateByteLenForCompression, compressor); return; } @@ -354,10 +358,10 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) // if the path exists (and presumably we're uploading data to it) just set its data if (file.toFile().getName().equals(ZKNODE_DATA_FILE) && zkClient.exists(zkNode, true)) { - zkClient.setData(zkNode, file, true); + zkClient.setData(zkNode, file, true, -1, null); } else if (file == rootPath) { // We are only uploading a single file, preVisitDirectory was never called - zkClient.makePath(zkNode, file, false, true); + zkClient.makePath(zkNode, Files.readAllBytes(file), false, true); } else { // Skip path parts here because they should have been created during // preVisitDirectory @@ -409,6 +413,114 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) }); } + // If zkClient has a builder and knows about compression then we don't need this method. + public static void uploadToZKWithCompression( + SolrZkClient zkClient, + final Path fromPath, + final String zkPath, + final Pattern filenameExclusions, + int minStateByteLenForCompression, + Compressor compressor) + throws IOException { + + String path = fromPath.toString(); + if (path.endsWith("*")) { + path = path.substring(0, path.length() - 1); + } + + final Path rootPath = Paths.get(path); + + if (!Files.exists(rootPath)) throw new IOException("Path " + rootPath + " does not exist"); + + int partsOffset = + Path.of(zkPath).getNameCount() - rootPath.getNameCount() - 1; // will be negative + Files.walkFileTree( + rootPath, + new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + String filename = file.getFileName().toString(); + if ((filenameExclusions != null && filenameExclusions.matcher(filename).matches())) { + log.info( + "uploadToZK skipping '{}' due to filenameExclusions '{}'", + filename, + filenameExclusions); + return FileVisitResult.CONTINUE; + } + if (isFileForbiddenInConfigSets(filename)) { + log.info( + "uploadToZK skipping '{}' due to forbidden file types '{}'", + filename, + USE_FORBIDDEN_FILE_TYPES); + return FileVisitResult.CONTINUE; + } + // TODO: Cannot check MAGIC header for file since FileTypeGuesser is in core + String zkNode = createZkNodeName(zkPath, rootPath, file); + try { + // if the path exists (and presumably we're uploading data to it) just set its data + if (file.toFile().getName().equals(ZKNODE_DATA_FILE) + && zkClient.exists(zkNode, true)) { + zkClient.setData(zkNode, file, true, minStateByteLenForCompression, compressor); + } else if (file == rootPath) { + // We are only uploading a single file, preVisitDirectory was never called + byte[] data = Files.readAllBytes(file); + if (SolrZkClient.shouldCompressData(data, zkNode, minStateByteLenForCompression)) { + // state.json should be compressed before being put to ZK + data = compressor.compressBytes(data, data.length / 10); + } + zkClient.makePath(zkNode, data, false, true); + } else { + + byte[] data = Files.readAllBytes(file); + if (SolrZkClient.shouldCompressData(data, zkNode, minStateByteLenForCompression)) { + // state.json should be compressed before being put to ZK + data = compressor.compressBytes(data, data.length / 10); + } + // Skip path parts here because they should have been created during + // preVisitDirectory + int pathParts = file.getNameCount() + partsOffset; + zkClient.makePath( + zkNode, data, CreateMode.PERSISTENT, null, false, true, pathParts); + } + } catch (KeeperException | InterruptedException e) { + throw new IOException( + "Error uploading file " + file + " to zookeeper path " + zkNode, + SolrZkClient.checkInterrupted(e)); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + if (dir.getFileName().toString().startsWith(".")) return FileVisitResult.SKIP_SUBTREE; + + String zkNode = createZkNodeName(zkPath, rootPath, dir); + try { + if (dir.equals(rootPath)) { + // Make sure the root path exists, including potential parents + zkClient.makePath(zkNode, true); + } else { + // Skip path parts here because they should have been created during previous visits + int pathParts = dir.getNameCount() + partsOffset; + zkClient.makePath(zkNode, null, CreateMode.PERSISTENT, null, true, true, pathParts); + } + } catch (KeeperException.NodeExistsException ignored) { + // Using fail-on-exists == false has side effect of makePath attempting to setData on + // the leaf of the path + // We prefer that if the parent directory already exists, we do not modify it + // Particularly relevant for marking config sets as trusted + } catch (KeeperException | InterruptedException e) { + throw new IOException( + "Error creating intermediate directory " + dir, SolrZkClient.checkInterrupted(e)); + } + + return FileVisitResult.CONTINUE; + } + }); + } + private static boolean isEphemeral(SolrZkClient zkClient, String zkPath) throws KeeperException, InterruptedException { Stat znodeStat = zkClient.exists(zkPath, null, true); diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java index 4a2f97475d0..14d2a5a1f86 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java @@ -786,7 +786,7 @@ public static void putConfig( if (log.isInfoEnabled()) { log.info("put {} to {}", file.toAbsolutePath(), destPath); } - zkClient.makePath(destPath, file, false, true); + zkClient.makePath(destPath, Files.readAllBytes(file), false, true); } // static to share with distrib test