diff --git a/docs/development/app/README.md b/docs/development/app/README.md index 253b5b7d..02651de5 100644 --- a/docs/development/app/README.md +++ b/docs/development/app/README.md @@ -9,7 +9,7 @@ kind load docker-image aibrix/vllm:v0.1.0 2. Deploy mocked model image ```shell kubectl apply -f deployment.yaml -kubectl port-forward svc/lora-test-mac-only 8000:8000 & +kubectl port-forward svc/llama2-70b 8000:8000 & ``` ## Test python app separately diff --git a/go.mod b/go.mod index fc5d0538..ae647b81 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( k8s.io/code-generator v0.29.2 k8s.io/klog/v2 v2.110.1 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 + k8s.io/utils v0.0.0-20230726121419-3b25d923346b sigs.k8s.io/controller-runtime v0.17.3 sigs.k8s.io/gateway-api v1.0.0 ) @@ -70,7 +71,6 @@ require ( k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect - k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/pkg/plugins/ratelimiter/Dockerfile b/pkg/plugins/ratelimiter/Dockerfile new file mode 100644 index 00000000..9aafc373 --- /dev/null +++ b/pkg/plugins/ratelimiter/Dockerfile @@ -0,0 +1,18 @@ +## Multistage build +FROM golang:1.21 as build +ENV CGO_ENABLED=0 +ENV GOOS=linux +ENV GOARCH=amd64 + +WORKDIR /src +COPY . . 
+RUN go mod download +RUN go build -o /ext_proc + +## Multistage deploy +FROM gcr.io/distroless/base-debian10 + +WORKDIR / +COPY --from=build /ext_proc /ext_proc + +ENTRYPOINT ["/ext_proc"] diff --git a/pkg/plugins/ratelimiter/Makefile b/pkg/plugins/ratelimiter/Makefile new file mode 100644 index 00000000..d8fa708e --- /dev/null +++ b/pkg/plugins/ratelimiter/Makefile @@ -0,0 +1,15 @@ +.PHONY: build +build: + docker rmi aibrix/tpm:v0.1.0 --force + docker build -t aibrix/tpm:v0.1.0 -f Dockerfile . + kind load docker-image aibrix/tpm:v0.1.0 + +.PHONY: apply +apply: + kubectl apply -f deployment.yaml + kubectl apply -f plugins.yaml + +.PHONY: delete +delete: + kubectl delete -f deployment.yaml + kubectl delete -f plugins.yaml \ No newline at end of file diff --git a/pkg/plugins/ratelimiter/README.md b/pkg/plugins/ratelimiter/README.md new file mode 100644 index 00000000..fa170375 --- /dev/null +++ b/pkg/plugins/ratelimiter/README.md @@ -0,0 +1,27 @@ + + +# Install backing storage to persist rpm/tpm configuration +kubectl apply -f redis.yaml + +# Add rpm/tpm config for user varun (the user in the test request below) +kubectl exec -it deploy/redis-master -- redis-cli + +set aibrix:varun_TPM_LIMIT 100 +set aibrix:varun_RPM_LIMIT 10 + +# Install extension proc +make build && make apply + +# Test requests +```shell +curl -v http://localhost:8888/v1/chat/completions \ + -H "user: varun" \ + -H "model: llama2-70b" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer any_key" \ + -d '{ + "model": "llama2-70b", + "messages": [{"role": "user", "content": "Say this is a test!"}], + "temperature": 0.7 + }' +``` \ No newline at end of file diff --git a/pkg/plugins/ratelimiter/deployment.yaml b/pkg/plugins/ratelimiter/deployment.yaml new file mode 100644 index 00000000..e6ced823 --- /dev/null +++ b/pkg/plugins/ratelimiter/deployment.yaml @@ -0,0 +1,45 @@ +apiVersion: v1 +kind: Service +metadata: + name: grpc-ext-proc +spec: + selector: + app: grpc-ext-proc + ports: + - protocol: TCP + port: 50052 + targetPort: 50052 +--- 
+apiVersion: apps/v1 +kind: Deployment +metadata: + name: grpc-ext-proc +spec: + replicas: 1 + selector: + matchLabels: + app: grpc-ext-proc + template: + metadata: + labels: + app: grpc-ext-proc + spec: + containers: + - name: golang-app-container + image: aibrix/tpm:v0.1.0 + ports: + - containerPort: 50052 + env: + - name: REDIS_HOST + value: redis-master + - name: REDIS_PORT + value: "6379" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + diff --git a/pkg/plugins/ratelimiter/go.mod b/pkg/plugins/ratelimiter/go.mod new file mode 100644 index 00000000..af7164a5 --- /dev/null +++ b/pkg/plugins/ratelimiter/go.mod @@ -0,0 +1,25 @@ +module github.com/aibrix/aibrix/pkg/plugins/ext_proc + +go 1.21 + +require ( + github.com/coocood/freecache v1.2.4 + github.com/envoyproxy/go-control-plane v0.12.0 + github.com/redis/go-redis/v9 v9.6.1 + github.com/sashabaranov/go-openai v1.28.1 + google.golang.org/grpc v1.65.0 + k8s.io/klog v1.0.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect + github.com/golang/protobuf v1.5.4 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + google.golang.org/protobuf v1.34.1 // indirect +) diff --git a/pkg/plugins/ratelimiter/go.sum b/pkg/plugins/ratelimiter/go.sum new file mode 100644 index 00000000..1008c2f0 --- /dev/null +++ b/pkg/plugins/ratelimiter/go.sum @@ -0,0 +1,40 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= 
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/coocood/freecache v1.2.4 h1:UdR6Yz/X1HW4fZOuH0Z94KwG851GWOSknua5VUbb/5M= +github.com/coocood/freecache v1.2.4/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= 
+github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/sashabaranov/go-openai v1.28.1 h1:aREx6faUTeOZNMDTNGAY8B9vNmmN7qoGvDV0Ke2J1Mc= +github.com/sashabaranov/go-openai v1.28.1/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= +k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= diff --git a/pkg/plugins/ratelimiter/main.go b/pkg/plugins/ratelimiter/main.go new file mode 100644 index 00000000..89653a41 --- /dev/null +++ b/pkg/plugins/ratelimiter/main.go @@ -0,0 +1,278 @@ +/* +Copyright 2024 The Aibrix Team. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net" + "os" + "os/signal" + "strings" + "syscall" + "time" + + ratelimiter "github.com/aibrix/aibrix/pkg/plugins/ext_proc/rate_limiter" + "github.com/coocood/freecache" + redis "github.com/redis/go-redis/v9" + openai "github.com/sashabaranov/go-openai" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog" + + configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + filterPb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + healthPb "google.golang.org/grpc/health/grpc_health_v1" +) + +var ( + cache *freecache.Cache + cacheKey = []byte("key") +) + +type server struct { + ratelimiter ratelimiter.AccountRateLimiter +} +type healthServer struct{} + +func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil +} + +func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "watch is not implemented") +} + +func (s *server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { + var user string + ctx := srv.Context() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + req, err := srv.Recv() + if err == io.EOF { + return nil + } + if err 
!= nil { + return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err) + } + + resp := &extProcPb.ProcessingResponse{} + + switch v := req.Request.(type) { + + case *extProcPb.ProcessingRequest_RequestHeaders: + r := req.Request + h := r.(*extProcPb.ProcessingRequest_RequestHeaders) + klog.Infof("In RequestHeaders pricessing, Request: %+v, EndOfStream: %+v", r, h.RequestHeaders.EndOfStream) + + for _, n := range h.RequestHeaders.Headers.Headers { + if strings.ToLower(n.Key) == "user" { + user = string(n.RawValue) + } + } + + klog.Infof("user: %v", user) + + // TODO (varun): add check if user exists in backend storage + // if no user name present in the request headers + if user == "" { + klog.Infoln("user does not exists") + return status.Errorf(codes.PermissionDenied, "user does not exists") + } + code, err := s.checkRPM(ctx, user) + if err != nil { + return status.Errorf(code, err.Error()) + } + + code, err = s.checkTPM(ctx, user) + if err != nil { + return status.Errorf(code, err.Error()) + } + + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestHeaders{ + RequestHeaders: &extProcPb.HeadersResponse{ + Response: &extProcPb.CommonResponse{ + HeaderMutation: &extProcPb.HeaderMutation{ + SetHeaders: []*configPb.HeaderValueOption{}, + }, + }, + }, + }, + ModeOverride: &filterPb.ProcessingMode{ + ResponseHeaderMode: filterPb.ProcessingMode_DEFAULT, + RequestBodyMode: filterPb.ProcessingMode_NONE, + ResponseBodyMode: filterPb.ProcessingMode_STREAMED, + }, + } + + case *extProcPb.ProcessingRequest_RequestBody: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_RequestBody{ + RequestBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{}, + }, + }, + } + + case *extProcPb.ProcessingRequest_ResponseHeaders: + resp = &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseHeaders{ + ResponseHeaders: &extProcPb.HeadersResponse{ + Response: 
&extProcPb.CommonResponse{}, + }, + }, + } + + case *extProcPb.ProcessingRequest_ResponseBody: + klog.Info("In ResponseBody") + r := req.Request + b := r.(*extProcPb.ProcessingRequest_ResponseBody) + + var res openai.CompletionResponse + json.Unmarshal(b.ResponseBody.Body, &res) + + rpm, _ := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_RPM_CURRENT", user), 1) + tpm, _ := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_TPM_CURRENT", user), int64(res.Usage.TotalTokens)) + klog.Infof("Updated RPM: %v, TPM: %v for user: %v", rpm, tpm, user) + + default: + log.Printf("Unknown Request type %+v\n", v) + } + + if err := srv.Send(resp); err != nil { + log.Printf("send error %v", err) + } + } +} + +func (s *server) checkRPM(ctx context.Context, user string) (codes.Code, error) { + rpmLimit, err := s.ratelimiter.GetLimit(ctx, fmt.Sprintf("%v_RPM_LIMIT", user)) + if err != nil { + klog.Error(err) + return codes.Internal, fmt.Errorf("fail to get requests per minute limit for user: %v", user) + } + rpmCurrent, err := s.ratelimiter.Get(ctx, fmt.Sprintf("%v_RPM_CURRENT", user)) + if err != nil { + klog.Error(err) + return codes.Internal, fmt.Errorf("fail to get requests per minute current for user: %v", user) + } + klog.Infof("rmpCurrent: %v, rpmLimit: %v", rpmCurrent, rpmLimit) + if rpmCurrent > rpmLimit { + err := fmt.Errorf("requests per limit of:%v, reached for user: %v", rpmLimit, user) + klog.Errorln(err) + return codes.ResourceExhausted, err + } + + return codes.OK, nil +} + +func (s *server) checkTPM(ctx context.Context, user string) (codes.Code, error) { + tpmLimit, err := s.ratelimiter.GetLimit(ctx, fmt.Sprintf("%v_TPM_LIMIT", user)) + if err != nil { + klog.Error(err) + return codes.Internal, fmt.Errorf("fail to get tokens per minute limit for user: %v", user) + } + tpmCurrent, err := s.ratelimiter.Get(ctx, fmt.Sprintf("%v_TPM_CURRENT", user)) + if err != nil { + klog.Error(err) + return codes.Internal, fmt.Errorf("fail to get tokens per minute current for user: %v", user) + } + 
klog.Infof("tpmCurrent: %v, tpmLimit: %v", tpmCurrent, tpmLimit) + if tpmCurrent > tpmLimit { + err := fmt.Errorf("tokens per limit of:%v, reached for user: %v", tpmLimit, user) + klog.Errorln(err) + return codes.ResourceExhausted, err + } + + return codes.OK, nil +} + +// Create Redis Client +var ( + host = getEnv("REDIS_HOST", "localhost") + port = string(getEnv("REDIS_PORT", "6379")) +) + +func getEnv(key, defaultValue string) string { + value := os.Getenv(key) + if value == "" { + return defaultValue + } + return value +} + +const ( + RPM_LIMIT = 4 + TPM_LIMIT = 1000 +) + +func main() { + + // Connect to Redis + client := redis.NewClient(&redis.Options{ + Addr: host + ":" + port, + DB: 0, // Default DB + }) + + // Ping the Redis server to check the connection + pong, err := client.Ping(context.Background()).Result() + if err != nil { + log.Fatal("Error connecting to Redis:", err) + } + fmt.Println("Connected to Redis:", pong) + + // grpc server init + lis, err := net.Listen("tcp", ":50052") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + s := grpc.NewServer() + + extProcPb.RegisterExternalProcessorServer(s, &server{ + ratelimiter: ratelimiter.NewRedisAccountRateLimiter("aibrix", client, 1*time.Minute), + }) + healthPb.RegisterHealthServer(s, &healthServer{}) + + log.Println("Starting gRPC server on port :50052") + + // shutdown + var gracefulStop = make(chan os.Signal) + signal.Notify(gracefulStop, syscall.SIGTERM) + signal.Notify(gracefulStop, syscall.SIGINT) + go func() { + sig := <-gracefulStop + log.Printf("caught sig: %+v", sig) + log.Println("Wait for 1 second to finish processing") + time.Sleep(1 * time.Second) + os.Exit(0) + }() + + s.Serve(lis) +} diff --git a/pkg/plugins/ratelimiter/plugins.yaml b/pkg/plugins/ratelimiter/plugins.yaml new file mode 100644 index 00000000..b784ad29 --- /dev/null +++ b/pkg/plugins/ratelimiter/plugins.yaml @@ -0,0 +1,18 @@ +# not need as EnvoyExtensionPolicy will be auto-created with HTTPRoute, just 
keeping for now for sample testing. +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyExtensionPolicy +metadata: + name: ext-proc-example +spec: + targetRef: + group: gateway.networking.k8s.io + kind: Gateway + name: eg + extProc: + - backendRefs: + - name: grpc-ext-proc + port: 50052 + processingMode: + request: {} + response: + body: Streamed diff --git a/pkg/plugins/ratelimiter/rate_limiter/rate_limiter.go b/pkg/plugins/ratelimiter/rate_limiter/rate_limiter.go new file mode 100644 index 00000000..3254d5f5 --- /dev/null +++ b/pkg/plugins/ratelimiter/rate_limiter/rate_limiter.go @@ -0,0 +1,31 @@ +/* +Copyright 2024 The Aibrix Team. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiter + +import ( + "context" +) + +type AccountRateLimiter interface { + // Get the rate limit for the given key and return the current value. + Get(ctx context.Context, key string) (int64, error) + + GetLimit(ctx context.Context, key string) (int64, error) + + // Incr increments the rate limit for the given key and return the increased value. + Incr(ctx context.Context, key string, val int64) (int64, error) +} diff --git a/pkg/plugins/ratelimiter/rate_limiter/redis.go b/pkg/plugins/ratelimiter/rate_limiter/redis.go new file mode 100644 index 00000000..d3a89ca2 --- /dev/null +++ b/pkg/plugins/ratelimiter/rate_limiter/redis.go @@ -0,0 +1,89 @@ +/* +Copyright 2024 The Aibrix Team. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiter + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +// Used to limit the number of keys used by the rate limiter; must be greater than 2 +const binSize = 64 + +type redisAccountRateLimiter struct { + client *redis.Client + name string + windowSize time.Duration +} + +// simple fixed window rate limiter +func NewRedisAccountRateLimiter(name string, client *redis.Client, windowSize time.Duration) AccountRateLimiter { + if windowSize < time.Second { + windowSize = time.Second + } + + return &redisAccountRateLimiter{ + name: name, + client: client, + windowSize: windowSize, + } +} + +func (rrl redisAccountRateLimiter) Get(ctx context.Context, key string) (int64, error) { + return rrl.get(ctx, rrl.genKey(key)) +} + +func (rrl redisAccountRateLimiter) GetLimit(ctx context.Context, key string) (int64, error) { + return rrl.get(ctx, fmt.Sprintf("%s:%s", rrl.name, key)) +} + +func (rrl redisAccountRateLimiter) get(ctx context.Context, key string) (int64, error) { + val, err := rrl.client.Get(ctx, key).Int64() + if err != nil { + if errors.Is(err, redis.Nil) { + return 0, nil + } + return 0, err + } + return val, err +} + +func (rrl redisAccountRateLimiter) Incr(ctx context.Context, key string, val int64) (int64, error) { + return rrl.incrAndExpire(ctx, rrl.genKey(key), val) +} + +func (rrl redisAccountRateLimiter) genKey(key string) string { + return fmt.Sprintf("%s:%s:%d", rrl.name, key, 
time.Now().Unix()/int64(rrl.windowSize.Seconds())%binSize) +} + +func (rrl redisAccountRateLimiter) incrAndExpire(ctx context.Context, key string, val int64) (int64, error) { + pipe := rrl.client.Pipeline() + + incr := pipe.IncrBy(ctx, key, val) + pipe.Expire(ctx, key, rrl.windowSize) + + _, err := pipe.Exec(ctx) + if err != nil { + return 0, err + } + + return incr.Val(), nil +} diff --git a/pkg/plugins/ratelimiter/redis.yaml b/pkg/plugins/ratelimiter/redis.yaml new file mode 100644 index 00000000..25f7e5f5 --- /dev/null +++ b/pkg/plugins/ratelimiter/redis.yaml @@ -0,0 +1,47 @@ +# https://www.callicoder.com/deploy-multi-container-go-redis-app-kubernetes/ +apiVersion: apps/v1 # API version +kind: Deployment +metadata: + name: redis-master # Unique name for the deployment + labels: + app: redis # Labels to be applied to this deployment +spec: + selector: + matchLabels: # This deployment applies to the Pods matching these labels + app: redis + role: master + tier: backend + replicas: 1 # Run a single pod in the deployment + template: # Template for the pods that will be created by this deployment + metadata: + labels: # Labels to be applied to the Pods in this deployment + app: redis + role: master + tier: backend + spec: # Spec for the container which will be run inside the Pod. + containers: + - name: master + image: redis + resources: + requests: + cpu: 100m + memory: 100Mi + ports: + - containerPort: 6379 +--- +apiVersion: v1 +kind: Service # Type of Kubernetes resource +metadata: + name: redis-master # Name of the Kubernetes resource + labels: # Labels that will be applied to this resource + app: redis + role: master + tier: backend +spec: + ports: + - port: 6379 # Map incoming connections on port 6379 to the target port 6379 of the Pod + targetPort: 6379 + selector: # Map any Pod with the specified labels to this service + app: redis + role: master + tier: backend \ No newline at end of file