diff --git a/package-lock.json b/package-lock.json index 913c55b78..938bf4b0b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "async": "^3.2.5", "bits-ui": "^0.19.7", "dayjs": "^1.11.10", + "eventsource-parser": "^1.1.2", "file-saver": "^2.0.5", "highlight.js": "^11.9.0", "i18next": "^23.10.0", @@ -3167,6 +3168,14 @@ "integrity": "sha512-tYUSVOGeQPKt/eC1ABfhHy5Xd96N3oIijJvN3O9+TsC28T5V9yX9oEfEK5faP0EFSNVOG97qtAS68GBrQB2hDg==", "dev": true }, + "node_modules/eventsource-parser": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/eventsource-parser/-/eventsource-parser-1.1.2.tgz", + "integrity": "sha512-v0eOBUbiaFojBu2s2NPBfYUoRR9GjcDNvCXVaqEf5vVfpIAh9f8RCo4vXTP8c63QRKCFwoLpMpTdPwwhEKVgzA==", + "engines": { + "node": ">=14.18" + } + }, "node_modules/execa": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/execa/-/execa-4.1.0.tgz", diff --git a/package.json b/package.json index c38120727..d5b276182 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "async": "^3.2.5", "bits-ui": "^0.19.7", "dayjs": "^1.11.10", + "eventsource-parser": "^1.1.2", "file-saver": "^2.0.5", "highlight.js": "^11.9.0", "i18next": "^23.10.0", diff --git a/src/lib/apis/streaming/index.ts b/src/lib/apis/streaming/index.ts index aad42b2b6..a72dbe47d 100644 --- a/src/lib/apis/streaming/index.ts +++ b/src/lib/apis/streaming/index.ts @@ -1,15 +1,22 @@ +import { EventSourceParserStream } from 'eventsource-parser/stream'; +import type { ParsedEvent } from 'eventsource-parser'; + type TextStreamUpdate = { done: boolean; value: string; }; -// createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response, +// createOpenAITextStream takes a responseBody with a SSE response, // and returns an async generator that emits delta updates with large deltas chunked into random sized chunks export async function createOpenAITextStream( - messageStream: ReadableStreamDefaultReader, + responseBody: ReadableStream, splitLargeDeltas: boolean ): Promise> { - let iterator = openAIStreamToIterator(messageStream); + const eventStream = responseBody + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .getReader(); + let iterator = openAIStreamToIterator(eventStream); if (splitLargeDeltas) { iterator = streamLargeDeltasAsRandomChunks(iterator); } @@ -17,7 +24,7 @@ export async function createOpenAITextStream( } async function* openAIStreamToIterator( - reader: ReadableStreamDefaultReader + reader: ReadableStreamDefaultReader ): AsyncGenerator { while (true) { const { value, done } = await reader.read(); @@ -25,31 +32,22 @@ async function* openAIStreamToIterator( yield { done: true, value: '' }; break; } - const lines = value.split('\n'); - for (let line of lines) { - if (line.endsWith('\r')) { - // Remove trailing \r - line = line.slice(0, -1); - } - if (line !== '') { - console.log(line); - if (line === 'data: [DONE]') { - yield { done: true, value: '' }; - } else if (line.startsWith(':')) { - // Events starting with : are comments https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format - // OpenRouter sends heartbeats like ": OPENROUTER PROCESSING" - continue; - } else { - try { - const data = JSON.parse(line.replace(/^data: /, '')); - console.log(data); + if (!value) { + continue; + } + const data = value.data; + if (data.startsWith('[DONE]')) { + yield { done: true, value: '' }; + break; + } - yield { done: false, value: data.choices?.[0]?.delta?.content ?? '' }; - } catch (e) { - console.error('Error extracting delta from SSE event:', e); - } - } - } + try { + const parsedData = JSON.parse(data); + console.log(parsedData); + + yield { done: false, value: parsedData.choices?.[0]?.delta?.content ?? '' }; + } catch (e) { + console.error('Error extracting delta from SSE event:', e); } } } diff --git a/src/routes/(app)/+page.svelte b/src/routes/(app)/+page.svelte index f745fcbe6..e572f82fd 100644 --- a/src/routes/(app)/+page.svelte +++ b/src/routes/(app)/+page.svelte @@ -605,14 +605,8 @@ scrollToBottom(); - if (res && res.ok) { - const reader = res.body - .pipeThrough(new TextDecoderStream()) - .pipeThrough(splitStream('\n')) - .getReader(); - - const textStream = await createOpenAITextStream(reader, $settings.splitLargeChunks); - console.log(textStream); + if (res && res.ok && res.body) { + const textStream = await createOpenAITextStream(res.body, $settings.splitLargeChunks); for await (const update of textStream) { const { value, done } = update; diff --git a/src/routes/(app)/c/[id]/+page.svelte b/src/routes/(app)/c/[id]/+page.svelte index 33578681f..bf5bdf86f 100644 --- a/src/routes/(app)/c/[id]/+page.svelte +++ b/src/routes/(app)/c/[id]/+page.svelte @@ -617,14 +617,8 @@ scrollToBottom(); - if (res && res.ok) { - const reader = res.body - .pipeThrough(new TextDecoderStream()) - .pipeThrough(splitStream('\n')) - .getReader(); - - const textStream = await createOpenAITextStream(reader, $settings.splitLargeChunks); - console.log(textStream); + if (res && res.ok && res.body) { + const textStream = await createOpenAITextStream(res.body, $settings.splitLargeChunks); for await (const update of textStream) { const { value, done } = update;