Skip to content

Commit

Permalink
pass wait_for_done parameter down to _DataflowJobsController
Browse files Browse the repository at this point in the history
  • Loading branch information
dejii committed Apr 26, 2021
1 parent 0b4c67d commit f610519
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
3 changes: 3 additions & 0 deletions airflow/providers/google/cloud/hooks/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ def start_template_dataflow(
num_retries=self.num_retries,
drain_pipeline=self.drain_pipeline,
cancel_timeout=self.cancel_timeout,
wait_until_finished=self.wait_until_finished,
)
jobs_controller.wait_for_done()
return response["job"]
Expand Down Expand Up @@ -774,6 +775,7 @@ def start_flex_template(
poll_sleep=self.poll_sleep,
num_retries=self.num_retries,
cancel_timeout=self.cancel_timeout,
wait_until_finished=self.wait_until_finished,
)
jobs_controller.wait_for_done()

Expand Down Expand Up @@ -1030,6 +1032,7 @@ def start_sql_job(
poll_sleep=self.poll_sleep,
num_retries=self.num_retries,
drain_pipeline=self.drain_pipeline,
wait_until_finished=self.wait_until_finished,
)
jobs_controller.wait_for_done()

Expand Down
7 changes: 7 additions & 0 deletions tests/providers/google/cloud/hooks/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ def test_start_template_dataflow(self, mock_conn, mock_controller, mock_uuid):
location=DEFAULT_DATAFLOW_LOCATION,
drain_pipeline=False,
cancel_timeout=DEFAULT_CANCEL_TIMEOUT,
wait_until_finished=None,
)
mock_controller.return_value.wait_for_done.assert_called_once()

Expand Down Expand Up @@ -873,6 +874,7 @@ def test_start_template_dataflow_with_custom_region_as_variable(
location=TEST_LOCATION,
drain_pipeline=False,
cancel_timeout=DEFAULT_CANCEL_TIMEOUT,
wait_until_finished=None,
)
mock_controller.return_value.wait_for_done.assert_called_once()

Expand Down Expand Up @@ -913,6 +915,7 @@ def test_start_template_dataflow_with_custom_region_as_parameter(
location=TEST_LOCATION,
drain_pipeline=False,
cancel_timeout=DEFAULT_CANCEL_TIMEOUT,
wait_until_finished=None,
)
mock_controller.return_value.wait_for_done.assert_called_once()

Expand Down Expand Up @@ -957,6 +960,7 @@ def test_start_template_dataflow_with_runtime_env(self, mock_conn, mock_dataflow
project_number=TEST_PROJECT,
drain_pipeline=False,
cancel_timeout=DEFAULT_CANCEL_TIMEOUT,
wait_until_finished=None,
)
mock_uuid.assert_called_once_with()

Expand Down Expand Up @@ -1005,6 +1009,7 @@ def test_start_template_dataflow_update_runtime_env(self, mock_conn, mock_datafl
project_number=TEST_PROJECT,
drain_pipeline=False,
cancel_timeout=DEFAULT_CANCEL_TIMEOUT,
wait_until_finished=None,
)
mock_uuid.assert_called_once_with()

Expand Down Expand Up @@ -1037,6 +1042,7 @@ def test_start_flex_template(self, mock_conn, mock_controller):
poll_sleep=self.dataflow_hook.poll_sleep,
num_retries=self.dataflow_hook.num_retries,
cancel_timeout=DEFAULT_CANCEL_TIMEOUT,
wait_until_finished=self.dataflow_hook.wait_until_finished,
)
mock_controller.return_value.get_jobs.wait_for_done.assrt_called_once_with()
mock_controller.return_value.get_jobs.assrt_called_once_with()
Expand Down Expand Up @@ -1110,6 +1116,7 @@ def test_start_sql_job_failed_to_run(
project_number=TEST_PROJECT,
num_retries=5,
drain_pipeline=False,
wait_until_finished=None,
)
mock_controller.return_value.wait_for_done.assert_called_once()
assert result == test_job
Expand Down

0 comments on commit f610519

Please sign in to comment.