Spaces:
Running
Running
""" | |
Support for streaming http requests in emscripten. | |
A few caveats - | |
If your browser (or Node.js) has WebAssembly JavaScript Promise Integration enabled | |
https://github.com/WebAssembly/js-promise-integration/blob/main/proposals/js-promise-integration/Overview.md | |
*and* you launch pyodide using `pyodide.runPythonAsync`, this will fetch data using the | |
JavaScript asynchronous fetch api (wrapped via `pyodide.ffi.call_sync`). In this case | |
timeouts and streaming should just work. | |
Otherwise, it uses a combination of XMLHttpRequest and a web-worker for streaming. | |
This approach has several caveats: | |
Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed. | |
Streaming only works if you're running pyodide in a web worker. | |
Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch | |
operation, so it requires that you have crossOriginIsolation enabled, by serving over https | |
(or from localhost) with the two headers below set: | |
Cross-Origin-Opener-Policy: same-origin | |
Cross-Origin-Embedder-Policy: require-corp | |
You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in | |
JavaScript console. If it isn't, streaming requests will fallback to XMLHttpRequest, i.e. getting the whole | |
request into a buffer and then returning it. it shows a warning in the JavaScript console in this case. | |
Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once | |
control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch. | |
NB: in this code, there are a lot of JavaScript objects. They are named js_* | |
to make it clear what type of object they are. | |
""" | |
from __future__ import annotations | |
import io | |
import json | |
from email.parser import Parser | |
from importlib.resources import files | |
from typing import TYPE_CHECKING, Any | |
import js # type: ignore[import-not-found] | |
from pyodide.ffi import ( # type: ignore[import-not-found] | |
JsArray, | |
JsException, | |
JsProxy, | |
to_js, | |
) | |
if TYPE_CHECKING: | |
from typing_extensions import Buffer | |
from .request import EmscriptenRequest | |
from .response import EmscriptenResponse | |
""" | |
There are some headers that trigger unintended CORS preflight requests. | |
See also https://github.com/koenvo/pyodide-http/issues/22 | |
""" | |
HEADERS_TO_IGNORE = ("user-agent",) | |
SUCCESS_HEADER = -1 | |
SUCCESS_EOF = -2 | |
ERROR_TIMEOUT = -3 | |
ERROR_EXCEPTION = -4 | |
_STREAMING_WORKER_CODE = ( | |
files(__package__) | |
.joinpath("emscripten_fetch_worker.js") | |
.read_text(encoding="utf-8") | |
) | |
class _RequestError(Exception): | |
def __init__( | |
self, | |
message: str | None = None, | |
*, | |
request: EmscriptenRequest | None = None, | |
response: EmscriptenResponse | None = None, | |
): | |
self.request = request | |
self.response = response | |
self.message = message | |
super().__init__(self.message) | |
class _StreamingError(_RequestError): | |
pass | |
class _TimeoutError(_RequestError): | |
pass | |
def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    """Convert a Python dict into a plain JavaScript object.

    ``Object.fromEntries`` is passed as the dict converter so the result is
    a JS object (what web APIs expect) rather than the JS Map that ``to_js``
    produces by default.
    """
    make_js_object = js.Object.fromEntries
    return to_js(dict_val, dict_converter=make_js_object)
