unifies handling of naive datetime for arrow and object incrementals #2184

Merged (2 commits) on Jan 8, 2025
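For context on what "unifies handling of naive datetime" means here: a naive datetime cannot be ordered against the timezone-aware pendulum value that incremental state stores, and before this change the arrow and object code paths worked around that in two different places (from_arrow_scalar forced every timestamp to UTC, while the object transform carried its own inline check). A minimal illustration of the underlying problem, with made-up values:

    from datetime import datetime, timezone

    naive = datetime(2021, 1, 1, 5, 2, 32)
    aware = datetime(2021, 1, 1, tzinfo=timezone.utc)

    try:
        naive > aware
    except TypeError as exc:
        # comparing offset-naive and offset-aware datetimes raises TypeError,
        # which is why row values have to be normalized before the cursor comparison
        print(exc)

After this PR, both paths normalize through a single shared helper, _adapt_if_datetime, as the diffs below show.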
12 changes: 2 additions & 10 deletions dlt/common/libs/pyarrow.py
@@ -462,16 +462,8 @@ def to_arrow_scalar(value: Any, arrow_type: pyarrow.DataType) -> Any:


 def from_arrow_scalar(arrow_value: pyarrow.Scalar) -> Any:
-    """Converts arrow scalar into Python type. Currently adds "UTC" to naive date times and converts all others to UTC"""
-    row_value = arrow_value.as_py()
-    # dates are not represented as datetimes but I see connector-x represents
-    # datetimes as dates and keeping the exact time inside. probably a bug
-    # but can be corrected this way
-    if isinstance(row_value, date) and not isinstance(row_value, datetime):
-        row_value = pendulum.from_timestamp(arrow_value.cast(pyarrow.int64()).as_py() / 1000)
-    elif isinstance(row_value, datetime):
-        row_value = pendulum.instance(row_value).in_tz("UTC")
-    return row_value
+    """Converts arrow scalar into Python type."""
+    return arrow_value.as_py()


 TNewColumns = Sequence[Tuple[int, pyarrow.Field, Callable[[pyarrow.Table], Iterable[Any]]]]
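With this change from_arrow_scalar simply delegates to pyarrow's as_py(), so naive timestamps come back naive instead of being promoted to a tz-aware pendulum.DateTime in UTC. A small sketch of the resulting behavior using pyarrow directly (assuming pyarrow is installed; the updated test further down checks the same thing through from_arrow_scalar):

    from datetime import datetime
    import pyarrow as pa

    # a naive arrow timestamp scalar
    sc = pa.scalar(datetime(2021, 1, 1, 5, 2, 32), type=pa.timestamp("us"))

    # from_arrow_scalar now returns exactly what as_py() returns:
    # a plain naive datetime, no pendulum and no forced UTC
    py_dt = sc.as_py()
    assert py_dt == datetime(2021, 1, 1, 5, 2, 32)
    assert py_dt.tzinfo is None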
1 change: 0 additions & 1 deletion dlt/extract/incremental/__init__.py
@@ -30,7 +30,6 @@
     coerce_value,
     py_type_to_sc_type,
 )
-from dlt.common.utils import without_none

 from dlt.extract.exceptions import IncrementalUnboundError
 from dlt.extract.incremental.exceptions import (
36 changes: 20 additions & 16 deletions dlt/extract/incremental/transform.py
@@ -113,6 +113,19 @@ def __call__(
         row: TDataItem,
     ) -> Tuple[bool, bool, bool]: ...

+    @staticmethod
+    def _adapt_if_datetime(row_value: Any, last_value: Any) -> Any:
+        # For datetime cursor, ensure the value is a timezone aware datetime.
+        # The object saved in state will always be a tz aware pendulum datetime so this ensures values are comparable
+        if (
+            isinstance(row_value, datetime)
+            and row_value.tzinfo is None
+            and isinstance(last_value, datetime)
+            and last_value.tzinfo is not None
+        ):
+            row_value = pendulum.instance(row_value).in_tz("UTC")
+        return row_value
+
     @property
     def deduplication_disabled(self) -> bool:
         """Skip deduplication when length of the key is 0 or if lag is applied."""
@@ -185,19 +198,9 @@ def __call__(
                 return None, False, False
             else:
                 return row, False, False
-
         last_value = self.last_value
         last_value_func = self.last_value_func
-
-        # For datetime cursor, ensure the value is a timezone aware datetime.
-        # The object saved in state will always be a tz aware pendulum datetime so this ensures values are comparable
-        if (
-            isinstance(row_value, datetime)
-            and row_value.tzinfo is None
-            and isinstance(last_value, datetime)
-            and last_value.tzinfo is not None
-        ):
-            row_value = pendulum.instance(row_value).in_tz("UTC")
+        row_value = self._adapt_if_datetime(row_value, last_value)

         # Check whether end_value has been reached
         # Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value
@@ -354,13 +357,8 @@ def __call__(

         # TODO: Json path support. For now assume the cursor_path is a column name
         cursor_path = self.cursor_path
-
-        # The new max/min value
         try:
-            # NOTE: datetimes are always pendulum in UTC
-            row_value = from_arrow_scalar(self.compute(tbl[cursor_path]))
             cursor_data_type = tbl.schema.field(cursor_path).type
-            row_value_scalar = to_arrow_scalar(row_value, cursor_data_type)
         except KeyError as e:
             raise IncrementalCursorPathMissing(
                 self.resource_name,
@@ -371,6 +369,12 @@ def __call__(
                 " must be a column name.",
             ) from e

+        # The new max/min value
+        row_value_scalar = self.compute(
+            tbl[cursor_path]
+        )  # to_arrow_scalar(row_value, cursor_data_type)
+        row_value = self._adapt_if_datetime(from_arrow_scalar(row_value_scalar), self.last_value)
+
         if tbl.schema.field(cursor_path).nullable:
             tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl)
             tbl = tbl_without_null
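The new helper in isolation, for readers who want to try it: the function body below is copied from the _adapt_if_datetime method added above, while the surrounding names are illustrative only and assume pendulum is installed.

    from datetime import datetime
    import pendulum

    def adapt_if_datetime(row_value, last_value):
        # only a naive datetime compared against a tz-aware last_value is coerced to UTC;
        # anything else passes through untouched
        if (
            isinstance(row_value, datetime)
            and row_value.tzinfo is None
            and isinstance(last_value, datetime)
            and last_value.tzinfo is not None
        ):
            row_value = pendulum.instance(row_value).in_tz("UTC")
        return row_value

    state_value = pendulum.datetime(2020, 12, 31, tz="UTC")  # incremental state keeps tz-aware pendulum values
    naive_row = datetime(2021, 1, 1, 5, 2, 32)

    print(adapt_if_datetime(naive_row, state_value))  # coerced to UTC, now comparable to state_value
    print(adapt_if_datetime(naive_row, None))         # unchanged: nothing to compare against yet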
35 changes: 17 additions & 18 deletions tests/libs/pyarrow/test_pyarrow.py
@@ -1,4 +1,4 @@
-from datetime import timezone, datetime, timedelta  # noqa: I251
+from datetime import timezone, datetime, date, timedelta  # noqa: I251
 from copy import deepcopy
 from typing import List, Any

@@ -109,25 +109,24 @@ def test_to_arrow_scalar() -> None:
     assert dt_converted == datetime(2021, 1, 1, 13, 2, 32, tzinfo=timezone.utc)


-def test_from_arrow_scalar() -> None:
+def test_arrow_type_coercion() -> None:
+    # coerce UTC python dt into naive arrow dt
     naive_dt = get_py_arrow_timestamp(6, tz=None)
-    sc_dt = to_arrow_scalar(datetime(2021, 1, 1, 5, 2, 32), naive_dt)
-
-    # this value is like UTC
-    py_dt = from_arrow_scalar(sc_dt)
-    assert isinstance(py_dt, pendulum.DateTime)
-    # and we convert to explicit UTC
-    assert py_dt == datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone.utc)
-
-    # converts to UTC
-    berlin_dt = get_py_arrow_timestamp(6, tz="Europe/Berlin")
-    sc_dt = to_arrow_scalar(
-        datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone(timedelta(hours=-8))), berlin_dt
-    )
+    sc_dt = to_arrow_scalar(datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone.utc), naive_dt)
+    # does not convert to pendulum
     py_dt = from_arrow_scalar(sc_dt)
-    assert isinstance(py_dt, pendulum.DateTime)
-    assert py_dt.tzname() == "UTC"
-    assert py_dt == datetime(2021, 1, 1, 13, 2, 32, tzinfo=timezone.utc)
+    assert not isinstance(py_dt, pendulum.DateTime)
+    assert isinstance(py_dt, datetime)
+    assert py_dt.tzname() is None
+
+    # coerce datetime into date
+    py_date = pa.date32()
+    sc_date = to_arrow_scalar(datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone.utc), py_date)
+    assert from_arrow_scalar(sc_date) == date(2021, 1, 1)
+
+    py_date = pa.date64()
+    sc_date = to_arrow_scalar(datetime(2021, 1, 1, 5, 2, 32, tzinfo=timezone.utc), py_date)
+    assert from_arrow_scalar(sc_date) == date(2021, 1, 1)


 def _row_at_index(table: pa.Table, index: int) -> List[Any]:
1 change: 0 additions & 1 deletion tests/libs/test_parquet_writer.py
@@ -12,7 +12,6 @@
 from dlt.common.schema.utils import new_column
 from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
 from dlt.common.time import ensure_pendulum_datetime
-from dlt.common.libs.pyarrow import from_arrow_scalar

 from tests.common.data_writers.utils import get_writer
 from tests.cases import (