Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add and use file system abstraction in file source #8415

Merged
merged 17 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,11 @@
"file = datahub.ingestion.reporting.file_reporter:FileReporter",
],
"datahub.custom_packages": [],
"datahub.fs.plugins": [
"s3 = datahub.ingestion.fs.s3_fs:S3FileSystem",
"file = datahub.ingestion.fs.local_fs:LocalFileSystem",
"http = datahub.ingestion.fs.http_fs:HttpFileSystem",
],
}


Expand Down
Empty file.
40 changes: 40 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable
from urllib import parse


@dataclass
class FileInfo:
    """Metadata describing a single entry in a file system."""

    # Full path (or URI) of the entry.
    path: str
    # Size in bytes; implementations may report 0 when unknown or not a file.
    size: int
    # True when the entry is a regular file rather than a directory/prefix.
    is_file: bool

    def __str__(self) -> str:
        """Human-readable summary, e.g. ``FileInfo(/tmp/x, 12, True)``."""
        return f"FileInfo({self.path}, {self.size}, {self.is_file})"


class FileSystem(metaclass=ABCMeta):
    """Abstract interface over a storage backend (local disk, S3, HTTP, ...)."""

    @classmethod
    def create(cls, **kwargs: Any) -> "FileSystem":
        """Factory hook; concrete implementations must override this."""
        raise NotImplementedError('File system implementations must implement "create"')

    @abstractmethod
    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for reading and return a file-like object."""
        ...

    @abstractmethod
    def file_status(self, path: str) -> FileInfo:
        """Return metadata for *path*."""
        ...

    @abstractmethod
    def list(self, path: str) -> Iterable[FileInfo]:
        """Enumerate the entries under *path*."""
        ...


def get_path_schema(path: str) -> str:
    """Return the URI scheme of *path*, defaulting to "file" for plain local paths."""
    scheme = parse.urlparse(path).scheme
    # A path without an explicit scheme is treated as a local file path.
    return scheme if scheme else "file"
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/fs_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.fs.fs_base import FileSystem

# Global registry of FileSystem implementations, populated from the
# "datahub.fs.plugins" entry-point group declared in setup.py
# (s3, file and http by default). Extra schemes can be contributed by
# third-party packages that register the same entry-point group.
fs_registry = PluginRegistry[FileSystem]()
fs_registry.register_from_entrypoint("datahub.fs.plugins")
28 changes: 28 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/http_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Any, Iterable

import requests
import smart_open

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class HttpFileSystem(FileSystem):
    """FileSystem implementation for http(s):// URLs."""

    @classmethod
    def create(cls, **kwargs):
        """Factory method; the HTTP file system needs no configuration."""
        return HttpFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading; kwargs are passed to smart_open
        as transport parameters (e.g. headers, timeouts)."""
        return smart_open.open(path, mode="rb", transport_params=kwargs)

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for *path* using an HTTP HEAD request.

        Raises:
            Exception: if the URL returns 404 or any other non-OK status.
        """
        # Always set a timeout so ingestion cannot hang forever on an
        # unresponsive server (requests has no default timeout).
        head = requests.head(path, timeout=30)
        if head.ok:
            # NOTE(review): assumes the server reports Content-Length;
            # servers omitting it will raise KeyError here — confirm.
            return FileInfo(path, int(head.headers["Content-length"]), is_file=True)
        elif head.status_code == 404:
            # Fixed typo in the original message ("does not exists").
            raise Exception(f"Requested path {path} does not exist.")
        else:
            raise Exception(f"Cannot get file status for the requested path {path}.")

    def list(self, path: str) -> Iterable[FileInfo]:
        """A URL identifies a single resource, so listing returns just its status."""
        status = self.file_status(path)
        return [status]
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 31 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/local_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import pathlib
from typing import Any, Iterable

from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


class LocalFileSystem(FileSystem):
    """FileSystem implementation backed by the local disk."""

    @classmethod
    def create(cls, **kwargs):
        """Factory method; the local file system needs no configuration."""
        return LocalFileSystem()

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading.

        The local file system supports no extra options; reject any kwargs
        explicitly (the original used `assert`, which is stripped under
        `python -O` and would silently ignore unsupported options).

        Raises:
            ValueError: if any kwargs are supplied.
        """
        if kwargs:
            raise ValueError(f"Local file system does not support kwargs: {kwargs}")
        return pathlib.Path(path).open(mode="rb")

    def list(self, path: str) -> Iterable[FileInfo]:
        """Return the status of *path* itself (file) or of its direct children (dir).

        Raises:
            FileNotFoundError: if *path* is neither a file nor a directory.
        """
        p = pathlib.Path(path)
        if p.is_file():
            return [self.file_status(path)]
        elif p.is_dir():
            return iter([self.file_status(str(x)) for x in p.iterdir()])
        else:
            # FileNotFoundError (a subclass of OSError and Exception) is more
            # specific than the original bare Exception and stays catchable
            # by any existing `except Exception` callers.
            raise FileNotFoundError(f"The specified path does not exist: {path}")

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for *path*; size is 0 for non-files (dirs, missing paths)."""
        if os.path.isfile(path):
            return FileInfo(path, os.path.getsize(path), is_file=True)
        else:
            return FileInfo(path, 0, is_file=False)