class _ReadStream(io.RawIOBase):
    """Raw IO stream that reads response bytes from the streaming web-worker.

    The worker writes chunks into a SharedArrayBuffer; ``int_buffer`` (an
    Int32Array view) is used for signalling via Atomics and ``byte_buffer``
    (a Uint8Array view) carries the payload bytes.
    """

    def __init__(
        self,
        int_buffer: JsArray,  # Int32Array view used for Atomics signalling
        byte_buffer: JsArray,  # Uint8Array view holding chunk data
        timeout: float,  # read timeout in seconds (<= 0 means wait forever)
        worker: JsProxy,  # the streaming fetch web-worker
        connection_id: int,  # id of this connection within the worker
        request: EmscriptenRequest,
    ):
        self.int_buffer = int_buffer
        self.byte_buffer = byte_buffer
        self.read_pos = 0  # current read offset inside byte_buffer
        self.read_len = 0  # bytes remaining in the current chunk
        self.connection_id = connection_id
        self.worker = worker
        # seconds -> milliseconds for Atomics.wait; None means no timeout
        self.timeout = int(1000 * timeout) if timeout > 0 else None
        self.is_live = True  # worker-side connection still open
        self._is_closed = False
        self.request: EmscriptenRequest | None = request

    def __del__(self) -> None:
        # ensure the worker-side connection is released on garbage collection
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Release the buffers and tell the worker to drop this connection."""
        if self.is_closed():
            return
        self.read_len = 0
        self.read_pos = 0
        self.int_buffer = None
        self.byte_buffer = None
        self._is_closed = True
        self.request = None
        if self.is_live:
            # ask the worker to close the underlying fetch
            self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
            self.is_live = False
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def readinto(self, byte_obj: Buffer) -> int:
        """Fill ``byte_obj`` with up to one chunk of response data.

        Returns the number of bytes copied, 0 at EOF (which also closes the
        stream).  Raises _TimeoutError when the worker does not respond in
        time and _StreamingError when the worker reports an exception.
        """
        if not self.int_buffer:
            raise _StreamingError(
                "No buffer for stream in _ReadStream.readinto",
                request=self.request,
                response=None,
            )
        if self.read_len == 0:
            # wait for the worker to send something
            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
            if (
                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
                == "timed-out"
            ):
                raise _TimeoutError
            data_len = self.int_buffer[0]
            if data_len > 0:
                # a positive value is the number of payload bytes available
                self.read_len = data_len
                self.read_pos = 0
            elif data_len == ERROR_EXCEPTION:
                # worker stored a JSON error string; its length is in slot 1
                string_len = self.int_buffer[1]
                # decode the error string
                js_decoder = js.TextDecoder.new()
                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
                raise _StreamingError(
                    f"Exception thrown in fetch: {json_str}",
                    request=self.request,
                    response=None,
                )
            else:
                # EOF, free the buffers and return zero
                # and free the request
                self.is_live = False
                self.close()
                return 0
        # copy from int32array to python bytes
        ret_length = min(self.read_len, len(memoryview(byte_obj)))
        subarray = self.byte_buffer.subarray(
            self.read_pos, self.read_pos + ret_length
        ).to_py()
        memoryview(byte_obj)[0:ret_length] = subarray
        self.read_len -= ret_length
        self.read_pos += ret_length
        return ret_length
class _StreamingFetcher:
    """Owns the web-worker that performs asynchronous fetches.

    Each request communicates with the worker through a SharedArrayBuffer:
    the first Int32 slot carries status codes (see SUCCESS_*/ERROR_*
    constants) and the bytes after the first 8 carry the payload.
    """

    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False
        # package the worker JS source into a Blob so it can be loaded via
        # an object URL (no extra file needs to be served)
        js_data_blob = js.Blob.new(
            to_js([_STREAMING_WORKER_CODE], create_pyproxies=False),
            _obj_from_dict({"type": "application/javascript"}),
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            # resolve the ready-promise when the worker posts its first message
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        # awaited by wait_for_streaming_ready(); resolves once the worker
        # has started and sent its first message
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        """Perform ``request`` via the worker and return a streaming response.

        Blocks (Atomics.wait) until the worker delivers the response
        headers or an error.

        :raises _TimeoutError: if no response arrives within the timeout
        :raises _StreamingError: if the worker reports an exception
        """
        headers = {
            k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE
        }
        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # start the request off in the worker
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        # byte view starts at offset 8, after the two signalling int32 slots
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)
        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        # resolve relative URLs against the current page location
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # wait for the worker to send something
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        if js_int_buffer[0] == ERROR_TIMEOUT:
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        elif js_int_buffer[0] == SUCCESS_HEADER:
            # got response
            # header length is in second int of intBuffer
            string_len = js_int_buffer[1]
            # decode the rest to a JSON string
            js_decoder = js.TextDecoder.new()
            # this does a copy (the slice) because decode can't work on shared array
            # for some silly reason
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            # get it as an object
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        elif js_int_buffer[0] == ERROR_EXCEPTION:
            string_len = js_int_buffer[1]
            # decode the error string
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        else:
            raise _StreamingError(
                f"Unknown status from worker in fetch: {js_int_buffer[0]}",
                request=request,
                response=None,
            )
class _JSPIReadStream(io.RawIOBase):
    """
    A read stream that uses pyodide.ffi.run_sync to read from a JavaScript fetch
    response. This requires support for WebAssembly JavaScript Promise Integration
    in the containing browser, and for pyodide to be launched via runPythonAsync.

    :param js_read_stream:
        The JavaScript stream reader
    :param timeout:
        Timeout in seconds
    :param request:
        The request we're handling
    :param response:
        The response this stream relates to
    :param js_abort_controller:
        A JavaScript AbortController object, used for timeouts
    """

    def __init__(
        self,
        js_read_stream: Any,
        timeout: float,
        request: EmscriptenRequest,
        response: EmscriptenResponse,
        js_abort_controller: Any,  # JavaScript AbortController for timeouts
    ):
        self.js_read_stream = js_read_stream
        self.timeout = timeout
        self._is_closed = False
        self._is_done = False  # True once the JS reader reports completion
        self.request: EmscriptenRequest | None = request
        self.response: EmscriptenResponse | None = response
        self.current_buffer = None  # chunk currently being consumed, or None
        self.current_buffer_pos = 0  # read offset into current_buffer
        self.js_abort_controller = js_abort_controller

    def __del__(self) -> None:
        # cancel the JS reader if the stream is garbage-collected while open
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Cancel the underlying JavaScript reader and drop references."""
        if self.is_closed():
            return
        # NOTE(review): read_len/read_pos are never read on this class; they
        # look like leftovers from _ReadStream's interface — confirm.
        self.read_len = 0
        self.read_pos = 0
        self.js_read_stream.cancel()
        self.js_read_stream = None
        self._is_closed = True
        self._is_done = True
        self.request = None
        self.response = None
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def _get_next_buffer(self) -> bool:
        # Await the next chunk from the JS reader; returns False at EOF.
        result_js = _run_sync_with_timeout(
            self.js_read_stream.read(),
            self.timeout,
            self.js_abort_controller,
            request=self.request,
            response=self.response,
        )
        if result_js.done:
            self._is_done = True
            return False
        else:
            self.current_buffer = result_js.value.to_py()
            self.current_buffer_pos = 0
            return True

    def readinto(self, byte_obj: Buffer) -> int:
        """Copy up to ``len(byte_obj)`` response bytes into ``byte_obj``.

        Returns the number of bytes copied; 0 signals EOF (the stream is
        closed at that point).
        """
        if self.current_buffer is None:
            # fetch the next chunk; at EOF close the stream and report 0
            if not self._get_next_buffer() or self.current_buffer is None:
                self.close()
                return 0
        ret_length = min(
            len(byte_obj), len(self.current_buffer) - self.current_buffer_pos
        )
        byte_obj[0:ret_length] = self.current_buffer[
            self.current_buffer_pos : self.current_buffer_pos + ret_length
        ]
        self.current_buffer_pos += ret_length
        if self.current_buffer_pos == len(self.current_buffer):
            # chunk fully consumed; the next readinto will fetch another
            self.current_buffer = None
        return ret_length
