diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index 4c2b0c276b9e7..bc19940afdd1e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -493,7 +493,7 @@ def validate_path_spec(cls, values: Dict) -> Dict[str, Any]: if ( include_ext not in values["file_types"] - and include_ext != "*" + and include_ext not in ["*", ""] and not values["default_extension"] and include_ext not in SUPPORTED_COMPRESSIONS ): diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 8f1b79251c466..3069c62e3a240 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -47,6 +47,13 @@ class DataLakeSourceConfig( None, description="Whether or not to create tags in datahub from the s3 object", ) + use_s3_content_type: bool = Field( + default=False, + description=( + "If enabled, use S3 Object metadata to determine content type over file extension, if set." + " Warning: this requires a separate query to S3 for each object, which can be slow for large datasets." + ), + ) # Whether to update the table schema when schema in files within the partitions are updated _update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated( @@ -145,13 +152,27 @@ def check_path_specs_and_infer_platform( return path_specs @pydantic.validator("platform", always=True) - def platform_not_empty(cls, platform: str, values: dict) -> str: + def platform_valid(cls, platform: str, values: dict) -> str: inferred_platform = values.get( "platform", None ) # we may have inferred it above platform = platform or inferred_platform if not platform: raise ValueError("platform must not be empty") + + if platform != "s3" and values.get("use_s3_bucket_tags"): + raise ValueError( + "Cannot grab s3 bucket tags when platform is not s3. Remove the flag or ingest from s3." + ) + if platform != "s3" and values.get("use_s3_object_tags"): + raise ValueError( + "Cannot grab s3 object tags when platform is not s3. Remove the flag or ingest from s3." + ) + if platform != "s3" and values.get("use_s3_content_type"): + raise ValueError( + "Cannot grab s3 object content type when platform is not s3. Remove the flag or ingest from s3." + ) + return platform @pydantic.root_validator(skip_on_failure=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 55e25ebe88d12..ef5ed3c6304c9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -47,6 +47,7 @@ from datahub.ingestion.source.s3.config import DataLakeSourceConfig, PathSpec from datahub.ingestion.source.s3.report import DataLakeSourceReport from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet +from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, ) @@ -82,7 +83,6 @@ # Hack to support the .gzip extension with smart_open. so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"]) - # config flags to emit telemetry for config_options_to_report = [ "platform", @@ -161,6 +161,7 @@ class BrowsePath: timestamp: datetime size: int partitions: List[Folder] + content_type: Optional[str] = None @dataclasses.dataclass @@ -175,6 +176,7 @@ class TableData: partitions: Optional[List[Folder]] = None max_partition: Optional[Folder] = None min_partition: Optional[Folder] = None + content_type: Optional[str] = None @platform_name("S3 / Local Files", id="s3") @@ -378,8 +380,6 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: # capabilities of smart_open. file = smart_open(table_data.full_path, "rb") - fields = [] - extension = pathlib.Path(table_data.full_path).suffix from datahub.ingestion.source.data_lake_common.path_spec import ( SUPPORTED_COMPRESSIONS, @@ -391,38 +391,24 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: if extension == "" and path_spec.default_extension: extension = f".{path_spec.default_extension}" - try: - if extension == ".parquet": - fields = parquet.ParquetInferrer().infer_schema(file) - elif extension == ".csv": - fields = csv_tsv.CsvInferrer( - max_rows=self.source_config.max_rows - ).infer_schema(file) - elif extension == ".tsv": - fields = csv_tsv.TsvInferrer( - max_rows=self.source_config.max_rows - ).infer_schema(file) - elif extension == ".jsonl": - fields = json.JsonInferrer( - max_rows=self.source_config.max_rows, format="jsonl" - ).infer_schema(file) - elif extension == ".json": - fields = json.JsonInferrer().infer_schema(file) - elif extension == ".avro": - fields = avro.AvroInferrer().infer_schema(file) - else: + fields = [] + inferrer = self._get_inferrer(extension, table_data.content_type) + if inferrer: + try: + fields = inferrer.infer_schema(file) + logger.debug(f"Extracted fields in schema: {fields}") + except Exception as e: self.report.report_warning( table_data.full_path, - f"file {table_data.full_path} has unsupported extension", + f"could not infer schema for file {table_data.full_path}: {e}", ) - file.close() - except Exception as e: + else: self.report.report_warning( table_data.full_path, - f"could not infer schema for file {table_data.full_path}: {e}", + f"file {table_data.full_path} has unsupported extension", ) - file.close() - logger.debug(f"Extracted fields in schema: {fields}") + file.close() + if self.source_config.sort_schema_fields: fields = sorted(fields, key=lambda f: f.fieldPath) @@ -433,6 +419,36 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: return fields + def _get_inferrer( + self, extension: str, content_type: Optional[str] + ) -> Optional[SchemaInferenceBase]: + if content_type == "application/vnd.apache.parquet": + return parquet.ParquetInferrer() + elif content_type == "text/csv": + return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif content_type == "text/tab-separated-values": + return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif content_type == "application/json": + return json.JsonInferrer() + elif content_type == "application/avro": + return avro.AvroInferrer() + elif extension == ".parquet": + return parquet.ParquetInferrer() + elif extension == ".csv": + return csv_tsv.CsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".tsv": + return csv_tsv.TsvInferrer(max_rows=self.source_config.max_rows) + elif extension == ".jsonl": + return json.JsonInferrer( + max_rows=self.source_config.max_rows, format="jsonl" + ) + elif extension == ".json": + return json.JsonInferrer() + elif extension == ".avro": + return avro.AvroInferrer() + else: + return None + def add_partition_columns_to_schema( self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] ) -> None: @@ -734,26 +750,25 @@ def extract_table_name(self, path_spec: PathSpec, named_vars: dict) -> str: def extract_table_data( self, path_spec: PathSpec, - path: str, - timestamp: datetime, - size: int, - partitions: List[Folder], + browse_path: BrowsePath, ) -> TableData: + path = browse_path.file + partitions = browse_path.partitions logger.debug(f"Getting table data for path: {path}") table_name, table_path = path_spec.extract_table_name_and_path(path) - table_data = TableData( + return TableData( display_name=table_name, is_s3=self.is_s3_platform(), full_path=path, partitions=partitions, max_partition=partitions[-1] if partitions else None, min_partition=partitions[0] if partitions else None, - timestamp=timestamp, + timestamp=browse_path.timestamp, table_path=table_path, number_of_files=1, size_in_bytes=( - size - if size + browse_path.size + if browse_path.size else sum( [ partition.size if partition.size else 0 @@ -761,8 +776,8 @@ def extract_table_data( ] ) ), + content_type=browse_path.content_type, ) - return table_data def resolve_templated_folders(self, bucket_name: str, prefix: str) -> Iterable[str]: folder_split: List[str] = prefix.split("*", 1) @@ -1001,6 +1016,7 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa timestamp=max_folder.modification_time, size=max_folder.size, partitions=partitions, + # TODO: Support content type inference for partitions ) except Exception as e: # This odd check if being done because boto does not have a proper exception to catch @@ -1021,11 +1037,17 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE): s3_path = self.create_s3_path(obj.bucket_name, obj.key) logger.debug(f"Path: {s3_path}") + + content_type = None + if self.source_config.use_s3_content_type: + content_type = s3.Object(obj.bucket_name, obj.key).content_type + yield BrowsePath( file=s3_path, timestamp=obj.last_modified, size=obj.size, partitions=[], + content_type=content_type, ) def create_s3_path(self, bucket_name: str, key: str) -> str: @@ -1078,15 +1100,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) table_dict: Dict[str, TableData] = {} for browse_path in file_browser: - if not path_spec.allowed(browse_path.file): - continue - table_data = self.extract_table_data( - path_spec, + if not path_spec.allowed( browse_path.file, - browse_path.timestamp, - browse_path.size, - browse_path.partitions, - ) + ignore_ext=self.is_s3_platform() + and self.source_config.use_s3_content_type, + ): + continue + table_data = self.extract_table_data(path_spec, browse_path) if table_data.table_path not in table_dict: table_dict[table_data.table_path] = table_data else: diff --git a/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json new file mode 100644 index 0000000000000..d50f00efacaa0 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/golden-files/s3/golden_mces_file_inference_without_extension.json @@ -0,0 +1,769 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "schema_inferred_from": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small", + "number_of_files": "1", + "size_in_bytes": "172" + }, + "name": "small", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "small", + "platform": "urn:li:dataPlatform:s3", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "1st chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "2nd chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "3rd chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "4th chord", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "integer", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "Progression Quality", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1615443388097, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "operationType": "UPDATE", + "lastUpdatedTimestamp": 1586848010000 + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "bucket_name": "my-test-bucket" + }, + "name": "my-test-bucket" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "S3 bucket" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a" + }, + "name": "folder_a" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa" + }, + "name": "folder_aa" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:c8d940d2010edd365619411b385b11e4" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa" + }, + "name": "folder_aaa" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "s3", + "instance": "test-platform-instance", + "env": "DEV", + "folder_abs_path": "my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension" + }, + "name": "no_extension" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:s3", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Folder" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + }, + { + "id": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "urn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:9b4624d58669059c9e62afb3d7341944" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-platform-instance.my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/small,DEV)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)" + }, + { + "id": "urn:li:container:647eefb4dfda8695baf1aa0775d78689", + "urn": "urn:li:container:647eefb4dfda8695baf1aa0775d78689" + }, + { + "id": "urn:li:container:c8d940d2010edd365619411b385b11e4", + "urn": "urn:li:container:c8d940d2010edd365619411b385b11e4" + }, + { + "id": "urn:li:container:b0037296cdd497e3137aa0628b8687bc", + "urn": "urn:li:container:b0037296cdd497e3137aa0628b8687bc" + }, + { + "id": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9", + "urn": "urn:li:container:de5780654849d6a18b66df2f9cb8e8d9" + }, + { + "id": "urn:li:container:9b4624d58669059c9e62afb3d7341944", + "urn": "urn:li:container:9b4624d58669059c9e62afb3d7341944" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "file_without_extension.json" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json new file mode 100644 index 0000000000000..87f7950946780 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/s3/file_inference_without_extension.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "use_s3_content_type": true, + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json b/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json new file mode 100644 index 0000000000000..235b588e5719b --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/file_without_extension.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/no_extension/*", + "default_extension": "csv" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json new file mode 100644 index 0000000000000..c06e411005399 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition.json @@ -0,0 +1,17 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json new file mode 100644 index 0000000000000..8d05bfcc60a0c --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_exclude.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/{table}/*.*", + "exclude": [ + "**/food_csv/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json new file mode 100644 index 0000000000000..421483c74a868 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_filename.json @@ -0,0 +1,19 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/*.*", + "file_types": ["csv"], + "table_name": "{dept}.{table}" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json new file mode 100644 index 0000000000000..bf3f832f9c7b4 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_no_partition_glob.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/*/folder_aa/*/{table}/*.*", + "exclude": [ + "**/food_csv/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json new file mode 100644 index 0000000000000..c6afe024f6a2c --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_basic.json @@ -0,0 +1,21 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*", + "table_name": "{dept}.{table}", + "exclude":[ + "**/folder_aaaa/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json new file mode 100644 index 0000000000000..9bb6d129858ab --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_keyval.json @@ -0,0 +1,18 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.*", + "table_name": "{dept}.{table}" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json new file mode 100644 index 0000000000000..b542f60a6a8aa --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/folder_partition_update_schema.json @@ -0,0 +1,22 @@ +{ + "type": "s3", + "config": { + "env": "UAT", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/{dept}/{table}/{partition[0]}/{partition[1]}/*.*", + "sample_files": true, + "table_name": "{dept}.{table}", + "exclude":[ + "**/folder_aaaa/**" + ] + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json new file mode 100644 index 0000000000000..77be022895cfc --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_files.json @@ -0,0 +1,18 @@ +{ + "type": "s3", + "config": { + "platform_instance": "test-platform-instance", + "env": "DEV", + "path_specs": [{ + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/*.*" + }], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json new file mode 100644 index 0000000000000..d051421e154e7 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_spec_for_files.json @@ -0,0 +1,23 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + }, + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json new file mode 100644 index 0000000000000..d6aec502e1195 --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/multiple_specs_of_different_buckets.json @@ -0,0 +1,23 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + }, + { + "include": "s3://my-test-bucket-2/folder_a/folder_aa/folder_aaa/chord_progressions_csv.csv" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json b/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json new file mode 100644 index 0000000000000..96e5eafa562dd --- /dev/null +++ b/metadata-ingestion/tests/integration/s3/sources/shared/single_file.json @@ -0,0 +1,20 @@ +{ + "type": "s3", + "config": { + "path_specs": [ + { + "include": "s3://my-test-bucket/folder_a/folder_aa/folder_aaa/chord_progressions_avro.avro" + } + ], + "aws_config": { + "aws_region": "us-east-1", + "aws_access_key_id": "testing", + "aws_secret_access_key": "testing" + }, + "profiling": { + "enabled": false + }, + "use_s3_bucket_tags": true, + "use_s3_object_tags": true + } +} diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index 4137c6c5c399e..b45f1f78fc55a 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -109,7 +109,13 @@ def s3_populate(pytestconfig, s3_resource, s3_client, bucket_names): full_path = os.path.join(root, file) rel_path = os.path.relpath(full_path, test_resources_dir) file_list.append(rel_path) - bkt.upload_file(full_path, rel_path) + bkt.upload_file( + full_path, + rel_path, # Set content type for `no_extension/small` file to text/csv + ExtraArgs={"ContentType": "text/csv"} + if "." not in rel_path + else {}, + ) s3_client.put_object_tagging( Bucket=bucket_name, Key=rel_path, @@ -143,18 +149,24 @@ def touch_local_files(pytestconfig): os.utime(full_path, times=(current_time_sec, current_time_sec)) -SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" -source_files = os.listdir(SOURCE_FILES_PATH) +SHARED_SOURCE_FILES_PATH = "./tests/integration/s3/sources/shared" +shared_source_files = [ + (SHARED_SOURCE_FILES_PATH, p) for p in os.listdir(SHARED_SOURCE_FILES_PATH) +] + +S3_SOURCE_FILES_PATH = "./tests/integration/s3/sources/s3" +s3_source_files = [(S3_SOURCE_FILES_PATH, p) for p in os.listdir(S3_SOURCE_FILES_PATH)] @pytest.mark.integration -@pytest.mark.parametrize("source_file", source_files) +@pytest.mark.parametrize("source_file_tuple", shared_source_files + s3_source_files) def test_data_lake_s3_ingest( - pytestconfig, s3_populate, source_file, tmp_path, mock_time + pytestconfig, s3_populate, source_file_tuple, tmp_path, mock_time ): + source_dir, source_file = source_file_tuple test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(source_dir, source_file)) source = json.load(f) config_dict = {} @@ -184,12 +196,13 @@ def test_data_lake_s3_ingest( @pytest.mark.integration -@pytest.mark.parametrize("source_file", source_files) +@pytest.mark.parametrize("source_file_tuple", shared_source_files) def test_data_lake_local_ingest( - pytestconfig, touch_local_files, source_file, tmp_path, mock_time + pytestconfig, touch_local_files, source_file_tuple, tmp_path, mock_time ): + source_dir, source_file = source_file_tuple test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" - f = open(os.path.join(SOURCE_FILES_PATH, source_file)) + f = open(os.path.join(source_dir, source_file)) source = json.load(f) config_dict = {}