Skip to content

Commit

Permalink
Expose storage states on the metrics endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 1, 2021
1 parent 95e8b59 commit c2ae434
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 9 deletions.
3 changes: 2 additions & 1 deletion cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
)
Expand Down Expand Up @@ -409,7 +410,7 @@ var runCmd = &cli.Command{

readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", api.PermissionedWorkerAPI(metrics.MetricedWorkerAPI(workerApi)))
rpcServer.Register("Filecoin", api.PermissionedWorkerAPI(proxy.MetricedWorkerAPI(workerApi)))

mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
Expand Down
3 changes: 2 additions & 1 deletion cmd/lotus-wallet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
)
Expand Down Expand Up @@ -204,7 +205,7 @@ var runCmd = &cli.Command{
w = &LoggedWallet{under: w}
}

rpcApi := metrics.MetricedWalletAPI(w)
rpcApi := proxy.MetricedWalletAPI(w)
if !cctx.Bool("disable-auth") {
rpcApi = api.PermissionedWalletAPI(rpcApi)
}
Expand Down
22 changes: 22 additions & 0 deletions extern/sector-storage/stores/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"context"
"errors"
"fmt"
"go.opencensus.io/tag"
"net/url"
gopath "path"
"sort"
"sync"
"time"

"go.opencensus.io/stats"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/metrics"
)

var HeartbeatInterval = 10 * time.Second
Expand Down Expand Up @@ -192,6 +195,25 @@ func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthRep
}
ent.lastHeartbeat = time.Now()

if report.Stat.Capacity > 0 {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.StorageID, string(id)))

stats.Record(ctx, metrics.StorageFSAvailable.M(float64(report.Stat.FSAvailable)/float64(report.Stat.Capacity)))
stats.Record(ctx, metrics.StorageAvailable.M(float64(report.Stat.Available)/float64(report.Stat.Capacity)))
stats.Record(ctx, metrics.StorageReserved.M(float64(report.Stat.Reserved)/float64(report.Stat.Capacity)))

stats.Record(ctx, metrics.StorageCapacityBytes.M(report.Stat.Capacity))
stats.Record(ctx, metrics.StorageFSAvailableBytes.M(report.Stat.FSAvailable))
stats.Record(ctx, metrics.StorageAvailableBytes.M(report.Stat.Available))
stats.Record(ctx, metrics.StorageReservedBytes.M(report.Stat.Reserved))

if report.Stat.Max > 0 {
stats.Record(ctx, metrics.StorageLimitUsed.M(float64(report.Stat.Used)/float64(report.Stat.Max)))
stats.Record(ctx, metrics.StorageLimitUsedBytes.M(report.Stat.Used))
stats.Record(ctx, metrics.StorageLimitMaxBytes.M(report.Stat.Max))
}
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/gorilla/mux"
promclient "github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -23,7 +23,7 @@ func Handler(a api.Gateway, opts ...jsonrpc.ServerOption) (http.Handler, error)
m.Handle(path, rpcServer)
}

ma := metrics.MetricedGatewayAPI(a)
ma := proxy.MetricedGatewayAPI(a)

serveRpc("/rpc/v1", ma)
serveRpc("/rpc/v0", api.Wrap(new(v1api.FullNodeStruct), new(v0api.WrapperV1Full), ma))
Expand Down
70 changes: 70 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
// miner
TaskType, _ = tag.NewKey("task_type")
WorkerHostname, _ = tag.NewKey("worker_hostname")
StorageID, _ = tag.NewKey("storage_id")
)

// Measures
Expand Down Expand Up @@ -97,6 +98,17 @@ var (
WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)

