Skip to content

Commit

Permalink
Merge branch 'apache:main' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi authored Dec 30, 2021
2 parents 2cbccda + dcd4c49 commit 00fbfcb
Show file tree
Hide file tree
Showing 234 changed files with 932 additions and 821 deletions.
14 changes: 12 additions & 2 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,16 @@ def _check(value):
help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.",
)

ARG_MIN_PENDING_MINUTES = Arg(
("--min-pending-minutes",),
default=30,
type=positive_int(allow_zero=False),
help=(
"Pending pods created before the time interval are to be cleaned up, "
"measured in minutes. Default value is 30(m). The minimum value is 5(m)."
),
)

# jobs check
ARG_JOB_TYPE_FILTER = Arg(
('--job-type',),
Expand Down Expand Up @@ -1526,10 +1536,10 @@ class GroupCommand(NamedTuple):
help=(
"Clean up Kubernetes pods "
"(created by KubernetesExecutor/KubernetesPodOperator) "
"in evicted/failed/succeeded states"
"in evicted/failed/succeeded/pending states"
),
func=lazy_load_command('airflow.cli.commands.kubernetes_command.cleanup_pods'),
args=(ARG_NAMESPACE,),
args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES),
),
ActionCommand(
name='generate-dag-yaml',
Expand Down
18 changes: 17 additions & 1 deletion airflow/cli/commands/kubernetes_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Kubernetes sub-commands"""
import os
import sys
from datetime import datetime, timedelta

from kubernetes import client
from kubernetes.client.api_client import ApiClient
Expand Down Expand Up @@ -69,13 +70,22 @@ def generate_pod_yaml(args):

@cli_utils.action_cli
def cleanup_pods(args):
"""Clean up k8s pods in evicted/failed/succeeded states"""
"""Clean up k8s pods in evicted/failed/succeeded/pending states"""
namespace = args.namespace

min_pending_minutes = args.min_pending_minutes
# protect newly created pods from deletion
if min_pending_minutes < 5:
min_pending_minutes = 5

# https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
# All Containers in the Pod have terminated in success, and will not be restarted.
pod_succeeded = 'succeeded'

# The Pod has been accepted by the Kubernetes cluster,
# but one or more of the containers has not been set up and made ready to run.
pod_pending = 'pending'

# All Containers in the Pod have terminated, and at least one Container has terminated in failure.
# That is, the Container either exited with non-zero status or was terminated by the system.
pod_failed = 'failed'
Expand Down Expand Up @@ -107,11 +117,17 @@ def cleanup_pods(args):
pod_phase = pod.status.phase.lower()
pod_reason = pod.status.reason.lower() if pod.status.reason else ''
pod_restart_policy = pod.spec.restart_policy.lower()
current_time = datetime.now(pod.metadata.creation_timestamp.tzinfo)

if (
pod_phase == pod_succeeded
or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never)
or (pod_reason == pod_reason_evicted)
or (
pod_phase == pod_pending
and current_time - pod.metadata.creation_timestamp
> timedelta(minutes=min_pending_minutes)
)
):
print(
f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason "{pod_reason}", '
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/airbyte/operators/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Sequence

from airflow.models import BaseOperator
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
Expand Down Expand Up @@ -51,7 +51,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
:type timeout: float
"""

template_fields = ('connection_id',)
template_fields: Sequence[str] = ('connection_id',)

def __init__(
self,
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/airbyte/sensors/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains a Airbyte Job sensor."""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Sequence

from airflow.exceptions import AirflowException
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
Expand All @@ -39,7 +39,7 @@ class AirbyteJobSensor(BaseSensorOperator):
:type api_version: str
"""

template_fields = ('airbyte_job_id',)
template_fields: Sequence[str] = ('airbyte_job_id',)
ui_color = '#6C51FD'

def __init__(
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/alibaba/cloud/sensors/oss_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
else:
from cached_property import cached_property

from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Sequence
from urllib.parse import urlparse

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -51,7 +51,7 @@ class OSSKeySensor(BaseSensorOperator):
:type oss_conn_id: Optional[str]
"""

template_fields = ('bucket_key', 'bucket_name')
template_fields: Sequence[str] = ('bucket_key', 'bucket_name')

def __init__(
self,
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#
import sys
import warnings
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence
from uuid import uuid4

if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -64,7 +64,7 @@ class AthenaOperator(BaseOperator):
"""

ui_color = '#44b5e2'
template_fields = ('query', 'database', 'output_location')
template_fields: Sequence[str] = ('query', 'database', 'output_location')
template_ext = ('.sql',)
template_fields_renderers = {"query": "sql"}

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
- http://boto3.readthedocs.io/en/latest/reference/services/batch.html
- https://docs.aws.amazon.com/batch/latest/APIReference/Welcome.html
"""
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Sequence

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
Expand Down Expand Up @@ -97,7 +97,7 @@ class AwsBatchOperator(BaseOperator):

ui_color = "#c3dae0"
arn = None # type: Optional[str]
template_fields = (
template_fields: Sequence[str] = (
"job_name",
"overrides",
"parameters",
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/operators/cloud_formation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains CloudFormation create/delete stack operators."""
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, Optional, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.cloud_formation import CloudFormationHook
Expand All @@ -40,7 +40,7 @@ class CloudFormationCreateStackOperator(BaseOperator):
:type aws_conn_id: str
"""

template_fields: List[str] = ['stack_name']
template_fields: Sequence[str] = ('stack_name',)
template_ext = ()
ui_color = '#6b9659'

Expand Down Expand Up @@ -72,7 +72,7 @@ class CloudFormationDeleteStackOperator(BaseOperator):
:type aws_conn_id: str
"""

template_fields: List[str] = ['stack_name']
template_fields: Sequence[str] = ('stack_name',)
template_ext = ()
ui_color = '#1d472b'
ui_fgcolor = '#FFF'
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/datasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import random
import warnings
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, List, Optional, Sequence

from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.models import BaseOperator
Expand Down Expand Up @@ -111,7 +111,7 @@ class DataSyncOperator(BaseOperator):
:raises AirflowException: If Task creation, update, execution or delete fails.
"""

template_fields = (
template_fields: Sequence[str] = (
"task_arn",
"source_location_uri",
"destination_location_uri",
Expand Down
12 changes: 6 additions & 6 deletions airflow/providers/amazon/aws/operators/dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.


from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict, Optional, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dms import DmsHook
Expand Down Expand Up @@ -56,7 +56,7 @@ class DmsCreateTaskOperator(BaseOperator):
:type aws_conn_id: Optional[str]
"""

template_fields = (
template_fields: Sequence[str] = (
'replication_task_id',
'source_endpoint_arn',
'target_endpoint_arn',
Expand Down Expand Up @@ -134,7 +134,7 @@ class DmsDeleteTaskOperator(BaseOperator):
:type aws_conn_id: Optional[str]
"""

template_fields = ('replication_task_arn',)
template_fields: Sequence[str] = ('replication_task_arn',)
template_ext = ()
template_fields_renderers: Dict[str, str] = {}

Expand Down Expand Up @@ -174,7 +174,7 @@ class DmsDescribeTasksOperator(BaseOperator):
:type aws_conn_id: Optional[str]
"""

template_fields = ('describe_tasks_kwargs',)
template_fields: Sequence[str] = ('describe_tasks_kwargs',)
template_ext = ()
template_fields_renderers: Dict[str, str] = {'describe_tasks_kwargs': 'json'}

Expand Down Expand Up @@ -223,7 +223,7 @@ class DmsStartTaskOperator(BaseOperator):
:type aws_conn_id: Optional[str]
"""

template_fields = (
template_fields: Sequence[str] = (
'replication_task_arn',
'start_replication_task_type',
'start_task_kwargs',
Expand Down Expand Up @@ -276,7 +276,7 @@ class DmsStopTaskOperator(BaseOperator):
:type aws_conn_id: Optional[str]
"""

template_fields = ('replication_task_arn',)
template_fields: Sequence[str] = ('replication_task_arn',)
template_ext = ()
template_fields_renderers: Dict[str, str] = {}

Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/operators/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
Expand All @@ -41,7 +41,7 @@ class EC2StartInstanceOperator(BaseOperator):
:type check_interval: float
"""

template_fields = ("instance_id", "region_name")
template_fields: Sequence[str] = ("instance_id", "region_name")
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

Expand Down Expand Up @@ -87,7 +87,7 @@ class EC2StopInstanceOperator(BaseOperator):
:type check_interval: float
"""

template_fields = ("instance_id", "region_name")
template_fields: Sequence[str] = ("instance_id", "region_name")
ui_color = "#eeaa11"
ui_fgcolor = "#ffffff"

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from datetime import datetime, timedelta
from logging import Logger
from threading import Event, Thread
from typing import Dict, Generator, Optional
from typing import Dict, Generator, Optional, Sequence

from botocore.exceptions import ClientError
from botocore.waiter import Waiter
Expand Down Expand Up @@ -225,7 +225,7 @@ class ECSOperator(BaseOperator):
"""

ui_color = '#f0ede4'
template_fields = ('overrides',)
template_fields: Sequence[str] = ('overrides',)
template_fields_renderers = {
"overrides": "json",
"network_configuration": "json",
Expand Down
Loading

0 comments on commit 00fbfcb

Please sign in to comment.