Skip to content

Commit

Permalink
Fix decode stream with broken data
Browse files Browse the repository at this point in the history
  • Loading branch information
aiql-admin authored Jul 18, 2024
1 parent f85dbdb commit 6ab8da0
Showing 1 changed file with 70 additions and 39 deletions.
109 changes: 70 additions & 39 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -1095,20 +1095,22 @@ <h5 class="font-weight-bold">{{ column.key }}</h5>
role: "assistant",
});

let buffer = ''

// Read the stream
read(reader, messageStore.conversation);
read(reader, messageStore.conversation, buffer);
} catch (error) {

snackbarStore.showErrorMessage(error.message);
}
};

const read = async (
reader,
target,
buffer
) => {
// TextDecoder is a built-in object that allows you to convert a stream of bytes into a string
const decoder = new TextDecoder("utf-8");
const decoder = new TextDecoder();
// Destructure the value returned by reader.read()
const { done, value } = await reader.read();

Expand All @@ -1117,49 +1119,78 @@ <h5 class="font-weight-bold">{{ column.key }}</h5>
return reader.releaseLock();
}
// Convert the stream of bytes into a string
const chunk = decoder.decode(value, { stream: true });
// Split the string into an array of strings
const jsons = chunk
.split("data:")
.map((data) => {
// Trim any whitespace
const trimData = data.trim();
// If the string is empty, return undefined
if (trimData === "") return undefined;
// If the string is [DONE], return undefined
if (trimData === "[DONE]") return undefined;
// Otherwise, parse the string as JSON
const chunks = decoder.decode(value);

// Split stream
let parts = chunks.split('\n')

if (parts.length === 1) {
buffer += parts[0]
return read(reader, target, buffer);
}

if (buffer.length > 0) {
parts[0] = buffer + parts[0];
buffer = ''
}

const last = parts[parts.length - 1];
if (last && last.length > 0) {
buffer = parts.pop();
}

const jsons = parts
.map((line) => line.trim())
.filter((line) => line.length > 0)
.map((line) => {
const pos = line.indexOf(':');
const name = line.substring(0, pos);
if (name !== 'data') {
return undefined
}
const content = line.substring(pos + 1).trim()
if (content.length === 0) {
return undefined
} else if (content === "[DONE]") {
return undefined
}
try {
return JSON.parse(data)
} catch (error) {
console.log(data)
// return undefined
return data
const parsed = JSON.parse(content)
const choices = parsed.choices
return parseChoice(parsed.choices, target)
} catch (e) {
console.log(e, line)
return undefined
}
})
// Filter out any undefined values
.filter((data) => data);
// Combine the data into a single string
const streamMessage = jsons
.map((jn) => {
if (!jn.choices) {
return jn;
}
return jn.choices.map((choice) => {
// Support both steam or not
return choice.delta?.content || choice.message?.content
}
).join("")
}).join("");
// Update the ref to the target element with the new string
const response = streamMessage;
if (target instanceof Array) {
target[target.length - 1].content += response;
} else {
target = target += response;

// server cannot support stream
if (jsons.length === 0) {
const parsed = JSON.parse(chunks)
parseChoice(parsed.choices, target)
}

// Repeat the process
return read(reader, target);
return read(reader, target, buffer);
};

const parseChoice = (choices, target) => {
const choice = choices.map((choice) => {
// Support both steam or not
return choice.delta?.content || choice.message?.content
})
if (choice) {
if (target instanceof Array) {
target[target.length - 1].content += choice;
} else {
target = target += choice;
}
return choice
} else {
return undefined
}
};

watch(computed(() => messageStore.conversation),
Expand Down

0 comments on commit 6ab8da0

Please sign in to comment.