Skip to content

Commit

Permalink
Allow sos_step('a') to trigger a subworkflow a #983
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Jun 16, 2018
1 parent da23232 commit 4fd39a9
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/sos/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def step_name(self, alias: bool = False) -> str:
def match(self, step_name: str) -> bool:
# if this step provides name...
for name, index, _ in self.names:
if (step_name == name and self.index is None) or step_name == f'{name}_{0 if index is None else int(index)}':
if step_name == name or step_name == f'{name}_{0 if index is None else int(index)}':
return True
return False

Expand Down
3 changes: 2 additions & 1 deletion src/sos/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ def __init__(self, step_name):
def target_exists(self, mode='any'):
# the target exists only if it has been executed?
# which is indicated by a variable
return '__completed__' in env.sos_dict and self._step_name in env.sos_dict['__completed__']
return '__completed__' in env.sos_dict and (self._step_name in env.sos_dict['__completed__'] or
self._step_name in [x.rsplit('_')[0] for x in env.sos_dict['__completed__']])

def target_name(self):
return self._step_name
Expand Down
64 changes: 62 additions & 2 deletions src/sos/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,68 @@ def resolve_dangling_targets(self, dag: SoS_DAG, targets: Optional[sos_targets]=
resolved += 1
continue
if len(mo) > 1:
raise RuntimeError(
f'Multiple steps {", ".join(x[0].step_name() for x in mo)} to generate target {target}')
# sos_step('a') could match to step a_1, a_2, etc, in this case we are adding a subworkflow
if isinstance(target, sos_step):
# get the step names
sections = sorted([x[0] for x in mo], key=lambda x: x.step_name())
# no default input
default_input: sos_targets = sos_targets()
#
for idx, section in enumerate(sections):
if self.skip(section):
continue
res = analyze_section(section, default_input)

environ_vars = res['environ_vars'] - self._base_symbols
signature_vars = res['signature_vars'] - self._base_symbols
changed_vars = res['changed_vars']
# parameters, if used in the step, should be considered environmental
environ_vars |= env.parameter_vars & signature_vars

# add shared to targets
if res['changed_vars']:
if 'provides' in section.options:
if isinstance(section.options['provides'], str):
section.options.set(
'provides', [section.options['provides']])
else:
section.options.set('provides', [])
#
section.options.set('provides',
section.options['provides'] + [sos_variable(var) for var in changed_vars])

# build DAG with input and output files of step
env.logger.debug(
f'Adding step {res["step_name"]} with output {short_repr(res["step_output"])} to resolve target {target}')
context = {
'__signature_vars__': signature_vars,
'__environ_vars__': environ_vars,
'__changed_vars__': changed_vars,
}
if idx == 0:
context['__step_output__'] = env.sos_dict['__step_output__']
else:
res['step_depends'].extend(sos_step(sections[idx-1].step_name()))
if idx == len(sections) - 1:
# for the last step, we say the mini-subworkflow satisfies sos_step('a')
# we have to do it this way because by default the DAG only sees sos_step('a_1') etc
res['step_output'].extend(target)

node_name = section.step_name()
dag.add_step(section.uuid,
node_name, None,
res['step_input'],
res['step_depends'],
res['step_output'],
context=context)
default_input = res['step_output']
added_node += len(sections)
resolved += 1
#dag.show_nodes()
continue
else:
raise RuntimeError(
f'Multiple steps {", ".join(x[0].step_name() for x in mo)} to generate target {target}')
#
# only one step, we need to process it # execute section with specified input
#
Expand Down
64 changes: 64 additions & 0 deletions test/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,70 @@ def testForwardStyleDepend(self):
Base_Executor(wf).run()
self.assertTrue(file_target('a.txt.bak').target_exists())

def testSoSStepMiniworkflow(self):
'''Test the addition of mini forward workflows introduced by sos_step'''
script = SoS_Script('''
[a_1]
print(step_name)
[a_2]
print(step_name)
[a_20]
print(step_name)
[b_1]
print(step_name)
[b_2]
print(step_name)
[b_20]
depends: sos_step('c')
print(step_name)
[c_1]
print(step_name)
[c_2]
print(step_name)
[c_20]
print(step_name)
[default]
depends: sos_step('a'), sos_step('b')
''')
wf = script.workflow()
Base_Executor(wf, config={'output_dag': 'test.dot'}
).initialize_dag()
# note that A2 is no longer mentioned
self.assertDAG('test.dot',
'''
strict digraph "" {
default;
a_1;
a_2;
a_20;
b_1;
b_2;
b_20;
c_1;
c_2;
c_20;
a_1 -> a_2;
a_2 -> a_20;
a_20 -> default;
b_1 -> b_2;
b_2 -> b_20;
b_20 -> default;
c_1 -> c_2;
c_2 -> c_20;
c_20 -> b_20;
}
''')


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions test/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,8 @@ def testMultiSoSStep(self):
wf = script.workflow()
res = Base_Executor(wf).run()
self.assertEqual(res['__completed__']['__step_completed__'], 3)
self.assertTrue(os.path.is_file('a_1')
self.assertTrue(os.path.is_file('a_2')
self.assertTrue(os.path.isfile('a_1'))
self.assertTrue(os.path.isfile('a_2'))

if __name__ == '__main__':
unittest.main()

0 comments on commit 4fd39a9

Please sign in to comment.