Skip to content

Commit

Permalink
fix(ingest): tighten Source.create type annotations
Browse files Browse the repository at this point in the history
Supersedes #8620
  • Loading branch information
hsheth2 committed Jan 11, 2025
1 parent 9897804 commit bca63db
Show file tree
Hide file tree
Showing 13 changed files with 9 additions and 60 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)

from pydantic import BaseModel
from typing_extensions import LiteralString
from typing_extensions import LiteralString, Self

from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import PlatformInstanceConfigMixin
Expand Down Expand Up @@ -400,7 +400,7 @@ class Source(Closeable, metaclass=ABCMeta):
ctx: PipelineContext

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
def create(cls, config_dict: dict, ctx: PipelineContext) -> Self:
# Technically, this method should be abstract. However, the @config_class
# decorator automatically generates a create method at runtime if one is
# not defined. Python still treats the class as abstract because it thinks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ def __init__(self, config: DeltaLakeSourceConfig, ctx: PipelineContext):
config_report,
)

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
    """Build a Delta Lake source from a raw, unvalidated config mapping."""
    # parse_obj validates the dict against the pydantic config model.
    return cls(DeltaLakeSourceConfig.parse_obj(config_dict), ctx)

def _parse_datatype(self, raw_field_json_str: str) -> List[SchemaFieldClass]:
raw_field_json = json.loads(raw_field_json_str)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DemoDataSource(Source):

def __init__(self, ctx: PipelineContext, config: DemoDataConfig):
file_config = FileSourceConfig(path=str(download_sample_data()))
self.file_source = GenericFileSource(ctx, file_config)
self.file_source: GenericFileSource = GenericFileSource(ctx, file_config)

Check warning on line 32 in metadata-ingestion/src/datahub/ingestion/source/demo_data.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/demo_data.py#L32

Added line #L32 was not covered by tests

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """Delegate work-unit generation entirely to the wrapped file source."""
    yield from self.file_source.get_workunits()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.fivetran.config import (
KNOWN_DATA_PLATFORM_MAPPING,
Expand Down Expand Up @@ -291,11 +291,6 @@ def _get_connector_workunits(
dpi = self._generate_dpi_from_job(job, datajob)
yield from self._get_dpi_workunits(job, dpi)

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build a Fivetran source from a raw, unvalidated config mapping."""
    # parse_obj validates the dict against the pydantic config model.
    return cls(FivetranSourceConfig.parse_obj(config_dict), ctx)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.kafka_connect.common import (
CONNECTOR_CLASS,
Expand Down Expand Up @@ -94,11 +94,6 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext):
if not jpype.isJVMStarted():
jpype.startJVM()

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build a Kafka Connect source from a raw, unvalidated config mapping."""
    # parse_obj validates the dict against the pydantic config model.
    return cls(KafkaConnectSourceConfig.parse_obj(config_dict), ctx)

def get_connectors_manifest(self) -> Iterable[ConnectorManifest]:
"""Get Kafka Connect connectors manifest using REST API.
Enrich with lineages metadata.
Expand Down
7 changes: 1 addition & 6 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
Expand Down Expand Up @@ -789,11 +789,6 @@ def get_datasource_from_id(

return platform, dbname, schema, platform_instance

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build a Metabase source from a raw, unvalidated config mapping."""
    # NOTE: this constructor takes (ctx, config), not (config, ctx).
    return cls(ctx, MetabaseConfig.parse_obj(config_dict))

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
Expand Down
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,3 @@ def _get_global_tags_workunit(
aspect=global_tags,
)
return wu

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build an MLflow source from a raw, unvalidated config mapping."""
    # NOTE: this constructor takes (ctx, config), not (config, ctx).
    return cls(ctx, MLflowConfig.parse_obj(config_dict))
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,6 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None:
def rest_api_base_url(self) -> str:
    # Derive the REST API root from the configured UI URL by swapping the
    # trailing "nifi/" path segment for "nifi-api/".
    # Assumes self.config.site_url ends with "nifi/" — TODO confirm the
    # config validator enforces this.
    return self.config.site_url[: -len("nifi/")] + "nifi-api/"

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
    """Build a NiFi source from a raw, unvalidated config mapping."""
    # parse_obj validates the dict against the pydantic config model.
    return cls(NifiSourceConfig.parse_obj(config_dict), ctx)

def get_report(self) -> SourceReport:
    """Return the report object accumulated during ingestion."""
    return self.report

Expand Down
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,6 @@ def validate_connection(self) -> None:
else:
raise ValueError(f"Failed to connect to {self.config.connect_uri}/api")

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build a Redash source from a raw, unvalidated config mapping."""
    # NOTE: this constructor takes (ctx, config), not (config, ctx).
    return cls(ctx, RedashConfig.parse_obj(config_dict))

def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict:
url = f"/api/data_sources/{data_source_id}"
resp = self.client._get(url).json()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Source,
SourceCapability,
SourceReport,
TestableSource,
Expand Down Expand Up @@ -251,11 +250,6 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):

self.add_config_to_report()

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
    """Build a Snowflake v2 source from a raw, unvalidated config mapping."""
    # NOTE: this constructor takes (ctx, config), not (config, ctx).
    return cls(ctx, SnowflakeV2Config.parse_obj(config_dict))

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
Expand Down
7 changes: 1 addition & 6 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import (
Expand Down Expand Up @@ -265,11 +265,6 @@ def login(self) -> requests.Session:
# TODO(Gabe): how should we message about this error?
return requests_session

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build a Superset source from a raw, unvalidated config mapping."""
    # NOTE: this constructor takes (ctx, config), not (config, ctx).
    return cls(ctx, SupersetConfig.parse_obj(config_dict))

def paginate_entity_api_results(self, entity_type, page_size=100):
current_page = 0
total_items = page_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Source,
StructuredLogLevel,
TestableSource,
TestConnectionReport,
Expand Down Expand Up @@ -804,11 +803,6 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
def get_report(self) -> TableauSourceReport:
    """Return the Tableau-specific report accumulated during ingestion."""
    return self.report

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
    """Build a Tableau source from a raw, unvalidated config mapping."""
    # parse_obj validates the dict against the pydantic config model.
    return cls(TableauConfig.parse_obj(config_dict), ctx)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
Expand Down
3 changes: 2 additions & 1 deletion metadata-ingestion/tests/unit/api/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from freezegun import freeze_time
from typing_extensions import Self

from datahub.configuration.common import DynamicTypedConfig
from datahub.ingestion.api.committable import CommitPolicy, Committable
Expand Down Expand Up @@ -440,7 +441,7 @@ def __init__(self, ctx: PipelineContext):
]

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Self:
    # This fake test source takes no configuration; anything passed in
    # indicates a mis-wired pipeline recipe, so fail loudly.
    assert not config_dict
    return cls(ctx)

Expand Down

0 comments on commit bca63db

Please sign in to comment.