Skip to content

Commit

Permalink
Fix GCSToGCSOperator behavior difference for moving single object (a…
Browse files Browse the repository at this point in the history
…pache#40162)

* Merge different behavior of `GCSToGCSOperator` for single and multiple objects

* Add behavior change note to changelog
  • Loading branch information
boraberke authored Jun 21, 2024
1 parent 832099c commit 2f2796f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 12 deletions.
11 changes: 11 additions & 0 deletions airflow/providers/google/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@
Changelog
---------

.. note::
The ``GCSToGCSOperator`` now retains the nested folder structure when moving or copying a single
object, aligning its behavior with the behavior for multiple objects. If this change impacts your
workflows, you may need to adjust your ``source_object`` parameter to include the full path up to
the folder containing your single file and specify ``destination_object`` explicitly to ignore
nested folders. For example, if you previously used ``source_object='folder/nested_folder/'``, to
move file ``'folder/nested_folder/second_nested_folder/file'`` you should now use
``source_object='folder/nested_folder/second_nested_folder/'`` and specify
``destination_object='folder/nested_folder/'``. This would move the file to ``'folder/nested_folder/file'``
instead of the fixed behavior of moving it to ``'folder/nested_folder/second_nested_folder/file'``.

10.19.0
.......

Expand Down
18 changes: 6 additions & 12 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,20 +408,9 @@ def _copy_source_without_wildcard(self, hook, prefix):
msg = f"{prefix} does not exist in bucket {self.source_bucket}"
self.log.warning(msg)
raise AirflowException(msg)
if len(objects) == 1 and objects[0][-1] != "/":
self._copy_file(hook=hook, source_object=objects[0])
elif len(objects):
self._copy_multiple_objects(hook=hook, source_objects=objects, prefix=prefix)

def _copy_file(self, hook, source_object):
destination_object = self.destination_object or source_object
if self.destination_object and self.destination_object[-1] == "/":
file_name = source_object.split("/")[-1]
destination_object += file_name
self._copy_single_object(
hook=hook, source_object=source_object, destination_object=destination_object
)

def _copy_multiple_objects(self, hook, source_objects, prefix):
# Check whether the prefix is a root directory for all the rest of objects.
_pref = prefix.rstrip("/")
Expand All @@ -441,7 +430,12 @@ def _copy_multiple_objects(self, hook, source_objects, prefix):
destination_object = source_obj
else:
file_name_postfix = source_obj.replace(base_path, "", 1)
destination_object = self.destination_object.rstrip("/") + "/" + file_name_postfix

destination_object = (
self.destination_object.rstrip("/")[0 : self.destination_object.rfind("/")]
+ "/"
+ file_name_postfix
)

self._copy_single_object(
hook=hook, source_object=source_obj, destination_object=destination_object
Expand Down
46 changes: 46 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,52 @@ def test_execute_source_object_required_flag_true(self, mock_hook):
["source/foo.txt"],
["{prefix}/foo.txt"],
),
(
["source/sub1/sub2/sub3/file.txt"],
"source/",
None,
False,
["source/sub1/sub2/sub3/file.txt"],
["{prefix}/sub1/sub2/sub3/file.txt"],
),
(
["source/sub1/sub2/sub3/file.txt", "source/sub1/sub2/sub3/file2.txt"],
"source/",
None,
False,
["source/sub1/sub2/sub3/file.txt", "source/sub1/sub2/sub3/file2.txt"],
["{prefix}/sub1/sub2/sub3/file.txt", "{prefix}/sub1/sub2/sub3/file2.txt"],
),
(
[f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file.txt"],
f"{DESTINATION_OBJECT_PREFIX}",
None,
False,
[f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file.txt"],
["{prefix}/sub1/sub2/sub3/file.txt"],
),
(
[f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file.txt"],
f"{DESTINATION_OBJECT_PREFIX}/",
None,
False,
[f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file.txt"],
["{prefix}/sub1/sub2/sub3/file.txt"],
),
(
[
f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file.txt",
f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file2.txt",
],
f"{DESTINATION_OBJECT_PREFIX}/",
None,
False,
[
f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file.txt",
f"{DESTINATION_OBJECT_PREFIX}/sub1/sub2/sub3/file2.txt",
],
["{prefix}/sub1/sub2/sub3/file.txt", "{prefix}/sub1/sub2/sub3/file2.txt"],
),
(
["source/foo.txt", "source/foo.txt.abc", "source/foo.txt/subfolder/file.txt"],
"source/foo.txt",
Expand Down

0 comments on commit 2f2796f

Please sign in to comment.