Skip to content

Commit

Permalink
Introduce tsh bench kube tool (#23781)
Browse files Browse the repository at this point in the history
This PR introduces a Kubernetes benchmark tool that allows us to test the Kubernetes access flow using a similar approach used for ssh.

This PR renames the default SSH benchmark to `tsh bench ssh` while Kube benchmarks are available using `tsh bench kube`.

Closes #23763
  • Loading branch information
tigrato authored Apr 4, 2023
1 parent e0796b6 commit 3ccfad0
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 64 deletions.
12 changes: 11 additions & 1 deletion examples/bench/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/gravitational/teleport/lib/benchmark"
Expand All @@ -34,7 +35,16 @@ func main() {
}

// Run Linear generator
results, err := benchmark.Run(context.TODO(), linear, "ls -l /", "host", "username", "teleport.example.com")
results, err := benchmark.Run(
context.TODO(),
linear,
"host",
"username",
"teleport.example.com",
benchmark.SSHBenchmark{
Command: strings.Split("ls -l /", " "),
},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
87 changes: 38 additions & 49 deletions lib/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

Expand All @@ -34,7 +33,6 @@ import (

"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"
)

const (
Expand All @@ -48,14 +46,20 @@ const (
pauseTimeBetweenBenchmarks = time.Second * 5
)

// Service is a the Teleport service to benchmark.
type Service string

const (
// SSHService is the SSH service
SSHService Service = "ssh"
// KubernetesService is the Kubernetes service
KubernetesService Service = "kube"
)

// Config specifies benchmark requests to run
type Config struct {
// Rate is requests per second origination rate
Rate int
// Command is a command to run
Command []string
// Interactive turns on interactive sessions
Interactive bool
// MinimumWindow is the min duration
MinimumWindow time.Duration
// MinimumMeasurments is the min amount of requests
Expand All @@ -79,9 +83,8 @@ type Result struct {
// Run is used to run the benchmarks, it is given a generator, command to run,
// a host, host login, and proxy. If host login or proxy is an empty string, it will
// use the default login
func Run(ctx context.Context, lg *Linear, cmd, host, login, proxy string) ([]Result, error) {
c := strings.Split(cmd, " ")
lg.config = &Config{Command: c}
func Run(ctx context.Context, lg *Linear, host, login, proxy string, suite BenchmarkSuite) ([]Result, error) {
lg.config = &Config{}
if err := validateConfig(lg); err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -112,7 +115,7 @@ func Run(ctx context.Context, lg *Linear, cmd, host, login, proxy string) ([]Res
if benchmarkC == nil {
break
}
result, err := benchmarkC.Benchmark(ctx, tc)
result, err := benchmarkC.Benchmark(ctx, tc, suite)
if err != nil {
return results, trace.Wrap(err)
}
Expand All @@ -128,7 +131,7 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
timeStamp := time.Now().Format("2006-01-02_15:04:05")
suffix := fmt.Sprintf("latency_profile_%s.txt", timeStamp)
if path != "." {
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0o700); err != nil {
return "", trace.Wrap(err)
}
}
Expand All @@ -140,7 +143,6 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v

if _, err := h.PercentilesPrint(fo, ticks, valueScale); err != nil {
if err := fo.Close(); err != nil {

logrus.WithError(err).Warningf("failed to close file")
}
return "", trace.Wrap(err)
Expand All @@ -152,13 +154,33 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
return fo.Name(), nil
}

// WorkloadFunc is a function that executes a single benchmark call.
type WorkloadFunc func(context.Context) error

// BenchmarkSuite is an interface that defines a benchmark suite.
type BenchmarkSuite interface {
// BenchBuilder returns a function that executes a single benchmark call.
// The returned function is called in a loop until the context is canceled.
BenchBuilder(context.Context, *client.TeleportClient) (WorkloadFunc, error)
}

// Benchmark connects to remote server and executes requests in parallel according
// to benchmark spec. It returns benchmark result when completed.
// This is a blocking function that can be canceled via context argument.
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Result, error) {
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient, suite BenchmarkSuite) (Result, error) {
if suite == nil {
return Result{}, trace.BadParameter("missing benchmark suite")
}

workload, err := suite.BenchBuilder(ctx, tc)
if err != nil {
return Result{}, trace.Wrap(err)
}

tc.Stdout = io.Discard
tc.Stderr = io.Discard
tc.Stdin = &bytes.Buffer{}

var delay time.Duration
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -180,11 +202,8 @@ func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Resu
t := start.Add(delay)
measure := benchMeasure{
ResponseStart: t,
command: c.Command,
client: tc,
interactive: c.Interactive,
}
go work(ctx, measure, resultC)
go work(ctx, measure, resultC, workload)
case <-ctx.Done():
close(requestsC)
return
Expand Down Expand Up @@ -226,13 +245,10 @@ type benchMeasure struct {
ResponseStart time.Time
End time.Time
Error error
client *client.TeleportClient
command []string
interactive bool
}

func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure) {
m.Error = execute(m)
func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure, workload WorkloadFunc) {
m.Error = workload(ctx)
m.End = time.Now()
select {
case send <- m:
Expand All @@ -241,33 +257,6 @@ func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure) {
}
}

func execute(m benchMeasure) error {
if !m.interactive {
// do not use parent context that will cancel in flight requests
// because we give test some time to gracefully wrap up
// the in-flight connections to avoid extra errors
return m.client.SSH(context.TODO(), m.command, false)
}
config := m.client.Config
client, err := client.NewClient(&config)
if err != nil {
return err
}
reader, writer := io.Pipe()
defer reader.Close()
defer writer.Close()
client.Stdin = reader
out := &utils.SyncBuffer{}
client.Stdout = out
client.Stderr = out
err = m.client.SSH(context.TODO(), nil, false)
if err != nil {
return err
}
writer.Write([]byte(strings.Join(m.command, " ") + "\r\nexit\r\n"))
return nil
}

// makeTeleportClient creates an instance of a teleport client
func makeTeleportClient(host, login, proxy string) (*client.TeleportClient, error) {
c := client.Config{
Expand Down
Loading

0 comments on commit 3ccfad0

Please sign in to comment.