Skip to content

Commit

Permalink
fix(ingest): limt typing_extensions classes to those available in min…
Browse files Browse the repository at this point in the history
… version

Depending on python versions, the minimum typing_extensions is
>=3.7.4.3 or >=3.10.0.2. (And needs to stay that way for Airflow.)
However, typing_extensions.Self is only available starting in 4.0.0
https://github.com/python/typing_extensions/blob/main/CHANGELOG.md#added-in-version-400

This partially reverts datahub-project#7313 to avoid the use of
typing_extensions.Self.

Fixes datahub-project#7370
  • Loading branch information
cburroughs committed Mar 3, 2023
1 parent 7a71b84 commit 521eff7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
8 changes: 5 additions & 3 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
import unittest.mock
from abc import ABC, abstractmethod
from enum import auto
from typing import IO, Any, ClassVar, Dict, List, Optional, Type
from typing import IO, Any, ClassVar, Dict, List, Optional, Type, TypeVar

import pydantic
from cached_property import cached_property
from pydantic import BaseModel, Extra, ValidationError
from pydantic.fields import Field
from typing_extensions import Protocol, Self, runtime_checkable
from typing_extensions import Protocol, runtime_checkable

from datahub.configuration._config_enum import ConfigEnum
from datahub.utilities.dedup_list import deduplicate_list

_ConfigSelf = TypeVar("_ConfigSelf", bound="ConfigModel")

REDACT_KEYS = {
"password",
"token",
Expand Down Expand Up @@ -86,7 +88,7 @@ def schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None:
del schema["properties"][key]

@classmethod
def parse_obj_allow_extras(cls, obj: Any) -> Self:
def parse_obj_allow_extras(cls: Type[_ConfigSelf], obj: Any) -> _ConfigSelf:
with unittest.mock.patch.object(cls.Config, "extra", pydantic.Extra.allow):
return cls.parse_obj(obj)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, NewType

from typing_extensions import Self
from typing import Any, Dict, NewType, Type, TypeVar

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
Expand All @@ -19,6 +17,9 @@ class IngestionCheckpointingProviderConfig(ConfigModel):
pass


_Self = TypeVar("_Self", bound="IngestionCheckpointingProviderBase")


@dataclass()
class IngestionCheckpointingProviderBase(StatefulCommittable[CheckpointJobStatesMap]):
"""
Expand All @@ -34,8 +35,8 @@ def __init__(
@classmethod
@abstractmethod
def create(
cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> "Self":
cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str
) -> "_Self":
pass

@abstractmethod
Expand Down
7 changes: 3 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/api/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from dataclasses import dataclass, field
from typing import Any, Generic, Optional, Type, TypeVar, cast

from typing_extensions import Self

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
Expand Down Expand Up @@ -81,6 +79,7 @@ def on_failure(

SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True)
SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True)
Self = TypeVar("Self", bound="Sink")


class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta):
Expand All @@ -91,7 +90,7 @@ class Sink(Generic[SinkConfig, SinkReportType], Closeable, metaclass=ABCMeta):
report: SinkReportType

@classmethod
def get_config_class(cls) -> Type[SinkConfig]:
def get_config_class(cls: Type[Self]) -> Type[SinkConfig]:
config_class = get_class_from_annotation(cls, Sink, ConfigModel)
assert config_class, "Sink subclasses must define a config class"
return cast(Type[SinkConfig], config_class)
Expand All @@ -113,7 +112,7 @@ def __post_init__(self) -> None:
pass

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Self":
def create(cls: Type[Self], config_dict: dict, ctx: PipelineContext) -> "Self":
return cls(ctx, cls.get_config_class().parse_obj(config_dict))

def handle_work_unit_start(self, workunit: WorkUnit) -> None:
Expand Down

0 comments on commit 521eff7

Please sign in to comment.