diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 9a940ef8040d1..86545946d6afe 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -118,10 +118,12 @@ jobs: run: | echo "BACKEND_FILES=`find ./build/coverage-reports/ -type f | grep -E '(metadata-models|entity-registry|datahuyb-graphql-core|metadata-io|metadata-jobs|metadata-utils|metadata-service|medata-dao-impl|metadata-operation|li-utils|metadata-integration|metadata-events|metadata-auth|ingestion-scheduler|notifications|datahub-upgrade)' | xargs | sed 's/ /,/g'`" >> $GITHUB_ENV echo "FRONTEND_FILES=`find ./build/coverage-reports/ -type f | grep -E '(datahub-frontend|datahub-web-react).*\.(xml|json)$' | xargs | sed 's/ /,/g'`" >> $GITHUB_ENV + - name: Generate tz artifact name + run: echo "NAME_TZ=$(echo ${{ matrix.timezone }} | tr '/' '-')" >> $GITHUB_ENV - uses: actions/upload-artifact@v4 if: always() with: - name: Test Results (build) - ${{ matrix.command}}-${{ matrix.timezone }} + name: Test Results (build) - ${{ matrix.command}}-${{ env.NAME_TZ }} path: | **/build/reports/tests/test/** **/build/test-results/test/** diff --git a/build.gradle b/build.gradle index 5b6613d3057f3..284092e2b14f4 100644 --- a/build.gradle +++ b/build.gradle @@ -379,6 +379,7 @@ configure(subprojects.findAll {! it.name.startsWith('spark-lineage')}) { resolutionStrategy.force externalDependency.antlr4Runtime resolutionStrategy.force externalDependency.antlr4 + resolutionStrategy.force 'org.apache.mina:mina-core:2.2.4' } } diff --git a/docs-website/README.md b/docs-website/README.md index 3b24cb869a444..b40e463642278 100644 --- a/docs-website/README.md +++ b/docs-website/README.md @@ -130,7 +130,6 @@ The purpose of this section is to provide developers & technical users with conc This section aims to provide plain-language feature overviews for both technical and non-technical readers alike. - ## Docs Generation Features **Includes all markdown files** @@ -145,16 +144,33 @@ You can suppress this check by adding the path to the file in a comment in `side Use an "inline" directive to include code snippets from other files. The `show_path_as_comment` option will include the path to the file as a comment at the top of the snippet. - ```python - {{ inline /metadata-ingestion/examples/library/data_quality_mcpw_rest.py show_path_as_comment }} - ``` + ```python + {{ inline /metadata-ingestion/examples/library/data_quality_mcpw_rest.py show_path_as_comment }} + ``` + +**Command Output** + +Use the `{{ command-output cmd }}` directive to run subprocesses and inject the outputs into the final markdown. + + {{ command-output python -c 'print("Hello world")' }} +This also works for multi-line scripts. + + {{ command-output + source metadata-ingestion/venv/bin/activate + python -m + }} + +Regardless of the location of the markdown file, the subcommands will be executed with working directory set to the repo root. + +Only the stdout of the subprocess will be outputted. The stderr, if any, will be included as a comment in the markdown. ## Docs site generation process This process is orchestrated by a combination of Gradle and Yarn tasks. The main entrypoint is via the `docs-website:yarnGenerate` task, which in turn eventually runs `yarn run generate`. Steps: + 1. Generate the GraphQL combined schema using the gradle's `docs-website:generateGraphQLSchema` task. This generates `./graphql/combined.graphql`. 2. Generate docs for ingestion sources using the `:metadata-ingestion:docGen` gradle task. 3. Generate docs for our metadata model using the `:metadata-ingestion:modelDocGen` gradle task. diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts index ad82a85f9e567..3a14baee073c2 100644 --- a/docs-website/generateDocsDir.ts +++ b/docs-website/generateDocsDir.ts @@ -439,6 +439,42 @@ function markdown_process_inline_directives( contents.content = new_content; } +function markdown_process_command_output( + contents: matter.GrayMatterFile, + filepath: string +): void { + const new_content = contents.content.replace( + /^{{\s*command-output\s*([\s\S]*?)\s*}}$/gm, + (_, command: string) => { + try { + // Change to repo root directory before executing command + const repoRoot = path.resolve(__dirname, ".."); + + console.log(`Executing command: ${command}`); + + // Execute the command and capture output + const output = execSync(command, { + cwd: repoRoot, + encoding: "utf8", + stdio: ["pipe", "pipe", "pipe"], + }); + + // Return the command output + return output.trim(); + } catch (error: any) { + // If there's an error, include it as a comment + const errorMessage = error.stderr + ? error.stderr.toString() + : error.message; + return `${ + error.stdout ? error.stdout.toString().trim() : "" + }\n`; + } + } + ); + contents.content = new_content; +} + function markdown_sanitize_and_linkify(content: string): string { // MDX escaping content = content.replace(/ None: code = """ # This file contains classes corresponding to entity URNs. -from typing import ClassVar, List, Optional, Type, TYPE_CHECKING +from typing import ClassVar, List, Optional, Type, TYPE_CHECKING, Union import functools from deprecated.sphinx import deprecated as _sphinx_deprecated @@ -547,10 +547,31 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: assert fields[0]["type"] == ["null", "string"] fields[0]["type"] = "string" + field_urn_type_classes = {} + for field in fields: + # Figure out if urn types are valid for each field. + field_urn_type_class = None + if field_name(field) == "platform": + field_urn_type_class = "DataPlatformUrn" + elif field.get("Urn"): + if len(field.get("entityTypes", [])) == 1: + field_entity_type = field["entityTypes"][0] + field_urn_type_class = f"{capitalize_entity_name(field_entity_type)}Urn" + else: + field_urn_type_class = "Urn" + + field_urn_type_classes[field_name(field)] = field_urn_type_class + _init_arg_parts: List[str] = [] for field in fields: + field_urn_type_class = field_urn_type_classes[field_name(field)] + default = '"PROD"' if field_name(field) == "env" else None - _arg_part = f"{field_name(field)}: {field_type(field)}" + + type_hint = field_type(field) + if field_urn_type_class: + type_hint = f'Union["{field_urn_type_class}", str]' + _arg_part = f"{field_name(field)}: {type_hint}" if default: _arg_part += f" = {default}" _init_arg_parts.append(_arg_part) @@ -579,16 +600,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: init_validation += f'if not {field_name(field)}:\n raise InvalidUrnError("{class_name} {field_name(field)} cannot be empty")\n' # Generalized mechanism for validating embedded urns. - field_urn_type_class = None - if field_name(field) == "platform": - field_urn_type_class = "DataPlatformUrn" - elif field.get("Urn"): - if len(field.get("entityTypes", [])) == 1: - field_entity_type = field["entityTypes"][0] - field_urn_type_class = f"{capitalize_entity_name(field_entity_type)}Urn" - else: - field_urn_type_class = "Urn" - + field_urn_type_class = field_urn_type_classes[field_name(field)] if field_urn_type_class: init_validation += f"{field_name(field)} = str({field_name(field)})\n" init_validation += ( @@ -608,7 +620,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str: init_coercion += " platform_name = DataPlatformUrn.from_string(platform_name).platform_name\n" if field_name(field) == "platform": - init_coercion += "platform = DataPlatformUrn(platform).urn()\n" + init_coercion += "platform = platform.urn() if isinstance(platform, DataPlatformUrn) else DataPlatformUrn(platform).urn()\n" elif field_urn_type_class is None: # For all non-urns, run the value through the UrnEncoder. init_coercion += ( diff --git a/metadata-ingestion/scripts/docgen.py b/metadata-ingestion/scripts/docgen.py index 402cd8a814199..71eef96f5b926 100644 --- a/metadata-ingestion/scripts/docgen.py +++ b/metadata-ingestion/scripts/docgen.py @@ -1,381 +1,25 @@ +import dataclasses import glob -import html import json import logging import os +import pathlib import re import sys import textwrap from importlib.metadata import metadata, requires -from typing import Any, Dict, Iterable, List, Optional +from typing import Dict, List, Optional import click -from pydantic import BaseModel, Field +from docgen_types import Platform, Plugin +from docs_config_table import gen_md_table_from_json_schema from datahub.configuration.common import ConfigModel -from datahub.ingestion.api.decorators import ( - CapabilitySetting, - SourceCapability, - SupportStatus, -) +from datahub.ingestion.api.decorators import SourceCapability, SupportStatus from datahub.ingestion.source.source_registry import source_registry -from datahub.metadata.schema_classes import SchemaFieldClass logger = logging.getLogger(__name__) -DEFAULT_VALUE_MAX_LENGTH = 50 -DEFAULT_VALUE_TRUNCATION_MESSAGE = "..." - - -def _truncate_default_value(value: str) -> str: - if len(value) > DEFAULT_VALUE_MAX_LENGTH: - return value[:DEFAULT_VALUE_MAX_LENGTH] + DEFAULT_VALUE_TRUNCATION_MESSAGE - return value - - -def _format_path_component(path: str) -> str: - """ - Given a path like 'a.b.c', adds css tags to the components. - """ - path_components = path.rsplit(".", maxsplit=1) - if len(path_components) == 1: - return f'{path_components[0]}' - - return ( - f'{path_components[0]}.' - f'{path_components[1]}' - ) - - -def _format_type_name(type_name: str) -> str: - return f'{type_name}' - - -def _format_default_line(default_value: str, has_desc_above: bool) -> str: - default_value = _truncate_default_value(default_value) - escaped_value = ( - html.escape(default_value) - # Replace curly braces to avoid JSX issues. - .replace("{", "{") - .replace("}", "}") - # We also need to replace markdown special characters. - .replace("*", "*") - .replace("_", "_") - .replace("[", "[") - .replace("]", "]") - .replace("|", "|") - .replace("`", "`") - ) - value_elem = f'{escaped_value}' - return f'
Default: {value_elem}
' - - -class FieldRow(BaseModel): - path: str - parent: Optional[str] - type_name: str - required: bool - has_default: bool - default: str - description: str - inner_fields: List["FieldRow"] = Field(default_factory=list) - discriminated_type: Optional[str] = None - - class Component(BaseModel): - type: str - field_name: Optional[str] - - # matches any [...] style section inside a field path - _V2_FIELD_PATH_TOKEN_MATCHER = r"\[[\w.]*[=]*[\w\(\-\ \_\).]*\][\.]*" - # matches a .?[...] style section inside a field path anchored to the beginning - _V2_FIELD_PATH_TOKEN_MATCHER_PREFIX = rf"^[\.]*{_V2_FIELD_PATH_TOKEN_MATCHER}" - _V2_FIELD_PATH_FIELD_NAME_MATCHER = r"^\w+" - - @staticmethod - def map_field_path_to_components(field_path: str) -> List[Component]: - m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) - v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) - components: List[FieldRow.Component] = [] - while m or v: - token = m.group() if m else v.group() # type: ignore - if v: - if components: - if components[-1].field_name is None: - components[-1].field_name = token - else: - components.append( - FieldRow.Component(type="non_map_type", field_name=token) - ) - else: - components.append( - FieldRow.Component(type="non_map_type", field_name=token) - ) - - if m: - if token.startswith("[version="): - pass - elif "[type=" in token: - type_match = re.match(r"[\.]*\[type=(.*)\]", token) - if type_match: - type_string = type_match.group(1) - if components and components[-1].type == "map": - if components[-1].field_name is None: - pass - else: - new_component = FieldRow.Component( - type="map_key", field_name="`key`" - ) - components.append(new_component) - new_component = FieldRow.Component( - type=type_string, field_name=None - ) - components.append(new_component) - if type_string == "map": - new_component = FieldRow.Component( - type=type_string, field_name=None - ) - components.append(new_component) - - field_path = field_path[m.span()[1] :] if m else field_path[v.span()[1] :] # type: ignore - m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) - v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) - - return components - - @staticmethod - def field_path_to_components(field_path: str) -> List[str]: - """ - Inverts the field_path v2 format to get the canonical field path - [version=2.0].[type=x].foo.[type=string(format=uri)].bar => ["foo","bar"] - """ - if "type=map" not in field_path: - return re.sub(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER, "", field_path).split( - "." - ) - else: - # fields with maps in them need special handling to insert the `key` fragment - return [ - c.field_name - for c in FieldRow.map_field_path_to_components(field_path) - if c.field_name - ] - - @classmethod - def from_schema_field(cls, schema_field: SchemaFieldClass) -> "FieldRow": - path_components = FieldRow.field_path_to_components(schema_field.fieldPath) - - parent = path_components[-2] if len(path_components) >= 2 else None - if parent == "`key`": - # the real parent node is one index above - parent = path_components[-3] - json_props = ( - json.loads(schema_field.jsonProps) if schema_field.jsonProps else {} - ) - - required = json_props.get("required", True) - has_default = "default" in json_props - default_value = str(json_props.get("default")) - - field_path = ".".join(path_components) - - return FieldRow( - path=field_path, - parent=parent, - type_name=str(schema_field.nativeDataType), - required=required, - has_default=has_default, - default=default_value, - description=schema_field.description, - inner_fields=[], - discriminated_type=schema_field.nativeDataType, - ) - - def get_checkbox(self) -> str: - if self.required and not self.has_default: - # Using a non-breaking space to prevent the checkbox from being - # broken into a new line. - if not self.parent: # None and empty string both count - return ' ' - else: - return f' ' - else: - return "" - - def to_md_line(self) -> str: - if self.inner_fields: - if len(self.inner_fields) == 1: - type_name = self.inner_fields[0].type_name or self.type_name - else: - # To deal with unions that have essentially the same simple field path, - # we combine the type names into a single string. - type_name = "One of " + ", ".join( - [x.type_name for x in self.inner_fields if x.discriminated_type] - ) - else: - type_name = self.type_name - - description = self.description.strip() - description = self.description.replace( - "\n", "
" - ) # descriptions with newlines in them break markdown rendering - - md_line = ( - f'|
{_format_path_component(self.path)}' - f"{self.get_checkbox()}
" - f'
{_format_type_name(type_name)}
' - f"| {description} " - f"{_format_default_line(self.default, bool(description)) if self.has_default else ''} |\n" - ) - return md_line - - -class FieldHeader(FieldRow): - def to_md_line(self) -> str: - return "\n".join( - [ - "| Field | Description |", - "|:--- |:--- |", - "", - ] - ) - - def __init__(self): - pass - - -def get_prefixed_name(field_prefix: Optional[str], field_name: Optional[str]) -> str: - assert ( - field_prefix or field_name - ), "One of field_prefix or field_name should be present" - return ( - f"{field_prefix}.{field_name}" # type: ignore - if field_prefix and field_name - else field_name - if not field_prefix - else field_prefix - ) - - -def custom_comparator(path: str) -> str: - """ - Projects a string onto a separate space - Low_prio string will start with Z else start with A - Number of field paths will add the second set of letters: 00 - 99 - - """ - opt1 = path - prio_value = priority_value(opt1) - projection = f"{prio_value}" - projection = f"{projection}{opt1}" - return projection - - -class FieldTree: - """ - A helper class that re-constructs the tree hierarchy of schema fields - to help sort fields by importance while keeping nesting intact - """ - - def __init__(self, field: Optional[FieldRow] = None): - self.field = field - self.fields: Dict[str, "FieldTree"] = {} - - def add_field(self, row: FieldRow, path: Optional[str] = None) -> "FieldTree": - # logger.warn(f"Add field: path:{path}, row:{row}") - if self.field and self.field.path == row.path: - # we have an incoming field with the same path as us, this is probably a union variant - # attach to existing field - self.field.inner_fields.append(row) - else: - path = path if path is not None else row.path - top_level_field = path.split(".")[0] - if top_level_field in self.fields: - self.fields[top_level_field].add_field( - row, ".".join(path.split(".")[1:]) - ) - else: - self.fields[top_level_field] = FieldTree(field=row) - # logger.warn(f"{self}") - return self - - def sort(self): - # Required fields before optionals - required_fields = { - k: v for k, v in self.fields.items() if v.field and v.field.required - } - optional_fields = { - k: v for k, v in self.fields.items() if v.field and not v.field.required - } - - self.sorted_fields = [] - for field_map in [required_fields, optional_fields]: - # Top-level fields before fields with nesting - self.sorted_fields.extend( - sorted( - [f for f, val in field_map.items() if val.fields == {}], - key=custom_comparator, - ) - ) - self.sorted_fields.extend( - sorted( - [f for f, val in field_map.items() if val.fields != {}], - key=custom_comparator, - ) - ) - - for field_tree in self.fields.values(): - field_tree.sort() - - def get_fields(self) -> Iterable[FieldRow]: - if self.field: - yield self.field - for key in self.sorted_fields: - yield from self.fields[key].get_fields() - - def __repr__(self) -> str: - result = {} - if self.field: - result["_self"] = json.loads(json.dumps(self.field.dict())) - for f in self.fields: - result[f] = json.loads(str(self.fields[f])) - return json.dumps(result, indent=2) - - -def priority_value(path: str) -> str: - # A map of low value tokens to their relative importance - low_value_token_map = {"env": "X", "profiling": "Y", "stateful_ingestion": "Z"} - tokens = path.split(".") - for low_value_token in low_value_token_map: - if low_value_token in tokens: - return low_value_token_map[low_value_token] - - # everything else high-prio - return "A" - - -def gen_md_table_from_struct(schema_dict: Dict[str, Any]) -> List[str]: - from datahub.ingestion.extractor.json_schema_util import JsonSchemaTranslator - - # we don't want default field values to be injected into the description of the field - JsonSchemaTranslator._INJECT_DEFAULTS_INTO_DESCRIPTION = False - schema_fields = list(JsonSchemaTranslator.get_fields_from_schema(schema_dict)) - result: List[str] = [FieldHeader().to_md_line()] - - field_tree = FieldTree(field=None) - for field in schema_fields: - row: FieldRow = FieldRow.from_schema_field(field) - field_tree.add_field(row) - - field_tree.sort() - - for row in field_tree.get_fields(): - result.append(row.to_md_line()) - - # Wrap with a .config-table div. - result = ["\n
\n\n", *result, "\n
\n"] - - return result - def get_snippet(long_string: str, max_length: int = 100) -> str: snippet = "" @@ -424,19 +68,6 @@ def get_capability_text(src_capability: SourceCapability) -> str: ) -def create_or_update( - something: Dict[Any, Any], path: List[str], value: Any -) -> Dict[Any, Any]: - dict_under_operation = something - for p in path[:-1]: - if p not in dict_under_operation: - dict_under_operation[p] = {} - dict_under_operation = dict_under_operation[p] - - dict_under_operation[path[-1]] = value - return something - - def does_extra_exist(extra_name: str) -> bool: for key, value in metadata("acryl-datahub").items(): if key == "Provides-Extra" and value == extra_name: @@ -498,6 +129,102 @@ def new_url(original_url: str, file_path: str) -> str: return new_content +def load_plugin(plugin_name: str, out_dir: str) -> Plugin: + logger.debug(f"Loading {plugin_name}") + class_or_exception = source_registry._ensure_not_lazy(plugin_name) + if isinstance(class_or_exception, Exception): + raise class_or_exception + source_type = source_registry.get(plugin_name) + logger.debug(f"Source class is {source_type}") + + if hasattr(source_type, "get_platform_name"): + platform_name = source_type.get_platform_name() + else: + platform_name = ( + plugin_name.title() + ) # we like platform names to be human readable + + platform_id = None + if hasattr(source_type, "get_platform_id"): + platform_id = source_type.get_platform_id() + if platform_id is None: + raise ValueError(f"Platform ID not found for {plugin_name}") + + plugin = Plugin( + name=plugin_name, + platform_id=platform_id, + platform_name=platform_name, + classname=".".join([source_type.__module__, source_type.__name__]), + ) + + if hasattr(source_type, "get_platform_doc_order"): + platform_doc_order = source_type.get_platform_doc_order() + plugin.doc_order = platform_doc_order + + plugin_file_name = "src/" + "/".join(source_type.__module__.split(".")) + if os.path.exists(plugin_file_name) and os.path.isdir(plugin_file_name): + plugin_file_name = plugin_file_name + "/__init__.py" + else: + plugin_file_name = plugin_file_name + ".py" + if os.path.exists(plugin_file_name): + plugin.filename = plugin_file_name + else: + logger.info( + f"Failed to locate filename for {plugin_name}. Guessed {plugin_file_name}, but that doesn't exist" + ) + + if hasattr(source_type, "__doc__"): + plugin.source_docstring = textwrap.dedent(source_type.__doc__ or "") + + if hasattr(source_type, "get_support_status"): + plugin.support_status = source_type.get_support_status() + + if hasattr(source_type, "get_capabilities"): + capabilities = list(source_type.get_capabilities()) + capabilities.sort(key=lambda x: x.capability.value) + plugin.capabilities = capabilities + + try: + extra_plugin = plugin_name if does_extra_exist(plugin_name) else None + plugin.extra_deps = ( + get_additional_deps_for_extra(extra_plugin) if extra_plugin else [] + ) + except Exception as e: + logger.info( + f"Failed to load extras for {plugin_name} due to exception {e}", exc_info=e + ) + + if hasattr(source_type, "get_config_class"): + source_config_class: ConfigModel = source_type.get_config_class() + + plugin.config_json_schema = source_config_class.schema_json(indent=2) + plugin.config_md = gen_md_table_from_json_schema(source_config_class.schema()) + + # Write the config json schema to the out_dir. + config_dir = pathlib.Path(out_dir) / "config_schemas" + config_dir.mkdir(parents=True, exist_ok=True) + (config_dir / f"{plugin_name}_config.json").write_text( + plugin.config_json_schema + ) + + return plugin + + +@dataclasses.dataclass +class PluginMetrics: + discovered: int = 0 + loaded: int = 0 + generated: int = 0 + failed: int = 0 + + +@dataclasses.dataclass +class PlatformMetrics: + discovered: int = 0 + generated: int = 0 + warnings: List[str] = dataclasses.field(default_factory=list) + + @click.command() @click.option("--out-dir", type=str, required=True) @click.option("--extra-docs", type=str, required=False) @@ -505,239 +232,111 @@ def new_url(original_url: str, file_path: str) -> str: def generate( out_dir: str, extra_docs: Optional[str] = None, source: Optional[str] = None ) -> None: # noqa: C901 - source_documentation: Dict[str, Any] = {} - metrics = {} - metrics["source_platforms"] = {"discovered": 0, "generated": 0, "warnings": []} - metrics["plugins"] = {"discovered": 0, "generated": 0, "failed": 0} - - if extra_docs: - for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True): - m = re.search("/docs/sources/(.*)/(.*).md", path) - if m: - platform_name = m.group(1).lower() - file_name = m.group(2) - destination_md: str = ( - f"../docs/generated/ingestion/sources/{platform_name}.md" - ) - - with open(path, "r") as doc_file: - file_contents = doc_file.read() - final_markdown = rewrite_markdown( - file_contents, path, destination_md - ) - - if file_name == "README": - # README goes as platform level docs - # all other docs are assumed to be plugin level - create_or_update( - source_documentation, - [platform_name, "custom_docs"], - final_markdown, - ) - else: - if "_" in file_name: - plugin_doc_parts = file_name.split("_") - if len(plugin_doc_parts) != 2 or plugin_doc_parts[ - 1 - ] not in ["pre", "post"]: - raise Exception( - f"{file_name} needs to be of the form _pre.md or _post.md" - ) - - docs_key_name = f"custom_docs_{plugin_doc_parts[1]}" - create_or_update( - source_documentation, - [ - platform_name, - "plugins", - plugin_doc_parts[0], - docs_key_name, - ], - final_markdown, - ) - else: - create_or_update( - source_documentation, - [ - platform_name, - "plugins", - file_name, - "custom_docs_post", - ], - final_markdown, - ) - else: - yml_match = re.search("/docs/sources/(.*)/(.*)_recipe.yml", path) - if yml_match: - platform_name = yml_match.group(1).lower() - plugin_name = yml_match.group(2) - with open(path, "r") as doc_file: - file_contents = doc_file.read() - create_or_update( - source_documentation, - [platform_name, "plugins", plugin_name, "recipe"], - file_contents, - ) + plugin_metrics = PluginMetrics() + platform_metrics = PlatformMetrics() + platforms: Dict[str, Platform] = {} for plugin_name in sorted(source_registry.mapping.keys()): if source and source != plugin_name: continue if plugin_name in { "snowflake-summary", + "snowflake-queries", + "bigquery-queries", }: logger.info(f"Skipping {plugin_name} as it is on the deny list") continue - metrics["plugins"]["discovered"] = metrics["plugins"]["discovered"] + 1 # type: ignore - # We want to attempt to load all plugins before printing a summary. - source_type = None + plugin_metrics.discovered += 1 try: - # output = subprocess.check_output( - # ["/bin/bash", "-c", f"pip install -e '.[{key}]'"] - # ) - class_or_exception = source_registry._ensure_not_lazy(plugin_name) - if isinstance(class_or_exception, Exception): - raise class_or_exception - logger.debug(f"Processing {plugin_name}") - source_type = source_registry.get(plugin_name) - logger.debug(f"Source class is {source_type}") - extra_plugin = plugin_name if does_extra_exist(plugin_name) else None - extra_deps = ( - get_additional_deps_for_extra(extra_plugin) if extra_plugin else [] - ) + plugin = load_plugin(plugin_name, out_dir=out_dir) except Exception as e: - logger.warning( - f"Failed to process {plugin_name} due to exception {e}", exc_info=e + logger.error( + f"Failed to load {plugin_name} due to exception {e}", exc_info=e ) - metrics["plugins"]["failed"] = metrics["plugins"].get("failed", 0) + 1 # type: ignore - - if source_type and hasattr(source_type, "get_config_class"): - try: - source_config_class: ConfigModel = source_type.get_config_class() - support_status = SupportStatus.UNKNOWN - capabilities = [] - if hasattr(source_type, "__doc__"): - source_doc = textwrap.dedent(source_type.__doc__ or "") - if hasattr(source_type, "get_platform_name"): - platform_name = source_type.get_platform_name() - else: - platform_name = ( - plugin_name.title() - ) # we like platform names to be human readable - - if hasattr(source_type, "get_platform_id"): - platform_id = source_type.get_platform_id() - - if hasattr(source_type, "get_platform_doc_order"): - platform_doc_order = source_type.get_platform_doc_order() - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "doc_order"], - platform_doc_order, - ) - - source_documentation[platform_id] = ( - source_documentation.get(platform_id) or {} - ) - - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "classname"], - ".".join([source_type.__module__, source_type.__name__]), - ) - plugin_file_name = "src/" + "/".join(source_type.__module__.split(".")) - if os.path.exists(plugin_file_name) and os.path.isdir(plugin_file_name): - plugin_file_name = plugin_file_name + "/__init__.py" - else: - plugin_file_name = plugin_file_name + ".py" - if os.path.exists(plugin_file_name): - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "filename"], - plugin_file_name, - ) - else: - logger.info( - f"Failed to locate filename for {plugin_name}. Guessed {plugin_file_name}" - ) - - if hasattr(source_type, "get_support_status"): - support_status = source_type.get_support_status() - - if hasattr(source_type, "get_capabilities"): - capabilities = list(source_type.get_capabilities()) - capabilities.sort(key=lambda x: x.capability.value) - - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "capabilities"], - capabilities, - ) - - create_or_update( - source_documentation, [platform_id, "name"], platform_name - ) - - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "extra_deps"], - extra_deps, - ) + plugin_metrics.failed += 1 + continue + else: + plugin_metrics.loaded += 1 - config_dir = f"{out_dir}/config_schemas" - os.makedirs(config_dir, exist_ok=True) - with open(f"{config_dir}/{plugin_name}_config.json", "w") as f: - f.write(source_config_class.schema_json(indent=2)) + # Add to the platform list if not already present. + platforms.setdefault( + plugin.platform_id, + Platform( + id=plugin.platform_id, + name=plugin.platform_name, + ), + ).add_plugin(plugin_name=plugin.name, plugin=plugin) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "config_schema"], - source_config_class.schema_json(indent=2) or "", + if extra_docs: + for path in glob.glob(f"{extra_docs}/**/*[.md|.yaml|.yml]", recursive=True): + if m := re.search("/docs/sources/(.*)/(.*).md", path): + platform_name = m.group(1).lower() # TODO: rename this to platform_id + file_name = m.group(2) + destination_md: str = ( + f"../docs/generated/ingestion/sources/{platform_name}.md" ) - table_md = gen_md_table_from_struct(source_config_class.schema()) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "source_doc"], - source_doc or "", - ) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "config"], - table_md, - ) - create_or_update( - source_documentation, - [platform_id, "plugins", plugin_name, "support_status"], - support_status, - ) + with open(path, "r") as doc_file: + file_contents = doc_file.read() + final_markdown = rewrite_markdown(file_contents, path, destination_md) + + if file_name == "README": + # README goes as platform level docs + # all other docs are assumed to be plugin level + platforms[platform_name].custom_docs_pre = final_markdown + + elif "_" in file_name: + plugin_doc_parts = file_name.split("_") + if len(plugin_doc_parts) != 2: + raise ValueError( + f"{file_name} needs to be of the form _pre.md or _post.md" + ) + plugin_name, suffix = plugin_doc_parts + if suffix == "pre": + platforms[platform_name].plugins[ + plugin_name + ].custom_docs_pre = final_markdown + elif suffix == "post": + platforms[platform_name].plugins[ + plugin_name + ].custom_docs_post = final_markdown + else: + raise ValueError( + f"{file_name} needs to be of the form _pre.md or _post.md" + ) - except Exception as e: - raise e + else: # assume this is the platform post. + # TODO: Probably need better error checking here. + platforms[platform_name].plugins[ + file_name + ].custom_docs_post = final_markdown + elif yml_match := re.search("/docs/sources/(.*)/(.*)_recipe.yml", path): + platform_name = yml_match.group(1).lower() + plugin_name = yml_match.group(2) + platforms[platform_name].plugins[ + plugin_name + ].starter_recipe = pathlib.Path(path).read_text() sources_dir = f"{out_dir}/sources" os.makedirs(sources_dir, exist_ok=True) + # Sort platforms by platform name. + platforms = dict(sorted(platforms.items(), key=lambda x: x[1].name.casefold())) + i = 0 - for platform_id, platform_docs in sorted( - source_documentation.items(), - key=lambda x: (x[1]["name"].casefold(), x[1]["name"]) - if "name" in x[1] - else (x[0].casefold(), x[0]), - ): + for platform_id, platform in platforms.items(): if source and platform_id != source: continue - metrics["source_platforms"]["discovered"] = ( - metrics["source_platforms"]["discovered"] + 1 # type: ignore - ) + platform_metrics.discovered += 1 platform_doc_file = f"{sources_dir}/{platform_id}.md" - if "name" not in platform_docs: - # We seem to have discovered written docs that corresponds to a platform, but haven't found linkage to it from the source classes - warning_msg = f"Failed to find source classes for platform {platform_id}. Did you remember to annotate your source class with @platform_name({platform_id})?" - logger.error(warning_msg) - metrics["source_platforms"]["warnings"].append(warning_msg) # type: ignore - continue + # if "name" not in platform_docs: + # # We seem to have discovered written docs that corresponds to a platform, but haven't found linkage to it from the source classes + # warning_msg = f"Failed to find source classes for platform {platform_id}. Did you remember to annotate your source class with @platform_name({platform_id})?" + # logger.error(warning_msg) + # metrics["source_platforms"]["warnings"].append(warning_msg) # type: ignore + # continue with open(platform_doc_file, "w") as f: i += 1 @@ -745,12 +344,12 @@ def generate( f.write( "import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n" ) - f.write(f"# {platform_docs['name']}\n") + f.write(f"# {platform.name}\n") - if len(platform_docs["plugins"].keys()) > 1: + if len(platform.plugins) > 1: # More than one plugin used to provide integration with this platform f.write( - f"There are {len(platform_docs['plugins'].keys())} sources that provide integration with {platform_docs['name']}\n" + f"There are {len(platform.plugins)} sources that provide integration with {platform.name}\n" ) f.write("\n") f.write("\n") @@ -759,18 +358,22 @@ def generate( f.write(f"") f.write("") + # Sort plugins in the platform. + # It's a dict, so we need to recreate it. + platform.plugins = dict( + sorted( + platform.plugins.items(), + key=lambda x: str(x[1].doc_order) if x[1].doc_order else x[0], + ) + ) + # f.write("| Source Module | Documentation |\n") # f.write("| ------ | ---- |\n") - for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], - ): + for plugin_name, plugin in platform.plugins.items(): f.write("\n") - f.write(f"\n") + f.write(f"\n") f.write( - f"\n" + f"\n" ) f.write("\n") # f.write( @@ -778,43 +381,33 @@ def generate( # ) f.write("
{col_header}
\n\n`{plugin}`\n\n\n\n`{plugin_name}`\n\n\n\n\n{platform_docs['plugins'][plugin].get('source_doc') or ''} [Read more...](#module-{plugin})\n\n\n\n\n\n{plugin.source_docstring or ''} [Read more...](#module-{plugin_name})\n\n\n
\n\n") # insert platform level custom docs before plugin section - f.write(platform_docs.get("custom_docs") or "") + f.write(platform.custom_docs_pre or "") # all_plugins = platform_docs["plugins"].keys() - for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], - ): - if len(platform_docs["plugins"].keys()) > 1: + for plugin_name, plugin in platform.plugins.items(): + if len(platform.plugins) > 1: # We only need to show this if there are multiple modules. - f.write(f"\n\n## Module `{plugin}`\n") + f.write(f"\n\n## Module `{plugin_name}`\n") - if "support_status" in plugin_docs: - f.write( - get_support_status_badge(plugin_docs["support_status"]) + "\n\n" - ) - if "capabilities" in plugin_docs and len(plugin_docs["capabilities"]): + if plugin.support_status != SupportStatus.UNKNOWN: + f.write(get_support_status_badge(plugin.support_status) + "\n\n") + if plugin.capabilities and len(plugin.capabilities): f.write("\n### Important Capabilities\n") f.write("| Capability | Status | Notes |\n") f.write("| ---------- | ------ | ----- |\n") - plugin_capabilities: List[CapabilitySetting] = plugin_docs[ - "capabilities" - ] - for cap_setting in plugin_capabilities: + for cap_setting in plugin.capabilities: f.write( f"| {get_capability_text(cap_setting.capability)} | {get_capability_supported_badge(cap_setting.supported)} | {cap_setting.description} |\n" ) f.write("\n") - f.write(f"{plugin_docs.get('source_doc') or ''}\n") + f.write(f"{plugin.source_docstring or ''}\n") # Insert custom pre section - f.write(plugin_docs.get("custom_docs_pre", "")) + f.write(plugin.custom_docs_pre or "") f.write("\n### CLI based Ingestion\n") - if "extra_deps" in plugin_docs: + if plugin.extra_deps and len(plugin.extra_deps): f.write("\n#### Install the Plugin\n") - if plugin_docs["extra_deps"] != []: + if plugin.extra_deps != []: f.write("```shell\n") f.write(f"pip install 'acryl-datahub[{plugin}]'\n") f.write("```\n") @@ -822,7 +415,7 @@ def generate( f.write( f"The `{plugin}` source works out of the box with `acryl-datahub`.\n" ) - if "recipe" in plugin_docs: + if plugin.starter_recipe: f.write("\n### Starter Recipe\n") f.write( "Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.\n\n\n" @@ -831,9 +424,10 @@ def generate( "For general pointers on writing and running a recipe, see our [main recipe guide](../../../../metadata-ingestion/README.md#recipes).\n" ) f.write("```yaml\n") - f.write(plugin_docs["recipe"]) + f.write(plugin.starter_recipe) f.write("\n```\n") - if "config" in plugin_docs: + if plugin.config_json_schema: + assert plugin.config_md is not None f.write("\n### Config Details\n") f.write( """ @@ -845,8 +439,8 @@ def generate( # f.write( # "\n
\nView All Configuration Options\n\n" # ) - for doc in plugin_docs["config"]: - f.write(doc) + f.write(plugin.config_md) + f.write("\n\n") # f.write("\n
\n\n") f.write( f""" @@ -854,39 +448,49 @@ def generate( The [JSONSchema](https://json-schema.org/) for this configuration is inlined below.\n\n ```javascript -{plugin_docs['config_schema']} +{plugin.config_json_schema} ```\n\n
\n\n""" ) + # insert custom plugin docs after config details - f.write(plugin_docs.get("custom_docs_post", "")) - if "classname" in plugin_docs: + f.write(plugin.custom_docs_post or "") + if plugin.classname: f.write("\n### Code Coordinates\n") - f.write(f"- Class Name: `{plugin_docs['classname']}`\n") - if "filename" in plugin_docs: + f.write(f"- Class Name: `{plugin.classname}`\n") + if plugin.filename: f.write( - f"- Browse on [GitHub](../../../../metadata-ingestion/{plugin_docs['filename']})\n\n" + f"- Browse on [GitHub](../../../../metadata-ingestion/{plugin.filename})\n\n" ) - metrics["plugins"]["generated"] = metrics["plugins"]["generated"] + 1 # type: ignore + plugin_metrics.generated += 1 # Using an h2 tag to prevent this from showing up in page's TOC sidebar. f.write("\n

Questions

\n\n") f.write( - f"If you've got any questions on configuring ingestion for {platform_docs.get('name',platform_id)}, feel free to ping us on [our Slack](https://slack.datahubproject.io).\n" - ) - metrics["source_platforms"]["generated"] = ( - metrics["source_platforms"]["generated"] + 1 # type: ignore + f"If you've got any questions on configuring ingestion for {platform.name}, feel free to ping us on [our Slack](https://slack.datahubproject.io).\n" ) + platform_metrics.generated += 1 print("Ingestion Documentation Generation Complete") print("############################################") - print(json.dumps(metrics, indent=2)) + print( + json.dumps( + { + "plugin_metrics": dataclasses.asdict(plugin_metrics), + "platform_metrics": dataclasses.asdict(platform_metrics), + }, + indent=2, + ) + ) print("############################################") - if metrics["plugins"].get("failed", 0) > 0: # type: ignore + if plugin_metrics.failed > 0: sys.exit(1) - ### Create Lineage doc + # Create Lineage doc + generate_lineage_doc(platforms) + +def generate_lineage_doc(platforms: Dict[str, Platform]) -> None: source_dir = "../docs/generated/lineage" os.makedirs(source_dir, exist_ok=True) doc_file = f"{source_dir}/lineage-feature-guide.md" @@ -894,7 +498,7 @@ def generate( f.write( "import FeatureAvailability from '@site/src/components/FeatureAvailability';\n\n" ) - f.write(f"# About DataHub Lineage\n\n") + f.write("# About DataHub Lineage\n\n") f.write("\n") f.write( @@ -996,30 +600,24 @@ def generate( ) f.write("| ---------- | ------ | ----- |----- |\n") - for platform_id, platform_docs in sorted( - source_documentation.items(), - key=lambda x: (x[1]["name"].casefold(), x[1]["name"]) - if "name" in x[1] - else (x[0].casefold(), x[0]), - ): - for plugin, plugin_docs in sorted( - platform_docs["plugins"].items(), - key=lambda x: str(x[1].get("doc_order")) - if x[1].get("doc_order") - else x[0], + for platform_id, platform in platforms.items(): + for plugin in sorted( + platform.plugins.values(), + key=lambda x: str(x.doc_order) if x.doc_order else x.name, ): - platform_name = platform_docs["name"] - if len(platform_docs["plugins"].keys()) > 1: + if len(platform.plugins) > 1: # We only need to show this if there are multiple modules. - platform_name = f"{platform_name} `{plugin}`" + platform_plugin_name = f"{platform.name} `{plugin.name}`" + else: + platform_plugin_name = platform.name # Initialize variables table_level_supported = "❌" column_level_supported = "❌" config_names = "" - if "capabilities" in plugin_docs: - plugin_capabilities = plugin_docs["capabilities"] + if plugin.capabilities and len(plugin.capabilities): + plugin_capabilities = plugin.capabilities for cap_setting in plugin_capabilities: capability_text = get_capability_text(cap_setting.capability) @@ -1040,10 +638,10 @@ def generate( column_level_supported = "✅" if not (table_level_supported == "❌" and column_level_supported == "❌"): - if "config_schema" in plugin_docs: - config_properties = json.loads( - plugin_docs["config_schema"] - ).get("properties", {}) + if plugin.config_json_schema: + config_properties = json.loads(plugin.config_json_schema).get( + "properties", {} + ) config_names = "
".join( [ f"- {property_name}" @@ -1065,7 +663,7 @@ def generate( ] if platform_id not in lineage_not_applicable_sources: f.write( - f"| [{platform_name}](../../generated/ingestion/sources/{platform_id}.md) | {table_level_supported} | {column_level_supported} | {config_names}|\n" + f"| [{platform_plugin_name}](../../generated/ingestion/sources/{platform_id}.md) | {table_level_supported} | {column_level_supported} | {config_names}|\n" ) f.write( diff --git a/metadata-ingestion/scripts/docgen_types.py b/metadata-ingestion/scripts/docgen_types.py new file mode 100644 index 0000000000000..c96ab955e8cce --- /dev/null +++ b/metadata-ingestion/scripts/docgen_types.py @@ -0,0 +1,45 @@ +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +from datahub.ingestion.api.decorators import CapabilitySetting, SupportStatus + + +@dataclass +class Plugin: + # Required fields + name: str + platform_id: str + platform_name: str + classname: str + + # Optional documentation fields + source_docstring: Optional[str] = None + config_json_schema: Optional[str] = None + config_md: Optional[str] = None + custom_docs_pre: Optional[str] = None + custom_docs_post: Optional[str] = None + starter_recipe: Optional[str] = None + + # Optional metadata fields + support_status: SupportStatus = SupportStatus.UNKNOWN + filename: Optional[str] = None + doc_order: Optional[int] = None + + # Lists with empty defaults + capabilities: List[CapabilitySetting] = field(default_factory=list) + extra_deps: List[str] = field(default_factory=list) + + +@dataclass +class Platform: + # Required fields + id: str + name: str + + # Optional fields + custom_docs_pre: Optional[str] = None + plugins: Dict[str, Plugin] = field(default_factory=dict) + + def add_plugin(self, plugin_name: str, plugin: Plugin) -> None: + """Helper method to add a plugin to the platform""" + self.plugins[plugin_name] = plugin diff --git a/metadata-ingestion/scripts/docs_config_table.py b/metadata-ingestion/scripts/docs_config_table.py new file mode 100644 index 0000000000000..3c5d9d0b0a2ba --- /dev/null +++ b/metadata-ingestion/scripts/docs_config_table.py @@ -0,0 +1,376 @@ +import html +import json +import re +from typing import Any, Dict, Iterable, List, Optional, Type + +from pydantic import BaseModel, Field + +from datahub.ingestion.extractor.json_schema_util import JsonSchemaTranslator +from datahub.metadata.schema_classes import SchemaFieldClass + +DEFAULT_VALUE_MAX_LENGTH = 50 +DEFAULT_VALUE_TRUNCATION_MESSAGE = "..." + + +def _truncate_default_value(value: str) -> str: + if len(value) > DEFAULT_VALUE_MAX_LENGTH: + return value[:DEFAULT_VALUE_MAX_LENGTH] + DEFAULT_VALUE_TRUNCATION_MESSAGE + return value + + +def _format_path_component(path: str) -> str: + """ + Given a path like 'a.b.c', adds css tags to the components. + """ + path_components = path.rsplit(".", maxsplit=1) + if len(path_components) == 1: + return f'{path_components[0]}' + + return ( + f'{path_components[0]}.' + f'{path_components[1]}' + ) + + +def _format_type_name(type_name: str) -> str: + return f'{type_name}' + + +def _format_default_line(default_value: str, has_desc_above: bool) -> str: + default_value = _truncate_default_value(default_value) + escaped_value = ( + html.escape(default_value) + # Replace curly braces to avoid JSX issues. + .replace("{", "{") + .replace("}", "}") + # We also need to replace markdown special characters. + .replace("*", "*") + .replace("_", "_") + .replace("[", "[") + .replace("]", "]") + .replace("|", "|") + .replace("`", "`") + ) + value_elem = f'{escaped_value}' + return f'
Default: {value_elem}
' + + +class FieldRow(BaseModel): + path: str + parent: Optional[str] + type_name: str + required: bool + has_default: bool + default: str + description: str + inner_fields: List["FieldRow"] = Field(default_factory=list) + discriminated_type: Optional[str] = None + + class Component(BaseModel): + type: str + field_name: Optional[str] + + # matches any [...] style section inside a field path + _V2_FIELD_PATH_TOKEN_MATCHER = r"\[[\w.]*[=]*[\w\(\-\ \_\).]*\][\.]*" + # matches a .?[...] style section inside a field path anchored to the beginning + _V2_FIELD_PATH_TOKEN_MATCHER_PREFIX = rf"^[\.]*{_V2_FIELD_PATH_TOKEN_MATCHER}" + _V2_FIELD_PATH_FIELD_NAME_MATCHER = r"^\w+" + + @staticmethod + def map_field_path_to_components(field_path: str) -> List[Component]: + m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) + v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) + components: List[FieldRow.Component] = [] + while m or v: + token = m.group() if m else v.group() # type: ignore + if v: + if components: + if components[-1].field_name is None: + components[-1].field_name = token + else: + components.append( + FieldRow.Component(type="non_map_type", field_name=token) + ) + else: + components.append( + FieldRow.Component(type="non_map_type", field_name=token) + ) + + if m: + if token.startswith("[version="): + pass + elif "[type=" in token: + type_match = re.match(r"[\.]*\[type=(.*)\]", token) + if type_match: + type_string = type_match.group(1) + if components and components[-1].type == "map": + if components[-1].field_name is None: + pass + else: + new_component = FieldRow.Component( + type="map_key", field_name="`key`" + ) + components.append(new_component) + new_component = FieldRow.Component( + type=type_string, field_name=None + ) + components.append(new_component) + if type_string == "map": + new_component = FieldRow.Component( + type=type_string, field_name=None + ) + components.append(new_component) + + field_path = field_path[m.span()[1] :] if m else field_path[v.span()[1] :] # type: ignore + m = re.match(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER_PREFIX, field_path) + v = re.match(FieldRow._V2_FIELD_PATH_FIELD_NAME_MATCHER, field_path) + + return components + + @staticmethod + def field_path_to_components(field_path: str) -> List[str]: + """ + Inverts the field_path v2 format to get the canonical field path + [version=2.0].[type=x].foo.[type=string(format=uri)].bar => ["foo","bar"] + """ + if "type=map" not in field_path: + return re.sub(FieldRow._V2_FIELD_PATH_TOKEN_MATCHER, "", field_path).split( + "." + ) + else: + # fields with maps in them need special handling to insert the `key` fragment + return [ + c.field_name + for c in FieldRow.map_field_path_to_components(field_path) + if c.field_name + ] + + @classmethod + def from_schema_field(cls, schema_field: SchemaFieldClass) -> "FieldRow": + path_components = FieldRow.field_path_to_components(schema_field.fieldPath) + + parent = path_components[-2] if len(path_components) >= 2 else None + if parent == "`key`": + # the real parent node is one index above + parent = path_components[-3] + json_props = ( + json.loads(schema_field.jsonProps) if schema_field.jsonProps else {} + ) + + required = json_props.get("required", True) + has_default = "default" in json_props + default_value = str(json_props.get("default")) + + field_path = ".".join(path_components) + + return FieldRow( + path=field_path, + parent=parent, + type_name=str(schema_field.nativeDataType), + required=required, + has_default=has_default, + default=default_value, + description=schema_field.description, + inner_fields=[], + discriminated_type=schema_field.nativeDataType, + ) + + def get_checkbox(self) -> str: + if self.required and not self.has_default: + # Using a non-breaking space to prevent the checkbox from being + # broken into a new line. + if not self.parent: # None and empty string both count + return ' ' + else: + return f' ' + else: + return "" + + def to_md_line(self) -> str: + if self.inner_fields: + if len(self.inner_fields) == 1: + type_name = self.inner_fields[0].type_name or self.type_name + else: + # To deal with unions that have essentially the same simple field path, + # we combine the type names into a single string. + type_name = "One of " + ", ".join( + [x.type_name for x in self.inner_fields if x.discriminated_type] + ) + else: + type_name = self.type_name + + description = self.description.strip() + description = self.description.replace( + "\n", "
" + ) # descriptions with newlines in them break markdown rendering + + md_line = ( + f'|
{_format_path_component(self.path)}' + f"{self.get_checkbox()}
" + f'
{_format_type_name(type_name)}
' + f"| {description} " + f"{_format_default_line(self.default, bool(description)) if self.has_default else ''} |\n" + ) + return md_line + + +class FieldHeader(FieldRow): + def to_md_line(self) -> str: + return "\n".join( + [ + "| Field | Description |", + "|:--- |:--- |", + "", + ] + ) + + def __init__(self): + pass + + +def get_prefixed_name(field_prefix: Optional[str], field_name: Optional[str]) -> str: + assert ( + field_prefix or field_name + ), "One of field_prefix or field_name should be present" + return ( + f"{field_prefix}.{field_name}" # type: ignore + if field_prefix and field_name + else field_name + if not field_prefix + else field_prefix + ) + + +def custom_comparator(path: str) -> str: + """ + Projects a string onto a separate space + Low_prio string will start with Z else start with A + Number of field paths will add the second set of letters: 00 - 99 + + """ + opt1 = path + prio_value = priority_value(opt1) + projection = f"{prio_value}" + projection = f"{projection}{opt1}" + return projection + + +class FieldTree: + """ + A helper class that re-constructs the tree hierarchy of schema fields + to help sort fields by importance while keeping nesting intact + """ + + def __init__(self, field: Optional[FieldRow] = None): + self.field = field + self.fields: Dict[str, "FieldTree"] = {} + + def add_field(self, row: FieldRow, path: Optional[str] = None) -> "FieldTree": + # logger.warn(f"Add field: path:{path}, row:{row}") + if self.field and self.field.path == row.path: + # we have an incoming field with the same path as us, this is probably a union variant + # attach to existing field + self.field.inner_fields.append(row) + else: + path = path if path is not None else row.path + top_level_field = path.split(".")[0] + if top_level_field in self.fields: + self.fields[top_level_field].add_field( + row, ".".join(path.split(".")[1:]) + ) + else: + self.fields[top_level_field] = FieldTree(field=row) + # logger.warn(f"{self}") + return self + + def sort(self): + # Required fields before optionals + required_fields = { + k: v for k, v in self.fields.items() if v.field and v.field.required + } + optional_fields = { + k: v for k, v in self.fields.items() if v.field and not v.field.required + } + + self.sorted_fields = [] + for field_map in [required_fields, optional_fields]: + # Top-level fields before fields with nesting + self.sorted_fields.extend( + sorted( + [f for f, val in field_map.items() if val.fields == {}], + key=custom_comparator, + ) + ) + self.sorted_fields.extend( + sorted( + [f for f, val in field_map.items() if val.fields != {}], + key=custom_comparator, + ) + ) + + for field_tree in self.fields.values(): + field_tree.sort() + + def get_fields(self) -> Iterable[FieldRow]: + if self.field: + yield self.field + for key in self.sorted_fields: + yield from self.fields[key].get_fields() + + def __repr__(self) -> str: + result = {} + if self.field: + result["_self"] = json.loads(json.dumps(self.field.dict())) + for f in self.fields: + result[f] = json.loads(str(self.fields[f])) + return json.dumps(result, indent=2) + + +def priority_value(path: str) -> str: + # A map of low value tokens to their relative importance + low_value_token_map = { + "env": "X", + "classification": "Y", + "profiling": "Y", + "stateful_ingestion": "Z", + } + tokens = path.split(".") + for low_value_token in low_value_token_map: + if low_value_token in tokens: + return low_value_token_map[low_value_token] + + # everything else high-prio + return "A" + + +def gen_md_table_from_json_schema(schema_dict: Dict[str, Any]) -> str: + # we don't want default field values to be injected into the description of the field + JsonSchemaTranslator._INJECT_DEFAULTS_INTO_DESCRIPTION = False + schema_fields = list(JsonSchemaTranslator.get_fields_from_schema(schema_dict)) + result: List[str] = [FieldHeader().to_md_line()] + + field_tree = FieldTree(field=None) + for field in schema_fields: + row: FieldRow = FieldRow.from_schema_field(field) + field_tree.add_field(row) + + field_tree.sort() + + for row in field_tree.get_fields(): + result.append(row.to_md_line()) + + # Wrap with a .config-table div. + result = ["\n
\n\n", *result, "\n
\n"] + + return "".join(result) + + +def gen_md_table_from_pydantic(model: Type[BaseModel]) -> str: + return gen_md_table_from_json_schema(model.schema()) + + +if __name__ == "__main__": + # Simple test code. + from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config + + print("".join(gen_md_table_from_pydantic(SnowflakeV2Config))) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 7de6e8130a7ab..8c5f894a072d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -248,9 +248,11 @@ def make_rest_sink( with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink: yield sink if sink.report.failures: + logger.error( + f"Failed to emit {len(sink.report.failures)} records\n{sink.report.as_string()}" + ) raise OperationalError( - f"Failed to emit {len(sink.report.failures)} records", - info=sink.report.as_obj(), + f"Failed to emit {len(sink.report.failures)} records" ) def emit_all( diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index a0bed4ae9a758..30e8164383737 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -248,6 +248,9 @@ def report_table_dropped(self, table: str) -> None: "Enabled by default when stateful ingestion is turned on.", ) @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") +@capability( + SourceCapability.LINEAGE_FINE, "Support via the `emit_s3_lineage` config field" +) class GlueSource(StatefulIngestionSourceBase): """ Note: if you also have files in S3 that you'd like to ingest, we recommend you use Glue's built-in data catalog. See [here](../../../../docs/generated/ingestion/sources/s3.md) for a quick guide on how to set up a crawler on Glue and ingest the outputs with DataHub. @@ -284,12 +287,22 @@ class GlueSource(StatefulIngestionSourceBase): "Action": [ "glue:GetDataflowGraph", "glue:GetJobs", + "s3:GetObject", ], "Resource": "*" } ``` - plus `s3:GetObject` for the job script locations. + For profiling datasets, the following additional permissions are required: + ```json + { + "Effect": "Allow", + "Action": [ + "glue:GetPartitions", + ], + "Resource": "*" + } + ``` """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index f9a00d7f00905..c1763b16f3670 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -29,7 +29,7 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): ) keep_history_max_days: int = Field( - 30, + 90, description="Maximum number of days to keep execution requests for, per ingestion source", ) @@ -48,6 +48,10 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): description="Maximum runtime in seconds for the cleanup task", ) + limit_entities_delete: Optional[int] = Field( + 10000, description="Max number of execution requests to hard delete." + ) + max_read_errors: int = Field( default=10, description="Maximum number of read errors before aborting", @@ -65,6 +69,8 @@ class DatahubExecutionRequestCleanupReport(SourceReport): ergc_delete_errors: int = 0 ergc_start_time: Optional[datetime.datetime] = None ergc_end_time: Optional[datetime.datetime] = None + ergc_delete_limit_reached: bool = False + ergc_runtime_limit_reached: bool = False class CleanupRecord(BaseModel): @@ -85,12 +91,20 @@ def __init__( self.graph = graph self.report = report self.instance_id = int(time.time()) + self.last_print_time = 0.0 if config is not None: self.config = config else: self.config = DatahubExecutionRequestCleanupConfig() + def _print_report(self) -> None: + time_taken = round(time.time() - self.last_print_time, 1) + # Print report every 2 minutes + if time_taken > 120: + self.last_print_time = time.time() + logger.info(f"\n{self.report.as_string()}") + def _to_cleanup_record(self, entry: Dict) -> CleanupRecord: input_aspect = ( entry.get("aspects", {}) @@ -175,6 +189,7 @@ def _scroll_garbage_records(self): running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000 for entry in self._scroll_execution_requests(): + self._print_report() self.report.ergc_records_read += 1 key = entry.ingestion_source @@ -225,15 +240,12 @@ def _scroll_garbage_records(self): f"record timestamp: {entry.requested_at}." ) ) - self.report.ergc_records_deleted += 1 yield entry def _delete_entry(self, entry: CleanupRecord) -> None: try: - logger.info( - f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}" - ) self.graph.delete_entity(entry.urn, True) + self.report.ergc_records_deleted += 1 except Exception as e: self.report.ergc_delete_errors += 1 self.report.failure( @@ -252,10 +264,23 @@ def _reached_runtime_limit(self) -> bool: >= datetime.timedelta(seconds=self.config.runtime_limit_seconds) ) ): + self.report.ergc_runtime_limit_reached = True logger.info(f"ergc({self.instance_id}): max runtime reached.") return True return False + def _reached_delete_limit(self) -> bool: + if ( + self.config.limit_entities_delete + and self.report.ergc_records_deleted >= self.config.limit_entities_delete + ): + logger.info( + f"ergc({self.instance_id}): max delete limit reached: {self.config.limit_entities_delete}." + ) + self.report.ergc_delete_limit_reached = True + return True + return False + def run(self) -> None: if not self.config.enabled: logger.info( @@ -274,7 +299,7 @@ def run(self) -> None: ) for entry in self._scroll_garbage_records(): - if self._reached_runtime_limit(): + if self._reached_runtime_limit() or self._reached_delete_limit(): break self._delete_entry(entry) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 0a52b7e17bf71..471eeff0224ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -231,6 +231,15 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]: assert self.ctx.graph scroll_id: Optional[str] = None + + batch_size = self.config.batch_size + if entity_type == "DATA_PROCESS_INSTANCE": + # Due to a bug in Data process instance querying this is a temp workaround + # to avoid a giant stacktrace by having a smaller batch size in first call + # This will be remove in future version after server with fix has been + # around for a while + batch_size = 10 + while True: try: result = self.ctx.graph.execute_graphql( @@ -240,7 +249,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st "types": [entity_type], "query": "*", "scrollId": scroll_id if scroll_id else None, - "count": self.config.batch_size, + "count": batch_size, "orFilters": [ { "and": [ @@ -263,6 +272,10 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st scroll_across_entities = result.get("scrollAcrossEntities") if not scroll_across_entities or not scroll_across_entities.get("count"): break + if entity_type == "DATA_PROCESS_INSTANCE": + # Temp workaround. See note in beginning of the function + # We make the batch size = config after call has succeeded once + batch_size = self.config.batch_size scroll_id = scroll_across_entities.get("nextScrollId") self.report.num_queries_found += scroll_across_entities.get("count") for query in scroll_across_entities.get("searchResults"): diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py index 2a247d0c63957..4764400215e12 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi_report_server/report_server.py @@ -485,7 +485,7 @@ def report_dropped(self, view: str) -> None: self.filtered_reports.append(view) -@platform_name("PowerBI") +@platform_name("PowerBI Report Server") @config_class(PowerBiReportServerDashboardSourceConfig) @support_status(SupportStatus.INCUBATING) @capability(SourceCapability.OWNERSHIP, "Enabled by default") diff --git a/metadata-ingestion/tests/unit/urns/test_urn.py b/metadata-ingestion/tests/unit/urns/test_urn.py index 0c362473c0cf1..bee80ec33148e 100644 --- a/metadata-ingestion/tests/unit/urns/test_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_urn.py @@ -4,7 +4,13 @@ import pytest -from datahub.metadata.urns import CorpUserUrn, DatasetUrn, Urn +from datahub.metadata.urns import ( + CorpUserUrn, + DataPlatformUrn, + DatasetUrn, + SchemaFieldUrn, + Urn, +) from datahub.utilities.urns.error import InvalidUrnError pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -60,6 +66,20 @@ def test_urn_coercion() -> None: assert urn == Urn.from_string(urn.urn()) +def test_urns_in_init() -> None: + platform = DataPlatformUrn("abc") + assert platform.urn() == "urn:li:dataPlatform:abc" + + dataset_urn = DatasetUrn(platform, "def", "PROD") + assert dataset_urn.urn() == "urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)" + + schema_field = SchemaFieldUrn(dataset_urn, "foo") + assert ( + schema_field.urn() + == "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD),foo)" + ) + + def test_urn_type_dispatch_1() -> None: urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)") assert isinstance(urn, DatasetUrn) diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml index c0c5be85b16b1..8879a2f654994 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml @@ -21,19 +21,30 @@ truncate_indices: {{truncate_indices}}{{^truncate_indices}}true{{/truncate_indices}} truncate_index_older_than_days: {{truncate_indices_retention_days}}{{^truncate_indices_retention_days}}30{{/truncate_indices_retention_days}} dataprocess_cleanup: + enabled: {{dataprocess_cleanup.enabled}}{{^dataprocess_cleanup.enabled}}false{{/dataprocess_cleanup.enabled}} retention_days: {{dataprocess_cleanup.retention_days}}{{^dataprocess_cleanup.retention_days}}10{{/dataprocess_cleanup.retention_days}} - delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}true{{/dataprocess_cleanup.delete_empty_data_jobs}} - delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}true{{/dataprocess_cleanup.delete_empty_data_flows}} + delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}false{{/dataprocess_cleanup.delete_empty_data_jobs}} + delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}false{{/dataprocess_cleanup.delete_empty_data_flows}} hard_delete_entities: {{dataprocess_cleanup.hard_delete_entities}}{{^dataprocess_cleanup.hard_delete_entities}}false{{/dataprocess_cleanup.hard_delete_entities}} keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}} + batch_size: {{dataprocess_cleanup.batch_size}}{{^dataprocess_cleanup.batch_size}}500{{/dataprocess_cleanup.batch_size}} + max_workers: {{dataprocess_cleanup.max_workers}}{{^dataprocess_cleanup.max_workers}}10{{/dataprocess_cleanup.max_workers}} soft_deleted_entities_cleanup: retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}} + enabled: {{soft_deleted_entities_cleanup.enabled}}{{^soft_deleted_entities_cleanup.enabled}}true{{/soft_deleted_entities_cleanup.enabled}} + batch_size: {{soft_deleted_entities_cleanup.batch_size}}{{^soft_deleted_entities_cleanup.batch_size}}500{{/soft_deleted_entities_cleanup.batch_size}} + max_workers: {{soft_deleted_entities_cleanup.max_workers}}{{^soft_deleted_entities_cleanup.max_workers}}10{{/soft_deleted_entities_cleanup.max_workers}} + limit_entities_delete: {{soft_deleted_entities_cleanup.limit_entities_delete}}{{^soft_deleted_entities_cleanup.limit_entities_delete}}25000{{/soft_deleted_entities_cleanup.limit_entities_delete}} + runtime_limit_seconds: {{soft_deleted_entities_cleanup.runtime_limit_seconds}}{{^soft_deleted_entities_cleanup.runtime_limit_seconds}}7200{{/soft_deleted_entities_cleanup.runtime_limit_seconds}} execution_request_cleanup: keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}} keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}} - keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}} + keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}90{{/execution_request_cleanup.keep_history_max_days}} batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}} - enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}} + enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}true{{/execution_request_cleanup.enabled}} + runtime_limit_seconds: {{execution_request_cleanup.runtime_limit_seconds}}{{^execution_request_cleanup.runtime_limit_seconds}}3600{{/execution_request_cleanup.runtime_limit_seconds}} + limit_entities_delete: {{execution_request_cleanup.limit_entities_delete}}{{^execution_request_cleanup.limit_entities_delete}}10000{{/execution_request_cleanup.limit_entities_delete}} + max_read_errors: {{execution_request_cleanup.max_read_errors}}{{^execution_request_cleanup.max_read_errors}}10{{/execution_request_cleanup.max_read_errors}} extraArgs: {} debugMode: false executorId: default