Add support for configurable qualx label column #1528
New file (likely `spark_rapids_tools/tools/qualx/config.py`, from the import paths below):

```python
# Copyright (c) 2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Config module for Qualx, controlled by environment variables.

Environment variables:
- QUALX_CACHE_DIR: cache directory for saving Profiler output.
- QUALX_DATA_DIR: data directory containing eventlogs, primarily used in dataset JSON files.
- QUALX_DIR: root directory for Qualx execution, primarily used in dataset JSON files to locate
  dataset-specific plugins.
- QUALX_LABEL: targeted label column for XGBoost model.
- SPARK_RAPIDS_TOOLS_JAR: path to Spark RAPIDS Tools JAR file.
"""
import os


def get_cache_dir() -> str:
    """Get cache directory to save Profiler output."""
    return os.environ.get('QUALX_CACHE_DIR', 'qualx_cache')
```
Review comments on lines +28 to +30 (`get_cache_dir`):
- Reviewer: We can use the utility methods to get/set the env variables; see `spark-rapids-tools/user_tools/src/spark_rapids_pytools/common/utilities.py`, lines 103 to 118 in `14255f4`.
- Reviewer: Same comment as above.
```python
def get_label() -> str:
    """Get targeted label column for XGBoost model."""
    label = os.environ.get('QUALX_LABEL', 'Duration')
    assert label in ['Duration', 'duration_sum']
    return label
```
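The behavior of the new `get_label` helper can be shown with a minimal standalone sketch (the function body mirrors the diff above; the surrounding driver lines are illustrative only):

```python
import os


def get_label() -> str:
    """Return the configured label column, validated against supported values."""
    label = os.environ.get('QUALX_LABEL', 'Duration')
    assert label in ['Duration', 'duration_sum'], f'unsupported label: {label}'
    return label


# default when QUALX_LABEL is unset
os.environ.pop('QUALX_LABEL', None)
default_label = get_label()

# override via the environment variable
os.environ['QUALX_LABEL'] = 'duration_sum'
override_label = get_label()
```

Any other value of `QUALX_LABEL` fails the assertion, so misconfiguration surfaces immediately rather than producing a model trained on an unexpected column.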
Second file (likely `spark_rapids_tools/tools/qualx/model.py`):

```diff
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, NVIDIA CORPORATION.
+# Copyright (c) 2024-2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
```
```diff
@@ -22,6 +22,7 @@
 import pandas as pd
 import xgboost as xgb
 from xgboost import Booster
+from spark_rapids_tools.tools.qualx.config import get_label
 from spark_rapids_tools.tools.qualx.preprocess import expected_raw_features
 from spark_rapids_tools.tools.qualx.util import get_logger
 # Import optional packages
@@ -42,7 +43,6 @@
     'appId',
     'appName',
     'description',
-    'Duration',
     'fraction_supported',
     'jobStartTime_min',
     'pluginEnabled',
@@ -53,8 +53,6 @@
     'sqlID'
 }
 
-expected_model_features = expected_raw_features - ignored_features
-
 
 def train(
     cpu_aug_tbl: pd.DataFrame,
```
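The module-level `expected_model_features` is removed here because, once the label is configurable, the feature set must be computed per run (it moves into `extract_model_features` later in this diff). The set arithmetic can be sketched standalone; the feature names below are illustrative, not the project's actual feature list:

```python
# hypothetical miniature feature sets for illustration
expected_raw_features = {'appId', 'duration_sum', 'duration_mean',
                         'numTasks_sum', 'sqlOp_Filter'}
ignored_features = {'appId'}

label = 'duration_sum'  # would come from get_label()

# drop identifiers, then drop the label itself from the training features
expected_model_features = expected_raw_features - ignored_features
expected_model_features.remove(label)
if label == 'duration_sum':
    # duration_mean = duration_sum / numTasks_sum, so drop it to avoid label leakage
    expected_model_features.remove('duration_mean')
```

With these inputs, only `numTasks_sum` and `sqlOp_Filter` survive as model features.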
```diff
@@ -174,19 +172,22 @@ def predict(
     preds['y'] = y
     preds_df = pd.DataFrame(preds)
 
+    label = get_label()  # Duration, duration_sum
     select_columns = [
         'appName',
         'appId',
         'appDuration',
         'sqlID',
         'scaleFactor',
-        'Duration',
         'fraction_supported',
         'description',
     ]
     if 'split' in cpu_aug_tbl:
         select_columns.append('split')
 
+    if label in cpu_aug_tbl:
+        select_columns.append(label)
+
     # join predictions with select input features
     results_df = (
         cpu_aug_tbl[select_columns]
```

Review comments on `if label in cpu_aug_tbl:`:
- Reviewer: Should this be an error at this point if not true?
- Author: Good point; I think I was just being overly cautious here. Will update the code.
- Author: Actually, this code is used for non-training prediction too, so the label might not be in the table in that case.
```diff
@@ -196,31 +197,39 @@ def predict(
 
     if 'y' in results_df.columns:
         # reconstruct original gpu duration for validation purposes
-        results_df['gpuDuration'] = results_df['Duration'] / results_df['y']
-        results_df['gpuDuration'] = np.floor(results_df['gpuDuration'])
+        results_df[f'gpu_{label}'] = results_df[label] / results_df['y']
+        results_df[f'gpu_{label}'] = np.floor(results_df[f'gpu_{label}'])
 
     # adjust raw predictions with stage/sqlID filtering of unsupported ops
-    results_df['Duration_pred'] = results_df['Duration'] * (
+    results_df[f'{label}_pred'] = results_df[label] * (
         1.0
         - results_df['fraction_supported']
         + (results_df['fraction_supported'] / results_df['y_pred'])
     )
     # compute fraction of duration in supported ops
-    results_df['Duration_supported'] = (
-        results_df['Duration'] * results_df['fraction_supported']
+    results_df[f'{label}_supported'] = (
+        results_df[label] * results_df['fraction_supported']
     )
     # compute adjusted speedup (vs. raw speedup prediction: 'y_pred')
     # without qual data, this should be the same as the raw 'y_pred'
-    results_df['speedup_pred'] = results_df['Duration'] / results_df['Duration_pred']
+    results_df['speedup_pred'] = results_df[label] / results_df[f'{label}_pred']
     results_df = results_df.drop(columns=['fraction_supported'])
 
     return results_df
 
 
 def extract_model_features(
-    df: pd.DataFrame, split_functions: Mapping[str, Callable[[pd.DataFrame], pd.DataFrame]] = None
+    df: pd.DataFrame,
+    split_functions: Mapping[str, Callable[[pd.DataFrame], pd.DataFrame]] = None,
 ) -> Tuple[pd.DataFrame, List[str], str]:
     """Extract model features from raw features."""
+    label = get_label()
+    expected_model_features = expected_raw_features - ignored_features
+    expected_model_features.remove(label)
+    if label == 'duration_sum':
+        # for 'duration_sum' label, also remove 'duration_mean' since it's related to 'duration_sum'
+        expected_model_features.remove('duration_mean')
+
     missing = expected_raw_features - set(df.columns)
     if missing:
         logger.warning('Input dataframe is missing expected raw features: %s', missing)
```

Review comments on the `duration_mean` removal:
- Reviewer: Keeping `duration_mean` could give an opportunity for a non-linear speedup estimate based on `duration_mean`. Not sure it should be removed.
- Author: `duration_mean` is directly computed from `duration_sum / numTasks_sum`, so I was trying to avoid leaking any (`duration_sum`) label information into the training features. That said, the true label is the ratio of CPU/GPU `duration_sum`, so it might be OK to leave.
- Reviewer: Yes, the true label has GPU duration. Anything with CPU info is OK and doesn't leak, though it may be useless/unnecessary.
```diff
@@ -256,11 +265,11 @@ def extract_model_features(
             'appName',
             'scaleFactor',
             'sqlID',
-            'Duration',
+            label,
             'description',
         ]
     ]
-    gpu_aug_tbl = gpu_aug_tbl.rename(columns={'Duration': 'xgpu_Duration'})
+    gpu_aug_tbl = gpu_aug_tbl.rename(columns={label: f'xgpu_{label}'})
     cpu_aug_tbl = cpu_aug_tbl.merge(
         gpu_aug_tbl,
         on=['appName', 'scaleFactor', 'sqlID', 'description'],
```
```diff
@@ -269,7 +278,7 @@ def extract_model_features(
 
     # warn for possible mismatched sqlIDs
     num_rows = len(cpu_aug_tbl)
-    num_na = cpu_aug_tbl['xgpu_Duration'].isna().sum()
+    num_na = cpu_aug_tbl[f'xgpu_{label}'].isna().sum()
     if (
         num_na / num_rows > 0.05
     ):  # arbitrary threshold, misaligned sqlIDs still may 'match' most of the time
```
```diff
@@ -279,14 +288,14 @@ def extract_model_features(
             num_rows,
         )
 
-        # calculate Duration_speedup
-        cpu_aug_tbl['Duration_speedup'] = (
-            cpu_aug_tbl['Duration'] / cpu_aug_tbl['xgpu_Duration']
+        # calculate speedup
+        cpu_aug_tbl[f'{label}_speedup'] = (
+            cpu_aug_tbl[label] / cpu_aug_tbl[f'xgpu_{label}']
         )
-        cpu_aug_tbl = cpu_aug_tbl.drop(columns=['xgpu_Duration'])
+        cpu_aug_tbl = cpu_aug_tbl.drop(columns=[f'xgpu_{label}'])
 
-        # use Duration_speedup as label
-        label_col = 'Duration_speedup'
+        # use speedup as label
+        label_col = f'{label}_speedup'
     else:
         # inference dataset with CPU runs only
         label_col = None
```
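The CPU/GPU join and speedup-label construction above can be sketched in miniature. The column names follow the diff; the single-row data is illustrative only:

```python
import pandas as pd

label = 'Duration'  # would come from get_label()

cpu_aug_tbl = pd.DataFrame(
    {'appName': ['q1'], 'scaleFactor': [1], 'sqlID': [0],
     'description': ['query1'], label: [1000.0]})
gpu_aug_tbl = pd.DataFrame(
    {'appName': ['q1'], 'scaleFactor': [1], 'sqlID': [0],
     'description': ['query1'], label: [250.0]})

# rename the GPU label column so it survives the merge alongside the CPU one
gpu_aug_tbl = gpu_aug_tbl.rename(columns={label: f'xgpu_{label}'})
merged = cpu_aug_tbl.merge(
    gpu_aug_tbl,
    on=['appName', 'scaleFactor', 'sqlID', 'description'], how='left')

# the training label is the CPU/GPU ratio of the configured column
merged[f'{label}_speedup'] = merged[label] / merged[f'xgpu_{label}']
merged = merged.drop(columns=[f'xgpu_{label}'])
```

A left merge keeps CPU rows with no matching GPU run; their `xgpu_` value is NaN, which is what the mismatched-sqlID warning in the previous hunk counts.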
Third file (likely `spark_rapids_tools/tools/qualx/preprocess.py`):

```diff
@@ -22,11 +22,14 @@
 import os
 import numpy as np
 import pandas as pd
+from spark_rapids_tools.tools.qualx.config import (
+    get_cache_dir,
+    get_label,
+)
 from spark_rapids_tools.tools.qualx.util import (
     ensure_directory,
     find_eventlogs,
     find_paths,
-    get_cache_dir,
     get_logger,
     get_dataset_platforms,
     load_plugin,
```
```diff
@@ -392,19 +395,20 @@ def infer_app_meta(eventlogs: List[str]) -> Mapping[str, Mapping]:
     )
 
     # run any plugin hooks on profile_df
-    for ds_name, plugin_path in plugins.items():
-        plugin = load_plugin(plugin_path)
-        if plugin:
-            df_schema = profile_df.dtypes
-            dataset_df = profile_df.loc[
-                (profile_df.appName == ds_name) | (profile_df.appName.str.startswith(f'{ds_name}:'))
-            ]
-            modified_dataset_df = plugin.load_profiles_hook(dataset_df)
-            if modified_dataset_df.index.equals(dataset_df.index):
-                profile_df.update(modified_dataset_df)
-                profile_df.astype(df_schema)
-            else:
-                raise ValueError(f'Plugin: load_profiles_hook for {ds_name} unexpectedly modified row indices.')
+    if not profile_df.empty:
+        for ds_name, plugin_path in plugins.items():
+            plugin = load_plugin(plugin_path)
+            if plugin:
+                df_schema = profile_df.dtypes
+                dataset_df = profile_df.loc[
+                    (profile_df.appName == ds_name) | (profile_df.appName.str.startswith(f'{ds_name}:'))
+                ]
+                modified_dataset_df = plugin.load_profiles_hook(dataset_df)
+                if modified_dataset_df.index.equals(dataset_df.index):
+                    profile_df.update(modified_dataset_df)
+                    profile_df.astype(df_schema)
+                else:
+                    raise ValueError(f'Plugin: load_profiles_hook for {ds_name} unexpectedly modified row indices.')
     return profile_df
```
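The plugin-hook protocol above (select a dataset's rows, let the hook transform them, then fold the result back in only if the row indices are unchanged) can be sketched with a hypothetical hook; the hook body and data here are illustrative, not from the PR:

```python
import pandas as pd


def load_profiles_hook(dataset_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical plugin hook: transform values but keep row indices intact."""
    out = dataset_df.copy()
    out['description'] = out['description'].str.strip()
    return out


profile_df = pd.DataFrame(
    {'appName': ['ds1:run1', 'other'], 'description': ['  q1  ', 'q2']})
ds_name = 'ds1'

# select rows belonging to this dataset (exact name or 'name:' prefix)
dataset_df = profile_df.loc[
    (profile_df.appName == ds_name) | (profile_df.appName.str.startswith(f'{ds_name}:'))
]
modified_df = load_profiles_hook(dataset_df)
if modified_df.index.equals(dataset_df.index):
    # update() aligns on the index, so only the selected rows change
    profile_df.update(modified_df)
else:
    raise ValueError(f'load_profiles_hook for {ds_name} unexpectedly modified row indices.')
```

Because `DataFrame.update` aligns on the index, preserving the index is what guarantees the hook's changes land back on the same rows of `profile_df`.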
```diff
@@ -448,6 +452,10 @@ def combine_tables(table_name: str) -> pd.DataFrame:
             fallback_reason=f'Empty feature tables found after preprocessing: {empty_tables_str}')
         return pd.DataFrame()
 
+    if get_label() == 'duration_sum':
+        # override appDuration with sum(duration_sum) across all stages
+        app_tbl['appDuration'] = job_stage_agg_tbl['duration_sum'].astype(float).sum()
+
     # normalize dtypes
     app_int_dtypes = ['taskCpu', 'taskGpu']
     app_tbl[app_int_dtypes] = app_tbl[app_int_dtypes].fillna(0).astype(int)
```

Review comments on the `duration_sum` override:
- Reviewer: Does `duration_sum` only include SQL IDs? I guess there is no way to get duration sums for the non-SQL parts?
- Author: Yes, although I'm seeing some weirdness with this number, so I'm taking another look.
- Author: Fixed this code to correctly aggregate by `appId`, but `duration_sum` is still only derived from whatever is available in `job_stage_agg_tbl`.
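Per the review discussion, the override was later fixed to aggregate per `appId` rather than summing across all apps at once. A hypothetical sketch of that per-app aggregation (the table shapes and values are illustrative, not the project's actual schema):

```python
import pandas as pd

job_stage_agg_tbl = pd.DataFrame({
    'appId': ['app1', 'app1', 'app2'],
    'duration_sum': [100, 200, 50],
})
app_tbl = pd.DataFrame({'appId': ['app1', 'app2'], 'appDuration': [400.0, 80.0]})

# override appDuration with the per-app total of duration_sum
per_app = job_stage_agg_tbl.groupby('appId')['duration_sum'].sum().astype(float)
app_tbl['appDuration'] = app_tbl['appId'].map(per_app)
```

Mapping the grouped sums back by `appId` keeps each app's override independent, instead of assigning one global total to every row.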
Review comments on the new environment variables:
- Reviewer: The tools environment variables are prefixed with `RAPIDS_USER_TOOLS_*`. Shall we apply the same convention to the QualX-related ones? For `QUALX_CACHE_DIR`: there is a cache directory already used by the tools wrapper. Can we use the same value for both to reduce the number of variables needed by the tools? The tools use the env variable `RAPIDS_USER_TOOLS_CACHE_FOLDER`, which defaults to `/var/tmp/spark_rapids_user_tools_cache`.
- Author: @amahussein I think there are a lot of scripts/tools that use these at the moment, so I'd leave renaming for another time. My hope is that this new `config.py` file will make it easier to refactor/rename in the future (while keeping changes minimal for now).