|
from typing import Any, cast, Dict, List, Optional, Tuple, Type |
|
|
|
import pytest |
|
|
|
from .._connection import _body_framing, _keep_alive, Connection, NEED_DATA, PAUSED |
|
from .._events import ( |
|
ConnectionClosed, |
|
Data, |
|
EndOfMessage, |
|
Event, |
|
InformationalResponse, |
|
Request, |
|
Response, |
|
) |
|
from .._state import ( |
|
CLIENT, |
|
CLOSED, |
|
DONE, |
|
ERROR, |
|
IDLE, |
|
MIGHT_SWITCH_PROTOCOL, |
|
MUST_CLOSE, |
|
SEND_BODY, |
|
SEND_RESPONSE, |
|
SERVER, |
|
SWITCHED_PROTOCOL, |
|
) |
|
from .._util import LocalProtocolError, RemoteProtocolError, Sentinel |
|
from .helpers import ConnectionPair, get_all_events, receive_and_get |
|
|
|
|
|
def test__keep_alive() -> None:
    """Exercise ``_keep_alive`` on both requests and responses.

    The results pinned here: a message counts as keep-alive unless it has a
    ``Connection`` header listing ``close`` (in any letter case, anywhere in
    a comma-separated list) or it declares HTTP version 1.0.
    """
    host = ("Host", "Example.com")

    # --- Requests ---------------------------------------------------------
    default_request = Request(method="GET", target="/", headers=[host])
    assert _keep_alive(default_request)

    closing_request = Request(
        method="GET", target="/", headers=[host, ("Connection", "close")]
    )
    assert not _keep_alive(closing_request)

    # "close" hidden inside a token list with odd capitalisation still counts.
    buried_close_request = Request(
        method="GET",
        target="/",
        headers=[host, ("Connection", "a, b, cLOse, foo")],
    )
    assert not _keep_alive(buried_close_request)

    http10_request = Request(method="GET", target="/", headers=[], http_version="1.0")
    assert not _keep_alive(http10_request)

    # --- Responses --------------------------------------------------------
    default_response = Response(status_code=200, headers=[])
    assert _keep_alive(default_response)

    closing_response = Response(status_code=200, headers=[("Connection", "close")])
    assert not _keep_alive(closing_response)

    buried_close_response = Response(
        status_code=200, headers=[("Connection", "a, b, cLOse, foo")]
    )
    assert not _keep_alive(buried_close_response)

    http10_response = Response(status_code=200, headers=[], http_version="1.0")
    assert not _keep_alive(http10_response)
|
|
|
|
|
def test__body_framing() -> None:
    """Check which framing ``_body_framing`` reports per header combination.

    In the helpers below ``cl`` is an optional ``Content-Length`` value and
    ``te`` toggles a ``Transfer-Encoding: chunked`` header.
    """

    def framing_headers(cl: Optional[int], te: bool) -> List[Tuple[str, str]]:
        # Emit only the framing headers that were requested, CL before TE.
        pairs: List[Tuple[str, str]] = []
        if cl is not None:
            pairs.append(("Content-Length", str(cl)))
        if te:
            pairs.append(("Transfer-Encoding", "chunked"))
        return pairs

    def resp(
        status_code: int = 200, cl: Optional[int] = None, te: bool = False
    ) -> Response:
        return Response(status_code=status_code, headers=framing_headers(cl, te))

    def req(cl: Optional[int] = None, te: bool = False) -> Request:
        # Framing headers come first, followed by the Host header.
        all_headers = framing_headers(cl, te) + [("Host", "example.com")]
        return Request(method="GET", target="/", headers=all_headers)

    # Responses to HEAD, and 204 / 304 responses, are reported as
    # zero-length whatever framing headers they carry.
    ignored_header_cases: List[Dict[str, Any]] = [
        {},
        {"cl": 100},
        {"te": True},
        {"cl": 100, "te": True},
    ]
    for options in ignored_header_cases:
        bodiless = [
            (b"HEAD", resp(**options)),
            (b"GET", resp(status_code=204, **options)),
            (b"GET", resp(status_code=304, **options)),
        ]
        for request_method, event in bodiless:
            assert _body_framing(request_method, event) == ("content-length", (0,))

    # Transfer-Encoding: chunked is selected, even next to a Content-Length.
    chunked_cases: List[Dict[str, Any]] = [{"te": True}, {"cl": 100, "te": True}]
    for options in chunked_cases:
        for request_method, event in [
            (None, req(**options)),
            (b"GET", resp(**options)),
        ]:
            assert _body_framing(request_method, event) == ("chunked", ())

    # Content-Length on its own.
    for request_method, event in [(None, req(cl=100)), (b"GET", resp(cl=100))]:
        assert _body_framing(request_method, event) == ("content-length", (100,))

    # No framing headers: a request is zero-length, a response is framed
    # "http/1.0" style.
    assert _body_framing(None, req()) == ("content-length", (0,))
    assert _body_framing(b"GET", resp()) == ("http/1.0", ())
