Skip to content

Commit

Permalink
feat: extract server timings and measure individual retrievals
Browse files Browse the repository at this point in the history
  • Loading branch information
kylehuntsman committed Jun 28, 2023
1 parent 8fba48d commit 1d22d1f
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 37 deletions.
38 changes: 1 addition & 37 deletions pkg/server/http/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"

"github.com/filecoin-project/lassie/pkg/events"
lassie "github.com/filecoin-project/lassie/pkg/lassie"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/storage"
Expand All @@ -16,7 +14,6 @@ import (
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p/core/peer"
servertiming "github.com/mitchellh/go-server-timing"
"github.com/multiformats/go-multicodec"
)

Expand Down Expand Up @@ -187,41 +184,8 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
}

// servertiming metrics
startTimeMap := make(map[string]time.Time)

logger.Debugw("fetching CID", "retrievalId", retrievalId, "CID", rootCid.String(), "path", path.String(), "dagScope", dagScope)
stats, err := lassie.Fetch(req.Context(), request, func(re types.RetrievalEvent) {
header := servertiming.FromContext(req.Context())
if header == nil {
return
}

var metricName string
switch re.(type) {
case events.StartedFindingCandidatesEvent:
header.NewMetric("indexer")
startTimeMap["indexer"] = re.Time()

case events.StartedRetrievalEvent:
header.NewMetric("retrieval")
startTimeMap["retrieval"] = re.Time()
metricName = "indexer"

case events.FinishedEvent:
metricName = "retrieval"
}

// Set the metric duration
header.Lock()
if header.Metrics != nil {
for _, m := range header.Metrics {
if metricName != "" && m.Name == metricName {
m.Duration = re.Time().Sub(startTimeMap[metricName])
}
}
}
header.Unlock()
})
stats, err := lassie.Fetch(req.Context(), request, servertimingsSubscriber(req))

