Skip to content

Commit

Permalink
implement Read on sourceWithSchemaExtraction middleware (#249)
Browse files Browse the repository at this point in the history
Co-authored-by: Haris Osmanagić <haris@meroxa.io>
  • Loading branch information
lovromazgon and hariso authored Jan 31, 2025
1 parent c51cb03 commit c1c9938
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 0 deletions.
16 changes: 16 additions & 0 deletions source_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,22 @@ type sourceWithSchemaExtraction struct {
keyWarnOnce sync.Once
}

func (s *sourceWithSchemaExtraction) Read(ctx context.Context) (opencdc.Record, error) {
rec, err := s.Source.Read(ctx)
if err != nil || (!*s.config.KeyEnabled && !*s.config.PayloadEnabled) {
return rec, err
}

if err := s.extractAttachKeySchema(ctx, &rec); err != nil {
return rec, err
}
if err := s.extractAttachPayloadSchema(ctx, &rec); err != nil {
return rec, err
}

return rec, nil
}

func (s *sourceWithSchemaExtraction) ReadN(ctx context.Context, n int) ([]opencdc.Record, error) {
recs, err := s.Source.ReadN(ctx, n)
if err != nil || (!*s.config.KeyEnabled && !*s.config.PayloadEnabled) {
Expand Down
256 changes: 256 additions & 0 deletions source_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,262 @@ func TestSourceWithSchemaExtraction_Read(t *testing.T) {
wantPayloadSubject: customTestSchema.Subject,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
src := NewMockSource(ctrl)

s := (&SourceWithSchemaExtraction{
SchemaTypeStr: schema.TypeAvro.String(),
PayloadEnabled: lang.Ptr(true),
PayloadSubject: lang.Ptr("payload"),
KeyEnabled: lang.Ptr(true),
KeySubject: lang.Ptr("key"),
}).Wrap(src)

src.EXPECT().Read(ctx).Return(tc.record, nil)

var wantKey, wantPayloadBefore, wantPayloadAfter opencdc.Data
if tc.record.Key != nil {
wantKey = tc.record.Key.Clone()
}
if tc.record.Payload.Before != nil {
wantPayloadBefore = tc.record.Payload.Before.Clone()
}
if tc.record.Payload.After != nil {
wantPayloadAfter = tc.record.Payload.After.Clone()
}

got, err := s.Read(ctx)
is.NoErr(err)

gotKey := got.Key
gotPayloadBefore := got.Payload.Before
gotPayloadAfter := got.Payload.After

if _, ok := wantKey.(opencdc.StructuredData); ok {
subject, err := got.Metadata.GetKeySchemaSubject()
is.NoErr(err)
version, err := got.Metadata.GetKeySchemaVersion()
is.NoErr(err)

is.Equal(subject, tc.wantKeySubject)
is.Equal(version, 1)

sch, err := schema.Get(ctx, subject, version)
is.NoErr(err)

is.Equal("", cmp.Diff(wantSchema, string(sch.Bytes)))
} else {
_, err := got.Metadata.GetKeySchemaSubject()
is.True(errors.Is(err, opencdc.ErrMetadataFieldNotFound))
_, err = got.Metadata.GetKeySchemaVersion()
is.True(errors.Is(err, opencdc.ErrMetadataFieldNotFound))
}

_, isPayloadBeforeStructured := wantPayloadBefore.(opencdc.StructuredData)
_, isPayloadAfterStructured := wantPayloadAfter.(opencdc.StructuredData)
if isPayloadBeforeStructured || isPayloadAfterStructured {
subject, err := got.Metadata.GetPayloadSchemaSubject()
is.NoErr(err)
version, err := got.Metadata.GetPayloadSchemaVersion()
is.NoErr(err)

is.Equal(subject, tc.wantPayloadSubject)
is.Equal(version, 1)

sch, err := schema.Get(ctx, subject, version)
is.NoErr(err)

is.Equal("", cmp.Diff(wantSchema, string(sch.Bytes)))
} else {
_, err := got.Metadata.GetPayloadSchemaSubject()
is.True(errors.Is(err, opencdc.ErrMetadataFieldNotFound))
_, err = got.Metadata.GetPayloadSchemaVersion()
is.True(errors.Is(err, opencdc.ErrMetadataFieldNotFound))
}

is.Equal("", cmp.Diff(wantKey, gotKey))
is.Equal("", cmp.Diff(wantPayloadBefore, gotPayloadBefore))
is.Equal("", cmp.Diff(wantPayloadAfter, gotPayloadAfter))
})
}
}