|
|
|
|
|
def test_Connection_basics_and_content_length() -> None:
    """Drive one full request/response cycle framed with Content-Length.

    Covers role validation, the role/state accessors on both endpoints and
    the exact bytes produced for each event that is sent.
    """
    # A role must be one of the sentinel objects; the string is rejected.
    with pytest.raises(ValueError):
        Connection("CLIENT")

    pair = ConnectionPair()
    client = pair.conn[CLIENT]
    server = pair.conn[SERVER]

    def assert_states(expected: Dict[Any, Any]) -> None:
        # Both endpoints must agree on the state of both roles.
        for endpoint in pair.conns:
            assert endpoint.states == expected

    assert client.our_role is CLIENT
    assert client.their_role is SERVER
    assert server.our_role is SERVER
    assert server.their_role is CLIENT

    # --- Request head -----------------------------------------------------
    request = Request(
        method="GET",
        target="/",
        headers=[("Host", "example.com"), ("Content-Length", "10")],
    )
    assert pair.send(CLIENT, request) == (
        b"GET / HTTP/1.1\r\n"
        b"Host: example.com\r\n"
        b"Content-Length: 10\r\n\r\n"
    )

    assert_states({CLIENT: SEND_BODY, SERVER: SEND_RESPONSE})
    assert client.our_state is SEND_BODY
    assert client.their_state is SEND_RESPONSE
    assert server.our_state is SEND_RESPONSE
    assert server.their_state is SEND_BODY

    # Only the server has heard from its peer so far.
    assert client.their_http_version is None
    assert server.their_http_version == b"1.1"

    # --- Response head ----------------------------------------------------
    interim = InformationalResponse(status_code=100, headers=[])
    assert pair.send(SERVER, interim) == b"HTTP/1.1 100 \r\n\r\n"

    final = Response(status_code=200, headers=[("Content-Length", "11")])
    assert pair.send(SERVER, final) == b"HTTP/1.1 200 \r\nContent-Length: 11\r\n\r\n"

    assert_states({CLIENT: SEND_BODY, SERVER: SEND_BODY})
    assert client.their_http_version == b"1.1"
    assert server.their_http_version == b"1.1"

    # --- Request body: 5 + 5 bytes fills the declared 10 ------------------
    assert pair.send(CLIENT, Data(data=b"12345")) == b"12345"
    # NOTE(review): ``expect`` presumably lists what the peer should observe;
    # confirm against helpers.ConnectionPair.send.
    second_half = pair.send(
        CLIENT, Data(data=b"67890"), expect=[Data(data=b"67890"), EndOfMessage()]
    )
    assert second_half == b"67890"
    assert pair.send(CLIENT, EndOfMessage(), expect=[]) == b""

    assert_states({CLIENT: DONE, SERVER: SEND_BODY})

    # --- Response body: 10 + 1 bytes fills the declared 11 ----------------
    assert pair.send(SERVER, Data(data=b"1234567890")) == b"1234567890"
    last_byte = pair.send(
        SERVER, Data(data=b"1"), expect=[Data(data=b"1"), EndOfMessage()]
    )
    assert last_byte == b"1"
    assert pair.send(SERVER, EndOfMessage(), expect=[]) == b""

    assert_states({CLIENT: DONE, SERVER: DONE})
|
|
|
|
|
def test_chunked() -> None:
    """Send a chunked request and a chunked response through a pair."""
    pair = ConnectionPair()
    chunked_header = ("Transfer-Encoding", "chunked")

    pair.send(
        CLIENT,
        Request(
            method="GET",
            target="/",
            headers=[("Host", "example.com"), chunked_header],
        ),
    )

    # Each whole-chunk Data event is framed as size line, payload, CRLF.
    for payload, framed in [
        (b"1234567890", b"a\r\n1234567890\r\n"),
        (b"abcde", b"5\r\nabcde\r\n"),
    ]:
        wire = pair.send(CLIENT, Data(data=payload, chunk_start=True, chunk_end=True))
        assert wire == framed

    # An empty Data event puts nothing on the wire.
    assert pair.send(CLIENT, Data(data=b""), expect=[]) == b""

    # Headers attached to EndOfMessage are written after the final
    # zero-size chunk.
    trailer_wire = pair.send(CLIENT, EndOfMessage(headers=[("hello", "there")]))
    assert trailer_wire == b"0\r\nhello: there\r\n\r\n"

    pair.send(SERVER, Response(status_code=200, headers=[chunked_header]))
    for payload in (b"54321", b"12345"):
        pair.send(SERVER, Data(data=payload, chunk_start=True, chunk_end=True))
    pair.send(SERVER, EndOfMessage())

    for endpoint in pair.conns:
        assert endpoint.states == {CLIENT: DONE, SERVER: DONE}
|
|
|
|
|
def test_chunk_boundaries() -> None:
    """Check the ``chunk_start`` / ``chunk_end`` flags on received ``Data``.

    A server connection is fed a chunked POST in pieces that split chunks at
    various points; every resulting ``Data`` event must report whether it
    begins and/or completes a chunk.
    """
    conn = Connection(our_role=SERVER)

    request = (
        b"POST / HTTP/1.1\r\n"
        b"Host: example.com\r\n"
        b"Transfer-Encoding: chunked\r\n"
        b"\r\n"
    )
    conn.receive_data(request)
    assert conn.next_event() == Request(
        method="POST",
        target="/",
        headers=[("Host", "example.com"), ("Transfer-Encoding", "chunked")],
    )
    assert conn.next_event() is NEED_DATA

    # A whole chunk delivered at once starts and ends in the same event.
    conn.receive_data(b"5\r\nhello\r\n")
    assert conn.next_event() == Data(data=b"hello", chunk_start=True, chunk_end=True)

    # One chunk delivered in three pieces: start, middle, end.
    conn.receive_data(b"5\r\nhel")
    assert conn.next_event() == Data(data=b"hel", chunk_start=True, chunk_end=False)

    conn.receive_data(b"l")
    assert conn.next_event() == Data(data=b"l", chunk_start=False, chunk_end=False)

    conn.receive_data(b"o\r\n")
    assert conn.next_event() == Data(data=b"o", chunk_start=False, chunk_end=True)

    # All payload bytes present but the terminating CRLF still missing: the
    # event is already flagged as ending the chunk ...
    conn.receive_data(b"5\r\nhello")
    assert conn.next_event() == Data(data=b"hello", chunk_start=True, chunk_end=True)

    # ... and the late CRLF yields no event of its own. NEED_DATA is a
    # sentinel, so compare by identity as in the earlier assertion.
    conn.receive_data(b"\r\n")
    assert conn.next_event() is NEED_DATA

    # The zero-size chunk terminates the message.
    conn.receive_data(b"0\r\n\r\n")
    assert conn.next_event() == EndOfMessage()
