diff --git a/index-chorus-data.sh b/index-chorus-data.sh index a885990..44d34e6 100755 --- a/index-chorus-data.sh +++ b/index-chorus-data.sh @@ -1,6 +1,6 @@ #!/bin/bash -e -CHORUS_HOME=`realpath ../chorus-opensearch-edition` +CHORUS_HOME=${1:-`realpath ../chorus-opensearch-edition`} echo "Using CHORUS_HOME = ${CHORUS_HOME}" TEMP_FILE=`mktemp` diff --git a/load-test/load-test.py b/load-test/load-test.py index d818ad3..780c359 100644 --- a/load-test/load-test.py +++ b/load-test/load-test.py @@ -22,3 +22,12 @@ def event_task(self): } self.client.post("/_plugins/ubi/mystore", headers=headers, json=data) + + @task + def queries_task(self): + headers = { + "Content-Type": "application/json", + "X-ubi-store": "mystore" + } + + self.client.get("/ecommerce/_search", headers=headers) \ No newline at end of file diff --git a/load-test/run.sh b/load-test/run.sh index fc36a81..246d0ce 100755 --- a/load-test/run.sh +++ b/load-test/run.sh @@ -1,17 +1,24 @@ #!/bin/bash -e # Delete the store if it exists. -curl -X DELETE http://localhost:9200/_plugins/ubi/mystore +curl -X DELETE "http://localhost:9200/_plugins/ubi/mystore" -# Create the store -curl -X PUT http://localhost:9200/_plugins/ubi/mystore +# Create the store. +curl -X PUT "http://localhost:9200/_plugins/ubi/mystore?index=ecommerce" -# Insert events -locust -f load-test.py --headless -u 1 -r 1 --run-time 10s --host http://localhost:9200 +# Index subset of Chorus data. +../index-chorus-data.sh `realpath ../../chorus-opensearch-edition` + +# Insert events and queries. +locust -f load-test.py --headless -u 1 -r 1 --run-time 30s --host http://localhost:9200 # Let events index. -sleep 2 +sleep 10 # Get count of indexed events. EVENTS=`curl -s http://localhost:9200/.mystore_events/_count | jq .count` -echo "Found $EVENTS indexed" +echo "Found $EVENTS events" + +# Get count of indexed queries. +QUERIES=`curl -s http://localhost:9200/.mystore_queries/_count | jq .count` +echo "Found $QUERIES queries" diff --git a/src/main/java/com/o19s/ubi/OpenSearchEventManager.java b/src/main/java/com/o19s/ubi/OpenSearchEventManager.java new file mode 100644 index 0000000..0fb06e0 --- /dev/null +++ b/src/main/java/com/o19s/ubi/OpenSearchEventManager.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi; + +import com.o19s.ubi.events.Event; +import com.o19s.ubi.events.EventManager; +import com.o19s.ubi.model.QueryRequest; +import com.o19s.ubi.utils.UbiUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.XContentType; + +import java.util.HashMap; +import java.util.Map; + +/** + * An event manager that inserts events into an OpenSearch index. + */ +public class OpenSearchEventManager extends EventManager { + + private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class); + + private final Client client; + private static OpenSearchEventManager openSearchEventManager; + + private OpenSearchEventManager(Client client) { + this.client = client; + } + + @Override + public void process() { + + if(eventsQueue.size() > 0) { + + final BulkRequest eventsBulkRequest = new BulkRequest(); + LOGGER.info("Bulk inserting " + eventsQueue.size() + " UBI events"); + + for (final Event event : eventsQueue.get()) { + + final IndexRequest indexRequest = new IndexRequest(event.getIndexName()) + .source(event.getEvent(), XContentType.JSON); + + eventsBulkRequest.add(indexRequest); + + } + + eventsQueue.clear(); + client.bulk(eventsBulkRequest); + + } + + if(queryRequestsQueue.size() > 0) { + + final BulkRequest queryRequestsBulkRequest = new BulkRequest(); + LOGGER.info("Bulk inserting " + queryRequestsQueue.size() + " UBI queries"); + + for(final QueryRequest queryRequest : queryRequestsQueue.get()) { + + LOGGER.info("Writing query ID {} with response ID {}", + queryRequest.getQueryId(), queryRequest.getQueryResponse().getQueryResponseId()); + + // What will be indexed - adheres to the queries-mapping.json + final Map source = new HashMap<>(); + source.put("timestamp", queryRequest.getTimestamp()); + source.put("query_id", queryRequest.getQueryId()); + source.put("query", queryRequest.getQuery()); + source.put("query_response_id", queryRequest.getQueryResponse().getQueryResponseId()); + source.put("query_response_hit_ids", queryRequest.getQueryResponse().getQueryResponseHitIds()); + source.put("user_id", queryRequest.getUserId()); + source.put("session_id", queryRequest.getSessionId()); + + // Get the name of the queries. + final String queriesIndexName = UbiUtils.getQueriesIndexName(queryRequest.getStoreName()); + + // Build the index request. + final IndexRequest indexRequest = new IndexRequest(queriesIndexName) + .source(source, XContentType.JSON); + + queryRequestsBulkRequest.add(indexRequest); + + } + + queryRequestsQueue.clear(); + client.bulk(queryRequestsBulkRequest); + + } + + } + + @Override + public void add(final Event event) { + eventsQueue.add(event); + } + + @Override + public void add(final QueryRequest queryRequest) { + queryRequestsQueue.add(queryRequest); + } + + /** + * Gets a singleton instance of the manager. + * @param client An OpenSearch {@link Client}. + * @return An instance of {@link OpenSearchEventManager}. + */ + public static OpenSearchEventManager getInstance(Client client) { + if(openSearchEventManager == null) { + openSearchEventManager = new OpenSearchEventManager(client); + } + return openSearchEventManager; + } + +} diff --git a/src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java b/src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java index 9c4cc38..580f8cc 100644 --- a/src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java +++ b/src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java @@ -8,6 +8,10 @@ package com.o19s.ubi; +import com.o19s.ubi.action.UserBehaviorInsightsActionFilter; +import com.o19s.ubi.model.HeaderConstants; +import com.o19s.ubi.model.SettingsConstants; +import com.o19s.ubi.rest.UserBehaviorInsightsRestHandler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.support.ActionFilter; @@ -15,11 +19,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.IndexScopedSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.settings.*; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; @@ -32,19 +32,9 @@ import org.opensearch.rest.RestHeaderDefinition; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; -import com.o19s.ubi.action.UserBehaviorInsightsActionFilter; -import com.o19s.ubi.rest.UserBehaviorInsightsRestHandler; -import com.o19s.ubi.events.OpenSearchEventManager; -import com.o19s.ubi.model.HeaderConstants; -import com.o19s.ubi.model.SettingsConstants; import org.opensearch.watcher.ResourceWatcherService; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; diff --git a/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java b/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java index 5dd682c..425fd05 100644 --- a/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java +++ b/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java @@ -9,6 +9,7 @@ package com.o19s.ubi.action; import com.o19s.ubi.UserBehaviorInsightsPlugin; +import com.o19s.ubi.events.EventManager; import com.o19s.ubi.model.HeaderConstants; import com.o19s.ubi.model.QueryRequest; import com.o19s.ubi.model.QueryResponse; @@ -31,13 +32,9 @@ import org.opensearch.search.SearchHit; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; +import com.o19s.ubi.OpenSearchEventManager; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; /** * An implementation of {@link ActionFilter} that passively listens for OpenSearch @@ -49,6 +46,7 @@ public class UserBehaviorInsightsActionFilter implements ActionFilter { private final Client client; private final ThreadPool threadPool; + private final EventManager eventManager; /** * Creates a new filter. @@ -58,6 +56,7 @@ public class UserBehaviorInsightsActionFilter implements ActionFilter { public UserBehaviorInsightsActionFilter(Client client, ThreadPool threadPool) { this.client = client; this.threadPool = threadPool; + this.eventManager = OpenSearchEventManager.getInstance(client); } @Override @@ -90,17 +89,17 @@ public void onResponse(Response response) { // Get info from the headers. final String queryId = getHeaderValue(HeaderConstants.QUERY_ID_HEADER, UUID.randomUUID().toString(), task); - final String eventStore = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "", task); + final String storeName = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "", task); final String userId = getHeaderValue(HeaderConstants.USER_ID_HEADER, "", task); final String sessionId = getHeaderValue(HeaderConstants.SESSION_ID_HEADER, "", task); // If there is no event store header, ignore this search. - if(!"".equals(eventStore)) { + if(!"".equals(storeName)) { - final String index = getStoreSettings(eventStore, SettingsConstants.INDEX); - final String idField = getStoreSettings(eventStore, SettingsConstants.ID_FIELD); + final String index = getStoreSettings(storeName, SettingsConstants.INDEX); + final String idField = getStoreSettings(storeName, SettingsConstants.ID_FIELD); - LOGGER.info("Using id_field [{}] of index [{}] for UBI query.", idField, index); + LOGGER.debug("Using id_field [{}] of index [{}] for UBI query.", idField, index); // Only consider this search if the index being searched matches the store's index setting. if (Arrays.asList(searchRequest.indices()).contains(index)) { @@ -131,38 +130,31 @@ public void onResponse(Response response) { } - try { + final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds); + final QueryRequest queryRequest = new QueryRequest(storeName, queryId, query, userId, sessionId, queryResponse); - // Persist the query to the backend. - persistQuery(eventStore, - new QueryRequest(queryId, query, userId, sessionId), - new QueryResponse(queryId, queryResponseId, queryResponseHitIds)); - - } catch (Exception ex) { - // TODO: Handle this. - LOGGER.error("Unable to persist query.", ex); - } + // Queue this for writing to the UBI store. + eventManager.add(queryRequest); + // Add the query_id to the response headers. threadPool.getThreadContext().addResponseHeader("query_id", queryId); - //} - final long elapsedTime = System.currentTimeMillis() - startTime; LOGGER.info("UBI search request filter took {} ms", elapsedTime); } - } + LOGGER.info("Setting and exposing query_id {}", queryId); + //HACK: this should be set in the OpenSearch config (to send to the client code just once), + // and not on every single search response, + // but that server setting doesn't appear to be exposed. + threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id"); + threadPool.getThreadContext().addResponseHeader("query_id", queryId); - LOGGER.info("Setting and exposing query_id {}", queryId); - //HACK: this should be set in the OpenSearch config (to send to the client code just once), - // and not on every single search response, - // but that server setting doesn't appear to be exposed. - threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id"); - threadPool.getThreadContext().addResponseHeader("query_id", queryId); + final long elapsedTime = System.currentTimeMillis() - startTime; + LOGGER.info("UBI search request filter took {} ms", elapsedTime); - final long elapsedTime = System.currentTimeMillis() - startTime; - LOGGER.info("UBI search request filter took {} ms", elapsedTime); + } } diff --git a/src/main/java/com/o19s/ubi/events/AbstractEventManager.java b/src/main/java/com/o19s/ubi/events/EventManager.java similarity index 52% rename from src/main/java/com/o19s/ubi/events/AbstractEventManager.java rename to src/main/java/com/o19s/ubi/events/EventManager.java index 2f3a9b2..2ce64d7 100644 --- a/src/main/java/com/o19s/ubi/events/AbstractEventManager.java +++ b/src/main/java/com/o19s/ubi/events/EventManager.java @@ -8,29 +8,36 @@ package com.o19s.ubi.events; +import com.o19s.ubi.model.QueryRequest; +import com.o19s.ubi.model.events.queues.Queue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import com.o19s.ubi.events.queues.EventQueue; import com.o19s.ubi.events.queues.InternalQueue; /** * Base class for managing client-side events. */ -public abstract class AbstractEventManager { +public abstract class EventManager { @SuppressWarnings("unused") - private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class); + private final Logger LOGGER = LogManager.getLogger(EventManager.class); /** - * The {@link EventQueue queue} that stores the client-side events. + * The {@link Queue queue} that stores the client-side events. */ - protected final EventQueue eventQueue; + protected final Queue eventsQueue; + + /** + * The {@link Queue queue} that stores the query reqeusts. + */ + protected final Queue queryRequestsQueue; /** * Initialize the base client-side event manager. */ - public AbstractEventManager() { - this.eventQueue = new InternalQueue(); + public EventManager() { + this.eventsQueue = new InternalQueue<>(); + this.queryRequestsQueue = new InternalQueue<>(); } /** @@ -44,4 +51,10 @@ public AbstractEventManager() { */ public abstract void add(Event event); + /** + * Add an event to the queue. + * @param queryRequest A {@link QueryRequest} to be persisted. + */ + public abstract void add(QueryRequest queryRequest); + } diff --git a/src/main/java/com/o19s/ubi/events/OpenSearchEventManager.java b/src/main/java/com/o19s/ubi/events/OpenSearchEventManager.java deleted file mode 100644 index 9c6aaf5..0000000 --- a/src/main/java/com/o19s/ubi/events/OpenSearchEventManager.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package com.o19s.ubi.events; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.client.Client; -import org.opensearch.common.xcontent.XContentType; - -/** - * An event manager that inserts events into an OpenSearch index. - */ -public class OpenSearchEventManager extends AbstractEventManager { - - private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class); - - private final Client client; - private static OpenSearchEventManager openSearchEventManager; - - private OpenSearchEventManager(Client client) { - this.client = client; - } - - @Override - public void process() { - - if(eventQueue.size() > 0) { - - final BulkRequest bulkRequest = new BulkRequest(); - LOGGER.info("Bulk inserting " + eventQueue.size() + " UBI events"); - - for (final Event event : eventQueue.get()) { - - final IndexRequest indexRequest = new IndexRequest(event.getIndexName()) - .source(event.getEvent(), XContentType.JSON); - - bulkRequest.add(indexRequest); - - } - - eventQueue.clear(); - client.bulk(bulkRequest); - - } - - } - - @Override - public void add(Event event) { - eventQueue.add(event); - } - - /** - * Gets a singleton instance of the manager. - * @param client An OpenSearch {@link Client}. - * @return An instance of {@link OpenSearchEventManager}. - */ - public static OpenSearchEventManager getInstance(Client client) { - if(openSearchEventManager == null) { - openSearchEventManager = new OpenSearchEventManager(client); - } - return openSearchEventManager; - } - -} diff --git a/src/main/java/com/o19s/ubi/events/queues/InternalQueue.java b/src/main/java/com/o19s/ubi/events/queues/InternalQueue.java index dcb7996..6a50fd4 100644 --- a/src/main/java/com/o19s/ubi/events/queues/InternalQueue.java +++ b/src/main/java/com/o19s/ubi/events/queues/InternalQueue.java @@ -8,20 +8,20 @@ package com.o19s.ubi.events.queues; -import com.o19s.ubi.events.Event; +import com.o19s.ubi.model.events.queues.Queue; import java.util.LinkedList; import java.util.List; /** - * An implementation of {@link EventQueue} that uses an in-memory list. + * An implementation of {@link Queue} that uses an in-memory list. */ -public class InternalQueue implements EventQueue { +public class InternalQueue implements Queue { - private static final List indexRequests = new LinkedList<>(); + private final List indexRequests = new LinkedList<>(); @Override - public void add(Event event) { + public void add(T event) { indexRequests.add(event); } @@ -31,7 +31,7 @@ public void clear() { } @Override - public List get() { + public List get() { return indexRequests; } diff --git a/src/main/java/com/o19s/ubi/events/queues/EventQueue.java b/src/main/java/com/o19s/ubi/events/queues/Queue.java similarity index 70% rename from src/main/java/com/o19s/ubi/events/queues/EventQueue.java rename to src/main/java/com/o19s/ubi/events/queues/Queue.java index fc6e802..78c526b 100644 --- a/src/main/java/com/o19s/ubi/events/queues/EventQueue.java +++ b/src/main/java/com/o19s/ubi/events/queues/Queue.java @@ -6,22 +6,20 @@ * compatible open source license. */ -package com.o19s.ubi.events.queues; - -import com.o19s.ubi.events.Event; +package com.o19s.ubi.model.events.queues; import java.util.List; /** * A queue that stores events prior to being indexed. */ -public interface EventQueue { +public interface Queue { /** - * Add an {@link Event event} to the queue. - * @param event The {@link Event event} to add to the queue. + * Add an object to the queue. + * @param event The object to add to the queue. */ - void add(Event event); + void add(T event); /** * Remove all events from the queue. @@ -32,7 +30,7 @@ public interface EventQueue { * Get a list of items in the queue. * @return A list of items in the queue. */ - List get(); + List get(); /** * Gets the count of items on the queue. diff --git a/src/main/java/com/o19s/ubi/model/QueryRequest.java b/src/main/java/com/o19s/ubi/model/QueryRequest.java index 71ab8a7..5b65ac5 100644 --- a/src/main/java/com/o19s/ubi/model/QueryRequest.java +++ b/src/main/java/com/o19s/ubi/model/QueryRequest.java @@ -13,25 +13,40 @@ */ public class QueryRequest { + private final String storeName; private final long timestamp; private final String queryId; private final String query; private final String userId; private final String sessionId; + private final QueryResponse queryResponse; /** * Creates a query request. + * @param storeName The name of the UBI store to hold this query request. * @param queryId The ID of the query. * @param query The query run by OpenSearch. * @param userId The ID of the user that initiated the query. * @param sessionId The ID of the session under which the query was run. + * @param queryResponse The {@link QueryResponse} for this query request. */ - public QueryRequest(final String queryId, final String query, final String userId, final String sessionId) { + public QueryRequest(final String storeName, final String queryId, final String query, + final String userId, final String sessionId, final QueryResponse queryResponse) { + this.storeName = storeName; this.timestamp = System.currentTimeMillis(); this.queryId = queryId; this.query = query; this.userId = userId; this.sessionId = sessionId; + this.queryResponse = queryResponse; + } + + /** + * Gets the name of the UBI store. + * @return The name of the UBI store. + */ + public String getStoreName() { + return storeName; } /** @@ -74,4 +89,12 @@ public String getSessionId() { return sessionId; } + /** + * Gets the query response for this query request. + * @return The {@link QueryResponse} for this query request. + */ + public QueryResponse getQueryResponse() { + return queryResponse; + } + } diff --git a/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java b/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java index 949f150..6a35b19 100644 --- a/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java +++ b/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java @@ -16,6 +16,7 @@ import com.o19s.ubi.events.Event; import com.o19s.ubi.model.HeaderConstants; import com.o19s.ubi.model.SettingsConstants; +import com.o19s.ubi.utils.UbiUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.create.CreateIndexRequest; @@ -31,22 +32,13 @@ import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; -import com.o19s.ubi.events.OpenSearchEventManager; -import com.o19s.ubi.utils.UbiUtils; + +import com.o19s.ubi.OpenSearchEventManager; import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; - -import static org.opensearch.rest.RestRequest.Method.DELETE; -import static org.opensearch.rest.RestRequest.Method.GET; -import static org.opensearch.rest.RestRequest.Method.POST; -import static org.opensearch.rest.RestRequest.Method.PUT; -import static org.opensearch.rest.RestRequest.Method.TRACE; +import java.util.*; + +import static org.opensearch.rest.RestRequest.Method.*; /** * The REST handler for User Behavior Insights. The handler provides the diff --git a/src/yamlRestTest/java/org/opensearch/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java b/src/yamlRestTest/java/com/o19s/ubi/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java similarity index 96% rename from src/yamlRestTest/java/org/opensearch/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java rename to src/yamlRestTest/java/com/o19s/ubi/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java index cd9165f..ab75960 100644 --- a/src/yamlRestTest/java/org/opensearch/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java +++ b/src/yamlRestTest/java/com/o19s/ubi/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java @@ -5,7 +5,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.rest.action; +package com.o19s.ubi.rest.action; import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;