# check if we are in a worker or not
def is_in_browser_main_thread() -> bool:
    """Return True when running on the browser's main UI thread.

    On the main thread the global ``self`` is the same object as ``window``;
    in a worker there is no ``window``.
    """
    if not (hasattr(js, "window") and hasattr(js, "self")):
        return False
    return js.self == js.window
def is_cross_origin_isolated() -> bool:
    """Return the global ``crossOriginIsolated`` flag, or False if absent."""
    return getattr(js, "crossOriginIsolated", False)
def is_in_node() -> bool:
    """Detect Node.js by probing ``process.release.name``."""
    try:
        return bool(js.process.release.name == "node")
    except AttributeError:
        # any link in the chain missing means we are not in Node.js
        return False
def is_worker_available() -> bool:
    """Return True when both the Worker and Blob constructors exist."""
    return all(hasattr(js, name) for name in ("Worker", "Blob"))
# Module-level streaming fetcher, created eagerly at import time when the
# environment supports the worker-based streaming approach.
_fetcher: _StreamingFetcher | None = None

# Streaming needs Worker/Blob, cross-origin isolation (for
# SharedArrayBuffer), must not run on the main browser thread (Atomics.wait
# is forbidden there), and must not be Node.js (which uses the JSPI path).
if is_worker_available() and (
    (is_cross_origin_isolated() and not is_in_browser_main_thread())
    and (not is_in_node())
):
    _fetcher = _StreamingFetcher()