StorageFSAvailable = stats.Float64("storage/path_fs_available_frac", "Fraction of filesystem available storage", stats.UnitDimensionless)
StorageAvailable = stats.Float64("storage/path_available_frac", "Fraction of available storage", stats.UnitDimensionless)
StorageReserved = stats.Float64("storage/path_reserved_frac", "Fraction of reserved storage", stats.UnitDimensionless)
StorageLimitUsed = stats.Float64("storage/path_limit_used_frac", "Fraction of used optional storage limit", stats.UnitDimensionless)
StorageCapacityBytes = stats.Int64("storage/path_capacity_bytes", "storage path capacity", stats.UnitBytes)
StorageFSAvailableBytes = stats.Int64("storage/path_fs_available_bytes", "filesystem available storage bytes", stats.UnitBytes)
StorageAvailableBytes = stats.Int64("storage/path_available_bytes", "available storage bytes", stats.UnitBytes)
StorageReservedBytes = stats.Int64("storage/path_reserved_bytes", "reserved storage bytes", stats.UnitBytes)
StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes)
StorageLimitMaxBytes = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes)

// splitstore
SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
Expand Down Expand Up @@ -296,6 +308,56 @@ var (
Aggregation: workMillisecondsDistribution,
TagKeys: []tag.Key{TaskType, WorkerHostname},
}
StorageFSAvailableView = &view.View{
Measure: StorageFSAvailable,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageAvailableView = &view.View{
Measure: StorageAvailable,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageReservedView = &view.View{
Measure: StorageReserved,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageLimitUsedView = &view.View{
Measure: StorageLimitUsed,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageCapacityBytesView = &view.View{
Measure: StorageCapacityBytes,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageFSAvailableBytesView = &view.View{
Measure: StorageFSAvailableBytes,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageAvailableBytesView = &view.View{
Measure: StorageAvailableBytes,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageReservedBytesView = &view.View{
Measure: StorageReservedBytes,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageLimitUsedBytesView = &view.View{
Measure: StorageLimitUsedBytes,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}
StorageLimitMaxBytesView = &view.View{
Measure: StorageLimitMaxBytes,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{StorageID},
}

// splitstore
SplitstoreMissView = &view.View{
Expand Down Expand Up @@ -379,6 +441,14 @@ var MinerNodeViews = append([]*view.View{
WorkerCallsReturnedCountView,
WorkerUntrackedCallsReturnedView,
WorkerCallsReturnedDurationView,
StorageFSAvailableView,
StorageAvailableView,
StorageReservedView,
StorageLimitUsedView,
StorageFSAvailableBytesView,
StorageAvailableBytesView,
StorageReservedBytesView,
StorageLimitUsedBytesView,
}, DefaultViews...)

// SinceInMilliseconds returns the duration of time since the provide time as a float64.
Expand Down
7 changes: 4 additions & 3 deletions metrics/proxy.go → metrics/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package metrics
package proxy

import (
"context"
Expand All @@ -7,6 +7,7 @@ import (
"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/metrics"
)

func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner {
Expand Down Expand Up @@ -52,8 +53,8 @@ func proxy(in interface{}, outstr interface{}) {
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
ctx := args[0].Interface().(context.Context)
// upsert function name into context
ctx, _ = tag.New(ctx, tag.Upsert(Endpoint, field.Name))
stop := Timer(ctx, APIRequestDuration)
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Endpoint, field.Name))
stop := metrics.Timer(ctx, metrics.APIRequestDuration)
defer stop()
// pass tagged ctx back into function call
args[0] = reflect.ValueOf(ctx)
Expand Down
5 changes: 3 additions & 2 deletions node/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/impl"
)

Expand Down Expand Up @@ -78,7 +79,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
m.Handle(path, handler)
}

fnapi := metrics.MetricedFullAPI(a)
fnapi := proxy.MetricedFullAPI(a)
if permissioned {
fnapi = api.PermissionedFullAPI(fnapi)
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) {
m := mux.NewRouter()

mapi := metrics.MetricedStorMinerAPI(a)
mapi := proxy.MetricedStorMinerAPI(a)
if permissioned {
mapi = api.PermissionedStorMinerAPI(mapi)
}
Expand Down

0 comments on commit c2ae434

Please sign in to comment.