From 7207b4a3b0acc6495e3c18324ef7a2325579a602 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 3 Apr 2020 09:18:35 -0400 Subject: [PATCH 1/7] First pass loki canary pause Signed-off-by: Joe Elliott --- cmd/loki-canary/main.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index e166c91357224..e19ea5b85c8fc 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -56,6 +56,9 @@ func main() { r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true) + http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) { + stopCanary(w, r, c) + }) http.Handle("/metrics", promhttp.Handler()) go func() { err := http.ListenAndServe(":"+strconv.Itoa(*port), nil) @@ -72,17 +75,16 @@ func main() { for { select { case <-interrupt: - _, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n") - w.Stop() - r.Stop() - c.Stop() case <-terminate: _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") - w.Stop() - r.Stop() - c.Stop() + stopCanary(w, r, c) return } } +} +func stopCanary(w *writer.Writer, r *reader.Reader, c *comparator.Comparator) { + w.Stop() + r.Stop() + c.Stop() } From f0baa72dc65fb2f29f8b35ee4ef02b278d51d7d0 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 3 Apr 2020 09:19:18 -0400 Subject: [PATCH 2/7] Write suspending msg to stderr Signed-off-by: Joe Elliott --- cmd/loki-canary/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index e19ea5b85c8fc..7cd910ade3b02 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -57,6 +57,7 @@ func main() { c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true) http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) { + _, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n") stopCanary(w, r, c) }) http.Handle("/metrics", promhttp.Handler()) From a31bbaa738cb883f70e016a0a96f91482f7ee8c5 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 3 Apr 2020 13:05:11 -0400 Subject: [PATCH 3/7] Added resume endpoint Signed-off-by: Joe Elliott --- cmd/loki-canary/main.go | 56 +++++++++++++++++++++++------ pkg/canary/comparator/comparator.go | 14 ++++---- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 7cd910ade3b02..40692a3fad863 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "strconv" + "sync" "syscall" "time" @@ -19,6 +20,14 @@ import ( "github.com/grafana/loki/pkg/canary/writer" ) +type canary struct { + lock sync.Mutex + + writer *writer.Writer + reader *reader.Reader + comparator *comparator.Comparator +} + func main() { lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector") @@ -52,13 +61,29 @@ func main() { sentChan := make(chan time.Time) receivedChan := make(chan time.Time) - w := writer.NewWriter(os.Stdout, sentChan, *interval, *size) - r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) - c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true) + c := &canary{} + startCanary := func() *canary { + c.stop() + + c.lock.Lock() + defer c.lock.Unlock() + + c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size) + c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) + c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true) + + return c + } + startCanary() + + http.HandleFunc("/resume", func(_ http.ResponseWriter, _ *http.Request) { + _, _ = fmt.Fprintf(os.Stderr, "restarting\n") + startCanary() + }) http.HandleFunc("/suspend", func(_ http.ResponseWriter, _ *http.Request) { - _, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n") - stopCanary(w, r, c) + _, _ = fmt.Fprintf(os.Stderr, "suspending\n") + c.stop() }) http.Handle("/metrics", promhttp.Handler()) go func() { @@ -78,14 +103,25 @@ func main() { case <-interrupt: case <-terminate: _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") - stopCanary(w, r, c) + c.stop() return } } } -func stopCanary(w *writer.Writer, r *reader.Reader, c *comparator.Comparator) { - w.Stop() - r.Stop() - c.Stop() +func (c *canary) stop() { + c.lock.Lock() + defer c.lock.Unlock() + + if c.writer == nil || c.reader == nil || c.comparator == nil { + return + } + + c.writer.Stop() + c.reader.Stop() + c.comparator.Stop() + + c.writer = nil + c.reader = nil + c.comparator = nil } diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 3d118eb0c7b71..b687ce26cd60d 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -86,12 +86,14 @@ func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.D done: make(chan struct{}), } - responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "loki_canary", - Name: "response_latency", - Help: "is how long it takes for log lines to be returned from Loki in seconds.", - Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets), - }) + if responseLatency == nil { + responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki_canary", + Name: "response_latency", + Help: "is how long it takes for log lines to be returned from Loki in seconds.", + Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets), + }) + } go c.run() From 5eff05f550caafd74c821b47367a1de063af8909 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 3 Apr 2020 13:11:18 -0400 Subject: [PATCH 4/7] Added documentation on new endpoints Signed-off-by: Joe Elliott --- docs/operations/loki-canary.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/operations/loki-canary.md b/docs/operations/loki-canary.md index 1ce48a3599b75..900ec297ef575 100644 --- a/docs/operations/loki-canary.md +++ b/docs/operations/loki-canary.md @@ -47,6 +47,13 @@ determine if they are truly missing or only missing from the WebSocket. If missing entries are not found in the direct query, the `missing_entries` counter is incremented. +### Control + +Loki Canary responds to two endpoints to allow dynamic suspending/resuming of the +canary process. This can be useful if you'd like to quickly disable or reenable the +canary. To stop or start the canary issue an HTTP GET request against the `/suspend` or +`/resume` endpoints. + ## Installation ### Binary From 7f99f35aab58494f15a36140284458ca39ae56a3 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 3 Apr 2020 13:15:10 -0400 Subject: [PATCH 5/7] Pass both signals to the same channel Signed-off-by: Joe Elliott --- cmd/loki-canary/main.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 40692a3fad863..b49eabaa62722 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -93,14 +93,11 @@ func main() { } }() - interrupt := make(chan os.Signal, 1) terminate := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - signal.Notify(terminate, syscall.SIGTERM) + signal.Notify(terminate, syscall.SIGTERM, os.Interrupt) for { select { - case <-interrupt: case <-terminate: _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") c.stop() From 975789f6a901df231bf97f3a3a740e88a7c7160b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 3 Apr 2020 15:09:09 -0400 Subject: [PATCH 6/7] lint Signed-off-by: Joe Elliott --- cmd/loki-canary/main.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index b49eabaa62722..4444810b27112 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -96,13 +96,10 @@ func main() { terminate := make(chan os.Signal, 1) signal.Notify(terminate, syscall.SIGTERM, os.Interrupt) - for { - select { - case <-terminate: - _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") - c.stop() - return - } + for range terminate { + _, _ = fmt.Fprintf(os.Stderr, "shutting down\n") + c.stop() + return } } From bb12f998e73e8e4a30adccc7b1d2956adda3f3bf Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Sat, 4 Apr 2020 09:03:27 -0400 Subject: [PATCH 7/7] Removed return on startCanary Signed-off-by: Joe Elliott --- cmd/loki-canary/main.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 4444810b27112..cb75cf19f5175 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -62,7 +62,7 @@ func main() { receivedChan := make(chan time.Time) c := &canary{} - startCanary := func() *canary { + startCanary := func() { c.stop() c.lock.Lock() @@ -71,8 +71,6 @@ func main() { c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size) c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal) c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, c.reader, true) - - return c } startCanary()