Skip to content

Commit

Permalink
Added chunk_size parameter to LocalFilesystemToGCSOperator (apache#40379)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsjasonseba authored and romsharon98 committed Jul 26, 2024
1 parent 90146b3 commit 1a3d3fb
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 3 deletions.
6 changes: 5 additions & 1 deletion airflow/providers/google/cloud/transfers/local_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

class LocalFilesystemToGCSOperator(BaseOperator):
"""
Uploads a file or list of files to Google Cloud Storage; optionally can compress the file for upload.
Uploads a file or list of files to Google Cloud Storage; can optionally compress the file for upload and upload the data in multiple chunks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
Expand All @@ -47,6 +47,7 @@ class LocalFilesystemToGCSOperator(BaseOperator):
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param mime_type: The mime-type string
:param gzip: Allows for file to be compressed and uploaded as gzip
:param chunk_size: Blob chunk size in bytes. This must be a multiple of 262144 bytes (256 KiB)
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
Expand All @@ -73,6 +74,7 @@ def __init__(
gcp_conn_id="google_cloud_default",
mime_type="application/octet-stream",
gzip=False,
chunk_size: int | None = None,
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
):
Expand All @@ -84,6 +86,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.mime_type = mime_type
self.gzip = gzip
self.chunk_size = chunk_size
self.impersonation_chain = impersonation_chain

def execute(self, context: Context):
Expand Down Expand Up @@ -114,4 +117,5 @@ def execute(self, context: Context):
mime_type=self.mime_type,
filename=filepath,
gzip=self.gzip,
chunk_size=self.chunk_size,
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ LocalFilesystemToGCSOperator
:class:`~airflow.providers.google.cloud.transfers.local_to_gcs.LocalFilesystemToGCSOperator` allows you to upload
data from local filesystem to GCS.

When you use this operator, you can optionally compress the data being uploaded.
When you use this operator, you can optionally compress the data being uploaded. The operator can also optionally upload the data in multiple chunks.

Below is an example of using this operator to upload a file to GCS.

Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ keyring
keyspace
keytab
kfp
KiB
Kibana
killMode
Kinesis
Expand Down
1 change: 1 addition & 0 deletions newsfragments/40379.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The ``chunk_size`` parameter has been added to ``LocalFilesystemToGCSOperator``, enabling file uploads in multiple chunks of a specified size.
11 changes: 10 additions & 1 deletion tests/providers/google/cloud/transfers/test_local_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@


class TestFileToGcsOperator:
_config = {"bucket": "dummy", "mime_type": "application/octet-stream", "gzip": False}
_config = {
"bucket": "dummy",
"mime_type": "application/octet-stream",
"gzip": False,
"chunk_size": 262144,
}

def setup_method(self):
args = {"owner": "airflow", "start_date": datetime.datetime(2017, 1, 1)}
Expand Down Expand Up @@ -61,6 +66,7 @@ def test_init(self):
assert operator.bucket == self._config["bucket"]
assert operator.mime_type == self._config["mime_type"]
assert operator.gzip == self._config["gzip"]
assert operator.chunk_size == self._config["chunk_size"]

@mock.patch("airflow.providers.google.cloud.transfers.local_to_gcs.GCSHook", autospec=True)
def test_execute(self, mock_hook):
Expand All @@ -79,6 +85,7 @@ def test_execute(self, mock_hook):
gzip=self._config["gzip"],
mime_type=self._config["mime_type"],
object_name="test/test1.csv",
chunk_size=self._config["chunk_size"],
)

@pytest.mark.db_test
Expand Down Expand Up @@ -110,6 +117,7 @@ def test_execute_multiple(self, mock_hook):
gzip=self._config["gzip"],
mime_type=self._config["mime_type"],
object_name=object_name,
chunk_size=self._config["chunk_size"],
)
for filepath, object_name in files_objects
]
Expand All @@ -131,6 +139,7 @@ def test_execute_wildcard(self, mock_hook):
gzip=self._config["gzip"],
mime_type=self._config["mime_type"],
object_name=object_name,
chunk_size=self._config["chunk_size"],
)
for filepath, object_name in files_objects
]
Expand Down

0 comments on commit 1a3d3fb

Please sign in to comment.