Skip to content

Commit

Permalink
feat(ingestion): create fs registry and declare supported file system…
Browse files Browse the repository at this point in the history
…s, make FileSystem abstract
  • Loading branch information
oleksandrsimonchuk committed Sep 8, 2023
1 parent 49c8b9e commit da955d6
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 31 deletions.
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ def get_long_description():
"file = datahub.ingestion.reporting.file_reporter:FileReporter",
],
"apache_airflow_provider": ["provider_info=datahub_provider:get_provider_info"],
"datahub.ingestion.fs.plugins": [
"s3 = datahub.ingestion.source.fs.s3_fs:S3FileSystem",
"file = datahub.ingestion.source.fs.local_fs:LocalFileSystem",
"http = datahub.ingestion.source.fs.http_fs:HttpFileSystem",
],
}


Expand Down
15 changes: 11 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
)
from datahub.metadata.schema_classes import UsageAggregationClass

from datahub.ingestion.source.fs.fs_base import FileSystem, FileStatus
from datahub.ingestion.source.fs.fs_base import FileStatus, get_path_schema
from datahub.ingestion.source.fs.fs_registry import fs_registry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -189,7 +190,9 @@ def create(cls, config_dict, ctx):

def get_filenames(self) -> Iterable[FileStatus]:
    """Yield FileStatus entries under the configured path whose names end
    with the configured file extension.

    The concrete filesystem is resolved from the path's URI scheme via the
    plugin registry (e.g. "s3", "http", or "file" for plain local paths).

    Note: the original span carried both the pre-change
    ``FileSystem.get(path_str)`` lookup and its registry-based replacement
    (a diff artifact); only the registry-based lookup is kept.
    """
    path_str = str(self.config.path)
    # get_path_schema falls back to "file" when the path has no scheme.
    schema = get_path_schema(path_str)
    fs_class = fs_registry.get(schema)
    fs = fs_class.create_fs()
    for file_status in fs.list(path_str):
        if file_status.is_file and file_status.path.endswith(self.config.file_extension):
            yield file_status
Expand Down Expand Up @@ -234,7 +237,9 @@ def close(self):
super().close()

def _iterate_file(self, file_status: FileStatus) -> Iterable[Tuple[int, Any]]:
fs = FileSystem.get(file_status.path)
schema = get_path_schema(file_status.path)
fs_class = fs_registry.get(schema)
fs = fs_class.create_fs()
self.report.current_file_name = file_status.path
self.report.current_file_size = file_status.size
self.fp = fs.open(file_status.path)
Expand Down Expand Up @@ -266,7 +271,9 @@ def _iterate_file(self, file_status: FileStatus) -> Iterable[Tuple[int, Any]]:
self.report.reset_current_file_stats()

def iterate_mce_file(self, path: str) -> Iterator[MetadataChangeEvent]:
fs = FileSystem.get(path)
schema = get_path_schema(path)
fs_class = fs_registry.get(schema)
fs = fs_class.create_fs()
file_status = fs.file_status(path)
for i, obj in self._iterate_file(file_status):
mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
Expand Down
25 changes: 18 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/fs/fs_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Iterable
from urllib import parse
from abc import ABCMeta, abstractmethod


@dataclass
Expand All @@ -12,18 +14,27 @@ def __str__(self):
return f"FileStatus({self.path}, {self.size}, {self.is_file})"


class FileSystem(metaclass=ABCMeta):
    """Abstract interface over path-scheme-specific file systems.

    Concrete implementations (local, S3, HTTP) are registered under the
    "datahub.ingestion.fs.plugins" entry point and are instantiated via
    ``create_fs()``.

    Note: the original span interleaved the pre-change concrete class (with
    its ``get()`` factory and ``raise NotImplementedError`` bodies) and the
    post-change abstract version (a diff artifact); only the post-change
    abstract class is kept.
    """

    @classmethod
    def create_fs(cls) -> "FileSystem":
        """Factory used by the plugin registry; subclasses must override.

        Deliberately not @abstractmethod so the registry can hold the class
        object itself; calling it on the base raises instead.
        """
        raise NotImplementedError('File system implementations must implement "create_fs"')

    @abstractmethod
    def open(self, path: str, **kwargs):
        """Open *path* and return a binary file-like object."""

    @abstractmethod
    def file_status(self, path: str) -> FileStatus:
        """Return the FileStatus (path, size, is_file) for *path*."""

    @abstractmethod
    def list(self, path: str) -> Iterable[FileStatus]:
        """Iterate the FileStatus of entries under *path*."""


