Skip to content

Commit

Permalink
fix: Optimize Parquet writer/reader (#136)
Browse files Browse the repository at this point in the history
This uses `arrow.NewRecord` instead of a RecordBuilder in order to do fewer copies.

```
BenchmarkWrite-8   	       3	 335982583 ns/op (before)
BenchmarkWrite-8   	       4	 319302886 ns/op (after)
```

Also turns on Snappy compression.
  • Loading branch information
hermanschaaf authored Apr 20, 2023
1 parent 5c79803 commit e8e7cb8
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 228 deletions.
31 changes: 31 additions & 0 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,34 @@ func TestWriteRead(t *testing.T) {
})
}
}

func BenchmarkWrite(b *testing.B) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1000,
}
records := testdata.GenTestData(arrowSchema, opts)

cl, err := NewClient()
if err != nil {
b.Fatal(err)
}
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
b.Fatal(err)
}
err = writer.Flush()
if err != nil {
b.Fatal(err)
}
buf.Reset()
}
}
31 changes: 31 additions & 0 deletions json/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,34 @@ func TestWriteRead(t *testing.T) {
t.Fatalf("expected 2 rows, got %d", totalCount)
}
}

func BenchmarkWrite(b *testing.B) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1000,
}
records := testdata.GenTestData(arrowSchema, opts)

cl, err := NewClient()
if err != nil {
b.Fatal(err)
}
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
b.Fatal(err)
}
err = writer.Flush()
if err != nil {
b.Fatal(err)
}
buf.Reset()
}
}
53 changes: 39 additions & 14 deletions parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func convertToSingleRowRecords(rec arrow.Record) []arrow.Record {

// castExtensionColsToString casts extension columns to string.
func castStringsToExtensions(rec arrow.Record, arrowSchema *arrow.Schema) (arrow.Record, error) {
rb := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
cols := make([]arrow.Array, rec.NumCols())
for c := 0; c < int(rec.NumCols()); c++ {
col := rec.Column(c)
switch {
Expand All @@ -77,20 +77,24 @@ func castStringsToExtensions(rec arrow.Record, arrowSchema *arrow.Schema) (arrow
if err != nil {
return nil, fmt.Errorf("failed to marshal col %v: %w", rec.ColumnName(c), err)
}
err = rb.Field(c).UnmarshalJSON(b)
sb := types.NewUUIDBuilder(array.NewExtensionBuilder(memory.DefaultAllocator, types.NewUUIDType()))
err = sb.UnmarshalJSON(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
cols[c] = sb.NewArray()
case arrow.TypeEqual(arrowSchema.Field(c).Type, types.NewInetType()):
arr := col.(*array.String)
b, err := arr.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal col %v: %w", rec.ColumnName(c), err)
}
err = rb.Field(c).UnmarshalJSON(b)
sb := types.NewInetBuilder(array.NewExtensionBuilder(memory.DefaultAllocator, types.NewInetType()))
err = sb.UnmarshalJSON(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
cols[c] = sb.NewArray()
case arrow.TypeEqual(arrowSchema.Field(c).Type, types.NewJSONType()):
arr := col.(*array.String)
b, err := arr.MarshalJSON()
Expand All @@ -102,50 +106,71 @@ func castStringsToExtensions(rec arrow.Record, arrowSchema *arrow.Schema) (arrow
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
sb := types.NewJSONBuilder(array.NewExtensionBuilder(memory.DefaultAllocator, types.NewJSONType()))
for _, v := range a {
if v == nil {
rb.Field(c).(*types.JSONBuilder).AppendNull()
sb.AppendNull()
continue
}
var v2 any
err = json.Unmarshal([]byte(v.(string)), &v2)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
rb.Field(c).(*types.JSONBuilder).Append(v2)
sb.Append(v2)
}
cols[c] = sb.NewArray()
case arrow.TypeEqual(arrowSchema.Field(c).Type, types.NewMacType()):
arr := col.(*array.String)
b, err := arr.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal col %v: %w", rec.ColumnName(c), err)
}
err = rb.Field(c).UnmarshalJSON(b)
sb := types.NewMacBuilder(array.NewExtensionBuilder(memory.DefaultAllocator, types.NewMacType()))
err = sb.UnmarshalJSON(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
case arrow.TypeEqual(arrowSchema.Field(c).Type, arrow.ListOf(types.NewUUIDType())),
arrow.TypeEqual(arrowSchema.Field(c).Type, arrow.ListOf(types.NewInetType())),
arrow.TypeEqual(arrowSchema.Field(c).Type, arrow.ListOf(types.NewMacType())):
cols[c] = sb.NewArray()
case arrow.TypeEqual(arrowSchema.Field(c).Type, arrow.ListOf(types.NewUUIDType())):
arr := col.(*array.List)
b, err := arr.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal col %v: %w", rec.ColumnName(c), err)
}
err = rb.Field(c).UnmarshalJSON(b)
sb := array.NewListBuilder(memory.DefaultAllocator, types.NewUUIDType())
err = sb.UnmarshalJSON(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
default:
b, err := rec.Column(c).MarshalJSON()
cols[c] = sb.NewArray()
case arrow.TypeEqual(arrowSchema.Field(c).Type, arrow.ListOf(types.NewInetType())):
arr := col.(*array.List)
b, err := arr.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal col %v: %w", rec.ColumnName(c), err)
}
sb := array.NewListBuilder(memory.DefaultAllocator, types.NewInetType())
err = sb.UnmarshalJSON(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
cols[c] = sb.NewArray()
case arrow.TypeEqual(arrowSchema.Field(c).Type, arrow.ListOf(types.NewMacType())):
arr := col.(*array.List)
b, err := arr.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal col %v: %w", rec.ColumnName(c), err)
}
err = rb.Field(c).UnmarshalJSON(b)
sb := array.NewListBuilder(memory.DefaultAllocator, types.NewMacType())
err = sb.UnmarshalJSON(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal col %v: %w", rec.ColumnName(c), err)
}
cols[c] = sb.NewArray()
default:
cols[c] = col
}
}
return rb.NewRecord(), nil
return array.NewRecord(arrowSchema, cols, rec.NumRows()), nil
}
197 changes: 0 additions & 197 deletions parquet/transformer.go

This file was deleted.

Loading

0 comments on commit e8e7cb8

Please sign in to comment.