Skip to content

Commit

Permalink
feat: reduce mem usage of rcmgr
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Dec 17, 2022
1 parent 47f9ea1 commit 410f10e
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 71 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/google/gopacket v1.1.19
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4=
github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
Expand Down
152 changes: 91 additions & 61 deletions p2p/host/resource-manager/obs/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strings"

lru "github.com/hashicorp/golang-lru/v2"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"

"go.opencensus.io/stats"
Expand Down Expand Up @@ -125,19 +126,72 @@ var DefaultViews []*view.View = []*view.View{
}

// StatsTraceReporter reports stats on the resource manager using its traces.
type StatsTraceReporter struct{}
type StatsTraceReporter struct {
// mutsCache map[mutKey][]tag.Mutator
// mutsCacheLock sync.RWMutex

mutsCache *lru.Cache[tags, []tag.Mutator]
}

// tags is a comparable value type used as the LRU-cache key for one
// combination of metric tag values. Empty fields are skipped when the
// cached mutator slice is built (see record).
type tags struct {
	direction string
	scope     string
	service   string
	protocol  string
	resource  string
}

// Canonical tag value strings, shared so every event reuses the same
// constants instead of building fresh strings.
const (
	dirInbound    = "inbound"
	dirOutbound   = "outbound"
	scopeService  = "service"
	scopeProtocol = "protocol"
)

// NewStatsTraceReporter constructs a StatsTraceReporter with a bounded
// (2048-entry) LRU cache of tag mutators. It returns an error only if
// the cache cannot be created.
func NewStatsTraceReporter() (StatsTraceReporter, error) {
	// TODO tell prometheus the system limits
	cache, err := lru.New[tags, []tag.Mutator](2048)
	if err != nil {
		return StatsTraceReporter{}, err
	}
	return StatsTraceReporter{
		mutsCache: cache,
	}, nil
}

func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
func (r *StatsTraceReporter) record(ctx context.Context, tags tags, m stats.Measurement) {
mutators, ok := r.mutsCache.Get(tags)
if !ok {
mutators = nil

if tags.direction != "" {
mutators = append(mutators, tag.Upsert(directionTag, tags.direction))
}
if tags.scope != "" {
mutators = append(mutators, tag.Upsert(scopeTag, tags.scope))
}
if tags.service != "" {
mutators = append(mutators, tag.Upsert(serviceTag, tags.service))
}
if tags.protocol != "" {
mutators = append(mutators, tag.Upsert(protocolTag, tags.protocol))
}
if tags.resource != "" {
mutators = append(mutators, tag.Upsert(resourceTag, tags.resource))
}

r.mutsCache.Add(tags, mutators)
}

stats.RecordWithTags(ctx, mutators, m)
}

// ConsumeEvent translates one resource-manager trace event into
// opencensus measurements.
func (r *StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
	ctx := context.Background()

	switch evt.Type {
	case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt:
		if rcmgr.IsPeerScope(evt.Name) {
// Aggregated peer stats. Counts how many peers have N number of streams open.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
Expand All @@ -148,31 +202,33 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
peerStreamsOut := int64(evt.StreamsOut)
if oldStreamsOut != peerStreamsOut {
if oldStreamsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreamsNegative.M(oldStreamsOut))
r.record(ctx, tags{direction: dirOutbound}, peerStreamsNegative.M(oldStreamsOut))
}
if peerStreamsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerStreams.M(peerStreamsOut))
r.record(ctx, tags{direction: dirOutbound}, peerStreams.M(peerStreamsOut))
}
}

