Skip to content

Commit

Permalink
refactor bench code
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato committed Mar 30, 2023
1 parent bd2f352 commit 34d222a
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 185 deletions.
12 changes: 11 additions & 1 deletion examples/bench/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/gravitational/teleport/lib/benchmark"
Expand All @@ -34,7 +35,16 @@ func main() {
}

// Run Linear generator
results, err := benchmark.Run(context.TODO(), linear, "ls -l /", "host", "username", "teleport.example.com")
results, err := benchmark.Run(
context.TODO(),
linear,
"host",
"username",
"teleport.example.com",
benchmark.SSHBenchmark{
Command: strings.Split("ls -l /", " "),
},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
177 changes: 12 additions & 165 deletions lib/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,19 @@ import (
"context"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/HdrHistogram/hdrhistogram-go"
"github.com/gravitational/trace"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"

"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"
)

const (
Expand Down Expand Up @@ -70,40 +61,10 @@ const (
type Config struct {
// Rate is requests per second origination rate
Rate int
// Command is a command to run
Command []string
// Interactive turns on interactive sessions
Interactive bool
// MinimumWindow is the min duration
MinimumWindow time.Duration
// MinimumMeasurments is the min amount of requests
MinimumMeasurements int
// Service is the service to benchmark
Service Service
// PodExecBenchmark is the pod exec benchmark config.
// When not nil, it will be used to run the benchmark using the pod exec
// method. If nil, the benchmark will list pods in all namespaces.
PodExecBenchmark *PodExecBenchmark
// PodNamespace is the namespace of the pod to run the benchmark against.
PodNamespace string
}

type PodExecBenchmark struct {
// ContainerName is the name of the container to run the benchmark against.
ContainerName string
// PodName is the name of the pod to run the benchmark against.
PodName string
}

// CheckAndSetDefaults checks and sets default values for the benchmark config.
func (c *Config) CheckAndSetDefaults() error {
switch c.Service {
case SSHService:
case KubernetesService:
default:
return trace.BadParameter("unsupported service %q", c.Service)
}
return nil
}

// Result is a result of the benchmark
Expand All @@ -123,9 +84,8 @@ type Result struct {
// Run is used to run the benchmarks, it is given a generator, command to run,
// a host, host login, and proxy. If host login or proxy is an empty string, it will
// use the default login
func Run(ctx context.Context, lg *Linear, cmd, host, login, proxy string) ([]Result, error) {
c := strings.Split(cmd, " ")
lg.config = &Config{Command: c}
func Run(ctx context.Context, lg *Linear, host, login, proxy string, suite BenchmarkSuite) ([]Result, error) {
lg.config = &Config{}
if err := validateConfig(lg); err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -156,7 +116,7 @@ func Run(ctx context.Context, lg *Linear, cmd, host, login, proxy string) ([]Res
if benchmarkC == nil {
break
}
result, err := benchmarkC.Benchmark(ctx, tc)
result, err := benchmarkC.Benchmark(ctx, tc, suite)
if err != nil {
return results, trace.Wrap(err)
}
Expand Down Expand Up @@ -196,35 +156,25 @@ func ExportLatencyProfile(path string, h *hdrhistogram.Histogram, ticks int32, v
}

// workloadFunc is a function that executes a single benchmark call.
type workloadFunc func(benchMeasure) error
type workloadFunc func(context.Context, benchMeasure) error

// Benchmark connects to remote server and executes requests in parallel according
// to benchmark spec. It returns benchmark result when completed.
// This is a blocking function that can be canceled via context argument.
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Result, error) {
if err := c.CheckAndSetDefaults(); err != nil {
return Result{}, trace.Wrap(err)
func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient, suite BenchmarkSuite) (Result, error) {
if suite == nil {
return Result{}, trace.BadParameter("missing benchmark suite")
}

var (
workload workloadFunc
err error
)
switch c.Service {
case SSHService:
workload = executeSSHBenchmark
case KubernetesService:
workload, err = c.kubernetesBenchmarkCreator(ctx, tc)
if err != nil {
return Result{}, trace.Wrap(err)
}
default:
return Result{}, trace.BadParameter("unsupported service %q", c.Service)
workload, err := suite.workload(ctx, tc)
if err != nil {
return Result{}, trace.Wrap(err)
}

tc.Stdout = io.Discard
tc.Stderr = io.Discard
tc.Stdin = &bytes.Buffer{}

var delay time.Duration
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -246,9 +196,6 @@ func (c *Config) Benchmark(ctx context.Context, tc *client.TeleportClient) (Resu
t := start.Add(delay)
measure := benchMeasure{
ResponseStart: t,
command: c.Command,
client: tc,
interactive: c.Interactive,
}
go work(ctx, measure, resultC, workload)
case <-ctx.Done():
Expand Down Expand Up @@ -292,13 +239,10 @@ type benchMeasure struct {
ResponseStart time.Time
End time.Time
Error error
client *client.TeleportClient
command []string
interactive bool
}

func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure, workload workloadFunc) {
m.Error = workload(m)
m.Error = workload(ctx, m)
m.End = time.Now()
select {
case send <- m:
Expand All @@ -307,103 +251,6 @@ func work(ctx context.Context, m benchMeasure, send chan<- benchMeasure, workloa
}
}

func executeSSHBenchmark(m benchMeasure) error {
if !m.interactive {
// do not use parent context that will cancel in flight requests
// because we give test some time to gracefully wrap up
// the in-flight connections to avoid extra errors
return m.client.SSH(context.TODO(), m.command, false)
}
config := m.client.Config
client, err := client.NewClient(&config)
if err != nil {
return err
}
reader, writer := io.Pipe()
defer reader.Close()
defer writer.Close()
client.Stdin = reader
out := &utils.SyncBuffer{}
client.Stdout = out
client.Stderr = out
err = m.client.SSH(context.TODO(), nil, false)
if err != nil {
return err
}
writer.Write([]byte(strings.Join(m.command, " ") + "\r\nexit\r\n"))
return nil
}

func (c *Config) kubernetesBenchmarkCreator(ctx context.Context, tc *client.TeleportClient) (workloadFunc, error) {
tlsClientConfig, err := getKubeTLSClientConfig(ctx, tc)
if err != nil {
return nil, trace.Wrap(err)
}
restConfig := &rest.Config{
Host: tc.KubeClusterAddr(),
TLSClientConfig: tlsClientConfig,
APIPath: "/api",
ContentConfig: rest.ContentConfig{
GroupVersion: &schema.GroupVersion{Version: "v1"},
NegotiatedSerializer: scheme.Codecs,
},
}

// if the user has specified a pod exec benchmark, use that instead of the
// default kubernetes client.
if c.PodExecBenchmark != nil {
exec, err := c.kubeExecOnPod(ctx, tc, restConfig)
if err != nil {
return nil, trace.Wrap(err)
}
return func(bm benchMeasure) error {
err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: tc.Stdin,
Stdout: tc.Stdout,
Stderr: tc.Stderr,
Tty: c.Interactive,
})
return trace.Wrap(err)
}, nil
}

// create a kubernetes client that will be used to execute the benchmark.
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, trace.Wrap(err)
}
return func(bm benchMeasure) error {
// List all pods in all namespaces.
_, err := kubeClient.CoreV1().Pods(c.PodNamespace).List(ctx, metav1.ListOptions{})
return trace.Wrap(err)
}, nil
}

func (c *Config) kubeExecOnPod(ctx context.Context, tc *client.TeleportClient, restConfig *rest.Config) (remotecommand.Executor, error) {
restClient, err := rest.RESTClientFor(restConfig)
if err != nil {
return nil, trace.Wrap(err)
}

req := restClient.Post().
Resource("pods").
Name(c.PodExecBenchmark.PodName).
Namespace(c.PodNamespace).
SubResource("exec")

req.VersionedParams(&corev1.PodExecOptions{
Container: c.PodExecBenchmark.ContainerName,
Command: c.Command,
Stdin: tc.Stdin != nil,
Stdout: tc.Stdout != nil,
Stderr: tc.Stderr != nil,
TTY: c.Interactive,
}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(restConfig, http.MethodPost, req.URL())
return exec, trace.Wrap(err)
}

// getKubeTLSClientConfig returns a TLS client config for the kubernetes cluster
// that the client wants to connected to.
func getKubeTLSClientConfig(ctx context.Context, tc *client.TeleportClient) (rest.TLSClientConfig, error) {
Expand Down
Loading

0 comments on commit 34d222a

Please sign in to comment.