-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschema_conversion.rs
71 lines (69 loc) · 2.47 KB
/
schema_conversion.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use {
arrow::datatypes::{DataType, Field},
serde_avro_fast::schema::{LogicalType, RegularType, SchemaKey, SchemaMut, SchemaNode},
};
/// Convert from avro schema to arrow schema
pub(crate) fn schema_node_to_fields(schema: &SchemaMut, key: SchemaKey) -> Vec<Field> {
match schema[key] {
SchemaNode::RegularType(ref rt) => match rt {
RegularType::Record(record) => record
.fields
.iter()
.map(|record| {
Field::new(
record.name.as_str(),
schema_node_to_data_type(schema, record.type_),
false,
)
})
.collect(),
_ => todo!(),
},
SchemaNode::LogicalType { inner, .. } => schema_node_to_fields(schema, inner),
}
}
fn schema_node_to_data_type(schema: &SchemaMut, key: SchemaKey) -> DataType {
match schema[key] {
SchemaNode::RegularType(ref rt) => match rt {
serde_avro_fast::schema::RegularType::Null => DataType::Null,
serde_avro_fast::schema::RegularType::Boolean => DataType::Boolean,
serde_avro_fast::schema::RegularType::Int => DataType::Int32,
serde_avro_fast::schema::RegularType::Long => DataType::Int64,
serde_avro_fast::schema::RegularType::Float => DataType::Float32,
serde_avro_fast::schema::RegularType::Double => DataType::Float64,
serde_avro_fast::schema::RegularType::Bytes => DataType::Binary,
serde_avro_fast::schema::RegularType::String => DataType::Utf8,
serde_avro_fast::schema::RegularType::Array(_) => todo!(),
serde_avro_fast::schema::RegularType::Map(_) => todo!(),
serde_avro_fast::schema::RegularType::Union(_) => todo!(),
serde_avro_fast::schema::RegularType::Record(_) => todo!(),
serde_avro_fast::schema::RegularType::Enum(_) => todo!(),
serde_avro_fast::schema::RegularType::Fixed(fixed) => DataType::FixedSizeBinary(
fixed
.size
.try_into()
.expect("fixed size too large for arrow FixedSizeBinary"),
),
},
SchemaNode::LogicalType {
inner,
ref logical_type,
} => {
let inner_rt = match &schema[inner] {
SchemaNode::RegularType(rt) => rt,
SchemaNode::LogicalType { .. } => panic!("Nested logical types are not allowed"),
};
match (logical_type, inner_rt) {
(LogicalType::TimestampMillis, RegularType::Long) => DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
Some("+00:00".into()),
),
(LogicalType::TimestampMicros, RegularType::Long) => DataType::Timestamp(
arrow::datatypes::TimeUnit::Microsecond,
Some("+00:00".into()),
),
_ => schema_node_to_data_type(schema, inner),
}
}
}
}