From 410f10eb7f5fa2ac2bbc989923e8e4a5e68fc427 Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Fri, 16 Dec 2022 16:26:47 -0500 Subject: [PATCH] feat: reduce mem usage of rcmgr --- go.mod | 1 + go.sum | 2 + p2p/host/resource-manager/obs/stats.go | 152 +++++++++++------- p2p/host/resource-manager/obs/stats_test.go | 6 +- p2p/host/resource-manager/rcmgr.go | 12 +- p2p/test/resource-manager/echo_test.go | 6 +- p2p/test/resource-manager/rcmgr_bench_test.go | 60 +++++++ p2p/test/resource-manager/rcmgr_test.go | 8 +- 8 files changed, 176 insertions(+), 71 deletions(-) create mode 100644 p2p/test/resource-manager/rcmgr_bench_test.go diff --git a/go.mod b/go.mod index 941258988c..bf580cd116 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/gopacket v1.1.19 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v0.5.4 + github.com/hashicorp/golang-lru/v2 v2.0.1 github.com/ipfs/go-cid v0.3.2 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger v0.3.0 diff --git a/go.sum b/go.sum index 76783b2be0..01fd8a4fe7 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4= +github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= diff --git a/p2p/host/resource-manager/obs/stats.go 
b/p2p/host/resource-manager/obs/stats.go index 2b0b5cb72a..67953994fc 100644 --- a/p2p/host/resource-manager/obs/stats.go +++ b/p2p/host/resource-manager/obs/stats.go @@ -4,6 +4,7 @@ import ( "context" "strings" + lru "github.com/hashicorp/golang-lru/v2" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "go.opencensus.io/stats" @@ -125,19 +126,72 @@ var DefaultViews []*view.View = []*view.View{ } // StatsTraceReporter reports stats on the resource manager using its traces. -type StatsTraceReporter struct{} +type StatsTraceReporter struct { + // mutsCache map[mutKey][]tag.Mutator + // mutsCacheLock sync.RWMutex + + mutsCache *lru.Cache[tags, []tag.Mutator] +} + +type tags struct { + direction string + scope string + service string + protocol string + resource string +} + +const ( + dirInbound = "inbound" + dirOutbound = "outbound" + scopeService = "service" + scopeProtocol = "protocol" +) func NewStatsTraceReporter() (StatsTraceReporter, error) { // TODO tell prometheus the system limits - return StatsTraceReporter{}, nil + cache, err := lru.New[tags, []tag.Mutator](2048) + if err != nil { + return StatsTraceReporter{}, err + } + return StatsTraceReporter{ + mutsCache: cache, + }, nil } -func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { +func (r *StatsTraceReporter) record(ctx context.Context, tags tags, m stats.Measurement) { + mutators, ok := r.mutsCache.Get(tags) + if !ok { + mutators = nil + + if tags.direction != "" { + mutators = append(mutators, tag.Upsert(directionTag, tags.direction)) + } + if tags.scope != "" { + mutators = append(mutators, tag.Upsert(scopeTag, tags.scope)) + } + if tags.service != "" { + mutators = append(mutators, tag.Upsert(serviceTag, tags.service)) + } + if tags.protocol != "" { + mutators = append(mutators, tag.Upsert(protocolTag, tags.protocol)) + } + if tags.resource != "" { + mutators = append(mutators, tag.Upsert(resourceTag, tags.resource)) + } + + r.mutsCache.Add(tags, mutators) + } + + 
stats.RecordWithTags(ctx, mutators, m) +} + +func (r *StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { ctx := context.Background() switch evt.Type { case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt: - if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil { + if rcmgr.IsPeerScope(evt.Name) { // Aggregated peer stats. Counts how many peers have N number of streams open. // Uses two buckets aggregations. One to count how many streams the // peer has now. The other to count the negative value, or how many @@ -148,10 +202,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { peerStreamsOut := int64(evt.StreamsOut) if oldStreamsOut != peerStreamsOut { if oldStreamsOut != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreamsNegative.M(oldStreamsOut)) + r.record(ctx, tags{direction: dirOutbound}, peerStreamsNegative.M(oldStreamsOut)) } if peerStreamsOut != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreams.M(peerStreamsOut)) + r.record(ctx, tags{direction: dirOutbound}, peerStreams.M(peerStreamsOut)) } } @@ -159,20 +213,22 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { peerStreamsIn := int64(evt.StreamsIn) if oldStreamsIn != peerStreamsIn { if oldStreamsIn != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreamsNegative.M(oldStreamsIn)) + r.record(ctx, tags{direction: dirInbound}, peerStreamsNegative.M(oldStreamsIn)) } if peerStreamsIn != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreams.M(peerStreamsIn)) + r.record(ctx, tags{direction: dirInbound}, peerStreams.M(peerStreamsIn)) } } } else { - var tags []tag.Mutator + var scope, service, protocol string if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { - tags = append(tags, tag.Upsert(scopeTag, evt.Name)) + scope = evt.Name } else if svc := 
rcmgr.ParseServiceScopeName(evt.Name); svc != "" { - tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc)) + scope = scopeService + service = svc } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { - tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto)) + scope = scopeProtocol + protocol = proto } else { // Not measuring connscope, servicepeer and protocolpeer. Lots of data, and // you can use aggregated peer stats + service stats to infer @@ -181,24 +237,16 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { } if evt.DeltaOut != 0 { - stats.RecordWithTags( - ctx, - append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...), - streams.M(int64(evt.DeltaOut)), - ) + r.record(ctx, tags{scope: scope, service: service, protocol: protocol, direction: dirOutbound}, streams.M(int64(evt.DeltaOut))) } if evt.DeltaIn != 0 { - stats.RecordWithTags( - ctx, - append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...), - streams.M(int64(evt.DeltaIn)), - ) + r.record(ctx, tags{scope: scope, service: service, protocol: protocol, direction: dirInbound}, streams.M(int64(evt.DeltaIn))) } } case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt: - if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil { + if rcmgr.IsPeerScope(evt.Name) { // Aggregated peer stats. Counts how many peers have N number of connections. // Uses two buckets aggregations. One to count how many streams the // peer has now. 
The other to count the negative value, or how many @@ -209,10 +257,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { connsOut := int64(evt.ConnsOut) if oldConnsOut != connsOut { if oldConnsOut != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConnsNegative.M(oldConnsOut)) + r.record(ctx, tags{direction: dirOutbound}, peerConnsNegative.M(oldConnsOut)) } if connsOut != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConns.M(connsOut)) + r.record(ctx, tags{direction: dirOutbound}, peerConns.M(connsOut)) } } @@ -220,16 +268,16 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { connsIn := int64(evt.ConnsIn) if oldConnsIn != connsIn { if oldConnsIn != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConnsNegative.M(oldConnsIn)) + r.record(ctx, tags{direction: dirInbound}, peerConnsNegative.M(oldConnsIn)) } if connsIn != 0 { - stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConns.M(connsIn)) + r.record(ctx, tags{direction: dirInbound}, peerConns.M(connsIn)) } } } else { - var tags []tag.Mutator + var scope string if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { - tags = append(tags, tag.Upsert(scopeTag, evt.Name)) + scope = evt.Name } else if rcmgr.IsConnScope(evt.Name) { // Not measuring this. I don't think it's useful. 
break @@ -239,32 +287,20 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { } if evt.DeltaOut != 0 { - stats.RecordWithTags( - ctx, - append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...), - conns.M(int64(evt.DeltaOut)), - ) + r.record(ctx, tags{scope: scope, direction: dirOutbound}, conns.M(int64(evt.DeltaOut))) } if evt.DeltaIn != 0 { - stats.RecordWithTags( - ctx, - append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...), - conns.M(int64(evt.DeltaIn)), - ) + r.record(ctx, tags{scope: scope, direction: dirInbound}, conns.M(int64(evt.DeltaIn))) } // Represents the delta in fds if evt.Delta != 0 { - stats.RecordWithTags( - ctx, - tags, - fds.M(int64(evt.Delta)), - ) + r.record(ctx, tags{scope: scope}, fds.M(int64(evt.Delta))) } } case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt: - if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil { + if rcmgr.IsPeerScope(evt.Name) { oldMem := evt.Memory - evt.Delta if oldMem != evt.Memory { if oldMem != 0 { @@ -285,13 +321,15 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { } } } else { - var tags []tag.Mutator + var scope, service, protocol string if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { - tags = append(tags, tag.Upsert(scopeTag, evt.Name)) + scope = evt.Name } else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" { - tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc)) + scope = scopeService + service = svc } else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { - tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto)) + scope = scopeProtocol + protocol = proto } else { // Not measuring connscope, servicepeer and protocolpeer. 
Lots of data, and // you can use aggregated peer stats + service stats to infer @@ -300,7 +338,7 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { } if evt.Delta != 0 { - stats.RecordWithTags(ctx, tags, memory.M(int64(evt.Delta))) + r.record(ctx, tags{scope: scope, service: service, protocol: protocol}, memory.M(int64(evt.Delta))) } } @@ -321,24 +359,16 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { // If something else gets added here, make sure to update the size hint // below when we make `tagsWithDir`. - tags := []tag.Mutator{tag.Upsert(scopeTag, scopeName), tag.Upsert(resourceTag, resource)} - if evt.DeltaIn != 0 { - tagsWithDir := make([]tag.Mutator, 0, 3) - tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "inbound")) - tagsWithDir = append(tagsWithDir, tags...) - stats.RecordWithTags(ctx, tagsWithDir[0:], blockedResources.M(int64(1))) + r.record(ctx, tags{scope: scopeName, resource: resource, direction: dirInbound}, blockedResources.M(int64(1))) } if evt.DeltaOut != 0 { - tagsWithDir := make([]tag.Mutator, 0, 3) - tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "outbound")) - tagsWithDir = append(tagsWithDir, tags...) 
- stats.RecordWithTags(ctx, tagsWithDir, blockedResources.M(int64(1))) + r.record(ctx, tags{scope: scopeName, resource: resource, direction: dirOutbound}, blockedResources.M(int64(1))) } if evt.Delta != 0 { - stats.RecordWithTags(ctx, tags, blockedResources.M(1)) + r.record(ctx, tags{scope: scopeName, resource: resource}, blockedResources.M(1)) } } } diff --git a/p2p/host/resource-manager/obs/stats_test.go b/p2p/host/resource-manager/obs/stats_test.go index 81a2009e6c..8fbaa1a5af 100644 --- a/p2p/host/resource-manager/obs/stats_test.go +++ b/p2p/host/resource-manager/obs/stats_test.go @@ -11,7 +11,11 @@ import ( ) func TestTraceReporterStartAndClose(t *testing.T) { - rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{})) + tr, err := obs.NewStatsTraceReporter() + if err != nil { + t.Fatal(err) + } + rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(&tr)) if err != nil { t.Fatal(err) } diff --git a/p2p/host/resource-manager/rcmgr.go b/p2p/host/resource-manager/rcmgr.go index 03d100a634..6a924c4b00 100644 --- a/p2p/host/resource-manager/rcmgr.go +++ b/p2p/host/resource-manager/rcmgr.go @@ -518,7 +518,7 @@ func peerScopeName(p peer.ID) string { } // ParsePeerScopeName returns "" if name is not a peerScopeName -func ParsePeerScopeName(name string) peer.ID { +func ParsePeerScopeName(name string) string { if !strings.HasPrefix(name, "peer:") || IsSpan(name) { return "" } @@ -526,11 +526,11 @@ func ParsePeerScopeName(name string) peer.ID { if len(parts) != 2 { return "" } - p, err := peer.Decode(parts[1]) - if err != nil { - return "" - } - return p + return parts[1] +} + +func IsPeerScope(name string) bool { + return strings.HasPrefix(name, "peer:") && !IsSpan(name) } // ParseServiceScopeName returns the service name if name is a serviceScopeName. 
diff --git a/p2p/test/resource-manager/echo_test.go b/p2p/test/resource-manager/echo_test.go index d6896fbdef..a16ace6757 100644 --- a/p2p/test/resource-manager/echo_test.go +++ b/p2p/test/resource-manager/echo_test.go @@ -12,7 +12,11 @@ import ( "github.com/stretchr/testify/require" ) -func createEchos(t *testing.T, count int, makeOpts ...func(int) libp2p.Option) []*Echo { +type fataler interface { + Fatal(args ...any) +} + +func createEchos(t fataler, count int, makeOpts ...func(int) libp2p.Option) []*Echo { result := make([]*Echo, 0, count) for i := 0; i < count; i++ { diff --git a/p2p/test/resource-manager/rcmgr_bench_test.go b/p2p/test/resource-manager/rcmgr_bench_test.go new file mode 100644 index 0000000000..7c92749bb8 --- /dev/null +++ b/p2p/test/resource-manager/rcmgr_bench_test.go @@ -0,0 +1,60 @@ +package itest + +import ( + "context" + + "testing" + + rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" +) + +func BenchmarkMetrics(b *testing.B) { + // Measures the overhead of resource manager metrics collection by running + // an echo stream benchmark with the stats trace reporter on and off. + cfg := rcmgr.DefaultLimits.AutoScale() + + b.Run("on", func(b *testing.B) { + b.ReportAllocs() + + tr, err := obs.NewStatsTraceReporter() + if err != nil { + b.Fatal(err) + } + echos := createEchos(b, 2, makeRcmgrOption(b, cfg, rcmgr.WithTraceReporter(&tr))) + defer closeEchos(echos) + defer closeRcmgrs(echos) + + host1 := echos[0] + host2 := echos[1] + + for i := 0; i < b.N; i++ { + stream, err := host1.Host.NewStream(context.Background(), host2.Host.ID(), EchoProtoID) + if err != nil { + b.Fatal(err) + } + stream.Close() + stream.Conn().Close() + } + }) + + b.Run("off", func(b *testing.B) { + b.ReportAllocs() + + echos := createEchos(b, 2, makeRcmgrOption(b, cfg)) + defer closeEchos(echos) + defer closeRcmgrs(echos) + + host1 := echos[0] + host2 := echos[1] + + for i := 0; i < b.N; i++ { + stream, err := host1.Host.NewStream(context.Background(), host2.Host.ID(), EchoProtoID) + if err != nil { + b.Fatal(err) + } + stream.Close() + stream.Conn().Close() + } + }) +} diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index ae546c5af9..7d421a5128 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -17,9 +17,13 @@ import ( "github.com/stretchr/testify/require" ) -func makeRcmgrOption(t *testing.T, cfg rcmgr.LimitConfig) func(int) libp2p.Option { +type rcmgrOptionT interface { + require.TestingT + Name() string +} + +func makeRcmgrOption(t rcmgrOptionT, cfg rcmgr.LimitConfig, opts ...rcmgr.Option) func(int) libp2p.Option { return func(i int) libp2p.Option { - var opts []rcmgr.Option if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" { opts = append(opts, rcmgr.WithTrace(fmt.Sprintf("%s-%d.json.gz", t.Name(), i))) }