Skip to content

Commit

Permalink
etcdserver/api/rafthttp: add v3 snapshot send/receive metrics
Browse files Browse the repository at this point in the history
etcd_network_snapshot_send
etcd_network_snapshot_send_failures
etcd_network_snapshot_send_total_duration_seconds
etcd_network_snapshot_receive
etcd_network_snapshot_receive_failures
etcd_network_snapshot_receive_total_duration_seconds

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
  • Loading branch information
gyuho committed Aug 12, 2018
1 parent 85a8224 commit b63790b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 13 deletions.
29 changes: 21 additions & 8 deletions etcdserver/api/rafthttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"

"github.com/coreos/etcd/etcdserver/api/snap"
pioutil "github.com/coreos/etcd/pkg/ioutil"
Expand Down Expand Up @@ -195,16 +196,20 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()

if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues("").Inc()
return
}

w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues("").Inc()
return
}

Expand All @@ -213,46 +218,49 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
if h.lg != nil {
h.lg.Warn(
"failed to decode Raft message",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Error(err),
)
} else {
plog.Error(msg)
}
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

msgSize := m.Size()
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(msgSize))
receivedBytes.WithLabelValues(from).Add(float64(msgSize))

if m.Type != raftpb.MsgSnap {
if h.lg != nil {
h.lg.Warn(
"unexpected Raft message type",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.String("message-type", m.Type.String()),
)
} else {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
}
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

if h.lg != nil {
h.lg.Info(
"receiving database snapshot",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
zap.Int("incoming-snapshot-message-size-bytes", msgSize),
zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
Expand All @@ -269,24 +277,25 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.lg.Warn(
"failed to save incoming database snapshot",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
zap.Error(err),
)
} else {
plog.Error(msg)
}
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
receivedBytes.WithLabelValues(from).Add(float64(n))

if h.lg != nil {
h.lg.Info(
"received and saved database snapshot",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
zap.Int64("incoming-snapshot-size-bytes", n),
zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
Expand All @@ -307,20 +316,24 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.lg.Warn(
"failed to process Raft message",
zap.String("local-member-id", h.localID.String()),
zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
zap.String("remote-snapshot-sender-id", from),
zap.Error(err),
)
} else {
plog.Error(msg)
}
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}

// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)

snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}

type streamHandler struct {
Expand Down
70 changes: 70 additions & 0 deletions etcdserver/api/rafthttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,68 @@ var (
[]string{"From"},
)

snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send",
Help: "Total number of snapshot sends",
},
[]string{"To"},
)

snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)

snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
},
[]string{"To"},
)

snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive",
Help: "Total number of snapshot receives",
},
[]string{"From"},
)

snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)

snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
},
[]string{"From"},
)

rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Expand All @@ -92,5 +154,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)

prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)

prometheus.MustRegister(rttSec)
}
17 changes: 12 additions & 5 deletions etcdserver/api/rafthttp/snapshot_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }

func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()

m := merged.Message
to := types.ID(m.To).String()

body := createSnapBody(s.tr.Logger, merged)
defer body.Close()
Expand All @@ -79,7 +82,7 @@ func (s *snapshotSender) send(merged snap.Message) {
s.tr.Logger.Info(
"sending database snapshot",
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
zap.String("remote-peer-id", types.ID(m.To).String()),
zap.String("remote-peer-id", to),
zap.Int64("bytes", merged.TotalSize),
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
)
Expand All @@ -94,7 +97,7 @@ func (s *snapshotSender) send(merged snap.Message) {
s.tr.Logger.Warn(
"failed to send database snapshot",
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
zap.String("remote-peer-id", types.ID(m.To).String()),
zap.String("remote-peer-id", to),
zap.Int64("bytes", merged.TotalSize),
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
zap.Error(err),
Expand All @@ -116,7 +119,8 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
sentFailures.WithLabelValues(to).Inc()
snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
Expand All @@ -126,15 +130,18 @@ func (s *snapshotSender) send(merged snap.Message) {
s.tr.Logger.Info(
"sent database snapshot",
zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
zap.String("remote-peer-id", types.ID(m.To).String()),
zap.String("remote-peer-id", to),
zap.Int64("bytes", merged.TotalSize),
zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
)
} else {
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
}

sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))

snapshotSend.WithLabelValues(to).Inc()
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}

// post posts the given request.
Expand Down

0 comments on commit b63790b

Please sign in to comment.