From c965e0a380d7272908d5466440b42f8dda3988b1 Mon Sep 17 00:00:00 2001 From: Andrew Zhao Date: Fri, 5 Feb 2021 20:59:29 -0500 Subject: [PATCH 1/2] Implement engine_process_connections timer The previous code did not have an implementation to set timers for processing connections. This leads to an occasional deadlock where process_connections must be called to continue. This change implements this timer modelled after the lsquic reference implementation. Reference: https://github.com/litespeedtech/lsquic/blob/master/bin/prog.c#L535 --- src/lsquic/client.cr | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/lsquic/client.cr b/src/lsquic/client.cr index 63a7a24..70f01e8 100644 --- a/src/lsquic/client.cr +++ b/src/lsquic/client.cr @@ -145,6 +145,7 @@ module QUIC @read_timeout : Float64? @socket : UDPSocket? @stream_ctx : StreamCtx? + @process_fiber : Fiber def initialize(@host : String, port = nil, tls : Bool | OpenSSL::SSL::Context::Client = false) check_host_only(@host) @@ -162,6 +163,7 @@ module QUIC @stream_channel = Channel(StreamCtx?).new(20) @stream_ctx = nil @engine_open = false + @process_fiber = Fiber.new { puts "process_fiber started before run_engine" } end def run_engine @@ -209,7 +211,7 @@ module QUIC while stream_ctx = @stream_channel.receive LibLsquic.conn_set_ctx(conn, Box.box(stream_ctx)) LibLsquic.conn_make_stream(conn) - LibLsquic.engine_process_conns(engine) + client_process_conns(engine) end @engine_open = false LibLsquic.engine_destroy(engine) @@ -217,13 +219,25 @@ module QUIC @socket = nil end + @process_fiber = spawn do + loop do + sleep + LibLsquic.engine_process_conns(engine) + diff = 0 + # check advisory time + if LibLsquic.engine_earliest_adv_tick(engine, pointerof(diff)) != 0 + Thread.current.scheduler.@current.resume_event.add(diff.microseconds) + end + end + end + begin buffer = Bytes.new(0x600) loop do bytes_read = socket.read buffer break if !@engine_open LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0 - LibLsquic.engine_process_conns(engine) + client_process_conns(engine) end @socket.try &.close @socket = nil @@ -232,6 +246,10 @@ module QUIC end end + def client_process_conns(engine) + Crystal::Scheduler.yield @process_fiber + end + def socket : UDPSocket socket = @socket return socket.not_nil! if @socket From a3d2458c282b0b2c1a8907dca81851618efb2b1b Mon Sep 17 00:00:00 2001 From: Andrew Zhao Date: Sun, 7 Feb 2021 14:12:30 -0500 Subject: [PATCH 2/2] Update recieve function to read full data from socket at once The existing code reads each message from the socket individually and calls engine_process_conns after each message. This seems inefficient as it leads to excess calls engine_process_conns. This changes the code that reads from the socket to read all available messages before calling engine_process_conns based off of the logic in the reference implementation. reference: https://github.com/litespeedtech/lsquic/blob/master/bin/test_common.c#L737 --- src/lsquic/client.cr | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/lsquic/client.cr b/src/lsquic/client.cr index 70f01e8..ecefd84 100644 --- a/src/lsquic/client.cr +++ b/src/lsquic/client.cr @@ -226,17 +226,39 @@ module QUIC diff = 0 # check advisory time if LibLsquic.engine_earliest_adv_tick(engine, pointerof(diff)) != 0 - Thread.current.scheduler.@current.resume_event.add(diff.microseconds) + Crystal::Scheduler.current_fiber.resume_event.add(diff.microseconds) end end end begin - buffer = Bytes.new(0x600) + buffers = [] of Bytes + bytes_read = [] of Int32 loop do - bytes_read = socket.read buffer + # wait until the socket has something. + socket.wait_readable + # read available messages from the socket into the buffers. + buffers_read = 0 + loop do + if (buffers_read >= buffers.size) + buffers.push(Bytes.new(0x600)) + bytes_read.push(0) + end + bytes_read[buffers_read] = LibC.recv(socket.fd, buffers[buffers_read], buffers[buffers_read].size, 0).to_i32 + if bytes_read[buffers_read] == -1 + if Errno.value == Errno::EAGAIN || Errno.value == Errno::EWOULDBLOCK + # no more messages are currently available to read from the socket. + break + else + raise IO::Error.from_errno("failed to read from socket") + end + end + buffers_read += 1 + end break if !@engine_open - LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0 + buffers[0, buffers_read].zip(bytes_read) do |buffer, bytes| + LibLsquic.engine_packet_in(engine, buffer[0, bytes], bytes, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes != 0 + end client_process_conns(engine) end @socket.try &.close