Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema support #205

Merged
merged 15 commits into from
Sep 16, 2024
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ linters:
# - errorlint
# - exhaustive
# - exhaustivestruct
- exportloopref
- copyloopvar
- forbidigo
# - forcetypeassert
# - funlen
Expand Down
4 changes: 2 additions & 2 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {

switch s.config.CDCMode {
case source.CDCModeAuto:
// TODO add logic that checks if the DB supports logical replication (since that's the only thing we support at the moment)
// TODO add logic that checks if the DB supports logical replication
// (since that's the only thing we support at the moment)
fallthrough
case source.CDCModeLogrepl:
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
Expand All @@ -113,7 +114,6 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
SnapshotFetchSize: s.config.SnapshotFetchSize,
WithAvroSchema: s.config.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)

// Config holds configuration values for CDCIterator.
// CDCConfig holds configuration values for CDCIterator.
type CDCConfig struct {
LSN pglogrepl.LSN
SlotName string
Expand Down Expand Up @@ -65,7 +65,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

records := make(chan opencdc.Record)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, c.WithAvroSchema, records)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records)

sub, err := internal.CreateSubscription(
ctx,
Expand Down
164 changes: 159 additions & 5 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-postgres/source/position"
"github.com/conduitio/conduit-connector-postgres/test"
sdkschema "github.com/conduitio/conduit-connector-sdk/schema"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hamba/avro/v2"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/matryer/is"
Expand Down Expand Up @@ -146,7 +149,11 @@ func TestCDCIterator_Next(t *testing.T) {
want: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{
opencdc.MetadataCollection: table,
opencdc.MetadataCollection: table,
opencdc.MetadataKeySchemaSubject: table + "_key",
opencdc.MetadataKeySchemaVersion: "1",
opencdc.MetadataPayloadSchemaSubject: table + "_payload",
opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(6)},
Payload: opencdc.Change{
Expand Down Expand Up @@ -175,7 +182,11 @@ func TestCDCIterator_Next(t *testing.T) {
want: opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{
opencdc.MetadataCollection: table,
opencdc.MetadataCollection: table,
opencdc.MetadataKeySchemaSubject: table + "_key",
opencdc.MetadataKeySchemaVersion: "1",
opencdc.MetadataPayloadSchemaSubject: table + "_payload",
opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(1)},
Payload: opencdc.Change{
Expand Down Expand Up @@ -205,7 +216,11 @@ func TestCDCIterator_Next(t *testing.T) {
want: opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{
opencdc.MetadataCollection: table,
opencdc.MetadataCollection: table,
opencdc.MetadataKeySchemaSubject: table + "_key",
opencdc.MetadataKeySchemaVersion: "1",
opencdc.MetadataPayloadSchemaSubject: table + "_payload",
opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(1)},
Payload: opencdc.Change{
Expand Down Expand Up @@ -244,7 +259,11 @@ func TestCDCIterator_Next(t *testing.T) {
want: opencdc.Record{
Operation: opencdc.OperationDelete,
Metadata: map[string]string{
opencdc.MetadataCollection: table,
opencdc.MetadataCollection: table,
opencdc.MetadataKeySchemaSubject: table + "_key",
opencdc.MetadataKeySchemaVersion: "1",
opencdc.MetadataPayloadSchemaSubject: table + "_payload",
opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(4)},
Payload: opencdc.Change{
Expand Down Expand Up @@ -274,7 +293,11 @@ func TestCDCIterator_Next(t *testing.T) {
want: opencdc.Record{
Operation: opencdc.OperationDelete,
Metadata: map[string]string{
opencdc.MetadataCollection: table,
opencdc.MetadataCollection: table,
opencdc.MetadataKeySchemaSubject: table + "_key",
opencdc.MetadataKeySchemaVersion: "1",
opencdc.MetadataPayloadSchemaSubject: table + "_payload",
opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(3)},
Payload: opencdc.Change{
Expand Down Expand Up @@ -497,3 +520,134 @@ func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LS
time.Sleep(100 * time.Millisecond)
}
}

func TestCDCIterator_Schema(t *testing.T) {
ctx := context.Background()

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
table := test.SetupTestTable(ctx, t, pool)

i := testCDCIterator(ctx, t, pool, table, true)
<-i.sub.Ready()

t.Run("initial table schema", func(t *testing.T) {
is := is.New(t)

_, err := pool.Exec(
ctx,
fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (6, 'bizz', 456, false, 12.3, 14)`, table),
)
is.NoErr(err)

r, err := i.Next(ctx)
is.NoErr(err)

assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV1, table, r)
assertKeySchemaOK(ctx, is, table, r)
})

t.Run("column added", func(t *testing.T) {
is := is.New(t)

_, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN column6 timestamp;`, table))
is.NoErr(err)

_, err = pool.Exec(
ctx,
fmt.Sprintf(`INSERT INTO %s (id, key, column1, column2, column3, column4, column5, column6)
VALUES (7, decode('aabbcc', 'hex'), 'example data 1', 100, true, 12345.678, 12345, '2023-09-09 10:00:00');`, table),
)
is.NoErr(err)

r, err := i.Next(ctx)
is.NoErr(err)

assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV2, table, r)
assertKeySchemaOK(ctx, is, table, r)
})

t.Run("column removed", func(t *testing.T) {
is := is.New(t)

_, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN column4, DROP COLUMN column5;`, table))
is.NoErr(err)

_, err = pool.Exec(
ctx,
fmt.Sprintf(`INSERT INTO %s (id, key, column1, column2, column3, column6)
VALUES (8, decode('aabbcc', 'hex'), 'example data 1', 100, true, '2023-09-09 10:00:00');`, table),
)
is.NoErr(err)

r, err := i.Next(ctx)
is.NoErr(err)

assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV3, table, r)
assertKeySchemaOK(ctx, is, table, r)
})
}

func assertPayloadSchemaOK(ctx context.Context, is *is.I, wantSchemaTemplate string, table string, r opencdc.Record) {
gotConduitSch, err := getPayloadSchema(ctx, r)
is.NoErr(err)

want, err := avro.Parse(fmt.Sprintf(wantSchemaTemplate, table+"_payload"))
is.NoErr(err)

got, err := avro.ParseBytes(gotConduitSch.Bytes)
is.NoErr(err)

is.Equal(want.String(), got.String())
}

func assertKeySchemaOK(ctx context.Context, is *is.I, table string, r opencdc.Record) {
gotConduitSch, err := getKeySchema(ctx, r)
is.NoErr(err)

want, err := avro.Parse(fmt.Sprintf(test.TestTableKeyAvroSchema, table+"_key"))
is.NoErr(err)

got, err := avro.ParseBytes(gotConduitSch.Bytes)
is.NoErr(err)

is.Equal(want.String(), got.String())
}

func getPayloadSchema(ctx context.Context, r opencdc.Record) (schema.Schema, error) {
payloadSubj, err := r.Metadata.GetPayloadSchemaSubject()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetPayloadSchemaSubject failed: %w", err)
}

payloadV, err := r.Metadata.GetPayloadSchemaVersion()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetPayloadSchemaVersion failed: %w", err)
}

payloadSch, err := sdkschema.Get(ctx, payloadSubj, payloadV)
if err != nil {
return schema.Schema{}, fmt.Errorf("failed getting schema: %w", err)
}

return payloadSch, nil
}

func getKeySchema(ctx context.Context, r opencdc.Record) (schema.Schema, error) {
keySubj, err := r.Metadata.GetKeySchemaSubject()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetKeySchemaSubject failed: %w", err)
}

keyV, err := r.Metadata.GetKeySchemaVersion()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetKeySchemaVersion failed: %w", err)
}

keySch, err := sdkschema.Get(ctx, keySubj, keyV)
if err != nil {
return schema.Schema{}, fmt.Errorf("failed getting schema: %w", err)
}

return keySch, nil
}
13 changes: 5 additions & 8 deletions source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type Config struct {
TableKeys map[string]string
WithSnapshot bool
SnapshotFetchSize int
WithAvroSchema bool
}

// Validate performs validation tasks on the config.
Expand Down Expand Up @@ -179,7 +178,6 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
PublicationName: c.conf.PublicationName,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create CDC iterator: %w", err)
Expand All @@ -201,12 +199,11 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio
}

snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
WithAvroSchema: c.conf.WithAvroSchema,
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
})
if err != nil {
return fmt.Errorf("failed to create snapshot iterator: %w", err)
Expand Down
Loading