Running task logs is incomplete #91

Closed
zhangqinyang opened this issue Nov 9, 2023 · 12 comments

@zhangqinyang

Describe the bug
Airflow: 2.5.0
KubernetesJobOperator: 2.0.12
Description:
The task's running log output is incomplete. When the task runs for more than 4 hours, the Airflow task log stops updating and displays the error message "Task is not able to be run", but the container in Kubernetes still outputs logs normally.

[screenshot of the Airflow task log]

@LamaAni
Owner

LamaAni commented Nov 9, 2023

Hi @zhangqinyang

I could not reproduce the error. I may be able to check this more later on but it may take a couple of weeks.

Looking at the log, I think there are two possible scenarios:

  1. The kube watch was disconnected from the cluster - thus stopping the log. I am not seeing any warnings about this, so that is less plausible.
  2. The underlying airflow task (in taskinstance.py) timed out or has not received a task update. But you said the pod is still running. This is odd, since I would expect airflow to send a "Stop" command to the task, therefore triggering the task cleanup - killing all the resources.

This leads me to think that you may have some time limit in the underlying airflow, or maybe the operator is missing a command to update the task in the case of long runs... not sure.

Also, I find the 4-hour mark very strange - if there was an error/disconnect and not a timeout, I would expect random execution times rather than a stable time span. Is there a max connection time there? Were there other runs that lasted less/more time?

To check that the timeout is not coming from airflow:

  1. Create an airflow Python task (PythonOperator) or Bash task (BashOperator) which sleeps for 8 hours.
  2. The task should log a line every 10 seconds (so log + sleep 10).
  3. Get the log output and see if the task completes (a minimal sketch is shown below).
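
A minimal sketch of such a check, assuming the PythonOperator (the DAG id, task id, and callable below are placeholders, not from this thread):

from datetime import datetime
import time

from airflow import DAG
from airflow.operators.python import PythonOperator


def sleep_and_log(total_seconds=8 * 60 * 60, interval=10):
    # Log a line every `interval` seconds until `total_seconds` have passed.
    elapsed = 0
    while elapsed < total_seconds:
        time.sleep(interval)
        elapsed += interval
        print(f"still running, elapsed {elapsed} seconds")


dag = DAG(
    "timeout_check_python",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
)

PythonOperator(
    task_id="sleep_and_log",
    python_callable=sleep_and_log,
    dag=dag,
)

If this task also stops logging after about 4 hours, the timeout is coming from airflow itself rather than from the operator.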

To check that the timeout is a connection/kubernetes time:

  1. Run a task - a single task in a DAG - for more than 4 hours using the KubernetesJobOperator.
  2. Run a task - a single task in a DAG - for more than 4 hours using the KubernetesPodOperator (different connection scheme).
  3. Make sure the task is like the above (sleep and log print), in bash if possible.

Also, can you specify what kind of cloud infrastructure and which cloud system you are running on? I know that there is an issue with the AWS Kubernetes cloud described in this thread: #54

@zhangqinyang
Author

Following your suggestion, I ran three tasks: BashOperator, KubernetesPodOperator, and KubernetesJobOperator. Based on the log results, only the KubernetesJobOperator task experienced the four-hour log interruption. Here are my dag.py files and the corresponding pod and job YAML files.
DAG file of BashOperator

from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator

dag = DAG(
    "c36e2d9000c54489a6fdd6b5f4e8bash",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0
    },
    tags=["test_dag"]
)


Bh6miAZv = BashOperator(
    task_id="Bh6miAZv",
    dag=dag,
    bash_command="while true;do echo 1 && sleep 10;done",
)

DAG file of KubernetesPodOperator

from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

config_file = "/opt/airflow/dags/files/kubernetes-admin.kubeconfig"

dag = DAG(
    "c36e2d9000c54489a6fdd6b5f4e86pod",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0
    },
    tags=["test_dag"]
)

Bh8miAZv = KubernetesPodOperator(
    task_id="Bh8miAZv",
    namespace="queue-eva-cuda11-2gpu",
    in_cluster=False,
    config_file=config_file,
    dag=dag,
    pod_template_file="/opt/airflow/dags/yaml/pod.yaml",
)

pod.yaml

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
  namespace: queue-eva-cuda11-2gpu
spec:
  containers:
  - name: cont-test-pod
    image: time_sleep_test:v1.1
    imagePullPolicy: IfNotPresent
    command: ["sh", "-c", "python3 run.py"]
    resources:
      limits:
        cpu: 1
        memory: 2G
        nvidia.com/gpu: 0
      requests:
        cpu: 1
        memory: 2G
        nvidia.com/gpu: 0
  imagePullSecrets:
  - name: registrykey
  nodeSelector:
    ns: queue-eva-cuda11-2gpu

DAG file of KubernetesJobOperator

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from datetime import datetime

config_file = "/opt/airflow/dags/files/kubernetes-admin.kubeconfig"

dag = DAG(
    "c36e2d9000c54489a6fdd6b5f4e86bfd",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0
    },
    tags=["test_dag"]
)

bh7miazv = KubernetesJobOperator(
    task_id="Bh7miAZv",
    namespace="queue-eva-cuda11-2gpu",
    in_cluster=False,
    config_file=config_file,
    body_filepath="/opt/airflow/dags/yaml/kube.yaml",
    dag=dag,
    image="dynamic_input_new:0.0.1",
    command=["bash", "-c", "while true;do echo 1 && sleep 10;done"],
)

job.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: test-job
  namespace: queue-eva-cuda11-2gpu
spec:
  suspend: true
  template:
    spec:
      containers:
        - name: cont-test-job
          image: dynamic_input_new:0.0.1
          command: ["bash", "-c", "while 1 do echo 1; sleep 1000; done"]
          resources:
            requests:
              cpu: 1
              nvidia.com/gpu: 0
              memory: 2Gi
              ephemeral-storage: 10Gi
            limits:
              cpu: 1
              nvidia.com/gpu: 0
              memory: 2Gi
              ephemeral-storage: 10Gi
          volumeMounts:
            - name: localtime
              mountPath: "/usr/share/zoneinfo/UTC"
              readOnly: True
            - name: timezone
              mountPath: "/etc/timezone"
              readOnly: True
      priorityClassName: high-priority
      restartPolicy: Never
      imagePullSecrets:
        - name: registrykey
      nodeSelector:
        ns: queue-eva-cuda11-2gpu
      volumes:
        - name: localtime
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
        - name: timezone
          hostPath:
            path: /etc/timezone

I don't know if there are any other suggestions; I look forward to your reply.

@LamaAni
Owner

LamaAni commented Nov 15, 2023

Hi. Let me set up the system and give that a go. Seems to be coming from the operator. May take a few weeks since I'm dealing with some things at work.

Can you share all of the cloud/os/airflow details?

@zhangqinyang
Author

Cloud: BCC (Baidu Cloud Compute)
OS: CentOS 7.5

# uname -a
Linux 0i 3.10.0-1160.80.1.el7.x86_64 #1 SMP Tue Nov 8 15:48:59 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Airflow: 2.5.0
KubernetesJobOperator: 2.0.12

@LamaAni
Owner

LamaAni commented Nov 22, 2023

I was able to reproduce the issue; not sure of the cause yet.

Using: k0s (local server)
Running:

from datetime import timedelta
from utils import default_args, name_from_file
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import (
    KubernetesJobOperator,
)

dag = DAG(
    name_from_file(__file__),
    default_args=default_args,
    description="Test base job operator",
    schedule_interval=None,
    catchup=False,
)

envs = {
    "PASS_ARG": "a test",
}

total_time_seconds = round(timedelta(hours=4.5).total_seconds())

KubernetesJobOperator(
    task_id="test-long-job-success",
    body_filepath="./templates/test_long_job.yaml",
    envs={
        "PASS_ARG": "a long test",
        "TIC_COUNT": str(total_time_seconds),
    },
    dag=dag,
)


if __name__ == "__main__":
    dag.test()

test_long_job.yaml

apiVersion: batch/v1
kind: Job
metadata: {}
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: job-executor
          image: ubuntu
          command:
            - bash
            - -c
            - |
              #!/usr/bin/env bash
              : "${SLEEP_INTERVAL:=10}"
              echo "Starting $PASS_ARG (Sleep interval $SLEEP_INTERVAL)"
              elapsed_time=0
              while true; do
                  sleep $SLEEP_INTERVAL
                  elapsed_time=$((elapsed_time + $SLEEP_INTERVAL))
                  echo "Elapsed $elapsed_time [seconds]"
                  if [ "$elapsed_time" -ge "$TIC_COUNT" ]; then
                      break
                  fi
                  
              done
              echo "Complete"
          env:
            - name: TIC_COUNT
              value: '10800' # 3 hrs

            - name: SLEEP_INTERVAL
              value: '60'
  backoffLimit: 0

@zhangqinyang
Author

I have made no progress on this issue yet. Thank you for the information, and I hope to find a solution soon. Thanks again.

@LamaAni
Owner

LamaAni commented Nov 22, 2023

Found and (hopefully) fixed the issue in #93 and merged it to master.

The issue was that once 4 hours have passed, Kubernetes closes the connection on its side but returns no error. Since this is a follow query, the query should simply restart.

  1. Added a query restart in this case.
  2. Added support for a proper sinceTime requirement on the query.

This would allow the logs query to essentially last forever. This does add some call overhead whilst Kubernetes is deleting the resources, but that should not be much (maybe 10 more calls at the end of the sequence).
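
As an illustration only (not the operator's actual code): with the plain kubernetes Python client, a follow query that is restarted after such a silent server-side disconnect could look roughly like the sketch below. The client exposes since_seconds rather than the raw sinceTime field, and the function name follow_pod_logs is invented for the example.

import sys
import time

from kubernetes import client, config


def follow_pod_logs(name, namespace, container=None):
    """Follow a pod's logs, restarting the follow query whenever the API
    server silently closes the stream while the pod is still running."""
    config.load_kube_config()  # or config.load_incluster_config() inside a cluster
    core = client.CoreV1Api()
    last_read = None  # wall-clock time of the last chunk we printed

    while True:
        since_seconds = None
        if last_read is not None:
            # Resume roughly where the previous stream stopped
            # (may re-print a few lines around the reconnect).
            since_seconds = max(1, int(time.time() - last_read) + 1)

        resp = core.read_namespaced_pod_log(
            name,
            namespace,
            container=container,
            follow=True,
            since_seconds=since_seconds,
            _preload_content=False,
        )
        for chunk in resp.stream():
            sys.stdout.write(chunk.decode("utf-8", errors="replace"))
            last_read = time.time()

        # The stream ended without raising; restart it only if the pod is
        # still alive, otherwise the logs are complete.
        pod = core.read_namespaced_pod(name, namespace)
        if pod.status.phase not in ("Pending", "Running"):
            break
        time.sleep(1)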

Please, if you can, test by installing the master branch as the package and let me know.
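
For example, assuming the repository is LamaAni/KubernetesJobOperator (adjust the URL if it differs), installing the current master branch could look like:

pip install "git+https://github.com/LamaAni/KubernetesJobOperator.git@master"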

@LamaAni
Owner

LamaAni commented Nov 23, 2023

My tests were successful: a 4.5-hour run without issue. Once you confirm on your end, I'll make another release.

@zhangqinyang
Author

Thank you very much for your support. I will quickly use the master branch to verify the issue, and I will respond to you with the exact results as soon as possible.

@LamaAni
Owner

LamaAni commented Nov 28, 2023

Hi, any update? I want to create a new release.

@zhangqinyang
Author

I apologize for the delayed response, as I was validating multiple long-running tasks. I have now verified it in the environment, and the issue has been resolved. Thank you very much for your support.

@LamaAni
Owner

LamaAni commented Dec 2, 2023
