Skip to content

Commit

Permalink
fix: OpenLineage in FileTransferOperator for Airflow 2.8 (#39755)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda authored May 22, 2024
1 parent 39269d6 commit 6171051
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
14 changes: 12 additions & 2 deletions airflow/providers/common/io/operators/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,21 @@ def get_openlineage_facets_on_start(self) -> OperatorLineage:

from airflow.providers.openlineage.extractors import OperatorLineage

def _prepare_ol_dataset(path: ObjectStoragePath) -> Dataset:
    """Build the OpenLineage ``Dataset`` that describes ``path``.

    When ``path`` exposes a ``namespace`` attribute it is used as-is together
    with ``path.key``. Otherwise the namespace is rebuilt from ``path.protocol``
    and ``path.bucket`` (protocol only when the bucket is empty), and the
    dataset name is ``path.key`` with leading ``path.sep`` characters removed.

    :param path: storage path to describe.
    :return: dataset carrying the resolved namespace and name.
    """
    if not hasattr(path, "namespace"):
        # ``namespace`` has been added in Airflow 2.9.0 (#36410); on older
        # versions it has to be recreated manually from protocol and bucket.
        if path.bucket:
            namespace = f"{path.protocol}://{path.bucket}"
        else:
            namespace = path.protocol
        return Dataset(namespace=namespace, name=path.key.lstrip(path.sep))

    return Dataset(namespace=path.namespace, name=path.key)

src: ObjectStoragePath = self._get_path(self.src, self.source_conn_id)
dst: ObjectStoragePath = self._get_path(self.dst, self.dst_conn_id)

input_dataset = Dataset(namespace=src.namespace, name=src.key)
output_dataset = Dataset(namespace=dst.namespace, name=dst.key)
input_dataset = _prepare_ol_dataset(src)
output_dataset = _prepare_ol_dataset(dst)

return OperatorLineage(
inputs=[input_dataset],
Expand Down
27 changes: 27 additions & 0 deletions tests/providers/common/io/operators/test_file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,30 @@ def test_get_openlineage_facets_on_start():
assert len(lineage.outputs) == 1
assert lineage.inputs[0] == expected_input
assert lineage.outputs[0] == expected_output


def test_get_openlineage_facets_on_start_without_namespace():
    """Lineage datasets are built correctly when paths lack ``namespace``.

    Covers both fallback shapes: a path with a bucket (namespace is
    ``protocol://bucket`` and the leading separator is stripped from the key)
    and a path with an empty bucket (namespace is the bare protocol).
    """
    mock_src = mock.MagicMock(key="/src_key", protocol="s3", bucket="src_bucket", sep="/")
    mock_dst = mock.MagicMock(key="dst_key", protocol="gcs", bucket="", sep="/")

    # Ensure the `namespace` attribute does not exist: deleting an attribute
    # on a mock makes every later access raise AttributeError.
    for path_mock in (mock_src, mock_dst):
        del path_mock.namespace

    operator = FileTransferOperator(
        task_id="task",
        src=mock_src,
        dst=mock_dst,
        source_conn_id="source_conn_id",
        dest_conn_id="dest_conn_id",
    )
    # Make sure the _get_path method returns the mock objects
    # (first call -> source, second call -> destination).
    operator._get_path = mock.Mock(side_effect=[mock_src, mock_dst])

    expected_input = Dataset(namespace="s3://src_bucket", name="src_key")
    expected_output = Dataset(namespace="gcs", name="dst_key")

    lineage = operator.get_openlineage_facets_on_start()

    assert len(lineage.inputs) == 1
    assert len(lineage.outputs) == 1
    assert lineage.inputs[0] == expected_input
    assert lineage.outputs[0] == expected_output

0 comments on commit 6171051

Please sign in to comment.