From a4d536859db791236cf56c788a90f28d8860b6b2 Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Thu, 21 Mar 2024 09:06:05 -0500 Subject: [PATCH 1/7] Allow specification of buffer length for GCS to Samba Co-Authored-By: jslepicka-apex <110119914+jslepicka-apex@users.noreply.github.com> --- airflow/providers/samba/hooks/samba.py | 20 +++++++++++++++---- .../providers/samba/transfers/gcs_to_samba.py | 12 +++++++++-- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow/providers/samba/hooks/samba.py b/airflow/providers/samba/hooks/samba.py index 535ec267ccf42..b71928f2158ce 100644 --- a/airflow/providers/samba/hooks/samba.py +++ b/airflow/providers/samba/hooks/samba.py @@ -245,10 +245,22 @@ def setxattr(self, path, attribute, value, flags=0, follow_symlinks=True): **self._conn_kwargs, ) - def push_from_local(self, destination_filepath: str, local_filepath: str): - """Push local file to samba server.""" - with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: - copyfileobj(f, g) + def push_from_local(self, destination_filepath: str, local_filepath: str, buffer_len: int | None = None): + """ + Push local file to samba server. + + :param destination_filepath: the samba location to push to + :param local_filepath: the file to push + :param buffer_len: + size in bytes of the individual chunks of file to send. Larger values may + speed up large file transfers + """ + if buffer_len is not None: + with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: + copyfileobj(f, g, buffer_len) + else: # Use default buffer size for OS as determined by copyfileobj + with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: + copyfileobj(f, g) @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index fb1cb6ad98b07..3fec52356f770 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -93,6 +93,9 @@ class GCSToSambaOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param buffer_len: Optional specification of the size in bytes of the chunks sent to + Samba. Larger buffer lengths may decrease the time to upload large files. The default + length is determined by shutil.copyfileobj, which is 64 KB. """ template_fields: Sequence[str] = ( @@ -114,6 +117,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", samba_conn_id: str = "samba_default", impersonation_chain: str | Sequence[str] | None = None, + buffer_len: int | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -127,6 +131,7 @@ def __init__( self.samba_conn_id = samba_conn_id self.impersonation_chain = impersonation_chain self.sftp_dirs = None + self.buffer_len = buffer_len def execute(self, context: Context): gcs_hook = GCSHook( @@ -154,7 +159,9 @@ def execute(self, context: Context): for source_object in objects: destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname) - self._copy_single_object(gcs_hook, samba_hook, source_object, destination_path) + self._copy_single_object( + gcs_hook, samba_hook, source_object, destination_path, self.buffer_len + ) self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path) else: @@ -176,6 +183,7 @@ def _copy_single_object( samba_hook: SambaHook, source_object: str, destination_path: str, + buffer_len: int | None = None, ) -> None: """Copy single object.""" self.log.info( @@ -194,7 +202,7 @@ def _copy_single_object( object_name=source_object, filename=tmp.name, ) - samba_hook.push_from_local(destination_path, tmp.name) + samba_hook.push_from_local(destination_path, tmp.name, buffer_len=buffer_len) if self.move_object: self.log.info("Executing delete of gs://%s/%s", self.source_bucket, source_object) From 77f4bf4d8baade3998ff2fe7a7db33031a4ff522 Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Sat, 23 Mar 2024 20:50:37 -0500 Subject: [PATCH 2/7] Pass buffer_size more cleanly --- airflow/providers/samba/hooks/samba.py | 13 +++++-------- airflow/providers/samba/transfers/gcs_to_samba.py | 12 ++++++------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/airflow/providers/samba/hooks/samba.py b/airflow/providers/samba/hooks/samba.py index b71928f2158ce..895c885d92205 100644 --- a/airflow/providers/samba/hooks/samba.py +++ b/airflow/providers/samba/hooks/samba.py @@ -245,22 +245,19 @@ def setxattr(self, path, attribute, value, flags=0, follow_symlinks=True): **self._conn_kwargs, ) - def push_from_local(self, destination_filepath: str, local_filepath: str, buffer_len: int | None = None): + def push_from_local(self, destination_filepath: str, local_filepath: str, buffer_size: int | None = None): """ Push local file to samba server. :param destination_filepath: the samba location to push to :param local_filepath: the file to push - :param buffer_len: + :param buffer_size: size in bytes of the individual chunks of file to send. Larger values may speed up large file transfers """ - if buffer_len is not None: - with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: - copyfileobj(f, g, buffer_len) - else: # Use default buffer size for OS as determined by copyfileobj - with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: - copyfileobj(f, g) + extra_args = (buffer_size,) if buffer_size else () + with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: + copyfileobj(f, g, *extra_args) @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index 3fec52356f770..004cd42c2380d 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -93,7 +93,7 @@ class GCSToSambaOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). - :param buffer_len: Optional specification of the size in bytes of the chunks sent to + :param buffer_size: Optional specification of the size in bytes of the chunks sent to Samba. Larger buffer lengths may decrease the time to upload large files. The default length is determined by shutil.copyfileobj, which is 64 KB. """ @@ -117,7 +117,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", samba_conn_id: str = "samba_default", impersonation_chain: str | Sequence[str] | None = None, - buffer_len: int | None = None, + buffer_size: int | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -131,7 +131,7 @@ def __init__( self.samba_conn_id = samba_conn_id self.impersonation_chain = impersonation_chain self.sftp_dirs = None - self.buffer_len = buffer_len + self.buffer_size = buffer_size def execute(self, context: Context): gcs_hook = GCSHook( @@ -160,7 +160,7 @@ def execute(self, context: Context): for source_object in objects: destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname) self._copy_single_object( - gcs_hook, samba_hook, source_object, destination_path, self.buffer_len + gcs_hook, samba_hook, source_object, destination_path, self.buffer_size ) self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path) @@ -183,7 +183,7 @@ def _copy_single_object( samba_hook: SambaHook, source_object: str, destination_path: str, - buffer_len: int | None = None, + buffer_size: int | None = None, ) -> None: """Copy single object.""" self.log.info( @@ -202,7 +202,7 @@ def _copy_single_object( object_name=source_object, filename=tmp.name, ) - samba_hook.push_from_local(destination_path, tmp.name, buffer_len=buffer_len) + samba_hook.push_from_local(destination_path, tmp.name, buffer_size=buffer_size) if self.move_object: self.log.info("Executing delete of gs://%s/%s", self.source_bucket, source_object) From d25f490b35013e745822196f9b76ad0873b6161e Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Thu, 21 Mar 2024 09:06:05 -0500 Subject: [PATCH 3/7] Allow specification of buffer length for GCS to Samba Co-Authored-By: jslepicka-apex <110119914+jslepicka-apex@users.noreply.github.com> --- airflow/providers/samba/hooks/samba.py | 20 +++++++++++++++---- .../providers/samba/transfers/gcs_to_samba.py | 12 +++++++++-- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow/providers/samba/hooks/samba.py b/airflow/providers/samba/hooks/samba.py index 535ec267ccf42..b71928f2158ce 100644 --- a/airflow/providers/samba/hooks/samba.py +++ b/airflow/providers/samba/hooks/samba.py @@ -245,10 +245,22 @@ def setxattr(self, path, attribute, value, flags=0, follow_symlinks=True): **self._conn_kwargs, ) - def push_from_local(self, destination_filepath: str, local_filepath: str): - """Push local file to samba server.""" - with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: - copyfileobj(f, g) + def push_from_local(self, destination_filepath: str, local_filepath: str, buffer_len: int | None = None): + """ + Push local file to samba server. + + :param destination_filepath: the samba location to push to + :param local_filepath: the file to push + :param buffer_len: + size in bytes of the individual chunks of file to send. Larger values may + speed up large file transfers + """ + if buffer_len is not None: + with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: + copyfileobj(f, g, buffer_len) + else: # Use default buffer size for OS as determined by copyfileobj + with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: + copyfileobj(f, g) @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index fb1cb6ad98b07..3fec52356f770 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -93,6 +93,9 @@ class GCSToSambaOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param buffer_len: Optional specification of the size in bytes of the chunks sent to + Samba. Larger buffer lengths may decrease the time to upload large files. The default + length is determined by shutil.copyfileobj, which is 64 KB. """ template_fields: Sequence[str] = ( @@ -114,6 +117,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", samba_conn_id: str = "samba_default", impersonation_chain: str | Sequence[str] | None = None, + buffer_len: int | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -127,6 +131,7 @@ def __init__( self.samba_conn_id = samba_conn_id self.impersonation_chain = impersonation_chain self.sftp_dirs = None + self.buffer_len = buffer_len def execute(self, context: Context): gcs_hook = GCSHook( @@ -154,7 +159,9 @@ def execute(self, context: Context): for source_object in objects: destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname) - self._copy_single_object(gcs_hook, samba_hook, source_object, destination_path) + self._copy_single_object( + gcs_hook, samba_hook, source_object, destination_path, self.buffer_len + ) self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path) else: @@ -176,6 +183,7 @@ def _copy_single_object( samba_hook: SambaHook, source_object: str, destination_path: str, + buffer_len: int | None = None, ) -> None: """Copy single object.""" self.log.info( @@ -194,7 +202,7 @@ def _copy_single_object( object_name=source_object, filename=tmp.name, ) - samba_hook.push_from_local(destination_path, tmp.name) + samba_hook.push_from_local(destination_path, tmp.name, buffer_len=buffer_len) if self.move_object: self.log.info("Executing delete of gs://%s/%s", self.source_bucket, source_object) From 654f04c49e869f6bf8272f049935b4677532be26 Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Sat, 23 Mar 2024 20:50:37 -0500 Subject: [PATCH 4/7] Pass buffer_size more cleanly --- airflow/providers/samba/hooks/samba.py | 13 +++++-------- airflow/providers/samba/transfers/gcs_to_samba.py | 12 ++++++------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/airflow/providers/samba/hooks/samba.py b/airflow/providers/samba/hooks/samba.py index b71928f2158ce..895c885d92205 100644 --- a/airflow/providers/samba/hooks/samba.py +++ b/airflow/providers/samba/hooks/samba.py @@ -245,22 +245,19 @@ def setxattr(self, path, attribute, value, flags=0, follow_symlinks=True): **self._conn_kwargs, ) - def push_from_local(self, destination_filepath: str, local_filepath: str, buffer_len: int | None = None): + def push_from_local(self, destination_filepath: str, local_filepath: str, buffer_size: int | None = None): """ Push local file to samba server. :param destination_filepath: the samba location to push to :param local_filepath: the file to push - :param buffer_len: + :param buffer_size: size in bytes of the individual chunks of file to send. Larger values may speed up large file transfers """ - if buffer_len is not None: - with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: - copyfileobj(f, g, buffer_len) - else: # Use default buffer size for OS as determined by copyfileobj - with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: - copyfileobj(f, g) + extra_args = (buffer_size,) if buffer_size else () + with open(local_filepath, "rb") as f, self.open_file(destination_filepath, mode="wb") as g: + copyfileobj(f, g, *extra_args) @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index 3fec52356f770..004cd42c2380d 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -93,7 +93,7 @@ class GCSToSambaOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). - :param buffer_len: Optional specification of the size in bytes of the chunks sent to + :param buffer_size: Optional specification of the size in bytes of the chunks sent to Samba. Larger buffer lengths may decrease the time to upload large files. The default length is determined by shutil.copyfileobj, which is 64 KB. """ @@ -117,7 +117,7 @@ def __init__( gcp_conn_id: str = "google_cloud_default", samba_conn_id: str = "samba_default", impersonation_chain: str | Sequence[str] | None = None, - buffer_len: int | None = None, + buffer_size: int | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -131,7 +131,7 @@ def __init__( self.samba_conn_id = samba_conn_id self.impersonation_chain = impersonation_chain self.sftp_dirs = None - self.buffer_len = buffer_len + self.buffer_size = buffer_size def execute(self, context: Context): gcs_hook = GCSHook( @@ -160,7 +160,7 @@ def execute(self, context: Context): for source_object in objects: destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname) self._copy_single_object( - gcs_hook, samba_hook, source_object, destination_path, self.buffer_len + gcs_hook, samba_hook, source_object, destination_path, self.buffer_size ) self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path) @@ -183,7 +183,7 @@ def _copy_single_object( samba_hook: SambaHook, source_object: str, destination_path: str, - buffer_len: int | None = None, + buffer_size: int | None = None, ) -> None: """Copy single object.""" self.log.info( @@ -202,7 +202,7 @@ def _copy_single_object( object_name=source_object, filename=tmp.name, ) - samba_hook.push_from_local(destination_path, tmp.name, buffer_len=buffer_len) + samba_hook.push_from_local(destination_path, tmp.name, buffer_size=buffer_size) if self.move_object: self.log.info("Executing delete of gs://%s/%s", self.source_bucket, source_object) From 56313e7500dbfc6490e9b1bf7e655f32eb241ae3 Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Sat, 23 Mar 2024 23:44:17 -0500 Subject: [PATCH 5/7] Cleanup tests to account for new parameter --- airflow/providers/samba/transfers/gcs_to_samba.py | 2 +- tests/providers/samba/transfers/test_gcs_to_samba.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index 004cd42c2380d..2642cf4a82b4f 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -95,7 +95,7 @@ class GCSToSambaOperator(BaseOperator): account from the list granting this role to the originating account (templated). :param buffer_size: Optional specification of the size in bytes of the chunks sent to Samba. Larger buffer lengths may decrease the time to upload large files. The default - length is determined by shutil.copyfileobj, which is 64 KB. + length is determined by shutil, which is 64 KB. """ template_fields: Sequence[str] = ( diff --git a/tests/providers/samba/transfers/test_gcs_to_samba.py b/tests/providers/samba/transfers/test_gcs_to_samba.py index 100fde5f7dc75..0bca4d83268f8 100644 --- a/tests/providers/samba/transfers/test_gcs_to_samba.py +++ b/tests/providers/samba/transfers/test_gcs_to_samba.py @@ -70,7 +70,7 @@ def test_execute_copy_single_file( bucket_name=TEST_BUCKET, object_name=source_object, filename=mock.ANY ) samba_hook_mock.return_value.push_from_local.assert_called_with( - os.path.join(DESTINATION_SMB, target_object), mock.ANY + os.path.join(DESTINATION_SMB, target_object), mock.ANY, None ) gcs_hook_mock.return_value.delete.assert_not_called() @@ -114,7 +114,7 @@ def test_execute_move_single_file( bucket_name=TEST_BUCKET, object_name=source_object, filename=mock.ANY ) samba_hook_mock.return_value.push_from_local.assert_called_with( - os.path.join(DESTINATION_SMB, target_object), mock.ANY + os.path.join(DESTINATION_SMB, target_object), mock.ANY, None ) gcs_hook_mock.return_value.delete.assert_called_once_with(TEST_BUCKET, source_object) @@ -201,7 +201,7 @@ def test_execute_copy_with_wildcard( ) samba_hook_mock.return_value.push_from_local.assert_has_calls( [ - mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY) + mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY, None) for target_object in target_objects ] ) @@ -290,7 +290,7 @@ def test_execute_move_with_wildcard( ) samba_hook_mock.return_value.push_from_local.assert_has_calls( [ - mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY) + mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY, None) for target_object in target_objects ] ) From b5690e3afe2e890b837a6d3a8f19396a08b84978 Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Sun, 24 Mar 2024 00:19:35 -0500 Subject: [PATCH 6/7] Update test_gcs_to_samba.py --- tests/providers/samba/transfers/test_gcs_to_samba.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/providers/samba/transfers/test_gcs_to_samba.py b/tests/providers/samba/transfers/test_gcs_to_samba.py index 0bca4d83268f8..a646eb896c84f 100644 --- a/tests/providers/samba/transfers/test_gcs_to_samba.py +++ b/tests/providers/samba/transfers/test_gcs_to_samba.py @@ -70,7 +70,7 @@ def test_execute_copy_single_file( bucket_name=TEST_BUCKET, object_name=source_object, filename=mock.ANY ) samba_hook_mock.return_value.push_from_local.assert_called_with( - os.path.join(DESTINATION_SMB, target_object), mock.ANY, None + os.path.join(DESTINATION_SMB, target_object), mock.ANY, buffer_size=None ) gcs_hook_mock.return_value.delete.assert_not_called() @@ -114,7 +114,7 @@ def test_execute_move_single_file( bucket_name=TEST_BUCKET, object_name=source_object, filename=mock.ANY ) samba_hook_mock.return_value.push_from_local.assert_called_with( - os.path.join(DESTINATION_SMB, target_object), mock.ANY, None + os.path.join(DESTINATION_SMB, target_object), mock.ANY, buffer_size=None ) gcs_hook_mock.return_value.delete.assert_called_once_with(TEST_BUCKET, source_object) @@ -201,7 +201,7 @@ def test_execute_copy_with_wildcard( ) samba_hook_mock.return_value.push_from_local.assert_has_calls( [ - mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY, None) + mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY, buffer_size=None) for target_object in target_objects ] ) @@ -290,7 +290,7 @@ def test_execute_move_with_wildcard( ) samba_hook_mock.return_value.push_from_local.assert_has_calls( [ - mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY, None) + mock.call(os.path.join(DESTINATION_SMB, target_object), mock.ANY, buffer_size=None) for target_object in target_objects ] ) From d2cdadbe8de87ea4c4271cb7e0898a4b0133c519 Mon Sep 17 00:00:00 2001 From: Collin McNulty Date: Wed, 3 Apr 2024 22:21:28 -0500 Subject: [PATCH 7/7] Add test and fix non-wildcard path --- .../providers/samba/transfers/gcs_to_samba.py | 4 +- .../samba/transfers/test_gcs_to_samba.py | 45 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index 2642cf4a82b4f..bddc038b736ed 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -166,7 +166,9 @@ def execute(self, context: Context): self.log.info("Done. Uploaded '%d' files to %s", len(objects), self.destination_path) else: destination_path = self._resolve_destination_path(self.source_object) - self._copy_single_object(gcs_hook, samba_hook, self.source_object, destination_path) + self._copy_single_object( + gcs_hook, samba_hook, self.source_object, destination_path, self.buffer_size + ) self.log.info("Done. Uploaded '%s' file to %s", self.source_object, destination_path) def _resolve_destination_path(self, source_object: str, prefix: str | None = None) -> str: diff --git a/tests/providers/samba/transfers/test_gcs_to_samba.py b/tests/providers/samba/transfers/test_gcs_to_samba.py index a646eb896c84f..f335d7842371b 100644 --- a/tests/providers/samba/transfers/test_gcs_to_samba.py +++ b/tests/providers/samba/transfers/test_gcs_to_samba.py @@ -118,6 +118,51 @@ def test_execute_move_single_file( ) gcs_hook_mock.return_value.delete.assert_called_once_with(TEST_BUCKET, source_object) + @pytest.mark.parametrize( + "source_object, target_object, keep_directory_structure", + [ + ("folder/test_object.txt", "folder/test_object.txt", True), + ("folder/subfolder/test_object.txt", "folder/subfolder/test_object.txt", True), + ("folder/test_object.txt", "test_object.txt", False), + ("folder/subfolder/test_object.txt", "test_object.txt", False), + ], + ) + @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook") + @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook") + def test_execute_adjust_buffer_size( + self, + samba_hook_mock, + gcs_hook_mock, + source_object, + target_object, + keep_directory_structure, + ): + operator = GCSToSambaOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object=source_object, + destination_path=DESTINATION_SMB, + keep_directory_structure=keep_directory_structure, + move_object=True, + gcp_conn_id=GCP_CONN_ID, + samba_conn_id=SAMBA_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + buffer_size=128000, + ) + operator.execute(None) + gcs_hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + samba_hook_mock.assert_called_once_with(samba_conn_id=SAMBA_CONN_ID) + gcs_hook_mock.return_value.download.assert_called_with( + bucket_name=TEST_BUCKET, object_name=source_object, filename=mock.ANY + ) + samba_hook_mock.return_value.push_from_local.assert_called_with( + os.path.join(DESTINATION_SMB, target_object), mock.ANY, buffer_size=128000 + ) + gcs_hook_mock.return_value.delete.assert_called_once_with(TEST_BUCKET, source_object) + @pytest.mark.parametrize( "source_object, prefix, delimiter, gcs_files_list, target_objects, keep_directory_structure", [