Skip to content

Commit

Permalink
Merge pull request #112 from o19s/scheduler
Browse files Browse the repository at this point in the history
Changing scheduler to fixed delay and separating events and queries tasks
  • Loading branch information
jzonthemtn authored Mar 12, 2024
2 parents 7e2f540 + 5466a20 commit 764847f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 19 deletions.
2 changes: 1 addition & 1 deletion load-test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ curl -X PUT "http://localhost:9200/_plugins/ubi/mystore?index=ecommerce"
../index-chorus-data.sh `realpath ../../chorus-opensearch-edition`

# Insert events and queries.
locust -f load-test.py --headless -u 1 -r 1 --run-time 30s --host http://localhost:9200
locust -f load-test.py --headless -u 1 -r 1 --run-time 600s --host http://localhost:9200

# Let events index.
sleep 10
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/o19s/ubi/OpenSearchEventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ private OpenSearchEventManager(Client client) {
}

@Override
public void process() {
public void processEvents() {

if(eventsQueue.size() > 0) {

final BulkRequest eventsBulkRequest = new BulkRequest();
LOGGER.info("Bulk inserting " + eventsQueue.size() + " UBI events");
LOGGER.debug("Bulk inserting " + eventsQueue.size() + " UBI events");

for (final Event event : eventsQueue.get()) {

Expand All @@ -58,14 +58,19 @@ public void process() {

}

}

@Override
public void processQueries() {

if(queryRequestsQueue.size() > 0) {

final BulkRequest queryRequestsBulkRequest = new BulkRequest();
LOGGER.info("Bulk inserting " + queryRequestsQueue.size() + " UBI queries");
LOGGER.debug("Bulk inserting " + queryRequestsQueue.size() + " UBI queries");

for(final QueryRequest queryRequest : queryRequestsQueue.get()) {

LOGGER.info("Writing query ID {} with response ID {}",
LOGGER.debug("Writing query ID {} with response ID {}",
queryRequest.getQueryId(), queryRequest.getQueryResponse().getQueryResponseId());

// What will be indexed - adheres to the queries-mapping.json
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/com/o19s/ubi/UserBehaviorInsightsPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,16 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {

// TODO Only start this if an OpenSearch store is already initialized.
// Otherwise, start it when a store is initialized.
// TODO: Allow the parameters of the scheduled tasks to be configurable.

LOGGER.info("Creating UBI scheduled task to persist events.");
// TODO: Allow these time parameters to be configurable.
threadPool.scheduler().scheduleAtFixedRate(() -> {
OpenSearchEventManager.getInstance(client).process();
threadPool.scheduler().scheduleWithFixedDelay(() -> {
OpenSearchEventManager.getInstance(client).processEvents();
}, 0, 2000, TimeUnit.MILLISECONDS);

LOGGER.info("Creating UBI scheduled task to persist queries.");
threadPool.scheduler().scheduleWithFixedDelay(() -> {
OpenSearchEventManager.getInstance(client).processQueries();
}, 0, 2000, TimeUnit.MILLISECONDS);

// Initialize the action filter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public void onResponse(Response response) {
final QueryRequest queryRequest = new QueryRequest(storeName, queryId, query, userId, sessionId, queryResponse);

// Queue this for writing to the UBI store.
LOGGER.trace("Queueing queryRequest for write");
eventManager.add(queryRequest);

// Add the query_id to the response headers.
Expand All @@ -142,18 +143,21 @@ public void onResponse(Response response) {
final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBI search request filter took {} ms", elapsedTime);

}
LOGGER.debug("Setting and exposing query_id {}", queryId);
//HACK: this should be set in the OpenSearch config (to send to the client code just once),
// and not on every single search response,
// but that server setting doesn't appear to be exposed.
threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id");

LOGGER.info("Setting and exposing query_id {}", queryId);
//HACK: this should be set in the OpenSearch config (to send to the client code just once),
// and not on every single search response,
// but that server setting doesn't appear to be exposed.
threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id");
threadPool.getThreadContext().addResponseHeader("query_id", queryId);
} else {
LOGGER.trace("Discarding query for UBI due to index name mismatch.");
}

final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBI search request filter took {} ms", elapsedTime);

} else {
LOGGER.trace("Discarding query for UBI due to missing store name.");
}

}
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/o19s/ubi/events/EventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ public EventManager() {
}

/**
* Process the items on the queue by writing them to persistent storage.
* Process the events on the queue by writing them to persistent storage.
*/
public abstract void process();
public abstract void processEvents();

/**
* Process the queries on the queue by writing them to persistent storage.
*/
public abstract void processQueries();

/**
* Add an event to the queue.
Expand Down

0 comments on commit 764847f

Please sign in to comment.