
Create dataproc serverless spark batches operator #19248

Merged

Conversation

@MaksYermak (Contributor) commented Oct 27, 2021

Create operators for working with Batches for Google Dataproc. Includes operators, hooks, example DAGs, tests and docs.

Co-authored-by: Wojciech Januszek januszek@google.com
Co-authored-by: Lukasz Wyszomirski wyszomirski@google.com
Co-authored-by: Maksim Yermakou maksimy@google.com



@boring-cyborg (bot) commented Oct 27, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide, and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it’s a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

:type timeout: float
:param metadata: Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
"""

@josh-fell (Contributor) commented on this docstring:

Missing gcp_conn_id and impersonation_chain from the docstring.

@MaksYermak (Contributor, Author) replied:

@josh-fell I've added it.
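
Presumably the added parameters follow the Google provider's standard docstring wording for these arguments; a sketch of what the completed docstring likely gained (wording assumed, not quoted from the PR):

:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:type gcp_conn_id: str
:param impersonation_chain: Optional service account to impersonate using
    short-term credentials, or chained list of accounts required to get the
    access token of the last account in the list, which will be impersonated
    in the request.
:type impersonation_chain: Union[str, Sequence[str]]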

@josh-fell (Contributor) commented Oct 28, 2021

@MaksYermak General thoughts across the new operators:

  • WDYT about adding batch_id as a template_field? Would it be beneficial for users to be able to dynamically create batch_id values?
  • Adjacently, is batch_id something the DataprocCreateBatchOperator can push as an XCom such that the value can be used in downstream tasks as an XComArgs (i.e. the use case in the example DAG if the .output property is used as an input to the DataprocGetBatchOperator and DataprocDeleteBatchOperator tasks)?

Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.

@MaksYermak (Contributor, Author) commented Nov 2, 2021

> @MaksYermak General thoughts across the new operators:
>
>   • WDYT about adding batch_id as a template_field? Would it be beneficial for users to be able to dynamically create batch_id values?
>   • Adjacently, is batch_id something the DataprocCreateBatchOperator can push as an XCom such that the value can be used in downstream tasks as an XComArgs (i.e. the use case in the example DAG if the .output property is used as an input to the DataprocGetBatchOperator and DataprocDeleteBatchOperator tasks)?
>
> Not saying it needs to/should be implemented but thought these might be good (and possibly cheap) features.

@josh-fell Answers to your questions:

  1. I think it can be useful; I will add it to the code.
  2. No, because DataprocCreateBatchOperator returns a Batch object, not a batch_id.
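
For illustration, a hypothetical sketch of the XComArgs idea above and why it doesn't fit directly: what would land in XCom is the created Batch, not a bare batch_id, so a downstream template would have to extract the trailing id from the batch's resource name. All values, and the assumption that the pushed return value is a Batch dict with a fully qualified "name" field, are illustrative, not confirmed by the PR.

from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocGetBatchOperator,
)

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id="my-project",  # illustrative
    region="us-central1",     # illustrative
    batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/job.py"}},
    batch_id="example-batch",
)

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    project_id="my-project",
    region="us-central1",
    # create_batch.output would carry the whole Batch, so the id must be
    # pulled out explicitly (assuming a dict with a resource "name").
    batch_id="{{ ti.xcom_pull(task_ids='create_batch')['name'].split('/')[-1] }}",
)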

@MaksYermak force-pushed the create-dataproc-serverless-spark-batches-operator branch 2 times, most recently from 7d1ea91 to e9a956d on November 10, 2021 at 17:05
@MaksYermak (Contributor, Author):

@turbaszek @josh-fell @mik-laj Hi guys, could you look at this PR one more time?

@MaksYermak force-pushed the create-dataproc-serverless-spark-batches-operator branch from e9a956d to 4065232 on November 16, 2021 at 13:39
@josh-fell (Contributor):

@MaksYermak Should batch_id be added to template_fields in DataprocGetBatchOperator and DataprocDeleteBatchOperator as well? No strong opinion though.

LGTM 👍

@MaksYermak force-pushed the create-dataproc-serverless-spark-batches-operator branch from 4065232 to 0c1b5e5 on November 17, 2021 at 09:27
@MaksYermak (Contributor, Author):

> @MaksYermak Should batch_id be added to template_fields in DataprocGetBatchOperator and DataprocDeleteBatchOperator as well? No strong opinion though.
>
> LGTM 👍

@josh-fell Makes sense, I have added it to the code.
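
For reference, a minimal sketch of the templated batch_id wired through all three operators, assuming a provider version that includes this PR; the project, region, bucket and DAG values below are illustrative, not from the PR:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
)

BATCH_ID = "batch-{{ ds_nodash }}"  # rendered per run in all three operators

with DAG("example_dataproc_batch_ids", start_date=datetime(2021, 11, 1), schedule_interval=None) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id="my-project",  # illustrative
        region="us-central1",     # illustrative
        batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/job.py"}},
        batch_id=BATCH_ID,
    )
    get_batch = DataprocGetBatchOperator(
        task_id="get_batch", project_id="my-project", region="us-central1", batch_id=BATCH_ID
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch", project_id="my-project", region="us-central1", batch_id=BATCH_ID
    )
    create_batch >> get_batch >> delete_batch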

@MaksYermak (Contributor, Author):

@turbaszek @josh-fell @mik-laj Hi guys, could you look at this PR and approve it for merge if all is good?

@lwyszomi (Contributor):

@turbaszek @josh-fell @mik-laj Is there a chance to merge this?

@josh-fell (Contributor):

@MaksYermak @lwyszomi There is a static check that is failing. Can you address this?

FYI - I am not able to merge. A code owner will have to do that.

@potiuk merged commit bf68b9a into apache:main on Nov 26, 2021
@boring-cyborg (bot) commented Nov 26, 2021

Awesome work, congrats on your first merged pull request!

dillonjohnson pushed a commit to dillonjohnson/airflow that referenced this pull request Dec 1, 2021
@potiuk added the changelog:skip label on Jan 22, 2022
@potiuk added this to the Airflow 2.2.4 milestone on Jan 22, 2022
potiuk pushed a commit that referenced this pull request Jan 22, 2022
jedcunningham pushed a commit that referenced this pull request Jan 27, 2022
@aoelvp94:

A question here: how can I specify the Docker image used to run PySpark workloads in custom containers with DataprocCreateBatchOperator?

@lwyszomi (Contributor):

@aoelvp94 I'm not an expert on the Dataproc Serverless service, but the operator is only a thin wrapper around the SDK and takes the same parameters. In the Batch definition there is a runtimeConfig where you can specify the image. I hope this helps.

Refs:
Batch -> https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#Batch
runtimeConfig -> https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#RuntimeConfig
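
As a concrete illustration of the runtimeConfig pointer above, a minimal sketch of a batch definition selecting a custom container image; the image URI, paths, project and region are illustrative assumptions, not from the thread:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id="my-project",   # illustrative
    region="us-central1",      # illustrative
    batch={
        "pyspark_batch": {"main_python_file_uri": "gs://my-bucket/job.py"},
        # RuntimeConfig.container_image selects the custom image for the workload.
        "runtime_config": {"container_image": "gcr.io/my-project/my-image:1.0.0"},
    },
    batch_id="custom-image-batch",
)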

@aoelvp94:

Yeah @lwyszomi, I am trying something like that now; if I have news I will leave the code snippet here for future reference.

@aoelvp94 commented Mar 11, 2022

@lwyszomi No success when I tried to set Jinja params in the Batch object. Do you know why?

@lwyszomi (Contributor):

@aoelvp94 Which version of the provider are you using? The Batch object is a templated field starting from 6.4.0.

@aoelvp94:

> @aoelvp94 Which version of the provider are you using? The Batch object is a templated field starting from 6.4.0.

I am using composer-2.0.5-airflow-2.2.3, so I have 6.4.0.

@aoelvp94:

Another question: why doesn't the operator generate a hash in the batch_id field, as DataprocSubmitPySparkJobOperator does (I am replacing that implementation)? I have to delete the batch before I can run the same batch again (I would prefer to keep the historical runs).

@lwyszomi (Contributor):

> > @aoelvp94 Which version of the provider are you using? The Batch object is a templated field starting from 6.4.0.
>
> I am using composer-2.0.5-airflow-2.2.3, so I have 6.4.0.

I will need to investigate further why the Jinja params don't work. I checked, and you are right that Composer 2.0.5 uses provider 6.4.0.

> Another question: why doesn't the operator generate a hash in the batch_id field, as DataprocSubmitPySparkJobOperator does (I am replacing that implementation)? I have to delete the batch before I can run the same batch again (I would prefer to keep the historical runs).

We created the operators based on the existing SDK; we didn't add any extra logic to append a hash to the batch_id.
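
Since the operator adds no hash itself, one workaround is to template a per-run suffix into batch_id so re-runs don't collide with existing batches; a minimal sketch under illustrative names (not from the thread):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id="my-project",        # illustrative
    region="us-central1",           # illustrative
    batch={"pyspark_batch": {"main_python_file_uri": "gs://my-bucket/job.py"}},
    # ds_nodash yields a new id per scheduled run, so old batches can be
    # kept as history instead of deleted before each re-run.
    batch_id="my-job-{{ ds_nodash }}",
)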

@lwyszomi (Contributor):

@aoelvp94 For which property do you want to use a Jinja template?

@aoelvp94:

The batch argument, when I use the Batch / PySparkBatch objects.

@lwyszomi (Contributor):

@aoelvp94 batch is templated so it should work, but which property inside Batch/PySparkBatch are you trying to template?

@lwyszomi (Contributor):

Can you share the Batch config?

@aoelvp94:

I removed the main python file on purpose.

# Imports implied by the snippet (module paths assumed for provider 6.4.0):
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from dateutil.relativedelta import relativedelta
from google.api_core.gapic_v1.method import DEFAULT
from google.cloud.dataproc_v1 import Batch, PySparkBatch, RuntimeConfig


def get_week_ago(dt):
    # Date one week before `dt`, formatted YYYY-MM-DD.
    bd = dt + relativedelta(days=-7)
    return bd.strftime('%Y-%m-%d')


# table_name, environment, project_id, main_python_file_uri, files,
# source_bucket, source_prefix and output_path are defined elsewhere.
dag = DAG(
    dag_id=f"{table_name}_table_v4",
    ...
    user_defined_macros={'week_ago': get_week_ago},
)

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=project_id,
    region=f"{{{{ var.value.region_{environment} }}}}",
    batch=Batch(
        pyspark_batch=PySparkBatch(
            main_python_file_uri=main_python_file_uri,
            python_file_uris=files,
            args=[
                "--start-date",
                "{{ week_ago(execution_date) }}",  # it doesn't work
                "--end-date",
                "{{ ds }}",  # it doesn't work
                "--table",
                table_name,
                "--source-bucket",
                source_bucket,
                "--source-prefix",
                source_prefix,
                "--output-path",
                f"{output_path}/{table_name}/",
            ],
        ),
        runtime_config=RuntimeConfig(container_image="gcr.io/company/image_name:1.0.1"),
    ),
    batch_id=f"pyspark-table-{table_name.replace('_','-')}-{{{{ ds_nodash }}}}",
    retry=DEFAULT,
    timeout=500,
)

@lwyszomi (Contributor):

@aoelvp94 Thanks, we will check why this doesn't work. I will get back to you with an update when I have more information.

@marekw-openx:

@aoelvp94, have you tried passing the batch config in the form of a Python dict? In my case I'm using just a Python dict and it gets templated, like:

batch = {
  "spark_batch": {
    "spark_job": {
      "args": ["-d", "{{ ts }}"]
    }
  }
  (...)
}

Also, please double-check the apache-airflow-providers-google version (batch wasn't templated in 6.2.0).

@MaksYermak (Contributor, Author):

@aoelvp94 I have checked your configuration; for it to work correctly you should use a dictionary instead of a Batch() object, because for some reason Airflow can't template an object's properties.

batch = {
    "pyspark_batch": {
        "main_python_file_uri": main_python_file_uri,
        "python_file_uris": files,
        "args": [
            "--start-date={{ get_week_ago(data_interval_start) }}",
            "--end-date={{ ds }}",
            (. . .)
        ],
    },
    ( . . .)
}

One more thing: in recent Airflow versions execution_date is deprecated in templates. Please use data_interval_start or logical_date instead.

Labels: area:providers, changelog:skip, full tests needed, kind:documentation, provider:google