Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Commit

Permalink
feat(eventrecorder): record metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Mar 8, 2023
1 parent 1dff0fa commit 570ded3
Show file tree
Hide file tree
Showing 4 changed files with 565 additions and 96 deletions.
191 changes: 112 additions & 79 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"net/http"
"strings"

"github.com/filecoin-project/lassie/pkg/metrics"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"go.opencensus.io/stats"
)

var logger = log.Logger("lassie/eventrecorder")
Expand Down Expand Up @@ -66,7 +69,8 @@ func (r *EventRecorder) httpServerMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/v1/retrieval-events", r.handleRetrievalEvents)
mux.HandleFunc("/ready", r.handleReady)
mux.HandleFunc("/v1/summarize-and-clear", r.handleStats)
// register prometheus metrics
mux.Handle("/metrics", metrics.NewExporter())
return mux
}

Expand Down Expand Up @@ -147,99 +151,128 @@ func (r *EventRecorder) handleRetrievalEvents(res http.ResponseWriter, req *http
logger.Errorw("At least one retrieval event insertion failed", "err", err)
return
}
for _, event := range batch.Events {
recordMetrics(event)
}
logger.Infow("Successfully submitted batch event insertion")
}

type EventSummary struct {
TotalAttempts uint64 `json:"totalAttempts"`
AttemptedBitswap uint64 `json:"attemptedBitswap"`
AttemptedGraphSync uint64 `json:"attemptedGraphSync"`
AttemptedBoth uint64 `json:"attemptedBoth"`
AttemptedEither uint64 `json:"attemptedEither"`
BitswapSuccesses uint64 `json:"bitswapSuccesses"`
GraphSyncSuccesses uint64 `json:"graphSyncSuccesses"`
AvgBandwidth *float64 `json:"avgBandwidth"`
FirstByte []float64 `json:"firstByte"`
DownloadSize []float64 `json:"downloadSize"`
GraphsyncAttemptsPastQuery uint64 `json:"graphsyncAttemptsPastQuery"`
}

