From 104e78776dfb9cd3b9a10063aadb54dcf14d65d1 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 18 Mar 2024 13:59:01 -0700 Subject: [PATCH 1/3] fix(ingest): only auto-enable stateful ingestion if pipeline name is set (#10075) --- docs/how/updating-datahub.md | 2 +- .../ingestion/source/state/stateful_ingestion_base.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 9d46fe606fa564..cc8de2b541ce2d 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -20,7 +20,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe ### Breaking Changes -- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used. +- #9934 and #10075 - Stateful ingestion is now enabled by default if a `pipeline_name` is set and either a datahub-rest sink or `datahub_api` is specified. It will still be disabled by default when any other sink type is used or if there is no pipeline name set. - #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client. ### Potential Downtime diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 521f8f5ee07d82..4e9e1425a9ae06 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -53,7 +53,7 @@ class StatefulIngestionConfig(ConfigModel): enabled: bool = Field( default=False, description="Whether or not to enable stateful ingest. 
" - "Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False", + "Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False", ) max_checkpoint_state_size: pydantic.PositiveInt = Field( default=2**24, # 16 MB @@ -233,9 +233,13 @@ def _initialize_checkpointing_state_provider(self) -> None: IngestionCheckpointingProviderBase ] = None - if self.stateful_ingestion_config is None and self.ctx.graph: + if ( + self.stateful_ingestion_config is None + and self.ctx.graph + and self.ctx.pipeline_name + ): logger.info( - "Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified" + "Stateful ingestion will be automatically enabled, as datahub-rest sink is used or `datahub_api` is specified" ) self.stateful_ingestion_config = StatefulIngestionConfig( enabled=True, From 05930560cc82ad1ec33a800a02cb650c735ea548 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 18 Mar 2024 14:27:01 -0700 Subject: [PATCH 2/3] feat(ingest/s3): set default spark version (#10057) --- metadata-ingestion/scripts/docgen.sh | 2 +- metadata-ingestion/src/datahub/ingestion/source/s3/source.py | 4 +++- metadata-ingestion/tests/integration/s3/test_s3.py | 1 - 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/scripts/docgen.sh b/metadata-ingestion/scripts/docgen.sh index 09fa2be912f614..affb87f2e70a95 100755 --- a/metadata-ingestion/scripts/docgen.sh +++ b/metadata-ingestion/scripts/docgen.sh @@ -7,4 +7,4 @@ DOCS_OUT_DIR=$DATAHUB_ROOT/docs/generated/ingestion EXTRA_DOCS_DIR=$DATAHUB_ROOT/metadata-ingestion/docs/sources rm -r $DOCS_OUT_DIR || true -SPARK_VERSION=3.3 python scripts/docgen.py --out-dir ${DOCS_OUT_DIR} --extra-docs ${EXTRA_DOCS_DIR} $@ +python scripts/docgen.py --out-dir ${DOCS_OUT_DIR} --extra-docs ${EXTRA_DOCS_DIR} $@ diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py 
b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 43a1bcd06d3f32..8bc075f720cc55 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -263,12 +263,14 @@ def __init__(self, config: DataLakeSourceConfig, ctx: PipelineContext): self.init_spark() def init_spark(self): + os.environ.setdefault("SPARK_VERSION", "3.3") + spark_version = os.environ["SPARK_VERSION"] + # Importing here to avoid Deequ dependency for non profiling use cases # Deequ fails if Spark is not available which is not needed for non profiling use cases import pydeequ conf = SparkConf() - spark_version = os.getenv("SPARK_VERSION", "3.3") conf.set( "spark.jars.packages", ",".join( diff --git a/metadata-ingestion/tests/integration/s3/test_s3.py b/metadata-ingestion/tests/integration/s3/test_s3.py index 462ca88b7c1230..d255463444b18a 100644 --- a/metadata-ingestion/tests/integration/s3/test_s3.py +++ b/metadata-ingestion/tests/integration/s3/test_s3.py @@ -140,7 +140,6 @@ def test_data_lake_s3_ingest( def test_data_lake_local_ingest( pytestconfig, touch_local_files, source_file, tmp_path, mock_time ): - os.environ["SPARK_VERSION"] = "3.3.2" test_resources_dir = pytestconfig.rootpath / "tests/integration/s3/" f = open(os.path.join(SOURCE_FILES_PATH, source_file)) source = json.load(f) From 656f94b63c6b7e72d15df9e23525818c49e0ce87 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 18 Mar 2024 14:27:43 -0700 Subject: [PATCH 3/3] feat(ingest): better rest emitter error message (#10073) --- .../src/datahub/emitter/rest_emitter.py | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index b2c1f685e288c7..d4e974d5855178 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -168,22 
+168,11 @@ def test_connection(self) -> None: return else: - # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error # A common misconfiguration is connecting to datahub-frontend so we special-case this check - if ( - config.get("config", {}).get("application") == "datahub-frontend" - or config.get("config", {}).get("shouldShowDatasetLineage") - is not None - ): - raise ConfigurationError( - "You seem to have connected to the frontend instead of the GMS endpoint. " - "The rest emitter should connect to DataHub GMS (usually :8080) or Frontend GMS API (usually :9002/api/gms)" - ) - else: - raise ConfigurationError( - "You have either connected to a pre-v0.8.0 DataHub GMS instance, or to a different server altogether! " - "Please check your configuration and make sure you are talking to the DataHub GMS endpoint." - ) + raise ConfigurationError( + "You seem to have connected to the frontend service instead of the GMS endpoint. " + "The rest emitter should connect to DataHub GMS (usually :8080) or Frontend GMS API (usually :9002/api/gms). " + "For Acryl users, the endpoint should be https://<your-instance>.acryl.io/gms" + ) else: logger.debug( f"Unable to connect to {url} with status_code: {response.status_code}. Response: {response.text}