From 1a3d3fb5435cb90fb23a5e3099940730c16534d9 Mon Sep 17 00:00:00 2001
From: Jason <46563896+jsjasonseba@users.noreply.github.com>
Date: Sun, 23 Jun 2024 12:52:08 +0700
Subject: [PATCH] Added chunk_size parameter to LocalFilesystemToGCSOperator
 (#40379)

---
 .../providers/google/cloud/transfers/local_to_gcs.py |  6 +++++-
 .../operators/transfer/local_to_gcs.rst              |  2 +-
 docs/spelling_wordlist.txt                           |  1 +
 newsfragments/40379.improvement.rst                  |  1 +
 .../google/cloud/transfers/test_local_to_gcs.py      | 11 ++++++++++-
 5 files changed, 18 insertions(+), 3 deletions(-)
 create mode 100644 newsfragments/40379.improvement.rst

diff --git a/airflow/providers/google/cloud/transfers/local_to_gcs.py b/airflow/providers/google/cloud/transfers/local_to_gcs.py
index 3bd2855bd046c..a5e9806d2c515 100644
--- a/airflow/providers/google/cloud/transfers/local_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/local_to_gcs.py
@@ -32,7 +32,7 @@
 class LocalFilesystemToGCSOperator(BaseOperator):
     """
-    Uploads a file or list of files to Google Cloud Storage; optionally can compress the file for upload.
+    Uploads a file or list of files to Google Cloud Storage, optionally compressing the data for upload and/or uploading it in multiple chunks.
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:LocalFilesystemToGCSOperator`
@@ -47,6 +47,7 @@ class LocalFilesystemToGCSOperator(BaseOperator):
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
     :param mime_type: The mime-type string
     :param gzip: Allows for file to be compressed and uploaded as gzip
+    :param chunk_size: Blob chunk size in bytes. This must be a multiple of 262144 bytes (256 KiB)
     :param impersonation_chain: Optional service account to impersonate using short-term
         credentials, or chained list of accounts required to get the access_token
         of the last account in the list, which will be impersonated in the request.
@@ -73,6 +74,7 @@ def __init__(
         gcp_conn_id="google_cloud_default",
         mime_type="application/octet-stream",
         gzip=False,
+        chunk_size: int | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ):
@@ -84,6 +86,7 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.mime_type = mime_type
         self.gzip = gzip
+        self.chunk_size = chunk_size
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: Context):
@@ -114,4 +117,5 @@ def execute(self, context: Context):
             mime_type=self.mime_type,
             filename=filepath,
             gzip=self.gzip,
+            chunk_size=self.chunk_size,
         )
diff --git a/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst
index a341ae8ca5612..e12726712beb1 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst
@@ -34,7 +34,7 @@ LocalFilesystemToGCSOperator
 :class:`~airflow.providers.google.cloud.transfers.local_to_gcs.LocalFilesystemToGCSOperator` allows you to
 upload data from local filesystem to GCS.
 
-When you use this operator, you can optionally compress the data being uploaded.
+When you use this operator, you can optionally compress the data being uploaded. The operator also supports uploading the data in multiple chunks.
 
 Below is an example of using this operator to upload a file to GCS.
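For review context, a minimal usage sketch of the new parameter follows. The DAG id, file paths, and bucket name are hypothetical placeholders; only ``LocalFilesystemToGCSOperator`` and ``chunk_size`` come from this patch.

.. code-block:: python

    import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

    with DAG(
        dag_id="example_local_to_gcs_chunked",  # hypothetical DAG id
        start_date=datetime.datetime(2024, 1, 1),
        schedule=None,
    ):
        upload_file = LocalFilesystemToGCSOperator(
            task_id="upload_file",
            src="/tmp/example_upload.csv",  # hypothetical local path
            dst="data/example_upload.csv",  # hypothetical destination object name
            bucket="my-example-bucket",  # hypothetical bucket
            # Upload in 1 MiB chunks; the value must be a multiple of
            # 262144 bytes (256 KiB), per the parameter docstring above.
            chunk_size=4 * 262144,
        )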
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 80d232247a588..4a6d5475c48c3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -901,6 +901,7 @@ keyring
 keyspace
 keytab
 kfp
+KiB
 Kibana
 killMode
 Kinesis
diff --git a/newsfragments/40379.improvement.rst b/newsfragments/40379.improvement.rst
new file mode 100644
index 0000000000000..ecccde2065a1d
--- /dev/null
+++ b/newsfragments/40379.improvement.rst
@@ -0,0 +1 @@
+The ``chunk_size`` parameter is added to ``LocalFilesystemToGCSOperator``, enabling file uploads in multiple chunks of a specified size.
diff --git a/tests/providers/google/cloud/transfers/test_local_to_gcs.py b/tests/providers/google/cloud/transfers/test_local_to_gcs.py
index 9f046b20b2b0e..d994f43a2d1b4 100644
--- a/tests/providers/google/cloud/transfers/test_local_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_local_to_gcs.py
@@ -31,7 +31,12 @@
 
 
 class TestFileToGcsOperator:
-    _config = {"bucket": "dummy", "mime_type": "application/octet-stream", "gzip": False}
+    _config = {
+        "bucket": "dummy",
+        "mime_type": "application/octet-stream",
+        "gzip": False,
+        "chunk_size": 262144,
+    }
 
     def setup_method(self):
         args = {"owner": "airflow", "start_date": datetime.datetime(2017, 1, 1)}
@@ -61,6 +66,7 @@ def test_init(self):
         assert operator.bucket == self._config["bucket"]
         assert operator.mime_type == self._config["mime_type"]
         assert operator.gzip == self._config["gzip"]
+        assert operator.chunk_size == self._config["chunk_size"]
 
     @mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
     def test_execute(self, mock_hook):
@@ -79,6 +85,7 @@ def test_execute(self, mock_hook):
             gzip=self._config["gzip"],
             mime_type=self._config["mime_type"],
             object_name="test/test1.csv",
+            chunk_size=self._config["chunk_size"],
         )
 
     @pytest.mark.db_test
@@ -110,6 +117,7 @@ def test_execute_multiple(self, mock_hook):
                 gzip=self._config["gzip"],
                 mime_type=self._config["mime_type"],
                 object_name=object_name,
+                chunk_size=self._config["chunk_size"],
             )
             for filepath, object_name in files_objects
         ]
@@ -131,6 +139,7 @@ def test_execute_wildcard(self, mock_hook):
                 gzip=self._config["gzip"],
                 mime_type=self._config["mime_type"],
                 object_name=object_name,
+                chunk_size=self._config["chunk_size"],
             )
             for filepath, object_name in files_objects
         ]
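As background for the ``chunk_size`` semantics: the operator hands the value to ``GCSHook.upload``, which wraps the ``google-cloud-storage`` client. Below is a minimal sketch of that underlying path, not the hook's actual implementation; the function name and the bucket/object/file arguments are placeholders.

.. code-block:: python

    from google.cloud import storage


    def upload_with_chunk_size(
        bucket_name: str, object_name: str, filename: str, chunk_size: int | None = None
    ) -> None:
        client = storage.Client()
        # chunk_size is forwarded to the Blob constructor; the client rejects
        # values that are not a multiple of 262144 bytes (256 KiB).
        blob = client.bucket(bucket_name).blob(object_name, chunk_size=chunk_size)
        # With chunk_size set, the client performs a resumable upload in
        # chunks of that size rather than a single-request upload.
        blob.upload_from_filename(filename)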