Skip to content

Commit

Permalink
Introduce tsh bench kube <kube_cluster> tool
Browse files Browse the repository at this point in the history
This PR introduces a Kubernetes benchmark tool that allows us to test the
Kubernetes access flow using an approach similar to the one used for SSH.

This PR renames the default SSH benchmark to `tsh bench ssh` while Kube
benchmarks are available using `tsh bench kube`.

Fixes #23763
  • Loading branch information
tigrato committed Mar 29, 2023
1 parent 875ba61 commit f1208f6
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 13 deletions.
135 changes: 129 additions & 6 deletions lib/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"github.com/HdrHistogram/hdrhistogram-go"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/observability/tracing"
Expand All @@ -48,6 +51,16 @@ const (
pauseTimeBetweenBenchmarks = time.Second * 5
)

// Service is the Teleport service to benchmark.
type Service string

const (
	// SSHService is the SSH service.
	SSHService Service = "ssh"
	// KubernetesService is the Kubernetes service.
	KubernetesService Service = "kube"
)

// Config specifies benchmark requests to run
type Config struct {
// Rate is requests per second origination rate
Expand All @@ -60,6 +73,19 @@ type Config struct {
MinimumWindow time.Duration
// MinimumMeasurments is the min amount of requests
MinimumMeasurements int
// Service is the service to benchmark
Service Service
}

// CheckAndSetDefaults checks and sets default values for the benchmark config.
// It returns a BadParameter error when the configured Service is not one of
// the supported benchmark services.
func (c *Config) CheckAndSetDefaults() error {
	switch c.Service {
	case SSHService, KubernetesService:
		return nil
	default:
		return trace.BadParameter("unsupported service %q", c.Service)
	}
}

// Result is a result of the benchmark
Expand Down Expand Up @@ -128,7 +154,7 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
timeStamp := time.Now().Format("2006-01-02_15:04:05")
suffix := fmt.Sprintf("latency_profile_%s.txt", timeStamp)
if path != "." {
if err := os.MkdirAll(path, 0700); err != nil {
if err := os.MkdirAll(path, 0o700); err != nil {
return "", trace.Wrap(err)
}
}
Expand All @@ -140,7 +166,6 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v

if _, err := h.PercentilesPrint(fo, ticks, valueScale); err != nil {
if err := fo.Close(); err != nil {

logrus.WithError(err).Warningf("failed to close file")
}
return "", trace.Wrap(err)
Expand All @@ -152,10 +177,33 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
return fo.Name(), nil
}

// workloadFunc is a function that executes a single benchmark call and
// returns the error, if any, encountered by that call.
type workloadFunc func(benchMeasure) error

// Benchmark connects to remote server and executes requests in parallel according
// to benchmark spec. It returns benchmark result when completed.
// This is a blocking function that can be canceled via context argument.
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Result, error) {
if err := c.CheckAndSetDefaults(); err != nil {
return Result{}, trace.Wrap(err)
}

var (
workload workloadFunc
err error
)
switch c.Service {
case SSHService:
workload = executeSSHBenchmark
case KubernetesService:
workload, err = kubernetesBenchmarkCreator(ctx, tc)
if err != nil {
return Result{}, trace.Wrap(err)
}
default:
return Result{}, trace.BadParameter("unsupported service %q", c.Service)
}

tc.Stdout = io.Discard
tc.Stderr = io.Discard
tc.Stdin = &bytes.Buffer{}
Expand Down Expand Up @@ -184,7 +232,7 @@ func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Resu
client: tc,
interactive: c.Interactive,
}
go work(ctx, measure, resultC)
go work(ctx, measure, resultC, workload)
case <-ctx.Done():
close(requestsC)
return
Expand Down Expand Up @@ -231,8 +279,8 @@ type benchMeasure struct {
interactive bool
}

func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure) {
m.Error = execute(m)
func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure, workload workloadFunc) {
m.Error = workload(m)
m.End = time.Now()
select {
case send <- m:
Expand All @@ -241,7 +289,7 @@ func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure) {
}
}

