Commit ca40351
Merge remote-tracking branch 'origin/master+iceberg-release' into iceberg-rest-api
david-leifker committed Jan 30, 2025
2 parents ffce018 + 9c568f0 commit ca40351
Showing 63 changed files with 6,504 additions and 34 deletions.
@@ -91,7 +91,8 @@ private SearchUtils() {}
EntityType.DATA_PRODUCT,
EntityType.NOTEBOOK,
EntityType.BUSINESS_ATTRIBUTE,
EntityType.SCHEMA_FIELD);
EntityType.SCHEMA_FIELD,
EntityType.DATA_PLATFORM_INSTANCE);

/** Entities that are part of autocomplete by default in Auto Complete Across Entities */
public static final List<EntityType> AUTO_COMPLETE_ENTITY_TYPES =
3 changes: 3 additions & 0 deletions datahub-web-react/src/graphql/search.graphql
@@ -482,6 +482,9 @@ fragment searchResultsWithoutSchemaField on Entity {
}
}
}
... on DataPlatformInstance {
...dataPlatformInstanceFields
}
... on Role {
id
properties {
6 changes: 6 additions & 0 deletions docs/advanced/mcp-mcl.md
@@ -210,6 +210,7 @@ A writer can specify that the aspect must NOT have been modified after a specifi
`If-Modified-Since`
A writer can specify that the aspect must have been modified after a specific time, following the semantics of the [If-Modified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Modified-Since) HTTP header.


#### Change Types: [`CREATE`, `CREATE_ENTITY`]

Another form of conditional write, one that considers the existence of an aspect or entity, uses the following Change Types.
@@ -221,3 +222,8 @@ Another form of conditional writes which considers the existence of an aspect or
By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write should instead be dropped silently, rather than treated as an exception, add the header `If-None-Match: *` to the MCP.
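
For illustration only (not part of this commit), here is a minimal Python sketch of a create-only write that is dropped, rather than failed, when the aspect already exists. It assumes the Python `MetadataChangeProposalWrapper` in use exposes the MCP `headers` map; the entity URN and aspect are placeholders.

```python
# Hedged sketch: CREATE-only write, silently dropped if the aspect already exists.
# Assumes MetadataChangeProposalWrapper exposes the MCP `headers` map.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",
    changeType=ChangeTypeClass.CREATE,  # CREATE_ENTITY for entity-level existence checks
    aspect=DatasetPropertiesClass(description="Created only if the aspect is absent"),
    headers={"If-None-Match": "*"},  # drop the write instead of raising on conflict
)
emitter.emit_mcp(mcp)
```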

### Synchronous Elasticsearch Updates

Writes to Elasticsearch are asynchronous by default. A writer can add the custom header `X-DataHub-Sync-Index-Update` to the MCP `headers`, with the value set to `true`, to request a synchronous Elasticsearch index update for specific MCPs that benefit from it.
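
As a hedged illustration (not part of this commit), the header could be attached per-MCP from the Python emitter, again assuming the client surfaces the MCP `headers` map; the URN and aspect below are placeholders.

```python
# Hedged sketch: request a synchronous Elasticsearch index update for one MCP.
# Assumes MetadataChangeProposalWrapper exposes the MCP `headers` map.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)",
    aspect=StatusClass(removed=False),
    # Only this MCP is indexed synchronously; other writes stay asynchronous.
    headers={"X-DataHub-Sync-Index-Update": "true"},
)
emitter.emit_mcp(mcp)
```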
7 changes: 7 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
@@ -10,6 +10,9 @@ public class Constants {
public static final String INTERNAL_DELEGATED_FOR_ACTOR_HEADER_NAME = "X-DataHub-Delegated-For";
public static final String INTERNAL_DELEGATED_FOR_ACTOR_TYPE = "X-DataHub-Delegated-For-";

// Use on a specific MCP to request a synchronous index update, avoiding Kafka lag.
public static final String SYNC_INDEX_UPDATE_HEADER_NAME = "X-DataHub-Sync-Index-Update";

public static final String URN_LI_PREFIX = "urn:li:";
public static final String DATAHUB_ACTOR = "urn:li:corpuser:datahub"; // Super user.
public static final String SYSTEM_ACTOR =
@@ -103,6 +106,7 @@ public class Constants {
public static final String FORM_ENTITY_NAME = "form";
public static final String RESTRICTED_ENTITY_NAME = "restricted";
public static final String BUSINESS_ATTRIBUTE_ENTITY_NAME = "businessAttribute";
public static final String PLATFORM_RESOURCE_ENTITY_NAME = "platformResource";

/** Aspects */
// Common
@@ -211,6 +215,9 @@ public class Constants {
public static final String DATA_PLATFORM_INSTANCE_PROPERTIES_ASPECT_NAME =
"dataPlatformInstanceProperties";

// PlatformResource
public static final String PLATFORM_RESOURCE_INFO_ASPECT_NAME = "platformResourceInfo";

// ML Feature
public static final String ML_FEATURE_KEY_ASPECT_NAME = "mlFeatureKey";
public static final String ML_FEATURE_PROPERTIES_ASPECT_NAME = "mlFeatureProperties";
@@ -561,8 +561,7 @@ public static DisjunctivePrivilegeGroup buildDisjunctivePrivilegeGroup(
return buildDisjunctivePrivilegeGroup(lookupAPIPrivilege(apiGroup, apiOperation, entityType));
}

@VisibleForTesting
static DisjunctivePrivilegeGroup buildDisjunctivePrivilegeGroup(
public static DisjunctivePrivilegeGroup buildDisjunctivePrivilegeGroup(
final Disjunctive<Conjunctive<PoliciesConfig.Privilege>> privileges) {
return new DisjunctivePrivilegeGroup(
privileges.stream()
3 changes: 3 additions & 0 deletions metadata-ingestion/examples/iceberg/constants.py
@@ -0,0 +1,3 @@
warehouse = "arctic_warehouse"
namespace = "alpine_db"
table_name = "resort_metrics"
108 changes: 108 additions & 0 deletions metadata-ingestion/examples/iceberg/create_table.py
@@ -0,0 +1,108 @@
from datetime import datetime

import pyarrow as pa
import pyiceberg
from constants import namespace, table_name, warehouse
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType

from datahub.ingestion.graph.client import get_default_graph

# Define a more comprehensive schema for ski resort data
schema = Schema(
NestedField(
field_id=1,
name="resort_id",
field_type=LongType(),
required=True,
doc="Unique identifier for each ski resort",
initial_default=None,
),
NestedField(
field_id=2,
name="resort_name",
field_type=StringType(),
required=True,
doc="Official name of the ski resort",
initial_default=None,
),
NestedField(
field_id=3,
name="daily_snowfall",
field_type=LongType(),
required=False,
doc="Amount of new snow in inches during the last 24 hours. Null if no measurement available",
initial_default=0,
),
NestedField(
field_id=4,
name="conditions",
field_type=StringType(),
required=False,
doc="Current snow conditions description (e.g., 'Powder', 'Packed Powder', 'Groomed'). Null if not reported",
initial_default=None,
),
NestedField(
field_id=5,
name="last_updated",
field_type=TimestampType(),
required=False,
doc="Timestamp of when the snow report was last updated",
initial_default=None,
),
)

# Load the catalog with new warehouse name
graph = get_default_graph()
catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)

# Create namespace (database)
try:
catalog.create_namespace(namespace)
except Exception as e:
print(f"Namespace creation error (might already exist): {e}")

full_table_name = f"{namespace}.{table_name}"
try:
catalog.create_table(full_table_name, schema)
except pyiceberg.exceptions.TableAlreadyExistsError:
print(f"Table {full_table_name} already exists")

# Create sample data with explicit PyArrow schema to match required fields
pa_schema = pa.schema(
[
("resort_id", pa.int64(), False), # False means not nullable
("resort_name", pa.string(), False), # False means not nullable
("daily_snowfall", pa.int64(), True),
("conditions", pa.string(), True),
("last_updated", pa.timestamp("us"), True),
]
)
# Create sample data
sample_data = pa.Table.from_pydict(
{
"resort_id": [1, 2, 3],
"resort_name": ["Snowpeak Resort", "Alpine Valley", "Glacier Heights"],
"daily_snowfall": [12, 8, 15],
"conditions": ["Powder", "Packed", "Fresh Powder"],
"last_updated": [
pa.scalar(datetime.now()),
pa.scalar(datetime.now()),
pa.scalar(datetime.now()),
],
},
schema=pa_schema,
)

# Write data to table
table = catalog.load_table(full_table_name)
table.overwrite(sample_data)

table.refresh()
# Read and verify data
con = table.scan().to_duckdb(table_name=f"{table_name}")
print("\nResort Metrics Data:")
print("-" * 50)
for row in con.execute(f"SELECT * FROM {table_name}").fetchall():
print(row)
10 changes: 10 additions & 0 deletions metadata-ingestion/examples/iceberg/drop_table.py
@@ -0,0 +1,10 @@
from constants import namespace, table_name, warehouse
from pyiceberg.catalog import load_catalog

# Load the catalog
from datahub.ingestion.graph.client import get_default_graph

graph = get_default_graph()
catalog = load_catalog("local_datahub", warehouse=warehouse, token=graph.config.token)
# Drop the Iceberg table
catalog.drop_table(f"{namespace}.{table_name}")