Skip to content

Commit

Permalink
fix(deps): Upgrade SDK to 3.6.3, encode unsupported types as strings (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
disq authored May 25, 2023
1 parent ba7c364 commit 6b6e305
Show file tree
Hide file tree
Showing 17 changed files with 292 additions and 245 deletions.
38 changes: 36 additions & 2 deletions csv/read.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package csv

import (
"fmt"
"io"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
reader := csv.NewReader(r, table.ToArrowSchema(),
arrowSchema := table.ToArrowSchema()
newSchema := convertSchema(arrowSchema)
reader := csv.NewReader(r, newSchema,
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullReader(true, ""),
Expand All @@ -20,7 +25,36 @@ func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- ar
}
rec := reader.Record()
rec.Retain()
res <- rec
castRec, err := castFromString(rec, arrowSchema)
if err != nil {
return fmt.Errorf("failed to cast extension types: %w", err)
}
res <- castRec
}
return nil
}

// castFromString converts a record that was read with the string-substituted
// schema (see convertSchema) back into a record that matches arrowSchema.
//
// Columns whose declared type is reported as supported by isTypeSupported are
// reused as-is. Every other column is rebuilt with a builder of the field's
// declared type by parsing the string form of each non-null value; nulls are
// preserved.
//
// The input record is not released. An error is returned when the record and
// the schema disagree on the number of columns, or when a value cannot be
// parsed into the declared type.
func castFromString(rec arrow.Record, arrowSchema *arrow.Schema) (arrow.Record, error) {
	fields := arrowSchema.Fields()
	if int(rec.NumCols()) != len(fields) {
		return nil, fmt.Errorf("record has %d columns but schema has %d fields", rec.NumCols(), len(fields))
	}

	cols := make([]arrow.Array, len(fields))

	// Arrays created below start with one reference owned by this function.
	// array.NewRecord takes its own reference on every column, so ours is
	// dropped on return. This also covers the error path, where the arrays
	// would otherwise be orphaned.
	var built []arrow.Array
	defer func() {
		for _, arr := range built {
			arr.Release()
		}
	}()

	for c, f := range fields {
		col := rec.Column(c)
		if isTypeSupported(f.Type) {
			cols[c] = col
			continue
		}

		sb := array.NewBuilder(memory.DefaultAllocator, f.Type)
		for i := 0; i < col.Len(); i++ {
			if col.IsNull(i) {
				sb.AppendNull()
				continue
			}
			if err := sb.AppendValueFromString(col.ValueStr(i)); err != nil {
				sb.Release()
				return nil, fmt.Errorf("failed to AppendValueFromString col %v: %w", rec.ColumnName(c), err)
			}
		}
		arr := sb.NewArray()
		sb.Release() // the builder is no longer needed once the array is materialized
		built = append(built, arr)
		cols[c] = arr
	}
	return array.NewRecord(arrowSchema, cols, rec.NumRows()), nil
}
Binary file modified csv/testdata/TestWriteRead-default.csv
Binary file not shown.
Binary file modified csv/testdata/TestWriteRead-with_delimiter.csv
Binary file not shown.
Binary file modified csv/testdata/TestWriteRead-with_delimiter_headers.csv
Binary file not shown.
Binary file modified csv/testdata/TestWriteRead-with_headers.csv
Binary file not shown.
68 changes: 66 additions & 2 deletions csv/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ import (
"io"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (cl *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error {
writer := csv.NewWriter(w, table.ToArrowSchema(),
newSchema := convertSchema(table.ToArrowSchema())
writer := csv.NewWriter(w, newSchema,
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullWriter(""),
)
for _, record := range records {
if err := writer.Write(record); err != nil {
castRec := castToString(record)

if err := writer.Write(castRec); err != nil {
return fmt.Errorf("failed to write record to csv: %w", err)
}
if err := writer.Flush(); err != nil {
Expand All @@ -26,3 +31,62 @@ func (cl *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []ar
}
return nil
}

// convertSchema returns a copy of sch in which every field whose type is not
// reported as supported by isTypeSupported is re-typed as a string field.
// Field order, names and the schema-level metadata are carried over; the
// input schema is left untouched.
func convertSchema(sch *arrow.Schema) *arrow.Schema {
	source := sch.Fields()
	converted := make([]arrow.Field, 0, len(source))
	for _, field := range source {
		// field is a per-iteration copy, so re-typing it does not affect sch.
		if !isTypeSupported(field.Type) {
			field.Type = arrow.BinaryTypes.String
		}
		converted = append(converted, field)
	}

	meta := sch.Metadata()
	return arrow.NewSchema(converted, &meta)
}

// isTypeSupported reports whether t is one of the data types in the list
// below, which mirrors the list in arrow/csv/common.go. Types outside the
// list are the ones callers in this package convert to and from strings.
//
// All types are listed in a single case clause on purpose: Go switch cases
// do not fall through, so a sequence of empty `case X:` clauses followed by
// one `return true` would only ever return true for the last clause.
func isTypeSupported(t arrow.DataType) bool {
	// list from arrow/csv/common.go
	switch t.(type) {
	case *arrow.BooleanType,
		*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
		*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
		*arrow.Float32Type, *arrow.Float64Type,
		*arrow.StringType,
		*arrow.TimestampType,
		*arrow.Date32Type, *arrow.Date64Type,
		*arrow.Decimal128Type, *arrow.Decimal256Type,
		*arrow.ListType,
		*arrow.BinaryType,
		arrow.ExtensionType:
		return true
	}

	return false
}

// castToString returns a record in which every column whose type is not
// reported as supported by isTypeSupported is replaced by a string column
// holding the ValueStr form of each value; nulls stay null. Supported columns
// are shared with the input record. The returned record uses the schema
// produced by convertSchema for the input's schema.
//
// It does not release the original record.
func castToString(rec arrow.Record) arrow.Record {
	newSchema := convertSchema(rec.Schema())
	numCols := int(rec.NumCols())
	cols := make([]arrow.Array, numCols)

	// Arrays created below start with one reference owned by this function.
	// array.NewRecord takes its own reference on every column, so ours is
	// dropped on return to leave the new record as the sole owner.
	var built []arrow.Array
	defer func() {
		for _, arr := range built {
			arr.Release()
		}
	}()

	for c := 0; c < numCols; c++ {
		col := rec.Column(c)
		if isTypeSupported(col.DataType()) {
			cols[c] = col
			continue
		}

		sb := array.NewStringBuilder(memory.DefaultAllocator)
		for i := 0; i < col.Len(); i++ {
			if col.IsNull(i) {
				sb.AppendNull()
				continue
			}
			sb.Append(col.ValueStr(i))
		}
		arr := sb.NewArray()
		sb.Release() // the builder is no longer needed once the array is materialized
		built = append(built, arr)
		cols[c] = arr
	}
	return array.NewRecord(newSchema, cols, rec.NumRows())
}
4 changes: 2 additions & 2 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestWriteRead(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
opts := schema.GenTestDataOptions{
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestWriteRead(t *testing.T) {
}

func BenchmarkWrite(b *testing.B) {
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ go 1.19
require (
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/plugin-sdk/v3 v3.0.1
github.com/cloudquery/plugin-sdk/v3 v3.6.3
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.2
)

replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8
replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230521112802-adef07d4bbaa

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
Expand All @@ -18,11 +20,9 @@ require (
github.com/cloudquery/plugin-pb-go v1.0.8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/uuid v1.3.0
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSEGQNLHpUQ5cU4L4aF7cuJZRnc1toIIWqC1gmPg=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230521112802-adef07d4bbaa h1:6y3l+YgGqMJsx5TrxFHPjxDqZ5c3M9+r3dv+CYIRl44=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230521112802-adef07d4bbaa/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ=
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
github.com/cloudquery/plugin-sdk/v3 v3.0.1 h1:5l3dG4AIrAWadc0aEiht5au2gM/wHLRSK2qSzao1Sm0=
github.com/cloudquery/plugin-sdk/v3 v3.0.1/go.mod h1:cJP020H448wknQfjCDo0HR0b3vt9kYcjrEWtmV3YIgc=
github.com/cloudquery/plugin-sdk/v3 v3.6.3 h1:TyljGXffaPICARPBg8geOfKI4biP5sjW9OjSkjMXwig=
github.com/cloudquery/plugin-sdk/v3 v3.6.3/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
4 changes: 2 additions & 2 deletions json/testdata/TestWriteRead.jsonl
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_cq_id":"00000000-0000-0000-0000-000000000001","_cq_parent_id":"00000000-0000-0000-0000-000000000001","_cq_source_name":"test-source","_cq_sync_time":"2021-01-01 00:00:00","bool":true,"bytea":"AQID","cidr":"192.0.2.0/24","cidr_array":["192.0.2.0/24","192.0.2.0/24"],"float":1.1,"inet":"192.0.2.0/24","inet_array":["192.0.2.0/24","192.0.2.0/24"],"int":1,"int_array":[1,2],"json":{"test":"test"},"macaddr":"aa:bb:cc:dd:ee:ff","macaddr_array":["aa:bb:cc:dd:ee:ff","11:22:33:44:55:66"],"string_pk":"AString","text":"AString","text_array":["test1","test2"],"text_array_with_null":["test1","test2\u0000WithNull"],"text_with_null":"AStringWith\u0000NullBytes","timestamp":"2021-01-02 00:00:00","uuid":"00000000-0000-0000-0000-000000000001","uuid_array":["00000000-0000-0000-0000-000000000001","00000000-0000-0000-0000-000000000002"],"uuid_pk":"00000000-0000-0000-0000-000000000001"}
{"_cq_id":"00000000-0000-0000-0000-000000000001","_cq_parent_id":"00000000-0000-0000-0000-000000000001","_cq_source_name":"test-source","_cq_sync_time":"2021-01-01 00:00:00","bool":null,"bytea":null,"cidr":null,"cidr_array":null,"float":null,"inet":null,"inet_array":null,"int":null,"int_array":null,"json":null,"macaddr":null,"macaddr_array":null,"string_pk":"AString","text":null,"text_array":null,"text_array_with_null":null,"text_with_null":null,"timestamp":null,"uuid":null,"uuid_array":null,"uuid_pk":"00000000-0000-0000-0000-000000000001"}
{"_cq_id":"00000000-0000-0000-0000-000000000001","_cq_parent_id":"00000000-0000-0000-0000-000000000001","_cq_source_name":"test-source","_cq_sync_time":"2021-01-01 00:00:00","binary":"AQIDBA==","boolean":true,"boolean_list":[true,null,true],"date32":"2023-04-24","date32_list":["2023-04-24",null,"2023-04-24"],"date64":"2023-04-24","date64_list":["2023-04-24",null,"2023-04-24"],"daytimeinterval":{"days":1,"milliseconds":1},"daytimeinterval_list":[{"days":1,"milliseconds":1},null,{"days":1,"milliseconds":1}],"duration_ms":"123456789ms","duration_ms_list":["123456789ms",null,"123456789ms"],"duration_ns":"123456789ns","duration_ns_list":["123456789ns",null,"123456789ns"],"duration_s":"123456789s","duration_s_list":["123456789s",null,"123456789s"],"duration_us":"123456789us","duration_us_list":["123456789us",null,"123456789us"],"float32":1.1,"float32_list":[1.100000023841858,null,1.100000023841858],"float64":1.1,"float64_list":[1.1,null,1.1],"inet":"192.0.2.0/24","inet_list":["192.0.2.0/24",null,"192.0.2.0/24"],"int16":-1,"int16_list":[-1,null,-1],"int32":-1,"int32_list":[-1,null,-1],"int64":-1,"int64_list":[-1,null,-1],"int8":-1,"int8_list":[-1,null,-1],"json":{"test":["a","b",3]},"json_array":[{"test":"test"},123,{"test_number":456}],"largebinary":"AQIDBA==","largestring":"AString","largestring_list":["AString",null,"AString"],"mac":"aa:bb:cc:dd:ee:ff","mac_list":["aa:bb:cc:dd:ee:ff",null,"aa:bb:cc:dd:ee:ff"],"monthdaynanointerval":{"months":1,"days":1,"nanoseconds":1},"monthdaynanointerval_list":[{"months":1,"days":1,"nanoseconds":1},null,{"months":1,"days":1,"nanoseconds":1}],"monthinterval":{"months":1},"monthinterval_list":[{"months":1},null,{"months":1}],"nested_struct":{"inner":{"binary":"AQIDBA==","boolean":true,"date32":"2023-04-24","date64":"2023-04-24","daytimeinterval":{"days":1,"milliseconds":1},"duration_ms":"123456789ms","duration_ns":"123456789ns","duration_s":"123456789s","duration_us":"123456789us","float32":1.1,"float64":1.1,"inet":"192.0.2.0/24","int1
6":-1,"int32":-1,"int64":-1,"int8":-1,"json":{"test":["a","b",3]},"json_array":[{"test":"test"},123,{"test_number":456}],"largebinary":"AQIDBA==","largestring":"AString","mac":"aa:bb:cc:dd:ee:ff","monthdaynanointerval":{"months":1,"days":1,"nanoseconds":1},"monthinterval":{"months":1},"string":"AString","time32ms":"00:00:00","time32s":"00:00:00","time64ns":"00:00:00","time64us":"00:00:00","timestamp_ms":"2021-01-02 00:00:00","timestamp_ns":"2021-01-02 00:00:00","timestamp_s":"2021-01-02 00:00:00","timestamp_us":"2021-01-02 00:00:00","uint16":1,"uint32":1,"uint64":1,"uint8":1,"uuid":"00000000-0000-0000-0000-000000000001"}},"string":"AString","string_list":["AString",null,"AString"],"struct":{"binary":"AQIDBA==","boolean":true,"date32":"2023-04-24","date64":"2023-04-24","daytimeinterval":{"days":1,"milliseconds":1},"duration_ms":"123456789ms","duration_ns":"123456789ns","duration_s":"123456789s","duration_us":"123456789us","float32":1.1,"float64":1.1,"inet":"192.0.2.0/24","int16":-1,"int32":-1,"int64":-1,"int8":-1,"json":{"test":["a","b",3]},"json_array":[{"test":"test"},123,{"test_number":456}],"largebinary":"AQIDBA==","largestring":"AString","mac":"aa:bb:cc:dd:ee:ff","monthdaynanointerval":{"months":1,"days":1,"nanoseconds":1},"monthinterval":{"months":1},"string":"AString","time32ms":"00:00:00","time32s":"00:00:00","time64ns":"00:00:00","time64us":"00:00:00","timestamp_ms":"2021-01-02 00:00:00","timestamp_ns":"2021-01-02 00:00:00","timestamp_s":"2021-01-02 00:00:00","timestamp_us":"2021-01-02 00:00:00","uint16":1,"uint32":1,"uint64":1,"uint8":1,"uuid":"00000000-0000-0000-0000-000000000001"},"time32ms":"00:00:00","time32ms_list":["00:00:00",null,"00:00:00"],"time32s":"00:00:00","time32s_list":["00:00:00",null,"00:00:00"],"time64ns":"00:00:00","time64ns_list":["00:00:00",null,"00:00:00"],"time64us":"00:00:00","time64us_list":["00:00:00",null,"00:00:00"],"timestamp_ms":"2021-01-02 00:00:00","timestamp_ms_list":["2021-01-02 00:00:00",null,"2021-01-02 
00:00:00"],"timestamp_ns":"2021-01-02 00:00:00","timestamp_ns_list":["2021-01-02 00:00:00",null,"2021-01-02 00:00:00"],"timestamp_s":"2021-01-02 00:00:00","timestamp_s_list":["2021-01-02 00:00:00",null,"2021-01-02 00:00:00"],"timestamp_us":"2021-01-02 00:00:00","timestamp_us_list":["2021-01-02 00:00:00",null,"2021-01-02 00:00:00"],"uint16":1,"uint16_list":[1,null,1],"uint32":1,"uint32_list":[1,null,1],"uint64":1,"uint64_list":[1,null,1],"uint8":1,"uint8_list":[1,null,1],"uuid":"00000000-0000-0000-0000-000000000001","uuid_list":["00000000-0000-0000-0000-000000000001",null,"00000000-0000-0000-0000-000000000001"]}
{"_cq_id":"00000000-0000-0000-0000-000000000001","_cq_parent_id":null,"_cq_source_name":"test-source","_cq_sync_time":"2021-01-01 00:00:00","binary":null,"boolean":null,"boolean_list":null,"date32":null,"date32_list":null,"date64":null,"date64_list":null,"daytimeinterval":null,"daytimeinterval_list":null,"duration_ms":null,"duration_ms_list":null,"duration_ns":null,"duration_ns_list":null,"duration_s":null,"duration_s_list":null,"duration_us":null,"duration_us_list":null,"float32":null,"float32_list":null,"float64":null,"float64_list":null,"inet":null,"inet_list":null,"int16":null,"int16_list":null,"int32":null,"int32_list":null,"int64":null,"int64_list":null,"int8":null,"int8_list":null,"json":null,"json_array":null,"largebinary":null,"largestring":null,"largestring_list":null,"mac":null,"mac_list":null,"monthdaynanointerval":null,"monthdaynanointerval_list":null,"monthinterval":null,"monthinterval_list":null,"nested_struct":null,"string":null,"string_list":null,"struct":null,"time32ms":null,"time32ms_list":null,"time32s":null,"time32s_list":null,"time64ns":null,"time64ns_list":null,"time64us":null,"time64us_list":null,"timestamp_ms":null,"timestamp_ms_list":null,"timestamp_ns":null,"timestamp_ns_list":null,"timestamp_s":null,"timestamp_s_list":null,"timestamp_us":null,"timestamp_us_list":null,"uint16":null,"uint16_list":null,"uint32":null,"uint32_list":null,"uint64":null,"uint64_list":null,"uint8":null,"uint8_list":null,"uuid":null,"uuid_list":null}

2 changes: 1 addition & 1 deletion json/write.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package json

import (
"encoding/json"
"io"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/goccy/go-json"
)

func (c *Client) WriteTableBatch(w io.Writer, _ *schema.Table, records []arrow.Record) error {
Expand Down
6 changes: 3 additions & 3 deletions json/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestWrite(t *testing.T) {
var b bytes.Buffer
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
Expand All @@ -36,7 +36,7 @@ func TestWrite(t *testing.T) {
}

func TestWriteRead(t *testing.T) {
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
opts := schema.GenTestDataOptions{
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestWriteRead(t *testing.T) {
}

func BenchmarkWrite(b *testing.B) {
table := schema.TestTable("test")
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
Expand Down
10 changes: 10 additions & 0 deletions parquet/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package parquet

import (
"github.com/apache/arrow/go/v13/arrow"
)

// listLikeType is an Arrow data type that, in addition to the arrow.DataType
// method set, exposes a second data type through Elem.
// NOTE(review): presumably satisfied by Arrow's list-style types (list,
// large list, fixed-size list) — confirm against the callers in this package.
type listLikeType interface {
	arrow.DataType
	// Elem returns an arrow.DataType; judging by the name, the type of the
	// list's elements — TODO confirm.
	Elem() arrow.DataType
}
Loading

0 comments on commit 6b6e305

Please sign in to comment.