-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor nflog configuration options to make it similar to Silences. #3220
Changes from 5 commits
d17244d
5e9a323
f770c7d
bf303c1
95450ca
1555995
1a4f11f
9e7fdf9
01daf19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/matttproud/golang_protobuf_extensions/pbutil" | ||
|
@@ -73,23 +74,19 @@ func QGroupKey(gk string) QueryParam { | |
} | ||
} | ||
|
||
// Log holds the notification log state for alerts that have been notified. | ||
type Log struct { | ||
clock clock.Clock | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most, if not all, of the diff in the tests, has to do with the fact that we now mock the clock instead of injecting. Technically, this can be done in separate PR, but this PR seems simple enough to follow so I decided to include it here. |
||
|
||
logger log.Logger | ||
metrics *metrics | ||
now func() time.Time | ||
retention time.Duration | ||
|
||
runInterval time.Duration | ||
snapf string | ||
stopc chan struct{} | ||
done func() | ||
|
||
// For now we only store the most recently added log entry. | ||
// The key is a serialized concatenation of group key and receiver. | ||
mtx sync.RWMutex | ||
st state | ||
broadcast func([]byte) | ||
maintenanceOverride MaintenanceFunc | ||
mtx sync.RWMutex | ||
st state | ||
broadcast func([]byte) | ||
} | ||
|
||
// MaintenanceFunc represents the function to run as part of the periodic maintenance for the nflog. | ||
|
@@ -154,76 +151,6 @@ func newMetrics(r prometheus.Registerer) *metrics { | |
return m | ||
} | ||
|
||
// Option configures a new Log implementation. | ||
type Option func(*Log) error | ||
|
||
// WithRetention sets the retention time for log st. | ||
func WithRetention(d time.Duration) Option { | ||
return func(l *Log) error { | ||
l.retention = d | ||
return nil | ||
} | ||
} | ||
|
||
// WithNow overwrites the function used to retrieve a timestamp | ||
// for the current point in time. | ||
// This is generally useful for injection during tests. | ||
func WithNow(f func() time.Time) Option { | ||
return func(l *Log) error { | ||
l.now = f | ||
return nil | ||
} | ||
} | ||
Comment on lines
-171
to
-176
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of this plumbing and the |
||
|
||
// WithLogger configures a logger for the notification log. | ||
func WithLogger(logger log.Logger) Option { | ||
return func(l *Log) error { | ||
l.logger = logger | ||
return nil | ||
} | ||
} | ||
|
||
// WithMetrics registers metrics for the notification log. | ||
func WithMetrics(r prometheus.Registerer) Option { | ||
return func(l *Log) error { | ||
l.metrics = newMetrics(r) | ||
return nil | ||
} | ||
} | ||
|
||
// WithMaintenance configures the Log to run garbage collection | ||
// and snapshotting, if configured, at the given interval. | ||
// | ||
// The maintenance terminates on receiving from the provided channel. | ||
// The done function is called after the final snapshot was completed. | ||
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage. | ||
func WithMaintenance(d time.Duration, stopc chan struct{}, done func(), maintenanceOverride MaintenanceFunc) Option { | ||
return func(l *Log) error { | ||
if d == 0 { | ||
return errors.New("maintenance interval must not be 0") | ||
} | ||
l.runInterval = d | ||
l.stopc = stopc | ||
l.done = done | ||
l.maintenanceOverride = maintenanceOverride | ||
return nil | ||
} | ||
} | ||
|
||
// WithSnapshot configures the log to be initialized from a given snapshot file. | ||
// If maintenance is configured, a snapshot will be saved periodically and on | ||
// shutdown as well. | ||
func WithSnapshot(sf string) Option { | ||
return func(l *Log) error { | ||
l.snapf = sf | ||
return nil | ||
} | ||
} | ||
|
||
func utcNow() time.Time { | ||
return time.Now().UTC() | ||
} | ||
|
||
type state map[string]*pb.MeshEntry | ||
|
||
func (s state) clone() state { | ||
|
@@ -289,48 +216,79 @@ func marshalMeshEntry(e *pb.MeshEntry) ([]byte, error) { | |
return buf.Bytes(), nil | ||
} | ||
|
||
// Options configures a new Log implementation. | ||
type Options struct { | ||
SnapshotReader io.Reader | ||
SnapshotFile string | ||
Comment on lines
+221
to
+222
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a doc-comment to each, saying only one of these fields should be set. That way, callers can see this in their editors directly - just makes it a little more convenient to use. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The top-level comment reads as:
In my editor it shows as: Which indicates:
Are you thinking of something different? Happy to do it, but unsure of what this means for a different editor 🤔 |
||
|
||
Retention time.Duration | ||
|
||
Logger log.Logger | ||
Metrics prometheus.Registerer | ||
} | ||
|
||
func (o *Options) validate() error { | ||
if o.SnapshotFile != "" && o.SnapshotReader != nil { | ||
return errors.New("only one of SnapshotFile and SnapshotReader must be set") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// New creates a new notification log based on the provided options. | ||
// The snapshot is loaded into the Log if it is set. | ||
func New(opts ...Option) (*Log, error) { | ||
func New(o Options) (*Log, error) { | ||
if err := o.validate(); err != nil { | ||
return nil, err | ||
} | ||
|
||
if o.SnapshotFile != "" { | ||
if r, err := os.Open(o.SnapshotFile); err != nil { | ||
if !os.IsNotExist(err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we maybe log at info-level that a previous log to load up didn't exist, so it'll create one? Or, maybe the inverse path, something like "Loading a previous snapshot..." At a minimum, we should log it at debug level, since there are sort of two independent paths here. If for some reason a file can't be accessed, things could end up in a weird state, and there is no evidence trail left behind that it happened. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A debug log sounds good to me. Alertmanager would hit this code path when it starts from scratch. |
||
return nil, err | ||
} | ||
} else { | ||
o.SnapshotReader = r | ||
defer r.Close() | ||
} | ||
} | ||
|
||
l := &Log{ | ||
clock: clock.New(), | ||
retention: o.Retention, | ||
logger: log.NewNopLogger(), | ||
now: utcNow, | ||
st: state{}, | ||
broadcast: func([]byte) {}, | ||
} | ||
for _, o := range opts { | ||
if err := o(l); err != nil { | ||
return nil, err | ||
} | ||
} | ||
if l.metrics == nil { | ||
l.metrics = newMetrics(nil) | ||
metrics: newMetrics(o.Metrics), | ||
} | ||
|
||
if l.snapf != "" { | ||
if f, err := os.Open(l.snapf); !os.IsNotExist(err) { | ||
if err != nil { | ||
return l, err | ||
} | ||
defer f.Close() | ||
if o.Logger != nil { | ||
l.logger = o.Logger | ||
} | ||
|
||
if err := l.loadSnapshot(f); err != nil { | ||
return l, err | ||
} | ||
if o.SnapshotReader != nil { | ||
if err := l.loadSnapshot(o.SnapshotReader); err != nil { | ||
return l, err | ||
} | ||
} | ||
|
||
go l.run() | ||
|
||
return l, nil | ||
} | ||
|
||
// run periodic background maintenance. | ||
func (l *Log) run() { | ||
if l.runInterval == 0 || l.stopc == nil { | ||
func (l *Log) now() time.Time { | ||
return l.clock.Now() | ||
} | ||
|
||
// Maintenance garbage collects the notification log state at the given interval. If the snapshot | ||
// file is set, a snapshot is written to it afterwards. | ||
// Terminates on receiving from stopc. | ||
// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage. | ||
func (l *Log) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) { | ||
if interval == 0 || stopc == nil { | ||
level.Error(l.logger).Log("msg", "interval or stop signal are missing - not running maintenance") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is new - it kind of annoyed me that we'd return from this function on silences when you misconfigured the maintenance but fail silently. I don't think we should do this -- ideally, return an error but I settled with a log line to keep the diff sane. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Works for me. I'd suggest that we check the validity of the maintenance interval in cmd/alertmanager/main.go in a later PR. As of today, a negative maintenance interval triggers a panic... |
||
return | ||
} | ||
t := time.NewTicker(l.runInterval) | ||
t := l.clock.Ticker(interval) | ||
defer t.Stop() | ||
|
||
var doMaintenance MaintenanceFunc | ||
|
@@ -339,29 +297,26 @@ func (l *Log) run() { | |
if _, err := l.GC(); err != nil { | ||
return size, err | ||
} | ||
if l.snapf == "" { | ||
if snapf == "" { | ||
return size, nil | ||
} | ||
f, err := openReplace(l.snapf) | ||
f, err := openReplace(snapf) | ||
if err != nil { | ||
return size, err | ||
} | ||
if size, err = l.Snapshot(f); err != nil { | ||
f.Close() | ||
return size, err | ||
} | ||
return size, f.Close() | ||
} | ||
|
||
if l.maintenanceOverride != nil { | ||
doMaintenance = l.maintenanceOverride | ||
} | ||
|
||
if l.done != nil { | ||
defer l.done() | ||
if override != nil { | ||
doMaintenance = override | ||
} | ||
|
||
runMaintenance := func(do func() (int64, error)) error { | ||
start := l.now() | ||
start := l.now().UTC() | ||
level.Debug(l.logger).Log("msg", "Running maintenance") | ||
size, err := do() | ||
level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size) | ||
|
@@ -372,16 +327,17 @@ func (l *Log) run() { | |
Loop: | ||
for { | ||
select { | ||
case <-l.stopc: | ||
case <-stopc: | ||
break Loop | ||
case <-t.C: | ||
if err := runMaintenance(doMaintenance); err != nil { | ||
level.Error(l.logger).Log("msg", "Running maintenance failed", "err", err) | ||
} | ||
} | ||
} | ||
|
||
// No need to run final maintenance if we don't want to snapshot. | ||
if l.snapf == "" { | ||
if snapf == "" { | ||
return | ||
} | ||
if err := runMaintenance(doMaintenance); err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nice change 👍