Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Record metrics to prometheus #25

Merged
merged 7 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cmd/recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package main
import (
"context"
"flag"
"net"
"net/http"
"os"
"os/signal"

"github.com/filecoin-project/lassie-event-recorder/eventrecorder"
"github.com/filecoin-project/lassie-event-recorder/metrics"
"github.com/ipfs/go-log/v2"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var logger = log.Logger("lassie/event_recorder/cmd")
Expand All @@ -18,6 +22,8 @@ func main() {
httpListenAddr := flag.String("httpListenAddr", "0.0.0.0:8080", "The HTTP server listen address in address:port format.")
dbDSN := flag.String("dbDSN", "", "The database Data Source Name. Alternatively, it may be specified via LASSIE_EVENT_RECORDER_DB_DSN environment variable. If both are present, the environment variable takes precedence.")
logLevel := flag.String("logLevel", "info", "The logging level. Only applied if GOLOG_LOG_LEVEL environment variable is unset.")
metricsListenAddr := flag.String("metricsListenAddr", "0.0.0.0:7777", "The metrics server listen address in address:port format.")

flag.Parse()

if _, set := os.LookupEnv("GOLOG_LOG_LEVEL"); !set {
Expand All @@ -28,15 +34,38 @@ func main() {
dbDSN = &v
}

metricsMux := http.NewServeMux()
metricsMux.Handle("/metrics", promhttp.Handler())

metricsServer := &http.Server{
Addr: *metricsListenAddr,
Handler: metricsMux,
TLSConfig: nil,
}

metrics := metrics.New()

r, err := eventrecorder.New(
eventrecorder.WithHttpServerListenAddr(*httpListenAddr),
eventrecorder.WithDatabaseDSN(*dbDSN),
eventrecorder.WithMetrics(metrics),
)

if err != nil {
logger.Fatalw("Failed to instantiate recorder", "err", err)
}

ctx := context.Background()

if err = metrics.Start(); err != nil {
logger.Fatalw("Failed to start metrics", "err", err)
}
ln, err := net.Listen("tcp", metricsServer.Addr)
if err != nil {
logger.Fatalw("Failed to start listening on metrics addr", "err", err)
}
go func() { _ = metricsServer.Serve(ln) }()

if err = r.Start(ctx); err != nil {
logger.Fatalw("Failed to start recorder", "err", err)
}
Expand Down
10 changes: 10 additions & 0 deletions eventrecorder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/filecoin-project/lassie-event-recorder/metrics"
"github.com/jackc/pgx/v5/pgxpool"
)

Expand All @@ -20,6 +21,8 @@ type (
dbDSN string
// pgxPoolConfig is instantiated by parsing config.dbDSN.
pgxPoolConfig *pgxpool.Config

metrics *metrics.Metrics
}
option func(*config) error
)
Expand Down Expand Up @@ -62,3 +65,10 @@ func WithDatabaseDSN(url string) option {
return nil
}
}

// WithMetrics sets the metrics instance used by the event recorder to
// record retrieval statistics. Passing nil leaves metrics recording
// disabled.
//
// The parameter is named m (not metrics) so that it does not shadow the
// imported metrics package inside the option closure.
func WithMetrics(m *metrics.Metrics) option {
	return func(cfg *config) error {
		cfg.metrics = m
		return nil
	}
}
108 changes: 24 additions & 84 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net/http"
"strings"

"github.com/filecoin-project/lassie-event-recorder/metrics"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand All @@ -17,9 +19,10 @@ import (
var logger = log.Logger("lassie/eventrecorder")

type EventRecorder struct {
cfg *config
server *http.Server
db *pgxpool.Pool
cfg *config
server *http.Server
db *pgxpool.Pool
metrics *metrics.Metrics
}

func New(opts ...option) (*EventRecorder, error) {
Expand Down Expand Up @@ -66,7 +69,6 @@ func (r *EventRecorder) httpServerMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/v1/retrieval-events", r.handleRetrievalEvents)
mux.HandleFunc("/ready", r.handleReady)
mux.HandleFunc("/v1/summarize-and-clear", r.handleStats)
return mux
}

Expand Down Expand Up @@ -147,87 +149,25 @@ func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http
logger.Errorw("At least one retrieval event insertion failed", "err", err)
return
}
logger.Infow("Successfully submitted batch event insertion")
}

