Skip to content

Commit

Permalink
Rewrite kerberos security integration and unit tests (apache#28092)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis authored Dec 5, 2022
1 parent d932406 commit e0bb6be
Showing 1 changed file with 101 additions and 127 deletions.
228 changes: 101 additions & 127 deletions tests/security/test_kerberos.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,98 +17,68 @@
# under the License.
from __future__ import annotations

import logging
import os
import shlex
import unittest
from argparse import Namespace
from contextlib import nullcontext
from unittest import mock

import pytest
from parameterized import parameterized

from airflow.security import kerberos
from airflow.security.kerberos import renew_from_kt
from tests.test_utils.config import conf_vars

KRB5_KTNAME = os.environ.get("KRB5_KTNAME")

@pytest.mark.integration("kerberos")
class TestKerberosIntegration:
@classmethod
def setup_class(cls):
assert "KRB5_KTNAME" in os.environ, "Missing KRB5_KTNAME environment variable"
cls.keytab = os.environ["KRB5_KTNAME"]

@unittest.skipIf(KRB5_KTNAME is None, "Skipping Kerberos API tests due to missing KRB5_KTNAME")
class TestKerberos(unittest.TestCase):
def setUp(self):
self.args = Namespace(
keytab=KRB5_KTNAME, principal=None, pid=None, daemon=None, stdout=None, stderr=None, log_file=None
)

@conf_vars({("kerberos", "keytab"): KRB5_KTNAME})
def test_renew_from_kt(self):
"""
We expect no result, but a successful run. No more TypeError
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

@conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "include_ip"): ""})
def test_renew_from_kt_include_ip_empty(self):
"""
We expect no result, but a successful run.
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

@conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "include_ip"): "False"})
def test_renew_from_kt_include_ip_false(self):
"""
We expect no result, but a successful run.
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

@conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "include_ip"): "True"})
def test_renew_from_kt_include_ip_true(self):
"""
We expect no result, but a successful run.
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

# Validate forwardable kerberos option
@conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "forwardable"): ""})
def test_renew_from_kt_forwardable_empty(self):
"""
We expect no result, but a successful run.
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

@conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "forwardable"): "False"})
def test_renew_from_kt_forwardable_false(self):
"""
We expect no result, but a successful run.
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

@conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "forwardable"): "True"})
def test_renew_from_kt_forwardable_true(self):
"""
We expect no result, but a successful run.
"""
assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None

@conf_vars({("kerberos", "keytab"): ""})
def test_args_from_cli(self):
"""
We expect no result, but a run with sys.exit(1) because keytab not exist.
"""
with pytest.raises(SystemExit) as ctx:
renew_from_kt(principal=self.args.principal, keytab=self.args.keytab)

with self.assertLogs(kerberos.log) as log:
assert (
f"kinit: krb5_init_creds_set_keytab: Failed to find airflow@LUPUS.GRIDDYNAMICS.NET in "
f"keytab FILE:{self.args.keytab} (unknown enctype)" in log.output
)

assert ctx.value.code == 1

@pytest.mark.parametrize(
"kerberos_config",
[
pytest.param({}, id="default-config"),
pytest.param({("kerberos", "include_ip"): "True"}, id="explicit-include-ip"),
pytest.param({("kerberos", "include_ip"): "False"}, id="explicit-not-include-ip"),
pytest.param({("kerberos", "forwardable"): "True"}, id="explicit-forwardable"),
pytest.param({("kerberos", "forwardable"): "False"}, id="explicit-not-forwardable"),
],
)
def test_renew_from_kt(self, kerberos_config):
"""We expect return 0 (exit code) and successful run."""
with conf_vars(kerberos_config):
assert renew_from_kt(principal=None, keytab=self.keytab) == 0

