Skip to content

Commit

Permalink
Merge branch 'fix/databricks_repair_run_deferrable' of https://github…
Browse files Browse the repository at this point in the history
….com/raghvendra-singh1/airflow into fix/databricks_repair_run_deferrable
  • Loading branch information
raghvendra-singh1 committed Dec 10, 2024
2 parents 03e8894 + f08dfce commit ad55a15
Show file tree
Hide file tree
Showing 186 changed files with 4,100 additions and 1,450 deletions.
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ repos:
language: python
additional_dependencies: ['rich>=12.4.4']
require_serial: true
- id: check-imports-in-providers
name: Check imports in providers
entry: ./scripts/ci/pre_commit/check_imports_in_providers.py
language: python
additional_dependencies: ['rich>=12.4.4', "ruff==0.8.1"]
files: ^providers/src/airflow/providers/.*\.py$
require_serial: true
- id: update-common-sql-api-stubs
name: Check and update common.sql API stubs
entry: ./scripts/ci/pre_commit/update_common_sql_api_stubs.py
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.6
ARG AIRFLOW_UV_VERSION=0.5.7
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.6
ARG AIRFLOW_UV_VERSION=0.5.7
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
Expand Down
19 changes: 18 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,16 @@
from sqlalchemy.inspection import inspect

from airflow.models import Base
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.asset import (
AssetAliasModel,
AssetModel,
DagScheduleAssetReference,
TaskOutletAssetReference,
)
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.variable import Variable
from airflow.typing_compat import Self
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down Expand Up @@ -568,7 +574,18 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
]

# Assets
QueryAssetNamePatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(AssetModel.name, "name_pattern"))
]
QueryUriPatternSearch = Annotated[_SearchParam, Depends(search_param_factory(AssetModel.uri, "uri_pattern"))]
QueryAssetAliasNamePatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(AssetAliasModel.name, "name_pattern"))
]
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]

# Variables
QueryVariableKeyPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(Variable.key, "variable_key_pattern"))
]
30 changes: 18 additions & 12 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ class TaskOutletAssetReference(BaseModel):
updated_at: datetime


class AssetAliasSchema(BaseModel):
"""Asset alias serializer for assets."""

id: int
name: str
group: str


class AssetResponse(BaseModel):
"""Asset serializer for responses."""

Expand All @@ -62,7 +54,7 @@ class AssetResponse(BaseModel):
updated_at: datetime
consuming_dags: list[DagScheduleAssetReference]
producing_tasks: list[TaskOutletAssetReference]
aliases: list[AssetAliasSchema]
aliases: list[AssetAliasResponse]

@field_validator("extra", mode="after")
@classmethod
Expand All @@ -77,6 +69,21 @@ class AssetCollectionResponse(BaseModel):
total_entries: int


class AssetAliasResponse(BaseModel):
"""Asset alias serializer for responses."""

id: int
name: str
group: str


class AssetAliasCollectionResponse(BaseModel):
"""Asset alias collection response."""

asset_aliases: list[AssetAliasResponse]
total_entries: int


class DagRunAssetReference(BaseModel):
"""DAGRun serializer for asset responses."""

Expand All @@ -95,7 +102,6 @@ class AssetEventResponse(BaseModel):

id: int
asset_id: int
uri: str
extra: dict | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
Expand All @@ -120,8 +126,8 @@ class AssetEventCollectionResponse(BaseModel):
class QueuedEventResponse(BaseModel):
"""Queued Event serializer for responses.."""

uri: str
dag_id: str
asset_id: int
created_at: datetime


Expand All @@ -135,7 +141,7 @@ class QueuedEventCollectionResponse(BaseModel):
class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""

uri: str
asset_id: int
extra: dict = Field(default_factory=dict)

@field_validator("extra", mode="after")
Expand Down
Loading

0 comments on commit ad55a15

Please sign in to comment.