Skip to content

Commit

Permalink
Validate DAG owner to be a string (#23359)
Browse files Browse the repository at this point in the history
non-string values raise `AttributeError` as `task.owner.lower` is called with `task.owner` not being a string and the error is not passed as import error failing silently. Raise explicit error will be helpful to the user.

closes: #23343
related: #23343
  • Loading branch information
tirkarthi authored May 2, 2022
1 parent 9a0080c commit c4887bc
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tests/cluster_policies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

# [START example_cluster_policy_rule]
def task_must_have_owners(task: BaseOperator):
if task.owner and not isinstance(task.owner, str):
raise AirflowClusterPolicyViolation(f'''owner should be a string. Current value: {task.owner!r}''')

if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
raise AirflowClusterPolicyViolation(
f'''Task must have non-None non-default owner. Current value: {task.owner}'''
Expand Down
34 changes: 34 additions & 0 deletions tests/dags_corrupted/test_nonstring_owner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
dag_id="test_nonstring_owner",
schedule_interval="0 0 * * *",
start_date=datetime(2022, 1, 1),
dagrun_timeout=timedelta(minutes=60),
tags=["example"],
default_args={'owner': ['a']},
) as dag:
run_this_last = EmptyOperator(
task_id="test_task",
)
21 changes: 21 additions & 0 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import inspect
import logging
import os
import pathlib
import shutil
import sys
import textwrap
Expand Down Expand Up @@ -979,6 +980,26 @@ def test_task_cluster_policy_violation(self):
}
assert expected_import_errors == dagbag.import_errors

@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
def test_task_cluster_policy_nonstring_owner(self):
"""
test that file processing results in import error when task does not
obey cluster policy and has owner whose type is not string.
"""
TEST_DAGS_CORRUPTED_FOLDER = pathlib.Path(__file__).parent.with_name('dags_corrupted')
dag_file = os.path.join(TEST_DAGS_CORRUPTED_FOLDER, "test_nonstring_owner.py")

dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False)
assert set() == set(dagbag.dag_ids)
expected_import_errors = {
dag_file: (
f"""DAG policy violation (DAG ID: test_nonstring_owner, Path: {dag_file}):\n"""
"""Notices:\n"""
""" * owner should be a string. Current value: ['a']"""
)
}
assert expected_import_errors == dagbag.import_errors

@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
def test_task_cluster_policy_obeyed(self):
"""
Expand Down

0 comments on commit c4887bc

Please sign in to comment.