Skip to content

Commit

Permalink
To prove it works, passing around compression flag and associated com…
Browse files Browse the repository at this point in the history
…pressor

However, I think the compression choice COULD be a builder option when creating SolrZkClient, which would simplify the methods and avoid duplication. I wanted to get a proof of concept (POC) up first.
  • Loading branch information
epugh committed Mar 15, 2024
1 parent f6c7718 commit ee86574
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 24 deletions.
56 changes: 55 additions & 1 deletion solr/core/src/java/org/apache/solr/cli/ZkCpTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@

import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.core.NodeConfig;
import org.apache.solr.core.SolrXmlConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,7 +115,51 @@ public void runImpl(CommandLine cli) throws Exception {
dstName = dstName.substring(5);
}
}
zkClient.zkTransfer(srcName, srcIsZk, dstName, dstIsZk, recurse);

int minStateByteLenForCompression = -1;
Compressor compressor = new ZLibCompressor();

if (dstIsZk) {
String solrHome = cli.getOptionValue("solr.home");
if (StrUtils.isNullOrEmpty(solrHome)) {
solrHome = System.getProperty("solr.home");
}

if (solrHome != null) {
try {
Path solrHomePath = Paths.get(solrHome);
Properties props = new Properties();
props.put(SolrXmlConfig.ZK_HOST, zkHost);
NodeConfig nodeConfig = NodeConfig.loadNodeConfig(solrHomePath, props);
minStateByteLenForCompression =
nodeConfig.getCloudConfig().getMinStateByteLenForCompression();
String stateCompressorClass = nodeConfig.getCloudConfig().getStateCompressorClass();
if (StrUtils.isNotNullOrEmpty(stateCompressorClass)) {
Class<? extends Compressor> compressionClass =
Class.forName(stateCompressorClass).asSubclass(Compressor.class);
compressor = compressionClass.getDeclaredConstructor().newInstance();
}
} catch (SolrException e) {
// Failed to load solr.xml
throw new IllegalStateException(
"Failed to load solr.xml from ZK or SolrHome, put/get operations on compressed data will use data as is. If you intention is to read and de-compress data or compress and write data, then solr.xml must be accessible.");
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
throw new IllegalStateException(
"Unable to find or instantiate compression class: " + e.getMessage());
}
}
}

