Merge branch 'main' into bugfix/27931-AIP-51-Executor-Coupling-in-Logging
snjypl authored Jan 18, 2023
2 parents f31b2b3 + bc5cecc commit 1e385ac
Showing 27 changed files with 675 additions and 91 deletions.
4 changes: 0 additions & 4 deletions airflow/config_templates/airflow_local_settings.py
@@ -26,10 +26,6 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


2 changes: 2 additions & 0 deletions airflow/datasets/manager.py
@@ -25,6 +25,7 @@
from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
@@ -66,6 +67,7 @@ def register_dataset_change(
)
)
session.flush()
Stats.incr("dataset.updates")
if dataset_model.consuming_dags:
self._queue_dagruns(dataset_model, session)
session.flush()
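The manager now pairs each registered dataset event with a Stats counter, so the change surfaces as a "dataset.updates" metric in whatever backend is configured. A rough sketch of the same pattern follows; the "custom.dataset.events" metric name is invented for illustration and is not something Airflow emits itself:

from airflow.stats import Stats

def record_custom_dataset_event() -> None:
    # Increments a StatsD-style counter; effectively a no-op unless a metrics
    # backend is enabled (e.g. [metrics] statsd_on = True in airflow.cfg).
    Stats.incr("custom.dataset.events")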
26 changes: 19 additions & 7 deletions airflow/executors/kubernetes_executor.py
@@ -498,13 +498,14 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
"""
Clear tasks that were not yet launched, but were previously queued.
Tasks can end up in a "Queued" state through either the executor being
abruptly shut down (leaving a non-empty task_queue on this executor)
or when a rescheduled/deferred operator comes back up for execution
(with the same try_number) before the pod of its previous incarnation
has been fully removed (we think).
Tasks can end up in a "Queued" state when a rescheduled/deferred
operator comes back up for execution (with the same try_number) before the
pod of its previous incarnation has been fully removed (we think).
This method checks each of those tasks to see if the corresponding pod
It's also possible when an executor abruptly shuts down (leaving a non-empty
task_queue on that executor), but that scenario is handled via normal adoption.
This method checks each of our queued tasks to see if the corresponding pod
is around, and if not, and there's no matching entry in our own
task_queue, marks it for re-execution.
"""
@@ -832,7 +833,18 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
query_kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
# We will look for any pods owned by the no-longer-running scheduler,
# but will exclude only successful pods, as those TIs will have a terminal state
# and not be up for adoption!
# Those workers that failed, however, are okay to adopt here as their TI will
# still be in queued.
query_kwargs = {
"field_selector": "status.phase!=Succeeded",
"label_selector": (
"kubernetes_executor=True,"
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
),
}
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
self.adopt_launched_task(kube_client, pod, pod_ids)
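The new query_kwargs combine a label selector (only Airflow worker pods belonging to the dead scheduler job) with a field selector that skips pods which already finished successfully. Roughly how such a query maps onto the Kubernetes Python client; the namespace and scheduler job id are placeholders, and the real code's additional POD_EXECUTOR_DONE_KEY label exclusion is left out of this sketch:

from kubernetes import client, config

def list_adoptable_worker_pods(namespace: str, scheduler_job_id: str) -> list:
    # Inside a cluster; use config.load_kube_config() when running externally.
    config.load_incluster_config()
    core_v1 = client.CoreV1Api()
    pod_list = core_v1.list_namespaced_pod(
        namespace,
        # Succeeded pods belong to task instances in a terminal state, so they
        # are never candidates for adoption.
        field_selector="status.phase!=Succeeded",
        # Only worker pods launched on behalf of the given scheduler job.
        label_selector=f"kubernetes_executor=True,airflow-worker={scheduler_job_id}",
    )
    return pod_list.items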
12 changes: 9 additions & 3 deletions airflow/jobs/scheduler_job.py
@@ -1198,6 +1198,7 @@ def _create_dag_runs_dataset_triggered(
dag_hash=dag_hash,
creating_job_id=self.id,
)
Stats.incr("dataset.triggered_dagruns")
dag_run.consumed_dataset_events.extend(dataset_events)
session.query(DatasetDagRunQueue).filter(
DatasetDagRunQueue.target_dag_id == dag_run.dag_id
@@ -1607,6 +1608,11 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
session.flush()

def _set_orphaned(self, dataset: DatasetModel) -> int:
self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
dataset.is_orphaned = expression.true()
return 1

@provide_session
def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
"""
@@ -1633,6 +1639,6 @@ def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
)
)
)
for dataset in orphaned_dataset_query:
self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
dataset.is_orphaned = expression.true()

updated_count = sum(self._set_orphaned(dataset) for dataset in orphaned_dataset_query)
Stats.gauge("dataset.orphaned", updated_count)
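One detail worth noting in the scheduler change: is_orphaned is assigned expression.true(), SQLAlchemy's dialect-aware SQL TRUE construct, rather than a plain Python True, so the rendered UPDATE uses whatever boolean literal the database expects. A minimal standalone illustration of that assignment pattern; the Item model, SQLite engine, and session below are invented for the example:

from sqlalchemy import Boolean, Column, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.sql import expression

Base = declarative_base()

class Item(Base):
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    is_orphaned = Column(Boolean, server_default=expression.false(), nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Item())
    session.flush()
    item = session.scalars(select(Item)).one()
    # Assigning the SQL construct (not a Python bool) lets each dialect render
    # its own TRUE literal when the UPDATE statement is emitted at flush time.
    item.is_orphaned = expression.true()
    session.commit()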
7 changes: 6 additions & 1 deletion airflow/models/xcom.py
@@ -731,7 +731,12 @@ def __getstate__(self) -> Any:
# do the same for count(), but I think it should be performant enough to
# calculate only that eagerly.
with self._get_bound_query() as query:
statement = query.statement.compile(query.session.get_bind())
statement = query.statement.compile(
query.session.get_bind(),
# This inlines all the values into the SQL string to simplify
# cross-process communication as much as possible.
compile_kwargs={"literal_binds": True},
)
return (str(statement), query.count())

def __setstate__(self, state: Any) -> None:
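The literal_binds compile option used above renders bound parameters inline, so the resulting SQL string is self-contained and can be handed to another process without a separate parameter payload (it only works for parameter types SQLAlchemy knows how to render as literals). A small standalone illustration with an invented table:

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class XComStub(Base):
    __tablename__ = "xcom_stub"
    id = Column(Integer, primary_key=True)
    key = Column(String)

engine = create_engine("sqlite://")
stmt = select(XComStub).where(XComStub.key == "return_value")

# Default compilation keeps a placeholder and a separate parameter dictionary.
print(stmt.compile(engine))
# literal_binds inlines the value directly into the SQL text,
# e.g. ... WHERE xcom_stub."key" = 'return_value'
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))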
35 changes: 33 additions & 2 deletions airflow/providers/apache/beam/hooks/beam.py
@@ -19,6 +19,7 @@
from __future__ import annotations

import contextlib
import copy
import json
import os
import select
@@ -310,11 +311,10 @@ def start_go_pipeline(
should_init_module: bool = False,
) -> None:
"""
Starts Apache Beam Go pipeline.
Starts Apache Beam Go pipeline with a source file.
:param variables: Variables passed to the job.
:param go_file: Path to the Go file with your beam pipeline.
:param go_file:
:param process_line_callback: (optional) Callback that can be used to process each line of
the stdout and stderr file descriptors.
:param should_init_module: If False (default), will just execute a `go run` command. If True, will
@@ -346,3 +346,34 @@
process_line_callback=process_line_callback,
working_directory=working_directory,
)

def start_go_pipeline_with_binary(
self,
variables: dict,
launcher_binary: str,
worker_binary: str,
process_line_callback: Callable[[str], None] | None = None,
) -> None:
"""
Starts Apache Beam Go pipeline with an executable binary.
:param variables: Variables passed to the job.
:param launcher_binary: Path to the binary compiled for the launching platform.
:param worker_binary: Path to the binary compiled for the worker platform.
:param process_line_callback: (optional) Callback that can be used to process each line of
the stdout and stderr file descriptors.
"""
job_variables = copy.deepcopy(variables)

if "labels" in job_variables:
job_variables["labels"] = json.dumps(job_variables["labels"], separators=(",", ":"))

job_variables["worker_binary"] = worker_binary

command_prefix = [launcher_binary]

self._start_pipeline(
variables=job_variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
)
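A possible call site for the new start_go_pipeline_with_binary method, inferred only from the signature above; the runner choice, binary paths, variables, and callback are placeholder values for this sketch:

from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

hook = BeamHook(runner=BeamRunnerType.DirectRunner)
hook.start_go_pipeline_with_binary(
    variables={"labels": {"team": "data"}},       # "labels" is re-serialized to compact JSON by the method
    launcher_binary="/tmp/launcher-linux-amd64",  # built for the platform that launches the job
    worker_binary="/tmp/worker-linux-amd64",      # built for the platform the workers run on
    process_line_callback=lambda line: print(line, end=""),
)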
