Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Record metrics to prometheus #25

Merged
merged 7 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 112 additions & 79 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"net/http"
"strings"

"github.com/filecoin-project/lassie/pkg/metrics"
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"go.opencensus.io/stats"
)

var logger = log.Logger("lassie/eventrecorder")
Expand Down Expand Up @@ -66,7 +69,8 @@ func (r *EventRecorder) httpServerMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/v1/retrieval-events", r.handleRetrievalEvents)
mux.HandleFunc("/ready", r.handleReady)
mux.HandleFunc("/v1/summarize-and-clear", r.handleStats)
// register prometheus metrics
mux.Handle("/metrics", metrics.NewExporter())
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
return mux
}

Expand Down Expand Up @@ -147,99 +151,128 @@ func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http
logger.Errorw("At least one retrieval event insertion failed", "err", err)
return
}
for _, event := range batch.Events {
recordMetrics(event)
}
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
logger.Infow("Successfully submitted batch event insertion")
}

// EventSummary holds the single row produced by the aggregate query that
// handleStats runs over the retrieval_events table, and is what that handler
// encodes as its JSON response. Fields are declared in the same order as the
// columns the query selects, which is also the order row.Scan fills them in.
type EventSummary struct {
// Scanned from the total_attempts column: a count over the per-retrieval_id
// rows of the all_attempts subquery.
TotalAttempts uint64 `json:"totalAttempts"`
// Scanned from attempted_bitswap: counts retrievals that have at least one
// event whose storage_provider_id is 'Bitswap'.
AttemptedBitswap uint64 `json:"attemptedBitswap"`
// Scanned from attempted_graphsync: counts retrievals that have at least one
// event whose storage_provider_id is not 'Bitswap' and whose phase is not
// 'indexer'.
AttemptedGraphSync uint64 `json:"attemptedGraphSync"`
// Scanned from attempted_both: rows matched by both the Bitswap and the
// non-Bitswap subqueries.
AttemptedBoth uint64 `json:"attemptedBoth"`
// Scanned from attempted_either: rows matched by at least one of the Bitswap
// and non-Bitswap subqueries.
AttemptedEither uint64 `json:"attemptedEither"`
// Scanned from bitswap_successes: 'success' events whose
// storage_provider_id is 'Bitswap'.
BitswapSuccesses uint64 `json:"bitswapSuccesses"`
// Scanned from graphsync_successes: 'success' events whose
// storage_provider_id is anything other than 'Bitswap'.
GraphSyncSuccesses uint64 `json:"graphSyncSuccesses"`
// Scanned from avg_bandwidth: summed receivedSize of successful retrievals
// divided by the summed time between their 'first-byte-received' and
// 'success' events (0 when that time sums to zero).
// NOTE(review): presumably a pointer so that a NULL result (no successful
// retrievals) can be represented; units look like bytes per second — confirm.
AvgBandwidth *float64 `json:"avgBandwidth"`
// Scanned from p50_p90_p95_first_byte: the 0.5, 0.9 and 0.95 percentiles of
// the time between a retrieval's earliest event and its
// 'first-byte-received' event.
FirstByte []float64 `json:"firstByte"`
// Scanned from p50_p90_p95_download_size: the 0.5, 0.9 and 0.95 percentiles
// of receivedSize across successful retrievals.
DownloadSize []float64 `json:"downloadSize"`
// Scanned from graphsync_retrieval_attempts_past_query: counts retrievals
// that have a non-Bitswap event whose phase is 'retrieval'.
GraphsyncAttemptsPastQuery uint64 `json:"graphsyncAttemptsPastQuery"`
}

