Skip to content

Commit

Permalink
Removes limitations from Dask dependencies (apache#22017)
Browse files Browse the repository at this point in the history
Dask dependencies were holding us back when it comes to upgrading
some of the packages (for example apache-beam and looker in the google
provider). This PR removes the limitations, but with a twist.

* Dask tests stop working. Since a very old version of the `distributed`
  library was used, we are reaching out to the Dask team and asking
  for their help to fix the tests

* The typing-extensions library was limited by `distributed` but it
  seems that version 4.0.0+ breaks kubernetes tests
  • Loading branch information
potiuk authored Mar 6, 2022
1 parent 7acc190 commit 9be3c50
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ install_requires =
termcolor>=1.1.0
# typing-extensions can be removed under two scenarios: dropping support for python 3.7
# or bumping the minimum version of airflow for providers to 2.2.* which would allow the use of airflow.typing_compat
typing-extensions>=3.7.4
# Kubernetes tests also rely on typing-extensions < 4.0.0 - fixing the tests should allow removing the upper bound
typing-extensions>=3.7.4,<4.0.0
unicodecsv>=0.14.1
# Werkzeug is known to cause breaking changes and it is very closely tied with FlaskAppBuilder and other
# Flask dependencies and the limit to 1.* line should be reviewed when we upgrade Flask and remove
Expand Down
7 changes: 3 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,9 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
dask = [
# Dask support is limited; we need the Dask team to upgrade support for dask if we are to continue
# supporting it in the future
# TODO: upgrade libraries used or maybe deprecate and drop DASK support
'cloudpickle>=1.4.1, <1.5.0',
'dask>=2.9.0, <2021.6.1', # dask 2021.6.1 does not work with `distributed`
'distributed>=2.11.1, <2.20',
'cloudpickle>=1.4.1',
'dask>=2.9.0',
'distributed>=2.11.1',
]
databricks = [
'requests>=2.26.0, <3',
Expand Down
18 changes: 14 additions & 4 deletions tests/executors/test_dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,39 @@
from unittest import mock

import pytest
from distributed import LocalCluster

from airflow.exceptions import AirflowException
from airflow.executors.dask_executor import DaskExecutor
from airflow.jobs.backfill_job import BackfillJob
from airflow.models import DagBag
from airflow.utils import timezone
from tests.test_utils.config import conf_vars

try:
from distributed import LocalCluster

# utility functions imported from the dask testing suite to instantiate a test
# cluster for tls tests
from distributed import tests # noqa
from distributed.utils_test import cluster as dask_testing_cluster, get_cert, tls_security

from airflow.executors.dask_executor import DaskExecutor

skip_tls_tests = False
except ImportError:
skip_tls_tests = True
# In case the tests are skipped because of lacking test harness, get_cert should be
# overridden to avoid get_cert failing during test discovery as get_cert is used
# in conf_vars decorator
get_cert = lambda x: x

DEFAULT_DATE = timezone.datetime(2017, 1, 1)
SUCCESS_COMMAND = ['airflow', 'tasks', 'run', '--help']
FAIL_COMMAND = ['airflow', 'tasks', 'run', 'false']

# For now we are temporarily removing Dask support until we get the Dask team to help us make the
# tests pass again
skip_dask_tests = True


@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")
class TestBaseDask(unittest.TestCase):
def assert_tasks_on_executor(self, executor, timeout_executor=120):

Expand Down Expand Up @@ -75,6 +83,7 @@ def assert_tasks_on_executor(self, executor, timeout_executor=120):
assert fail_future.exception() is not None


@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")
class TestDaskExecutor(TestBaseDask):
def setUp(self):
self.dagbag = DagBag(include_examples=True)
Expand Down Expand Up @@ -148,6 +157,7 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
mock_stats_gauge.assert_has_calls(calls)


@pytest.mark.skipif(skip_dask_tests, reason="The tests are skipped because it needs testing from Dask team")
class TestDaskExecutorQueue(unittest.TestCase):
def test_dask_queues_no_resources(self):
self.cluster = LocalCluster()
Expand Down

0 comments on commit 9be3c50

Please sign in to comment.