import { createParser } from 'eventsource-parser'

/**
 * Adapts a web `ReadableStream` into an async iterable so it can be consumed
 * with `for await`.
 *
 * The reader lock is released when iteration finishes, throws, or is
 * abandoned early by the consumer.
 *
 * @param stream - The stream to read from; it is locked for the duration of
 *   the iteration.
 * @returns An async generator yielding each chunk read from the stream.
 */
export async function* streamAsyncIterable<T>(
  stream: ReadableStream<T>,
): AsyncGenerator<T, void, undefined> {
  const reader = stream.getReader()
  try {
    while (true) {
      const result = await reader.read()
      if (result.done) {
        return
      }
      yield result.value
    }
  } finally {
    reader.releaseLock()
  }
}

/**
 * Error raised by `fetchSSE`, for failed HTTP responses and for error
 * payloads detected in the response stream.
 *
 * The optional fields are plain annotations; this class only ever sets
 * `cause` itself (from the constructor argument).
 */
export class llmError extends Error {
  statusCode?: number
  statusText?: string
  isFinal?: boolean
  accountId?: string
  cause?: any

  /**
   * @param message - Human-readable error message.
   * @param cause - Optional value stored verbatim on `this.cause`; falsy
   *   values are not stored.
   */
  constructor(message: string, cause?: any) {
    super(message)
    if (cause) {
      this.cause = cause
    }
    // Set the prototype explicitly.
    Object.setPrototypeOf(this, llmError.prototype)
  }
}

/**
 * Performs a `fetch` and streams the response body through a server-sent
 * events parser, invoking `onMessage` with the data of every parsed event.
 *
 * @param url - The URL to request.
 * @param options - Regular `fetch` options plus the streaming callbacks:
 *   `onMessage` receives the `data` field of each parsed event; `onError`
 *   (optional) receives an `llmError` when a chunk is a JSON payload whose
 *   `detail.type` is `'invalid_request_error'`. Without `onError` such errors
 *   are logged with `console.error`.
 * @returns A promise that resolves once the response stream has ended.
 * @throws llmError when the response status is not OK, or when the `fetch`
 *   implementation exposes a body that is neither a web stream nor a
 *   Node-style readable stream.
 */
export async function fetchSSE(
  url: string,
  options: Parameters<typeof fetch>[1] & {
    onMessage: (data: string) => void
    onError?: (error: any) => void
  },
): Promise<void> {
  const { onMessage, onError, ...fetchOptions } = options
  const res = await fetch(url, fetchOptions)

  if (!res.ok) {
    let reason: string
    try {
      reason = await res.text()
    } catch (err) {
      reason = res.statusText
    }

    const msg = `llm error ${res.status}: ${reason}`
    // NOTE(review): llmError stores its second argument verbatim, so
    // `error.cause` is this `{ cause: res }` wrapper rather than the response
    // itself. Kept as-is because callers may rely on that shape.
    const error = new llmError(msg, { cause: res })
    error.statusCode = res.status
    error.statusText = res.statusText
    throw error
  }

  const parser = createParser((event) => {
    // The parser also reports reconnect-interval events, which carry no data.
    if (event.type === 'event') {
      onMessage(event.data)
    }
  })

  // handle special response errors
  const feed = (chunk: string): void => {
    let response: any = null
    try {
      response = JSON.parse(chunk)
    } catch {
      // Not a standalone JSON document: treat it as regular SSE text.
    }

    if (response?.detail?.type === 'invalid_request_error') {
      const msg = `llm error ${response.detail.message}: ${response.detail.code} (${response.detail.type})`
      const error = new llmError(msg, { cause: response })
      error.statusCode = response.detail.code
      error.statusText = response.detail.message

      if (onError) {
        onError(error)
      } else {
        console.error(error)
      }

      // don't feed to the event parser
      return
    }

    parser.feed(chunk)
  }

  // A single streaming decoder keeps multi-byte UTF-8 sequences intact when
  // they are split across two chunks.
  const decoder = new TextDecoder()

  if (res.body?.getReader) {
    for await (const chunk of streamAsyncIterable(res.body)) {
      feed(decoder.decode(chunk, { stream: true }))
    }
  } else {
    // Vercel polyfills `fetch` with `node-fetch`, which doesn't conform to
    // web standards, so this is a workaround...
    const body = res.body as unknown as NodeJS.ReadableStream | null

    if (!body || !body.on || !body.read) {
      throw new llmError('unsupported "fetch" implementation')
    }

    // Wait for the stream to finish so this branch behaves like the
    // web-stream branch, and so stream failures reject the returned promise
    // instead of surfacing as an unhandled 'error' event.
    await new Promise<void>((resolve, reject) => {
      body.on('readable', () => {
        try {
          let chunk: string | Buffer | null
          while ((chunk = body.read()) !== null) {
            feed(
              typeof chunk === 'string'
                ? chunk
                : decoder.decode(chunk, { stream: true }),
            )
          }
        } catch (err) {
          reject(err)
        }
      })
      body.on('end', () => resolve())
      body.on('close', () => resolve())
      body.on('error', reject)
    })
  }

  // Flush whatever the decoder is still holding (an incomplete trailing
  // sequence is emitted as a replacement character).
  const tail = decoder.decode()
  if (tail) {
    feed(tail)
  }
}