bolt.new/app/lib/.server/llm/switchable-stream.ts
2024-09-25 19:54:09 +01:00

67 lines
1.4 KiB
TypeScript

/**
 * A `TransformStream` whose readable side is fed by a sequence of
 * interchangeable source streams.
 *
 * Nothing is expected to be written to the writable side: no `transform`
 * callback is installed, and chunks are pushed into the readable side
 * directly through the controller captured at construction time. Call
 * {@link switchSource} to start (or replace) the source being forwarded and
 * {@link close} to end the output.
 *
 * The pump forwards chunks as soon as the source yields them; it does not
 * consult the controller's `desiredSize`, so no backpressure is applied to
 * the source.
 */
export default class SwitchableStream extends TransformStream {
  /** Controller of this transform stream, captured from the `start` callback. */
  private readonly _controller: TransformStreamDefaultController;

  /** Reader of the source currently being forwarded, or `null` before the first switch. */
  private _currentReader: ReadableStreamDefaultReader | null = null;

  /** Number of completed `switchSource` calls. */
  private _switches = 0;

  /**
   * @throws Error if the transformer's `start` callback did not run during
   * construction, leaving no controller to enqueue into.
   */
  constructor() {
    let controllerRef: TransformStreamDefaultController | undefined;

    super({
      start(controller) {
        controllerRef = controller;
      },
    });

    if (controllerRef === undefined) {
      throw new Error('Controller not properly initialized');
    }

    this._controller = controllerRef;
  }

  /**
   * Cancels the source currently being forwarded (if any) and starts
   * forwarding chunks from `newStream` instead.
   *
   * The returned promise settles once the previous source has been cancelled
   * and the new pump has been started; it does not wait for `newStream` to
   * finish.
   *
   * @param newStream - Stream to forward from now on. It is locked by this call.
   * @throws Whatever the previous reader's `cancel()` rejects with.
   */
  async switchSource(newStream: ReadableStream): Promise<void> {
    if (this._currentReader) {
      await this._currentReader.cancel();
    }

    const reader = newStream.getReader();
    this._currentReader = reader;

    // Deliberately not awaited: the pump runs for the lifetime of the source
    // and reports failures through the controller rather than by rejecting.
    void this._pumpStream(reader);

    this._switches++;
  }

  /**
   * Forwards every chunk from `reader` to the readable side until the reader
   * reports `done` or a read/enqueue fails.
   *
   * The reader is bound per pump instead of being re-read from
   * `this._currentReader` on each iteration, so a pump that outlives a switch
   * can never start consuming the replacement source.
   *
   * @param reader - Reader to drain; defaults to the current reader.
   * @throws Error if there is no reader to pump from.
   */
  private async _pumpStream(reader: ReadableStreamDefaultReader | null = this._currentReader): Promise<void> {
    if (!reader) {
      throw new Error('Stream is not properly initialized');
    }

    try {
      while (true) {
        const { done, value } = await reader.read();

        if (done) {
          break;
        }

        this._controller.enqueue(value);
      }
    } catch (error) {
      console.log(error);
      this._controller.error(error);
    }
  }

  /**
   * Cancels the current source (if any) and terminates the transform stream,
   * closing its readable side.
   */
  close(): void {
    if (this._currentReader) {
      // `cancel()` rejects when the source has already errored. That error was
      // forwarded to the controller by the pump, so handling it here only
      // prevents an unhandled promise rejection.
      this._currentReader.cancel().catch(() => {
        // Nothing further to do: the failure has already been surfaced.
      });
    }

    this._controller.terminate();
  }

  /** Number of times `switchSource` has completed. */
  get switches(): number {
    return this._switches;
  }
}