Skip to content

Commit

Permalink
Format types based on plugin runtime, built-in or standalone. (#169)
Browse files Browse the repository at this point in the history
* Format types based on plugin runtime, built-in or standalone.

When the plugin is embedded, the type restriction changes to only pg types.
However if the plugin is standalone, only base types will be supported, others
converted to their chosen format representation.

* fmt; lint

---------

Co-authored-by: Lyubo Kamenov <lyubo@meroxa.io>
  • Loading branch information
samirketema and lyuboxa authored Jun 13, 2024
1 parent a1229e7 commit f190c59
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ package main

import (
postgres "github.com/conduitio/conduit-connector-postgres"
"github.com/conduitio/conduit-connector-postgres/source/types"
sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
// Running as standalone plugin
types.WithBuiltinPlugin = false

sdk.Serve(postgres.Connector)
}
117 changes: 114 additions & 3 deletions source/logrepl/internal/relationset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-connector-postgres/source/types"
"github.com/conduitio/conduit-connector-postgres/test"
"github.com/google/go-cmp/cmp"
"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -85,9 +86,23 @@ func TestRelationSetAllTypes(t *testing.T) {
is.NoErr(err)
is.Equal(gotRel, rel)

values, err := rs.Values(ins.RelationID, ins.Tuple)
is.NoErr(err)
isValuesAllTypes(is, values)
t.Run("with builtin plugin", func(t *testing.T) {
is := is.New(t)

values, err := rs.Values(ins.RelationID, ins.Tuple)
is.NoErr(err)
isValuesAllTypes(is, values)
})

t.Run("with standalone plugin", func(t *testing.T) {
is := is.New(t)

types.WithBuiltinPlugin = false
values, err := rs.Values(ins.RelationID, ins.Tuple)
is.NoErr(err)
isValuesAllTypesStandalone(is, values)
types.WithBuiltinPlugin = true
})
}

// setupTableAllTypes creates a new table with all types and returns its name.
Expand Down Expand Up @@ -248,6 +263,102 @@ func insertRowAllTypes(ctx context.Context, t *testing.T, conn test.Querier, tab
}

func isValuesAllTypes(is *is.I, got map[string]any) {
want := map[string]any{
"col_bit": pgtype.Bits{
Bytes: []byte{0b01},
Len: 8,
Valid: true,
},
"col_varbit": pgtype.Bits{
Bytes: []byte{0b10},
Len: 8,
Valid: true,
},
"col_boolean": true,
"col_box": pgtype.Box{
P: [2]pgtype.Vec2{{X: 5, Y: 6}, {X: 3, Y: 4}},
Valid: true,
},
"col_bytea": []byte{0x07},
"col_char": "8 ", // blank padded char
"col_varchar": "9",
"col_cidr": netip.MustParsePrefix("192.168.100.128/25"),
"col_circle": pgtype.Circle{
P: pgtype.Vec2{X: 11, Y: 12},
R: 13,
Valid: true,
},
"col_date": time.Date(2022, 3, 14, 0, 0, 0, 0, time.UTC).UTC(),
"col_float4": float32(15),
"col_float8": float64(16.16),
"col_inet": netip.MustParsePrefix("192.168.0.17/32"),
"col_int2": int16(32767),
"col_int4": int32(2147483647),
"col_int8": int64(9223372036854775807),
"col_interval": pgtype.Interval{
Microseconds: 18000000,
Days: 0,
Months: 0,
Valid: true,
},
"col_json": map[string]any{"foo": "bar"},
"col_jsonb": map[string]any{"foo": "baz"},
"col_line": pgtype.Line{
A: 19,
B: 20,
C: 21,
Valid: true,
},
"col_lseg": pgtype.Lseg{
P: [2]pgtype.Vec2{{X: 22, Y: 23}, {X: 24, Y: 25}},
Valid: true,
},
"col_macaddr": net.HardwareAddr{0x08, 0x00, 0x2b, 0x01, 0x02, 0x26},
"col_macaddr8": net.HardwareAddr{0x08, 0x00, 0x2b, 0x01, 0x02, 0x03, 0x04, 0x27},
"col_money": "$28.00",
"col_numeric": float64(292929.29),
"col_path": pgtype.Path{
P: []pgtype.Vec2{{X: 30, Y: 31}, {X: 32, Y: 33}, {X: 34, Y: 35}},
Closed: false,
Valid: true,
},
"col_pg_lsn": "36/37",
"col_pg_snapshot": "10:20:10,14,15",
"col_point": pgtype.Point{
P: pgtype.Vec2{X: 38, Y: 39},
Valid: true,
},
"col_polygon": pgtype.Polygon{
P: []pgtype.Vec2{{X: 40, Y: 41}, {X: 42, Y: 43}, {X: 44, Y: 45}},
Valid: true,
},
"col_serial2": int16(32767),
"col_serial4": int32(2147483647),
"col_serial8": int64(9223372036854775807),
"col_text": "foo bar baz",
"col_time": pgtype.Time{
Microseconds: time.Date(1970, 1, 1, 4, 5, 6, 789000000, time.UTC).UnixMicro(),
Valid: true,
},
"col_timetz": "04:05:06.789-08",
"col_timestamp": time.Date(2022, 3, 14, 15, 16, 17, 0, time.UTC).UTC(),
"col_timestamptz": time.Date(2022, 3, 14, 15+8, 16, 17, 0, time.UTC).UTC(),
"col_tsquery": "'fat' & ( 'rat' | 'cat' )",
"col_tsvector": "'a' 'and' 'ate' 'cat' 'fat' 'mat' 'on' 'rat' 'sat'",
"col_uuid": [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66},
"col_xml": "<foo>bar</foo>",
}
is.Equal("", cmp.Diff(want, got,
cmp.Comparer(func(x, y *big.Int) bool {
return x.Cmp(y) == 0
}),
cmp.Comparer(func(x, y netip.Prefix) bool {
return x.String() == y.String()
}),
))
}

