Skip to content

Commit

Permalink
feat(http): Gather values written stats
Browse files Browse the repository at this point in the history
WritePointsWithContext() was added to propagate context values down to
the engine and communicate stats to the caller.
  • Loading branch information
ayang64 committed Aug 3, 2020
1 parent 8e6ff4f commit 7a5d6f4
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 22 deletions.
101 changes: 80 additions & 21 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package coordinator

import (
"context"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -282,13 +283,37 @@ func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
return w.WritePointsPrivileged(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
}

// WritePoints writes the data to the underlying storage. consitencyLevel and user are only used for clustered scenarios
// A wrapper for WritePointsWithContext()
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
return w.WritePointsPrivileged(database, retentionPolicy, consistencyLevel, points)
return w.WritePointsWithContext(context.Background(), database, retentionPolicy, consistencyLevel, user, points)

}

type MetricKey int

const (
StatPointsWritten = MetricKey(iota)
StatValuesWritten
)

// WritePointsWithContext writes data to the underlying storage. consitencyLevel and user are only used for clustered scenarios.
//
func (w *PointsWriter) WritePointsWithContext(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
return w.WritePointsPrivilegedWithContext(ctx, database, retentionPolicy, consistencyLevel, points)
}

// WritePointsPrivileged writes the data to the underlying storage, consitencyLevel is only used for clustered scenarios
func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return w.WritePointsPrivilegedWithContext(context.Background(), database, retentionPolicy, consistencyLevel, points)
}

// WritePointsPrivilegedWithContext writes the data to the underlying storage,
// consitencyLevel is only used for clustered scenarios
//
// If a request for StatPointsWritten or StatValueWritten of type MetricKey is
// sent via context values, this stores the total points and fields written in
// the memory pointed to by the associated wth the int64 pointers.
//
func (w *PointsWriter) WritePointsPrivilegedWithContext(ctx context.Context, database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
atomic.AddInt64(&w.stats.WriteReq, 1)
atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))

Expand All @@ -309,10 +334,23 @@ func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, c
ch := make(chan error, len(shardMappings.Points))
for shardID, points := range shardMappings.Points {
go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
err := w.writeToShard(shard, database, retentionPolicy, points)
var numPoints, numValues int64
ctx = context.WithValue(ctx, tsdb.StatPointsWritten, &numPoints)
ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)

err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, points)
if err == tsdb.ErrShardDeletion {
err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}
}

if v, ok := ctx.Value(StatPointsWritten).(*int64); ok {
atomic.AddInt64(v, numPoints)
}

if v, ok := ctx.Value(StatValuesWritten).(*int64); ok {
atomic.AddInt64(v, numValues)
}

ch <- err
}(shardMappings.Shards[shardID], database, retentionPolicy, points)
}
Expand Down Expand Up @@ -365,30 +403,51 @@ func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, c

// writeToShards writes points to a shard.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
return w.writeToShardWithContext(context.Background(), shard, database, retentionPolicy, points)
}

func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

err := w.TSDBStore.WriteToShard(shard.ID, points)
if err == nil {
atomic.AddInt64(&w.stats.WriteOK, 1)
// This is a small wrapper to make type-switching over w.TSDBStore a little
// less verbose.
writeToShard := func() error {
type shardWriterWithContext interface {
WriteToShardWithContext(context.Context, uint64, []models.Point) error
}
switch sw := w.TSDBStore.(type) {
case shardWriterWithContext:
if err := sw.WriteToShardWithContext(ctx, shard.ID, points); err != nil {
return err
}
default:
if err := w.TSDBStore.WriteToShard(shard.ID, points); err != nil {
return err
}
}
return nil
}

// Except tsdb.ErrShardNotFound no error can be handled here
if err != tsdb.ErrShardNotFound {
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}

// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write
if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
if err := writeToShard(); err == tsdb.ErrShardNotFound {
// Shard doesn't exist -- lets create it and try again..

// If we've written to shard that should exist on the current node, but the
// store has not actually created this shard, tell it to create it and
// retry the write
if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}

if err = w.TSDBStore.WriteToShard(shard.ID, points); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
// Now that we've created the shard, try to write to it again.
if err := writeToShard(); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
} else {
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
Expand Down
28 changes: 27 additions & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
Expand Down Expand Up @@ -352,6 +353,7 @@ type Statistics struct {
WriteRequestBytesReceived int64
QueryRequestBytesTransmitted int64
PointsWrittenOK int64
ValuesWrittenOK int64
PointsWrittenDropped int64
PointsWrittenFail int64
AuthenticationFailures int64
Expand Down Expand Up @@ -383,6 +385,7 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK),
statValuesWrittenOK: atomic.LoadInt64(&h.stats.ValuesWrittenOK),
statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped),
statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail),
statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures),
Expand Down Expand Up @@ -968,8 +971,31 @@ func (h *Handler) serveWrite(database, retentionPolicy, precision string, w http
}
}

type pointsWriterWithContext interface {
WritePointsWithContext(context.Context, string, string, models.ConsistencyLevel, meta.User, []models.Point) error
}

writePoints := func() error {
switch pw := h.PointsWriter.(type) {
case pointsWriterWithContext:
var npoints, nvalues int64
ctx := context.WithValue(context.Background(), coordinator.StatPointsWritten, &npoints)
ctx = context.WithValue(ctx, coordinator.StatValuesWritten, &nvalues)

// for now, just store the number of values used.
err := pw.WritePointsWithContext(ctx, database, retentionPolicy, consistency, user, points)
atomic.AddInt64(&h.stats.ValuesWrittenOK, nvalues)
if err != nil {
return err
}
return nil
default:
return h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points)
}
}

// Write points.
if err := h.PointsWriter.WritePoints(database, retentionPolicy, consistency, user, points); influxdb.IsClientError(err) {
if err := writePoints(); influxdb.IsClientError(err) {
atomic.AddInt64(&h.stats.PointsWrittenFail, int64(len(points)))
h.httpError(w, err.Error(), http.StatusBadRequest)
return
Expand Down
1 change: 1 addition & 0 deletions services/httpd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
statWriteRequestBytesReceived = "writeReqBytes" // Sum of all bytes in write requests.
statQueryRequestBytesTransmitted = "queryRespBytes" // Sum of all bytes returned in query reponses.
statPointsWrittenOK = "pointsWrittenOK" // Number of points written OK.
statValuesWrittenOK = "valuesWrittenOK" // Number of values (fields) written OK.
statPointsWrittenDropped = "pointsWrittenDropped" // Number of points dropped by the storage engine.
statPointsWrittenFail = "pointsWrittenFail" // Number of points that failed to be written.
statAuthFail = "authFail" // Number of authentication failures.
Expand Down

0 comments on commit 7a5d6f4

Please sign in to comment.