diff --git a/tests/test_io.py b/tests/test_io.py index af63edafe..14ccf6410 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -3,8 +3,10 @@ import pytest +from cosmos.constants import _default_s3_conn from cosmos.exceptions import CosmosValueError from cosmos.io import ( + _configure_remote_target_path, _construct_dest_file_path, upload_artifacts_to_aws_s3, upload_artifacts_to_azure_wasb, @@ -110,3 +112,51 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs): mock_configure.assert_called_once() assert mock_copy.call_count == 2 + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.io.remote_target_path") +def test_configure_remote_target_path_no_conn_id(mock_remote_target_path): + """Test when no remote_conn_id is provided, but conn_id is resolved from scheme.""" + mock_remote_target_path.return_value = "s3://bucket/path/to/file" + mock_storage_path = MagicMock() + with patch("cosmos.io.urlparse") as mock_urlparse: + mock_urlparse.return_value.scheme = "s3" + with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: + mock_object_storage.return_value = mock_storage_path + mock_storage_path.exists.return_value = True + + result = _configure_remote_target_path() + assert result == (mock_object_storage.return_value, _default_s3_conn) + + +@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release") +@patch("cosmos.io.remote_target_path") +def test_configure_remote_target_path_conn_id_is_none(mock_remote_target_path): + """Test when conn_id cannot be resolved and is None.""" + mock_remote_target_path.return_value = "abcd://bucket/path/to/file" + mock_storage_path = MagicMock() + with patch("cosmos.io.urlparse") as mock_urlparse: + mock_urlparse.return_value.scheme = "abcd" + with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: + mock_object_storage.return_value = mock_storage_path + mock_storage_path.exists.return_value = True + + result = _configure_remote_target_path() + assert result == (None, None) + + +@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", False) +@patch("cosmos.io.remote_target_path") +def test_configure_remote_target_path_airflow_io_unavailable(mock_remote_target_path): + """Test when AIRFLOW_IO_AVAILABLE is False.""" + mock_remote_target_path.return_value = "s3://bucket/path/to/file" + mock_storage_path = MagicMock() + with patch("cosmos.io.urlparse") as mock_urlparse: + mock_urlparse.return_value.scheme = "s3" + with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage: + mock_object_storage.return_value = mock_storage_path + mock_storage_path.exists.return_value = True + with pytest.raises(CosmosValueError) as exc_info: + _configure_remote_target_path() + assert "Object Storage feature is unavailable" in str(exc_info.value)