Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue with unsupported list values and tuple #103

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/pydantic_avro/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,31 @@ def get_type(value: dict) -> dict:
}

classes_seen.add(class_name)
elif t == "array" and "prefixItems" in value:
# Handle tuple since it is considered an array
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Avro schema has no tuple type, so the best we can do is represent a tuple as an array whose items are a union of the tuple's element types. We walk the prefixItems, collect the de-duplicated set of element types, and use that set as the array's items type; this means each element may be any one of those types, and the tuple's positional typing is not preserved.

prefix_items = value.get("prefixItems")
possible_types = []
for prefix_item in prefix_items:
item_type = get_type(prefix_item)["type"]
if isinstance(item_type, list):
possible_types.extend([x for x in item_type if x not in possible_types])
elif item_type not in possible_types:
possible_types.append(item_type)
avro_type_dict["type"] = {"type": "array", "items": possible_types}
elif t == "array":
items = value.get("items")
tn = get_type(items)
# If items in array are a object:

# If items in array are an object:
if "$ref" in items:
tn = tn["type"]
# If items in array are a logicalType

# Necessary to handle things like logical types, list of lists, and list with union
if (
isinstance(tn, dict)
and isinstance(tn.get("type", {}), dict)
and tn.get("type", {}).get("logicalType") is not None
and isinstance(tn.get("type", None), (dict, list))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change, which pulls the inner type out of the dict describing the array's element type, mirrors the logic we already use in the branch that handles a plain dictionary:

if isinstance(value_type, dict) and len(value_type) == 1:
    value_type = value_type.get("type")

When the inner type is a dict, this is equivalent to what the previous if statements did for logicalType and for a list of lists; it additionally handles list[dict[str, str]]. When the inner type is a list (i.e. a union), we end up with just the list of types, which is valid to set directly as the array's items type.

):
tn = tn["type"]
# If items in array are an array, the structure must be corrected
if (
isinstance(tn, dict)
and isinstance(tn.get("type", {}), dict)
and tn.get("type", {}).get("type") == "array"
):
items = tn["type"]["items"]
tn = {"type": "array", "items": items}
avro_type_dict["type"] = {"type": "array", "items": tn}
elif t == "string" and f == "date-time":
avro_type_dict["type"] = {
Expand Down
178 changes: 175 additions & 3 deletions tests/test_to_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid
from datetime import date, datetime, time, timezone
from pprint import pprint
from typing import Dict, List, Optional, Type, Union
from typing import Dict, List, Optional, Type, Union, Tuple
from uuid import UUID

from avro import schema as avro_schema
Expand Down Expand Up @@ -65,13 +65,30 @@ class ListofLists(AvroBase):
c3: List[List[int]]


AllBasicTypes = Union[str, int, float, bool, None]
LeafForNestedType = Union[List[AllBasicTypes], Dict[str, AllBasicTypes]]
RootNestedType = Union[Dict[str, Union[AllBasicTypes, LeafForNestedType]], List[Union[AllBasicTypes, LeafForNestedType]]]


class ComplexNestedTestModel(AvroBase):
c1: RootNestedType


class TupleTestModel(AvroBase):
c1: Tuple[int]
c2: Tuple[float, float]
c3: Tuple[Status, Status]
c4: Tuple[Union[Dict[str, str], Status]]


class ComplexTestModel(AvroBase):
c1: List[str]
c2: NestedModel
c3: List[NestedModel]
c4: List[datetime]
c5: Dict[str, NestedModel]
c6: Union[None, str, int, NestedModel] = None
c7: List[Tuple[Union[int, float], Union[int, float]]]


class ReusedObject(AvroBase):
Expand Down Expand Up @@ -249,12 +266,166 @@ def test_complex_avro():
{"name": "c4", "type": {"items": {"logicalType": "timestamp-micros", "type": "long"}, "type": "array"}},
{"name": "c5", "type": {"type": "map", "values": "NestedModel"}},
{"name": "c6", "type": ["null", "string", "long", "NestedModel"], "default": None},
{
'name': 'c7',
'type': {
'items': {
'items': ['long', 'double'],
'type': 'array'
},
'type': 'array'
}
},
],
}

# Reading schema with avro library to be sure format is correct
schema = avro_schema.parse(json.dumps(result))
assert len(schema.fields) == 6
assert len(schema.fields) == 7


def test_complex_nested_avro():
result = ComplexNestedTestModel.avro_schema()
pprint(result)
assert result == {
'type': 'record',
'name': 'ComplexNestedTestModel',
'namespace': 'ComplexNestedTestModel',
'fields': [
{
'name': 'c1',
'type': [
{
'type': 'map',
'values': [
'string',
'long',
'double',
'boolean',
{
'items': [
'string',
'long',
'double',
'boolean',
'null'
],
'type': 'array'
},
{
'type': 'map',
'values': [
'string',
'long',
'double',
'boolean',
'null'
]
},
'null'
]
},
{
'items': [
'string',
'long',
'double',
'boolean',
{
'items': [
'string',
'long',
'double',
'boolean',
'null'
],
'type': 'array'
},
{
'type': 'map',
'values': [
'string',
'long',
'double',
'boolean',
'null'
]
},
'null'
],
'type': 'array'
}
]
},
]
}

# Reading schema with avro library to be sure format is correct
schema = avro_schema.parse(json.dumps(result))
assert len(schema.fields) == 1

# Also test parsing with fast avro
parse_schema(result)


def test_tuple_avro():
result = TupleTestModel.avro_schema()
pprint(result)
assert result == {
"fields": [
{
"name": "c1",
"type": {
"items": ["long"],
"type": "array"
}
},
{
"name": "c2",
"type": {
"items": ["double"],
"type": "array"
}
},
{
"name": "c3",
"type": {
"items": [
{
"name": "Status",
"symbols": ["passed", "failed"],
"type": "enum"
},
"Status"
],
"type": "array"
}
},
{
"name": "c4",
"type": {
"items": [
{
"type": "map",
"values": "string"
},
"Status"
],
"type": "array"
}
}
],
"name": "TupleTestModel",
"namespace": "TupleTestModel",
"type": "record"
}

# Reading schema with avro library to be sure format is correct
schema = avro_schema.parse(json.dumps(result))
assert len(schema.fields) == 4

# Also test parsing with fast avro
parse_schema(result)


def test_avro_parse_list_of_lists():
Expand Down Expand Up @@ -285,6 +456,7 @@ def test_avro_write_complex():
c3=[NestedModel(c11=Nested2Model(c111="test"))],
c4=[1, 2, 3, 4],
c5={"key": NestedModel(c11=Nested2Model(c111="test"))},
c7=[(1.0, 1)]
)

parsed_schema = parse_schema(ComplexTestModel.avro_schema())
Expand Down Expand Up @@ -428,7 +600,7 @@ def test_optional_array():


class IntModel(AvroBase):
c1: int = Field(..., ge=-(2**31), le=(2**31 - 1))
c1: int = Field(..., ge=-(2 ** 31), le=(2 ** 31 - 1))


def test_int():
Expand Down