// EventSummary is the JSON response body produced by the
// summarize-and-clear endpoint, aggregating statistics over all
// recorded retrieval events.
type EventSummary struct {
	// TotalAttempts is the count of distinct retrieval IDs recorded.
	TotalAttempts uint64 `json:"totalAttempts"`
	// AttemptedBitswap counts retrievals that involved the 'Bitswap'
	// storage provider.
	AttemptedBitswap uint64 `json:"attemptedBitswap"`
	// AttemptedGraphSync counts retrievals involving a non-Bitswap
	// provider past the indexer phase.
	AttemptedGraphSync uint64 `json:"attemptedGraphSync"`
	// AttemptedBoth counts retrievals that attempted both Bitswap and
	// GraphSync.
	AttemptedBoth uint64 `json:"attemptedBoth"`
	// AttemptedEither counts retrievals that attempted at least one of
	// Bitswap or GraphSync.
	AttemptedEither uint64 `json:"attemptedEither"`
	// BitswapSuccesses counts successful retrievals served by Bitswap.
	BitswapSuccesses uint64 `json:"bitswapSuccesses"`
	// GraphSyncSuccesses counts successful retrievals served by a
	// non-Bitswap (GraphSync) provider.
	GraphSyncSuccesses uint64 `json:"graphSyncSuccesses"`
	// AvgBandwidth is total received bytes divided by total seconds
	// between first byte and success, across successful retrievals.
	// Pointer so a missing/NULL aggregate decodes as nil.
	AvgBandwidth *float64 `json:"avgBandwidth"`
	// FirstByte holds the p50/p90/p95 percentiles of time-to-first-byte
	// in seconds, in that order.
	FirstByte []float64 `json:"firstByte"`
	// DownloadSize holds the p50/p90/p95 percentiles of received size
	// for successful retrievals, in that order.
	DownloadSize []float64 `json:"downloadSize"`
	// GraphsyncAttemptsPastQuery counts GraphSync retrievals that
	// reached the 'retrieval' phase (i.e. progressed past query).
	GraphsyncAttemptsPastQuery uint64 `json:"graphsyncAttemptsPastQuery"`
}

func (r *EventRecorder) handleStats(res http.ResponseWriter, req *http.Request) {
logger := logger.With("method", req.Method, "path", req.URL.Path)
if req.Method != http.MethodGet {
res.Header().Add("Allow", http.MethodGet)
http.Error(res, "", http.StatusMethodNotAllowed)
logger.Warn("Rejected disallowed method")
return
}

ctx := req.Context()
runQuery := `
select count(all_attempts.retrieval_id) as total_attempts,
count(bitswap_retrievals.retrieval_id) as attempted_bitswap,
count(graphsync_retrievals.retrieval_id) as attempted_graphsync,
sum(case when bitswap_retrievals.retrieval_id IS NOT NULL and graphsync_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as attempted_both,
sum(case when bitswap_retrievals.retrieval_id IS NOT NULL or graphsync_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as attempted_either,
sum(case when successful_retrievals.storage_provider_id = 'Bitswap' then 1 else 0 end) as bitswap_successes,
sum(case when successful_retrievals.storage_provider_id <> 'Bitswap' and successful_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as graphsync_successes,
case when extract('epoch' from sum(successful_retrievals.event_time - first_byte_retrievals.event_time)) = 0 then 0 else sum(successful_retrievals.received_size)::float / extract('epoch' from sum(successful_retrievals.event_time - first_byte_retrievals.event_time))::float end as avg_bandwidth,
percentile_cont('{0.5, 0.9, 0.95}'::double precision[]) WITHIN GROUP (ORDER BY (extract ('epoch' from first_byte_retrievals.event_time - all_attempts.event_time))) as p50_p90_p95_first_byte,
percentile_cont('{0.5, 0.9, 0.95}'::double precision[]) WITHIN GROUP (ORDER BY (successful_retrievals.received_size)) as p50_p90_p95_download_size,
count(graphsync_retrieval_attempts.retrieval_id) as graphsync_retrieval_attempts_past_query
from (
select distinct on (retrieval_id) retrieval_id, event_time from retrieval_events order by retrieval_id, event_time
) as all_attempts left join (
select distinct retrieval_id from retrieval_events where storage_provider_id = 'Bitswap'
) as bitswap_retrievals on all_attempts.retrieval_id = bitswap_retrievals.retrieval_id left join (
select distinct retrieval_id from retrieval_events where storage_provider_id <> 'Bitswap' and phase <> 'indexer'
) as graphsync_retrievals on graphsync_retrievals.retrieval_id = all_attempts.retrieval_id left join (
select distinct on (retrieval_id) retrieval_id, event_time, storage_provider_id, (event_details -> 'receivedSize')::int8 as received_size from retrieval_events where event_name = 'success' order by retrieval_id, event_time
) as successful_retrievals on successful_retrievals.retrieval_id = all_attempts.retrieval_id left join (
select retrieval_id, event_time, storage_provider_id from retrieval_events where event_name = 'first-byte-received'
) as first_byte_retrievals on successful_retrievals.retrieval_id = first_byte_retrievals.retrieval_id and successful_retrievals.storage_provider_id = first_byte_retrievals.storage_provider_id left join (
select distinct retrieval_id from retrieval_events where storage_provider_id <> 'Bitswap' and phase = 'retrieval'
) as graphsync_retrieval_attempts on graphsync_retrievals.retrieval_id = graphsync_retrieval_attempts.retrieval_id
`

row := r.db.QueryRow(ctx, runQuery)
var summary EventSummary
err := row.Scan(&summary.TotalAttempts,
&summary.AttemptedBitswap,
&summary.AttemptedGraphSync,
&summary.AttemptedBoth,
&summary.AttemptedEither,
&summary.BitswapSuccesses,
&summary.GraphSyncSuccesses,
&summary.AvgBandwidth,
&summary.FirstByte,
&summary.DownloadSize,
&summary.GraphsyncAttemptsPastQuery)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
logger.Errorw("Failure to execute query", "err", err)
return
}
err = json.NewEncoder(res).Encode(summary)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
logger.Errorw("failed encoding result", "err", err)
return
if r.metrics != nil {
for _, event := range batch.Events {
switch event.EventName {
case types.StartedCode:
r.metrics.HandleStartedEvent(ctx, event.RetrievalId, event.Phase, event.EventTime, event.StorageProviderId)
case types.CandidatesFoundCode:
r.metrics.HandleCandidatesFoundEvent(ctx, event.RetrievalId, event.EventTime, event.EventDetails)
case types.CandidatesFilteredCode:
r.metrics.HandleCandidatesFilteredEvent(ctx, event.RetrievalId, event.EventDetails)
case types.FailedCode:
r.metrics.HandleFailureEvent(ctx, event.RetrievalId, event.Phase, event.EventDetails)
case types.FirstByteCode:
r.metrics.HandleTimeToFirstByteEvent(ctx, event.RetrievalId, event.EventTime)
case types.SuccessCode:
r.metrics.HandleSuccessEvent(ctx, event.RetrievalId, event.EventTime, event.StorageProviderId, event.EventDetails)
}
}
}
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved

