Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PostgresToGoogleCloudStorageOperator - BigQuery schema type for time zone naive fields #22536

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions airflow/providers/google/cloud/transfers/postgres_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ class PostgresToGCSOperator(BaseSQLToGCSOperator):
ui_color = '#a0e08c'

type_map = {
1114: 'TIMESTAMP',
1114: 'DATETIME',
1184: 'TIMESTAMP',
1082: 'TIMESTAMP',
1083: 'TIMESTAMP',
1082: 'DATE',
1083: 'TIME',
1005: 'INTEGER',
1007: 'INTEGER',
1016: 'INTEGER',
Expand Down Expand Up @@ -131,18 +131,24 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
def convert_type(self, value, schema_type):
"""
Takes a value from Postgres, and converts it to a value that's safe for
JSON/Google Cloud Storage/BigQuery. Dates are converted to UTC seconds.
Decimals are converted to floats. Times are converted to seconds.
JSON/Google Cloud Storage/BigQuery.
Timezone aware Datetime are converted to UTC seconds.
Unaware Datetime, Date and Time are converted to ISO formatted strings.
Decimals are converted to floats.
"""
if isinstance(value, (datetime.datetime, datetime.date)):
return pendulum.parse(value.isoformat()).float_timestamp
if isinstance(value, datetime.datetime):
iso_format_value = value.isoformat()
if value.tzinfo is None:
return iso_format_value
return pendulum.parse(iso_format_value).float_timestamp
if isinstance(value, datetime.date):
return value.isoformat()
if isinstance(value, datetime.time):
formatted_time = time.strptime(str(value), "%H:%M:%S")
return int(
datetime.timedelta(
hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
).total_seconds()
time_delta = datetime.timedelta(
hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
)
return str(time_delta)
if isinstance(value, dict):
return json.dumps(value)
if isinstance(value, Decimal):
Expand Down
26 changes: 25 additions & 1 deletion tests/providers/google/cloud/transfers/test_postgres_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import unittest
from unittest.mock import patch

import pytest
import pytz
from parameterized import parameterized

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
Expand Down Expand Up @@ -89,6 +91,28 @@ def _assert_uploaded_file_content(self, bucket, obj, tmp_filename, mime_type, gz
with open(tmp_filename, 'rb') as file:
assert b''.join(NDJSON_LINES) == file.read()

@parameterized.expand(
    [
        # Values that are already JSON-safe pass through unchanged.
        ("string", "string"),
        (32.9, 32.9),
        (-2, -2),
        # Dates are expected as ISO formatted strings.
        (datetime.date(1970, 1, 2), "1970-01-02"),
        (datetime.date(1000, 1, 2), "1000-01-02"),
        # A datetime without tzinfo is expected as an ISO formatted string.
        (datetime.datetime(1970, 1, 1, 1, 0, tzinfo=None), "1970-01-01T01:00:00"),
        # A timezone aware datetime is expected as a float UTC timestamp.
        (
            datetime.datetime(2022, 1, 1, 2, 0, tzinfo=pytz.UTC),
            1641002400.0,
        ),
        # Times are expected as "H:MM:SS" strings (no zero padding on the hour).
        (datetime.time(hour=0, minute=0, second=0), "0:00:00"),
        (datetime.time(hour=23, minute=59, second=59), "23:59:59"),
    ]
)
def test_convert_type(self, value, expected):
    """Test that convert_type returns the expected value for each input type."""
    operator = PostgresToGCSOperator(
        task_id=TASK_ID,
        postgres_conn_id=POSTGRES_CONN_ID,
        sql=SQL,
        bucket=BUCKET,
        filename=FILENAME,
    )
    # The schema type is passed as None, as no test case here depends on it.
    converted = operator.convert_type(value, schema_type=None)
    assert converted == expected

@patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
def test_exec_success(self, gcs_hook_mock_class):
"""Test the execute function in case where the run is successful."""
Expand Down