Skip to content

Commit

Permalink
encode connectors in persister only on flush (#1249)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Oct 24, 2023
1 parent 3111797 commit 3145202
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
github.com/dop251/goja_nodejs v0.0.0-20230602164024-804a84515562
github.com/gammazero/deque v0.2.1
github.com/goccy/go-json v0.10.2
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyL
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const (
DefaultPersisterDelayThreshold = time.Second
DefaultPersisterBundleCountThreshold = 100
DefaultPersisterBundleCountThreshold = 10000
)

// Persister is responsible for persisting connectors and their state when
Expand Down
27 changes: 22 additions & 5 deletions pkg/connector/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package connector

import (
"context"
"encoding/json"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/goccy/go-json"
)

const (
Expand Down Expand Up @@ -148,13 +148,30 @@ func (s *Store) PrepareSet(id string, instance *Instance) (func(context.Context)
return nil, cerrors.Errorf("can't store connector: %w", cerrors.ErrEmptyID)
}

raw, err := s.encode(instance)
if err != nil {
return nil, err
icopy := Instance{
ID: instance.ID,
Type: instance.Type,
Config: Config{
Name: instance.Config.Name,
Settings: instance.Config.Settings,
},
PipelineID: instance.PipelineID,
Plugin: instance.Plugin,
ProcessorIDs: instance.ProcessorIDs,
ProvisionedBy: instance.ProvisionedBy,
State: instance.State,
CreatedAt: instance.CreatedAt,
UpdatedAt: instance.UpdatedAt,
LastActiveConfig: instance.LastActiveConfig,
}
key := s.addKeyPrefix(id)

return func(ctx context.Context) error {
raw, err := s.encode(&icopy)
if err != nil {
return err
}
key := s.addKeyPrefix(id)

err = s.db.Set(ctx, key, raw)
if err != nil {
return cerrors.Errorf("failed to store connector with ID %q: %w", id, err)
Expand Down

0 comments on commit 3145202

Please sign in to comment.