r.db.Exec(ctx, "TRUNCATE TABLE retrieval_events")
logger.Infow("Successfully ran summary and cleared DB")
logger.Infow("Successfully submitted batch event insertion")
}

func (r *EventRecorder) handleReady(res http.ResponseWriter, req *http.Request) {
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
15 changes: 0 additions & 15 deletions eventrecorder/recorder_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,4 @@ func TestPostEvent(t *testing.T) {
require.Equal(t, wantEvent.EventTime.UnixMicro(), e.EventTime.UnixMicro())

require.False(t, rows.Next())

// verify we are able to fetch a summary and decode, and use this to clear the DB
resp, err = http.Get("http://localhost:8080/v1/summarize-and-clear")
require.NoError(t, err)
require.Equal(t, resp.StatusCode, 200)

var summary eventrecorder.EventSummary
err = json.NewDecoder(resp.Body).Decode(&summary)
require.NoError(t, err)

require.Equal(t, summary.TotalAttempts, uint64(1))
require.Equal(t, summary.AttemptedBitswap, uint64(0))
require.Equal(t, summary.AttemptedGraphSync, uint64(1))
require.Equal(t, summary.AttemptedBoth, uint64(0))
require.Equal(t, summary.AttemptedEither, uint64(1))
}
20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgx/v5 v5.3.1
github.com/stretchr/testify v1.8.2
go.opentelemetry.io/otel/exporters/prometheus v0.37.0
go.opentelemetry.io/otel/metric v0.37.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel/sdk/metric v0.37.0
)

require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
Expand All @@ -24,7 +31,10 @@ require (
github.com/filecoin-project/go-state-types v0.10.0 // indirect
github.com/filecoin-project/go-statemachine v1.0.3 // indirect
github.com/filecoin-project/go-statestore v0.2.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20230214144701-5d17c9d5243c // indirect
github.com/hannahhoward/go-pubsub v1.0.0 // indirect
Expand All @@ -50,12 +60,16 @@ require (
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.26.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
Expand All @@ -68,11 +82,17 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.40.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.24.4 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
Expand Down
Loading