Error handling for when Azure container log cannot be read in properly. #34627

Merged (10 commits) on Oct 13, 2023

Conversation

krisfur (Contributor) commented Sep 26, 2023

Added simple error handling for when the Azure container log cannot be read in properly, leading to an attribute error when get_logs() uses .splitlines(). Desired behaviour is to stop the run.
Closes #34516
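
For context, a minimal standalone sketch (not the provider code) of the failure mode and the guard; fetch_raw_log is a made-up stand-in for whatever the Azure hook returns:

# Illustration only: reproduces the AttributeError and shows the None guard.
def fetch_raw_log():
    """Stand-in for whatever the Azure hook returns; here it simulates a broken log."""
    return None

raw = fetch_raw_log()

# Without a guard, the next line raises:
# AttributeError: 'NoneType' object has no attribute 'splitlines'
# lines = raw.splitlines()

if raw is None:
    print("Container log is broken, marking as failed.")  # fail the run instead of crashing
else:
    lines = raw.splitlines()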



boring-cyborg bot commented Sep 26, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

@@ -319,6 +319,9 @@ def _monitor_logging(self, resource_group: str, name: str) -> int:
if state in ["Running", "Terminated", "Succeeded"]:
    try:
        logs = self._ci_hook.get_logs(resource_group, name)
        if logs is None:
            self.log.exception("Container log is broken, marking as failed.")
Contributor
@Taragolis Taragolis Sep 26, 2023

I'm not familiar with Azure Container Instances, so I'm not sure: should being unable to get logs be a reason to mark the task as failed?

If so, then we should replace exception with error; they have the same severity, the only difference being whether a stack trace is attached to the log.

Suggested change
self.log.exception("Container log is broken, marking as failed.")
self.log.error("Container log is broken, marking as failed.")
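
As a standalone illustration of the difference (logger name made up): both calls log at ERROR severity, but exception() also attaches the active traceback, which only makes sense inside an except block.

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("aci-example")

try:
    None.splitlines()
except AttributeError:
    log.exception("Container log is broken, marking as failed.")  # ERROR plus traceback
    log.error("Container log is broken, marking as failed.")      # ERROR, no traceback

In the PR's guard no exception is active at that point (it is a plain None check), so error() is the better fit.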

Contributor Author

Changed from exception to error as suggested.

As for behaviour: without stopping the task, it falls into an infinite loop of reprovisioning and restarting the container, as per issue #34516. Unfortunately, we've already lost a bit of money because of it.

Contributor

I'm just wondering: do broken logs in the container mean that the container can't change its status to Terminated or Failed? If so, this code would be unreachable (interesting fact: there is no condition for Succeeded):

if state == "Terminated":
    self.log.info("Container exited with detail_status %s", detail_status)
    return exit_code
if state == "Failed":
    self.log.error("Azure provision failure")
    return 1

If it can, then basically we just need to skip writing the logs into the task; but that is optional, because AzureContainerInstancesOperator._log_last could work with None values.
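
As an aside, a hypothetical sketch of a None-tolerant "log only the new lines" helper in the spirit of _log_last; this is not the provider's implementation and all names are made up:

def log_new_lines(logs, last_line_logged):
    """Print lines that appeared since the last poll; tolerate a missing log."""
    if logs is None:
        # Nothing was fetched this poll: keep the previous cursor and skip logging.
        return last_line_logged
    start = logs.index(last_line_logged) + 1 if last_line_logged in logs else 0
    for line in logs[start:]:
        print(line.rstrip())
    return logs[-1] if logs else last_line_logged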

Contributor

reprovisioning and restarting the container as per issue #34516

I would like a bit more detail here. I can't see anything related to re-provisioning; if the task failed at this point, then it should simply be failed. Are you by any chance running the execute method manually?

Contributor Author

Fortunately, from a behaviour standpoint, returning 1 does correctly delete the container group:

[2023-09-27, 10:22:09 UTC] {container_instances.py:322} ERROR - Container log is broken. Marking as failed.
[2023-09-27, 10:22:09 UTC] {container_instances.py:262} INFO - Container had exit code: 1
[2023-09-27, 10:22:09 UTC] {container_instances.py:277} INFO - Deleting container group

But yes, since getting the logs fails and gives a None, you can't really reach a Terminated or Failed state; the whole thing gets lost and starts reprovisioning the container unless something is done, such as returning 1 to kill it.

Contributor Author
@krisfur krisfur Sep 27, 2023

I agree: it is possible that returning None logs is just a symptom of a more complex failure in getting something from the instance view or beyond. I tried debugging those bits but fell short of finding anything useful.

The solution in this PR stops the results of this issue from causing an infinite loop of restarts (which incurs costs if it happens in prod), but I agree it probably doesn't resolve the underlying logic problem, which I failed to investigate fully given my limited skills as a data engineer and limited knowledge of the Airflow code.

Another solution (which worked) that I tested is checking whether last_state == Running and state == Waiting, which also catches the problem; but this state change only happens after those errors from the None logs are thrown, so stopping it at the error point seems better.
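
For reference, a rough standalone sketch of that alternative check inside a simplified polling loop; the state feed is faked for illustration and this is not what the PR merged:

# Fake sequence of states standing in for the ACI instance view.
observed_states = ["Pending", "Running", "Waiting", "Waiting"]

last_state = None
for state in observed_states:
    if last_state == "Running" and state == "Waiting":
        print("Container fell back from Running to Waiting, marking as failed.")
        break  # in the operator this would be a `return 1`
    if state in ("Terminated", "Failed", "Succeeded"):
        break
    last_state = state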

Contributor
@Taragolis Taragolis Sep 27, 2023

The solution in this PR stops the results of this issue from causing an infinite loop of restarts

But my point is that there is no loop of restarts: _monitor_logging is only called once in the execute method, and there are no loops there.

exit_code = self._monitor_logging(self.resource_group, self.name)

The creation happens a couple of lines above the invocation of _monitor_logging:

self._ci_hook.create_or_update(self.resource_group, self.name, container_group)

So there is no legitimate way to get back from L266 to L262.

In addition, I can see a teardown in the finally block:

finally:
    if exit_code == 0 or self.remove_on_error:
        self.on_kill()

So much mysterious stuff happening in one place.

It would be nice if you could also provide a bit more detail about your deployment method and a sample DAG.
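
To tie the quoted snippets together, a condensed paraphrase of the flow being described (creation, a single call to _monitor_logging, teardown in finally); this is a sketch, not the provider source:

def execute_sketch(op, container_group):
    """Paraphrase of the operator's execute() flow as discussed above; details omitted."""
    exit_code = 1
    try:
        # The container group is created once, a couple of lines above the monitoring call.
        op._ci_hook.create_or_update(op.resource_group, op.name, container_group)
        # _monitor_logging is called exactly once; there is no loop here that could
        # re-create the group, which is why a restart loop is so surprising.
        exit_code = op._monitor_logging(op.resource_group, op.name)
    finally:
        # Teardown, as in the quoted finally block.
        if exit_code == 0 or op.remove_on_error:
            op.on_kill()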

Contributor Author
@krisfur krisfur Sep 28, 2023

Interesting! Looking at execute, if that function were rerun in any regard we'd see another "Container group started" message, but the only new messages when the bug happens are the container state changing to waiting/pending. This suggests that _monitor_logging never completes; it just throws errors and then correctly carries on, reporting that the state has changed.

Interestingly, when we run the same code that induces the log failure in Azure containers deployed manually, instead of through Airflow, they do not restart. This could either mean it's an Airflow-side issue, or that Azure, when asked to provide a log that is None, sets itself back to waiting/pending, in which case it can't be solved on the Airflow side (i.e. an Azure-side issue triggered when you try to grab a broken log, which we can't really debug).

Our deployment setup: we deploy an Azure container instance based on a simple Docker image (which just has a pyenv with libraries and the script to run); our Airflow is hosted on an Azure VM in a virtual environment.

DAG used for our issue-inducing test run:

import airflow
import datetime
from isodate import DT_BAS_COMPLETE
from lib.default_settings import defaults, creds
from airflow.decorators import task
import json
import airflow.providers.microsoft.azure.operators.azure_container_instances as ACI


class CaptureACIOperator(ACI.AzureContainerInstancesOperator):
    """
    Capture expansion for ACI operator
    custom_args are used for rendering ACI kwargs:
    map_index: index of expansion
    config: config for this mapped instance
    output: where to output (standard format {account_url}/{container}/{prefix}/{project}/{date})
    date: date of run
    project: self explanatory
    capture: not used here
    """
    def __init__(self, **kwargs):
        command = """
        source /root/pyenv/bin/activate
        python /root/ext/ds-models/bin/run_log_test.py
        CAPEXIT=$?
        echo "Exit code $CAPEXIT"
        exit $CAPEXIT
        """
        kwargs["command"] = ["/bin/bash", "-c", command]
        kwargs[
            "name"
        ] = f"ds-ci-log-test"
        
        super().__init__(**kwargs)

with airflow.DAG('log_test', description="",
        default_args=defaults.default_none, schedule_interval='0 0 * * 3', start_date=datetime.datetime(2022, 1, 1),
    catchup=False,tags=["development"]) as dag:

    t1 = CaptureACIOperator(
        dag=dag,
        task_id="log_test",
        image="asadsregistry.azurecr.io/dsmodel:log_test",
        ci_conn_id="azure-ds-ci-airflow-dspython",
        registry_conn_id="azure-asadsregistry",
        resource_group="Machine-Learning",
        region="UK South",
        memory_in_gb=8,
        cpu=4,
        pool="ci_cpus",
        pool_slots=4,
    )

The script used to induce failures (just multiple tqdm progress bars on code that takes a little while to run):

import re
import pandas as pd
import phonenumbers
import tldextract
from sentence_transformers import SentenceTransformer
from urlextract import URLExtract
import logging
from tqdm.auto import tqdm

tqdm.pandas()


def find_phone_numbers(text):
    uk_numbers = []
    numbers = []
    exclusive_uk = []
    exclusive_foreign = []
    for match in phonenumbers.PhoneNumberMatcher(text, "GB"):
        uk_numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.NATIONAL
            )
        )
    for match in phonenumbers.PhoneNumberMatcher(text, "US"):
        numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.NATIONAL
            )
        )
    for match in phonenumbers.PhoneNumberMatcher(text, "AU"):
        numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.NATIONAL
            )
        )
    for match in phonenumbers.PhoneNumberMatcher(text, "NZ"):
        numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.NATIONAL
            )
        )
    for match in phonenumbers.PhoneNumberMatcher(text, "CA"):
        numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.NATIONAL
            )
        )
    for match in phonenumbers.PhoneNumberMatcher(text, "ZA"):
        numbers.append(
            phonenumbers.format_number(
                match.number, phonenumbers.PhoneNumberFormat.NATIONAL
            )
        )

    exclusive_uk = [x for x in uk_numbers if x not in numbers]
    exclusive_foreign = [x for x in numbers if x not in uk_numbers]

    return pd.Series(
        [
            list(set(uk_numbers)),
            list(set(numbers)),
            list(set(exclusive_uk)),
            list(set(exclusive_foreign)),
        ]
    )


def find_urls(text):
    uk_urls = []
    foreign_urls = []
    extractor = URLExtract()
    links = list(set(extractor.find_urls(text)))

    for x in links:
        tld = tldextract.extract(x)
        suffix = tld.suffix
        if ".uk" in suffix:
            uk_urls.append(tld)
        if ".us" in suffix:
            foreign_urls.append(tld)
        if ".ca" in suffix:
            foreign_urls.append(tld)
        if ".nz" in suffix:
            foreign_urls.append(tld)
        if ".au" in suffix:
            foreign_urls.append(tld)
        if ".in" in suffix:
            foreign_urls.append(tld)
        if ".pk" in suffix:
            foreign_urls.append(tld)
    return pd.Series([uk_urls, foreign_urls])


df = pd.DataFrame()
datalist = [str(i) for i in range(int(10000))]
df["data"] = datalist

df[
    [
        "possible_uk_numbers",
        "possible_foreign_numbers",
        "definite_uk_numbers",
        "definite_foreign_numbers",
    ]
] = df.data.progress_apply(find_phone_numbers)
df[["uk_urls", "foreign_urls"]] = df.data.progress_apply(find_urls)

Contributor

To be honest, I have absolutely no idea how it is possible for that behaviour to happen 🤣
Everything looks fine to me: the DAG seems fine, and the provider code also seems fine.
In general, if this fix works for you, I have no objections.

May I ask which executor and DB backend (Postgres or MySQL) you use for Airflow?

Contributor Author

I'm equally surprised, haha. I suspect this could be an issue on the Azure side that Airflow just draws out when asking for logs at that error point. Do you have better contact with Azure devs (compared to a basic client) to maybe ask them?

executor = LocalExecutor
DB backend: postgresql+psycopg2

I suppose that if we can't find the root of the issue, then it would be prudent to at least handle this error, since choosing no restarts and still getting restarts is quite problematic (and if we return 1 on this error, then on-failure retries will trigger properly anyway if someone has them turned on).
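
For completeness, a hedged sketch of that last point: once the guard makes the task genuinely fail, whether it restarts is governed by ordinary Airflow retry settings on the operator, so restarts become an explicit opt-in. All ids, names, and the image below are placeholders:

import datetime

import airflow
import airflow.providers.microsoft.azure.operators.azure_container_instances as ACI

with airflow.DAG(
    "aci_log_guard_example",                # placeholder DAG id
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = ACI.AzureContainerInstancesOperator(
        task_id="run_container",
        name="aci-log-guard-example",       # placeholder container group name
        image="alpine:3.18",                # placeholder image
        ci_conn_id="azure-ci-conn",         # placeholder connection id
        registry_conn_id=None,
        resource_group="my-resource-group", # placeholder resource group
        region="UK South",
        command=["echo", "hello"],
        retries=2,                          # restarts on failure now require this explicit opt-in
        retry_delay=datetime.timedelta(minutes=5),
    )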

Co-authored-by: Wei Lee <weilee.rx@gmail.com>
eladkal (Contributor) commented Oct 13, 2023

Merging after checking with @Taragolis

@eladkal eladkal merged commit 546c850 into apache:main Oct 13, 2023

boring-cyborg bot commented Oct 13, 2023

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.


Successfully merging this pull request may close these issues.

Jobs in Azure Containers restart infinitely if logger crashes, despite retries being set to off.