Skip to content

Commit

Permalink
Merge branch 'main' into support-multiple-queries-in-dataproc
Browse files Browse the repository at this point in the history
  • Loading branch information
amirmor1 authored Dec 14, 2024
2 parents 833b034 + ad3d022 commit 7223c00
Show file tree
Hide file tree
Showing 117 changed files with 4,186 additions and 4,255 deletions.
14 changes: 14 additions & 0 deletions .github/actions/migration_tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ runs:
COMPOSE_PROJECT_NAME: "docker-compose"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
- name: "Bring compose down again"
shell: bash
run: breeze down
env:
COMPOSE_PROJECT_NAME: "docker-compose"
- name: "Test offline migration ${{env.BACKEND}}"
shell: bash
run: >
breeze shell "airflow db reset -y &&
airflow db downgrade -n 2.7.0 -y &&
airflow db migrate -s"
env:
COMPOSE_PROJECT_NAME: "docker-compose"
if: env.BACKEND != 'sqlite'
- name: "Bring any containers left down"
shell: bash
run: breeze down
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_USERNAME: ${{ github.actor }}
VERBOSE: "true"
if: inputs.canary-run == 'true'
steps:
- name: "Cleanup repo"
shell: bash
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ jobs:
- tests-kubernetes
- tests-task-sdk
- finalize-tests
if: github.event_name == 'schedule' && failure()
if: github.event_name == 'schedule' && failure() && github.run_attempt == 1
runs-on: ["ubuntu-22.04"]
steps:
- name: Notify Slack
Expand Down
2 changes: 1 addition & 1 deletion INTHEWILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Currently, **officially** using Airflow:
1. [Bombora Inc](https://bombora.com/) [[@jeffkpayne](https://github.com/jeffkpayne), [@pakelley](https://github.com/pakelley), [@dNavalta](https://github.com/dNavalta), [@austynh](https://github.com/austynh), [@TheOriginalAlex](https://github.com/TheOriginalAlex)]
1. [Bonial International GmbH](https://www.bonial.com/)
1. [Bonnier Broadcasting](http://www.bonnierbroadcasting.com) [[@wileeam](https://github.com/wileeam)]
1. [Bosch (Robert Bosch GmbH)](https://www.bosch.com/stories/topics/automated-driving/) [[@jscheffl](https://github.com/jscheffl), [@clellmann](https://github.com/clellmann), [@wolfdn](https://github.com/wolfdn), [@AutomationDev85](https://github.com/AutomationDev85), [@majorosdonat](https://github.com/majorosdonat)]
1. [Bosch (Robert Bosch GmbH)](https://www.bosch.com/stories/topics/automated-driving/) [[@jscheffl](https://github.com/jscheffl), [@clellmann](https://github.com/clellmann), [@wolfdn](https://github.com/wolfdn), [@AutomationDev85](https://github.com/AutomationDev85), [@majorosdonat](https://github.com/majorosdonat), [@OliverWannenwetsch](https://github.com/OliverWannenwetsch)]
1. [BounceX](http://www.bouncex.com) [[@JoshFerge](https://github.com/JoshFerge), [@hudsonrio](https://github.com/hudsonrio), [@ronniekritou](https://github.com/ronniekritou)]
1. [Braintree](https://www.braintreepayments.com) [[@coopergillan](https://github.com/coopergillan), [@curiousjazz77](https://github.com/curiousjazz77), [@raymondberg](https://github.com/raymondberg)]
1. [Branch](https://branch.io) [[@sdebarshi](https://github.com/sdebarshi), [@dmitrig01](https://github.com/dmitrig01)]
Expand Down
13 changes: 12 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/dag_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,18 @@

from __future__ import annotations

from pydantic import BaseModel
from pydantic import ConfigDict

from airflow.api_fastapi.core_api.base import BaseModel


class DagTagResponse(BaseModel):
    """DAG Tag serializer for responses."""

    # NOTE: the docstring above is surfaced as the schema description in the
    # generated OpenAPI spec, so treat it as API-visible text.
    #
    # populate_by_name=True: fields may be populated using their attribute
    # names (relevant if aliases are configured on the base model).
    # from_attributes=True: instances can be validated from objects by reading
    # their attributes -- presumably DagTag ORM rows; confirm against callers.
    model_config = ConfigDict(populate_by_name=True, from_attributes=True)

    # Tag name; required in the generated schema.
    name: str
    # Presumably the ID of the DAG this tag is attached to; required in the
    # generated schema.
    dag_id: str


class DAGTagCollectionResponse(BaseModel):
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/datamodels/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
)

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse
from airflow.configuration import conf
from airflow.serialization.pydantic.dag import DagTagPydantic


class DAGResponse(BaseModel):
Expand All @@ -50,7 +50,7 @@ class DAGResponse(BaseModel):
description: str | None
timetable_summary: str | None
timetable_description: str | None
tags: list[DagTagPydantic]
tags: list[DagTagResponse]
max_active_tasks: int
max_active_runs: int | None
max_consecutive_failed_dag_runs: int
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class NodeResponse(BaseModel):
label: str
tooltip: str | None = None
setup_teardown_type: Literal["setup", "teardown"] | None = None
type: Literal["join", "task", "asset_condition"]
type: Literal["join", "task", "asset-condition", "asset", "asset-alias", "dag", "sensor", "trigger"]
operator: str | None = None


Expand Down
45 changes: 37 additions & 8 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ paths:
type: boolean
default: false
title: Include Downstream
- name: external_dependencies
in: query
required: false
schema:
type: boolean
default: false
title: External Dependencies
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -595,6 +602,24 @@ paths:
- type: integer
- type: 'null'
title: Source Map Index
- name: timestamp_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Timestamp Gte
- name: timestamp_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Timestamp Lte
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -6714,7 +6739,7 @@ components:
title: Timetable Description
tags:
items:
$ref: '#/components/schemas/DagTagPydantic'
$ref: '#/components/schemas/DagTagResponse'
type: array
title: Tags
max_active_tasks:
Expand Down Expand Up @@ -6936,7 +6961,7 @@ components:
title: Timetable Description
tags:
items:
$ref: '#/components/schemas/DagTagPydantic'
$ref: '#/components/schemas/DagTagResponse'
type: array
title: Tags
max_active_tasks:
Expand Down Expand Up @@ -7412,7 +7437,7 @@ components:
title: Timetable Description
tags:
items:
$ref: '#/components/schemas/DagTagPydantic'
$ref: '#/components/schemas/DagTagResponse'
type: array
title: Tags
max_active_tasks:
Expand Down Expand Up @@ -7665,7 +7690,7 @@ components:
- count
title: DagStatsStateResponse
description: DagStatsState serializer for responses.
DagTagPydantic:
DagTagResponse:
properties:
name:
type: string
Expand All @@ -7677,9 +7702,8 @@ components:
required:
- name
- dag_id
title: DagTagPydantic
description: Serializable representation of the DagTag ORM SqlAlchemyModel used
by internal API.
title: DagTagResponse
description: DAG Tag serializer for responses.
DagWarningType:
type: string
enum:
Expand Down Expand Up @@ -8043,7 +8067,12 @@ components:
enum:
- join
- task
- asset_condition
- asset-condition
- asset
- asset-alias
- dag
- sensor
- trigger
title: Type
operator:
anyOf:
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
QueryLimit,
QueryOffset,
QueryUriPatternSearch,
RangeFilter,
SortParam,
datetime_range_filter_factory,
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
Expand Down Expand Up @@ -193,12 +195,13 @@ def get_asset_events(
source_map_index: Annotated[
FilterParam[int | None], Depends(filter_param_factory(AssetEvent.source_map_index, int | None))
],
timestamp_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("timestamp", AssetEvent))],
session: SessionDep,
) -> AssetEventCollectionResponse:
"""Get asset events."""
assets_event_select, total_entries = paginated_select(
statement=select(AssetEvent),
filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index, timestamp_range],
order_by=order_by,
offset=offset,
limit=limit,
Expand Down
42 changes: 39 additions & 3 deletions airflow/api_fastapi/core_api/routes/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict

Expand All @@ -39,6 +40,7 @@ def structure_data(
root: str | None = None,
include_upstream: bool = False,
include_downstream: bool = False,
external_dependencies: bool = False,
) -> StructureDataResponse:
"""Get Structure Data."""
dag = request.app.state.dag_bag.get_dag(dag_id)
Expand All @@ -51,9 +53,7 @@ def structure_data(
task_ids_or_regex=root, include_upstream=include_upstream, include_downstream=include_downstream
)

nodes = [
task_group_to_dict(child) for child in sorted(dag.task_group.children.values(), key=lambda t: t.label)
]
nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
edges = dag_edges(dag)

data = {
Expand All @@ -62,4 +62,40 @@ def structure_data(
"edges": edges,
}

if external_dependencies:
entry_node_ref = nodes[0] if nodes else None
exit_node_ref = nodes[-1] if nodes else None

start_edges: list[dict] = []
end_edges: list[dict] = []

for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items():
for dependency in dependencies:
if dependency_dag_id != dag_id and dependency.target != dag_id:
continue

# Add nodes
nodes.append(
{
"id": dependency.node_id,
"label": dependency.dependency_id,
"type": dependency.dependency_type,
}
)

# Add edges
# start dependency
if (
dependency.source == dependency.dependency_type or dependency.target == dag_id
) and entry_node_ref:
start_edges.append({"source_id": dependency.node_id, "target_id": entry_node_ref["id"]})

# end dependency
elif (
dependency.target == dependency.dependency_type or dependency.source == dag_id
) and exit_node_ref:
end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id})

data["edges"] = start_edges + edges + end_edges

return StructureDataResponse(**data)
8 changes: 2 additions & 6 deletions airflow/api_fastapi/execution_api/datamodels/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from datetime import timedelta
from typing import Annotated, Any, Literal, Union

from pydantic import Discriminator, Field, RootModel, Tag, WithJsonSchema
from pydantic import Discriminator, Field, Tag, WithJsonSchema

from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.core_api.base import BaseModel
Expand All @@ -33,7 +33,7 @@ class TIEnterRunningPayload(BaseModel):

state: Annotated[
Literal[TIState.RUNNING],
# Specify a default in the schema, but not in code, so Pydantic marks it as required.
# Specify a default in the schema, but not in code.
WithJsonSchema({"type": "string", "enum": [TIState.RUNNING], "default": TIState.RUNNING}),
]
hostname: str
Expand Down Expand Up @@ -135,7 +135,3 @@ class TaskInstance(BaseModel):
run_id: str
try_number: int
map_index: int | None = None


"""Schema for setting RTIF for a task instance."""
RTIFPayload = RootModel[dict[str, str]]
6 changes: 3 additions & 3 deletions airflow/api_fastapi/execution_api/routes/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
from uuid import UUID

from fastapi import Body, HTTPException, status
from pydantic import JsonValue
from sqlalchemy import update
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
from sqlalchemy.sql import select

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
RTIFPayload,
TIDeferredStatePayload,
TIEnterRunningPayload,
TIHeartbeatInfo,
Expand Down Expand Up @@ -237,7 +237,7 @@ def ti_heartbeat(
)
def ti_put_rtif(
task_instance_id: UUID,
put_rtif_payload: RTIFPayload,
put_rtif_payload: Annotated[dict[str, JsonValue], Body()],
session: SessionDep,
):
"""Add an RTIF entry for a task instance, sent by the worker."""
Expand All @@ -247,6 +247,6 @@ def ti_put_rtif(
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
)
_update_rtif(task_instance, put_rtif_payload.model_dump(), session)
_update_rtif(task_instance, put_rtif_payload, session)

return {"message": "Rendered task instance fields successfully set"}
Loading

0 comments on commit 7223c00

Please sign in to comment.