From 4aae061c0e2c9cbe8a2df589f7d0fcf00d45c46f Mon Sep 17 00:00:00 2001
From: Alex Massen-Hane
Date: Tue, 12 Sep 2023 17:34:28 +0800
Subject: [PATCH 1/2] Add ability to create buckets for tests with roles

---
Review notes (this section is ignored by git am):
 * _create_bucket: a single role string is wrapped as {roles}; set(roles)
   turned it into a set of characters.
 * _create_bucket: the IAM policy is written once, after all bindings are
   added, and the log line lists every role granted.
 * test_create_delete_bucket: the assertion checks bucket_policy.bindings
   (Policy itself is a mapping keyed by role name), and the bucket opened
   to allUsers is deleted once checked.
 * Blob ids on the index lines are those of the original commits.

 .../platform/observatory_environment.py       | 31 +++++++++++++------
 .../platform/test_observatory_environment.py  | 19 ++++++++----
 2 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/observatory-platform/observatory/platform/observatory_environment.py b/observatory-platform/observatory/platform/observatory_environment.py
index e273ba313..c9b57b533 100644
--- a/observatory-platform/observatory/platform/observatory_environment.py
+++ b/observatory-platform/observatory/platform/observatory_environment.py
@@ -73,7 +73,7 @@
 from datetime import datetime, timedelta
 from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
 from multiprocessing import Process
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional, Set, Union
 
 import boto3
 import croniter
