diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 874adfcc4d7..93e737a543d 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -12,6 +12,8 @@ New Features * SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode, use --user-managed switch for User Managed (aka Standalone) mode. (Eric Pugh) +* SOLR-14673: Solr CLI now has bin/solr stream tool that executes streaming expressions via command line, either locally or on a Solr cluster. (Eric Pugh) + Improvements --------------------- diff --git a/solr/bin/solr.cmd b/solr/bin/solr.cmd index 5fd6ec44aea..9f875926517 100755 --- a/solr/bin/solr.cmd +++ b/solr/bin/solr.cmd @@ -1175,9 +1175,9 @@ for %%a in (%*) do ( ) else ( set "option!option!=%%a" if "!option!" equ "-s" set "SOLR_HOME=%%a" - if "!option!" equ "--solr-home" set "SOLR_HOME=%%a" + if "!option!" equ "--solr-home" set "SOLR_HOME=%%a" if "!option!" equ "-d" set "SOLR_SERVER_DIR=%%a" - if "!option!" equ "--server-dir" set "SOLR_SERVER_DIR=%%a" + if "!option!" equ "--server-dir" set "SOLR_SERVER_DIR=%%a" if not "!option!" equ "-s" if not "!option!" equ "--solr-home" if not "!option!" equ "-d" if not "!option!" equ "--server-dir" ( set "AUTH_PARAMS=!AUTH_PARAMS! !option! 
%%a" ) diff --git a/solr/core/src/java/org/apache/solr/cli/SolrCLI.java b/solr/core/src/java/org/apache/solr/cli/SolrCLI.java index 00a97b4434c..4714c43c99c 100755 --- a/solr/core/src/java/org/apache/solr/cli/SolrCLI.java +++ b/solr/core/src/java/org/apache/solr/cli/SolrCLI.java @@ -246,6 +246,7 @@ private static Tool newTool(String toolType) throws Exception { else if ("post".equals(toolType)) return new PostTool(); else if ("postlogs".equals(toolType)) return new PostLogsTool(); else if ("version".equals(toolType)) return new VersionTool(); + else if ("stream".equals(toolType)) return new StreamTool(); else if ("snapshot-create".equals(toolType)) return new SnapshotCreateTool(); else if ("snapshot-delete".equals(toolType)) return new SnapshotDeleteTool(); else if ("snapshot-list".equals(toolType)) return new SnapshotListTool(); @@ -511,8 +512,7 @@ private static void printHelp() { print("Usage: solr COMMAND OPTIONS"); print(" where COMMAND is one of: start, stop, restart, status, "); print( - " healthcheck, create, delete, auth, assert, config, export, api, package, post, "); - + " healthcheck, create, delete, auth, assert, config, export, api, package, post, stream,"); print( " zk ls, zk cp, zk rm , zk mv, zk mkroot, zk upconfig, zk downconfig,"); print( diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java new file mode 100644 index 00000000000..9c0392ec71b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cli; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.PrintStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.solr.client.solrj.io.Lang; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.PushBackStream; +import org.apache.solr.client.solrj.io.stream.SolrStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import 
org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.handler.CatStream; + +/** Supports stream command in the bin/solr script. */ +public class StreamTool extends ToolBase { + + public StreamTool() { + this(CLIO.getOutStream()); + } + + public StreamTool(PrintStream stdout) { + super(stdout); + } + + private final SolrClientCache solrClientCache = new SolrClientCache(); + + @Override + public String getName() { + return "stream"; + } + + @Override + public String getUsage() { + // Specify that the last argument is the streaming expression + return "bin/solr stream [--array-delimiter ] [-c ] [--delimiter ] [-e ] [-f\n" + + " ] [-h] [--header] [-s ] [-u ] [-v] [-z ] \n"; + } + + private static final Option EXECUTION_OPTION = + Option.builder("e") + .longOpt("execution") + .hasArg() + .argName("ENVIRONMENT") + .desc( + "Execution environment is either 'local' (i.e CLI process) or via a 'remote' Solr server. Default environment is 'remote'.") + .build(); + + private static final Option COLLECTION_OPTION = + Option.builder("c") + .longOpt("name") + .argName("NAME") + .hasArg() + .desc( + "Name of the specific collection to execute expression on if the execution is set to 'remote'. Required for 'remote' execution environment.") + .build(); + + private static final Option FIELDS_OPTION = + Option.builder("f") + .longOpt("fields") + .argName("FIELDS") + .hasArg() + .desc( + "The fields in the tuples to output. Defaults to fields in the first tuple of result set.") + .build(); + + private static final Option HEADER_OPTION = + Option.builder().longOpt("header").desc("Specify to include a header line.").build(); + + private static final Option DELIMITER_OPTION = + Option.builder() + .longOpt("delimiter") + .argName("CHARACTER") + .hasArg() + .desc("The output delimiter. 
Default to using three spaces.") + .build(); + private static final Option ARRAY_DELIMITER_OPTION = + Option.builder() + .longOpt("array-delimiter") + .argName("CHARACTER") + .hasArg() + .desc("The delimiter multi-valued fields. Default to using a pipe (|) delimiter.") + .build(); + + @Override + public Options getOptions() { + + return super.getOptions() + .addOption(EXECUTION_OPTION) + .addOption(COLLECTION_OPTION) + .addOption(FIELDS_OPTION) + .addOption(HEADER_OPTION) + .addOption(DELIMITER_OPTION) + .addOption(ARRAY_DELIMITER_OPTION) + .addOption(CommonCLIOptions.CREDENTIALS_OPTION) + .addOptionGroup(getConnectionOptions()); + } + + @Override + @SuppressWarnings({"rawtypes"}) + public void runImpl(CommandLine cli) throws Exception { + + String expressionArgument = cli.getArgs()[0]; + String execution = cli.getOptionValue(EXECUTION_OPTION, "remote"); + String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION, "|"); + String delimiter = cli.getOptionValue(DELIMITER_OPTION, " "); + boolean includeHeaders = cli.hasOption(HEADER_OPTION); + String[] outputHeaders = getOutputFields(cli); + + LineNumberReader bufferedReader = null; + String expr; + try { + Reader inputStream = + expressionArgument.toLowerCase(Locale.ROOT).endsWith(".expr") + ? 
new InputStreamReader( + new FileInputStream(expressionArgument), Charset.defaultCharset()) + : new StringReader(expressionArgument); + + bufferedReader = new LineNumberReader(inputStream); + expr = StreamTool.readExpression(bufferedReader, cli.getArgs()); + echoIfVerbose("Running Expression: " + expr); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + PushBackStream pushBackStream; + if (execution.equalsIgnoreCase("local")) { + pushBackStream = doLocalMode(cli, expr); + } else { + pushBackStream = doRemoteMode(cli, expr); + } + + try { + pushBackStream.open(); + + if (outputHeaders == null) { + + Tuple tuple = pushBackStream.read(); + + if (!tuple.EOF) { + outputHeaders = getHeadersFromFirstTuple(tuple); + } + + pushBackStream.pushBack(tuple); + } + + if (includeHeaders) { + StringBuilder headersOut = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + headersOut.append(delimiter); + } + headersOut.append(outputHeaders[i]); + } + } + CLIO.out(headersOut.toString()); + } + + while (true) { + Tuple tuple = pushBackStream.read(); + if (tuple.EOF) { + break; + } else { + StringBuilder outLine = new StringBuilder(); + if (outputHeaders != null) { + for (int i = 0; i < outputHeaders.length; i++) { + if (i > 0) { + outLine.append(delimiter); + } + + Object o = tuple.get(outputHeaders[i]); + if (o != null) { + if (o instanceof List) { + List outfields = (List) o; + outLine.append(listToString(outfields, arrayDelimiter)); + } else { + outLine.append(o); + } + } + } + } + CLIO.out(outLine.toString()); + } + } + } finally { + + if (pushBackStream != null) { + pushBackStream.close(); + } + + solrClientCache.close(); + } + + echoIfVerbose("StreamTool -- Done."); + } + + /** + * Runs a streaming expression in the local process of the CLI. + * + *

Running locally means that parallelization support or those expressions requiring access to + * internal Solr capabilities will not function. + * + * @param cli The CLI invoking the call + * @param expr The streaming expression to be parsed and in the context of the CLI process + * @return A connection to the streaming expression that receives Tuples as they are emitted + * locally. + */ + private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { + String zkHost = SolrCLI.getZkHost(cli); + + echoIfVerbose("Connecting to ZooKeeper at " + zkHost); + solrClientCache.getCloudSolrClient(zkHost); + solrClientCache.setBasicAuthCredentials( + cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); + + TupleStream stream; + PushBackStream pushBackStream; + + StreamExpression streamExpression = StreamExpressionParser.parse(expr); + StreamFactory streamFactory = new StreamFactory(); + + // stdin is ONLY available in the local mode, not in the remote mode as it + // requires access to System.in + streamFactory.withFunctionName("stdin", StandardInStream.class); + + // LocalCatStream extends CatStream and disables the Solr cluster specific + // logic about where to read data from. + streamFactory.withFunctionName("cat", LocalCatStream.class); + + streamFactory.withDefaultZkHost(zkHost); + + Lang.register(streamFactory); + + stream = StreamTool.constructStream(streamFactory, streamExpression); + + pushBackStream = new PushBackStream(stream); + + // Now we can run the stream and return the results. + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + // Output the headers + pushBackStream.setStreamContext(streamContext); + + return pushBackStream; + } + + /** + * Runs a streaming expression on a Solr collection via the /stream end point and returns the + * results to the CLI. Requires a collection to be specified to send the expression to. + * + *

Running remotely allows you to use all the standard Streaming Expression capabilities as the + * expression is running in a Solr environment. + * + * @param cli The CLI invoking the call + * @param expr The streaming expression to be parsed and run remotely + * @return A connection to the streaming expression that receives Tuples as they are emitted from + * Solr /stream. + */ + private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + + String solrUrl = SolrCLI.normalizeSolrUrl(cli); + if (!cli.hasOption("name")) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --worker solr parameter."); + } + String collection = cli.getOptionValue("name"); + + if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { + throw new IllegalStateException( + "The stdin() expression is only usable with --worker local set up."); + } + + final SolrStream solrStream = + new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); + + String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); + if (credentials != null) { + String username = credentials.split(":")[0]; + String password = credentials.split(":")[1]; + solrStream.setCredentials(username, password); + } + return new PushBackStream(solrStream); + } + + private static ModifiableSolrParams params(String... 
params) { + if (params.length % 2 != 0) throw new RuntimeException("Params length should be even"); + ModifiableSolrParams msp = new ModifiableSolrParams(); + for (int i = 0; i < params.length; i += 2) { + msp.add(params[i], params[i + 1]); + } + return msp; + } + + public static class StandardInStream extends TupleStream implements Expressible { + + private BufferedReader reader; + private InputStream inputStream = System.in; + private boolean doClose = false; + + public StandardInStream() {} + + public StandardInStream(StreamExpression expression, StreamFactory factory) + throws IOException {} + + @Override + public List children() { + return null; + } + + public void setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + this.doClose = true; + } + + @Override + public void open() { + reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + } + + @Override + public void close() throws IOException { + if (doClose) { + inputStream.close(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public Tuple read() throws IOException { + String line = reader.readLine(); + HashMap map = new HashMap(); + Tuple tuple = new Tuple(map); + if (line != null) { + tuple.put("line", line); + tuple.put("file", "cat"); + } else { + tuple.put("EOF", "true"); + } + return tuple; + } + + @Override + public void setStreamContext(StreamContext context) {} + + @Override + public StreamExpression toExpression(StreamFactory factory) { + return null; + } + + @Override + public Explanation toExplanation(StreamFactory factory) { + return null; + } + + @Override + public StreamComparator getStreamSort() { + return null; + } + } + + static String[] getOutputFields(CommandLine cli) { + if (cli.hasOption(FIELDS_OPTION)) { + + String fl = cli.getOptionValue(FIELDS_OPTION); + String[] flArray = fl.split(","); + String[] outputHeaders = new String[flArray.length]; + + for (int i = 0; i < outputHeaders.length; i++) { + 
outputHeaders[i] = flArray[i].trim(); + } + + return outputHeaders; + + } else { + return null; + } + } + + public static class LocalCatStream extends CatStream { + + public LocalCatStream(StreamExpression expression, StreamFactory factory) throws IOException { + super(expression, factory); + } + + public LocalCatStream(String commaDelimitedFilepaths, int maxLines) { + super(commaDelimitedFilepaths, maxLines); + } + + @Override + public void setStreamContext(StreamContext context) { + // LocalCatStream has no Solr core to pull from the context + } + + @Override + protected List validateAndSetFilepathsInSandbox(String commaDelimitedFilepaths) { + final List crawlSeeds = new ArrayList<>(); + for (String crawlRootStr : commaDelimitedFilepaths.split(",")) { + Path crawlRootPath = Paths.get(crawlRootStr).normalize(); + + if (!Files.exists(crawlRootPath)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "file/directory to stream doesn't exist: " + crawlRootStr); + } + + crawlSeeds.add(new CrawlFile(crawlRootStr, crawlRootPath)); + } + + return crawlSeeds; + } + } + + @SuppressWarnings({"rawtypes"}) + static String[] getHeadersFromFirstTuple(Tuple tuple) { + Set fields = tuple.getFields().keySet(); + String[] outputHeaders = new String[fields.size()]; + int i = -1; + for (Object o : fields) { + outputHeaders[++i] = o.toString(); + } + Arrays.sort(outputHeaders); + return outputHeaders; + } + + @SuppressWarnings({"rawtypes"}) + static String listToString(List values, String internalDelim) { + StringBuilder buf = new StringBuilder(); + for (Object value : values) { + if (buf.length() > 0) { + buf.append(internalDelim); + } + + buf.append(value.toString()); + } + + return buf.toString(); + } + + private static TupleStream constructStream( + StreamFactory streamFactory, StreamExpression streamExpression) throws IOException { + return streamFactory.constructStream(streamExpression); + } + + static String readExpression(LineNumberReader bufferedReader, 
String[] args) throws IOException { + + StringBuilder exprBuff = new StringBuilder(); + + boolean comment = false; + while (true) { + String line = bufferedReader.readLine(); + if (line == null) { + break; + } + + if (line.indexOf("/*") == 0) { + comment = true; + continue; + } + + if (line.indexOf("*/") == 0) { + comment = false; + continue; + } + + if (comment || line.startsWith("#") || line.startsWith("//")) { + continue; + } + + // Substitute parameters + + if (line.length() > 0) { + for (int i = 1; i < args.length; i++) { + String arg = args[i]; + line = line.replace("$" + i, arg); + } + } + + exprBuff.append(line); + } + + return exprBuff.toString(); + } +} diff --git a/solr/core/src/java/org/apache/solr/handler/CatStream.java b/solr/core/src/java/org/apache/solr/handler/CatStream.java index 70ee2b65242..f2515f9b38b 100644 --- a/solr/core/src/java/org/apache/solr/handler/CatStream.java +++ b/solr/core/src/java/org/apache/solr/handler/CatStream.java @@ -113,7 +113,8 @@ public List children() { @Override public void open() throws IOException { - final List initialCrawlSeeds = validateAndSetFilepathsInSandbox(); + final List initialCrawlSeeds = + validateAndSetFilepathsInSandbox(this.commaDelimitedFilepaths); final List filesToCrawl = new ArrayList<>(); for (CrawlFile crawlSeed : initialCrawlSeeds) { @@ -163,7 +164,7 @@ public Explanation toExplanation(StreamFactory factory) throws IOException { .withExpression(toExpression(factory).toString()); } - private List validateAndSetFilepathsInSandbox() { + protected List validateAndSetFilepathsInSandbox(String commaDelimitedFilepaths) { final List crawlSeeds = new ArrayList<>(); for (String crawlRootStr : commaDelimitedFilepaths.split(",")) { Path crawlRootPath = chroot.resolve(crawlRootStr).normalize(); diff --git a/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java b/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java new file mode 100644 index 00000000000..e91ab9e2d81 --- /dev/null +++ 
b/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cli; + +import static org.apache.solr.cli.SolrCLI.findTool; +import static org.apache.solr.cli.SolrCLI.parseCmdLine; + +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.LineNumberReader; +import java.io.PrintWriter; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.util.SecurityJson; +import org.junit.BeforeClass; +import org.junit.Test; + +public class StreamToolTest extends SolrCloudTestCase { + + @BeforeClass + public static void 
setupClusterWithSecurityEnabled() throws Exception { + configureCluster(2).withSecurityJson(SecurityJson.SIMPLE).configure(); + } + + private > T withBasicAuth(T req) { + req.setBasicAuthCredentials(SecurityJson.USER, SecurityJson.PASS); + return req; + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testGetHeaderFromFirstTuple() { + Tuple tuple = new Tuple(new HashMap()); + tuple.put("field1", "blah"); + tuple.put("field2", "blah"); + tuple.put("field3", "blah"); + + String[] headers = StreamTool.getHeadersFromFirstTuple(tuple); + + assertEquals(headers.length, 3); + assertEquals(headers[0], "field1"); + assertEquals(headers[1], "field2"); + assertEquals(headers[2], "field3"); + } + + @Test + public void testGetOutputFields() { + String[] args = + new String[] { + "--fields", "field9, field2, field3, field4", + }; + StreamTool streamTool = new StreamTool(); + CommandLine cli = SolrCLI.processCommandLineArgs(streamTool, args); + String[] outputFields = StreamTool.getOutputFields(cli); + assert outputFields != null; + assertEquals(outputFields.length, 4); + assertEquals(outputFields[0], "field9"); + assertEquals(outputFields[1], "field2"); + assertEquals(outputFields[2], "field3"); + assertEquals(outputFields[3], "field4"); + } + + @Test + public void testReadExpression() throws Exception { + // This covers parameter substitution and expanded comments support. 
+ + String[] args = {"file.expr", "one", "two", "three"}; + StringWriter stringWriter = new StringWriter(); + PrintWriter buf = new PrintWriter(stringWriter); + buf.println("/*"); + buf.println("Multi-line comment Comment..."); + buf.println("*/"); + buf.println("// Single line comment"); + buf.println("# Single line comment"); + buf.println("let(a=$1, b=$2,"); + buf.println("search($3))"); + buf.println(")"); + + String expr = stringWriter.toString(); + + LineNumberReader reader = new LineNumberReader(new StringReader(expr)); + String finalExpression = StreamTool.readExpression(reader, args); + // Strip the comment and insert the params in order. + assertEquals(finalExpression, "let(a=one, b=two,search(three)))"); + } + + @Test + public void testReadExpression2() throws Exception { + // This covers parameter substitution and expanded comments support. + + String[] args = {"file.expr", "id", "desc_s", "desc"}; + StringWriter stringWriter = new StringWriter(); + PrintWriter buf = new PrintWriter(stringWriter); + + buf.println("# Try me"); + buf.println("search(my_collection,q='*:*',fl='$1, $2',sort='id $3')"); + + String expr = stringWriter.toString(); + + LineNumberReader reader = new LineNumberReader(new StringReader(expr)); + String finalExpression = StreamTool.readExpression(reader, args); + // Strip the comment and insert the params in order. 
+ assertEquals(finalExpression, "search(my_collection,q='*:*',fl='id, desc_s',sort='id desc')"); + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testReadStream() throws Exception { + StreamTool.StandardInStream inStream = new StreamTool.StandardInStream(); + List tuples = new ArrayList(); + try { + StringWriter stringWriter = new StringWriter(); + PrintWriter buf = new PrintWriter(stringWriter); + + buf.println("one two"); + buf.println("three four"); + buf.println("five six"); + + String expr = stringWriter.toString(); + ByteArrayInputStream inputStream = + new ByteArrayInputStream(expr.getBytes(Charset.defaultCharset())); + inStream.setInputStream(inputStream); + inStream.open(); + while (true) { + Tuple tuple = inStream.read(); + if (tuple.EOF) { + break; + } else { + tuples.add(tuple); + } + } + + } finally { + inStream.close(); + } + + assertEquals(tuples.size(), 3); + + String line1 = tuples.get(0).getString("line"); + String line2 = tuples.get(1).getString("line"); + String line3 = tuples.get(2).getString("line"); + + assertEquals("one two", line1); + assertEquals("three four", line2); + assertEquals("five six", line3); + } + + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testLocalCatStream() throws Exception { + File localFile = File.createTempFile("topLevel1", ".txt"); + populateFileWithData(localFile.toPath()); + + StreamTool.LocalCatStream catStream = + new StreamTool.LocalCatStream(localFile.getAbsolutePath(), -1); + List tuples = new ArrayList(); + try { + catStream.open(); + while (true) { + Tuple tuple = catStream.read(); + if (tuple.EOF) { + break; + } else { + tuples.add(tuple); + } + } + + } finally { + catStream.close(); + } + + assertEquals(4, tuples.size()); + + for (int i = 0; i < 4; i++) { + Tuple t = tuples.get(i); + assertEquals(localFile.getName() + " line " + (i + 1), t.get("line")); + assertEquals(localFile.getAbsolutePath(), t.get("file")); + } + } + + @Test + 
@SuppressWarnings({"unchecked", "rawtypes"}) + public void testListToString() { + List stuff = new ArrayList(); + stuff.add("test1"); + stuff.add(3); + stuff.add(111.32322); + stuff.add("test3"); + String s = StreamTool.listToString(stuff, "|"); + assertEquals("test1|3|111.32322|test3", s); + } + + @Test + public void testStdInFailsWithRemoteWorker() throws Exception { + String expression = "echo(stdin())"; + + String[] args = + new String[] { + "stream", + "-e", + "remote", + "--name", + "fakeCollection", + "--verbose", + "--zk-host", + cluster.getZkClient().getZkServerAddress(), + expression + }; + assertEquals(1, runTool(args)); + } + + @Test + public void testStdInSucceedsWithLocalWorker() throws Exception { + String expression = "echo(stdin())"; + + String[] args = + new String[] { + "stream", + "-e", + "local", + "-v", + "-z", + cluster.getZkClient().getZkServerAddress(), + expression + }; + assertEquals(0, runTool(args)); + } + + @Test + public void testRunEchoStreamLocally() throws Exception { + + String expression = "echo(Hello)"; + File expressionFile = File.createTempFile("expression", ".EXPR"); + FileWriter writer = new FileWriter(expressionFile, Charset.defaultCharset()); + writer.write(expression); + writer.close(); + + // test passing in the file + // notice that we do not pass in zkHost or solrUrl for a simple echo run locally. 
+ String[] args = { + "stream", + "-e", + "local", + "--verbose", + "-zk-host", + cluster.getZkClient().getZkServerAddress(), + expressionFile.getAbsolutePath() + }; + + assertEquals(0, runTool(args)); + + // test passing in the expression directly + args = + new String[] { + "stream", + "--execution", + "local", + "--verbose", + "--zk-host", + cluster.getZkClient().getZkServerAddress(), + expression + }; + + assertEquals(0, runTool(args)); + } + + @Test + public void testRunEchoStreamRemotely() throws Exception { + String collectionName = "streamWorkerCollection"; + withBasicAuth(CollectionAdminRequest.createCollection(collectionName, "_default", 1, 1)) + .processAndWait(cluster.getSolrClient(), 10); + waitForState( + "Expected collection to be created with 1 shard and 1 replicas", + collectionName, + clusterShape(1, 1)); + + String expression = "echo(Hello)"; + File expressionFile = File.createTempFile("expression", ".EXPR"); + FileWriter writer = new FileWriter(expressionFile, Charset.defaultCharset()); + writer.write(expression); + writer.close(); + + // test passing in the file + String[] args = { + "stream", + "-e", + "remote", + "-c", + collectionName, + "--verbose", + "-z", + cluster.getZkClient().getZkServerAddress(), + "--credentials", + SecurityJson.USER_PASS, + expressionFile.getAbsolutePath() + }; + + assertEquals(0, runTool(args)); + + // test passing in the expression directly + args = + new String[] { + "stream", + "--execution", + "remote", + "--name", + collectionName, + "--verbose", + "--zk-host", + cluster.getZkClient().getZkServerAddress(), + "--credentials", + SecurityJson.USER_PASS, + expression + }; + + assertEquals(0, runTool(args)); + } + + private int runTool(String[] args) throws Exception { + Tool tool = findTool(args); + assertTrue(tool instanceof StreamTool); + CommandLine cli = parseCmdLine(tool, args); + return tool.runTool(cli); + } + + // Copied from StreamExpressionTest.java + private static void populateFileWithData(Path 
dataFile) throws Exception { + // Files.createFile(dataFile); + try (final BufferedWriter writer = Files.newBufferedWriter(dataFile, StandardCharsets.UTF_8)) { + for (int i = 1; i <= 4; i++) { + writer.write(dataFile.getFileName() + " line " + i); + writer.newLine(); + } + } + } +} diff --git a/solr/packaging/test/test_stream.bats b/solr/packaging/test/test_stream.bats new file mode 100644 index 00000000000..63145522c79 --- /dev/null +++ b/solr/packaging/test/test_stream.bats @@ -0,0 +1,86 @@ +#!/usr/bin/env bats + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +load bats_helper + +setup_file() { + common_clean_setup + solr start -e techproducts + solr auth enable --type basicAuth --credentials name:password +} + +teardown_file() { + common_setup + solr stop --all +} + +setup() { + common_setup +} + +teardown() { + # save a snapshot of SOLR_HOME for failed tests + save_home_on_failure +} + +@test "searching solr via locally executed streaming expression" { + + local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr" + echo 'search(techproducts,' > "${solr_stream_file}" + echo 'q="name:memory",' >> "${solr_stream_file}" + echo 'fl="name,price",' >> "${solr_stream_file}" + echo 'sort="price desc"' >> "${solr_stream_file}" + echo ')' >> "${solr_stream_file}" + + run solr stream --execution local --header --credentials name:password ${solr_stream_file} + + assert_output --partial 'name price' + assert_output --partial 'CORSAIR XMS' + refute_output --partial 'ERROR' +} + +@test "searching solr via remotely executed streaming expression" { + + local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr" + echo 'search(techproducts,' > "${solr_stream_file}" + echo 'q="name:memory",' >> "${solr_stream_file}" + echo 'fl="name,price",' >> "${solr_stream_file}" + echo 'sort="price desc"' >> "${solr_stream_file}" + echo ')' >> "${solr_stream_file}" + + run solr stream -e remote --name techproducts --solr-url http://localhost:${SOLR_PORT} --header --credentials name:password ${solr_stream_file} + + assert_output --partial 'name price' + assert_output --partial 'CORSAIR XMS' + refute_output --partial 'ERROR' +} + +@test "variable interpolation" { + + local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr" + echo 'search(techproducts,' > "${solr_stream_file}" + echo 'q="name:$1",' >> "${solr_stream_file}" + echo 'fl="name,price",' >> "${solr_stream_file}" + echo 'sort="price $2"' >> "${solr_stream_file}" + echo ')' >> "${solr_stream_file}" + + run solr stream --execution local --header --credentials name:password ${solr_stream_file} 
apple asc + + assert_output --partial 'name price' + assert_output --partial 'Apple 60 GB iPod' + refute_output --partial 'ERROR' +} diff --git a/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc b/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc new file mode 100644 index 00000000000..20fe2458e42 --- /dev/null +++ b/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc @@ -0,0 +1,176 @@ += Stream Tool +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +The Stream tool allows you to run a xref:streaming-expressions.adoc[] in Solr and see the results from the command line. +It is very similar to the xref:stream-screen.adoc[], but is part of the `bin/solr` CLI. +Being a CLI, you can pipe content into it similar to other Unix-style tools, and you can even run many kinds of expressions locally, without a Solr cluster. + +NOTE: The Stream Tool is classified as "experimental". +It may change in backwards-incompatible ways as it evolves to cover additional functionality.
+ +To run it, open a terminal and enter: + +[,console] +---- +$ bin/solr stream --header -c techproducts --delimiter=\| 'search(techproducts,q="name:memory",fl="name,price")' +---- + +This will run the provided streaming expression on the `techproducts` collection on your local Solr and produce: + +[,console] +---- +name|price +CORSAIR XMS 2GB (2 x 1GB) 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) Dual Channel Kit System Memory - Retail|185.0 +CORSAIR ValueSelect 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - Retail|74.99 +A-DATA V-Series 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - OEM| +---- + +TIP: Notice how we used the pipe character (|) as the delimiter? It required a backslash for escaping it so it wouldn't be treated as a pipe within the shell script. + +You can also specify a file with the suffix `.expr` containing your streaming expression. +This is useful for longer expressions or if you are experiencing shell character-escaping issues with your expression. + +Assuming you have created the file `stream.expr` with the contents: + +---- +# Stream a search + +search( + techproducts, + q="name:memory", + fl="name,price", + sort="price desc" +) +---- + +Then you can run it on the Solr collection `techproducts`, specifying you want a header row: + +[,console] +---- +$ bin/solr stream --header -c techproducts stream.expr +---- + +And this will produce: + +[,console] +---- +name price +CORSAIR XMS 2GB (2 x 1GB) 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) Dual Channel Kit System Memory - Retail 185.0 +CORSAIR ValueSelect 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - Retail 74.99 +A-DATA V-Series 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System Memory - OEM +---- + +== Using the bin/solr stream Tool + +To use the tool you need to provide the streaming expression either inline as the last argument, or provide a file ending in `.expr` that contains the expression.
+ +The `--help` (or simply `-h`) option will output information on its usage (i.e., `bin/solr stream --help`): + +[source,plain] +---- +usage: bin/solr stream [--array-delimiter ] [-c ] [--delimiter ] [-e ] [-f + ] [-h] [--header] [-s ] [-u ] [-v] [-z ] + +List of options: + --array-delimiter The delimiter multi-valued fields. Default to using a pipe (|) delimiter. + -c,--name Name of the specific collection to execute expression on if the execution is set + to 'remote'. Required for 'remote' execution environment. + --delimiter The output delimiter. Default to using three spaces. + -e,--execution Execution environment is either 'local' (i.e CLI process) or via a 'remote' Solr + server. Default environment is 'remote'. + -f,--fields The fields in the tuples to output. Defaults to fields in the first tuple of result + set. + -h,--help Print this message. + --header Specify to include a header line. + -s,--solr-url Base Solr URL, which can be used to determine the zk-host if that's not known; + defaults to: http://localhost:8983. + -u,--credentials Credentials in the format username:password. Example: --credentials solr:SolrRocks + -v,--verbose Enable verbose command output. + -z,--zk-host Zookeeper connection string; unnecessary if ZK_HOST is defined in solr.in.sh; + otherwise, defaults to localhost:9983. +---- + +== Examples Using bin/solr stream + +There are several ways to use `bin/solr stream`. +This section presents several examples. + +=== Executing Expression Locally + +Streaming Expressions by default are executed in the Solr cluster. +However, there are use cases where you want to interact with data in your local environment, or even run a streaming expression independent of Solr. + +The Stream Tool allows you to specify `--execution local` to process the expression in the Solr CLI's JVM. + +However, "local" processing does not imply a networking sandbox.
+Many streaming expressions, such as `search` and `update`, will make network requests to remote Solr nodes if configured to do so, even in "local" execution mode. + +Assuming you have created the file `load_data.expr` with the contents: + +---- +# Index CSV File + +update( + gettingstarted, + parseCSV( + cat(./example/exampledocs/books.csv, maxLines=2) + ) +) +---- + +Running this expression will read in the local file and send the first two lines to the collection `gettingstarted`. + +TIP: Want to send data to a remote Solr? Pass in `--solr-url http://solr.remote:8983`. + + +[,console] +---- +$ bin/solr stream --execution local --header load_data.expr +---- + + +The StreamTool adds some Streaming Expressions specifically for local use: + +* stdin() lets you pipe data directly into the streaming expression. +* cat() allows you to read ANY file on your local system. This is different from the xref:stream-source-reference.adoc#cat[`cat`] operator that runs in Solr, which only accesses `$SOLR_HOME/userfiles/`. + +Caveats: + + * You don't get to use any of the parallelization support that is available when you run the expression on the cluster. + * Anything that requires Solr internals access won't work with the `--execution local` context. + +=== Piping data to an expression + +Index a CSV file into `gettingstarted` collection. + +[,console] +---- +$ cat example/exampledocs/books.csv | bin/solr stream -e local 'update(gettingstarted,parseCSV(stdin()))' +---- + +=== Variable interpolation + +You can do variable interpolation via having `$1`, `$2`, etc. in your streaming expression, and then passing those values as arguments. + +[,console] +---- +$ bin/solr stream -c techproducts 'echo("$1")' "Hello World" +Hello World +---- + +This also works when using `.expr` files.
diff --git a/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc b/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc index a9a6bf564aa..cc3e502a2bc 100644 --- a/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc +++ b/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc @@ -143,3 +143,7 @@ The xref:math-expressions.adoc[] has in depth coverage of visualization techniqu === Stream Screen * xref:stream-screen.adoc[]: Submit streaming expressions and see results and parsing explanations. + +=== Stream Tool + +* xref:stream-tool.adoc[]: Submit streaming expressions and see results via `bin/solr stream`. diff --git a/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc b/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc index aa3f0fbade7..973bd80f005 100644 --- a/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc +++ b/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc @@ -96,3 +96,4 @@ ** xref:graph-traversal.adoc[] ** xref:stream-api.adoc[] ** xref:stream-screen.adoc[] +** xref:stream-tool.adoc[] diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java index 45ce93c30c4..7550d1a35c4 100644 --- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java +++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java @@ -17,7 +17,6 @@ package org.apache.solr.client.solrj.io; import java.io.Closeable; -import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,14 +37,10 @@ import org.apache.solr.common.AlreadyClosedException; import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.URLUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** The SolrClientCache caches SolrClients, so they can be 
reused by different TupleStreams. */ public class SolrClientCache implements Closeable { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - // Set the floor for timeouts to 60 seconds. // Timeouts can be increased by setting the system properties defined below. private static final int MIN_TIMEOUT = 60000; @@ -55,6 +50,8 @@ public class SolrClientCache implements Closeable { private static final int minSocketTimeout = Math.max(Integer.getInteger(HttpClientUtil.PROP_SO_TIMEOUT, MIN_TIMEOUT), MIN_TIMEOUT); + private String basicAuthCredentials = null; // Only support with the http2SolrClient + private final Map solrClients = new HashMap<>(); private final HttpClient apacheHttpClient; private final Http2SolrClient http2SolrClient; @@ -77,6 +74,10 @@ public SolrClientCache(Http2SolrClient http2SolrClient) { this.http2SolrClient = http2SolrClient; } + public void setBasicAuthCredentials(String basicAuthCredentials) { + this.basicAuthCredentials = basicAuthCredentials; + } + public void setDefaultZKHost(String zkHost) { if (zkHost != null) { zkHost = zkHost.split("/")[0]; @@ -101,11 +102,12 @@ public synchronized CloudSolrClient getCloudSolrClient(String zkHost) { String zkHostNoChroot = zkHost.split("/")[0]; boolean canUseACLs = Optional.ofNullable(defaultZkHost.get()).map(zkHostNoChroot::equals).orElse(false); + final CloudSolrClient client; if (apacheHttpClient != null) { client = newCloudLegacySolrClient(zkHost, apacheHttpClient, canUseACLs); } else { - client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs); + client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs, basicAuthCredentials); } solrClients.put(zkHost, client); return client; @@ -129,12 +131,17 @@ private static CloudSolrClient newCloudLegacySolrClient( } private static CloudHttp2SolrClient newCloudHttp2SolrClient( - String zkHost, Http2SolrClient http2SolrClient, boolean canUseACLs) { + String zkHost, + Http2SolrClient 
http2SolrClient, + boolean canUseACLs, + String basicAuthCredentials) { final List hosts = List.of(zkHost); var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty()); builder.canUseZkACLs(canUseACLs); // using internal builder to ensure the internal client gets closed - builder = builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null, http2SolrClient)); + builder = + builder.withInternalClientBuilder( + newHttp2SolrClientBuilder(null, http2SolrClient, basicAuthCredentials)); var client = builder.build(); try { client.connect(); @@ -163,7 +170,7 @@ public synchronized SolrClient getHttpSolrClient(String baseUrl) { if (apacheHttpClient != null) { client = newHttpSolrClient(baseUrl, apacheHttpClient); } else { - client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient).build(); + client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient, basicAuthCredentials).build(); } solrClients.put(baseUrl, client); return client; @@ -190,7 +197,7 @@ private static void adjustTimeouts(SolrClientBuilder builder, HttpClient http } private static Http2SolrClient.Builder newHttp2SolrClientBuilder( - String url, Http2SolrClient http2SolrClient) { + String url, Http2SolrClient http2SolrClient, String basicAuthCredentials) { final var builder = (url == null || URLUtil.isBaseUrl(url)) // URL may be null here and set by caller ? 
new Http2SolrClient.Builder(url) @@ -199,6 +206,8 @@ private static Http2SolrClient.Builder newHttp2SolrClientBuilder( if (http2SolrClient != null) { builder.withHttpClient(http2SolrClient); } + builder.withOptionalBasicAuthCredentials(basicAuthCredentials); + long idleTimeout = minSocketTimeout; if (builder.getIdleTimeoutMillis() != null) { idleTimeout = Math.max(idleTimeout, builder.getIdleTimeoutMillis()); diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java index 9576cf9658e..fc26a8972f7 100644 --- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java +++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java @@ -224,4 +224,9 @@ public StreamComparator getStreamSort() { public int getCost() { return 0; } + + @SuppressWarnings({"rawtypes"}) + public Map getLetParams() { + return this.letParams; + } }