Commit

Update configs
robinholzi committed Sep 15, 2024
1 parent 5cb048d commit 0f9c602
Showing 4 changed files with 44 additions and 35 deletions.
@@ -51,7 +51,7 @@ def gen_pipeline_config(
         dataloader_workers=1,
         use_previous_model=True,
         initial_model="random",
-        batch_size=256,
+        batch_size=128,  # gpu memory limit doesn't allow for larger batch sizes
         shuffle=True,
         optimizers=[
             OptimizerConfig(
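
This hunk halves the training batch size to fit the GPU (the huffpost config below receives the same change). To re-derive a workable batch size on different hardware, a common PyTorch idiom is to back off on CUDA out-of-memory errors. A minimal sketch, not part of this commit; find_max_batch_size and try_step are hypothetical names, and try_step is assumed to run one forward/backward pass at the given batch size:

    import torch

    def find_max_batch_size(try_step, start: int = 256) -> int:
        # Halve the batch size until a single training step fits in GPU memory.
        batch_size = start
        while batch_size >= 1:
            try:
                try_step(batch_size)
                return batch_size
            except torch.cuda.OutOfMemoryError:  # raised by PyTorch >= 1.13 on CUDA OOM
                torch.cuda.empty_cache()  # release cached blocks before retrying
                batch_size //= 2
        raise RuntimeError("even a batch size of 1 does not fit in GPU memory")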
experiments/arxiv/compare_trigger_policies/run.py (30 changes: 17 additions & 13 deletions)
@@ -67,13 +67,15 @@ def construct_periodic_eval_handlers(


 def construct_between_trigger_eval_handler(execution_time: EvalHandlerExecutionTime = "manual") -> EvalHandlerConfig:
-    return EvalHandlerConfig(
-        name="full",
-        execution_time=execution_time,
-        models="active",
-        strategy=BetweenTwoTriggersEvalStrategyConfig(),
-        datasets=["arxiv_kaggle_all"],  # train and test
-    )
+    return [
+        EvalHandlerConfig(
+            name="full",
+            execution_time=execution_time,
+            models="active",
+            strategy=BetweenTwoTriggersEvalStrategyConfig(),
+            datasets=["arxiv_kaggle_all"],  # train and test
+        )
+    ]


 def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
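
Wrapping the single EvalHandlerConfig in a list aligns this constructor with construct_periodic_eval_handlers, so call sites can concatenate handler sets uniformly. A minimal sketch of such a call site, assuming both constructors return lists (the periodic constructor's arguments are truncated in this diff and are elided here):

    periodic = construct_periodic_eval_handlers(...)  # arguments elided; signature truncated above
    between = construct_between_trigger_eval_handler(execution_time="manual")
    eval_handlers = [*periodic, *between]  # both are lists now, so they compose directly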
@@ -103,8 +105,6 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
     # 1X: Baselines with PERIODIC_EVAL_INTERVAL, executed with cautious                 #
     # parallelism and post factum evaluation (bottlenecking)                            #
     # -------------------------------------------------------------------------------- #
-    # TODO: merge main
-    # TODO: reset datasets in db
     # time baselines
     10: Experiment(
         name="arxiv-baseline-time",
@@ -114,9 +114,11 @@
         ),
         time_triggers={
             schedule: TimeTriggerConfig(every=schedule, start_timestamp=_FIRST_TIMESTAMP)
-            for schedule in reversed(["52w", "2y", "5y", "10y"])  # TODO: add 1y
+            for schedule in reversed(["26w", "10y"])
+            # 0: "1y", "2y", "5y"
+            # 1: "26w", "10y"
         },
-        gpu_device="cuda:0",
+        gpu_device="cuda:2",
     ),
     # data amount baselines
     11: Experiment(
@@ -127,9 +129,11 @@
         ),
         data_amount_triggers={
             f"{num_samples}": DataAmountTriggerConfig(num_samples=num_samples)
-            for num_samples in reversed([50_000, 100_000, 500_000])  # TODO: add 25_000
+            for num_samples in reversed([25_000, 50_000])
+            # 2: 100_000, 500_000, 1_000_000
+            # 3: 25_000, 50_000
         },
-        gpu_device="cuda:1",
+        gpu_device="cuda:3",
     ),
     # -------------------------------------------------------------------------------- #
     #                                2X: Drift triggers                                 #
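
For reference, the two comprehensions in experiments 10 and 11 expand to the following dictionaries (reversed() flips the listed order; _FIRST_TIMESTAMP is defined elsewhere in this module):

    time_triggers = {
        "10y": TimeTriggerConfig(every="10y", start_timestamp=_FIRST_TIMESTAMP),
        "26w": TimeTriggerConfig(every="26w", start_timestamp=_FIRST_TIMESTAMP),
    }
    data_amount_triggers = {
        "50000": DataAmountTriggerConfig(num_samples=50_000),
        "25000": DataAmountTriggerConfig(num_samples=25_000),
    }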
experiments/huffpost/compare_trigger_policies/pipeline_config.py (17 changes: 5 additions & 12 deletions)
@@ -51,22 +51,15 @@ def gen_pipeline_config(
         dataloader_workers=1,
         use_previous_model=True,
         initial_model="random",
-        batch_size=256,
+        batch_size=128,  # gpu memory limit doesn't allow for larger batch sizes
         shuffle=True,
         optimizers=[
             OptimizerConfig(
                 name="default",
-                algorithm="SGD",
+                algorithm="AdamW",
                 source="PyTorch",
                 param_groups=[
-                    OptimizerParamGroup(
-                        name="default",
-                        algorithm="AdamW",
-                        source="PyTorch",
-                        param_groups=[
-                            OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})
-                        ],
-                    )
+                    OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})
                 ],
             )
         ],
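
Reassembled from the hunk above, the optimizer block now reads as follows: the commit switches the algorithm from SGD to AdamW and removes an erroneously nested OptimizerParamGroup that duplicated the OptimizerConfig fields:

    optimizers=[
        OptimizerConfig(
            name="default",
            algorithm="AdamW",
            source="PyTorch",
            param_groups=[
                OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})
            ],
        )
    ],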
@@ -88,8 +81,8 @@ def gen_pipeline_config(
         evaluation=EvaluationConfig(
             handlers=eval_handlers,
             device=gpu_device,
-            after_training_evaluation_workers=8,
-            after_pipeline_evaluation_workers=8,
+            after_training_evaluation_workers=2,  # one worker needs 8-9 GB of memory
+            after_pipeline_evaluation_workers=2,
             datasets=[
                 EvalDataConfig(
                     dataset_id=hp_dataset_name,
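
The reduced worker count follows from the quoted footprint: at 8-9 GB per evaluation worker, eight parallel workers would need roughly 64-72 GB of memory, while two stay around 16-18 GB. A quick sanity check (the per-worker range is the commit's own estimate):

    per_worker_gb = (8, 9)  # memory per evaluation worker, per the comment above
    for workers in (8, 2):
        low, high = (workers * g for g in per_worker_gb)
        print(f"{workers} workers -> {low}-{high} GB")  # 8 -> 64-72 GB, 2 -> 16-18 GB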
modyn/supervisor/internal/pipeline_executor/evaluation_executor.py (30 changes: 21 additions & 9 deletions)
@@ -176,9 +176,15 @@ def run_pipeline_evaluations(
         logs = self._launch_evaluations_async(eval_requests, log, num_workers)
         return logs

-    def run_post_pipeline_evaluations(self, manual_run: bool = False) -> SupervisorLogs:
+    def run_post_pipeline_evaluations(self, manual_run: bool = False, num_workers: int | None = None) -> SupervisorLogs:
         """Evaluate the trained models after the core pipeline and store the
-        results."""
+        results.
+
+        Args:
+            manual_run: If True, only the evaluations that are marked as manual will be executed.
+            num_workers: The number of workers to use for the evaluations. If None, the number of workers will be
+                determined by the pipeline configuration.
+        """
         if not self.pipeline.evaluation:
             return SupervisorLogs(stage_runs=[])
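
With the new parameter, a post-hoc evaluation can cap its own worker count instead of inheriting the pipeline configuration. A hypothetical direct call (the pipeline directory is a placeholder):

    from pathlib import Path

    ex = EvaluationExecutor.init_from_path(Path("/pipelines/42"))  # placeholder directory
    logs = ex.run_post_pipeline_evaluations(manual_run=True, num_workers=4)  # cap at 4 eval workers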

@@ -224,7 +230,9 @@ def run_post_pipeline_evaluations(self, manual_run: bool = False) -> SupervisorLogs:
                 sample_time=-1,
                 trigger_idx=-1,
             ),
-            num_workers=self.pipeline.evaluation.after_pipeline_evaluation_workers,
+            num_workers=(
+                num_workers if num_workers else self.pipeline.evaluation.after_pipeline_evaluation_workers
+            )
         )
         return logs

@@ -420,17 +428,17 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
 # ------------------------------------------------------------------------------------ #


-def eval_executor_single_pipeline(pipeline_dir: Path) -> SupervisorLogs:
+def eval_executor_single_pipeline(pipeline_dir: Path, num_workers: int) -> SupervisorLogs:
     # restart evaluation executor
     ex = EvaluationExecutor.init_from_path(pipeline_dir)

-    supervisor_eval_logs = ex.run_post_pipeline_evaluations(manual_run=True)
+    supervisor_eval_logs = ex.run_post_pipeline_evaluations(manual_run=True, num_workers=num_workers)
     logger.info("Done with manual evaluation.")

     return supervisor_eval_logs


-def eval_executor_multi_pipeline(pipelines_dir: Path, pids: list[int] | None = None) -> None:
+def eval_executor_multi_pipeline(pipelines_dir: Path, num_workers: int, pids: list[int] | None = None) -> None:
     """Run the evaluation executor for multiple pipelines."""
     faulty_dir = pipelines_dir / "_faulty"
     done_dir = pipelines_dir / "_done"
@@ -454,7 +462,7 @@ def eval_executor_multi_pipeline(pipelines_dir: Path, pids: list[int] | None = None) -> None:

         (finished_dir / p_dir.stem).mkdir(exist_ok=True)

-        supervisor_eval_logs = eval_executor_single_pipeline(p_dir)
+        supervisor_eval_logs = eval_executor_single_pipeline(p_dir, num_workers=num_workers)

         shutil.copytree(p_dir, done_dir / p_dir.stem, dirs_exist_ok=True)
         full_logs = PipelineLogs.model_validate_json((done_dir / p_dir.stem / "pipeline.log").read_text())
@@ -473,11 +481,15 @@ def eval_executor_multi_pipeline(pipelines_dir: Path, pids: list[int] | None = None) -> None:
         print("Path not found")
         sys.exit(1)

+    num_workers = int(input("Enter number of workers (<= 0 will use the pipeline default): "))
+    if num_workers <= 0:
+        num_workers = None
+
     if single_pipeline_mode.lower() == "y":
         p_id = int(input("Enter pipeline id: "))
-        eval_executor_multi_pipeline(userpath, [p_id])
+        eval_executor_multi_pipeline(userpath, num_workers=num_workers, pids=[p_id])
     elif single_pipeline_mode.lower() == "n":
-        eval_executor_multi_pipeline(userpath)
+        eval_executor_multi_pipeline(userpath, num_workers=num_workers)
     else:
         print("Invalid input")
         sys.exit(1)
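
Taken together, the prompt flow maps any input <= 0 to None, which run_post_pipeline_evaluations later resolves to the pipeline default via its num_workers fallback (note that the script passes None even though the new parameters are annotated as int). A non-interactive equivalent of the two prompt branches, with illustrative values and a placeholder path:

    from pathlib import Path

    userpath = Path("/pipelines")                                     # placeholder root directory
    eval_executor_multi_pipeline(userpath, num_workers=None)          # "n": all pipelines, pipeline-default workers
    eval_executor_multi_pipeline(userpath, num_workers=2, pids=[42])  # "y": only pipeline 42, capped at 2 workers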