|
|
|
|
|
def test_client_talking_to_http10_server() -> None:
    """A client reads an HTTP/1.0 response whose body ends at EOF."""
    client = Connection(CLIENT)
    for outgoing in (
        Request(method="GET", target="/", headers=[("Host", "example.com")]),
        EndOfMessage(),
    ):
        client.send(outgoing)
    assert client.our_state is DONE

    # The response head has no framing headers at all.
    head_events = receive_and_get(client, b"HTTP/1.0 200 OK\r\n\r\n")
    expected_head = Response(
        status_code=200, headers=[], http_version="1.0", reason=b"OK"
    )
    assert head_events == [expected_head]

    # Seeing the HTTP/1.0 response moves our own side to MUST_CLOSE.
    assert client.our_state is MUST_CLOSE

    # Body bytes are passed through as they arrive ...
    for piece in (b"12345", b"67890"):
        assert receive_and_get(client, piece) == [Data(data=piece)]

    # ... and EOF (empty bytes) ends both the message and the connection.
    assert receive_and_get(client, b"") == [EndOfMessage(), ConnectionClosed()]
    assert client.their_state is CLOSED
|
|
|
|
|
def test_server_talking_to_http10_client() -> None:
    """A server handles HTTP/1.0 requests without and with a body."""
    # --- Request without a body ------------------------------------------
    server = Connection(SERVER)

    bodiless_events = receive_and_get(server, b"GET / HTTP/1.0\r\n\r\n")
    assert bodiless_events == [
        Request(method="GET", target="/", headers=[], http_version="1.0"),
        EndOfMessage(),
    ]
    assert server.their_state is MUST_CLOSE

    # The response head gains a Connection: close header, and the body is
    # written raw with no terminator.
    head = server.send(Response(status_code=200, headers=[]))
    assert head == b"HTTP/1.1 200 \r\nConnection: close\r\n\r\n"

    assert server.send(Data(data=b"12345")) == b"12345"
    assert server.send(EndOfMessage()) == b""
    assert server.our_state is MUST_CLOSE

    # --- Request with a Content-Length body ------------------------------
    server = Connection(SERVER)

    first_events = receive_and_get(
        server, b"POST / HTTP/1.0\r\nContent-Length: 10\r\n\r\n1"
    )
    assert first_events == [
        Request(
            method="POST",
            target="/",
            headers=[("Content-Length", "10")],
            http_version="1.0",
        ),
        Data(data=b"1"),
    ]

    remaining_events = receive_and_get(server, b"234567890")
    assert remaining_events == [Data(data=b"234567890"), EndOfMessage()]
    assert server.their_state is MUST_CLOSE

    assert receive_and_get(server, b"") == [ConnectionClosed()]
|
|
|
|
|
def test_automatic_transfer_encoding_in_response() -> None:
    """Response framing headers are normalised according to the peer.

    For each set of user-supplied headers, an HTTP/1.1 peer is expected to
    see exactly ``Transfer-Encoding: chunked``, while for an HTTP/1.0 peer
    the serialized head carries only ``Connection: close``.
    """
    header_variants: List[List[Tuple[str, str]]] = [
        [("Transfer-Encoding", "chunked")],
        [],
        # Even an explicit Content-Length is expected to disappear.
        [("Transfer-Encoding", "chunked"), ("Content-Length", "100")],
    ]

    for user_headers in header_variants:
        # --- HTTP/1.1 peer ------------------------------------------------
        pair = ConnectionPair()
        request_events = [
            Request(method="GET", target="/", headers=[("Host", "example.com")]),
            EndOfMessage(),
        ]
        pair.send(CLIENT, request_events)

        normalised = Response(
            status_code=200, headers=[("Transfer-Encoding", "chunked")]
        )
        pair.send(
            SERVER,
            Response(status_code=200, headers=user_headers),
            expect=normalised,
        )

        # --- HTTP/1.0 peer ------------------------------------------------
        legacy_server = Connection(SERVER)
        receive_and_get(legacy_server, b"GET / HTTP/1.0\r\n\r\n")

        head = legacy_server.send(Response(status_code=200, headers=user_headers))
        assert head == b"HTTP/1.1 200 \r\nConnection: close\r\n\r\n"

        # Body bytes go out unframed.
        assert legacy_server.send(Data(data=b"12345")) == b"12345"
|
|
|
|
|
def test_automagic_connection_close_handling() -> None:
    """A ``Connection: close`` request forces both sides into MUST_CLOSE."""
    pair = ConnectionPair()

    # The client announces that it will close after this request.
    closing_request = Request(
        method="GET",
        target="/",
        headers=[("Host", "example.com"), ("Connection", "close")],
    )
    pair.send(CLIENT, [closing_request, EndOfMessage()])
    for endpoint in pair.conns:
        assert endpoint.states[CLIENT] is MUST_CLOSE

    # The server sends a header-less 204; the peer is expected to see it
    # with a "connection: close" header added.
    sent_events = [Response(status_code=204, headers=[]), EndOfMessage()]
    seen_events = [
        Response(status_code=204, headers=[("connection", "close")]),
        EndOfMessage(),
    ]
    pair.send(SERVER, sent_events, expect=seen_events)

    for endpoint in pair.conns:
        assert endpoint.states == {CLIENT: MUST_CLOSE, SERVER: MUST_CLOSE}
