Skip to content

Commit

Permalink
Trigger Lifecycle Events (#963)
Browse files Browse the repository at this point in the history
* add LastActiveConfig to connector instance, refactor source

* trigger create and update events in source connectors

* trigger events in source

* source tests for created and updated events

* source tests for deleted events

* destination lifecycle events

* update orchestrator and provisioning service

* backwards compatibility of lifecycle events

* organize imports

* rename method receiver

* comment LastActiveConfig
  • Loading branch information
lovromazgon authored Apr 3, 2023
1 parent 073647c commit 166dfe9
Show file tree
Hide file tree
Showing 17 changed files with 789 additions and 150 deletions.
187 changes: 155 additions & 32 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,55 +53,62 @@ func (d *Destination) Errors() <-chan error {
return d.errs
}

// init dispenses the plugin and configures it.
func (d *Destination) initPlugin(ctx context.Context) (plugin.DestinationPlugin, error) {
d.Instance.logger.Debug(ctx).Msg("starting destination connector plugin")
dest, err := d.dispenser.DispenseDestination()
if err != nil {
return nil, err
}

d.Instance.logger.Debug(ctx).Msg("configuring destination connector plugin")
err = dest.Configure(ctx, d.Instance.Config.Settings)
if err != nil {
tdErr := dest.Teardown(ctx)
err = cerrors.LogOrReplace(err, tdErr, func() {
d.Instance.logger.Err(ctx, tdErr).Msg("could not tear down destination connector plugin")
})
return nil, err
}

return dest, nil
}

func (d *Destination) Open(ctx context.Context) error {
func (d *Destination) Open(ctx context.Context) (err error) {
d.Instance.Lock()
defer d.Instance.Unlock()
if d.Instance.connector != nil {
// this shouldn't actually happen, it indicates a problem elsewhere
return cerrors.New("another instance of the connector is already running")
}

dest, err := d.initPlugin(ctx)
d.Instance.logger.Debug(ctx).Msg("dispensing destination connector plugin")
d.plugin, err = d.dispenser.DispenseDestination()
if err != nil {
return err
}

defer func() {
// ensure the plugin gets torn down if something bad happens
if err != nil {
tdErr := d.plugin.Teardown(ctx)
if tdErr != nil {
d.Instance.logger.Err(ctx, tdErr).Msg("could not tear down destination connector plugin")
}
d.plugin = nil
}
}()

err = d.configure(ctx)
if err != nil {
return err
}

streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = dest.Start(streamCtx)
lifecycleEventTriggered, err := d.triggerLifecycleEvent(ctx, d.Instance.LastActiveConfig.Settings, d.Instance.Config.Settings)
if err != nil {
cancelStreamCtx()
tdErr := dest.Teardown(ctx)
err = cerrors.LogOrReplace(err, tdErr, func() {
d.Instance.logger.Err(ctx, tdErr).Msg("could not tear down destination connector plugin")
return err
}

if lifecycleEventTriggered {
// when a lifecycle event is successfully triggered we consider the config active
d.Instance.LastActiveConfig = d.Instance.Config
// persist connector in the next batch to store last active config
err := d.Instance.persister.Persist(ctx, d.Instance, func(err error) {
if err != nil {
d.errs <- err
}
})
if err != nil {
return err
}
}

err = d.start(ctx)
if err != nil {
return err
}

d.Instance.logger.Info(ctx).Msg("destination connector plugin successfully started")

d.plugin = dest
d.stopStream = cancelStreamCtx
d.Instance.connector = d
if d.Instance.ProvisionedBy != ProvisionTypeDLQ {
// DLQ connectors are not persisted
Expand Down Expand Up @@ -131,7 +138,7 @@ func (d *Destination) Stop(ctx context.Context, lastPosition record.Position) er
}

func (d *Destination) Teardown(ctx context.Context) error {
// lock source as we are about to mutate the plugin field
// lock destination as we are about to mutate the plugin field
d.Instance.Lock()
defer d.Instance.Unlock()
if d.plugin == nil {
Expand Down Expand Up @@ -190,6 +197,37 @@ func (d *Destination) Ack(ctx context.Context) (record.Position, error) {
return d.plugin.Ack(ctx)
}

func (d *Destination) OnDelete(ctx context.Context) (err error) {
if d.Instance.LastActiveConfig.Settings == nil {
return nil // the connector was never started, nothing to trigger
}

d.Instance.Lock()
defer d.Instance.Unlock()

d.Instance.logger.Debug(ctx).Msg("dispensing destination connector plugin")
d.plugin, err = d.dispenser.DispenseDestination()
if err != nil {
return err
}

_, err = d.triggerLifecycleEvent(ctx, d.Instance.LastActiveConfig.Settings, nil)

// call teardown to close plugin regardless of the error
tdErr := d.plugin.Teardown(ctx)

d.plugin = nil

err = cerrors.LogOrReplace(err, tdErr, func() {
d.Instance.logger.Err(ctx, tdErr).Msg("could not tear down destination connector plugin")
})
if err != nil {
return cerrors.Errorf("could not trigger lifecycle event: %w", err)
}

return nil
}

// preparePluginCall makes sure the plugin is running and registers a new plugin
// call in the wait group. The returned function should be called in a deferred
// statement to signal the plugin call is over.
Expand All @@ -203,3 +241,88 @@ func (d *Destination) preparePluginCall() (func(), error) {
d.wg.Add(1)
return d.wg.Done, nil
}

func (d *Destination) configure(ctx context.Context) error {
d.Instance.logger.Trace(ctx).Msg("configuring destination connector plugin")
err := d.plugin.Configure(ctx, d.Instance.Config.Settings)
if err != nil {
return cerrors.Errorf("could not configure destination connector plugin: %w", err)
}
return nil
}

func (d *Destination) triggerLifecycleEvent(ctx context.Context, oldConfig, newConfig map[string]string) (ok bool, err error) {
if d.isEqual(oldConfig, newConfig) {
return false, nil // nothing to do, last active config is the same as current one
}

defer func() {
if cerrors.Is(err, plugin.ErrUnimplemented) {
d.Instance.logger.Trace(ctx).Msg("lifecycle events not implemented on destination connector plugin (it's probably an older connector)")
err = nil // ignore error to stay backwards compatible
}
}()

switch {
// created
case oldConfig == nil && newConfig != nil:
d.Instance.logger.Trace(ctx).Msg("triggering lifecycle event \"created\" on destination connector plugin")
err := d.plugin.LifecycleOnCreated(ctx, newConfig)
if err != nil {
return false, cerrors.Errorf("error while triggering lifecycle event \"created\": %w", err)
}
return true, nil

// updated
case oldConfig != nil && newConfig != nil:
d.Instance.logger.Trace(ctx).Msg("triggering lifecycle event \"updated\" on destination connector plugin")
err := d.plugin.LifecycleOnUpdated(ctx, oldConfig, newConfig)
if err != nil {
return false, cerrors.Errorf("error while triggering lifecycle event \"updated\": %w", err)
}
return true, nil

// deleted
case oldConfig != nil && newConfig == nil:
d.Instance.logger.Trace(ctx).Msg("triggering lifecycle event \"deleted\" on destination connector plugin")
err := d.plugin.LifecycleOnDeleted(ctx, oldConfig)
if err != nil {
return false, cerrors.Errorf("error while triggering lifecycle event \"deleted\": %w", err)
}
return true, nil

// default should never happen
default:
d.Instance.logger.Warn(ctx).
Any("oldConfig", oldConfig).
Any("newConfig", newConfig).
Msg("unexpected combination of old and new config")
// don't return an error when no event was triggered, strictly speaking
// the action did not fail
return false, nil
}
}

func (d *Destination) start(ctx context.Context) error {
d.Instance.logger.Trace(ctx).Msg("starting destination connector plugin")
ctx, d.stopStream = context.WithCancel(ctx)
err := d.plugin.Start(ctx)
if err != nil {
d.stopStream()
d.stopStream = nil
return cerrors.Errorf("could not start destination connector plugin: %w", err)
}
return nil
}

func (*Destination) isEqual(cfg1, cfg2 map[string]string) bool {
if len(cfg1) != len(cfg2) {
return false
}
for k, v := range cfg1 {
if w, ok := cfg2[k]; !ok || v != w {
return false
}
}
return (cfg1 != nil) == (cfg2 != nil)
}
Loading

0 comments on commit 166dfe9

Please sign in to comment.