Skip to content

Commit

Permalink
Optimize deferrable execution mode `AzureDataFactoryPipelineRunStatus…
Browse files Browse the repository at this point in the history
…Sensor` (#30983)

* Optimize deferrable execution mode in AzureDataFactoryPipelineRunStatusSensor

* Apply review suggestions
  • Loading branch information
phanikumv authored May 1, 2023
1 parent 23b5b31 commit 607068f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
23 changes: 12 additions & 11 deletions airflow/providers/microsoft/azure/sensors/data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,18 @@ def execute(self, context: Context) -> None:
if not self.deferrable:
super().execute(context=context)
else:
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=ADFPipelineRunStatusSensorTrigger(
run_id=self.run_id,
azure_data_factory_conn_id=self.azure_data_factory_conn_id,
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
poke_interval=self.poke_interval,
),
method_name="execute_complete",
)
if not self.poke(context=context):
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=ADFPipelineRunStatusSensorTrigger(
run_id=self.run_id,
azure_data_factory_conn_id=self.azure_data_factory_conn_id,
resource_group_name=self.resource_group_name,
factory_name=self.factory_name,
poke_interval=self.poke_interval,
),
method_name="execute_complete",
)

def execute_complete(self, context: Context, event: dict[str, str]) -> None:
"""
Expand Down
24 changes: 19 additions & 5 deletions tests/providers/microsoft/azure/sensors/test_azure_data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,28 @@ def test_poke(self, mock_pipeline_run, pipeline_run_status, expected_status):
with pytest.raises(AzureDataFactoryPipelineRunException, match=error_message):
self.sensor.poke({})

def test_adf_pipeline_status_sensor_async(self):
@mock.patch("airflow.providers.microsoft.azure.sensors.data_factory.AzureDataFactoryHook")
def test_adf_pipeline_status_sensor_async(self, mock_hook):
"""Assert execute method defer for Azure Data factory pipeline run status sensor"""

mock_hook.return_value.get_pipeline_run_status.return_value = AzureDataFactoryPipelineRunStatus.QUEUED
with pytest.raises(TaskDeferred) as exc:
self.defered_sensor.execute({})
self.defered_sensor.execute(mock.MagicMock())
assert isinstance(
exc.value.trigger, ADFPipelineRunStatusSensorTrigger
), "Trigger is not a ADFPipelineRunStatusSensorTrigger"

@mock.patch("airflow.providers.microsoft.azure.sensors.data_factory.AzureDataFactoryHook")
@mock.patch(
"airflow.providers.microsoft.azure.sensors.data_factory"
".AzureDataFactoryPipelineRunStatusSensor.defer"
)
def test_adf_pipeline_status_sensor_finish_before_deferred(self, mock_defer, mock_hook):
mock_hook.return_value.get_pipeline_run_status.return_value = (
AzureDataFactoryPipelineRunStatus.SUCCEEDED
)
self.defered_sensor.execute(mock.MagicMock())
assert not mock_defer.called

def test_adf_pipeline_status_sensor_execute_complete_success(self):
"""Assert execute_complete log success message when trigger fire with target status"""

Expand All @@ -115,9 +128,10 @@ class TestAzureDataFactoryPipelineRunStatusAsyncSensor:
run_id=RUN_ID,
)

def test_adf_pipeline_status_sensor_async(self):
@mock.patch("airflow.providers.microsoft.azure.sensors.data_factory.AzureDataFactoryHook")
def test_adf_pipeline_status_sensor_async(self, mock_hook):
"""Assert execute method defer for Azure Data factory pipeline run status sensor"""

mock_hook.return_value.get_pipeline_run_status.return_value = AzureDataFactoryPipelineRunStatus.QUEUED
with pytest.raises(TaskDeferred) as exc:
self.SENSOR.execute({})
assert isinstance(
Expand Down

0 comments on commit 607068f

Please sign in to comment.