|
|
|
|
|
def test_100_continue() -> None:
    """The "waiting for 100-continue" flags are set, then cleared.

    After a request carrying ``Expect: 100-continue`` the flags are set; any
    of three follow-up events clears them on both endpoints.
    """

    def setup() -> ConnectionPair:
        # Fresh pair whose client has just sent the Expect: 100-continue head.
        pair = ConnectionPair()
        expecting_request = Request(
            method="GET",
            target="/",
            headers=[
                ("Host", "example.com"),
                ("Content-Length", "100"),
                ("Expect", "100-continue"),
            ],
        )
        pair.send(CLIENT, expecting_request)
        for endpoint in pair.conns:
            assert endpoint.client_is_waiting_for_100_continue
        # "they" is relative: only the server's peer is the waiting client.
        assert not pair.conn[CLIENT].they_are_waiting_for_100_continue
        assert pair.conn[SERVER].they_are_waiting_for_100_continue
        return pair

    clearing_events = [
        # The server sends the 100 Continue itself.
        (SERVER, InformationalResponse(status_code=100, headers=[])),
        # The server skips ahead to a final response.
        (SERVER, Response(status_code=200, headers=[("Transfer-Encoding", "chunked")])),
        # The client gives up waiting and starts sending its body.
        (CLIENT, Data(data=b"12345")),
    ]
    for sender, event in clearing_events:
        p = setup()
        p.send(sender, event)
        for conn in p.conns:
            assert not conn.client_is_waiting_for_100_continue
            assert not conn.they_are_waiting_for_100_continue
|
|
|
|
|
def test_max_incomplete_event_size_countermeasure() -> None:
    """``max_incomplete_event_size`` bounds partially received events only."""
    # --- A header that never ends is eventually rejected ------------------
    endless = Connection(SERVER)
    endless.receive_data(b"GET / HTTP/1.0\r\nEndless: ")
    assert endless.next_event() is NEED_DATA
    filler = b"a" * 1024
    with pytest.raises(RemoteProtocolError):
        # Keep feeding until the connection gives up; the raise ends the loop.
        while True:
            endless.receive_data(filler)
            endless.next_event()

    # --- The same 4000-byte header passes or fails depending on the limit --
    roomy = Connection(SERVER, max_incomplete_event_size=5000)
    roomy.receive_data(b"GET / HTTP/1.0\r\nBig: ")
    roomy.receive_data(b"a" * 4000)
    roomy.receive_data(b"\r\n\r\n")
    big_request = Request(
        method="GET", target="/", http_version="1.0", headers=[("big", "a" * 4000)]
    )
    assert get_all_events(roomy) == [big_request, EndOfMessage()]

    tight = Connection(SERVER, max_incomplete_event_size=4000)
    tight.receive_data(b"GET / HTTP/1.0\r\nBig: ")
    tight.receive_data(b"a" * 4000)
    with pytest.raises(RemoteProtocolError):
        tight.next_event()

    # --- A buffer above the limit is fine when the events are complete -----
    bulky = Connection(SERVER, max_incomplete_event_size=5000)
    bulky.receive_data(b"GET / HTTP/1.0\r\nContent-Length: 10000")
    bulky.receive_data(b"\r\n\r\n" + b"a" * 10000)
    assert get_all_events(bulky) == [
        Request(
            method="GET",
            target="/",
            http_version="1.0",
            headers=[("Content-Length", "10000")],
        ),
        Data(data=b"a" * 10000),
        EndOfMessage(),
    ]

    # --- Pipelined requests followed by a large amount of junk ------------
    pipelined = Connection(SERVER, max_incomplete_event_size=100)

    def finish_cycle() -> None:
        # Answer the current request and make the connection reusable.
        pipelined.send(Response(status_code=200, headers=[]))
        pipelined.send(EndOfMessage())
        pipelined.start_next_cycle()

    # The buffer is far over the limit, but the first request is complete.
    pipelined.receive_data(
        b"GET /1 HTTP/1.1\r\nHost: a\r\n\r\n"
        b"GET /2 HTTP/1.1\r\nHost: b\r\n\r\n" + b"X" * 1000
    )
    assert get_all_events(pipelined) == [
        Request(method="GET", target="/1", headers=[("host", "a")]),
        EndOfMessage(),
    ]

    # More data arriving before we respond raises no error either.
    pipelined.receive_data(b"X" * 1000)

    # After responding, the second (complete) request can be read.
    finish_cycle()
    assert get_all_events(pipelined) == [
        Request(method="GET", target="/2", headers=[("host", "b")]),
        EndOfMessage(),
    ]

    # Only now, with nothing but the oversized incomplete junk left to
    # parse, does reading fail.
    finish_cycle()
    with pytest.raises(RemoteProtocolError):
        pipelined.next_event()
|
|
|
|
|
def test_reuse_simple() -> None:
    """Run two request/response cycles over one connection pair."""
    pair = ConnectionPair()

    def chunked_reply(status_code: int) -> list:
        # A complete chunked response with the given status and no body.
        return [
            Response(
                status_code=status_code,
                headers=[(b"transfer-encoding", b"chunked")],
            ),
            EndOfMessage(),
        ]

    # --- First cycle ------------------------------------------------------
    first_request = [
        Request(method="GET", target="/", headers=[("Host", "a")]),
        EndOfMessage(),
    ]
    pair.send(CLIENT, first_request)
    pair.send(SERVER, chunked_reply(200))

    # Both endpoints are DONE on both sides, so each can be reset.
    for endpoint in pair.conns:
        assert endpoint.states == {CLIENT: DONE, SERVER: DONE}
        endpoint.start_next_cycle()

    # --- Second cycle on the same pair -----------------------------------
    second_request = [
        Request(method="DELETE", target="/foo", headers=[("Host", "a")]),
        EndOfMessage(),
    ]
    pair.send(CLIENT, second_request)
    pair.send(SERVER, chunked_reply(404))
