Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #109 from o19s/queue-queries
Browse files Browse the repository at this point in the history
Writing queries to a queue prior to indexing
  • Loading branch information
epugh authored Mar 11, 2024
2 parents ac5cec9 + deb5822 commit ef739e1
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 166 deletions.
2 changes: 1 addition & 1 deletion index-chorus-data.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash -e

CHORUS_HOME=`realpath ../chorus-opensearch-edition`
CHORUS_HOME=${1:-`realpath ../chorus-opensearch-edition`}
echo "Using CHORUS_HOME = ${CHORUS_HOME}"

TEMP_FILE=`mktemp`
Expand Down
9 changes: 9 additions & 0 deletions load-test/load-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,12 @@ def event_task(self):
}

self.client.post("/_plugins/ubi/mystore", headers=headers, json=data)

@task
def queries_task(self):
headers = {
"Content-Type": "application/json",
"X-ubi-store": "mystore"
}

self.client.get("/ecommerce/_search", headers=headers)
21 changes: 14 additions & 7 deletions load-test/run.sh
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
#!/bin/bash -e

# Delete the store if it exists.
curl -X DELETE http://localhost:9200/_plugins/ubi/mystore
curl -X DELETE "http://localhost:9200/_plugins/ubi/mystore"

# Create the store
curl -X PUT http://localhost:9200/_plugins/ubi/mystore
# Create the store.
curl -X PUT "http://localhost:9200/_plugins/ubi/mystore?index=ecommerce"

# Insert events
locust -f load-test.py --headless -u 1 -r 1 --run-time 10s --host http://localhost:9200
# Index subset of Chorus data.
../index-chorus-data.sh `realpath ../../chorus-opensearch-edition`

# Insert events and queries.
locust -f load-test.py --headless -u 1 -r 1 --run-time 30s --host http://localhost:9200

# Let events index.
sleep 2
sleep 10

# Get count of indexed events.
EVENTS=`curl -s http://localhost:9200/.mystore_events/_count | jq .count`
echo "Found $EVENTS indexed"
echo "Found $EVENTS events"

# Get count of indexed queries.
QUERIES=`curl -s http://localhost:9200/.mystore_queries/_count | jq .count`
echo "Found $QUERIES queries"
121 changes: 121 additions & 0 deletions src/main/java/com/o19s/ubi/OpenSearchEventManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package com.o19s.ubi;

import com.o19s.ubi.events.Event;
import com.o19s.ubi.events.EventManager;
import com.o19s.ubi.model.QueryRequest;
import com.o19s.ubi.utils.UbiUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;

import java.util.HashMap;
import java.util.Map;

/**
* An event manager that inserts events into an OpenSearch index.
*/
public class OpenSearchEventManager extends EventManager {

private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class);

private final Client client;
private static OpenSearchEventManager openSearchEventManager;

private OpenSearchEventManager(Client client) {
this.client = client;
}

@Override
public void process() {

if(eventsQueue.size() > 0) {

final BulkRequest eventsBulkRequest = new BulkRequest();
LOGGER.info("Bulk inserting " + eventsQueue.size() + " UBI events");

for (final Event event : eventsQueue.get()) {

final IndexRequest indexRequest = new IndexRequest(event.getIndexName())
.source(event.getEvent(), XContentType.JSON);

eventsBulkRequest.add(indexRequest);

}

eventsQueue.clear();
client.bulk(eventsBulkRequest);

}

if(queryRequestsQueue.size() > 0) {

final BulkRequest queryRequestsBulkRequest = new BulkRequest();
LOGGER.info("Bulk inserting " + queryRequestsQueue.size() + " UBI queries");

for(final QueryRequest queryRequest : queryRequestsQueue.get()) {

LOGGER.info("Writing query ID {} with response ID {}",
queryRequest.getQueryId(), queryRequest.getQueryResponse().getQueryResponseId());

// What will be indexed - adheres to the queries-mapping.json
final Map<String, Object> source = new HashMap<>();
source.put("timestamp", queryRequest.getTimestamp());
source.put("query_id", queryRequest.getQueryId());
source.put("query", queryRequest.getQuery());
source.put("query_response_id", queryRequest.getQueryResponse().getQueryResponseId());
source.put("query_response_hit_ids", queryRequest.getQueryResponse().getQueryResponseHitIds());
source.put("user_id", queryRequest.getUserId());
source.put("session_id", queryRequest.getSessionId());

// Get the name of the queries.
final String queriesIndexName = UbiUtils.getQueriesIndexName(queryRequest.getStoreName());

// Build the index request.
final IndexRequest indexRequest = new IndexRequest(queriesIndexName)
.source(source, XContentType.JSON);

queryRequestsBulkRequest.add(indexRequest);

}

queryRequestsQueue.clear();
client.bulk(queryRequestsBulkRequest);

}

}

@Override
public void add(final Event event) {
eventsQueue.add(event);
}

@Override
public void add(final QueryRequest queryRequest) {
queryRequestsQueue.add(queryRequest);
}

/**
* Gets a singleton instance of the manager.
* @param client An OpenSearch {@link Client}.
* @return An instance of {@link OpenSearchEventManager}.
*/
public static OpenSearchEventManager getInstance(Client client) {
if(openSearchEventManager == null) {
openSearchEventManager = new OpenSearchEventManager(client);
}
return openSearchEventManager;
}

}
22 changes: 6 additions & 16 deletions src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@

package com.o19s.ubi;

