Skip to content

Commit

Permalink
refactor SimplePool automatic reconnection.
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Dec 2, 2023
1 parent a65bd94 commit 0e0ecb2
Showing 1 changed file with 13 additions and 17 deletions.
30 changes: 13 additions & 17 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
ticker := time.NewTicker(seenAlreadyDropTick)
eose := false

updateSince := func() {
// After reconnection, update the since in the filter so that
// old events are not retrieved.
now := Now()
for i := range filters {
filters[i].Since = &now
}
}

pending := xsync.NewCounter()
pending.Add(int64(len(urls)))
for _, url := range urls {
Expand All @@ -107,26 +98,25 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
default:
}

var sub *Subscription

relay, err := pool.EnsureRelay(nm)
if err != nil {
time.Sleep(3 * time.Second)
updateSince()
continue
goto reconnect
}

sub, err := relay.Subscribe(ctx, filters)
sub, err = relay.Subscribe(ctx, filters)
if err != nil {
time.Sleep(3 * time.Second)
updateSince()
continue
goto reconnect
}

loop:
for {
select {
case evt, more := <-sub.Events:
if !more {
break loop
goto reconnect
}
if unique {
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
Expand Down Expand Up @@ -156,7 +146,13 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
return
}
}
updateSince()

reconnect:
// when attempting to reconnect update the `since` in filters so old events are not retrieved
now := Now()
for i := range filters {
filters[i].Since = &now
}
}
}(NormalizeURL(url))
}
Expand Down

0 comments on commit 0e0ecb2

Please sign in to comment.