From e170a9a3a2d246c764134619ef71a67cd0eb285f Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 23 Oct 2023 00:09:16 -0400 Subject: [PATCH] webui: separate search results by job ids --- .../executor/search/fs_search_method.py | 2 +- .../webui_query_handler/main.py | 77 ++++----- .../webui/imports/api/search/publications.js | 1 + .../imports/api/search/server/publications.js | 31 ++-- .../search/server/query_handler_mediator.js | 3 +- .../imports/ui/SearchView/SearchView.jsx | 151 ++++++++++++------ 6 files changed, 158 insertions(+), 107 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/search/fs_search_method.py b/components/job-orchestration/job_orchestration/executor/search/fs_search_method.py index 487b25d0b..fe8bc180d 100644 --- a/components/job-orchestration/job_orchestration/executor/search/fs_search_method.py +++ b/components/job-orchestration/job_orchestration/executor/search/fs_search_method.py @@ -113,7 +113,7 @@ def search( search_cmd.append("--mongodb-database") search_cmd.append(str(output_config["db_name"])) search_cmd.append("--mongodb-collection") - search_cmd.append(str(query['results_collection_name'])) + search_cmd.append(f"{output_config['results_collection_name']}_{job_id_str}") # Start compression logger.info("Searching...") diff --git a/components/webui-query-handler/webui_query_handler/main.py b/components/webui-query-handler/webui_query_handler/main.py index 399e3247c..4ff1f9c31 100644 --- a/components/webui-query-handler/webui_query_handler/main.py +++ b/components/webui-query-handler/webui_query_handler/main.py @@ -36,10 +36,7 @@ logger.addHandler(logging_console_handler) # Constants -GENERAL_METADATA_DOC_ID = "general" -TIMELINE_DOC_ID = "timeline" -NUM_SEARCH_RESULTS_DOC_ID = "num_search_results" - +METADATA_DOC_ID_PREFIX = "meta-job-" class DatabaseConnectionConfig(BaseModel): host: str @@ -268,7 +265,10 @@ async def receive_from_client(websocket) -> typing.Tuple[bool, typing.Optional[s def clear_results( - db_uri: str, results_collection_name: str, results_metadata_collection_name: str + db_uri: str, + results_collection_name: str, + results_metadata_collection_name: str, + job_id: int ): # Connect to database client = MongoClient(db_uri) @@ -277,29 +277,24 @@ def clear_results( # Clear previous results search_results_collection = pymongo.collection.Collection( db, - results_collection_name, + f"{results_collection_name}_{job_id}", write_concern=pymongo.collection.WriteConcern(w=1, wtimeout=2000), ) search_results_collection.drop() search_results_metadata_collection = pymongo.collection.Collection( db, results_metadata_collection_name ) - search_results_metadata_collection.drop() + search_results_metadata_collection.delete_one({"_id": f"{METADATA_DOC_ID_PREFIX}{job_id}"}) return True def update_timeline_and_count( search_results_collection: pymongo.collection.Collection, search_results_metadata_collection: pymongo.collection.Collection, + job_id: int, time_range, output_result_type: ResultType, ): - # Update count - num_results = search_results_collection.count_documents({}) - search_results_metadata_collection.update_one( - {"_id": NUM_SEARCH_RESULTS_DOC_ID}, {"$set": {"all": num_results}}, upsert=True - ) - if ( ResultType.MESSAGE_RESULT == output_result_type ): @@ -426,12 +421,13 @@ def update_timeline_and_count( # Update timeline graph doc = { "data": ts_graph_data, - "num_results": num_results, + "num_results_in_range": num_results, "period_ms": ts_period_ms, "period_name": ts_period_name, + "num_total_results": search_results_collection.count_documents({}), } search_results_metadata_collection.update_one( - {"_id": TIMELINE_DOC_ID}, {"$set": doc}, upsert=True + {"_id": f"{METADATA_DOC_ID_PREFIX}{job_id}"}, {"$set": doc}, upsert=True ) @@ -439,6 +435,7 @@ def search_metadata_updater( db_uri: str, results_collection_name: str, results_metadata_collection_name: str, + job_id: int, time_range, output_result_type: ResultType, ): @@ -447,16 +444,18 @@ def search_metadata_updater( db = client.get_default_database() search_results_collection = pymongo.collection.Collection( - db, results_collection_name + db, f"{results_collection_name}_{job_id}" ) search_results_metadata_collection = pymongo.collection.Collection( db, results_metadata_collection_name ) + logger.info(f"{results_collection_name}_{job_id}") while True: update_timeline_and_count( search_results_collection, search_results_metadata_collection, + job_id, time_range, output_result_type, ) @@ -486,8 +485,8 @@ async def send_preparing_for_query_msg(websocket): await send_type_only_msg(websocket, ServerMessageType.PREPARING_FOR_QUERY) -async def send_query_started_msg(websocket): - await send_type_only_msg(websocket, ServerMessageType.QUERY_STARTED) +async def send_query_started_msg(websocket, job_id: int): + await send_msg(websocket, ServerMessageType.QUERY_STARTED, {"jobID": job_id}) async def send_error_msg(websocket, msg): @@ -534,10 +533,12 @@ async def cancel_metadata_task(task, pending_tasks, query_done_event): query_done_event.clear() +# FIXME: only clear specific job in meta def schedule_clear_results_task( db_uri: str, results_collection_name: str, results_metadata_collection_name: str, + job_id: int, pending_tasks, ): operation_task = asyncio.ensure_future( @@ -546,6 +547,7 @@ def schedule_clear_results_task( db_uri, results_collection_name, results_metadata_collection_name, + job_id, ) ) pending_tasks.add(operation_task) @@ -578,9 +580,6 @@ async def query_handler( logger.debug(f'query_handler: Received sessionId as {sessionId}') g_webui_connected[sessionId] = True - session_results_collection_name = f"{results_collection_name}_{sessionId}" - session_results_metadata_collection_name = f"{results_metadata_collection_name}_{sessionId}" - pending = set() job_id = None operation_task: typing.Optional[asyncio.Future] = None @@ -640,8 +639,9 @@ async def query_handler( operation_task = schedule_clear_results_task( results_cache_uri, - session_results_collection_name, - session_results_metadata_collection_name, + results_collection_name, + results_metadata_collection_name, + job_id, pending, ) @@ -668,8 +668,9 @@ async def query_handler( operation_task = schedule_clear_results_task( results_cache_uri, - session_results_collection_name, - session_results_metadata_collection_name, + results_collection_name, + results_metadata_collection_name, + job_id, pending, ) @@ -710,8 +711,9 @@ async def query_handler( run_function_in_process( search_metadata_updater, results_cache_uri, - session_results_collection_name, - session_results_metadata_collection_name, + results_collection_name, + results_metadata_collection_name, + job_id, time_range, output_result_type, initializer=load_query_done_event, @@ -786,14 +788,13 @@ async def query_handler( logger.debug("CLEAR_RESULTS_IN_PROGRESS_BEFORE_QUERY -> READY") continue - # Tell client we've started the query - await send_query_started_msg(websocket) - # Submit query synchronously so that we're guaranteed to get # the job ID back - pending_query["results_collection_name"] = session_results_collection_name job_id = submit_query(db_conn_conf, results_cache_uri, - session_results_collection_name, pending_query) + results_collection_name, pending_query) + + # Tell client we've started the query + await send_query_started_msg(websocket, job_id) operation_task = asyncio.ensure_future( run_function_in_process( @@ -811,8 +812,9 @@ async def query_handler( run_function_in_process( search_metadata_updater, results_cache_uri, - session_results_collection_name, - session_results_metadata_collection_name, + results_collection_name, + results_metadata_collection_name, + job_id, {"begin": None, "end": None}, output_result_type, initializer=load_query_done_event, @@ -870,7 +872,6 @@ async def query_handler( done.remove(operation_task) await handle_operation_task_completion(websocket, operation_task) operation_task = None - job_id = None await send_operation_complete_msg(websocket) @@ -895,7 +896,6 @@ async def query_handler( pending_query = None cancel_query(db_conn_conf, job_id) - job_id = None # Signal metadata updater to stop query_done_event.set() @@ -945,8 +945,9 @@ async def query_handler( run_function_in_process( search_metadata_updater, results_cache_uri, - session_results_collection_name, - session_results_metadata_collection_name, + results_collection_name, + results_metadata_collection_name, + job_id, time_range, output_result_type, initializer=load_query_done_event, diff --git a/components/webui/imports/api/search/publications.js b/components/webui/imports/api/search/publications.js index cd292a1ca..9150c965a 100644 --- a/components/webui/imports/api/search/publications.js +++ b/components/webui/imports/api/search/publications.js @@ -2,3 +2,4 @@ import {Mongo} from "meteor/mongo"; export const SEARCH_SERVER_STATE_COLLECTION_NAME = "search-server-state"; export const SearchServerStateCollection = new Mongo.Collection(SEARCH_SERVER_STATE_COLLECTION_NAME); +export const SearchResultsMetadataCollection = new Mongo.Collection(Meteor.settings.public.SearchResultsMetadataCollectionName); diff --git a/components/webui/imports/api/search/server/publications.js b/components/webui/imports/api/search/server/publications.js index e544a6751..648648b5e 100644 --- a/components/webui/imports/api/search/server/publications.js +++ b/components/webui/imports/api/search/server/publications.js @@ -1,20 +1,21 @@ import {Meteor} from "meteor/meteor"; -import {SEARCH_SERVER_STATE_COLLECTION_NAME} from "../publications"; +import {SEARCH_SERVER_STATE_COLLECTION_NAME, SearchResultsMetadataCollection} from "../publications"; import {currentServerStateList} from "./query_handler_mediator"; const MyCollections = {}; -const createCollectionsIfNotExist = (sessionId) => { - if (!(sessionId in MyCollections)) { - MyCollections[sessionId] = { - [Meteor.settings.public.SearchResultsCollectionName]: null, - [Meteor.settings.public.SearchResultsMetadataCollectionName]: null, - }; - MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName] = - new Mongo.Collection(`${Meteor.settings.public.SearchResultsCollectionName}_${sessionId}`); - MyCollections[sessionId][Meteor.settings.public.SearchResultsMetadataCollectionName] = - new Mongo.Collection(`${Meteor.settings.public.SearchResultsMetadataCollectionName}_${sessionId}`); +const createCollectionsIfNotExist = (sessionId, jobID) => { + const collectionName = `${Meteor.settings.public.SearchResultsCollectionName}_${jobID}`; + + if ((MyCollections[sessionId] !== undefined) && + (MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName]._name === collectionName)) { + return; } + MyCollections[sessionId] = { + [Meteor.settings.public.SearchResultsCollectionName]: null, + }; + MyCollections[sessionId][Meteor.settings.public.SearchResultsCollectionName] = + new Mongo.Collection(collectionName); } // TODO: revisit: there is no need to isolate this collection per user/session because it is already done so by meteor? @@ -36,8 +37,7 @@ Meteor.publish(Meteor.settings.public.SearchResultsMetadataCollectionName, funct let findOptions = { disableOplog: true, pollingIntervalMs: 250 }; - createCollectionsIfNotExist(this.userId); - return MyCollections[this.userId][Meteor.settings.public.SearchResultsMetadataCollectionName].find({}, findOptions); + return SearchResultsMetadataCollection.find({}, findOptions); }); Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, function search_results_publication({ @@ -45,6 +45,9 @@ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, function sear fieldToSortBy, visibleSearchResultsLimit }) { + if ((!currentServerStateList[this.userId]) || !(currentServerStateList[this.userId].jobID)) { + return [] + } let selector = {}; if (null !== visibleTimeRange.begin && null !== visibleTimeRange.end) { selector["timestamp"] = { @@ -62,6 +65,6 @@ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, function sear findOptions["sort"] = sort; } - createCollectionsIfNotExist(this.userId); + createCollectionsIfNotExist(this.userId, currentServerStateList[this.userId].jobID); return MyCollections[this.userId][Meteor.settings.public.SearchResultsCollectionName].find(selector, findOptions); }); diff --git a/components/webui/imports/api/search/server/query_handler_mediator.js b/components/webui/imports/api/search/server/query_handler_mediator.js index 552a68f6d..97a24ef37 100644 --- a/components/webui/imports/api/search/server/query_handler_mediator.js +++ b/components/webui/imports/api/search/server/query_handler_mediator.js @@ -88,6 +88,7 @@ class WebUiQueryHandlerWebsocket { case ServerMessageType.QUERY_STARTED: if (SearchState.WAITING_FOR_QUERY_TO_START === currentServerStateList[this.__sessionId].state) { currentServerStateList[this.__sessionId].state = SearchState.QUERY_IN_PROGRESS; + currentServerStateList[this.__sessionId].jobID = message['value']['jobID'] } else { // Should not be possible, so disconnect console.error("Got QUERY_STARTED while in impossible state."); @@ -118,7 +119,7 @@ class WebUiQueryHandlerWebsocket { export function initCurrentServerState(sessionId) { if (currentServerStateList[sessionId] === undefined) { - currentServerStateList[sessionId] = {state: null, errorMessage: ""}; + currentServerStateList[sessionId] = {state: null, errorMessage: "", jobID: null}; } } diff --git a/components/webui/imports/ui/SearchView/SearchView.jsx b/components/webui/imports/ui/SearchView/SearchView.jsx index f6a3fc82d..309d90cb2 100644 --- a/components/webui/imports/ui/SearchView/SearchView.jsx +++ b/components/webui/imports/ui/SearchView/SearchView.jsx @@ -3,7 +3,19 @@ import {Meteor} from "meteor/meteor"; import React, {useEffect, useState} from "react"; import {DateTime} from "luxon"; -import {faBars, faCog, faExclamationCircle, faFile, faSearch, faSort, faSortDown, faSortUp, faTimes, faTrash, faUndo,} from "@fortawesome/free-solid-svg-icons"; +import { + faBars, + faCog, + faExclamationCircle, + faFile, + faSearch, + faSort, + faSortDown, + faSortUp, + faTimes, + faTrash, + faUndo, +} from "@fortawesome/free-solid-svg-icons"; import { Button, Col, @@ -27,29 +39,26 @@ import Highcharts from "highcharts"; import DatePicker from "react-datepicker"; import ReactVisibilitySensor from "react-visibility-sensor"; -import {SearchServerStateCollection} from "../../api/search/publications"; +import {SearchResultsMetadataCollection, SearchServerStateCollection} from "../../api/search/publications"; import {SearchState} from "../../api/search/constants"; import "react-datepicker/dist/react-datepicker.css"; -const MyCollections = { - [Meteor.settings.public.SearchResultsCollectionName]:null, - [Meteor.settings.public.SearchResultsMetadataCollectionName]:null, -} +const MyCollections = {} -const InitMyCollections = (sessionId) => { - if (MyCollections[Meteor.settings.public.SearchResultsCollectionName] === null){ - MyCollections[Meteor.settings.public.SearchResultsCollectionName] = - new Mongo.Collection(`${Meteor.settings.public.SearchResultsCollectionName}_${sessionId}`); - } - if (MyCollections[Meteor.settings.public.SearchResultsMetadataCollectionName] === null){ - MyCollections[Meteor.settings.public.SearchResultsMetadataCollectionName] = - new Mongo.Collection(`${Meteor.settings.public.SearchResultsMetadataCollectionName}_${sessionId}`); +const InitMyCollections = (jobID) => { + const collectionName = `${Meteor.settings.public.SearchResultsCollectionName}_${jobID}`; + + if ((MyCollections[Meteor.settings.public.SearchResultsCollectionName] !== undefined) && + (MyCollections[Meteor.settings.public.SearchResultsCollectionName]._name === collectionName)) { + return; } + + MyCollections[Meteor.settings.public.SearchResultsCollectionName] = + new Mongo.Collection(collectionName); } -const SearchView = () => { - InitMyCollections(Meteor.userId()) +const SearchView = () => { const [operationErrorMsg, setOperationErrorMsg] = useState(""); const [visibleSearchResultsLimit, setVisibleSearchResultsLimit] = useState(10); @@ -67,8 +76,7 @@ const SearchView = () => { if ((null === visibleTimeRangeBegin && null !== visibleTimeRangeEnd) || (null !== visibleTimeRangeBegin && null === visibleTimeRangeEnd) || (null !== visibleTimeRangeBegin && null !== visibleTimeRangeEnd - && visibleTimeRangeBegin >= visibleTimeRangeEnd)) - { + && visibleTimeRangeBegin >= visibleTimeRangeEnd)) { // Ignore invalid combinations which may occur to due to the two // values being updated separately return; @@ -109,9 +117,16 @@ const SearchView = () => { findOptions["sort"] = sort; } + if ((!serverState) || !(serverState.jobID)) { + return [] + } + InitMyCollections(serverState.jobID) return MyCollections[Meteor.settings.public.SearchResultsCollectionName].find({}, findOptions).fetch(); - }, [fieldToSortBy, visibleSearchResultsLimit, visibleTimeRangeBegin, visibleTimeRangeEnd]); + }, [serverState && serverState.jobID, fieldToSortBy, visibleSearchResultsLimit, visibleTimeRangeBegin, visibleTimeRangeEnd]); const [resultHighlightRegex, numSearchResultsOnServer, timeline] = useTracker(() => { + if ((!serverState) || !(serverState.jobID)) { + return [] + } const subscription = Meteor.subscribe(Meteor.settings.public.SearchResultsMetadataCollectionName); let isReady = subscription.ready(); @@ -119,23 +134,26 @@ const SearchView = () => { let numSearchResultsOnServer = 0; let timeline = null; if (isReady) { - const numSearchResultsDoc = MyCollections[Meteor.settings.public.SearchResultsMetadataCollectionName].findOne({_id: "num_search_results"}); - if (numSearchResultsDoc) { - numSearchResultsOnServer = numSearchResultsDoc["all"]; - } - - const timelineDoc = MyCollections[Meteor.settings.public.SearchResultsMetadataCollectionName].findOne({_id: "timeline"}); - if (timelineDoc) { - timeline = timelineDoc; + // FIXME: add job id + const metaDoc = SearchResultsMetadataCollection.findOne({_id: `meta-job-${serverState.jobID}`}); + if (metaDoc) { + timeline = { + "data": metaDoc["data"], + "num_results_in_range": metaDoc["num_results_in_range"], + "period_ms": metaDoc["period_ms"], + "period_name": metaDoc["period_name"], + }; + numSearchResultsOnServer = metaDoc["num_total_results"]; } - const generalDoc = MyCollections[Meteor.settings.public.SearchResultsMetadataCollectionName].findOne({_id: "general"}); + // FIXME: check what's the use of this + const generalDoc = SearchResultsMetadataCollection.findOne({_id: "general"}); if (generalDoc) { resultHighlightRegex = generalDoc["regex"]; } } return [resultHighlightRegex, numSearchResultsOnServer, timeline]; - }); + }, [serverState && serverState.jobID]); const resetVisibleResultSettings = () => { resetVisibleTimeRange(); @@ -497,7 +515,8 @@ const SearchResultsTable = ({ // Display results as messages headerColumns.push( -
Timestamp @@ -565,7 +584,7 @@ const SearchResultsTable = ({ let columns = []; for (const columnName of headerColumnNames) { let result = record[columnName]; - if (typeof(result) === "object") { + if (typeof (result) === "object") { result = JSON.stringify(result); } columns.push({result}); @@ -584,7 +603,8 @@ const SearchResultsTable = ({ } headerColumns.push( - +
{columnName}
@@ -604,7 +624,7 @@ const SearchResultsTable = ({ } let result = searchResult[columnName]; - if (typeof(result) === "object") { + if (typeof (result) === "object") { result = JSON.stringify(result); } columns.push({result}); @@ -619,10 +639,10 @@ const SearchResultsTable = ({
- {headerColumns} + {headerColumns} - {rows} + {rows}
@@ -681,13 +701,15 @@ const SearchResultsHeader = ({ {numResultsText} {null !== visibleTimeRange.begin ? ( - + ) : (<>)} +
Max lines per result @@ -702,7 +724,8 @@ const SearchResultsHeader = ({
}> - +
@@ -737,7 +760,7 @@ const SearchResults = ({ let realNumResultsOnServer = Math.max(numResultsOnServer, searchResults.length); let numResultsInTimeRange = realNumResultsOnServer; if (timeline) { - numResultsInTimeRange = timeline.num_results; + numResultsInTimeRange = timeline.num_results_in_range; } const isMessageTable = searchResults.length === 0 || Object.keys(searchResults[0]).includes("timestamp"); return ( @@ -773,7 +796,8 @@ const SearchResults = ({ const SearchStatus = ({state, errorMessage, searchResultsExist}) => { if ("" !== errorMessage) { - return (
{errorMessage}
); + return (
{errorMessage}
); } else { let noResultsStatus = ""; if (false === searchResultsExist) { @@ -801,13 +825,21 @@ const SearchStatus = ({state, errorMessage, searchResultsExist}) => { now={SearchState.READY !== state ? 100 : 0} variant="primary" /> - {false === searchResultsExist ? (
{noResultsStatus}
) : (<>)} + {false === searchResultsExist ? ( +
{noResultsStatus}
) : (<>)} ); } } -const SearchFilterControlsDrawer = ({timeRange, setTimeRange, matchCase, setMatchCase, filePathRegex, setFilePathRegex}) => { +const SearchFilterControlsDrawer = ({ + timeRange, + setTimeRange, + matchCase, + setMatchCase, + filePathRegex, + setFilePathRegex + }) => { const updateBeginTimestamp = (date) => { if (date.getTime() > timeRange.end.getTime()) { setTimeRange({begin: date, end: date}); @@ -846,8 +878,7 @@ const SearchFilterControlsDrawer = ({timeRange, setTimeRange, matchCase, setMatc let timestampEndMax = null; if (timeRange.begin.getFullYear() === timeRange.end.getFullYear() && timeRange.begin.getMonth() === timeRange.end.getMonth() - && timeRange.begin.getDate() === timeRange.end.getDate()) - { + && timeRange.begin.getDate() === timeRange.end.getDate()) { timestampEndMin = new Date(timeRange.begin); // TODO This doesn't handle leap seconds timestampEndMax = new Date(timeRange.end).setHours(23, 59, 59, 999); @@ -861,10 +892,12 @@ const SearchFilterControlsDrawer = ({timeRange, setTimeRange, matchCase, setMatc - Time range + Time + range - {timeRangePresetItems} @@ -908,16 +941,21 @@ const SearchFilterControlsDrawer = ({timeRange, setTimeRange, matchCase, setMatc - Case sensitivity + Case + sensitivity - - Sensitive - Insensitive + + Sensitive + Insensitive - Path filter + Path + filter { +const SearchControls = ({ + serverState, + searchResultsExist, + matchCase, + setMatchCase, + setOperationErrorMsg, + resetVisibleResultSettings + }) => { const [queryString, setQueryString] = useState(""); const [drawerOpen, setDrawerOpen] = useState(false); const [timeRange, setTimeRange] = useState(computeLast15MinTimeRange);