Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6807] Implement an Azure blobstore filesystem for Python SDK #12492

Merged
merged 130 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from 129 commits
Commits
Show all changes
130 commits
Select commit Hold shift + click to select a range
088303f
Initial commit
AldairCoronel Jun 10, 2020
48014f0
Create client wrapper files
AldairCoronel Jun 11, 2020
08769ed
Add azure requirements
AldairCoronel Jun 18, 2020
33afe4a
Add blobstoragefilesystem file
AldairCoronel Jun 18, 2020
09a8c20
Add BlobStorageFileSystem as an official file system
AldairCoronel Jun 18, 2020
5e127f7
Add BlobStorageFileSystem class method definitions
AldairCoronel Jun 25, 2020
45713eb
Clean client wrapper class
AldairCoronel Jun 25, 2020
d4b4bee
feat: Add scheme, mkdirs and has_dirs methods
AldairCoronel Jun 25, 2020
9287649
feat: Add join method
AldairCoronel Jun 25, 2020
ea2adc3
feat: Add blobstoragefilesystem_test file
AldairCoronel Jun 25, 2020
8c210ef
feat: Add the signature of all methods in azbs file system class
AldairCoronel Jun 25, 2020
25f4a53
test: scheme and join method
AldairCoronel Jun 25, 2020
917e6c0
test: split method
AldairCoronel Jun 25, 2020
b36a5c0
feat: Add blobstorageio file to interact with Azure
AldairCoronel Jun 29, 2020
8288599
feat: Add parse_azfs_path method
AldairCoronel Jun 30, 2020
4db2c7e
feat: Add blobstorageio_test file
AldairCoronel Jun 30, 2020
f93614f
test: parse_azfs_path method
AldairCoronel Jun 30, 2020
78e5289
test: extra test cases for parse_azfs_path method
AldairCoronel Jun 30, 2020
ef86ece
feat: Add list_prefix method
AldairCoronel Jul 1, 2020
e7f0199
test: list_prefix (progress)
AldairCoronel Jul 1, 2020
aebd55e
test: list_prefix method works with local account
AldairCoronel Jul 1, 2020
542f4c5
feat: Add _list method
AldairCoronel Jul 1, 2020
fa7b441
test: test_math_multiples
AldairCoronel Jul 1, 2020
e4c609e
test: math multiples limit
AldairCoronel Jul 1, 2020
287d836
test: match multiples error
AldairCoronel Jul 1, 2020
b4e9e80
test: match multiple patterns
AldairCoronel Jul 1, 2020
f2a8c91
feat: Add open method to the io class
AldairCoronel Jul 2, 2020
df8ec9b
test: file mode
AldairCoronel Jul 4, 2020
dc87bba
test: copy method
AldairCoronel Jul 7, 2020
6d4a0be
fix: remove unnecessary code
AldairCoronel Jul 7, 2020
b9f3771
feat: Add copy single blob method
AldairCoronel Jul 7, 2020
6cac8f5
test: delete a single file
AldairCoronel Jul 8, 2020
5428b51
feat: Add delete a single blob method
AldairCoronel Jul 8, 2020
9cbe131
test: exists method
AldairCoronel Jul 8, 2020
1000991
feat: Add exists method to wrapper
AldairCoronel Jul 8, 2020
84dbcc3
feat: Add exists method to filesystem
AldairCoronel Jul 8, 2020
962fb8c
feat: Add BloStorageDownloader
AldairCoronel Jul 12, 2020
4356b56
test: file full file read
AldairCoronel Jul 13, 2020
dd25245
test: file write
AldairCoronel Jul 13, 2020
ce03578
test: delete batch
AldairCoronel Jul 16, 2020
c734950
fix: delete method
AldairCoronel Jul 16, 2020
67dea5d
test: (WIP) checksum
AldairCoronel Jul 16, 2020
f6e3c39
feat: Add BlobStorageUploader
AldairCoronel Jul 17, 2020
9c50ae2
feat: Add content_type support to BlosStorageUploader
AldairCoronel Jul 17, 2020
d990de2
feat: Add insert random file method
AldairCoronel Jul 17, 2020
24a64af
feat: Add overwrite support when uploading blobs
AldairCoronel Jul 17, 2020
abc93c0
test: Add insert_random_file functionality to list prefix
AldairCoronel Jul 17, 2020
6085fe5
test: Add insert_random_file functionality to copy
AldairCoronel Jul 17, 2020
4ca7667
test: (WIP) copy non existent files
AldairCoronel Jul 17, 2020
4ad4920
fix: delete method exception handling
AldairCoronel Jul 18, 2020
1339c2f
test: Add insert_random_file functionality to delete
AldairCoronel Jul 18, 2020
b3bcae8
fix: delete method
AldairCoronel Jul 18, 2020
3f32935
test: Add insert_random_file functionality to exists
AldairCoronel Jul 18, 2020
5a7938f
feat: Add FakeFile class
AldairCoronel Jul 18, 2020
6464ad9
feat: Add FakeFile to insert_random_file
AldairCoronel Jul 18, 2020
2e8ba9f
format: Change FakeFile name to fake_file
AldairCoronel Jul 18, 2020
01eaf24
test: Add insert_random_file functionality to full file read
AldairCoronel Jul 18, 2020
399d90c
feat: Remove unnecesary files
AldairCoronel Jul 21, 2020
551e3ff
chore: Add formatting
AldairCoronel Jul 21, 2020
79a3799
feat: Add size method
AldairCoronel Jul 21, 2020
051f4f9
fix: size method
AldairCoronel Jul 21, 2020
adfd97e
test: size
AldairCoronel Jul 21, 2020
d4df28c
feat: Add size method to filesystem
AldairCoronel Jul 21, 2020
6dea1b2
feat: Add last updated method
AldairCoronel Jul 22, 2020
d934c79
test: last updated
AldairCoronel Jul 22, 2020
8bdf861
feat: Add last updated method to filesystem
AldairCoronel Jul 22, 2020
318e49b
feat: Add checksum method
AldairCoronel Jul 22, 2020
d2eacf3
test: (WIP) checksum
AldairCoronel Jul 22, 2020
ff08ccb
test: (WIP) checksum (now is working)
AldairCoronel Jul 23, 2020
9cc9710
feat: Add checksum method to filesystem
AldairCoronel Jul 23, 2020
b4e7e70
test: checksum
AldairCoronel Jul 23, 2020
84a185e
refactor: general code clean up
AldairCoronel Jul 23, 2020
58a11a3
feat: Add rename method
AldairCoronel Jul 23, 2020
2b2ec57
test: rename
AldairCoronel Jul 23, 2020
556172b
feat: Add path open auxiliary method to filesystem
AldairCoronel Jul 25, 2020
b7fbffa
feat: Add create method to filesystem
AldairCoronel Jul 25, 2020
8556fda
test: create
AldairCoronel Jul 25, 2020
d82f8df
feat: Add open method to filesystem
AldairCoronel Jul 25, 2020
9bfdd36
test: create
AldairCoronel Jul 25, 2020
665e1ed
test: open
AldairCoronel Jul 25, 2020
6011b3c
feat: Add copy tree
AldairCoronel Jul 28, 2020
3b9d74c
test: copy tree
AldairCoronel Jul 28, 2020
c0c1788
feat: Add copy paths method
AldairCoronel Jul 28, 2020
8d221de
test: copy paths
AldairCoronel Jul 29, 2020
720c02b
feat: Add copy method to filesystem
AldairCoronel Jul 29, 2020
68b7a0b
test: copy file
AldairCoronel Jul 29, 2020
b83043c
test: copy file error
AldairCoronel Jul 29, 2020
96e39b4
test: delete paths
AldairCoronel Jul 30, 2020
cccd598
feat: Add delete paths method
AldairCoronel Jul 30, 2020
ef7ece7
feat: Add delete tree method
AldairCoronel Aug 1, 2020
c8a7a3b
test: delete tree
AldairCoronel Aug 1, 2020
0a46918
feat: Add delete files method
AldairCoronel Aug 1, 2020
aebb0ad
feat: Add delete batch helper method
AldairCoronel Aug 1, 2020
cb79ce9
fix: delete methods
AldairCoronel Aug 1, 2020
cf7ae74
test: delete files
AldairCoronel Aug 1, 2020
d074c33
test: delete files with errors
AldairCoronel Aug 1, 2020
b286ff5
refactor: general code clean up
AldairCoronel Aug 1, 2020
0bb5eb5
feat: Add get_account kwarg in parse azfs method
AldairCoronel Aug 1, 2020
4fef996
feat: Add delete method to filesystem
AldairCoronel Aug 2, 2020
adf7243
test: delete in filesystem
AldairCoronel Aug 5, 2020
1d51910
style: Add commas at the end of multi-line lists
AldairCoronel Aug 5, 2020
95cc275
test: delete error in filesystem
AldairCoronel Aug 5, 2020
066c6b0
style: Rewrite the lines that have 80+ characters in filesystem test …
AldairCoronel Aug 5, 2020
7787c36
feat: Add rename files method
AldairCoronel Aug 6, 2020
60fc5bd
test: rename files
AldairCoronel Aug 6, 2020
f8dd237
test: Add delete files functionality to copy paths
AldairCoronel Aug 6, 2020
f0caa88
test: rename directory error
AldairCoronel Aug 6, 2020
4e32f38
feat: Add rename method to filesystem
AldairCoronel Aug 6, 2020
18d3c1c
test: rename method in filesystem
AldairCoronel Aug 6, 2020
4cbfa96
feat: Remove unnecesary folder
AldairCoronel Aug 6, 2020
5e72f38
feat: Remove unnecesary spaces
AldairCoronel Aug 6, 2020
60917d9
fix: typos
AldairCoronel Aug 6, 2020
a3deab9
feat: Add azure-core requirements
AldairCoronel Aug 6, 2020
3aa52d7
feat: Remove unnecesary spaces
AldairCoronel Aug 6, 2020
8f4cf72
style: Add final dots to comments
AldairCoronel Aug 6, 2020
a36e099
feat: Remove unnecesary file
AldairCoronel Aug 7, 2020
5fd8083
feat: Remove blobstorage io test that connect to Azure
AldairCoronel Aug 7, 2020
001eda6
feat: Add azure dependency in tox.ini and BeamModulePlugin.grovy
AldairCoronel Aug 11, 2020
b2e0ac4
feat: Skip if there are no dependencies
AldairCoronel Aug 11, 2020
7018fdf
feat: Fix import errors
AldairCoronel Aug 11, 2020
fd0df02
fix: minor type errors
AldairCoronel Aug 11, 2020
4bf78b2
feat: Add pylint stuff
AldairCoronel Aug 11, 2020
8d3e8d7
fix: Remove space
AldairCoronel Aug 11, 2020
d38aa59
feat: Add new line
AldairCoronel Aug 11, 2020
408568f
fix: comparison
AldairCoronel Aug 12, 2020
8e42d85
docs: Fix little things
AldairCoronel Aug 13, 2020
6709450
feat: remove trailing whitespaces
AldairCoronel Aug 13, 2020
519fbaf
feat: Add message
AldairCoronel Aug 13, 2020
4c5ab4c
feat: minor changes in comments
AldairCoronel Aug 14, 2020
3884527
feat: Change delete_blobs to delete_blob
AldairCoronel Aug 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,7 @@ class BeamModulePlugin implements Plugin<Project> {
def distTarBall = "${pythonRootDir}/build/apache-beam.tar.gz"
project.exec {
executable 'sh'
args '-c', ". ${project.ext.envdir}/bin/activate && pip install --retries 10 ${distTarBall}[gcp,test,aws]"
args '-c', ". ${project.ext.envdir}/bin/activate && pip install --retries 10 ${distTarBall}[gcp,test,aws,azure]"
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/io/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import
301 changes: 301 additions & 0 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Azure Blob Storage Implementation for accesing files on
Azure Blob Storage.
"""

from __future__ import absolute_import

from future.utils import iteritems

from apache_beam.io.azure import blobstorageio
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystem import CompressedFile
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem

__all__ = ['BlobStorageFileSystem']


class BlobStorageFileSystem(FileSystem):
"""An Azure Blob Storage ``FileSystem`` implementation for accesing files on
Azure Blob Storage.
"""

CHUNK_SIZE = blobstorageio.MAX_BATCH_OPERATION_SIZE
AZURE_FILE_SYSTEM_PREFIX = 'azfs://'

@classmethod
def scheme(cls):
"""URI scheme for the FileSystem
"""
return 'azfs'

def join(self, basepath, *paths):
"""Join two or more pathname components for the filesystem

Args:
basepath: string path of the first component of the path
paths: path components to be added

Returns: full path after combining all the passed components
"""
if not basepath.startswith(BlobStorageFileSystem.AZURE_FILE_SYSTEM_PREFIX):
raise ValueError(
'Basepath %r must be an Azure Blob Storage path.' % basepath)

path = basepath
for p in paths:
path = path.rstrip('/') + '/' + p.lstrip('/')
return path

def split(self, path):
"""Splits the given path into two parts.

Splits the path into a pair (head, tail) such that tail contains the last
component of the path and head contains everything up to that.
For file-systems other than the local file-system, head should include the
prefix.

Args:
path: path as a string

Returns:
a pair of path components as strings.
"""
path = path.strip()
if not path.startswith(BlobStorageFileSystem.AZURE_FILE_SYSTEM_PREFIX):
raise ValueError('Path %r must be Azure Blob Storage path.' % path)

prefix_len = len(BlobStorageFileSystem.AZURE_FILE_SYSTEM_PREFIX)
last_sep = path[prefix_len:].rfind('/')
if last_sep >= 0:
last_sep += prefix_len

if last_sep > 0:
return (path[:last_sep], path[last_sep + 1:])
elif last_sep < 0:
return (path, '')
else:
raise ValueError('Invalid path: %s' % path)

def mkdirs(self, path):
"""Recursively create directories for the provided path.

Args:
path: string path of the directory structure that should be created

Raises:
IOError: if leaf directory already exists.
"""
pass

def has_dirs(self):
"""Whether this FileSystem supports directories."""
return False

def _list(self, dir_or_prefix):
"""List files in a location.
Listing is non-recursive (for filesystems that support directories).
Args:
dir_or_prefix: (string) A directory or location prefix (for filesystems
that don't have directories).
Returns:
Generator of ``FileMetadata`` objects.
Raises:
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
for path, size in \
iteritems(blobstorageio.BlobStorageIO().list_prefix(dir_or_prefix)):
yield FileMetadata(path, size)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})

def _path_open(
self,
path,
mode,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
"""Helper functions to open a file in the provided mode.
"""
compression_type = FileSystem._get_compression_type(path, compression_type)
mime_type = CompressionTypes.mime_type(compression_type, mime_type)
raw_file = blobstorageio.BlobStorageIO().open(
path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
return CompressedFile(raw_file, compression_type=compression_type)

def create(
self,
path,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
# type: (...) -> BinaryIO

"""Returns a write channel for the given file path.

Args:
path: string path of the file object to be written to the system
mime_type: MIME type to specify the type of content in the file object
compression_type: Type of compression to be used for this object

Returns: file handle with a close function for the user to use
"""
return self._path_open(path, 'wb', mime_type, compression_type)

def open(
self,
path,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO):
# type: (...) -> BinaryIO

"""Returns a read channel for the given file path.

Args:
path: string path of the file object to be read
mime_type: MIME type to specify the type of content in the file object
compression_type: Type of compression to be used for this object

Returns: file handle with a close function for the user to use
"""
return self._path_open(path, 'rb', mime_type, compression_type)

def copy(self, source_file_names, destination_file_names):
"""Recursively copy the file tree from the source to the destination

Args:
source_file_names: list of source file objects that needs to be copied
destination_file_names: list of destination of the new object

Raises:
``BeamIOError``: if any of the copy operations fail
"""
if not len(source_file_names) == len(destination_file_names):
message = 'Unable to copy unequal number of sources and destinations.'
raise BeamIOError(message)
src_dest_pairs = list(zip(source_file_names, destination_file_names))
return blobstorageio.BlobStorageIO().copy_paths(src_dest_pairs)

def rename(self, source_file_names, destination_file_names):
"""Rename the files at the source list to the destination list.
Source and destination lists should be of the same size.

Args:
source_file_names: List of file paths that need to be moved
destination_file_names: List of destination_file_names for the files

Raises:
``BeamIOError``: if any of the rename operations fail
"""
if not len(source_file_names) == len(destination_file_names):
message = 'Unable to rename unequal number of sources and destinations.'
raise BeamIOError(message)
src_dest_pairs = list(zip(source_file_names, destination_file_names))
results = blobstorageio.BlobStorageIO().rename_files(src_dest_pairs)
# Retrieve exceptions.
exceptions = {(src, dest): error
for (src, dest, error) in results if error is not None}
if exceptions:
raise BeamIOError("Rename operation failed.", exceptions)

def exists(self, path):
"""Check if the provided path exists on the FileSystem.

Args:
path: string path that needs to be checked.

Returns: boolean flag indicating if path exists
"""
try:
return blobstorageio.BlobStorageIO().exists(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Exists operation failed", {path: e})

def size(self, path):
"""Get size in bytes of a file on the FileSystem.

Args:
path: string filepath of file.

Returns: int size of file according to the FileSystem.

Raises:
``BeamIOError``: if path doesn't exist.
"""
try:
return blobstorageio.BlobStorageIO().size(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Size operation failed", {path: e})

def last_updated(self, path):
"""Get UNIX Epoch time in seconds on the FileSystem.

Args:
path: string path of file.

Returns: float UNIX Epoch time

Raises:
``BeamIOError``: if path doesn't exist.
"""
try:
return blobstorageio.BlobStorageIO().last_updated(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Last updated operation failed", {path: e})

def checksum(self, path):
"""Fetch checksum metadata of a file on the
:class:`~apache_beam.io.filesystem.FileSystem`.

Args:
path: string path of a file.

Returns: string containing checksum

Raises:
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
return blobstorageio.BlobStorageIO().checksum(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path, e})

def delete(self, paths):
"""Deletes files or directories at the provided paths.
Directories will be deleted recursively.

Args:
paths: list of paths that give the file objects to be deleted

Raises:
``BeamIOError``: if any of the delete operations fail
"""
results = blobstorageio.BlobStorageIO().delete_paths(paths)
# Retrieve exceptions.
exceptions = {
path: error
for (path, error) in results.items() if error != 202
}

if exceptions:
raise BeamIOError("Delete operation failed", exceptions)
Loading