properly handle sliced struct arrays (#279)
candiduslynx authored Aug 29, 2023
1 parent a7865cf commit e5feaf9
Showing 3 changed files with 68 additions and 18 deletions.
24 changes: 11 additions & 13 deletions parquet/read.go
@@ -35,9 +35,7 @@ func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
 
 	sc := table.ToArrowSchema()
 	for rr.Next() {
-		rec := rr.Record()
-		newRecs := convertToSingleRowRecords(sc, rec)
-		for _, r := range newRecs {
+		for _, r := range slice(reverseTransformRecord(sc, rr.Record())) {
 			res <- r
 		}
 	}
@@ -48,15 +46,12 @@ func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
 	return nil
 }
 
-func convertToSingleRowRecords(sc *arrow.Schema, rec arrow.Record) []arrow.Record {
-	// transform first
-	transformed := reverseTransformRecord(sc, rec)
-	// slice after
-	records := make([]arrow.Record, transformed.NumRows())
-	for i := int64(0); i < transformed.NumRows(); i++ {
-		records[i] = transformed.NewSlice(i, i+1)
+func slice(r arrow.Record) []arrow.Record {
+	res := make([]arrow.Record, r.NumRows())
+	for i := int64(0); i < r.NumRows(); i++ {
+		res[i] = r.NewSlice(i, i+1)
 	}
-	return records
+	return res
 }
 
 func reverseTransformRecord(sc *arrow.Schema, rec arrow.Record) arrow.Record {
@@ -90,15 +85,18 @@ func reverseTransformArray(dt arrow.DataType, arr arrow.Array) arrow.Array {
 			dt, arr.Len(),
 			arr.Data().Buffers(),
 			children,
-			arr.NullN(), arr.Data().Offset(),
+			arr.NullN(),
+			0, // we use 0 as offset for struct arrays, as the child arrays would already be sliced properly
 		))
 
 	case array.ListLike: // this also handles maps
 		return array.MakeFromData(array.NewData(
 			dt, arr.Len(),
 			arr.Data().Buffers(),
 			[]arrow.ArrayData{reverseTransformArray(dt.(arrow.ListLikeType).Elem(), arr.ListValues()).Data()},
-			arr.NullN(), arr.Data().Offset(),
+			arr.NullN(),
+			// we use data offset for list like as the `ListValues` can be a larger array (happens when slicing)
+			arr.Data().Offset(),
 		))
 
 	default:
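Note on the struct branch above: slicing a struct array is zero-copy — the slice keeps the original buffers and records a non-zero data offset — and, in the Arrow Go implementation this module builds against, accessing a field of the sliced struct returns a child that is already adjusted for that offset. Rebuilding the struct from those children while also passing the parent's offset would shift the rows twice, which is the bug this commit fixes. A minimal standalone sketch of that behavior (hedged: import paths assume Arrow Go v13, roughly the version in use around this commit):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	dt := arrow.StructOf(arrow.Field{Name: "v", Type: arrow.PrimitiveTypes.Int64})

	sb := array.NewStructBuilder(mem, dt)
	defer sb.Release()
	vb := sb.FieldBuilder(0).(*array.Int64Builder)
	for i := int64(0); i < 5; i++ {
		sb.Append(true) // valid struct slot
		vb.Append(i)
	}
	structArr := sb.NewArray().(*array.StructArray)
	defer structArr.Release()

	// Zero-copy slice of rows [2, 4): same buffers, parent offset 2.
	sliced := array.NewSlice(structArr, 2, 4).(*array.StructArray)
	defer sliced.Release()

	fmt.Println(sliced.Data().Offset()) // 2
	// Field(0) already reflects the slice, so a rebuilt struct must use
	// offset 0 rather than Data().Offset() to avoid applying it twice.
	fmt.Println(sliced.Field(0)) // [2 3]
}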
7 changes: 5 additions & 2 deletions parquet/write.go
@@ -126,15 +126,18 @@ func transformArray(arr arrow.Array) arrow.Array {
 			transformDataType(dt), arr.Len(),
 			arr.Data().Buffers(),
 			children,
-			arr.NullN(), arr.Data().Offset(),
+			arr.NullN(),
+			0, // we use 0 as offset for struct arrays, as the child arrays would already be sliced properly
 		))
 
 	case array.ListLike: // this also handles maps
 		return array.MakeFromData(array.NewData(
 			transformDataType(arr.DataType()), arr.Len(),
 			arr.Data().Buffers(),
 			[]arrow.ArrayData{transformArray(arr.ListValues()).Data()},
-			arr.NullN(), arr.Data().Offset(),
+			arr.NullN(),
+			// we use data offset for list like as the `ListValues` can be a larger array (happens when slicing)
+			arr.Data().Offset(),
 		))
 
 	default:
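The list-like branch (here and in read.go) is the mirror case: slicing a list array moves only the parent's offset into the offsets buffer, while ListValues() still returns the whole child values array. The rebuilt array therefore has to keep Data().Offset(). A sketch under the same Arrow Go v13 assumption:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

func main() {
	lb := array.NewListBuilder(memory.NewGoAllocator(), arrow.PrimitiveTypes.Int64)
	defer lb.Release()
	vb := lb.ValueBuilder().(*array.Int64Builder)
	for i := 0; i < 4; i++ { // four lists of two values each
		lb.Append(true)
		vb.Append(int64(2 * i))
		vb.Append(int64(2*i + 1))
	}
	list := lb.NewArray().(*array.ListArray)
	defer list.Release()

	sliced := array.NewSlice(list, 1, 3).(*array.ListArray)
	defer sliced.Release()

	fmt.Println(sliced.Data().Offset())    // 1 — must be preserved when rebuilding
	fmt.Println(sliced.ListValues().Len()) // 8 — still the full, unsliced child array
}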
55 changes: 52 additions & 3 deletions parquet/write_read_test.go
@@ -15,14 +15,15 @@ import (
 )
 
 func TestWriteRead(t *testing.T) {
+	const rows = 10
 	var b bytes.Buffer
 	table := schema.TestTable("test", schema.TestSourceOptions{})
 	sourceName := "test-source"
 	syncTime := time.Now().UTC().Round(time.Second)
 	opts := schema.GenTestDataOptions{
 		SourceName: sourceName,
 		SyncTime:   syncTime,
-		MaxRows:    2,
+		MaxRows:    rows,
 	}
 	tg := schema.NewTestDataGenerator()
 	record := tg.Generate(table, opts)
@@ -53,13 +54,61 @@ func TestWriteRead(t *testing.T) {
 		readErr = cl.Read(byteReader, table, ch)
 		close(ch)
 	}()
-	received := make([]arrow.Record, 0, 2)
+	received := make([]arrow.Record, 0, rows)
 	for got := range ch {
 		received = append(received, got)
 	}
 	require.Empty(t, plugin.RecordsDiff(table.ToArrowSchema(), []arrow.Record{record}, received))
 	require.NoError(t, readErr)
-	require.Equalf(t, 2, len(received), "got %d row(s), want %d", len(received), 2)
+	require.Equalf(t, rows, len(received), "got %d row(s), want %d", len(received), rows)
 }
+
+func TestWriteReadSliced(t *testing.T) {
+	const rows = 10
+	var b bytes.Buffer
+	table := schema.TestTable("test", schema.TestSourceOptions{})
+	sourceName := "test-source"
+	syncTime := time.Now().UTC().Round(time.Second)
+	opts := schema.GenTestDataOptions{
+		SourceName: sourceName,
+		SyncTime:   syncTime,
+		MaxRows:    rows,
+	}
+	tg := schema.NewTestDataGenerator()
+	record := tg.Generate(table, opts)
+
+	writer := bufio.NewWriter(&b)
+	reader := bufio.NewReader(&b)
+
+	cl, err := NewClient()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := types.WriteAll(cl, writer, table, slice(record)); err != nil {
+		t.Fatal(err)
+	}
+	err = writer.Flush()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	rawBytes, err := io.ReadAll(reader)
+	if err != nil {
+		t.Fatal(err)
+	}
+	byteReader := bytes.NewReader(rawBytes)
+	ch := make(chan arrow.Record)
+	var readErr error
+	go func() {
+		readErr = cl.Read(byteReader, table, ch)
+		close(ch)
+	}()
+	received := make([]arrow.Record, 0, rows)
+	for got := range ch {
+		received = append(received, got)
+	}
+	require.Empty(t, plugin.RecordsDiff(table.ToArrowSchema(), []arrow.Record{record}, received))
+	require.NoError(t, readErr)
+	require.Equalf(t, rows, len(received), "got %d row(s), want %d", len(received), rows)
+}
 
 func BenchmarkWrite(b *testing.B) {
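The new TestWriteReadSliced differs from TestWriteRead in one line: it writes slice(record) — ten single-row, zero-copy slices — instead of the whole record, so the write path has to handle non-zero data offsets in every column. What such a slice looks like, as a standalone sketch (same hedged Arrow Go v13 imports; the single-column schema is illustrative only, not the repo's test schema):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

func main() {
	sc := arrow.NewSchema([]arrow.Field{{Name: "v", Type: arrow.PrimitiveTypes.Int64}}, nil)
	bldr := array.NewRecordBuilder(memory.NewGoAllocator(), sc)
	defer bldr.Release()
	bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{0, 1, 2, 3}, nil)
	rec := bldr.NewRecord()
	defer rec.Release()

	// Record.NewSlice is zero-copy: the single-row record still points at the
	// original buffers, with the row index as the column's data offset.
	row := rec.NewSlice(2, 3)
	defer row.Release()
	fmt.Println(row.NumRows())                 // 1
	fmt.Println(row.Column(0).Data().Offset()) // 2
}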
