Skip to content

Commit

Permalink
refactor(gateway-plugins): ext-proc server codebase
Browse files Browse the repository at this point in the history
Signed-off-by: bitliu <bitliu@tencent.com>
  • Loading branch information
Xunzhuo committed Mar 6, 2025
1 parent 66f7c4c commit ace2ae8
Show file tree
Hide file tree
Showing 20 changed files with 719 additions and 595 deletions.
2 changes: 1 addition & 1 deletion cmd/plugins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func main() {
s := grpc.NewServer()

extProcPb.RegisterExternalProcessorServer(s, gateway.NewServer(redisClient, k8sClient))
healthPb.RegisterHealthServer(s, &gateway.HealthServer{})
healthPb.RegisterHealthServer(s, gateway.NewHealthCheckServer())

klog.Info("starting gRPC server on port :50052")

Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/least_busy_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterLeastBusyTime Algorithms = "least-busy-time"
)

func init() {
Register(RouterLeastBusyTime, func() (Router, error) { return NewLeastBusyTimeRouter() })
}

type leastBusyTimeRouter struct {
cache *cache.Cache
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/least_kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterLeastKvCache Algorithms = "least-kv-cache"
)

func init() {
Register(RouterLeastKvCache, func() (Router, error) { return NewLeastKvCacheRouter() })
}

type leastKvCacheRouter struct {
cache *cache.Cache
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/least_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterLeastLatency Algorithms = "least-latency"
)

func init() {
Register(RouterLeastLatency, func() (Router, error) { return NewLeastExpectedLatencyRouter() })
}

type leastExpectedLatencyRouter struct {
cache *cache.Cache
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/least_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterLeastRequest Algorithms = "least-request"
)

func init() {
Register(RouterLeastRequest, func() (Router, error) { return NewLeastRequestRouter() })
}

type leastRequestRouter struct {
cache *cache.Cache
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/prefix_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterPrefixCache Algorithms = "prefix-cache"
)

func init() {
Register(RouterPrefixCache, func() (Router, error) { return NewPrefixCacheRouter() })
}

const (
defaultPrefixCacheMatchThresholdPercent = 50
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/prefix_cache_and_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterPrefixCacheAndLoad Algorithms = "prefix-cache-and-load"
)

func init() {
Register(RouterPrefixCacheAndLoad, func() (Router, error) { return NewPrefixCacheAndLoadRouter() })
}

const (
defaultDecodingLength = 45 // FIXME: decode length is hardcoded. Preble as well.
slidingWindowPeriod = 3 * time.Minute // NOTE: hardcoded
Expand Down
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ import (
v1 "k8s.io/api/core/v1"
)

var (
RouterRandom Algorithms = "random"
)

func init() {
Register(RouterRandom, func() (Router, error) { return NewRandomRouter() })
}

type randomRouter struct {
}

Expand Down
34 changes: 21 additions & 13 deletions pkg/plugins/gateway/algorithms/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,35 @@ package routingalgorithms

import (
"context"
"fmt"

"github.com/vllm-project/aibrix/pkg/utils"

v1 "k8s.io/api/core/v1"
)

type Algorithms string

// Router defines the interface for routing logic to select target pods.
type Router interface {
// TODO: add routeContext as a function parameter.
// Route returns the target pod
Route(ctx context.Context, pods map[string]*v1.Pod, model, message string) (string, error)
}

// selectRandomPodWithRand selects a random pod from the provided pod map.
// It returns an error if no ready pods are available.
func selectRandomPod(pods map[string]*v1.Pod, randomFn func(int) int) (string, error) {
readyPods := utils.FilterReadyPods(pods)
if len(readyPods) == 0 {
return "", fmt.Errorf("no ready pods available for fallback")
}
randomPod := readyPods[randomFn(len(readyPods))]
return randomPod.Status.PodIP, nil
// Validate validates if user provided routing routers is supported by gateway
func Validate(algorithms Algorithms) bool {
_, ok := routerStores[algorithms]
return ok
}

// Select the user provided routing routers is supported by gateway
func Select(algorithms Algorithms) routerFunc {
return routerRegistry[algorithms]
}

func Register(algorithms Algorithms, router routerFunc) {
routerRegistry[algorithms] = router
routerStores[algorithms] = struct{}{}
}

var routerRegistry = map[Algorithms]routerFunc{}
var routerStores = map[Algorithms]any{}

type routerFunc func() (Router, error)
8 changes: 8 additions & 0 deletions pkg/plugins/gateway/algorithms/throughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"k8s.io/klog/v2"
)

var (
RouterThroughput Algorithms = "throughput"
)

func init() {
Register(RouterThroughput, func() (Router, error) { return NewThroughputRouter() })
}

type throughputRouter struct {
cache *cache.Cache
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/plugins/gateway/algorithms/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package routingalgorithms

import (
"fmt"

"github.com/vllm-project/aibrix/pkg/utils"
v1 "k8s.io/api/core/v1"
)

const podMetricPort = "8000"
Expand All @@ -28,3 +31,14 @@ func getPodAddress(podIP string) (string, error) {
}
return fmt.Sprintf("%v:%v", podIP, podMetricPort), nil
}

// selectRandomPodWithRand selects a random pod from the provided pod map.
// It returns an error if no ready pods are available.
func selectRandomPod(pods map[string]*v1.Pod, randomFn func(int) int) (string, error) {
readyPods := utils.FilterReadyPods(pods)
if len(readyPods) == 0 {
return "", fmt.Errorf("no ready pods available for fallback")
}
randomPod := readyPods[randomFn(len(readyPods))]
return randomPod.Status.PodIP, nil
}
Loading

0 comments on commit ace2ae8

Please sign in to comment.