Skip to content

Commit

Permalink
🚸 (openai) Parse stream on client to correctly handle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
baptisteArno committed Jun 16, 2023
1 parent 83f2a29 commit 524f156
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 154 deletions.
21 changes: 21 additions & 0 deletions apps/docs/openapi/chat/_spec_.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@
"type": "string",
"description": "Session ID that you get from the initial chat request to a bot. If not provided, it will create a new session."
},
"clientLogs": {
"type": "array",
"items": {
"type": "object",
"properties": {
"status": {
"type": "string"
},
"description": {
"type": "string"
},
"details": {}
},
"required": [
"status",
"description"
],
"additionalProperties": false
},
"description": "Logs while executing client side actions"
},
"startParams": {
"type": "object",
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import { decrypt, isCredentialsV2 } from '@typebot.io/lib/api/encryption'
import { updateVariables } from '@/features/variables/updateVariables'
import { parseVariableNumber } from '@/features/variables/parseVariableNumber'
import { resumeChatCompletion } from './resumeChatCompletion'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'
import { isVercel } from '@/helpers/api/isVercel'
import { parseChatCompletionMessages } from './parseChatCompletionMessages'
import { executeChatCompletionOpenAIRequest } from './executeChatCompletionOpenAIRequest'
import { isPlaneteScale } from '@/helpers/api/isPlanetScale'

export const createChatCompletionOpenAI = async (
state: SessionState,
Expand Down Expand Up @@ -58,7 +57,6 @@ export const createChatCompletionOpenAI = async (

if (
isPlaneteScale() &&
isVercel() &&
isCredentialsV2(credentials) &&
newSessionState.isStreamEnabled
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ import {
OpenAICredentials,
} from '@typebot.io/schemas/features/blocks/integrations/openai'
import { SessionState } from '@typebot.io/schemas/features/chat'
import {
ParsedEvent,
ReconnectInterval,
createParser,
} from 'eventsource-parser'
import type {
ChatCompletionRequestMessage,
CreateChatCompletionRequest,
Expand Down Expand Up @@ -42,11 +37,6 @@ export const getChatCompletionStream =
options.advancedSettings?.temperature
)

const encoder = new TextEncoder()
const decoder = new TextDecoder()

let counter = 0

const res = await fetch('https://api.openai.com/v1/chat/completions', {
headers: {
'Content-Type': 'application/json',
Expand All @@ -61,43 +51,5 @@ export const getChatCompletionStream =
} satisfies CreateChatCompletionRequest),
})

const stream = new ReadableStream({
async start(controller) {
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === 'event') {
const data = event.data
if (data === '[DONE]') {
controller.close()
return
}
try {
const json = JSON.parse(data) as {
choices: { delta: { content: string } }[]
}
const text = json.choices.at(0)?.delta.content
if (counter < 2 && (text?.match(/\n/) || []).length) {
return
}
const queue = encoder.encode(text)
controller.enqueue(queue)
counter++
} catch (e) {
controller.error(e)
}
}
}

// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks & invoke an event for each SSE event stream
const parser = createParser(onParse)

// https://web.dev/streams/#asynchronous-iteration
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk))
}
},
})

return stream
return res.body
}
17 changes: 16 additions & 1 deletion apps/viewer/src/features/chat/api/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { prefillVariables } from '@/features/variables/prefillVariables'
import { injectVariablesFromExistingResult } from '@/features/variables/injectVariablesFromExistingResult'
import { deepParseVariables } from '@/features/variables/deepParseVariable'
import { parseVariables } from '@/features/variables/parseVariables'
import { saveLog } from '@/features/logs/saveLog'

