diff --git a/cosmos/log.py b/cosmos/log.py
index f7c512f17..3294ac5b7 100644
--- a/cosmos/log.py
+++ b/cosmos/log.py
@@ -2,9 +2,10 @@
import logging
-from airflow.configuration import conf
from airflow.utils.log.colored_log import CustomTTYColoredFormatter
+from cosmos.settings import propagate_logs
+
LOG_FORMAT: str = (
"[%(blue)s%(asctime)s%(reset)s] "
"{%(blue)s%(filename)s:%(reset)s%(lineno)d} "
@@ -24,13 +25,10 @@ def get_logger(name: str | None = None) -> logging.Logger:
By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages:
[2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully
"""
- propagateLogs: bool = True
- if conf.has_option("cosmos", "propagate_logs"):
- propagateLogs = conf.getboolean("cosmos", "propagate_logs")
logger = logging.getLogger(name)
formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT) # type: ignore
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
- logger.propagate = propagateLogs
+ logger.propagate = propagate_logs
return logger
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 760f48290..ca255c7d8 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -8,10 +8,8 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence
-import airflow
import jinja2
from airflow import DAG
-from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context
@@ -22,6 +20,7 @@
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import AirflowCompatibilityError
+from cosmos.settings import LINEAGE_NAMESPACE
try:
from airflow.datasets import Dataset
@@ -41,7 +40,6 @@
from cosmos.config import ProfileConfig
from cosmos.constants import (
- DEFAULT_OPENLINEAGE_NAMESPACE,
OPENLINEAGE_PRODUCER,
)
from cosmos.dbt.parser.output import (
@@ -92,12 +90,6 @@ class OperatorLineage: # type: ignore
job_facets: dict[str, str] = dict()
-try:
- LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
-except airflow.exceptions.AirflowConfigException:
- LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
-
-
class DbtLocalBaseOperator(AbstractDbtBaseOperator):
"""
Executes a dbt core cli command locally.
diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py
index b82b29f5c..d05e15dd6 100644
--- a/cosmos/plugin/__init__.py
+++ b/cosmos/plugin/__init__.py
@@ -2,7 +2,6 @@
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlsplit
-from airflow.configuration import conf
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access
@@ -10,6 +9,8 @@
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose
+from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir
+
def bucket_and_key(path: str) -> Tuple[str, str]:
parsed_url = urlsplit(path)
@@ -69,16 +70,14 @@ def open_http_file(conn_id: Optional[str], path: str) -> str:
def open_file(path: str) -> str:
"""Retrieve a file from http, https, gs, s3, or wasb."""
- conn_id: Optional[str] = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)
-
if path.strip().startswith("s3://"):
- return open_s3_file(conn_id=conn_id, path=path)
+ return open_s3_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("gs://"):
- return open_gcs_file(conn_id=conn_id, path=path)
+ return open_gcs_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("wasb://"):
- return open_azure_file(conn_id=conn_id, path=path)
+ return open_azure_file(conn_id=dbt_docs_conn_id, path=path)
elif path.strip().startswith("http://") or path.strip().startswith("https://"):
- return open_http_file(conn_id=conn_id, path=path)
+ return open_http_file(conn_id=dbt_docs_conn_id, path=path)
else:
with open(path) as f:
content = f.read()
@@ -159,17 +158,16 @@ def create_blueprint(
@expose("/dbt_docs") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def dbt_docs(self) -> str:
- if conf.get("cosmos", "dbt_docs_dir", fallback=None) is None:
+ if dbt_docs_dir is None:
return self.render_template("dbt_docs_not_set_up.html") # type: ignore[no-any-return,no-untyped-call]
return self.render_template("dbt_docs.html") # type: ignore[no-any-return,no-untyped-call]
@expose("/dbt_docs_index.html") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def dbt_docs_index(self) -> str:
- docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
- if docs_dir is None:
+ if dbt_docs_dir is None:
abort(404)
- html = open_file(op.join(docs_dir, "index.html"))
+ html = open_file(op.join(dbt_docs_dir, "index.html"))
# Hack the dbt docs to render properly in an iframe
iframe_resizer_url = url_for(".static", filename="iframeResizer.contentWindow.min.js")
html = html.replace("", f'{iframe_script}', 1)
@@ -178,19 +176,17 @@ def dbt_docs_index(self) -> str:
@expose("/catalog.json") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def catalog(self) -> Tuple[str, int, Dict[str, Any]]:
- docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
- if docs_dir is None:
+ if dbt_docs_dir is None:
abort(404)
- data = open_file(op.join(docs_dir, "catalog.json"))
+ data = open_file(op.join(dbt_docs_dir, "catalog.json"))
return data, 200, {"Content-Type": "application/json"}
@expose("/manifest.json") # type: ignore[misc]
@has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE)])
def manifest(self) -> Tuple[str, int, Dict[str, Any]]:
- docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
- if docs_dir is None:
+ if dbt_docs_dir is None:
abort(404)
- data = open_file(op.join(docs_dir, "manifest.json"))
+ data = open_file(op.join(dbt_docs_dir, "manifest.json"))
return data, 200, {"Content-Type": "application/json"}
diff --git a/cosmos/settings.py b/cosmos/settings.py
index 35e235edc..44a08fd48 100644
--- a/cosmos/settings.py
+++ b/cosmos/settings.py
@@ -1,11 +1,21 @@
+import os
import tempfile
from pathlib import Path
+import airflow
from airflow.configuration import conf
-from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME
+from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME, DEFAULT_OPENLINEAGE_NAMESPACE
# In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
enable_cache = conf.get("cosmos", "enable_cache", fallback=True)
+propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True)
+dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
+dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)
+
+try:
+ LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
+except airflow.exceptions.AirflowConfigException:
+ LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst
new file mode 100644
index 000000000..1d334884f
--- /dev/null
+++ b/docs/configuration/cosmos-conf.rst
@@ -0,0 +1,79 @@
+Cosmos Config
+=============
+
+This page lists all available Airflow configurations that affect ``astronomer-cosmos`` Astronomer Cosmos behavior. They can be set in the ``airflow.cfg file`` or using environment variables.
+
+.. note::
+ For more information, see `Setting Configuration Options `_.
+
+**Sections:**
+
+- [cosmos]
+- [openlineage]
+
+[cosmos]
+~~~~~~~~
+
+.. _cache_dir:
+
+`cache_dir`_:
+ The directory used for caching Cosmos data.
+
+ - Default: ``{TMPDIR}/cosmos_cache`` (where ``{TMPDIR}`` is the system temporary directory)
+ - Environment Variable: ``AIRFLOW__COSMOS__CACHE_DIR``
+
+.. _enable_cache:
+
+`enable_cache`_:
+ Enable or disable caching of Cosmos data.
+
+ - Default: ``True``
+ - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE``
+
+.. _propagate_logs:
+
+`propagate_logs`_:
+ Whether to propagate logs in the Cosmos module.
+
+ - Default: ``True``
+ - Environment Variable: ``AIRFLOW__COSMOS__PROPAGATE_LOGS``
+
+.. _dbt_docs_dir:
+
+`dbt_docs_dir`_:
+ The directory path for dbt documentation.
+
+ - Default: ``None``
+ - Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_DIR``
+
+.. _dbt_docs_conn_id:
+
+`dbt_docs_conn_id`_:
+ The connection ID for dbt documentation.
+
+ - Default: ``None``
+ - Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID``
+
+[openlineage]
+~~~~~~~~~~~~~
+
+.. _namespace:
+
+`namespace`_:
+ The OpenLineage namespace for tracking lineage.
+
+ - Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``.
+ - Environment Variable: ``AIRFLOW__OPENLINEAGE__NAMESPACE``
+
+.. note::
+ For more information, see `Openlieage Configuration Options `_.
+
+Environment Variables
+~~~~~~~~~~~~~~~~~~~~~
+
+.. _LINEAGE_NAMESPACE:
+
+`LINEAGE_NAMESPACE`_:
+ The OpenLineage namespace for tracking lineage.
+
+ - Default: If not configured in Airflow configuration, it falls back to the environment variable ``OPENLINEAGE_NAMESPACE``, otherwise it uses ``DEFAULT_OPENLINEAGE_NAMESPACE``.
diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst
index ec69c1f52..fc34b993e 100644
--- a/docs/configuration/index.rst
+++ b/docs/configuration/index.rst
@@ -14,6 +14,7 @@ Cosmos offers a number of configuration options to customize its behavior. For m
Render Config
Parsing Methods
+ Configuring in Airflow
Configuring Lineage
Generating Docs
Hosting Docs
diff --git a/tests/plugin/test_plugin.py b/tests/plugin/test_plugin.py
index 25fcacbda..796bfff8d 100644
--- a/tests/plugin/test_plugin.py
+++ b/tests/plugin/test_plugin.py
@@ -71,6 +71,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)
monkeypatch.setattr(conf, "get", conf_get)
+ monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir")
response = app.get("/cosmos/dbt_docs")
@@ -96,6 +97,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)
monkeypatch.setattr(conf, "get", conf_get)
+ monkeypatch.setattr("cosmos.plugin.dbt_docs_dir", "path/to/docs/dir")
if artifact == "dbt_docs_index.html":
mock_open_file.return_value = ""
@@ -134,6 +136,7 @@ def conf_get(section, key, *args, **kwargs):
return original_conf_get(section, key, *args, **kwargs)
monkeypatch.setattr(conf, "get", conf_get)
+ monkeypatch.setattr("cosmos.plugin.dbt_docs_conn_id", "mock_conn_id")
with patch.object(cosmos.plugin, open_file_callback) as mock_callback:
mock_callback.return_value = "mock file contents"
diff --git a/tests/test_log.py b/tests/test_log.py
index c110c1918..75676d1ec 100644
--- a/tests/test_log.py
+++ b/tests/test_log.py
@@ -1,7 +1,5 @@
import logging
-from airflow.configuration import conf
-
from cosmos import get_provider_info
from cosmos.log import get_logger
@@ -17,10 +15,8 @@ def test_get_logger():
assert custom_string in custom_logger.handlers[0].formatter._fmt
-def test_propagate_logs_conf():
- if not conf.has_section("cosmos"):
- conf.add_section("cosmos")
- conf.set("cosmos", "propagate_logs", "False")
+def test_propagate_logs_conf(monkeypatch):
+ monkeypatch.setattr("cosmos.log.propagate_logs", False)
custom_logger = get_logger("cosmos-log")
assert custom_logger.propagate is False