Skip to content

Commit

Permalink
[gateway] handle the wrong model name and cache inconsistency case (#542
Browse files Browse the repository at this point in the history
)

* return 400 in gateway if model does not exist

Signed-off-by: Jiaxin Shan <seedjeffwan@gmail.com>

* Remove model from the cache if pod has been deleted

This prevents the case where the cache still has a map key for a model but there are no available pods for it.

---------

Signed-off-by: Jiaxin Shan <seedjeffwan@gmail.com>
  • Loading branch information
Jeffwan authored Dec 26, 2024
1 parent 0e4d76b commit 484f2c7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
12 changes: 10 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,20 @@ func (c *Cache) addPodAndModelMapping(podName, modelName string) {
// deletePodAndModelMapping removes the association between podName and
// modelName from both PodToModelMapping and ModelToPodMapping.
//
// If removing the association leaves an inner map empty, the outer key is
// deleted as well, so the cache never keeps a model key with no pods behind
// it (or a pod key with no models).
func (c *Cache) deletePodAndModelMapping(podName, modelName string) {
	if models, ok := c.PodToModelMapping[podName]; ok {
		// delete mutates the inner map in place, so no write-back to the
		// outer map is needed; only the empty case requires extra work.
		delete(models, modelName)
		if len(models) == 0 {
			delete(c.PodToModelMapping, podName)
		}
	}

	if pods, ok := c.ModelToPodMapping[modelName]; ok {
		delete(pods, podName)
		if len(pods) == 0 {
			delete(c.ModelToPodMapping, modelName)
		}
	}
}

Expand Down
21 changes: 14 additions & 7 deletions pkg/plugins/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,23 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestID string, req *e
"error processing request body"), targetPodIP, stream
}

if model, ok = jsonMap["model"].(string); !ok || model == "" { // || !s.cache.CheckModelExists(model) # enable when dynamic lora is enabled
if model, ok = jsonMap["model"].(string); !ok || model == "" {
klog.ErrorS(nil, "model error in request", "requestID", requestID, "jsonMap", jsonMap)
return generateErrorResponse(envoyTypePb.StatusCode_InternalServerError,
[]*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{
Key: "x-no-model", RawValue: []byte(model)}}},
fmt.Sprintf("no model in request body or model %s does not exist", model)), targetPodIP, stream
}

// early reject the request if model doesn't exist.
if !s.cache.CheckModelExists(model) {
klog.ErrorS(nil, "model doesn't exist in cache, probably wrong model name", "requestID", requestID, "jsonMap", jsonMap)
return generateErrorResponse(envoyTypePb.StatusCode_BadRequest,
[]*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{
Key: "x-no-model", RawValue: []byte(model)}}},
fmt.Sprintf("model %s does not exist", model)), targetPodIP, stream
}

stream, ok = jsonMap["stream"].(bool)
if stream && ok {
streamOptions, ok := jsonMap["stream_options"].(map[string]interface{})
Expand All @@ -250,16 +259,15 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestID string, req *e
}

headers := []*configPb.HeaderValueOption{}
switch {
case routingStrategy == "":
if routingStrategy == "" {
headers = append(headers, &configPb.HeaderValueOption{
Header: &configPb.HeaderValue{
Key: "model",
RawValue: []byte(model),
},
})
klog.InfoS("request start", "requestID", requestID, "model", model)
case routingStrategy != "":
} else {
pods, err := s.cache.GetPodsForModel(model)
if len(pods) == 0 || err != nil {
return generateErrorResponse(envoyTypePb.StatusCode_InternalServerError,
Expand Down Expand Up @@ -348,8 +356,7 @@ func (s *Server) HandleResponseBody(ctx context.Context, requestID string, req *
var usage openai.CompletionUsage
headers := []*configPb.HeaderValueOption{}

switch stream {
case true:
if stream {
t := &http.Response{
Body: io.NopCloser(bytes.NewReader(b.ResponseBody.GetBody())),
}
Expand All @@ -370,7 +377,7 @@ func (s *Server) HandleResponseBody(ctx context.Context, requestID string, req *
}}},
err.Error())
}
case false:
} else {
if err := json.Unmarshal(b.ResponseBody.Body, &res); err != nil {
klog.ErrorS(err, "error to unmarshal response", "requestID", requestID, "responseBody", string(b.ResponseBody.GetBody()))
return generateErrorResponse(
Expand Down

0 comments on commit 484f2c7

Please sign in to comment.