// force all blocks to flush
if cerr := carWriter.Close(); cerr != nil {
Expand Down
111 changes: 111 additions & 0 deletions pkg/server/http/servertimingssubscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package httpserver

import (
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/filecoin-project/lassie/pkg/events"
"github.com/filecoin-project/lassie/pkg/types"
servertiming "github.com/mitchellh/go-server-timing"
)

// servertimingsSubscriber returns a retrieval event subscriber that records
// RetrievalEvents as metrics on the Server-Timing header carried in the
// context of req. If the request context carries no Server-Timing header the
// subscriber does nothing.
//
// One metric is created for candidate finding and one per retrieval, the
// latter named "retrieval-<identifier>". For every metric:
//   - the "dur" extra field is the time between the "started-fetch" event and
//     the start of that phase, in milliseconds;
//   - every other extra field is keyed by an event code and holds the time
//     between the start of that phase and the event, in nanoseconds.
//
// We are unable to get the duration of the entire fetch or of successful
// retrievals due to the way in which the headers are written. Since the
// headers are written before an http `Write` occurs, we can only collect info
// about the retrievals until a `first-byte-received` event that results in
// data being written to the client. The http `Write` ends up occurring before
// the success and finished events are emitted, therefore cutting off the
// trailing events of any given retrieval. Because of this, the `success` and
// `finished` events are not processed, and `started-fetch` is only used as the
// reference point for the "dur" fields.
//
// We use the "dur" field to record the time since the `started-fetch` event
// instead of the duration of the phase itself so that something is rendered
// in the browser, since "dur" is the only field rendered.
//
// The returned subscriber serializes its own invocations with a mutex, so it
// may be called from multiple goroutines.
func servertimingsSubscriber(req *http.Request) types.RetrievalEventSubscriber {
	var (
		// mu guards every variable declared in this block as well as the
		// Extra maps of the metrics they reference. Events for different
		// retrievals may be delivered concurrently (the original code already
		// locked around some of the map accesses), so all shared state is
		// protected by the same lock.
		mu sync.Mutex

		fetchStartTime time.Time

		candidateFindingMetric    *servertiming.Metric
		candidateFindingStartTime time.Time

		retrievalMetricMap = make(map[string]*servertiming.Metric)
		retrievalTimingMap = make(map[string]time.Time)
	)

	// nanos renders a duration as an integer number of nanoseconds.
	nanos := func(d time.Duration) string {
		return strconv.FormatInt(int64(d), 10)
	}

	// recordCandidateStep stores the time since candidate finding started
	// under the event's code and as the metric duration. Callers must hold mu.
	recordCandidateStep := func(re types.RetrievalEvent) {
		if candidateFindingMetric == nil {
			// No started-finding-candidates event was seen; there is no
			// metric to attach this event to.
			return
		}
		elapsed := re.Time().Sub(candidateFindingStartTime)
		candidateFindingMetric.Duration = elapsed
		candidateFindingMetric.Extra[string(re.Code())] = nanos(elapsed)
	}

	// recordRetrievalStep stores the time since the named retrieval started
	// under the event's code. When final is true the same value also becomes
	// the metric duration. Unknown retrievals are ignored. Callers must hold mu.
	recordRetrievalStep := func(name string, re types.RetrievalEvent, final bool) {
		metric, ok := retrievalMetricMap[name]
		if !ok {
			return
		}
		elapsed := re.Time().Sub(retrievalTimingMap[name])
		metric.Extra[string(re.Code())] = nanos(elapsed)
		if final {
			metric.Duration = elapsed
		}
	}

	return func(re types.RetrievalEvent) {
		timing := servertiming.FromContext(req.Context())
		if timing == nil {
			return
		}

		mu.Lock()
		defer mu.Unlock()

		switch event := re.(type) {
		case events.StartedFetchEvent:
			fetchStartTime = re.Time()

		// Candidate finding cases
		case events.StartedFindingCandidatesEvent:
			candidateFindingMetric = timing.NewMetric(string(re.Code()))
			candidateFindingMetric.Extra = map[string]string{
				"dur": formatDuration(re.Time().Sub(fetchStartTime)),
			}
			candidateFindingStartTime = re.Time()
		case events.CandidatesFoundEvent:
			recordCandidateStep(re)
		case events.CandidatesFilteredEvent:
			recordCandidateStep(re)

		// Retrieval cases
		case events.StartedRetrievalEvent:
			name := fmt.Sprintf("retrieval-%s", events.Identifier(re))
			retrievalMetric := timing.NewMetric(name)
			// We're using "dur" here to render the time since the
			// "started-fetch" event in the browser.
			retrievalMetric.Extra = map[string]string{
				"dur": formatDuration(re.Time().Sub(fetchStartTime)),
			}
			retrievalMetricMap[name] = retrievalMetric
			retrievalTimingMap[name] = re.Time()
		case events.FirstByteEvent:
			name := fmt.Sprintf("retrieval-%s", events.Identifier(event))
			recordRetrievalStep(name, re, false)
		case events.FailedRetrievalEvent:
			name := fmt.Sprintf("retrieval-%s", events.Identifier(re))
			recordRetrievalStep(name, re, true)

		// Due to the timing in which the Server-Timing header is written,
		// the success and finished events are never emitted in time to make
		// it into the header.
		case events.SucceededEvent:
		case events.FinishedEvent:

		default:
			// Any other event that identifies a provider is attached to that
			// provider's retrieval metric, if one has been started.
			if providerEvent, ok := re.(events.EventWithProviderID); ok {
				name := fmt.Sprintf("retrieval-%s", events.Identifier(providerEvent))
				recordRetrievalStep(name, re, false)
			}
		}
	}
}

// formatDuration renders d as a decimal number of milliseconds using the
// shortest representation that round-trips (no exponent, no trailing zeros),
// mirroring how the go-server-timing library formats its "dur" values.
func formatDuration(d time.Duration) string {
	const nanosPerMilli = float64(time.Millisecond)

	millis := float64(d) / nanosPerMilli
	return strconv.FormatFloat(millis, 'f', -1, 64)
}

0 comments on commit 1d22d1f

Please sign in to comment.