Commit

Add region tags (#8198)

* add region tags to pyspark job

* Add region tag for dag

leahecole authored Jul 20, 2022
1 parent f9fe9d8 commit 7fb9175

Showing 2 changed files with 10 additions and 8 deletions.
composer/2022_airflow_summit/data_analytics_dag.py (9 changes: 5 additions & 4 deletions)

@@ -13,6 +13,7 @@
 # limitations under the License.
 
 
+# [START composer_dataanalyticstutorial_dag]
 import datetime
 
 from airflow import models
@@ -46,13 +47,12 @@
             f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
             f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
         ],
-
     },
     "environment_config": {
         "execution_config": {
             "service_account": "{{var.value.dataproc_service_account}}"
         }
-    }
+    },
 }
 
 yesterday = datetime.datetime.combine(
@@ -66,7 +66,7 @@
     # To email on failure or retry set 'email' arg to your email and enable
     # emailing here.
     "email_on_failure": False,
-    "email_on_retry": False
+    "email_on_retry": False,
 }
 
 with models.DAG(
@@ -96,7 +96,7 @@
             {"name": "Holiday", "type": "STRING"},
         ],
         skip_leading_rows=1,
-        write_disposition="WRITE_TRUNCATE"
+        write_disposition="WRITE_TRUNCATE",
     )
 
     with TaskGroup("join_bq_datasets") as bq_join_group:
@@ -138,3 +138,4 @@
     )
 
     load_external_dataset >> bq_join_group >> create_batch
+# [END composer_dataanalyticstutorial_dag]
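
The [START]/[END] markers added above are region tags, the convention Google Cloud sample repositories use to delimit the exact span of code that documentation pages embed as a snippet. As a rough illustration of how a tagged region can be pulled out of a sample file, here is a minimal, hypothetical extractor; the function name and regex are illustrative assumptions, not the tooling this repository actually uses:

# Hypothetical sketch: extract the code between # [START tag] and # [END tag].
# The real snippet tooling lives outside this repo; names here are made up.
import re
from pathlib import Path


def extract_region(path: str, tag: str) -> str:
    """Return the source lines enclosed by the given region tag."""
    text = Path(path).read_text()
    pattern = rf"#\s*\[START {re.escape(tag)}\]\n(.*?)#\s*\[END {re.escape(tag)}\]"
    match = re.search(pattern, text, flags=re.DOTALL)
    if match is None:
        raise ValueError(f"region tag {tag!r} not found in {path}")
    return match.group(1)


# Usage: print the DAG snippet that this commit's tags now delimit.
print(extract_region("data_analytics_dag.py", "composer_dataanalyticstutorial_dag"))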
composer/2022_airflow_summit/data_analytics_process.py (9 changes: 5 additions & 4 deletions)

@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+# [START composer_dataanalyticstutorial_dataprocjob]
 import sys
 
 
@@ -51,7 +51,8 @@
     # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
     # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
    # for other save mode options
-    df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode("overwrite").save(
-        WRITE_TABLE
-    )
+    df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
+        "overwrite"
+    ).save(WRITE_TABLE)
     print("Data written to BigQuery")
+# [END composer_dataanalyticstutorial_dataprocjob]
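
For context, the reflowed write call is the standard Spark-BigQuery connector pattern: the DataFrame is staged in a temporary GCS bucket and then loaded into BigQuery, with mode("overwrite") keeping re-runs idempotent as the in-code comment explains. A self-contained sketch of the same pattern follows; the bucket and table names are placeholders, and the spark-bigquery connector is assumed to be on the job's classpath:

# Minimal sketch of the connector pattern used above; assumes the
# spark-bigquery connector jar is available to the Spark session.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bq-write-sketch").getOrCreate()
df = spark.createDataFrame(
    [("2022-07-04", "Independence Day")], ["Date", "Holiday"]
)

# "overwrite" replaces the table contents, so re-running the job is safe;
# temporaryGcsBucket is where the connector stages data before the load job.
df.write.format("bigquery").option(
    "temporaryGcsBucket", "my-staging-bucket"  # placeholder bucket name
).mode("overwrite").save("my_dataset.my_table")  # placeholder dataset.table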