func (r *EventRecorder) handleStats(res http.ResponseWriter, req *http.Request) {
logger := logger.With("method", req.Method, "path", req.URL.Path)
if req.Method != http.MethodGet {
func (r *EventRecorder) handleReady(res http.ResponseWriter, req *http.Request) {
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
switch req.Method {
case http.MethodGet:
// TODO: ping DB as part of readiness check?
res.Header().Add("Allow", http.MethodGet)
default:
http.Error(res, "", http.StatusMethodNotAllowed)
logger.Warn("Rejected disallowed method")
return
}
}

ctx := req.Context()
runQuery := `
select count(all_attempts.retrieval_id) as total_attempts,
count(bitswap_retrievals.retrieval_id) as attempted_bitswap,
count(graphsync_retrievals.retrieval_id) as attempted_graphsync,
sum(case when bitswap_retrievals.retrieval_id IS NOT NULL and graphsync_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as attempted_both,
sum(case when bitswap_retrievals.retrieval_id IS NOT NULL or graphsync_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as attempted_either,
sum(case when successful_retrievals.storage_provider_id = 'Bitswap' then 1 else 0 end) as bitswap_successes,
sum(case when successful_retrievals.storage_provider_id <> 'Bitswap' and successful_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as graphsync_successes,
case when extract('epoch' from sum(successful_retrievals.event_time - first_byte_retrievals.event_time)) = 0 then 0 else sum(successful_retrievals.received_size)::float / extract('epoch' from sum(successful_retrievals.event_time - first_byte_retrievals.event_time))::float end as avg_bandwidth,
percentile_cont('{0.5, 0.9, 0.95}'::double precision[]) WITHIN GROUP (ORDER BY (extract ('epoch' from first_byte_retrievals.event_time - all_attempts.event_time))) as p50_p90_p95_first_byte,
percentile_cont('{0.5, 0.9, 0.95}'::double precision[]) WITHIN GROUP (ORDER BY (successful_retrievals.received_size)) as p50_p90_p95_download_size,
count(graphsync_retrieval_attempts.retrieval_id) as graphsync_retrieval_attempts_past_query
from (
select distinct on (retrieval_id) retrieval_id, event_time from retrieval_events order by retrieval_id, event_time
) as all_attempts left join (
select distinct retrieval_id from retrieval_events where storage_provider_id = 'Bitswap'
) as bitswap_retrievals on all_attempts.retrieval_id = bitswap_retrievals.retrieval_id left join (
select distinct retrieval_id from retrieval_events where storage_provider_id <> 'Bitswap' and phase <> 'indexer'
) as graphsync_retrievals on graphsync_retrievals.retrieval_id = all_attempts.retrieval_id left join (
select distinct on (retrieval_id) retrieval_id, event_time, storage_provider_id, (event_details -> 'receivedSize')::int8 as received_size from retrieval_events where event_name = 'success' order by retrieval_id, event_time
) as successful_retrievals on successful_retrievals.retrieval_id = all_attempts.retrieval_id left join (
select retrieval_id, event_time, storage_provider_id from retrieval_events where event_name = 'first-byte-received'
) as first_byte_retrievals on successful_retrievals.retrieval_id = first_byte_retrievals.retrieval_id and successful_retrievals.storage_provider_id = first_byte_retrievals.storage_provider_id left join (
select distinct retrieval_id from retrieval_events where storage_provider_id <> 'Bitswap' and phase = 'retrieval'
) as graphsync_retrieval_attempts on graphsync_retrievals.retrieval_id = graphsync_retrieval_attempts.retrieval_id
`

row := r.db.QueryRow(ctx, runQuery)
var summary EventSummary
err := row.Scan(&summary.TotalAttempts,
&summary.AttemptedBitswap,
&summary.AttemptedGraphSync,
&summary.AttemptedBoth,
&summary.AttemptedEither,
&summary.BitswapSuccesses,
&summary.GraphSyncSuccesses,
&summary.AvgBandwidth,
&summary.FirstByte,
&summary.DownloadSize,
&summary.GraphsyncAttemptsPastQuery)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
logger.Errorw("Failure to execute query", "err", err)
// Shutdown stops the recorder by delegating to its server's Shutdown with the
// supplied context, and hands back whatever error that call reports.
func (r *EventRecorder) Shutdown(ctx context.Context) error {
	err := r.server.Shutdown(ctx)
	return err
}

// Implement RetrievalSubscriber
func recordMetrics(event Event) {
switch event.EventName {
case types.CandidatesFoundCode:
handleCandidatesFoundEvent(event)
case types.CandidatesFilteredCode:
handleCandidatesFilteredEvent(event)
case types.StartedCode:
handleStartedEvent(event)
case types.FailedCode:
handleFailureEvent(event)
case types.QueryAskedCode: // query-ask success
handleQueryAskEvent()
case types.QueryAskedFilteredCode:
handleQueryAskFilteredEvent()
}
Copy link
Member

@masih masih Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log warning on default case. This would be useful to detect missing cases when lassie types are updated but this code is not.

}

func handleQueryAskFilteredEvent() {
stats.Record(context.Background(), metrics.RequestWithSuccessfulQueriesFilteredCount.M(1))
}

func handleQueryAskEvent() {
stats.Record(context.Background(), metrics.RequestWithSuccessfulQueriesCount.M(1))
}

// handleFailureEvent is called when a query _or_ retrieval fails
func handleFailureEvent(event Event) {

detailsObj, ok := event.EventDetails.(map[string]interface{})
if !ok {
return
}
err = json.NewEncoder(res).Encode(summary)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
logger.Errorw("failed encoding result", "err", err)
msg, ok := detailsObj["Error"].(string)
if !ok {
return
}
switch event.Phase {
case types.QueryPhase:
var matched bool
for substr, metric := range metrics.QueryErrorMetricMatches {
if strings.Contains(msg, substr) {
stats.Record(context.Background(), metric.M(1))
matched = true
break
}
}
if !matched {
stats.Record(context.Background(), metrics.QueryErrorOtherCount.M(1))
}
case types.RetrievalPhase:
stats.Record(context.Background(), metrics.RetrievalDealFailCount.M(1))
stats.Record(context.Background(), metrics.RetrievalDealActiveCount.M(-1))

r.db.Exec(ctx, "TRUNCATE TABLE retrieval_events")
logger.Infow("Successfully ran summary and cleared DB")
var matched bool
for substr, metric := range metrics.ErrorMetricMatches {
if strings.Contains(msg, substr) {
stats.Record(context.Background(), metric.M(1))
matched = true
break
hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
}
}
if !matched {
stats.Record(context.Background(), metrics.RetrievalErrorOtherCount.M(1))
}
}
}

func (r *EventRecorder) handleReady(res http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
// TODO: ping DB as part of readiness check?
res.Header().Add("Allow", http.MethodGet)
default:
http.Error(res, "", http.StatusMethodNotAllowed)
// handleStartedEvent records metrics for a started event. Only events in the
// retrieval phase are counted: each one increments both the retrieval request
// count and the active retrieval deal count. Events in any other phase are
// ignored.
func handleStartedEvent(event Event) {
	if event.Phase != types.RetrievalPhase {
		return
	}

	ctx := context.Background()
	stats.Record(ctx, metrics.RetrievalRequestCount.M(1))
	stats.Record(ctx, metrics.RetrievalDealActiveCount.M(1))
}

func (r *EventRecorder) Shutdown(ctx context.Context) error {
return r.server.Shutdown(ctx)
// handleCandidatesFilteredEvent records that a request still had indexer
// candidates after filtering. The event details must be a map carrying a
// float64 "CandidateCount" entry; the measure is incremented only when that
// count is positive. Events with missing or differently typed details are
// ignored.
func handleCandidatesFilteredEvent(event Event) {
	details, isMap := event.EventDetails.(map[string]interface{})
	if !isMap {
		return
	}

	// A failed assertion and a non-positive count are both treated as
	// "nothing to record".
	if remaining, isNumber := details["CandidateCount"].(float64); isNumber && remaining > 0 {
		stats.Record(context.Background(), metrics.RequestWithIndexerCandidatesFilteredCount.M(1))
	}
}

func handleCandidatesFoundEvent(event Event) {
detailsObj, ok := event.EventDetails.(map[string]interface{})
if !ok {
return
}

candidateCount, ok := detailsObj["CandidateCount"].(float64)
if !ok {
return
}
if candidateCount > 0 {
stats.Record(context.Background(), metrics.RequestWithIndexerCandidatesCount.M(1))
}
stats.Record(context.Background(), metrics.IndexerCandidatesPerRequestCount.M(int64(candidateCount)))
masih marked this conversation as resolved.
Show resolved Hide resolved
}
15 changes: 0 additions & 15 deletions eventrecorder/recorder_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,4 @@ func TestPostEvent(t *testing.T) {
require.Equal(t, wantEvent.EventTime.UnixMicro(), e.EventTime.UnixMicro())

require.False(t, rows.Next())

// verify we are able to fetch a summary and decode, and use this to clear the DB
resp, err = http.Get("http://localhost:8080/v1/summarize-and-clear")
require.NoError(t, err)
require.Equal(t, resp.StatusCode, 200)

var summary eventrecorder.EventSummary
err = json.NewDecoder(resp.Body).Decode(&summary)
require.NoError(t, err)

require.Equal(t, summary.TotalAttempts, uint64(1))
require.Equal(t, summary.AttemptedBitswap, uint64(0))
require.Equal(t, summary.AttemptedGraphSync, uint64(1))
require.Equal(t, summary.AttemptedBoth, uint64(0))
require.Equal(t, summary.AttemptedEither, uint64(1))
}
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgx/v5 v5.3.1
github.com/stretchr/testify v1.8.2
go.opencensus.io v0.24.0
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
Expand All @@ -24,7 +28,11 @@ require (
github.com/filecoin-project/go-state-types v0.10.0 // indirect
github.com/filecoin-project/go-statemachine v1.0.3 // indirect
github.com/filecoin-project/go-statestore v0.2.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20230214144701-5d17c9d5243c // indirect
github.com/hannahhoward/go-pubsub v1.0.0 // indirect
Expand All @@ -50,12 +58,16 @@ require (
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.26.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
Expand All @@ -68,6 +80,11 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.40.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prometheus/statsd_exporter v0.23.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.24.4 // indirect
Expand All @@ -84,6 +101,7 @@ require (
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
Loading