import {
  ParsedEvent,
  ReconnectInterval,
  createParser,
} from 'eventsource-parser';

/**
 * Error raised when the chat-completions endpoint answers with a non-200
 * status and a JSON body containing an `error` object.
 *
 * `type`, `param` and `code` are copied verbatim from that `error` object.
 */
export class LLMError extends Error {
  type: string;
  param: string;
  code: string;

  constructor(message: string, type: string, param: string, code: string) {
    super(message);
    this.name = 'LLMError';
    this.type = type;
    this.param = param;
    this.code = code;
  }
}

/**
 * POSTs `messages` to `${baseUrl}/v1/chat/completions` with `stream: true`
 * and returns a `ReadableStream` carrying the UTF-8 encoded reply text.
 *
 * The server-sent events of the response are parsed as JSON and every truthy
 * `choices[0].message.content` is appended (followed by a single space) to an
 * accumulator. The accumulated text is emitted as ONE chunk, after which the
 * stream is closed, when either a `[DONE]` event arrives or the response body
 * ends.
 *
 * @param baseUrl  Base URL of the API server, without the `/v1/...` path.
 * @param messages Chat messages forwarded as-is in the request body.
 * @returns A stream that yields a single `Uint8Array` chunk and then closes.
 * @throws {LLMError} If the status is not 200 and the JSON body has `error`.
 * @throws {Error} If the status is not 200 and no `error` object is available.
 */
export const LLMStream = async (baseUrl: string, messages: unknown[]) => {
  const url = `${baseUrl}/v1/chat/completions`;
  const res = await fetch(url, {
    headers: { 'Content-Type': 'application/json' },
    method: 'POST',
    body: JSON.stringify({
      messages,
      stream: true,
    }),
  });

  if (res.status !== 200) {
    let result:
      | { error?: { message: string; type: string; param: string; code: string } }
      | null
      | undefined;
    try {
      result = await res.json();
    } catch {
      // The error body is not JSON (e.g. an HTML error page from a proxy);
      // fall through to the generic error below instead of masking the
      // failure with a JSON parse error.
      result = undefined;
    }

    if (result?.error) {
      throw new LLMError(
        result.error.message,
        result.error.type,
        result.error.param,
        result.error.code,
      );
    }
    throw new Error(`API returned an error: ${res.statusText || res.status}`);
  }

  const encoder = new TextEncoder();
  const decoder = new TextDecoder();

  const stream = new ReadableStream({
    async start(controller) {
      let accumulatedContent = '';
      // Set once the controller has been closed or errored; nothing may be
      // enqueued after that point.
      let settled = false;

      // Emits everything accumulated so far as one chunk and closes the stream.
      const finish = () => {
        if (settled) return;
        settled = true;
        controller.enqueue(encoder.encode(accumulatedContent));
        controller.close();
      };

      // Puts the stream into the errored state exactly once.
      const fail = (error: Error) => {
        if (settled) return;
        settled = true;
        controller.error(error);
      };

      const onParse = (event: ParsedEvent | ReconnectInterval) => {
        if (settled || event.type !== 'event') return;

        const data = event.data;
        if (data === '[DONE]') {
          finish();
          return;
        }

        try {
          const parsedData = JSON.parse(data);
          // NOTE(review): this reads `message.content`; servers following the
          // OpenAI streaming format send `delta.content` instead. Confirm
          // which shape the target backend emits.
          const content = parsedData?.choices?.[0]?.message?.content;
          accumulatedContent += content ? content + ' ' : '';
        } catch (e) {
          fail(new Error(`Error parsing message: ${String(e)}`));
        }
      };

      if (!res.body) {
        fail(new Error('API response has no body'));
        return;
      }

      const parser = createParser(onParse);
      // The body is cast to `any` because it is consumed via async iteration,
      // which not every `ReadableStream` type declaration exposes.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      for await (const chunk of res.body as any) {
        // `stream: true` keeps a multi-byte UTF-8 sequence that is split
        // across two chunks intact instead of decoding it to U+FFFD.
        parser.feed(decoder.decode(chunk, { stream: true }));
        if (settled) break; // `[DONE]` or a parse error: stop reading.
      }

      if (!settled) {
        // Flush bytes still buffered in the decoder, then close the stream
        // even though no `[DONE]` event arrived, so consumers never hang.
        parser.feed(decoder.decode());
        finish();
      }
    },
  });

  return stream;
};