From 695ef1d45c29c19c4ad0bdb1538d055dbdb85757 Mon Sep 17 00:00:00 2001 From: Will May Date: Mon, 27 Feb 2023 15:14:01 +0000 Subject: [PATCH] Expose metrics for libp2p using OpenTelemetry (#2074) libp2p _used_ to support metrics using OpenCensus, but this was recently changed to use Prometheus instead - https://github.com/libp2p/go-libp2p/issues/1955. Unfortunately, it is extremely difficult to get Prometheus metrics into OpenTelemetry without running the external OTEL agent. This re-implements the same metrics using OpenTelemetry using the _new_ Prometheus names rather than the old OpenCensus naming. Fixes #2059 --- go.mod | 1 - go.sum | 2 - pkg/libp2p/rcmgr/defaults.go | 54 +++-- pkg/libp2p/rcmgr/metrics.go | 373 +++++++++++++++++++++++++++++++ pkg/libp2p/rcmgr/metrics_test.go | 132 +++++++++++ pkg/telemetry/metrics.go | 3 - 6 files changed, 531 insertions(+), 34 deletions(-) create mode 100644 pkg/libp2p/rcmgr/metrics.go create mode 100644 pkg/libp2p/rcmgr/metrics_test.go diff --git a/go.mod b/go.mod index 0b59b36948..1d9a575d27 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,6 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.39.0 go.opentelemetry.io/otel v1.13.0 - go.opentelemetry.io/otel/bridge/opencensus v0.36.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.35.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.35.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.13.0 diff --git a/go.sum b/go.sum index a3205dbcab..217f774460 100644 --- a/go.sum +++ b/go.sum @@ -2331,8 +2331,6 @@ go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVj go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y= go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg= -go.opentelemetry.io/otel/bridge/opencensus v0.36.0 h1:/RJRbdVBP+Yoh6tx8zu0L+b1KE/iGmo3adr5LUvCGWA= -go.opentelemetry.io/otel/bridge/opencensus v0.36.0/go.mod h1:otgxgSPzGNRPjDYJUrQwqGu4MXRngDPEbFxuVl355Fk= go.opentelemetry.io/otel/exporters/jaeger v1.7.0 h1:wXgjiRldljksZkZrldGVe6XrG9u3kYDyQmkZwmm5dI0= go.opentelemetry.io/otel/exporters/jaeger v1.7.0/go.mod h1:PwQAOqBgqbLQRKlj466DuD2qyMjbtcPpfPfj+AqbSBs= go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM= diff --git a/pkg/libp2p/rcmgr/defaults.go b/pkg/libp2p/rcmgr/defaults.go index 6777fdadf0..5bb49294ee 100644 --- a/pkg/libp2p/rcmgr/defaults.go +++ b/pkg/libp2p/rcmgr/defaults.go @@ -4,10 +4,11 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/transport/bprotocol" "github.com/libp2p/go-libp2p" libp2p_rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/global" ) -func SetDefaultServiceLimits(config *libp2p_rcmgr.ScalingLimitConfig) { +func setDefaultServiceLimits(config *libp2p_rcmgr.ScalingLimitConfig) { // Requester -> Compute nodes // reasoning behind these limits: // - Requester nodes should have a high number of outbound streams to compute nodes @@ -41,33 +42,30 @@ func SetDefaultServiceLimits(config *libp2p_rcmgr.ScalingLimitConfig) { ) } -var DefaultResourceManager = func(cfg *libp2p.Config) error { - // Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB - limits := libp2p_rcmgr.DefaultLimits - libp2p.SetDefaultServiceLimits(&limits) - SetDefaultServiceLimits(&limits) +func DefaultResourceManager(cfg *libp2p.Config) error { + return resourceManagerWithMetricsProvider(global.MeterProvider())(cfg) +} - // TODO: libp2p removed open census and replaced with prometheus metrics. But we only use - // opentelemetry. Somebody needs to figure out how to map or convert the prom metrics to - // oltp metrics. - // // Hook up the trace reporter metrics. This will expose all opencensus - // // stats via the default prometheus registry. See https://opencensus.io/exporters/supported-exporters/go/prometheus/ for other options. - // err := view.Register(obs.DefaultViews...) - // if err != nil { - // log.Warn().Err(err).Msg("failed to register resource manager metrics") - // } +func resourceManagerWithMetricsProvider(provider metric.MeterProvider) func(cfg *libp2p.Config) error { + return func(cfg *libp2p.Config) error { + // Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB + limits := libp2p_rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&limits) + setDefaultServiceLimits(&limits) - str, err := obs.NewStatsTraceReporter() - if err != nil { - return err - } - mgr, err := libp2p_rcmgr.NewResourceManager( - libp2p_rcmgr.NewFixedLimiter(limits.AutoScale()), - libp2p_rcmgr.WithTraceReporter(str), - ) - if err != nil { - return err - } + metricReporter, err := metricsReporter(provider) + if err != nil { + return err + } + + mgr, err := libp2p_rcmgr.NewResourceManager( + libp2p_rcmgr.NewFixedLimiter(limits.AutoScale()), + libp2p_rcmgr.WithTraceReporter(metricReporter), + ) + if err != nil { + return err + } - return cfg.Apply(libp2p.ResourceManager(mgr)) + return cfg.Apply(libp2p.ResourceManager(mgr)) + } } diff --git a/pkg/libp2p/rcmgr/metrics.go b/pkg/libp2p/rcmgr/metrics.go new file mode 100644 index 0000000000..820d92163a --- /dev/null +++ b/pkg/libp2p/rcmgr/metrics.go @@ -0,0 +1,373 @@ +package rcmgr + +import ( + "context" + "strings" + + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" +) + +// metricsReporter will build create a rcmgr.TraceReporter similar to the obs.StatsTraceReporter but with OpenTelemetry +// support rather than Prometheus. +func metricsReporter(meterProvider metric.MeterProvider) (rcmgr.TraceReporter, error) { //nolint:funlen + meter := meterProvider.Meter("libp2p") + + connections, err := meter.Float64Counter( + "rcmgr_connections", + instrument.WithDescription("Number of Connections"), + ) + if err != nil { + return nil, err + } + peerConnections, err := meter.Float64Histogram( + "rcmgr_peer_connections", + instrument.WithDescription("Number of connections this peer has"), + ) + if err != nil { + return nil, err + } + previousPeerConnections, err := meter.Float64Histogram( + "rcmgr_previous_peer_connections", + instrument.WithDescription("Number of connections this peer previously had. "+ + "This is used to get the current connection number per peer histogram by subtracting this from the peer_connections histogram"), + ) + if err != nil { + return nil, err + } + + streams, err := meter.Float64Counter( + "rcmgr_streams", + instrument.WithDescription("Number of Streams"), + ) + if err != nil { + return nil, err + } + peerStreams, err := meter.Float64Histogram( + "rcmgr_peer_streams", + instrument.WithDescription("Number of streams this peer has"), + ) + if err != nil { + return nil, err + } + previousPeerStreams, err := meter.Float64Histogram( + "rcmgr_previous_peer_streams", + instrument.WithDescription("Number of streams this peer has"), + ) + if err != nil { + return nil, err + } + + memory, err := meter.Float64Counter( + "rcmgr_memory", + instrument.WithDescription("Amount of memory reserved as reported to the Resource Manager"), + ) + if err != nil { + return nil, err + } + peerMemory, err := meter.Float64Histogram( + "rcmgr_peer_memory", + instrument.WithDescription("How many peers have reserved this bucket of memory, as reported to the Resource Manager"), + ) + if err != nil { + return nil, err + } + previousPeerMemory, err := meter.Float64Histogram( + "rcmgr_previous_peer_memory", + instrument.WithDescription("How many peers have previously reserved this bucket of memory, as reported to the Resource Manager"), + ) + if err != nil { + return nil, err + } + + connectionMemory, err := meter.Float64Histogram( + "rcmgr_connection_memory", + instrument.WithDescription("How many connections have reserved this bucket of memory, as reported to the Resource Manager"), + ) + if err != nil { + return nil, err + } + previousConnectionMemory, err := meter.Float64Histogram( + "rcmgr_previous_connection_memory", + instrument.WithDescription("How many connections have previously reserved this bucket of memory, as reported to the Resource Manager"), + ) + if err != nil { + return nil, err + } + + fileDescriptors, err := meter.Float64Counter( + "rcmgr_fds", + instrument.WithDescription("Number of file descriptors reserved as reported to the Resource Manager"), + ) + if err != nil { + return nil, err + } + blocked, err := meter.Float64Counter( + "rcmgr_blocked_resources", + instrument.WithDescription("Number of blocked resources"), + ) + if err != nil { + return nil, err + } + + return reporter{ + connections: connections, + peerConnections: peerConnections, + previousPeerConnections: previousPeerConnections, + streams: streams, + peerStreams: peerStreams, + previousPeerStreams: previousPeerStreams, + memory: memory, + peerMemory: peerMemory, + previousPeerMemory: previousPeerMemory, + connectionMemory: connectionMemory, + previousConnectionMemory: previousConnectionMemory, + fileDescriptors: fileDescriptors, + blockedResources: blocked, + }, nil +} + +var _ rcmgr.TraceReporter = reporter{} + +type reporter struct { + connections instrument.Float64Counter + peerConnections instrument.Float64Histogram + previousPeerConnections instrument.Float64Histogram + + streams instrument.Float64Counter + peerStreams instrument.Float64Histogram + previousPeerStreams instrument.Float64Histogram + + memory instrument.Float64Counter + peerMemory instrument.Float64Histogram + previousPeerMemory instrument.Float64Histogram + + connectionMemory instrument.Float64Histogram + previousConnectionMemory instrument.Float64Histogram + + fileDescriptors instrument.Float64Counter + blockedResources instrument.Float64Counter +} + +// ConsumeEvent is a reimplementation of consumeEventWithLabelSlice in obs.StatsTraceReporter but using OTEL rather +// than Prometheus. Comments, variable names, and logic are all from the original with the only difference being how +// the metrics are recorded. +func (r reporter) ConsumeEvent(evt rcmgr.TraceEvt) { //nolint:funlen,gocyclo + ctx := context.Background() + + switch evt.Type { + case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt: + if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { + // Aggregated peer stats. Counts how many peers have N number of streams open. + // Uses two buckets aggregations. One to count how many streams the + // peer has now. The other to count the negative value, or how many + // streams did the peer use to have. When looking at the data you + // take the difference from the two. + + oldStreamsOut := int64(evt.StreamsOut - evt.DeltaOut) + peerStreamsOut := int64(evt.StreamsOut) + if oldStreamsOut != peerStreamsOut { + if oldStreamsOut != 0 { + r.previousPeerStreams.Record(ctx, float64(oldStreamsOut), attribute.String("dir", "inbound")) + } + if peerStreamsOut != 0 { + r.peerStreams.Record(ctx, float64(peerStreamsOut), attribute.String("dir", "outbound")) + } + } + + oldStreamsIn := int64(evt.StreamsIn - evt.DeltaIn) + peerStreamsIn := int64(evt.StreamsIn) + if oldStreamsIn != peerStreamsIn { + if oldStreamsIn != 0 { + r.previousPeerStreams.Record(ctx, float64(peerStreamsIn), attribute.String("dir", "inbound")) + } + if peerStreamsIn != 0 { + r.peerStreams.Record(ctx, float64(peerStreamsIn), attribute.String("dir", "inbound")) + } + } + } else { + if evt.DeltaOut != 0 { + if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { + r.streams.Add(ctx, float64(evt.StreamsOut), + attribute.String("dir", "outbound"), + attribute.String("scope", evt.Name), + attribute.String("protocol", ""), + ) + } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { + r.streams.Add(ctx, float64(evt.StreamsOut), + attribute.String("dir", "outbound"), + attribute.String("scope", "protocol"), + attribute.String("protocol", proto), + ) + } else { + // Not measuring service scope, connscope, servicepeer and protocolpeer. Lots of data, and + // you can use aggregated peer stats + service stats to infer + // this. + break + } + } + + if evt.DeltaIn != 0 { + if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { + r.streams.Add(ctx, float64(evt.StreamsIn), + attribute.String("dir", "inbound"), + attribute.String("scope", evt.Name), + attribute.String("protocol", ""), + ) + } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { + r.streams.Add(ctx, float64(evt.StreamsIn), + attribute.String("dir", "inbound"), + attribute.String("scope", "protocol"), + attribute.String("protocol", proto), + ) + } else { + // Not measuring service scope, connscope, servicepeer and protocolpeer. Lots of data, and + // you can use aggregated peer stats + service stats to infer + // this. + break + } + } + } + case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt: + if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { + // Aggregated peer stats. Counts how many peers have N number of connections. + // Uses two buckets aggregations. One to count how many streams the + // peer has now. The other to count the negative value, or how many + // conns did the peer use to have. When looking at the data you + // take the difference from the two. + + oldConnsOut := int64(evt.ConnsOut - evt.DeltaOut) + connsOut := int64(evt.ConnsOut) + if oldConnsOut != connsOut { + if oldConnsOut != 0 { + r.previousPeerConnections.Record(ctx, float64(oldConnsOut), attribute.String("dir", "outbound")) + } + if connsOut != 0 { + r.peerConnections.Record(ctx, float64(oldConnsOut), attribute.String("dir", "outbound")) + } + } + + oldConnsIn := int64(evt.ConnsIn - evt.DeltaIn) + connsIn := int64(evt.ConnsIn) + if oldConnsIn != connsIn { + if oldConnsIn != 0 { + r.previousPeerConnections.Record(ctx, float64(oldConnsIn), attribute.String("dir", "inbound")) + } + if connsIn != 0 { + r.peerConnections.Record(ctx, float64(connsIn), attribute.String("dir", "inbound")) + } + } + } else { + if rcmgr.IsConnScope(evt.Name) { + // Not measuring this. I don't think it's useful. + break + } + + if rcmgr.IsSystemScope(evt.Name) { + r.connections.Add(ctx, float64(evt.ConnsIn), attribute.String("dir", "inbound"), attribute.String("scope", "system")) + r.connections.Add(ctx, float64(evt.ConnsOut), attribute.String("dir", "outbound"), attribute.String("scope", "system")) + } else if rcmgr.IsTransientScope(evt.Name) { + r.connections.Add(ctx, float64(evt.ConnsIn), attribute.String("dir", "inbound"), attribute.String("scope", "transient")) + r.connections.Add(ctx, float64(evt.ConnsOut), attribute.String("dir", "outbound"), attribute.String("scope", "transient")) + } + + // Represents the delta in fds + if evt.Delta != 0 { + if rcmgr.IsSystemScope(evt.Name) { + r.fileDescriptors.Add(ctx, float64(evt.FD), attribute.String("scope", "system")) + } else if rcmgr.IsTransientScope(evt.Name) { + r.fileDescriptors.Add(ctx, float64(evt.FD), attribute.String("scope", "transient")) + } + } + } + case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt: + if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { + oldMem := evt.Memory - evt.Delta + if oldMem != evt.Memory { + if oldMem != 0 { + r.previousPeerMemory.Record(ctx, float64(oldMem)) + } + if evt.Memory != 0 { + r.peerMemory.Record(ctx, float64(evt.Memory)) + } + } + } else if rcmgr.IsConnScope(evt.Name) { + oldMem := evt.Memory - evt.Delta + if oldMem != evt.Memory { + if oldMem != 0 { + r.previousConnectionMemory.Record(ctx, float64(oldMem)) + } + if evt.Memory != 0 { + r.connectionMemory.Record(ctx, float64(evt.Memory)) + } + } + } else { + if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { + r.memory.Add(ctx, float64(evt.Memory), attribute.String("scope", evt.Name), attribute.String("protocol", "")) + } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { + r.memory.Add(ctx, float64(evt.Memory), attribute.String("scope", "protocol"), attribute.String("protocol", proto)) + } else { + // Not measuring connscope, servicepeer and protocolpeer. Lots of data, and + // you can use aggregated peer stats + service stats to infer + // this. + break + } + } + + case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt: + var resource string + if evt.Type == rcmgr.TraceBlockAddConnEvt { + resource = "connection" + } else if evt.Type == rcmgr.TraceBlockAddStreamEvt { + resource = "stream" + } else { + resource = "memory" + } + + scopeName := evt.Name + // Only the top scopeName. We don't want to get the peerid here. + // Using indexes and slices to avoid allocating. + scopeSplitIdx := strings.IndexByte(scopeName, ':') + if scopeSplitIdx != -1 { + scopeName = evt.Name[0:scopeSplitIdx] + } + // Drop the connection or stream id + idSplitIdx := strings.IndexByte(scopeName, '-') + if idSplitIdx != -1 { + scopeName = scopeName[0:idSplitIdx] + } + + if evt.DeltaIn != 0 { + r.blockedResources.Add(ctx, float64(evt.DeltaIn), + attribute.String("dir", "inbound"), + attribute.String("scope", scopeName), + attribute.String("resource", resource), + ) + } + + if evt.DeltaOut != 0 { + r.blockedResources.Add(ctx, float64(evt.DeltaOut), + attribute.String("dir", "outbound"), + attribute.String("scope", scopeName), + attribute.String("resource", resource), + ) + } + + if evt.Delta != 0 && resource == "connection" { + // This represents fds blocked + r.blockedResources.Add(ctx, float64(evt.Delta), + attribute.String("dir", ""), + attribute.String("scope", scopeName), + attribute.String("resource", "fd"), + ) + } else if evt.Delta != 0 { + r.blockedResources.Add(ctx, float64(evt.Delta), + attribute.String("dir", ""), + attribute.String("scope", scopeName), + attribute.String("resource", resource), + ) + } + } +} diff --git a/pkg/libp2p/rcmgr/metrics_test.go b/pkg/libp2p/rcmgr/metrics_test.go new file mode 100644 index 0000000000..db219d1717 --- /dev/null +++ b/pkg/libp2p/rcmgr/metrics_test.go @@ -0,0 +1,132 @@ +package rcmgr + +import ( + "bufio" + "context" + "fmt" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/phayes/freeport" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "golang.org/x/exp/slices" + "io" + "testing" + "time" +) + +func TestMetricsReporter(t *testing.T) { + // Connect two libp2p hosts together, send a message, then make sure a metric was reported + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + exp, err := otlpmetricgrpc.New(ctx) + require.NoError(t, err) + reader := sdkmetric.NewPeriodicReader(exp) + + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + ) + + port1, err := freeport.GetFreePort() + require.NoError(t, err) + + host1 := startListener(t, port1, meterProvider) + + port2, err := freeport.GetFreePort() + require.NoError(t, err) + + host2 := startListener(t, port2, meterProvider) + + host2.SetStreamHandler("/echo/1.0", func(s network.Stream) { + buf := bufio.NewReader(s) + str, err := buf.ReadString('\n') + if !assert.NoError(t, err) { + assert.NoError(t, s.Reset()) + return + } + + if _, err = s.Write([]byte(str)); !assert.NoError(t, err) { + assert.NoError(t, s.Reset()) + return + } + assert.NoError(t, s.Close()) + }) + + host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.PermanentAddrTTL) + stream, err := host1.NewStream(ctx, host2.ID(), "/echo/1.0") + require.NoError(t, err) + + t.Cleanup(func() { + assert.NoError(t, stream.Close()) + }) + + _, err = stream.Write([]byte("testing\n")) + require.NoError(t, err) + + read, err := io.ReadAll(stream) + require.NoError(t, err) + assert.Equal(t, "testing\n", string(read)) + + metrics, err := reader.Collect(ctx) + require.NoError(t, err) + + t.Log(metrics) + + require.Len(t, metrics.ScopeMetrics, 1) + + connectionsMetric := slices.IndexFunc(metrics.ScopeMetrics[0].Metrics, func(metrics metricdata.Metrics) bool { + return metrics.Name == "rcmgr_connections" + }) + require.NotEqual(t, -1, connectionsMetric) + + connectionData := metrics.ScopeMetrics[0].Metrics[connectionsMetric].Data.(metricdata.Sum[float64]) + outboundConnection := slices.IndexFunc(connectionData.DataPoints, func(d metricdata.DataPoint[float64]) bool { + if bound, ok := d.Attributes.Value("dir"); !ok { + return false + } else if bound.AsString() != "outbound" { + return false + } + + if scope, ok := d.Attributes.Value("scope"); !ok { + return false + } else if scope.AsString() != "transient" { + return false + } + + return true + }) + require.NotEqual(t, -1, outboundConnection) + + assert.NotZero(t, connectionData.DataPoints[outboundConnection].Value) +} + +func startListener(t *testing.T, port int, provider metric.MeterProvider) host.Host { + priv, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) + require.NoError(t, err) + + t.Log("Starting listener on port", port) + + h, err := libp2p.New( + libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)), + libp2p.Identity(priv), + libp2p.DisableRelay(), + libp2p.NoSecurity, + resourceManagerWithMetricsProvider(provider), + ) + require.NoError(t, err) + + t.Cleanup(func() { + assert.NoError(t, h.Close()) + }) + + return h +} diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/metrics.go index c46c69d0f9..d8cec3b9ed 100644 --- a/pkg/telemetry/metrics.go +++ b/pkg/telemetry/metrics.go @@ -6,7 +6,6 @@ import ( "os" "github.com/rs/zerolog/log" - "go.opentelemetry.io/otel/bridge/opencensus" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric/global" @@ -30,9 +29,7 @@ func newMeterProvider() { return } - // reader that also bridges opencensus metrics to capture libp2p metrics reader := sdkmetric.NewPeriodicReader(exp) - reader.RegisterProducer(opencensus.NewMetricProducer()) meterProvider = sdkmetric.NewMeterProvider( sdkmetric.WithResource(newResource()),