Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WorkflowsCreateExecutionOperator execution argument only receive bytes #27165

Closed
1 of 2 tasks
akakakakakaa opened this issue Oct 20, 2022 · 5 comments · Fixed by #27361
Closed
1 of 2 tasks

WorkflowsCreateExecutionOperator execution argument only receive bytes #27165

akakakakakaa opened this issue Oct 20, 2022 · 5 comments · Fixed by #27361

Comments

@akakakakakaa
Copy link
Contributor

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==7.0.0

Apache Airflow version

2.3.2

Operating System

Ubuntu 20.04.5 LTS (Focal Fossa)

Deployment

Docker-Compose

Deployment details

No response

What happened

WorkflowsCreateExecutionOperator triggers google cloud workflows and execution param receives argument as {"argument": {"key": "val", "key", "val"...}

But, When I passed argument as dict using render_template_as_native_obj=True, protobuf error occured TypeError: {'projectId': 'project-id', 'location': 'us-east1'} has type dict, but expected one of: bytes, unicode.

When I passed argument as bytes {"argument": b'{\n "projectId": "project-id",\n "location": "us-east1"\n}' It working.

What you think should happen instead

execution argument should be Dict instead of bytes.

How to reproduce

not working

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.workflows import WorkflowsCreateExecutionOperator

with DAG(
    dag_id="continual_learning_deid_norm_h2h_test",
    params={
        "location": Param(type="string", default="us-east1"),
        "project_id": Param(type="string", default="project-id"),
        "workflow_id": Param(type="string", default="orkflow"),
        "workflow_execution_info": {
            "argument": {
                "projectId": "project-id",
                "location": "us-east1"
            }
        }
    },
    render_template_as_native_obj=True
) as dag:
    execution = "{{ params.workflow_execution_info }}"
    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="{{ params.location }}",
        project_id="{{ params.project_id }}",
        workflow_id="{{ params.workflow_id }}",
        execution="{{ params.workflow_execution_info }}"
    )

    start_operator = DummyOperator(task_id='test_task')

    start_operator >> create_execution

working

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.google.cloud.operators.workflows import WorkflowsCreateExecutionOperator

with DAG(
    dag_id="continual_learning_deid_norm_h2h_test",
    params={
        "location": Param(type="string", default="us-east1"),
        "project_id": Param(type="string", default="project-id"),
        "workflow_id": Param(type="string", default="orkflow"),
        "workflow_execution_info": {
            "argument": b'{\n  "projectId": "project-id",\n  "location": "us-east1"\n}'
        }
    },
    render_template_as_native_obj=True
) as dag:
    execution = "{{ params.workflow_execution_info }}"
    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="{{ params.location }}",
        project_id="{{ params.project_id }}",
        workflow_id="{{ params.workflow_id }}",
        execution="{{ params.workflow_execution_info }}"
    )

    start_operator = DummyOperator(task_id='test_task')

    start_operator >> create_execution

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@potiuk
Copy link
Member

potiuk commented Oct 28, 2022

Hopefully someone will pick it - marked is as good-first-issue - also if you wnt @akakakakakaa you can attempt to make a PR and fix it - might be nice first contribution. Otherwise i will have to wait for someone who will pick it.

@rkarish
Copy link
Contributor

rkarish commented Oct 29, 2022

I'll take this one if it's still available.

@rkarish
Copy link
Contributor

rkarish commented Oct 29, 2022

I've opened a PR for this. Tests for WorkflowsHook and WorkflowsCreateExecutionOperator are passing. Is there anything else we should be concerned about here?

@rkarish
Copy link
Contributor

rkarish commented Oct 29, 2022

Also want to mention that project_id parameter is not a templated field.

rkarish added a commit to rkarish/airflow that referenced this issue Oct 29, 2022
@potiuk
Copy link
Member

potiuk commented Oct 29, 2022

commented there. You need to add test. And making project_id templated for those might be a good idea for another PR

rkarish added a commit to rkarish/airflow that referenced this issue Oct 30, 2022
potiuk pushed a commit that referenced this issue Oct 31, 2022
…o be dicts (#27361)

 Convert execution dict values to str (#27165)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants