From 63af0ea7ecafab6b2dafefe633fabbec03c6626c Mon Sep 17 00:00:00 2001 From: Silviu-Surcica Date: Fri, 23 Feb 2024 16:49:02 +0200 Subject: [PATCH 1/5] use table_id for partition query --- airflow/providers/google/cloud/hooks/bigquery.py | 4 +++- airflow/providers/google/cloud/triggers/bigquery.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index 5f5ab68c1f616..c65beab175fa0 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -3313,6 +3313,7 @@ async def get_job_output( async def create_job_for_partition_get( self, dataset_id: str | None, + table_id: str | None = None, project_id: str | None = None, ): """Create a new job and get the job_id using gcloud-aio.""" @@ -3322,7 +3323,8 @@ async def create_job_for_partition_get( query_request = { "query": "SELECT partition_id " - f"FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`", + f"FROM `{project_id}.{dataset_id}.INFORMATION_SCHEMA.PARTITIONS`" + + (f" WHERE table_id={table_id}" if table_id else ""), "useLegacySql": False, } job_query_resp = await job_client.query(query_request, cast(Session, session)) diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py index bc9e812d1b28c..302316e4ae581 100644 --- a/airflow/providers/google/cloud/triggers/bigquery.py +++ b/airflow/providers/google/cloud/triggers/bigquery.py @@ -681,7 +681,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] await asyncio.sleep(self.poll_interval) else: - job_id = await hook.create_job_for_partition_get(self.dataset_id, project_id=self.project_id) + job_id = await hook.create_job_for_partition_get( + self.dataset_id, table_id=self.table_id, project_id=self.project_id + ) self.log.info("Sleeping for %s seconds.", self.poll_interval) await asyncio.sleep(self.poll_interval) From 84a7df130a5720e1c1ef5e03023c434cd4bda3da Mon Sep 17 00:00:00 2001 From: Silviu-Surcica Date: Mon, 26 Feb 2024 12:30:52 +0200 Subject: [PATCH 2/5] tests --- .../google/cloud/hooks/test_bigquery.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index 5ca34b276f21c..161afbb23bf2f 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -2181,6 +2181,39 @@ async def test_get_job_output_assert_once_with(self, mock_job_instance): resp = await hook.get_job_output(job_id=JOB_ID, project_id=PROJECT_ID) assert resp == response + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") + async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_session): + hook = BigQueryAsyncHook() + mock_job_client = AsyncMock(Job) + mock_job_instance.return_value = mock_job_client + expected_query_request = { + "query": "SELECT partition_id " + f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`" + f" WHERE table_id={TABLE_ID}", + "useLegacySql": False, + } + await hook.create_job_for_partition_get( + dataset_id=DATASET_ID, table_id=TABLE_ID, project_id=PROJECT_ID) + mock_job_client.query.assert_called_once_with(expected_query_request, mock_session) + + @pytest.mark.asyncio + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") + async def test_create_job_for_partition_get(self, mock_job_instance, mock_session): + hook = BigQueryAsyncHook() + mock_job_client = AsyncMock(Job) + mock_job_instance.return_value = mock_job_client + expected_query_request = { + "query": "SELECT partition_id " + f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`", + "useLegacySql": False, + } + await hook.create_job_for_partition_get( + dataset_id=DATASET_ID, project_id=PROJECT_ID) + mock_job_client.query.assert_called_once_with(expected_query_request, mock_session) + def test_interval_check_for_airflow_exception(self): """ Assert that check return AirflowException From 7b8f93708297b2d7d810a0478d12e9b1d0937292 Mon Sep 17 00:00:00 2001 From: Silviu-Surcica Date: Mon, 26 Feb 2024 16:04:00 +0200 Subject: [PATCH 3/5] fix mock --- .../providers/google/cloud/hooks/test_bigquery.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index 161afbb23bf2f..55a2934c0e097 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -2188,14 +2188,16 @@ async def test_create_job_for_partition_get_with_table(self, mock_job_instance, hook = BigQueryAsyncHook() mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client + mock_session.__aenter__.return_value = AsyncMock() expected_query_request = { "query": "SELECT partition_id " - f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`" - f" WHERE table_id={TABLE_ID}", + f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`" + f" WHERE table_id={TABLE_ID}", "useLegacySql": False, } await hook.create_job_for_partition_get( - dataset_id=DATASET_ID, table_id=TABLE_ID, project_id=PROJECT_ID) + dataset_id=DATASET_ID, table_id=TABLE_ID, project_id=PROJECT_ID + ) mock_job_client.query.assert_called_once_with(expected_query_request, mock_session) @pytest.mark.asyncio @@ -2205,13 +2207,12 @@ async def test_create_job_for_partition_get(self, mock_job_instance, mock_sessio hook = BigQueryAsyncHook() mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client + mock_session.__aenter__.return_value = AsyncMock() expected_query_request = { - "query": "SELECT partition_id " - f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`", + "query": "SELECT partition_id " f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`", "useLegacySql": False, } - await hook.create_job_for_partition_get( - dataset_id=DATASET_ID, project_id=PROJECT_ID) + await hook.create_job_for_partition_get(dataset_id=DATASET_ID, project_id=PROJECT_ID) mock_job_client.query.assert_called_once_with(expected_query_request, mock_session) def test_interval_check_for_airflow_exception(self): From fc5f5798bfe60a6f9d1b25e5ef40fed13754186e Mon Sep 17 00:00:00 2001 From: Silviu-Surcica Date: Mon, 26 Feb 2024 16:22:50 +0200 Subject: [PATCH 4/5] fix mock; --- tests/providers/google/cloud/hooks/test_bigquery.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index 55a2934c0e097..23bb98b87f57f 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -2182,13 +2182,14 @@ async def test_get_job_output_assert_once_with(self, mock_job_instance): assert resp == response @pytest.mark.asyncio - @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") + @mock.patch( + "airflow.providers.google.cloud.hooks.bigquery.ClientSession.__aenter__", return_value=AsyncMock() + ) @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_session): hook = BigQueryAsyncHook() mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client - mock_session.__aenter__.return_value = AsyncMock() expected_query_request = { "query": "SELECT partition_id " f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`" @@ -2201,13 +2202,14 @@ async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_job_client.query.assert_called_once_with(expected_query_request, mock_session) @pytest.mark.asyncio - @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") + @mock.patch( + "airflow.providers.google.cloud.hooks.bigquery.ClientSession.__aenter__", return_value=AsyncMock() + ) @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") async def test_create_job_for_partition_get(self, mock_job_instance, mock_session): hook = BigQueryAsyncHook() mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client - mock_session.__aenter__.return_value = AsyncMock() expected_query_request = { "query": "SELECT partition_id " f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`", "useLegacySql": False, From 6a1ff42ea35a27c4beb6fc352a29c21599969e1a Mon Sep 17 00:00:00 2001 From: Silviu-Surcica Date: Mon, 26 Feb 2024 17:02:49 +0200 Subject: [PATCH 5/5] fix mock --- .../google/cloud/hooks/test_bigquery.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index 23bb98b87f57f..0214b382390ff 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -2182,14 +2182,14 @@ async def test_get_job_output_assert_once_with(self, mock_job_instance): assert resp == response @pytest.mark.asyncio - @mock.patch( - "airflow.providers.google.cloud.hooks.bigquery.ClientSession.__aenter__", return_value=AsyncMock() - ) + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") - async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_session): + async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_client_session): hook = BigQueryAsyncHook() mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client + mock_session = AsyncMock() + mock_client_session.return_value.__aenter__.return_value = mock_session expected_query_request = { "query": "SELECT partition_id " f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`" @@ -2202,16 +2202,16 @@ async def test_create_job_for_partition_get_with_table(self, mock_job_instance, mock_job_client.query.assert_called_once_with(expected_query_request, mock_session) @pytest.mark.asyncio - @mock.patch( - "airflow.providers.google.cloud.hooks.bigquery.ClientSession.__aenter__", return_value=AsyncMock() - ) + @mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession") @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_job_instance") - async def test_create_job_for_partition_get(self, mock_job_instance, mock_session): + async def test_create_job_for_partition_get(self, mock_job_instance, mock_client_session): hook = BigQueryAsyncHook() mock_job_client = AsyncMock(Job) mock_job_instance.return_value = mock_job_client + mock_session = AsyncMock() + mock_client_session.return_value.__aenter__.return_value = mock_session expected_query_request = { - "query": "SELECT partition_id " f"FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`", + "query": f"SELECT partition_id FROM `{PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.PARTITIONS`", "useLegacySql": False, } await hook.create_job_for_partition_get(dataset_id=DATASET_ID, project_id=PROJECT_ID)