class TestKerberosUnit(unittest.TestCase):
@parameterized.expand(
@pytest.mark.parametrize(
"exit_on_fail, expected_context",
[
pytest.param(True, pytest.raises(SystemExit), id="exit-on-fail"),
pytest.param(False, nullcontext(), id="return-code-of-fail"),
],
)
def test_args_from_cli(self, exit_on_fail, expected_context, caplog):
"""Test exit code if keytab not exist."""
keytab = "/not/exists/keytab"
result = None

with mock.patch.dict(os.environ, KRB5_KTNAME=keytab), conf_vars({("kerberos", "keytab"): keytab}):
with expected_context as ctx:
with caplog.at_level(logging.ERROR, logger=kerberos.log.name):
caplog.clear()
result = renew_from_kt(principal=None, keytab=keytab, exit_on_fail=exit_on_fail)

# If `exit_on_fail` set to True than exit code in exception, otherwise in function return
exit_code = ctx.value.code if exit_on_fail else result
assert exit_code == 1
assert caplog.record_tuples


class TestKerberos:
@pytest.mark.parametrize(
"kerberos_config, expected_cmd",
[
(
{("kerberos", "reinit_frequency"): "42"},
Expand Down Expand Up @@ -158,31 +128,27 @@ class TestKerberosUnit(unittest.TestCase):
"test-principal",
],
),
]
],
)
def test_renew_from_kt(self, kerberos_config, expected_cmd):
with self.assertLogs(kerberos.log) as log_ctx, conf_vars(kerberos_config), mock.patch(
"airflow.security.kerberos.subprocess"
) as mock_subprocess, mock.patch(
"airflow.security.kerberos.NEED_KRB181_WORKAROUND", None
), mock.patch(
"airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:")
), mock.patch(
"time.sleep", return_value=None
):
@mock.patch("time.sleep", return_value=None)
@mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
@mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
@mock.patch("airflow.security.kerberos.subprocess")
def test_renew_from_kt(self, mock_subprocess, mock_sleep, kerberos_config, expected_cmd, caplog):
expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)

with conf_vars(kerberos_config), caplog.at_level(logging.INFO, logger=kerberos.log.name):
caplog.clear()
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 0
renew_from_kt(principal="test-principal", keytab="keytab")

assert mock_subprocess.Popen.call_args[0][0] == expected_cmd

expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)
assert log_ctx.output == [
f"INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: {expected_cmd_text}",
"INFO:airflow.security.kerberos:Renewing kerberos ticket to work around kerberos 1.8.1: "
"kinit -c /tmp/airflow_krb5_ccache -R",
assert caplog.messages == [
f"Re-initialising kerberos from keytab: {expected_cmd_text}",
"Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R",
]

assert mock_subprocess.Popen.call_args[0][0] == expected_cmd
assert mock_subprocess.mock_calls == [
mock.call.Popen(
expected_cmd,
Expand All @@ -201,17 +167,17 @@ def test_renew_from_kt(self, kerberos_config, expected_cmd):
@mock.patch("airflow.security.kerberos.subprocess")
@mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
@mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b""))
def test_renew_from_kt_without_workaround(self, mock_subprocess):
def test_renew_from_kt_without_workaround(self, mock_subprocess, caplog):
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 0

with self.assertLogs(kerberos.log) as log_ctx:
with caplog.at_level(logging.INFO, logger=kerberos.log.name):
caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")

assert log_ctx.output == [
"INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: "
"kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal"
]
assert caplog.messages == [
"Re-initialising kerberos from keytab: "
"kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal"
]

assert mock_subprocess.mock_calls == [
mock.call.Popen(
Expand Down Expand Up @@ -241,21 +207,24 @@ def test_renew_from_kt_without_workaround(self, mock_subprocess):

@mock.patch("airflow.security.kerberos.subprocess")
@mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
def test_renew_from_kt_failed(self, mock_subprocess):
def test_renew_from_kt_failed(self, mock_subprocess, caplog):
mock_subp = mock_subprocess.Popen.return_value.__enter__.return_value
mock_subp.returncode = 1
mock_subp.stdout = mock.MagicMock(name="stdout", **{"readlines.return_value": ["STDOUT"]})
mock_subp.stderr = mock.MagicMock(name="stderr", **{"readlines.return_value": ["STDERR"]})

with self.assertLogs(kerberos.log) as log_ctx, self.assertRaises(SystemExit):
with pytest.raises(SystemExit) as ctx:
caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
assert ctx.value.code == 1

assert log_ctx.output == [
"INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: "
log_records = [record for record in caplog.record_tuples if record[0] == kerberos.log.name]
assert len(log_records) == 2, log_records
assert [lr[1] for lr in log_records] == [logging.INFO, logging.ERROR]
assert [lr[2] for lr in log_records] == [
"Re-initialising kerberos from keytab: "
"kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal",
"ERROR:airflow.security.kerberos:Couldn't reinit from keytab! `kinit' exited with 1.\n"
"STDOUT\n"
"STDERR",
"Couldn't reinit from keytab! `kinit' exited with 1.\nSTDOUT\nSTDERR",
]

assert mock_subprocess.mock_calls == [
Expand Down Expand Up @@ -289,22 +258,25 @@ def test_renew_from_kt_failed(self, mock_subprocess):
@mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
@mock.patch("airflow.security.kerberos.get_hostname", return_value="HOST")
@mock.patch("time.sleep", return_value=None)
def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_subprocess):
def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_subprocess, caplog):
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 1

with self.assertLogs(kerberos.log) as log_ctx, self.assertRaises(SystemExit):
with pytest.raises(SystemExit) as ctx:
caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
assert ctx.value.code == 1

assert log_ctx.output == [
"INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: "
log_records = [record for record in caplog.record_tuples if record[0] == kerberos.log.name]
assert len(log_records) == 3, log_records
assert [lr[1] for lr in log_records] == [logging.INFO, logging.INFO, logging.ERROR]
assert [lr[2] for lr in log_records] == [
"Re-initialising kerberos from keytab: "
"kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal",
"INFO:airflow.security.kerberos:Renewing kerberos ticket to work around kerberos 1.8.1: "
"kinit -c /tmp/airflow_krb5_ccache -R",
"ERROR:airflow.security.kerberos:Couldn't renew kerberos ticket in order to work around "
"Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R",
"Couldn't renew kerberos ticket in order to work around "
"Kerberos 1.8.1 issue. Please check that the ticket for 'test-principal/HOST' is still "
"renewable:\n"
" $ kinit -f -c /tmp/airflow_krb5_ccache\n"
"renewable:\n $ kinit -f -c /tmp/airflow_krb5_ccache\n"
"If the 'renew until' date is the same as the 'valid starting' date, the ticket cannot be "
"renewed. Please check your KDC configuration, and the ticket renewal policy (maxrenewlife) for "
"the 'test-principal/HOST' and `krbtgt' principals.",
Expand Down Expand Up @@ -337,19 +309,21 @@ def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_su
mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"], close_fds=True),
]

def test_run_without_keytab(self):
with self.assertLogs(kerberos.log) as log_ctx, self.assertRaises(SystemExit):
kerberos.run(principal="test-principal", keytab=None)
assert log_ctx.output == [
"WARNING:airflow.security.kerberos:Keytab renewer not starting, no keytab configured"
]
def test_run_without_keytab(self, caplog):
with pytest.raises(SystemExit) as ctx:
with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
caplog.clear()
kerberos.run(principal="test-principal", keytab=None)
assert ctx.value.code == 0
assert caplog.messages == ["Keytab renewer not starting, no keytab configured"]

@mock.patch("airflow.security.kerberos.renew_from_kt")
@mock.patch("time.sleep", return_value=None)
def test_run(self, mock_sleep, mock_renew_from_kt):
mock_renew_from_kt.side_effect = [1, 1, SystemExit(42)]
with self.assertRaises(SystemExit):
with pytest.raises(SystemExit) as ctx:
kerberos.run(principal="test-principal", keytab="/tmp/keytab")
assert ctx.value.code == 42
assert mock_renew_from_kt.mock_calls == [
mock.call("test-principal", "/tmp/keytab"),
mock.call("test-principal", "/tmp/keytab"),
Expand Down

0 comments on commit e0bb6be

Please sign in to comment.