Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce tsh bench kube tool #23781

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion examples/bench/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/gravitational/teleport/lib/benchmark"
Expand All @@ -34,7 +35,16 @@ func main() {
}

// Run Linear generator
results, err := benchmark.Run(context.TODO(), linear, "ls -l /", "host", "username", "teleport.example.com")
results, err := benchmark.Run(
context.TODO(),
linear,
"host",
"username",
"teleport.example.com",
benchmark.SSHBenchmark{
Command: strings.Split("ls -l /", " "),
},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
87 changes: 38 additions & 49 deletions lib/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

Expand All @@ -34,7 +33,6 @@ import (

"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"
)

const (
Expand All @@ -48,14 +46,20 @@ const (
pauseTimeBetweenBenchmarks = time.Second * 5
)

// Service is a the Teleport service to benchmark.
type Service string

const (
// SSHService is the SSH service
SSHService Service = "ssh"
// KubernetesService is the Kubernetes service
KubernetesService Service = "kube"
)

// Config specifies benchmark requests to run
type Config struct {
// Rate is requests per second origination rate
Rate int
// Command is a command to run
Command []string
// Interactive turns on interactive sessions
Interactive bool
// MinimumWindow is the min duration
MinimumWindow time.Duration
// MinimumMeasurments is the min amount of requests
Expand All @@ -79,9 +83,8 @@ type Result struct {
// Run is used to run the benchmarks, it is given a generator, command to run,
// a host, host login, and proxy. If host login or proxy is an empty string, it will
// use the default login
func Run(ctx context.Context, lg *Linear, cmd, host, login, proxy string) ([]Result, error) {
c := strings.Split(cmd, " ")
lg.config = &Config{Command: c}
func Run(ctx context.Context, lg *Linear, host, login, proxy string, suite BenchmarkSuite) ([]Result, error) {
lg.config = &Config{}
if err := validateConfig(lg); err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -112,7 +115,7 @@ func Run(ctx context.Context, lg *Linear, cmd, host, login, proxy string) ([]Res
if benchmarkC == nil {
break
}
result, err := benchmarkC.Benchmark(ctx, tc)
result, err := benchmarkC.Benchmark(ctx, tc, suite)
if err != nil {
return results, trace.Wrap(err)
}
Expand All @@ -128,7 +131,7 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
timeStamp := time.Now().Format("2006-01-02_15:04:05")
suffix := fmt.Sprintf("latency_profile_%s.txt", timeStamp)
if path != "." {
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0o700); err != nil {
return "", trace.Wrap(err)
}
}
Expand All @@ -140,7 +143,6 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v

if _, err := h.PercentilesPrint(fo, ticks, valueScale); err != nil {
if err := fo.Close(); err != nil {

logrus.WithError(err).Warningf("failed to close file")
}
return "", trace.Wrap(err)
Expand All @@ -152,13 +154,33 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
return fo.Name(), nil
}

// WorkloadFunc is a function that executes a single benchmark call.
type WorkloadFunc func(context.Context) error

// BenchmarkSuite is an interface that defines a benchmark suite.
type BenchmarkSuite interface {
// BenchBuilder returns a function that executes a single benchmark call.
// The returned function is called in a loop until the context is canceled.
BenchBuilder(context.Context, *client.TeleportClient) (WorkloadFunc, error)
}

// Benchmark connects to remote server and executes requests in parallel according
// to benchmark spec. It returns benchmark result when completed.
// This is a blocking function that can be canceled via context argument.
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Result, error) {
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient, suite BenchmarkSuite) (Result, error) {
if suite == nil {
return Result{}, trace.BadParameter("missing benchmark suite")
}

workload, err := suite.BenchBuilder(ctx, tc)
if err != nil {
return Result{}, trace.Wrap(err)
}

tc.Stdout = io.Discard
tc.Stderr = io.Discard
tc.Stdin = &bytes.Buffer{}

var delay time.Duration
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -180,11 +202,8 @@ func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Resu
t := start.Add(delay)
measure := benchMeasure{
ResponseStart: t,
command: c.Command,
client: tc,
interactive: c.Interactive,
}
go work(ctx, measure, resultC)
go work(ctx, measure, resultC, workload)
case <-ctx.Done():
close(requestsC)
return
Expand Down Expand Up @@ -226,13 +245,10 @@ type benchMeasure struct {
ResponseStart time.Time
End time.Time
Error error
client *client.TeleportClient
command []string
interactive bool
}

func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure) {
m.Error = execute(m)
func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure, workload WorkloadFunc) {
m.Error = workload(ctx)
m.End = time.Now()
select {
case send <- m:
Expand All @@ -241,33 +257,6 @@ func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure) {
}
}

func execute(m benchMeasure) error {
if !m.interactive {
// do not use parent context that will cancel in flight requests
// because we give test some time to gracefully wrap up
// the in-flight connections to avoid extra errors
return m.client.SSH(context.TODO(), m.command, false)
}
config := m.client.Config
client, err := client.NewClient(&config)
if err != nil {
return err
}
reader, writer := io.Pipe()
defer reader.Close()
defer writer.Close()
client.Stdin = reader
out := &utils.SyncBuffer{}
client.Stdout = out
client.Stderr = out
err = m.client.SSH(context.TODO(), nil, false)
if err != nil {
return err
}
writer.Write([]byte(strings.Join(m.command, " ") + "\r\nexit\r\n"))
return nil
}

// makeTeleportClient creates an instance of a teleport client
func makeTeleportClient(host, login, proxy string) (*client.TeleportClient, error) {
c := client.Config{
Expand Down
Loading