func isValuesAllTypesStandalone(is *is.I, got map[string]any) {
want := map[string]any{
"col_bit": pgtype.Bits{
Bytes: []byte{0b01},
Expand Down
2 changes: 1 addition & 1 deletion source/snapshot/fetch_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func Test_FetchWorker_buildRecordData(t *testing.T) {
// special case fields
fields = []string{"id", "time"}
values = []any{1, now}
expectValues = []any{1, now.String()}
expectValues = []any{1, now}
)

key, payload, err := (&FetchWorker{
Expand Down
7 changes: 6 additions & 1 deletion source/types/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import (

type TimeFormatter struct{}

// Format coerces `time.Time` to a string representation in UTC tz.
// Format returns:
// * string format of Time when connectorn is not builtin
// * time type in UTC when connector is builtin
func (n TimeFormatter) Format(t time.Time) (any, error) {
if WithBuiltinPlugin {
return t.UTC(), nil
}
return t.UTC().String(), nil
}
2 changes: 2 additions & 0 deletions source/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
Time = TimeFormatter{}
)

var WithBuiltinPlugin = true

func Format(v any) (any, error) {
switch t := v.(type) {
case pgtype.Numeric:
Expand Down
26 changes: 23 additions & 3 deletions source/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
)

func Test_Format(t *testing.T) {
now := time.Now().UTC()

tests := []struct {
name string
input []any
expect []any
name string
input []any
expect []any
withBuiltin bool
}{
{
name: "int float string bool",
Expand Down Expand Up @@ -62,13 +65,30 @@ func Test_Format(t *testing.T) {
"2009-11-10 23:00:00 +0000 UTC", nil,
},
},
{
name: "builtin time.Time",
input: []any{
now,
},
expect: []any{
now,
},
withBuiltin: true,
},
}
_ = time.Now()

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)

prevWithBuiltinPlugin := WithBuiltinPlugin
WithBuiltinPlugin = tc.withBuiltin

t.Cleanup(func() {
WithBuiltinPlugin = prevWithBuiltinPlugin
})

for i, in := range tc.input {
v, err := Format(in)
is.NoErr(err)
Expand Down

0 comments on commit f190c59

Please sign in to comment.