diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 1a282c802b774..b0927a2cb73d7 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -2,15 +2,15 @@ package ingester
 
 import (
 	"encoding/binary"
-	"fmt"
 	"hash/fnv"
 	"sync"
 	"time"
 
-	"github.com/cortexproject/cortex/pkg/util"
+	cortex_util "github.com/cortexproject/cortex/pkg/util"
 	"github.com/go-kit/kit/log/level"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/util"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 )
@@ -91,7 +91,10 @@ func (t *tailer) loop() {
 			tailResponse := logproto.TailResponse{Stream: stream, DroppedStreams: t.popDroppedStreams()}
 			err = t.conn.Send(&tailResponse)
 			if err != nil {
-				level.Error(util.Logger).Log("Error writing to tail client", fmt.Sprintf("%v", err))
+				// Don't log any error due to tail client closing the connection
+				if !util.IsConnCanceled(err) {
+					level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err)
+				}
 				t.close()
 				return
 			}
diff --git a/pkg/querier/http.go b/pkg/querier/http.go
index 2b6172f16946c..887fae501046b 100644
--- a/pkg/querier/http.go
+++ b/pkg/querier/http.go
@@ -354,19 +354,19 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
 
 	if tailRequestPtr.DelayFor > maxDelayForInTailing {
 		server.WriteError(w, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing))
-		level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
+		level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
 		return
 	}
 
 	conn, err := upgrader.Upgrade(w, r, nil)
 	if err != nil {
-		level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
+		level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
 		return
 	}
 
 	defer func() {
 		if err := conn.Close(); err != nil {
-			level.Error(util.Logger).Log("Error closing websocket", fmt.Sprintf("%v", err))
+			level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err)
 		}
 	}()
 
@@ -377,13 +377,13 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
 	tailer, err := q.Tail(r.Context(), &tailRequest)
 	if err != nil {
 		if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
-			level.Error(util.Logger).Log("Error connecting to ingesters for tailing", fmt.Sprintf("%v", err))
+			level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
 		}
 		return
 	}
 
 	defer func() {
 		if err := tailer.close(); err != nil {
-			level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
+			level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
 		}
 	}()
@@ -404,25 +404,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
 				err = marshal_legacy.WriteTailResponseJSON(*response, conn)
 			}
 			if err != nil {
-				level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err))
+				level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err)
 				if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
-					level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
+					level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
 				}
 				return
 			}
 		case err := <-closeErrChan:
level.Error(util.Logger).Log("Error from iterator", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error from iterator", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) } return case <-ticker.C: // This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - level.Error(util.Logger).Log("Error writing ping message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { - level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err) } return } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 6296387f095a5..48abc3fc4822c 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -1,7 +1,6 @@ package querier import ( - "fmt" "sync" "time" @@ -126,12 +125,12 @@ func (t *Tailer) loop() { if numClients == 0 { // All the connections to ingesters are dropped, try reconnecting or return error if err := t.checkIngesterConnections(); err != nil { - level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error reconnecting to ingesters", "err", err) } else { continue } if err := t.close(); err != nil { - level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err) } t.closeErrChan <- errors.New("all ingesters closed the connection") return @@ -199,13 +198,16 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_ for { if t.stopped { if err := querierTailClient.CloseSend(); err != nil { - level.Error(util.Logger).Log("Error closing grpc tail client", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err) } break } resp, err = querierTailClient.Recv() if err != nil { - level.Error(util.Logger).Log("Error receiving response from grpc tail client", fmt.Sprintf("%v", err)) + // We don't want to log error when its due to stopping the tail request + if !t.stopped { + level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err) + } break } t.pushTailResponseFromIngester(resp) diff --git a/pkg/util/errors.go b/pkg/util/errors.go index 377b40a381bc1..0a07d87039437 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -3,6 +3,9 @@ package util import ( "bytes" "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // The MultiError type implements the error interface, and contains the @@ -46,3 +49,20 @@ func (es MultiError) Err() error { } return es } + +// IsConnCanceled returns true, if error is from a closed gRPC connection. 
+// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646
+func IsConnCanceled(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// >= gRPC v1.23.x
+	s, ok := status.FromError(err)
+	if ok {
+		// connection is canceled or server has already closed the connection
+		return s.Code() == codes.Canceled || s.Message() == "transport is closing"
+	}
+
+	return false
+}
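
For reference, a minimal usage sketch (not part of the patch) of how the new util.IsConnCanceled helper classifies errors. The hypothetical main program below assumes only the grpc status/codes packages that pkg/util/errors.go imports above:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	// A Canceled gRPC status, as produced when the tail client closes the
	// connection: the check returns true, so callers skip logging it.
	fmt.Println(util.IsConnCanceled(status.Error(codes.Canceled, "context canceled"))) // true

	// The "transport is closing" message is also treated as a closed connection.
	fmt.Println(util.IsConnCanceled(status.Error(codes.Unavailable, "transport is closing"))) // true

	// Any other error, or a nil error, is not classified as a closed connection.
	fmt.Println(util.IsConnCanceled(status.Error(codes.Internal, "boom"))) // false
	fmt.Println(util.IsConnCanceled(nil))                                  // false
}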