def get_path_schema(path: str):
    """Return the URI scheme of *path*, defaulting to "file" for scheme-less
    (plain local) paths."""
    parsed_scheme = parse.urlparse(path).scheme
    # urlparse yields "" for local paths; map that to the "file" plugin key.
    return parsed_scheme if parsed_scheme else "file"
17 changes: 0 additions & 17 deletions metadata-ingestion/src/datahub/ingestion/source/fs/fs_factory.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.source.fs.fs_base import FileSystem

# Registry of FileSystem implementations keyed by URI scheme ("s3", "file",
# "http"), populated from the "datahub.ingestion.fs.plugins" entry point
# declared in setup.py. Callers look up a class with fs_registry.get(scheme)
# and instantiate it via its create_fs() classmethod.
fs_registry = PluginRegistry[FileSystem]()
fs_registry.register_from_entrypoint("datahub.ingestion.fs.plugins")
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fs/http_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

class HttpFileSystem(FileSystem):

@classmethod
def create_fs(cls):
    """Factory entry point used by the fs plugin registry."""
    # cls() rather than a hard-coded class name so subclasses inherit a
    # working factory.
    return cls()

def open(self, path: str, **kwargs):
    """Open the HTTP(S) resource at *path* as a binary stream; any keyword
    arguments are forwarded to smart_open as transport parameters."""
    transport_params = kwargs
    return smart_open.open(path, mode='rb', transport_params=transport_params)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

class LocalFileSystem(FileSystem):

@classmethod
def create_fs(cls):
    """Factory entry point used by the fs plugin registry."""
    # cls() rather than a hard-coded class name so subclasses inherit a
    # working factory.
    return cls()

def open(self, path: str, **kwargs):
    """Open the local file at *path* in binary mode; any keyword arguments
    are forwarded to smart_open as transport parameters."""
    transport_params = kwargs
    return smart_open.open(path, mode='rb', transport_params=transport_params)

Expand Down
10 changes: 7 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/fs/s3_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ class S3FileSystem(FileSystem):

_s3 = boto3.client('s3')

@classmethod
def create_fs(cls):
    """Factory entry point used by the fs plugin registry."""
    # cls() rather than a hard-coded class name so subclasses inherit a
    # working factory.
    return cls()

def open(self, path: str, **kwargs):
    """Open the S3 object at *path* for binary reading via smart_open,
    injecting the shared class-level boto3 client.

    Bug fix: ``dict.update()`` returns None, so the previous
    ``transport_params = kwargs.update({...})`` always passed
    ``transport_params=None`` — silently dropping the client and every
    caller-supplied kwarg. Build a new dict instead (also avoids mutating
    the caller's *kwargs*).
    """
    transport_params = {**kwargs, 'client': S3FileSystem._s3}
    return smart_open.open(path, mode='rb', transport_params=transport_params)

def file_status(self, path: str) -> FileStatus:
s3_path = parse_s3_path(path)
try:
response = self._s3.get_object_attributes(
response = S3FileSystem._s3.get_object_attributes(
Bucket=s3_path.bucket,
Key=s3_path.key,
ObjectAttributes=['ObjectSize']
Expand All @@ -52,4 +56,4 @@ def file_status(self, path: str) -> FileStatus:

def list(self, path: str) -> Iterable[FileStatus]:
    """Iterate FileStatus entries for the objects under the S3 prefix *path*.

    Note: the original span carried both the pre-change ``self._s3`` and
    post-change ``S3FileSystem._s3`` return lines (a diff artifact); only
    one is kept. The class-level client is referenced explicitly so all
    instances share a single connection pool.
    """
    s3_path = parse_s3_path(path)
    return S3ListIterator(S3FileSystem._s3, s3_path.bucket, s3_path.key)

0 comments on commit da955d6

Please sign in to comment.