Skip to content

Commit

Permalink
Implement the GeoParquetGroup interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis committed May 27, 2024
1 parent 57f945c commit 455dccc
Show file tree
Hide file tree
Showing 10 changed files with 667 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -52,7 +52,7 @@ public GeoParquetReader(URI uri, Configuration configuration) {
this.configuration = configuration;
}

public Stream<GeoParquetGroupImpl> readParallel() throws IOException, URISyntaxException {
public Stream<GeoParquetGroup> readParallel() throws IOException, URISyntaxException {
Path globPath = new Path(uri.getPath());
URI rootUri = getRootUri(uri);
FileSystem fileSystem = FileSystem.get(rootUri, configuration);
Expand All @@ -62,24 +62,24 @@ public Stream<GeoParquetGroupImpl> readParallel() throws IOException, URISyntaxE
true);
}

public Stream<GeoParquetGroupImpl> read() throws IOException, URISyntaxException {
public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException {
return readParallel().sequential();
}

public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroupImpl> {
public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {

private final Queue<FileStatus> files;

private FileStatus file;

private ParquetReader<GeoParquetGroupImpl> reader;
private ParquetReader<GeoParquetGroup> reader;

public GeoParquetGroupSpliterator(List<FileStatus> files) {
this.files = new ArrayBlockingQueue<>(files.size(), false, files);
}

@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
if (file == null) {
Expand All @@ -97,7 +97,7 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
}

// Read the next group
GeoParquetGroupImpl group = reader.read();
GeoParquetGroup group = reader.read();

// If the group is null, close the resources and set the variables to null
if (group == null) {
Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroupImpl> action) {
}

@Override
public Spliterator<GeoParquetGroupImpl> trySplit() {
public Spliterator<GeoParquetGroup> trySplit() {
// Create a new spliterator by polling the next file
FileStatus file = files.poll();

Expand All @@ -153,7 +153,7 @@ public int characteristics() {
}
}

private ParquetReader<GeoParquetGroupImpl> createParquetReader(FileStatus file)
private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
throws IOException {
return ParquetReader
.builder(new GeoParquetGroupReadSupport(), file.getPath())
Expand Down

This file was deleted.

Loading

0 comments on commit 455dccc

Please sign in to comment.