Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

Commit

Permalink
feat(ai): build standardised ai component (#344)
Browse files Browse the repository at this point in the history
Because

- we want users to have a better experience when using the AI component

This commit

- openai v0: fix bugs of openai multiple choices when streaming
- openai v0: improve error messages from openai
- openai v1: expose new client in openai for universal ai
- universalai v0: add a universal AI component that runs inference on the
vendors' models
- ai package: add task chat data struct
- make httpclient.Client an interface
- build ConcurrentExecutor in execution wrapper

### Design
The v1 standardised AI component should use the same implementation as the
universal AI component.
So, in the design, the universal AI component uses an adaptor to call
the implementation of each vendor-specific AI component.
  • Loading branch information
chuang8511 authored Sep 18, 2024
1 parent 892c51f commit e46d228
Show file tree
Hide file tree
Showing 22 changed files with 1,739 additions and 9 deletions.
22 changes: 14 additions & 8 deletions ai/openai/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -283,9 +284,10 @@ func (e *execution) worker(ctx context.Context, client *httpclient.Client, job *
}

if restyResp.StatusCode() != 200 {

res := restyResp.Body()
job.Error.Error(ctx, fmt.Errorf("send request to openai error with error code: %d, msg %s", restyResp.StatusCode(), res))
rawBody := restyResp.RawBody()
defer rawBody.Close()
bodyBytes, err := io.ReadAll(rawBody)
job.Error.Error(ctx, fmt.Errorf("send request to openai error with error code: %d, msg %s, %s", restyResp.StatusCode(), bodyBytes, err))
return
}
scanner := bufio.NewScanner(restyResp.RawResponse.Body)
Expand Down Expand Up @@ -337,11 +339,15 @@ func (e *execution) worker(ctx context.Context, client *httpclient.Client, job *
return
}

if outputStruct.Texts == nil {
outputStruct.Texts = make([]string, len(response.Choices))
}
for idx, c := range response.Choices {
outputStruct.Texts[idx] += c.Delta.Content
for _, c := range response.Choices {
// Now, there is no document to describe it.
// But, when we test it, we found that the choices idx is not in order.
// So, we need to get idx from the choice, and the len of the choices is always 1.
responseIdx := c.Index
if len(outputStruct.Texts) <= responseIdx {
outputStruct.Texts = append(outputStruct.Texts, "")
}
outputStruct.Texts[responseIdx] += c.Delta.Content

}

Expand Down
59 changes: 59 additions & 0 deletions ai/openai/v1/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package openaiv1

import (
"github.com/instill-ai/component/internal/util/httpclient"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
)

// NewClient builds the HTTP client used to call the OpenAI API.
//
// The base URL, API key and optional organization are read from setup. The
// client is created with the given logger, registers errBody through
// httpclient.WithEndUserError and is configured with retryCount retries.
func NewClient(setup *structpb.Struct, logger *zap.Logger) *httpclient.Client {
	client := httpclient.New(
		"OpenAI",
		getBasePath(setup),
		httpclient.WithLogger(logger),
		httpclient.WithEndUserError(&errBody{}),
	)

	client.SetAuthToken(getAPIKey(setup))
	client.SetRetryCount(retryCount)

	// The organization header is only sent when the setup provides a
	// non-empty organization.
	if organization := getOrg(setup); organization != "" {
		client.SetHeader("OpenAI-Organization", organization)
	}

	return client
}

// errBody is the JSON shape decoded from an error response body: an "error"
// object carrying a "message" string.
type errBody struct {
	// Error is the top-level "error" object of the response.
	Error struct {
		// Message is the text found under "error.message".
		Message string `json:"message"`
	} `json:"error"`
}

// Message returns the message carried by the decoded error body.
func (e errBody) Message() string {
	msg := e.Error.Message
	return msg
}

// getBasePath returns the base URL of the OpenAI API. The "base-path" setup
// field allows us to override the API the connector will point to. It isn't
// meant to be exposed to users. Rather, it can serve to test the logic
// against a fake server.
//
// When the field is missing, empty or not a string, the default host is
// returned so the client never ends up with an empty base URL.
//
// TODO instead of having the API value hardcoded in the codebase, it should be
// read from a setup file or environment variable.
func getBasePath(setup *structpb.Struct) string {
	// The generated getters are nil-safe, so a missing key (nil value) and a
	// non-string value both yield "" here.
	if basePath := setup.GetFields()["base-path"].GetStringValue(); basePath != "" {
		return basePath
	}
	return host
}

// getAPIKey returns the string value stored under cfgAPIKey in setup.
func getAPIKey(setup *structpb.Struct) string {
	fields := setup.GetFields()
	return fields[cfgAPIKey].GetStringValue()
}

// getOrg returns the string value stored under cfgOrganization in setup, or
// an empty string when the field is not present.
func getOrg(setup *structpb.Struct) string {
	if org, found := setup.GetFields()[cfgOrganization]; found {
		return org.GetStringValue()
	}
	return ""
}
Empty file.
Empty file added ai/openai/v1/config/setup.json
Empty file.
Empty file added ai/openai/v1/config/tasks.json
Empty file.
124 changes: 124 additions & 0 deletions ai/openai/v1/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package openaiv1

import (
"context"
_ "embed"
"fmt"
"sync"

"google.golang.org/protobuf/types/known/structpb"

"github.com/instill-ai/component/base"
"github.com/instill-ai/component/internal/util/httpclient"
)

const (
	// host is the default OpenAI API base URL, used when the setup does not
	// override it.
	host = "https://api.openai.com"

	// TextChatTask is the task identifier handled by CreateExecution.
	TextChatTask = "TASK_CHAT"

	// cfgAPIKey and cfgOrganization are the setup field names holding the
	// API key and the organization.
	cfgAPIKey       = "api-key"
	cfgOrganization = "organization"

	// retryCount is the retry count configured on the HTTP client.
	retryCount = 3
)

var (
//go:embed config/definition.json
definitionJSON []byte
//go:embed config/setup.json
setupJSON []byte
//go:embed config/tasks.json
tasksJSON []byte

once sync.Once
comp *component
)

// component executes queries against OpenAI. It embeds base.Component and
// additionally keeps the Instill API key loaded by WithInstillCredentials.
type component struct {
	base.Component

	// instillAPIKey is the globally configured API key that resolveSetup
	// substitutes when the setup does not carry a user-provided key.
	instillAPIKey string
}

// execution is the per-execution state created by CreateExecution.
type execution struct {
	base.ComponentExecution

	// usesInstillCredentials records whether the API key was resolved from
	// the Instill credentials rather than supplied in the setup.
	usesInstillCredentials bool

	// client is the HTTP client built from the resolved setup.
	client *httpclient.Client

	// execute is the task handler selected in CreateExecution; Execute
	// passes it to base.ConcurrentExecutor.
	execute func(*structpb.Struct, *base.Job, context.Context) (*structpb.Struct, error)
}

// Init returns the package-level OpenAI component, building it on the first
// call only: bc is wrapped and the embedded definition, setup and tasks JSON
// are loaded. It panics if loading the definition fails.
func Init(bc base.Component) *component {
	once.Do(func() {
		comp = &component{Component: bc}
		if err := comp.LoadDefinition(definitionJSON, setupJSON, tasksJSON, nil); err != nil {
			panic(err)
		}
	})

	return comp
}

// CreateExecution resolves the credentials in x.Setup, builds an HTTP client
// from the resolved setup and returns an execution bound to the handler for
// x.Task. It returns an error when the setup cannot be resolved or the task
// is unknown.
func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, error) {
	setup, usesInstill, err := c.resolveSetup(x.Setup)
	if err != nil {
		return nil, err
	}
	x.Setup = setup

	exec := &execution{
		ComponentExecution:     x,
		usesInstillCredentials: usesInstill,
		client:                 NewClient(setup, x.GetLogger()),
	}

	// TextChatTask is the only task this component handles.
	if x.Task != TextChatTask {
		return nil, fmt.Errorf("unknown task: %s", x.Task)
	}
	exec.execute = exec.ExecuteTextChat

	return exec, nil
}

// Execute hands the jobs and the task handler selected at creation time to
// base.ConcurrentExecutor and returns its error.
func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error {
	handler := e.execute
	return base.ConcurrentExecutor(ctx, jobs, handler)
}

// UsesInstillCredentials reports whether this execution's API key was
// resolved from the Instill credentials.
func (e *execution) UsesInstillCredentials() bool {
	resolved := e.usesInstillCredentials
	return resolved
}

// WithInstillCredentials stores the Instill API key read from the global
// configuration s on the component, so it can be used in place of a
// user-defined credential value. It returns the receiver to allow chaining.
func (c *component) WithInstillCredentials(s map[string]any) *component {
	apiKey := base.ReadFromGlobalConfig(cfgAPIKey, s)
	c.instillAPIKey = apiKey
	return c
}

// resolveSetup checks whether the component is configured to use the Instill
// credentials injected during initialization and, if so, returns a new setup
// with the secret credential values.
//
// Returns:
//   - the setup to use: the input itself when it already carries a
//     user-provided API key, otherwise a copy with the Instill API key set;
//   - true when the Instill API key was substituted;
//   - an unresolved-credential error when no user key is present and no
//     Instill API key has been loaded.
//
// The input setup is never modified, so the Instill secret is not written
// into a struct still owned by the caller.
func (c *component) resolveSetup(setup *structpb.Struct) (*structpb.Struct, bool, error) {
	// The generated getters are nil-safe, so a nil setup, a nil field map
	// and a missing key all yield "".
	userKey := setup.GetFields()[cfgAPIKey].GetStringValue()
	if userKey != "" && userKey != base.SecretKeyword {
		return setup, false, nil
	}

	if c.instillAPIKey == "" {
		return nil, false, base.NewUnresolvedCredential(cfgAPIKey)
	}

	// Shallow-copy the fields: the values are shared but never mutated, and
	// only the API key entry is replaced in the copy.
	original := setup.GetFields()
	fields := make(map[string]*structpb.Value, len(original)+1)
	for name, value := range original {
		fields[name] = value
	}
	fields[cfgAPIKey] = structpb.NewStringValue(c.instillAPIKey)

	return &structpb.Struct{Fields: fields}, true, nil
}
Loading

0 comments on commit e46d228

Please sign in to comment.