Skip to content

Commit

Permalink
feat!: Upgrade to SDK v3 (#155)
Browse files Browse the repository at this point in the history
Co-authored-by: Kemal Hadimli <disq@users.noreply.github.com>
  • Loading branch information
disq and disq authored May 11, 2023
1 parent 0ef3003 commit 0509231
Show file tree
Hide file tree
Showing 16 changed files with 83 additions and 81 deletions.
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package filetypes

import (
csvFile "github.com/cloudquery/filetypes/v2/csv"
jsonFile "github.com/cloudquery/filetypes/v2/json"
"github.com/cloudquery/filetypes/v2/parquet"
csvFile "github.com/cloudquery/filetypes/v3/csv"
jsonFile "github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v3/parquet"
)

type Client struct {
Expand Down
5 changes: 3 additions & 2 deletions csv/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (cl *Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
reader := csv.NewReader(r, arrowSchema,
func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
reader := csv.NewReader(r, table.ToArrowSchema(),
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullReader(true, ""),
Expand Down
5 changes: 3 additions & 2 deletions csv/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (cl *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error {
writer := csv.NewWriter(w, arrowSchema,
func (cl *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error {
writer := csv.NewWriter(w, table.ToArrowSchema(),
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullWriter(""),
Expand Down
26 changes: 12 additions & 14 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/testdata"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/google/uuid"
)

Expand All @@ -28,18 +28,17 @@ func TestWriteRead(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
opts := testdata.GenTestDataOptions{
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)
cl, err := NewClient(tc.options...)
if err != nil {
t.Fatal(err)
Expand All @@ -49,7 +48,7 @@ func TestWriteRead(t *testing.T) {
writer := bufio.NewWriter(&b)
reader := bufio.NewReader(&b)

if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
t.Fatal(err)
}
writer.Flush()
Expand All @@ -69,7 +68,7 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, arrowSchema, "test-source", ch)
readErr = cl.Read(byteReader, table, "test-source", ch)
close(ch)
}()
totalCount := 0
Expand All @@ -90,16 +89,15 @@ func TestWriteRead(t *testing.T) {
}

func BenchmarkWrite(b *testing.B) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1000,
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)

cl, err := NewClient()
if err != nil {
Expand All @@ -109,7 +107,7 @@ func BenchmarkWrite(b *testing.B) {
writer := bufio.NewWriter(&buf)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
b.Fatal(err)
}
err = writer.Flush()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/cloudquery/filetypes/v2
module github.com/cloudquery/filetypes/v3

go 1.19

require (
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/plugin-sdk/v2 v2.7.0
github.com/cloudquery/plugin-sdk/v3 v3.0.1
github.com/stretchr/testify v1.8.2
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSE
github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
github.com/cloudquery/plugin-pb-go v1.0.5 h1:Du6pXI2JZRtgWfc0K69/gtNcyHICqEbAmfJXTARAqCc=
github.com/cloudquery/plugin-pb-go v1.0.5/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
github.com/cloudquery/plugin-sdk/v3 v3.0.1 h1:5l3dG4AIrAWadc0aEiht5au2gM/wHLRSK2qSzao1Sm0=
github.com/cloudquery/plugin-sdk/v3 v3.0.1/go.mod h1:cJP020H448wknQfjCDo0HR0b3vt9kYcjrEWtmV3YIgc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
5 changes: 3 additions & 2 deletions json/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

const maxJSONSize = 1024 * 1024 * 20

func (*Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
func (*Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize)
rb := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
for scanner.Scan() {
b := scanner.Bytes()
err := rb.UnmarshalJSON(b)
Expand Down
3 changes: 2 additions & 1 deletion json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (c *Client) WriteTableBatch(w io.Writer, _ *arrow.Schema, records []arrow.Record) error {
func (c *Client) WriteTableBatch(w io.Writer, _ *schema.Table, records []arrow.Record) error {
for _, r := range records {
err := c.writeRecord(w, r)
if err != nil {
Expand Down
37 changes: 17 additions & 20 deletions json/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,44 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/testdata"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/google/uuid"
)

func TestWrite(t *testing.T) {
var b bytes.Buffer
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)
cl, err := NewClient()
if err != nil {
t.Fatal(err)
}
if err := cl.WriteTableBatch(&b, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(&b, table, records); err != nil {
t.Fatal(err)
}
t.Log(b.String())
}

func TestWriteRead(t *testing.T) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
opts := testdata.GenTestDataOptions{
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)
cl, err := NewClient()
if err != nil {
t.Fatal(err)
Expand All @@ -58,7 +56,7 @@ func TestWriteRead(t *testing.T) {
writer := bufio.NewWriter(&b)
reader := bufio.NewReader(&b)

if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
t.Fatal(err)
}
writer.Flush()
Expand All @@ -78,7 +76,7 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, arrowSchema, "test-source", ch)
readErr = cl.Read(byteReader, table, "test-source", ch)
close(ch)
}()
totalCount := 0
Expand All @@ -97,16 +95,15 @@ func TestWriteRead(t *testing.T) {
}

func BenchmarkWrite(b *testing.B) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1000,
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)

cl, err := NewClient()
if err != nil {
Expand All @@ -116,7 +113,7 @@ func BenchmarkWrite(b *testing.B) {
writer := bufio.NewWriter(&buf)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
b.Fatal(err)
}
err = writer.Flush()
Expand Down
6 changes: 4 additions & 2 deletions parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/parquet/file"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-sdk/v2/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
)

type ReaderAtSeeker interface {
Expand All @@ -20,7 +21,7 @@ type ReaderAtSeeker interface {
io.Seeker
}

func (*Client) Read(f ReaderAtSeeker, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
func (*Client) Read(f ReaderAtSeeker, table *schema.Table, _ string, res chan<- arrow.Record) error {
ctx := context.Background()
rdr, err := file.NewParquetReader(f)
if err != nil {
Expand All @@ -39,6 +40,7 @@ func (*Client) Read(f ReaderAtSeeker, arrowSchema *arrow.Schema, _ string, res c
return fmt.Errorf("failed to get parquet record reader: %w", err)
}

arrowSchema := table.ToArrowSchema()
for rr.Next() {
rec := rr.Record()
castRec, err := castStringsToExtensions(rec, arrowSchema)
Expand Down
7 changes: 4 additions & 3 deletions parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/compress"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-sdk/v2/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
)

func (c *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error {
func (c *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error {
props := parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
parquet.WithCompression(compress.Codecs.Snappy),
)
arrprops := pqarrow.DefaultWriterProps()
newSchema := convertSchema(arrowSchema)
newSchema := convertSchema(table.ToArrowSchema())
fw, err := pqarrow.NewFileWriter(newSchema, w, props, arrprops)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 0509231

Please sign in to comment.