Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #44 from o19s/queue-interface
Browse files Browse the repository at this point in the history
Making the events queue abstract to future support other backends
  • Loading branch information
jzonthemtn authored Feb 19, 2024
2 parents f26f334 + 7ef6742 commit b4946d3
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.backends.OpenSearchBackend;
import org.opensearch.ubl.events.EventManager;
import org.opensearch.ubl.events.OpenSearchEventManager;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
Expand Down Expand Up @@ -113,9 +113,10 @@ public Collection<Object> createComponents(

LOGGER.info("Creating scheduled task");

// TODO: Only start this if already initialized.
// TODO: Only start this if an OpenSearch store is already initialized.
// Otherwise, start it when a store is initialized.
threadPool.scheduler().scheduleAtFixedRate(() -> {
EventManager.getInstance(client).process();
OpenSearchEventManager.getInstance(client).process();
}, 0, 2000, TimeUnit.MILLISECONDS);

return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod
final String storeName = request.param("store");

LOGGER.info("Persisting event into {}", storeName);
final String event = request.content().utf8ToString();
backend.persistEvent(storeName, event);
final String eventJson = request.content().utf8ToString();
backend.persistEvent(storeName, eventJson);

return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Event received"));

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ubl/backends/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface Backend {

void delete(final String storeName, RestChannel channel);

void persistEvent(final String storeName, String event);
void persistEvent(final String storeName, String eventJson);

void persistQuery(final String storeName, QueryRequest queryRequest, QueryResponse queryResponse) throws Exception;

Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.ubl.SettingsConstants;
import org.opensearch.ubl.events.EventManager;
import org.opensearch.ubl.events.Event;
import org.opensearch.ubl.events.OpenSearchEventManager;
import org.opensearch.ubl.model.QueryRequest;
import org.opensearch.ubl.model.QueryResponse;

Expand Down Expand Up @@ -90,16 +91,15 @@ public void delete(String storeName, RestChannel channel) {
}

@Override
public void persistEvent(String storeName, String event) {
public void persistEvent(String storeName, String eventJson) {

// Add the event for indexing.
LOGGER.info("Indexing event into {}", storeName);
final String eventsIndexName = getEventsIndexName(storeName);
final IndexRequest indexRequest = new IndexRequest(eventsIndexName)
.source(event, XContentType.JSON);

//return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel));
EventManager.getInstance(client).addIndexRequest(indexRequest);
final Event event = new Event(eventsIndexName, eventJson);
OpenSearchEventManager.getInstance(client).add(event);

}

Expand All @@ -123,8 +123,8 @@ public void persistQuery(final String storeName, final QueryRequest queryRequest
final IndexRequest indexRequest = new IndexRequest(queriesIndexName)
.source(source, XContentType.JSON);

//return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel));
EventManager.getInstance(client).addIndexRequest(indexRequest);
// TODO: Move this to the queue, too.
client.index(indexRequest);

}

Expand All @@ -148,7 +148,7 @@ private String getResourceFile(final String fileName) {
Streams.copy(is, out);
return out.toString(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("failed to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e);
throw new IllegalStateException("Unable to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e);
}
}

Expand Down
31 changes: 31 additions & 0 deletions src/main/java/org/opensearch/ubl/events/AbstractEventManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ubl.events;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ubl.events.queues.EventQueue;
import org.opensearch.ubl.events.queues.InternalQueue;

public abstract class AbstractEventManager {

private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class);

protected final EventQueue eventQueue;

public AbstractEventManager() {
this.eventQueue = new InternalQueue();
}

public abstract void process();

public abstract void add(Event event);

}
29 changes: 29 additions & 0 deletions src/main/java/org/opensearch/ubl/events/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ubl.events;

public class Event {

private final String indexName;
private final String event;

public Event(String indexName, String event) {
this.indexName = indexName;
this.event = event;
}

public String getIndexName() {
return indexName;
}

public String getEvent() {
return event;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,34 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.ubl.events.queues.EventQueue;
import org.opensearch.ubl.events.queues.InternalQueue;
import org.opensearch.common.xcontent.XContentType;

public class EventManager {
public class OpenSearchEventManager extends AbstractEventManager {

private static final Logger LOGGER = LogManager.getLogger(EventManager.class);
private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class);

private final EventQueue eventQueue;
private final Client client;
private static EventManager eventManager;
private static OpenSearchEventManager openSearchEventManager;

private EventManager(Client client) {
private OpenSearchEventManager(Client client) {
this.client = client;
this.eventQueue = new InternalQueue();
}

@Override
public void process() {

if(eventQueue.size() > 0) {

final BulkRequest bulkRequest = new BulkRequest();
LOGGER.info("Bulk inserting " + eventQueue.size() + " search relevance events");

for (final IndexRequest indexRequest : eventQueue.get()) {
for (final Event event : eventQueue.get()) {

final IndexRequest indexRequest = new IndexRequest(event.getIndexName())
.source(event.getEvent(), XContentType.JSON);

bulkRequest.add(indexRequest);

}

eventQueue.clear();
Expand All @@ -47,15 +50,16 @@ public void process() {

}

public static EventManager getInstance(Client client) {
if(eventManager == null) {
eventManager = new EventManager(client);
}
return eventManager;
@Override
public void add(Event event) {
eventQueue.add(event);
}

public void addIndexRequest(IndexRequest request) {
eventQueue.add(request);
public static OpenSearchEventManager getInstance(Client client) {
if(openSearchEventManager == null) {
openSearchEventManager = new OpenSearchEventManager(client);
}
return openSearchEventManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
package org.opensearch.ubl.events.queues;

import org.opensearch.action.index.IndexRequest;
import org.opensearch.ubl.events.Event;

import java.util.List;

public interface EventQueue {

void add(IndexRequest indexRequest);
void add(Event event);

void clear();

List<IndexRequest> get();
List<Event> get();

int size();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@

package org.opensearch.ubl.events.queues;

import org.opensearch.action.index.IndexRequest;
import org.opensearch.ubl.events.Event;

import java.util.LinkedList;
import java.util.List;

public class InternalQueue implements EventQueue {

private static final List<IndexRequest> indexRequests = new LinkedList<>();
private static final List<Event> indexRequests = new LinkedList<>();

@Override
public void add(IndexRequest indexRequest) {
indexRequests.add(indexRequest);
public void add(Event event) {
indexRequests.add(event);
}

@Override
Expand All @@ -28,7 +28,7 @@ public void clear() {
}

@Override
public List<IndexRequest> get() {
public List<Event> get() {
return indexRequests;
}

Expand Down

0 comments on commit b4946d3

Please sign in to comment.