mirror of
https://gitea.invidious.io/iv-org/Invidious-quic-proxy.git
synced 2024-08-15 00:43:22 +00:00
Fix requests getting dropped during conn restarts
The code for reestablishing conns begins after we retrieve a request from the queue. However, since the request was removed from the queue we no longer have access to it, resulting in a dropped request. This commit adds another queue to aggregate any requests during client restarts in order to preserve and handle them after the connection has been reestablished. This queue would have higher priority than the normal requests queue meaning that normal requests would only get processed if and only if there aren't any more "dropped" requests to handle.
This commit is contained in:
parent
26c0765bb0
commit
12206c14c6
1 changed files with 16 additions and 4 deletions
|
@ -53,6 +53,8 @@ class HttpClient(QuicConnectionProtocol):
|
||||||
self._request_events = {}
|
self._request_events = {}
|
||||||
self._request_waiter = {}
|
self._request_waiter = {}
|
||||||
|
|
||||||
|
self.terminated = False
|
||||||
|
|
||||||
async def get(self, url, headers=None):
|
async def get(self, url, headers=None):
|
||||||
"""
|
"""
|
||||||
Perform a GET request.
|
Perform a GET request.
|
||||||
|
@ -84,6 +86,9 @@ class HttpClient(QuicConnectionProtocol):
|
||||||
request_waiter.set_result(self._request_events.pop(stream_id))
|
request_waiter.set_result(self._request_events.pop(stream_id))
|
||||||
|
|
||||||
def quic_event_received(self, event):
|
def quic_event_received(self, event):
|
||||||
|
if isinstance(event, ConnectionTerminated):
|
||||||
|
self.terminated = True
|
||||||
|
|
||||||
# pass event to the HTTP layer
|
# pass event to the HTTP layer
|
||||||
for http_event in self._http.handle_event(event):
|
for http_event in self._http.handle_event(event):
|
||||||
self.http_event_received(http_event)
|
self.http_event_received(http_event)
|
||||||
|
@ -181,12 +186,13 @@ class RequestProcessor:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# {InvidiousRequest, storage_dict}
|
# {InvidiousRequest, storage_dict}
|
||||||
self.requests_to_do = asyncio.Queue(0)
|
self.requests_to_do = asyncio.Queue(0)
|
||||||
|
self.paused_requests = asyncio.Queue(0)
|
||||||
self.processors = []
|
self.processors = []
|
||||||
|
|
||||||
async def request_worker(self):
|
async def request_worker(self):
|
||||||
configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
|
configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
|
||||||
|
|
||||||
while await self.recreate_connection_check():
|
while self.recreate_connection_check():
|
||||||
async with connect("youtube.com", 443, configuration=configuration, create_protocol=HttpClient) as client:
|
async with connect("youtube.com", 443, configuration=configuration, create_protocol=HttpClient) as client:
|
||||||
while True:
|
while True:
|
||||||
status = await self._handle_request(client)
|
status = await self._handle_request(client)
|
||||||
|
@ -195,9 +201,15 @@ class RequestProcessor:
|
||||||
break
|
break
|
||||||
|
|
||||||
async def _handle_request(self, client):
|
async def _handle_request(self, client):
|
||||||
request, storage = await self.requests_to_do.get()
|
# Paused requests has a higher priority than the normal staged requests.
|
||||||
|
try:
|
||||||
|
request, storage = self.paused_requests.get_nowait()
|
||||||
|
except asyncio.QueueEmpty:
|
||||||
|
request, storage = await self.requests_to_do.get()
|
||||||
|
|
||||||
if client._quic._state is ConnectionTerminated:
|
if client.terminated:
|
||||||
|
# Client has to be recreated so we'll put the current request on pause.
|
||||||
|
await self.paused_requests.put([request, storage])
|
||||||
return False
|
return False
|
||||||
|
|
||||||
await perform_http_request(client=client, url=request.url, method=request.method,
|
await perform_http_request(client=client, url=request.url, method=request.method,
|
||||||
|
@ -206,7 +218,7 @@ class RequestProcessor:
|
||||||
|
|
||||||
request.completed.set()
|
request.completed.set()
|
||||||
|
|
||||||
async def recreate_connection_check(self):
|
def recreate_connection_check(self):
|
||||||
# TODO in the future this code should calculate whether or not to recreate a connection based on the amount
|
# TODO in the future this code should calculate whether or not to recreate a connection based on the amount
|
||||||
# of connections currently available and the amount of traffic we're currently receiving. For now we'll
|
# of connections currently available and the amount of traffic we're currently receiving. For now we'll
|
||||||
# just have it recreate a connection anytime it's broken.
|
# just have it recreate a connection anytime it's broken.
|
||||||
|
|
Loading…
Reference in a new issue