// I *think* that we should have builder methods on SolrZkClient that set
// minStateByteLenForCompression and the Compressor!
zkClient.zkTransfer(
srcName, srcIsZk, dstName, dstIsZk, recurse, minStateByteLenForCompression, compressor);
// ZkMaintenanceUtils.zkTransfer(zkClient, src, srcIsZk, dst, dstIsZk, recurse,
// minStateByteLenForCompression, compressor);
} catch (Exception e) {
log.error("Could not complete the zk operation for reason: ", e);
throw (e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,13 @@ && zkStateReader().getClusterState().hasCollection(newCollection)) {
SolrZkClient zkClient = coreContainer.getZkController().getZkClient();
try {
zkClient.zkTransfer(
getConfigSetZkPath(mutableId), true, getConfigSetZkPath(configSet), true, true);
getConfigSetZkPath(mutableId),
true,
getConfigSetZkPath(configSet),
true,
true,
-1,
null);
} catch (KeeperException | InterruptedException e) {
throw new IOException(
"Failed to copy config set: " + mutableId, SolrZkClient.checkInterrupted(e));
Expand Down
34 changes: 32 additions & 2 deletions solr/core/src/test/org/apache/solr/cli/ZkSubcommandsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.cloud.ZkCLI;
import org.apache.solr.cloud.ZkConfigSetService;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterProperties;
Expand Down Expand Up @@ -196,11 +195,27 @@ public void testPutCompressed() throws Exception {

// test re-put to existing
data = "my data deux";
localFile = File.createTempFile("state", ".json");
writer = new FileWriter(localFile, StandardCharsets.UTF_8);
writer.write(data);
writer.close();

dataBytes = data.getBytes(StandardCharsets.UTF_8);
expected =
random().nextBoolean()
? zLibCompressor.compressBytes(dataBytes)
: zLibCompressor.compressBytes(dataBytes, dataBytes.length / 10);
args2 =
new String[] {
"cp",
"-src",
localFile.getAbsolutePath(),
"-dst",
"zk:/state.json",
"-z",
zkServer.getZkAddress()
};
assertEquals(0, runTool(args2, tool));
// args = new String[] {"-zkhost", zkServer.getZkAddress(), "-cmd", "put", "/state.json", data};
// ZkCLI.main(args);
assertArrayEquals(zkClient.getZooKeeper().getData("/state.json", null, null), expected);
Expand Down Expand Up @@ -245,7 +260,21 @@ public void testPutFileCompressed() throws Exception {
"/state.json",
SOLR_HOME + File.separator + "solr-stress-new.xml"
};
ZkCLI.main(args);
// ZkCLI.main(args);

args =
new String[] {
"cp",
"-src",
SOLR_HOME + File.separator + "solr-stress-new.xml",
"-dst",
"zk:/state.json",
"-z",
zkServer.getZkAddress()
};

ZkCpTool tool = new ZkCpTool();
assertEquals(0, runTool(args, tool));

byte[] fromZk = zkClient.getZooKeeper().getData("/state.json", null, null);
Path locFile = Path.of(SOLR_HOME, "solr-stress-new.xml");
Expand Down Expand Up @@ -545,6 +574,7 @@ public void testGetFileNotExists() throws Exception {
assertEquals(1, runTool(args, tool));
}

@Test
public void testInvalidZKAddress() throws Exception {

String[] args = new String[] {"ls", "-path", "/", "-r", "true", "-z", "----------:33332"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,16 +580,9 @@ public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss)
makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss, 0);
}

public void makePath(String path, Path data, boolean failOnExists, boolean retryOnConnLoss)
public void makePath(String path, byte[] data, boolean failOnExists, boolean retryOnConnLoss)
throws IOException, KeeperException, InterruptedException {
makePath(
path,
Files.readAllBytes(data),
CreateMode.PERSISTENT,
null,
failOnExists,
retryOnConnLoss,
0);
makePath(path, data, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss, 0);
}

public void makePath(String path, Path data, boolean retryOnConnLoss)
Expand Down Expand Up @@ -764,14 +757,25 @@ public Stat setData(String path, byte[] data, boolean retryOnConnLoss)
* Write file to ZooKeeper - default system encoding used.
*
* @param path path to upload file to e.g. /solr/conf/solrconfig.xml
* @param data a filepath to read data from
* @param source a filepath to read data from
*/
public Stat setData(String path, Path data, boolean retryOnConnLoss)
public Stat setData(
String path,
Path source,
boolean retryOnConnLoss,
int minStateByteLenForCompression,
Compressor compressor)
throws IOException, KeeperException, InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Write to ZooKeeper: {} to {}", data.toAbsolutePath(), path);
log.debug("Write to ZooKeeper: {} to {}", source.toAbsolutePath(), path);
}
byte[] data = Files.readAllBytes(source);
if (shouldCompressData(data, path, minStateByteLenForCompression)) {
// state.json should be compressed before being put to ZK
data = compressor.compressBytes(data, data.length / 10);
}
return setData(path, Files.readAllBytes(data), retryOnConnLoss);

return setData(path, data, retryOnConnLoss);
}

public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss)
Expand Down Expand Up @@ -1029,9 +1033,17 @@ public void downConfig(String confName, Path confPath) throws IOException {
this, ZkMaintenanceUtils.CONFIGS_ZKNODE + "/" + confName, confPath);
}

public void zkTransfer(String src, Boolean srcIsZk, String dst, Boolean dstIsZk, Boolean recurse)
public void zkTransfer(
String src,
Boolean srcIsZk,
String dst,
Boolean dstIsZk,
Boolean recurse,
int minStateByteLenForCompression,
Compressor compressor)
throws SolrServerException, KeeperException, InterruptedException, IOException {
ZkMaintenanceUtils.zkTransfer(this, src, srcIsZk, dst, dstIsZk, recurse);
ZkMaintenanceUtils.zkTransfer(
this, src, srcIsZk, dst, dstIsZk, recurse, minStateByteLenForCompression, compressor);
}

public void moveZnode(String src, String dst)
Expand Down Expand Up @@ -1238,4 +1250,14 @@ public SolrZkClient build() {
return new SolrZkClient(this);
}
}

