diff --git a/.github/ISSUE_TEMPLATE/BUG_TEMPLATE.md b/.github/ISSUE_TEMPLATE/BUG_TEMPLATE.md new file mode 100644 index 0000000..c8fe373 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/BUG_TEMPLATE.md @@ -0,0 +1,24 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '[BUG]' +labels: 'bug, untriaged' +assignees: '' +--- +### What is the bug? +_A clear and concise description of the bug._ + +### How can one reproduce the bug? +_Steps to reproduce the behavior._ + +### What is the expected behavior? +_A clear and concise description of what you expected to happen._ + +### What is your host/environment? +_Operating system, version._ + +### Do you have any screenshots? +_If applicable, add screenshots to help explain your problem._ + +### Do you have any additional context? +_Add any other context about the problem._ \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/FEATURE_REQUEST_TEMPLATE.md b/.github/ISSUE_TEMPLATE/FEATURE_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..21b62ae --- /dev/null +++ b/.github/ISSUE_TEMPLATE/FEATURE_REQUEST_TEMPLATE.md @@ -0,0 +1,18 @@ +--- +name: Feature request +about: Request a feature in this project +title: '[FEATURE]' +labels: 'enhancement, untriaged' +assignees: '' +--- +### Is your feature request related to a problem? +_A clear and concise description of what the problem is, e.g. I'm always frustrated when [...]._ + +### What solution would you like? +_A clear and concise description of what you want to happen._ + +### What alternatives have you considered? +_A clear and concise description of any alternative solutions or features you've considered._ + +### Do you have any additional context? +_Add any other context or screenshots about the feature request here._ \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/PROPOSAL_TEMPLATE.md b/.github/ISSUE_TEMPLATE/PROPOSAL_TEMPLATE.md new file mode 100644 index 0000000..08a6bb7 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/PROPOSAL_TEMPLATE.md @@ -0,0 +1,40 @@ +--- +name: Proposal +about: Suggest an idea for a specific feature you wish to propose to the community for comment +title: '[PROPOSAL]' +labels: proposal +assignees: '' +--- +## What/Why +### What are you proposing? +_In a few sentences, describe the feature and its core capabilities._ + +### What users have asked for this feature? +_Highlight any research, proposals, requests or anecdotes that signal this is the right thing to build. Include links to GitHub Issues, Forums, Stack Overflow, Twitter, Etc_ + +### What problems are you trying to solve? +_Summarize the core use cases and user problems and needs you are trying to solve. Describe the most important user needs, pain points and jobs as expressed by the user asks above. Template: When \ , a \ wants to \, so they can \. (Example: When **searching by postal code**, **a buyer** wants to **be required to enter a valid code** so they **don’t waste time searching for a clearly invalid postal code.**)_ + +### What is the developer experience going to be? +_Does this have a REST API? If so, please describe the API and any impact it may have to existing APIs. In a brief summary (not a spec), highlight what new REST APIs or changes to REST APIs are planned. as well as any other API, CLI or Configuration changes that are planned as part of this feature._ + +#### Are there any security considerations? +_Describe if the feature has any security considerations or impact. What is the security model of the new APIs? Features should be integrated into the OpenSearch security suite and so if they are not, we should highlight the reasons here._ + +#### Are there any breaking changes to the API +_If this feature will require breaking changes to any APIs, ouline what those are and why they are needed. What is the path to minimizing impact? (example, add new API and deprecate the old one)_ + +### What is the user experience going to be? +_Describe the feature requirements and or user stories. You may include low-fidelity sketches, wireframes, APIs stubs, or other examples of how a user would use the feature via CLI, OpenSearch Dashboards, REST API, etc. Using a bulleted list or simple diagrams to outline features is okay. If this is net new functionality, call this out as well._ + +#### Are there breaking changes to the User Experience? +_Will this change the existing user experience? Will this be a breaking change from a user flow or user experience perspective?_ + +### Why should it be built? Any reason not to? +_Describe the value that this feature will bring to the OpenSearch community, as well as what impact it has if it isn't built, or new risks if it is. Highlight opportunities for additional research._ + +### What will it take to execute? +_Describe what it will take to build this feature. Are there any assumptions you may be making that could limit scope or add limitations? Are there performance, cost, or technical constraints that may impact the user experience? Does this feature depend on other feature work? What additional risks are there?_ + +### Any remaining open questions? +_What are known enhancements to this feature? Any enhancements that may be out of scope but that we will want to track long term? List any other open questions that may need to be answered before proceeding with an implementation._ \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..f8c1b37 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,43 @@ +name: Tag and publish a release + +on: + push: + tags: + - 'v*.*.*' + branches: [ test-release ] + +jobs: + release: + runs-on: ubuntu-latest + permissions: + contents: write + steps: + - uses: actions/checkout@v2 + - name: Set release version Name + run: echo "RELEASE_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV + - name: Set up JDK 17.0 + uses: actions/setup-java@v1 + with: + java-version: 17.0 + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Build with Gradle + run: ./gradlew -Dtests.security.manager=false build + - name: Rename build assets + run: cp ./build/distributions/opensearch-ubi-*.zip ./opensearch-ubi-plugin-${{ env.RELEASE_VERSION }}.zip + - name: Create Release + id: create_release + uses: ncipollo/release-action@v1 + with: + artifacts: "./opensearch-ubi-plugin-${{ env.RELEASE_VERSION }}.zip" + token: ${{ secrets.GITHUB_TOKEN }} + tag: "release-${{ env.RELEASE_VERSION }}" + - name: Upload Release Asset + id: upload-release-asset + uses: softprops/action-gh-release@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + tag_name: "release-${{ env.RELEASE_VERSION }}" + #upload_url: ${{ steps.create_release.outputs.upload_url }} + files: ./opensearch-ubi-plugin-${{ env.RELEASE_VERSION }}.zip + name: ${{ env.RELEASE_VERSION }} diff --git a/README.md b/README.md index 293e0ce..5e9bd27 100644 --- a/README.md +++ b/README.md @@ -18,10 +18,10 @@ Start the containers: `docker compose up` -Initialize the `awesome` UBL store: +Initialize the `awesome` UBI store: ``` -curl -X PUT http://localhost:9200/_plugins/ubi/awesome +curl -X PUT "http://localhost:9200/_plugins/ubi/awesome?index=ecommerce&id_field=name" ``` Send an event to the `awesome` store: @@ -40,10 +40,16 @@ curl -s http://localhost:9200/.awesome_events/_search | jq curl -s http://localhost:9200/.awesome_events/_search -H 'Content-Type: application/json' -d '{"query": {"term": {"type": "instant-search"}}}' | jq ``` +Do a search of the `ecommerce` index: + +``` +curl -s http://localhost:9200/ecommerce/_search -H "X-ubi-store: awesome" | jq +``` + Get queries: ``` -curl -s http://localhost:9200/.awesome_queries/_search -H "X-ubi-store: awesome" | jq +curl -s http://localhost:9200/.awesome_queries/_search | jq ``` Delete the store: @@ -52,6 +58,12 @@ Delete the store: curl -X DELETE http://localhost:9200/_plugins/ubi/awesome ``` +Get the stores: + +``` +curl http://localhost:9200/_plugins/ubi +``` + ## Load Test The `load-test` directory contains a basic load testing example. The purpose of the files under `load-test` are to provide a means of testing the plugin's ability to receive and store a large number of events over time. To use the load test, first start OpenSearch on `localhost:9200`, and then: @@ -74,4 +86,4 @@ POST /_plugins/ubi/mystore Found 8 indexed ``` -This shows 8 total requests made by locust, and 8 events are in the index. The idea being we can assert that the number of events sent matches the events stored in the index. \ No newline at end of file +This shows 8 total requests made by locust, and 8 events are in the index. The idea being we can assert that the number of events sent matches the events stored in the index. diff --git a/build.gradle b/build.gradle index 915ee5c..2eb099d 100644 --- a/build.gradle +++ b/build.gradle @@ -7,12 +7,12 @@ apply plugin: 'maven-publish' opensearchplugin { name 'opensearch-ubi' description 'OpenSearch User Behavior Insights Plugin' - classname 'org.opensearch.ubi.UserBehaviorInsightsPlugin' + classname 'com.o19s.ubi.UserBehaviorInsightsPlugin' licenseFile rootProject.file('LICENSE.txt') noticeFile rootProject.file('NOTICE.txt') } -group = 'org.opensearch' +group = 'com.o19s' version = "${ubiVersion}-os${opensearchVersion}" // disabling some unnecessary validations for this plugin @@ -61,7 +61,7 @@ publishing { pom { name = "opensearch-ubi" description = "Provides User Behavior Insights for OpenSearch" - groupId = "org.opensearch" + groupId = "com.o19s" licenses { license { name = "The Apache License, Version 2.0" diff --git a/documentation.md b/documentation.md index 60ef95a..5ca5ed3 100644 --- a/documentation.md +++ b/documentation.md @@ -16,8 +16,8 @@ index is used to store events, and the other index is for storing queries. #### OpenSearch Data Mappings -* The current event mappings file can be found [here](https://github.com/o19s/opensearch-ubi/blob/main/src/main/resources/org/opensearch/ubi/backends/events-mapping.json). -* The current query mappings file can be found [here](https://github.com/o19s/opensearch-ubi/blob/main/src/main/resources/org/opensearch/ubi/backends/queries-mapping.json). +* The current event mappings file can be found [here](https://github.com/o19s/opensearch-ubi/blob/main/src/main/resources/events-mapping.json). +* The current query mappings file can be found [here](https://github.com/o19s/opensearch-ubi/blob/main/src/main/resources/queries-mapping.json). Schema for events: diff --git a/index-chorus-data.sh b/index-chorus-data.sh new file mode 100755 index 0000000..44d34e6 --- /dev/null +++ b/index-chorus-data.sh @@ -0,0 +1,17 @@ +#!/bin/bash -e + +CHORUS_HOME=${1:-`realpath ../chorus-opensearch-edition`} +echo "Using CHORUS_HOME = ${CHORUS_HOME}" + +TEMP_FILE=`mktemp` +head -n 50 ${CHORUS_HOME}/transformed_data.json > ${TEMP_FILE} + +echo "Deleting index" +curl -s -X DELETE "localhost:9200/ecommerce" + +echo "Creating index" +curl -s -X PUT "localhost:9200/ecommerce" -H "Content-Type: application/json" --data-binary @${CHORUS_HOME}/opensearch/schema.json +curl -s -X PUT "localhost:9200/ecommerce/_settings" -H "Content-Type: application/json" -d '{"index.mapping.total_fields.limit": 20000}' + +echo "Indexing data" +curl -s -X POST "localhost:9200/ecommerce/_bulk?pretty" -H "Content-Type: application/json" --data-binary @${TEMP_FILE} diff --git a/load-test/load-test.py b/load-test/load-test.py index d818ad3..780c359 100644 --- a/load-test/load-test.py +++ b/load-test/load-test.py @@ -22,3 +22,12 @@ def event_task(self): } self.client.post("/_plugins/ubi/mystore", headers=headers, json=data) + + @task + def queries_task(self): + headers = { + "Content-Type": "application/json", + "X-ubi-store": "mystore" + } + + self.client.get("/ecommerce/_search", headers=headers) \ No newline at end of file diff --git a/load-test/run.sh b/load-test/run.sh index fc36a81..0f2df5a 100755 --- a/load-test/run.sh +++ b/load-test/run.sh @@ -1,17 +1,24 @@ #!/bin/bash -e # Delete the store if it exists. -curl -X DELETE http://localhost:9200/_plugins/ubi/mystore +curl -X DELETE "http://localhost:9200/_plugins/ubi/mystore" -# Create the store -curl -X PUT http://localhost:9200/_plugins/ubi/mystore +# Create the store. +curl -X PUT "http://localhost:9200/_plugins/ubi/mystore?index=ecommerce" -# Insert events -locust -f load-test.py --headless -u 1 -r 1 --run-time 10s --host http://localhost:9200 +# Index subset of Chorus data. +../index-chorus-data.sh `realpath ../../chorus-opensearch-edition` + +# Insert events and queries. +locust -f load-test.py --headless -u 50 -r 10 --run-time 300s --host http://localhost:9200 # Let events index. -sleep 2 +sleep 30 # Get count of indexed events. EVENTS=`curl -s http://localhost:9200/.mystore_events/_count | jq .count` -echo "Found $EVENTS indexed" +echo "Found $EVENTS events" + +# Get count of indexed queries. +QUERIES=`curl -s http://localhost:9200/.mystore_queries/_count | jq .count` +echo "Found $QUERIES queries" diff --git a/src/main/java/org/opensearch/ubi/UserBehaviorInsightsPlugin.java b/src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java similarity index 75% rename from src/main/java/org/opensearch/ubi/UserBehaviorInsightsPlugin.java rename to src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java index 1f5744c..3b451a4 100644 --- a/src/main/java/org/opensearch/ubi/UserBehaviorInsightsPlugin.java +++ b/src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java @@ -6,8 +6,13 @@ * compatible open source license. */ -package org.opensearch.ubi; +package com.o19s.ubi; +import com.o19s.ubi.action.UserBehaviorInsightsActionFilter; +import com.o19s.ubi.data.OpenSearchDataManager; +import com.o19s.ubi.model.HeaderConstants; +import com.o19s.ubi.model.SettingsConstants; +import com.o19s.ubi.rest.UserBehaviorInsightsRestHandler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.support.ActionFilter; @@ -28,29 +33,28 @@ import org.opensearch.rest.RestHeaderDefinition; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.ubi.action.UserBehaviorInsightsActionFilter; -import org.opensearch.ubi.action.UserBehaviorInsightsRestHandler; -import org.opensearch.ubi.backends.Backend; -import org.opensearch.ubi.backends.OpenSearchBackend; -import org.opensearch.ubi.events.OpenSearchEventManager; import org.opensearch.watcher.ResourceWatcherService; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static java.util.Collections.singletonList; +/** + * OpenSearch User Behavior Insights Plugin + */ public class UserBehaviorInsightsPlugin extends Plugin implements ActionPlugin { private static final Logger LOGGER = LogManager.getLogger(UserBehaviorInsightsPlugin.class); - private Backend backend; private ActionFilter userBehaviorLoggingFilter; + /** + * A map that caches store settings to avoid round-trip calls to the index. + */ + public static final Map storeSettings = new HashMap<>(); + @Override public Collection getRestHeaders() { return List.of( @@ -80,7 +84,7 @@ public List getRestHandlers(final Settings settings, final IndexNameExpressionResolver indexNameExpressionResolver, final Supplier nodesInCluster) { - return singletonList(new UserBehaviorInsightsRestHandler(backend)); + return singletonList(new UserBehaviorInsightsRestHandler()); } @@ -88,10 +92,10 @@ public List getRestHandlers(final Settings settings, public List> getSettings() { final List> settings = new ArrayList<>(); - settings.add(Setting.simpleString(SettingsConstants.INDEX_NAMES, "", Setting.Property.NodeScope)); - // The version of the index mapping. settings.add(Setting.intSetting(SettingsConstants.VERSION_SETTING, 1, -1, Integer.MAX_VALUE, Setting.Property.IndexScope)); + settings.add(Setting.simpleString(SettingsConstants.INDEX, "", Setting.Property.IndexScope)); + settings.add(Setting.simpleString(SettingsConstants.ID_FIELD, "", Setting.Property.IndexScope)); return settings; @@ -99,7 +103,6 @@ public List> getSettings() { @Override public List getActionFilters() { - // LOGGER.info("Index name: {}", settings.get(ConfigConstants.INDEX_NAME)); return singletonList(userBehaviorLoggingFilter); } @@ -118,25 +121,23 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { - this.backend = new OpenSearchBackend(client); - this.userBehaviorLoggingFilter = new UserBehaviorInsightsActionFilter(backend, environment.settings(), threadPool); + // TODO: Allow the parameters of the scheduled tasks to be configurable. - LOGGER.info("Creating scheduled task"); + LOGGER.info("Starting UBI scheduled task to persist events."); + threadPool.scheduler().scheduleWithFixedDelay(() -> { + OpenSearchDataManager.getInstance(client).processEvents(); + }, 5000, 2000, TimeUnit.MILLISECONDS); - // TODO: Only start this if an OpenSearch store is already initialized. - // Otherwise, start it when a store is initialized. - threadPool.scheduler().scheduleAtFixedRate(() -> { - OpenSearchEventManager.getInstance(client).process(); - }, 0, 2000, TimeUnit.MILLISECONDS); + LOGGER.info("Starting UBI scheduled task to persist queries."); + threadPool.scheduler().scheduleWithFixedDelay(() -> { + OpenSearchDataManager.getInstance(client).processQueries(); + }, 5000, 2000, TimeUnit.MILLISECONDS); + + // Initialize the action filter. + this.userBehaviorLoggingFilter = new UserBehaviorInsightsActionFilter(client, threadPool); return Collections.emptyList(); } -// @Override -// public void close() { -// LOGGER.info("Stopping scheduled runnable."); -// FutureUtils.cancel(scheduled); -// } - } diff --git a/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java b/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java new file mode 100644 index 0000000..4e92cd7 --- /dev/null +++ b/src/main/java/com/o19s/ubi/action/UserBehaviorInsightsActionFilter.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.action; + +import com.o19s.ubi.UserBehaviorInsightsPlugin; +import com.o19s.ubi.data.DataManager; +import com.o19s.ubi.model.HeaderConstants; +import com.o19s.ubi.model.QueryRequest; +import com.o19s.ubi.model.QueryResponse; +import com.o19s.ubi.model.SettingsConstants; +import com.o19s.ubi.utils.UbiUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.ActionFilter; +import org.opensearch.action.support.ActionFilterChain; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.search.SearchHit; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import com.o19s.ubi.data.OpenSearchDataManager; + +import java.util.*; + +/** + * An implementation of {@link ActionFilter} that passively listens for OpenSearch + * queries and persists the queries to the UBI store. + */ +public class UserBehaviorInsightsActionFilter implements ActionFilter { + + private static final Logger LOGGER = LogManager.getLogger(UserBehaviorInsightsActionFilter.class); + + private final Client client; + private final ThreadPool threadPool; + private final DataManager dataManager; + + /** + * Creates a new filter. + * @param client An OpenSearch {@link Client}. + * @param threadPool The OpenSearch {@link ThreadPool}. + */ + public UserBehaviorInsightsActionFilter(Client client, ThreadPool threadPool) { + this.client = client; + this.threadPool = threadPool; + this.dataManager = OpenSearchDataManager.getInstance(client); + } + + @Override + public int order() { + return Integer.MAX_VALUE; + } + + @Override + public void apply( + Task task, String action, Request request, ActionListener listener, + ActionFilterChain chain) { + + if (!(request instanceof SearchRequest)) { + chain.proceed(task, action, request, listener); + return; + } + + chain.proceed(task, action, request, new ActionListener<>() { + + @Override + public void onResponse(Response response) { + + final long startTime = System.currentTimeMillis(); + + // Get the search itself. + final SearchRequest searchRequest = (SearchRequest) request; + + // Get all search hits from the response. + if (response instanceof SearchResponse) { + + // Get info from the headers. + final String queryId = getHeaderValue(HeaderConstants.QUERY_ID_HEADER, UUID.randomUUID().toString(), task); + final String storeName = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "", task); + final String userId = getHeaderValue(HeaderConstants.USER_ID_HEADER, "", task); + final String sessionId = getHeaderValue(HeaderConstants.SESSION_ID_HEADER, "", task); + + // If there is no event store header, ignore this search. + if(!"".equals(storeName)) { + + final String index = getStoreSettings(storeName, SettingsConstants.INDEX); + final String idField = getStoreSettings(storeName, SettingsConstants.ID_FIELD); + + LOGGER.debug("Using id_field [{}] of index [{}] for UBI query.", idField, index); + + // Only consider this search if the index being searched matches the store's index setting. + if (Arrays.asList(searchRequest.indices()).contains(index)) { + + // The query will be empty when there is no query, e.g. /_search + final String query = searchRequest.source().toString(); + + // Create a UUID for this search response. + final String queryResponseId = UUID.randomUUID().toString(); + + final List queryResponseHitIds = new LinkedList<>(); + final SearchResponse searchResponse = (SearchResponse) response; + + // Add each hit to the list of query responses. + for (final SearchHit hit : searchResponse.getHits()) { + + if (idField == null || "".equals(idField) || idField.equals("null")) { + + // Use the _id since there is no id_field setting for this index. + queryResponseHitIds.add(String.valueOf(hit.docId())); + + } else { + + final Map source = hit.getSourceAsMap(); + queryResponseHitIds.add((String) source.get(idField)); + + } + + } + + final QueryResponse queryResponse = new QueryResponse(queryId, queryResponseId, queryResponseHitIds); + final QueryRequest queryRequest = new QueryRequest(storeName, queryId, query, userId, sessionId, queryResponse); + + // Queue this for writing to the UBI store. + dataManager.add(queryRequest); + + // Add the query_id to the response headers. + threadPool.getThreadContext().addResponseHeader("query_id", queryId); + + //HACK: this should be set in the OpenSearch config (to send to the client code just once), + // and not on every single search response, + // but that server setting doesn't appear to be exposed. + threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id"); + + } else { + LOGGER.trace("Discarding query for UBI due to index name mismatch."); + } + + final long elapsedTime = System.currentTimeMillis() - startTime; + LOGGER.trace("UBI search request filter took {} ms", elapsedTime); + + } else { + LOGGER.trace("Discarding query for UBI due to missing store name."); + } + + } + + listener.onResponse(response); + + } + + @Override + public void onFailure(Exception ex) { + listener.onFailure(ex); + } + + }); + + } + + private String getStoreSettings(final String storeName, final String setting) { + + final String key = storeName + "." + setting; + final String value; + + if(UserBehaviorInsightsPlugin.storeSettings.containsKey(key)) { + + LOGGER.debug("Getting setting " + setting + " for store " + storeName + " from the cache."); + value = UserBehaviorInsightsPlugin.storeSettings.get(key); + + } else{ + + LOGGER.debug("Getting setting " + setting + " for store " + storeName + " from the index."); + + // Get the id_field to use for each result's unique identifier. + final String queriesIndexName = UbiUtils.getQueriesIndexName(storeName); + final GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(queriesIndexName); + + final GetSettingsResponse getSettingsResponse = client.admin().indices().getSettings(getSettingsRequest).actionGet(); + final String settingResponse = getSettingsResponse.getSetting(queriesIndexName, setting); + + UserBehaviorInsightsPlugin.storeSettings.put(key, settingResponse); + value = settingResponse; + + } + + return value; + + } + + private String getHeaderValue(final HeaderConstants header, final String defaultValue, final Task task) { + + final String value = task.getHeader(header.getHeader()); + + if(value == null || value.trim().isEmpty() || value.equals("null")) { + return defaultValue; + } else { + return value; + } + + } + + /** + * Persist the query to the UBI store. + * @param storeName The name of the UBI store. + * @param queryRequest The {@link QueryRequest} that initiated the query. + * @param queryResponse The {@link QueryResponse} that resulted from the query. + */ + private void persistQuery(final String storeName, final QueryRequest queryRequest, QueryResponse queryResponse) { + + LOGGER.info("Writing query ID {} with response ID {}", + queryRequest.getQueryId(), queryResponse.getQueryResponseId()); + + // What will be indexed - adheres to the queries-mapping.json + final Map source = new HashMap<>(); + source.put("timestamp", queryRequest.getTimestamp()); + source.put("query_id", queryRequest.getQueryId()); + source.put("query", queryRequest.getQuery()); + source.put("query_response_id", queryResponse.getQueryResponseId()); + source.put("query_response_hit_ids", queryResponse.getQueryResponseHitIds()); + source.put("user_id", queryRequest.getUserId()); + source.put("session_id", queryRequest.getSessionId()); + + // Get the name of the queries. + final String queriesIndexName = UbiUtils.getQueriesIndexName(storeName); + + // Build the index request. + final IndexRequest indexRequest = new IndexRequest(queriesIndexName) + .source(source, XContentType.JSON); + + // TODO: Move this to the queue, too. + client.index(indexRequest); + + } + +} diff --git a/src/main/java/com/o19s/ubi/data/DataManager.java b/src/main/java/com/o19s/ubi/data/DataManager.java new file mode 100644 index 0000000..84c13c6 --- /dev/null +++ b/src/main/java/com/o19s/ubi/data/DataManager.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.data; + +import com.o19s.ubi.model.Event; +import com.o19s.ubi.model.QueryRequest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * Base class for managing client-side events and queries. + */ +public abstract class DataManager { + + @SuppressWarnings("unused") + private final Logger LOGGER = LogManager.getLogger(DataManager.class); + + /** + * Process the events on the queue by writing them to persistent storage. + */ + public abstract void processEvents(); + + /** + * Process the queries on the queue by writing them to persistent storage. + */ + public abstract void processQueries(); + + /** + * Add an event to the queue. + * @param event A client-side {@link Event event} to be persisted. + */ + public abstract void add(Event event); + + /** + * Add an event to the queue. + * @param queryRequest A {@link QueryRequest} to be persisted. + */ + public abstract void add(QueryRequest queryRequest); + +} diff --git a/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java b/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java new file mode 100644 index 0000000..c81337e --- /dev/null +++ b/src/main/java/com/o19s/ubi/data/OpenSearchDataManager.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.data; + +import com.o19s.ubi.model.Event; +import com.o19s.ubi.model.QueryRequest; +import com.o19s.ubi.utils.UbiUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.XContentType; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * An event manager that inserts events into an OpenSearch index. + */ +public class OpenSearchDataManager extends DataManager { + + private static final Logger LOGGER = LogManager.getLogger(OpenSearchDataManager.class); + + private final Client client; + private final int max_items_batch = 1000; + private static OpenSearchDataManager openSearchEventManager; + + /** + * The queue that stores the client-side events. + */ + protected final BlockingQueue eventsQueue; + + /** + * The queue that stores the query requests. + */ + protected final BlockingQueue queryRequestsQueue; + + /** + * Gets a singleton instance of the manager. + * @param client An OpenSearch {@link Client}. + * @return An instance of {@link OpenSearchDataManager}. + */ + public static OpenSearchDataManager getInstance(Client client) { + if(openSearchEventManager == null) { + openSearchEventManager = new OpenSearchDataManager(client); + } + return openSearchEventManager; + } + + private OpenSearchDataManager(Client client) { + this.client = client; + this.eventsQueue = new LinkedBlockingQueue<>(); + this.queryRequestsQueue = new LinkedBlockingQueue<>(); + } + + @Override + public void processEvents() { + + final BulkRequest eventsBulkRequest = new BulkRequest(); + + while(eventsQueue.peek() != null && eventsBulkRequest.numberOfActions() <= max_items_batch) { + + final Event event = eventsQueue.remove(); + + final IndexRequest indexRequest = new IndexRequest(event.getIndexName()) + .source(event.getEvent(), XContentType.JSON); + + eventsBulkRequest.add(indexRequest); + + } + + if(eventsBulkRequest.numberOfActions() > 0) { + client.bulk(eventsBulkRequest); + } + + } + + @Override + public void processQueries() { + + final BulkRequest queryRequestsBulkRequest = new BulkRequest(); + + while(queryRequestsQueue.peek() != null && queryRequestsBulkRequest.numberOfActions() <= max_items_batch) { + + final QueryRequest queryRequest = queryRequestsQueue.remove(); + + LOGGER.trace("Writing query ID {} with response ID {}", + queryRequest.getQueryId(), queryRequest.getQueryResponse().getQueryResponseId()); + + // What will be indexed - adheres to the queries-mapping.json + final Map source = new HashMap<>(); + source.put("timestamp", queryRequest.getTimestamp()); + source.put("query_id", queryRequest.getQueryId()); + source.put("query", queryRequest.getQuery()); + source.put("query_response_id", queryRequest.getQueryResponse().getQueryResponseId()); + source.put("query_response_hit_ids", queryRequest.getQueryResponse().getQueryResponseHitIds()); + source.put("user_id", queryRequest.getUserId()); + source.put("session_id", queryRequest.getSessionId()); + + // Get the name of the queries. + final String queriesIndexName = UbiUtils.getQueriesIndexName(queryRequest.getStoreName()); + + // Build the index request. + final IndexRequest indexRequest = new IndexRequest(queriesIndexName) + .source(source, XContentType.JSON); + + queryRequestsBulkRequest.add(indexRequest); + + } + + LOGGER.trace("Indexing " + queryRequestsBulkRequest.numberOfActions() + " queries"); + + if(queryRequestsBulkRequest.numberOfActions() > 0) { + client.bulk(queryRequestsBulkRequest); + } + + } + + @Override + public void add(final Event event) { + eventsQueue.add(event); + } + + @Override + public void add(final QueryRequest queryRequest) { + queryRequestsQueue.add(queryRequest); + } + +} diff --git a/src/main/java/com/o19s/ubi/model/Event.java b/src/main/java/com/o19s/ubi/model/Event.java new file mode 100644 index 0000000..020fe81 --- /dev/null +++ b/src/main/java/com/o19s/ubi/model/Event.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.model; + +/** + * A client-side event. + */ +public class Event { + + /** + * The name of the OpenSearch index where this event will be stored. + */ + private final String indexName; + + /** + * The event (a JSON string). + */ + private final String event; + + /** + * Create a new event. + * @param indexName The name of the index where this event will be stored. + * @param event The event (a JSON string). + */ + public Event(String indexName, String event) { + this.indexName = indexName; + this.event = event; + } + + /** + * Gets the name of the index where this event is to be stored. + * @return The name of the index where this event is to be stored. + */ + public String getIndexName() { + return indexName; + } + + /** + * Gets the event. + * @return The event. + */ + public String getEvent() { + return event; + } + +} diff --git a/src/main/java/org/opensearch/ubi/HeaderConstants.java b/src/main/java/com/o19s/ubi/model/HeaderConstants.java similarity index 59% rename from src/main/java/org/opensearch/ubi/HeaderConstants.java rename to src/main/java/com/o19s/ubi/model/HeaderConstants.java index e8b7d03..e098aed 100644 --- a/src/main/java/org/opensearch/ubi/HeaderConstants.java +++ b/src/main/java/com/o19s/ubi/model/HeaderConstants.java @@ -6,13 +6,31 @@ * compatible open source license. */ -package org.opensearch.ubi; +package com.o19s.ubi.model; +/** + * HTTP headers used by the plugin. + */ public enum HeaderConstants { + /** + * The plugin-assigned ID of the query. + */ QUERY_ID_HEADER("X-ubi-query-id"), + + /** + * The name of the UBI store associated with a query. + */ EVENT_STORE_HEADER("X-ubi-store"), + + /** + * The ID of a user performing a query. + */ USER_ID_HEADER("X-ubi-user-id"), + + /** + * A session ID corresponding to the query. + */ SESSION_ID_HEADER("X-ubi-session-id"); private final String header; @@ -21,6 +39,10 @@ private HeaderConstants(String header) { this.header = header; } + /** + * Gets the string value of the header. + * @return The string value of the header. + */ public String getHeader() { return header; } diff --git a/src/main/java/com/o19s/ubi/model/QueryRequest.java b/src/main/java/com/o19s/ubi/model/QueryRequest.java new file mode 100644 index 0000000..5b65ac5 --- /dev/null +++ b/src/main/java/com/o19s/ubi/model/QueryRequest.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.model; + +/** + * A query received by OpenSearch. + */ +public class QueryRequest { + + private final String storeName; + private final long timestamp; + private final String queryId; + private final String query; + private final String userId; + private final String sessionId; + private final QueryResponse queryResponse; + + /** + * Creates a query request. + * @param storeName The name of the UBI store to hold this query request. + * @param queryId The ID of the query. + * @param query The query run by OpenSearch. + * @param userId The ID of the user that initiated the query. + * @param sessionId The ID of the session under which the query was run. + * @param queryResponse The {@link QueryResponse} for this query request. + */ + public QueryRequest(final String storeName, final String queryId, final String query, + final String userId, final String sessionId, final QueryResponse queryResponse) { + this.storeName = storeName; + this.timestamp = System.currentTimeMillis(); + this.queryId = queryId; + this.query = query; + this.userId = userId; + this.sessionId = sessionId; + this.queryResponse = queryResponse; + } + + /** + * Gets the name of the UBI store. + * @return The name of the UBI store. + */ + public String getStoreName() { + return storeName; + } + + /** + * Gets the timestamp. + * @return The timestamp. + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Gets the query ID. + * @return The query ID. + */ + public String getQueryId() { + return queryId; + } + + /** + * Gets the query. + * @return The query. + */ + public String getQuery() { + return query; + } + + /** + * Gets the user ID. + * @return The user ID. + */ + public String getUserId() { + return userId; + } + + /** + * Gets the session ID. + * @return The session ID. + */ + public String getSessionId() { + return sessionId; + } + + /** + * Gets the query response for this query request. + * @return The {@link QueryResponse} for this query request. + */ + public QueryResponse getQueryResponse() { + return queryResponse; + } + +} diff --git a/src/main/java/org/opensearch/ubi/model/QueryResponse.java b/src/main/java/com/o19s/ubi/model/QueryResponse.java similarity index 61% rename from src/main/java/org/opensearch/ubi/model/QueryResponse.java rename to src/main/java/com/o19s/ubi/model/QueryResponse.java index 4fe2bc6..7ac1af0 100644 --- a/src/main/java/org/opensearch/ubi/model/QueryResponse.java +++ b/src/main/java/com/o19s/ubi/model/QueryResponse.java @@ -6,30 +6,51 @@ * compatible open source license. */ -package org.opensearch.ubi.model; +package com.o19s.ubi.model; import java.util.List; +/** + * A query response. + */ public class QueryResponse { private final String queryId; private final String queryResponseId; private final List queryResponseHitIds; + /** + * Creates a query response. + * @param queryId The ID of the query. + * @param queryResponseId The ID of the query response. + * @param queryResponseHitIds A list of IDs for the hits in the query. + */ public QueryResponse(final String queryId, final String queryResponseId, final List queryResponseHitIds) { this.queryId = queryId; this.queryResponseId = queryResponseId; this.queryResponseHitIds = queryResponseHitIds; } + /** + * Gets the query ID. + * @return The query ID. + */ public String getQueryId() { return queryId; } + /** + * Gets the query response ID. + * @return The query response ID. + */ public String getQueryResponseId() { return queryResponseId; } + /** + * Gets the list of query response hit IDs. + * @return A list of query response hit IDs. + */ public List getQueryResponseHitIds() { return queryResponseHitIds; } diff --git a/src/main/java/com/o19s/ubi/model/SettingsConstants.java b/src/main/java/com/o19s/ubi/model/SettingsConstants.java new file mode 100644 index 0000000..469d830 --- /dev/null +++ b/src/main/java/com/o19s/ubi/model/SettingsConstants.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.model; + +/** + * Settings constants used by the plugin. + */ +public class SettingsConstants { + + /** + * The schema version. + */ + public static final String VERSION_SETTING = "index.ubi.version"; + + /** + * The name of the UBI store. + */ + public static final String INDEX = "index.ubi.store"; + + /** + * The field in an index's mapping that will be used as the unique identifier for a query result item. + */ + public static final String ID_FIELD = "index.ubi.id_field"; + +} diff --git a/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java b/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java new file mode 100644 index 0000000..c09997c --- /dev/null +++ b/src/main/java/com/o19s/ubi/rest/UserBehaviorInsightsRestHandler.java @@ -0,0 +1,294 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.rest; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.o19s.ubi.data.OpenSearchDataManager; +import com.o19s.ubi.UserBehaviorInsightsPlugin; +import com.o19s.ubi.model.Event; +import com.o19s.ubi.model.HeaderConstants; +import com.o19s.ubi.model.SettingsConstants; +import com.o19s.ubi.utils.UbiUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestActionListener; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static org.opensearch.rest.RestRequest.Method.DELETE; +import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; +import static org.opensearch.rest.RestRequest.Method.TRACE; + +/** + * The REST handler for User Behavior Insights. The handler provides the + * REST interface for interacting with UBI stores and for storing client-side events. + */ +public class UserBehaviorInsightsRestHandler extends BaseRestHandler { + + private static final Logger LOGGER = LogManager.getLogger(UserBehaviorInsightsRestHandler.class); + + private static final String EVENTS_MAPPING_FILE = "/events-mapping.json"; + private static final String QUERIES_MAPPING_FILE = "/queries-mapping.json"; + private static final int VERSION = 1; + + @Override + public String getName() { + return "Search Relevance"; + } + + @Override + public List routes() { + return List.of( + new Route(PUT, "/_plugins/ubi/{store}"), // Initializes the store. + new Route(DELETE, "/_plugins/ubi/{store}"), // Deletes a store. + new Route(GET, "/_plugins/ubi"), // Lists all stores + new Route(TRACE, "/_plugins/ubi"), + new Route(POST, "/_plugins/ubi/{store}")); // Indexes events into the store. + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + + if (restRequest.method() == PUT) { + + final String storeName = restRequest.param("store"); + final String index = restRequest.param("index"); + final String idField = restRequest.param("id_field"); + + return create(nodeClient, storeName, index, idField); + + } else if(restRequest.method() == DELETE) { + + final String storeName = restRequest.param("store"); + return delete(nodeClient, storeName); + + } else if(restRequest.method() == POST) { + + final String storeName = restRequest.param("store"); + return post(nodeClient, storeName, restRequest); + + } else if(restRequest.method() == TRACE) { + return trace(nodeClient, restRequest); + } + + return get(nodeClient); + + } + + private RestChannelConsumer trace(NodeClient nodeClient, RestRequest restRequest) { + + return (channel) -> { + + LOGGER.warn("TRACE"); + + final Map> headers = restRequest.getHeaders(); + LOGGER.info("Exposed headers: " + String.join(",", headers.keySet())); + + List ids = headers.get(HeaderConstants.QUERY_ID_HEADER.toString()); + String queryId = null; + if (ids == null || ids.size() == 0) { + LOGGER.warn("Null REST parameter: {}. Using default id.", HeaderConstants.QUERY_ID_HEADER); + queryId = UUID.randomUUID().toString(); + } else { + queryId = ids.get(0); + } + + final GetIndexRequest getIndexRequest = new GetIndexRequest(); + final GetIndexResponse getIndexResponse = nodeClient.admin().indices().getIndex(getIndexRequest).actionGet(); + final Set stores = getStoreNames(getIndexResponse.indices()); + + final String s = "query_id:" + queryId + "&stores:" + String.join(",", stores); + + BytesRestResponse response = new BytesRestResponse(RestStatus.OK, "application/x-www-form-urlencoded", s); + response.addHeader("Access-Control-Expose-Headers", "query_id"); + response.addHeader("query_id", queryId); + + channel.sendResponse(response); + + }; + + } + + private RestChannelConsumer get(final NodeClient nodeClient) { + + return (channel) -> { + + final GetIndexRequest getIndexRequest = new GetIndexRequest(); + + nodeClient.admin().indices().getIndex(getIndexRequest, new RestActionListener<>(channel) { + @Override + public void processResponse(final GetIndexResponse getIndexResponse) throws Exception { + + final Set stores = new HashSet<>(); + + for(final String index : getIndexResponse.indices()) { + if(index.startsWith(".") && index.endsWith("_events")) { + stores.add(index.substring(1, index.length() - 7)); + } + } + + final XContentBuilder builder = XContentType.JSON.contentBuilder(); + builder.startObject().field("stores", stores); + builder.endObject(); + + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + + } + }); + + }; + + } + + private RestChannelConsumer create(final NodeClient nodeClient, final String storeName, final String index, final String idField) throws IOException { + + LOGGER.info("Creating UBI store [{}] for index [{}] using field [{}]", storeName, index, idField); + + final Settings indexSettings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-2") + .put(IndexMetadata.SETTING_PRIORITY, Integer.MAX_VALUE) + .put(SettingsConstants.INDEX, index) + .put(SettingsConstants.ID_FIELD, idField) + .put(SettingsConstants.VERSION_SETTING, VERSION) + .build(); + + // Create the events index. + final String eventsIndex = UbiUtils.getEventsIndexName(storeName); + final CreateIndexRequest createEventsIndexRequest = new CreateIndexRequest(eventsIndex) + .mapping(UbiUtils.getResourceFile(EVENTS_MAPPING_FILE)) + .settings(indexSettings); + + nodeClient.admin().indices().create(createEventsIndexRequest); + + // Create the queries index. + final String queriesIndex = UbiUtils.getQueriesIndexName(storeName); + final CreateIndexRequest createQueriesIndexRequest = new CreateIndexRequest(queriesIndex) + .mapping(UbiUtils.getResourceFile(QUERIES_MAPPING_FILE)) + .settings(indexSettings); + + nodeClient.admin().indices().create(createQueriesIndexRequest); + + final XContentBuilder builder = XContentType.JSON.contentBuilder(); + builder.startObject().field("status", "initialized"); + builder.endObject(); + + return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + + } + + private RestChannelConsumer post(final NodeClient nodeClient, final String storeName, final RestRequest restRequest) throws IOException { + + try { + + final String eventJson = restRequest.content().utf8ToString(); + final String eventJsonWithTimestamp = setEventTimestamp(eventJson); + + LOGGER.trace("Indexing UBI event into store {}", storeName); + final String eventsIndexName = UbiUtils.getEventsIndexName(storeName); + + final Event event = new Event(eventsIndexName, eventJsonWithTimestamp); + OpenSearchDataManager.getInstance(nodeClient).add(event); + + } catch (JsonProcessingException ex) { + LOGGER.error("Unable to get/set timestamp on UBI event.", ex); + + final XContentBuilder builder = XContentType.JSON.contentBuilder(); + builder.startObject().field("error", "unable to set event timestamp"); + builder.endObject(); + + return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, builder)); + } + + final XContentBuilder builder = XContentType.JSON.contentBuilder(); + builder.startObject().field("status", "received"); + builder.endObject(); + + return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + + } + + private RestChannelConsumer delete(final NodeClient nodeClient, final String storeName) throws IOException { + + // Delete the events index. + final DeleteIndexRequest deleteEventsIndexRequest = new DeleteIndexRequest(UbiUtils.getEventsIndexName(storeName)); + nodeClient.admin().indices().delete(deleteEventsIndexRequest); + + // Delete the queries index. + final DeleteIndexRequest deleteQueriesIndexRequest = new DeleteIndexRequest(UbiUtils.getQueriesIndexName(storeName)); + nodeClient.admin().indices().delete(deleteQueriesIndexRequest); + + final XContentBuilder builder = XContentType.JSON.contentBuilder(); + builder.startObject().field("status", "deleted"); + builder.endObject(); + + // Remove this store's settings from the settings map. + UserBehaviorInsightsPlugin.storeSettings.entrySet().removeIf(entry -> entry.getKey().startsWith(storeName + ".")); + + return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + + } + + private String setEventTimestamp(final String eventJson) throws JsonProcessingException { + + final JsonNode rootNode = new ObjectMapper().readTree(eventJson); + + final ObjectNode target = (ObjectNode) rootNode; + + // If there is already a timestamp don't overwrite it. + if(target.get("timestamp") == null || Objects.equals(target.get("timestamp").asText(), "")) { + target.put("timestamp", System.currentTimeMillis()); + } + + return new ObjectMapper().writeValueAsString(rootNode); + + } + + private Set getStoreNames(String[] indices) { + final Set stores = new HashSet<>(); + for (final String index : indices) { + if (index.startsWith(".") && index.endsWith("_events")) { + stores.add(index.substring(1, index.length() - 7)); + } + } + return stores; + } + + private boolean validateStoreName(final String storeName) { + + // Validate the store name. + return storeName != null && !storeName.isEmpty(); + + } + +} diff --git a/src/main/java/com/o19s/ubi/utils/UbiUtils.java b/src/main/java/com/o19s/ubi/utils/UbiUtils.java new file mode 100644 index 0000000..8b4620c --- /dev/null +++ b/src/main/java/com/o19s/ubi/utils/UbiUtils.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package com.o19s.ubi.utils; + +import org.opensearch.common.util.io.Streams; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +/** + * A utility class used by the plugin. + */ +public class UbiUtils { + + /** + * This is a static utility class. + */ + private UbiUtils() { + + } + + /** + * Gets the formatted name of the queries index. + * @param storeName The name of the UBI store. + * @return The formatted name of the queries index. + */ + public static String getQueriesIndexName(final String storeName) { + return "." + storeName + "_queries"; + } + + /** + * Gets the formatted name of the events index. + * @param storeName The name of the UBI store. + * @return The formatted name of the events index. + */ + public static String getEventsIndexName(final String storeName) { + return "." + storeName + "_events"; + } + + /** + * Gets the content of a resource file. + * @param fileName The file name to open and read. + * @return The content of the given filename. + */ + public static String getResourceFile(final String fileName) { + try (InputStream is = UbiUtils.class.getResourceAsStream(fileName)) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(is, out); + return out.toString(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new IllegalStateException("Unable to create index with resource [" + fileName + "]", e); + } + } + +} diff --git a/src/main/java/org/opensearch/ubi/SettingsConstants.java b/src/main/java/org/opensearch/ubi/SettingsConstants.java deleted file mode 100644 index d4811e9..0000000 --- a/src/main/java/org/opensearch/ubi/SettingsConstants.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi; - -public class SettingsConstants { - - public static final String INDEX_NAMES = "plugins.ubi.indices"; - public static final String VERSION_SETTING = "index.ubistore.version"; - -} diff --git a/src/main/java/org/opensearch/ubi/action/UserBehaviorInsightsActionFilter.java b/src/main/java/org/opensearch/ubi/action/UserBehaviorInsightsActionFilter.java deleted file mode 100644 index ca62998..0000000 --- a/src/main/java/org/opensearch/ubi/action/UserBehaviorInsightsActionFilter.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.action; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.ActionRequest; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.support.ActionFilter; -import org.opensearch.action.support.ActionFilterChain; -import org.opensearch.common.settings.Settings; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.action.ActionResponse; -import org.opensearch.tasks.Task; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.ubi.HeaderConstants; -import org.opensearch.ubi.backends.Backend; -import org.opensearch.ubi.model.QueryRequest; -import org.opensearch.ubi.model.QueryResponse; - -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; - -public class UserBehaviorInsightsActionFilter implements ActionFilter { - - private static final Logger LOGGER = LogManager.getLogger(UserBehaviorInsightsActionFilter.class); - - private final Backend backend; - private final Settings settings; - private final ThreadPool threadPool; - - public Settings getSettings(){ - return this.settings; - } - - public UserBehaviorInsightsActionFilter(final Backend backend, final Settings settings, ThreadPool threadPool) { - this.backend = backend; - this.settings = settings; - this.threadPool = threadPool; - } - - @Override - public int order() { - return Integer.MAX_VALUE; - } - - @Override - public void apply( - Task task, String action, Request request, ActionListener listener, - ActionFilterChain chain) { - - if (!(request instanceof SearchRequest)) { - chain.proceed(task, action, request, listener); - return; - } - - chain.proceed(task, action, request, new ActionListener<>() { - - @Override - public void onResponse(Response response) { - - final long startTime = System.currentTimeMillis(); - - // Get the search itself. - final SearchRequest searchRequest = (SearchRequest) request; - - // TODO: Restrict logging to only queries of certain indices specified in the settings. - //final List indices = Arrays.asList(searchRequest.indices()); - //final Set indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(","))); - //if(indicesToLog.containsAll(indices)) { - - // Get all search hits from the response. - if (response instanceof SearchResponse) { - - // Get info from the headers. - final String queryId = getHeaderValue(HeaderConstants.QUERY_ID_HEADER, UUID.randomUUID().toString(), task); - final String eventStore = getHeaderValue(HeaderConstants.EVENT_STORE_HEADER, "default", task); - final String userId = getHeaderValue(HeaderConstants.USER_ID_HEADER, "", task); - final String sessionId = getHeaderValue(HeaderConstants.SESSION_ID_HEADER, "", task); - - // The query will be empty when there is no query, e.g. /_search - final String query = searchRequest.source().toString(); - - // Create a UUID for this search response. - final String queryResponseId = UUID.randomUUID().toString(); - - final List queryResponseHitIds = new LinkedList<>(); - final SearchResponse searchResponse = (SearchResponse) response; - - // Add each hit to the list of query responses. - searchResponse.getHits().forEach(hit -> queryResponseHitIds.add(String.valueOf(hit.docId()))); - - try { - - // Persist the query to the backend. - backend.persistQuery(eventStore, - new QueryRequest(queryId, query, userId, sessionId), - new QueryResponse(queryId, queryResponseId, queryResponseHitIds)); - - } catch (Exception ex) { - // TODO: Handle this. - LOGGER.error("Unable to persist query.", ex); - } - - LOGGER.info("Setting and exposing query_id {}", queryId); - //HACK: this should be set in the OpenSearch config (to send to the client code just once), - // and not on every single search response, - // but that server setting doesn't appear to be exposed. - threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id"); - threadPool.getThreadContext().addResponseHeader("query_id", queryId); - - final long elapsedTime = System.currentTimeMillis() - startTime; - LOGGER.info("UBI search request filter took {} ms", elapsedTime); - - } - - listener.onResponse(response); - - } - - @Override - public void onFailure(Exception ex) { - listener.onFailure(ex); - } - - }); - - } - - private String getHeaderValue(final HeaderConstants header, final String defaultValue, final Task task) { - - final String value = task.getHeader(header.getHeader()); - - if(value == null || value.trim().isEmpty()) { - return defaultValue; - } else { - return value; - } - - } - -} diff --git a/src/main/java/org/opensearch/ubi/action/UserBehaviorInsightsRestHandler.java b/src/main/java/org/opensearch/ubi/action/UserBehaviorInsightsRestHandler.java deleted file mode 100644 index ec1221f..0000000 --- a/src/main/java/org/opensearch/ubi/action/UserBehaviorInsightsRestHandler.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.action; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.client.node.NodeClient; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.rest.BaseRestHandler; -import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestRequest; -import org.opensearch.ubi.HeaderConstants; -import org.opensearch.ubi.backends.Backend; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; - -import static org.opensearch.rest.RestRequest.Method.*; - -public class UserBehaviorInsightsRestHandler extends BaseRestHandler { - - private static final Logger LOGGER = LogManager.getLogger(UserBehaviorInsightsRestHandler.class); - - private final Backend backend; - - public UserBehaviorInsightsRestHandler(final Backend backend) { - this.backend = backend; - } - - @Override - public String getName() { - return "Search Relevance"; - } - - @Override - public List routes() { - return List.of( - new Route(PUT, "/_plugins/ubi/{store}"), // Initializes the store. - new Route(DELETE, "/_plugins/ubi/{store}"), // Deletes a store. - new Route(GET, "/_plugins/ubi"), // Lists all stores - new Route(TRACE, "/_plugins/ubi"), // for debugging rest weirdness - new Route(POST, "/_plugins/ubi/{store}")); // Indexes events into the store. - } - - @Override - protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) { - - LOGGER.log(Level.INFO, "{}: received event", request.method()); - - if (request.method() == PUT) { - - final String storeName = request.param("store"); - - // Validate the store name. - if(!backend.validateStoreName(storeName)) { - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "missing store name")); - } - - LOGGER.info("Creating UBI store {}", storeName); - - return (channel) -> { - /*if(backend.exists(storeName)) { - channel.sendResponse(new BytesRestResponse(RestStatus.CONFLICT, "already exists")); - } else {*/ - backend.initialize(storeName); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, "created")); - //} - }; - - } else if (request.method() == DELETE) { - - final String storeName = request.param("store"); - - // Validate the store name. - if(!backend.validateStoreName(storeName)) { - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "missing store name")); - } - - LOGGER.info("Deleting UBI store {}", storeName); - - return (channel) -> { - backend.delete(storeName); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, "created")); - }; - - } else if (request.method() == POST) { - - if (request.hasContent()) { - - final String storeName = request.param("store"); - - // Make sure the store exists. - /*if(!backend.exists(storeName)) { - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, "store not found")); - }*/ - - LOGGER.info("Queuing event for storage into UBI store {}", storeName); - final String eventJson = request.content().utf8ToString(); - - try { - - final String eventJsonWithTimestamp = setEventTimestamp(eventJson); - - backend.persistEvent(storeName, eventJsonWithTimestamp); - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "event received")); - - } catch (JsonProcessingException ex) { - LOGGER.error("Unable to get/set timestamp on event.", ex); - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "unable to set event timestamp")); - } - - } else { - throw new IllegalArgumentException("Missing event content"); - } - - } else if (request.method() == GET) { - - final Set stores = backend.get(); - final String s = String.join(",", stores); - - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, s)); - - } else if (request.method() == TRACE) { - LOGGER.warn("TRACE"); - - final Map> headers = request.getHeaders(); - LOGGER.info("Exposed headers: " + String.join(",", headers.keySet())); - - List ids = headers.get(HeaderConstants.QUERY_ID_HEADER.toString()); - String queryId = null; - if(ids == null || ids.size() == 0){ - LOGGER.warn("Null REST parameter: {}. Using default id.", HeaderConstants.QUERY_ID_HEADER); - queryId = UUID.randomUUID().toString(); - } - else { - queryId = ids.get(0); - } - - final Set stores = backend.get(); - - - final String s = "query_id:" + queryId + "&stores:" + String.join(",", stores); - - BytesRestResponse response = new BytesRestResponse(RestStatus.OK, "application/x-www-form-urlencoded", s); - response.addHeader("Access-Control-Expose-Headers", "query_id"); - response.addHeader("query_id", queryId); - - return (channel) -> channel.sendResponse(response); - } - else - LOGGER.warn("Unknown method " + request.method()); - - // TODO: Return a list names of all search_relevance stores. - return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "ok")); - - } - - private String setEventTimestamp(final String eventJson) throws JsonProcessingException { - - final JsonNode rootNode = new ObjectMapper().readTree(eventJson); - - ObjectNode target = (ObjectNode) rootNode; - - // If there is already a timestamp don't overwrite it. - if(target.get("timestamp") == null || Objects.equals(target.get("timestamp").asText(), "")) { - target.put("timestamp", System.currentTimeMillis()); - } - - return new ObjectMapper().writeValueAsString(rootNode); - - } - -} diff --git a/src/main/java/org/opensearch/ubi/backends/Backend.java b/src/main/java/org/opensearch/ubi/backends/Backend.java deleted file mode 100644 index b11016c..0000000 --- a/src/main/java/org/opensearch/ubi/backends/Backend.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.backends; - -import org.opensearch.ubi.model.QueryRequest; -import org.opensearch.ubi.model.QueryResponse; - -import java.util.Set; - -public interface Backend { - - void initialize(final String storeName); - - void delete(final String storeName); - - void persistEvent(final String storeName, String eventJson); - - void persistQuery(final String storeName, QueryRequest queryRequest, QueryResponse queryResponse) throws Exception; - - Set get(); - - boolean exists(final String storeName); - - boolean validateStoreName(final String storeName); - -} diff --git a/src/main/java/org/opensearch/ubi/backends/OpenSearchBackend.java b/src/main/java/org/opensearch/ubi/backends/OpenSearchBackend.java deleted file mode 100644 index 7a7428e..0000000 --- a/src/main/java/org/opensearch/ubi/backends/OpenSearchBackend.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.backends; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.io.Streams; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.ubi.SettingsConstants; -import org.opensearch.ubi.events.Event; -import org.opensearch.ubi.events.OpenSearchEventManager; -import org.opensearch.ubi.model.QueryRequest; -import org.opensearch.ubi.model.QueryResponse; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -public class OpenSearchBackend implements Backend { - - private static final Logger LOGGER = LogManager.getLogger(OpenSearchBackend.class); - - private static final String EVENTS_MAPPING_FILE = "events-mapping.json"; - private static final String QUERIES_MAPPING_FILE = "queries-mapping.json"; - - public static final int VERSION = 1; - - private final Client client; - - public OpenSearchBackend(final Client client) { - this.client = client; - } - - @Override - public void initialize(final String storeName) { - - LOGGER.info("Creating search relevance store {}", storeName); - - // Create the events index. - final String eventsIndexName = getEventsIndexName(storeName); - - final CreateIndexRequest createEventsIndexRequest = new CreateIndexRequest(eventsIndexName) - .mapping(getResourceFile(EVENTS_MAPPING_FILE)) - .settings(getIndexSettings()); - - client.admin().indices().create(createEventsIndexRequest); - - // Create the queries index. - final String queriesIndexName = getQueriesIndexName(storeName); - - final CreateIndexRequest createQueryIndexRequest = new CreateIndexRequest(queriesIndexName) - .mapping(getResourceFile(QUERIES_MAPPING_FILE)) - .settings(getIndexSettings()); - - client.admin().indices().create(createQueryIndexRequest); - - } - - @Override - public void delete(String storeName) { - - LOGGER.info("Deleting search relevance store {}", storeName); - - final String eventsIndexName = getEventsIndexName(storeName); - final String queriesIndexName = getQueriesIndexName(storeName); - final DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(eventsIndexName, queriesIndexName); - - client.admin().indices().delete(deleteIndexRequest); - - } - - @Override - public void persistEvent(String storeName, String eventJson) { - - // Add the event for indexing. - LOGGER.info("Indexing event into {}", storeName); - final String eventsIndexName = getEventsIndexName(storeName); - - //return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel)); - final Event event = new Event(eventsIndexName, eventJson); - OpenSearchEventManager.getInstance(client).add(event); - - } - - @Override - public void persistQuery(final String storeName, final QueryRequest queryRequest, QueryResponse queryResponse) { - - LOGGER.info("Writing query ID {} with response ID {}", queryRequest.getQueryId(), queryResponse.getQueryResponseId()); - - // What will be indexed - adheres to the queries-mapping.json - final Map source = new HashMap<>(); - source.put("timestamp", queryRequest.getTimestamp()); - source.put("query_id", queryRequest.getQueryId()); - source.put("query", queryRequest.getQuery()); - source.put("query_response_id", queryResponse.getQueryResponseId()); - source.put("query_response_hit_ids", queryResponse.getQueryResponseHitIds()); - source.put("user_id", queryRequest.getUserId()); - source.put("session_id", queryRequest.getSessionId()); - - // Get the name of the queries. - final String queriesIndexName = getQueriesIndexName(storeName); - - // Build the index request. - final IndexRequest indexRequest = new IndexRequest(queriesIndexName) - .source(source, XContentType.JSON); - - // TODO: Move this to the queue, too. - client.index(indexRequest); - - } - - @Override - public Set get() { - - /*final GetIndexRequest getIndexRequest = new GetIndexRequest(); - final GetIndexResponse getIndexResponse = client.admin().indices().getIndex(getIndexRequest).actionGet(); - - final String[] indexes = getIndexResponse.indices(); - final Set stores = new HashSet<>(); - - for(final String index : indexes) { - LOGGER.info("Index name: " + index); - if(index.startsWith(".") && index.endsWith("_queries")) { - stores.add(index); - } - } - - return stores;*/ - - return Collections.emptySet(); - - } - - @Override - public boolean exists(final String storeName) { - - final String indexName = getEventsIndexName(storeName); - - // TODO: This has to run on a non-blocking thread. - final IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(indexName); - final IndicesExistsResponse indicesExistsResponse = client.admin().indices().exists(indicesExistsRequest).actionGet(); - - return indicesExistsResponse.isExists(); - - } - - @Override - public boolean validateStoreName(final String storeName) { - - // Validate the store name. - return storeName != null && !storeName.isEmpty(); - - } - - private String getEventsIndexName(final String storeName) { - return "." + storeName + "_events"; - } - - private String getQueriesIndexName(final String storeName) { - return "." + storeName + "_queries"; - } - - private String getResourceFile(final String fileName) { - try (InputStream is = OpenSearchBackend.class.getResourceAsStream(fileName)) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(is, out); - return out.toString(StandardCharsets.UTF_8); - } catch (IOException e) { - throw new IllegalStateException("Unable to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e); - } - } - - private Settings getIndexSettings() { - return Settings.builder() - .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-2") - .put(SettingsConstants.VERSION_SETTING, VERSION) - .put(IndexMetadata.SETTING_PRIORITY, Integer.MAX_VALUE) - .put(IndexMetadata.SETTING_INDEX_HIDDEN, true) - .build(); - } - -} diff --git a/src/main/java/org/opensearch/ubi/events/AbstractEventManager.java b/src/main/java/org/opensearch/ubi/events/AbstractEventManager.java deleted file mode 100644 index 28cdbba..0000000 --- a/src/main/java/org/opensearch/ubi/events/AbstractEventManager.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.events; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.ubi.events.queues.EventQueue; -import org.opensearch.ubi.events.queues.InternalQueue; - -public abstract class AbstractEventManager { - - @SuppressWarnings("unused") - private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class); - - protected final EventQueue eventQueue; - - public AbstractEventManager() { - this.eventQueue = new InternalQueue(); - } - - public abstract void process(); - - public abstract void add(Event event); - -} diff --git a/src/main/java/org/opensearch/ubi/events/Event.java b/src/main/java/org/opensearch/ubi/events/Event.java deleted file mode 100644 index 56fff82..0000000 --- a/src/main/java/org/opensearch/ubi/events/Event.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.events; - -public class Event { - - private final String indexName; - private final String event; - - public Event(String indexName, String event) { - this.indexName = indexName; - this.event = event; - } - - public String getIndexName() { - return indexName; - } - - public String getEvent() { - return event; - } - -} diff --git a/src/main/java/org/opensearch/ubi/events/OpenSearchEventManager.java b/src/main/java/org/opensearch/ubi/events/OpenSearchEventManager.java deleted file mode 100644 index d1972a9..0000000 --- a/src/main/java/org/opensearch/ubi/events/OpenSearchEventManager.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.events; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.client.Client; -import org.opensearch.common.xcontent.XContentType; - -public class OpenSearchEventManager extends AbstractEventManager { - - private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class); - - private final Client client; - private static OpenSearchEventManager openSearchEventManager; - - private OpenSearchEventManager(Client client) { - this.client = client; - } - - @Override - public void process() { - - if(eventQueue.size() > 0) { - - final BulkRequest bulkRequest = new BulkRequest(); - LOGGER.info("Bulk inserting " + eventQueue.size() + " UBI events"); - - for (final Event event : eventQueue.get()) { - - final IndexRequest indexRequest = new IndexRequest(event.getIndexName()) - .source(event.getEvent(), XContentType.JSON); - - bulkRequest.add(indexRequest); - - } - - eventQueue.clear(); - client.bulk(bulkRequest); - - } - - } - - @Override - public void add(Event event) { - eventQueue.add(event); - } - - public static OpenSearchEventManager getInstance(Client client) { - if(openSearchEventManager == null) { - openSearchEventManager = new OpenSearchEventManager(client); - } - return openSearchEventManager; - } - -} diff --git a/src/main/java/org/opensearch/ubi/events/queues/EventQueue.java b/src/main/java/org/opensearch/ubi/events/queues/EventQueue.java deleted file mode 100644 index e26fa7f..0000000 --- a/src/main/java/org/opensearch/ubi/events/queues/EventQueue.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.events.queues; - -import org.opensearch.ubi.events.Event; - -import java.util.List; - -public interface EventQueue { - - void add(Event event); - - void clear(); - - List get(); - - int size(); - -} diff --git a/src/main/java/org/opensearch/ubi/events/queues/InternalQueue.java b/src/main/java/org/opensearch/ubi/events/queues/InternalQueue.java deleted file mode 100644 index 62a8a45..0000000 --- a/src/main/java/org/opensearch/ubi/events/queues/InternalQueue.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.events.queues; - -import org.opensearch.ubi.events.Event; - -import java.util.LinkedList; -import java.util.List; - -public class InternalQueue implements EventQueue { - - private static final List indexRequests = new LinkedList<>(); - - @Override - public void add(Event event) { - indexRequests.add(event); - } - - @Override - public void clear() { - indexRequests.clear(); - } - - @Override - public List get() { - return indexRequests; - } - - @Override - public int size() { - return indexRequests.size(); - } - -} diff --git a/src/main/java/org/opensearch/ubi/model/QueryRequest.java b/src/main/java/org/opensearch/ubi/model/QueryRequest.java deleted file mode 100644 index 3b89b05..0000000 --- a/src/main/java/org/opensearch/ubi/model/QueryRequest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ubi.model; - -public class QueryRequest { - - private final long timestamp; - private final String queryId; - private final String query; - private final String userId; - private final String sessionId; - - public QueryRequest(final String queryId, final String query, final String userId, final String sessionId) { - this.timestamp = System.currentTimeMillis(); - this.queryId = queryId; - this.query = query; - this.userId = userId; - this.sessionId = sessionId; - } - - public long getTimestamp() { - return timestamp; - } - - public String getQueryId() { - return queryId; - } - - public String getQuery() { - return query; - } - - public String getUserId() { - return userId; - } - - public String getSessionId() { - return sessionId; - } - -} diff --git a/src/main/resources/org/opensearch/ubi/backends/events-mapping.json b/src/main/resources/events-mapping.json similarity index 100% rename from src/main/resources/org/opensearch/ubi/backends/events-mapping.json rename to src/main/resources/events-mapping.json diff --git a/src/main/resources/org/opensearch/ubi/backends/queries-mapping.json b/src/main/resources/queries-mapping.json similarity index 100% rename from src/main/resources/org/opensearch/ubi/backends/queries-mapping.json rename to src/main/resources/queries-mapping.json diff --git a/src/yamlRestTest/java/org/opensearch/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java b/src/yamlRestTest/java/com/o19s/ubi/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java similarity index 96% rename from src/yamlRestTest/java/org/opensearch/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java rename to src/yamlRestTest/java/com/o19s/ubi/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java index cd9165f..ab75960 100644 --- a/src/yamlRestTest/java/org/opensearch/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java +++ b/src/yamlRestTest/java/com/o19s/ubi/rest/action/UserBehaviorLoggingClientYamlTestSuiteIT.java @@ -5,7 +5,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.rest.action; +package com.o19s.ubi.rest.action; import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; diff --git a/src/yamlRestTest/resources/rest-api-spec/api/ubi.create_store.json b/src/yamlRestTest/resources/rest-api-spec/api/ubi.create_store.json index 8082522..bfa909d 100644 --- a/src/yamlRestTest/resources/rest-api-spec/api/ubi.create_store.json +++ b/src/yamlRestTest/resources/rest-api-spec/api/ubi.create_store.json @@ -7,7 +7,7 @@ "path": "/_plugins/ubi/{store}", "parts": { "store": { - "required": false, + "required": true, "type": "string", "description": "The name of the store" } @@ -18,6 +18,17 @@ } ] }, - "body": null + "params": { + "index": { + "required": true, + "type": "string", + "description": "The name of the index being searched" + }, + "id_field": { + "required": false, + "type": "string", + "description": "The name of the field to use for the doc ID field" + } + } } } \ No newline at end of file diff --git a/src/yamlRestTest/resources/rest-api-spec/api/ubi.get_stores.json b/src/yamlRestTest/resources/rest-api-spec/api/ubi.get_stores.json new file mode 100644 index 0000000..5327f5b --- /dev/null +++ b/src/yamlRestTest/resources/rest-api-spec/api/ubi.get_stores.json @@ -0,0 +1,15 @@ +{ + "ubi.get_stores": { + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_plugins/ubi", + "methods": [ + "GET" + ] + } + ] + } + } +} \ No newline at end of file diff --git a/src/yamlRestTest/resources/rest-api-spec/test/_plugins.ubi/10_manage.yml b/src/yamlRestTest/resources/rest-api-spec/test/_plugins.ubi/10_manage.yml index 28dcc82..46c7882 100644 --- a/src/yamlRestTest/resources/rest-api-spec/test/_plugins.ubi/10_manage.yml +++ b/src/yamlRestTest/resources/rest-api-spec/test/_plugins.ubi/10_manage.yml @@ -3,6 +3,7 @@ - do: ubi.create_store: store: mystore + index: ecommerce - do: indices.exists: @@ -10,6 +11,11 @@ - match: { created } + - do: + ubi.get_stores: {} + + - match: { mystore } + - do: ubi.delete_store: store: mystore diff --git a/tag-and-release.sh b/tag-and-release.sh new file mode 100755 index 0000000..77ae161 --- /dev/null +++ b/tag-and-release.sh @@ -0,0 +1,18 @@ +#!/bin/bash -e + +GRADLE_PROPERTIES_FILE=gradle.properties + +function getProperty { + PROP_KEY=$1 + PROP_VALUE=`cat $GRADLE_PROPERTIES_FILE | grep "$PROP_KEY" | cut -d'=' -f2` + echo $PROP_VALUE +} + +OPENSEARCH_VERSION=$(getProperty "opensearchVersion") +UBI_VERSION=$(getProperty "ubiVersion") + +TAG_VERSION="v${UBI_VERSION}-os${OPENSEARCH_VERSION}" +echo "Tagging as ${TAG_VERSION}" + +git tag -a "${TAG_VERSION}" -m "${TAG_VERSION}" +git push --tags