Skip to content

Commit

Permalink
Add options to event to change continue name
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed Aug 8, 2024
1 parent b2810bc commit f6962e6
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions event/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ type Func func(ctx context.Context, wdb *sqlite.Worker, evt *bus.Event) error
type FuncsMap map[string][]Func

type Sync struct {
wdb *sqlite.Worker
busClient *client.Client
mapper FuncsMap
mux sync.RWMutex
locked bool
subject string
wdb *sqlite.Worker
busClient *client.Client
mapper FuncsMap
mux sync.RWMutex
locked bool
subject string
continueName string
}

func (s *Sync) Register(subject string, fn Func) {
Expand Down Expand Up @@ -104,7 +105,7 @@ func (s *Sync) Continue(ctx context.Context) error {
ctx,
bus.WithBatchSize(10),
bus.WithDurable(),
bus.WithId("event.sync.continue"),
bus.WithId(s.continueName),
bus.WithFromNewest(),
bus.WithSubject(s.subject),
) {
Expand All @@ -125,13 +126,42 @@ func (s *Sync) Continue(ctx context.Context) error {
return nil
}

func NewSync(wdb *sqlite.Worker, busClient *client.Client, subject string) *Sync {
return &Sync{
wdb: wdb,
mapper: make(FuncsMap),
busClient: busClient,
subject: subject,
type syncOption interface {
configureSync(*Sync)
}

type syncOptionFunc func(*Sync)

func (f syncOptionFunc) configureSync(s *Sync) {
f(s)
}

func WithSubject(subject string) syncOption {
return syncOptionFunc(func(s *Sync) {
s.subject = subject
})
}

func WithContinueName(name string) syncOption {
return syncOptionFunc(func(s *Sync) {
s.continueName = name
})
}

func NewSync(wdb *sqlite.Worker, busClient *client.Client, opts ...syncOption) *Sync {
sync := &Sync{
wdb: wdb,
mapper: make(FuncsMap),
busClient: busClient,
subject: "*",
continueName: "event.sync.continue",
}

for _, opt := range opts {
opt.configureSync(sync)
}

return sync
}

func FnDB[T any](fn func(context.Context, *sqlite.Worker, *T) error) Func {
Expand Down

0 comments on commit f6962e6

Please sign in to comment.