Skip to content

Commit

Permalink
Fix dag run selection (#38941)
Browse files Browse the repository at this point in the history
* Fix dag run link params

* Do run_id checks inside of the grid_data hook

* remove firstRunId context
  • Loading branch information
bbovenzi authored Apr 11, 2024
1 parent 05ba268 commit f87772f
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 56 deletions.
32 changes: 26 additions & 6 deletions airflow/www/static/js/api/useGridData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import useFilters, {
FILTER_UPSTREAM_PARAM,
ROOT_PARAM,
} from "src/dag/useFilters";
import type { Task, DagRun, RunOrdering } from "src/types";
import type { Task, DagRun, RunOrdering, API } from "src/types";
import { camelCase } from "lodash";
import useSelection, { RUN_ID } from "src/dag/useSelection";
import useSelection from "src/dag/useSelection";

const DAG_ID_PARAM = "dag_id";

Expand Down Expand Up @@ -80,9 +80,12 @@ const useGridData = () => {
filterDownstream,
filterUpstream,
},
onBaseDateChange,
} = useFilters();
const { firstRunIdSetByUrl } = useSelection();

const {
onSelect,
selected: { taskId, runId },
} = useSelection();
const query = useQuery(
[
"gridData",
Expand All @@ -93,7 +96,7 @@ const useGridData = () => {
root,
filterUpstream,
filterDownstream,
firstRunIdSetByUrl,
runId,
],
async () => {
const params = {
Expand All @@ -105,11 +108,28 @@ const useGridData = () => {
[NUM_RUNS_PARAM]: numRuns,
[RUN_TYPE_PARAM]: runType,
[RUN_STATE_PARAM]: runState,
[RUN_ID]: firstRunIdSetByUrl || "",
};
const response = await axios.get<AxiosResponse, GridData>(gridDataUrl, {
params,
});
if (runId && !response.dagRuns.find((dr) => dr.runId === runId)) {
const dagRunUrl = getMetaValue("dag_run_url")
.replace("__DAG_ID__", dagId)
.replace("__DAG_RUN_ID__", runId);

// If the run id cannot be found in the response, try fetching it to see if its real and then adjust the base date filter
try {
const selectedRun = await axios.get<AxiosResponse, API.DAGRun>(
dagRunUrl
);
if (selectedRun?.executionDate) {
onBaseDateChange(selectedRun.executionDate);
}
// otherwise the run_id isn't valid and we should unselect it
} catch (e) {
onSelect({ taskId });
}
}
// turn off auto refresh if there are no active runs
if (!areActiveRuns(response.dagRuns)) stopRefresh();
return response;
Expand Down
14 changes: 1 addition & 13 deletions airflow/www/static/js/dag/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import FilterBar from "./nav/FilterBar";
import LegendRow from "./nav/LegendRow";
import useToggleGroups from "./useToggleGroups";
import keyboardShortcutIdentifier from "./keyboardShortcutIdentifier";
import { DagRunSelectionContext, RUN_ID } from "./useSelection";

const detailsPanelKey = "hideDetailsPanel";
const minPanelWidth = 300;
Expand All @@ -74,7 +73,7 @@ const headerHeight =
10
) || 0;

const MainInContext = () => {
const Main = () => {
const {
data: { groups },
isLoading,
Expand Down Expand Up @@ -319,15 +318,4 @@ const MainInContext = () => {
);
};

const Main = () => {
const [searchParams] = useSearchParams();
const [firstRunIdSetByUrl] = useState(searchParams.get(RUN_ID));

return (
<DagRunSelectionContext.Provider value={firstRunIdSetByUrl}>
<MainInContext />
</DagRunSelectionContext.Provider>
);
};

export default Main;
11 changes: 4 additions & 7 deletions airflow/www/static/js/dag/details/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,14 @@ const Header = ({ mapIndex }: Props) => {
} = useSelection();

const dagRun = dagRuns.find((r) => r.runId === runId);

const group = getTask({ taskId, task: groups });

// If runId and/or taskId can't be found remove the selection
// If taskId can't be found remove the selection
useEffect(() => {
if (runId && !dagRun && taskId && !group) {
clearSelection();
} else if (runId && !dagRun) {
onSelect({ taskId });
if (taskId && !group) {
onSelect({ runId });
}
}, [dagRun, taskId, group, runId, onSelect, clearSelection]);
}, [taskId, group, onSelect, runId]);

let runLabel;
if (dagRun && runId) {
Expand Down
7 changes: 0 additions & 7 deletions airflow/www/static/js/dag/useSelection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/

import { createContext, useContext } from "react";
import { useSearchParams } from "react-router-dom";
import {
LIMIT_PARAM,
Expand All @@ -35,13 +34,8 @@ export interface SelectionProps {
mapIndex?: number;
}

// The first run_id need to be treated differently from the selection, because it is used in backend to
// calculate the base_date, which we don't want jumping around when user is clicking in the grid.
export const DagRunSelectionContext = createContext<string | null>(null);

const useSelection = () => {
const [searchParams, setSearchParams] = useSearchParams();
const firstRunIdSetByUrl = useContext(DagRunSelectionContext);

// Clear selection, but keep other search params
const clearSelection = () => {
Expand Down Expand Up @@ -99,7 +93,6 @@ const useSelection = () => {
},
clearSelection,
onSelect,
firstRunIdSetByUrl,
};
};

Expand Down
1 change: 1 addition & 0 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
<meta name="datasets_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_datasets') }}">
<meta name="event_logs_api" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_event_log_endpoint_get_event_logs') }}">
<meta name="audit_log_url" content="{{ url_for('LogModelView.list') }}">
<meta name="dag_run_url" content="{{ url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_dag_run', dag_id='__DAG_ID__', dag_run_id='__DAG_RUN_ID__') }}">

<!-- End Urls -->
<meta name="is_paused" content="{{ dag_is_paused }}">
Expand Down
27 changes: 4 additions & 23 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3167,7 +3167,6 @@ def task_instances(self):
def grid_data(self):
"""Return grid data."""
dag_id = request.args.get("dag_id")
run_id = request.args.get("dag_run_id")
dag = get_airflow_app().dag_bag.get_dag(dag_id)

if not dag:
Expand All @@ -3185,43 +3184,25 @@ def grid_data(self):
if num_runs is None:
num_runs = conf.getint("webserver", "default_dag_run_display_number")

dagrun = None
if run_id:
with create_session() as session:
dagrun = dag.get_dagrun(run_id=run_id, session=session)
if not dagrun:
return {"error": f"can't find dag_run_id={run_id}"}, 404
base_date = dagrun.execution_date
else:
try:
base_date = timezone.parse(request.args["base_date"], strict=True)
except (KeyError, ValueError):
base_date = dag.get_latest_execution_date() or timezone.utcnow()
try:
base_date = timezone.parse(request.args["base_date"], strict=True)
except (KeyError, ValueError):
base_date = dag.get_latest_execution_date() or timezone.utcnow()

with create_session() as session:
query = select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)

run_types = request.args.getlist("run_type")
if run_types:
if run_id:
return {"error": "Can not provide filters when dag_run_id filter is selected."}, 400
query = query.where(DagRun.run_type.in_(run_types))

run_states = request.args.getlist("run_state")
if run_states:
if run_id:
return {"error": "Can not provide filters when dag_run_id filter is selected."}, 400
query = query.where(DagRun.state.in_(run_states))

dag_runs = wwwutils.sorted_dag_runs(
query, ordering=dag.timetable.run_ordering, limit=num_runs, session=session
)
if dagrun:
found_requested_run_id = any(True for d in dag_runs if d.run_id == run_id)
if not found_requested_run_id:
return {
"error": f"Dag with dag_run_id={run_id} found, but not in selected time range or filters."
}, 404

encoded_runs = [wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs]
data = {
Expand Down

0 comments on commit f87772f

Please sign in to comment.