|
import { BROKEN_CONNECTION_MSG } from "../constants";
|
|
import type { Client } from "../client";
|
|
import { stream } from "fetch-event-stream";
|
|
|
|
export async function open_stream(this: Client): Promise<void> {
|
|
let {
|
|
event_callbacks,
|
|
unclosed_events,
|
|
pending_stream_messages,
|
|
stream_status,
|
|
config,
|
|
jwt
|
|
} = this;
|
|
|
|
const that = this;
|
|
|
|
if (!config) {
|
|
throw new Error("Could not resolve app config");
|
|
}
|
|
|
|
stream_status.open = true;
|
|
|
|
let stream: EventSource | null = null;
|
|
let params = new URLSearchParams({
|
|
session_hash: this.session_hash
|
|
}).toString();
|
|
|
|
let url = new URL(`${config.root}/queue/data?${params}`);
|
|
|
|
if (jwt) {
|
|
url.searchParams.set("__sign", jwt);
|
|
}
|
|
|
|
stream = this.stream(url);
|
|
|
|
if (!stream) {
|
|
console.warn("Cannot connect to SSE endpoint: " + url.toString());
|
|
return;
|
|
}
|
|
|
|
stream.onmessage = async function (event: MessageEvent) {
|
|
let _data = JSON.parse(event.data);
|
|
if (_data.msg === "close_stream") {
|
|
close_stream(stream_status, that.abort_controller);
|
|
return;
|
|
}
|
|
const event_id = _data.event_id;
|
|
if (!event_id) {
|
|
await Promise.all(
|
|
Object.keys(event_callbacks).map((event_id) =>
|
|
event_callbacks[event_id](_data)
|
|
)
|
|
);
|
|
} else if (event_callbacks[event_id] && config) {
|
|
if (
|
|
_data.msg === "process_completed" &&
|
|
["sse", "sse_v1", "sse_v2", "sse_v2.1", "sse_v3"].includes(
|
|
config.protocol
|
|
)
|
|
) {
|
|
unclosed_events.delete(event_id);
|
|
}
|
|
let fn: (data: any) => void = event_callbacks[event_id];
|
|
|
|
if (typeof window !== "undefined" && typeof document !== "undefined") {
|
|
|
|
setTimeout(fn, 0, _data);
|
|
} else {
|
|
fn(_data);
|
|
}
|
|
} else {
|
|
if (!pending_stream_messages[event_id]) {
|
|
pending_stream_messages[event_id] = [];
|
|
}
|
|
pending_stream_messages[event_id].push(_data);
|
|
}
|
|
};
|
|
stream.onerror = async function () {
|
|
await Promise.all(
|
|
Object.keys(event_callbacks).map((event_id) =>
|
|
event_callbacks[event_id]({
|
|
msg: "unexpected_error",
|
|
message: BROKEN_CONNECTION_MSG
|
|
})
|
|
)
|
|
);
|
|
};
|
|
}
|
|
|
|
export function close_stream(
|
|
stream_status: { open: boolean },
|
|
abort_controller: AbortController | null
|
|
): void {
|
|
if (stream_status) {
|
|
stream_status.open = false;
|
|
abort_controller?.abort();
|
|
}
|
|
}
|
|
|
|
export function apply_diff_stream(
|
|
pending_diff_streams: Record<string, any[][]>,
|
|
event_id: string,
|
|
data: any
|
|
): void {
|
|
let is_first_generation = !pending_diff_streams[event_id];
|
|
if (is_first_generation) {
|
|
pending_diff_streams[event_id] = [];
|
|
data.data.forEach((value: any, i: number) => {
|
|
pending_diff_streams[event_id][i] = value;
|
|
});
|
|
} else {
|
|
data.data.forEach((value: any, i: number) => {
|
|
let new_data = apply_diff(pending_diff_streams[event_id][i], value);
|
|
pending_diff_streams[event_id][i] = new_data;
|
|
data.data[i] = new_data;
|
|
});
|
|
}
|
|
}
|
|
|
|
export function apply_diff(
|
|
obj: any,
|
|
diff: [string, (number | string)[], any][]
|
|
): any {
|
|
diff.forEach(([action, path, value]) => {
|
|
obj = apply_edit(obj, path, action, value);
|
|
});
|
|
|
|
return obj;
|
|
}
|
|
|
|
function apply_edit(
|
|
target: any,
|
|
path: (number | string)[],
|
|
action: string,
|
|
value: any
|
|
): any {
|
|
if (path.length === 0) {
|
|
if (action === "replace") {
|
|
return value;
|
|
} else if (action === "append") {
|
|
return target + value;
|
|
}
|
|
throw new Error(`Unsupported action: ${action}`);
|
|
}
|
|
|
|
let current = target;
|
|
for (let i = 0; i < path.length - 1; i++) {
|
|
current = current[path[i]];
|
|
}
|
|
|
|
const last_path = path[path.length - 1];
|
|
switch (action) {
|
|
case "replace":
|
|
current[last_path] = value;
|
|
break;
|
|
case "append":
|
|
current[last_path] += value;
|
|
break;
|
|
case "add":
|
|
if (Array.isArray(current)) {
|
|
current.splice(Number(last_path), 0, value);
|
|
} else {
|
|
current[last_path] = value;
|
|
}
|
|
break;
|
|
case "delete":
|
|
if (Array.isArray(current)) {
|
|
current.splice(Number(last_path), 1);
|
|
} else {
|
|
delete current[last_path];
|
|
}
|
|
break;
|
|
default:
|
|
throw new Error(`Unknown action: ${action}`);
|
|
}
|
|
return target;
|
|
}
|
|
|
|
export function readable_stream(
|
|
input: RequestInfo | URL,
|
|
init: RequestInit = {}
|
|
): EventSource {
|
|
const instance: EventSource & { readyState: number } = {
|
|
close: () => {
|
|
console.warn("Method not implemented.");
|
|
},
|
|
onerror: null,
|
|
onmessage: null,
|
|
onopen: null,
|
|
readyState: 0,
|
|
url: input.toString(),
|
|
withCredentials: false,
|
|
CONNECTING: 0,
|
|
OPEN: 1,
|
|
CLOSED: 2,
|
|
addEventListener: () => {
|
|
throw new Error("Method not implemented.");
|
|
},
|
|
dispatchEvent: () => {
|
|
throw new Error("Method not implemented.");
|
|
},
|
|
removeEventListener: () => {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
};
|
|
|
|
stream(input, init)
|
|
.then(async (res) => {
|
|
instance.readyState = instance.OPEN;
|
|
try {
|
|
for await (const chunk of res) {
|
|
|
|
instance.onmessage && instance.onmessage(chunk);
|
|
}
|
|
instance.readyState = instance.CLOSED;
|
|
} catch (e) {
|
|
instance.onerror && instance.onerror(e as Event);
|
|
instance.readyState = instance.CLOSED;
|
|
}
|
|
})
|
|
.catch((e) => {
|
|
console.error(e);
|
|
instance.onerror && instance.onerror(e as Event);
|
|
instance.readyState = instance.CLOSED;
|
|
});
|
|
|
|
return instance as EventSource;
|
|
}
|
|
|