diff --git a/composer/2022_airflow_summit/data_analytics_dag.py b/composer/2022_airflow_summit/data_analytics_dag.py
index 53dea433f607..a493b6b486ba 100644
--- a/composer/2022_airflow_summit/data_analytics_dag.py
+++ b/composer/2022_airflow_summit/data_analytics_dag.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
+# [START composer_dataanalyticstutorial_dag]
 import datetime
 
 from airflow import models
 
@@ -46,13 +47,12 @@
             f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
             f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
         ],
-
     },
     "environment_config": {
         "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
         }
-    }
+    },
 }
 
 yesterday = datetime.datetime.combine(
@@ -66,7 +66,7 @@
     # To email on failure or retry set 'email' arg to your email and enable
     # emailing here.
     "email_on_failure": False,
-    "email_on_retry": False
+    "email_on_retry": False,
 }
 
 with models.DAG(
@@ -96,7 +96,7 @@
             {"name": "Holiday", "type": "STRING"},
         ],
         skip_leading_rows=1,
-        write_disposition="WRITE_TRUNCATE"
+        write_disposition="WRITE_TRUNCATE",
     )
 
     with TaskGroup("join_bq_datasets") as bq_join_group:
@@ -138,3 +138,4 @@
     )
 
     load_external_dataset >> bq_join_group >> create_batch
+# [END composer_dataanalyticstutorial_dag]
diff --git a/composer/2022_airflow_summit/data_analytics_process.py b/composer/2022_airflow_summit/data_analytics_process.py
index 44e68b5c2f32..6429541a6c3b 100644
--- a/composer/2022_airflow_summit/data_analytics_process.py
+++ b/composer/2022_airflow_summit/data_analytics_process.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+# [START composer_dataanalyticstutorial_dataprocjob]
 
 import sys
 
@@ -51,7 +51,8 @@
     # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
     # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
     # for other save mode options
-    df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode("overwrite").save(
-        WRITE_TABLE
-    )
+    df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
+        "overwrite"
+    ).save(WRITE_TABLE)
     print("Data written to BigQuery")
+# [END composer_dataanalyticstutorial_dataprocjob]