-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patherrgroup.go
119 lines (96 loc) · 2.35 KB
/
errgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package elephantine
import (
"context"
"fmt"
"log/slog"
"time"
"golang.org/x/sync/errgroup"
)
// BackoffFunction decides how long to wait before a failed task is restarted.
// GoWithRetries calls it with the current retry count, which is 1 for the
// first failure (and again 1 after the counter has been reset).
type BackoffFunction func(attempt int) time.Duration
// NewErrGroup creates an ErrGroup whose tasks all receive a context derived
// from ctx via errgroup.WithContext. That derived context is cancelled the
// first time a task returns a non-nil error, or when Wait returns.
//
// logger receives task lifecycle and failure messages. A nil logger falls
// back to slog.Default(); otherwise the first log call inside a task
// goroutine would dereference nil and crash the process.
func NewErrGroup(ctx context.Context, logger *slog.Logger) *ErrGroup {
	if logger == nil {
		logger = slog.Default()
	}

	grp, gCtx := errgroup.WithContext(ctx)

	return &ErrGroup{
		logger: logger,
		grp:    grp,
		gCtx:   gCtx,
	}
}
// ErrGroup wraps an errgroup.Group. Tasks are identified by name: the name is
// logged when a task starts and stops (Go) or fails (GoWithRetries), and it
// prefixes any error a task returns. Construct it with NewErrGroup.
type ErrGroup struct {
	// logger receives task lifecycle and failure messages.
	logger *slog.Logger
	// grp is the underlying errgroup; gCtx is the context it was created with.
	grp *errgroup.Group
	// gCtx is passed to every task. It is created by errgroup.WithContext,
	// so it is cancelled when a task fails or when Wait returns.
	gCtx context.Context
}
// Go starts fn in a new goroutine managed by the group and passes it the
// group's context. The start and end of the task are logged at Info level
// under the task name. A non-nil error from fn is returned prefixed with the
// task name; per errgroup semantics this cancels the group's context.
func (eg *ErrGroup) Go(task string, fn func(ctx context.Context) error) {
	run := func() error {
		eg.logger.Info("starting task", LogKeyName, task)
		defer eg.logger.Info("stopped task", LogKeyName, task)

		if runErr := fn(eg.gCtx); runErr != nil {
			return fmt.Errorf("%s: %w", task, runErr)
		}

		return nil
	}

	eg.grp.Go(run)
}
// GoWithRetries runs a task in a retry loop within the group.
//
// fn is called with the group's context. If it returns nil, the task is done
// and the goroutine exits cleanly. If it returns an error, the failure is
// logged and fn is called again after the delay returned by backoff. backoff
// receives the current retry count, which starts at 1.
//
// maxRetries caps the number of consecutive retries. Once the cap is exceeded,
// the last error is returned (prefixed with the task name), which cancels the
// group's context. Zero means retry forever; a negative value stops after the
// first failure.
//
// The retry counter resets to zero when an attempt ran for longer than
// resetAfter before failing. This keeps occasional failures of a
// long-running task from creeping up on the retry limit over days, weeks or
// months. The backoff delay before an attempt does not count toward that
// attempt's run time. A resetAfter of zero or less resets the counter after
// practically every attempt, which effectively disables maxRetries.
//
// If the group's context is cancelled, the task stops and the context error
// is returned, prefixed with the task name.
func (eg *ErrGroup) GoWithRetries(
	task string,
	maxRetries int,
	backoff BackoffFunction,
	resetAfter time.Duration,
	fn func(ctx context.Context) error,
) {
	eg.grp.Go(func() error {
		var tries int

		for {
			// Taken right before (re)starting fn so the reset check below
			// measures only how long this attempt ran, not the backoff
			// delay that preceded it. Otherwise any backoff >= resetAfter
			// would reset the counter on every failure and maxRetries
			// would never trigger.
			attemptStart := time.Now()

			err := fn(eg.gCtx)
			if err == nil {
				return nil
			}

			// Bail immediately if the group has been cancelled.
			if ctxErr := eg.gCtx.Err(); ctxErr != nil {
				return fmt.Errorf("%s: %w", task, ctxErr)
			}

			// A long successful run before this failure means the task
			// had recovered, so start counting from scratch.
			if time.Since(attemptStart) > resetAfter {
				tries = 0
			}

			tries++

			if maxRetries != 0 && tries > maxRetries {
				return fmt.Errorf(
					"%s: stopping after %d tries: %w",
					task, tries, err)
			}

			wait := backoff(tries)

			eg.logger.ErrorContext(eg.gCtx,
				"task failure, restarting",
				LogKeyName, task,
				LogKeyError, err,
				LogKeyAttempts, tries,
				LogKeyDelay, slog.DurationValue(wait),
			)

			// Use an explicit timer so it can be released promptly if
			// the group is cancelled mid-wait.
			timer := time.NewTimer(wait)

			select {
			case <-timer.C:
			case <-eg.gCtx.Done():
				timer.Stop()

				return fmt.Errorf("%s: %w", task, eg.gCtx.Err())
			}
		}
	})
}
// Wait blocks until every task started through the group has returned. It
// then returns the first non-nil error from those tasks, or nil if none
// failed (see errgroup.Group.Wait).
func (eg *ErrGroup) Wait() error {
	firstErr := eg.grp.Wait()

	return firstErr
}
// StaticBackoff returns a BackoffFunction that ignores the retry count and
// always yields the same fixed delay, wait.
func StaticBackoff(wait time.Duration) BackoffFunction {
	constant := func(int) time.Duration {
		return wait
	}

	return constant
}