Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logging: removed some noise in logs from live-tailing #1100

Merged
merged 2 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package ingester

import (
"encoding/binary"
"fmt"
"hash/fnv"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
Expand Down Expand Up @@ -91,7 +91,10 @@ func (t *tailer) loop() {
tailResponse := logproto.TailResponse{Stream: stream, DroppedStreams: t.popDroppedStreams()}
err = t.conn.Send(&tailResponse)
if err != nil {
level.Error(util.Logger).Log("Error writing to tail client", fmt.Sprintf("%v", err))
// Don't log any error due to tail client closing the connection
if !util.IsConnCanceled(err) {
level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err)
}
t.close()
return
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,19 +354,19 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {

if tailRequestPtr.DelayFor > maxDelayForInTailing {
server.WriteError(w, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing))
level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}

defer func() {
if err := conn.Close(); err != nil {
level.Error(util.Logger).Log("Error closing websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err)
}
}()

Expand All @@ -377,13 +377,13 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
tailer, err := q.Tail(r.Context(), &tailRequest)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error connecting to ingesters for tailing", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
}
}()

Expand All @@ -404,25 +404,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

case err := <-closeErrChan:
level.Error(util.Logger).Log("Error from iterator", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
level.Error(util.Logger).Log("Error writing ping message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package querier

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -126,12 +125,12 @@ func (t *Tailer) loop() {
if numClients == 0 {
// All the connections to ingesters are dropped, try reconnecting or return error
if err := t.checkIngesterConnections(); err != nil {
level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error reconnecting to ingesters", "err", err)
} else {
continue
}
if err := t.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
}
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
Expand Down Expand Up @@ -199,13 +198,16 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
for {
if t.stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(util.Logger).Log("Error closing grpc tail client", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err)
}
break
}
resp, err = querierTailClient.Recv()
if err != nil {
level.Error(util.Logger).Log("Error receiving response from grpc tail client", fmt.Sprintf("%v", err))
// We don't want to log error when its due to stopping the tail request
if !t.stopped {
level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
}
t.pushTailResponseFromIngester(resp)
Expand Down
20 changes: 20 additions & 0 deletions pkg/util/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package util
import (
"bytes"
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// The MultiError type implements the error interface, and contains the
Expand Down Expand Up @@ -46,3 +49,20 @@ func (es MultiError) Err() error {
}
return es
}

// IsConnCanceled returns true, if error is from a closed gRPC connection.
// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646
func IsConnCanceled(err error) bool {
if err == nil {
return false
}

// >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
}

return false
}