diff --git a/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java b/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java index 4a74468..a34a1bf 100644 --- a/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java +++ b/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java @@ -32,7 +32,7 @@ import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler; import org.opensearch.ubl.backends.Backend; import org.opensearch.ubl.backends.OpenSearchBackend; -import org.opensearch.ubl.events.EventManager; +import org.opensearch.ubl.events.OpenSearchEventManager; import org.opensearch.watcher.ResourceWatcherService; import java.util.ArrayList; @@ -113,9 +113,10 @@ public Collection createComponents( LOGGER.info("Creating scheduled task"); - // TODO: Only start this if already initialized. + // TODO: Only start this if an OpenSearch store is already initialized. + // Otherwise, start it when a store is initialized. threadPool.scheduler().scheduleAtFixedRate(() -> { - EventManager.getInstance(client).process(); + OpenSearchEventManager.getInstance(client).process(); }, 0, 2000, TimeUnit.MILLISECONDS); return Collections.emptyList(); diff --git a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java index c6ada81..dc2d4e0 100644 --- a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java +++ b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java @@ -72,8 +72,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod final String storeName = request.param("store"); LOGGER.info("Persisting event into {}", storeName); - final String event = request.content().utf8ToString(); - backend.persistEvent(storeName, event); + final String eventJson = request.content().utf8ToString(); + backend.persistEvent(storeName, eventJson); return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Event received")); diff --git a/src/main/java/org/opensearch/ubl/backends/Backend.java b/src/main/java/org/opensearch/ubl/backends/Backend.java index 2c76064..9ed461d 100644 --- a/src/main/java/org/opensearch/ubl/backends/Backend.java +++ b/src/main/java/org/opensearch/ubl/backends/Backend.java @@ -20,7 +20,7 @@ public interface Backend { void delete(final String storeName, RestChannel channel); - void persistEvent(final String storeName, String event); + void persistEvent(final String storeName, String eventJson); void persistQuery(final String storeName, QueryRequest queryRequest, QueryResponse queryResponse) throws Exception; diff --git a/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java b/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java index 2956a93..1997ad9 100644 --- a/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java +++ b/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java @@ -21,7 +21,8 @@ import org.opensearch.rest.RestChannel; import org.opensearch.rest.action.RestToXContentListener; import org.opensearch.ubl.SettingsConstants; -import org.opensearch.ubl.events.EventManager; +import org.opensearch.ubl.events.Event; +import org.opensearch.ubl.events.OpenSearchEventManager; import org.opensearch.ubl.model.QueryRequest; import org.opensearch.ubl.model.QueryResponse; @@ -90,16 +91,15 @@ public void delete(String storeName, RestChannel channel) { } @Override - public void persistEvent(String storeName, String event) { + public void persistEvent(String storeName, String eventJson) { // Add the event for indexing. LOGGER.info("Indexing event into {}", storeName); final String eventsIndexName = getEventsIndexName(storeName); - final IndexRequest indexRequest = new IndexRequest(eventsIndexName) - .source(event, XContentType.JSON); //return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel)); - EventManager.getInstance(client).addIndexRequest(indexRequest); + final Event event = new Event(eventsIndexName, eventJson); + OpenSearchEventManager.getInstance(client).add(event); } @@ -123,8 +123,8 @@ public void persistQuery(final String storeName, final QueryRequest queryRequest final IndexRequest indexRequest = new IndexRequest(queriesIndexName) .source(source, XContentType.JSON); - //return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel)); - EventManager.getInstance(client).addIndexRequest(indexRequest); + // TODO: Move this to the queue, too. + client.index(indexRequest); } @@ -148,7 +148,7 @@ private String getResourceFile(final String fileName) { Streams.copy(is, out); return out.toString(StandardCharsets.UTF_8); } catch (IOException e) { - throw new IllegalStateException("failed to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e); + throw new IllegalStateException("Unable to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e); } } diff --git a/src/main/java/org/opensearch/ubl/events/AbstractEventManager.java b/src/main/java/org/opensearch/ubl/events/AbstractEventManager.java new file mode 100644 index 0000000..88f8093 --- /dev/null +++ b/src/main/java/org/opensearch/ubl/events/AbstractEventManager.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ubl.events; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.ubl.events.queues.EventQueue; +import org.opensearch.ubl.events.queues.InternalQueue; + +public abstract class AbstractEventManager { + + private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class); + + protected final EventQueue eventQueue; + + public AbstractEventManager() { + this.eventQueue = new InternalQueue(); + } + + public abstract void process(); + + public abstract void add(Event event); + +} diff --git a/src/main/java/org/opensearch/ubl/events/Event.java b/src/main/java/org/opensearch/ubl/events/Event.java new file mode 100644 index 0000000..4f38e65 --- /dev/null +++ b/src/main/java/org/opensearch/ubl/events/Event.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ubl.events; + +public class Event { + + private final String indexName; + private final String event; + + public Event(String indexName, String event) { + this.indexName = indexName; + this.event = event; + } + + public String getIndexName() { + return indexName; + } + + public String getEvent() { + return event; + } + +} diff --git a/src/main/java/org/opensearch/ubl/events/EventManager.java b/src/main/java/org/opensearch/ubl/events/OpenSearchEventManager.java similarity index 51% rename from src/main/java/org/opensearch/ubl/events/EventManager.java rename to src/main/java/org/opensearch/ubl/events/OpenSearchEventManager.java index ec0cac8..80a71ef 100644 --- a/src/main/java/org/opensearch/ubl/events/EventManager.java +++ b/src/main/java/org/opensearch/ubl/events/OpenSearchEventManager.java @@ -13,22 +13,20 @@ import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; -import org.opensearch.ubl.events.queues.EventQueue; -import org.opensearch.ubl.events.queues.InternalQueue; +import org.opensearch.common.xcontent.XContentType; -public class EventManager { +public class OpenSearchEventManager extends AbstractEventManager { - private static final Logger LOGGER = LogManager.getLogger(EventManager.class); + private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class); - private final EventQueue eventQueue; private final Client client; - private static EventManager eventManager; + private static OpenSearchEventManager openSearchEventManager; - private EventManager(Client client) { + private OpenSearchEventManager(Client client) { this.client = client; - this.eventQueue = new InternalQueue(); } + @Override public void process() { if(eventQueue.size() > 0) { @@ -36,8 +34,13 @@ public void process() { final BulkRequest bulkRequest = new BulkRequest(); LOGGER.info("Bulk inserting " + eventQueue.size() + " search relevance events"); - for (final IndexRequest indexRequest : eventQueue.get()) { + for (final Event event : eventQueue.get()) { + + final IndexRequest indexRequest = new IndexRequest(event.getIndexName()) + .source(event.getEvent(), XContentType.JSON); + bulkRequest.add(indexRequest); + } eventQueue.clear(); @@ -47,15 +50,16 @@ public void process() { } - public static EventManager getInstance(Client client) { - if(eventManager == null) { - eventManager = new EventManager(client); - } - return eventManager; + @Override + public void add(Event event) { + eventQueue.add(event); } - public void addIndexRequest(IndexRequest request) { - eventQueue.add(request); + public static OpenSearchEventManager getInstance(Client client) { + if(openSearchEventManager == null) { + openSearchEventManager = new OpenSearchEventManager(client); + } + return openSearchEventManager; } } diff --git a/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java b/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java index c66c571..31afec3 100644 --- a/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java +++ b/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java @@ -9,16 +9,17 @@ package org.opensearch.ubl.events.queues; import org.opensearch.action.index.IndexRequest; +import org.opensearch.ubl.events.Event; import java.util.List; public interface EventQueue { - void add(IndexRequest indexRequest); + void add(Event event); void clear(); - List get(); + List get(); int size(); diff --git a/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java b/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java index 279be1c..4266b78 100644 --- a/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java +++ b/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java @@ -8,18 +8,18 @@ package org.opensearch.ubl.events.queues; -import org.opensearch.action.index.IndexRequest; +import org.opensearch.ubl.events.Event; import java.util.LinkedList; import java.util.List; public class InternalQueue implements EventQueue { - private static final List indexRequests = new LinkedList<>(); + private static final List indexRequests = new LinkedList<>(); @Override - public void add(IndexRequest indexRequest) { - indexRequests.add(indexRequest); + public void add(Event event) { + indexRequests.add(event); } @Override @@ -28,7 +28,7 @@ public void clear() { } @Override - public List get() { + public List get() { return indexRequests; }