-
Notifications
You must be signed in to change notification settings - Fork 968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(telemetry): add blackbox instrumentation to the header module + share module + p2p bandwidth metrics #1376
Changes from all commits
cf06f35
20a6e05
c7f219e
77c3ef4
9e2f8e7
4873431
b7a9ce5
50b3f0b
ea53b3c
731fad1
7acbd90
e12b1b1
9e43517
c49070b
c355187
36bee9f
2a1214e
6df5d21
c626255
669dd25
37b4aba
900773f
fc0eaaf
677c260
b436e5f
3b59ccf
c45a60a
78135ec
802fc8b
6014c3d
326baae
6a46c5d
aa54481
f78e787
7959dc5
1a936dd
08518dd
88ba11c
9025cee
ca92182
1446352
4e4a334
33e18af
1af715c
008ee2e
e8e5e09
0f66046
afa705d
9664666
df68ff2
08e40b1
98ff6fd
9513399
2b647e5
b75ebf2
e959de7
8446602
81ee795
d368579
f996fce
1409d53
c10a17b
e0ee3b9
0adaeff
d57e7b2
0993628
b6fb9b6
8f94b6f
4c1fa3b
531abfb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,3 +20,4 @@ vendor | |
/cel-key | ||
coverage.txt | ||
go.work | ||
nodebuilder/keyring-test/ | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,5 @@ func startGRPCServer( | |
log.Error("serving GRPC: ", err) | ||
} | ||
}() | ||
|
||
return grpcSrv, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,9 +141,11 @@ func (m *metrics) observeSample( | |
if m == nil { | ||
return | ||
} | ||
|
||
m.sampleTime.Record(ctx, sampleTime.Seconds(), | ||
attribute.Bool("failed", err != nil), | ||
attribute.Int("header_width", len(h.DAH.RowsRoots)), | ||
attribute.Int("header", int(h.RawHeader.Height)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If header height is used as an attribute, it will be evaluated as a vector dimension in Prometheus. Header height will have very high cardinality and will totally kill Prometheus performance. That's the primary reason I've not added it as an attribute. Please consider using it as a metric value (Gauge) if it is a must for your observation or use an aggregated value for observation like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, @rootulp was advocating long ago to keep our metrics with low cardinality and not to overwhelm our incentivized testnet OTEL collector and Prometheus behind it. |
||
) | ||
|
||
m.sampled.Add(ctx, 1, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package p2p | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/otel/metric/global" | ||
"go.opentelemetry.io/otel/metric/instrument" | ||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64" | ||
) | ||
|
||
type metrics struct { | ||
responseSize syncfloat64.Histogram | ||
responseDuration syncfloat64.Histogram | ||
} | ||
|
||
var ( | ||
meter = global.MeterProvider().Meter("libs/header/p2p") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The meter naming is inconsistent. |
||
) | ||
|
||
func (ex *Exchange[H]) RegisterMetrics() error { | ||
responseSize, err := meter. | ||
SyncFloat64(). | ||
Histogram( | ||
"libhead_get_headers_response_size", | ||
instrument.WithDescription("Size of get headers response in bytes"), | ||
) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
responseDuration, err := meter. | ||
SyncFloat64(). | ||
Histogram( | ||
"libhead_get_headers_request_duration", | ||
instrument.WithDescription("Duration of get headers request in seconds"), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
m := &metrics{ | ||
responseSize: responseSize, | ||
responseDuration: responseDuration, | ||
} | ||
|
||
ex.metrics = m | ||
|
||
return nil | ||
} | ||
|
||
func (m *metrics) ObserveRequest(ctx context.Context, size uint64, duration uint64) { | ||
if m == nil { | ||
return | ||
} | ||
m.responseSize.Record(ctx, float64(size)) | ||
m.responseDuration.Record(ctx, float64(duration)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package header | ||
|
||
import ( | ||
header "github.com/celestiaorg/celestia-node/header" | ||
libhead "github.com/celestiaorg/celestia-node/libs/header" | ||
p2p "github.com/celestiaorg/celestia-node/libs/header/p2p" | ||
) | ||
|
||
// WithMetrics provides sets `MetricsEnabled` to true on ClientParameters for the header exchange | ||
func WithMetrics(ex libhead.Exchange[*header.ExtendedHeader]) error { | ||
exchange, ok := (ex).(*p2p.Exchange[*header.ExtendedHeader]) | ||
if !ok { | ||
// not all implementations of libhead.Exchange[*header.ExtendedHeader] | ||
// are p2p.Exchange[*header.ExtendedHeader | ||
// thus we need to avoid panicking here for when | ||
// ex is of another base type | ||
return nil | ||
} | ||
return exchange.RegisterMetrics() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ import ( | |
"github.com/celestiaorg/celestia-node/libs/header/sync" | ||
) | ||
|
||
// Service represents the header Service that can be started / stopped on a node. | ||
// Service represents the header service that can be started / stopped on a node. | ||
// Service's main function is to manage its sub-services. Service can contain several | ||
// sub-services, such as Exchange, ExchangeServer, Syncer, and so forth. | ||
type Service struct { | ||
|
@@ -45,6 +45,11 @@ func (s *Service) Head(ctx context.Context) (*header.ExtendedHeader, error) { | |
return s.store.Head(ctx) | ||
} | ||
|
||
func (s *Service) IsSyncing(context.Context) bool { | ||
func (s *Service) IsSyncing(ctx context.Context) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unneeded change in the ctx |
||
return !s.syncer.State().Finished() | ||
} | ||
|
||
// SyncerHead returns the ExtendedHeader of the chain head from the validator network. | ||
func (s *Service) SyncerHead(ctx context.Context) (*header.ExtendedHeader, error) { | ||
return s.syncer.Head(ctx) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package p2p | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/libp2p/go-libp2p/core/metrics" | ||
"go.opentelemetry.io/otel/metric/global" | ||
"go.opentelemetry.io/otel/metric/instrument" | ||
"go.opentelemetry.io/otel/metric/unit" | ||
) | ||
|
||
// global meter provider (see opentelemetry docs) | ||
var ( | ||
meter = global.MeterProvider().Meter("p2p") | ||
) | ||
|
||
// WithMetrics option sets up metrics for p2p networking. | ||
func WithMetrics(bc *metrics.BandwidthCounter) error { | ||
bandwidthTotalInbound, err := meter. | ||
SyncInt64(). | ||
Histogram( | ||
"p2p_bandwidth_total_inbound", | ||
instrument.WithUnit(unit.Bytes), | ||
instrument.WithDescription("total number of bytes received by the host"), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
bandwidthTotalOutbound, _ := meter. | ||
SyncInt64(). | ||
Histogram( | ||
"p2p_bandwidth_total_outbound", | ||
instrument.WithUnit(unit.Bytes), | ||
instrument.WithDescription("total number of bytes sent by the host"), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
bandwidthRateInbound, _ := meter. | ||
SyncFloat64(). | ||
Histogram( | ||
"p2p_bandwidth_rate_inbound", | ||
instrument.WithDescription("total number of bytes sent by the host"), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
bandwidthRateOutbound, _ := meter. | ||
SyncFloat64(). | ||
Histogram( | ||
"p2p_bandwidth_rate_outbound", | ||
instrument.WithDescription("total number of bytes sent by the host"), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
p2pPeerCount, _ := meter. | ||
AsyncFloat64(). | ||
Gauge( | ||
"p2p_peer_count", | ||
instrument.WithDescription("number of peers connected to the host"), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return meter.RegisterCallback( | ||
[]instrument.Asynchronous{ | ||
p2pPeerCount, | ||
}, func(ctx context.Context) { | ||
bcStats := bc.GetBandwidthTotals() | ||
bcByPeerStats := bc.GetBandwidthByPeer() | ||
|
||
bandwidthTotalInbound.Record(ctx, bcStats.TotalIn) | ||
bandwidthTotalOutbound.Record(ctx, bcStats.TotalOut) | ||
bandwidthRateInbound.Record(ctx, bcStats.RateIn) | ||
bandwidthRateOutbound.Record(ctx, bcStats.RateOut) | ||
|
||
p2pPeerCount.Observe(ctx, float64(len(bcByPeerStats))) | ||
}, | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it happens, it means some test broke and writes keyring data on disk. It does not hurt to have it though to prevent accidental push of this test data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New line