Skip to content

Commit

Permalink
Fix: Python dataflow fixes for CMEK in streaming engine, network tags…
Browse files Browse the repository at this point in the history
… and user defined experiments (#261)

* fix network tags creation for Python jobs

* use steaming engine for Python dataflow job in the regional dlp example

* add user defined additional_experiments to the Dataflow flex template module
  • Loading branch information
daniel-cit authored Jan 5, 2022
1 parent 241e621 commit 77be7ef
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 5 deletions.
1 change: 0 additions & 1 deletion examples/regional-dlp/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ module "regional_dlp" {
kms_key_name = module.data_ingestion.cmek_data_ingestion_crypto_key
temp_location = "gs://${module.data_ingestion.data_ingestion_dataflow_bucket_name}/tmp/"
staging_location = "gs://${module.data_ingestion.data_ingestion_dataflow_bucket_name}/staging/"
enable_streaming_engine = false

parameters = {
input_topic = "projects/${var.data_ingestion_project_id}/topics/${module.data_ingestion.data_ingestion_topic_name}"
Expand Down
1 change: 1 addition & 0 deletions modules/dataflow-flex-job/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ module "dataflow-flex-job" {

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| additional\_experiments | Comma separated list of additional experiments to be used by the job. | `string` | `""` | no |
| container\_spec\_gcs\_path | The GCS path to the Dataflow job flex template. | `string` | n/a | yes |
| enable\_streaming\_engine | Enable/disable the use of Streaming Engine for the job. Note that Streaming Engine is enabled by default for pipelines developed against the Beam SDK for Python v2.21.0 or later when using Python 3. | `bool` | `true` | no |
| job\_language | Language of the flex template code. Options are 'JAVA' or 'PYTHON'. | `string` | `"JAVA"` | no |
Expand Down
6 changes: 2 additions & 4 deletions modules/dataflow-flex-job/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@ locals {
pipeline_options = var.job_language == "JAVA" ? local.java_pipeline_options : local.python_pipeline_options

network_tags = join(";", var.network_tags)
network_tags_experiment_java = local.network_tags != "" ? "use_network_tags=${local.network_tags},use_network_tags_for_flex_templates=${local.network_tags}" : ""
network_tags_experiment_python = local.network_tags != "" ? "use_network_tags_for_flex_templates=${local.network_tags}" : ""
network_tags_experiment = var.job_language == "JAVA" ? local.network_tags_experiment_java : local.network_tags_experiment_python
network_tags_experiment = local.network_tags != "" ? "use_network_tags=${local.network_tags},use_network_tags_for_flex_templates=${local.network_tags}" : ""
kms_on_streaming_engine_experiment = var.kms_key_name != null && var.enable_streaming_engine ? "enable_kms_on_streaming_engine" : ""
experiment_options = local.network_tags_experiment != "" || local.kms_on_streaming_engine_experiment != "" ? join(",", compact([local.kms_on_streaming_engine_experiment, local.network_tags_experiment])) : ""
experiments = local.experiment_options != "" ? { experiments = local.experiment_options } : {}
experiments = local.experiment_options != "" || var.additional_experiments != "" ? { experiments = join(",", compact([local.experiment_options, var.additional_experiments])) } : {}
}

resource "google_dataflow_flex_template_job" "dataflow_flex_template_job" {
Expand Down
6 changes: 6 additions & 0 deletions modules/dataflow-flex-job/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,9 @@ variable "kms_key_name" {
description = "The name for the Cloud KMS key for the job. Key format is: `projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`."
type = string
}

variable "additional_experiments" {
description = "Comma separated list of additional experiments to be used by the job."
type = string
default = ""
}

0 comments on commit 77be7ef

Please sign in to comment.