From efa258c69504aff9fab6c47f62e16dce42665318 Mon Sep 17 00:00:00 2001 From: Jun Siang Cheah Date: Sat, 20 Apr 2024 20:03:52 +0100 Subject: [PATCH 1/2] feat: split large openai responses into smaller chunks --- src/lib/apis/streaming/index.ts | 65 ++++++++++++++++++++++++++++ src/routes/(app)/+page.svelte | 37 +++++----------- src/routes/(app)/c/[id]/+page.svelte | 43 ++++++------------ 3 files changed, 90 insertions(+), 55 deletions(-) create mode 100644 src/lib/apis/streaming/index.ts diff --git a/src/lib/apis/streaming/index.ts b/src/lib/apis/streaming/index.ts new file mode 100644 index 000000000..4d1d2ecec --- /dev/null +++ b/src/lib/apis/streaming/index.ts @@ -0,0 +1,65 @@ +type TextStreamUpdate = { + done: boolean; + value: string; +}; + +// createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response, +// and returns an async generator that emits delta updates with large deltas chunked into random sized chunks +export async function createOpenAITextStream( + messageStream: ReadableStreamDefaultReader +): Promise> { + return streamLargeDeltasAsRandomChunks(openAIStreamToIterator(messageStream)); +} + +async function* openAIStreamToIterator( + reader: ReadableStreamDefaultReader +): AsyncGenerator { + while (true) { + const { value, done } = await reader.read(); + if (done) { + yield { done: true, value: '' }; + break; + } + const lines = value.split('\n'); + for (const line of lines) { + if (line !== '') { + console.log(line); + if (line === 'data: [DONE]') { + yield { done: true, value: '' }; + } else { + const data = JSON.parse(line.replace(/^data: /, '')); + console.log(data); + + yield { done: false, value: data.choices[0].delta.content ?? 
'' }; + } + } + } +} + +// streamLargeDeltasAsRandomChunks will chunk large deltas (length >= 5) into random sized chunks between 1-3 characters +// This is to simulate a more fluid streaming, even though some providers may send large chunks of text at once +async function* streamLargeDeltasAsRandomChunks( + iterator: AsyncGenerator +): AsyncGenerator { + for await (const textStreamUpdate of iterator) { + if (textStreamUpdate.done) { + yield textStreamUpdate; + return; + } + let content = textStreamUpdate.value; + if (content.length < 5) { + yield { done: false, value: content }; + continue; + } + while (content != '') { + const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length); + const chunk = content.slice(0, chunkSize); + yield { done: false, value: chunk }; + await sleep(5); + content = content.slice(chunkSize); + } + } +} + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/src/routes/(app)/+page.svelte b/src/routes/(app)/+page.svelte index bdeff6d7a..bd8676985 100644 --- a/src/routes/(app)/+page.svelte +++ b/src/routes/(app)/+page.svelte @@ -39,6 +39,7 @@ import { RAGTemplate } from '$lib/utils/rag'; import { LITELLM_API_BASE_URL, OLLAMA_API_BASE_URL, OPENAI_API_BASE_URL } from '$lib/constants'; import { WEBUI_BASE_URL } from '$lib/constants'; + import { createOpenAITextStream } from '$lib/apis/streaming'; const i18n = getContext('i18n'); @@ -599,38 +600,22 @@ .pipeThrough(splitStream('\n')) .getReader(); - while (true) { - const { value, done } = await reader.read(); + const textStream = await createOpenAITextStream(reader); + console.log(textStream); + + for await (const update of textStream) { + const { value, done } = update; if (done || stopResponseFlag || _chatId !== $chatId) { responseMessage.done = true; messages = messages; break; } - try { - let lines = value.split('\n'); - - for (const line of lines) { - if (line !== '') { - console.log(line); - if (line === 'data: [DONE]') { 
responseMessage.done = true; - messages = messages; - } else { - let data = JSON.parse(line.replace(/^data: /, '')); - console.log(data); - - if (responseMessage.content == '' && data.choices[0].delta.content == '\n') { - continue; - } else { - responseMessage.content += data.choices[0].delta.content ?? ''; - messages = messages; - } - } - } - } - } catch (error) { - console.log(error); + if (responseMessage.content == '' && value == '\n') { + continue; + } else { + responseMessage.content += value; + messages = messages; } if ($settings.notificationEnabled && !document.hasFocus()) { diff --git a/src/routes/(app)/c/[id]/+page.svelte b/src/routes/(app)/c/[id]/+page.svelte index 7502f3c4e..2f8ad7d0b 100644 --- a/src/routes/(app)/c/[id]/+page.svelte +++ b/src/routes/(app)/c/[id]/+page.svelte @@ -42,6 +42,7 @@ OLLAMA_API_BASE_URL, WEBUI_BASE_URL } from '$lib/constants'; + import { createOpenAITextStream } from '$lib/apis/streaming'; const i18n = getContext('i18n'); @@ -551,9 +552,9 @@ messages: [ $settings.system ? 
{ - role: 'system', - content: $settings.system - } + role: 'system', + content: $settings.system + } : undefined, ...messages ] @@ -611,38 +612,22 @@ .pipeThrough(splitStream('\n')) .getReader(); - while (true) { - const { value, done } = await reader.read(); + const textStream = await createOpenAITextStream(reader); + console.log(textStream); + + for await (const update of textStream) { + const { value, done } = update; if (done || stopResponseFlag || _chatId !== $chatId) { responseMessage.done = true; messages = messages; break; } - try { - let lines = value.split('\n'); - - for (const line of lines) { - if (line !== '') { - console.log(line); - if (line === 'data: [DONE]') { - responseMessage.done = true; - messages = messages; - } else { - let data = JSON.parse(line.replace(/^data: /, '')); - console.log(data); - - if (responseMessage.content == '' && data.choices[0].delta.content == '\n') { - continue; - } else { - responseMessage.content += data.choices[0].delta.content ?? ''; - messages = messages; - } - } - } - } - } catch (error) { - console.log(error); + if (responseMessage.content == '' && value == '\n') { + continue; + } else { + responseMessage.content += value; + messages = messages; } if ($settings.notificationEnabled && !document.hasFocus()) { From 67df928c7ae953e4b725c548de08c0b61ce7d1e6 Mon Sep 17 00:00:00 2001 From: Jun Siang Cheah Date: Sun, 21 Apr 2024 10:45:07 +0100 Subject: [PATCH 2/2] feat: make chunk splitting a configurable option --- src/lib/apis/streaming/index.ts | 9 ++++-- .../components/chat/Settings/Interface.svelte | 28 +++++++++++++++++++ src/lib/i18n/locales/en-US/translation.json | 1 + src/routes/(app)/+page.svelte | 2 +- src/routes/(app)/c/[id]/+page.svelte | 8 +++--- 5 files changed, 41 insertions(+), 7 deletions(-) diff --git a/src/lib/apis/streaming/index.ts b/src/lib/apis/streaming/index.ts index 4d1d2ecec..5b89a4668 100644 --- a/src/lib/apis/streaming/index.ts +++ b/src/lib/apis/streaming/index.ts @@ -6,9 +6,14 @@ type 
TextStreamUpdate = { // createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response, // and returns an async generator that emits delta updates with large deltas chunked into random sized chunks export async function createOpenAITextStream( - messageStream: ReadableStreamDefaultReader + messageStream: ReadableStreamDefaultReader, + splitLargeDeltas: boolean ): Promise> { - return streamLargeDeltasAsRandomChunks(openAIStreamToIterator(messageStream)); + let iterator = openAIStreamToIterator(messageStream); + if (splitLargeDeltas) { + iterator = streamLargeDeltasAsRandomChunks(iterator); + } + return iterator; } async function* openAIStreamToIterator( diff --git a/src/lib/components/chat/Settings/Interface.svelte b/src/lib/components/chat/Settings/Interface.svelte index ad9e05e7f..37d7fa4ea 100644 --- a/src/lib/components/chat/Settings/Interface.svelte +++ b/src/lib/components/chat/Settings/Interface.svelte @@ -17,11 +17,17 @@ let titleAutoGenerateModelExternal = ''; let fullScreenMode = false; let titleGenerationPrompt = ''; + let splitLargeChunks = false; // Interface let promptSuggestions = []; let showUsername = false; + const toggleSplitLargeChunks = async () => { + splitLargeChunks = !splitLargeChunks; + saveSettings({ splitLargeChunks: splitLargeChunks }); + }; + const toggleFullScreenMode = async () => { fullScreenMode = !fullScreenMode; saveSettings({ fullScreenMode: fullScreenMode }); @@ -197,6 +203,28 @@ + +
+
+
+ {$i18n.t('Fluidly stream large external response chunks')} +
+ + +
+

diff --git a/src/lib/i18n/locales/en-US/translation.json b/src/lib/i18n/locales/en-US/translation.json index be89b1b01..fdfe804ba 100644 --- a/src/lib/i18n/locales/en-US/translation.json +++ b/src/lib/i18n/locales/en-US/translation.json @@ -152,6 +152,7 @@ "File Mode": "", "File not found.": "", "Fingerprint spoofing detected: Unable to use initials as avatar. Defaulting to default profile image.": "", + "Fluidly stream large external response chunks": "", "Focus chat input": "", "Format your variables using square brackets like this:": "", "From (Base Model)": "", diff --git a/src/routes/(app)/+page.svelte b/src/routes/(app)/+page.svelte index bd8676985..9fc261773 100644 --- a/src/routes/(app)/+page.svelte +++ b/src/routes/(app)/+page.svelte @@ -600,7 +600,7 @@ .pipeThrough(splitStream('\n')) .getReader(); - const textStream = await createOpenAITextStream(reader); + const textStream = await createOpenAITextStream(reader, $settings.splitLargeChunks); console.log(textStream); for await (const update of textStream) { diff --git a/src/routes/(app)/c/[id]/+page.svelte b/src/routes/(app)/c/[id]/+page.svelte index 2f8ad7d0b..c230eb5c1 100644 --- a/src/routes/(app)/c/[id]/+page.svelte +++ b/src/routes/(app)/c/[id]/+page.svelte @@ -552,9 +552,9 @@ messages: [ $settings.system ? { - role: 'system', - content: $settings.system - } + role: 'system', + content: $settings.system + } : undefined, ...messages ] @@ -612,7 +612,7 @@ .pipeThrough(splitStream('\n')) .getReader(); - const textStream = await createOpenAITextStream(reader); + const textStream = await createOpenAITextStream(reader, $settings.splitLargeChunks); console.log(textStream); for await (const update of textStream) {