else:
    _fetcher = None
# Error text used when running under Node.js without JSPI support — there is
# no web-worker / XMLHttpRequest fallback in that environment.
NODE_JSPI_ERROR = (
    "urllib3 only works in Node.js with pyodide.runPythonAsync"
    " and requires the flag --experimental-wasm-stack-switching in "
    " versions of node <24."
)
def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    """Send ``request`` with a streaming body, preferring the JSPI path.

    Falls back to the worker-based streamer when JSPI is unavailable.
    Returns None when streaming cannot be done at all (the caller should
    retry non-streaming); raises _RequestError under Node.js without JSPI.
    """
    if has_jspi():
        return send_jspi_request(request, True)
    if is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )
    if not (_fetcher and streaming_ready()):
        _show_streaming_warning()
        return None
    return _fetcher.send(request)
# Ensures the timeout warning is only emitted once per session.
_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    """Warn (once) that request timeouts don't work on the main thread."""
    global _SHOWN_TIMEOUT_WARNING
    if _SHOWN_TIMEOUT_WARNING:
        return
    _SHOWN_TIMEOUT_WARNING = True
    message = "Warning: Timeout is not available on main browser thread"
    js.console.warn(message)
# Ensures the streaming warning is only emitted once per session.
_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    # Explain (once) why streaming is unavailable, listing every failed
    # precondition so users can fix their deployment.
    global _SHOWN_STREAMING_WARNING
    if not _SHOWN_STREAMING_WARNING:
        _SHOWN_STREAMING_WARNING = True
        message = "Can't stream HTTP requests because: \n"
        if not is_cross_origin_isolated():
            message += " Page is not cross-origin isolated\n"
        if is_in_browser_main_thread():
            message += " Python is running in main browser thread\n"
        if not is_worker_available():
            message += " Worker or Blob classes are not available in this environment."  # Defensive: this is always False in browsers that we test in
        if streaming_ready() is False:
            message += """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
is working, you need to call: 'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""
        from js import console

        console.warn(message)
def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    """Send a non-streaming request, buffering the whole response body.

    Prefers the JSPI fetch path when available, otherwise falls back to a
    synchronous XMLHttpRequest.

    :raises _RequestError: on network/HTTP-level errors, or in Node.js
        without JSPI support
    :raises _TimeoutError: if the XHR times out
    """
    if has_jspi():
        return send_jspi_request(request, False)
    elif is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )
    try:
        js_xhr = js.XMLHttpRequest.new()
        if not is_in_browser_main_thread():
            # in a worker we can receive the raw bytes directly
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
            # On the main thread, read the body as text in a single-byte
            # charset so it can be re-encoded back to the original bytes
            # below (see the .encode("ISO-8859-15") call).
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()
        # third argument False = synchronous (blocking) request
        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)
        js_xhr.send(to_js(request.body))
        # headers arrive as one CRLF-separated string; the email header
        # parser turns them into a mapping
        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))
        if not is_in_browser_main_thread():
            body = js_xhr.response.to_py().tobytes()
        else:
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request)
        elif err.name == "NetworkError":
            raise _RequestError(err.message, request=request)
        else:
            # general http error
            raise _RequestError(err.message, request=request)
def send_jspi_request(
    request: EmscriptenRequest, streaming: bool
) -> EmscriptenResponse:
    """
    Send a request using WebAssembly JavaScript Promise Integration
    to wrap the asynchronous JavaScript fetch api (experimental).

    :param request:
        Request to send

    :param streaming:
        Whether to stream the response

    :return: The response object
    :rtype: EmscriptenResponse
    """
    timeout = request.timeout
    js_abort_controller = js.AbortController.new()
    headers = {k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE}
    req_body = request.body
    fetch_data = {
        "headers": headers,
        "body": to_js(req_body),
        "method": request.method,
        # the signal lets _run_sync_with_timeout abort the fetch on timeout
        "signal": js_abort_controller.signal,
    }
    # Call JavaScript fetch (async api, returns a promise)
    fetcher_promise_js = js.fetch(request.url, _obj_from_dict(fetch_data))
    # Now suspend WebAssembly until we resolve that promise
    # or time out.
    response_js = _run_sync_with_timeout(
        fetcher_promise_js,
        timeout,
        js_abort_controller,
        request=request,
        response=None,
    )
    headers = {}
    # walk the JS headers iterator manually (it is not a Python iterable)
    header_iter = response_js.headers.entries()
    while True:
        iter_value_js = header_iter.next()
        if getattr(iter_value_js, "done", False):
            break
        else:
            headers[str(iter_value_js.value[0])] = str(iter_value_js.value[1])
    status_code = response_js.status
    body: bytes | io.RawIOBase = b""

    # the response object is created first so the streaming body can hold a
    # back-reference to it
    response = EmscriptenResponse(
        status_code=status_code, headers=headers, body=b"", request=request
    )
    if streaming:
        # get via inputstream
        if response_js.body is not None:
            # get a reader from the fetch response
            body_stream_js = response_js.body.getReader()
            body = _JSPIReadStream(
                body_stream_js, timeout, request, response, js_abort_controller
            )
    else:
        # get directly via arraybuffer
        # n.b. this is another async JavaScript call.
        body = _run_sync_with_timeout(
            response_js.arrayBuffer(),
            timeout,
            js_abort_controller,
            request=request,
            response=response,
        ).to_py()
    response.body = body
    return response
def _run_sync_with_timeout(
    promise: Any,
    timeout: float,
    js_abort_controller: Any,
    request: EmscriptenRequest | None,
    response: EmscriptenResponse | None,
) -> Any:
    """
    Await a JavaScript promise synchronously with a timeout which is implemented
    via the AbortController

    :param promise:
        Javascript promise to await

    :param timeout:
        Timeout in seconds

    :param js_abort_controller:
        A JavaScript AbortController object, used on timeout

    :param request:
        The request being handled

    :param response:
        The response being handled (if it exists yet)

    :raises _TimeoutError: If the request times out
    :raises _RequestError: If the request raises a JavaScript exception

    :return: The result of awaiting the promise.
    """
    timer_id = None
    if timeout > 0:
        # schedule an abort after the timeout; this rejects the promise
        # with an AbortError, which we translate to _TimeoutError below
        timer_id = js.setTimeout(
            js_abort_controller.abort.bind(js_abort_controller), int(timeout * 1000)
        )
    try:
        # imported lazily: run_sync only exists in pyodide builds with JSPI
        from pyodide.ffi import run_sync

        # run_sync here uses WebAssembly JavaScript Promise Integration to
        # suspend python until the JavaScript promise resolves.
        return run_sync(promise)
    except JsException as err:
        if err.name == "AbortError":
            raise _TimeoutError(
                message="Request timed out", request=request, response=response
            )
        else:
            raise _RequestError(message=err.message, request=request, response=response)
    finally:
        if timer_id is not None:
            # cancel the pending abort so it cannot fire after success
            js.clearTimeout(timer_id)
def has_jspi() -> bool:
    """
    Return true if jspi can be used.

    This requires both browser support and also WebAssembly
    to be in the correct state - i.e. that the javascript
    call into python was async not sync.

    :return: True if jspi can be used.
    :rtype: bool
    """
    try:
        from pyodide.ffi import can_run_sync, run_sync  # noqa: F401
    except ImportError:
        # pyodide too old (or not pyodide at all) - no JSPI support
        return False
    return bool(can_run_sync())
def streaming_ready() -> bool | None:
    """Report whether the streaming worker has started.

    Returns True/False when a fetcher exists, or None when streaming is
    not available in this environment at all.
    """
    if _fetcher is None:
        return None  # no fetcher, return None to signify that
    return _fetcher.streaming_ready
async def wait_for_streaming_ready() -> bool:
    """Await the streaming worker's ready promise.

    Returns True once the worker is ready, or False immediately when no
    fetcher exists in this environment.
    """
    if _fetcher is None:
        return False
    await _fetcher.js_worker_ready_promise
    return True