oldStreamsIn := int64(evt.StreamsIn - evt.DeltaIn)
peerStreamsIn := int64(evt.StreamsIn)
if oldStreamsIn != peerStreamsIn {
if oldStreamsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreamsNegative.M(oldStreamsIn))
r.record(ctx, tags{direction: dirInbound}, peerStreamsNegative.M(oldStreamsIn))
}
if peerStreamsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerStreams.M(peerStreamsIn))
r.record(ctx, tags{direction: dirOutbound}, peerStreams.M(peerStreamsIn))
}
}
} else {
var tags []tag.Mutator
var scope, service, protocol string
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
scope = evt.Name
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" {
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc))
scope = scopeService
service = svc
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto))
scope = scopeProtocol
protocol = proto
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
Expand All @@ -181,24 +237,16 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
}

if evt.DeltaOut != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...),
streams.M(int64(evt.DeltaOut)),
)
r.record(ctx, tags{scope: scope, service: service, protocol: protocol, direction: dirOutbound}, streams.M(int64(evt.DeltaOut)))
}

if evt.DeltaIn != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...),
streams.M(int64(evt.DeltaIn)),
)
r.record(ctx, tags{scope: scope, service: service, protocol: protocol, direction: dirInbound}, streams.M(int64(evt.DeltaOut)))
}
}

case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
if rcmgr.IsPeerScope(evt.Name) {
// Aggregated peer stats. Counts how many peers have N number of connections.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
Expand All @@ -209,27 +257,27 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
connsOut := int64(evt.ConnsOut)
if oldConnsOut != connsOut {
if oldConnsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConnsNegative.M(oldConnsOut))
r.record(ctx, tags{direction: dirOutbound}, peerConnsNegative.M(oldConnsOut))
}
if connsOut != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "outbound")}, peerConns.M(connsOut))
r.record(ctx, tags{direction: dirOutbound}, peerConns.M(oldConnsOut))
}
}

oldConnsIn := int64(evt.ConnsIn - evt.DeltaIn)
connsIn := int64(evt.ConnsIn)
if oldConnsIn != connsIn {
if oldConnsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConnsNegative.M(oldConnsIn))
r.record(ctx, tags{direction: dirInbound}, peerConnsNegative.M(oldConnsIn))
}
if connsIn != 0 {
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(directionTag, "inbound")}, peerConns.M(connsIn))
r.record(ctx, tags{direction: dirInbound}, peerConns.M(connsIn))
}
}
} else {
var tags []tag.Mutator
var scope string
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
scope = evt.Name
} else if rcmgr.IsConnScope(evt.Name) {
// Not measuring this. I don't think it's useful.
break
Expand All @@ -239,32 +287,20 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
}

if evt.DeltaOut != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "outbound")}, tags...),
conns.M(int64(evt.DeltaOut)),
)
r.record(ctx, tags{scope: scope, direction: dirOutbound}, conns.M(int64(evt.DeltaOut)))
}

if evt.DeltaIn != 0 {
stats.RecordWithTags(
ctx,
append([]tag.Mutator{tag.Upsert(directionTag, "inbound")}, tags...),
conns.M(int64(evt.DeltaIn)),
)
r.record(ctx, tags{scope: scope, direction: dirInbound}, conns.M(int64(evt.DeltaIn)))
}

// Represents the delta in fds
if evt.Delta != 0 {
stats.RecordWithTags(
ctx,
tags,
fds.M(int64(evt.Delta)),
)
r.record(ctx, tags{scope: scope}, fds.M(int64(evt.Delta)))
}
}
case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt:
if p := rcmgr.ParsePeerScopeName(evt.Name); p.Validate() == nil {
if rcmgr.IsPeerScope(evt.Name) {
oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory {
if oldMem != 0 {
Expand All @@ -285,13 +321,15 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
}
}
} else {
var tags []tag.Mutator
var scope, service, protocol string
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) {
tags = append(tags, tag.Upsert(scopeTag, evt.Name))
scope = evt.Name
} else if svc := rcmgr.ParseServiceScopeName(evt.Name); svc != "" {
tags = append(tags, tag.Upsert(scopeTag, "service"), tag.Upsert(serviceTag, svc))
scope = scopeService
service = svc
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" {
tags = append(tags, tag.Upsert(scopeTag, "protocol"), tag.Upsert(protocolTag, proto))
scope = scopeProtocol
protocol = proto
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
Expand All @@ -300,7 +338,7 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
}

if evt.Delta != 0 {
stats.RecordWithTags(ctx, tags, memory.M(int64(evt.Delta)))
r.record(ctx, tags{scope: scope, service: service, protocol: protocol}, memory.M(int64(evt.Delta)))
}
}