/**
 * Tells whether a payload about to be written to ZooKeeper should be compressed first.
 *
 * <p>Compression applies only when all three conditions hold: the target path ends with {@code
 * state.json}, the threshold is enabled (greater than {@code -1}), and the payload is strictly
 * larger than the threshold.
 *
 * @param data the bytes that would be written; only inspected once the first two checks pass
 * @param path the ZooKeeper path the bytes are destined for
 * @param minStateByteLenForCompression size threshold in bytes; {@code -1} or lower disables
 *     compression
 * @return {@code true} if the caller should compress {@code data} before writing it
 */
static boolean shouldCompressData(byte[] data, String path, int minStateByteLenForCompression) {
  // Only state.json nodes are ever candidates for compression.
  if (!path.endsWith("state.json")) {
    return false;
  }
  // A threshold of -1 (or lower) switches compression off.
  if (minStateByteLenForCompression <= -1) {
    return false;
  }
  // state.json should be compressed before being put to ZK once it exceeds the threshold
  return data.length > minStateByteLenForCompression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -130,7 +131,9 @@ public static void zkTransfer(
Boolean srcIsZk,
String dst,
Boolean dstIsZk,
Boolean recurse)
Boolean recurse,
int minStateByteLenForCompression,
Compressor compressor)
throws SolrServerException, KeeperException, InterruptedException, IOException {

if (srcIsZk == false && dstIsZk == false) {
Expand Down Expand Up @@ -165,7 +168,8 @@ public static void zkTransfer(

// local -> ZK copy
if (dstIsZk) {
uploadToZK(zkClient, Paths.get(src), dst, null);
uploadToZKWithCompression(
zkClient, Paths.get(src), dst, null, minStateByteLenForCompression, compressor);
return;
}

Expand Down Expand Up @@ -354,10 +358,10 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
// if the path exists (and presumably we're uploading data to it) just set its data
if (file.toFile().getName().equals(ZKNODE_DATA_FILE)
&& zkClient.exists(zkNode, true)) {
zkClient.setData(zkNode, file, true);
zkClient.setData(zkNode, file, true, -1, null);
} else if (file == rootPath) {
// We are only uploading a single file, preVisitDirectory was never called
zkClient.makePath(zkNode, file, false, true);
zkClient.makePath(zkNode, Files.readAllBytes(file), false, true);
} else {
// Skip path parts here because they should have been created during
// preVisitDirectory
Expand Down Expand Up @@ -409,6 +413,114 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
});
}

/**
 * Uploads a local file or directory tree to ZooKeeper, compressing the content of any node for
 * which {@code SolrZkClient.shouldCompressData} returns {@code true} before it is written.
 *
 * <p>Files whose name matches {@code filenameExclusions}, or that {@code
 * isFileForbiddenInConfigSets} rejects, are skipped. Directories whose name starts with {@code .}
 * are skipped together with their whole subtree.
 *
 * <p>NOTE: If zkClient has a builder and knows about compression then we don't need this method.
 *
 * @param zkClient client used for every ZooKeeper read and write
 * @param fromPath local file or directory to upload; a single trailing {@code *} is stripped
 * @param zkPath ZooKeeper path that {@code fromPath} is uploaded to
 * @param filenameExclusions pattern of file names to skip, or {@code null} to skip none
 * @param minStateByteLenForCompression threshold handed to {@code
 *     SolrZkClient.shouldCompressData}
 * @param compressor compressor applied to selected data; must be non-null whenever compression is
 *     selected for a file
 * @throws IOException if the local path does not exist, a file cannot be read, or a ZooKeeper
 *     operation fails (the ZooKeeper failure is attached as the cause)
 */
public static void uploadToZKWithCompression(
    SolrZkClient zkClient,
    final Path fromPath,
    final String zkPath,
    final Pattern filenameExclusions,
    int minStateByteLenForCompression,
    Compressor compressor)
    throws IOException {

  String path = fromPath.toString();
  if (path.endsWith("*")) {
    path = path.substring(0, path.length() - 1);
  }

  final Path rootPath = Paths.get(path);

  if (!Files.exists(rootPath)) {
    throw new IOException("Path " + rootPath + " does not exist");
  }

  int partsOffset =
      Path.of(zkPath).getNameCount() - rootPath.getNameCount() - 1; // will be negative
  Files.walkFileTree(
      rootPath,
      new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
            throws IOException {
          String filename = file.getFileName().toString();
          if ((filenameExclusions != null && filenameExclusions.matcher(filename).matches())) {
            log.info(
                "uploadToZK skipping '{}' due to filenameExclusions '{}'",
                filename,
                filenameExclusions);
            return FileVisitResult.CONTINUE;
          }
          if (isFileForbiddenInConfigSets(filename)) {
            log.info(
                "uploadToZK skipping '{}' due to forbidden file types '{}'",
                filename,
                USE_FORBIDDEN_FILE_TYPES);
            return FileVisitResult.CONTINUE;
          }
          // TODO: Cannot check MAGIC header for file since FileTypeGuesser is in core
          String zkNode = createZkNodeName(zkPath, rootPath, file);
          try {
            // if the path exists (and presumably we're uploading data to it) just set its data
            if (filename.equals(ZKNODE_DATA_FILE) && zkClient.exists(zkNode, true)) {
              zkClient.setData(zkNode, file, true, minStateByteLenForCompression, compressor);
            } else if (file.equals(rootPath)) {
              // We are only uploading a single file, preVisitDirectory was never called.
              // Compare with equals(): Path identity is not guaranteed by the visitor contract.
              byte[] data =
                  readBytesCompressingIfNeeded(
                      file, zkNode, minStateByteLenForCompression, compressor);
              zkClient.makePath(zkNode, data, false, true);
            } else {
              byte[] data =
                  readBytesCompressingIfNeeded(
                      file, zkNode, minStateByteLenForCompression, compressor);
              // Skip path parts here because they should have been created during
              // preVisitDirectory
              int pathParts = file.getNameCount() + partsOffset;
              zkClient.makePath(
                  zkNode, data, CreateMode.PERSISTENT, null, false, true, pathParts);
            }
          } catch (KeeperException | InterruptedException e) {
            throw new IOException(
                "Error uploading file " + file + " to zookeeper path " + zkNode,
                SolrZkClient.checkInterrupted(e));
          }
          return FileVisitResult.CONTINUE;
        }

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
            throws IOException {
          if (dir.getFileName().toString().startsWith(".")) {
            return FileVisitResult.SKIP_SUBTREE;
          }

          String zkNode = createZkNodeName(zkPath, rootPath, dir);
          try {
            if (dir.equals(rootPath)) {
              // Make sure the root path exists, including potential parents
              zkClient.makePath(zkNode, true);
            } else {
              // Skip path parts here because they should have been created during previous visits
              int pathParts = dir.getNameCount() + partsOffset;
              zkClient.makePath(zkNode, null, CreateMode.PERSISTENT, null, true, true, pathParts);
            }
          } catch (KeeperException.NodeExistsException ignored) {
            // Using fail-on-exists == false has side effect of makePath attempting to setData on
            // the leaf of the path
            // We prefer that if the parent directory already exists, we do not modify it
            // Particularly relevant for marking config sets as trusted
          } catch (KeeperException | InterruptedException e) {
            throw new IOException(
                "Error creating intermediate directory " + dir, SolrZkClient.checkInterrupted(e));
          }

          return FileVisitResult.CONTINUE;
        }
      });
}

/**
 * Reads {@code file} fully and, when {@code SolrZkClient.shouldCompressData} selects it for the
 * given {@code zkNode}, returns the compressed form instead of the raw bytes.
 */
private static byte[] readBytesCompressingIfNeeded(
    Path file, String zkNode, int minStateByteLenForCompression, Compressor compressor)
    throws IOException {
  byte[] data = Files.readAllBytes(file);
  if (SolrZkClient.shouldCompressData(data, zkNode, minStateByteLenForCompression)) {
    // Fail with an explanatory message rather than a bare NPE from the call below.
    java.util.Objects.requireNonNull(
        compressor, "compressor must not be null when compression is enabled for " + zkNode);
    // state.json should be compressed before being put to ZK
    data = compressor.compressBytes(data, data.length / 10);
  }
  return data;
}

private static boolean isEphemeral(SolrZkClient zkClient, String zkPath)
throws KeeperException, InterruptedException {
Stat znodeStat = zkClient.exists(zkPath, null, true);
Expand Down
Loading

0 comments on commit ee86574

Please sign in to comment.