Merge pull request #7 from tenpura-shrimp/processconns5

Update lsquic binding lifecycle
This commit is contained in:
Perflyst 2021-02-18 15:42:38 +01:00 committed by GitHub
commit 3aab3dd7f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 6 deletions

View File

@ -145,6 +145,7 @@ module QUIC
@read_timeout : Float64?
@socket : UDPSocket?
@stream_ctx : StreamCtx?
@process_fiber : Fiber
def initialize(@host : String, port = nil, tls : Bool | OpenSSL::SSL::Context::Client = false)
check_host_only(@host)
@ -162,6 +163,7 @@ module QUIC
@stream_channel = Channel(StreamCtx?).new(20)
@stream_ctx = nil
@engine_open = false
@process_fiber = Fiber.new { puts "process_fiber started before run_engine" }
end
def run_engine
@ -209,7 +211,7 @@ module QUIC
while stream_ctx = @stream_channel.receive
LibLsquic.conn_set_ctx(conn, Box.box(stream_ctx))
LibLsquic.conn_make_stream(conn)
LibLsquic.engine_process_conns(engine)
client_process_conns(engine)
end
@engine_open = false
LibLsquic.engine_destroy(engine)
@ -217,13 +219,47 @@ module QUIC
@socket = nil
end
begin
buffer = Bytes.new(0x600)
@process_fiber = spawn do
loop do
bytes_read = socket.read buffer
break if !@engine_open
LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0
sleep
LibLsquic.engine_process_conns(engine)
diff = 0
# check advisory time
if LibLsquic.engine_earliest_adv_tick(engine, pointerof(diff)) != 0
Crystal::Scheduler.current_fiber.resume_event.add(diff.microseconds)
end
end
end
begin
buffers = [] of Bytes
bytes_read = [] of Int32
loop do
# wait until the socket has something.
socket.wait_readable
# read available messages from the socket into the buffers.
buffers_read = 0
loop do
if (buffers_read >= buffers.size)
buffers.push(Bytes.new(0x600))
bytes_read.push(0)
end
bytes_read[buffers_read] = LibC.recv(socket.fd, buffers[buffers_read], buffers[buffers_read].size, 0).to_i32
if bytes_read[buffers_read] == -1
if Errno.value == Errno::EAGAIN || Errno.value == Errno::EWOULDBLOCK
# no more messages are currently available to read from the socket.
break
else
raise IO::Error.from_errno("failed to read from socket")
end
end
buffers_read += 1
end
break if !@engine_open
buffers[0, buffers_read].zip(bytes_read) do |buffer, bytes|
LibLsquic.engine_packet_in(engine, buffer[0, bytes], bytes, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes != 0
end
client_process_conns(engine)
end
@socket.try &.close
@socket = nil
@ -232,6 +268,10 @@ module QUIC
end
end
def client_process_conns(engine)
Crystal::Scheduler.yield @process_fiber
end
def socket : UDPSocket
socket = @socket
return socket.not_nil! if @socket