fix: add fixture for dataproc batch and better test cleanup (GoogleCloudPlatform#8262)

* fix: add second exception to backoff decorator (see the sketch below)

* add fixture

* refactor dataproc fixture, remove check for AlreadyExists

* remove duplicate call to fixture

* fix lint

* switch pip version to fix test

Co-authored-by: Anthonios Partheniou <partheniou@google.com>
Co-authored-by: Charles Engelke <engelke@google.com>
3 people authored Aug 18, 2022
1 parent 292859c commit d4d26d4
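
A note on the first bullet: backoff.on_exception accepts a tuple of exception classes, so a second exception can be retried by the same decorator. Below is a minimal sketch of that pattern, assuming google-api-core exception types; NotFound is purely illustrative here, since the exact class added is not visible in this rendering.

import backoff
from google.api_core.exceptions import Aborted, NotFound


# Retry on either exception with exponential backoff, up to 3 attempts.
# (Aborted matches the decorator in the test file below; NotFound is a
# hypothetical second exception, shown only for illustration.)
@backoff.on_exception(backoff.expo, (Aborted, NotFound), max_tries=3)
def flaky_operation():
    ...  # body that may transiently raise Aborted or NotFound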
Showing 2 changed files with 42 additions and 20 deletions.
60 changes: 41 additions & 19 deletions composer/2022_airflow_summit/data_analytics_process_test.py
@@ -31,6 +31,8 @@
 # GCP Project
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 TEST_ID = uuid.uuid4()
+DATAPROC_REGION = "us-central1"
+
 
 # Google Cloud Storage constants
 BUCKET_NAME = f"data-analytics-process-test{TEST_ID}"
@@ -44,24 +46,45 @@
 BQ_WRITE_TABLE = f"data-analytics-process-test-normalized-{TEST_ID}".replace("-", "_")
 TABLE_ID = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_READ_TABLE}"
 
-DATAPROC_REGION = "us-central1"
 PYSPARK_JAR = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
 PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"
 
-BATCH_ID = (
-    f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
-)
-BATCH_CONFIG = {
-    "pyspark_batch": {
-        "jar_file_uris": [PYSPARK_JAR],
-        "main_python_file_uri": PROCESSING_PYTHON_FILE,
-        "args": [
-            PROJECT_ID,
-            f"{BQ_DATASET}.{BQ_READ_TABLE}",
-            f"{BQ_DATASET}.{BQ_WRITE_TABLE}",
-        ],
-    },
-}
+
+@pytest.fixture(scope="module")
+def test_dataproc_batch():
+
+    BATCH_ID = (
+        f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
+    )
+    BATCH_CONFIG = {
+        "pyspark_batch": {
+            "jar_file_uris": [PYSPARK_JAR],
+            "main_python_file_uri": PROCESSING_PYTHON_FILE,
+            "args": [
+                PROJECT_ID,
+                f"{BQ_DATASET}.{BQ_READ_TABLE}",
+                f"{BQ_DATASET}.{BQ_WRITE_TABLE}",
+            ],
+        },
+    }
+
+    yield (BATCH_ID, BATCH_CONFIG)
+    dataproc_client = dataproc.BatchControllerClient(
+        client_options={
+            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
+        }
+    )
+    request = dataproc.DeleteBatchRequest(
+        name=f"projects/{PROJECT_ID}/locations/{DATAPROC_REGION}/batches/{BATCH_ID}"
+    )
+
+    # Make the request
+    response = dataproc_client.delete_batch(request=request)
+
+    # There will only be a response if the deletion fails;
+    # otherwise response will be None.
+    if response:
+        print(response)
 
 
 @pytest.fixture(scope="module")
@@ -85,7 +108,6 @@ def test_bucket():
     bucket.delete(force=True)
 
 
-# TODO(coleleah): teardown any previous resources
 @pytest.fixture(autouse=True)
 def bq_dataset(test_bucket):
     # Create dataset and table for test CSV
@@ -125,7 +147,7 @@ def bq_dataset(test_bucket):

 # Retry if we see a flaky 409 "subnet not ready" exception
 @backoff.on_exception(backoff.expo, Aborted, max_tries=3)
-def test_process(test_bucket):
+def test_process(test_dataproc_batch):
     # check that the results table isn't there
     with pytest.raises(NotFound):
         BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
@@ -138,8 +160,8 @@ def test_process(test_bucket):
     )
     request = dataproc.CreateBatchRequest(
         parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-        batch=BATCH_CONFIG,
-        batch_id=BATCH_ID,
+        batch=test_dataproc_batch[1],
+        batch_id=test_dataproc_batch[0],
     )
     # Make the request
     operation = dataproc_client.create_batch(request=request)
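The heart of this change is the module-scoped yield fixture: pytest runs everything before the yield once as setup, hands the (BATCH_ID, BATCH_CONFIG) tuple to any test that requests test_dataproc_batch, and runs the delete_batch call after the yield as teardown once the module's tests finish, so the batch is cleaned up even when a test fails. A minimal, self-contained sketch of the same pattern; the names resource and handle are hypothetical, not taken from the commit.

import pytest


@pytest.fixture(scope="module")
def resource():
    handle = {"id": "example"}  # setup runs once per module
    yield handle                # value handed to each test that requests the fixture
    handle.clear()              # teardown runs after the last test in the module


def test_uses_resource(resource):
    assert resource["id"] == "example"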
2 changes: 1 addition & 1 deletion composer/2022_airflow_summit/noxfile_config.py
@@ -43,7 +43,7 @@
     # If you need to use a specific version of pip,
     # change pip_version_override to the string representation
     # of the version number, for example, "20.2.4"
-    "pip_version_override": "20.2.4",
+    "pip_version_override": "",
     # A dictionary you want to inject into your test. Don't put any
     # secrets here. These values will override predefined values.
     "envs": {"AIRFLOW_HOME": _tmpdir.name},
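Clearing pip_version_override lets the test session fall back to the pip version bundled with the virtualenv instead of pinning 20.2.4. A sketch of how a noxfile might consume such an override; the conditional below is illustrative only, not the shared python-docs-samples noxfile.

import nox

# Hypothetical stand-in for the dict assembled from noxfile_config.py.
TEST_CONFIG = {"pip_version_override": ""}


@nox.session
def tests(session: nox.Session) -> None:
    pip_version = TEST_CONFIG.get("pip_version_override")
    if pip_version:
        # Pin pip only when a version is explicitly requested.
        session.install(f"pip=={pip_version}")
    session.install("pytest")
    session.run("pytest")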