export const sendMessage = publicProcedure
.meta({
Expand All @@ -42,9 +43,23 @@ export const sendMessage = publicProcedure
.input(sendMessageInputSchema)
.output(chatReplySchema)
.query(
async ({ input: { sessionId, message, startParams }, ctx: { user } }) => {
async ({
input: { sessionId, message, startParams, clientLogs },
ctx: { user },
}) => {
const session = sessionId ? await getSession(sessionId) : null

if (clientLogs) {
for (const log of clientLogs) {
await saveLog({
message: log.description,
status: log.status as 'error' | 'success' | 'info',
resultId: session?.state.result.id,
details: log.details,
})
}
}

if (!session) {
const {
sessionId,
Expand Down
15 changes: 8 additions & 7 deletions apps/viewer/src/features/chat/helpers/continueBotFlow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
SetVariableBlock,
WebhookBlock,
} from '@typebot.io/schemas'
import { isInputBlock, isNotDefined, byId, isDefined } from '@typebot.io/lib'
import { isInputBlock, isNotDefined, byId } from '@typebot.io/lib'
import { executeGroup } from './executeGroup'
import { getNextGroup } from './getNextGroup'
import { validateEmail } from '@/features/blocks/inputs/email/validateEmail'
Expand Down Expand Up @@ -69,15 +69,16 @@ export const continueBotFlow =
)(JSON.parse(reply))
if (result.newSessionState) newSessionState = result.newSessionState
} else if (
isDefined(reply) &&
block.type === IntegrationBlockType.OPEN_AI &&
block.options.task === 'Create chat completion'
) {
const result = await resumeChatCompletion(state, {
options: block.options,
outgoingEdgeId: block.outgoingEdgeId,
})(reply)
newSessionState = result.newSessionState
if (reply) {
const result = await resumeChatCompletion(state, {
options: block.options,
outgoingEdgeId: block.outgoingEdgeId,
})(reply)
newSessionState = result.newSessionState
}
} else if (!isInputBlock(block))
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
Expand Down
1 change: 1 addition & 0 deletions packages/embeds/js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"dependencies": {
"@stripe/stripe-js": "1.53.0",
"@udecode/plate-common": "^21.1.5",
"eventsource-parser": "^1.0.0",
"solid-element": "1.7.0",
"solid-js": "1.7.5"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ChatReply, Theme } from '@typebot.io/schemas'
import { ChatReply, SendMessageInput, Theme } from '@typebot.io/schemas'
import { InputBlockType } from '@typebot.io/schemas/features/blocks/inputs/enums'
import { createEffect, createSignal, For, onMount, Show } from 'solid-js'
import { sendMessageQuery } from '@/queries/sendMessageQuery'
Expand Down Expand Up @@ -79,7 +79,7 @@ export const ConversationContainer = (props: Props) => {
sessionId: props.initialChatReply.sessionId,
})
if (response && 'replyToSend' in response) {
sendMessage(response.replyToSend)
sendMessage(response.replyToSend, response.logs)
return
}
if (response && 'blockedPopupUrl' in response)
Expand All @@ -95,7 +95,11 @@ export const ConversationContainer = (props: Props) => {
)
})

const sendMessage = async (message: string | undefined) => {
const sendMessage = async (
message: string | undefined,
clientLogs?: SendMessageInput['clientLogs']
) => {
if (clientLogs) props.onNewLogs?.(clientLogs)
setHasError(false)
const currentInputBlock = [...chatChunks()].pop()?.input
if (currentInputBlock?.id && props.onAnswer && message)
Expand All @@ -114,6 +118,7 @@ export const ConversationContainer = (props: Props) => {
apiHost: props.context.apiHost,
sessionId: props.initialChatReply.sessionId,
message,
clientLogs,
})
clearTimeout(longRequest)
setIsSending(false)
Expand Down Expand Up @@ -151,7 +156,7 @@ export const ConversationContainer = (props: Props) => {
sessionId: props.initialChatReply.sessionId,
})
if (response && 'replyToSend' in response) {
sendMessage(response.replyToSend)
sendMessage(response.replyToSend, response.logs)
return
}
if (response && 'blockedPopupUrl' in response)
Expand Down Expand Up @@ -200,7 +205,7 @@ export const ConversationContainer = (props: Props) => {
sessionId: props.initialChatReply.sessionId,
})
if (response && 'replyToSend' in response) {
sendMessage(response.replyToSend)
sendMessage(response.replyToSend, response.logs)
return
}
if (response && 'blockedPopupUrl' in response)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { getOpenAiStreamerQuery } from '@/queries/getOpenAiStreamerQuery'
import { ClientSideActionContext } from '@/types'
import {
createParser,
ParsedEvent,
ReconnectInterval,
} from 'eventsource-parser'

