From efa258c69504aff9fab6c47f62e16dce42665318 Mon Sep 17 00:00:00 2001 From: Jun Siang Cheah Date: Sat, 20 Apr 2024 20:03:52 +0100 Subject: [PATCH] feat: split large openai responses into smaller chunks --- src/lib/apis/streaming/index.ts | 65 ++++++++++++++++++++++++++++ src/routes/(app)/+page.svelte | 37 +++++----------- src/routes/(app)/c/[id]/+page.svelte | 43 ++++++------------ 3 files changed, 90 insertions(+), 55 deletions(-) create mode 100644 src/lib/apis/streaming/index.ts diff --git a/src/lib/apis/streaming/index.ts b/src/lib/apis/streaming/index.ts new file mode 100644 index 000000000..4d1d2ecec --- /dev/null +++ b/src/lib/apis/streaming/index.ts @@ -0,0 +1,65 @@ +type TextStreamUpdate = { + done: boolean; + value: string; +}; + +// createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response, +// and returns an async generator that emits delta updates with large deltas chunked into random sized chunks +export async function createOpenAITextStream( + messageStream: ReadableStreamDefaultReader +): Promise> { + return streamLargeDeltasAsRandomChunks(openAIStreamToIterator(messageStream)); +} + +async function* openAIStreamToIterator( + reader: ReadableStreamDefaultReader +): AsyncGenerator { + while (true) { + const { value, done } = await reader.read(); + if (done) { + yield { done: true, value: '' }; + break; + } + const lines = value.split('\n'); + for (const line of lines) { + if (line !== '') { + console.log(line); + if (line === 'data: [DONE]') { + yield { done: true, value: '' }; + } else { + const data = JSON.parse(line.replace(/^data: /, '')); + console.log(data); + + yield { done: false, value: data.choices[0].delta.content ?? 
'' }; + } + } + } + } +} + +// streamLargeDeltasAsRandomChunks will chunk large deltas (length > 5) into random sized chunks between 1-3 characters +// This is to simulate a more fluid streaming, even though some providers may send large chunks of text at once +async function* streamLargeDeltasAsRandomChunks( + iterator: AsyncGenerator +): AsyncGenerator { + for await (const textStreamUpdate of iterator) { + if (textStreamUpdate.done) { + yield textStreamUpdate; + return; + } + let content = textStreamUpdate.value; + if (content.length < 5) { + yield { done: false, value: content }; + continue; + } + while (content != '') { + const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length); + const chunk = content.slice(0, chunkSize); + yield { done: false, value: chunk }; + await sleep(5); + content = content.slice(chunkSize); + } + } +} + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/src/routes/(app)/+page.svelte b/src/routes/(app)/+page.svelte index bdeff6d7a..bd8676985 100644 --- a/src/routes/(app)/+page.svelte +++ b/src/routes/(app)/+page.svelte @@ -39,6 +39,7 @@ import { RAGTemplate } from '$lib/utils/rag'; import { LITELLM_API_BASE_URL, OLLAMA_API_BASE_URL, OPENAI_API_BASE_URL } from '$lib/constants'; import { WEBUI_BASE_URL } from '$lib/constants'; + import { createOpenAITextStream } from '$lib/apis/streaming'; const i18n = getContext('i18n'); @@ -599,38 +600,22 @@ .pipeThrough(splitStream('\n')) .getReader(); - while (true) { - const { value, done } = await reader.read(); + const textStream = await createOpenAITextStream(reader); + console.log(textStream); + + for await (const update of textStream) { + const { value, done } = update; if (done || stopResponseFlag || _chatId !== $chatId) { responseMessage.done = true; messages = messages; break; } - try { - let lines = value.split('\n'); - - for (const line of lines) { - if (line !== '') { - console.log(line); - if (line === 'data: [DONE]') { - 
responseMessage.done = true; - messages = messages; - } else { - let data = JSON.parse(line.replace(/^data: /, '')); - console.log(data); - - if (responseMessage.content == '' && data.choices[0].delta.content == '\n') { - continue; - } else { - responseMessage.content += data.choices[0].delta.content ?? ''; - messages = messages; - } - } - } - } - } catch (error) { - console.log(error); + if (responseMessage.content == '' && value == '\n') { + continue; + } else { + responseMessage.content += value; + messages = messages; } if ($settings.notificationEnabled && !document.hasFocus()) { diff --git a/src/routes/(app)/c/[id]/+page.svelte b/src/routes/(app)/c/[id]/+page.svelte index 7502f3c4e..2f8ad7d0b 100644 --- a/src/routes/(app)/c/[id]/+page.svelte +++ b/src/routes/(app)/c/[id]/+page.svelte @@ -42,6 +42,7 @@ OLLAMA_API_BASE_URL, WEBUI_BASE_URL } from '$lib/constants'; + import { createOpenAITextStream } from '$lib/apis/streaming'; const i18n = getContext('i18n'); @@ -551,9 +552,9 @@ messages: [ $settings.system ? 
{ - role: 'system', - content: $settings.system - } + role: 'system', + content: $settings.system + } : undefined, ...messages ] @@ -611,38 +612,22 @@ .pipeThrough(splitStream('\n')) .getReader(); - while (true) { - const { value, done } = await reader.read(); + const textStream = await createOpenAITextStream(reader); + console.log(textStream); + + for await (const update of textStream) { + const { value, done } = update; if (done || stopResponseFlag || _chatId !== $chatId) { responseMessage.done = true; messages = messages; break; } - try { - let lines = value.split('\n'); - - for (const line of lines) { - if (line !== '') { - console.log(line); - if (line === 'data: [DONE]') { - responseMessage.done = true; - messages = messages; - } else { - let data = JSON.parse(line.replace(/^data: /, '')); - console.log(data); - - if (responseMessage.content == '' && data.choices[0].delta.content == '\n') { - continue; - } else { - responseMessage.content += data.choices[0].delta.content ?? ''; - messages = messages; - } - } - } - } - } catch (error) { - console.log(error); + if (responseMessage.content == '' && value == '\n') { + continue; + } else { + responseMessage.content += value; + messages = messages; } if ($settings.notificationEnabled && !document.hasFocus()) {