[sdk] V2 IR compiler cannot compile nested parallel loop #6383

Closed
Tomcli opened this issue Aug 18, 2021 · 1 comment · Fixed by #6643
@Tomcli
Member

Tomcli commented Aug 18, 2021

Environment

  • KFP version: 1.7.0-rc3
  • KFP SDK version: 1.7.0
  • All dependencies version:

Steps to reproduce

Below is a simple pipeline that runs a parallel loop inside another parallel loop. It compiles with the V1 compiler but not with the V2 IR compiler, which fails with an error about an input parameter that cannot be found.

Example pipeline:

import kfp.dsl as dsl
from kfp import components
import kfp

op1_yaml = '''\
name: 'my-in-coop1'
inputs:
- {name: item, type: Integer}
- {name: my_pipe_param, type: Integer}
implementation:
    container:
        image: library/bash:4.4.23
        command: ['sh', '-c']
        args:
        - |
          set -e
          echo op1 "$0" "$1"
        - {inputValue: item}
        - {inputValue: my_pipe_param}
'''

op11_yaml = '''\
name: 'my-inner-inner-coop'
inputs:
- {name: item, type: Integer}
- {name: inner_item, type: Integer}
- {name: my_pipe_param, type: Integer}
implementation:
    container:
        image: library/bash:4.4.23
        command: ['sh', '-c']
        args:
        - |
          set -e
          echo op11 "$0" "$1" "$2"
        - {inputValue: item}
        - {inputValue: inner_item}
        - {inputValue: my_pipe_param}
'''

op2_yaml = '''\
name: 'my-in-coop2'
inputs:
- {name: item, type: Integer}
implementation:
    container:
        image: library/bash:4.4.23
        command: ['sh', '-c']
        args:
        - |
          set -e
          echo op2 "$0"
        - {inputValue: item}
'''

op_out_yaml = '''\
name: 'my-out-cop'
inputs:
- {name: my_pipe_param, type: Integer}
implementation:
    container:
        image: library/bash:4.4.23
        command: ['sh', '-c']
        args:
        - |
          set -e
          echo "$0"
        - {inputValue: my_pipe_param}
'''

@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param: int = 10):
    loop_args = [1, 2]
    with dsl.ParallelFor(loop_args) as item:
        op1_template = components.load_component_from_text(op1_yaml)
        op1 = op1_template(item, my_pipe_param)

        with dsl.ParallelFor([100, 200, 300]) as inner_item:
            op11_template = components.load_component_from_text(op11_yaml)
            op11 = op11_template(item, inner_item, my_pipe_param)

        op2_template = components.load_component_from_text(op2_yaml)
        op2 = op2_template(item)

    op_out_template = components.load_component_from_text(op_out_yaml)
    op_out = op_out_template(my_pipe_param)


if __name__ == '__main__':
    from kfp.compiler import Compiler
    Compiler().compile(pipeline, __file__.replace('.py', '.yaml')) # V1 works
    kfp.v2.compiler.Compiler().compile(pipeline, __file__.replace('.py','.json')) # V2 IR has errors

Error from the V2 IR compiler:

Traceback (most recent call last):
  File "test.py", line 92, in <module>
    kfp.v2.compiler.Compiler().compile(pipeline, __file__.replace('.py','.json'))
  File "/Users/tommyli/opt/anaconda3/envs/py3/lib/python3.8/site-packages/kfp/v2/compiler/compiler.py", line 1137, in compile
    pipeline_job = self._create_pipeline_v2(
  File "/Users/tommyli/opt/anaconda3/envs/py3/lib/python3.8/site-packages/kfp/v2/compiler/compiler.py", line 1093, in _create_pipeline_v2
    pipeline_spec = self._create_pipeline_spec(
  File "/Users/tommyli/opt/anaconda3/envs/py3/lib/python3.8/site-packages/kfp/v2/compiler/compiler.py", line 915, in _create_pipeline_spec
    self._group_to_dag_spec(
  File "/Users/tommyli/opt/anaconda3/envs/py3/lib/python3.8/site-packages/kfp/v2/compiler/compiler.py", line 684, in _group_to_dag_spec
    dsl_component_spec.update_task_inputs_spec(
  File "/Users/tommyli/opt/anaconda3/envs/py3/lib/python3.8/site-packages/kfp/dsl/component_spec.py", line 336, in update_task_inputs_spec
    assert component_input_parameter in parent_component_inputs.parameters, \
AssertionError: component_input_parameter: pipelineparam--pipelineparam--loop-item-param-1 not found. All inputs: parameters {
  key: "pipelineparam--loop-item-param-3"
  value {
    type: STRING
  }
}
parameters {
  key: "pipelineparam--my_pipe_param"
  value {
    type: INT
  }
}

Expected result

The IR compiler should be able to map the input parameters for a sub-DAG nested inside another sub-DAG, as the V1 Argo compiler does.
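
To make the expected mapping concrete, here is a minimal sketch of which parameters each nested level would need to receive from its parent. It is a toy model only, not the KFP compiler's implementation: the `Group` class, the `required_inputs` function, and the group names are all hypothetical, and the parameter names are the plain Python names from the pipeline above rather than the generated `pipelineparam--...` keys.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Group:
    """Hypothetical stand-in for one DAG level (pipeline root or a loop body)."""
    name: str
    defines: Set[str] = field(default_factory=set)  # parameters introduced at this level
    uses: Set[str] = field(default_factory=set)     # parameters consumed by tasks at this level
    children: List["Group"] = field(default_factory=list)


def required_inputs(group: Group, out: Dict[str, Set[str]]) -> Set[str]:
    """Record, for every group, the parameters it must receive from its parent.

    A group needs whatever its own tasks use plus whatever its children
    need, minus the parameters it defines itself.
    """
    needed = set(group.uses)
    for child in group.children:
        needed |= required_inputs(child, out)
    needed -= group.defines
    out[group.name] = needed
    return needed


# Mirror the structure of the example pipeline (names are illustrative).
inner = Group("inner-loop", defines={"inner_item"},
              uses={"item", "inner_item", "my_pipe_param"})
outer = Group("outer-loop", defines={"item"},
              uses={"item", "my_pipe_param"}, children=[inner])
root = Group("root", defines={"my_pipe_param"},
             uses={"my_pipe_param"}, children=[outer])

inputs: Dict[str, Set[str]] = {}
required_inputs(root, inputs)

for name in ("root", "outer-loop", "inner-loop"):
    print(name, sorted(inputs[name]))
```

In this model the inner loop must declare both `item` (the outer loop's item) and `my_pipe_param` as inputs, while the outer loop only needs `my_pipe_param`. The assertion above indicates that the outer loop item is not found among the declared inputs at the level where it is looked up.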

Materials and Reference


Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

@Tomcli
Member Author

Tomcli commented Aug 18, 2021

/cc @neuromage @chensun @animeshsingh