|
|
|
|
|
def test_pipelining() -> None:
    """A server works through three pipelined requests one cycle at a time."""
    server = Connection(SERVER)
    assert server.next_event() is NEED_DATA

    def reply_ok() -> None:
        # Send a complete, empty 200 response.
        server.send(Response(status_code=200, headers=[]))
        server.send(EndOfMessage())

    def request_with_body(target: str, body: bytes) -> list:
        # Events expected for one of the two 5-byte-body requests.
        return [
            Request(
                method="GET",
                target=target,
                headers=[("Host", "a.com"), ("Content-Length", "5")],
            ),
            Data(data=body),
            EndOfMessage(),
        ]

    # All three requests arrive in a single read.
    server.receive_data(
        b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
        b"12345"
        b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
        b"67890"
        b"GET /3 HTTP/1.1\r\nHost: a.com\r\n\r\n"
    )

    # --- Request 1 --------------------------------------------------------
    assert get_all_events(server) == request_with_body("/1", b"12345")
    assert server.their_state is DONE
    assert server.our_state is SEND_RESPONSE

    # Until we respond, the remaining buffered requests are held back.
    assert server.next_event() is PAUSED

    reply_ok()
    assert server.their_state is DONE
    assert server.our_state is DONE

    server.start_next_cycle()

    # --- Request 2 --------------------------------------------------------
    assert get_all_events(server) == request_with_body("/2", b"67890")
    assert server.next_event() is PAUSED
    reply_ok()
    server.start_next_cycle()

    # --- Request 3 --------------------------------------------------------
    assert get_all_events(server) == [
        Request(method="GET", target="/3", headers=[("Host", "a.com")]),
        EndOfMessage(),
    ]

    # The buffer is empty now, so the result is NEED_DATA rather than PAUSED.
    assert server.next_event() is NEED_DATA
    reply_ok()

    # New bytes arriving before the next cycle is started trigger a pause
    # and are exposed as trailing data.
    assert server.next_event() is NEED_DATA
    server.receive_data(b"SADF")
    assert server.next_event() is PAUSED
    assert server.trailing_data == (b"SADF", False)

    # EOF while paused only flips the flag in trailing_data; repeating the
    # EOF changes nothing.
    server.receive_data(b"")
    assert server.trailing_data == (b"SADF", True)
    assert server.next_event() is PAUSED
    server.receive_data(b"")
    assert server.next_event() is PAUSED

    # Feeding real data after EOF is a usage error.
    with pytest.raises(RuntimeError):
        server.receive_data(b"FDSA")
|
|
|
|
|
def test_protocol_switch() -> None:
    """Exercise protocol-switch proposals that are denied or accepted.

    Each scenario is a ``(request, denying response, accepting response)``
    triple; the same sequence of checks is run for every triple.
    """

    def chunked_response(status_code: int) -> Response:
        return Response(
            status_code=status_code, headers=[(b"transfer-encoding", b"chunked")]
        )

    def connect_request(*extra_headers: Tuple[str, str]) -> Request:
        return Request(
            method="CONNECT",
            target="example.com:443",
            headers=[("Host", "foo"), ("Content-Length", "1"), *extra_headers],
        )

    upgrade_offer = ("Upgrade", "a, b")

    scenarios = [
        # Plain CONNECT.
        (connect_request(), chunked_response(404), chunked_response(200)),
        # Plain Upgrade.
        (
            Request(
                method="GET",
                target="/",
                headers=[("Host", "foo"), ("Content-Length", "1"), upgrade_offer],
            ),
            chunked_response(200),
            InformationalResponse(status_code=101, headers=[("Upgrade", "a")]),
        ),
        # CONNECT plus Upgrade, accepted with a 200.
        (
            connect_request(upgrade_offer),
            chunked_response(404),
            chunked_response(200),
        ),
        # CONNECT plus Upgrade, accepted with a 101.
        (
            connect_request(upgrade_offer),
            chunked_response(404),
            InformationalResponse(status_code=101, headers=[("Upgrade", "b")]),
        ),
    ]

    for req, deny, accept in scenarios:

        def setup() -> ConnectionPair:
            # Fresh pair in which the client has fully sent its proposal.
            pair = ConnectionPair()
            pair.send(CLIENT, req)
            # While the body is outstanding the client is just SEND_BODY.
            for endpoint in pair.conns:
                assert endpoint.states[CLIENT] is SEND_BODY
            pair.send(CLIENT, [Data(data=b"1"), EndOfMessage()])
            for endpoint in pair.conns:
                assert endpoint.states[CLIENT] is MIGHT_SWITCH_PROTOCOL
            assert pair.conn[SERVER].next_event() is PAUSED
            return pair

        # --- Denied: the cycle ends normally and the pair is reusable ------
        p = setup()
        p.send(SERVER, deny)
        for conn in p.conns:
            assert conn.states == {CLIENT: DONE, SERVER: SEND_BODY}
        p.send(SERVER, EndOfMessage())

        for conn in p.conns:
            conn.start_next_cycle()

        # --- Accepted: both sides switch; later bytes are trailing data ----
        p = setup()
        p.send(SERVER, accept)
        for conn in p.conns:
            assert conn.states == {CLIENT: SWITCHED_PROTOCOL, SERVER: SWITCHED_PROTOCOL}
            conn.receive_data(b"123")
            assert conn.next_event() is PAUSED
            conn.receive_data(b"456")
            assert conn.next_event() is PAUSED
            assert conn.trailing_data == (b"123456", False)

        # --- Data arrives while the switch is pending, then it is denied ---
        # The buffered bytes are a valid request, so after the denial and a
        # new cycle they are parsed as ordinary HTTP.
        p = setup()
        server_conn = p.conn[SERVER]
        server_conn.receive_data(b"GET / HTTP/1.0\r\n\r\n")
        assert server_conn.next_event() is PAUSED
        assert server_conn.trailing_data == (b"GET / HTTP/1.0\r\n\r\n", False)
        server_conn.send(deny)
        assert server_conn.next_event() is PAUSED
        server_conn.send(EndOfMessage())
        server_conn.start_next_cycle()
        assert get_all_events(server_conn) == [
            Request(method="GET", target="/", headers=[], http_version="1.0"),
            EndOfMessage(),
        ]

        # --- EOF while pending: stays PAUSED if the switch is accepted -----
        p = setup()
        server_conn = p.conn[SERVER]
        server_conn.receive_data(b"")
        assert server_conn.next_event() is PAUSED
        assert server_conn.trailing_data == (b"", True)
        p.send(SERVER, accept)
        assert server_conn.next_event() is PAUSED

        # --- EOF while pending: reported as closed once it is denied -------
        p = setup()
        server_conn = p.conn[SERVER]
        server_conn.receive_data(b"")
        assert server_conn.next_event() is PAUSED
        server_conn.send(deny)
        assert server_conn.next_event() == ConnectionClosed()

        # --- Sending is refused while pending and after the switch ---------
        p = setup()
        with pytest.raises(LocalProtocolError):
            p.conn[CLIENT].send(
                Request(method="GET", target="/", headers=[("Host", "a")])
            )
        p = setup()
        p.send(SERVER, accept)
        with pytest.raises(LocalProtocolError):
            p.conn[SERVER].send(Data(data=b"123"))
