Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

Commit

Permalink
feat(instill): drop support for "external mode" (#101)
Browse files Browse the repository at this point in the history
Because

- In the Instill Model connector, we need to inject system information
to enable the connector to fetch the model list from the model service.
- We want to drop support for "external mode" for simplicity.

This commit

- Drops support for "external mode".
- Adjusts the `ListOperatorDefinitions` and `ListConnectorDefinitions`
interfaces.
  • Loading branch information
donch1989 authored Apr 24, 2024
1 parent ff49157 commit b0c091b
Show file tree
Hide file tree
Showing 17 changed files with 91 additions and 159 deletions.
2 changes: 2 additions & 0 deletions pkg/base/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type IConnector interface {
IComponent

LoadConnectorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error

// Note: Some content in the definition JSON schema needs to be generated by sysVars or component setting.
GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error)

CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*ExecutionWrapper, error)
Expand Down
3 changes: 3 additions & 0 deletions pkg/base/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ type IOperator interface {
IComponent

LoadOperatorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error

// Note: Some content in the definition JSON schema needs to be generated by sysVars or component setting.
GetOperatorDefinition(sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error)

CreateExecution(sysVars map[string]any, task string) (*ExecutionWrapper, error)
}

Expand Down
49 changes: 1 addition & 48 deletions pkg/connector/instill/v0/config/definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,7 @@
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": true,
"oneOf": [
{
"title": "Instill Model Connector Internal Mode",
"properties": {
"mode": {
"const": "Internal Mode"
}
}
},
{
"title": "Instill Model Connector External Mode",
"properties": {
"api_token": {
"description": "To access models on Instill Core/Cloud, enter your Instill Core/Cloud API Token. You can find your tokens by visiting your Console's Settings > API Tokens page.",
"instillUpstreamTypes": [
"reference"
],
"instillAcceptFormats": [
"string"
],
"instillCredentialField": true,
"instillUIOrder": 0,
"title": "API Token",
"type": "string"
},
"mode": {
"const": "External Mode"
},
"server_url": {
"default": "https://api.instill.tech",
"description": "Base URL for the Instill Cloud API. To access models on Instill Cloud, use the base URL `https://api.instill.tech`. To access models on your local Instill Core, use the base URL `http://api-gateway:8080`.",
"instillUpstreamTypes": [
"value"
],
"instillAcceptFormats": [
"string"
],
"instillUIOrder": 1,
"title": "Server URL",
"type": "string"
}
},
"required": [
"api_token",
"server_url"
]
}
],
"properties": {},
"title": "Instill Model Connector",
"type": "object"
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/image_classification.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *execution) executeImageClassification(grpcClient modelPB.ModelPublicSer
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/image_to_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *execution) executeImageToImage(grpcClient modelPB.ModelPublicServiceCli
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/instance_segmentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *execution) executeInstanceSegmentation(grpcClient modelPB.ModelPublicSe
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/keypoint_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *execution) executeKeyPointDetection(grpcClient modelPB.ModelPublicServi
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
166 changes: 70 additions & 96 deletions pkg/connector/instill/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"go.uber.org/zap"
"google.golang.org/grpc/metadata"
Expand All @@ -21,10 +22,6 @@ import (
pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)

const (
internalMode = "Internal Mode"
)

var (
//go:embed config/definition.json
definitionJSON []byte
Expand Down Expand Up @@ -64,57 +61,33 @@ func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb
}}, nil
}

func getMode(config *structpb.Struct) string {
return config.GetFields()["mode"].GetStringValue()
}

func getAPIKey(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return vars["__PIPELINE_HEADER_AUTHORIZATION"].(string)
func getHeaderAuthorization(vars map[string]any) string {
if v, ok := vars["__PIPELINE_HEADER_AUTHORIZATION"]; ok {
return v.(string)
}
return fmt.Sprintf("Bearer %s", config.GetFields()["api_token"].GetStringValue())
return ""
}
func getInstillUserUID(vars map[string]any, config *structpb.Struct) string {
func getInstillUserUID(vars map[string]any) string {
return vars["__PIPELINE_USER_UID"].(string)
}

func getModelServerURL(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return vars["__MODEL_BACKEND"].(string)
func getModelServerURL(vars map[string]any) string {
if v, ok := vars["__MODEL_BACKEND"]; ok {
return v.(string)
}
serverURL := config.GetFields()["server_url"].GetStringValue()
if strings.HasPrefix(serverURL, "https://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":443"
}
} else if strings.HasPrefix(serverURL, "http://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":80"
}
}
return serverURL
return ""
}

func getMgmtServerURL(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return vars["__MGMT_BACKEND"].(string)
func getMgmtServerURL(vars map[string]any) string {
if v, ok := vars["__MGMT_BACKEND"]; ok {
return v.(string)
}
serverURL := config.GetFields()["server_url"].GetStringValue()
if strings.HasPrefix(serverURL, "https://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":443"
}
} else if strings.HasPrefix(serverURL, "http://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":80"
}
}
return serverURL
return ""
}
func getRequestMetadata(vars map[string]any, cfg *structpb.Struct) metadata.MD {
func getRequestMetadata(vars map[string]any) metadata.MD {
return metadata.Pairs(
"Authorization", getAPIKey(vars, cfg),
"Instill-User-Uid", getInstillUserUID(vars, cfg),
"Authorization", getHeaderAuthorization(vars),
"Instill-User-Uid", getInstillUserUID(vars),
"Instill-Auth-Type", "user",
)
}
Expand All @@ -126,18 +99,21 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro
return inputs, fmt.Errorf("invalid input")
}

gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.SystemVariables, e.Connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.SystemVariables))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}

mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.SystemVariables, e.Connection))
mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.SystemVariables))
if mgmtGRPCCLientConn != nil {
defer mgmtGRPCCLientConn.Close()
}

modelNameSplits := strings.Split(inputs[0].GetFields()["model_name"].GetStringValue(), "/")
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(e.SystemVariables))
nsResp, err := mgmtGRPCCLient.CheckNamespace(ctx, &mgmtPB.CheckNamespaceRequest{
Id: modelNameSplits[0],
})
Expand Down Expand Up @@ -187,11 +163,14 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro
}

func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) error {
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, connection))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(sysVars))
_, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{})
if err != nil {
return err
Expand All @@ -200,74 +179,69 @@ func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) er
return nil
}

// func (c *connector) GetConnectorDefinitionByID(defID string, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
// def, err := c.Connector.GetConnectorDefinitionByID(defID, component)
// if err != nil {
// return nil, err
// }

// return c.GetConnectorDefinitionByUID(uuid.FromStringOrNil(def.Uid), component)
// }

type ModelsResp struct {
Models []struct {
Name string `json:"name"`
Task string `json:"task"`
} `json:"models"`
}

// Generate the model_name enum based on the task
// Generate the `model_name` enum based on the task.
// This implementation is a temporary solution due to the incomplete feature set of Instill Model.
// We'll re-implement this after Instill Model is stable.
func (c *connector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
oriDef, err := c.BaseConnector.GetConnectorDefinition(nil, nil)
if err != nil {
return nil, err
}
def := proto.Clone(oriDef).(*pipelinePB.ConnectorDefinition)

if component != nil && component.Connection != nil {
if getModelServerURL(sysVars, component.Connection) == "" {
return def, nil
}
if getModelServerURL(sysVars) == "" {
return def, nil
}

gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, component.Connection))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(sysVars))

pageToken := ""
models := []*modelPB.Model{}
for {
resp, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{PageToken: &pageToken})
if err != nil {

return def, nil
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, component.Connection))
// We should query by pages and accumulate them in the future

pageToken := ""
models := []*modelPB.Model{}
for {
resp, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{PageToken: &pageToken})
if err != nil {
return def, nil
}
models = append(models, resp.Models...)
pageToken = resp.NextPageToken
if pageToken == "" {
break
}
models = append(models, resp.Models...)
pageToken = resp.NextPageToken
if pageToken == "" {
break
}
}

modelNameMap := map[string]*structpb.ListValue{}
modelNameMap := map[string]*structpb.ListValue{}

modelName := &structpb.ListValue{}
for _, model := range models {
if _, ok := modelNameMap[model.Task.String()]; !ok {
modelNameMap[model.Task.String()] = &structpb.ListValue{}
}
namePaths := strings.Split(model.Name, "/")
modelName.Values = append(modelName.Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
modelNameMap[model.Task.String()].Values = append(modelNameMap[model.Task.String()].Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
modelName := &structpb.ListValue{}
for _, model := range models {
if _, ok := modelNameMap[model.Task.String()]; !ok {
modelNameMap[model.Task.String()] = &structpb.ListValue{}
}
for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values {
task := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["task"].GetStructValue().Fields["const"].GetStringValue()
if _, ok := modelNameMap[task]; ok {
addModelEnum(sch.GetStructValue().Fields, modelNameMap[task])
}

namePaths := strings.Split(model.Name, "/")
modelName.Values = append(modelName.Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
modelNameMap[model.Task.String()].Values = append(modelNameMap[model.Task.String()].Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
}
for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values {
task := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["task"].GetStructValue().Fields["const"].GetStringValue()
if _, ok := modelNameMap[task]; ok {
addModelEnum(sch.GetStructValue().Fields, modelNameMap[task])
}

}
return def, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/object_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (e *execution) executeObjectDetection(grpcClient modelPB.ModelPublicService
TaskInputs: taskInputs,
}

ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *execution) executeOCR(grpcClient modelPB.ModelPublicServiceClient, mode
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/semantic_segmentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *execution) executeSemanticSegmentation(grpcClient modelPB.ModelPublicSe
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/text_generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e *execution) executeTextGeneration(grpcClient modelPB.ModelPublicServiceC
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
Loading

0 comments on commit b0c091b

Please sign in to comment.