Skip to content

Commit

Permalink
Attempt to stabilize flaky celery integration test (apache#39892)
Browse files Browse the repository at this point in the history
  • Loading branch information
potiuk authored May 28, 2024
1 parent 53970a8 commit 729bb64
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion tests/integration/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
from datetime import datetime
from time import sleep
from unittest import mock

# leave this it is used by the test worker
Expand All @@ -42,6 +43,8 @@
from airflow.utils.state import State
from tests.test_utils import db

logger = logging.getLogger(__name__)


def _prepare_test_bodies():
if "CELERY_BROKER_URLS" in os.environ:
Expand Down Expand Up @@ -145,7 +148,15 @@ def fake_execute_command(command):
executor.task_publish_retries[key] = 1

executor._process_tasks(task_tuples_to_send)

for _ in range(20):
num_tasks = len(executor.tasks.keys())
if num_tasks == 2:
break
logger.info(
"Waiting 0.1 s for tasks to be processed asynchronously. Processed so far %d",
num_tasks,
)
sleep(0.4)
assert list(executor.tasks.keys()) == [
("success", "fake_simple_ti", execute_date, 0),
("fail", "fake_simple_ti", execute_date, 0),
Expand Down

0 comments on commit 729bb64

Please sign in to comment.