From 291ad2e263f0f295348d6d2a067be8d9ff0c542f Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Thu, 6 Jun 2024 14:23:32 +0200 Subject: [PATCH] Add task SLA and queued datetime information to AirflowRunFacet (#40091) Signed-off-by: Kacper Muda --- airflow/providers/openlineage/facets/AirflowRunFacet.json | 7 +++++++ airflow/providers/openlineage/utils/utils.py | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow/providers/openlineage/facets/AirflowRunFacet.json b/airflow/providers/openlineage/facets/AirflowRunFacet.json index 504fb1bc3ab16..8425258eefd09 100644 --- a/airflow/providers/openlineage/facets/AirflowRunFacet.json +++ b/airflow/providers/openlineage/facets/AirflowRunFacet.json @@ -97,6 +97,9 @@ "run_as_user": { "type": "string" }, + "sla": { + "type": "number" + }, "task_id": { "type": "string" }, @@ -198,6 +201,10 @@ }, "try_number": { "type": "integer" + }, + "queued_dttm": { + "type": "string", + "format": "date-time" } }, "additionalProperties": true, diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index 6904b32d2db50..a56bf58884f58 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -145,6 +145,8 @@ def __init__(self, obj): def _cast_basic_types(value): if isinstance(value, datetime.datetime): return value.isoformat() + if isinstance(value, datetime.timedelta): + return f"{value.total_seconds()} seconds" if isinstance(value, (set, list, tuple)): return str(list(value)) return value @@ -201,7 +203,7 @@ class DagRunInfo(InfoJsonEncodable): class TaskInstanceInfo(InfoJsonEncodable): """Defines encoding TaskInstance object to JSON.""" - includes = ["duration", "try_number", "pool"] + includes = ["duration", "try_number", "pool", "queued_dttm"] casts = { "map_index": lambda ti: ( ti.map_index if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1 else None @@ -235,6 +237,7 @@ class TaskInfo(InfoJsonEncodable): "retries", "retry_exponential_backoff", "run_as_user", + "sla", "task_id", "trigger_rule", "upstream_task_ids",