diff --git a/README.md b/README.md index 23ae5df..cdcd072 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ curl -s http://localhost:9200/.awesome_events/_search -H 'Content-Type: applicat Get queries: ``` -curl -s http://localhost:9200/.awesome_queries/_search | jq +curl -s http://localhost:9200/.awesome_queries/_search -H "X-ubi-store: awesome" | jq ``` Delete the store: diff --git a/src/main/java/org/opensearch/ubl/HeaderConstants.java b/src/main/java/org/opensearch/ubl/HeaderConstants.java new file mode 100644 index 0000000..b848641 --- /dev/null +++ b/src/main/java/org/opensearch/ubl/HeaderConstants.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ubl; + +public class HeaderConstants { + + public static final String EVENT_STORE_HEADER = "X-ubi-store"; + +} diff --git a/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java b/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java index 47380fe..a34a1bf 100644 --- a/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java +++ b/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java @@ -10,11 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler; -import org.opensearch.ubl.action.UserBehaviorLoggingSearchFilter; import org.opensearch.action.support.ActionFilter; -import org.opensearch.ubl.backends.Backend; -import org.opensearch.ubl.backends.OpenSearchBackend; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; @@ -24,14 +20,19 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.ubl.events.OpenSearchEventManager; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestHeaderDefinition; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.ubl.action.UserBehaviorLoggingActionFilter; +import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler; +import org.opensearch.ubl.backends.Backend; +import org.opensearch.ubl.backends.OpenSearchBackend; +import org.opensearch.ubl.events.OpenSearchEventManager; import org.opensearch.watcher.ResourceWatcherService; import java.util.ArrayList; @@ -50,6 +51,16 @@ public class UserBehaviorLoggingPlugin extends Plugin implements ActionPlugin { private Backend backend; private ActionFilter userBehaviorLoggingFilter; + @Override + public Collection getRestHeaders() { + return List.of(new RestHeaderDefinition(HeaderConstants.EVENT_STORE_HEADER, false)); + } + + @Override + public Collection getTaskHeaders() { + return List.of(HeaderConstants.EVENT_STORE_HEADER); + } + @Override public List getRestHandlers(final Settings settings, final RestController restController, @@ -98,7 +109,7 @@ public Collection createComponents( ) { this.backend = new OpenSearchBackend(client); - this.userBehaviorLoggingFilter = new UserBehaviorLoggingSearchFilter(backend, environment.settings()); + this.userBehaviorLoggingFilter = new UserBehaviorLoggingActionFilter(backend, environment.settings(), threadPool); LOGGER.info("Creating scheduled task"); diff --git a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java similarity index 68% rename from src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java rename to src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java index 336c0fc..eac44cd 100644 --- a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java +++ b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingActionFilter.java @@ -16,27 +16,31 @@ import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilterChain; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; +import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.ubl.model.QueryResponse; +import org.opensearch.ubl.HeaderConstants; import org.opensearch.ubl.backends.Backend; import org.opensearch.ubl.model.QueryRequest; -import org.opensearch.tasks.Task; +import org.opensearch.ubl.model.QueryResponse; -import java.util.*; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; -public class UserBehaviorLoggingSearchFilter implements ActionFilter { +public class UserBehaviorLoggingActionFilter implements ActionFilter { - private static final Logger LOGGER = LogManager.getLogger(UserBehaviorLoggingSearchFilter.class); + private static final Logger LOGGER = LogManager.getLogger(UserBehaviorLoggingActionFilter.class); private final Backend backend; private final Settings settings; + private final ThreadPool threadPool; - public UserBehaviorLoggingSearchFilter(final Backend backend, final Settings settings) { + public UserBehaviorLoggingActionFilter(final Backend backend, final Settings settings, ThreadPool threadPool) { this.backend = backend; this.settings = settings; + this.threadPool = threadPool; } @Override @@ -59,15 +63,22 @@ public void app @Override public void onResponse(Response response) { + LOGGER.info("Query ID header: " + task.getHeader("query-id")); + final long startTime = System.currentTimeMillis(); - // Get the search itself. - final SearchRequest searchRequest = (SearchRequest) request; + final String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER); - // Restrict this to only searches of certain indices specified in the settings. - //final List indices = Arrays.asList(searchRequest.indices()); - //final Set indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(","))); - //if(indicesToLog.containsAll(indices)) { + // If there is no event store header we should not continue anything. + if(eventStore != null && !eventStore.trim().isEmpty()) { + + // Get the search itself. + final SearchRequest searchRequest = (SearchRequest) request; + + // TODO: Restrict logging to only queries of certain indices specified in the settings. + //final List indices = Arrays.asList(searchRequest.indices()); + //final Set indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(","))); + //if(indicesToLog.containsAll(indices)) { // Get all search hits from the response. if (response instanceof SearchResponse) { @@ -82,6 +93,7 @@ public void onResponse(Response response) { final String queryResponseId = UUID.randomUUID().toString(); final List queryResponseHitIds = new LinkedList<>(); + final SearchResponse searchResponse = (SearchResponse) response; // Add each hit to the list of query responses. @@ -92,8 +104,7 @@ public void onResponse(Response response) { try { // Persist the query to the backend. - // TODO: How do we know which storeName? - backend.persistQuery("awesome", + backend.persistQuery(eventStore, new QueryRequest(queryId, query), new QueryResponse(queryId, queryResponseId, queryResponseHitIds)); @@ -102,14 +113,16 @@ public void onResponse(Response response) { LOGGER.error("Unable to persist query.", ex); } - // TODO: Somehow return the queryId to the client. + threadPool.getThreadContext().addResponseHeader("query_id", queryId); } - //} + //} + + final long elapsedTime = System.currentTimeMillis() - startTime; + LOGGER.info("UBL search request filter took {} ms", elapsedTime); - final long elapsedTime = System.currentTimeMillis() - startTime; - LOGGER.info("UBL search request filter took {} ms", elapsedTime); + } listener.onResponse(response);