diff --git a/system_tests/bigquery.py b/system_tests/bigquery.py
index 6a3d8b22e913..8a5d56e7717d 100644
--- a/system_tests/bigquery.py
+++ b/system_tests/bigquery.py
@@ -424,3 +424,87 @@ def _job_done(instance):
         by_age = operator.itemgetter(1)
         self.assertEqual(sorted(rows, key=by_age),
                          sorted(ROWS, key=by_age))
+
+    def test_load_table_from_storage_write_truncate(self):
+        import csv
+        import tempfile
+        from google.cloud.storage import Client as StorageClient
+        local_id = unique_resource_id()
+        BUCKET_NAME = 'bq_load_test' + local_id
+        BLOB_NAME = 'person_ages.csv'
+        GS_URL = 'gs://%s/%s' % (BUCKET_NAME, BLOB_NAME)
+        ROWS = [
+            ('Phred Phlyntstone', 32),
+            ('Bharney Rhubble', 33),
+            ('Wylma Phlyntstone', 29),
+            ('Bhettye Rhubble', 27),
+        ]
+        TABLE_NAME = 'test_table'
+
+        s_client = StorageClient()
+
+        # In the **very** rare case the bucket name is reserved, this
+        # fails with a ConnectionError.
+        bucket = s_client.create_bucket(BUCKET_NAME)
+        self.to_delete.append(bucket)
+
+        blob = bucket.blob(BLOB_NAME)
+
+        with tempfile.TemporaryFile(mode='w+') as csv_file:
+            writer = csv.writer(csv_file)
+            writer.writerow(('Full Name', 'Age'))
+            writer.writerows(ROWS)
+            blob.upload_from_file(
+                csv_file, rewind=True, content_type='text/csv')
+
+        self.to_delete.insert(0, blob)
+
+        dataset = Config.CLIENT.dataset(
+            _make_dataset_name('load_gcs_write_trunc'))
+
+        retry_403(dataset.create)()
+        self.to_delete.append(dataset)
+
+        full_name = bigquery.SchemaField('full_name', 'STRING',
+                                         mode='REQUIRED')
+        age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
+        table = dataset.table(TABLE_NAME, schema=[full_name, age])
+        table.create()
+        self.to_delete.insert(0, table)
+
+        def _job_done(instance):
+            return instance.state in ('DONE', 'done')
+
+        job_1 = Config.CLIENT.load_table_from_storage(
+            'bq_load_trunc_1' + local_id, table, GS_URL)
+        job_1.create_disposition = 'CREATE_NEVER'
+        job_1.skip_leading_rows = 1
+        job_1.source_format = 'CSV'
+        job_1.write_disposition = 'WRITE_EMPTY'
+
+        job_1.begin()
+
+        # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
+        retry = RetryInstanceState(_job_done, max_tries=8)
+        retry(job_1.reload)()
+
+        self.assertTrue(job_1.state in ('DONE', 'done'))
+
+        job_2 = Config.CLIENT.load_table_from_storage(
+            'bq_load_trunc_2' + local_id, table, GS_URL)
+        job_2.skip_leading_rows = 1
+        job_2.source_format = 'CSV'
+        job_2.write_disposition = 'WRITE_TRUNCATE'
+
+        job_2.begin()
+
+        # 8 tries -> 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds
+        retry = RetryInstanceState(_job_done, max_tries=8)
+        retry(job_2.reload)()
+
+        self.assertTrue(job_2.state in ('DONE', 'done'))
+
+        rows, _, _ = table.fetch_data()
+        by_age = operator.itemgetter(1)
+        self.assertEqual(sorted(rows, key=by_age),
+                         sorted(ROWS, key=by_age))