func execute(m benchMeasure) error {
func executeSSHBenchmark(m benchMeasure) error {
if !m.interactive {
// do not use parent context that will cancel in flight requests
// because we give test some time to gracefully wrap up
Expand All @@ -268,6 +316,81 @@ func execute(m benchMeasure) error {
return nil
}

// kubernetesBenchmarkCreator builds the Kubernetes benchmark workload. It
// creates a single Kubernetes client up front and returns a workloadFunc
// that lists pods in all namespaces on each invocation.
func kubernetesBenchmarkCreator(ctx context.Context, tc *client.TeleportClient) (workloadFunc, error) {
	tlsConfig, err := getKubeTLSClientConfig(ctx, tc)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	restConfig := &rest.Config{
		Host:            tc.KubeClusterAddr(),
		TLSClientConfig: tlsConfig,
	}
	// The same client is shared by every benchmark iteration.
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	return func(_ benchMeasure) error {
		// List all pods in all namespaces.
		_, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
		return trace.Wrap(err)
	}, nil
}

// getKubeTLSClientConfig returns a TLS client config for the kubernetes cluster
// that the client wants to connect to. It reissues user certificates routed to
// the target Kubernetes cluster and combines them with the cluster CAs held by
// the local key agent.
func getKubeTLSClientConfig(ctx context.Context, tc *client.TeleportClient) (rest.TLSClientConfig, error) {
	var k *client.Key
	// Reissue certs for the target cluster, reauthenticating (relogin)
	// if the current credentials are no longer valid.
	err := client.RetryWithRelogin(ctx, tc, func() error {
		var err error
		k, err = tc.IssueUserCertsWithMFA(ctx, client.ReissueParams{
			RouteToCluster:    tc.SiteName,
			KubernetesCluster: tc.KubernetesCluster,
		}, nil /*applyOpts*/)
		return err
	})
	if err != nil {
		return rest.TLSClientConfig{}, trace.Wrap(err)
	}

	// Client certificate scoped to the requested Kubernetes cluster.
	certPem := k.KubeTLSCerts[tc.KubernetesCluster]

	rsaKeyPEM, err := k.PrivateKey.RSAPrivateKeyPEM()
	if err != nil {
		return rest.TLSClientConfig{}, trace.Wrap(err)
	}

	credentials, err := tc.LocalAgent().GetCoreKey()
	if err != nil {
		return rest.TLSClientConfig{}, trace.Wrap(err)
	}

	// Trust either every known cluster CA or only the root cluster's CAs,
	// depending on the client configuration.
	var clusterCAs [][]byte
	if tc.LoadAllCAs {
		clusterCAs = credentials.TLSCAs()
	} else {
		clusterCAs, err = credentials.RootClusterCAs()
		if err != nil {
			return rest.TLSClientConfig{}, trace.Wrap(err)
		}
	}
	if len(clusterCAs) == 0 {
		return rest.TLSClientConfig{}, trace.BadParameter("no trusted CAs found")
	}

	// With TLS routing enabled the proxy serves multiple protocols on one
	// port, so an explicit kube server name is required for routing.
	tlsServerName := ""
	if tc.TLSRoutingEnabled {
		k8host, _ := tc.KubeProxyHostPort()
		tlsServerName = client.GetKubeTLSServerName(k8host)
	}

	return rest.TLSClientConfig{
		CAData:     bytes.Join(clusterCAs, []byte("\n")),
		CertData:   certPem,
		KeyData:    rsaKeyPEM,
		ServerName: tlsServerName,
	}, nil
}

// makeTeleportClient creates an instance of a teleport client
func makeTeleportClient(host, login, proxy string) (*client.TeleportClient, error) {
c := client.Config{
Expand Down
21 changes: 14 additions & 7 deletions tool/tsh/tsh.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,17 +860,21 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error {
// bench
bench := app.Command("bench", "Run shell or execute a command on a remote SSH node").Hidden()
bench.Flag("cluster", clusterHelp).Short('c').StringVar(&cf.SiteName)
bench.Arg("[user@]host", "Remote hostname and the login to use").Required().StringVar(&cf.UserHost)
bench.Arg("command", "Command to execute on a remote host").Required().StringsVar(&cf.RemoteCommand)
bench.Flag("port", "SSH port on a remote host").Short('p').Int32Var(&cf.NodePort)
bench.Flag("duration", "Test duration").Default("1s").DurationVar(&cf.BenchDuration)
bench.Flag("rate", "Requests per second rate").Default("10").IntVar(&cf.BenchRate)
bench.Flag("interactive", "Create interactive SSH session").BoolVar(&cf.BenchInteractive)
bench.Flag("export", "Export the latency profile").BoolVar(&cf.BenchExport)
bench.Flag("path", "Directory to save the latency profile to, default path is the current directory").Default(".").StringVar(&cf.BenchExportPath)
bench.Flag("ticks", "Ticks per half distance").Default("100").Int32Var(&cf.BenchTicks)
bench.Flag("scale", "Value scale in which to scale the recorded values").Default("1.0").Float64Var(&cf.BenchValueScale)

benchSSH := bench.Command("ssh", "Run SSH benchmark test")
benchSSH.Arg("[user@]host", "Remote hostname and the login to use").Required().StringVar(&cf.UserHost)
benchSSH.Arg("command", "Command to execute on a remote host").Required().StringsVar(&cf.RemoteCommand)
benchSSH.Flag("port", "SSH port on a remote host").Short('p').Int32Var(&cf.NodePort)
benchSSH.Flag("interactive", "Create interactive SSH session").BoolVar(&cf.BenchInteractive)

benchKube := bench.Command("kube", "Run Kube benchmark test")
benchKube.Arg("kube_cluster", "Kubernetes cluster to use").Required().StringVar(&cf.KubernetesCluster)
// show key
show := app.Command("show", "Read an identity from file and print to stdout").Hidden()
show.Arg("identity_file", "The file containing a public key or a certificate").Required().StringVar(&cf.IdentityFileIn)
Expand Down Expand Up @@ -1126,8 +1130,10 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error {
err = onVersion(&cf)
case ssh.FullCommand():
err = onSSH(&cf)
case bench.FullCommand():
err = onBenchmark(&cf)
case benchSSH.FullCommand():
err = onBenchmark(&cf, benchmark.SSHService)
case benchKube.FullCommand():
err = onBenchmark(&cf, benchmark.KubernetesService)
case join.FullCommand():
err = onJoin(&cf)
case scp.FullCommand():
Expand Down Expand Up @@ -3079,7 +3085,7 @@ func onSSH(cf *CLIConf) error {
}

// onBenchmark executes benchmark
func onBenchmark(cf *CLIConf) error {
func onBenchmark(cf *CLIConf, service benchmark.Service) error {
tc, err := makeClient(cf, false)
if err != nil {
return trace.Wrap(err)
Expand All @@ -3088,6 +3094,7 @@ func onBenchmark(cf *CLIConf) error {
Command: cf.RemoteCommand,
MinimumWindow: cf.BenchDuration,
Rate: cf.BenchRate,
Service: service,
}
result, err := cnf.Benchmark(cf.Context, tc)
if err != nil {
Expand Down

0 comments on commit f1208f6

Please sign in to comment.