Skip to content

Commit

Permalink
wgengine: instrument with usermetrics
Browse files Browse the repository at this point in the history
Updates tailscale/corp#22075

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
  • Loading branch information
kradalby committed Oct 14, 2024
1 parent adc8368 commit 40c991f
Show file tree
Hide file tree
Showing 7 changed files with 509 additions and 23 deletions.
146 changes: 145 additions & 1 deletion tsnet/tsnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/net/proxy"
"tailscale.com/client/tailscale"
"tailscale.com/cmd/testwrapper/flakytest"
"tailscale.com/health"
"tailscale.com/ipn"
Expand Down Expand Up @@ -874,6 +875,78 @@ func promMetricLabelsStr(labels []*dto.LabelPair) string {
return b.String()
}

// sendData pushes bytesCount bytes over a TCP connection between the two
// servers and blocks until the receiving side has read all of them.
//
// s1 listens on s1ip:8081; s2 dials that address and writes the payload, so
// the data travels from s2 to s1. s2ip is currently unused. Progress is
// reported through logf. It returns the dial, write or read error that stopped
// the transfer, or ctx.Err() if ctx is done before the receiver finishes.
func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error {
	l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip)))
	defer l.Close()

	// Dial to s1 from s2
	w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip))
	if err != nil {
		return err
	}
	defer w.Close()

	// Buffered so the receiver can always hand over its result and exit, even
	// when this function has already returned (for example after a failed
	// write). The channel is never closed, so a late send cannot panic.
	allReceived := make(chan error, 1)

	go func() {
		conn, err := l.Accept()
		if err != nil {
			allReceived <- err
			return
		}
		defer conn.Close()
		// This side only reads, so the deadline has to bound reads; it also
		// guarantees the goroutine ends if the sender gives up.
		conn.SetReadDeadline(time.Now().Add(30 * time.Second))

		// A single buffer is reused for every read instead of allocating
		// bytesCount bytes per iteration.
		got := make([]byte, bytesCount)
		total := 0
		recvStart := time.Now()
		for total < bytesCount {
			n, err := conn.Read(got)
			// Account for the bytes before inspecting err: a Read may return
			// data together with an error, and short reads are normal on a
			// stream connection.
			total += n
			logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100))
			if err != nil && total < bytesCount {
				allReceived <- fmt.Errorf("failed reading packet, %w", err)
				return
			}
		}

		logf("all received, took: %s", time.Since(recvStart).String())
		allReceived <- nil
	}()

	sendStart := time.Now()
	w.SetWriteDeadline(time.Now().Add(30 * time.Second))
	if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil {
		// The deferred Close calls on w and l are expected to unblock the
		// receiver; its read deadline bounds it otherwise.
		return err
	}

	logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount)
	select {
	case err := <-allReceived:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

func TestUserMetrics(t *testing.T) {
flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420")
tstest.ResourceCheck(t)
Expand All @@ -882,7 +955,7 @@ func TestUserMetrics(t *testing.T) {

controlURL, c := startControl(t)
s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1")
s2, _, _ := startServer(t, ctx, controlURL, "s2")
s2, s2ip, _ := startServer(t, ctx, controlURL, "s2")

s1.lb.EditPrefs(&ipn.MaskedPrefs{
Prefs: ipn.Prefs{
Expand Down Expand Up @@ -951,6 +1024,20 @@ func TestUserMetrics(t *testing.T) {
return status1.Self.PrimaryRoutes != nil && status1.Self.PrimaryRoutes.Len() == int(wantRoutes)+1
})

mustDirect(t, t.Logf, lc1, lc2)

// 10 megabytes
bytesToSend := 10 * 1024 * 1024

// This call generates some traffic; it is factored out
// of TestUDPConn.
start := time.Now()
err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip)
if err != nil {
t.Fatalf("Failed to send packets: %v", err)
}
t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String())

ctxLc, cancelLc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelLc()
metrics1, err := lc1.UserMetrics(ctxLc)
Expand All @@ -968,6 +1055,9 @@ func TestUserMetrics(t *testing.T) {
t.Fatal(err)
}

// Allow the metrics for the bytes sent to be off by 15%.
bytesSentTolerance := 1.15

t.Logf("Metrics1:\n%s\n", metrics1)

// The node is advertising 4 routes:
Expand Down Expand Up @@ -997,6 +1087,18 @@ func TestUserMetrics(t *testing.T) {
t.Errorf("metrics1, tailscaled_primary_routes: got %v, want %v", got, want)
}

// Verify that the amount of data recorded in bytes is higher or equal to the
// 10 megabytes sent.
inboundBytes1 := parsedMetrics1[`tailscaled_inbound_bytes_total{path="direct_ipv4"}`]
if inboundBytes1 < float64(bytesToSend) {
t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected higher (or equal) than %d, got: %f`, bytesToSend, inboundBytes1)
}

// But ensure that it is not too much higher than the 10 megabytes sent.
if inboundBytes1 > float64(bytesToSend)*bytesSentTolerance {
t.Errorf(`metrics1, tailscaled_inbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*bytesSentTolerance, inboundBytes1)
}

metrics2, err := lc2.UserMetrics(ctx)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1033,6 +1135,18 @@ func TestUserMetrics(t *testing.T) {
if got, want := parsedMetrics2["tailscaled_primary_routes"], 0.0; got != want {
t.Errorf("metrics2, tailscaled_primary_routes: got %v, want %v", got, want)
}

// Verify that the amount of data recorded in bytes is higher or equal to the
// 10 megabytes sent.
outboundBytes2 := parsedMetrics2[`tailscaled_outbound_bytes_total{path="direct_ipv4"}`]
if outboundBytes2 < float64(bytesToSend) {
t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected higher (or equal) than %d, got: %f`, bytesToSend, outboundBytes2)
}

// But ensure that it is not too much higher than the 10 megabytes sent.
if outboundBytes2 > float64(bytesToSend)*bytesSentTolerance {
t.Errorf(`metrics2, tailscaled_outbound_bytes_total{path="direct_ipv4"}: expected lower than %f, got: %f`, float64(bytesToSend)*bytesSentTolerance, outboundBytes2)
}
}

func waitForCondition(t *testing.T, msg string, waitTime time.Duration, f func() bool) {
Expand All @@ -1044,3 +1158,33 @@ func waitForCondition(t *testing.T, msg string, waitTime time.Duration, f func()
}
t.Fatalf("waiting for condition: %s", msg)
}

// mustDirect waits for lc1 to report a direct path (a non-empty CurAddr) to
// the node behind lc2. It polls both clients' status every 10ms for up to 30
// seconds, logging progress through logf at most once per second, and marks
// the test as failed with t.Error if no direct path shows up in time.
func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) {
	t.Helper()
	lastLog := time.Now().Add(-time.Minute)

	// directFound performs a single poll. It lives in its own function so the
	// per-attempt context is cancelled as soon as the attempt finishes, rather
	// than piling up deferred cancels until mustDirect returns.
	directFound := func() bool {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		status1, err := lc1.Status(ctx)
		if err != nil {
			return false
		}
		status2, err := lc2.Status(ctx)
		if err != nil {
			return false
		}
		pst, ok := status1.Peer[status2.Self.PublicKey]
		if !ok {
			// lc1 does not list the peer yet; try again on the next poll.
			return false
		}
		if pst.CurAddr != "" {
			logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr)
			return true
		}
		if now := time.Now(); now.Sub(lastLog) > time.Second {
			logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs)
			lastLog = now
		}
		return false
	}

	// See https://github.com/tailscale/tailscale/issues/654
	// and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline.
	for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
		if directFound() {
			return
		}
	}
	t.Error("magicsock did not find a direct path from lc1 to lc2")
}
51 changes: 51 additions & 0 deletions util/clientmetric/clientmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"expvar"
"fmt"
"io"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"tailscale.com/util/set"
)

var (
Expand Down Expand Up @@ -223,6 +226,54 @@ func NewGaugeFunc(name string, f func() int64) *Metric {
return m
}

// AggregateCounter returns a sum of expvar counters registered with it.
type AggregateCounter struct {
	mu       sync.RWMutex
	counters set.Set[*expvar.Int] // guarded by mu; allocated lazily by Register
}

// Value returns the sum of the current values of all registered counters.
// It returns 0 when no counter is registered.
func (c *AggregateCounter) Value() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var sum int64
	for cnt := range c.counters {
		sum += cnt.Value()
	}
	return sum
}

// Register adds the provided expvar counter to the aggregate.
// A newly registered counter is reset to 0 so that a value it accumulated
// beforehand does not make the aggregate jump by an unexpectedly large
// amount. Registering a counter that is already registered is a no-op and
// leaves its value untouched.
func (c *AggregateCounter) Register(counter *expvar.Int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// No need to do anything if it's already registered.
	if c.counters.Contains(counter) {
		return
	}
	// Allocate on first use so that an AggregateCounter that was not built by
	// NewAggregateCounter can still register counters.
	// NOTE(review): presumes set.Set is map-backed, where Add on the nil zero
	// value would panic; confirm against util/set.
	if c.counters == nil {
		c.counters = set.Set[*expvar.Int]{}
	}
	counter.Set(0)
	c.counters.Add(counter)
}

// UnregisterAll unregisters all counters resulting in it
// starting back down at zero. This is to ensure monotonicity
// and respect the semantics of the counter.
func (c *AggregateCounter) UnregisterAll() {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Swap in a fresh set; the previously registered counters themselves are
	// not modified.
	c.counters = set.Set[*expvar.Int]{}
}

// NewAggregateCounter returns a new aggregate counter that returns
// a sum of expvar variables registered with it. The counter's Value is
// exposed under name through NewGaugeFunc.
func NewAggregateCounter(name string) *AggregateCounter {
	c := &AggregateCounter{counters: set.Set[*expvar.Int]{}}
	NewGaugeFunc(name, c.Value)
	return c
}

// WritePrometheusExpositionFormat writes all client metrics to w in
// the Prometheus text-based exposition format.
//
Expand Down
49 changes: 49 additions & 0 deletions util/clientmetric/clientmetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package clientmetric

import (
"expvar"
"testing"
"time"

qt "github.com/frankban/quicktest"
)

func TestDeltaEncBuf(t *testing.T) {
Expand Down Expand Up @@ -107,3 +110,49 @@ func TestWithFunc(t *testing.T) {
t.Errorf("second = %q; want %q", got, want)
}
}

// TestAggregateCounter checks that an AggregateCounter reports the sum of its
// registered expvar counters, that Register zeroes a newly added counter and
// ignores duplicates, and that UnregisterAll brings the sum back to zero.
func TestAggregateCounter(t *testing.T) {
// NOTE(review): presumably resets the package's metric registry so that
// "agg_counter" can be registered afresh; confirm against clearMetrics.
clearMetrics()

c := qt.New(t)

expv1 := &expvar.Int{}
expv2 := &expvar.Int{}
expv3 := &expvar.Int{}

aggCounter := NewAggregateCounter("agg_counter")

// A freshly registered counter contributes nothing.
aggCounter.Register(expv1)
c.Assert(aggCounter.Value(), qt.Equals, int64(0))

// Increments on a registered counter show up in the aggregate.
expv1.Add(1)
c.Assert(aggCounter.Value(), qt.Equals, int64(1))

// Registering a second, still-zero counter leaves the sum untouched.
aggCounter.Register(expv2)
c.Assert(aggCounter.Value(), qt.Equals, int64(1))

// Both counters now contribute: 2 from expv1 plus 1 from expv2.
expv1.Add(1)
expv2.Add(1)
c.Assert(aggCounter.Value(), qt.Equals, int64(3))

// Adding a new expvar should not change the value
// and any value the counter already had is reset
expv3.Set(5)
aggCounter.Register(expv3)
c.Assert(aggCounter.Value(), qt.Equals, int64(3))

// Registering the same expvar multiple times should not change the value
aggCounter.Register(expv3)
c.Assert(aggCounter.Value(), qt.Equals, int64(3))

// Dropping every counter resets the aggregate to zero.
aggCounter.UnregisterAll()
c.Assert(aggCounter.Value(), qt.Equals, int64(0))

// Start over: Register zeroes expv3 again, so the 5 set beforehand is discarded.
expv3.Set(5)
aggCounter.Register(expv3)
c.Assert(aggCounter.Value(), qt.Equals, int64(0))

// A value set after registration is reflected in the aggregate.
expv3.Set(5)
c.Assert(aggCounter.Value(), qt.Equals, int64(5))
}
6 changes: 4 additions & 2 deletions wgengine/magicsock/derp.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,8 @@ func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan
c.logf("magicsock: derp.Send(%v): %v", wr.addr, err)
metricSendDERPError.Add(1)
} else {
metricSendDERP.Add(1)
c.metrics.outboundPacketsDERPTotal.Add(1)
c.metrics.outboundBytesDERPTotal.Add(int64(len(wr.b)))
}
}
}
Expand All @@ -690,7 +691,8 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint)
// No data read occurred. Wait for another packet.
continue
}
metricRecvDataDERP.Add(1)
c.metrics.inboundPacketsDERPTotal.Add(1)
c.metrics.inboundBytesDERPTotal.Add(int64(n))
sizes[0] = n
eps[0] = ep
return 1, nil
Expand Down
27 changes: 20 additions & 7 deletions wgengine/magicsock/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,26 +960,39 @@ func (de *endpoint) send(buffs [][]byte) error {
de.noteBadEndpoint(udpAddr)
}

var txBytes int
for _, b := range buffs {
txBytes += len(b)
}

switch {
case udpAddr.Addr().Is4():
de.c.metrics.outboundPacketsIPv4Total.Add(int64(len(buffs)))
de.c.metrics.outboundBytesIPv4Total.Add(int64(txBytes))
case udpAddr.Addr().Is6():
de.c.metrics.outboundPacketsIPv6Total.Add(int64(len(buffs)))
de.c.metrics.outboundBytesIPv6Total.Add(int64(txBytes))
}

// TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends.
if stats := de.c.stats.Load(); err == nil && stats != nil {
var txBytes int
for _, b := range buffs {
txBytes += len(b)
}
stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes)
}
}
if derpAddr.IsValid() {
allOk := true
var txBytes int
for _, buff := range buffs {
ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff)
if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff))
}
txBytes += len(buff)
if !ok {
allOk = false
}
}

if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, txBytes)
}
if allOk {
return nil
}
Expand Down
Loading

0 comments on commit 40c991f

Please sign in to comment.