Skip to content

Commit

Permalink
update method calls
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Sep 3, 2024
1 parent 908c4f4 commit 9386361
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 28 deletions.
5 changes: 4 additions & 1 deletion pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
proc_builtin "github.com/conduitio/conduit/pkg/plugin/processor/builtin"
"github.com/conduitio/conduit/pkg/processor"
"github.com/google/go-cmp/cmp"
"github.com/jpillora/backoff"
"github.com/matryer/is"
"github.com/rs/zerolog"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -90,10 +91,12 @@ func TestPipelineSimple(t *testing.T) {
nil,
)

b := &backoff.Backoff{}

orc := NewOrchestrator(
db,
logger,
pipeline.NewService(logger, db),
pipeline.NewService(logger, db, b),
connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3)),
processor.NewService(logger, db, procPluginService),
connPluginService,
Expand Down
2 changes: 0 additions & 2 deletions pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,6 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
} else {
pl.SetStatus(StatusRecovering)
// TODO: Implement backoff strategy
// check backoff strategy

}
}

Expand Down
27 changes: 18 additions & 9 deletions pkg/pipeline/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock"
"github.com/conduitio/conduit/pkg/processor"
"github.com/google/uuid"
"github.com/jpillora/backoff"
"github.com/matryer/is"
"github.com/rs/zerolog"
"go.uber.org/mock/gomock"
Expand All @@ -50,8 +51,9 @@ func TestServiceLifecycle_buildNodes(t *testing.T) {
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

source := dummySource(persister)
destination := dummyDestination(persister)
Expand Down Expand Up @@ -133,8 +135,9 @@ func TestService_buildNodes_NoSourceNode(t *testing.T) {
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

wantErr := "can't build pipeline without any source connectors"

Expand Down Expand Up @@ -180,8 +183,9 @@ func TestService_buildNodes_NoDestinationNode(t *testing.T) {
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

wantErr := "can't build pipeline without any destination connectors"

Expand Down Expand Up @@ -228,8 +232,9 @@ func TestServiceLifecycle_PipelineSuccess(t *testing.T) {
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
defer persister.Wait()
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

// create a host pipeline
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
Expand Down Expand Up @@ -287,8 +292,9 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
logger := log.Test(t)
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

// create a host pipeline
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
Expand Down Expand Up @@ -371,8 +377,9 @@ func TestServiceLifecycle_StopAll_Recovering(t *testing.T) {
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

// create a host pipeline
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
Expand Down Expand Up @@ -472,8 +479,9 @@ func TestServiceLifecycle_PipelineStop(t *testing.T) {
logger := log.New(zerolog.Nop())
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

// create a host pipeline
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
Expand Down Expand Up @@ -533,8 +541,9 @@ func TestService_Run_Rerun(t *testing.T) {
logger := log.Test(t)
db := &inmemory.DB{}
persister := connector.NewPersister(logger, db, time.Second, 3)
b := &backoff.Backoff{}

ps := NewService(logger, db)
ps := NewService(logger, db, b)

// create a host pipeline
pl, err := ps.Create(ctx, uuid.NewString(), Config{Name: "test pipeline"}, ProvisionTypeAPI)
Expand Down Expand Up @@ -571,7 +580,7 @@ func TestService_Run_Rerun(t *testing.T) {
is.NoErr(err)

// create a new pipeline service and initialize it
ps = NewService(logger, db)
ps = NewService(logger, db, b)
err = ps.Init(ctx)
is.NoErr(err)
err = ps.Run(
Expand Down
45 changes: 30 additions & 15 deletions pkg/pipeline/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/google/uuid"
"github.com/jpillora/backoff"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand All @@ -34,15 +35,16 @@ func TestService_Init_Simple(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
_, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
is.NoErr(err)

want := service.List(ctx)

// create a new pipeline service and initialize it
service = NewService(logger, db)
service = NewService(logger, db, b)
err = service.Init(ctx)
is.NoErr(err)

Expand All @@ -61,6 +63,7 @@ func TestService_Check(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := mock.NewDB(gomock.NewController(t))
b := &backoff.Backoff{}

testCases := []struct {
name string
Expand All @@ -80,7 +83,7 @@ func TestService_Check(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
db.EXPECT().Ping(gomock.Any()).Return(tc.wantErr)
service := NewService(logger, db)
service := NewService(logger, db, b)

gotErr := service.Check(ctx)
is.Equal(tc.wantErr, gotErr)
Expand All @@ -92,8 +95,9 @@ func TestService_CreateSuccess(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)

testCases := []struct {
id string
Expand Down Expand Up @@ -151,8 +155,9 @@ func TestService_Create_ValidateSuccess(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)

testCases := []struct {
name string
Expand Down Expand Up @@ -193,8 +198,9 @@ func TestService_Create_ValidateError(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)

testCases := []struct {
name string
Expand Down Expand Up @@ -262,8 +268,9 @@ func TestService_Create_PipelineNameExists(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)

conf := Config{Name: "test-pipeline"}
got, err := service.Create(ctx, uuid.NewString(), conf, ProvisionTypeAPI)
Expand All @@ -279,8 +286,9 @@ func TestService_CreateEmptyName(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
got, err := service.Create(ctx, uuid.NewString(), Config{Name: ""}, ProvisionTypeAPI)
is.True(err != nil)
is.Equal(got, nil)
Expand All @@ -291,8 +299,9 @@ func TestService_GetSuccess(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
want, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
is.NoErr(err)

Expand All @@ -306,8 +315,9 @@ func TestService_GetInstanceNotFound(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)

// get pipeline instance that does not exist
got, err := service.Get(ctx, uuid.NewString())
Expand All @@ -321,8 +331,9 @@ func TestService_DeleteSuccess(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
is.NoErr(err)

Expand All @@ -339,8 +350,9 @@ func TestService_List(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)

want := make(map[string]*Instance)
for i := 0; i < 10; i++ {
Expand All @@ -358,8 +370,9 @@ func TestService_UpdateSuccess(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
is.NoErr(err)

Expand All @@ -378,8 +391,9 @@ func TestService_Update_PipelineNameExists(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
_, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
is.NoErr(err)
instance2, err2 := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline2"}, ProvisionTypeAPI)
Expand All @@ -400,8 +414,9 @@ func TestService_UpdateInvalidConfig(t *testing.T) {
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}
b := &backoff.Backoff{}

service := NewService(logger, db)
service := NewService(logger, db, b)
instance, err := service.Create(ctx, uuid.NewString(), Config{Name: "test-pipeline"}, ProvisionTypeAPI)
is.NoErr(err)

Expand Down
8 changes: 7 additions & 1 deletion pkg/provisioning/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
p4 "github.com/conduitio/conduit/pkg/provisioning/test/pipelines4-integration-test"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/jpillora/backoff"
"github.com/matryer/is"
"github.com/rs/zerolog"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -513,8 +514,13 @@ func TestService_IntegrationTestServices(t *testing.T) {
proc_builtin.NewRegistry(logger, proc_builtin.DefaultBuiltinProcessors, schemaRegistry),
nil,
)
b := &backoff.Backoff{
Factor: 2,
Min: time.Millisecond * 100,
Max: time.Second, // 8 tries
}

plService := pipeline.NewService(logger, db)
plService := pipeline.NewService(logger, db, b)
connService := connector.NewService(logger, db, connector.NewPersister(logger, db, time.Second, 3))
procService := processor.NewService(logger, db, procPluginService)

Expand Down

0 comments on commit 9386361

Please sign in to comment.