|
|
|
|
|
def test_close_simple() -> None:
    """Close a brand-new connection, started from either side."""
    closing_orders = [(CLIENT, SERVER), (SERVER, CLIENT)]

    for who_shot_first, who_shot_second in closing_orders:

        def setup() -> ConnectionPair:
            # Fresh pair in which only the first party has closed.
            pair = ConnectionPair()
            pair.send(who_shot_first, ConnectionClosed())
            expected = {who_shot_first: CLOSED, who_shot_second: MUST_CLOSE}
            for endpoint in pair.conns:
                assert endpoint.states == expected
            return pair

        # The remaining party keeps getting ConnectionClosed(), including
        # after another EOF is fed in.
        p = setup()
        survivor = p.conn[who_shot_second]
        assert survivor.next_event() == ConnectionClosed()
        assert survivor.next_event() == ConnectionClosed()
        survivor.receive_data(b"")
        assert survivor.next_event() == ConnectionClosed()

        # The second party may close as well; everything ends up CLOSED.
        p = setup()
        p.send(who_shot_second, ConnectionClosed())
        for endpoint in p.conns:
            assert endpoint.our_state is CLOSED
            assert endpoint.their_state is CLOSED

        # Feeding real bytes to a connection that has already seen EOF is a
        # usage error (RuntimeError), raised by receive_data itself.
        p = setup()
        with pytest.raises(RuntimeError):
            p.conn[who_shot_second].receive_data(b"123")

        # Bytes arriving at the party that closed first are accepted into
        # the buffer but fail as a protocol error when parsed.
        p = setup()
        closer = p.conn[who_shot_first]
        closer.receive_data(b"GET")
        with pytest.raises(RemoteProtocolError):
            closer.next_event()
|
|
|
|
|
def test_close_different_states() -> None:
    """Close the connection at several points of the request/response cycle."""
    request_events = [
        Request(method="GET", target="/foo", headers=[("Host", "a")]),
        EndOfMessage(),
    ]
    response_events = [
        Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
        EndOfMessage(),
    ]

    def assert_states(pair: ConnectionPair, expected: Dict[Any, Any]) -> None:
        for endpoint in pair.conns:
            assert endpoint.states == expected

    # Client closes before sending anything.
    pair = ConnectionPair()
    pair.send(CLIENT, ConnectionClosed())
    assert_states(pair, {CLIENT: CLOSED, SERVER: MUST_CLOSE})

    # Client closes after its request; the server still owes a response.
    pair = ConnectionPair()
    pair.send(CLIENT, request_events)
    pair.send(CLIENT, ConnectionClosed())
    assert_states(pair, {CLIENT: CLOSED, SERVER: SEND_RESPONSE})

    # Server closing instead of responding is an error on both ends: locally
    # when sending, remotely when the client sees the EOF.
    pair = ConnectionPair()
    pair.send(CLIENT, request_events)
    with pytest.raises(LocalProtocolError):
        pair.conn[SERVER].send(ConnectionClosed())
    pair.conn[CLIENT].receive_data(b"")
    with pytest.raises(RemoteProtocolError):
        pair.conn[CLIENT].next_event()

    # Server closes after a complete response.
    pair = ConnectionPair()
    pair.send(CLIENT, request_events)
    pair.send(SERVER, response_events)
    pair.send(SERVER, ConnectionClosed())
    assert_states(pair, {CLIENT: MUST_CLOSE, SERVER: CLOSED})

    # Both sides close, then close again; the repeats must not raise.
    pair = ConnectionPair()
    pair.send(CLIENT, request_events)
    pair.send(SERVER, response_events)
    for _ in range(2):
        pair.send(CLIENT, ConnectionClosed())
        pair.send(SERVER, ConnectionClosed())

    # Client closing with its declared body still unsent is an error on
    # both ends as well.
    pair = ConnectionPair()
    unfinished_request = Request(
        method="GET", target="/", headers=[("Host", "a"), ("Content-Length", "10")]
    )
    pair.send(CLIENT, unfinished_request)
    with pytest.raises(LocalProtocolError):
        pair.conn[CLIENT].send(ConnectionClosed())
    pair.conn[SERVER].receive_data(b"")
    with pytest.raises(RemoteProtocolError):
        pair.conn[SERVER].next_event()
