Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RESTARTING state #16681

Merged
merged 8 commits into from
Jul 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ If you are using DAGs Details API endpoint, use `max_active_tasks` instead of `c

When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared.

### Clearing a running task sets its state to `RESTARTING`

Previously, clearing a running task sets its state to `SHUTDOWN`. The task gets killed and goes into `FAILED` state. After [#16681](https://github.com/apache/airflow/pull/16681), clearing a running task sets its state to `RESTARTING`. The task is eligible for retry without going into `FAILED` state.

### Remove `TaskInstance.log_filepath` attribute

This method returned incorrect values for a long time, because it did not take into account the different
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def heartbeat(self, only_if_necessary: bool = False):
session.merge(self)
previous_heartbeat = self.latest_heartbeat

if self.state == State.SHUTDOWN:
if self.state in State.terminating_states:
self.kill()

# Figure out how long to sleep for
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def task_instance_scheduling_decisions(self, session: Session = None) -> TISched
schedulable_tis: List[TI] = []
changed_tis = False

tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,)))
tis = list(self.get_task_instances(session=session, state=State.task_states))
self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
for ti in tis:
try:
Expand Down
11 changes: 9 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ def clear_task_instances(
for ti in tis:
if ti.state == State.RUNNING:
if ti.job_id:
ti.state = State.SHUTDOWN
# If a task is cleared when running, set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
ti.state = State.RESTARTING
job_ids.append(ti.job_id)
else:
task_id = ti.task_id
Expand Down Expand Up @@ -211,7 +213,7 @@ def clear_task_instances(
from airflow.jobs.base_job import BaseJob

for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():
job.state = State.SHUTDOWN
job.state = State.RESTARTING

if activate_dag_runs is not None:
warnings.warn(
Expand Down Expand Up @@ -1519,6 +1521,11 @@ def handle_failure_with_callback(

def is_eligible_to_retry(self):
"""Is task instance is eligible for retry"""
if self.state == State.RESTARTING:
# If a task is cleared when running, it goes into RESTARTING state and is always
# eligible for retry
return True

return self.task.retries and self.try_number <= self.max_tries

@provide_session
Expand Down
11 changes: 10 additions & 1 deletion airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class TaskInstanceState(str, Enum):
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
SHUTDOWN = "shutdown" # External request to shut down
SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running)
RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
Expand Down Expand Up @@ -84,6 +85,7 @@ class State:
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
RESTARTING = TaskInstanceState.RESTARTING
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
Expand All @@ -105,6 +107,7 @@ class State:
TaskInstanceState.RUNNING: 'lime',
TaskInstanceState.SUCCESS: 'green',
TaskInstanceState.SHUTDOWN: 'blue',
TaskInstanceState.RESTARTING: 'violet',
TaskInstanceState.FAILED: 'red',
TaskInstanceState.UP_FOR_RETRY: 'gold',
TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
Expand Down Expand Up @@ -159,6 +162,7 @@ def color_fg(cls, state):
TaskInstanceState.RUNNING,
TaskInstanceState.SENSING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.RESTARTING,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
]
Expand All @@ -182,6 +186,11 @@ def color_fg(cls, state):
A list of states indicating that a task or dag is a success state.
"""

terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
"""
A list of states indicating that a task has been terminated.
"""


class PokeState:
"""Static class with poke states constants used in smart operator."""
Expand Down
4 changes: 4 additions & 0 deletions airflow/www/static/css/graph.css
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ g.node.shutdown rect {
stroke: blue;
}

g.node.restarting rect {
stroke: violet;
}

g.node.upstream_failed rect {
stroke: orange;
}
Expand Down
4 changes: 4 additions & 0 deletions airflow/www/static/css/tree.css
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ rect.shutdown {
fill: blue;
}

rect.restarting {
fill: violet;
}

rect.upstream_failed {
fill: orange;
}
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/static/js/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ function getNodeState(nodeId, tis) {
// In this order, if any of these states appeared in childrenStates, return it as
// the group state.
const priority = ['failed', 'upstream_failed', 'up_for_retry', 'up_for_reschedule',
'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'removed',
'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'restarting', 'removed',
'no_status', 'success', 'skipped'];

return priority.find((state) => childrenStates.has(state)) || 'no_status';
Expand Down
2 changes: 2 additions & 0 deletions docs/apache-airflow/concepts/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ The possible states for a Task Instance are:
* ``queued``: The task has been assigned to an Executor and is awaiting a worker
* ``running``: The task is running on a worker (or on a local/synchronous executor)
* ``success``: The task finished running without errors
* ``shutdown``: The task was externally requested to shut down when it was running
* ``restarting``: The task was externally requested to restart when it was running
* ``failed``: The task had an error during execution and failed to run
* ``skipped``: The task was skipped due to branching, LatestOnly, or similar.
* ``upstream_failed``: An upstream task failed and the :ref:`Trigger Rule <concepts:trigger-rules>` says we needed it
Expand Down
Binary file modified docs/apache-airflow/img/task_lifecycle_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,7 @@ def test_clear_set_dagrun_state_for_parent_dag(self, dag_run_state):

@parameterized.expand(
[(state, State.NONE) for state in State.task_states if state != State.RUNNING]
+ [(State.RUNNING, State.SHUTDOWN)]
+ [(State.RUNNING, State.RESTARTING)]
) # type: ignore
def test_clear_dag(self, ti_state_begin, ti_state_end: Optional[str]):
dag_id = 'test_clear_dag'
Expand Down
2 changes: 1 addition & 1 deletion tests/www/views/test_views_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_home(capture_templates, admin_client):
val_state_color_mapping = (
'const STATE_COLOR = {"failed": "red", '
'"null": "lightblue", "queued": "gray", '
'"removed": "lightgrey", "running": "lime", '
'"removed": "lightgrey", "restarting": "violet", "running": "lime", '
'"scheduled": "tan", "sensing": "lightseagreen", '
'"shutdown": "blue", "skipped": "pink", '
'"success": "green", "up_for_reschedule": "turquoise", '
Expand Down