import com.o19s.ubi.action.UserBehaviorInsightsActionFilter;
import com.o19s.ubi.model.HeaderConstants;
import com.o19s.ubi.model.SettingsConstants;
import com.o19s.ubi.rest.UserBehaviorInsightsRestHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ActionFilter;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.settings.*;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand All @@ -32,19 +32,9 @@
import org.opensearch.rest.RestHeaderDefinition;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import com.o19s.ubi.action.UserBehaviorInsightsActionFilter;
import com.o19s.ubi.rest.UserBehaviorInsightsRestHandler;
import com.o19s.ubi.events.OpenSearchEventManager;
import com.o19s.ubi.model.HeaderConstants;
import com.o19s.ubi.model.SettingsConstants;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package com.o19s.ubi.action;

import com.o19s.ubi.UserBehaviorInsightsPlugin;
import com.o19s.ubi.events.EventManager;
import com.o19s.ubi.model.HeaderConstants;
import com.o19s.ubi.model.QueryRequest;
import com.o19s.ubi.model.QueryResponse;
Expand All @@ -31,13 +32,9 @@
import org.opensearch.search.SearchHit;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import com.o19s.ubi.OpenSearchEventManager;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;

/**
* An implementation of {@link ActionFilter} that passively listens for OpenSearch
Expand All @@ -49,6 +46,7 @@ public class UserBehaviorInsightsActionFilter implements ActionFilter {

private final Client client;
private final ThreadPool threadPool;
private final EventManager eventManager;

/**
* Creates a new filter.
Expand All @@ -58,6 +56,7 @@ public class UserBehaviorInsightsActionFilter implements ActionFilter {
public UserBehaviorInsightsActionFilter(Client client, ThreadPool threadPool) {
this.client = client;
this.threadPool = threadPool;
this.eventManager = OpenSearchEventManager.getInstance(client);
}

@Override
Expand Down Expand Up @@ -90,17 +89,17 @@ public void onResponse(Response response) {

// Get info from the headers.
final String queryId = getHeaderValue(HeaderConstants.QUERY_ID_HEADER, UUID.randomUUID().toString(), task);
final String eventStore = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "", task);
final String storeName = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "", task);
final String userId = getHeaderValue(HeaderConstants.USER_ID_HEADER, "", task);
final String sessionId = getHeaderValue(HeaderConstants.SESSION_ID_HEADER, "", task);

// If there is no event store header, ignore this search.
if(!"".equals(eventStore)) {
if(!"".equals(storeName)) {

final String index = getStoreSettings(eventStore, SettingsConstants.INDEX);
final String idField = getStoreSettings(eventStore, SettingsConstants.ID_FIELD);
final String index = getStoreSettings(storeName, SettingsConstants.INDEX);
final String idField = getStoreSettings(storeName, SettingsConstants.ID_FIELD);

LOGGER.info("Using id_field [{}] of index [{}] for UBI query.", idField, index);
LOGGER.debug("Using id_field [{}] of index [{}] for UBI query.", idField, index);

// Only consider this search if the index being searched matches the store's index setting.
if (Arrays.asList(searchRequest.indices()).contains(index)) {
Expand Down Expand Up @@ -131,38 +130,31 @@ public void onResponse(Response response) {

}

try {
final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds);
final QueryRequest queryRequest = new QueryRequest(storeName, queryId, query, userId, sessionId, queryResponse);

// Persist the query to the backend.
persistQuery(eventStore,
new QueryRequest(queryId, query, userId, sessionId),
new QueryResponse(queryId, queryResponseId, queryResponseHitIds));

} catch (Exception ex) {
// TODO: Handle this.
LOGGER.error("Unable to persist query.", ex);
}
// Queue this for writing to the UBI store.
eventManager.add(queryRequest);

// Add the query_id to the response headers.
threadPool.getThreadContext().addResponseHeader("query_id", queryId);

//}

final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBI search request filter took {} ms", elapsedTime);

}

}
LOGGER.info("Setting and exposing query_id {}", queryId);
//HACK: this should be set in the OpenSearch config (to send to the client code just once),
// and not on every single search response,
// but that server setting doesn't appear to be exposed.
threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id");
threadPool.getThreadContext().addResponseHeader("query_id", queryId);

LOGGER.info("Setting and exposing query_id {}", queryId);
//HACK: this should be set in the OpenSearch config (to send to the client code just once),
// and not on every single search response,
// but that server setting doesn't appear to be exposed.
threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id");
threadPool.getThreadContext().addResponseHeader("query_id", queryId);
final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBI search request filter took {} ms", elapsedTime);

final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBI search request filter took {} ms", elapsedTime);
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,36 @@

package com.o19s.ubi.events;

import com.o19s.ubi.model.QueryRequest;
import com.o19s.ubi.model.events.queues.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.o19s.ubi.events.queues.EventQueue;
import com.o19s.ubi.events.queues.InternalQueue;

/**
* Base class for managing client-side events.
*/
public abstract class AbstractEventManager {
public abstract class EventManager {

@SuppressWarnings("unused")
private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class);
private final Logger LOGGER = LogManager.getLogger(EventManager.class);

/**
* The {@link EventQueue queue} that stores the client-side events.
* The {@link Queue queue} that stores the client-side events.
*/
protected final EventQueue eventQueue;
protected final Queue<Event> eventsQueue;

/**
* The {@link Queue queue} that stores the query reqeusts.
*/
protected final Queue<QueryRequest> queryRequestsQueue;

/**
* Initialize the base client-side event manager.
*/
public AbstractEventManager() {
this.eventQueue = new InternalQueue();
public EventManager() {
this.eventsQueue = new InternalQueue<>();
this.queryRequestsQueue = new InternalQueue<>();
}

/**
Expand All @@ -44,4 +51,10 @@ public AbstractEventManager() {
*/
public abstract void add(Event event);

/**
* Add an event to the queue.
* @param queryRequest A {@link QueryRequest} to be persisted.
*/
public abstract void add(QueryRequest queryRequest);

}
Loading

0 comments on commit ef739e1

Please sign in to comment.