|
|
|
|
|
|
|
|
|
def test_pipelined_close() -> None:
    """Two pipelined requests followed by EOF are both still served."""
    server = Connection(SERVER)

    def expected_events(target: str, body: bytes) -> list:
        return [
            Request(
                method="GET",
                target=target,
                headers=[("host", "a.com"), ("content-length", "5")],
            ),
            Data(data=body),
            EndOfMessage(),
        ]

    def reply_ok() -> None:
        server.send(Response(status_code=200, headers=[]))
        server.send(EndOfMessage())

    # Both requests, then EOF, arrive before anything is read.
    server.receive_data(
        b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
        b"12345"
        b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
        b"67890"
    )
    server.receive_data(b"")

    # First request: the close is not reported yet.
    assert get_all_events(server) == expected_events("/1", b"12345")
    assert server.states[CLIENT] is DONE
    reply_ok()
    assert server.states[SERVER] is DONE
    server.start_next_cycle()

    # Second request: the close shows up after its EndOfMessage.
    assert get_all_events(server) == [
        *expected_events("/2", b"67890"),
        ConnectionClosed(),
    ]
    assert server.states == {CLIENT: CLOSED, SERVER: SEND_RESPONSE}

    # The server can still answer, and must then close too.
    reply_ok()
    assert server.states == {CLIENT: CLOSED, SERVER: MUST_CLOSE}
    server.send(ConnectionClosed())
    assert server.states == {CLIENT: CLOSED, SERVER: CLOSED}
|
|
|
|
|
def test_sendfile() -> None:
    """``send_with_data_passthrough`` returns opaque body objects untouched.

    A placeholder object that defines only ``__len__`` stands in for the
    body. It must come back in the returned list by identity, with any
    framing bytes placed around it rather than the object being converted.
    """

    class SendfilePlaceholder:
        # Non-bytes body stand-in; only its length (10) is defined.
        def __len__(self) -> int:
            return 10

    placeholder = SendfilePlaceholder()

    def setup(
        header: Optional[Tuple[str, str]], http_version: str
    ) -> Tuple[Connection, Optional[List[bytes]]]:
        """Return a server connection plus the pass-through output.

        ``header`` is an optional single response header; ``None`` sends the
        response with no headers. ``http_version`` is the version written
        into the incoming request line.
        """
        c = Connection(SERVER)
        receive_and_get(
            c, f"GET / HTTP/{http_version}\r\nHost: a\r\n\r\n".encode("ascii")
        )
        headers: List[Tuple[str, str]] = []
        if header is not None:
            headers.append(header)
        c.send(Response(status_code=200, headers=headers))
        return c, c.send_with_data_passthrough(Data(data=placeholder))

    # Content-Length framing: the placeholder is the only item.
    c, data = setup(("Content-Length", "10"), "1.1")
    assert data == [placeholder]
    # The placeholder's length of 10 matches the declared Content-Length,
    # so the message can be finished.
    c.send(EndOfMessage())

    # Chunked framing: the placeholder sits between the chunk header and the
    # chunk terminator. Swap in real bytes to check the surrounding framing.
    _, data = setup(("Transfer-Encoding", "chunked"), "1.1")
    assert data is not None
    assert placeholder in data
    data[data.index(placeholder)] = b"x" * 10
    assert b"".join(data) == b"a\r\nxxxxxxxxxx\r\n"

    # HTTP/1.0 peer with no headers: no framing, and the body stays open.
    c, data = setup(None, "1.0")
    assert data == [placeholder]
    assert c.our_state is SEND_BODY
|
|
|
|
|
def test_errors() -> None:
    """Receive errors and send errors are sticky and tracked per direction."""
    # --- Receive side -----------------------------------------------------
    for role in [CLIENT, SERVER]:
        c = Connection(our_role=role)
        c.receive_data(b"gibberish\r\n\r\n")
        with pytest.raises(RemoteProtocolError):
            c.next_event()

        # The failure is recorded against the peer only, and every further
        # attempt to read raises again.
        assert c.their_state is ERROR
        assert c.our_state is not ERROR
        with pytest.raises(RemoteProtocolError):
            c.next_event()

        # A server can still send an error response afterwards.
        if role is SERVER:
            assert (
                c.send(Response(status_code=400, headers=[]))
                == b"HTTP/1.1 400 \r\nConnection: close\r\n\r\n"
            )

    # --- Send side --------------------------------------------------------
    def conn(role: Type[Sentinel]) -> Connection:
        """Return a fresh connection that is able to send its first event."""
        c = Connection(our_role=role)
        if role is SERVER:
            # A server needs a request before it can respond.
            receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n")
            assert c.our_state is SEND_RESPONSE
        return c

    for role in [CLIENT, SERVER]:
        # 'bad' differs from 'good' only in declaring http_version="1.0",
        # which the assertions below expect send() to reject.
        if role is CLIENT:
            good = Request(method="GET", target="/", headers=[("Host", "example.com")])
            bad = Request(
                method="GET",
                target="/",
                headers=[("Host", "example.com")],
                http_version="1.0",
            )
        else:
            good = Response(status_code=200, headers=[])
            bad = Response(status_code=200, headers=[], http_version="1.0")

        # Check that 'good' really is accepted on a fresh connection.
        c = conn(role)
        c.send(good)
        assert c.our_state is not ERROR

        # Sending 'bad' first puts only our side into ERROR ...
        c = conn(role)
        with pytest.raises(LocalProtocolError):
            c.send(bad)
        assert c.our_state is ERROR
        assert c.their_state is not ERROR

        # ... after which even 'good' is refused.
        with pytest.raises(LocalProtocolError):
            c.send(good)

        # send_failed() gives the same result directly.
        c = conn(role)
        c.send_failed()
        assert c.our_state is ERROR
        assert c.their_state is not ERROR

        # Calling it a second time changes nothing.
        c.send_failed()
        assert c.our_state is ERROR
        assert c.their_state is not ERROR
