This repository was archived by the owner on Mar 15, 2023. It is now read-only.

Master update 20201111 #20

Open: wants to merge 48 commits into base: master

Commits (48)
b0f3071  update to main (wyndhblb, Jun 9, 2021)
3efda8c  rm the dispose (wyndhblb, Jun 9, 2021)
d72adb9  comments (wyndhblb, Dec 11, 2020)
2552b1a  put back atext to see (wyndhblb, Dec 14, 2020)
ee9b4d8  prints for debuggin' (wyndhblb, Dec 14, 2020)
466bd0e  force timedelta (wyndhblb, Jun 9, 2021)
0cc025d  revert (wyndhblb, Dec 15, 2020)
a919b1e  force timedelta (wyndhblb, Jun 9, 2021)
51eec27  job id (wyndhblb, Jun 9, 2021)
39ab649  json requests (wyndhblb, Jun 9, 2021)
cb40228  only instance if startdate is defined (wyndhblb, Jun 9, 2021)
eb0b57e  only instance if execdate is defined (wyndhblb, Jan 13, 2021)
e769f6f  attempt to set next_dagrun_create_after properly (wyndhblb, Mar 3, 2021)
b6e4650  timezone.utcnow() (wyndhblb, Mar 3, 2021)
cd9a50c  another self.next_dagrun_create_after (wyndhblb, Mar 3, 2021)
917042c  try a shunt of dag run explosion (wyndhblb, Mar 18, 2021)
11c3246  need the continue (wyndhblb, Mar 18, 2021)
2d2a625  fmt (wyndhblb, Mar 18, 2021)
fcf632c  shunt secrets hider for now (wyndhblb, Jun 29, 2021)
d28d992  rm comment (wyndhblb, Jun 29, 2021)
c0d9a12  rm space (wyndhblb, Jun 29, 2021)
191deb4  rm space (wyndhblb, Jun 29, 2021)
85a7799  update to main (wyndhblb, Jun 9, 2021)
453830d  Merge branch 'master' into master-update-20201111 (wyndhblb, Jun 29, 2021)
7cfea23  isoliation level “tweak” for aurora (wyndhblb, Jul 1, 2021)
32825e6  update to main (wyndhblb, Jun 9, 2021)
c6197b2  Merge branch 'master' into master-update-20201111 (wyndhblb, Jul 1, 2021)
2650093  update to main (wyndhblb, Jun 9, 2021)
bfd4ee6  Merge branch 'master' into master-update-20201111 (wyndhblb, Jul 1, 2021)
b622bb3  don’t lock pool stats (wyndhblb, Jul 2, 2021)
1afa758  nope causes more probelms (wyndhblb, Jul 2, 2021)
75ec7fc  add isolation level back. (wyndhblb, Jul 2, 2021)
d6cee4d  UNIQUE_POD_ID_SEP == “-“ not “.” (wyndhblb, Jul 18, 2021)
58c972f  SigTerm != FAIL .. ugh (wyndhblb, Jul 18, 2021)
aa578c8  another place to “up for retry vs fail” on sigterm (wyndhblb, Jul 18, 2021)
b4189cd  revert this PR https://github.com/apache/airflow/pull/15172 (wyndhblb, Jul 18, 2021)
351377f  log some things, as things _should_ be up for retries but are getting… (wyndhblb, Jul 18, 2021)
c12ec55  log another spot (wyndhblb, Jul 19, 2021)
7aa69b0  don't mark things upstream failed here :: there's a race (wyndhblb, Jul 21, 2021)
bdc9181  mend date getter (wyndhblb, Aug 2, 2021)
e268802  add skipped to "ok to pass" items (wyndhblb, Aug 10, 2021)
43a445b  some comments to know i've edited things (wyndhblb, Aug 10, 2021)
38bb95d  log more things for debugs on deadlock (wyndhblb, Aug 10, 2021)
b2139cd  re-filter for "skipped" things as it's not filtering properly (wyndhblb, Aug 11, 2021)
7e56d7e  erg devil may need flag_upstream_failed=True (wyndhblb, Aug 11, 2021)
efc9735  != (wyndhblb, Aug 11, 2021)
983c202  race condition "retry vs fail" so "ignore" the upstream fail setters (wyndhblb, Aug 12, 2021)
bcd920b  fine tab err (wyndhblb, Aug 12, 2021)
30 changes: 22 additions & 8 deletions airflow/jobs/local_task_job.py
@@ -78,10 +78,15 @@ def signal_handler(signum, frame):
"""Setting kill signal handler"""
self.log.error("Received SIGTERM. Terminating subprocesses")
self.on_kill()
self.task_instance.refresh_from_db()
if self.task_instance.state not in State.finished:
self.task_instance.set_state(State.FAILED)
self.task_instance._run_finished_callback(error="task received sigterm")
# TODO: [bobo] this behavior really SUCKS as it causing things
# to never retry when they should .. sigterms can come from
# all sorts of places "outside" airflow in K8 and this behavior halts
# all the jobs .. if something is truly wrong then after retries this will
# eventually fail
# self.task_instance.refresh_from_db()
# if self.task_instance.state not in State.finished:
# self.task_instance.set_state(State.FAILED)
# self.task_instance._run_finished_callback(error="task received sigterm")
raise AirflowException("LocalTaskJob received SIGTERM signal")

signal.signal(signal.SIGTERM, signal_handler)
@@ -153,11 +158,20 @@ def handle_task_exit(self, return_code: int) -> None:
# task exited by itself, so we need to check for error file
# in case it failed due to runtime exception/error
error = None

# TODO: [bobo] this behavior really SUCKS as it causing things
# to never retry when they should .. sigterms can come from
# all sorts of places "outside" airflow in K8 and this behavior halts
# all the jobs .. if something is truly wrong then after retries this will
# eventually fail

if self.task_instance.state == State.RUNNING:
# This is for a case where the task received a sigkill
# while running
self.task_instance.set_state(State.FAILED)
if self.task_instance.state != State.SUCCESS:
# This is for a case where the task received a sigkill
# while running ...
# [bobo] rather then just "fail the dag" mark it for RETRY
self.task_instance.set_state(State.UP_FOR_RETRY)
# TODO: [bobo]: if the task is skipped no error to mention
if self.task_instance.state != State.SUCCESS and self.task_instance.state != State.SKIPPED:
error = self.task_runner.deserialize_run_error()
self.task_instance._run_finished_callback(error=error)
if not self.task_instance.test_mode:
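
For readers skimming the diff, here is a minimal standalone sketch of the SIGTERM behavior the first hunk keeps, with the force-fail block removed so the task's own retry policy decides the terminal state. The helper name register_sigterm_handler and its job argument are hypothetical stand-ins for the LocalTaskJob wiring, not upstream code.

import signal

from airflow.exceptions import AirflowException


def register_sigterm_handler(job):
    """Install a SIGTERM handler that re-raises instead of force-failing the
    task instance, so the task's retry policy decides the final state."""

    def signal_handler(signum, frame):
        job.log.error("Received SIGTERM. Terminating subprocesses")
        job.on_kill()
        # Intentionally no set_state(State.FAILED) here: in Kubernetes a SIGTERM
        # often comes from node drains or evictions, and force-failing would
        # bypass the retry machinery entirely.
        raise AirflowException("LocalTaskJob received SIGTERM signal")

    signal.signal(signal.SIGTERM, signal_handler)
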
32 changes: 31 additions & 1 deletion airflow/jobs/scheduler_job.py
@@ -967,13 +967,38 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
session.query(DagRun.dag_id, DagRun.execution_date).filter(active_dagruns_filter).all()
)

# TODO[bobo] not the best thing since the next dag run query in _create_dagruns_for_dags
# may find things that have too many runs already, we need to short circuit this
active_runs_of_dags = dict(
session.query(DagRun.dag_id, func.count('*'))
.filter(
DagRun.dag_id.in_([o.dag_id for o in dag_models]),
DagRun.state == State.RUNNING, # pylint: disable=comparison-with-callable
DagRun.external_trigger.is_(False),
)
.group_by(DagRun.dag_id)
.all()
)

for dag_model in dag_models:
try:
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue

# TODO[bobo] not the best thing since the next dag run query in _create_dagruns_for_dags
# may find things that have too many runs already, we need to short circuit this
active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
self.log.info(
"create_dag_runs: DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
dag.dag_id,
active_runs_of_dag,
dag.max_active_runs,
)
continue

dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1043,7 +1068,10 @@ def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Sess
active_runs_of_dag,
dag.max_active_runs,
)
dag_model.next_dagrun_create_after = None
# TODO[bobo] setting this to none seems to never get picked up
# again in the main dags_needing_dagruns loop, so we just give it
# a 5 min uptick from "now"
dag_model.next_dagrun_create_after = timezone.utcnow() + timedelta(seconds=300)
else:
dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
dag_model.next_dagrun
@@ -1120,6 +1148,8 @@ def _schedule_dag_run(
len(currently_active_runs),
dag_run.execution_date,
)
# update the last scheduled tick so we don't endlessly loop over things
dag_run.update_state(session=session, execute_callbacks=False)
return 0

self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
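
The max_active_runs guard added above hinges on a single grouped count per DAG rather than one query per DAG. Below is a sketch of that query pattern in isolation, assuming a SQLAlchemy session and the DagRun model; count_running_dagruns is a hypothetical helper name.

from sqlalchemy import func

from airflow.models import DagRun
from airflow.utils.state import State


def count_running_dagruns(session, dag_ids):
    """Return {dag_id: count of scheduler-created RUNNING DagRuns}, computed in
    one grouped query for the whole batch instead of one query per DAG."""
    rows = (
        session.query(DagRun.dag_id, func.count('*'))
        .filter(
            DagRun.dag_id.in_(dag_ids),
            DagRun.state == State.RUNNING,
            DagRun.external_trigger.is_(False),
        )
        .group_by(DagRun.dag_id)
        .all()
    )
    return dict(rows)
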
7 changes: 6 additions & 1 deletion airflow/kubernetes/pod_generator.py
@@ -101,6 +101,11 @@ class PodGenerator:
:type extract_xcom: bool
"""

# TODO: [bobo] when making a pod name the default airflow does {name}.{uuid}
# that "dot" makes things non-address able (for spark driver modes)
# this we make things "-" so it is
UNIQUE_POD_ID_SEP = "-"

def __init__(
self,
pod: Optional[k8s.V1Pod] = None,
@@ -453,7 +458,7 @@ def make_unique_pod_id(pod_id: str) -> str:
# Strip trailing '-' and '.' as they can't be followed by '.'
trimmed_pod_id = pod_id[:MAX_LABEL_LEN].rstrip('-.')

safe_pod_id = f"{trimmed_pod_id}.{safe_uuid}"
safe_pod_id = f"{trimmed_pod_id}{PodGenerator.UNIQUE_POD_ID_SEP}{safe_uuid}"
return safe_pod_id


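
A hedged sketch of the separator change on its own: a '.' in the generated pod name breaks DNS-style addressing (for example Spark driver hostnames), while '-' keeps the name a valid label. The constant values and function shape below are illustrative assumptions, not the exact upstream definitions.

import uuid

MAX_LABEL_LEN = 63          # Kubernetes name/label length cap (assumed here)
UNIQUE_POD_ID_SEP = "-"     # '.' would make the pod name non-addressable


def make_unique_pod_id(pod_id):
    """Append a uuid suffix with '-' so the result stays a resolvable hostname,
    unlike the '.' separator it replaces."""
    if not pod_id:
        return None
    safe_uuid = uuid.uuid4().hex
    # strip trailing '-' and '.' so the separator is not doubled
    trimmed = pod_id[:MAX_LABEL_LEN].rstrip('-.')
    return f"{trimmed}{UNIQUE_POD_ID_SEP}{safe_uuid}"
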
7 changes: 6 additions & 1 deletion airflow/models/dag.py
@@ -1036,6 +1036,8 @@ def get_task_instances_before(
)
if min_date is None:
min_date = timezone.utc_epoch()
else:
min_date = min_date.execution_date
return self.get_task_instances(start_date=min_date, end_date=base_date, session=session)

@provide_session
@@ -2462,7 +2464,10 @@ def calculate_dagrun_date_fields(
active_runs_of_dag,
dag.max_active_runs,
)
self.next_dagrun_create_after = None
# TODO[bobo] setting this to none seems to never get picked up
# again in the main dags_needing_dagruns loop, so we just let the above computation ride
# commented out the below
# self.next_dagrun_create_after = None

log.info("Setting next_dagrun for %s to %s", dag.dag_id, self.next_dagrun)

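
The get_task_instances_before fix comes down to unwrapping the ORM row returned by the min-date lookup before using it as a date. A small sketch of that pattern, with resolve_min_date as a hypothetical helper:

import pendulum

from airflow.utils import timezone


def resolve_min_date(min_date_row):
    """The min-date lookup returns an ORM row (or None), not a datetime, so it
    has to be unwrapped before it can be used as a start_date filter."""
    if min_date_row is None:
        return timezone.utc_epoch()
    return pendulum.instance(min_date_row.execution_date)
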
16 changes: 16 additions & 0 deletions airflow/models/dagrun.py
@@ -411,6 +411,9 @@ def update_state(
finished_tasks = info.finished_tasks
unfinished_tasks = info.unfinished_tasks

# TODO [bobo]: task_instance_scheduling_decisions should not include "SKIPPED" tasks but it's not
unfinished_tasks = [t for t in unfinished_tasks if t.state != State.SKIPPED]

none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)

@@ -458,6 +461,11 @@ def update_state(
# if *all tasks* are deadlocked, the run failed
elif unfinished_tasks and none_depends_on_past and none_task_concurrency and not are_runnable_tasks:
self.log.error('Deadlock; marking run %s failed', self)
# TODO: [bobo] log some items for debugging
self.log.info(
"DEADLOCK: unfinished_tasks: %s, none_depends_on_past: %s, none_task_concurrency: %s, are_runnable_tasks: %s",
unfinished_tasks, none_depends_on_past, none_task_concurrency, are_runnable_tasks
)
self.set_state(State.FAILED)
if execute_callbacks:
dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session)
@@ -530,6 +538,14 @@ def _get_ready_tis(
# Check dependencies
for st in scheduleable_tasks:
old_state = st.state
# TODO: [bobo] there seems to be a race between when a "task as failed"
# the DB is marked state saved "fail" BUT the task is really UP_FOR_RETRY
# (see other places [bobo] where SIGTERMS cause this Failed state initially but
# then "resets" things to UP for retry, and that process also has a RACE
# so like the other race condition, we are going back to the old behavior
# of NOT marking these tasks as upstream failed as that too wedges the entire planet)
# the marking happens in "trigger_rule_dep.py"
# [bobo]: 2021-08-10 debuging way skipped are not skipping others setting flag_upstream_failed=True
if st.are_dependencies_met(
dep_context=DepContext(flag_upstream_failed=True, finished_tasks=finished_tasks),
session=session,
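
The dagrun.py change treats SKIPPED task instances as finished when evaluating the deadlock condition. A one-function sketch of that filter, with drop_skipped as a hypothetical name:

from airflow.utils.state import State


def drop_skipped(unfinished_tasks):
    """Treat SKIPPED task instances as finished for the deadlock check; if they
    stay in the unfinished list, a run full of skips can be flagged as deadlocked."""
    return [ti for ti in unfinished_tasks if ti.state != State.SKIPPED]
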
31 changes: 26 additions & 5 deletions airflow/models/taskinstance.py
@@ -839,7 +839,7 @@ def get_previous_execution_date(
"""
self.log.debug("previous_execution_date was called")
prev_ti = self.get_previous_ti(state=state, session=session)
return prev_ti and pendulum.instance(prev_ti.execution_date)
return (prev_ti and prev_ti.execution_date) and pendulum.instance(prev_ti.execution_date)

@provide_session
def get_previous_start_date(
@@ -853,8 +853,7 @@ def get_previous_start_date(
"""
self.log.debug("previous_start_date was called")
prev_ti = self.get_previous_ti(state=state, session=session)
# prev_ti may not exist and prev_ti.start_date may be None.
return prev_ti and prev_ti.start_date and pendulum.instance(prev_ti.start_date)
return (prev_ti and prev_ti.start_date) and pendulum.instance(prev_ti.start_date)

@property
def previous_start_date_success(self) -> Optional[pendulum.DateTime]:
@@ -958,7 +957,10 @@ def next_retry_datetime(self):
delay_backoff_in_seconds = min(modded_hash, timedelta.max.total_seconds() - 1)
delay = timedelta(seconds=delay_backoff_in_seconds)
if self.task.max_retry_delay:
delay = min(self.task.max_retry_delay, delay)
td = self.task.max_retry_delay
if isinstance(td, int) or isinstance(td, float):
td = timedelta(seconds=td)
delay = min(td, delay)
return self.end_date + delay

def ready_for_retry(self):
@@ -1221,8 +1223,16 @@ def _run_raw_task(
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
self.log.info(
"[bobo] merge task_id: %s, state: %s",
self.task_id, self.state.upper()
)

session.commit()
self.log.info(
"[bobo] commit task_id: %s, state: %s",
self.task_id, self.state.upper()
)

def _prepare_and_execute_task_with_callbacks(self, context, task):
"""Prepare Task for Execution"""
@@ -1336,7 +1346,8 @@ def _run_finished_callback(self, error: Optional[Union[str, Exception]] = None)
context = self.get_template_context()
context["exception"] = error
task.on_failure_callback(context)
elif self.state == State.SUCCESS:
# TODO: [bobo]: run "success" callback if task has been skipped
elif self.state == State.SUCCESS or self.state == State.SKIPPED:
task = self.task
if task.on_success_callback is not None:
context = self.get_template_context()
@@ -1485,6 +1496,7 @@ def handle_failure(
self.state = State.FAILED
email_for_state = task.email_on_failure
else:
self.log.info("[bobo] setting tasks UP_FOR_RETRY")
self.state = State.UP_FOR_RETRY
email_for_state = task.email_on_retry

@@ -1497,7 +1509,16 @@

if not test_mode:
session.merge(self)
self.log.info(
"[bobo] merge task_id: %s, state: %s",
self.task_id, self.state.upper()
)

session.commit()
self.log.info(
"[bobo] post DB commit task_id: %s, state: %s",
self.task_id, self.state.upper()
)

@provide_session
def handle_failure_with_callback(
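
Among the taskinstance.py changes, the next_retry_datetime tweak tolerates max_retry_delay being configured as a plain number of seconds instead of a timedelta. A sketch of that coercion, with clamp_retry_delay as a hypothetical helper:

from datetime import timedelta


def clamp_retry_delay(delay, max_retry_delay):
    """Clamp the exponential-backoff delay by max_retry_delay, accepting the
    latter as either a timedelta or a plain number of seconds."""
    if isinstance(max_retry_delay, (int, float)):
        max_retry_delay = timedelta(seconds=max_retry_delay)
    return min(max_retry_delay, delay)

For example, clamp_retry_delay(timedelta(minutes=40), max_retry_delay=600) yields timedelta(seconds=600).
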
2 changes: 1 addition & 1 deletion airflow/settings.py
@@ -446,10 +446,10 @@ def initialize():
# The webservers import this file from models.py with the default settings.
configure_orm()
configure_action_logging()

# Ensure we close DB connections at scheduler and gunicorn worker terminations
atexit.register(dispose_orm)

# pylint: enable=global-statement

# Const stuff

24 changes: 16 additions & 8 deletions airflow/ti_deps/deps/trigger_rule_dep.py
@@ -126,9 +126,12 @@ def _evaluate_trigger_rule(
# handling instant state assignment based on trigger rules
if flag_upstream_failed:
if trigger_rule == TR.ALL_SUCCESS:
if upstream_failed or failed:
ti.set_state(State.UPSTREAM_FAILED, session)
elif skipped:
# TODO [bobo]: race condition between "retry vs fail" so just "don't" do this
#if upstream_failed or failed:
# ti.set_state(State.UPSTREAM_FAILED, session)
#elif skipped:
# ti.set_state(State.SKIPPED, session)
if skipped:
ti.set_state(State.SKIPPED, session)
elif trigger_rule == TR.ALL_FAILED:
if successes or skipped:
@@ -144,12 +147,17 @@
if upstream_done and not (failed or upstream_failed):
ti.set_state(State.SKIPPED, session)
elif trigger_rule == TR.NONE_FAILED:
if upstream_failed or failed:
ti.set_state(State.UPSTREAM_FAILED, session)
# TODO [bobo]: race condition "retry vs fail" so just "don't" do this
#if upstream_failed or failed:
# ti.set_state(State.UPSTREAM_FAILED, session)
pass
elif trigger_rule == TR.NONE_FAILED_OR_SKIPPED:
if upstream_failed or failed:
ti.set_state(State.UPSTREAM_FAILED, session)
elif skipped == upstream:
#if upstream_failed or failed:
# ti.set_state(State.UPSTREAM_FAILED, session)
#elif skipped == upstream:
# ti.set_state(State.SKIPPED, session)
# TODO [bobo]: race condition "retry vs fail" so just "don't" do this
if skipped == upstream:
ti.set_state(State.SKIPPED, session)
elif trigger_rule == TR.NONE_SKIPPED:
if skipped:
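
To read the trigger-rule change outside the diff: the ALL_SUCCESS branch keeps only the skip propagation and drops the UPSTREAM_FAILED write, because an upstream recorded as failed may actually be about to go UP_FOR_RETRY. A hedged sketch of just that branch, with apply_all_success_rule as a hypothetical helper; the real method evaluates several rules and counters.

from airflow.utils.state import State


def apply_all_success_rule(ti, session, skipped, upstream_failed, failed):
    """ALL_SUCCESS handling with the UPSTREAM_FAILED setter removed: an upstream
    recorded as failed may really be UP_FOR_RETRY, so only skips propagate."""
    if upstream_failed or failed:
        # deliberately a no-op: writing UPSTREAM_FAILED here races with the
        # retry reset done elsewhere and can wedge the whole run
        pass
    if skipped:
        ti.set_state(State.SKIPPED, session)
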
5 changes: 5 additions & 0 deletions airflow/utils/json.py
@@ -78,5 +78,10 @@ def _default(obj):
from airflow.kubernetes.pod_generator import PodGenerator

return PodGenerator.serialize_pod(obj)
elif k8s is not None and isinstance(obj, k8s.V1ResourceRequirements):
return {
"limits": obj.limits,
"requests": obj.requests
}

raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")
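
The json.py addition gives the encoder a fallback for k8s.V1ResourceRequirements. Below is a standalone sketch of the same idea as a JSONEncoder subclass, assuming the kubernetes client is importable; ResourceAwareJSONEncoder is a hypothetical name.

import json

try:
    from kubernetes.client import models as k8s
except ImportError:          # kubernetes extra not installed
    k8s = None


class ResourceAwareJSONEncoder(json.JSONEncoder):
    """Minimal encoder mirroring the fallback above for V1ResourceRequirements."""

    def default(self, obj):
        if k8s is not None and isinstance(obj, k8s.V1ResourceRequirements):
            # expose only the limits/requests mappings, which are plain dicts
            return {"limits": obj.limits, "requests": obj.requests}
        return super().default(obj)


# usage sketch:
# json.dumps(pod.spec.containers[0].resources, cls=ResourceAwareJSONEncoder)
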