diff --git a/Gopkg.lock b/Gopkg.lock index 809f2f4..b10010a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1,6 +1,14 @@ # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. +[[projects]] + digest = "1:0d3deb8a6da8ffba5635d6fb1d2144662200def6c9d82a35a6d05d6c2d4a48f9" + name = "github.com/beorn7/perks" + packages = ["quantile"] + pruneopts = "" + revision = "4b2b341e8d7715fae06375aa633dbb6e91b3fb46" + version = "v1.0.0" + [[projects]] digest = "1:8722889ad027febfced94665914d1e7be8f1b703d31f2ef9461c59e4d40fe974" name = "github.com/certifi/gocertifi" @@ -33,6 +41,14 @@ pruneopts = "" revision = "a9457d81ec91fa6d538567f14c6138e9ce5a37fb" +[[projects]] + digest = "1:529d738b7976c3848cae5cf3a8036440166835e389c1f617af701eeb12a0518d" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "" + revision = "b5d812f8a3706043e23a9cd5babf2e5423744d30" + version = "v1.3.1" + [[projects]] branch = "strict" digest = "1:8928810213f9690c5e1cbd19b004a8c97ecc457f0a8c1da362bd96bdc3a5ab8c" @@ -42,6 +58,14 @@ revision = "b648cc9a908c22490de781dbe600459edd0ac533" source = "github.com/krallin/cronexpr" +[[projects]] + digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" + name = "github.com/matttproud/golang_protobuf_extensions" + packages = ["pbutil"] + pruneopts = "" + revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" + version = "v1.0.1" + [[projects]] digest = "1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" name = "github.com/pkg/errors" @@ -58,6 +82,49 @@ revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:dafd1471a472c8a3905036a89f5711b2e9f8e639489d649d9f497e0222971b39" + name = "github.com/prometheus/client_golang" + packages = [ + "prometheus", + "prometheus/internal", + "prometheus/promhttp", + ] + pruneopts = "" + revision = "b46e6ec51bb1e8aca796f58a8462a8ff125c3ddd" + +[[projects]] + branch = "master" + digest = "1:cd67319ee7536399990c4b00fae07c3413035a53193c644549a676091507cadc" + name = "github.com/prometheus/client_model" + packages = ["go"] + pruneopts = "" + revision = "fd36f4220a901265f90734c3183c5f0c91daa0b8" + +[[projects]] + digest = "1:acd87a73c6a6f2d61ad04822d68b233a5c12f5b72aef3db0985f90680e9ae8f0" + name = "github.com/prometheus/common" + packages = [ + "expfmt", + "internal/bitbucket.org/ww/goautoneg", + "model", + ] + pruneopts = "" + revision = "1ba88736f028e37bc17328369e94a537ae9e0234" + version = "v0.4.0" + +[[projects]] + branch = "master" + digest = "1:5ee701aab36918e3c5db39ee3b42b85a7c8ae0ab60ec137457ea951e9b5e34a6" + name = "github.com/prometheus/procfs" + packages = [ + ".", + "internal/fs", + ] + pruneopts = "" + revision = "be78308d8a4ffcb4610e8d21f0e15524ea0d646f" + [[projects]] digest = "1:3fcbf733a8d810a21265a7f2fe08a3353db2407da052b233f8b204b5afc03d9b" name = "github.com/sirupsen/logrus" @@ -99,6 +166,8 @@ input-imports = [ "github.com/evalphobia/logrus_sentry", "github.com/gorhill/cronexpr", + "github.com/prometheus/client_golang/prometheus", + "github.com/prometheus/client_golang/prometheus/promhttp", "github.com/sirupsen/logrus", "github.com/stretchr/testify/assert", ] diff --git a/Gopkg.toml b/Gopkg.toml index d40c76b..68a21df 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -33,3 +33,7 @@ [[constraint]] name = "github.com/stretchr/testify" version = "~1.1.4" + +[[constraint]] + branch = "master" + name = "github.com/prometheus/client_golang" diff --git a/cron/cron.go b/cron/cron.go index 324bc31..02514b8 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -4,8 +4,6 @@ import ( "bufio" "context" "fmt" - "github.com/aptible/supercronic/crontab" - "github.com/sirupsen/logrus" "io" "os" "os/exec" @@ -13,6 +11,11 @@ import ( "sync" "syscall" "time" + + "github.com/aptible/supercronic/crontab" + "github.com/aptible/supercronic/prometheus_metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) var ( @@ -106,11 +109,11 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry) e return nil } -func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time, jobLogger *logrus.Entry, overlapping bool) { +func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) { t := t0 for { - t = expression.Next(t) + t = job.Expression.Next(t) select { case <-time.After(time.Until(t)): @@ -120,6 +123,8 @@ func monitorJob(ctx context.Context, expression crontab.Expression, t0 time.Time } jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0)) + + promMetrics.CronsDeadlineExceededCounter.With(jobPromLabels(job)).Inc() case <-ctx.Done(): return } @@ -182,21 +187,47 @@ func startFunc(wg *sync.WaitGroup, exitCtx context.Context, logger *logrus.Entry }() } -func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool) { +func StartJob(wg *sync.WaitGroup, cronCtx *crontab.Context, job *crontab.Job, exitCtx context.Context, cronLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) { runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) { + promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Inc() + + defer func() { + promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Dec() + }() + monitorCtx, cancelMonitor := context.WithCancel(context.Background()) defer cancelMonitor() - go monitorJob(monitorCtx, job.Expression, t0, jobLogger, overlapping) + go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, promMetrics) + + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + promMetrics.CronsExecutionTimeHistogram.With(jobPromLabels(job)).Observe(v) + })) + + defer timer.ObserveDuration() err := runJob(cronCtx, job.Command, jobLogger) + promMetrics.CronsExecCounter.With(jobPromLabels(job)).Inc() + if err == nil { jobLogger.Info("job succeeded") + + promMetrics.CronsSuccessCounter.With(jobPromLabels(job)).Inc() } else { jobLogger.Error(err) + + promMetrics.CronsFailCounter.With(jobPromLabels(job)).Inc() } } startFunc(wg, exitCtx, cronLogger, overlapping, job.Expression, runThisJob) } + +func jobPromLabels(job *crontab.Job) prometheus.Labels { + return prometheus.Labels{ + "position": fmt.Sprintf("%d", job.Position), + "command": job.Command, + "schedule": job.Schedule, + } +} diff --git a/cron/cron_test.go b/cron/cron_test.go index 7abaddd..9256514 100644 --- a/cron/cron_test.go +++ b/cron/cron_test.go @@ -14,10 +14,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/aptible/supercronic/crontab" + "github.com/aptible/supercronic/prometheus_metrics" ) var ( TEST_CHANNEL_BUFFER_SIZE = 100 + PROM_METRICS = prometheus_metrics.NewPrometheusMetrics() ) type testHook struct { @@ -195,7 +197,7 @@ func TestStartJobExitsOnRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - StartJob(&wg, &basicContext, &job, ctx, logger, false) + StartJob(&wg, &basicContext, &job, ctx, logger, false, &PROM_METRICS) wg.Wait() } @@ -215,7 +217,7 @@ func TestStartJobRunsJob(t *testing.T) { logger, channel := newTestLogger() - StartJob(&wg, &basicContext, &job, ctx, logger, false) + StartJob(&wg, &basicContext, &job, ctx, logger, false, &PROM_METRICS) select { case entry := <-channel: diff --git a/main.go b/main.go index 377a146..ff29cb5 100644 --- a/main.go +++ b/main.go @@ -4,16 +4,18 @@ import ( "context" "flag" "fmt" - "github.com/aptible/supercronic/cron" - "github.com/aptible/supercronic/crontab" - "github.com/aptible/supercronic/log/hook" - "github.com/evalphobia/logrus_sentry" - "github.com/sirupsen/logrus" "os" "os/signal" "sync" "syscall" "time" + + "github.com/aptible/supercronic/cron" + "github.com/aptible/supercronic/crontab" + "github.com/aptible/supercronic/log/hook" + "github.com/aptible/supercronic/prometheus_metrics" + "github.com/evalphobia/logrus_sentry" + "github.com/sirupsen/logrus" ) var Usage = func() { @@ -25,6 +27,7 @@ func main() { debug := flag.Bool("debug", false, "enable debug logging") json := flag.Bool("json", false, "enable JSON logging") test := flag.Bool("test", false, "test crontab (does not run jobs)") + prometheusListen := flag.String("prometheus-listen-address", "", "give a valid ip:port address to expose Prometheus metrics at /metrics") splitLogs := flag.Bool("split-logs", false, "split log output into stdout/stderr") sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN") sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn") @@ -50,7 +53,6 @@ func main() { } else { logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) } - if *splitLogs { hook.RegisterSplitLogger( logrus.StandardLogger(), @@ -87,7 +89,24 @@ func main() { } } + promMetrics := prometheus_metrics.NewPrometheusMetrics() + + if *prometheusListen != "" { + promServerShutdownClosure, err := prometheus_metrics.InitHTTPServer(*prometheusListen, context.Background()) + if err != nil { + logrus.Fatalf("prometheus http startup failed: %s", err.Error()) + } + + defer func() { + if err := promServerShutdownClosure(); err != nil { + logrus.Fatalf("prometheus http shutdown failed: %s", err.Error()) + } + }() + } + for true { + promMetrics.Reset() + logrus.Infof("read crontab: %s", crontabFileName) tab, err := readCrontabAtPath(crontabFileName) @@ -112,7 +131,7 @@ func main() { "job.position": job.Position, }) - cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping) + cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, &promMetrics) } termChan := make(chan os.Signal, 1) diff --git a/prometheus_metrics/prommetrics.go b/prometheus_metrics/prommetrics.go new file mode 100644 index 0000000..7ee91ae --- /dev/null +++ b/prometheus_metrics/prommetrics.go @@ -0,0 +1,133 @@ +package prometheus_metrics + +import ( + "context" + "net" + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +const ( + namespace = "supercronic" +) + +func genMetricName(name string) string { + return prometheus.BuildFQName(namespace, "", name) +} + +type PrometheusMetrics struct { + CronsCurrentlyRunningGauge prometheus.GaugeVec + CronsExecCounter prometheus.CounterVec + CronsSuccessCounter prometheus.CounterVec + CronsFailCounter prometheus.CounterVec + CronsDeadlineExceededCounter prometheus.CounterVec + CronsExecutionTimeHistogram prometheus.HistogramVec +} + +func NewPrometheusMetrics() PrometheusMetrics { + cronLabels := []string{"command", "position", "schedule"} + + pm := PrometheusMetrics{} + + pm.CronsCurrentlyRunningGauge = *prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: genMetricName("currently_running"), + Help: "count of currently running cron executions", + }, + cronLabels, + ) + prometheus.MustRegister(pm.CronsCurrentlyRunningGauge) + + pm.CronsExecCounter = *prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: genMetricName("executions"), + Help: "count of cron executions", + }, + cronLabels, + ) + prometheus.MustRegister(pm.CronsExecCounter) + + pm.CronsSuccessCounter = *prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: genMetricName("successful_executions"), + Help: "count of successul cron executions", + }, + cronLabels, + ) + prometheus.MustRegister(pm.CronsSuccessCounter) + + pm.CronsFailCounter = *prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: genMetricName("failed_executions"), + Help: "count of failed cron executions", + }, + cronLabels, + ) + prometheus.MustRegister(pm.CronsFailCounter) + + pm.CronsDeadlineExceededCounter = *prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: genMetricName("deadline_exceeded"), + Help: "count of exceeded deadline cron executions", + }, + cronLabels, + ) + prometheus.MustRegister(pm.CronsDeadlineExceededCounter) + + pm.CronsExecutionTimeHistogram = *prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: genMetricName("cron_execution_time_seconds"), + Help: "duration of the cron executions", + Buckets: []float64{10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0, 3600.0}, + }, + cronLabels, + ) + prometheus.MustRegister(pm.CronsExecutionTimeHistogram) + + return pm +} + +func (p *PrometheusMetrics) Reset() { + p.CronsCurrentlyRunningGauge.Reset() + p.CronsExecCounter.Reset() + p.CronsSuccessCounter.Reset() + p.CronsFailCounter.Reset() + p.CronsDeadlineExceededCounter.Reset() + p.CronsExecutionTimeHistogram.Reset() +} + +func InitHTTPServer(listenAddr string, shutdownContext context.Context) (func() error, error) { + promSrv := &http.Server{} + + http.Handle("/metrics", promhttp.Handler()) + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(` + Supercronic + +

Supercronic

+

Metrics

+ + `)) + }) + + shutdownClosure := func() error { + return promSrv.Shutdown(shutdownContext) + } + + listener, err := net.Listen("tcp", listenAddr) + if err != nil { + return shutdownClosure, err + } + + go func() { + if err := promSrv.Serve(listener); err != nil { + logrus.Fatalf("prometheus http serve failed: %s", err.Error()) + } + }() + + return shutdownClosure, nil +}