diff --git a/src/sos/executor_utils.py b/src/sos/executor_utils.py index d0277c37a..de913964e 100644 --- a/src/sos/executor_utils.py +++ b/src/sos/executor_utils.py @@ -231,7 +231,6 @@ def create_task(global_def, task_stmt, task_params): task_tags.append(tag) # save task to a file - task_vars['__task_vars__'] = copy.copy(task_vars) taskdef = TaskParams( name='{} (index={})'.format( env.sos_dict['step_name'], env.sos_dict['_index']), diff --git a/src/sos/step_executor.py b/src/sos/step_executor.py index 367de5e42..522bbdd54 100755 --- a/src/sos/step_executor.py +++ b/src/sos/step_executor.py @@ -107,7 +107,7 @@ def get_job(self, all_tasks=False): for idx, (task_id, taskdef, _) in slot: master.push(task_id, taskdef) ids.append(master.ID) - TaskFile(master.ID).save(master) + TaskFile(master.ID).save(master.finalize()) env.signature_push_socket.send_pyobj(['workflow', 'task', master.ID, f"{{'creation_time': {time.time()}}}"]) self._unsubmitted_slots = [] @@ -146,7 +146,7 @@ def get_job(self, all_tasks=False): master.push(task_id, taskdef) # the last piece if master is not None: - TaskFile(master.ID).save(master) + TaskFile(master.ID).save(master.finalize()) env.signature_push_socket.send_pyobj(['workflow', 'task', master.ID, f"{{'creation_time': {time.time()}}}"]) ids.append(master.ID) diff --git a/src/sos/task_executor.py b/src/sos/task_executor.py index 74393b34b..db2b5b941 100644 --- a/src/sos/task_executor.py +++ b/src/sos/task_executor.py @@ -238,6 +238,8 @@ def copy_out_and_err(result): p = Pool(params.num_workers) results = [] for tid, tdef in params.task_stack: + if hasattr(params, 'common_dict'): + tdef.sos_dict.update(params.common_dict) results.append(p.apply_async(_execute_task, ((tid, tdef, {tid: sig_content.get(tid, {})}), verbosity, runmode, sigmode, None, None), callback=copy_out_and_err)) @@ -257,6 +259,8 @@ def copy_out_and_err(result): else: results = [] for tid, tdef in params.task_stack: + if hasattr(params, 'common_dict'): + tdef.sos_dict.update(params.common_dict) # no monitor process for subtasks res = _execute_task((tid, tdef, {tid: sig_content.get(tid, {})}), verbosity=verbosity, runmode=runmode, sigmode=sigmode, monitor_interval=None, resource_monitor_interval=None) diff --git a/src/sos/tasks.py b/src/sos/tasks.py index 559d59617..e75a590f6 100644 --- a/src/sos/tasks.py +++ b/src/sos/tasks.py @@ -132,6 +132,23 @@ def push(self, task_id, params): self.ID = f'M{len(self.task_stack)}_{self.task_stack[0][0]}' self.name = self.ID + def finalize(self): + if not self.task_stack: + return + common_dict = None + common_keys = set() + for id, params in self.task_stack: + if common_dict is None: + common_dict = params.sos_dict + common_keys = set(params.sos_dict.keys()) + else: + common_keys = {key for key in common_keys if key in params.sos_dict and common_dict[key] == params.sos_dict[key]} + if not common_keys: + break + self.common_dict = {x:common_dict[x] for x in common_keys} + for id, params in self.task_stack: + params.sos_dict = {k:v for k,v in params.sos_dict.items() if k not in common_keys} + return self class TaskStatus(Enum): new = 0