diff --git a/tests/unit_tests/utils/test_core.py b/tests/unit_tests/utils/test_core.py index a8d5a2af29d5c..3e73637dad5b2 100644 --- a/tests/unit_tests/utils/test_core.py +++ b/tests/unit_tests/utils/test_core.py @@ -17,16 +17,19 @@ import os from dataclasses import dataclass from typing import Any, Optional -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pandas as pd import pytest +from sqlalchemy import CheckConstraint, Column, Integer, MetaData, Table from superset.exceptions import SupersetException from superset.utils.core import ( cast_to_boolean, check_is_safe_zip, DateColumn, + generic_find_constraint_name, + generic_find_fk_constraint_name, is_test, normalize_dttm_col, parse_boolean_string, @@ -258,3 +261,113 @@ def test_check_if_safe_zip_hidden_bomb(app_context: None) -> None: ] with pytest.raises(SupersetException): check_is_safe_zip(ZipFile) + + +def test_generic_constraint_name_exists(): + # Create a mock SQLAlchemy database object + database_mock = MagicMock() + + # Define the table name and constraint details + table_name = "my_table" + columns = {"column1", "column2"} + referenced_table_name = "other_table" + constraint_name = "my_constraint" + + # Create a mock table object with the same structure + table_mock = MagicMock() + table_mock.name = table_name + table_mock.columns = [MagicMock(name=col) for col in columns] + + # Create a mock for the referred_table with a name attribute + referred_table_mock = MagicMock() + referred_table_mock.name = referenced_table_name + + # Create a mock for the foreign key constraint with a name attribute + foreign_key_constraint_mock = MagicMock() + foreign_key_constraint_mock.name = constraint_name + foreign_key_constraint_mock.referred_table = referred_table_mock + foreign_key_constraint_mock.column_keys = list(columns) + + # Set the foreign key constraint mock as part of the table's constraints + table_mock.foreign_key_constraints = [foreign_key_constraint_mock] + + # Configure the autoload behavior for the database mock + database_mock.metadata = MagicMock() + database_mock.metadata.tables = {table_name: table_mock} + + # Mock the sa.Table creation with autoload + with patch("superset.utils.core.sa.Table") as table_creation_mock: + table_creation_mock.return_value = table_mock + + result = generic_find_constraint_name( + table_name, columns, referenced_table_name, database_mock + ) + + assert result == constraint_name + + +def test_generic_constraint_name_not_found(): + # Create a mock SQLAlchemy database object + database_mock = MagicMock() + + # Define the table name and constraint details + table_name = "my_table" + columns = {"column1", "column2"} + referenced_table_name = "other_table" + constraint_name = "my_constraint" + + # Create a mock table object with the same structure but no matching constraint + table_mock = MagicMock() + table_mock.name = table_name + table_mock.columns = [MagicMock(name=col) for col in columns] + table_mock.foreign_key_constraints = [] + + # Configure the autoload behavior for the database mock + database_mock.metadata = MagicMock() + database_mock.metadata.tables = {table_name: table_mock} + + result = generic_find_constraint_name( + table_name, columns, referenced_table_name, database_mock + ) + + assert result is None + + +def test_generic_find_fk_constraint_exists(): + insp_mock = MagicMock() + table_name = "my_table" + columns = {"column1", "column2"} + referenced_table_name = "other_table" + constraint_name = "my_constraint" + + # Create a mock for the foreign key constraint as a dictionary + constraint_mock = { + "name": constraint_name, + "referred_table": referenced_table_name, + "referred_columns": list(columns), + } + + # Configure the Inspector mock to return the list of foreign key constraints + insp_mock.get_foreign_keys.return_value = [constraint_mock] + + result = generic_find_fk_constraint_name( + table_name, columns, referenced_table_name, insp_mock + ) + + assert result == constraint_name + + +def test_generic_find_fk_constraint_none_exist(): + insp_mock = MagicMock() + table_name = "my_table" + columns = {"column1", "column2"} + referenced_table_name = "other_table" + + # Configure the Inspector mock to return the list of foreign key constraints + insp_mock.get_foreign_keys.return_value = [] + + result = generic_find_fk_constraint_name( + table_name, columns, referenced_table_name, insp_mock + ) + + assert result is None