diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py index 989430626f9a2..bab74cc5d7a5f 100644 --- a/tests/cluster_policies/__init__.py +++ b/tests/cluster_policies/__init__.py @@ -27,6 +27,9 @@ # [START example_cluster_policy_rule] def task_must_have_owners(task: BaseOperator): + if task.owner and not isinstance(task.owner, str): + raise AirflowClusterPolicyViolation(f'''owner should be a string. Current value: {task.owner!r}''') + if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'): raise AirflowClusterPolicyViolation( f'''Task must have non-None non-default owner. Current value: {task.owner}''' diff --git a/tests/dags_corrupted/test_nonstring_owner.py b/tests/dags_corrupted/test_nonstring_owner.py new file mode 100644 index 0000000000000..ace1e4ce0a8f1 --- /dev/null +++ b/tests/dags_corrupted/test_nonstring_owner.py @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.empty import EmptyOperator + +with DAG( + dag_id="test_nonstring_owner", + schedule_interval="0 0 * * *", + start_date=datetime(2022, 1, 1), + dagrun_timeout=timedelta(minutes=60), + tags=["example"], + default_args={'owner': ['a']}, +) as dag: + run_this_last = EmptyOperator( + task_id="test_task", + ) diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 8017be66f044d..8ea7197a62579 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -17,6 +17,7 @@ import inspect import logging import os +import pathlib import shutil import sys import textwrap @@ -979,6 +980,26 @@ def test_task_cluster_policy_violation(self): } assert expected_import_errors == dagbag.import_errors + @patch("airflow.settings.task_policy", cluster_policies.cluster_policy) + def test_task_cluster_policy_nonstring_owner(self): + """ + test that file processing results in import error when task does not + obey cluster policy and has owner whose type is not string. + """ + TEST_DAGS_CORRUPTED_FOLDER = pathlib.Path(__file__).parent.with_name('dags_corrupted') + dag_file = os.path.join(TEST_DAGS_CORRUPTED_FOLDER, "test_nonstring_owner.py") + + dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False) + assert set() == set(dagbag.dag_ids) + expected_import_errors = { + dag_file: ( + f"""DAG policy violation (DAG ID: test_nonstring_owner, Path: {dag_file}):\n""" + """Notices:\n""" + """ * owner should be a string. Current value: ['a']""" + ) + } + assert expected_import_errors == dagbag.import_errors + @patch("airflow.settings.task_policy", cluster_policies.cluster_policy) def test_task_cluster_policy_obeyed(self): """