feat(ingest/s3): Support reading S3 file type #11177

Merged: 9 commits, Aug 30, 2024
@@ -493,7 +493,7 @@ def validate_path_spec(cls, values: Dict) -> Dict[str, Any]:

if (
include_ext not in values["file_types"]
and include_ext != "*"
and include_ext not in ["*", ""]
Collaborator Author:
Support a path spec like a/b/*. Allows ingesting files without a file extension now that that's supported for s3 (see the example path spec below). Ideally we would only allow this if use_s3_content_type was enabled, but that's at a different config level...

and not values["default_extension"]
and include_ext not in SUPPORTED_COMPRESSIONS
):
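As a usage illustration (not part of this diff), a minimal sketch of a path spec whose include pattern ends in a bare `*`: the include has no file extension, so the computed extension is empty and the relaxed check above now lets it through. Bucket and folder names are hypothetical.

```python
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec

# Hypothetical include with no file extension; previously this was rejected
# because the empty extension matched neither file_types nor "*".
path_spec = PathSpec(include="s3://my-bucket/a/b/*")
```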
metadata-ingestion/src/datahub/ingestion/source/s3/config.py (22 additions, 1 deletion)
@@ -47,6 +47,13 @@ class DataLakeSourceConfig(
None,
description="Whether or not to create tags in datahub from the s3 object",
)
use_s3_content_type: bool = Field(
default=False,
description=(
"If enabled, use S3 Object metadata to determine content type over file extension, if set."
" Warning: this requires a separate query to S3 for each object, which can be slow for large datasets."
),
)

# Whether to update the table schema when schema in files within the partitions are updated
_update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated(
@@ -145,13 +152,27 @@ def check_path_specs_and_infer_platform(
return path_specs

@pydantic.validator("platform", always=True)
def platform_not_empty(cls, platform: str, values: dict) -> str:
def platform_valid(cls, platform: str, values: dict) -> str:
inferred_platform = values.get(
"platform", None
) # we may have inferred it above
platform = platform or inferred_platform
if not platform:
raise ValueError("platform must not be empty")

if platform != "s3" and values.get("use_s3_bucket_tags"):
raise ValueError(
"Cannot grab s3 bucket tags when platform is not s3. Remove the flag or ingest from s3."
)
if platform != "s3" and values.get("use_s3_object_tags"):
raise ValueError(
"Cannot grab s3 object tags when platform is not s3. Remove the flag or ingest from s3."
)
if platform != "s3" and values.get("use_s3_content_type"):
raise ValueError(
"Cannot grab s3 object content type when platform is not s3. Remove the flag or ingest from s3."
)

return platform

@pydantic.root_validator(skip_on_failure=True)
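As a hedged usage sketch of the new flag together with the platform guard above (bucket and prefix are hypothetical; the dict mirrors a recipe's `config:` block):

```python
from datahub.ingestion.source.s3.config import DataLakeSourceConfig

# Platform is inferred as "s3" from the s3:// include, so the validator above
# accepts use_s3_content_type; each object's Content-Type will then be read
# from S3 instead of guessed from the file extension.
config = DataLakeSourceConfig.parse_obj(
    {
        "path_specs": [{"include": "s3://my-bucket/a/b/*"}],
        "use_s3_content_type": True,
    }
)
```

The same flag with a non-s3 include (local files) would raise a ValueError from platform_valid.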
metadata-ingestion/src/datahub/ingestion/source/s3/source.py (66 additions, 46 deletions)
@@ -47,6 +47,7 @@
from datahub.ingestion.source.s3.config import DataLakeSourceConfig, PathSpec
from datahub.ingestion.source.s3.report import DataLakeSourceReport
from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet
from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
@@ -82,7 +83,6 @@
# Hack to support the .gzip extension with smart_open.
so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])


# config flags to emit telemetry for
config_options_to_report = [
"platform",
@@ -161,6 +161,7 @@ class BrowsePath:
timestamp: datetime
size: int
partitions: List[Folder]
content_type: Optional[str] = None


@dataclasses.dataclass
@@ -175,6 +176,7 @@ class TableData:
partitions: Optional[List[Folder]] = None
max_partition: Optional[Folder] = None
min_partition: Optional[Folder] = None
content_type: Optional[str] = None


@platform_name("S3 / Local Files", id="s3")
@@ -378,8 +380,6 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
# capabilities of smart_open.
file = smart_open(table_data.full_path, "rb")

fields = []

extension = pathlib.Path(table_data.full_path).suffix
from datahub.ingestion.source.data_lake_common.path_spec import (
SUPPORTED_COMPRESSIONS,
@@ -391,38 +391,24 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
if extension == "" and path_spec.default_extension:
extension = f".{path_spec.default_extension}"

try:
if extension == ".parquet":
fields = parquet.ParquetInferrer().infer_schema(file)
elif extension == ".csv":
fields = csv_tsv.CsvInferrer(
max_rows=self.source_config.max_rows
).infer_schema(file)
elif extension == ".tsv":
fields = csv_tsv.TsvInferrer(
max_rows=self.source_config.max_rows
).infer_schema(file)
elif extension == ".jsonl":
fields = json.JsonInferrer(
max_rows=self.source_config.max_rows, format="jsonl"
).infer_schema(file)
elif extension == ".json":
fields = json.JsonInferrer().infer_schema(file)
elif extension == ".avro":
fields = avro.AvroInferrer().infer_schema(file)
else:
fields = []
inferrer = self._get_inferrer(extension, table_data.content_type)
if inferrer:
try:
fields = inferrer.infer_schema(file)
logger.debug(f"Extracted fields in schema: {fields}")
except Exception as e:
self.report.report_warning(
table_data.full_path,
f"file {table_data.full_path} has unsupported extension",
f"could not infer schema for file {table_data.full_path}: {e}",
)
file.close()
except Exception as e:
else:
self.report.report_warning(
table_data.full_path,
f"could not infer schema for file {table_data.full_path}: {e}",
f"file {table_data.full_path} has unsupported extension",
)
file.close()
logger.debug(f"Extracted fields in schema: {fields}")
file.close()

if self.source_config.sort_schema_fields:
fields = sorted(fields, key=lambda f: f.fieldPath)

@@ -433,6 +419,36 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:

return fields

def _get_inferrer(
self, extension: str, content_type: Optional[str]
) -> Optional[SchemaInferenceBase]:
if content_type == "application/vnd.apache.parquet":
return parquet.ParquetInferrer()
elif content_type == "text/csv":
return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows)
elif content_type == "text/tab-separated-values":
return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows)
elif content_type == "application/json":
return json.JsonInferrer()
elif content_type == "application/avro":
return avro.AvroInferrer()
elif extension == ".parquet":
return parquet.ParquetInferrer()
elif extension == ".csv":
return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows)
elif extension == ".tsv":
return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows)
elif extension == ".jsonl":
return json.JsonInferrer(
max_rows=self.source_config.max_rows, format="jsonl"
)
elif extension == ".json":
return json.JsonInferrer()
elif extension == ".avro":
return avro.AvroInferrer()
else:
return None

def add_partition_columns_to_schema(
self, path_spec: PathSpec, full_path: str, fields: List[SchemaField]
) -> None:
@@ -734,35 +750,34 @@ def extract_table_name(self, path_spec: PathSpec, named_vars: dict) -> str:
def extract_table_data(
self,
path_spec: PathSpec,
path: str,
timestamp: datetime,
size: int,
partitions: List[Folder],
browse_path: BrowsePath,
) -> TableData:
path = browse_path.file
partitions = browse_path.partitions
logger.debug(f"Getting table data for path: {path}")
table_name, table_path = path_spec.extract_table_name_and_path(path)
table_data = TableData(
return TableData(
display_name=table_name,
is_s3=self.is_s3_platform(),
full_path=path,
partitions=partitions,
max_partition=partitions[-1] if partitions else None,
min_partition=partitions[0] if partitions else None,
timestamp=timestamp,
timestamp=browse_path.timestamp,
table_path=table_path,
number_of_files=1,
size_in_bytes=(
size
if size
browse_path.size
if browse_path.size
else sum(
[
partition.size if partition.size else 0
for partition in partitions
]
)
),
content_type=browse_path.content_type,
)
return table_data

def resolve_templated_folders(self, bucket_name: str, prefix: str) -> Iterable[str]:
folder_split: List[str] = prefix.split("*", 1)
@@ -1001,6 +1016,7 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePath]:
timestamp=max_folder.modification_time,
size=max_folder.size,
partitions=partitions,
# TODO: Support content type inference for partitions
)
except Exception as e:
# This odd check if being done because boto does not have a proper exception to catch
@@ -1021,11 +1037,17 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePath]:
for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE):
s3_path = self.create_s3_path(obj.bucket_name, obj.key)
logger.debug(f"Path: {s3_path}")

content_type = None
if self.source_config.use_s3_content_type:
content_type = s3.Object(obj.bucket_name, obj.key).content_type

yield BrowsePath(
file=s3_path,
timestamp=obj.last_modified,
size=obj.size,
partitions=[],
content_type=content_type,
)

def create_s3_path(self, bucket_name: str, key: str) -> str:
@@ -1078,15 +1100,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
)
table_dict: Dict[str, TableData] = {}
for browse_path in file_browser:
if not path_spec.allowed(browse_path.file):
continue
table_data = self.extract_table_data(
path_spec,
if not path_spec.allowed(
browse_path.file,
browse_path.timestamp,
browse_path.size,
browse_path.partitions,
)
ignore_ext=self.is_s3_platform()
and self.source_config.use_s3_content_type,
):
continue
table_data = self.extract_table_data(path_spec, browse_path)
if table_data.table_path not in table_dict:
table_dict[table_data.table_path] = table_data
else:
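For context on the cost warning in the use_s3_content_type description, a minimal boto3 sketch (bucket and key are hypothetical) of the per-object lookup that s3_browser now performs when the flag is enabled:

```python
import boto3

s3 = boto3.resource("s3")

# boto3 resource objects are lazy: reading content_type triggers a HeadObject
# request, so the flag costs one extra S3 API call per listed file.
obj = s3.Object("my-bucket", "a/b/part-00000")
print(obj.content_type)  # e.g. "text/csv" for an extensionless CSV file
```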