Skip to content

Commit

Permalink
Added select_query to the templated fields in RedshiftToS3Operator (#…
Browse files Browse the repository at this point in the history
…16767)

Co-authored-by: Weiping He <weiping.he@cirium.com>
  • Loading branch information
hewe and Weiping He authored Jul 3, 2021
1 parent 4c5376b commit ffe8fab
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
9 changes: 4 additions & 5 deletions airflow/providers/amazon/aws/transfers/redshift_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class RedshiftToS3Operator(BaseOperator):
:type table_as_file_name: bool
"""

template_fields = ('s3_bucket', 's3_key', 'schema', 'table', 'unload_options')
template_fields = ('s3_bucket', 's3_key', 'schema', 'table', 'unload_options', 'select_query')
template_ext = ()
ui_color = '#ededed'

Expand Down Expand Up @@ -105,11 +105,10 @@ def __init__(
self.include_header = include_header
self.table_as_file_name = table_as_file_name

self._select_query = None
if select_query:
self._select_query = select_query
self.select_query = select_query
elif self.schema and self.table:
self._select_query = f"SELECT * FROM {self.schema}.{self.table}"
self.select_query = f"SELECT * FROM {self.schema}.{self.table}"
else:
raise ValueError(
'Please provide both `schema` and `table` params or `select_query` to fetch the data.'
Expand Down Expand Up @@ -140,7 +139,7 @@ def execute(self, context) -> None:
unload_options = '\n\t\t\t'.join(self.unload_options)

unload_query = self._build_unload_query(
credentials_block, self._select_query, self.s3_key, unload_options
credentials_block, self.select_query, self.s3_key, unload_options
)

self.log.info('Executing UNLOAD command...')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,5 @@ def test_template_fields_overrides(self):
'schema',
'table',
'unload_options',
'select_query',
)

0 comments on commit ffe8fab

Please sign in to comment.