diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index 7e0bf1107b749..a3897669a9d42 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -2,17 +2,19 @@ import unittest.mock from abc import ABC, abstractmethod from enum import auto -from typing import IO, Any, ClassVar, Dict, List, Optional, Type +from typing import IO, Any, ClassVar, Dict, List, Optional, Type, TypeVar import pydantic from cached_property import cached_property from pydantic import BaseModel, Extra, ValidationError from pydantic.fields import Field -from typing_extensions import Protocol, Self, runtime_checkable +from typing_extensions import Protocol, runtime_checkable from datahub.configuration._config_enum import ConfigEnum from datahub.utilities.dedup_list import deduplicate_list +_ConfigSelf = TypeVar("_ConfigSelf", bound="ConfigModel") + REDACT_KEYS = { "password", "token", @@ -86,7 +88,7 @@ def schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None: del schema["properties"][key] @classmethod - def parse_obj_allow_extras(cls, obj: Any) -> Self: + def parse_obj_allow_extras(cls: Type[_ConfigSelf], obj: Any) -> _ConfigSelf: with unittest.mock.patch.object(cls.Config, "extra", pydantic.Extra.allow): return cls.parse_obj(obj) diff --git a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py index 580222bc26f96..65952af1765fa 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py +++ b/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py @@ -1,8 +1,6 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Any, Dict, NewType - -from typing_extensions import Self +from typing import Any, Dict, NewType, Type, TypeVar import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel @@ -19,6 +17,9 @@ class IngestionCheckpointingProviderConfig(ConfigModel): pass +_Self = TypeVar("_Self", bound="IngestionCheckpointingProviderBase") + + @dataclass() class IngestionCheckpointingProviderBase(StatefulCommittable[CheckpointJobStatesMap]): """ @@ -34,8 +35,8 @@ def __init__( @classmethod @abstractmethod def create( - cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str - ) -> "Self": + cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str + ) -> "_Self": pass @abstractmethod diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index 655e6bb22fa8d..62feb7b5a02e6 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -3,8 +3,6 @@ from dataclasses import dataclass, field from typing import Any, Generic, Optional, Type, TypeVar, cast -from typing_extensions import Self - from datahub.configuration.common import ConfigModel from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit @@ -81,6 +79,7 @@ def on_failure( SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True) SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True) +Self = TypeVar("Self", bound="Sink") class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta): @@ -91,7 +90,7 @@ class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta): report: SinkReportType @classmethod - def get_config_class(cls) -> Type[SinkConfig]: + def get_config_class(cls: Type[Self]) -> Type[SinkConfig]: config_class = get_class_from_annotation(cls, Sink, ConfigModel) assert config_class, "Sink subclasses must define a config class" return cast(Type[SinkConfig], config_class) @@ -113,7 +112,7 @@ def __post_init__(self) -> None: pass @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "Self": + def create(cls: Type[Self], config_dict: dict, ctx: PipelineContext) -> "Self": return cls(ctx, cls.get_config_class().parse_obj(config_dict)) def handle_work_unit_start(self, workunit: WorkUnit) -> None: