From 18a018c73b6ec4f757ee02dbdd209b954b33e5d8 Mon Sep 17 00:00:00 2001
From: Peter Wicks
Date: Tue, 6 Sep 2022 20:05:39 -0600
Subject: [PATCH] GCSToBigQueryOperator allow for schema_object in alternate
 GCS Bucket

---
 .../google/cloud/transfers/gcs_to_bigquery.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
index 6821e78433396..3f5f1d0edfe50 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -56,6 +56,8 @@ class GCSToBigQueryOperator(BaseOperator):
     :param schema_object: If set, a GCS object path pointing to a .json file that
         contains the schema for the table. (templated)
         Parameter must be defined if 'schema_fields' is null and autodetect is False.
+    :param schema_object_bucket: [Optional] If set, the GCS bucket where the schema object
+        template is stored. (templated) (Default: the value of ``bucket``)
     :param source_format: File format to export.
     :param compression: [Optional] The compression type of the data source.
         Possible values include GZIP and NONE.
@@ -133,6 +135,7 @@ class GCSToBigQueryOperator(BaseOperator):
         'bucket',
         'source_objects',
         'schema_object',
+        'schema_object_bucket',
         'destination_project_dataset_table',
         'impersonation_chain',
     )
@@ -147,6 +150,7 @@ def __init__(
         destination_project_dataset_table,
         schema_fields=None,
         schema_object=None,
+        schema_object_bucket=None,
         source_format='CSV',
         compression='NONE',
         create_disposition='CREATE_IF_NEEDED',
@@ -187,6 +191,10 @@ def __init__(
         self.source_objects = source_objects
         self.schema_object = schema_object
 
+        if schema_object_bucket is None:
+            schema_object_bucket = bucket
+        self.schema_object_bucket = schema_object_bucket
+
         # BQ config
         self.destination_project_dataset_table = destination_project_dataset_table
         self.schema_fields = schema_fields
@@ -236,7 +244,7 @@ def execute(self, context: 'Context'):
                     impersonation_chain=self.impersonation_chain,
                 )
                 blob = gcs_hook.download(
-                    bucket_name=self.bucket,
+                    bucket_name=self.schema_object_bucket,
                     object_name=self.schema_object,
                 )
                 schema_fields = json.loads(blob.decode("utf-8"))
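
---

Usage sketch (reviewer note, not part of the patch): a minimal DAG showing how the
new ``schema_object_bucket`` parameter would be used once this change is applied.
All bucket, object, dataset, and task names below are hypothetical.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

    with DAG(
        dag_id="gcs_to_bq_schema_bucket_example",  # hypothetical DAG id
        start_date=datetime(2022, 9, 1),
        schedule_interval=None,
    ) as dag:
        load_sales = GCSToBigQueryOperator(
            task_id="load_sales",
            bucket="data-landing-bucket",                  # bucket holding the source CSV files
            source_objects=["sales/2022/09/06/*.csv"],
            schema_object="schemas/sales.json",
            schema_object_bucket="shared-schemas-bucket",  # new: schema file lives in a different bucket
            destination_project_dataset_table="my_project.sales.daily_sales",
            write_disposition="WRITE_TRUNCATE",
        )

If ``schema_object_bucket`` is omitted it falls back to the value of ``bucket``
(see the ``__init__`` hunk above), so existing DAGs keep their current behavior.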