From 455dccc5074f3999789e75d9b33710a708d722d9 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Mon, 27 May 2024 20:51:06 +0200 Subject: [PATCH] Implement the GeoParquetGroup interface --- .../baremaps/geoparquet/GeoParquetReader.java | 18 +- .../baremaps/geoparquet/data/FileInfo.java | 70 --- .../geoparquet/data/GeoParquetGroup.java | 156 +++-- .../data/GeoParquetGroupFactory.java | 43 +- .../geoparquet/data/GeoParquetGroupImpl.java | 550 ++++++++++++++++-- .../data/GeoParquetGroupRecordConverter.java | 2 +- .../geoparquet/data/NanoTimeValue.java | 79 --- .../hadoop/GeoParquetGroupReadSupport.java | 6 +- .../hadoop/GeoParquetInputFormat.java | 4 +- .../geoparquet/GeoParquetReaderTest.java | 4 +- 10 files changed, 667 insertions(+), 265 deletions(-) delete mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java delete mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 4e8186af0..f72759742 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -25,7 +25,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; +import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -52,7 +52,7 @@ public GeoParquetReader(URI uri, Configuration configuration) { this.configuration = configuration; } - public Stream readParallel() throws IOException, URISyntaxException { + public Stream readParallel() throws IOException, URISyntaxException { Path globPath = new Path(uri.getPath()); URI rootUri = getRootUri(uri); FileSystem fileSystem = FileSystem.get(rootUri, configuration); @@ -62,24 +62,24 @@ public Stream readParallel() throws IOException, URISyntaxE true); } - public Stream read() throws IOException, URISyntaxException { + public Stream read() throws IOException, URISyntaxException { return readParallel().sequential(); } - public class GeoParquetGroupSpliterator implements Spliterator { + public class GeoParquetGroupSpliterator implements Spliterator { private final Queue files; private FileStatus file; - private ParquetReader reader; + private ParquetReader reader; public GeoParquetGroupSpliterator(List files) { this.files = new ArrayBlockingQueue<>(files.size(), false, files); } @Override - public boolean tryAdvance(Consumer action) { + public boolean tryAdvance(Consumer action) { try { // Poll the next file if (file == null) { @@ -97,7 +97,7 @@ public boolean tryAdvance(Consumer action) { } // Read the next group - GeoParquetGroupImpl group = reader.read(); + GeoParquetGroup group = reader.read(); // If the group is null, close the resources and set the variables to null if (group == null) { @@ -127,7 +127,7 @@ public boolean tryAdvance(Consumer action) { } @Override - public Spliterator trySplit() { + public Spliterator trySplit() { // Create a new spliterator by polling the next file FileStatus file = files.poll(); @@ -153,7 +153,7 @@ public int characteristics() { } } - private ParquetReader createParquetReader(FileStatus file) + private ParquetReader createParquetReader(FileStatus file) throws IOException { return ParquetReader .builder(new GeoParquetGroupReadSupport(), file.getPath()) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java deleted file mode 100644 index bd7ccaee6..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FileInfo.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import com.google.common.base.Objects; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; - -public final class FileInfo { - - private final long rowCount; - - private final ParquetMetadata parquetMetadata; - - private final GeoParquetMetadata geoParquetMetadata; - - public FileInfo( - long rowCount, - ParquetMetadata parquetMetadata, - GeoParquetMetadata geoParquetMetadata) { - this.rowCount = rowCount; - this.parquetMetadata = parquetMetadata; - this.geoParquetMetadata = geoParquetMetadata; - } - - public long getRowCount() { - return rowCount; - } - - public ParquetMetadata getParquetMetadata() { - return parquetMetadata; - } - - public GeoParquetMetadata getGeoParquetMetadata() { - return geoParquetMetadata; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FileInfo that = (FileInfo) o; - return rowCount == that.rowCount - && Objects.equal(parquetMetadata, that.parquetMetadata) - && Objects.equal(geoParquetMetadata, that.geoParquetMetadata); - } - - @Override - public int hashCode() { - return Objects.hashCode(rowCount, parquetMetadata, geoParquetMetadata); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java index 495fc1a8d..81b49339e 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java @@ -78,6 +78,14 @@ public interface GeoParquetGroup { List getIntegerValues(int fieldIndex); + Binary getInt96Value(int fieldIndex); + + List getInt96Values(int fieldIndex); + + Binary getNanoTimeValue(int fieldIndex); + + List getNanoTimeValues(int fieldIndex); + Long getLongValue(int fieldIndex); List getLongValues(int fieldIndex); @@ -94,41 +102,49 @@ public interface GeoParquetGroup { List getGroupValues(int fieldIndex); - Binary getBinaryValue(String columnName); + Binary getBinaryValue(String fieldName); + + List getBinaryValues(String fieldName); - List getBinaryValues(String columnName); + Boolean getBooleanValue(String fieldName); - Boolean getBooleanValue(String columnName); + List getBooleanValues(String fieldName); - List getBooleanValues(String columnName); + Double getDoubleValue(String fieldName); - Double getDoubleValue(String columnName); + List getDoubleValues(String fieldName); - List getDoubleValues(String columnName); + Float getFloatValue(String fieldName); - Float getFloatValue(String columnName); + List getFloatValues(String fieldName); - List getFloatValues(String columnName); + Integer getIntegerValue(String fieldName); - Integer getIntegerValue(String columnName); + List getIntegerValues(String fieldName); - List getIntegerValues(String columnName); + Binary getInt96Value(String fieldName); - Long getLongValue(String columnName); + List getInt96Values(String fieldName); - List getLongValues(String columnName); + Binary getNanoTimeValue(String fieldName); - String getStringValue(String columnName); + List getNanoTimeValues(String fieldName); - List getStringValues(String columnName); + Long getLongValue(String fieldName); - Geometry getGeometryValue(String columnName); + List getLongValues(String fieldName); - List getGeometryValues(String columnName); + String getStringValue(String fieldName); - GeoParquetGroup getGroupValue(String columnName); + List getStringValues(String fieldName); - List getGroupValues(String columnName); + Geometry getGeometryValue(String fieldName); + + List getGeometryValues(String fieldName); + + GeoParquetGroup getGroupValue(String fieldName); + + List getGroupValues(String fieldName); void setBinaryValue(int fieldIndex, Binary binaryValue); @@ -150,6 +166,14 @@ public interface GeoParquetGroup { void setIntegerValues(int fieldIndex, List integerValues); + void setInt96Value(int fieldIndex, Binary int96Value); + + void setInt96Values(int fieldIndex, List int96Values); + + void setNanoTimeValue(int fieldIndex, Binary nanoTimeValue); + + void setNanoTimeValues(int fieldIndex, List nanoTimeValues); + void setLongValue(int fieldIndex, Long longValue); void setLongValues(int fieldIndex, List longValues); @@ -166,41 +190,49 @@ public interface GeoParquetGroup { void setGroupValues(int fieldIndex, List groupValues); - void setBinaryValue(String columnName, Binary binaryValue); + void setBinaryValue(String fieldName, Binary binaryValue); + + void setBinaryValues(String fieldName, List binaryValues); + + void setBooleanValue(String fieldName, Boolean booleanValue); - void setBinaryValues(String columnName, List binaryValues); + void setBooleanValues(String fieldName, List booleanValues); - void setBooleanValue(String columnName, Boolean booleanValue); + void setDoubleValue(String fieldName, Double doubleValue); - void setBooleanValues(String columnName, List booleanValues); + void setDoubleValues(String fieldName, List doubleValues); - void setDoubleValue(String columnName, Double doubleValue); + void setFloatValue(String fieldName, Float floatValue); - void setDoubleValues(String columnName, List doubleValues); + void setFloatValues(String fieldName, List floatValues); - void setFloatValue(String columnName, Float floatValue); + void setIntegerValue(String fieldName, Integer integerValue); - void setFloatValues(String columnName, List floatValues); + void setIntegerValues(String fieldName, List integerValues); - void setIntegerValue(String columnName, Integer integerValue); + void setInt96Value(String fieldName, Binary int96Value); - void setIntegerValues(String columnName, List integerValues); + void setInt96Values(String fieldName, List int96Values); - void setLongValue(String columnName, Long longValue); + void setNanoTimeValue(String fieldName, Binary nanoTimeValue); - void setLongValues(String columnName, List longValues); + void setNanoTimeValues(String fieldName, List nanoTimeValues); - void setStringValue(String columnName, String stringValue); + void setLongValue(String fieldName, Long longValue); - void setStringValues(String columnName, List stringValues); + void setLongValues(String fieldName, List longValues); - void setGeometryValue(String columnName, Geometry geometryValue); + void setStringValue(String fieldName, String stringValue); - void setGeometryValues(String columnName, List geometryValues); + void setStringValues(String fieldName, List stringValues); - void setGroupValue(String columnName, GeoParquetGroup groupValue); + void setGeometryValue(String fieldName, Geometry geometryValue); - void setGroupValues(String columnName, List groupValues); + void setGeometryValues(String fieldName, List geometryValues); + + void setGroupValue(String fieldName, GeoParquetGroup groupValue); + + void setGroupValues(String fieldName, List groupValues); /** * A GeoParquet schema that describes the fields of a group and can easily be introspected. @@ -237,57 +269,65 @@ public Type type() { record BooleanField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.BOOLEAN; - } + public Type type() { + return Type.BOOLEAN; + } } record DoubleField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.DOUBLE; - } + public Type type() { + return Type.DOUBLE; + } } record FloatField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.FLOAT; - } + public Type type() { + return Type.FLOAT; + } } record IntegerField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.INTEGER; - } + public Type type() { + return Type.INTEGER; + } + } + + record Int96Field(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.INT96; + } } record LongField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.LONG; - } + public Type type() { + return Type.LONG; + } } record StringField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.STRING; - } + public Type type() { + return Type.STRING; + } } record GeometryField(String name, Cardinality cardinality) implements Field { @Override - public Type type() { - return Type.GEOMETRY; - } + public Type type() { + return Type.GEOMETRY; + } } record GroupField(String name, Cardinality cardinality, Schema schema) implements Field { @@ -307,6 +347,8 @@ enum Type { DOUBLE, FLOAT, INTEGER, + INT96, + NANO_TIME, LONG, STRING, GEOMETRY, diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java index c56e0aadc..ebadfbda5 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java @@ -17,7 +17,11 @@ package org.apache.baremaps.geoparquet.data; +import java.util.List; +import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Field; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; public class GeoParquetGroupFactory { @@ -25,13 +29,50 @@ public class GeoParquetGroupFactory { private final GeoParquetMetadata metadata; + private final GeoParquetGroup.Schema geoParquetSchema; + public GeoParquetGroupFactory(GroupType schema, GeoParquetMetadata metadata) { this.schema = schema; this.metadata = metadata; + this.geoParquetSchema = createGeoParquetSchema(schema, metadata); + } + + private static GeoParquetGroup.Schema createGeoParquetSchema( + GroupType schema, + GeoParquetMetadata metadata) { + List fields = schema.getFields().stream().map(field -> { + GeoParquetGroup.Cardinality cardinality = switch (field.getRepetition()) { + case REQUIRED -> GeoParquetGroup.Cardinality.REQUIRED; + case OPTIONAL -> GeoParquetGroup.Cardinality.OPTIONAL; + case REPEATED -> GeoParquetGroup.Cardinality.REPEATED; + }; + if (field.isPrimitive() && metadata.isGeometryColumn(field.getName())) { + return new GeoParquetGroup.GeometryField(field.getName(), cardinality); + } else if (field.isPrimitive()) { + PrimitiveType primitiveType = field.asPrimitiveType(); + PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName(); + String name = primitiveType.getName(); + return switch (primitiveTypeName) { + case INT32 -> new GeoParquetGroup.IntegerField(name, cardinality); + case INT64 -> new GeoParquetGroup.LongField(name, cardinality); + case INT96 -> new GeoParquetGroup.Int96Field(name, cardinality); + case FLOAT -> new GeoParquetGroup.FloatField(name, cardinality); + case DOUBLE -> new GeoParquetGroup.DoubleField(name, cardinality); + case BOOLEAN -> new GeoParquetGroup.BooleanField(name, cardinality); + case BINARY -> new GeoParquetGroup.BinaryField(name, cardinality); + case FIXED_LEN_BYTE_ARRAY -> new GeoParquetGroup.BinaryField(name, cardinality); + }; + } else { + GroupType groupType = field.asGroupType(); + return (Field) new GeoParquetGroup.GroupField(groupType.getName(), + GeoParquetGroup.Cardinality.REQUIRED, createGeoParquetSchema(groupType, metadata)); + } + }).toList(); + return new GeoParquetGroup.Schema(fields); } public GeoParquetGroupImpl newGroup() { - return new GeoParquetGroupImpl(schema, metadata); + return new GeoParquetGroupImpl(schema, metadata, geoParquetSchema); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java index ec4329c46..42eef5201 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java @@ -22,43 +22,43 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.Type; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.io.ParseException; import org.locationtech.jts.io.WKBReader; import org.locationtech.jts.io.WKBWriter; -public class GeoParquetGroupImpl { +public class GeoParquetGroupImpl implements GeoParquetGroup { private final GroupType schema; private final GeoParquetMetadata metadata; - private final List[] data; + private final Schema geoParquetSchema; + + private final List[] data; @SuppressWarnings("unchecked") - public GeoParquetGroupImpl(GroupType schema, GeoParquetMetadata metadata) { + public GeoParquetGroupImpl(GroupType schema, GeoParquetMetadata metadata, + Schema geoParquetSchema) { this.schema = schema; this.metadata = metadata; + this.geoParquetSchema = geoParquetSchema; this.data = new List[schema.getFields().size()]; for (int i = 0; i < schema.getFieldCount(); i++) { this.data[i] = new ArrayList<>(); } } - public GroupType getSchema() { - return schema; - } - public GeoParquetGroupImpl addGroup(int fieldIndex) { GeoParquetGroupImpl g = - new GeoParquetGroupImpl(schema.getType(fieldIndex).asGroupType(), metadata); + new GeoParquetGroupImpl(schema.getType(fieldIndex).asGroupType(), metadata, + geoParquetSchema); add(fieldIndex, g); return g; } public GeoParquetGroupImpl addGroup(String field) { - return addGroup(getSchema().getFieldIndex(field)); + return addGroup(getParquetSchema().getFieldIndex(field)); } public GeoParquetGroupImpl getGroup(int fieldIndex, int index) { @@ -66,11 +66,11 @@ public GeoParquetGroupImpl getGroup(int fieldIndex, int index) { } public GeoParquetGroupImpl getGroup(String field, int index) { - return getGroup(getSchema().getFieldIndex(field), index); + return getGroup(getParquetSchema().getFieldIndex(field), index); } public int getFieldRepetitionCount(int fieldIndex) { - List list = data[fieldIndex]; + List list = data[fieldIndex]; return list == null ? 0 : list.size(); } @@ -106,10 +106,6 @@ public Binary getBinary(int fieldIndex, int index) { return ((BinaryValue) getValue(fieldIndex, index)).getBinary(); } - public NanoTimeValue getTimeNanos(int fieldIndex, int index) { - return NanoTimeValue.fromInt96((Int96Value) getValue(fieldIndex, index)); - } - public Binary getInt96(int fieldIndex, int index) { return ((Int96Value) getValue(fieldIndex, index)).getInt96(); } @@ -124,7 +120,7 @@ public Geometry getGeometry(int fieldIndex, int index) { } private Object getValue(int fieldIndex, int index) { - List list; + List list; try { list = data[fieldIndex]; } catch (IndexOutOfBoundsException e) { @@ -142,9 +138,9 @@ private Object getValue(int fieldIndex, int index) { } private void add(int fieldIndex, Primitive value) { - Type type = schema.getType(fieldIndex); - List list = data[fieldIndex]; - if (!type.isRepetition(Type.Repetition.REPEATED) + org.apache.parquet.schema.Type type = schema.getType(fieldIndex); + List list = data[fieldIndex]; + if (!type.isRepetition(org.apache.parquet.schema.Type.Repetition.REPEATED) && !list.isEmpty()) { throw new IllegalStateException("field " + fieldIndex + " (" + type.getName() + ") can not have more than one value: " + list); @@ -164,16 +160,12 @@ public void add(int fieldIndex, String value) { add(fieldIndex, new BinaryValue(Binary.fromString(value))); } - public void add(int fieldIndex, NanoTimeValue value) { - add(fieldIndex, value.toInt96()); - } - public void add(int fieldIndex, boolean value) { add(fieldIndex, new BooleanValue(value)); } public void add(int fieldIndex, Binary value) { - switch (getSchema().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { + switch (getParquetSchema().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { case BINARY: case FIXED_LEN_BYTE_ARRAY: add(fieldIndex, new BinaryValue(value)); @@ -183,7 +175,7 @@ public void add(int fieldIndex, Binary value) { break; default: throw new UnsupportedOperationException( - getSchema().asPrimitiveType().getName() + " not supported for Binary"); + getParquetSchema().asPrimitiveType().getName() + " not supported for Binary"); } } @@ -196,7 +188,8 @@ public void add(int fieldIndex, double value) { } public void add(int fieldIndex, GeoParquetGroupImpl value) { - data[fieldIndex].add(value); + List list = data[fieldIndex]; + list.add(value); } public void add(int fieldIndex, Geometry geometry) { @@ -205,44 +198,40 @@ public void add(int fieldIndex, Geometry geometry) { } public void add(String field, int value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, long value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, float value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, double value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, String value) { - add(getSchema().getFieldIndex(field), value); - } - - public void add(String field, NanoTimeValue value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, boolean value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, Binary value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, GeoParquetGroupImpl value) { - add(getSchema().getFieldIndex(field), value); + add(getParquetSchema().getFieldIndex(field), value); } public void add(String field, Geometry geometry) { byte[] bytes = new WKBWriter().write(geometry); - add(getSchema().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); + add(getParquetSchema().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); } public void writeValue(int field, int index, RecordConsumer recordConsumer) { @@ -262,9 +251,9 @@ public String toString(String indent) { private void appendToString(StringBuilder builder, String indent) { int i = 0; - for (Type field : schema.getFields()) { + for (org.apache.parquet.schema.Type field : schema.getFields()) { String name = field.getName(); - List values = data[i]; + List values = data[i]; ++i; if (values != null && !values.isEmpty()) { for (Object value : values) { @@ -281,4 +270,481 @@ private void appendToString(StringBuilder builder, String indent) { } } } + + private List getValues(int fieldIndex) { + return (List) data[fieldIndex]; + } + + private List getGroups(int fieldIndex) { + return (List) data[fieldIndex]; + } + + @Override + public Schema getSchema() { + return geoParquetSchema; + } + + @Override + public GroupType getParquetSchema() { + return schema; + } + + @Override + public GeoParquetMetadata getGeoParquetMetadata() { + return metadata; + } + + @Override + public GeoParquetGroup createGroup(int fieldIndex) { + return new GeoParquetGroupImpl(schema.getType(fieldIndex).asGroupType(), metadata, + geoParquetSchema); + } + + @Override + public Binary getBinaryValue(int fieldIndex) { + return getBinaryValues(fieldIndex).get(0); + } + + @Override + public List getBinaryValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getBinary).toList(); + } + + @Override + public Boolean getBooleanValue(int fieldIndex) { + return getBooleanValues(fieldIndex).get(0); + } + + @Override + public List getBooleanValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getBoolean).toList(); + } + + @Override + public Double getDoubleValue(int fieldIndex) { + return getDoubleValues(fieldIndex).get(0); + } + + @Override + public List getDoubleValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getDouble).toList(); + } + + @Override + public Float getFloatValue(int fieldIndex) { + return getFloatValues(fieldIndex).get(0); + } + + @Override + public List getFloatValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getFloat).toList(); + } + + @Override + public Integer getIntegerValue(int fieldIndex) { + return getIntegerValues(fieldIndex).get(0); + } + + @Override + public List getIntegerValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getInteger).toList(); + } + + @Override + public Binary getInt96Value(int fieldIndex) { + return getBinaryValues(fieldIndex).get(0); + } + + @Override + public List getInt96Values(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getBinary).toList(); + } + + @Override + public Binary getNanoTimeValue(int fieldIndex) { + return getBinaryValues(fieldIndex).get(0); + } + + @Override + public List getNanoTimeValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getBinary).toList(); + } + + @Override + public Long getLongValue(int fieldIndex) { + return getLongValues(fieldIndex).get(0); + } + + @Override + public List getLongValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getLong).toList(); + } + + @Override + public String getStringValue(int fieldIndex) { + return getStringValues(fieldIndex).get(0); + } + + @Override + public List getStringValues(int fieldIndex) { + return getValues(fieldIndex).stream().map(Primitive::getString).toList(); + } + + @Override + public Geometry getGeometryValue(int fieldIndex) { + return getGeometryValues(fieldIndex).get(0); + } + + @Override + public List getGeometryValues(int fieldIndex) { + List geometries = new ArrayList<>(); + for (Binary binary : getBinaryValues(fieldIndex)) { + try { + geometries.add(new WKBReader().read(binary.getBytes())); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + return geometries; + } + + @Override + public GeoParquetGroup getGroupValue(int fieldIndex) { + return getGroupValues(fieldIndex).get(0); + } + + @Override + public List getGroupValues(int fieldIndex) { + return getGroups(fieldIndex); + } + + @Override + public Binary getBinaryValue(String fieldName) { + return getBinaryValues(fieldName).get(0); + } + + @Override + public List getBinaryValues(String fieldName) { + return getBinaryValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Boolean getBooleanValue(String fieldName) { + return getBooleanValues(fieldName).get(0); + } + + @Override + public List getBooleanValues(String fieldName) { + return getBooleanValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Double getDoubleValue(String fieldName) { + return getDoubleValues(fieldName).get(0); + } + + @Override + public List getDoubleValues(String fieldName) { + return getDoubleValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Float getFloatValue(String fieldName) { + return getFloatValues(fieldName).get(0); + } + + @Override + public List getFloatValues(String fieldName) { + return getFloatValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Integer getIntegerValue(String fieldName) { + return getIntegerValues(fieldName).get(0); + } + + @Override + public List getIntegerValues(String fieldName) { + return getIntegerValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Binary getInt96Value(String fieldName) { + return getBinaryValues(fieldName).get(0); + } + + @Override + public List getInt96Values(String fieldName) { + return getBinaryValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Binary getNanoTimeValue(String fieldName) { + return getBinaryValues(fieldName).get(0); + } + + @Override + public List getNanoTimeValues(String fieldName) { + return getBinaryValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Long getLongValue(String fieldName) { + return getLongValues(fieldName).get(0); + } + + @Override + public List getLongValues(String fieldName) { + return getLongValues(schema.getFieldIndex(fieldName)); + } + + @Override + public String getStringValue(String fieldName) { + return getStringValues(fieldName).get(0); + } + + @Override + public List getStringValues(String fieldName) { + return getStringValues(schema.getFieldIndex(fieldName)); + } + + @Override + public Geometry getGeometryValue(String fieldName) { + return getGeometryValues(fieldName).get(0); + } + + @Override + public List getGeometryValues(String fieldName) { + return getGeometryValues(schema.getFieldIndex(fieldName)); + } + + @Override + public GeoParquetGroup getGroupValue(String fieldName) { + return getGroupValues(fieldName).get(0); + } + + @Override + public List getGroupValues(String fieldName) { + return getGroupValues(schema.getFieldIndex(fieldName)); + } + + @Override + public void setBinaryValue(int fieldIndex, Binary binaryValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryValues(int fieldIndex, List binaryValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBooleanValue(int fieldIndex, Boolean booleanValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBooleanValues(int fieldIndex, List booleanValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDoubleValue(int fieldIndex, Double doubleValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDoubleValues(int fieldIndex, List doubleValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFloatValue(int fieldIndex, Float floatValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFloatValues(int fieldIndex, List floatValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setIntegerValue(int fieldIndex, Integer integerValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setIntegerValues(int fieldIndex, List integerValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setInt96Value(int fieldIndex, Binary int96Value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setInt96Values(int fieldIndex, List int96Values) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNanoTimeValue(int fieldIndex, Binary nanoTimeValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNanoTimeValues(int fieldIndex, List nanoTimeValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setLongValue(int fieldIndex, Long longValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setLongValues(int fieldIndex, List longValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setStringValue(int fieldIndex, String stringValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setStringValues(int fieldIndex, List stringValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGeometryValue(int fieldIndex, Geometry geometryValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGeometryValues(int fieldIndex, List geometryValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGroupValue(int fieldIndex, GeoParquetGroup groupValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGroupValues(int fieldIndex, List groupValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryValue(String fieldName, Binary binaryValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBinaryValues(String fieldName, List binaryValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBooleanValue(String fieldName, Boolean booleanValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setBooleanValues(String fieldName, List booleanValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDoubleValue(String fieldName, Double doubleValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setDoubleValues(String fieldName, List doubleValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFloatValue(String fieldName, Float floatValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setFloatValues(String fieldName, List floatValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setIntegerValue(String fieldName, Integer integerValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setIntegerValues(String fieldName, List integerValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setInt96Value(String fieldName, Binary int96Value) { + throw new UnsupportedOperationException(); + } + + @Override + public void setInt96Values(String fieldName, List int96Values) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNanoTimeValue(String fieldName, Binary nanoTimeValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setNanoTimeValues(String fieldName, List nanoTimeValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setLongValue(String fieldName, Long longValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setLongValues(String fieldName, List longValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setStringValue(String fieldName, String stringValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setStringValues(String fieldName, List stringValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGeometryValue(String fieldName, Geometry geometryValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGeometryValues(String fieldName, List geometryValues) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGroupValue(String fieldName, GeoParquetGroup groupValue) { + throw new UnsupportedOperationException(); + } + + @Override + public void setGroupValues(String fieldName, List groupValues) { + throw new UnsupportedOperationException(); + } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java index 016800744..54cfb7975 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java @@ -36,7 +36,7 @@ import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -public class GeoParquetGroupRecordConverter extends RecordMaterializer { +public class GeoParquetGroupRecordConverter extends RecordMaterializer { private final GeoParquetGroupFactory groupFactory; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java deleted file mode 100644 index b60373676..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import org.apache.parquet.Preconditions; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; - -public class NanoTimeValue extends Primitive { - private final int julianDay; - private final long timeOfDayNanos; - - public static NanoTimeValue fromBinary(Binary bytes) { - Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes"); - ByteBuffer buf = bytes.toByteBuffer(); - buf.order(ByteOrder.LITTLE_ENDIAN); - long timeOfDayNanos = buf.getLong(); - int julianDay = buf.getInt(); - return new NanoTimeValue(julianDay, timeOfDayNanos); - } - - public static NanoTimeValue fromInt96(Int96Value int96) { - ByteBuffer buf = int96.getInt96().toByteBuffer(); - return new NanoTimeValue(buf.getInt(), buf.getLong()); - } - - public NanoTimeValue(int julianDay, long timeOfDayNanos) { - this.julianDay = julianDay; - this.timeOfDayNanos = timeOfDayNanos; - } - - public int getJulianDay() { - return julianDay; - } - - public long getTimeOfDayNanos() { - return timeOfDayNanos; - } - - public Binary toBinary() { - ByteBuffer buf = ByteBuffer.allocate(12); - buf.order(ByteOrder.LITTLE_ENDIAN); - buf.putLong(timeOfDayNanos); - buf.putInt(julianDay); - buf.flip(); - return Binary.fromConstantByteBuffer(buf); - } - - public Int96Value toInt96() { - return new Int96Value(toBinary()); - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addBinary(toBinary()); - } - - @Override - public String toString() { - return "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"; - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java index e579b6e54..51b575f88 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Map; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; +import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.geoparquet.data.GeoParquetGroupRecordConverter; import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; import org.apache.hadoop.conf.Configuration; @@ -28,7 +28,7 @@ import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -public class GeoParquetGroupReadSupport extends ReadSupport { +public class GeoParquetGroupReadSupport extends ReadSupport { @Override public ReadContext init( @@ -40,7 +40,7 @@ public ReadContext init( } @Override - public RecordMaterializer prepareForRead(Configuration configuration, + public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) { diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java index d9aacabd7..1c2bc8ff3 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java @@ -17,7 +17,7 @@ package org.apache.baremaps.geoparquet.hadoop; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; +import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.parquet.hadoop.ParquetInputFormat; /** @@ -26,7 +26,7 @@ * This Input format uses a rather inefficient data model but works independently of higher level * abstractions. */ -public class GeoParquetInputFormat extends ParquetInputFormat { +public class GeoParquetInputFormat extends ParquetInputFormat { public GeoParquetInputFormat() { super(GeoParquetGroupReadSupport.class); diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index bf7e92f46..2039937e7 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -34,7 +34,9 @@ void read() throws IOException, URISyntaxException { GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); geoParquetReader.read().forEach(group -> { System.out.println("-----"); - System.out.println(group); + System.out.println(group.getSchema()); + System.out.println(group.getGeometryValue("geometry")); + }); } }