From e444eaed200d1270424784356b246b6e6e613939 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 14 Aug 2024 11:57:27 -0700 Subject: [PATCH 1/8] feat(ingest/s3): Support reading S3 file type --- .../source/data_lake_common/path_spec.py | 2 +- .../src/datahub/ingestion/source/s3/config.py | 23 +- .../src/datahub/ingestion/source/s3/source.py | 93 ++- ...mces_file_inference_without_extension.json | 769 ++++++++++++++++++ .../s3/file_inference_without_extension.json | 19 + .../tests/integration/s3/test_s3.py | 8 +- 6 files changed, 876 insertions(+), 38 deletions(-) create mode 100644 metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index 4c2b0c276b9e7e..bc19940afdd1e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -493,7 +493,7 @@ def validate_path_spec(cls, values: Dict) -> Dict[str, Any]: if ( include_ext not in values["file_types"] - and include_ext != "*" + and include_ext not in ["*", ""] and not values["default_extension"] and include_ext not in SUPPORTED_COMPRESSIONS ): diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 8f1b79251c466f..dd6d6c7eba2535 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -47,6 +47,13 @@ class DataLakeSourceConfig( None, description="Whether or not to create tags in datahub from the s3 object", ) + use_s3_content_type: bool = Field( + default=False, + description=( + "If enabled, use S3 Object metadata to determine content type over file extension, if set.", + " Warning: this requires a separate query to S3 for each object, which can be slow for large datasets.", + ), + ) # Whether to update the table schema when schema in files within the partitions are updated _update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated( @@ -145,13 +152,27 @@ def check_path_specs_and_infer_platform( return path_specs @pydantic.validator("platform", always=True) - def platform_not_empty(cls, platform: str, values: dict) -> str: + def platform_valid(cls, platform: str, values: dict) -> str: inferred_platform = values.get( "platform", None ) # we may have inferred it above platform = platform or inferred_platform if not platform: raise ValueError("platform must not be empty") + + if platform != "s3" and values.get("use_s3_bucket_tags"): + raise ValueError( + "Cannot grab s3 bucket tags when platform is not s3. Remove the flag or ingest from s3." + ) + if platform != "s3" and values.get("use_s3_object_tags"): + raise ValueError( + "Cannot grab s3 object tags when platform is not s3. Remove the flag or ingest from s3." + ) + if platform != "s3" and values.get("use_s3_content_type"): + raise ValueError( + "Cannot grab s3 object content type when platform is not s3. Remove the flag or ingest from s3." + ) + return platform @pydantic.root_validator(skip_on_failure=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 55e25ebe88d125..8a06ff36093dc8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -47,6 +47,7 @@ from datahub.ingestion.source.s3.config import DataLakeSourceConfig, PathSpec from datahub.ingestion.source.s3.report import DataLakeSourceReport from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, ) @@ -82,7 +83,6 @@ # Hack to support the .gzip extension with smart_open. so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"]) - # config flags to emit telemetry for config_options_to_report = [ "platform", @@ -161,6 +161,7 @@ class BrowsePath: timestamp: datetime size: int partitions: List[Folder] + content_type: Optional[str] = None @dataclasses.dataclass @@ -175,6 +176,7 @@ class TableData: partitions: Optional[List[Folder]] = None max_partition: Optional[Folder] = None min_partition: Optional[Folder] = None + content_type: Optional[str] = None @platform_name("S3 / Local Files", id="s3") @@ -378,8 +380,6 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: # capabilities of smart_open. file = smart_open(table_data.full_path, "rb") - fields = [] - extension = pathlib.Path(table_data.full_path).suffix from datahub.ingestion.source.data_lake_common.path_spec import ( SUPPORTED_COMPRESSIONS, @@ -391,38 +391,49 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: if extension == "" and path_spec.default_extension: extension = f".{path_spec.default_extension}" - try: - if extension == ".parquet": - fields = parquet.ParquetInferrer().infer_schema(file) - elif extension == ".csv": - fields = csv_tsv.CsvInferrer( - max_rows=self.source_config.max_rows - ).infer_schema(file) - elif extension == ".tsv": - fields = csv_tsv.TsvInferrer( - max_rows=self.source_config.max_rows - ).infer_schema(file) - elif extension == ".jsonl": - fields = json.JsonInferrer( - max_rows=self.source_config.max_rows, format="jsonl" - ).infer_schema(file) - elif extension == ".json": - fields = json.JsonInferrer().infer_schema(file) - elif extension == ".avro": - fields = avro.AvroInferrer().infer_schema(file) - else: - self.report.report_warning( - table_data.full_path, - f"file {table_data.full_path} has unsupported extension", - ) - file.close() - except Exception as e: + inferrer: Optional[SchemaInferenceBase] = None + if table_data.content_type == "application/vnd.apache.parquet": + inferrer = parquet.ParquetInferrer() + elif table_data.content_type == "text/csv": + inferrer = csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif table_data.content_type == "text/tab-separated-values": + inferrer = csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif table_data.content_type == "application/json": + inferrer = json.JsonInferrer() + elif table_data.content_type == "application/avro": + inferrer = avro.AvroInferrer() + elif extension == ".parquet": + inferrer = parquet.ParquetInferrer() + elif extension == ".csv": + inferrer = csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".tsv": + inferrer = csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".jsonl": + inferrer = json.JsonInferrer( + max_rows=self.source_config.max_rows, format="jsonl" + ) + elif extension == ".json": + inferrer = json.JsonInferrer() + elif extension == ".avro": + inferrer = avro.AvroInferrer() + else: self.report.report_warning( table_data.full_path, - f"could not infer schema for file {table_data.full_path}: {e}", + f"file {table_data.full_path} has unsupported extension", ) - file.close() - logger.debug(f"Extracted fields in schema: {fields}") + + fields = [] + if inferrer: + try: + fields = inferrer.infer_schema(file) + logger.debug(f"Extracted fields in schema: {fields}") + except Exception as e: + self.report.report_warning( + table_data.full_path, + f"could not infer schema for file {table_data.full_path}: {e}", + ) + file.close() + if self.source_config.sort_schema_fields: fields = sorted(fields, key=lambda f: f.fieldPath) @@ -738,10 +749,11 @@ def extract_table_data( timestamp: datetime, size: int, partitions: List[Folder], + content_type: Optional[str], ) -> TableData: logger.debug(f"Getting table data for path: {path}") table_name, table_path = path_spec.extract_table_name_and_path(path) - table_data = TableData( + return TableData( display_name=table_name, is_s3=self.is_s3_platform(), full_path=path, @@ -761,8 +773,8 @@ def extract_table_data( ] ) ), + content_type=content_type, ) - return table_data def resolve_templated_folders(self, bucket_name: str, prefix: str) -> Iterable[str]: folder_split: List[str] = prefix.split("*", 1) @@ -1021,11 +1033,17 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE): s3_path = self.create_s3_path(obj.bucket_name, obj.key) logger.debug(f"Path: {s3_path}") + + content_type = None + if self.source_config.use_s3_content_type: + content_type = s3.Object(obj.bucket_name, obj.key).content_type + yield BrowsePath( file=s3_path, timestamp=obj.last_modified, size=obj.size, partitions=[], + content_type=content_type, ) def create_s3_path(self, bucket_name: str, key: str) -> str: @@ -1078,7 +1096,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) table_dict: Dict[str, TableData] = {} for browse_path in file_browser: - if not path_spec.allowed(browse_path.file): + if not path_spec.allowed( + browse_path.file, + ignore_ext=self.is_s3_platform() + and self.source_config.use_s3_content_type, + ): continue table_data = self.extract_table_data( path_spec, @@ -1086,6 +1108,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: browse_path.timestamp, browse_path.size, browse_path.partitions, + browse_path.content_type, ) if table_data.table_path not in table_dict: table_dict[table_data.table_path] = table_data diff --git a/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json new file mode 100644 index 00000000000000..d50f00efacaa06 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json @@ -0,0 +1,769 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "schema_inferred_from": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small", + "number_of_files": "1", + "size_in_bytes": "172" + }, + "name": "small", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "small", + "platform": "urn:li:dataPlatform:s3", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "1st chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "2nd chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "3rd chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "4th chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "Progression Quality", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1615443388097, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1586848010000 + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "bucket_name": "my-test-bucket" + }, + "name": "my-test-bucket" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "S3 bucket" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a" + }, + "name": "folder_a" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa" + }, + "name": "folder_aa" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:c8d940d2010edd365619411b385b11e4" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa" + }, + "name": "folder_aaa" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension" + }, + "name": "no_extension" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + }, + { + "id": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "urn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:9b4624d58669059c9e62afb3d7341944" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + }, + { + "id": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "urn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + }, + { + "id": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "urn": "urn:li:container:9b4624d58669059c9e62afb3d7341944" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json new file mode 100644 index 00000000000000..87f7950946780a --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "use_s3_content_type": true, + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index 4137c6c5c399ea..6ab399e20b3a8d 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -109,7 +109,13 @@ def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names): full_path = os.path.join(root, file) rel_path = os.path.relpath(full_path, test_resources_dir) file_list.append(rel_path) - bkt.upload_file(full_path, rel_path) + bkt.upload_file( + full_path, + rel_path, # Set content type for `no_extension/small` file to text/csv + ExtraArgs={"ContentType": "text/csv"} + if "." not in rel_path + else {}, + ) s3_client.put_object_tagging( Bucket=bucket_name, Key=rel_path, From 466538f5e8e618aaa8157f65ad31558ed4bdb503 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 14 Aug 2024 12:08:20 -0700 Subject: [PATCH 2/8] refactor --- .../src/datahub/ingestion/source/s3/source.py | 67 ++++++++++--------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 8a06ff36093dc8..eba9f9761be9db 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -391,38 +391,8 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: if extension == "" and path_spec.default_extension: extension = f".{path_spec.default_extension}" - inferrer: Optional[SchemaInferenceBase] = None - if table_data.content_type == "application/vnd.apache.parquet": - inferrer = parquet.ParquetInferrer() - elif table_data.content_type == "text/csv": - inferrer = csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) - elif table_data.content_type == "text/tab-separated-values": - inferrer = csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) - elif table_data.content_type == "application/json": - inferrer = json.JsonInferrer() - elif table_data.content_type == "application/avro": - inferrer = avro.AvroInferrer() - elif extension == ".parquet": - inferrer = parquet.ParquetInferrer() - elif extension == ".csv": - inferrer = csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) - elif extension == ".tsv": - inferrer = csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) - elif extension == ".jsonl": - inferrer = json.JsonInferrer( - max_rows=self.source_config.max_rows, format="jsonl" - ) - elif extension == ".json": - inferrer = json.JsonInferrer() - elif extension == ".avro": - inferrer = avro.AvroInferrer() - else: - self.report.report_warning( - table_data.full_path, - f"file {table_data.full_path} has unsupported extension", - ) - fields = [] + inferrer = self._get_inferrer(extension, table_data.content_type) if inferrer: try: fields = inferrer.infer_schema(file) @@ -432,6 +402,11 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: table_data.full_path, f"could not infer schema for file {table_data.full_path}: {e}", ) + else: + self.report.report_warning( + table_data.full_path, + f"file {table_data.full_path} has unsupported extension", + ) file.close() if self.source_config.sort_schema_fields: @@ -444,6 +419,36 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: return fields + def _get_inferrer( + self, extension: str, content_type: Optional[str] + ) -> Optional[SchemaInferenceBase]: + if content_type == "application/vnd.apache.parquet": + return parquet.ParquetInferrer() + elif content_type == "text/csv": + return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif content_type == "text/tab-separated-values": + return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif content_type == "application/json": + return json.JsonInferrer() + elif content_type == "application/avro": + return avro.AvroInferrer() + elif extension == ".parquet": + return parquet.ParquetInferrer() + elif extension == ".csv": + return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".tsv": + return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".jsonl": + return json.JsonInferrer( + max_rows=self.source_config.max_rows, format="jsonl" + ) + elif extension == ".json": + return json.JsonInferrer() + elif extension == ".avro": + return avro.AvroInferrer() + else: + return None + def add_partition_columns_to_schema( self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] ) -> None: From 36c462cba4707a8e29e8ad3e85809420c8c76fde Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 14 Aug 2024 12:18:57 -0700 Subject: [PATCH 3/8] refactoring --- metadata-ingestion/src/datahub/ingestion/source/s3/config.py | 4 ++-- metadata-ingestion/src/datahub/ingestion/source/s3/source.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index dd6d6c7eba2535..3069c62e3a240f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -50,8 +50,8 @@ class DataLakeSourceConfig( use_s3_content_type: bool = Field( default=False, description=( - "If enabled, use S3 Object metadata to determine content type over file extension, if set.", - " Warning: this requires a separate query to S3 for each object, which can be slow for large datasets.", + "If enabled, use S3 Object metadata to determine content type over file extension, if set." + " Warning: this requires a separate query to S3 for each object, which can be slow for large datasets." ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index eba9f9761be9db..548e9b6150968e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -12,6 +12,7 @@ import smart_open.compression as so_compression from more_itertools import peekable +from mypy_boto3_s3.service_resource import ObjectSummary, S3ServiceResource from pyspark.conf import SparkConf from pyspark.sql import SparkSession from pyspark.sql.dataframe import DataFrame @@ -1018,6 +1019,7 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa timestamp=max_folder.modification_time, size=max_folder.size, partitions=partitions, + # TODO: Support content type inference for partitions ) except Exception as e: # This odd check if being done because boto does not have a proper exception to catch From a113d1a6987b332c8a048040a2f3c0e5d469bbd9 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 14 Aug 2024 13:08:01 -0700 Subject: [PATCH 4/8] separate out local tests --- .../sources/local/file_without_extension.json | 19 +++++++++++++++ .../s3/sources/local/folder_no_partition.json | 17 ++++++++++++++ .../local/folder_no_partition_exclude.json | 20 ++++++++++++++++ .../local/folder_no_partition_filename.json | 19 +++++++++++++++ .../local/folder_no_partition_glob.json | 20 ++++++++++++++++ .../sources/local/folder_partition_basic.json | 21 +++++++++++++++++ .../local/folder_partition_keyval.json | 18 +++++++++++++++ .../local/folder_partition_update_schema.json | 22 ++++++++++++++++++ .../s3/sources/local/multiple_files.json | 18 +++++++++++++++ .../local/multiple_spec_for_files.json | 23 +++++++++++++++++++ .../multiple_specs_of_different_buckets.json | 23 +++++++++++++++++++ .../s3/sources/local/single_file.json | 20 ++++++++++++++++ .../tests/integration/s3/test_s3.py | 12 ++++++---- 13 files changed, 248 insertions(+), 4 deletions(-) create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/file_without_extension.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_exclude.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_filename.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_glob.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_partition_basic.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_partition_keyval.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/folder_partition_update_schema.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/multiple_files.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/multiple_spec_for_files.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/multiple_specs_of_different_buckets.json create mode 100644 metadata-ingestion/tests/integration/s3/sources/local/single_file.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/file_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/local/file_without_extension.json new file mode 100644 index 00000000000000..235b588e5719bf --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/file_without_extension.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/*", + "default_extension": "csv" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition.json new file mode 100644 index 00000000000000..c06e411005399e --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition.json @@ -0,0 +1,17 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_exclude.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_exclude.json new file mode 100644 index 00000000000000..8d05bfcc60a0c7 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_exclude.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*", + "exclude": [ + "**/food_csv/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_filename.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_filename.json new file mode 100644 index 00000000000000..421483c74a8681 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_filename.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/*.*", + "file_types": ["csv"], + "table_name": "{dept}.{table}" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_glob.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_glob.json new file mode 100644 index 00000000000000..bf3f832f9c7b4a --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_glob.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/*/folder_aa/*/{table}/*.*", + "exclude": [ + "**/food_csv/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_basic.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_basic.json new file mode 100644 index 00000000000000..c6afe024f6a2c8 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_basic.json @@ -0,0 +1,21 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*", + "table_name": "{dept}.{table}", + "exclude":[ + "**/folder_aaaa/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_keyval.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_keyval.json new file mode 100644 index 00000000000000..9bb6d129858ab0 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_keyval.json @@ -0,0 +1,18 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.*", + "table_name": "{dept}.{table}" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_update_schema.json b/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_update_schema.json new file mode 100644 index 00000000000000..b542f60a6a8aa6 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_update_schema.json @@ -0,0 +1,22 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*", + "sample_files": true, + "table_name": "{dept}.{table}", + "exclude":[ + "**/folder_aaaa/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/multiple_files.json b/metadata-ingestion/tests/integration/s3/sources/local/multiple_files.json new file mode 100644 index 00000000000000..77be022895cfca --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/multiple_files.json @@ -0,0 +1,18 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/*.*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/local/multiple_spec_for_files.json b/metadata-ingestion/tests/integration/s3/sources/local/multiple_spec_for_files.json new file mode 100644 index 00000000000000..d051421e154e72 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/multiple_spec_for_files.json @@ -0,0 +1,23 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + }, + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/local/multiple_specs_of_different_buckets.json b/metadata-ingestion/tests/integration/s3/sources/local/multiple_specs_of_different_buckets.json new file mode 100644 index 00000000000000..d6aec502e11954 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/multiple_specs_of_different_buckets.json @@ -0,0 +1,23 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + }, + { + "include": "s3://my-test-bucket-2/folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/local/single_file.json b/metadata-ingestion/tests/integration/s3/sources/local/single_file.json new file mode 100644 index 00000000000000..96e5eafa562dd3 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/local/single_file.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index 6ab399e20b3a8d..66873a0a2ef3a4 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -149,8 +149,8 @@ def touch_local_files(pytestconfig): os.utime(full_path, times=(current_time_sec, current_time_sec)) -SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" -source_files = os.listdir(SOURCE_FILES_PATH) +S3_SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" +source_files = os.listdir(S3_SOURCE_FILES_PATH) @pytest.mark.integration @@ -160,7 +160,7 @@ def test_data_lake_s3_ingest( ): test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(S3_SOURCE_FILES_PATH, source_file)) source = json.load(f) config_dict = {} @@ -189,13 +189,17 @@ def test_data_lake_s3_ingest( ) +LOCAL_SOURCE_FILES_PATH = "./tests/integration/s3/sources/local" +source_files = os.listdir(LOCAL_SOURCE_FILES_PATH) + + @pytest.mark.integration @pytest.mark.parametrize("source_file", source_files) def test_data_lake_local_ingest( pytestconfig, touch_local_files, source_file, tmp_path, mock_time ): test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(LOCAL_SOURCE_FILES_PATH, source_file)) source = json.load(f) config_dict = {} From e4ae38dfd47156916fda9ae68518985119e8ea91 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 14 Aug 2024 13:13:06 -0700 Subject: [PATCH 5/8] refactor test file --- .../file_without_extension.json | 0 .../folder_no_partition.json | 0 .../folder_no_partition_exclude.json | 0 .../folder_no_partition_filename.json | 0 .../folder_no_partition_glob.json | 0 .../folder_partition_basic.json | 0 .../folder_partition_keyval.json | 0 .../folder_partition_update_schema.json | 0 .../{local => shared}/multiple_files.json | 0 .../multiple_spec_for_files.json | 0 .../multiple_specs_of_different_buckets.json | 0 .../{local => shared}/single_file.json | 0 .../tests/integration/s3/test_s3.py | 25 +++++++++++-------- 13 files changed, 14 insertions(+), 11 deletions(-) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/file_without_extension.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_no_partition.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_no_partition_exclude.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_no_partition_filename.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_no_partition_glob.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_partition_basic.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_partition_keyval.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/folder_partition_update_schema.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/multiple_files.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/multiple_spec_for_files.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/multiple_specs_of_different_buckets.json (100%) rename metadata-ingestion/tests/integration/s3/sources/{local => shared}/single_file.json (100%) diff --git a/metadata-ingestion/tests/integration/s3/sources/local/file_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/file_without_extension.json rename to metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_exclude.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_exclude.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_filename.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_filename.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_glob.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_no_partition_glob.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_basic.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_partition_basic.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_keyval.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_partition_keyval.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/folder_partition_update_schema.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/folder_partition_update_schema.json rename to metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/multiple_files.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/multiple_files.json rename to metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/multiple_spec_for_files.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/multiple_spec_for_files.json rename to metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/multiple_specs_of_different_buckets.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/multiple_specs_of_different_buckets.json rename to metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json diff --git a/metadata-ingestion/tests/integration/s3/sources/local/single_file.json b/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json similarity index 100% rename from metadata-ingestion/tests/integration/s3/sources/local/single_file.json rename to metadata-ingestion/tests/integration/s3/sources/shared/single_file.json diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index 66873a0a2ef3a4..b45f1f78fc55a8 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -149,18 +149,24 @@ def touch_local_files(pytestconfig): os.utime(full_path, times=(current_time_sec, current_time_sec)) +SHARED_SOURCE_FILES_PATH = "./tests/integration/s3/sources/shared" +shared_source_files = [ + (SHARED_SOURCE_FILES_PATH, p) for p in os.listdir(SHARED_SOURCE_FILES_PATH) +] + S3_SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" -source_files = os.listdir(S3_SOURCE_FILES_PATH) +s3_source_files = [(S3_SOURCE_FILES_PATH, p) for p in os.listdir(S3_SOURCE_FILES_PATH)] @pytest.mark.integration -@pytest.mark.parametrize("source_file", source_files) +@pytest.mark.parametrize("source_file_tuple", shared_source_files + s3_source_files) def test_data_lake_s3_ingest( - pytestconfig, s3_populate, source_file, tmp_path, mock_time + pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time ): + source_dir, source_file = source_file_tuple test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(S3_SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(source_dir, source_file)) source = json.load(f) config_dict = {} @@ -189,17 +195,14 @@ def test_data_lake_s3_ingest( ) -LOCAL_SOURCE_FILES_PATH = "./tests/integration/s3/sources/local" -source_files = os.listdir(LOCAL_SOURCE_FILES_PATH) - - @pytest.mark.integration -@pytest.mark.parametrize("source_file", source_files) +@pytest.mark.parametrize("source_file_tuple", shared_source_files) def test_data_lake_local_ingest( - pytestconfig, touch_local_files, source_file, tmp_path, mock_time + pytestconfig, touch_local_files, source_file_tuple, tmp_path, mock_time ): + source_dir, source_file = source_file_tuple test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(LOCAL_SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(source_dir, source_file)) source = json.load(f) config_dict = {} From 3604c86db45d6762faad5d2878a2e63a60f1b3d2 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Thu, 29 Aug 2024 11:27:29 -0700 Subject: [PATCH 6/8] lint; refactor --- .../src/datahub/ingestion/source/s3/source.py | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 548e9b6150968e..8a512cf86b3052 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -12,13 +12,13 @@ import smart_open.compression as so_compression from more_itertools import peekable -from mypy_boto3_s3.service_resource import ObjectSummary, S3ServiceResource from pyspark.conf import SparkConf from pyspark.sql import SparkSession from pyspark.sql.dataframe import DataFrame from pyspark.sql.utils import AnalysisException from smart_open import open as smart_open +from build.lib.datahub.lite.lite_server import browse from datahub.emitter.mce_builder import ( make_data_platform_urn, make_dataplatform_instance_urn, @@ -751,12 +751,10 @@ def extract_table_name(self, path_spec: PathSpec, named_vars: dict) -> str: def extract_table_data( self, path_spec: PathSpec, - path: str, - timestamp: datetime, - size: int, - partitions: List[Folder], - content_type: Optional[str], + browse_path: BrowsePath, ) -> TableData: + path = browse_path.file + partitions = browse_path.partitions logger.debug(f"Getting table data for path: {path}") table_name, table_path = path_spec.extract_table_name_and_path(path) return TableData( @@ -766,12 +764,12 @@ def extract_table_data( partitions=partitions, max_partition=partitions[-1] if partitions else None, min_partition=partitions[0] if partitions else None, - timestamp=timestamp, + timestamp=browse_path.timestamp, table_path=table_path, number_of_files=1, size_in_bytes=( - size - if size + browse_path.size + if browse_path.size else sum( [ partition.size if partition.size else 0 @@ -779,7 +777,7 @@ def extract_table_data( ] ) ), - content_type=content_type, + content_type=browse_path.content_type, ) def resolve_templated_folders(self, bucket_name: str, prefix: str) -> Iterable[str]: @@ -1109,14 +1107,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: and self.source_config.use_s3_content_type, ): continue - table_data = self.extract_table_data( - path_spec, - browse_path.file, - browse_path.timestamp, - browse_path.size, - browse_path.partitions, - browse_path.content_type, - ) + table_data = self.extract_table_data(path_spec, browse_path) if table_data.table_path not in table_dict: table_dict[table_data.table_path] = table_data else: From f8dcd6368c3c66aee5020687812fb5bf63bb6050 Mon Sep 17 00:00:00 2001 From: treff7es Date: Fri, 30 Aug 2024 09:25:52 +0200 Subject: [PATCH 7/8] Isorting --- metadata-ingestion/src/datahub/ingestion/source/s3/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 8a512cf86b3052..48b922f6610044 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -11,6 +11,7 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple import smart_open.compression as so_compression +from build.lib.datahub.lite.lite_server import browse from more_itertools import peekable from pyspark.conf import SparkConf from pyspark.sql import SparkSession @@ -18,7 +19,6 @@ from pyspark.sql.utils import AnalysisException from smart_open import open as smart_open -from build.lib.datahub.lite.lite_server import browse from datahub.emitter.mce_builder import ( make_data_platform_urn, make_dataplatform_instance_urn, From b4f2422f615e1ade2504da6fb96e3a23a94e0ef8 Mon Sep 17 00:00:00 2001 From: treff7es Date: Fri, 30 Aug 2024 11:15:09 +0200 Subject: [PATCH 8/8] Remove unused import --- metadata-ingestion/src/datahub/ingestion/source/s3/source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 48b922f6610044..ef5ed3c6304c92 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -11,7 +11,6 @@ from typing import Any, Dict, Iterable, List, Optional, Tuple import smart_open.compression as so_compression -from build.lib.datahub.lite.lite_server import browse from more_itertools import peekable from pyspark.conf import SparkConf from pyspark.sql import SparkSession