From 8321cd88475b774417f3db54ba9795b1e23c55a9 Mon Sep 17 00:00:00 2001
From: Adam Sachs
Date: Fri, 4 Oct 2024 00:17:16 -0400
Subject: [PATCH] PROD-2734 - initial partitioning support (user-defined windows) (#5325)

---
 CHANGELOG.md | 1 +
 .../dataset/bigquery_example_test_dataset.yml | 16 +++
 requirements.txt | 2 +-
 src/fides/api/graph/config.py | 87 ++++++++++++++
 src/fides/api/models/datasetconfig.py | 5 +
 .../api/service/connectors/query_config.py | 109 ++++++++++++++++--
 .../api/service/connectors/sql_connector.py | 73 +++++++++++-
 tests/fixtures/bigquery_fixtures.py | 93 +++++++++++++++
 .../v1/endpoints/test_dataset_endpoints.py | 2 +-
 tests/ops/graph/test_config.py | 78 +++++++++++++
 .../test_external_database_connections.py | 1 +
 tests/ops/models/test_datasetconfig.py | 84 ++++++++++++++
 .../connectors/test_bigquery_connector.py | 107 +++++++++++++++++
 .../test_request_runner_service.py | 7 ++
 tests/ops/task/test_create_request_tasks.py | 3 +
 15 files changed, 649 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index af6a1187f2..471d2e38fa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ The types of changes are:
 ### Added

 - Make all "Description" table columns expandable in Admin UI tables [#5340](https://github.com/ethyca/fides/pull/5340)
+- Initial support for DSR requests against partitioned BigQuery tables [#5325](https://github.com/ethyca/fides/pull/5325)
 - Added new RDS MySQL Connector [#5343](https://github.com/ethyca/fides/pull/5343)

 ### Added
diff --git a/data/dataset/bigquery_example_test_dataset.yml b/data/dataset/bigquery_example_test_dataset.yml
index d34f7e6543..46b76a4a31 100644
--- a/data/dataset/bigquery_example_test_dataset.yml
+++ b/data/dataset/bigquery_example_test_dataset.yml
@@ -230,3 +230,19 @@ dataset:
             data_type: string
           - name: last_visit
             data_categories: [system.operations]
+      - name: visit_partitioned
+        fides_meta:
+          partitioning:
+            where_clauses: [
+                "`last_visit` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 500 DAY) AND `last_visit` <= CURRENT_TIMESTAMP()",
+                "`last_visit` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `last_visit` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 500 DAY)",
+                "`last_visit` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)",
+              ]
+        fields:
+          - name: email
+            data_categories: [user.contact.email]
+            fides_meta:
+              identity: email
+              data_type: string
+          - name: last_visit
+            data_categories: [system.operations]
diff --git a/requirements.txt b/requirements.txt
index 286e83b9d8..68fc3bae46 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
 expandvars==0.9.0
 fastapi[all]==0.111.0
 fastapi-pagination[sqlalchemy]==0.12.25
-fideslang==3.0.6
+fideslang==3.0.7
 fideslog==1.2.10
 firebase-admin==5.3.0
 GitPython==3.1.41
diff --git a/src/fides/api/graph/config.py b/src/fides/api/graph/config.py
index ad9bdf090d..2cac02a79f 100644
--- a/src/fides/api/graph/config.py
+++ b/src/fides/api/graph/config.py
@@ -82,6 +82,7 @@
 from abc import ABC, abstractmethod
 from collections import defaultdict
 from dataclasses import dataclass
+from re import match, search
 from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

 from fideslang.models import MaskingStrategyOverride
@@ -443,6 +444,22 @@ class MaskingOverride:
     length: Optional[int]


+# for now, we only support BQ partitioning, so the clause pattern we expect is BQ-specific
+BIGQUERY_PARTITION_CLAUSE_PATTERN = r"^`(?P<field_1>[a-zA-Z0-9_]*)` ([<|>][=]?) (?P<operand_1>[a-zA-Z0-9_\s(),\.\"\']*)(\sAND `(?P<field_2>[a-zA-Z0-9_]*)` ([<|>][=]?) (?P<operand_2>[a-zA-Z0-9_\s(),\.\"\']*))?$"
+# protected keywords that are _not_ allowed in the operands, to avoid any potential malicious execution.
+PROHIBITED_KEYWORDS = [
+    "UNION",
+    "INSERT",
+    "UPDATE",
+    "CREATE",
+    "DROP",
+    "SELECT",
+    "CHAR",
+    "HAVING",
+    "EXEC",
+]
+
+
 class Collection(BaseModel):
     """A single grouping of individual data points that are accessed together"""

@@ -456,6 +473,7 @@ class Collection(BaseModel):
     grouped_inputs: Set[str] = set()
     data_categories: Set[FidesKey] = set()
     masking_strategy_override: Optional[MaskingStrategyOverride] = None
+    partitioning: Optional[Dict] = None

     @property
     def field_dict(self) -> Dict[FieldPath, Field]:
@@ -613,6 +631,7 @@ def build_field(serialized_field: dict) -> Field:
             CollectionAddress.from_string(addr_string)
             for addr_string in data.get("erase_after", [])
         }
+        data["partitioning"] = data.get("partitioning")

         return Collection.model_validate(data)

@@ -647,6 +666,74 @@ def serialize_erase_after(self, erase_after: Set[CollectionAddress]) -> Set[str]
         },
     )

+    @field_validator("partitioning")
+    @classmethod
+    def validate_partitioning(
+        cls, partitioning: Optional[Dict[str, Any]]
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Validates the `partitioning` dict field.
+
+        The `partitioning` dict field is untyped in Fideslang, but here we enforce
+        that it has the required and expected `where_clauses` key, whose value must be
+        a list of strings.
+
+        The string values are validated to ensure they match the expected syntax, which
+        is strictly prescribed. Each string value MUST be a valid SQL clause that defines
+        a partition window, with the form:
+
+        ```
+        `column_1` >(=) [some value] AND `column_1` <(=) [some value]
+        ```
+
+        To be clear, some notable constraints on the input:
+        - the clause string must begin by referencing a column name wrapped by backticks (`)
+        - the clause string must compare that first column with a `<>(=)` operator, and may
+          include at most one other conditional with a `<>(=)` operator that's joined to the first
+          conditional via an AND operator
+        - if the clause string contains a second conditional, it must reference the same column name
+          as the first conditional, also wrapped by backticks
+        - column names (wrapped by backticks) must always be on the _left_ side of the `<>(=)` operator
+          in its conditional
+
+        """
+        if not partitioning:
+            return partitioning
+
+        # NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL,
+        # we should be sure to still support the existing `where_clause` partition definition on
+        # any in-progress DSRs so that they can run through to completion.
+        if where_clauses := partitioning.get("where_clauses"):
+            if not isinstance(where_clauses, List) or not all(
+                isinstance(where_clause, str) for where_clause in where_clauses
+            ):
+                raise ValueError("`where_clauses` must be a list of strings!")
+            for partition_clause in where_clauses:
+                if matching := match(
+                    BIGQUERY_PARTITION_CLAUSE_PATTERN, partition_clause
+                ):
+                    # check that if there are two field comparison sub-clauses, they reference the same field, e.g.:
+                    # "`my_field_1` > 5 AND `my_field_1` <= 10", not "`my_field_1` > 5 AND `my_field_2` <= 10"
+                    if matching["field_2"] is not None and (
+                        matching["field_1"] != matching["field_2"]
+                    ):
+                        raise ValueError(
+                            f"Partition clause must have matching fields. 
Identified non-matching field references '{matching['field_1']}' and '{matching['field_2']}" + ) + + for prohibited_keyword in PROHIBITED_KEYWORDS: + search_str = prohibited_keyword.lower() + r"\s" + if search(search_str, partition_clause.lower()): + raise ValueError( + "Prohibited keyword referenced in partition clause" + ) + else: + raise ValueError("Unsupported partition clause format") + return partitioning + raise ValueError( + "`where_clauses` must be specified in `partitioning` specification!" + ) + class GraphDataset(BaseModel): """Master collection of collections that are accessed in a common way""" diff --git a/src/fides/api/models/datasetconfig.py b/src/fides/api/models/datasetconfig.py index ed87456ae8..6ecc311dad 100644 --- a/src/fides/api/models/datasetconfig.py +++ b/src/fides/api/models/datasetconfig.py @@ -329,6 +329,10 @@ def convert_dataset_to_graph( if collection.fides_meta and collection.fides_meta.masking_strategy_override: masking_override = collection.fides_meta.masking_strategy_override + collection_partitioning = None + if collection.fides_meta and collection.fides_meta.partitioning: + collection_partitioning = collection.fides_meta.partitioning + graph_collection = Collection( name=collection.name, fields=graph_fields, @@ -339,6 +343,7 @@ def convert_dataset_to_graph( data_categories=( set(collection.data_categories) if collection.data_categories else set() ), + partitioning=collection_partitioning, ) graph_collections.append(graph_collection) logger.debug( diff --git a/src/fides/api/service/connectors/query_config.py b/src/fides/api/service/connectors/query_config.py index 656ad8c18b..51d7ff4bb6 100644 --- a/src/fides/api/service/connectors/query_config.py +++ b/src/fides/api/service/connectors/query_config.py @@ -51,6 +51,16 @@ class QueryConfig(Generic[T], ABC): def __init__(self, node: ExecutionNode): self.node = node + @property + def partitioning(self) -> Optional[Dict]: # pylint: disable=R1711 + # decided to de-scope partitioning support to only bigquery as this grew more complex, + # but keeping more generic support stubbed out feels like a reasonable step. + if self.node.collection.partitioning: + logger.warning( + "Partitioning is only supported on BigQuery connectors at this time!" + ) + return None + def field_map(self) -> Dict[FieldPath, Field]: """Flattened FieldPaths of interest from this traversal_node.""" return self.node.collection.field_dict @@ -707,7 +717,7 @@ def generate_raw_query( data_vals = list(data) query_data_keys: List[str] = [] for val in data_vals: - # appending "_in_stmt_generated_" (can be any arbitrary str) so that this name has less change of conflicting with pre-existing column in table + # appending "_in_stmt_generated_" (can be any arbitrary str) so that this name has lower chance of conflicting with pre-existing column in table query_data_name = ( field_name + "_in_stmt_generated_" + str(data_vals.index(val)) ) @@ -837,6 +847,42 @@ class BigQueryQueryConfig(QueryStringWithoutTuplesOverrideQueryConfig): namespace_meta_schema = BigQueryNamespaceMeta + @property + def partitioning(self) -> Optional[Dict]: + # Overriden from base implementation to allow for _only_ BQ partitioning, for now + return self.node.collection.partitioning + + def get_partition_clauses( + self, + ) -> List[str]: + """ + Returns the WHERE clauses specified in the partitioning spec + + Currently, only where-clause based partitioning is supported. 
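+
+        For illustration only: the spec read here is the dict form of the
+        `fides_meta.partitioning` block on the collection, e.g. (clauses abridged
+        from the example BigQuery dataset in this change):
+
+            {
+                "where_clauses": [
+                    "`last_visit` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 500 DAY) AND `last_visit` <= CURRENT_TIMESTAMP()",
+                    "`last_visit` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)",
+                ]
+            }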
+ + TODO: derive partitions from a start/end/interval specification + + + NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL, + we should be sure to still support the existing `where_clause` partition definition on + any in-progress DSRs so that they can run through to completion. + """ + partition_spec = self.partitioning + if not partition_spec: + logger.error( + "Partitioning clauses cannot be retrieved, no partitioning specification found" + ) + return [] + + if where_clauses := partition_spec.get("where_clauses"): + return where_clauses + + # TODO: implement more advanced partitioning support! + + raise ValueError( + "`where_clauses` must be specified in partitioning specification!" + ) + def _generate_table_name(self) -> str: """ Prepends the dataset ID and project ID to the base table name @@ -860,7 +906,6 @@ def get_formatted_query_string( Returns a query string with backtick formatting for tables that have the same names as BigQuery reserved words. """ - return f'SELECT {field_list} FROM `{self._generate_table_name()}` WHERE {" OR ".join(clauses)}' def generate_masking_stmt( @@ -870,7 +915,7 @@ def generate_masking_stmt( policy: Policy, request: PrivacyRequest, client: Engine, - ) -> Union[Optional[Update], Optional[Delete]]: + ) -> Union[List[Update], List[Delete]]: """ Generate a masking statement for BigQuery. @@ -887,10 +932,15 @@ def generate_masking_stmt( def generate_update( self, row: Row, policy: Policy, request: PrivacyRequest, client: Engine - ) -> Optional[Update]: + ) -> List[Update]: """ Using TextClause to insert 'None' values into BigQuery throws an exception, so we use update clause instead. - Returns a SQLAlchemy Update object. Does not actually execute the update object. + Returns a List of SQLAlchemy Update object. Does not actually execute the update object. + + A List of multiple Update objects are returned for partitioned tables; for a non-partitioned table, + a single Update object is returned in a List for consistent typing. + + TODO: DRY up this method and `generate_delete` a bit """ update_value_map: Dict[str, Any] = self.update_value_map(row, policy, request) non_empty_primary_keys: Dict[str, Field] = filter_nonempty_values( @@ -907,19 +957,41 @@ def generate_update( "There is not enough data to generate a valid update statement for {}", self.node.address, ) - return None + return [] table = Table(self._generate_table_name(), MetaData(bind=client), autoload=True) pk_clauses: List[ColumnElement] = [ getattr(table.c, k) == v for k, v in non_empty_primary_keys.items() ] - return table.update().where(*pk_clauses).values(**update_value_map) - def generate_delete(self, row: Row, client: Engine) -> Optional[Delete]: - """Returns a SQLAlchemy DELETE statement for BigQuery. Does not actually execute the delete statement. + if self.partitioning: + partition_clauses = self.get_partition_clauses() + partitioned_queries = [] + logger.info( + f"Generating {len(partition_clauses)} partition queries for node '{self.node.address}' in DSR execution" + ) + for partition_clause in partition_clauses: + partitioned_queries.append( + table.update() + .where(*(pk_clauses + [text(partition_clause)])) + .values(**update_value_map) + ) + + return partitioned_queries + + return [table.update().where(*pk_clauses).values(**update_value_map)] + + def generate_delete(self, row: Row, client: Engine) -> List[Delete]: + """Returns a List of SQLAlchemy DELETE statements for BigQuery. Does not actually execute the delete statement. 
Used when a collection-level masking override is present and the masking strategy is DELETE. + + A List of multiple DELETE statements are returned for partitioned tables; for a non-partitioned table, + a single DELETE statement is returned in a List for consistent typing. + + TODO: DRY up this method and `generate_update` a bit """ + non_empty_primary_keys: Dict[str, Field] = filter_nonempty_values( { fpath.string_path: fld.cast(row[fpath.string_path]) @@ -934,13 +1006,28 @@ def generate_delete(self, row: Row, client: Engine) -> Optional[Delete]: "There is not enough data to generate a valid DELETE statement for {}", self.node.address, ) - return None + return [] table = Table(self._generate_table_name(), MetaData(bind=client), autoload=True) pk_clauses: List[ColumnElement] = [ getattr(table.c, k) == v for k, v in non_empty_primary_keys.items() ] - return table.delete().where(*pk_clauses) + + if self.partitioning: + partition_clauses = self.get_partition_clauses() + partitioned_queries = [] + logger.info( + f"Generating {len(partition_clauses)} partition queries for node '{self.node.address}' in DSR execution" + ) + + for partition_clause in partition_clauses: + partitioned_queries.append( + table.delete().where(*(pk_clauses + [text(partition_clause)])) + ) + + return partitioned_queries + + return [table.delete().where(*pk_clauses)] MongoStatement = Tuple[Dict[str, Any], Dict[str, Any]] diff --git a/src/fides/api/service/connectors/sql_connector.py b/src/fides/api/service/connectors/sql_connector.py index aad5681bb9..084e21b26e 100644 --- a/src/fides/api/service/connectors/sql_connector.py +++ b/src/fides/api/service/connectors/sql_connector.py @@ -199,9 +199,15 @@ def retrieve_data( stmt: Optional[TextClause] = query_config.generate_query(input_data, policy) if stmt is None: return [] + logger.info("Starting data retrieval for {}", node.address) with client.connect() as connection: self.set_schema(connection) + if ( + query_config.partitioning + ): # only BigQuery supports partitioning, for now + return self.partitioned_retrieval(query_config, connection, stmt) + results = connection.execute(stmt) return self.cursor_result_to_rows(results) @@ -277,6 +283,22 @@ def create_ssh_tunnel(self, host: Optional[str], port: Optional[int]) -> None: ), ) + def partitioned_retrieval( + self, + query_config: SQLQueryConfig, + connection: Connection, + stmt: TextClause, + ) -> List[Row]: + """ + Retrieve data against a partitioned table using the partitioning spec configured for this node to execute + multiple queries against the partitioned table. + + This is only supported by the BigQueryConnector currently, so the base implementation is overridden there. + """ + raise NotImplementedError( + "Partitioned retrieval is only supported for BigQuery currently!" + ) + class PostgreSQLConnector(SQLConnector): """Connector specific to postgresql""" @@ -553,6 +575,44 @@ def query_config(self, node: ExecutionNode) -> BigQueryQueryConfig: node, SQLConnector.get_namespace_meta(db, node.address.dataset) ) + def partitioned_retrieval( + self, + query_config: SQLQueryConfig, + connection: Connection, + stmt: TextClause, + ) -> List[Row]: + """ + Retrieve data against a partitioned table using the partitioning spec configured for this node to execute + multiple queries against the partitioned table. + + This is only supported by the BigQueryConnector currently. 
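+
+        Roughly, the already-generated access statement is executed once per configured
+        partition clause with ` AND (<partition clause>)` appended, and the rows from each
+        execution are concatenated. Illustrative shape only, with placeholder names:
+
+            SELECT <fields> FROM `<project.dataset.table>` WHERE <identity clause>
+                AND (`last_visit` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY))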
+ + NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL, + we should be sure to still support the existing `where_clause` partition definition on + any in-progress DSRs so that they can run through to completion. + """ + if not isinstance(query_config, BigQueryQueryConfig): + raise TypeError( + f"Unexpected query config of type '{type(query_config)}' passed to BigQueryConnector's `partitioned_retrieval`" + ) + + partition_clauses = query_config.get_partition_clauses() + logger.info( + f"Executing {len(partition_clauses)} partition queries for node '{query_config.node.address}' in DSR execution" + ) + rows = [] + for partition_clause in partition_clauses: + logger.debug( + f"Executing partition query with partition clause '{partition_clause}'" + ) + existing_bind_params = stmt.compile().params + partitioned_stmt = text(f"{stmt} AND ({text(partition_clause)})").params( + existing_bind_params + ) + results = connection.execute(partitioned_stmt) + rows.extend(self.cursor_result_to_rows(results)) + return rows + # Overrides SQLConnector.test_connection def test_connection(self) -> Optional[ConnectionTestStatus]: """ @@ -589,17 +649,18 @@ def mask_data( update_or_delete_ct = 0 client = self.client() for row in rows: - update_or_delete_stmt: Optional[Executable] = ( + update_or_delete_stmts: List[Executable] = ( query_config.generate_masking_stmt( node, row, policy, privacy_request, client ) ) - if update_or_delete_stmt is not None: + if update_or_delete_stmts: with client.connect() as connection: - results: LegacyCursorResult = connection.execute( - update_or_delete_stmt - ) - update_or_delete_ct = update_or_delete_ct + results.rowcount + for update_or_delete_stmt in update_or_delete_stmts: + results: LegacyCursorResult = connection.execute( + update_or_delete_stmt + ) + update_or_delete_ct = update_or_delete_ct + results.rowcount return update_or_delete_ct diff --git a/tests/fixtures/bigquery_fixtures.py b/tests/fixtures/bigquery_fixtures.py index 0965213311..acac500bf2 100644 --- a/tests/fixtures/bigquery_fixtures.py +++ b/tests/fixtures/bigquery_fixtures.py @@ -145,6 +145,56 @@ def bigquery_example_test_dataset_config_with_namespace_meta( ctl_dataset.delete(db=db) +@pytest.fixture +def bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta( + bigquery_connection_config_without_default_dataset: ConnectionConfig, + db: Session, + example_datasets: List[Dict], +) -> Generator: + bigquery_dataset = example_datasets[7] + bigquery_dataset["fides_meta"] = { + "namespace": { + "project_id": "silken-precinct-284918", + "dataset_id": "fidesopstest", + }, + } + # update customer collection to have a partition + customer_collection = next( + collection + for collection in bigquery_dataset["collections"] + if collection["name"] == "customer" + ) + bigquery_dataset["collections"].remove(customer_collection) + customer_collection["fides_meta"] = { + "partitioning": { + "where_clauses": [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()", + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", + ] + } + } + bigquery_dataset["collections"].append(customer_collection) + + fides_key = bigquery_dataset["fides_key"] + bigquery_connection_config_without_default_dataset.name = fides_key + bigquery_connection_config_without_default_dataset.key = fides_key + 
bigquery_connection_config_without_default_dataset.save(db=db) + + ctl_dataset = CtlDataset.create_from_dataset_dict(db, bigquery_dataset) + + dataset = DatasetConfig.create( + db=db, + data={ + "connection_config_id": bigquery_connection_config_without_default_dataset.id, + "fides_key": fides_key, + "ctl_dataset_id": ctl_dataset.id, + }, + ) + yield dataset + dataset.delete(db=db) + ctl_dataset.delete(db=db) + + @pytest.fixture(scope="function") def bigquery_resources( bigquery_example_test_dataset_config, @@ -180,6 +230,14 @@ def bigquery_resources( connection.execute(stmt) + last_visit_date = "2024-10-03 01:00:00" + stmt = f""" + insert into visit_partitioned (email, last_visit) + values ('{customer_email}', '{last_visit_date}'); + """ + + connection.execute(stmt) + stmt = "select max(id) from employee;" res = connection.execute(stmt) employee_id = res.all()[0][0] + 1 @@ -208,6 +266,9 @@ def bigquery_resources( stmt = f"delete from customer where email = '{customer_email}';" connection.execute(stmt) + stmt = f"delete from visit_partitioned where email = '{customer_email}' and last_visit = '{last_visit_date}';" + connection.execute(stmt) + stmt = f"delete from address where id = {address_id};" connection.execute(stmt) @@ -252,6 +313,14 @@ def bigquery_resources_with_namespace_meta( connection.execute(stmt) + last_visit_date = "2024-10-03 01:00:00" + stmt = f""" + insert into fidesopstest.visit_partitioned (email, last_visit) + values ('{customer_email}', '{last_visit_date}'); + """ + + connection.execute(stmt) + stmt = "select max(id) from fidesopstest.employee;" res = connection.execute(stmt) employee_id = res.all()[0][0] + 1 @@ -280,6 +349,9 @@ def bigquery_resources_with_namespace_meta( stmt = f"delete from fidesopstest.customer where email = '{customer_email}';" connection.execute(stmt) + stmt = f"delete from fidesopstest.visit_partitioned where email = '{customer_email}' and last_visit = '{last_visit_date}';" + connection.execute(stmt) + stmt = f"delete from fidesopstest.address where id = {address_id};" connection.execute(stmt) @@ -335,6 +407,9 @@ def seed_bigquery_integration_db(bigquery_integration_engine) -> None: DROP TABLE IF EXISTS fidesopstest.visit; """, """ + DROP TABLE IF EXISTS fidesopstest.visit_partitioned; + """, + """ DROP TABLE IF EXISTS fidesopstest.order_item; """, """ @@ -424,6 +499,18 @@ def seed_bigquery_integration_db(bigquery_integration_engine) -> None: ); """, """ + CREATE TABLE fidesopstest.visit_partitioned ( + email STRING, + last_visit TIMESTAMP + ) + PARTITION BY + last_visit + OPTIONS( + require_partition_filter = TRUE + ) + ; + """, + """ CREATE TABLE fidesopstest.login ( id INT, customer_id INT, @@ -498,6 +585,12 @@ def seed_bigquery_integration_db(bigquery_integration_engine) -> None: ('customer-2@example.com', '2021-01-06 01:00:00'); """, """ + INSERT INTO fidesopstest.visit_partitioned VALUES + ('customer-1@example.com', '2021-01-06 01:00:00'), + ('customer-2@example.com', '2021-01-06 01:00:00'); + ('customer-2@example.com', '2024-10-03 01:00:00'); + """, + """ INSERT INTO fidesopstest.login VALUES (1, 1, '2021-01-01 01:00:00'), (2, 1, '2021-01-02 01:00:00'), diff --git a/tests/ops/api/v1/endpoints/test_dataset_endpoints.py b/tests/ops/api/v1/endpoints/test_dataset_endpoints.py index e4e62e7682..35deb7fe8a 100644 --- a/tests/ops/api/v1/endpoints/test_dataset_endpoints.py +++ b/tests/ops/api/v1/endpoints/test_dataset_endpoints.py @@ -61,7 +61,7 @@ def test_example_datasets(example_datasets): assert example_datasets[6]["fides_key"] == 
"mariadb_example_test_dataset" assert len(example_datasets[6]["collections"]) == 11 assert example_datasets[7]["fides_key"] == "bigquery_example_test_dataset" - assert len(example_datasets[7]["collections"]) == 11 + assert len(example_datasets[7]["collections"]) == 12 assert example_datasets[9]["fides_key"] == "email_dataset" assert len(example_datasets[9]["collections"]) == 3 assert example_datasets[11]["fides_key"] == "dynamodb_example_test_dataset" diff --git a/tests/ops/graph/test_config.py b/tests/ops/graph/test_config.py index d2fc17a116..b656ae90d1 100644 --- a/tests/ops/graph/test_config.py +++ b/tests/ops/graph/test_config.py @@ -127,6 +127,7 @@ def test_from_string(self): "name": "t3", "skip_processing": False, "masking_strategy_override": None, + "partitioning": None, "fields": [ { "name": "f1", @@ -381,6 +382,83 @@ def test_parse_from_task_without_data_categories(self): parsed = Collection.parse_from_request_task(serialized_collection) assert parsed.data_categories == set() + @pytest.mark.parametrize( + "where_clauses,validation_error", + [ + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()", + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", + ], + None, + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()", + "`created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", # we support only a single comparison for 'terminal' partition windows + ], + None, + ), + ( + [ + "`created` > 4 OR 1 = 1", # comparison operators after an OR are not permitted + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > 4) OR 1 > 0", # comparison operators after an OR are not permitted + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > 4; drop table user", # semi-colons are not allowed, so stacked queries are prevented + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `foobar` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", # field names in comparison must match + ], + "Partition clause must have matching fields", + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()) OR 1 > 0", # comparison operators after an OR are not permitted + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)) UNION\nSELECT password from user" # union is a protected keyword not allowed in an operand + ], + "Prohibited keyword referenced in partition clause", + ), + ], + ) + def test_parse_from_task_with_partitioning(self, where_clauses, validation_error): + """ + Verify that a collection stored with partitioning specification goes through proper validation + """ + serialized_collection_with_partitioning = { + "name": "partitioning_collection", + "partitioning": {"where_clauses": where_clauses}, + "fields": [], + } + if validation_error is None: + parsed = Collection.parse_from_request_task( + serialized_collection_with_partitioning + ) + assert parsed.partitioning == {"where_clauses": where_clauses} + else: + with pytest.raises(pydantic.ValidationError) as e: + 
Collection.parse_from_request_task( + serialized_collection_with_partitioning + ) + assert validation_error in str(e) + def test_collection_masking_strategy_override(self): ds = Collection( name="t3", diff --git a/tests/ops/integration_tests/test_external_database_connections.py b/tests/ops/integration_tests/test_external_database_connections.py index ca3b65c3cc..d5077bbc10 100644 --- a/tests/ops/integration_tests/test_external_database_connections.py +++ b/tests/ops/integration_tests/test_external_database_connections.py @@ -180,6 +180,7 @@ def test_bigquery_example_data(bigquery_test_engine): "report", "service_request", "visit", + "visit_partitioned", ] ) diff --git a/tests/ops/models/test_datasetconfig.py b/tests/ops/models/test_datasetconfig.py index 5c52b7eca0..969002baac 100644 --- a/tests/ops/models/test_datasetconfig.py +++ b/tests/ops/models/test_datasetconfig.py @@ -1,5 +1,6 @@ import pytest from fideslang.models import Dataset, FidesDatasetReference +from pydantic import ValidationError as PydanticValidationError from sqlalchemy.orm import Session from fides.api.common_exceptions import ValidationError @@ -65,6 +66,89 @@ def test_convert_dataset_to_graph_no_collections(example_datasets): assert len(graph.collections) == 0 +@pytest.mark.parametrize( + "where_clauses,validation_error", + [ + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()", + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", + ], + None, + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()", + "`created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", # we support only a single comparison for 'terminal' partition windows + ], + None, + ), + ( + [ + "`created` > 4 OR 1 = 1", # comparison operators after an OR are not permitted + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `created` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > 4) OR 1 > 0", # comparison operators after an OR are not permitted + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > 4; drop table user", # semi-colons are not allowed, so stacked queries are prevented + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2000 DAY) AND `foobar` <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)", # field names in comparison must match + ], + "Partition clause must have matching fields", + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()) OR 1 > 0", # comparison operators after an OR are not permitted + ], + "Unsupported partition clause format", + ), + ( + [ + "`created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY)) UNION\nSELECT password from user" # union is a protected keyword not allowed in an operand + ], + "Prohibited keyword referenced in partition clause", + ), + ], +) +def test_convert_dataset_to_graph_partitioning( + example_datasets, where_clauses, validation_error +): + """ + Verify that a collection with partitioning specification + goes through proper validation during graph conversion. 
+ """ + dataset_json = example_datasets[0].copy() + existing_collection = dataset_json["collections"][0] + if existing_collection.get("fides_meta") is None: + existing_collection["fides_meta"] = {} + existing_collection["fides_meta"]["partitioning"] = { + "where_clauses": where_clauses, + } + dataset_json["collections"][0] = existing_collection + dataset = Dataset(**dataset_json) + if validation_error is None: + graph = convert_dataset_to_graph(dataset, "mock_connection_config_key") + assert graph is not None + assert graph.name == "postgres_example_test_dataset" + assert graph.collections[0].partitioning == {"where_clauses": where_clauses} + else: + with pytest.raises(PydanticValidationError) as e: + graph = convert_dataset_to_graph(dataset, "mock_connection_config_key") + assert validation_error in str(e) + + def test_convert_dataset_to_graph(example_datasets): """Test a more complex dataset->graph conversion using the helper method directly""" diff --git a/tests/ops/service/connectors/test_bigquery_connector.py b/tests/ops/service/connectors/test_bigquery_connector.py index a906ddd642..7a9ef245b8 100644 --- a/tests/ops/service/connectors/test_bigquery_connector.py +++ b/tests/ops/service/connectors/test_bigquery_connector.py @@ -7,6 +7,7 @@ from fides.api.graph.graph import DatasetGraph from fides.api.graph.traversal import Traversal from fides.api.models.datasetconfig import DatasetConfig, convert_dataset_to_graph +from fides.api.models.privacy_request import PrivacyRequest, RequestTask from fides.api.schemas.namespace_meta.bigquery_namespace_meta import ( BigQueryNamespaceMeta, ) @@ -53,6 +54,25 @@ def execution_node_with_namespace_meta( CollectionAddress("bigquery_example_test_dataset", "customer") ].to_mock_execution_node() + @pytest.fixture + def execution_node_with_namespace_and_partitioning_meta( + self, + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta: DatasetConfig, + ) -> Generator: + dataset_config = ( + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta + ) + graph_dataset = convert_dataset_to_graph( + Dataset.model_validate(dataset_config.ctl_dataset), + dataset_config.connection_config.key, + ) + dataset_graph = DatasetGraph(graph_dataset) + traversal = Traversal(dataset_graph, {"email": "customer-1@example.com"}) + + yield traversal.traversal_node_dict[ + CollectionAddress("bigquery_example_test_dataset", "customer") + ].to_mock_execution_node() + def test_query_config( self, bigquery_example_test_dataset_config: DatasetConfig, @@ -74,3 +94,90 @@ def test_query_config_with_namespace_meta( assert query_config.namespace_meta == BigQueryNamespaceMeta( **dataset_config.ctl_dataset.fides_meta["namespace"] ) + + def test_generate_update_partitioned_table( + self, + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta: DatasetConfig, + execution_node_with_namespace_and_partitioning_meta, + erasure_policy, + ): + """Unit test of BigQueryQueryConfig.generate_update specifically for a partitioned table""" + dataset_config = ( + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta + ) + connector = BigQueryConnector(dataset_config.connection_config) + query_config = connector.query_config( + execution_node_with_namespace_and_partitioning_meta + ) + + row = { + "email": "customer-1@example.com", + "name": "John Customer", + "address_id": 1, + "id": 1, + } + updates = query_config.generate_update( + row=row, + policy=erasure_policy, + request=PrivacyRequest(id=123), + client=connector.client(), 
+ ) + + assert len(updates) == 2 + assert ( + str(updates[0]) + == "UPDATE `silken-precinct-284918.fidesopstest.customer` SET `name`=%(name:STRING)s WHERE `silken-precinct-284918.fidesopstest.customer`.`id` = %(id_1:INT64)s AND `created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()" + ) + + def test_generate_delete_partitioned_table( + self, + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta: DatasetConfig, + execution_node_with_namespace_and_partitioning_meta, + erasure_policy, + ): + """Unit test of BigQueryQueryConfig.generate_delete specifically for a partitioned table""" + dataset_config = ( + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta + ) + connector = BigQueryConnector(dataset_config.connection_config) + query_config = connector.query_config( + execution_node_with_namespace_and_partitioning_meta + ) + + row = { + "email": "customer-1@example.com", + "name": "John Customer", + "address_id": 1, + "id": 1, + } + deletes = query_config.generate_delete(row=row, client=connector.client()) + + assert len(deletes) == 2 + assert ( + str(deletes[0]) + == "DELETE FROM `silken-precinct-284918.fidesopstest.customer` WHERE `silken-precinct-284918.fidesopstest.customer`.`id` = %(id_1:INT64)s AND `created` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1000 DAY) AND `created` <= CURRENT_TIMESTAMP()" + ) + + def test_retrieve_partitioned_data( + self, + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta: DatasetConfig, + execution_node_with_namespace_and_partitioning_meta, + policy, + privacy_request_with_email_identity, + ): + """Unit test of BigQueryQueryConfig.generate_delete specifically for a partitioned table""" + dataset_config = ( + bigquery_example_test_dataset_config_with_namespace_and_partitioning_meta + ) + connector = BigQueryConnector(dataset_config.connection_config) + + results = connector.retrieve_data( + node=execution_node_with_namespace_and_partitioning_meta, + policy=policy, + privacy_request=privacy_request_with_email_identity, + request_task=RequestTask(), + input_data={"email": ["customer-1@example.com"]}, + ) + + assert len(results) == 1 + assert results[0]["email"] == "customer-1@example.com" diff --git a/tests/ops/service/privacy_request/test_request_runner_service.py b/tests/ops/service/privacy_request/test_request_runner_service.py index b0ac24d33a..34c4bfa26c 100644 --- a/tests/ops/service/privacy_request/test_request_runner_service.py +++ b/tests/ops/service/privacy_request/test_request_runner_service.py @@ -2156,6 +2156,13 @@ def test_create_and_process_access_request_bigquery( ) assert results[employee_table_key][0]["id"] == bigquery_resources["employee_id"] + # this covers access requests against a partitioned table + visit_partitioned_table_key = "bigquery_example_test_dataset:visit_partitioned" + assert len(results[visit_partitioned_table_key]) == 1 + assert ( + results[visit_partitioned_table_key][0]["email"] == bigquery_resources["email"] + ) + pr.delete(db=db) diff --git a/tests/ops/task/test_create_request_tasks.py b/tests/ops/task/test_create_request_tasks.py index 55c79188f5..fc89b75f0f 100644 --- a/tests/ops/task/test_create_request_tasks.py +++ b/tests/ops/task/test_create_request_tasks.py @@ -40,6 +40,7 @@ "name": "payment_card", "after": [], "masking_strategy_override": None, + "partitioning": None, "fields": [ { "name": "billing_address_id", @@ -290,6 +291,7 @@ def test_persist_access_tasks_with_object_fields_in_collection( 
"name": "internal_customer_profile", "after": [], "masking_strategy_override": None, + "partitioning": None, "fields": [ { "name": "_id", @@ -1177,6 +1179,7 @@ def test_persist_new_consent_request_tasks( "skip_processing": False, "data_categories": [], "masking_strategy_override": None, + "partitioning": None, } assert ga_task.traversal_details == { "input_keys": [],