diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 0a6c0727e6..d2b7b769b4 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -279,15 +279,14 @@ func run() int { var wg sync.WaitGroup wg.Add(1) - notificationLogOpts := []nflog.Option{ - nflog.WithRetention(*retention), - nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")), - nflog.WithMaintenance(*maintenanceInterval, stopc, wg.Done, nil), - nflog.WithMetrics(prometheus.DefaultRegisterer), - nflog.WithLogger(log.With(logger, "component", "nflog")), + notificationLogOpts := nflog.Options{ + SnapshotFile: filepath.Join(*dataDir, "nflog"), + Retention: *retention, + Logger: log.With(logger, "component", "nflog"), + Metrics: prometheus.DefaultRegisterer, } - notificationLog, err := nflog.New(notificationLogOpts...) + notificationLog, err := nflog.New(notificationLogOpts) if err != nil { level.Error(logger).Log("err", err) return 1 @@ -297,6 +296,12 @@ func run() int { notificationLog.SetBroadcast(c.Broadcast) } + wg.Add(1) + go func() { + notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil) + wg.Done() + }() + marker := types.NewMarker(prometheus.DefaultRegisterer) silenceOpts := silence.Options{ diff --git a/nflog/nflog.go b/nflog/nflog.go index 54830bf9c8..af39894bb4 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -27,6 +27,7 @@ import ( "sync" "time" + "github.com/benbjohnson/clock" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/matttproud/golang_protobuf_extensions/pbutil" @@ -73,23 +74,19 @@ func QGroupKey(gk string) QueryParam { } } +// Log holds the notification log state for alerts that have been notified. type Log struct { + clock clock.Clock + logger log.Logger metrics *metrics - now func() time.Time retention time.Duration - runInterval time.Duration - snapf string - stopc chan struct{} - done func() - // For now we only store the most recently added log entry. // The key is a serialized concatenation of group key and receiver. - mtx sync.RWMutex - st state - broadcast func([]byte) - maintenanceOverride MaintenanceFunc + mtx sync.RWMutex + st state + broadcast func([]byte) } // MaintenanceFunc represents the function to run as part of the periodic maintenance for the nflog. @@ -154,76 +151,6 @@ func newMetrics(r prometheus.Registerer) *metrics { return m } -// Option configures a new Log implementation. -type Option func(*Log) error - -// WithRetention sets the retention time for log st. -func WithRetention(d time.Duration) Option { - return func(l *Log) error { - l.retention = d - return nil - } -} - -// WithNow overwrites the function used to retrieve a timestamp -// for the current point in time. -// This is generally useful for injection during tests. -func WithNow(f func() time.Time) Option { - return func(l *Log) error { - l.now = f - return nil - } -} - -// WithLogger configures a logger for the notification log. -func WithLogger(logger log.Logger) Option { - return func(l *Log) error { - l.logger = logger - return nil - } -} - -// WithMetrics registers metrics for the notification log. -func WithMetrics(r prometheus.Registerer) Option { - return func(l *Log) error { - l.metrics = newMetrics(r) - return nil - } -} - -// WithMaintenance configures the Log to run garbage collection -// and snapshotting, if configured, at the given interval. -// -// The maintenance terminates on receiving from the provided channel. -// The done function is called after the final snapshot was completed. -// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage. -func WithMaintenance(d time.Duration, stopc chan struct{}, done func(), maintenanceOverride MaintenanceFunc) Option { - return func(l *Log) error { - if d == 0 { - return errors.New("maintenance interval must not be 0") - } - l.runInterval = d - l.stopc = stopc - l.done = done - l.maintenanceOverride = maintenanceOverride - return nil - } -} - -// WithSnapshot configures the log to be initialized from a given snapshot file. -// If maintenance is configured, a snapshot will be saved periodically and on -// shutdown as well. -func WithSnapshot(sf string) Option { - return func(l *Log) error { - l.snapf = sf - return nil - } -} - -func utcNow() time.Time { - return time.Now().UTC() -} - type state map[string]*pb.MeshEntry func (s state) clone() state { @@ -289,48 +216,80 @@ func marshalMeshEntry(e *pb.MeshEntry) ([]byte, error) { return buf.Bytes(), nil } +// Options configures a new Log implementation. +type Options struct { + SnapshotReader io.Reader + SnapshotFile string + + Retention time.Duration + + Logger log.Logger + Metrics prometheus.Registerer +} + +func (o *Options) validate() error { + if o.SnapshotFile != "" && o.SnapshotReader != nil { + return errors.New("only one of SnapshotFile and SnapshotReader must be set") + } + + return nil +} + // New creates a new notification log based on the provided options. // The snapshot is loaded into the Log if it is set. -func New(opts ...Option) (*Log, error) { +func New(o Options) (*Log, error) { + if err := o.validate(); err != nil { + return nil, err + } + l := &Log{ + clock: clock.New(), + retention: o.Retention, logger: log.NewNopLogger(), - now: utcNow, st: state{}, broadcast: func([]byte) {}, - } - for _, o := range opts { - if err := o(l); err != nil { - return nil, err - } - } - if l.metrics == nil { - l.metrics = newMetrics(nil) + metrics: newMetrics(o.Metrics), } - if l.snapf != "" { - if f, err := os.Open(l.snapf); !os.IsNotExist(err) { - if err != nil { - return l, err - } - defer f.Close() + if o.Logger != nil { + l.logger = o.Logger + } - if err := l.loadSnapshot(f); err != nil { - return l, err + if o.SnapshotFile != "" { + if r, err := os.Open(o.SnapshotFile); err != nil { + if !os.IsNotExist(err) { + return nil, err } + level.Debug(l.logger).Log("msg", "notification log snapshot file doesn't exist", "err", err) + } else { + o.SnapshotReader = r + defer r.Close() } } - go l.run() + if o.SnapshotReader != nil { + if err := l.loadSnapshot(o.SnapshotReader); err != nil { + return l, err + } + } return l, nil } -// run periodic background maintenance. -func (l *Log) run() { - if l.runInterval == 0 || l.stopc == nil { +func (l *Log) now() time.Time { + return l.clock.Now() +} + +// Maintenance garbage collects the notification log state at the given interval. If the snapshot +// file is set, a snapshot is written to it afterwards. +// Terminates on receiving from stopc. +// If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage. +func (l *Log) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) { + if interval == 0 || stopc == nil { + level.Error(l.logger).Log("msg", "interval or stop signal are missing - not running maintenance") return } - t := time.NewTicker(l.runInterval) + t := l.clock.Ticker(interval) defer t.Stop() var doMaintenance MaintenanceFunc @@ -339,29 +298,26 @@ func (l *Log) run() { if _, err := l.GC(); err != nil { return size, err } - if l.snapf == "" { + if snapf == "" { return size, nil } - f, err := openReplace(l.snapf) + f, err := openReplace(snapf) if err != nil { return size, err } if size, err = l.Snapshot(f); err != nil { + f.Close() return size, err } return size, f.Close() } - if l.maintenanceOverride != nil { - doMaintenance = l.maintenanceOverride - } - - if l.done != nil { - defer l.done() + if override != nil { + doMaintenance = override } runMaintenance := func(do func() (int64, error)) error { - start := l.now() + start := l.now().UTC() level.Debug(l.logger).Log("msg", "Running maintenance") size, err := do() level.Debug(l.logger).Log("msg", "Maintenance done", "duration", l.now().Sub(start), "size", size) @@ -372,7 +328,7 @@ func (l *Log) run() { Loop: for { select { - case <-l.stopc: + case <-stopc: break Loop case <-t.C: if err := runMaintenance(doMaintenance); err != nil { @@ -380,8 +336,9 @@ Loop: } } } + // No need to run final maintenance if we don't want to snapshot. - if l.snapf == "" { + if snapf == "" { return } if err := runMaintenance(doMaintenance); err != nil { diff --git a/nflog/nflog_test.go b/nflog/nflog_test.go index 8bd98de6d2..d55ee761ed 100644 --- a/nflog/nflog_test.go +++ b/nflog/nflog_test.go @@ -18,18 +18,21 @@ import ( "io" "os" "path/filepath" + "runtime" "sync" "testing" "time" pb "github.com/prometheus/alertmanager/nflog/nflogpb" + "github.com/benbjohnson/clock" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) func TestLogGC(t *testing.T) { - now := utcNow() + mockClock := clock.NewMock() + now := mockClock.Now() // We only care about key names and expiration timestamps. newEntry := func(ts time.Time) *pb.MeshEntry { return &pb.MeshEntry{ @@ -43,7 +46,7 @@ func TestLogGC(t *testing.T) { "a2": newEntry(now.Add(time.Second)), "a3": newEntry(now.Add(-time.Second)), }, - now: func() time.Time { return now }, + clock: mockClock, metrics: newMetrics(nil), } n, err := l.GC() @@ -58,7 +61,8 @@ func TestLogGC(t *testing.T) { func TestLogSnapshot(t *testing.T) { // Check whether storing and loading the snapshot is symmetric. - now := utcNow() + mockClock := clock.NewMock() + now := mockClock.Now().UTC() cases := []struct { entries []*pb.MeshEntry @@ -133,24 +137,31 @@ func TestWithMaintenance_SupportsCustomCallback(t *testing.T) { stopc := make(chan struct{}) var mtx sync.Mutex var mc int - l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) { + opts := Options{ + Metrics: prometheus.NewPedanticRegistry(), + SnapshotFile: f.Name(), + } + + l, err := New(opts) + mockClock := clock.NewMock() + l.clock = mockClock + require.NoError(t, err) + + go l.Maintenance(100*time.Millisecond, f.Name(), stopc, func() (int64, error) { mtx.Lock() mc++ mtx.Unlock() return 0, nil - })) - require.NoError(t, err) + }) + runtime.Gosched() // ensure that the ticker is running. - go l.run() - time.Sleep(200 * time.Millisecond) + mockClock.Add(200 * time.Millisecond) close(stopc) - require.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - return mc >= 2 - }, 500*time.Millisecond, 100*time.Millisecond) + mtx.Lock() + defer mtx.Unlock() + require.Equal(t, 2, mc) } func TestReplaceFile(t *testing.T) { @@ -182,7 +193,8 @@ func TestReplaceFile(t *testing.T) { } func TestStateMerge(t *testing.T) { - now := utcNow() + mockClock := clock.NewMock() + now := mockClock.Now() // We only care about key names and timestamps for the // merging logic. @@ -243,7 +255,8 @@ func TestStateMerge(t *testing.T) { func TestStateDataCoding(t *testing.T) { // Check whether encoding and decoding the data is symmetric. - now := utcNow() + mockClock := clock.NewMock() + now := mockClock.Now().UTC() cases := []struct { entries []*pb.MeshEntry @@ -299,7 +312,8 @@ func TestStateDataCoding(t *testing.T) { } func TestQuery(t *testing.T) { - nl, err := New(WithRetention(time.Second)) + opts := Options{Retention: time.Second} + nl, err := New(opts) if err != nil { require.NoError(t, err, "constructing nflog failed") } diff --git a/silence/silence.go b/silence/silence.go index d8837d28af..21be9438b7 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -319,16 +319,6 @@ func New(o Options) (*Silences, error) { if err := o.validate(); err != nil { return nil, err } - if o.SnapshotFile != "" { - if r, err := os.Open(o.SnapshotFile); err != nil { - if !os.IsNotExist(err) { - return nil, err - } - } else { - o.SnapshotReader = r - defer r.Close() - } - } s := &Silences{ clock: clock.New(), mc: matcherCache{}, @@ -342,6 +332,19 @@ func New(o Options) (*Silences, error) { if o.Logger != nil { s.logger = o.Logger } + + if o.SnapshotFile != "" { + if r, err := os.Open(o.SnapshotFile); err != nil { + if !os.IsNotExist(err) { + return nil, err + } + level.Debug(s.logger).Log("msg", "silences snapshot file doesn't exist", "err", err) + } else { + o.SnapshotReader = r + defer r.Close() + } + } + if o.SnapshotReader != nil { if err := s.loadSnapshot(o.SnapshotReader); err != nil { return s, err @@ -359,6 +362,10 @@ func (s *Silences) nowUTC() time.Time { // Terminates on receiving from stopc. // If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage. func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) { + if interval == 0 || stopc == nil { + level.Error(s.logger).Log("msg", "interval or stop signal are missing - not running maintenance") + return + } t := s.clock.Ticker(interval) defer t.Stop() @@ -377,6 +384,7 @@ func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-cha return size, err } if size, err = s.Snapshot(f); err != nil { + f.Close() return size, err } return size, f.Close() @@ -406,6 +414,7 @@ Loop: } } } + // No need for final maintenance if we don't want to snapshot. if snapf == "" { return