func (r *EventRecorder) handleStats(res http.ResponseWriter, req *http.Request) {
logger := logger.With("method", req.Method, "path", req.URL.Path)
if req.Method != http.MethodGet {
func (r *EventRecorder) handleReady(res http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
// TODO: ping DB as part of readiness check?
res.Header().Add("Allow", http.MethodGet)
default:
http.Error(res, "", http.StatusMethodNotAllowed)
logger.Warn("Rejected disallowed method")
return
}
}

ctx := req.Context()
runQuery := `
select count(all_attempts.retrieval_id) as total_attempts,
count(bitswap_retrievals.retrieval_id) as attempted_bitswap,
count(graphsync_retrievals.retrieval_id) as attempted_graphsync,
sum(case when bitswap_retrievals.retrieval_id IS NOT NULL and graphsync_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as attempted_both,
sum(case when bitswap_retrievals.retrieval_id IS NOT NULL or graphsync_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as attempted_either,
sum(case when successful_retrievals.storage_provider_id = 'Bitswap' then 1 else 0 end) as bitswap_successes,
sum(case when successful_retrievals.storage_provider_id <> 'Bitswap' and successful_retrievals.retrieval_id IS NOT NULL then 1 else 0 end) as graphsync_successes,
case when extract('epoch' from sum(successful_retrievals.event_time - first_byte_retrievals.event_time)) = 0 then 0 else sum(successful_retrievals.received_size)::float / extract('epoch' from sum(successful_retrievals.event_time - first_byte_retrievals.event_time))::float end as avg_bandwidth,
percentile_cont('{0.5, 0.9, 0.95}'::double precision[]) WITHIN GROUP (ORDER BY (extract ('epoch' from first_byte_retrievals.event_time - all_attempts.event_time))) as p50_p90_p95_first_byte,
percentile_cont('{0.5, 0.9, 0.95}'::double precision[]) WITHIN GROUP (ORDER BY (successful_retrievals.received_size)) as p50_p90_p95_download_size,
count(graphsync_retrieval_attempts.retrieval_id) as graphsync_retrieval_attempts_past_query
from (
select distinct on (retrieval_id) retrieval_id, event_time from retrieval_events order by retrieval_id, event_time
) as all_attempts left join (
select distinct retrieval_id from retrieval_events where storage_provider_id = 'Bitswap'
) as bitswap_retrievals on all_attempts.retrieval_id = bitswap_retrievals.retrieval_id left join (
select distinct retrieval_id from retrieval_events where storage_provider_id <> 'Bitswap' and phase <> 'indexer'
) as graphsync_retrievals on graphsync_retrievals.retrieval_id = all_attempts.retrieval_id left join (
select distinct on (retrieval_id) retrieval_id, event_time, storage_provider_id, (event_details -> 'receivedSize')::int8 as received_size from retrieval_events where event_name = 'success' order by retrieval_id, event_time
) as successful_retrievals on successful_retrievals.retrieval_id = all_attempts.retrieval_id left join (
select retrieval_id, event_time, storage_provider_id from retrieval_events where event_name = 'first-byte-received'
) as first_byte_retrievals on successful_retrievals.retrieval_id = first_byte_retrievals.retrieval_id and successful_retrievals.storage_provider_id = first_byte_retrievals.storage_provider_id left join (
select distinct retrieval_id from retrieval_events where storage_provider_id <> 'Bitswap' and phase = 'retrieval'
) as graphsync_retrieval_attempts on graphsync_retrievals.retrieval_id = graphsync_retrieval_attempts.retrieval_id
`

row := r.db.QueryRow(ctx, runQuery)
var summary EventSummary
err := row.Scan(&summary.TotalAttempts,
&summary.AttemptedBitswap,
&summary.AttemptedGraphSync,
&summary.AttemptedBoth,
&summary.AttemptedEither,
&summary.BitswapSuccesses,
&summary.GraphSyncSuccesses,
&summary.AvgBandwidth,
&summary.FirstByte,
&summary.DownloadSize,
&summary.GraphsyncAttemptsPastQuery)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
logger.Errorw("Failure to execute query", "err", err)
func (r *EventRecorder) Shutdown(ctx context.Context) error {
return r.server.Shutdown(ctx)
}

// Implement RetrievalSubscriber
func recordMetrics(event Event) {
switch event.EventName {
case types.CandidatesFoundCode:
handleCandidatesFoundEvent(event)
case types.CandidatesFilteredCode:
handleCandidatesFilteredEvent(event)
case types.StartedCode:
handleStartedEvent(event)
case types.FailedCode:
handleFailureEvent(event)
case types.QueryAskedCode: // query-ask success
handleQueryAskEvent()
case types.QueryAskedFilteredCode:
handleQueryAskFilteredEvent()
}
}

func handleQueryAskFilteredEvent() {
stats.Record(context.Background(), metrics.RequestWithSuccessfulQueriesFilteredCount.M(1))
}

func handleQueryAskEvent() {
stats.Record(context.Background(), metrics.RequestWithSuccessfulQueriesCount.M(1))
}

// handleFailureEvent is called when a query _or_ retrieval fails
func handleFailureEvent(event Event) {

detailsObj, ok := event.EventDetails.(map[string]interface{})
if !ok {
return
}
err = json.NewEncoder(res).Encode(summary)
if err != nil {
http.Error(res, err.Error(), http.StatusInternalServerError)
logger.Errorw("failed encoding result", "err", err)
msg, ok := detailsObj["Error"].(string)
if !ok {
return
}
switch event.Phase {
case types.QueryPhase:
var matched bool
for substr, metric := range metrics.QueryErrorMetricMatches {
if strings.Contains(msg, substr) {
stats.Record(context.Background(), metric.M(1))
matched = true
break
}
}
if !matched {
stats.Record(context.Background(), metrics.QueryErrorOtherCount.M(1))
}
case types.RetrievalPhase:
stats.Record(context.Background(), metrics.RetrievalDealFailCount.M(1))
stats.Record(context.Background(), metrics.RetrievalDealActiveCount.M(-1))

r.db.Exec(ctx, "TRUNCATE TABLE retrieval_events")
logger.Infow("Successfully ran summary and cleared DB")
var matched bool
for substr, metric := range metrics.ErrorMetricMatches {
if strings.Contains(msg, substr) {
stats.Record(context.Background(), metric.M(1))
matched = true
break
}
}
if !matched {
stats.Record(context.Background(), metrics.RetrievalErrorOtherCount.M(1))
}
}
}

func (r *EventRecorder) handleReady(res http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
// TODO: ping DB as part of readiness check?
res.Header().Add("Allow", http.MethodGet)
default:
http.Error(res, "", http.StatusMethodNotAllowed)
func handleStartedEvent(event Event) {
if event.Phase == types.RetrievalPhase {
stats.Record(context.Background(), metrics.RetrievalRequestCount.M(1))
stats.Record(context.Background(), metrics.RetrievalDealActiveCount.M(1))
}
}

func (r *EventRecorder) Shutdown(ctx context.Context) error {
return r.server.Shutdown(ctx)
func handleCandidatesFilteredEvent(event Event) {
detailsObj, ok := event.EventDetails.(map[string]interface{})
if !ok {
return
}

candidateCount, ok := detailsObj["CandidateCount"].(float64)
if !ok {
return
}
if candidateCount > 0 {
stats.Record(context.Background(), metrics.RequestWithIndexerCandidatesFilteredCount.M(1))
}
}

func handleCandidatesFoundEvent(event Event) {
detailsObj, ok := event.EventDetails.(map[string]interface{})
if !ok {
return
}

candidateCount, ok := detailsObj["CandidateCount"].(float64)
if !ok {
return
}
if candidateCount > 0 {
stats.Record(context.Background(), metrics.RequestWithIndexerCandidatesCount.M(1))
}
stats.Record(context.Background(), metrics.IndexerCandidatesPerRequestCount.M(int64(candidateCount)))
}
15 changes: 0 additions & 15 deletions eventrecorder/recorder_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,4 @@ func TestPostEvent(t *testing.T) {
require.Equal(t, wantEvent.EventTime.UnixMicro(), e.EventTime.UnixMicro())

require.False(t, rows.Next())

// verify we are able to fetch a summary and decode, and use this to clear the DB
resp, err = http.Get("http://localhost:8080/v1/summarize-and-clear")
require.NoError(t, err)
require.Equal(t, resp.StatusCode, 200)

var summary eventrecorder.EventSummary
err = json.NewDecoder(resp.Body).Decode(&summary)
require.NoError(t, err)

require.Equal(t, summary.TotalAttempts, uint64(1))
require.Equal(t, summary.AttemptedBitswap, uint64(0))
require.Equal(t, summary.AttemptedGraphSync, uint64(1))
require.Equal(t, summary.AttemptedBoth, uint64(0))
require.Equal(t, summary.AttemptedEither, uint64(1))
}
18 changes: 18 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgx/v5 v5.3.1
github.com/stretchr/testify v1.8.2
go.opencensus.io v0.24.0
)

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
Expand All @@ -24,7 +28,11 @@ require (
github.com/filecoin-project/go-state-types v0.10.0 // indirect
github.com/filecoin-project/go-statemachine v1.0.3 // indirect
github.com/filecoin-project/go-statestore v0.2.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20230214144701-5d17c9d5243c // indirect
github.com/hannahhoward/go-pubsub v1.0.0 // indirect
Expand All @@ -50,12 +58,16 @@ require (
github.com/jackc/puddle/v2 v2.2.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.26.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
Expand All @@ -68,6 +80,11 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.40.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prometheus/statsd_exporter v0.23.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.24.4 // indirect
Expand All @@ -84,6 +101,7 @@ require (
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
Loading

0 comments on commit 570ded3

Please sign in to comment.