Commit

Speech streaming prototype.

lgrammel committed Jan 17, 2024
1 parent b234974 commit da585bb
Showing 8 changed files with 304 additions and 8 deletions.
86 changes: 86 additions & 0 deletions examples/next-openai/app/api/completion-speech-lmnt/route.ts
@@ -0,0 +1,86 @@
import OpenAI from 'openai';
import {
OpenAIStream,
StreamingTextResponse,
experimental_StreamData,
} from 'ai';
import Speech from 'lmnt-node';

// Create an OpenAI API client (that's edge friendly!)
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY || '',
});

const speech = new Speech(process.env.LMNT_API_KEY || '');

// Note: the edge runtime is not enabled here; this route relies on Node.js APIs (e.g. Buffer) for the LMNT speech stream.
// export const runtime = 'edge';

export async function POST(req: Request) {
// Extract the `prompt` from the body of the request
const { prompt } = await req.json();

// Ask OpenAI for a streaming completion given the prompt
const response = await openai.completions.create({
model: 'gpt-3.5-turbo-instruct',
max_tokens: 2000,
stream: true,
prompt,
});

const speechStreamingConnection = speech.synthesizeStreaming(
'034b632b-df71-46c8-b440-86a42ffc3cf3', // Henry
{},
);

const data = new experimental_StreamData();

// create a promise to wait for the speech stream to finish
let resolveSpeech: (value: unknown) => void = () => {};
const speechFinishedPromise = new Promise(resolve => {
resolveSpeech = resolve;
});

// run in parallel:
(async () => {
let i = 0;
for await (const chunk of speechStreamingConnection) {
try {
const chunkAny = chunk as any;
const audioBuffer: Buffer = chunkAny.audio;

// base64 encode the audio buffer
const base64Audio = audioBuffer.toString('base64');

console.log('streaming speech chunk #' + i++);

data.appendSpeech(base64Audio);
} catch (err) {
console.error(err);
}
}
console.log('done streaming speech');

resolveSpeech?.(undefined);
})();

// Convert the response into a friendly text-stream
const stream = OpenAIStream(response, {
onToken(token) {
speechStreamingConnection.appendText(token);
speechStreamingConnection.flush();
},
async onFinal(completion) {
speechStreamingConnection.finish();

await speechFinishedPromise;
data.close();

console.log('done streaming text');
},
experimental_streamData: true,
});

// Respond with the stream
return new StreamingTextResponse(stream, {}, data);
}
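
For illustration only (not part of this commit): a minimal sketch of consuming this route without the useCompletion hook, e.g. to inspect the raw protocol. The assumed path matches the file location above; text parts and base64 audio parts (the '8' code added in stream-parts.ts further down) arrive interleaved in the same response body.

async function readSpeechStream() {
  const response = await fetch('/api/completion-speech-lmnt', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt: 'Tell me a short story.' }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Each chunk contains prefixed stream parts (text, data, audio, ...).
    console.log(decoder.decode(value, { stream: true }));
  }
}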
53 changes: 53 additions & 0 deletions examples/next-openai/app/completion-speech-lmnt/page.tsx
@@ -0,0 +1,53 @@
'use client';

import { useCompletion } from 'ai/react';

export default function Chat() {
const {
completion,
input,
handleInputChange,
handleSubmit,
error,
speechUrl,
} = useCompletion({
api: '/api/completion-speech-lmnt',
});

return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
<h4 className="text-xl font-bold text-gray-900 md:text-xl pb-4">
useCompletion Example
</h4>
{error && (
<div className="fixed top-0 left-0 w-full p-4 text-center bg-red-500 text-white">
{error.message}
</div>
)}

{completion}

<div className="flex justify-center mt-4">
{speechUrl != null && (
<>
<audio
controls
controlsList="nodownload nofullscreen noremoteplayback"
autoPlay={true}
src={speechUrl}
/>
</>
)}
</div>

<form onSubmit={handleSubmit}>
<input
className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
value={input}
placeholder="Say something..."
onChange={handleInputChange}
/>
</form>
</div>
);
}
2 changes: 2 additions & 0 deletions examples/next-openai/package.json
@@ -10,7 +10,9 @@
},
"dependencies": {
"ai": "2.2.31",
"bufferutil": "4.0.8",
"next": "14.0.3",
"lmnt-node": "1.0.0",
"openai": "4.16.1",
"react": "18.2.0",
"react-dom": "^18.2.0"
69 changes: 68 additions & 1 deletion packages/core/react/use-completion.ts
@@ -57,6 +57,8 @@ export type UseCompletionHelpers = {
isLoading: boolean;
/** Additional data added on the server via StreamData */
data?: JSONValue[] | undefined;

/** Object URL for the streamed speech audio (experimental). */
speechUrl: string | null;
};

export function useCompletion({
@@ -96,6 +98,61 @@
const [abortController, setAbortController] =
useState<AbortController | null>(null);

// speech setup
// TODO reset on completionId change
const mediaSourceRef = useRef<MediaSource | undefined>(undefined);
const sourceBufferRef = useRef<SourceBuffer | undefined>(undefined);
const audioChunks = useRef<ArrayBufferLike[]>([]);
const [speechUrl, setSpeechUrl] = useState<string | null>(null);

const tryAppendNextChunk = () => {
const sourceBuffer = sourceBufferRef.current;
const chunks = audioChunks.current;

console.log('tryAppendNextChunk', {
sourceBuffer: sourceBuffer != null,
updating: sourceBuffer?.updating,
audioChunks: chunks.length,
});

if (sourceBuffer != null && !sourceBuffer.updating && chunks.length > 0) {
console.log('appendBuffer');
// get first audio chunk and append it to the source buffer
sourceBuffer.appendBuffer(chunks.shift()!);
}
};

useEffect(() => {
if (typeof window !== 'undefined' && mediaSourceRef.current == null) {
console.log('create media source');
mediaSourceRef.current = new MediaSource();

const sourceOpen = () => {
console.log('sourceopen');

sourceBufferRef.current =
mediaSourceRef.current?.addSourceBuffer('audio/mpeg'); // TODO configurable

sourceBufferRef.current?.addEventListener('updateend', () => {
console.log('updateend');
tryAppendNextChunk();
});
};

mediaSourceRef.current.addEventListener('sourceopen', sourceOpen, {
once: true,
});

console.log('setSpeechUrl');
setSpeechUrl(URL.createObjectURL(mediaSourceRef.current));

return () => {
console.log('removeEventListener');
// mediaSourceRef.current?.removeEventListener('sourceopen', sourceOpen);
};
}
}, []);

const extraMetadataRef = useRef({
credentials,
headers,
@@ -125,11 +182,20 @@
setError,
setAbortController,
onResponse,
onFinish,
onFinish: (prompt, completion) => {
if (mediaSourceRef.current?.readyState === 'open') {
mediaSourceRef.current?.endOfStream();
}
onFinish?.(prompt, completion);
},
onError,
onData: data => {
mutateStreamData([...(streamData || []), ...(data || [])], false);
},
onSpeechPart: data => {
audioChunks.current.push(data);
tryAppendNextChunk();
},
}),
[
mutate,
@@ -194,5 +260,6 @@
handleSubmit,
isLoading,
data: streamData,
speechUrl,
};
}
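
For illustration only (not part of this commit): the hook above leans on the browser MediaSource API, whose key constraint is that appendBuffer may only be called while the SourceBuffer is not updating, hence the chunk queue and the 'updateend' listener. A standalone sketch of that flow, assuming 'audio/mpeg' chunks as in the hook:

function createSpeechSink() {
  const mediaSource = new MediaSource();
  const pending: ArrayBuffer[] = [];
  let sourceBuffer: SourceBuffer | undefined;

  const appendNext = () => {
    // Only one append may be in flight at a time.
    if (sourceBuffer && !sourceBuffer.updating && pending.length > 0) {
      sourceBuffer.appendBuffer(pending.shift()!);
    }
  };

  mediaSource.addEventListener(
    'sourceopen',
    () => {
      sourceBuffer = mediaSource.addSourceBuffer('audio/mpeg');
      sourceBuffer.addEventListener('updateend', appendNext);
    },
    { once: true },
  );

  return {
    // Use as the src of an <audio> element.
    url: URL.createObjectURL(mediaSource),
    push(chunk: ArrayBuffer) {
      pending.push(chunk);
      appendNext();
    },
    end() {
      if (mediaSource.readyState === 'open') mediaSource.endOfStream();
    },
  };
}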
12 changes: 12 additions & 0 deletions packages/core/shared/call-completion-api.ts
@@ -16,6 +16,7 @@ export async function callCompletionApi({
onFinish,
onError,
onData,
onSpeechPart,
}: {
api: string;
prompt: string;
@@ -30,6 +31,7 @@
onFinish?: (prompt: string, completion: string) => void;
onError?: (error: Error) => void;
onData?: (data: JSONValue[]) => void;
onSpeechPart?: (data: ArrayBufferLike) => void;
}) {
try {
setLoading(true);
@@ -94,6 +96,16 @@
onData?.(value);
break;
}
case 'audio': {
console.log('decode audio');

// decode the base64 payload into an ArrayBuffer:
const bytes = Uint8Array.from(atob(value), char =>
char.charCodeAt(0),
).buffer;

onSpeechPart?.(bytes);
break;
}
}
}
} else {
20 changes: 18 additions & 2 deletions packages/core/shared/stream-parts.ts
@@ -221,6 +221,17 @@ const toolCallStreamPart: StreamPart<
},
};

const audioPart: StreamPart<'8', 'audio', string> = {
code: '8',
name: 'audio',
parse: (value: JSONValue) => {
if (typeof value !== 'string') {
throw new Error('"audio" parts expect a string value.');
}
return { type: 'audio', value };
},
};

const streamParts = [
textStreamPart,
functionCallStreamPart,
@@ -230,6 +241,7 @@ const streamParts = [
assistantControlDataStreamPart,
dataMessageStreamPart,
toolCallStreamPart,
audioPart,
] as const;

// union type of all stream parts
@@ -241,7 +253,8 @@ type StreamParts =
| typeof assistantMessageStreamPart
| typeof assistantControlDataStreamPart
| typeof dataMessageStreamPart
| typeof toolCallStreamPart;
| typeof toolCallStreamPart
| typeof audioPart;

/**
* Maps the type of a stream part to its value type.
@@ -258,7 +271,8 @@ export type StreamPartType =
| ReturnType<typeof assistantMessageStreamPart.parse>
| ReturnType<typeof assistantControlDataStreamPart.parse>
| ReturnType<typeof dataMessageStreamPart.parse>
| ReturnType<typeof toolCallStreamPart.parse>;
| ReturnType<typeof toolCallStreamPart.parse>
| ReturnType<typeof audioPart.parse>;

export const streamPartsByCode = {
[textStreamPart.code]: textStreamPart,
@@ -269,6 +283,7 @@
[assistantControlDataStreamPart.code]: assistantControlDataStreamPart,
[dataMessageStreamPart.code]: dataMessageStreamPart,
[toolCallStreamPart.code]: toolCallStreamPart,
[audioPart.code]: audioPart,
} as const;

/**
Expand Down Expand Up @@ -302,6 +317,7 @@ export const StreamStringPrefixes = {
[assistantControlDataStreamPart.name]: assistantControlDataStreamPart.code,
[dataMessageStreamPart.name]: dataMessageStreamPart.code,
[toolCallStreamPart.name]: toolCallStreamPart.code,
[audioPart.name]: audioPart.code,
} as const;

export const validCodes = streamParts.map(part => part.code);
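
For illustration only (not part of this commit): how the new audio part plugs into the existing helpers in this file. The wire format is assumed to be the part code, a colon, and the JSON-encoded value, matching the other prefixed parts; formatStreamPart is the same helper that appendSpeech uses in stream-data.ts below, and the import path is hypothetical.

import { formatStreamPart, streamPartsByCode } from './stream-parts';

// Serializing a base64 audio chunk, presumably rendered as `8:"<base64>"`:
const wire = formatStreamPart('audio', 'SUQzBAAAAAAAI1RTU0UAAAAP');

// Parsing: the registered audioPart checks that the value is a string.
const part = streamPartsByCode['8'].parse('SUQzBAAAAAAAI1RTU0UAAAAP');
// => { type: 'audio', value: 'SUQzBAAAAAAAI1RTU0UAAAAP' }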
12 changes: 12 additions & 0 deletions packages/core/streams/stream-data.ts
@@ -88,6 +88,18 @@ export class experimental_StreamData {

this.data.push(value);
}

appendSpeech(speechBase64: string) {
if (this.isClosed) {
throw new Error('Data Stream has already been closed.');
}

console.log('appendSpeech', speechBase64.length, this.controller != null);

this.controller?.enqueue(
this.encoder.encode(formatStreamPart('audio', speechBase64)),
);
}
}

/**