Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
File size: 2,318 Bytes
94753b6 ca548dc 94753b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import type { InferenceTask, Options, RequestArgs } from "../../types";
import { makeRequestOptions } from "../../lib/makeRequestOptions";
import type { EventSourceMessage } from "../../vendor/fetch-event-source/parse";
import { getLines, getMessages } from "../../vendor/fetch-event-source/parse";
/**
 * Converts the `error` field of a backend payload into a human-readable message.
 * Strings are returned unchanged; objects with a string `message` yield that message;
 * anything else is JSON-serialized so callers never see "[object Object]".
 */
function describeBackendError(error: unknown): string {
	if (typeof error === "string") {
		return error;
	}
	if (typeof error === "object" && error !== null) {
		const message = (error as { message?: unknown }).message;
		if (typeof message === "string") {
			return message;
		}
	}
	// JSON.stringify(undefined) yields undefined, so fall back to String().
	return JSON.stringify(error) ?? String(error);
}

/**
 * Primitive for making custom inference calls that expect server-sent events.
 * Each non-empty event is JSON-parsed and yielded through an async generator.
 *
 * @param args - Request arguments; `stream: true` is added before building the request.
 * @param options - Request options. `task` selects a non-default task for multi-task models;
 *   `taskHint` is used to load a default model if needed.
 * @returns An async generator yielding each parsed event payload as `T`.
 * @throws Error if the server answers with a non-OK status, does not return
 *   `text/event-stream`, or forwards an `error` field inside an event.
 */
export async function* streamingRequest<T>(
	args: RequestArgs,
	options?: Options & {
		/** When a model can be used for multiple tasks, and we want to run a non-default task */
		task?: string | InferenceTask;
		/** To load default model if needed */
		taskHint?: InferenceTask;
	}
): AsyncGenerator<T> {
	const { url, info } = await makeRequestOptions({ ...args, stream: true }, options);
	const response = await (options?.fetch ?? fetch)(url, info);

	if (options?.retry_on_error !== false && response.status === 503 && !options?.wait_for_model) {
		// Discard the unused 503 body so its connection is not held open while we retry.
		await response.body?.cancel().catch(() => undefined);
		return yield* streamingRequest<T>(args, {
			...options,
			wait_for_model: true,
		});
	}

	if (!response.ok) {
		if (response.headers.get("Content-Type")?.startsWith("application/json")) {
			let output: unknown;
			try {
				output = await response.json();
			} catch {
				// Body claimed to be JSON but was not; fall through to the status-based error.
				output = undefined;
			}
			if (typeof output === "object" && output !== null && "error" in output) {
				const error = (output as { error: unknown }).error;
				if (error) {
					throw new Error(describeBackendError(error));
				}
			}
		}
		throw new Error(`Server response contains error: ${response.status}`);
	}

	if (!response.headers.get("content-type")?.startsWith("text/event-stream")) {
		throw new Error(
			`Server does not support event stream content type, it returned ` + response.headers.get("content-type")
		);
	}

	if (!response.body) {
		return;
	}

	const reader = response.body.getReader();
	let events: EventSourceMessage[] = [];

	const onEvent = (event: EventSourceMessage) => {
		// Accumulate events parsed from the current chunk; drained after each read.
		events.push(event);
	};
	// The first two callbacks (id / retry updates) are intentionally ignored here.
	const onChunk = getLines(
		getMessages(
			() => {},
			() => {},
			onEvent
		)
	);

	// Tracks whether the body was read to completion, so we only cancel on early exit.
	let finished = false;
	try {
		while (true) {
			const { done, value } = await reader.read();
			if (done) {
				finished = true;
				return;
			}
			onChunk(value);
			for (const event of events) {
				if (event.data.length > 0) {
					const data = JSON.parse(event.data);
					if (typeof data === "object" && data !== null && "error" in data) {
						throw new Error(describeBackendError(data.error));
					}
					yield data as T;
				}
			}
			events = [];
		}
	} finally {
		if (!finished) {
			// Consumer stopped early, a forwarded error was thrown, or reading failed:
			// releaseLock() alone would leave the response body (and connection) open.
			await reader.cancel().catch(() => undefined);
		}
		reader.releaseLock();
	}
}
|