func TestSourceWithSchemaExtraction_ReadN(t *testing.T) {
is := is.New(t)
ctx := context.Background()

testStructuredData := opencdc.StructuredData{
"foo": "bar",
"long": int64(1),
"float": 2.34,
"time": time.Now().UTC().Truncate(time.Microsecond), // avro precision is microseconds
}
wantSchema := `{"name":"record","type":"record","fields":[{"name":"float","type":"double"},{"name":"foo","type":"string"},{"name":"long","type":"long"},{"name":"time","type":{"type":"long","logicalType":"timestamp-micros"}}]}`

customTestSchema, err := schema.Create(ctx, schema.TypeAvro, "custom-test-schema", []byte(wantSchema))
is.NoErr(err)

testCases := []struct {
name string
record opencdc.Record
wantKeySubject string
wantPayloadSubject string
}{{
name: "no key, no payload",
record: opencdc.Record{
Key: nil,
Payload: opencdc.Change{
Before: nil,
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "raw key",
record: opencdc.Record{
Key: opencdc.RawData("this should not be encoded"),
Payload: opencdc.Change{
Before: nil,
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "structured key",
record: opencdc.Record{
Key: testStructuredData.Clone(),
Payload: opencdc.Change{
Before: nil,
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "raw payload before",
record: opencdc.Record{
Key: nil,
Payload: opencdc.Change{
Before: opencdc.RawData("this should not be encoded"),
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "structured payload before",
record: opencdc.Record{
Key: nil,
Payload: opencdc.Change{
Before: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "raw payload after",
record: opencdc.Record{
Key: nil,
Payload: opencdc.Change{
Before: nil,
After: opencdc.RawData("this should not be encoded"),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "structured payload after",
record: opencdc.Record{
Key: nil,
Payload: opencdc.Change{
Before: nil,
After: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "all structured",
record: opencdc.Record{
Key: testStructuredData.Clone(),
Payload: opencdc.Change{
Before: testStructuredData.Clone(),
After: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "all raw",
record: opencdc.Record{
Key: opencdc.RawData("this should not be encoded"),
Payload: opencdc.Change{
Before: opencdc.RawData("this should not be encoded"),
After: opencdc.RawData("this should not be encoded"),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "key raw payload structured",
record: opencdc.Record{
Key: opencdc.RawData("this should not be encoded"),
Payload: opencdc.Change{
Before: nil,
After: testStructuredData.Clone(),
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "key structured payload raw",
record: opencdc.Record{
Key: testStructuredData.Clone(),
Payload: opencdc.Change{
Before: opencdc.RawData("this should not be encoded"),
After: nil,
},
},
wantKeySubject: "key",
wantPayloadSubject: "payload",
}, {
name: "all structured with collection",
record: opencdc.Record{
Metadata: map[string]string{
opencdc.MetadataCollection: "foo",
},
Key: testStructuredData.Clone(),
Payload: opencdc.Change{
Before: testStructuredData.Clone(),
After: testStructuredData.Clone(),
},
},
wantKeySubject: "foo.key",
wantPayloadSubject: "foo.payload",
}, {
name: "all structured with collection and predefined schema",
record: opencdc.Record{
Metadata: map[string]string{
opencdc.MetadataCollection: "foo",
opencdc.MetadataKeySchemaSubject: customTestSchema.Subject,
opencdc.MetadataKeySchemaVersion: strconv.Itoa(customTestSchema.Version),
opencdc.MetadataPayloadSchemaSubject: customTestSchema.Subject,
opencdc.MetadataPayloadSchemaVersion: strconv.Itoa(customTestSchema.Version),
},
Key: testStructuredData.Clone(),
Payload: opencdc.Change{
Before: testStructuredData.Clone(),
After: testStructuredData.Clone(),
},
},
wantKeySubject: customTestSchema.Subject,
wantPayloadSubject: customTestSchema.Subject,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
Expand Down

0 comments on commit c1c9938

Please sign in to comment.