diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 49e74d453d09d6..65e3a0895b2cfe 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -174,6 +174,13 @@ repos: language: python additional_dependencies: ['rich>=12.4.4'] require_serial: true + - id: check-imports-in-providers + name: Check imports in providers + entry: ./scripts/ci/pre_commit/check_imports_in_providers.py + language: python + additional_dependencies: ['rich>=12.4.4', "ruff==0.8.1"] + files: ^providers/src/airflow/providers/.*\.py$ + require_serial: true - id: update-common-sql-api-stubs name: Check and update common.sql API stubs entry: ./scripts/ci/pre_commit/update_common_sql_api_stubs.py diff --git a/Dockerfile b/Dockerfile index 6cccf95b40ea5b..730164405b744b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm" # Also use `force pip` label on your PR to swap all places we use `uv` to `pip` ARG AIRFLOW_PIP_VERSION=24.3.1 # ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main" -ARG AIRFLOW_UV_VERSION=0.5.6 +ARG AIRFLOW_UV_VERSION=0.5.7 ARG AIRFLOW_USE_UV="false" ARG UV_HTTP_TIMEOUT="300" ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow" diff --git a/Dockerfile.ci b/Dockerfile.ci index 031efab217cb42..1b3594f062dc3f 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1381,7 +1381,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \ # Also use `force pip` label on your PR to swap all places we use `uv` to `pip` ARG AIRFLOW_PIP_VERSION=24.3.1 # ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main" -ARG AIRFLOW_UV_VERSION=0.5.6 +ARG AIRFLOW_UV_VERSION=0.5.7 # TODO(potiuk): automate with upgrade check (possibly) ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1" ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4" diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index 7d5e4c63416807..423de808551453 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -40,10 +40,16 @@ from sqlalchemy.inspection import inspect from airflow.models import Base -from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference +from airflow.models.asset import ( + AssetAliasModel, + AssetModel, + DagScheduleAssetReference, + TaskOutletAssetReference, +) from airflow.models.dag import DagModel, DagTag from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance +from airflow.models.variable import Variable from airflow.typing_compat import Self from airflow.utils import timezone from airflow.utils.state import DagRunState, TaskInstanceState @@ -568,7 +574,18 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N ] # Assets +QueryAssetNamePatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(AssetModel.name, "name_pattern")) +] QueryUriPatternSearch = Annotated[_SearchParam, Depends(search_param_factory(AssetModel.uri, "uri_pattern"))] +QueryAssetAliasNamePatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(AssetAliasModel.name, "name_pattern")) +] QueryAssetDagIdPatternSearch = Annotated[ _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends) ] + +# Variables +QueryVariableKeyPatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(Variable.key, "variable_key_pattern")) +] diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 72bba200fabf33..9721157998564e 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -42,14 +42,6 @@ class TaskOutletAssetReference(BaseModel): updated_at: datetime -class AssetAliasSchema(BaseModel): - """Asset alias serializer for assets.""" - - id: int - name: str - group: str - - class AssetResponse(BaseModel): """Asset serializer for responses.""" @@ -62,7 +54,7 @@ class AssetResponse(BaseModel): updated_at: datetime consuming_dags: list[DagScheduleAssetReference] producing_tasks: list[TaskOutletAssetReference] - aliases: list[AssetAliasSchema] + aliases: list[AssetAliasResponse] @field_validator("extra", mode="after") @classmethod @@ -77,6 +69,21 @@ class AssetCollectionResponse(BaseModel): total_entries: int +class AssetAliasResponse(BaseModel): + """Asset alias serializer for responses.""" + + id: int + name: str + group: str + + +class AssetAliasCollectionResponse(BaseModel): + """Asset alias collection response.""" + + asset_aliases: list[AssetAliasResponse] + total_entries: int + + class DagRunAssetReference(BaseModel): """DAGRun serializer for asset responses.""" @@ -95,7 +102,6 @@ class AssetEventResponse(BaseModel): id: int asset_id: int - uri: str extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None @@ -120,8 +126,8 @@ class AssetEventCollectionResponse(BaseModel): class QueuedEventResponse(BaseModel): """Queued Event serializer for responses..""" - uri: str dag_id: str + asset_id: int created_at: datetime @@ -135,7 +141,7 @@ class QueuedEventCollectionResponse(BaseModel): class CreateAssetEventsBody(BaseModel): """Create asset events request.""" - uri: str + asset_id: int extra: dict = Field(default_factory=dict) @field_validator("extra", mode="after") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index b877e8988a15bd..00f1c5aa5106fc 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -273,6 +273,14 @@ paths: minimum: 0 default: 0 title: Offset + - name: name_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Name Pattern - name: uri_pattern in: query required: false @@ -327,6 +335,120 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/assets/aliases: + get: + tags: + - Asset + summary: Get Asset Aliases + description: Get asset aliases. + operationId: get_asset_aliases + parameters: + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + - name: name_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Name Pattern + - name: order_by + in: query + required: false + schema: + type: string + default: id + title: Order By + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetAliasCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /public/assets/aliases/{asset_alias_id}: + get: + tags: + - Asset + summary: Get Asset Alias + description: Get an asset alias. + operationId: get_asset_alias + parameters: + - name: asset_alias_id + in: path + required: true + schema: + type: integer + title: Asset Alias Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/assets/events: get: tags: @@ -472,7 +594,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/queuedEvents/{uri}: + /public/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -480,12 +602,12 @@ paths: description: Get queued asset events for an asset. operationId: get_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -532,12 +654,12 @@ paths: description: Delete queued asset events for an asset. operationId: delete_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -573,7 +695,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/{uri}: + /public/assets/{asset_id}: get: tags: - Asset @@ -581,12 +703,12 @@ paths: description: Get an asset. operationId: get_asset parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id responses: '200': description: Successful Response @@ -724,7 +846,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvents/{uri}: + /public/dags/{dag_id}/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -738,12 +860,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -796,12 +918,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -5427,6 +5549,14 @@ paths: type: string default: id title: Order By + - name: variable_key_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Variable Key Pattern responses: '200': description: Successful Response @@ -5778,7 +5908,23 @@ components: type: object title: AppBuilderViewResponse description: Serializer for AppBuilder View responses. - AssetAliasSchema: + AssetAliasCollectionResponse: + properties: + asset_aliases: + items: + $ref: '#/components/schemas/AssetAliasResponse' + type: array + title: Asset Aliases + total_entries: + type: integer + title: Total Entries + type: object + required: + - asset_aliases + - total_entries + title: AssetAliasCollectionResponse + description: Asset alias collection response. + AssetAliasResponse: properties: id: type: integer @@ -5794,8 +5940,8 @@ components: - id - name - group - title: AssetAliasSchema - description: Asset alias serializer for assets. + title: AssetAliasResponse + description: Asset alias serializer for responses. AssetCollectionResponse: properties: assets: @@ -5836,9 +5982,6 @@ components: asset_id: type: integer title: Asset Id - uri: - type: string - title: Uri extra: anyOf: - type: object @@ -5875,7 +6018,6 @@ components: required: - id - asset_id - - uri - source_map_index - created_dagruns - timestamp @@ -5920,7 +6062,7 @@ components: title: Producing Tasks aliases: items: - $ref: '#/components/schemas/AssetAliasSchema' + $ref: '#/components/schemas/AssetAliasResponse' type: array title: Aliases type: object @@ -6402,16 +6544,16 @@ components: description: Connection Test serializer for responses. CreateAssetEventsBody: properties: - uri: - type: string - title: Uri + asset_id: + type: integer + title: Asset Id extra: type: object title: Extra additionalProperties: false type: object required: - - uri + - asset_id title: CreateAssetEventsBody description: Create asset events request. DAGCollectionResponse: @@ -8127,20 +8269,20 @@ components: description: Queued Event Collection serializer for responses. QueuedEventResponse: properties: - uri: - type: string - title: Uri dag_id: type: string title: Dag Id + asset_id: + type: integer + title: Asset Id created_at: type: string format: date-time title: Created At type: object required: - - uri - dag_id + - asset_id - created_at title: QueuedEventResponse description: Queued Event serializer for responses.. diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 3258c4180d97dc..c8cc9fb0f7695c 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -28,7 +28,9 @@ from airflow.api_fastapi.common.parameters import ( FilterParam, OptionalDateTimeQuery, + QueryAssetAliasNamePatternSearch, QueryAssetDagIdPatternSearch, + QueryAssetNamePatternSearch, QueryLimit, QueryOffset, QueryUriPatternSearch, @@ -37,6 +39,8 @@ ) from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.assets import ( + AssetAliasCollectionResponse, + AssetAliasResponse, AssetCollectionResponse, AssetEventCollectionResponse, AssetEventResponse, @@ -47,7 +51,7 @@ ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.assets.manager import asset_manager -from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel +from airflow.models.asset import AssetAliasModel, AssetDagRunQueue, AssetEvent, AssetModel from airflow.utils import timezone assets_router = AirflowRouter(tags=["Asset"]) @@ -55,20 +59,16 @@ def _generate_queued_event_where_clause( *, + asset_id: int | None = None, dag_id: str | None = None, - uri: str | None = None, before: datetime | str | None = None, ) -> list: """Get AssetDagRunQueue where clause.""" where_clause = [] if dag_id is not None: where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) + if asset_id is not None: + where_clause.append(AssetDagRunQueue.asset_id == asset_id) if before is not None: where_clause.append(AssetDagRunQueue.created_at < before) return where_clause @@ -81,18 +81,19 @@ def _generate_queued_event_where_clause( def get_assets( limit: QueryLimit, offset: QueryOffset, + name_pattern: QueryAssetNamePatternSearch, uri_pattern: QueryUriPatternSearch, dag_ids: QueryAssetDagIdPatternSearch, order_by: Annotated[ SortParam, - Depends(SortParam(["id", "uri", "created_at", "updated_at"], AssetModel).dynamic_depends()), + Depends(SortParam(["id", "name", "uri", "created_at", "updated_at"], AssetModel).dynamic_depends()), ], session: SessionDep, ) -> AssetCollectionResponse: """Get assets.""" assets_select, total_entries = paginated_select( statement=select(AssetModel), - filters=[uri_pattern, dag_ids], + filters=[name_pattern, uri_pattern, dag_ids], order_by=order_by, offset=offset, limit=limit, @@ -110,6 +111,51 @@ def get_assets( ) +@assets_router.get( + "/assets/aliases", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +) +def get_asset_aliases( + limit: QueryLimit, + offset: QueryOffset, + name_pattern: QueryAssetAliasNamePatternSearch, + order_by: Annotated[ + SortParam, + Depends(SortParam(["id", "name"], AssetAliasModel).dynamic_depends()), + ], + session: SessionDep, +) -> AssetAliasCollectionResponse: + """Get asset aliases.""" + asset_aliases_select, total_entries = paginated_select( + statement=select(AssetAliasModel), + filters=[name_pattern], + order_by=order_by, + offset=offset, + limit=limit, + session=session, + ) + + return AssetAliasCollectionResponse( + asset_aliases=session.scalars(asset_aliases_select), + total_entries=total_entries, + ) + + +@assets_router.get( + "/assets/aliases/{asset_alias_id}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +) +def get_asset_alias(asset_alias_id: int, session: SessionDep): + """Get an asset alias.""" + alias = session.scalar(select(AssetAliasModel).where(AssetAliasModel.id == asset_alias_id)) + if alias is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The Asset Alias with ID: `{asset_alias_id}` was not found", + ) + return AssetAliasResponse.model_validate(alias) + + @assets_router.get( "/assets/events", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), @@ -177,9 +223,9 @@ def create_asset_event( session: SessionDep, ) -> AssetEventResponse: """Create asset events.""" - asset_model = session.scalar(select(AssetModel).where(AssetModel.uri == body.uri).limit(1)) + asset_model = session.scalar(select(AssetModel).where(AssetModel.id == body.asset_id).limit(1)) if not asset_model: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") timestamp = timezone.utcnow() assets_event = asset_manager.register_asset_change( @@ -190,41 +236,35 @@ def create_asset_event( ) if not assets_event: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") - return assets_event + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") + return AssetEventResponse.model_validate(assets_event) @assets_router.get( - "/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventCollectionResponse: """Get queued asset events for an asset.""" - print(f"uri: {uri}") - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Queue event with asset_id: `{asset_id}` was not found", + ) queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -234,33 +274,29 @@ def get_asset_queued_events( @assets_router.get( - "/assets/{uri:path}", + "/assets/{asset_id}", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset( - uri: str, + asset_id: int, session: SessionDep, ) -> AssetResponse: """Get an asset.""" asset = session.scalar( select(AssetModel) - .where(AssetModel.uri == uri) + .where(AssetModel.id == asset_id) .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks)) ) if asset is None: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with ID: `{asset_id}` was not found") return AssetResponse.model_validate(asset) @assets_router.get( "/dags/{dag_id}/assets/queuedEvents", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_events( dag_id: str, @@ -269,20 +305,16 @@ def get_dag_asset_queued_events( ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -292,56 +324,47 @@ def get_dag_asset_queued_events( @assets_router.get( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventResponse: """Get a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before) - query = ( - select(AssetDagRunQueue) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) adrq = session.scalar(query) if not adrq: raise HTTPException( status.HTTP_404_NOT_FOUND, - f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) - return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=asset_id) @assets_router.delete( - "/assets/queuedEvents/{uri:path}", + "/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def delete_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete queued asset events for an asset.""" - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") result = session.execute(delete_stmt) if result.rowcount == 0: - raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail=f"Queue event with asset_id: `{asset_id}` was not found", + ) @assets_router.delete( @@ -369,7 +392,7 @@ def delete_dag_asset_queued_events( @assets_router.delete( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, responses=create_openapi_http_exception_doc( [ @@ -380,12 +403,12 @@ def delete_dag_asset_queued_events( ) def delete_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, asset_id=asset_id) delete_statement = ( delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") ) @@ -393,5 +416,5 @@ def delete_dag_asset_queued_event( if result.rowcount == 0: raise HTTPException( status.HTTP_404_NOT_FOUND, - detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + detail=f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) diff --git a/airflow/api_fastapi/core_api/routes/public/variables.py b/airflow/api_fastapi/core_api/routes/public/variables.py index 2c98fbe7f36c8d..1b970444f3175c 100644 --- a/airflow/api_fastapi/core_api/routes/public/variables.py +++ b/airflow/api_fastapi/core_api/routes/public/variables.py @@ -22,7 +22,12 @@ from sqlalchemy import select from airflow.api_fastapi.common.db.common import SessionDep, paginated_select -from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam +from airflow.api_fastapi.common.parameters import ( + QueryLimit, + QueryOffset, + QueryVariableKeyPatternSearch, + SortParam, +) from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.variables import ( VariableBody, @@ -80,16 +85,18 @@ def get_variables( SortParam, Depends( SortParam( - ["key", "id"], + ["key", "id", "_val", "description"], Variable, ).dynamic_depends() ), ], session: SessionDep, + varaible_key_pattern: QueryVariableKeyPatternSearch, ) -> VariableCollectionResponse: """Get all Variables entries.""" variable_select, total_entries = paginated_select( statement=select(Variable), + filters=[varaible_key_pattern], order_by=order_by, offset=offset, limit=limit, diff --git a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index ae05cc140c435a..2b8830c6de1b77 100644 --- a/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -21,7 +21,7 @@ from datetime import timedelta from typing import Annotated, Any, Literal, Union -from pydantic import Discriminator, Field, Tag, WithJsonSchema +from pydantic import Discriminator, Field, RootModel, Tag, WithJsonSchema from airflow.api_fastapi.common.types import UtcDateTime from airflow.api_fastapi.core_api.base import BaseModel @@ -135,3 +135,7 @@ class TaskInstance(BaseModel): run_id: str try_number: int map_index: int | None = None + + +"""Schema for setting RTIF for a task instance.""" +RTIFPayload = RootModel[dict[str, str]] diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py index 8d15a063f4fc7d..90bbe1c1d3e5b1 100644 --- a/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -29,13 +29,14 @@ from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( + RTIFPayload, TIDeferredStatePayload, TIEnterRunningPayload, TIHeartbeatInfo, TIStateUpdate, TITerminalStatePayload, ) -from airflow.models.taskinstance import TaskInstance as TI +from airflow.models.taskinstance import TaskInstance as TI, _update_rtif from airflow.models.trigger import Trigger from airflow.utils import timezone from airflow.utils.state import State @@ -219,3 +220,33 @@ def ti_heartbeat( # Update the last heartbeat time! session.execute(update(TI).where(TI.id == ti_id_str).values(last_heartbeat_at=timezone.utcnow())) log.debug("Task with %s state heartbeated", previous_state) + + +@router.put( + "/{task_instance_id}/rtif", + status_code=status.HTTP_201_CREATED, + # TODO: Add description to the operation + # TODO: Add Operation ID to control the function name in the OpenAPI spec + # TODO: Do we need to use create_openapi_http_exception_doc here? + responses={ + status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"}, + status.HTTP_422_UNPROCESSABLE_ENTITY: { + "description": "Invalid payload for the setting rendered task instance fields" + }, + }, +) +def ti_put_rtif( + task_instance_id: UUID, + put_rtif_payload: RTIFPayload, + session: SessionDep, +): + """Add an RTIF entry for a task instance, sent by the worker.""" + ti_id_str = str(task_instance_id) + task_instance = session.scalar(select(TI).where(TI.id == ti_id_str)) + if not task_instance: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + ) + _update_rtif(task_instance, put_rtif_payload.model_dump(), session) + + return {"message": "Rendered task instance fields successfully set"} diff --git a/airflow/cli/commands/remote_commands/asset_command.py b/airflow/cli/commands/remote_commands/asset_command.py index 12d8516ff62305..02fc9f7bd98d98 100644 --- a/airflow/cli/commands/remote_commands/asset_command.py +++ b/airflow/cli/commands/remote_commands/asset_command.py @@ -23,7 +23,7 @@ from sqlalchemy import select from airflow.api.common.trigger_dag import trigger_dag -from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasSchema, AssetResponse +from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasResponse, AssetResponse from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse from airflow.cli.simple_table import AirflowConsole from airflow.models.asset import AssetAliasModel, AssetModel, TaskOutletAssetReference @@ -43,7 +43,7 @@ def _list_asset_aliases(args, *, session: Session) -> tuple[Any, type[BaseModel]]: aliases = session.scalars(select(AssetAliasModel).order_by(AssetAliasModel.name)) - return aliases, AssetAliasSchema + return aliases, AssetAliasResponse def _list_assets(args, *, session: Session) -> tuple[Any, type[BaseModel]]: @@ -77,7 +77,7 @@ def _detail_asset_alias(args, *, session: Session) -> BaseModel: if alias is None: raise SystemExit(f"Asset alias with name {args.name} does not exist.") - return AssetAliasSchema.model_validate(alias) + return AssetAliasResponse.model_validate(alias) def _detail_asset(args, *, session: Session) -> BaseModel: diff --git a/airflow/configuration.py b/airflow/configuration.py index c2f8455125065e..76e2690ffe1f7f 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -1505,12 +1505,7 @@ def _include_envs( opt = (opt, "env var") section = section.lower() - # if we lower key for kubernetes_environment_variables section, - # then we won't be able to set any Airflow environment - # variables. Airflow only parse environment variables starts - # with AIRFLOW_. Therefore, we need to make it a special case. - if section != "kubernetes_environment_variables": - key = key.lower() + key = key.lower() config_sources.setdefault(section, {}).update({key: opt}) def _filter_by_source( diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py index 34a9823ed96ae5..d26c417c550149 100644 --- a/airflow/dag_processing/collection.py +++ b/airflow/dag_processing/collection.py @@ -31,7 +31,7 @@ import logging from typing import TYPE_CHECKING, NamedTuple -from sqlalchemy import func, select, tuple_ +from sqlalchemy import and_, exists, func, select, tuple_ from sqlalchemy.orm import joinedload, load_only from airflow.assets.manager import asset_manager @@ -281,18 +281,32 @@ def _find_active_assets(name_uri_assets, session: Session): for dm in session.scalars(select(DagModel).where(DagModel.is_active).where(~DagModel.is_paused)) } - return { - (asset_model.name, asset_model.uri) - for asset_model in session.scalars( - select(AssetModel) - .join(AssetActive, (AssetActive.name == AssetModel.name) & (AssetActive.uri == AssetModel.uri)) - .where(tuple_(AssetActive.name, AssetActive.uri).in_(name_uri_assets)) - .where(AssetModel.consuming_dags.any(DagScheduleAssetReference.dag_id.in_(active_dags))) - .options( - joinedload(AssetModel.consuming_dags).joinedload(DagScheduleAssetReference.dag), + return set( + session.execute( + select( + AssetModel.name, + AssetModel.uri, + ).where( + tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets), + exists( + select(1).where( + and_( + AssetActive.name == AssetModel.name, + AssetActive.uri == AssetModel.uri, + ), + ) + ), + exists( + select(1).where( + and_( + DagScheduleAssetReference.asset_id == AssetModel.id, + DagScheduleAssetReference.dag_id.in_(active_dags), + ) + ) + ), ) - ).unique() - } + ) + ) class AssetModelOperation(NamedTuple): diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index d0fbcf44d9531c..2df7890a48f5df 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -31,12 +31,13 @@ import time import zipfile from collections import defaultdict, deque -from collections.abc import Iterator +from collections.abc import Iterator, MutableMapping from datetime import datetime, timedelta from importlib import import_module from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast +import attrs from setproctitle import setproctitle from sqlalchemy import delete, select, update from tabulate import tabulate @@ -81,15 +82,16 @@ class DagParsingStat(NamedTuple): all_files_processed: bool -class DagFileStat(NamedTuple): +@attrs.define +class DagFileStat: """Information about single processing of one file.""" - num_dags: int - import_errors: int - last_finish_time: datetime | None - last_duration: timedelta | None - run_count: int - last_num_of_db_queries: int + num_dags: int = 0 + import_errors: int = 0 + last_finish_time: datetime | None = None + last_duration: float | None = None + run_count: int = 0 + last_num_of_db_queries: int = 0 class DagParsingSignal(enum.Enum): @@ -353,15 +355,6 @@ class DagFileProcessorManager(LoggingMixin): :param async_mode: whether to start the manager in async mode """ - DEFAULT_FILE_STAT = DagFileStat( - num_dags=0, - import_errors=0, - last_finish_time=None, - last_duration=None, - run_count=0, - last_num_of_db_queries=0, - ) - def __init__( self, dag_directory: os.PathLike[str], @@ -416,7 +409,7 @@ def __init__( self._num_run = 0 # Map from file path to stats about the file - self._file_stats: dict[str, DagFileStat] = {} + self._file_stats: MutableMapping[str, DagFileStat] = defaultdict(DagFileStat) # Last time that the DAG dir was traversed to look for files self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0)) @@ -488,7 +481,7 @@ def _scan_stale_dags(self): elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds() if elapsed_time_since_refresh > self.parsing_cleanup_interval: last_parsed = { - fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp) + fp: stat.last_finish_time for fp, stat in self._file_stats.items() if stat.last_finish_time } DagFileProcessorManager.deactivate_stale_dags( last_parsed=last_parsed, @@ -501,7 +494,7 @@ def _scan_stale_dags(self): @provide_session def deactivate_stale_dags( cls, - last_parsed: dict[str, datetime | None], + last_parsed: dict[str, datetime], dag_directory: str, stale_dag_threshold: int, session: Session = NEW_SESSION, @@ -655,7 +648,9 @@ def _run_parsing_loop(self): span.add_event(name="print_stat") self._print_stat() - all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) + all_files_processed = all( + self._file_stats[x].last_finish_time is not None for x in self.file_paths + ) max_runs_reached = self.max_runs_reached() try: @@ -872,30 +867,27 @@ def _log_file_processing_stats(self, known_file_paths): rows = [] now = timezone.utcnow() for file_path in known_file_paths: - last_runtime = self.get_last_runtime(file_path) - num_dags = self.get_last_dag_count(file_path) - num_errors = self.get_last_error_count(file_path) + stat = self._file_stats[file_path] file_name = Path(file_path).stem processor_pid = self.get_pid(file_path) processor_start_time = self.get_start_time(file_path) runtime = (now - processor_start_time) if processor_start_time else None - last_run = self.get_last_finish_time(file_path) + last_run = stat.last_finish_time if last_run: seconds_ago = (now - last_run).total_seconds() Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago) - last_num_of_db_queries = self.get_last_num_of_db_queries(file_path) - Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}", last_num_of_db_queries) + Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}", stat.last_num_of_db_queries) rows.append( ( file_path, processor_pid, runtime, - num_dags, - num_errors, - last_runtime, + stat.num_dags, + stat.import_errors, + stat.last_duration, last_run, - last_num_of_db_queries, + stat.last_num_of_db_queries, ) ) @@ -955,58 +947,6 @@ def get_all_pids(self) -> list[int]: """ return [x.pid for x in self._processors.values()] - def get_last_runtime(self, file_path) -> float | None: - """ - Retrieve the last processing time of a specific path. - - :param file_path: the path to the file that was processed - :return: the runtime (in seconds) of the process of the last run, or - None if the file was never processed. - """ - stat = self._file_stats.get(file_path) - return stat.last_duration.total_seconds() if stat and stat.last_duration else None - - def get_last_dag_count(self, file_path) -> int | None: - """ - Retrieve the total DAG count at a specific path. - - :param file_path: the path to the file that was processed - :return: the number of dags loaded from that file, or None if the file was never processed. - """ - stat = self._file_stats.get(file_path) - return stat.num_dags if stat else None - - def get_last_error_count(self, file_path) -> int | None: - """ - Retrieve the total number of errors from processing a specific path. - - :param file_path: the path to the file that was processed - :return: the number of import errors from processing, or None if the file was never processed. - """ - stat = self._file_stats.get(file_path) - return stat.import_errors if stat else None - - def get_last_num_of_db_queries(self, file_path) -> int | None: - """ - Retrieve the number of queries performed to the Airflow database during last parsing of the file. - - :param file_path: the path to the file that was processed - :return: the number of queries performed to the Airflow database during last parsing of the file, - or None if the file was never processed. - """ - stat = self._file_stats.get(file_path) - return stat.last_num_of_db_queries if stat else None - - def get_last_finish_time(self, file_path) -> datetime | None: - """ - Retrieve the last completion time for processing a specific path. - - :param file_path: the path to the file that was processed - :return: the finish time of the process of the last run, or None if the file was never processed. - """ - stat = self._file_stats.get(file_path) - return stat.last_finish_time if stat else None - def get_start_time(self, file_path) -> datetime | None: """ Retrieve the last start time for processing a specific path. @@ -1019,15 +959,6 @@ def get_start_time(self, file_path) -> datetime | None: return self._processors[file_path].start_time return None - def get_run_count(self, file_path) -> int: - """ - Return the number of times the given file has been parsed. - - :param file_path: the path to the file that's being processed. - """ - stat = self._file_stats.get(file_path) - return stat.run_count if stat else 0 - def get_dag_directory(self) -> str: """Return the dag_director as a string.""" if isinstance(self._dag_directory, Path): @@ -1092,13 +1023,13 @@ def _collect_results_from_processor(self, processor, session: Session = NEW_SESS num_dags = 0 last_num_of_db_queries = 0 - last_duration = last_finish_time - processor.start_time + last_duration = (last_finish_time - processor.start_time).total_seconds() stat = DagFileStat( num_dags=num_dags, import_errors=count_import_errors, last_finish_time=last_finish_time, last_duration=last_duration, - run_count=self.get_run_count(processor.file_path) + 1, + run_count=self._file_stats[processor.file_path].run_count + 1, last_num_of_db_queries=last_num_of_db_queries, ) self._file_stats[processor.file_path] = stat @@ -1110,7 +1041,7 @@ def _collect_results_from_processor(self, processor, session: Session = NEW_SESS span.set_attributes( { "file_path": processor.file_path, - "run_count": self.get_run_count(processor.file_path) + 1, + "run_count": stat.run_count, } ) @@ -1146,8 +1077,8 @@ def _collect_results_from_processor(self, processor, session: Session = NEW_SESS span.end(end_time=datetime_to_nano(last_finish_time)) - Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration) - Stats.timing("dag_processing.last_duration", last_duration, tags={"file_name": file_name}) + Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration * 1000.0) + Stats.timing("dag_processing.last_duration", last_duration * 1000.0, tags={"file_name": file_name}) def collect_results(self) -> None: """Collect the result from any finished DAG processors.""" @@ -1219,7 +1150,6 @@ def add_new_file_path_to_queue(self): if file_path not in self._file_stats: # We found new file after refreshing dir. add to parsing queue at start self.log.info("Adding new file %s to parsing queue", file_path) - self._file_stats[file_path] = DagFileProcessorManager.DEFAULT_FILE_STAT self._file_path_queue.appendleft(file_path) span = Trace.get_current_span() if span.is_recording(): @@ -1266,7 +1196,7 @@ def prepare_file_path_queue(self): # from being added to file_path_queue # unless they were modified recently and parsing mode is "modified_time" # in which case we don't honor "self._file_process_interval" (min_file_process_interval) - last_finish_time = self.get_last_finish_time(file_path) + last_finish_time = self._file_stats[file_path].last_finish_time if ( last_finish_time is not None and (now - last_finish_time).total_seconds() < self._file_process_interval @@ -1316,8 +1246,6 @@ def prepare_file_path_queue(self): "Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue) ) - for file_path in files_paths_to_queue: - self._file_stats.setdefault(file_path, DagFileProcessorManager.DEFAULT_FILE_STAT) self._add_paths_to_queue(files_paths_to_queue, False) Stats.incr("dag_processing.file_path_queue_update_count") @@ -1355,8 +1283,8 @@ def _kill_timed_out_processors(self): num_dags=0, import_errors=1, last_finish_time=now, - last_duration=duration, - run_count=self.get_run_count(file_path) + 1, + last_duration=duration.total_seconds(), + run_count=self._file_stats[processor.file_path].run_count + 1, last_num_of_db_queries=0, ) self._file_stats[processor.file_path] = stat diff --git a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py index 2676176692a81f..c3e8edfc3b9283 100644 --- a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py +++ b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py @@ -58,20 +58,22 @@ def upgrade(): with op.batch_alter_table("dataset_alias", schema=None) as batch_op: batch_op.drop_index("idx_name_unique") batch_op.create_index("idx_dataset_alias_name_unique", ["name"], unique=True) - # Add 'name' column. Set it to nullable for now. + # Add 'name' and 'group' columns. Set them to nullable for now. with op.batch_alter_table("dataset", schema=None) as batch_op: batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE)) - batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default="", nullable=False)) - # Fill name from uri column. + batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE)) + # Fill name from uri column, and group to 'asset'. + dataset_table = sa.table("dataset", sa.column("name"), sa.column("uri"), sa.column("group")) with Session(bind=op.get_bind()) as session: - session.execute(sa.text("update dataset set name=uri")) + session.execute(sa.update(dataset_table).values(name=dataset_table.c.uri, group="asset")) session.commit() - # Set the name column non-nullable. + # Set the name and group columns non-nullable. # Now with values in there, we can create the new unique constraint and index. # Due to MySQL restrictions, we are also reducing the length on uri. with op.batch_alter_table("dataset", schema=None) as batch_op: batch_op.alter_column("name", existing_type=_STRING_COLUMN_TYPE, nullable=False) batch_op.alter_column("uri", type_=_STRING_COLUMN_TYPE, nullable=False) + batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False) batch_op.drop_index("idx_uri_unique") batch_op.create_index("idx_dataset_name_uri_unique", ["name", "uri"], unique=True) diff --git a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py index f1b57974f053cc..25b4f1871938d7 100644 --- a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py +++ b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py @@ -41,6 +41,7 @@ import sqlalchemy as sa from alembic import op +from sqlalchemy.orm import Session # Revision identifiers, used by Alembic. revision = "fb2d4922cd79" @@ -59,7 +60,13 @@ def upgrade(): """Tweak AssetAliasModel to match AssetModel.""" with op.batch_alter_table("dataset_alias", schema=None) as batch_op: batch_op.alter_column("name", type_=_STRING_COLUMN_TYPE, nullable=False) - batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default=str, nullable=False)) + batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE)) + dataset_alias_table = sa.table("dataset_alias", sa.column("group")) + with Session(bind=op.get_bind()) as session: + session.execute(sa.update(dataset_alias_table).values(group="asset")) + session.commit() + with op.batch_alter_table("dataset_alias", schema=None) as batch_op: + batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False) def downgrade(): diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index c8bc4d067299da..ed1d6066bdcd6a 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -59,12 +59,14 @@ export const UseAssetServiceGetAssetsKeyFn = ( { dagIds, limit, + namePattern, offset, orderBy, uriPattern, }: { dagIds?: string[]; limit?: number; + namePattern?: string; offset?: number; orderBy?: string; uriPattern?: string; @@ -72,8 +74,51 @@ export const UseAssetServiceGetAssetsKeyFn = ( queryKey?: Array, ) => [ useAssetServiceGetAssetsKey, - ...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]), + ...(queryKey ?? [ + { dagIds, limit, namePattern, offset, orderBy, uriPattern }, + ]), +]; +export type AssetServiceGetAssetAliasesDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetAssetAliasesQueryResult< + TData = AssetServiceGetAssetAliasesDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetAssetAliasesKey = "AssetServiceGetAssetAliases"; +export const UseAssetServiceGetAssetAliasesKeyFn = ( + { + limit, + namePattern, + offset, + orderBy, + }: { + limit?: number; + namePattern?: string; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: Array, +) => [ + useAssetServiceGetAssetAliasesKey, + ...(queryKey ?? [{ limit, namePattern, offset, orderBy }]), ]; +export type AssetServiceGetAssetAliasDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetAssetAliasQueryResult< + TData = AssetServiceGetAssetAliasDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetAssetAliasKey = "AssetServiceGetAssetAlias"; +export const UseAssetServiceGetAssetAliasKeyFn = ( + { + assetAliasId, + }: { + assetAliasId: number; + }, + queryKey?: Array, +) => [useAssetServiceGetAssetAliasKey, ...(queryKey ?? [{ assetAliasId }])]; export type AssetServiceGetAssetEventsDefaultResponse = Awaited< ReturnType >; @@ -129,16 +174,16 @@ export const useAssetServiceGetAssetQueuedEventsKey = "AssetServiceGetAssetQueuedEvents"; export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: Array, ) => [ useAssetServiceGetAssetQueuedEventsKey, - ...(queryKey ?? [{ before, uri }]), + ...(queryKey ?? [{ assetId, before }]), ]; export type AssetServiceGetAssetDefaultResponse = Awaited< ReturnType @@ -150,12 +195,12 @@ export type AssetServiceGetAssetQueryResult< export const useAssetServiceGetAssetKey = "AssetServiceGetAsset"; export const UseAssetServiceGetAssetKeyFn = ( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: Array, -) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; +) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ assetId }])]; export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< ReturnType >; @@ -189,18 +234,18 @@ export const useAssetServiceGetDagAssetQueuedEventKey = "AssetServiceGetDagAssetQueuedEvent"; export const UseAssetServiceGetDagAssetQueuedEventKeyFn = ( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: Array, ) => [ useAssetServiceGetDagAssetQueuedEventKey, - ...(queryKey ?? [{ before, dagId, uri }]), + ...(queryKey ?? [{ assetId, before, dagId }]), ]; export type ConfigServiceGetConfigsDefaultResponse = Awaited< ReturnType @@ -1552,15 +1597,17 @@ export const UseVariableServiceGetVariablesKeyFn = ( limit, offset, orderBy, + variableKeyPattern, }: { limit?: number; offset?: number; orderBy?: string; + variableKeyPattern?: string; } = {}, queryKey?: Array, ) => [ useVariableServiceGetVariablesKey, - ...(queryKey ?? [{ limit, offset, orderBy }]), + ...(queryKey ?? [{ limit, offset, orderBy, variableKeyPattern }]), ]; export type MonitorServiceGetHealthDefaultResponse = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 40e7006e8f8e32..4bb01a7feaad84 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -56,6 +56,7 @@ export const prefetchUseAssetServiceNextRunAssets = ( * @param data The data for the request. * @param data.limit * @param data.offset + * @param data.namePattern * @param data.uriPattern * @param data.dagIds * @param data.orderBy @@ -67,12 +68,14 @@ export const prefetchUseAssetServiceGetAssets = ( { dagIds, limit, + namePattern, offset, orderBy, uriPattern, }: { dagIds?: string[]; limit?: number; + namePattern?: string; offset?: number; orderBy?: string; uriPattern?: string; @@ -82,12 +85,75 @@ export const prefetchUseAssetServiceGetAssets = ( queryKey: Common.UseAssetServiceGetAssetsKeyFn({ dagIds, limit, + namePattern, offset, orderBy, uriPattern, }), queryFn: () => - AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }), + AssetService.getAssets({ + dagIds, + limit, + namePattern, + offset, + orderBy, + uriPattern, + }), + }); +/** + * Get Asset Aliases + * Get asset aliases. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.namePattern + * @param data.orderBy + * @returns AssetAliasCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetAssetAliases = ( + queryClient: QueryClient, + { + limit, + namePattern, + offset, + orderBy, + }: { + limit?: number; + namePattern?: string; + offset?: number; + orderBy?: string; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetAssetAliasesKeyFn({ + limit, + namePattern, + offset, + orderBy, + }), + queryFn: () => + AssetService.getAssetAliases({ limit, namePattern, offset, orderBy }), + }); +/** + * Get Asset Alias + * Get an asset alias. + * @param data The data for the request. + * @param data.assetAliasId + * @returns unknown Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetAssetAlias = ( + queryClient: QueryClient, + { + assetAliasId, + }: { + assetAliasId: number; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetAssetAliasKeyFn({ assetAliasId }), + queryFn: () => AssetService.getAssetAlias({ assetAliasId }), }); /** * Get Asset Events @@ -153,7 +219,7 @@ export const prefetchUseAssetServiceGetAssetEvents = ( * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -161,36 +227,39 @@ export const prefetchUseAssetServiceGetAssetEvents = ( export const prefetchUseAssetServiceGetAssetQueuedEvents = ( queryClient: QueryClient, { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ + assetId, + before, + }), + queryFn: () => AssetService.getAssetQueuedEvents({ assetId, before }), }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ export const prefetchUseAssetServiceGetAsset = ( queryClient: QueryClient, { - uri, + assetId, }: { - uri: string; + assetId: number; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), - queryFn: () => AssetService.getAsset({ uri }), + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }), + queryFn: () => AssetService.getAsset({ assetId }), }); /** * Get Dag Asset Queued Events @@ -223,7 +292,7 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -231,22 +300,23 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( export const prefetchUseAssetServiceGetDagAssetQueuedEvent = ( queryClient: QueryClient, { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({ + assetId, before, dagId, - uri, }), - queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }), + queryFn: () => + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }), }); /** * Get Configs @@ -2117,6 +2187,7 @@ export const prefetchUseVariableServiceGetVariable = ( * @param data.limit * @param data.offset * @param data.orderBy + * @param data.variableKeyPattern * @returns VariableCollectionResponse Successful Response * @throws ApiError */ @@ -2126,10 +2197,12 @@ export const prefetchUseVariableServiceGetVariables = ( limit, offset, orderBy, + variableKeyPattern, }: { limit?: number; offset?: number; orderBy?: string; + variableKeyPattern?: string; } = {}, ) => queryClient.prefetchQuery({ @@ -2137,8 +2210,15 @@ export const prefetchUseVariableServiceGetVariables = ( limit, offset, orderBy, + variableKeyPattern, }), - queryFn: () => VariableService.getVariables({ limit, offset, orderBy }), + queryFn: () => + VariableService.getVariables({ + limit, + offset, + orderBy, + variableKeyPattern, + }), }); /** * Get Health diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 624b9c38be86c1..f8c8bcd7a58b34 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -87,6 +87,7 @@ export const useAssetServiceNextRunAssets = < * @param data The data for the request. * @param data.limit * @param data.offset + * @param data.namePattern * @param data.uriPattern * @param data.dagIds * @param data.orderBy @@ -101,12 +102,14 @@ export const useAssetServiceGetAssets = < { dagIds, limit, + namePattern, offset, orderBy, uriPattern, }: { dagIds?: string[]; limit?: number; + namePattern?: string; offset?: number; orderBy?: string; uriPattern?: string; @@ -116,19 +119,93 @@ export const useAssetServiceGetAssets = < ) => useQuery({ queryKey: Common.UseAssetServiceGetAssetsKeyFn( - { dagIds, limit, offset, orderBy, uriPattern }, + { dagIds, limit, namePattern, offset, orderBy, uriPattern }, queryKey, ), queryFn: () => AssetService.getAssets({ dagIds, limit, + namePattern, offset, orderBy, uriPattern, }) as TData, ...options, }); +/** + * Get Asset Aliases + * Get asset aliases. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.namePattern + * @param data.orderBy + * @returns AssetAliasCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetAliases = < + TData = Common.AssetServiceGetAssetAliasesDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + limit, + namePattern, + offset, + orderBy, + }: { + limit?: number; + namePattern?: string; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetAssetAliasesKeyFn( + { limit, namePattern, offset, orderBy }, + queryKey, + ), + queryFn: () => + AssetService.getAssetAliases({ + limit, + namePattern, + offset, + orderBy, + }) as TData, + ...options, + }); +/** + * Get Asset Alias + * Get an asset alias. + * @param data The data for the request. + * @param data.assetAliasId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetAlias = < + TData = Common.AssetServiceGetAssetAliasDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + assetAliasId, + }: { + assetAliasId: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetAssetAliasKeyFn( + { assetAliasId }, + queryKey, + ), + queryFn: () => AssetService.getAssetAlias({ assetAliasId }) as TData, + ...options, + }); /** * Get Asset Events * Get asset events. @@ -202,7 +279,7 @@ export const useAssetServiceGetAssetEvents = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -213,28 +290,29 @@ export const useAssetServiceGetAssetQueuedEvents = < TQueryKey extends Array = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -244,16 +322,16 @@ export const useAssetServiceGetAsset = < TQueryKey extends Array = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -294,7 +372,7 @@ export const useAssetServiceGetDagAssetQueuedEvents = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -305,24 +383,24 @@ export const useAssetServiceGetDagAssetQueuedEvent = < TQueryKey extends Array = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** @@ -2503,6 +2581,7 @@ export const useVariableServiceGetVariable = < * @param data.limit * @param data.offset * @param data.orderBy + * @param data.variableKeyPattern * @returns VariableCollectionResponse Successful Response * @throws ApiError */ @@ -2515,21 +2594,28 @@ export const useVariableServiceGetVariables = < limit, offset, orderBy, + variableKeyPattern, }: { limit?: number; offset?: number; orderBy?: string; + variableKeyPattern?: string; } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ queryKey: Common.UseVariableServiceGetVariablesKeyFn( - { limit, offset, orderBy }, + { limit, offset, orderBy, variableKeyPattern }, queryKey, ), queryFn: () => - VariableService.getVariables({ limit, offset, orderBy }) as TData, + VariableService.getVariables({ + limit, + offset, + orderBy, + variableKeyPattern, + }) as TData, ...options, }); /** @@ -3716,7 +3802,7 @@ export const useVariableServicePatchVariable = < * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3731,8 +3817,8 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >, @@ -3743,15 +3829,15 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >({ - mutationFn: ({ before, uri }) => + mutationFn: ({ assetId, before }) => AssetService.deleteAssetQueuedEvents({ + assetId, before, - uri, }) as unknown as Promise, ...options, }); @@ -3802,7 +3888,7 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3817,9 +3903,9 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >, @@ -3830,17 +3916,17 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >({ - mutationFn: ({ before, dagId, uri }) => + mutationFn: ({ assetId, before, dagId }) => AssetService.deleteDagAssetQueuedEvent({ + assetId, before, dagId, - uri, }) as unknown as Promise, ...options, }); diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 250b2038a8c412..cd6d7c5a7983fa 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -62,6 +62,7 @@ export const useAssetServiceNextRunAssetsSuspense = < * @param data The data for the request. * @param data.limit * @param data.offset + * @param data.namePattern * @param data.uriPattern * @param data.dagIds * @param data.orderBy @@ -76,12 +77,14 @@ export const useAssetServiceGetAssetsSuspense = < { dagIds, limit, + namePattern, offset, orderBy, uriPattern, }: { dagIds?: string[]; limit?: number; + namePattern?: string; offset?: number; orderBy?: string; uriPattern?: string; @@ -91,19 +94,93 @@ export const useAssetServiceGetAssetsSuspense = < ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetAssetsKeyFn( - { dagIds, limit, offset, orderBy, uriPattern }, + { dagIds, limit, namePattern, offset, orderBy, uriPattern }, queryKey, ), queryFn: () => AssetService.getAssets({ dagIds, limit, + namePattern, offset, orderBy, uriPattern, }) as TData, ...options, }); +/** + * Get Asset Aliases + * Get asset aliases. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.namePattern + * @param data.orderBy + * @returns AssetAliasCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetAliasesSuspense = < + TData = Common.AssetServiceGetAssetAliasesDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + limit, + namePattern, + offset, + orderBy, + }: { + limit?: number; + namePattern?: string; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetAssetAliasesKeyFn( + { limit, namePattern, offset, orderBy }, + queryKey, + ), + queryFn: () => + AssetService.getAssetAliases({ + limit, + namePattern, + offset, + orderBy, + }) as TData, + ...options, + }); +/** + * Get Asset Alias + * Get an asset alias. + * @param data The data for the request. + * @param data.assetAliasId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetAliasSuspense = < + TData = Common.AssetServiceGetAssetAliasDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + assetAliasId, + }: { + assetAliasId: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetAssetAliasKeyFn( + { assetAliasId }, + queryKey, + ), + queryFn: () => AssetService.getAssetAlias({ assetAliasId }) as TData, + ...options, + }); /** * Get Asset Events * Get asset events. @@ -177,7 +254,7 @@ export const useAssetServiceGetAssetEventsSuspense = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -188,28 +265,29 @@ export const useAssetServiceGetAssetQueuedEventsSuspense = < TQueryKey extends Array = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -219,16 +297,16 @@ export const useAssetServiceGetAssetSuspense = < TQueryKey extends Array = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -269,7 +347,7 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -280,24 +358,24 @@ export const useAssetServiceGetDagAssetQueuedEventSuspense = < TQueryKey extends Array = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** @@ -2478,6 +2556,7 @@ export const useVariableServiceGetVariableSuspense = < * @param data.limit * @param data.offset * @param data.orderBy + * @param data.variableKeyPattern * @returns VariableCollectionResponse Successful Response * @throws ApiError */ @@ -2490,21 +2569,28 @@ export const useVariableServiceGetVariablesSuspense = < limit, offset, orderBy, + variableKeyPattern, }: { limit?: number; offset?: number; orderBy?: string; + variableKeyPattern?: string; } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ queryKey: Common.UseVariableServiceGetVariablesKeyFn( - { limit, offset, orderBy }, + { limit, offset, orderBy, variableKeyPattern }, queryKey, ), queryFn: () => - VariableService.getVariables({ limit, offset, orderBy }) as TData, + VariableService.getVariables({ + limit, + offset, + orderBy, + variableKeyPattern, + }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index c745b04fbd01e1..965355b0c19375 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -89,7 +89,27 @@ export const $AppBuilderViewResponse = { description: "Serializer for AppBuilder View responses.", } as const; -export const $AssetAliasSchema = { +export const $AssetAliasCollectionResponse = { + properties: { + asset_aliases: { + items: { + $ref: "#/components/schemas/AssetAliasResponse", + }, + type: "array", + title: "Asset Aliases", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["asset_aliases", "total_entries"], + title: "AssetAliasCollectionResponse", + description: "Asset alias collection response.", +} as const; + +export const $AssetAliasResponse = { properties: { id: { type: "integer", @@ -106,8 +126,8 @@ export const $AssetAliasSchema = { }, type: "object", required: ["id", "name", "group"], - title: "AssetAliasSchema", - description: "Asset alias serializer for assets.", + title: "AssetAliasResponse", + description: "Asset alias serializer for responses.", } as const; export const $AssetCollectionResponse = { @@ -160,10 +180,6 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, - uri: { - type: "string", - title: "Uri", - }, extra: { anyOf: [ { @@ -229,7 +245,6 @@ export const $AssetEventResponse = { required: [ "id", "asset_id", - "uri", "source_map_index", "created_dagruns", "timestamp", @@ -293,7 +308,7 @@ export const $AssetResponse = { }, aliases: { items: { - $ref: "#/components/schemas/AssetAliasSchema", + $ref: "#/components/schemas/AssetAliasResponse", }, type: "array", title: "Aliases", @@ -1005,9 +1020,9 @@ export const $ConnectionTestResponse = { export const $CreateAssetEventsBody = { properties: { - uri: { - type: "string", - title: "Uri", + asset_id: { + type: "integer", + title: "Asset Id", }, extra: { type: "object", @@ -1016,7 +1031,7 @@ export const $CreateAssetEventsBody = { }, additionalProperties: false, type: "object", - required: ["uri"], + required: ["asset_id"], title: "CreateAssetEventsBody", description: "Create asset events request.", } as const; @@ -3620,14 +3635,14 @@ export const $QueuedEventCollectionResponse = { export const $QueuedEventResponse = { properties: { - uri: { - type: "string", - title: "Uri", - }, dag_id: { type: "string", title: "Dag Id", }, + asset_id: { + type: "integer", + title: "Asset Id", + }, created_at: { type: "string", format: "date-time", @@ -3635,7 +3650,7 @@ export const $QueuedEventResponse = { }, }, type: "object", - required: ["uri", "dag_id", "created_at"], + required: ["dag_id", "asset_id", "created_at"], title: "QueuedEventResponse", description: "Queued Event serializer for responses..", } as const; diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 4bec27ae4651cc..f0446ae0fd8587 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -7,6 +7,10 @@ import type { NextRunAssetsResponse, GetAssetsData, GetAssetsResponse, + GetAssetAliasesData, + GetAssetAliasesResponse, + GetAssetAliasData, + GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, @@ -209,6 +213,7 @@ export class AssetService { * @param data The data for the request. * @param data.limit * @param data.offset + * @param data.namePattern * @param data.uriPattern * @param data.dagIds * @param data.orderBy @@ -224,6 +229,7 @@ export class AssetService { query: { limit: data.limit, offset: data.offset, + name_pattern: data.namePattern, uri_pattern: data.uriPattern, dag_ids: data.dagIds, order_by: data.orderBy, @@ -237,6 +243,64 @@ export class AssetService { }); } + /** + * Get Asset Aliases + * Get asset aliases. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.namePattern + * @param data.orderBy + * @returns AssetAliasCollectionResponse Successful Response + * @throws ApiError + */ + public static getAssetAliases( + data: GetAssetAliasesData = {}, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/assets/aliases", + query: { + limit: data.limit, + offset: data.offset, + name_pattern: data.namePattern, + order_by: data.orderBy, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + + /** + * Get Asset Alias + * Get an asset alias. + * @param data The data for the request. + * @param data.assetAliasId + * @returns unknown Successful Response + * @throws ApiError + */ + public static getAssetAlias( + data: GetAssetAliasData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/assets/aliases/{asset_alias_id}", + path: { + asset_alias_id: data.assetAliasId, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Get Asset Events * Get asset events. @@ -306,7 +370,7 @@ export class AssetService { * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -316,9 +380,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -336,7 +400,7 @@ export class AssetService { * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -346,9 +410,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "DELETE", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -366,7 +430,7 @@ export class AssetService { * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -375,9 +439,9 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/assets/{uri}", + url: "/public/assets/{asset_id}", path: { - uri: data.uri, + asset_id: data.assetId, }, errors: { 401: "Unauthorized", @@ -453,7 +517,7 @@ export class AssetService { * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -463,10 +527,10 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -485,7 +549,7 @@ export class AssetService { * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -495,10 +559,10 @@ export class AssetService { ): CancelablePromise { return __request(OpenAPI, { method: "DELETE", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -2946,6 +3010,7 @@ export class VariableService { * @param data.limit * @param data.offset * @param data.orderBy + * @param data.variableKeyPattern * @returns VariableCollectionResponse Successful Response * @throws ApiError */ @@ -2959,6 +3024,7 @@ export class VariableService { limit: data.limit, offset: data.offset, order_by: data.orderBy, + variable_key_pattern: data.variableKeyPattern, }, errors: { 401: "Unauthorized", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 2831e7225f71d5..88f6a2fc763c11 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -22,9 +22,17 @@ export type AppBuilderViewResponse = { }; /** - * Asset alias serializer for assets. + * Asset alias collection response. */ -export type AssetAliasSchema = { +export type AssetAliasCollectionResponse = { + asset_aliases: Array; + total_entries: number; +}; + +/** + * Asset alias serializer for responses. + */ +export type AssetAliasResponse = { id: number; name: string; group: string; @@ -52,7 +60,6 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; - uri: string; extra?: { [key: string]: unknown; } | null; @@ -79,7 +86,7 @@ export type AssetResponse = { updated_at: string; consuming_dags: Array; producing_tasks: Array; - aliases: Array; + aliases: Array; }; /** @@ -257,7 +264,7 @@ export type ConnectionTestResponse = { * Create asset events request. */ export type CreateAssetEventsBody = { - uri: string; + asset_id: number; extra?: { [key: string]: unknown; }; @@ -891,8 +898,8 @@ export type QueuedEventCollectionResponse = { * Queued Event serializer for responses.. */ export type QueuedEventResponse = { - uri: string; dag_id: string; + asset_id: number; created_at: string; }; @@ -1302,6 +1309,7 @@ export type NextRunAssetsResponse = { export type GetAssetsData = { dagIds?: Array; limit?: number; + namePattern?: string | null; offset?: number; orderBy?: string; uriPattern?: string | null; @@ -1309,6 +1317,21 @@ export type GetAssetsData = { export type GetAssetsResponse = AssetCollectionResponse; +export type GetAssetAliasesData = { + limit?: number; + namePattern?: string | null; + offset?: number; + orderBy?: string; +}; + +export type GetAssetAliasesResponse = AssetAliasCollectionResponse; + +export type GetAssetAliasData = { + assetAliasId: number; +}; + +export type GetAssetAliasResponse = unknown; + export type GetAssetEventsData = { assetId?: number | null; limit?: number; @@ -1329,21 +1352,21 @@ export type CreateAssetEventData = { export type CreateAssetEventResponse = AssetEventResponse; export type GetAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; export type DeleteAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type DeleteAssetQueuedEventsResponse = void; export type GetAssetData = { - uri: string; + assetId: number; }; export type GetAssetResponse = AssetResponse; @@ -1363,17 +1386,17 @@ export type DeleteDagAssetQueuedEventsData = { export type DeleteDagAssetQueuedEventsResponse = void; export type GetDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type GetDagAssetQueuedEventResponse = QueuedEventResponse; export type DeleteDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type DeleteDagAssetQueuedEventResponse = void; @@ -2030,6 +2053,7 @@ export type GetVariablesData = { limit?: number; offset?: number; orderBy?: string; + variableKeyPattern?: string | null; }; export type GetVariablesResponse = VariableCollectionResponse; @@ -2095,6 +2119,60 @@ export type $OpenApiTs = { }; }; }; + "/public/assets/aliases": { + get: { + req: GetAssetAliasesData; + res: { + /** + * Successful Response + */ + 200: AssetAliasCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + "/public/assets/aliases/{asset_alias_id}": { + get: { + req: GetAssetAliasData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/assets/events": { get: { req: GetAssetEventsData; @@ -2147,7 +2225,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/queuedEvents/{uri}": { + "/public/assets/{asset_id}/queuedEvents": { get: { req: GetAssetQueuedEventsData; res: { @@ -2199,7 +2277,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/{uri}": { + "/public/assets/{asset_id}": { get: { req: GetAssetData; res: { @@ -2282,7 +2360,7 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvents/{uri}": { + "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents": { get: { req: GetDagAssetQueuedEventData; res: { diff --git a/airflow/ui/package.json b/airflow/ui/package.json index 6a4dfaec8312ee..728babb74f6a3e 100644 --- a/airflow/ui/package.json +++ b/airflow/ui/package.json @@ -37,7 +37,7 @@ "react-chartjs-2": "^5.2.0", "react-dom": "^18.3.1", "react-hook-form": "^7.20.0", - "react-icons": "^5.3.0", + "react-icons": "^5.4.0", "react-markdown": "^9.0.1", "react-router-dom": "^6.26.2", "react-syntax-highlighter": "^15.5.6", diff --git a/airflow/ui/pnpm-lock.yaml b/airflow/ui/pnpm-lock.yaml index 524a88ac9120e5..afb2b276b4086f 100644 --- a/airflow/ui/pnpm-lock.yaml +++ b/airflow/ui/pnpm-lock.yaml @@ -72,8 +72,8 @@ importers: specifier: ^7.20.0 version: 7.53.1(react@18.3.1) react-icons: - specifier: ^5.3.0 - version: 5.3.0(react@18.3.1) + specifier: ^5.4.0 + version: 5.4.0(react@18.3.1) react-markdown: specifier: ^9.0.1 version: 9.0.1(@types/react@18.3.5)(react@18.3.1) @@ -3195,8 +3195,8 @@ packages: peerDependencies: react: ^16.8.0 || ^17 || ^18 || ^19 - react-icons@5.3.0: - resolution: {integrity: sha512-DnUk8aFbTyQPSkCfF8dbX6kQjXA9DktMeJqfjrg6cK9vwQVMxmcA3BfP4QoiztVmEHtwlTgLFsPuH2NskKT6eg==} + react-icons@5.4.0: + resolution: {integrity: sha512-7eltJxgVt7X64oHh6wSWNwwbKTCtMfK35hcjvJS0yxEAhPM8oUKdS3+kqaW1vicIltw+kR2unHaa12S9pPALoQ==} peerDependencies: react: '*' @@ -7769,7 +7769,7 @@ snapshots: dependencies: react: 18.3.1 - react-icons@5.3.0(react@18.3.1): + react-icons@5.4.0(react@18.3.1): dependencies: react: 18.3.1 diff --git a/airflow/ui/src/components/SearchBar.test.tsx b/airflow/ui/src/components/SearchBar.test.tsx index 06d4a90d4f3628..fd0c4dfaa9b0cd 100644 --- a/airflow/ui/src/components/SearchBar.test.tsx +++ b/airflow/ui/src/components/SearchBar.test.tsx @@ -25,9 +25,16 @@ import { SearchBar } from "./SearchBar"; describe("Test SearchBar", () => { it("Renders and clear button works", async () => { - render(, { - wrapper: Wrapper, - }); + render( + , + { + wrapper: Wrapper, + }, + ); const input = screen.getByTestId("search-dags"); diff --git a/airflow/ui/src/components/SearchBar.tsx b/airflow/ui/src/components/SearchBar.tsx index 2714a636e5b2fb..5cc27c4d4997bf 100644 --- a/airflow/ui/src/components/SearchBar.tsx +++ b/airflow/ui/src/components/SearchBar.tsx @@ -30,6 +30,7 @@ type Props = { readonly defaultValue: string; readonly groupProps?: InputGroupProps; readonly onChange: (value: string) => void; + readonly placeHolder: string; }; export const SearchBar = ({ @@ -37,6 +38,7 @@ export const SearchBar = ({ defaultValue, groupProps, onChange, + placeHolder, }: Props) => { const handleSearchChange = useDebouncedCallback( (val: string) => onChange(val), @@ -84,7 +86,7 @@ export const SearchBar = ({ diff --git a/airflow/ui/src/components/Stat.tsx b/airflow/ui/src/components/Stat.tsx new file mode 100644 index 00000000000000..7f39a092ea2fc5 --- /dev/null +++ b/airflow/ui/src/components/Stat.tsx @@ -0,0 +1,33 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Heading, VStack } from "@chakra-ui/react"; +import type { PropsWithChildren } from "react"; + +type Props = { + readonly label: string; +} & PropsWithChildren; + +export const Stat = ({ children, label }: Props) => ( + + + {label} + + {children} + +); diff --git a/airflow/ui/src/components/ui/Breadcrumb/Root.tsx b/airflow/ui/src/components/ui/Breadcrumb/Root.tsx new file mode 100644 index 00000000000000..3f8af3d4984334 --- /dev/null +++ b/airflow/ui/src/components/ui/Breadcrumb/Root.tsx @@ -0,0 +1,55 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Breadcrumb, type SystemStyleObject } from "@chakra-ui/react"; +import React from "react"; + +export type BreadcrumbRootProps = { + separator?: React.ReactNode; + separatorGap?: SystemStyleObject["gap"]; +} & Breadcrumb.RootProps; + +export const Root = React.forwardRef( + (props, ref) => { + const { children, separator, separatorGap, ...rest } = props; + + const validChildren = React.Children.toArray(children).filter( + React.isValidElement, + ); + + return ( + + + {validChildren.map((child, index) => { + const last = index === validChildren.length - 1; + + return ( + // eslint-disable-next-line react/no-array-index-key + + {child} + {!last && ( + {separator} + )} + + ); + })} + + + ); + }, +); diff --git a/airflow/ui/src/components/ui/Breadcrumb/index.ts b/airflow/ui/src/components/ui/Breadcrumb/index.ts new file mode 100644 index 00000000000000..fc812dea64a863 --- /dev/null +++ b/airflow/ui/src/components/ui/Breadcrumb/index.ts @@ -0,0 +1,26 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Breadcrumb as ChakraBreadcrumb } from "@chakra-ui/react"; + +import { Root } from "./Root"; + +export const Breadcrumb = { + ...ChakraBreadcrumb, + Root, +}; diff --git a/airflow/ui/src/components/ui/Status.tsx b/airflow/ui/src/components/ui/Status.tsx index 7c408b931d3787..f614f6a0def9ee 100644 --- a/airflow/ui/src/components/ui/Status.tsx +++ b/airflow/ui/src/components/ui/Status.tsx @@ -28,12 +28,13 @@ import { stateColor } from "src/utils/stateColor"; type StatusValue = DagRunState | TaskInstanceState; export type StatusProps = { - state?: StatusValue; + state: StatusValue | null; } & ChakraStatus.RootProps; export const Status = React.forwardRef( ({ children, state, ...rest }, ref) => { - const colorPalette = state === undefined ? "info" : stateColor[state]; + // "null" is actually a string on stateColor + const colorPalette = stateColor[state ?? "null"]; return ( diff --git a/airflow/ui/src/components/ui/index.ts b/airflow/ui/src/components/ui/index.ts index f88340b42d79b3..add5c4b355038b 100644 --- a/airflow/ui/src/components/ui/index.ts +++ b/airflow/ui/src/components/ui/index.ts @@ -34,3 +34,4 @@ export * from "./Accordion"; export * from "./Status"; export * from "./Button"; export * from "./Toaster"; +export * from "./Breadcrumb"; diff --git a/airflow/ui/src/layouts/Details/DagVizModal.tsx b/airflow/ui/src/layouts/Details/DagVizModal.tsx new file mode 100644 index 00000000000000..36b5d63d659934 --- /dev/null +++ b/airflow/ui/src/layouts/Details/DagVizModal.tsx @@ -0,0 +1,105 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Button, Heading, HStack } from "@chakra-ui/react"; +import { FaChartGantt } from "react-icons/fa6"; +import { FiGrid } from "react-icons/fi"; +import { Link as RouterLink, useSearchParams } from "react-router-dom"; + +import type { DAGResponse } from "openapi/requests/types.gen"; +import { DagIcon } from "src/assets/DagIcon"; +import { Dialog } from "src/components/ui"; +import { capitalize } from "src/utils"; + +import { Gantt } from "./Gantt"; +import { Graph } from "./Graph"; +import { Grid } from "./Grid"; + +type TriggerDAGModalProps = { + dagDisplayName?: DAGResponse["dag_display_name"]; + dagId?: DAGResponse["dag_id"]; + onClose: () => void; + open: boolean; +}; + +const visualizationOptions = [ + { + component: , + icon: , + value: "gantt", + }, + { + component: , + icon: , + value: "graph", + }, + { component: , icon: , value: "grid" }, +]; + +export const DagVizModal: React.FC = ({ + dagDisplayName, + dagId, + onClose, + open, +}) => { + const [searchParams] = useSearchParams(); + + const activeViz = searchParams.get("modal") ?? "graph"; + const params = new URLSearchParams(searchParams); + + params.delete("modal"); + + return ( + + + + + + {dagDisplayName ?? dagId} + + {visualizationOptions.map(({ icon, value }) => ( + + + + ))} + + + + + {dagId === undefined + ? undefined + : visualizationOptions.find((viz) => viz.value === activeViz) + ?.component} + + + + ); +}; diff --git a/airflow/ui/src/pages/DagsList/Dag/Dag.tsx b/airflow/ui/src/layouts/Details/DetailsLayout.tsx similarity index 55% rename from airflow/ui/src/pages/DagsList/Dag/Dag.tsx rename to airflow/ui/src/layouts/Details/DetailsLayout.tsx index 2a8c0a42159b2a..1302bd2dacc005 100644 --- a/airflow/ui/src/pages/DagsList/Dag/Dag.tsx +++ b/airflow/ui/src/layouts/Details/DetailsLayout.tsx @@ -17,62 +17,70 @@ * under the License. */ import { Box, Button } from "@chakra-ui/react"; +import type { PropsWithChildren } from "react"; import { FiChevronsLeft } from "react-icons/fi"; -import { Outlet, Link as RouterLink, useParams } from "react-router-dom"; - import { - useDagServiceGetDagDetails, - useDagsServiceRecentDagRuns, -} from "openapi/queries"; + Outlet, + Link as RouterLink, + useParams, + useSearchParams, +} from "react-router-dom"; + +import type { DAGResponse } from "openapi/requests/types.gen"; import { ErrorAlert } from "src/components/ErrorAlert"; import { ProgressBar } from "src/components/ui"; import { Toaster } from "src/components/ui"; import { OpenGroupsProvider } from "src/context/openGroups"; -import { Header } from "./Header"; -import { DagTabs } from "./Tabs"; +import { DagVizModal } from "./DagVizModal"; +import { NavTabs } from "./NavTabs"; + +type Props = { + readonly dag?: DAGResponse; + readonly error?: unknown; + readonly isLoading?: boolean; + readonly tabs: Array<{ label: string; value: string }>; +} & PropsWithChildren; -export const Dag = () => { - const { dagId } = useParams(); +export const DetailsLayout = ({ + children, + dag, + error, + isLoading, + tabs, +}: Props) => { + const { dagId = "" } = useParams(); - const { - data: dag, - error, - isLoading, - } = useDagServiceGetDagDetails({ - dagId: dagId ?? "", - }); + const [searchParams, setSearchParams] = useSearchParams(); - // TODO: replace with with a list dag runs by dag id request - const { - data: runsData, - error: runsError, - isLoading: isLoadingRuns, - } = useDagsServiceRecentDagRuns({ dagIdPattern: dagId ?? "" }, undefined, { - enabled: Boolean(dagId), - }); + const modal = searchParams.get("modal"); - const runs = - runsData?.dags.find((dagWithRuns) => dagWithRuns.dag_id === dagId) - ?.latest_dag_runs ?? []; + const isModalOpen = modal !== null; + const onClose = () => { + searchParams.delete("modal"); + setSearchParams(searchParams); + }; return ( - + - -
- - + {children} + + + + - diff --git a/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx b/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx new file mode 100644 index 00000000000000..90530c4b7b026d --- /dev/null +++ b/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx @@ -0,0 +1,21 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box } from "@chakra-ui/react"; + +export const Gantt = () => gantt; diff --git a/airflow/ui/src/layouts/Details/Gantt/index.ts b/airflow/ui/src/layouts/Details/Gantt/index.ts new file mode 100644 index 00000000000000..24f6dabe4cfe72 --- /dev/null +++ b/airflow/ui/src/layouts/Details/Gantt/index.ts @@ -0,0 +1,20 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from "./Gantt"; diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/Edge.tsx b/airflow/ui/src/layouts/Details/Graph/Edge.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/Edge.tsx rename to airflow/ui/src/layouts/Details/Graph/Edge.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx b/airflow/ui/src/layouts/Details/Graph/Graph.tsx similarity index 94% rename from airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx rename to airflow/ui/src/layouts/Details/Graph/Graph.tsx index 69ee51cc892178..f7dbc6b2d30164 100644 --- a/airflow/ui/src/pages/DagsList/Dag/Graph/Graph.tsx +++ b/airflow/ui/src/layouts/Details/Graph/Graph.tsx @@ -19,9 +19,9 @@ import { Flex } from "@chakra-ui/react"; import { ReactFlow, Controls, Background, MiniMap } from "@xyflow/react"; import "@xyflow/react/dist/style.css"; +import { useParams } from "react-router-dom"; import { useStructureServiceStructureData } from "openapi/queries"; -import type { DAGResponse } from "openapi/requests/types.gen"; import { useColorMode } from "src/context/colorMode"; import { useOpenGroups } from "src/context/openGroups"; @@ -36,8 +36,9 @@ const nodeTypes = { }; const edgeTypes = { custom: Edge }; -export const Graph = ({ dagId }: { readonly dagId: DAGResponse["dag_id"] }) => { +export const Graph = () => { const { colorMode } = useColorMode(); + const { dagId = "" } = useParams(); const { openGroupIds } = useOpenGroups(); diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/JoinNode.tsx b/airflow/ui/src/layouts/Details/Graph/JoinNode.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/JoinNode.tsx rename to airflow/ui/src/layouts/Details/Graph/JoinNode.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/NodeWrapper.tsx b/airflow/ui/src/layouts/Details/Graph/NodeWrapper.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/NodeWrapper.tsx rename to airflow/ui/src/layouts/Details/Graph/NodeWrapper.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/TaskName.tsx b/airflow/ui/src/layouts/Details/Graph/TaskName.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/TaskName.tsx rename to airflow/ui/src/layouts/Details/Graph/TaskName.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/TaskNode.tsx b/airflow/ui/src/layouts/Details/Graph/TaskNode.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/TaskNode.tsx rename to airflow/ui/src/layouts/Details/Graph/TaskNode.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/index.ts b/airflow/ui/src/layouts/Details/Graph/index.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/index.ts rename to airflow/ui/src/layouts/Details/Graph/index.ts diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/reactflowUtils.ts b/airflow/ui/src/layouts/Details/Graph/reactflowUtils.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/reactflowUtils.ts rename to airflow/ui/src/layouts/Details/Graph/reactflowUtils.ts diff --git a/airflow/ui/src/pages/DagsList/Dag/Graph/useGraphLayout.ts b/airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Graph/useGraphLayout.ts rename to airflow/ui/src/layouts/Details/Graph/useGraphLayout.ts diff --git a/airflow/ui/src/layouts/Details/Grid/Grid.tsx b/airflow/ui/src/layouts/Details/Grid/Grid.tsx new file mode 100644 index 00000000000000..8717fef1d4340a --- /dev/null +++ b/airflow/ui/src/layouts/Details/Grid/Grid.tsx @@ -0,0 +1,21 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box } from "@chakra-ui/react"; + +export const Grid = () => grid; diff --git a/airflow/ui/src/layouts/Details/Grid/index.ts b/airflow/ui/src/layouts/Details/Grid/index.ts new file mode 100644 index 00000000000000..dccae200d1c236 --- /dev/null +++ b/airflow/ui/src/layouts/Details/Grid/index.ts @@ -0,0 +1,20 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from "./Grid"; diff --git a/airflow/ui/src/layouts/Details/NavTabs.tsx b/airflow/ui/src/layouts/Details/NavTabs.tsx new file mode 100644 index 00000000000000..0d4da41b2c8a4f --- /dev/null +++ b/airflow/ui/src/layouts/Details/NavTabs.tsx @@ -0,0 +1,90 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Button, Center, Flex } from "@chakra-ui/react"; +import { FaChartGantt } from "react-icons/fa6"; +import { FiGrid } from "react-icons/fi"; +import { NavLink, Link as RouterLink, useSearchParams } from "react-router-dom"; + +import { DagIcon } from "src/assets/DagIcon"; + +type Props = { + readonly tabs: Array<{ label: string; value: string }>; +}; + +export const NavTabs = ({ tabs }: Props) => { + const [searchParams] = useSearchParams(); + + return ( + + + {tabs.map(({ label, value }) => ( + + {({ isActive }) => ( +
+ {label} +
+ )} +
+ ))} +
+ + + + + +
+ ); +}; diff --git a/airflow/ui/src/layouts/Nav/AdminButton.tsx b/airflow/ui/src/layouts/Nav/AdminButton.tsx new file mode 100644 index 00000000000000..d791d135860ce7 --- /dev/null +++ b/airflow/ui/src/layouts/Nav/AdminButton.tsx @@ -0,0 +1,48 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { FiSettings } from "react-icons/fi"; +import { Link } from "react-router-dom"; + +import { Menu } from "src/components/ui"; + +import { NavButton } from "./NavButton"; + +const links = [ + { + href: "/variables", + title: "Variables", + }, +]; + +export const AdminButton = () => ( + + + } title="Admin" /> + + + {links.map((link) => ( + + + {link.title} + + + ))} + + +); diff --git a/airflow/ui/src/layouts/Nav/Nav.tsx b/airflow/ui/src/layouts/Nav/Nav.tsx index f716d612e8af4c..297dd603874dc5 100644 --- a/airflow/ui/src/layouts/Nav/Nav.tsx +++ b/airflow/ui/src/layouts/Nav/Nav.tsx @@ -17,12 +17,13 @@ * under the License. */ import { Box, Flex, VStack, Link } from "@chakra-ui/react"; -import { FiCornerUpLeft, FiDatabase, FiHome, FiSettings } from "react-icons/fi"; +import { FiCornerUpLeft, FiDatabase, FiHome } from "react-icons/fi"; import { useVersionServiceGetVersion } from "openapi/queries"; import { AirflowPin } from "src/assets/AirflowPin"; import { DagIcon } from "src/assets/DagIcon"; +import { AdminButton } from "./AdminButton"; import { BrowseButton } from "./BrowseButton"; import { DocsButton } from "./DocsButton"; import { NavButton } from "./NavButton"; @@ -61,12 +62,7 @@ export const Nav = () => { to="assets" /> - } - title="Admin" - to="admin" - /> + { + const { dagId = "" } = useParams(); + + const { + data: dag, + error, + isLoading, + } = useDagServiceGetDagDetails({ + dagId, + }); + + // TODO: replace with with a list dag runs by dag id request + const { + data: runsData, + error: runsError, + isLoading: isLoadingRuns, + } = useDagsServiceRecentDagRuns({ dagIdPattern: dagId }, undefined, { + enabled: Boolean(dagId), + }); + + const runs = + runsData?.dags.find((dagWithRuns) => dagWithRuns.dag_id === dagId) + ?.latest_dag_runs ?? []; + + return ( + +
+ + ); +}; diff --git a/airflow/ui/src/pages/DagsList/Dag/Header.tsx b/airflow/ui/src/pages/Dag/Header.tsx similarity index 84% rename from airflow/ui/src/pages/DagsList/Dag/Header.tsx rename to airflow/ui/src/pages/Dag/Header.tsx index 39145f25197fa4..c7a4c0654e0344 100644 --- a/airflow/ui/src/pages/DagsList/Dag/Header.tsx +++ b/airflow/ui/src/pages/Dag/Header.tsx @@ -16,15 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { - Box, - Flex, - Heading, - HStack, - SimpleGrid, - Text, - VStack, -} from "@chakra-ui/react"; +import { Box, Flex, Heading, HStack, SimpleGrid, Text } from "@chakra-ui/react"; import { FiCalendar } from "react-icons/fi"; import type { @@ -35,11 +27,12 @@ import { DagIcon } from "src/assets/DagIcon"; import DagDocumentation from "src/components/DagDocumentation"; import DagRunInfo from "src/components/DagRunInfo"; import ParseDag from "src/components/ParseDag"; +import { Stat } from "src/components/Stat"; import { TogglePause } from "src/components/TogglePause"; import TriggerDAGTextButton from "src/components/TriggerDag/TriggerDAGTextButton"; import { Tooltip } from "src/components/ui"; -import { DagTags } from "../DagTags"; +import { DagTags } from "../DagsList/DagTags"; export const Header = ({ dag, @@ -50,7 +43,7 @@ export const Header = ({ readonly dagId?: string; readonly latestRun?: DAGRunResponse; }) => ( - + @@ -77,10 +70,7 @@ export const Header = ({ - - - Schedule - + {Boolean(dag?.timetable_summary) ? ( @@ -89,11 +79,8 @@ export const Header = ({ ) : undefined} - - - - Last Run - + + {Boolean(latestRun) && latestRun !== undefined ? ( ) : undefined} - - - - Next Run - + + {Boolean(dag?.next_dagrun) && dag !== undefined ? ( ) : undefined} - +
diff --git a/airflow/ui/src/pages/DagsList/Dag/Overview/Overview.tsx b/airflow/ui/src/pages/Dag/Overview/Overview.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Overview/Overview.tsx rename to airflow/ui/src/pages/Dag/Overview/Overview.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Overview/index.ts b/airflow/ui/src/pages/Dag/Overview/index.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Overview/index.ts rename to airflow/ui/src/pages/Dag/Overview/index.ts diff --git a/airflow/ui/src/pages/DagsList/Dag/Runs/Runs.tsx b/airflow/ui/src/pages/Dag/Runs/Runs.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Runs/Runs.tsx rename to airflow/ui/src/pages/Dag/Runs/Runs.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Runs/index.ts b/airflow/ui/src/pages/Dag/Runs/index.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Runs/index.ts rename to airflow/ui/src/pages/Dag/Runs/index.ts diff --git a/airflow/ui/src/pages/DagsList/Dag/Tasks/TaskCard.tsx b/airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Tasks/TaskCard.tsx rename to airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Tasks/TaskRecentRuns.tsx b/airflow/ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Tasks/TaskRecentRuns.tsx rename to airflow/ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Tasks/Tasks.tsx b/airflow/ui/src/pages/Dag/Tasks/Tasks.tsx similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Tasks/Tasks.tsx rename to airflow/ui/src/pages/Dag/Tasks/Tasks.tsx diff --git a/airflow/ui/src/pages/DagsList/Dag/Tasks/index.ts b/airflow/ui/src/pages/Dag/Tasks/index.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/Tasks/index.ts rename to airflow/ui/src/pages/Dag/Tasks/index.ts diff --git a/airflow/ui/src/pages/DagsList/Dag/index.ts b/airflow/ui/src/pages/Dag/index.ts similarity index 100% rename from airflow/ui/src/pages/DagsList/Dag/index.ts rename to airflow/ui/src/pages/Dag/index.ts diff --git a/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx b/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx deleted file mode 100644 index 9ab868b8f8d28a..00000000000000 --- a/airflow/ui/src/pages/DagsList/Dag/DagVizModal.tsx +++ /dev/null @@ -1,52 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import { Heading } from "@chakra-ui/react"; - -import type { DAGResponse } from "openapi/requests/types.gen"; -import { Dialog } from "src/components/ui"; - -import { Graph } from "./Graph"; - -type TriggerDAGModalProps = { - dagDisplayName?: DAGResponse["dag_display_name"]; - dagId?: DAGResponse["dag_id"]; - onClose: () => void; - open: boolean; -}; - -export const DagVizModal: React.FC = ({ - dagDisplayName, - dagId, - onClose, - open, -}) => ( - - - - - {Boolean(dagDisplayName) ? dagDisplayName : "Dag Undefined"} - - - - - {dagId === undefined ? undefined : } - - - -); diff --git a/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx b/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx deleted file mode 100644 index c418c24094f1e6..00000000000000 --- a/airflow/ui/src/pages/DagsList/Dag/Tabs.tsx +++ /dev/null @@ -1,92 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import { Button, Center, Flex } from "@chakra-ui/react"; -import { NavLink, useSearchParams } from "react-router-dom"; - -import type { DAGDetailsResponse } from "openapi/requests/types.gen"; -import { DagIcon } from "src/assets/DagIcon"; -import { capitalize } from "src/utils"; - -import { DagVizModal } from "./DagVizModal"; - -const tabs = ["overview", "runs", "tasks", "events", "code"]; - -const MODAL = "modal"; - -export const DagTabs = ({ dag }: { readonly dag?: DAGDetailsResponse }) => { - const [searchParams, setSearchParams] = useSearchParams(); - - const modal = searchParams.get(MODAL); - - const isGraphOpen = modal === "graph"; - - const onClose = () => { - searchParams.delete(MODAL); - setSearchParams(searchParams); - }; - - const onOpen = () => { - searchParams.set(MODAL, "graph"); - setSearchParams(searchParams); - }; - - return ( - <> - - - {tabs.map((tab) => ( - - {({ isActive }) => ( -
- {capitalize(tab)} -
- )} -
- ))} -
- - - -
- - - ); -}; diff --git a/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow/ui/src/pages/DagsList/DagCard.tsx index 418a169d393b69..0896e0bacf6950 100644 --- a/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -16,19 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -import { - Box, - Flex, - HStack, - Heading, - SimpleGrid, - VStack, - Link, -} from "@chakra-ui/react"; +import { Box, Flex, HStack, SimpleGrid, Link } from "@chakra-ui/react"; import { Link as RouterLink } from "react-router-dom"; import type { DAGWithLatestDagRunsResponse } from "openapi/requests/types.gen"; import DagRunInfo from "src/components/DagRunInfo"; +import { Stat } from "src/components/Stat"; import { TogglePause } from "src/components/TogglePause"; import TriggerDAGIconButton from "src/components/TriggerDag/TriggerDAGIconButton"; import { Tooltip } from "src/components/ui"; @@ -82,16 +75,10 @@ export const DagCard = ({ dag }: Props) => { - - - Schedule - + - - - - Latest Run - + + {latestRun ? ( { state={latestRun.state} /> ) : undefined} - - - - Next Run - + + {Boolean(dag.next_dagrun) ? ( { nextDagrunCreateAfter={dag.next_dagrun_create_after} /> ) : undefined} - + diff --git a/airflow/ui/src/pages/DagsList/DagsList.tsx b/airflow/ui/src/pages/DagsList/DagsList.tsx index 66f1bd8a7007e6..2ed1d117731184 100644 --- a/airflow/ui/src/pages/DagsList/DagsList.tsx +++ b/airflow/ui/src/pages/DagsList/DagsList.tsx @@ -206,7 +206,7 @@ export const DagsList = () => { const { data, error, isFetching, isLoading } = useDags({ dagDisplayNamePattern: Boolean(dagDisplayNamePattern) - ? `%${dagDisplayNamePattern}%` + ? `${dagDisplayNamePattern}` : undefined, lastDagRunState, limit: pagination.pageSize, @@ -237,6 +237,7 @@ export const DagsList = () => { buttonProps={{ disabled: true }} defaultValue={dagDisplayNamePattern ?? ""} onChange={handleSearchChange} + placeHolder="Search Dags" /> diff --git a/airflow/ui/src/pages/DagsList/Run/Run.tsx b/airflow/ui/src/pages/DagsList/Run/Run.tsx deleted file mode 100644 index 4011eeda3cbd29..00000000000000 --- a/airflow/ui/src/pages/DagsList/Run/Run.tsx +++ /dev/null @@ -1,57 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import { Box, Button, Heading } from "@chakra-ui/react"; -import { FiChevronsLeft } from "react-icons/fi"; -import { useParams, Link as RouterLink } from "react-router-dom"; - -import { useDagRunServiceGetDagRun } from "openapi/queries"; -import { ErrorAlert } from "src/components/ErrorAlert"; -import { ProgressBar, Status } from "src/components/ui"; - -export const Run = () => { - const { dagId = "", runId = "" } = useParams(); - - const { - data: dagRun, - error, - isLoading, - } = useDagRunServiceGetDagRun({ - dagId, - dagRunId: runId, - }); - - return ( - - - - {dagId}.{runId} - - - - {dagRun === undefined ? undefined : ( - {dagRun.state} - )} - - ); -}; diff --git a/airflow/ui/src/pages/Events/Events.tsx b/airflow/ui/src/pages/Events/Events.tsx index 028a388894edcf..703cfe7d6006bc 100644 --- a/airflow/ui/src/pages/Events/Events.tsx +++ b/airflow/ui/src/pages/Events/Events.tsx @@ -28,7 +28,9 @@ import { ErrorAlert } from "src/components/ErrorAlert"; import Time from "src/components/Time"; const eventsColumn = ( - dagId: string | undefined, + dagId?: string, + runId?: string, + taskId?: string, ): Array> => [ { accessorKey: "when", @@ -51,22 +53,30 @@ const eventsColumn = ( }, }, ]), - { - accessorKey: "run_id", - enableSorting: true, - header: "Run ID", - meta: { - skeletonWidth: 10, - }, - }, - { - accessorKey: "task_id", - enableSorting: true, - header: "Task ID", - meta: { - skeletonWidth: 10, - }, - }, + ...(Boolean(runId) + ? [] + : [ + { + accessorKey: "run_id", + enableSorting: true, + header: "Run ID", + meta: { + skeletonWidth: 10, + }, + }, + ]), + ...(Boolean(taskId) + ? [] + : [ + { + accessorKey: "task_id", + enableSorting: true, + header: "Task ID", + meta: { + skeletonWidth: 10, + }, + }, + ]), { accessorKey: "map_index", enableSorting: false, @@ -102,7 +112,7 @@ const eventsColumn = ( ]; export const Events = () => { - const { dagId } = useParams(); + const { dagId, runId, taskId } = useParams(); const { setTableURLState, tableURLState } = useTableURLState({ sorting: [{ desc: true, id: "when" }], }); @@ -121,13 +131,15 @@ export const Events = () => { limit: pagination.pageSize, offset: pagination.pageIndex * pagination.pageSize, orderBy, + runId, + taskId, }); return ( ( + + + + + + Run: + {dagRun.dag_run_id} + + {dagRun.state} + +
+ + + + {dagRun.note === null || dagRun.note.length === 0 ? undefined : ( + + + + {dagRun.note} + + + )} + + + + + {dagRun.run_type} + + + + + + + + {dayjs + .duration(dayjs(dagRun.end_date).diff(dagRun.start_date)) + .asSeconds() + .toFixed(2)} + s + + + +); diff --git a/airflow/ui/src/pages/Run/Run.tsx b/airflow/ui/src/pages/Run/Run.tsx new file mode 100644 index 00000000000000..5fea67519276b1 --- /dev/null +++ b/airflow/ui/src/pages/Run/Run.tsx @@ -0,0 +1,78 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { LiaSlashSolid } from "react-icons/lia"; +import { useParams, Link as RouterLink } from "react-router-dom"; + +import { + useDagRunServiceGetDagRun, + useDagServiceGetDagDetails, +} from "openapi/queries"; +import { Breadcrumb } from "src/components/ui"; +import { DetailsLayout } from "src/layouts/Details/DetailsLayout"; + +import { Header } from "./Header"; + +const tabs = [ + { label: "Task Instances", value: "" }, + { label: "Events", value: "events" }, + { label: "Code", value: "code" }, +]; + +export const Run = () => { + const { dagId = "", runId = "" } = useParams(); + + const { + data: dagRun, + error, + isLoading, + } = useDagRunServiceGetDagRun({ + dagId, + dagRunId: runId, + }); + + const { + data: dag, + error: dagError, + isLoading: isLoadinDag, + } = useDagServiceGetDagDetails({ + dagId, + }); + + return ( + + }> + + Dags + + + + {dag?.dag_display_name ?? dagId} + + + {runId} + + {dagRun === undefined ? undefined :
} + + ); +}; diff --git a/airflow/ui/src/pages/Run/TaskInstances.tsx b/airflow/ui/src/pages/Run/TaskInstances.tsx new file mode 100644 index 00000000000000..9503b2e20ad2e2 --- /dev/null +++ b/airflow/ui/src/pages/Run/TaskInstances.tsx @@ -0,0 +1,117 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Link } from "@chakra-ui/react"; +import type { ColumnDef } from "@tanstack/react-table"; +import dayjs from "dayjs"; +import { Link as RouterLink, useParams } from "react-router-dom"; + +import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; +import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import { DataTable } from "src/components/DataTable"; +import { useTableURLState } from "src/components/DataTable/useTableUrlState"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import Time from "src/components/Time"; +import { Status } from "src/components/ui"; + +const columns: Array> = [ + { + accessorKey: "task_display_name", + cell: ({ row: { original } }) => ( + + -1 ? `?map_index=${original.map_index}` : ""}`} + > + {original.task_display_name} + + + ), + header: "Task ID", + }, + { + accessorKey: "map_index", + header: "Map Index", + }, + { + accessorKey: "try_number", + enableSorting: false, + header: "Try Number", + }, + { + accessorKey: "state", + cell: ({ + row: { + original: { state }, + }, + }) => {state}, + header: () => "State", + }, + { + accessorKey: "operator", + enableSorting: false, + header: "Operator", + }, + { + accessorKey: "start_date", + cell: ({ row: { original } }) =>