|
|
|
|
|
def test_idle_receive_nothing() -> None:
    """A fresh connection with no input reports NEED_DATA in either role."""
    for role in (CLIENT, SERVER):
        fresh = Connection(role)
        assert fresh.next_event() is NEED_DATA
|
|
|
|
|
def test_connection_drop() -> None:
    """EOF in the middle of a request line is a remote protocol error."""
    server = Connection(SERVER)

    # A partial request line is simply incomplete ...
    server.receive_data(b"GET /")
    assert server.next_event() is NEED_DATA

    # ... until the peer hangs up, which turns it into an error.
    server.receive_data(b"")
    with pytest.raises(RemoteProtocolError):
        server.next_event()
|
|
|
|
|
def test_408_request_timeout() -> None:
    """A server may send a 408 response before any request has arrived."""
    pair = ConnectionPair()
    timeout_response = Response(
        status_code=408, headers=[(b"connection", b"close")]
    )
    # Nothing is asserted here beyond the send completing.
    pair.send(SERVER, timeout_response)
|
|
|
|
|
|
|
def test_empty_request() -> None:
    """A bare CRLF where a request line is expected is rejected."""
    server = Connection(SERVER)
    server.receive_data(b"\r\n")
    with pytest.raises(RemoteProtocolError):
        server.next_event()
|
|
|
|
|
|
|
def test_empty_response() -> None:
    """A bare CRLF where a status line is expected is rejected."""
    client = Connection(CLIENT)
    request = Request(method="GET", target="/", headers=[("Host", "a")])
    client.send(request)

    client.receive_data(b"\r\n")
    with pytest.raises(RemoteProtocolError):
        client.next_event()
|
|
|
|
|
@pytest.mark.parametrize(
    "data",
    [
        b"\x00",
        b"\x20",
        # NOTE(review): presumably the first bytes of a TLS handshake record.
        b"\x16\x03\x01\x00\xa5",
    ],
)
def test_early_detection_of_invalid_request(data: bytes) -> None:
    """Invalid leading bytes are rejected without waiting for a line ending.

    None of the parametrized inputs contains CRLF, yet the first
    ``next_event()`` call must already raise.
    """
    server = Connection(SERVER)
    server.receive_data(data)
    with pytest.raises(RemoteProtocolError):
        server.next_event()
|
|
|
|
|
@pytest.mark.parametrize(
    "data",
    [
        b"\x00",
        b"\x20",
        # NOTE(review): presumably the first bytes of a TLS handshake record.
        b"\x16\x03\x03\x00\x31",
    ],
)
def test_early_detection_of_invalid_response(data: bytes) -> None:
    """Invalid leading bytes are rejected without waiting for a line ending.

    None of the parametrized inputs contains CRLF, yet the first
    ``next_event()`` call on the client must already raise.
    """
    client = Connection(CLIENT)
    client.receive_data(data)
    with pytest.raises(RemoteProtocolError):
        client.next_event()
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_HEAD_framing_headers() -> None:
    """Responses to HEAD get the same framing headers as responses to GET."""

    def setup(method: bytes, http_version: bytes) -> Connection:
        # Server connection that has fully received one bodiless request.
        server = Connection(SERVER)
        request_line = method + b" / HTTP/" + http_version + b"\r\n"
        server.receive_data(request_line + b"Host: example.com\r\n\r\n")
        assert type(server.next_event()) is Request
        assert type(server.next_event()) is EndOfMessage
        return server

    chunked_head = b"HTTP/1.1 200 \r\n" b"Transfer-Encoding: chunked\r\n\r\n"
    close_head = b"HTTP/1.1 200 \r\n" b"Connection: close\r\n\r\n"

    for method in (b"GET", b"HEAD"):
        # HTTP/1.1 peer, no framing headers given: chunked is added.
        c = setup(method, b"1.1")
        assert c.send(Response(status_code=200, headers=[])) == chunked_head

        # HTTP/1.0 peer, no framing headers given: Connection: close is added.
        c = setup(method, b"1.0")
        assert c.send(Response(status_code=200, headers=[])) == close_head

        # Both framing headers given: only Transfer-Encoding is written.
        c = setup(method, b"1.1")
        both_framings = Response(
            status_code=200,
            headers=[
                ("Content-Length", "100"),
                ("Transfer-Encoding", "chunked"),
            ],
        )
        assert c.send(both_framings) == chunked_head
|
|
|
|
|
def test_special_exceptions_for_lost_connection_in_message_body() -> None:
    """EOF in the middle of a body yields a descriptive protocol error."""
    # --- Content-Length body cut short -----------------------------------
    server = Connection(SERVER)
    server.receive_data(
        b"POST / HTTP/1.1\r\n"
        b"Host: example.com\r\n"
        b"Content-Length: 100\r\n\r\n"
    )
    assert type(server.next_event()) is Request
    assert server.next_event() is NEED_DATA

    server.receive_data(b"12345")
    assert server.next_event() == Data(data=b"12345")

    server.receive_data(b"")
    with pytest.raises(RemoteProtocolError) as excinfo:
        server.next_event()
    # The message reports how much arrived and how much was promised.
    message = str(excinfo.value)
    assert "received 5 bytes" in message
    assert "expected 100" in message

    # --- Chunked body cut short ------------------------------------------
    server = Connection(SERVER)
    server.receive_data(
        b"POST / HTTP/1.1\r\n"
        b"Host: example.com\r\n"
        b"Transfer-Encoding: chunked\r\n\r\n"
    )
    assert type(server.next_event()) is Request
    assert server.next_event() is NEED_DATA

    # The chunk announces 8 bytes but only 6 are delivered.
    server.receive_data(b"8\r\n012345")
    assert server.next_event().data == b"012345"

    server.receive_data(b"")
    with pytest.raises(RemoteProtocolError) as excinfo:
        server.next_event()
    assert "incomplete chunked read" in str(excinfo.value)
|
|