Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: Change UUID representation to bytes and add integration tests #8267

Merged
merged 4 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/dev/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ ENV SPARK_VERSION=3.4.1
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
ENV ICEBERG_VERSION=1.3.1
ENV AWS_SDK_VERSION=2.20.18
ENV PYICEBERG_VERSION=0.4.0

RUN curl --retry 3 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
Expand All @@ -65,6 +66,8 @@ RUN chmod u+x /opt/spark/sbin/* && \

RUN pip3 install -q ipython

RUN pip3 install "pyiceberg[s3fs]==${PYICEBERG_VERSION}"

COPY entrypoint.sh .
COPY provision.py .

Expand Down
33 changes: 33 additions & 0 deletions python/dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_add, expr

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import FixedType, NestedField, UUIDType

spark = SparkSession.builder.getOrCreate()

spark.sql(
Expand All @@ -26,6 +30,35 @@
"""
)

schema = Schema(
NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False),
)

catalog = load_catalog(
"local",
**{
"type": "rest",
"uri": "http://rest:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)

catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema)

spark.sql(
"""
INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES
('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)),
('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)),
('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)),
('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)),
('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY));
"""
)

spark.sql(
"""
CREATE OR REPLACE TABLE default.test_null_nan
Expand Down
14 changes: 5 additions & 9 deletions python/pyiceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
_LONG_STRUCT = Struct("<q")
_FLOAT_STRUCT = Struct("<f")
_DOUBLE_STRUCT = Struct("<d")
_UUID_STRUCT = Struct(">QQ")


def handle_none(func: Callable) -> Callable: # type: ignore
Expand Down Expand Up @@ -228,8 +227,10 @@ def _(_: StringType, value: str) -> bytes:


@to_bytes.register(UUIDType)
def _(_: UUIDType, value: uuid.UUID) -> bytes:
return _UUID_STRUCT.pack((value.int >> 64) & 0xFFFFFFFFFFFFFFFF, value.int & 0xFFFFFFFFFFFFFFFF)
def _(_: UUIDType, value: Union[uuid.UUID, bytes]) -> bytes:
if isinstance(value, bytes):
return value
return value.bytes


@to_bytes.register(BinaryType)
Expand Down Expand Up @@ -310,14 +311,9 @@ def _(_: StringType, b: bytes) -> str:
return bytes(b).decode("utf-8")


@from_bytes.register(UUIDType)
def _(_: UUIDType, b: bytes) -> uuid.UUID:
unpacked_bytes = _UUID_STRUCT.unpack(b)
return uuid.UUID(int=unpacked_bytes[0] << 64 | unpacked_bytes[1])


@from_bytes.register(BinaryType)
@from_bytes.register(FixedType)
@from_bytes.register(UUIDType)
def _(_: PrimitiveType, b: bytes) -> bytes:
return b

Expand Down
34 changes: 27 additions & 7 deletions python/pyiceberg/expressions/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
from pyiceberg.utils.decimal import decimal_to_unscaled, unscaled_to_decimal
from pyiceberg.utils.singleton import Singleton

UUID_BYTES_LENGTH = 16


class Literal(Generic[L], ABC):
"""Literal which has a value and can be converted between types."""
Expand Down Expand Up @@ -139,7 +141,7 @@ def literal(value: L) -> Literal[L]:
elif isinstance(value, str):
return StringLiteral(value)
elif isinstance(value, UUID):
return UUIDLiteral(value)
return UUIDLiteral(value.bytes) # type: ignore
elif isinstance(value, bytes):
return BinaryLiteral(value)
elif isinstance(value, Decimal):
Expand Down Expand Up @@ -571,8 +573,8 @@ def _(self, _: TimestamptzType) -> Literal[int]:
return TimestampLiteral(timestamptz_to_micros(self.value))

@to.register(UUIDType)
def _(self, _: UUIDType) -> Literal[UUID]:
return UUIDLiteral(UUID(self.value))
def _(self, _: UUIDType) -> Literal[bytes]:
return UUIDLiteral(UUID(self.value).bytes)

@to.register(DecimalType)
def _(self, type_var: DecimalType) -> Literal[Decimal]:
Expand All @@ -596,16 +598,16 @@ def __repr__(self) -> str:
return f"literal({repr(self.value)})"


class UUIDLiteral(Literal[UUID]):
def __init__(self, value: UUID) -> None:
super().__init__(value, UUID)
class UUIDLiteral(Literal[bytes]):
def __init__(self, value: bytes) -> None:
super().__init__(value, bytes)

@singledispatchmethod
def to(self, type_var: IcebergType) -> Literal: # type: ignore
raise TypeError(f"Cannot convert UUIDLiteral into {type_var}")

@to.register(UUIDType)
def _(self, _: UUIDType) -> Literal[UUID]:
def _(self, _: UUIDType) -> Literal[bytes]:
return self


Expand All @@ -630,6 +632,15 @@ def _(self, type_var: FixedType) -> Literal[bytes]:
def _(self, _: BinaryType) -> Literal[bytes]:
return BinaryLiteral(self.value)

@to.register(UUIDType)
def _(self, type_var: UUIDType) -> Literal[bytes]:
if len(self.value) == UUID_BYTES_LENGTH:
return UUIDLiteral(self.value)
else:
raise TypeError(
f"Could not convert {self.value!r} into a {type_var}, lengths differ {len(self.value)} <> {UUID_BYTES_LENGTH}"
)


class BinaryLiteral(Literal[bytes]):
def __init__(self, value: bytes) -> None:
Expand All @@ -651,3 +662,12 @@ def _(self, type_var: FixedType) -> Literal[bytes]:
raise TypeError(
f"Cannot convert BinaryLiteral into {type_var}, different length: {len(type_var)} <> {len(self.value)}"
)

@to.register(UUIDType)
def _(self, type_var: UUIDType) -> Literal[bytes]:
if len(self.value) == UUID_BYTES_LENGTH:
return UUIDLiteral(self.value)
else:
raise TypeError(
f"Cannot convert BinaryLiteral into {type_var}, different length: {UUID_BYTES_LENGTH} <> {len(self.value)}"
)
2 changes: 1 addition & 1 deletion python/pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ def visit_binary(self, _: BinaryType) -> pa.DataType:
def _convert_scalar(value: Any, iceberg_type: IcebergType) -> pa.scalar:
if not isinstance(iceberg_type, PrimitiveType):
raise ValueError(f"Expected primitive type, got: {iceberg_type}")
return pa.scalar(value).cast(schema_to_pyarrow(iceberg_type))
return pa.scalar(value=value, type=schema_to_pyarrow(iceberg_type))


class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
Expand Down
11 changes: 4 additions & 7 deletions python/pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from typing import Literal as LiteralType
from typing import Optional, TypeVar
from uuid import UUID

import mmh3
from pydantic import Field, PositiveInt, PrivateAttr
Expand Down Expand Up @@ -269,13 +270,9 @@ def hash_func(v: Any) -> int:
elif source_type == UUIDType:

def hash_func(v: Any) -> int:
return mmh3.hash(
struct.pack(
">QQ",
(v.int >> 64) & 0xFFFFFFFFFFFFFFFF,
v.int & 0xFFFFFFFFFFFFFFFF,
)
)
if isinstance(v, UUID):
return mmh3.hash(v.bytes)
return mmh3.hash(v)

else:
raise ValueError(f"Unknown type {source}")
Expand Down
42 changes: 40 additions & 2 deletions python/tests/expressions/test_literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def test_string_to_uuid_literal() -> None:
uuid_str = literal(str(expected))
uuid_lit = uuid_str.to(UUIDType())

assert expected == uuid_lit.value
assert expected.bytes == uuid_lit.value


def test_string_to_decimal_literal() -> None:
Expand Down Expand Up @@ -503,6 +503,22 @@ def test_binary_to_smaller_fixed_none() -> None:
assert "Cannot convert BinaryLiteral into fixed[2], different length: 2 <> 3" in str(e.value)


def test_binary_to_uuid() -> None:
test_uuid = uuid.uuid4()
lit = literal(test_uuid.bytes)
uuid_lit = lit.to(UUIDType())
assert uuid_lit is not None
assert lit.value == uuid_lit.value
assert uuid_lit.value == test_uuid.bytes


def test_incompatible_binary_to_uuid() -> None:
lit = literal(bytes([0x00, 0x01, 0x02]))
with pytest.raises(TypeError) as e:
_ = lit.to(UUIDType())
assert "Cannot convert BinaryLiteral into uuid, different length: 16 <> 3" in str(e.value)


def test_fixed_to_binary() -> None:
lit = literal(bytes([0x00, 0x01, 0x02])).to(FixedType(3))
binary_lit = lit.to(BinaryType())
Expand All @@ -517,6 +533,22 @@ def test_fixed_to_smaller_fixed_none() -> None:
assert "Could not convert b'\\x00\\x01\\x02' into a fixed[2]" in str(e.value)


def test_fixed_to_uuid() -> None:
test_uuid = uuid.uuid4()
lit = literal(test_uuid.bytes).to(FixedType(16))
uuid_lit = lit.to(UUIDType())
assert uuid_lit is not None
assert lit.value == uuid_lit.value
assert uuid_lit.value == test_uuid.bytes


def test_incompatible_fixed_to_uuid() -> None:
lit = literal(bytes([0x00, 0x01, 0x02])).to(FixedType(3))
with pytest.raises(TypeError) as e:
_ = lit.to(UUIDType())
assert "Cannot convert BinaryLiteral into uuid, different length: 16 <> 3" in str(e.value)


def test_above_max_float() -> None:
a = FloatAboveMax()
# singleton
Expand Down Expand Up @@ -843,6 +875,13 @@ def test_decimal_literal_dencrement() -> None:
assert dec.decrement().value.as_tuple() == Decimal("10.122").as_tuple()


def test_uuid_literal_initialization() -> None:
test_uuid = uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")
uuid_literal = literal(test_uuid)
assert isinstance(uuid_literal, Literal)
assert test_uuid.bytes == uuid_literal.value


# __ __ ___
# | \/ |_ _| _ \_ _
# | |\/| | || | _/ || |
Expand All @@ -853,7 +892,6 @@ def test_decimal_literal_dencrement() -> None:
assert_type(literal(True), Literal[bool])
assert_type(literal(123), Literal[int])
assert_type(literal(123.4), Literal[float])
assert_type(literal(uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")), Literal[uuid.UUID])
assert_type(literal(bytes([0x01, 0x02, 0x03])), Literal[bytes])
assert_type(literal(Decimal("19.25")), Literal[Decimal])
assert_type({literal(1), literal(2), literal(3)}, Set[Literal[int]])
24 changes: 20 additions & 4 deletions python/tests/test_conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ def test_partition_to_py_raise_on_incorrect_precision_or_scale(
(
UUIDType(),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7"),
(FixedType(3), b"foo", b"foo"),
(BinaryType(), b"foo", b"foo"),
(DecimalType(5, 2), b"\x30\x39", Decimal("123.45")),
Expand Down Expand Up @@ -308,9 +308,9 @@ def test_from_bytes(primitive_type: PrimitiveType, b: bytes, result: Any) -> Non
(
UUIDType(),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7"),
(FixedType(3), b"foo", b"foo"),
(BinaryType(), b"foo", b"foo"),
(DecimalType(5, 2), b"\x30\x39", Decimal("123.45")),
Expand Down Expand Up @@ -341,6 +341,22 @@ def test_round_trip_conversion(primitive_type: PrimitiveType, b: bytes, result:
assert bytes_from_value == b


@pytest.mark.parametrize(
"primitive_type, v, result",
[
(
UUIDType(),
uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
),
(UUIDType(), uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7"),
],
)
def test_uuid_to_bytes(primitive_type: PrimitiveType, v: Any, result: bytes) -> None:
bytes_from_value = conversions.to_bytes(primitive_type, v)
assert bytes_from_value == result


@pytest.mark.parametrize(
"primitive_type, b, result",
[
Expand Down
37 changes: 37 additions & 0 deletions python/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# pylint:disable=redefined-outer-name

import math
import uuid
from urllib.parse import urlparse

import pyarrow.parquet as pq
Expand All @@ -27,9 +28,11 @@
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import (
And,
EqualTo,
GreaterThanOrEqual,
IsNaN,
LessThan,
NotEqualTo,
NotNaN,
)
from pyiceberg.io.pyarrow import pyarrow_to_schema
Expand Down Expand Up @@ -315,3 +318,37 @@ def test_partitioned_tables(catalog: Catalog) -> None:
table = catalog.load_table(f"default.{table_name}")
arrow_table = table.scan(selected_fields=("number",), row_filter=predicate).to_arrow()
assert set(arrow_table["number"].to_pylist()) == {5, 6, 7, 8, 9, 10, 11, 12}, f"Table {table_name}, predicate {predicate}"


@pytest.mark.integration
def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow()
assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967").bytes]

arrow_table_neq = unpartitioned_uuid.scan(
row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'"
).to_arrow()
assert arrow_table_neq["uuid_col"].to_pylist() == [
uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226").bytes,
uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b").bytes,
uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e").bytes,
]


@pytest.mark.integration
def test_unpartitioned_fixed_table(catalog: Catalog) -> None:
fixed_table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow()
assert arrow_table_eq["fixed_col"].to_pylist() == [b"1234567890123456789012345"]

arrow_table_neq = fixed_table.scan(
row_filter=And(
NotEqualTo("fixed_col", b"1234567890123456789012345"), NotEqualTo("uuid_col", "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b")
)
).to_arrow()
assert arrow_table_neq["fixed_col"].to_pylist() == [
b"1231231231231231231231231",
b"12345678901234567ass12345",
b"qweeqwwqq1231231231231111",
]
Loading