Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webui: separate search results by job ids #170

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def search(
search_cmd.append("--mongodb-database")
search_cmd.append(str(output_config["db_name"]))
search_cmd.append("--mongodb-collection")
search_cmd.append(str(query['results_collection_name']))
search_cmd.append(f"{output_config['results_collection_name']}_{job_id_str}")

# Start compression
logger.info("Searching...")
Expand Down
77 changes: 39 additions & 38 deletions components/webui-query-handler/webui_query_handler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@
logger.addHandler(logging_console_handler)

# Constants
GENERAL_METADATA_DOC_ID = "general"
TIMELINE_DOC_ID = "timeline"
NUM_SEARCH_RESULTS_DOC_ID = "num_search_results"

METADATA_DOC_ID_PREFIX = "meta-job-"

class DatabaseConnectionConfig(BaseModel):
host: str
Expand Down Expand Up @@ -268,7 +265,10 @@ async def receive_from_client(websocket) -> typing.Tuple[bool, typing.Optional[s


def clear_results(
db_uri: str, results_collection_name: str, results_metadata_collection_name: str
db_uri: str,
results_collection_name: str,
results_metadata_collection_name: str,
job_id: int
):
# Connect to database
client = MongoClient(db_uri)
Expand All @@ -277,29 +277,24 @@ def clear_results(
# Clear previous results
search_results_collection = pymongo.collection.Collection(
db,
results_collection_name,
f"{results_collection_name}_{job_id}",
write_concern=pymongo.collection.WriteConcern(w=1, wtimeout=2000),
)
search_results_collection.drop()
search_results_metadata_collection = pymongo.collection.Collection(
db, results_metadata_collection_name
)
search_results_metadata_collection.drop()
search_results_metadata_collection.delete_one({"_id": f"{METADATA_DOC_ID_PREFIX}{job_id}"})
return True


def update_timeline_and_count(
search_results_collection: pymongo.collection.Collection,
search_results_metadata_collection: pymongo.collection.Collection,
job_id: int,
time_range,
output_result_type: ResultType,
):
# Update count
num_results = search_results_collection.count_documents({})
search_results_metadata_collection.update_one(
{"_id": NUM_SEARCH_RESULTS_DOC_ID}, {"$set": {"all": num_results}}, upsert=True
)

if (
ResultType.MESSAGE_RESULT == output_result_type
):
Expand Down Expand Up @@ -426,19 +421,21 @@ def update_timeline_and_count(
# Update timeline graph
doc = {
"data": ts_graph_data,
"num_results": num_results,
"num_results_in_range": num_results,
"period_ms": ts_period_ms,
"period_name": ts_period_name,
"num_total_results": search_results_collection.count_documents({}),
}
search_results_metadata_collection.update_one(
{"_id": TIMELINE_DOC_ID}, {"$set": doc}, upsert=True
{"_id": f"{METADATA_DOC_ID_PREFIX}{job_id}"}, {"$set": doc}, upsert=True
)


def search_metadata_updater(
db_uri: str,
results_collection_name: str,
results_metadata_collection_name: str,
job_id: int,
time_range,
output_result_type: ResultType,
):
Expand All @@ -447,16 +444,18 @@ def search_metadata_updater(
db = client.get_default_database()

search_results_collection = pymongo.collection.Collection(
db, results_collection_name
db, f"{results_collection_name}_{job_id}"
)
search_results_metadata_collection = pymongo.collection.Collection(
db, results_metadata_collection_name
)

logger.info(f"{results_collection_name}_{job_id}")
while True:
update_timeline_and_count(
search_results_collection,
search_results_metadata_collection,
job_id,
time_range,
output_result_type,
)
Expand Down Expand Up @@ -486,8 +485,8 @@ async def send_preparing_for_query_msg(websocket):
await send_type_only_msg(websocket, ServerMessageType.PREPARING_FOR_QUERY)


async def send_query_started_msg(websocket):
await send_type_only_msg(websocket, ServerMessageType.QUERY_STARTED)
async def send_query_started_msg(websocket, job_id: int):
await send_msg(websocket, ServerMessageType.QUERY_STARTED, {"jobID": job_id})


async def send_error_msg(websocket, msg):
Expand Down Expand Up @@ -534,10 +533,12 @@ async def cancel_metadata_task(task, pending_tasks, query_done_event):
query_done_event.clear()


# FIXME: only clear specific job in meta
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this inline comment to the function body and mark it as TODO?

def schedule_clear_results_task(
db_uri: str,
results_collection_name: str,
results_metadata_collection_name: str,
job_id: int,
pending_tasks,
):
operation_task = asyncio.ensure_future(
Expand All @@ -546,6 +547,7 @@ def schedule_clear_results_task(
db_uri,
results_collection_name,
results_metadata_collection_name,
job_id,
)
)
pending_tasks.add(operation_task)
Expand Down Expand Up @@ -578,9 +580,6 @@ async def query_handler(
logger.debug(f'query_handler: Received sessionId as {sessionId}')
g_webui_connected[sessionId] = True

session_results_collection_name = f"{results_collection_name}_{sessionId}"
session_results_metadata_collection_name = f"{results_metadata_collection_name}_{sessionId}"

pending = set()
job_id = None
operation_task: typing.Optional[asyncio.Future] = None
Expand Down Expand Up @@ -640,8 +639,9 @@ async def query_handler(

operation_task = schedule_clear_results_task(
results_cache_uri,
session_results_collection_name,
session_results_metadata_collection_name,
results_collection_name,
results_metadata_collection_name,
job_id,
pending,
)

Expand All @@ -668,8 +668,9 @@ async def query_handler(

operation_task = schedule_clear_results_task(
results_cache_uri,
session_results_collection_name,
session_results_metadata_collection_name,
results_collection_name,
results_metadata_collection_name,
job_id,
pending,
)

Expand Down Expand Up @@ -710,8 +711,9 @@ async def query_handler(
run_function_in_process(
search_metadata_updater,
results_cache_uri,
session_results_collection_name,
session_results_metadata_collection_name,
results_collection_name,
results_metadata_collection_name,
job_id,
time_range,
output_result_type,
initializer=load_query_done_event,
Expand Down Expand Up @@ -786,14 +788,13 @@ async def query_handler(
logger.debug("CLEAR_RESULTS_IN_PROGRESS_BEFORE_QUERY -> READY")
continue

# Tell client we've started the query
await send_query_started_msg(websocket)

# Submit query synchronously so that we're guaranteed to get
# the job ID back
pending_query["results_collection_name"] = session_results_collection_name
job_id = submit_query(db_conn_conf, results_cache_uri,
session_results_collection_name, pending_query)
results_collection_name, pending_query)

# Tell client we've started the query
await send_query_started_msg(websocket, job_id)

operation_task = asyncio.ensure_future(
run_function_in_process(
Expand All @@ -811,8 +812,9 @@ async def query_handler(
run_function_in_process(
search_metadata_updater,
results_cache_uri,
session_results_collection_name,
session_results_metadata_collection_name,
results_collection_name,
results_metadata_collection_name,
job_id,
{"begin": None, "end": None},
output_result_type,
initializer=load_query_done_event,
Expand Down Expand Up @@ -870,7 +872,6 @@ async def query_handler(
done.remove(operation_task)
await handle_operation_task_completion(websocket, operation_task)
operation_task = None
job_id = None

await send_operation_complete_msg(websocket)

Expand All @@ -895,7 +896,6 @@ async def query_handler(
pending_query = None

cancel_query(db_conn_conf, job_id)
job_id = None

# Signal metadata updater to stop
query_done_event.set()
Expand Down Expand Up @@ -945,8 +945,9 @@ async def query_handler(
run_function_in_process(
search_metadata_updater,
results_cache_uri,
session_results_collection_name,
session_results_metadata_collection_name,
results_collection_name,
results_metadata_collection_name,
job_id,
time_range,
output_result_type,
initializer=load_query_done_event,
Expand Down
1 change: 1 addition & 0 deletions components/webui/imports/api/search/publications.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ import {Mongo} from "meteor/mongo";

export const SEARCH_SERVER_STATE_COLLECTION_NAME = "search-server-state";
export const SearchServerStateCollection = new Mongo.Collection(SEARCH_SERVER_STATE_COLLECTION_NAME);
export const SearchResultsMetadataCollection = new Mongo.Collection(Meteor.settings.public.SearchResultsMetadataCollectionName);
31 changes: 17 additions & 14 deletions components/webui/imports/api/search/server/publications.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import {Meteor} from "meteor/meteor";

import {SEARCH_SERVER_STATE_COLLECTION_NAME} from "../publications";
import {SEARCH_SERVER_STATE_COLLECTION_NAME, SearchResultsMetadataCollection} from "../publications";
import {currentServerStateList} from "./query_handler_mediator";

const MyCollections = {};
const createCollectionsIfNotExist = (sessionId) => {
if (!(sessionId in MyCollections)) {
MyCollections[sessionId] = {
[Meteor.settings.public.SearchResultsCollectionName]: null,
[Meteor.settings.public.SearchResultsMetadataCollectionName]: null,
};
MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName] =
new Mongo.Collection(`${Meteor.settings.public.SearchResultsCollectionName}_${sessionId}`);
MyCollections[sessionId][Meteor.settings.public.SearchResultsMetadataCollectionName] =
new Mongo.Collection(`${Meteor.settings.public.SearchResultsMetadataCollectionName}_${sessionId}`);
const createCollectionsIfNotExist = (sessionId, jobID) => {
const collectionName = `${Meteor.settings.public.SearchResultsCollectionName}_${jobID}`;

if ((MyCollections[sessionId] !== undefined) &&
(MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName]._name === collectionName)) {
Comment on lines +10 to +11
Copy link
Member

@LinZhihao-723 LinZhihao-723 Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ((MyCollections[sessionId] !== undefined) &&
(MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName]._name === collectionName)) {
if (undefined !== MyCollections[sessionId] &&
collectionName === MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName]._name) {

return;
}
MyCollections[sessionId] = {
[Meteor.settings.public.SearchResultsCollectionName]: null,
};
MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName] =
new Mongo.Collection(collectionName);
}

// TODO: Revisit — there may be no need to isolate this collection per user/session, since Meteor already does that isolation for us.
Expand All @@ -36,15 +37,17 @@ Meteor.publish(Meteor.settings.public.SearchResultsMetadataCollectionName, funct
let findOptions = {
disableOplog: true, pollingIntervalMs: 250
};
createCollectionsIfNotExist(this.userId);
return MyCollections[this.userId][Meteor.settings.public.SearchResultsMetadataCollectionName].find({}, findOptions);
return SearchResultsMetadataCollection.find({}, findOptions);
});

Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, function search_results_publication({
visibleTimeRange,
fieldToSortBy,
visibleSearchResultsLimit
}) {
if ((!currentServerStateList[this.userId]) || !(currentServerStateList[this.userId].jobID)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ((!currentServerStateList[this.userId]) || !(currentServerStateList[this.userId].jobID)) {
if (null === currentServerStateList[this.userId] || undefined === currentServerStateList[this.userId].jobID) {

We should use explicit comparisons (against null/undefined) rather than relying on the truthiness of !.

return []
}
let selector = {};
if (null !== visibleTimeRange.begin && null !== visibleTimeRange.end) {
selector["timestamp"] = {
Expand All @@ -62,6 +65,6 @@ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, function sear
findOptions["sort"] = sort;
}

createCollectionsIfNotExist(this.userId);
createCollectionsIfNotExist(this.userId, currentServerStateList[this.userId].jobID);
return MyCollections[this.userId][Meteor.settings.public.SearchResultsCollectionName].find(selector, findOptions);
});
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class WebUiQueryHandlerWebsocket {
case ServerMessageType.QUERY_STARTED:
if (SearchState.WAITING_FOR_QUERY_TO_START === currentServerStateList[this.__sessionId].state) {
currentServerStateList[this.__sessionId].state = SearchState.QUERY_IN_PROGRESS;
currentServerStateList[this.__sessionId].jobID = message['value']['jobID']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
currentServerStateList[this.__sessionId].jobID = message['value']['jobID']
currentServerStateList[this.__sessionId].jobID = message['value']['jobID'];

} else {
// Should not be possible, so disconnect
console.error("Got QUERY_STARTED while in impossible state.");
Expand Down Expand Up @@ -118,7 +119,7 @@ class WebUiQueryHandlerWebsocket {

export function initCurrentServerState(sessionId) {
if (currentServerStateList[sessionId] === undefined) {
currentServerStateList[sessionId] = {state: null, errorMessage: ""};
currentServerStateList[sessionId] = {state: null, errorMessage: "", jobID: null};
}
}

Expand Down
Loading