// Spaces: Running on CPU Upgrade
import type { InferenceTask, Options, RequestArgs } from "../../types"; | |
import { makeRequestOptions } from "../../lib/makeRequestOptions"; | |
import type { EventSourceMessage } from "../../vendor/fetch-event-source/parse"; | |
import { getLines, getMessages } from "../../vendor/fetch-event-source/parse"; | |
/**
 * Turn the `error` field of a backend JSON payload into a readable message.
 *
 * Strings pass through unchanged. Objects exposing a string `message` yield that
 * message. Anything else is JSON-encoded so the thrown `Error` does not degrade
 * to "[object Object]".
 */
function describeBackendError(error: unknown): string {
	if (typeof error === "string") {
		return error;
	}
	if (typeof error === "object" && error !== null) {
		const message = (error as { message?: unknown }).message;
		if (typeof message === "string") {
			return message;
		}
	}
	// JSON.stringify returns undefined for values such as `undefined` or functions.
	return JSON.stringify(error) ?? String(error);
}

/**
 * Primitive to make custom inference calls that expect server-sent events, and returns the response through a generator.
 *
 * Every non-empty event's `data` field is parsed as JSON and yielded as `T`.
 * No runtime validation of the parsed value is performed.
 *
 * @param args - Request arguments, forwarded to `makeRequestOptions` with `stream: true` added.
 * @param options - Request options. `options.fetch` replaces the global `fetch` when provided.
 *   Unless `retry_on_error` is `false`, a 503 response is retried once with `wait_for_model: true`.
 * @returns An async generator over the parsed event payloads. It completes when the body ends,
 *   when a `[DONE]` event is received, or immediately if the response has no body.
 * @throws Error when the HTTP response is not ok, when the response content type is not
 *   `text/event-stream`, or when an event payload is an object containing an `error` key.
 * @throws SyntaxError when an event's `data` is not valid JSON.
 */
export async function* streamingRequest<T>(
	args: RequestArgs,
	options?: Options & {
		/** When a model can be used for multiple tasks, and we want to run a non-default task */
		task?: string | InferenceTask;
		/** To load default model if needed */
		taskHint?: InferenceTask;
	}
): AsyncGenerator<T> {
	const { url, info } = await makeRequestOptions({ ...args, stream: true }, options);
	const response = await (options?.fetch ?? fetch)(url, info);

	if (options?.retry_on_error !== false && response.status === 503 && !options?.wait_for_model) {
		// Release the unread 503 body before retrying so it is not left dangling.
		try {
			await response.body?.cancel();
		} catch {
			// Best effort: failing to cancel must not prevent the retry.
		}
		// The `wait_for_model` guard above ensures this retry happens at most once.
		return yield* streamingRequest<T>(args, {
			...options,
			wait_for_model: true,
		});
	}

	if (!response.ok) {
		if (response.headers.get("Content-Type")?.startsWith("application/json")) {
			const output = (await response.json()) as { error?: unknown } | null;
			if (output?.error) {
				throw new Error(describeBackendError(output.error));
			}
		}
		throw new Error(`Server response contains error: ${response.status}`);
	}

	if (!response.headers.get("content-type")?.startsWith("text/event-stream")) {
		throw new Error(
			`Server does not support event stream content type, it returned ` + response.headers.get("content-type")
		);
	}

	if (!response.body) {
		return;
	}

	const reader = response.body.getReader();
	let events: EventSourceMessage[] = [];

	const onEvent = (event: EventSourceMessage) => {
		// accumulate events in array
		events.push(event);
	};

	// Only complete messages are of interest; the other two parser callbacks are no-ops.
	const onChunk = getLines(
		getMessages(
			() => {},
			() => {},
			onEvent
		)
	);

	// Set once the reader reports `done`, so `finally` knows whether the stream is still open.
	let streamEnded = false;

	try {
		while (true) {
			const { done, value } = await reader.read();
			if (done) {
				streamEnded = true;
				return;
			}
			// A single chunk may complete zero, one or several events.
			onChunk(value);
			for (const event of events) {
				if (event.data.length > 0) {
					// Some OpenAI-compatible backends end the stream with a literal `[DONE]`
					// sentinel, which is not JSON.
					if (event.data === "[DONE]") {
						return;
					}
					const data: unknown = JSON.parse(event.data);
					if (typeof data === "object" && data !== null && "error" in data) {
						throw new Error(describeBackendError((data as { error?: unknown }).error));
					}
					yield data as T;
				}
			}
			events = [];
		}
	} finally {
		if (!streamEnded) {
			// The consumer stopped early, `[DONE]` was received, or an error was thrown:
			// cancel so the underlying stream is not left open.
			try {
				await reader.cancel();
			} catch {
				// The stream may already be errored; nothing more to clean up.
			}
		}
		reader.releaseLock();
	}
}