From e396f06041e51691ae09751c23ef57fd9a03df22 Mon Sep 17 00:00:00 2001 From: Daniel Bell <70265071+danielbe11@users.noreply.github.com> Date: Sun, 5 May 2024 08:19:50 +0200 Subject: [PATCH] Pass SSL arg to all requests in DruidOperator (#39066) * Pass SSL arg to all requests in DruidOperator * Remove unneeded test * Lint * Fix test * Fix tests * Add true test as per dirrao's comment * Use call_count == 1 --------- Co-authored-by: Daniel Bell --- airflow/providers/apache/druid/hooks/druid.py | 6 +- .../apache/druid/hooks/test_druid.py | 60 ++++++++++++++++--- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py index da678d0153cfa..a79b494f321d0 100644 --- a/airflow/providers/apache/druid/hooks/druid.py +++ b/airflow/providers/apache/druid/hooks/druid.py @@ -144,13 +144,15 @@ def submit_indexing_job( sec = 0 while running: - req_status = requests.get(druid_task_status_url, auth=self.get_auth()) + req_status = requests.get(druid_task_status_url, auth=self.get_auth(), verify=self.get_verify()) self.log.info("Job still running for %s seconds...", sec) if self.max_ingestion_time and sec > self.max_ingestion_time: # ensure that the job gets killed if the max ingestion time is exceeded - requests.post(f"{url}/{druid_task_id}/shutdown", auth=self.get_auth()) + requests.post( + f"{url}/{druid_task_id}/shutdown", auth=self.get_auth(), verify=self.get_verify() + ) raise AirflowException(f"Druid ingestion took more than {self.max_ingestion_time} seconds") time.sleep(self.timeout) diff --git a/tests/providers/apache/druid/hooks/test_druid.py b/tests/providers/apache/druid/hooks/test_druid.py index 2b40264455723..0d42695f06f7c 100644 --- a/tests/providers/apache/druid/hooks/test_druid.py +++ b/tests/providers/apache/druid/hooks/test_druid.py @@ -99,25 +99,69 @@ def test_submit_sql_based_ingestion_ok(self, requests_mock): assert task_post.call_count == 1 assert status_check.call_count == 1 - def test_submit_with_correct_ssl_arg(self, requests_mock): + def test_submit_with_false_ssl_arg(self, requests_mock): + # Timeout so that all three requests are sent + self.db_hook.timeout = 1 + self.db_hook.max_ingestion_time = 5 self.db_hook.verify_ssl = False + task_post = requests_mock.post( "http://druid-overlord:8081/druid/indexer/v1/task", text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}', ) status_check = requests_mock.get( "http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status", - text='{"status":{"status": "SUCCESS"}}', + text='{"status":{"status": "RUNNING"}}', + ) + shutdown_post = requests_mock.post( + "http://druid-overlord:8081/druid/indexer/v1/task/" + "9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown", + text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}', ) - self.db_hook.submit_indexing_job("Long json file") + with pytest.raises(AirflowException): + self.db_hook.submit_indexing_job("Long json file") - # PGH005: false positive on ``requests_mock`` argument `called_once` assert task_post.call_count == 1 - assert status_check.call_count == 1 - if task_post.called_once: - verify_ssl = task_post.request_history[0].verify - assert False is verify_ssl + assert False is task_post.request_history[0].verify + + assert status_check.call_count > 1 + assert False is status_check.request_history[0].verify + + assert shutdown_post.call_count == 1 + assert False is shutdown_post.request_history[0].verify + + def test_submit_with_true_ssl_arg(self, requests_mock): + # Timeout so that all three requests are sent + self.db_hook.timeout = 1 + self.db_hook.max_ingestion_time = 5 + self.db_hook.verify_ssl = True + + task_post = requests_mock.post( + "http://druid-overlord:8081/druid/indexer/v1/task", + text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}', + ) + status_check = requests_mock.get( + "http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status", + text='{"status":{"status": "RUNNING"}}', + ) + shutdown_post = requests_mock.post( + "http://druid-overlord:8081/druid/indexer/v1/task/" + "9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown", + text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}', + ) + + with pytest.raises(AirflowException): + self.db_hook.submit_indexing_job("Long json file") + + assert task_post.call_count == 1 + assert True is task_post.request_history[0].verify + + assert status_check.call_count > 1 + assert True is status_check.request_history[0].verify + + assert shutdown_post.call_count == 1 + assert True is shutdown_post.request_history[0].verify def test_submit_correct_json_body(self, requests_mock): task_post = requests_mock.post(