diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index dd0e005e..f11a6841 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -5,9 +5,7 @@ import ( "fmt" "net/http" "strconv" - "time" - "github.com/filecoin-project/lassie/pkg/events" lassie "github.com/filecoin-project/lassie/pkg/lassie" "github.com/filecoin-project/lassie/pkg/retriever" "github.com/filecoin-project/lassie/pkg/storage" @@ -16,7 +14,6 @@ import ( "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p/core/peer" - servertiming "github.com/mitchellh/go-server-timing" "github.com/multiformats/go-multicodec" ) @@ -187,41 +184,8 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response } // servertiming metrics - startTimeMap := make(map[string]time.Time) - logger.Debugw("fetching CID", "retrievalId", retrievalId, "CID", rootCid.String(), "path", path.String(), "dagScope", dagScope) - stats, err := lassie.Fetch(req.Context(), request, func(re types.RetrievalEvent) { - header := servertiming.FromContext(req.Context()) - if header == nil { - return - } - - var metricName string - switch re.(type) { - case events.StartedFindingCandidatesEvent: - header.NewMetric("indexer") - startTimeMap["indexer"] = re.Time() - - case events.StartedRetrievalEvent: - header.NewMetric("retrieval") - startTimeMap["retrieval"] = re.Time() - metricName = "indexer" - - case events.FinishedEvent: - metricName = "retrieval" - } - - // Set the metric duration - header.Lock() - if header.Metrics != nil { - for _, m := range header.Metrics { - if metricName != "" && m.Name == metricName { - m.Duration = re.Time().Sub(startTimeMap[metricName]) - } - } - } - header.Unlock() - }) + stats, err := lassie.Fetch(req.Context(), request, servertimingsSubscriber(req)) // force all blocks to flush if cerr := carWriter.Close(); cerr != nil { diff --git a/pkg/server/http/servertimingssubscriber.go b/pkg/server/http/servertimingssubscriber.go new file mode 100644 index 00000000..beeb5cb3 --- /dev/null +++ b/pkg/server/http/servertimingssubscriber.go @@ -0,0 +1,111 @@ +package httpserver + +import ( + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "github.com/filecoin-project/lassie/pkg/events" + "github.com/filecoin-project/lassie/pkg/types" + servertiming "github.com/mitchellh/go-server-timing" +) + +// servertimingsSubscriber is a retrieval event subscriber that records +// RetrievalEvents and generates a Server-Timing header on an http request. +// The "dur" field is the duration since the "started-fetch" event in milliseconds. +// The extra fields are the other events that took place and their durations in +// nanoseconds since the "started-fetch" event. +// +// We are unable to get the duration of the entire fetch or successful retrievals +// due to the way in which the headers are written. Since the headers are written +// before an http `Write` occurs, we can only collect info about the retrievals +// until a `first-byte-received` event that results in data being written to the client. +// The http `Write` ends up occurring before the success and finished events are emitted, +// therefore cutting off the trailing events that occur for any given retrieval. +// Because of this, the `started-fetch`, `success`, and `finished` events are not processed. +// +// Additionally, we use the `dur` field to record the time since the `started-fetch` event +// instead of the duration of the event itself. We do this to render something in the browser +// since the "dur" field is the only field rendered. +func servertimingsSubscriber(req *http.Request) types.RetrievalEventSubscriber { + var fetchStartTime time.Time + + var candidateFindingMetric *servertiming.Metric + var candidateFindingStartTime time.Time + + mapLock := sync.Mutex{} + retrievalMetricMap := make(map[string]*servertiming.Metric) + retrievalTimingMap := make(map[string]time.Time) + + return func(re types.RetrievalEvent) { + timing := servertiming.FromContext(req.Context()) + if timing == nil { + return + } + + switch event := re.(type) { + case events.StartedFetchEvent: + fetchStartTime = re.Time() + + // Candidate finding cases + case events.StartedFindingCandidatesEvent: + candidateFindingMetric = timing.NewMetric(string(re.Code())) + candidateFindingMetric.Extra = make(map[string]string) + candidateFindingMetric.Extra["dur"] = formatDuration(re.Time().Sub(fetchStartTime)) + candidateFindingStartTime = re.Time() + case events.CandidatesFoundEvent: + candidateFindingMetric.Duration = re.Time().Sub(candidateFindingStartTime) + candidateFindingMetric.Extra[string(re.Code())] = fmt.Sprintf("%d", re.Time().Sub(candidateFindingStartTime)) + case events.CandidatesFilteredEvent: + candidateFindingMetric.Duration = re.Time().Sub(candidateFindingStartTime) + candidateFindingMetric.Extra[string(re.Code())] = fmt.Sprintf("%d", re.Time().Sub(candidateFindingStartTime)) + + // Retrieval cases + case events.StartedRetrievalEvent: + name := fmt.Sprintf("retrieval-%s", events.Identifier(re)) + retrievalMetric := timing.NewMetric(name) + retrievalMetric.Extra = make(map[string]string) + // We're using "dur" here to render the time since the "started-fetch" in the browser + retrievalMetric.Extra["dur"] = formatDuration(re.Time().Sub(fetchStartTime)) + retrievalMetricMap[name] = retrievalMetric + retrievalTimingMap[name] = re.Time() + case events.FirstByteEvent: + name := fmt.Sprintf("retrieval-%s", events.Identifier(event)) + mapLock.Lock() + if retrievalMetric, ok := retrievalMetricMap[name]; ok { + retrievalMetric.Extra[string(re.Code())] = fmt.Sprintf("%d", re.Time().Sub(retrievalTimingMap[name])) + } + mapLock.Unlock() + case events.FailedRetrievalEvent: + name := fmt.Sprintf("retrieval-%s", events.Identifier(re)) + mapLock.Lock() + if retrievalMetric, ok := retrievalMetricMap[name]; ok { + retrievalMetric.Extra[string(re.Code())] = fmt.Sprintf("%d", re.Time().Sub(retrievalTimingMap[name])) + retrievalMetric.Duration = re.Time().Sub(retrievalTimingMap[name]) + } + mapLock.Unlock() + + // Due to the timing in which the Server-Timing header is written, + // the success and finished events are never emitted in time to make it into the header. + case events.SucceededEvent: + case events.FinishedEvent: + + default: + if event, ok := re.(events.EventWithProviderID); ok { + name := fmt.Sprintf("retrieval-%s", events.Identifier(event)) + mapLock.Lock() + if retrievalMetric, ok := retrievalMetricMap[name]; ok { + retrievalMetric.Extra[string(re.Code())] = fmt.Sprintf("%d", re.Time().Sub(retrievalTimingMap[name])) + } + mapLock.Unlock() + } + } + } +} + +// formatDuration formats a duration in milliseconds in the same way the go-server-timing library does. +func formatDuration(d time.Duration) string { + return strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64) +}