Skip to content

Commit

Permalink
Fix couple of bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
alinz committed Aug 16, 2024
1 parent 035cab6 commit 5a2bc26
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
16 changes: 5 additions & 11 deletions event/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"fmt"
"iter"
"strings"
"sync"

"sync/atomic"
"time"

"ella.to/bus"
Expand All @@ -22,8 +23,7 @@ type Sync struct {
wdb *sqlite.Worker
busClient *client.Client
mapper FuncsMap
mux sync.RWMutex
locked bool
locked atomic.Int32
subject string
continueName string
}
Expand All @@ -41,20 +41,14 @@ func (s *Sync) Register(subject string, fn Func) {
}

func (s *Sync) Lock() error {
s.mux.Lock()
defer s.mux.Unlock()
if s.locked {
if !s.locked.CompareAndSwap(0, 1) {
return fmt.Errorf("event.Sync.Lock already called")
}

s.locked = true
return nil
}

func (s *Sync) IsLocked() bool {
s.mux.RLock()
defer s.mux.RUnlock()
return s.locked
return s.locked.Load() == 1
}

func (s *Sync) Once(ctx context.Context, wait time.Duration) error {
Expand Down
1 change: 1 addition & 0 deletions storage/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (s *Sqlite) LoadConsumerById(ctx context.Context, consumerId string) (consu

if !hasRow {
err = ErrConsumerNotFound
return
}

consumer = loadConsumer(stmt)
Expand Down

0 comments on commit 5a2bc26

Please sign in to comment.