export const streamChat =
(context: ClientSideActionContext) =>
Expand All @@ -8,25 +13,59 @@ export const streamChat =
content?: string | undefined
role?: 'system' | 'user' | 'assistant' | undefined
}[],
{ onStreamedMessage }: { onStreamedMessage?: (message: string) => void }
) => {
{
onStreamedMessage,
isRetrying,
}: { onStreamedMessage?: (message: string) => void; isRetrying?: boolean }
): Promise<{ message?: string; error?: object }> => {
const data = await getOpenAiStreamerQuery(context)(messages)

if (!data) {
return
}
if (!data) return { error: { message: "Couldn't get streamer data" } }

let message = ''

const reader = data.getReader()
const decoder = new TextDecoder()
let done = false

let message = ''
while (!done) {
const { value, done: doneReading } = await reader.read()
done = doneReading
const chunkValue = decoder.decode(value)
message += chunkValue
onStreamedMessage?.(message)
const onParse = (event: ParsedEvent | ReconnectInterval) => {
if (event.type === 'event') {
const data = event.data
try {
const json = JSON.parse(data) as {
choices: { delta: { content: string } }[]
}
const text = json.choices.at(0)?.delta.content
if (!text) return
message += text
onStreamedMessage?.(message)
} catch (e) {
console.error(e)
}
}
}
return message

const parser = createParser(onParse)

// eslint-disable-next-line no-constant-condition
while (true) {
const { value, done } = await reader.read()
if (done || !value) break
const dataString = decoder.decode(value)
if (dataString.includes('503 Service Temporarily Unavailable')) {
if (isRetrying)
return { error: { message: "Couldn't get streamer data" } }
await new Promise((resolve) => setTimeout(resolve, 1000))
return streamChat(context)(messages, {
onStreamedMessage,
isRetrying: true,
})
}
if (dataString.includes('[DONE]')) break
if (dataString.includes('"error":')) {
return { error: JSON.parse(dataString).error }
}
parser.feed(dataString)
}

return { message }
}
21 changes: 17 additions & 4 deletions packages/embeds/js/src/utils/executeClientSideActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import { executeSetVariable } from '@/features/blocks/logic/setVariable/executeS
import { executeWait } from '@/features/blocks/logic/wait/utils/executeWait'
import { executeWebhook } from '@/features/blocks/integrations/webhook/executeWebhook'
import { ClientSideActionContext } from '@/types'
import type { ChatReply } from '@typebot.io/schemas'
import type { ChatReply, ReplyLog } from '@typebot.io/schemas'

export const executeClientSideAction = async (
clientSideAction: NonNullable<ChatReply['clientSideActions']>[0],
context: ClientSideActionContext,
onStreamedMessage?: (message: string) => void
): Promise<
{ blockedPopupUrl: string } | { replyToSend: string | undefined } | void
| { blockedPopupUrl: string }
| { replyToSend: string | undefined; logs?: ReplyLog[] }
| void
> => {
if ('chatwoot' in clientSideAction) {
return executeChatwoot(clientSideAction.chatwoot)
Expand All @@ -35,11 +37,22 @@ export const executeClientSideAction = async (
return executeSetVariable(clientSideAction.setVariable.scriptToExecute)
}
if ('streamOpenAiChatCompletion' in clientSideAction) {
const text = await streamChat(context)(
const { error, message } = await streamChat(context)(
clientSideAction.streamOpenAiChatCompletion.messages,
{ onStreamedMessage }
)
return { replyToSend: text }
if (error)
return {
replyToSend: undefined,
logs: [
{
status: 'error',
description: 'Failed to stream OpenAI completion',
details: JSON.stringify(error, null, 2),
},
],
}
return { replyToSend: message }
}
if ('webhookToExecute' in clientSideAction) {
const response = await executeWebhook(clientSideAction.webhookToExecute)
Expand Down
Loading

0 comments on commit 524f156

Please sign in to comment.