Expand All @@ -321,24 +359,16 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {

// If something else gets added here, make sure to update the size hint
// below when we make `tagsWithDir`.
tags := []tag.Mutator{tag.Upsert(scopeTag, scopeName), tag.Upsert(resourceTag, resource)}

if evt.DeltaIn != 0 {
tagsWithDir := make([]tag.Mutator, 0, 3)
tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "inbound"))
tagsWithDir = append(tagsWithDir, tags...)
stats.RecordWithTags(ctx, tagsWithDir[0:], blockedResources.M(int64(1)))
r.record(ctx, tags{scope: scopeName, resource: resource, direction: dirInbound}, blockedResources.M(int64(1)))
}

if evt.DeltaOut != 0 {
tagsWithDir := make([]tag.Mutator, 0, 3)
tagsWithDir = append(tagsWithDir, tag.Insert(directionTag, "outbound"))
tagsWithDir = append(tagsWithDir, tags...)
stats.RecordWithTags(ctx, tagsWithDir, blockedResources.M(int64(1)))
r.record(ctx, tags{scope: scopeName, resource: resource, direction: dirOutbound}, blockedResources.M(int64(1)))
}

if evt.Delta != 0 {
stats.RecordWithTags(ctx, tags, blockedResources.M(1))
r.record(ctx, tags{scope: scopeName, resource: resource}, blockedResources.M(1))
}
}
}
6 changes: 5 additions & 1 deletion p2p/host/resource-manager/obs/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import (
)

func TestTraceReporterStartAndClose(t *testing.T) {
rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{}))
tr, err := obs.NewStatsTraceReporter()
if err != nil {
t.Fatal(err)
}
rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(tr))
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 6 additions & 6 deletions p2p/host/resource-manager/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,19 +518,19 @@ func peerScopeName(p peer.ID) string {
}

// ParsePeerScopeName returns "" if name is not a peerScopeName
func ParsePeerScopeName(name string) peer.ID {
func ParsePeerScopeName(name string) string {
if !strings.HasPrefix(name, "peer:") || IsSpan(name) {
return ""
}
parts := strings.SplitN(name, "peer:", 2)
if len(parts) != 2 {
return ""
}
p, err := peer.Decode(parts[1])
if err != nil {
return ""
}
return p
return parts[1]
}

// IsPeerScope reports whether name is a peer scope name ("peer:<id>").
// Span scope names are not peer scopes.
func IsPeerScope(name string) bool {
	// BUG fix: `|| !IsSpan(name)` classified every non-span scope
	// (system, transient, service, ...) as a peer scope.
	return strings.HasPrefix(name, "peer:") && !IsSpan(name)
}

// ParseServiceScopeName returns the service name if name is a serviceScopeName.
Expand Down
6 changes: 5 additions & 1 deletion p2p/test/resource-manager/echo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import (
"github.com/stretchr/testify/require"
)

func createEchos(t *testing.T, count int, makeOpts ...func(int) libp2p.Option) []*Echo {
// fataler is the minimal testing interface required by createEchos: any
// value with a Fatal method (e.g. *testing.T; presumably also *testing.B
// — confirm against callers).
type fataler interface {
	Fatal(args ...any)
}

func createEchos(t fataler, count int, makeOpts ...func(int) libp2p.Option) []*Echo {
result := make([]*Echo, 0, count)

for i := 0; i < count; i++ {
Expand Down
Loading

0 comments on commit 410f10e

Please sign in to comment.