diff --git a/app_sidebar_collapsible.py b/app_sidebar_collapsible.py index da74ade..5c77e5d 100644 --- a/app_sidebar_collapsible.py +++ b/app_sidebar_collapsible.py @@ -211,8 +211,11 @@ def make_controls(): ) page_content = dcc.Loading( + id='global-loading', type='default', fullscreen=True, + overlay_style={"visibility": "visible", "filter": "opacity(0.5)"}, + style={"background-color": "transparent"}, children=html.Div(dash.page_container, style={ "margin-left": "5rem", "margin-right": "2rem", @@ -221,6 +224,16 @@ def make_controls(): ) +@app.callback( + Output('global-loading', 'display'), + Input('interval-load-more', 'disabled'), +) +def hide_spinner_while_loading_batch(interval_disabled): + if interval_disabled: + return 'auto' + return 'hide' + + def make_home_page(): return [ sidebar, html.Div([make_controls(), page_content]) diff --git a/pages/data.py b/pages/data.py index 4f35e7a..ca45aec 100644 --- a/pages/data.py +++ b/pages/data.py @@ -21,7 +21,7 @@ intro = """## Data""" layout = html.Div( - [ + [ dcc.Markdown(intro), dcc.Tabs(id="tabs-datatable", value='tab-uuids-datatable', children=[ dcc.Tab(label='UUIDs', value='tab-uuids-datatable'), @@ -30,6 +30,28 @@ dcc.Tab(label='Trajectories', value='tab-trajectories-datatable'), ]), html.Div(id='tabs-content'), + dcc.Store(id='selected-tab', data='tab-uuids-datatable'), # Store to hold selected tab + dcc.Interval(id='interval-load-more', interval=24000, n_intervals=0), # Interval for loading more data + dcc.Store(id='store-uuids', data=[]), # Store to hold the original UUIDs data + dcc.Store(id='store-loaded-uuids', data={'data': [], 'loaded': False}), # Store to track loaded data + dcc.Store(id='uuids-page-current', data=0), # Store to track current page for UUIDs DataTable + # RadioItems for key list switch, wrapped in a div that can hide/show + html.Div( + id='keylist-switch-container', + children=[ + html.Label("Select Key List:"), + dcc.RadioItems( + id='keylist-switch', + options=[ + {'label': 
'Analysis/Recreated Location', 'value': 'analysis/recreated_location'}, + {'label': 'Background/Location', 'value': 'background/location'} + ], + value='analysis/recreated_location', # Default value + labelStyle={'display': 'inline-block', 'margin-right': '10px'} + ), + ], + style={'display': 'none'} # Initially hidden, will show only for the "Trajectories" tab + ), ] ) @@ -62,12 +84,12 @@ def clean_location_data(df): return df -def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_uuids): +def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_uuids, key_list): with ect.Timer() as total_timer: # Stage 1: Query trajectories with ect.Timer() as stage1_timer: - df = query_trajectories(start_date, end_date, tz) + df = query_trajectories(start_date, end_date, tz, key_list) esdsq.store_dashboard_time( "admin/data/update_store_trajectories/query_trajectories", stage1_timer @@ -100,8 +122,31 @@ def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_ return store +@callback( + Output('keylist-switch-container', 'style'), + Input('tabs-datatable', 'value'), +) +def show_keylist_switch(tab): + if tab == 'tab-trajectories-datatable': + return {'display': 'block'} + return {'display': 'none'} # Hide the keylist-switch on all other tabs + + +@callback( + Output('uuids-page-current', 'data'), + Input('uuid-table', 'page_current'), + State('tabs-datatable', 'value') +) +def update_uuids_page_current(page_current, selected_tab): + if selected_tab == 'tab-uuids-datatable': + return page_current + raise PreventUpdate + + @callback( Output('tabs-content', 'children'), + Output('store-loaded-uuids', 'data'), + Output('interval-load-more', 'disabled'), # Disable interval when all data is loaded Input('tabs-datatable', 'value'), Input('store-uuids', 'data'), Input('store-excluded-uuids', 'data'), @@ -111,123 +156,220 @@ def update_store_trajectories(start_date: str, end_date: str, tz: str, excluded_ 
Input('date-picker', 'start_date'), Input('date-picker', 'end_date'), Input('date-picker-timezone', 'value'), + Input('interval-load-more', 'n_intervals'), # Interval to trigger the loading of more data + Input('keylist-switch', 'value'), # Add keylist-switch to trigger data refresh on change + Input('uuids-page-current', 'data'), # Current page number for UUIDs DataTable + State('store-loaded-uuids', 'data'), # Use State to track already loaded data + State('store-loaded-uuids', 'modified_timestamp') # 'loaded' is not a valid dcc.Store prop; bound value is unused ) -def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_demographics, store_trajectories, start_date, end_date, timezone): +def render_content(tab, store_uuids, store_excluded_uuids, store_trips, store_demographics, store_trajectories, start_date, end_date, timezone, n_intervals, key_list, current_page, loaded_uuids_store, all_data_loaded): with ect.Timer() as total_timer: - data, columns, has_perm = None, [], False + initial_batch_size = 10 # Define the batch size for loading UUIDs + + # Stage 1: Update selected tab + selected_tab = tab + logging.debug(f"Callback - {selected_tab} Stage 1: Selected tab updated.") + + # Initialize return variables + content = None + updated_loaded_uuids_store = loaded_uuids_store.copy() if loaded_uuids_store else {'data': [], 'loaded': False} + interval_disabled = True - # Handle UUIDs tab + # Handle the UUIDs tab without fullscreen loading spinner if tab == 'tab-uuids-datatable': - with ect.Timer() as stage1_timer: - data = store_uuids["data"] - data = db_utils.add_user_stats(data) - columns = perm_utils.get_uuids_columns() - has_perm = perm_utils.has_permission('data_uuids') + with ect.Timer() as handle_uuids_timer: + logging.debug(f"Callback - {selected_tab} Stage 2: Handling UUIDs tab.") + + # Ensure store_uuids contains the key 'data' which is a list of dictionaries + if not isinstance(store_uuids, dict) or 'data' not in store_uuids: + logging.error(f"Expected 
store_uuids to be a dict with a 'data' key, but got {type(store_uuids)}") + content = html.Div([html.P("Data structure error.")]) + interval_disabled = True + else: + uuids_list = store_uuids['data'] + + # Ensure uuids_list is a list for slicing + if not isinstance(uuids_list, list): + logging.error(f"Expected store_uuids['data'] to be a list but got {type(uuids_list)}") + content = html.Div([html.P("Data structure error.")]) + interval_disabled = True + else: + loaded_data = updated_loaded_uuids_store.get('data', []) + total_loaded = len(loaded_data) + + # Handle lazy loading + if not updated_loaded_uuids_store.get('loaded', False): + total_to_load = total_loaded + initial_batch_size + total_to_load = min(total_to_load, len(uuids_list)) # Avoid loading more than available + + logging.debug(f"Callback - {selected_tab} Stage 3: Loading next batch of UUIDs from {total_loaded} to {total_to_load}.") + + new_data = uuids_list[total_loaded:total_to_load] + + if new_data: + # Process and append the new data to the loaded store + processed_data = db_utils.add_user_stats(new_data, initial_batch_size) + loaded_data.extend(processed_data) + + # Update the store with the new data by creating a new dict + updated_loaded_uuids_store = { + 'data': loaded_data, + 'loaded': len(loaded_data) >= len(uuids_list) + } + + logging.debug(f"Callback - {selected_tab} Stage 4: New batch loaded. 
Total loaded: {len(loaded_data)}.") + + # Prepare the data to be displayed + columns = perm_utils.get_uuids_columns() # Get the relevant columns + df = pd.DataFrame(updated_loaded_uuids_store['data']) + + if df.empty or not perm_utils.has_permission('data_uuids'): + logging.debug(f"Callback - {selected_tab} Error Stage: No data available or permission issues.") + content = html.Div([html.P("No data available or you don't have permission.")]) + interval_disabled = True + else: + df = df.drop(columns=[col for col in df.columns if col not in columns]) + + logging.debug(f"Callback - {selected_tab} Stage 5: Returning appended data to update the UI.") + content = html.Div([ + populate_datatable(df, table_id='uuid-table', page_current=current_page), # Pass current_page + html.P( + f"Showing {len(updated_loaded_uuids_store['data'])} of {len(uuids_list)} UUIDs." + + (f" Loading {initial_batch_size} more..." if not updated_loaded_uuids_store.get('loaded', False) else ""), + style={'margin': '15px 5px'} + ) + ]) + interval_disabled = updated_loaded_uuids_store.get('loaded', False) + + # Store timing after handling UUIDs tab esdsq.store_dashboard_time( "admin/data/render_content/handle_uuids_tab", - stage1_timer + handle_uuids_timer ) # Handle Trips tab elif tab == 'tab-trips-datatable': - with ect.Timer() as stage2_timer: - data = store_trips["data"] + with ect.Timer() as handle_trips_timer: + logging.debug(f"Callback - {selected_tab} Stage 2: Handling Trips tab.") + + data = store_trips.get("data", []) columns = perm_utils.get_allowed_trip_columns() - columns.update( - col['label'] for col in perm_utils.get_allowed_named_trip_columns() - ) - columns.update(store_trips["userinputcols"]) + columns.update(col['label'] for col in perm_utils.get_allowed_named_trip_columns()) + columns.update(store_trips.get("userinputcols", [])) has_perm = perm_utils.has_permission('data_trips') + df = pd.DataFrame(data) if df.empty or not has_perm: - return None - - logging.debug(f"Final list of 
retained cols {columns=}") - logging.debug(f"Before dropping, {df.columns=}") - df = df.drop(columns=[col for col in df.columns if col not in columns]) - logging.debug(f"After dropping, {df.columns=}") - df = clean_location_data(df) - - trips_table = populate_datatable(df, 'trips-table') - # Return an HTML Div containing a button (button-clicked) and the populated datatable - return html.Div([ - html.Button( - 'Display columns with raw units', - id='button-clicked', # identifier for the button - n_clicks=0, # initialize number of clicks to 0 - style={'marginLeft': '5px'} - ), - trips_table, # populated trips table component - ]) + logging.debug(f"Callback - {selected_tab} Error Stage: No data available or permission issues.") + content = None + interval_disabled = True + else: + df = df.drop(columns=[col for col in df.columns if col not in columns]) + df = clean_location_data(df) + + trips_table = populate_datatable(df, table_id='trips-datatable') + + content = html.Div([ + html.Button('Display columns with raw units', id='button-clicked', n_clicks=0, style={'marginLeft': '5px'}), + trips_table + ]) + interval_disabled = True + + # Store timing after handling Trips tab esdsq.store_dashboard_time( "admin/data/render_content/handle_trips_tab", - stage2_timer + handle_trips_timer ) # Handle Demographics tab elif tab == 'tab-demographics-datatable': - with ect.Timer() as stage3_timer: - data = store_demographics["data"] + with ect.Timer() as handle_demographics_timer: + data = store_demographics.get("data", {}) has_perm = perm_utils.has_permission('data_demographics') - # If only one survey is available, process it without creating a subtab + if len(data) == 1: # Here data is a dictionary data = list(data.values())[0] - columns = list(data[0].keys()) - # For multiple surveys, create subtabs for unique surveys + columns = list(data[0].keys()) if data else [] + df = pd.DataFrame(data) + if df.empty: + content = None + interval_disabled = True + else: + content = 
populate_datatable(df) + interval_disabled = True elif len(data) > 1: - # Returns subtab only if has_perm is True if not has_perm: - return None - return html.Div([ - dcc.Tabs(id='subtabs-demographics', value=list(data.keys())[0], children=[ - dcc.Tab(label=key, value=key) for key in data - ]), - html.Div(id='subtabs-demographics-content') - ]) + content = None + interval_disabled = True + else: + content = html.Div([ + dcc.Tabs(id='subtabs-demographics', value=list(data.keys())[0], children=[ + dcc.Tab(label=key, value=key) for key in data + ]), + html.Div(id='subtabs-demographics-content') + ]) + interval_disabled = True + else: + content = None + interval_disabled = True + + # Store timing after handling Demographics tab esdsq.store_dashboard_time( "admin/data/render_content/handle_demographics_tab", - stage3_timer + handle_demographics_timer ) # Handle Trajectories tab elif tab == 'tab-trajectories-datatable': # Currently store_trajectories data is loaded only when the respective tab is selected # Here we query for trajectory data once "Trajectories" tab is selected - with ect.Timer() as stage4_timer: + with ect.Timer() as handle_trajectories_timer: (start_date, end_date) = iso_to_date_only(start_date, end_date) if store_trajectories == {}: - store_trajectories = update_store_trajectories(start_date, end_date, timezone, store_excluded_uuids) + store_trajectories = update_store_trajectories(start_date, end_date, timezone, store_excluded_uuids, key_list) data = store_trajectories["data"] if data: columns = list(data[0].keys()) columns = perm_utils.get_trajectories_columns(columns) has_perm = perm_utils.has_permission('data_trajectories') + + df = pd.DataFrame(data) + if df.empty or not has_perm: + logging.debug(f"Callback - {selected_tab} Error Stage: No data available or permission issues.") + content = None + interval_disabled = True + else: + df = df.drop(columns=[col for col in df.columns if col not in columns]) + + datatable = populate_datatable(df) + + 
content = datatable + interval_disabled = True + else: + content = None + interval_disabled = True + + # Store timing after handling Trajectories tab esdsq.store_dashboard_time( "admin/data/render_content/handle_trajectories_tab", - stage4_timer + handle_trajectories_timer ) - # Prepare final DataFrame and return datatable - with ect.Timer() as stage5_timer: - df = pd.DataFrame(data) - if df.empty or not has_perm: - return None - - df = df.drop(columns=[col for col in df.columns if col not in columns]) - - result = populate_datatable(df) - esdsq.store_dashboard_time( - "admin/data/render_content/prepare_final_dataframe_and_return", - stage5_timer - ) + # Handle unhandled tabs or errors + else: + logging.debug(f"Callback - {selected_tab} Error Stage: No data loaded or unhandled tab.") + content = None + interval_disabled = True + # Store total timing after all stages esdsq.store_dashboard_time( "admin/data/render_content/total_time", total_timer ) - return result + return content, updated_loaded_uuids_store, interval_disabled + # Handle subtabs for demographic table when there are multiple surveys @callback( @@ -293,7 +435,7 @@ def update_sub_tab(tab, store_demographics): return result @callback( - Output('trips-table', 'hidden_columns'), # Output hidden columns in the trips-table + Output('trips-datatable', 'hidden_columns'), # Output hidden columns in the trips-table Output('button-clicked', 'children'), # Updates button label Input('button-clicked', 'n_clicks'), # Number of clicks on the button State('button-clicked', 'children') # State representing the current label of button @@ -326,7 +468,7 @@ def update_dropdowns_trips(n_clicks, button_label): -def populate_datatable(df, table_id=''): +def populate_datatable(df, table_id='', page_current=0): with ect.Timer() as total_timer: # Stage 1: Check if df is a DataFrame and raise PreventUpdate if not @@ -349,7 +491,7 @@ def populate_datatable(df, table_id=''): # filter_action="native", sort_action="native", # give 
user capability to sort columns sort_mode="single", # sort across 'multi' or 'single' columns - page_current=0, # page number that user is on + page_current=page_current, # set to current page page_size=50, # number of rows visible per page style_cell={ 'textAlign': 'left', @@ -364,11 +506,11 @@ def populate_datatable(df, table_id=''): "admin/data/populate_datatable/create_datatable", stage2_timer ) - - # Store the total time for the entire function + esdsq.store_dashboard_time( "admin/data/populate_datatable/total_time", total_timer ) - return result + + return result diff --git a/requirements.txt b/requirements.txt index 6f0d5e4..6182fe8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ --extra-index-url https://plotly.nrel.gov/Docs/packages # dash is required to call `build:py` -dash==2.15.0 +dash==2.18.0 gunicorn==20.1.0 -plotly==5.14.1 +plotly==5.24.1 dash-bootstrap-components==1.4.1 dash_extensions==0.1.13 #dashboard_setup/nrel_dash_components-0.0.1.tar.gz # for docker-compose diff --git a/utils/db_utils.py b/utils/db_utils.py index a44b4c8..387b45c 100644 --- a/utils/db_utils.py +++ b/utils/db_utils.py @@ -18,6 +18,7 @@ from utils import constants from utils import permissions as perm_utils from utils.datetime_utils import iso_range_to_ts_range +from concurrent.futures import ThreadPoolExecutor, as_completed def df_to_filtered_records(df, col_to_filter=None, vals_to_exclude: list[str] = []): """ @@ -354,9 +355,9 @@ def query_demographics(): return dataframes -def query_trajectories(start_date: str, end_date: str, tz: str): +def query_trajectories(start_date: str, end_date: str, tz: str, key_list): with ect.Timer() as total_timer: - + key_list = [key_list] if isinstance(key_list, str) else key_list # Stage 1: Convert date range to timestamps with ect.Timer() as stage1_timer: (start_ts, end_ts) = iso_range_to_ts_range(start_date, end_date, tz) @@ -369,7 +370,7 @@ def query_trajectories(start_date: str, end_date: str, tz: str): with ect.Timer() as 
stage2_timer: ts = esta.TimeSeries.get_aggregate_time_series() entries = ts.find_entries( - key_list=["analysis/recreated_location"], + key_list=key_list, time_query=estt.TimeQuery("data.ts", start_ts, end_ts), ) esdsq.store_dashboard_time( @@ -407,9 +408,14 @@ def query_trajectories(start_date: str, end_date: str, tz: str): # Stage 5: Add human-readable mode string with ect.Timer() as stage5_timer: - df['data.mode_str'] = df['data.mode'].apply( - lambda x: ecwm.MotionTypes(x).name if x in set(enum.value for enum in ecwm.MotionTypes) else 'UNKNOWN' - ) + if 'background/location' in key_list: + if 'data.mode' in df.columns: + # Set the values in data.mode to blank ('') + df['data.mode'] = '' + else: + df['data.mode_str'] = df['data.mode'].apply( + lambda x: ecwm.MotionTypes(x).name if x in set(enum.value for enum in ecwm.MotionTypes) else 'UNKNOWN' + ) esdsq.store_dashboard_time( "admin/db_utils/query_trajectories/add_mode_string", stage5_timer @@ -424,100 +430,109 @@ def query_trajectories(start_date: str, end_date: str, tz: str): -def add_user_stats(user_data): +def add_user_stats(user_data, batch_size=5): + time_format = 'YYYY-MM-DD HH:mm:ss' with ect.Timer() as total_timer: - - for user in user_data: - user_uuid = UUID(user['user_id']) - - # Stage 1: Count total trips - with ect.Timer() as stage1_timer: - total_trips = esta.TimeSeries.get_aggregate_time_series().find_entries_count( + # Stage 1: Define process_user + def process_user(user): + with ect.Timer() as process_user_timer: + user_uuid = UUID(user['user_id']) + + # Fetch aggregated data for all users once and cache it + ts_aggregate = esta.TimeSeries.get_aggregate_time_series() + + # Fetch data for the user, cached for repeated queries + profile_data = edb.get_profile_db().find_one({'user_id': user_uuid}) + + total_trips = ts_aggregate.find_entries_count( key_list=["analysis/confirmed_trip"], extra_query_list=[{'user_id': user_uuid}] ) - user['total_trips'] = total_trips - esdsq.store_dashboard_time( - 
"admin/db_utils/add_user_stats/count_total_trips", - stage1_timer - ) - - # Stage 2: Count labeled trips - with ect.Timer() as stage2_timer: - labeled_trips = esta.TimeSeries.get_aggregate_time_series().find_entries_count( + labeled_trips = ts_aggregate.find_entries_count( key_list=["analysis/confirmed_trip"], extra_query_list=[{'user_id': user_uuid}, {'data.user_input': {'$ne': {}}}] ) + + user['total_trips'] = total_trips user['labeled_trips'] = labeled_trips - esdsq.store_dashboard_time( - "admin/db_utils/add_user_stats/count_labeled_trips", - stage2_timer - ) - - # Stage 3: Retrieve user profile data - with ect.Timer() as stage3_timer: - profile_data = edb.get_profile_db().find_one({'user_id': user_uuid}) - user['platform'] = profile_data.get('curr_platform') - user['manufacturer'] = profile_data.get('manufacturer') - user['app_version'] = profile_data.get('client_app_version') - user['os_version'] = profile_data.get('client_os_version') - user['phone_lang'] = profile_data.get('phone_lang') - esdsq.store_dashboard_time( - "admin/db_utils/add_user_stats/retrieve_user_profile_data", - stage3_timer - ) - - if total_trips > 0: - # Stage 4: Get first trip timestamp - with ect.Timer() as stage4_timer: - time_format = 'YYYY-MM-DD HH:mm:ss' + + if profile_data: + user['platform'] = profile_data.get('curr_platform') + user['manufacturer'] = profile_data.get('manufacturer') + user['app_version'] = profile_data.get('client_app_version') + user['os_version'] = profile_data.get('client_os_version') + user['phone_lang'] = profile_data.get('phone_lang') + + if total_trips > 0: ts = esta.TimeSeries.get_time_series(user_uuid) - start_ts = ts.get_first_value_for_field( + first_trip_ts = ts.get_first_value_for_field( key='analysis/confirmed_trip', field='data.end_ts', sort_order=pymongo.ASCENDING ) - if start_ts != -1: - user['first_trip'] = arrow.get(start_ts).format(time_format) - esdsq.store_dashboard_time( - "admin/db_utils/add_user_stats/get_first_trip_timestamp", - 
stage4_timer - ) - - # Stage 5: Get last trip timestamp - with ect.Timer() as stage5_timer: - end_ts = ts.get_first_value_for_field( + if first_trip_ts != -1: + user['first_trip'] = arrow.get(first_trip_ts).format(time_format) + + last_trip_ts = ts.get_first_value_for_field( key='analysis/confirmed_trip', field='data.end_ts', sort_order=pymongo.DESCENDING ) - if end_ts != -1: - user['last_trip'] = arrow.get(end_ts).format(time_format) - esdsq.store_dashboard_time( - "admin/db_utils/add_user_stats/get_last_trip_timestamp", - stage5_timer - ) - - # Stage 6: Get last server call timestamp - with ect.Timer() as stage6_timer: - last_call = ts.get_first_value_for_field( + if last_trip_ts != -1: + user['last_trip'] = arrow.get(last_trip_ts).format(time_format) + + last_call_ts = ts.get_first_value_for_field( key='stats/server_api_time', field='data.ts', sort_order=pymongo.DESCENDING ) - if last_call != -1: - user['last_call'] = arrow.get(last_call).format(time_format) + if last_call_ts != -1: + user['last_call'] = arrow.get(last_call_ts).format(time_format) + + esdsq.store_dashboard_time( + "admin/db_utils/add_user_stats/process_user", + process_user_timer + ) + return user + + def batch_process(users_batch): + with ect.Timer() as batch_process_timer: + with ThreadPoolExecutor() as executor: # Adjust max_workers based on CPU cores + futures = [executor.submit(process_user, user) for user in users_batch] + processed_batch = [future.result() for future in as_completed(futures)] + esdsq.store_dashboard_time( + "admin/db_utils/add_user_stats/batch_process", + batch_process_timer + ) + return processed_batch + + + # Stage 3: Process batches + with ect.Timer() as stage3_timer: + total_users = len(user_data) + processed_data = [] + + for i in range(0, total_users, batch_size): + with ect.Timer() as stage_loop_timer: + batch = user_data[i:i + batch_size] + processed_batch = batch_process(batch) + processed_data.extend(processed_batch) + + logging.debug(f'Processed 
{len(processed_data)} users out of {total_users}') esdsq.store_dashboard_time( - "admin/db_utils/add_user_stats/get_last_server_call_timestamp", - stage6_timer + "admin/db_utils/add_user_stats/processing_loop_stage", + stage_loop_timer ) - + esdsq.store_dashboard_time( + "admin/db_utils/add_user_stats/process_batches", + stage3_timer + ) esdsq.store_dashboard_time( "admin/db_utils/add_user_stats/total_time", total_timer ) + return processed_data - return user_data def query_segments_crossing_endpoints(poly_region_start, poly_region_end, start_date: str, end_date: str, tz: str, excluded_uuids: list[str]): with ect.Timer() as total_timer: