Skip to content

Commit

Permalink
feat!: Move to Arrow implementation (#120)
Browse files Browse the repository at this point in the history
* Move to Apache Arrow in-memory representation
  • Loading branch information
hermanschaaf authored Apr 18, 2023
1 parent a5deffa commit b4fb660
Show file tree
Hide file tree
Showing 37 changed files with 539 additions and 2,999 deletions.
21 changes: 6 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
package filetypes

import (
csvFile "github.com/cloudquery/filetypes/csv"
jsonFile "github.com/cloudquery/filetypes/json"
"github.com/cloudquery/filetypes/parquet"
"github.com/cloudquery/plugin-sdk/schema"
csvFile "github.com/cloudquery/filetypes/v2/csv"
jsonFile "github.com/cloudquery/filetypes/v2/json"
"github.com/cloudquery/filetypes/v2/parquet"
)

type Client struct {
spec *FileSpec

csv *csvFile.Client
csvTransformer schema.DefaultTransformer
csvReverseTransformer csvFile.ReverseTransformer

json *jsonFile.Client
jsonTransformer schema.DefaultTransformer
jsonReverseTransformer jsonFile.ReverseTransformer

parquet *parquet.Client
parquetTransformer parquet.Transformer
parquetReverseTransformer parquet.ReverseTransformer
csv *csvFile.Client
json *jsonFile.Client
parquet *parquet.Client
}

// NewClient creates a new client for the given spec
Expand Down
46 changes: 14 additions & 32 deletions csv/read.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,25 @@
package csv

import (
"encoding/csv"
"errors"
"fmt"
"io"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/csv"
)

func (cl *Client) Read(r io.Reader, table *schema.Table, sourceName string, res chan<- []any) error {
reader := csv.NewReader(r)
reader.Comma = cl.Delimiter
sourceNameIndex := table.Columns.Index(schema.CqSourceNameColumn.Name)
if sourceNameIndex == -1 {
return fmt.Errorf("could not find column %s in table %s", schema.CqSourceNameColumn.Name, table.Name)
}
if cl.IncludeHeaders {
_, err := reader.Read()
if err != nil {
return err
}
}
for {
record, err := reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}
if record[sourceNameIndex] != sourceName {
continue
}
values := make([]any, len(record))
for i, v := range record {
values[i] = v
func (cl *Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
reader := csv.NewReader(r, arrowSchema,
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullReader(true, ""),
)
for reader.Next() {
if reader.Err() != nil {
return reader.Err()
}
res <- values
rec := reader.Record()
rec.Retain()
res <- rec
}
return nil
}
Binary file added csv/testdata/TestWriteRead-default.csv
Binary file not shown.
Binary file added csv/testdata/TestWriteRead-with_delimiter.csv
Binary file not shown.
Binary file not shown.
Binary file added csv/testdata/TestWriteRead-with_headers.csv
Binary file not shown.
14 changes: 0 additions & 14 deletions csv/transformer.go

This file was deleted.

33 changes: 13 additions & 20 deletions csv/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,23 @@ import (
"fmt"
"io"

"github.com/apache/arrow/go/arrow/memory"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/csv"
"github.com/cloudquery/filetypes/internal/cqarrow"
"github.com/cloudquery/plugin-sdk/schema"
)

func (cl *Client) WriteTableBatch(w io.Writer, table *schema.Table, resources [][]any) error {
arrowSchema := cqarrow.CQSchemaToArrow(table)
cqTypes := make([]schema.CQTypes, len(resources))
for i := range resources {
cqTypes[i] = make(schema.CQTypes, len(resources[i]))
for j := range resources[i] {
cqTypes[i][j] = resources[i][j].(schema.CQType)
func (cl *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error {
writer := csv.NewWriter(w, arrowSchema,
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullWriter(""),
)
for _, record := range records {
if err := writer.Write(record); err != nil {
return fmt.Errorf("failed to write record to csv: %w", err)
}
if err := writer.Flush(); err != nil {
return fmt.Errorf("failed to flush csv writer: %w", err)
}
}
record := cqarrow.CQTypesToRecord(memory.DefaultAllocator, cqTypes, arrowSchema)
defer record.Release()

writer := csv.NewWriter(w, arrowSchema, csv.WithComma(cl.Delimiter), csv.WithHeader(cl.IncludeHeaders), csv.WithNullWriter(""))
if err := writer.Write(record); err != nil {
return fmt.Errorf("failed to write record to csv: %w", err)
}
if err := writer.Flush(); err != nil {
return fmt.Errorf("failed to flush csv writer: %w", err)
}
return nil
}
84 changes: 54 additions & 30 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package csv
import (
"bufio"
"bytes"
"io"
"testing"
"time"

"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/testdata"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/testdata"
"github.com/google/uuid"
)

func TestWriteRead(t *testing.T) {
Expand All @@ -15,61 +21,79 @@ func TestWriteRead(t *testing.T) {
options []Options
outputCount int
}{
{name: "default", outputCount: 1},
{name: "with_headers", options: []Options{WithHeader()}, outputCount: 1},
{name: "with_delimiter", options: []Options{WithDelimiter('\t')}, outputCount: 1},
{name: "with_delimter_headers", options: []Options{WithDelimiter('\t'), WithHeader()}, outputCount: 1},
{name: "default", outputCount: 2},
{name: "with_headers", options: []Options{WithHeader()}, outputCount: 2},
{name: "with_delimiter", options: []Options{WithDelimiter('\t')}, outputCount: 2},
{name: "with_delimiter_headers", options: []Options{WithDelimiter('\t'), WithHeader()}, outputCount: 2},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var b bytes.Buffer
table := testdata.TestTable("test")
cqtypes := testdata.GenTestData(table)
if err := cqtypes[0].Set("test-source"); err != nil {
arrowSchema := table.ToArrowSchema()
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
opts := testdata.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
}
records := testdata.GenTestData(mem, arrowSchema, opts)
defer func() {
for _, r := range records {
r.Release()
}
}()
cl, err := NewClient(tc.options...)
if err != nil {
t.Fatal(err)
}

var b bytes.Buffer
writer := bufio.NewWriter(&b)
reader := bufio.NewReader(&b)
transformer := &schema.DefaultTransformer{}
transformedValues := schema.TransformWithTransformer(transformer, cqtypes)
client, err := NewClient(tc.options...)
if err != nil {

if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
t.Fatal(err)
}
writer.Flush()

if err := client.WriteTableBatch(writer, table, [][]any{transformedValues}); err != nil {
rawBytes, err := io.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
writer.Flush()
snap := cupaloy.New(
cupaloy.SnapshotFileExtension(".csv"),
cupaloy.SnapshotSubdirectory("testdata"),
)
snap.SnapshotT(t, string(rawBytes))

ch := make(chan []any)
byteReader := bytes.NewReader(rawBytes)

ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = client.Read(reader, table, "test-source", ch)
readErr = cl.Read(byteReader, arrowSchema, "test-source", ch)
close(ch)
}()
totalCount := 0
reverseTransformer := &ReverseTransformer{}
for row := range ch {
if client.IncludeHeaders && totalCount == 0 {
totalCount++
continue
}
gotCqtypes, err := reverseTransformer.ReverseTransformValues(table, row)
if err != nil {
t.Fatal(err)
}
if diff := cqtypes.Diff(gotCqtypes); diff != "" {
t.Fatalf("got diff: %s", diff)
for got := range ch {
if diff := destination.RecordDiff(records[totalCount], got); diff != "" {
got.Release()
t.Errorf("got diff: %s", diff)
}
got.Release()
totalCount++
}
if readErr != nil {
t.Fatal(readErr)
}
if totalCount != tc.outputCount {
t.Fatalf("expected %d row, got %d", tc.outputCount, totalCount)
t.Errorf("got %d row(s), want %d", totalCount, tc.outputCount)
}
})
}
Expand Down
27 changes: 15 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/cloudquery/filetypes
module github.com/cloudquery/filetypes/v2

go 1.19

require (
github.com/cloudquery/plugin-sdk v1.45.0
github.com/cloudquery/plugin-sdk/v2 v2.3.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.2
Expand All @@ -19,41 +19,44 @@ require (
github.com/rs/zerolog v1.29.0 // indirect; indirect // indirect
github.com/thoas/go-funk v0.9.3 // indirect; indirect // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect; indirect // indirect
golang.org/x/sys v0.7.0 // indirect; indirect // indirect // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

require (
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516
github.com/apache/arrow/go/v12 v12.0.0-20230331222054-7e19111f2f81
github.com/goccy/go-json v0.10.2
github.com/google/go-cmp v0.5.9
github.com/xitongsys/parquet-go v1.6.2
github.com/apache/arrow/go/v12 v12.0.0-20230417014917-9888ac36c142
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
)

replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230317130341-c648117570af
replace github.com/apache/arrow/go/v12 => github.com/cloudquery/arrow/go/v12 v12.0.0-20230417154311-f9add0212acd

require golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/thrift v0.18.1 // indirect
// indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/xitongsys/parquet-go-source v0.0.0-20230312005205-fbbcdea5f512 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

require (
github.com/getsentry/sentry-go v0.20.0 // indirect; indirect // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect; indirect // indirect
golang.org/x/text v0.8.0 // indirect; indirect // indirect
golang.org/x/text v0.9.0 // indirect; indirect // indirect
)
Loading

0 comments on commit b4fb660

Please sign in to comment.