108 changes: 108 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/fs/s3_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any, Iterable
from urllib.parse import urlparse

import boto3
import smart_open

from datahub.ingestion.fs import s3_fs
from datahub.ingestion.fs.fs_base import FileInfo, FileSystem


def parse_s3_path(path: str) -> "S3Path":
    """Split an ``s3://bucket/key`` URI into its bucket and key components."""
    parsed = urlparse(path)
    return S3Path(parsed.netloc, parsed.path.lstrip("/"))


def assert_ok_status(s3_response):
    """Assert that an S3 API response carries HTTP status 200."""
    status_code = s3_response["ResponseMetadata"]["HTTPStatusCode"]
    assert (
        status_code == 200
    ), f"Failed to fetch S3 object, error message: {s3_response['Error']['Message']}"


@dataclass
class S3Path:
    # Bucket name (the netloc of the s3:// URI).
    bucket: str
    # Object key with the leading "/" stripped.
    key: str

    def __str__(self) -> str:
        """Human-readable summary, e.g. ``S3Path(my-bucket, dir/file)``."""
        return f"S3Path({self.bucket}, {self.key})"


class S3ListIterator(Iterator):
    """Iterator over every object under an S3 prefix, transparently paging
    through ``list_objects_v2`` responses (at most ``max_keys`` per request)."""

    MAX_KEYS = 1000

    def __init__(
        self, s3_client: Any, bucket: str, prefix: str, max_keys: int = MAX_KEYS
    ) -> None:
        self._s3 = s3_client
        self._bucket = bucket
        self._prefix = prefix
        self._max_keys = max_keys
        # Statuses from the current page; refilled by fetch().
        self._file_statuses: Iterator = iter([])
        # Continuation token for the next page; falsy when no more pages remain.
        self._token = ""
        self.fetch()

    def __next__(self) -> FileInfo:
        try:
            return next(self._file_statuses)
        except StopIteration:
            if self._token:
                # Current page exhausted but more pages remain: fetch the next one.
                self.fetch()
                return next(self._file_statuses)
            else:
                raise StopIteration()

    def fetch(self):
        """Fetch the next page of object listings into ``_file_statuses``."""
        params = dict(Bucket=self._bucket, Prefix=self._prefix, MaxKeys=self._max_keys)
        if self._token:
            params.update(ContinuationToken=self._token)

        response = self._s3.list_objects_v2(**params)

        # Call the module-level helper directly rather than through the
        # module's self-import (the original used s3_fs.assert_ok_status),
        # matching how the rest of this file calls it.
        assert_ok_status(response)

        self._file_statuses = iter(
            [
                FileInfo(f"s3://{response['Name']}/{x['Key']}", x["Size"], is_file=True)
                for x in response.get("Contents", [])
            ]
        )
        # Absent on the final page, leaving the token falsy (None).
        self._token = response.get("NextContinuationToken")


class S3FileSystem(FileSystem):
    """FileSystem implementation for s3:// URIs backed by boto3."""

    def __init__(self, **kwargs):
        # kwargs are forwarded to boto3 (credentials, region, endpoint, ...).
        self.s3 = boto3.client("s3", **kwargs)

    @classmethod
    def create(cls, **kwargs):
        """Factory method; kwargs configure the underlying boto3 client."""
        return S3FileSystem(**kwargs)

    def open(self, path: str, **kwargs: Any) -> Any:
        """Open *path* for binary reading via smart_open, using this
        instance's boto3 client.

        Bug fix: the original wrote ``transport_params = kwargs.update(...)``,
        but ``dict.update`` returns None, so transport_params was always None
        and the configured client was never passed to smart_open.
        """
        transport_params = {**kwargs, "client": self.s3}
        return smart_open.open(path, mode="rb", transport_params=transport_params)

    def file_status(self, path: str) -> FileInfo:
        """Return FileInfo for *path*; a missing object yields is_file=False.

        Raises:
            Exception: any S3 error other than a 404 is re-raised.
        """
        s3_path = parse_s3_path(path)
        try:
            response = self.s3.get_object_attributes(
                Bucket=s3_path.bucket, Key=s3_path.key, ObjectAttributes=["ObjectSize"]
            )
            assert_ok_status(response)
            return FileInfo(path, response["ObjectSize"], is_file=True)
        except Exception as e:
            # botocore's ClientError carries the HTTP status of the failed
            # call; a 404 simply means the object does not exist.
            if (
                hasattr(e, "response")
                and e.response["ResponseMetadata"]["HTTPStatusCode"] == 404
            ):
                return FileInfo(path, 0, is_file=False)
            else:
                raise e

    def list(self, path: str) -> Iterable[FileInfo]:
        """Lazily list all objects under the bucket/prefix encoded in *path*."""
        s3_path = parse_s3_path(path)
        return S3ListIterator(self.s3, s3_path.bucket, s3_path.key)
Loading
Loading