Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Resolving merge conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
jzonthemtn committed Feb 19, 2024
2 parents 25409fd + f26f334 commit 7ef6742
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 26 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ curl -s http://localhost:9200/.awesome_events/_search -H 'Content-Type: applicat
Get queries:

```
curl -s http://localhost:9200/.awesome_queries/_search | jq
curl -s http://localhost:9200/.awesome_queries/_search -H "X-ubi-store: awesome" | jq
```

Delete the store:
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/opensearch/ubl/HeaderConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ubl;

public class HeaderConstants {

public static final String EVENT_STORE_HEADER = "X-ubi-store";

}
23 changes: 17 additions & 6 deletions src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler;
import org.opensearch.ubl.action.UserBehaviorLoggingSearchFilter;
import org.opensearch.action.support.ActionFilter;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.backends.OpenSearchBackend;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand All @@ -24,14 +20,19 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.ubl.events.OpenSearchEventManager;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestHeaderDefinition;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.ubl.action.UserBehaviorLoggingActionFilter;
import org.opensearch.ubl.action.UserBehaviorLoggingRestHandler;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.backends.OpenSearchBackend;
import org.opensearch.ubl.events.OpenSearchEventManager;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
Expand All @@ -50,6 +51,16 @@ public class UserBehaviorLoggingPlugin extends Plugin implements ActionPlugin {
private Backend backend;
private ActionFilter userBehaviorLoggingFilter;

@Override
public Collection<RestHeaderDefinition> getRestHeaders() {
return List.of(new RestHeaderDefinition(HeaderConstants.EVENT_STORE_HEADER, false));
}

@Override
public Collection<String> getTaskHeaders() {
return List.of(HeaderConstants.EVENT_STORE_HEADER);
}

@Override
public List<RestHandler> getRestHandlers(final Settings settings,
final RestController restController,
Expand Down Expand Up @@ -98,7 +109,7 @@ public Collection<Object> createComponents(
) {

this.backend = new OpenSearchBackend(client);
this.userBehaviorLoggingFilter = new UserBehaviorLoggingSearchFilter(backend, environment.settings());
this.userBehaviorLoggingFilter = new UserBehaviorLoggingActionFilter(backend, environment.settings(), threadPool);

LOGGER.info("Creating scheduled task");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,31 @@
import org.opensearch.action.support.ActionFilter;
import org.opensearch.action.support.ActionFilterChain;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.ubl.model.QueryResponse;
import org.opensearch.ubl.HeaderConstants;
import org.opensearch.ubl.backends.Backend;
import org.opensearch.ubl.model.QueryRequest;
import org.opensearch.tasks.Task;
import org.opensearch.ubl.model.QueryResponse;

import java.util.*;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

public class UserBehaviorLoggingSearchFilter implements ActionFilter {
public class UserBehaviorLoggingActionFilter implements ActionFilter {

private static final Logger LOGGER = LogManager.getLogger(UserBehaviorLoggingSearchFilter.class);
private static final Logger LOGGER = LogManager.getLogger(UserBehaviorLoggingActionFilter.class);

private final Backend backend;
private final Settings settings;
private final ThreadPool threadPool;

public UserBehaviorLoggingSearchFilter(final Backend backend, final Settings settings) {
public UserBehaviorLoggingActionFilter(final Backend backend, final Settings settings, ThreadPool threadPool) {
this.backend = backend;
this.settings = settings;
this.threadPool = threadPool;
}

@Override
Expand All @@ -59,15 +63,22 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
@Override
public void onResponse(Response response) {

LOGGER.info("Query ID header: " + task.getHeader("query-id"));

final long startTime = System.currentTimeMillis();

// Get the search itself.
final SearchRequest searchRequest = (SearchRequest) request;
final String eventStore = task.getHeader(HeaderConstants.EVENT_STORE_HEADER);

// Restrict this to only searches of certain indices specified in the settings.
//final List<String> indices = Arrays.asList(searchRequest.indices());
//final Set<String> indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(",")));
//if(indicesToLog.containsAll(indices)) {
// If there is no event store header we should not continue anything.
if(eventStore != null && !eventStore.trim().isEmpty()) {

// Get the search itself.
final SearchRequest searchRequest = (SearchRequest) request;

// TODO: Restrict logging to only queries of certain indices specified in the settings.
//final List<String> indices = Arrays.asList(searchRequest.indices());
//final Set<String> indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(",")));
//if(indicesToLog.containsAll(indices)) {

// Get all search hits from the response.
if (response instanceof SearchResponse) {
Expand All @@ -82,6 +93,7 @@ public void onResponse(Response response) {
final String queryResponseId = UUID.randomUUID().toString();

final List<String> queryResponseHitIds = new LinkedList<>();

final SearchResponse searchResponse = (SearchResponse) response;

// Add each hit to the list of query responses.
Expand All @@ -92,8 +104,7 @@ public void onResponse(Response response) {
try {

// Persist the query to the backend.
// TODO: How do we know which storeName?
backend.persistQuery("awesome",
backend.persistQuery(eventStore,
new QueryRequest(queryId, query),
new QueryResponse(queryId, queryResponseId, queryResponseHitIds));

Expand All @@ -102,14 +113,16 @@ public void onResponse(Response response) {
LOGGER.error("Unable to persist query.", ex);
}

// TODO: Somehow return the queryId to the client.
threadPool.getThreadContext().addResponseHeader("query_id", queryId);

}

//}
//}

final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBL search request filter took {} ms", elapsedTime);

final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBL search request filter took {} ms", elapsedTime);
}

listener.onResponse(response);

Expand Down

0 comments on commit 7ef6742

Please sign in to comment.