@@ -90,7 +90,7 @@
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.variable import Variable
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.empty import EmptyOperator as DummyOperator
 from airflow.utils import db
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
@@ -253,7 +253,7 @@ def __init__(
         self.data_location = data_location
         self.api_host = api_host
         self.api_port = api_port
-        self.buckets = []
+        self.buckets = {}
         self.datasets = []
         self.data_path = None
         self.session = None
@@ -303,7 +303,7 @@ def assert_gcp_dependencies(self):
 
         assert self.create_gcp_env, "Please specify the Google Cloud project_id and data_location"
 
-    def add_bucket(self, prefix: Optional[str] = None) -> str:
+    def add_bucket(self, prefix: Optional[str] = None, roles: Optional[Union[Set[str], str]] = None) -> str:
         """Add a Google Cloud Storage Bucket to the Observatory environment.
 
         The bucket will be created when create() is called and deleted when the Observatory
@@ -325,21 +325,32 @@ def add_bucket(self, prefix: Optional[str] = None) -> str:
         if len(bucket_name) > 63:
             raise Exception(f"Bucket name cannot be longer than 63 characters: {bucket_name}")
         else:
-            self.buckets.append(bucket_name)
+            self.buckets[bucket_name] = roles
 
         return bucket_name
 
-    def _create_bucket(self, bucket_id: str) -> None:
+    def _create_bucket(self, bucket_id: str, roles: Optional[Union[str, Set[str]]] = None) -> None:
         """Create a Google Cloud Storage Bucket.
 
         :param bucket_id: the bucket identifier.
+        :param roles: Create bucket with custom roles if required.
         :return: None.
         """
 
         self.assert_gcp_dependencies()
-        self.storage_client.create_bucket(bucket_id, location=self.data_location)
+        bucket = self.storage_client.create_bucket(bucket_id, location=self.data_location)
         logging.info(f"Created bucket with name: {bucket_id}")
 
+        if roles:
+            roles = {roles} if isinstance(roles, str) else roles  # set(str) would split the role into characters
+
+            # Get policy of bucket and add roles.
+            policy = bucket.get_iam_policy()
+            for role in roles:
+                policy.bindings.append({"role": role, "members": {"allUsers"}})
+            bucket.set_iam_policy(policy)
+            logging.info(f"Added permissions {sorted(roles)} to bucket {bucket_id} for allUsers.")
+
     def _create_dataset(self, dataset_id: str) -> None:
         """Create a BigQuery dataset.
 
@@ -526,8 +537,8 @@ def create(self, task_logging: bool = False):
 
                 # Create buckets and datasets
                 if self.create_gcp_env:
-                    for bucket_id in self.buckets:
-                        self._create_bucket(bucket_id)
+                    for bucket_id, roles in self.buckets.items():
+                        self._create_bucket(bucket_id, roles)
 
                     for dataset_id in self.datasets:
                         self._create_dataset(dataset_id)
@@ -573,7 +584,7 @@ def create(self, task_logging: bool = False):
 
                 if self.create_gcp_env:
                     # Remove Google Cloud Storage buckets
-                    for bucket_id in self.buckets:
+                    for bucket_id, roles in self.buckets.items():
                         self._delete_bucket(bucket_id)
 
                     # Remove BigQuery datasets
diff --git a/tests/observatory/platform/test_observatory_environment.py b/tests/observatory/platform/test_observatory_environment.py
index 69f578612..bfc2d134e 100644
--- a/tests/observatory/platform/test_observatory_environment.py
+++ b/tests/observatory/platform/test_observatory_environment.py
@@ -119,13 +119,15 @@ def test_add_bucket(self):
         env = ObservatoryEnvironment(self.project_id, self.data_location)
 
         # The download and transform buckets are added in the constructor
-        self.assertEqual(2, len(env.buckets))
-        self.assertEqual(env.download_bucket, env.buckets[0])
-        self.assertEqual(env.transform_bucket, env.buckets[1])
+        buckets = list(env.buckets.keys())
+        self.assertEqual(2, len(buckets))
+        self.assertEqual(env.download_bucket, buckets[0])
+        self.assertEqual(env.transform_bucket, buckets[1])
 
         # Test that calling add bucket adds a new bucket to the buckets list
         name = env.add_bucket()
-        self.assertEqual(name, env.buckets[-1])
+        buckets = list(env.buckets.keys())
+        self.assertEqual(name, buckets[-1])
 
         # No Google Cloud variables raises error
         with self.assertRaises(AssertionError):
@@ -150,6 +152,15 @@ def test_create_delete_bucket(self):
         # Test double delete is handled gracefully
         env._delete_bucket(bucket_id)
 
+        # Test create a bucket with a set of roles
+        roles = {"roles/storage.objectViewer", "roles/storage.legacyBucketWriter"}
+        env._create_bucket(bucket_id, roles=roles)
+        bucket = env.storage_client.bucket(bucket_id)
+        bucket_policy = bucket.get_iam_policy()
+        for role in roles:
+            self.assertIn({"role": role, "members": {"allUsers"}}, bucket_policy.bindings)
+        env._delete_bucket(bucket_id)
+
         # No Google Cloud variables raises error
         bucket_id = "obsenv_tests_" + random_id()
         with self.assertRaises(AssertionError):

From 7666f20bbd02d450612df7a45dd2fd94ccf10a09 Mon Sep 17 00:00:00 2001
From: Alex Massen-Hane
Date: Tue, 12 Sep 2023 18:23:32 +0800
Subject: [PATCH 2/2] Add gcs_bucket_roles option to ObservatoryEnvironment constructor

---
Review notes (this section is ignored by git am):
 * gcs_bucket_roles defaults to None, so it is annotated Optional[...].
 * Blob ids on the index line are those of the original commits.

 .../observatory/platform/observatory_environment.py          | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/observatory-platform/observatory/platform/observatory_environment.py b/observatory-platform/observatory/platform/observatory_environment.py
index c9b57b533..d80e735a7 100644
--- a/observatory-platform/observatory/platform/observatory_environment.py
+++ b/observatory-platform/observatory/platform/observatory_environment.py
@@ -232,6 +232,7 @@ def __init__(
         prefix: Optional[str] = "obsenv_tests",
         age_to_delete: int = 12,
         workflows: List[Workflow] = None,
+        gcs_bucket_roles: Optional[Union[Set[str], str]] = None,
     ):
         """Constructor for an Observatory environment.
 
@@ -267,8 +268,8 @@ def __init__(
         self.workflows = workflows
 
         if self.create_gcp_env:
-            self.download_bucket = self.add_bucket()
-            self.transform_bucket = self.add_bucket()
+            self.download_bucket = self.add_bucket(roles=gcs_bucket_roles)
+            self.transform_bucket = self.add_bucket(roles=gcs_bucket_roles)
             self.storage_client = storage.Client()
             self.bigquery_client = bigquery.Client()
         else: