Skip to content

Commit

Permalink
feat!: Upgrade to plugin-sdk v4 (#210)
Browse files Browse the repository at this point in the history
Co-authored-by: Kemal Hadimli <disq@users.noreply.github.com>
  • Loading branch information
disq and disq authored Jun 29, 2023
1 parent b5f44e2 commit fae0de7
Show file tree
Hide file tree
Showing 19 changed files with 286 additions and 82 deletions.
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package filetypes

import (
csvFile "github.com/cloudquery/filetypes/v3/csv"
jsonFile "github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v3/parquet"
"github.com/cloudquery/filetypes/v3/types"
csvFile "github.com/cloudquery/filetypes/v4/csv"
jsonFile "github.com/cloudquery/filetypes/v4/json"
"github.com/cloudquery/filetypes/v4/parquet"
"github.com/cloudquery/filetypes/v4/types"
)

type Client struct {
Expand Down
4 changes: 2 additions & 2 deletions csv/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
func (cl *Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error {
arrowSchema := table.ToArrowSchema()
newSchema := convertSchema(arrowSchema)
reader := csv.NewReader(r, newSchema,
Expand Down
23 changes: 18 additions & 5 deletions csv/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
// "encoding/csv"
"fmt"
"io"
"strings"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

type Handle struct {
Expand Down Expand Up @@ -39,9 +40,10 @@ func (h *Handle) WriteContent(records []arrow.Record) error {
if err := h.w.Write(castRec); err != nil {
return fmt.Errorf("failed to write record to csv: %w", err)
}
if err := h.w.Flush(); err != nil {
return fmt.Errorf("failed to flush csv writer: %w", err)
}
}

if err := h.w.Flush(); err != nil {
return fmt.Errorf("failed to flush csv writer: %w", err)
}
return nil
}
Expand All @@ -58,6 +60,7 @@ func convertSchema(sch *arrow.Schema) *arrow.Schema {
if !isTypeSupported(f.Type) {
fields[i].Type = arrow.BinaryTypes.String
}
fields[i].Metadata = stripCQExtensionMetadata(fields[i].Metadata)
}

md := sch.Metadata()
Expand Down Expand Up @@ -108,3 +111,13 @@ func castToString(rec arrow.Record) arrow.Record {
}
return array.NewRecord(newSchema, cols, rec.NumRows())
}

func stripCQExtensionMetadata(md arrow.Metadata) arrow.Metadata {
m := md.ToMap()
for k := range m {
if strings.HasPrefix(k, "cq:extension:") {
delete(m, k)
}
}
return arrow.MetadataFrom(m)
}
10 changes: 5 additions & 5 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -69,12 +69,12 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, table, "test-source", ch)
readErr = cl.Read(byteReader, table, ch)
close(ch)
}()
totalCount := 0
for got := range ch {
if diff := destination.RecordDiff(records[totalCount], got); diff != "" {
if diff := plugin.RecordDiff(records[totalCount], got); diff != "" {
t.Errorf("got diff: %s", diff)
}
totalCount++
Expand Down
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/cloudquery/filetypes/v3
module github.com/cloudquery/filetypes/v4

go 1.19

require (
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/plugin-sdk/v3 v3.10.6
github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.4
Expand All @@ -18,9 +18,7 @@ require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/cloudquery/plugin-pb-go v1.4.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.1.21+incompatible // indirect
Expand All @@ -47,6 +45,5 @@ require (
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oM
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a h1:O/FNq1+8YlWzHYNj2tokFQyja6GXsQBdkuvLMdpuaSw=
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/cloudquery/plugin-pb-go v1.4.0 h1:sfy0oWSFac2JCJQJuKoR+8flZGKkEoUVORwZDNM3aiI=
github.com/cloudquery/plugin-pb-go v1.4.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8=
github.com/cloudquery/plugin-sdk/v3 v3.10.6 h1:KqTsLZ6OA1h8BUMeMcU6BAD6TBW6ojgQaC4zDZMgvu0=
github.com/cloudquery/plugin-sdk/v3 v3.10.6/go.mod h1:QhBaVgiNyQ3P6uAzJWOYpYykHXL+WDZffwg1riTwv60=
github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1 h1:sRjZ/Lb/yjLw92HzvgPiyVynbocbtaa13fEgS9MN/DQ=
github.com/cloudquery/plugin-sdk/v4 v4.2.0-rc1/go.mod h1:gn2ANihFC5SUMPCcYnVD+Gt3Cgn8OeXJW2/0lRUoB68=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -103,8 +99,6 @@ google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions json/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

const maxJSONSize = 1024 * 1024 * 20

func (*Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
func (*Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize)
rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
Expand Down
4 changes: 2 additions & 2 deletions json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/goccy/go-json"
)

Expand Down
31 changes: 5 additions & 26 deletions json/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,12 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/google/uuid"
)

func TestWrite(t *testing.T) {
var b bytes.Buffer
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
}
records := schema.GenTestData(table, opts)
cl, err := NewClient()
if err != nil {
t.Fatal(err)
}
if err := types.WriteAll(cl, &b, table, records); err != nil {
t.Fatal(err)
}
t.Log(b.String())
}

func TestWriteRead(t *testing.T) {
table := schema.TestTable("test", schema.TestSourceOptions{})
sourceName := "test-source"
Expand Down Expand Up @@ -77,12 +56,12 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, table, "test-source", ch)
readErr = cl.Read(byteReader, table, ch)
close(ch)
}()
totalCount := 0
for got := range ch {
if diff := destination.RecordDiff(records[totalCount], got); diff != "" {
if diff := plugin.RecordDiff(records[totalCount], got); diff != "" {
t.Fatalf("got diff: %s", diff)
}
totalCount++
Expand Down
4 changes: 2 additions & 2 deletions parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/file"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, _ string, res chan<- arrow.Record) error {
func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
ctx := context.Background()
rdr, err := file.NewParquetReader(f)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/compress"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
ftypes "github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
ftypes "github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/types"
)

type Handle struct {
Expand Down
10 changes: 5 additions & 5 deletions parquet/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/filetypes/v3/types"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/plugin"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func TestWriteRead(t *testing.T) {
Expand Down Expand Up @@ -48,14 +48,14 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, table, "test-source", ch)
readErr = cl.Read(byteReader, table, ch)
close(ch)
}()
totalCount := 0
for got := range ch {
curr := records[totalCount]
if !array.RecordApproxEqual(curr, got) {
t.Fatalf("got diff (record %d): %s\n", totalCount, destination.RecordDiff(records[totalCount], got))
t.Fatalf("got diff (record %d): %s\n", totalCount, plugin.RecordDiff(records[totalCount], got))
}
totalCount++
}
Expand Down
10 changes: 5 additions & 5 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"io"

"github.com/apache/arrow/go/v13/arrow"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

type ReaderAtSeeker interface {
Expand All @@ -13,18 +13,18 @@ type ReaderAtSeeker interface {
io.Seeker
}

func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
switch cl.spec.Format {
case FormatTypeCSV:
if err := cl.csv.Read(f, table, sourceName, res); err != nil {
if err := cl.csv.Read(f, table, res); err != nil {
return err
}
case FormatTypeJSON:
if err := cl.json.Read(f, table, sourceName, res); err != nil {
if err := cl.json.Read(f, table, res); err != nil {
return err
}
case FormatTypeParquet:
if err := cl.parquet.Read(f, table, sourceName, res); err != nil {
if err := cl.parquet.Read(f, table, res); err != nil {
return err
}
default:
Expand Down
6 changes: 3 additions & 3 deletions spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"encoding/json"
"fmt"

"github.com/cloudquery/filetypes/v3/csv"
jsonFile "github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v3/parquet"
"github.com/cloudquery/filetypes/v4/csv"
jsonFile "github.com/cloudquery/filetypes/v4/json"
"github.com/cloudquery/filetypes/v4/parquet"
)

type FormatType string
Expand Down
4 changes: 2 additions & 2 deletions spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package filetypes
import (
"testing"

"github.com/cloudquery/filetypes/v3/csv"
"github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v4/csv"
"github.com/cloudquery/filetypes/v4/json"
"github.com/stretchr/testify/assert"
)

Expand Down
Loading

0 comments on commit fae0de7

Please sign in to comment.