Skip to content

Commit

Permalink
Python: Change UUID representation to bytes (#8267)
Browse files Browse the repository at this point in the history
* Change the internal UUID representation to bytes; add integration tests for the uuid and fixed types

* address review comments

* optimize transform code

* support conversion from binary or fixed literals to uuid
  • Loading branch information
HonahX authored Aug 10, 2023
1 parent 01ab0d9 commit 2535c3a
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 31 deletions.
3 changes: 3 additions & 0 deletions python/dev/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ ENV SPARK_VERSION=3.4.1
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
ENV ICEBERG_VERSION=1.3.1
ENV AWS_SDK_VERSION=2.20.18
ENV PYICEBERG_VERSION=0.4.0

RUN curl --retry 3 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
Expand All @@ -65,6 +66,8 @@ RUN chmod u+x /opt/spark/sbin/* && \

RUN pip3 install -q ipython

RUN pip3 install "pyiceberg[s3fs]==${PYICEBERG_VERSION}"

COPY entrypoint.sh .
COPY provision.py .

Expand Down
33 changes: 33 additions & 0 deletions python/dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_add, expr

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import FixedType, NestedField, UUIDType

spark = SparkSession.builder.getOrCreate()

spark.sql(
Expand All @@ -26,6 +30,35 @@
"""
)

# Schema for the UUID/fixed integration table: two optional columns,
# a uuid column and a fixed column of length 25.
schema = Schema(
NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False),
)

# Load the catalog named "local" as a REST catalog; the endpoints and
# credentials are the values hard-coded below.
catalog = load_catalog(
"local",
**{
"type": "rest",
"uri": "http://rest:8181",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)

# Create the table through the catalog using the schema defined above.
catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema)

# Insert five rows with Spark SQL; every fixed_col value is a 25-character
# string cast to BINARY, matching the length declared in the schema.
spark.sql(
"""
INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES
('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)),
('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)),
('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)),
('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)),
('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY));
"""
)

spark.sql(
"""
CREATE OR REPLACE TABLE default.test_null_nan
Expand Down
14 changes: 5 additions & 9 deletions python/pyiceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
_LONG_STRUCT = Struct("<q")
_FLOAT_STRUCT = Struct("<f")
_DOUBLE_STRUCT = Struct("<d")
_UUID_STRUCT = Struct(">QQ")


def handle_none(func: Callable) -> Callable: # type: ignore
Expand Down Expand Up @@ -228,8 +227,10 @@ def _(_: StringType, value: str) -> bytes:


@to_bytes.register(UUIDType)
def _(_: UUIDType, value: Union[uuid.UUID, bytes]) -> bytes:
    """Serialize a UUID value to bytes.

    Args:
        _: The UUID type instance; used only for dispatch.
        value: Either a ``uuid.UUID`` or bytes that are already serialized.

    Returns:
        ``value`` itself when it is already ``bytes``; otherwise ``value.bytes``,
        the 16-byte big-endian form of the UUID.
    """
    if isinstance(value, bytes):
        # Already in serialized form; hand it back untouched (no length check is done).
        return value
    return value.bytes


@to_bytes.register(BinaryType)
Expand Down Expand Up @@ -310,14 +311,9 @@ def _(_: StringType, b: bytes) -> str:
return bytes(b).decode("utf-8")


@from_bytes.register(BinaryType)
@from_bytes.register(FixedType)
@from_bytes.register(UUIDType)
def _(_: PrimitiveType, b: bytes) -> bytes:
    """Return the serialized bytes unchanged.

    Binary, fixed and UUID values are all represented as raw bytes, so no
    decoding is performed for any of the three registered types.

    Args:
        _: The primitive type instance; used only for dispatch.
        b: The serialized value.

    Returns:
        ``b`` itself.
    """
    return b

Expand Down
34 changes: 27 additions & 7 deletions python/pyiceberg/expressions/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
from pyiceberg.utils.decimal import decimal_to_unscaled, unscaled_to_decimal
from pyiceberg.utils.singleton import Singleton

UUID_BYTES_LENGTH = 16


class Literal(Generic[L], ABC):
"""Literal which has a value and can be converted between types."""
Expand Down Expand Up @@ -139,7 +141,7 @@ def literal(value: L) -> Literal[L]:
elif isinstance(value, str):
return StringLiteral(value)
elif isinstance(value, UUID):
return UUIDLiteral(value)
return UUIDLiteral(value.bytes) # type: ignore
elif isinstance(value, bytes):
return BinaryLiteral(value)
elif isinstance(value, Decimal):
Expand Down Expand Up @@ -571,8 +573,8 @@ def _(self, _: TimestamptzType) -> Literal[int]:
return TimestampLiteral(timestamptz_to_micros(self.value))

@to.register(UUIDType)
def _(self, _: UUIDType) -> Literal[bytes]:
    """Parse this literal's value as a UUID and wrap its bytes in a ``UUIDLiteral``.

    Args:
        _: The UUID type instance; used only for dispatch.

    Returns:
        A ``UUIDLiteral`` holding ``UUID(self.value).bytes``.

    Raises:
        ValueError: Propagated from ``UUID(...)`` when the value is not a
            well-formed UUID string.
    """
    return UUIDLiteral(UUID(self.value).bytes)

@to.register(DecimalType)
def _(self, type_var: DecimalType) -> Literal[Decimal]:
Expand All @@ -596,16 +598,16 @@ def __repr__(self) -> str:
return f"literal({repr(self.value)})"


class UUIDLiteral(Literal[bytes]):
    """Literal whose value is a UUID held as raw bytes."""

    def __init__(self, value: bytes) -> None:
        """Store ``value`` and record ``bytes`` as the literal's value type."""
        super().__init__(value, bytes)

    @singledispatchmethod
    def to(self, type_var: IcebergType) -> Literal:  # type: ignore
        """Convert this literal to ``type_var``.

        This is the fallback for every type without a registered conversion.

        Raises:
            TypeError: Always, naming the unsupported target type.
        """
        raise TypeError(f"Cannot convert UUIDLiteral into {type_var}")

    @to.register(UUIDType)
    def _(self, _: UUIDType) -> Literal[bytes]:
        """Return this literal unchanged; it already is a UUID literal."""
        return self


Expand All @@ -630,6 +632,15 @@ def _(self, type_var: FixedType) -> Literal[bytes]:
def _(self, _: BinaryType) -> Literal[bytes]:
return BinaryLiteral(self.value)

@to.register(UUIDType)
def _(self, type_var: UUIDType) -> Literal[bytes]:
    """Convert this literal to a ``UUIDLiteral`` carrying the same bytes.

    Raises:
        TypeError: If the value is not exactly ``UUID_BYTES_LENGTH`` bytes long.
    """
    # Reject anything that cannot be a UUID before building the new literal.
    if len(self.value) != UUID_BYTES_LENGTH:
        raise TypeError(
            f"Could not convert {self.value!r} into a {type_var}, lengths differ {len(self.value)} <> {UUID_BYTES_LENGTH}"
        )
    return UUIDLiteral(self.value)


class BinaryLiteral(Literal[bytes]):
def __init__(self, value: bytes) -> None:
Expand All @@ -651,3 +662,12 @@ def _(self, type_var: FixedType) -> Literal[bytes]:
raise TypeError(
f"Cannot convert BinaryLiteral into {type_var}, different length: {len(type_var)} <> {len(self.value)}"
)

@to.register(UUIDType)
def _(self, type_var: UUIDType) -> Literal[bytes]:
    """Convert this literal to a ``UUIDLiteral`` carrying the same bytes.

    Raises:
        TypeError: If the value is not exactly ``UUID_BYTES_LENGTH`` bytes long.
    """
    # Reject anything that cannot be a UUID before building the new literal.
    if len(self.value) != UUID_BYTES_LENGTH:
        raise TypeError(
            f"Cannot convert BinaryLiteral into {type_var}, different length: {UUID_BYTES_LENGTH} <> {len(self.value)}"
        )
    return UUIDLiteral(self.value)
2 changes: 1 addition & 1 deletion python/pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ def visit_binary(self, _: BinaryType) -> pa.DataType:
def _convert_scalar(value: Any, iceberg_type: IcebergType) -> pa.scalar:
    """Create a PyArrow scalar for ``value`` typed according to ``iceberg_type``.

    Args:
        value: The Python value to wrap.
        iceberg_type: The Iceberg type that determines the Arrow type.

    Returns:
        A PyArrow scalar of the Arrow type produced by ``schema_to_pyarrow``.

    Raises:
        ValueError: If ``iceberg_type`` is not a ``PrimitiveType``.
    """
    if not isinstance(iceberg_type, PrimitiveType):
        raise ValueError(f"Expected primitive type, got: {iceberg_type}")
    # Build the scalar directly with the target type instead of letting
    # PyArrow infer a type first and casting afterwards.
    return pa.scalar(value=value, type=schema_to_pyarrow(iceberg_type))


class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
Expand Down
11 changes: 4 additions & 7 deletions python/pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from typing import Literal as LiteralType
from typing import Optional, TypeVar
from uuid import UUID

import mmh3
from pydantic import Field, PositiveInt, PrivateAttr
Expand Down Expand Up @@ -269,13 +270,9 @@ def hash_func(v: Any) -> int:
elif source_type == UUIDType:

def hash_func(v: Any) -> int:
    """Hash a UUID value given either as a ``UUID`` object or as raw bytes.

    Args:
        v: A ``UUID`` instance, or a value that ``mmh3.hash`` accepts directly.

    Returns:
        The ``mmh3.hash`` of the UUID's bytes.
    """
    if isinstance(v, UUID):
        # Hash the 16-byte big-endian form of the UUID.
        return mmh3.hash(v.bytes)
    return mmh3.hash(v)

else:
raise ValueError(f"Unknown type {source}")
Expand Down
42 changes: 40 additions & 2 deletions python/tests/expressions/test_literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def test_string_to_uuid_literal() -> None:
uuid_str = literal(str(expected))
uuid_lit = uuid_str.to(UUIDType())

assert expected == uuid_lit.value
assert expected.bytes == uuid_lit.value


def test_string_to_decimal_literal() -> None:
Expand Down Expand Up @@ -503,6 +503,22 @@ def test_binary_to_smaller_fixed_none() -> None:
assert "Cannot convert BinaryLiteral into fixed[2], different length: 2 <> 3" in str(e.value)


def test_binary_to_uuid() -> None:
    """A binary literal built from a UUID's bytes converts to a UUID literal with the same bytes."""
    source_uuid = uuid.uuid4()
    binary_lit = literal(source_uuid.bytes)

    converted = binary_lit.to(UUIDType())

    assert converted is not None
    assert binary_lit.value == converted.value
    assert converted.value == source_uuid.bytes


def test_incompatible_binary_to_uuid() -> None:
    """A 3-byte binary literal cannot be converted to a UUID literal."""
    lit = literal(bytes([0x00, 0x01, 0x02]))
    with pytest.raises(TypeError) as e:
        _ = lit.to(UUIDType())
    # Inspect the exception only after the ``with`` block: a statement placed
    # after the raising call inside the block would never execute.
    assert "Cannot convert BinaryLiteral into uuid, different length: 16 <> 3" in str(e.value)


def test_fixed_to_binary() -> None:
lit = literal(bytes([0x00, 0x01, 0x02])).to(FixedType(3))
binary_lit = lit.to(BinaryType())
Expand All @@ -517,6 +533,22 @@ def test_fixed_to_smaller_fixed_none() -> None:
assert "Could not convert b'\\x00\\x01\\x02' into a fixed[2]" in str(e.value)


def test_fixed_to_uuid() -> None:
    """A fixed[16] literal built from a UUID's bytes converts to a UUID literal with the same bytes."""
    source_uuid = uuid.uuid4()
    fixed_lit = literal(source_uuid.bytes).to(FixedType(16))

    converted = fixed_lit.to(UUIDType())

    assert converted is not None
    assert fixed_lit.value == converted.value
    assert converted.value == source_uuid.bytes


def test_incompatible_fixed_to_uuid() -> None:
    """A fixed[3] literal cannot be converted to a UUID literal."""
    lit = literal(bytes([0x00, 0x01, 0x02])).to(FixedType(3))
    with pytest.raises(TypeError) as e:
        _ = lit.to(UUIDType())
    # ``lit`` is a fixed literal here, so the error uses the fixed-literal wording
    # (value repr, then actual length <> expected length), not the BinaryLiteral one.
    # The check sits after the ``with`` block so that it actually executes.
    assert "Could not convert b'\\x00\\x01\\x02' into a uuid, lengths differ 3 <> 16" in str(e.value)


def test_above_max_float() -> None:
a = FloatAboveMax()
# singleton
Expand Down Expand Up @@ -843,6 +875,13 @@ def test_decimal_literal_dencrement() -> None:
assert dec.decrement().value.as_tuple() == Decimal("10.122").as_tuple()


def test_uuid_literal_initialization() -> None:
    """``literal()`` applied to a ``uuid.UUID`` yields a Literal whose value is the UUID's bytes."""
    source_uuid = uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")

    built = literal(source_uuid)

    assert isinstance(built, Literal)
    assert source_uuid.bytes == built.value


# __ __ ___
# | \/ |_ _| _ \_ _
# | |\/| | || | _/ || |
Expand All @@ -853,7 +892,6 @@ def test_decimal_literal_dencrement() -> None:
assert_type(literal(True), Literal[bool])
assert_type(literal(123), Literal[int])
assert_type(literal(123.4), Literal[float])
assert_type(literal(uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")), Literal[uuid.UUID])
assert_type(literal(bytes([0x01, 0x02, 0x03])), Literal[bytes])
assert_type(literal(Decimal("19.25")), Literal[Decimal])
assert_type({literal(1), literal(2), literal(3)}, Set[Literal[int]])
24 changes: 20 additions & 4 deletions python/tests/test_conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ def test_partition_to_py_raise_on_incorrect_precision_or_scale(
(
UUIDType(),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7"),
(FixedType(3), b"foo", b"foo"),
(BinaryType(), b"foo", b"foo"),
(DecimalType(5, 2), b"\x30\x39", Decimal("123.45")),
Expand Down Expand Up @@ -308,9 +308,9 @@ def test_from_bytes(primitive_type: PrimitiveType, b: bytes, result: Any) -> Non
(
UUIDType(),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"),
b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7")),
(UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7"),
(FixedType(3), b"foo", b"foo"),
(BinaryType(), b"foo", b"foo"),
(DecimalType(5, 2), b"\x30\x39", Decimal("123.45")),
Expand Down Expand Up @@ -341,6 +341,22 @@ def test_round_trip_conversion(primitive_type: PrimitiveType, b: bytes, result:
assert bytes_from_value == b


@pytest.mark.parametrize(
    "primitive_type, v, result",
    [
        # A ``uuid.UUID`` is serialized through its ``bytes`` attribute.
        (
            UUIDType(),
            uuid.UUID("f79c3e09-677c-4bbd-a479-3f349cb785e7"),
            b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7",
        ),
        # Bytes that are already serialized are passed through unchanged.
        (UUIDType(), b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7", b"\xf7\x9c>\tg|K\xbd\xa4y?4\x9c\xb7\x85\xe7"),
    ],
)
def test_uuid_to_bytes(primitive_type: PrimitiveType, v: Any, result: bytes) -> None:
    """``to_bytes`` for a UUID type accepts both ``uuid.UUID`` objects and raw bytes."""
    bytes_from_value = conversions.to_bytes(primitive_type, v)
    assert bytes_from_value == result


@pytest.mark.parametrize(
"primitive_type, b, result",
[
Expand Down
37 changes: 37 additions & 0 deletions python/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# pylint:disable=redefined-outer-name

import math
import uuid
from urllib.parse import urlparse

import pyarrow.parquet as pq
Expand All @@ -27,9 +28,11 @@
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import (
And,
EqualTo,
GreaterThanOrEqual,
IsNaN,
LessThan,
NotEqualTo,
NotNaN,
)
from pyiceberg.io.pyarrow import pyarrow_to_schema
Expand Down Expand Up @@ -315,3 +318,37 @@ def test_partitioned_tables(catalog: Catalog) -> None:
table = catalog.load_table(f"default.{table_name}")
arrow_table = table.scan(selected_fields=("number",), row_filter=predicate).to_arrow()
assert set(arrow_table["number"].to_pylist()) == {5, 6, 7, 8, 9, 10, 11, 12}, f"Table {table_name}, predicate {predicate}"


@pytest.mark.integration
def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
    """Filter ``default.test_uuid_and_fixed_unpartitioned`` on ``uuid_col`` and compare the returned bytes."""
    table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")

    # Equality filter: the expected result is the single matching UUID, as bytes.
    eq_result = table.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow()
    assert eq_result["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967").bytes]

    # Inequality filter excluding two UUIDs: the other three are expected, in this order.
    neq_filter = "uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'"
    neq_result = table.scan(row_filter=neq_filter).to_arrow()
    expected_remaining = [
        uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226").bytes,
        uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b").bytes,
        uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e").bytes,
    ]
    assert neq_result["uuid_col"].to_pylist() == expected_remaining


@pytest.mark.integration
def test_unpartitioned_fixed_table(catalog: Catalog) -> None:
    """Filter ``default.test_uuid_and_fixed_unpartitioned`` on ``fixed_col`` (and ``uuid_col``) and compare the fixed values."""
    table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")

    # Equality filter on the fixed column: the expected result is the single matching value.
    eq_result = table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow()
    assert eq_result["fixed_col"].to_pylist() == [b"1234567890123456789012345"]

    # Exclude one row by its fixed value and another by its UUID: three values are expected, in this order.
    exclusion = And(
        NotEqualTo("fixed_col", b"1234567890123456789012345"),
        NotEqualTo("uuid_col", "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"),
    )
    neq_result = table.scan(row_filter=exclusion).to_arrow()
    expected_remaining = [
        b"1231231231231231231231231",
        b"12345678901234567ass12345",
        b"qweeqwwqq1231231231231111",
    ]
    assert neq_result["fixed_col"].to_pylist() == expected_remaining
Loading

0 comments on commit 2535c3a

Please sign in to comment.