From ebc4ab22127f97c4d89cb10a4329a3f5229c6be6 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 2 Oct 2019 11:41:36 +0530 Subject: [PATCH 1/2] Removed some noise in logs by detecting cases where connection is erroring due to stopped or closed tail request --- pkg/ingester/tailer.go | 8 ++++++-- pkg/querier/tail.go | 5 ++++- pkg/util/errors.go | 20 ++++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 1a282c802b774..594502e672f3f 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -7,10 +7,11 @@ import ( "sync" "time" - "github.com/cortexproject/cortex/pkg/util" + cortex_util "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/util" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -91,7 +92,10 @@ func (t *tailer) loop() { tailResponse := logproto.TailResponse{Stream: stream, DroppedStreams: t.popDroppedStreams()} err = t.conn.Send(&tailResponse) if err != nil { - level.Error(util.Logger).Log("Error writing to tail client", fmt.Sprintf("%v", err)) + // Don't log any error due to tail client closing the connection + if !util.IsConnCanceled(err) { + level.Error(cortex_util.Logger).Log("Error writing to tail client", fmt.Sprintf("%v", err)) + } t.close() return } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 6296387f095a5..1a34d2519e0ee 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -205,7 +205,10 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ } resp, err = querierTailClient.Recv() if err != nil { - level.Error(util.Logger).Log("Error receiving response from grpc tail client", fmt.Sprintf("%v", err)) + // We don't want to log error when its due to stopping the tail request + if !t.stopped { + level.Error(util.Logger).Log("Error receiving response from grpc tail client", fmt.Sprintf("%v", err)) + } break } t.pushTailResponseFromIngester(resp) diff --git a/pkg/util/errors.go b/pkg/util/errors.go index 377b40a381bc1..0a07d87039437 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -3,6 +3,9 @@ package util import ( "bytes" "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // The MultiError type implements the error interface, and contains the @@ -46,3 +49,20 @@ func (es MultiError) Err() error { } return es } + +// IsConnCanceled returns true, if error is from a closed gRPC connection. +// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646 +func IsConnCanceled(err error) bool { + if err == nil { + return false + } + + // >= gRPC v1.23.x + s, ok := status.FromError(err) + if ok { + // connection is canceled or server has already closed the connection + return s.Code() == codes.Canceled || s.Message() == "transport is closing" + } + + return false +} From e1be484059c4d3b70bdc50a6057815f7041f712c Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 3 Oct 2019 09:59:33 +0530 Subject: [PATCH 2/2] fixed log formats --- pkg/ingester/tailer.go | 3 +-- pkg/querier/http.go | 22 +++++++++++----------- pkg/querier/tail.go | 9 ++++----- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 594502e672f3f..b0927a2cb73d7 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -2,7 +2,6 @@ package ingester import ( "encoding/binary" - "fmt" "hash/fnv" "sync" "time" @@ -94,7 +93,7 @@ func (t *tailer) loop() { if err != nil { // Don't log any error due to tail client closing the connection if !util.IsConnCanceled(err) { - level.Error(cortex_util.Logger).Log("Error writing to tail client", fmt.Sprintf("%v", err)) + level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err) } t.close() return diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 2b6172f16946c..887fae501046b 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -354,19 +354,19 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { if tailRequestPtr.DelayFor > maxDelayForInTailing { server.WriteError(w, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing)) - level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err) return } conn, err := upgrader.Upgrade(w, r, nil) if err != nil { - level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err) return } defer func() { if err := conn.Close(); err != nil { - level.Error(util.Logger).Log("Error closing websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err) } }() @@ -377,13 +377,13 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { tailer, err := q.Tail(r.Context(), &tailRequest) if err != nil { if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("Error connecting to ingesters for tailing", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) } return } defer func() { if err := tailer.close(); err != nil { - level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err) } }() @@ -404,25 +404,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { err = marshal_legacy.WriteTailResponseJSON(*response, conn) } if err != nil { - level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) } return } case err := <-closeErrChan: - level.Error(util.Logger).Log("Error from iterator", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error from iterator", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) } return case <-ticker.C: // This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - level.Error(util.Logger).Log("Error writing ping message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) } return } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 1a34d2519e0ee..48abc3fc4822c 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -1,7 +1,6 @@ package querier import ( - "fmt" "sync" "time" @@ -126,12 +125,12 @@ func (t *Tailer) loop() { if numClients == 0 { // All the connections to ingesters are dropped, try reconnecting or return error if err := t.checkIngesterConnections(); err != nil { - level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error reconnecting to ingesters", "err", err) } else { continue } if err := t.close(); err != nil { - level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err) } t.closeErrChan <- errors.New("all ingesters closed the connection") return @@ -199,7 +198,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ for { if t.stopped { if err := querierTailClient.CloseSend(); err != nil { - level.Error(util.Logger).Log("Error closing grpc tail client", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err) } break } @@ -207,7 +206,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ if err != nil { // We don't want to log error when its due to stopping the tail request if !t.stopped { - level.Error(util.Logger).Log("Error receiving response from grpc tail client", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err) } break }