Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Making the events queue abstract to future support other backends #44

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.backends.OpenSearchBackend;
import org.opensearch.ubl.events.EventManager;
import org.opensearch.ubl.events.OpenSearchEventManager;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
Expand Down Expand Up @@ -113,9 +113,10 @@ public Collection<Object> createComponents(

LOGGER.info("Creating scheduled task");

// TODO: Only start this if already initialized.
// TODO: Only start this if an OpenSearch store is already initialized.
// Otherwise, start it when a store is initialized.
threadPool.scheduler().scheduleAtFixedRate(() -> {
EventManager.getInstance(client).process();
OpenSearchEventManager.getInstance(client).process();
}, 0, 2000, TimeUnit.MILLISECONDS);

return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod
final String storeName = request.param("store");

LOGGER.info("Persisting event into {}", storeName);
final String event = request.content().utf8ToString();
backend.persistEvent(storeName, event);
final String eventJson = request.content().utf8ToString();
backend.persistEvent(storeName, eventJson);

return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Event received"));

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ubl/backends/Backend.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface Backend {

void delete(final String storeName, RestChannel channel);

void persistEvent(final String storeName, String event);
void persistEvent(final String storeName, String eventJson);

void persistQuery(final String storeName, QueryRequest queryRequest, QueryResponse queryResponse) throws Exception;

Expand Down
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.ubl.SettingsConstants;
import org.opensearch.ubl.events.EventManager;
import org.opensearch.ubl.events.Event;
import org.opensearch.ubl.events.OpenSearchEventManager;
import org.opensearch.ubl.model.QueryRequest;
import org.opensearch.ubl.model.QueryResponse;

Expand Down Expand Up @@ -90,16 +91,15 @@ public void delete(String storeName, RestChannel channel) {
}

@Override
public void persistEvent(String storeName, String event) {
public void persistEvent(String storeName, String eventJson) {

// Add the event for indexing.
LOGGER.info("Indexing event into {}", storeName);
final String eventsIndexName = getEventsIndexName(storeName);
final IndexRequest indexRequest = new IndexRequest(eventsIndexName)
.source(event, XContentType.JSON);

//return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel));
EventManager.getInstance(client).addIndexRequest(indexRequest);
final Event event = new Event(eventsIndexName, eventJson);
OpenSearchEventManager.getInstance(client).add(event);

}

Expand All @@ -123,8 +123,8 @@ public void persistQuery(final String storeName, final QueryRequest queryRequest
final IndexRequest indexRequest = new IndexRequest(queriesIndexName)
.source(source, XContentType.JSON);

//return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel));
EventManager.getInstance(client).addIndexRequest(indexRequest);
// TODO: Move this to the queue, too.
client.index(indexRequest);

}

Expand All @@ -148,7 +148,7 @@ private String getResourceFile(final String fileName) {
Streams.copy(is, out);
return out.toString(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IllegalStateException("failed to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e);
throw new IllegalStateException("Unable to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e);
}
}

Expand Down
31 changes: 31 additions & 0 deletions src/main/java/org/opensearch/ubl/events/AbstractEventManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ubl.events;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ubl.events.queues.EventQueue;
import org.opensearch.ubl.events.queues.InternalQueue;

public abstract class AbstractEventManager {

private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class);

protected final EventQueue eventQueue;

public AbstractEventManager() {
this.eventQueue = new InternalQueue();
}

public abstract void process();

public abstract void add(Event event);

}
29 changes: 29 additions & 0 deletions src/main/java/org/opensearch/ubl/events/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ubl.events;

public class Event {

private final String indexName;
private final String event;

public Event(String indexName, String event) {
this.indexName = indexName;
this.event = event;
}

public String getIndexName() {
return indexName;
}

public String getEvent() {
return event;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,34 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.ubl.events.queues.EventQueue;
import org.opensearch.ubl.events.queues.InternalQueue;
import org.opensearch.common.xcontent.XContentType;

public class EventManager {
public class OpenSearchEventManager extends AbstractEventManager {

private static final Logger LOGGER = LogManager.getLogger(EventManager.class);
private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class);

private final EventQueue eventQueue;
private final Client client;
private static EventManager eventManager;
private static OpenSearchEventManager openSearchEventManager;

private EventManager(Client client) {
private OpenSearchEventManager(Client client) {
this.client = client;
this.eventQueue = new InternalQueue();
}

@Override
public void process() {

if(eventQueue.size() > 0) {

final BulkRequest bulkRequest = new BulkRequest();
LOGGER.info("Bulk inserting " + eventQueue.size() + " search relevance events");

for (final IndexRequest indexRequest : eventQueue.get()) {
for (final Event event : eventQueue.get()) {

final IndexRequest indexRequest = new IndexRequest(event.getIndexName())
.source(event.getEvent(), XContentType.JSON);

bulkRequest.add(indexRequest);

}

eventQueue.clear();
Expand All @@ -47,15 +50,16 @@ public void process() {

}

public static EventManager getInstance(Client client) {
if(eventManager == null) {
eventManager = new EventManager(client);
}
return eventManager;
@Override
public void add(Event event) {
eventQueue.add(event);
}

public void addIndexRequest(IndexRequest request) {
eventQueue.add(request);
public static OpenSearchEventManager getInstance(Client client) {
if(openSearchEventManager == null) {
openSearchEventManager = new OpenSearchEventManager(client);
}
return openSearchEventManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
package org.opensearch.ubl.events.queues;

import org.opensearch.action.index.IndexRequest;
import org.opensearch.ubl.events.Event;

import java.util.List;

public interface EventQueue {

void add(IndexRequest indexRequest);
void add(Event event);

void clear();

List<IndexRequest> get();
List<Event> get();

int size();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@

package org.opensearch.ubl.events.queues;

import org.opensearch.action.index.IndexRequest;
import org.opensearch.ubl.events.Event;

import java.util.LinkedList;
import java.util.List;

public class InternalQueue implements EventQueue {

private static final List<IndexRequest> indexRequests = new LinkedList<>();
private static final List<Event> indexRequests = new LinkedList<>();

@Override
public void add(IndexRequest indexRequest) {
indexRequests.add(indexRequest);
public void add(Event event) {
indexRequests.add(event);
}

@Override
Expand All @@ -28,7 +28,7 @@ public void clear() {
}

@Override
public List<IndexRequest> get() {
public List<Event> get() {
return indexRequests;
}

Expand Down
Loading