Skip to content

Commit

Permalink
Use arrow library for checkpoint parquet file read/write (#20)
Browse files Browse the repository at this point in the history
* Use arrow library

* Refactor parquet read; now much faster

* Remove old arrow conversion functions
  • Loading branch information
chelseajonesr authored Jul 24, 2023
1 parent 6167048 commit 5d36c22
Show file tree
Hide file tree
Showing 8 changed files with 1,044 additions and 771 deletions.
438 changes: 53 additions & 385 deletions action.go

Large diffs are not rendered by default.

669 changes: 669 additions & 0 deletions arrow.go

Large diffs are not rendered by default.

208 changes: 208 additions & 0 deletions arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package delta

import (
"fmt"
"reflect"
"testing"
)

// ptr returns a pointer to a fresh copy of v. It exists so tests can take the
// address of literals and other non-addressable values inline.
func ptr[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}
// TestRoundTripSimpleTypes writes rows of scalar fields to parquet bytes with
// writeStructsToParquetBytes and reads them back with
// readAndProcessStructsFromParquet, first as value fields and then as pointer
// fields. Every row read back must deep-equal the row written, and the number
// of rows read must match the number written.
//
// NOTE(review): AFloat64 is declared as float32 in both structs, so a float64
// column is not exercised here - confirm whether that is intentional.
func TestRoundTripSimpleTypes(t *testing.T) {
	type SimpleTypes struct {
		AnInt    int     `parquet:"name=anInt"`
		AnInt8   int8    `parquet:"name=anInt8"`
		AnInt16  int16   `parquet:"name=anInt16"`
		AnInt32  int32   `parquet:"name=anInt32"`
		AnInt64  int64   `parquet:"name=anInt64"`
		AFloat32 float32 `parquet:"name=aFloat32"`
		AFloat64 float32 `parquet:"name=aFloat64"`
		AString  string  `parquet:"name=aString, converted=UTF8"`
		ABool    bool    `parquet:"name=aBool"`
		AUint    uint    `parquet:"name=aUint"`
		AUint8   uint8   `parquet:"name=aUint8"`
		AUint16  uint16  `parquet:"name=aUint16"`
		AUint32  uint32  `parquet:"name=aUint32"`
		AUint64  uint64  `parquet:"name=aUint64"`
	}

	simpleTypes := []SimpleTypes{
		{AnInt: 1, AnInt8: 2, AnInt16: 3, AnInt32: -50, AnInt64: 8223372036854775807, AFloat32: 3.1415, AFloat64: -1.1234, AString: "Good morning!", ABool: true, AUint: 12, AUint8: 7, AUint16: 5, AUint32: 32, AUint64: 6543},
		{AnInt: -42, AnInt8: 12, AnInt16: -34, AnInt32: 12345, AnInt64: -100000, AFloat32: 0, AFloat64: 1, AString: "", ABool: false, AUint: 912, AUint8: 4, AUint16: 55, AUint32: 23, AUint64: 3456},
	}
	expected := make([]any, len(simpleTypes))
	for i, s := range simpleTypes {
		expected[i] = s
	}

	bytes, err := writeStructsToParquetBytes(simpleTypes)
	if err != nil {
		t.Fatal(err)
	}

	// i counts the rows handed to the callback; it doubles as the index of the
	// next expected row.
	i := 0
	processSimpleStruct := func(result *SimpleTypes) error {
		// Guard the index so surplus rows fail the test instead of panicking.
		if i >= len(expected) {
			return fmt.Errorf("read more than the %d rows written", len(expected))
		}
		if !reflect.DeepEqual(*result, expected[i]) {
			return fmt.Errorf("expected %v got %v", expected[i], *result)
		}
		i++
		return nil
	}

	err = readAndProcessStructsFromParquet(bytes, processSimpleStruct)
	if err != nil {
		t.Error(err)
	} else if i != len(expected) {
		// Without this check a reader that returns no rows would pass.
		t.Errorf("expected %d rows got %d", len(expected), i)
	}

	type PointerTypes struct {
		AnInt    *int     `parquet:"name=anInt"`
		AnInt8   *int8    `parquet:"name=anInt8"`
		AnInt16  *int16   `parquet:"name=anInt16"`
		AnInt32  *int32   `parquet:"name=anInt32"`
		AnInt64  *int64   `parquet:"name=anInt64"`
		AFloat32 *float32 `parquet:"name=aFloat32"`
		AFloat64 *float32 `parquet:"name=aFloat64"`
		AString  *string  `parquet:"name=aString, converted=UTF8"`
		ABool    *bool    `parquet:"name=aBool"`
		AUint    *uint    `parquet:"name=aUint"`
		AUint8   *uint8   `parquet:"name=aUint8"`
		AUint16  *uint16  `parquet:"name=aUint16"`
		AUint32  *uint32  `parquet:"name=aUint32"`
		AUint64  *uint64  `parquet:"name=aUint64"`
	}
	simpleToPointer := func(s *SimpleTypes) PointerTypes {
		return PointerTypes{
			AnInt:    &s.AnInt,
			AnInt8:   &s.AnInt8,
			AnInt16:  &s.AnInt16,
			AnInt32:  &s.AnInt32,
			AnInt64:  &s.AnInt64,
			AFloat32: &s.AFloat32,
			AFloat64: &s.AFloat64,
			AString:  &s.AString,
			ABool:    &s.ABool,
			AUint:    &s.AUint,
			AUint8:   &s.AUint8,
			AUint16:  &s.AUint16,
			AUint32:  &s.AUint32,
			AUint64:  &s.AUint64,
		}
	}

	pointerTypes := make([]PointerTypes, len(simpleTypes))
	for idx := range simpleTypes {
		// Take the address of the slice element, not of a range variable:
		// before Go 1.22 the range variable is shared across iterations, so
		// every row would alias the same struct and hold the last row's values.
		pointerTypes[idx] = simpleToPointer(&simpleTypes[idx])
		expected[idx] = pointerTypes[idx]
	}

	i = 0
	processPointerStruct := func(result *PointerTypes) error {
		if i >= len(expected) {
			return fmt.Errorf("read more than the %d rows written", len(expected))
		}
		if !reflect.DeepEqual(*result, expected[i]) {
			return fmt.Errorf("expected %v got %v", expected[i], *result)
		}
		i++
		return nil
	}
	bytes, err = writeStructsToParquetBytes(pointerTypes)
	if err != nil {
		t.Fatal(err)
	}
	err = readAndProcessStructsFromParquet(bytes, processPointerStruct)
	if err != nil {
		t.Error(err)
	} else if i != len(expected) {
		t.Errorf("expected %d rows got %d", len(expected), i)
	}
}

// TestRoundTripNestedTypes writes rows containing maps, lists, nested structs
// and pointers to those to parquet bytes with writeStructsToParquetBytes, then
// reads them back with readAndProcessStructsFromParquet. Every row read back
// must deep-equal the row written, and the number of rows read must match the
// number written.
func TestRoundTripNestedTypes(t *testing.T) {
	// Note that round tripping a nil map or slice (not a pointer) will fail on the compare,
	// because arrow will read it as an empty non-nil map or slice
	type NestedTypes2 struct {
		AnInt   int32  `parquet:"name=anInt"`
		AString string `parquet:"name=aString, converted=UTF8"`
	}
	type NestedTypes1 struct {
		AMap          map[int64]NestedTypes2 `parquet:"name=aMap"`
		ANestedTypes2 NestedTypes2           `parquet:"name=aNestedTypes2"`
	}
	type NestedTypes struct {
		AMap          map[string]string  `parquet:"name=aMap, keyconverted=UTF8, valueconverted=UTF8"`
		AList         []int32            `parquet:"name=aList"`
		AMapPtr       *map[string]string `parquet:"name=aMapPtr, keyconverted=UTF8, valueconverted=UTF8"`
		AListPtr      *[]int32           `parquet:"name=aListPtr"`
		ANestedTypes1 NestedTypes1       `parquet:"name=aNestedTypes1"`
		ANestedTypes2 *NestedTypes2      `parquet:"name=aNestedTypes2"`
		AListNested   []NestedTypes2     `parquet:"name=aListNested"`
	}

	nestedTypes2 := []NestedTypes2{
		{AnInt: 1, AString: "one"},
		{AnInt: 2, AString: "two"},
		{AnInt: 3, AString: "three"},
		{AnInt: -4, AString: "negative four"},
		{AnInt: 5, AString: ""},
	}

	nestedTypes1 := []NestedTypes1{
		{AMap: map[int64]NestedTypes2{1234: nestedTypes2[0], 567: nestedTypes2[1]}, ANestedTypes2: nestedTypes2[1]},
		{AMap: map[int64]NestedTypes2{89: nestedTypes2[3], 0: nestedTypes2[2]}, ANestedTypes2: nestedTypes2[4]},
		{AMap: map[int64]NestedTypes2{}, ANestedTypes2: nestedTypes2[2]},
	}

	nestedTypes := []NestedTypes{
		{
			AMap: map[string]string{
				"hello": "bye",
				"blue":  "green",
				"empty": "",
			},
			AList:         []int32{1, 2, 3, 4},
			AMapPtr:       ptr(map[string]string{"ok": "okay", "pencil": "mechanical"}),
			AListPtr:      nil,
			ANestedTypes1: nestedTypes1[2],
			ANestedTypes2: nil,
			AListNested:   []NestedTypes2{nestedTypes2[0], nestedTypes2[3]},
		},
		{
			AMap: map[string]string{
				"apple":  "banana",
				"coffee": "tea",
			},
			AList:         []int32{1, 1, 1},
			AMapPtr:       nil,
			AListPtr:      ptr([]int32{987, 654}),
			ANestedTypes1: nestedTypes1[1],
			ANestedTypes2: ptr(nestedTypes2[0]),
			AListNested:   []NestedTypes2{},
		},
		{
			AMap:          map[string]string{},
			AList:         []int32{},
			ANestedTypes1: nestedTypes1[2],
			AListNested:   []NestedTypes2{nestedTypes2[0]},
		},
	}
	expected := make([]any, len(nestedTypes))
	for i := range nestedTypes {
		expected[i] = nestedTypes[i]
	}

	// i counts the rows handed to the callback; it doubles as the index of the
	// next expected row.
	i := 0
	processNestedStruct := func(result *NestedTypes) error {
		// Guard the index so surplus rows fail the test instead of panicking.
		if i >= len(expected) {
			return fmt.Errorf("read more than the %d rows written", len(expected))
		}
		if !reflect.DeepEqual(*result, expected[i]) {
			return fmt.Errorf("expected %v got %v at index %d", expected[i], *result, i)
		}
		i++
		return nil
	}
	bytes, err := writeStructsToParquetBytes(nestedTypes)
	if err != nil {
		t.Fatal(err)
	}
	err = readAndProcessStructsFromParquet(bytes, processNestedStruct)
	if err != nil {
		t.Error(err)
	} else if i != len(expected) {
		// Without this check a reader that returns no rows would pass.
		t.Errorf("expected %d rows got %d", len(expected), i)
	}
}
5 changes: 2 additions & 3 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ type CheckpointEntry[RowType any, PartitionType any, AddType AddPartitioned[RowT
Remove *Remove `parquet:"name=remove"`
MetaData *MetaData `parquet:"name=metaData"`
Protocol *Protocol `parquet:"name=protocol"`
// CDC not implemented yet
Cdc *Cdc
Cdc *Cdc `parquet:"-"` // CDC not implemented yet
}

// / Additional configuration for checkpointing
Expand Down Expand Up @@ -231,7 +230,7 @@ func createCheckpointWithAddType[RowType any, PartitionType any, AddType AddPart
}
offsetRow += len(records)

parquetBytes, err := checkpointParquetBytes(records)
parquetBytes, err := writeStructsToParquetBytes(records)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ func TestSimpleCheckpoint(t *testing.T) {
}
}

// Remove the previous log to make sure we use the checkpoint when loading
err = store.Delete(table.CommitUriFromVersion(4))
if err != nil {
t.Error(err)
}

// Checkpoint at version 10
_, err = CreateCheckpoint[simpleCheckpointTestData, simpleCheckpointTestPartition](store, checkpointLock, checkpointConfiguration, 10)
if err != nil {
Expand Down Expand Up @@ -492,8 +498,6 @@ func TestCheckpointNoPartition(t *testing.T) {
}

add1.DataChange = false
tags := make(map[string]string)
add1.Tags = &tags
if !reflect.DeepEqual(table.State.Files[add1.Path], *add1) {
t.Errorf("Expected %v found %v", add1, table.State.Files[add1.Path])
}
Expand Down Expand Up @@ -890,7 +894,7 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) {

table := NewDeltaTable[simpleCheckpointTestData, simpleCheckpointTestPartition](store, lock, stateStore)
// Use log expiration of 10 minutes
table.Create(DeltaTableMetaData{Configuration: map[string]string{string(LogRetentionDurationDeltaConfigKey): "interval 10 minutes", string(EnableExpiredLogCleanupDeltaConfigKey): strconv.FormatBool(enableCleanupInTableConfig)}}, Protocol{}, CommitInfo{}, []AddPartitioned[simpleCheckpointTestData, simpleCheckpointTestPartition]{})
table.Create(DeltaTableMetaData{Configuration: map[string]string{string(LogRetentionDurationDeltaConfigKey): "interval 10 minutes", string(EnableExpiredLogCleanupDeltaConfigKey): strconv.FormatBool(enableCleanupInTableConfig)}}, new(Protocol).Default(), CommitInfo{}, []AddPartitioned[simpleCheckpointTestData, simpleCheckpointTestPartition]{})

add1 := getTestAdd[simpleCheckpointTestData, simpleCheckpointTestPartition](3 * 60 * 1000) // 3 mins ago
add2 := getTestAdd[simpleCheckpointTestData, simpleCheckpointTestPartition](2 * 60 * 1000) // 2 mins ago
Expand Down
34 changes: 24 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
cirello.io/dynamolock v1.4.0
github.com/apache/arrow/go/v13 v13.0.0
github.com/aws/aws-sdk-go v1.44.200
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/smithy-go v1.13.5
Expand All @@ -15,14 +16,12 @@ require (
github.com/segmentio/parquet-go v0.0.0-20230427215636-d483faba23a5
github.com/sirupsen/logrus v1.9.0
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
)

require (
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
Expand All @@ -31,13 +30,28 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.25 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.24 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.1.21+incompatible // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)

Expand All @@ -48,12 +62,12 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gomodule/redigo v1.8.9 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/segmentio/encoding v0.3.5 // indirect
golang.org/x/sys v0.5.0 // indirect
)

replace github.com/segmentio/parquet-go v0.0.0-20230427215636-d483faba23a5 => github.com/chelseajonesr/parquet-go v0.0.3
replace github.com/apache/arrow/go/v13 => github.com/chelseajonesr/arrow/go/v13 v13.0.0-20230711200800-c7890b0a2007
Loading

0 comments on commit 5d36c22

Please sign in to comment.