From e9db6c73b5570c550dfbc9de88204795a6df48b5 Mon Sep 17 00:00:00 2001 From: Omar Roth Date: Sat, 14 Dec 2019 16:06:12 -0500 Subject: [PATCH] Keep reference so GC doesn't collect stream_ctx --- src/lsquic/client.cr | 66 +++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/src/lsquic/client.cr b/src/lsquic/client.cr index 8ebcfcf..75a7458 100644 --- a/src/lsquic/client.cr +++ b/src/lsquic/client.cr @@ -11,17 +11,21 @@ module QUIC end class Client - STREAM_READF = ->(stream_if_ctx : Void*, buf : UInt8*, buf_len : LibC::SizeT, fin : LibC::Int) do - begin - stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) - stream_ctx.io.write Slice.new(buf, buf_len) - buf_len - rescue ex - 0_u64 - end + def self.stream_readf(stream_if_ctx : Void*, buf : UInt8*, buf_len : LibC::SizeT, fin : LibC::Int) + stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) + stream_ctx.io.write Slice.new(buf, buf_len) + buf_len end - ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do + def self.on_new_conn(stream_if_ctx : Void*, c : LibLsquic::ConnT) + stream_if_ctx + end + + def self.on_conn_closed(c : LibLsquic::ConnT) + Box.box(nil) + end + + def self.on_new_stream(stream_if_ctx : Void*, s : LibLsquic::StreamT) stream_ctx = LibLsquic.stream_conn(s) .try { |c| LibLsquic.conn_get_ctx(c) } .try { |c| Box(StreamCtx).unbox(c) } @@ -34,7 +38,7 @@ module QUIC Box.box(stream_ctx) end - ON_WRITE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + def self.on_write(s : LibLsquic::StreamT, stream_if_ctx : Void*) stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) headers = [] of LibLsquic::HttpHeader @@ -65,8 +69,7 @@ module QUIC raise "Could not send headers" if LibLsquic.stream_send_headers(s, pointerof(http_headers), stream_ctx.request.body ? 0 : 1) != 0 - if stream_ctx.request.body - body = stream_ctx.request.body.not_nil!.gets_to_end + if body = stream_ctx.request.body.try &.gets_to_end LibLsquic.stream_write(s, body, body.bytesize) LibLsquic.stream_flush(s) end @@ -78,9 +81,9 @@ module QUIC Box.box(stream_ctx) end - ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + def self.on_read(s : LibLsquic::StreamT, stream_if_ctx : Void*) stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) - bytes_read = LibLsquic.stream_readf(s, STREAM_READF, Box.box(stream_ctx)) + bytes_read = LibLsquic.stream_readf(s, ->stream_readf, Box.box(stream_ctx)) if bytes_read > 0 # Nothing @@ -90,19 +93,20 @@ module QUIC elsif LibLsquic.stream_is_rejected(s) == 1 LibLsquic.stream_close(s) else - "Could not read response" + # raise "Could not read response" end - Box.box(stream_ctx) + stream_if_ctx end - ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + def self.on_close(s : LibLsquic::StreamT, stream_if_ctx : Void*) stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) stream_ctx.io.close - Box.box(stream_ctx) + GC.free stream_if_ctx + stream_if_ctx end - EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do + def self.ea_packets_out(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) packets_out = 0 count.times do |i| @@ -114,6 +118,7 @@ module QUIC socket.send(iov.iov_base.to_slice(iov.iov_len), to: socket.remote_address) packets_out += 1 rescue ex + break end end end @@ -138,6 +143,7 @@ module QUIC @connect_timeout : Float64? @read_timeout : Float64? @socket : UDPSocket? + @stream_ctx : StreamCtx? def initialize(@host : String, port = nil, tls : Bool | OpenSSL::SSL::Context::Client = false) check_host_only(@host) @@ -153,6 +159,7 @@ module QUIC @port = (port || 443).to_i @stream_channel = Channel(StreamCtx?).new(20) + @stream_ctx = nil @engine_open = false end @@ -166,17 +173,17 @@ module QUIC raise String.new(err_buf) if err_code != 0 stream_if = LibLsquic::StreamIf.new - stream_if.on_new_conn = ->(stream_if_ctx : Void*, c : LibLsquic::ConnT) { stream_if_ctx } - stream_if.on_conn_closed = ->(c : LibLsquic::ConnT) { Box.box(nil) } - stream_if.on_new_stream = ON_NEW_STREAM - stream_if.on_write = ON_WRITE - stream_if.on_read = ON_READ - stream_if.on_close = ON_CLOSE + stream_if.on_new_conn = ->QUIC::Client.on_new_conn(Void*, LibLsquic::ConnT) + stream_if.on_conn_closed = ->QUIC::Client.on_conn_closed(LibLsquic::ConnT) + stream_if.on_new_stream = ->QUIC::Client.on_new_stream(Void*, LibLsquic::StreamT) + stream_if.on_write = ->QUIC::Client.on_write(LibLsquic::StreamT, Void*) + stream_if.on_read = ->QUIC::Client.on_read(LibLsquic::StreamT, Void*) + stream_if.on_close = ->QUIC::Client.on_close(LibLsquic::StreamT, Void*) engine_api = LibLsquic::EngineApi.new engine_api.ea_settings = pointerof(engine_settings) engine_api.ea_stream_if = pointerof(stream_if) - engine_api.ea_packets_out = EA_PACKETS_OUT + engine_api.ea_packets_out = ->QUIC::Client.ea_packets_out(Void*, LibLsquic::OutSpec*, LibC::UInt) # logger_if = LibLsquic::LoggerIf.new # logger_if.log_buf = ->(logger_ctx : Void*, msg_buf : LibC::Char*, msg_size : LibC::SizeT) { puts String.new(msg_buf); 0 } @@ -434,8 +441,9 @@ module QUIC spawn run_engine if !@engine_open reader, writer = IO::ChanneledPipe.new - stream_ctx = StreamCtx.new(request, writer) - @stream_channel.send stream_ctx + # See https://github.com/crystal-lang/crystal/blob/0.32.0/src/openssl/ssl/context.cr#L126 + @stream_ctx = StreamCtx.new(request, writer) + @stream_channel.send @stream_ctx reader end @@ -449,7 +457,7 @@ module QUIC private def self.default_one_shot_headers(headers) headers ||= HTTP::Headers.new - headers["Connection"] ||= "close" + headers["connection"] ||= "close" headers end