Keep reference so GC doesn't collect stream_ctx

This commit is contained in:
Omar Roth 2019-12-14 16:06:12 -05:00
parent 0d1ee4e712
commit e9db6c73b5
1 changed files with 37 additions and 29 deletions

View File

@ -11,17 +11,21 @@ module QUIC
end
class Client
STREAM_READF = ->(stream_if_ctx : Void*, buf : UInt8*, buf_len : LibC::SizeT, fin : LibC::Int) do
begin
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
stream_ctx.io.write Slice.new(buf, buf_len)
buf_len
rescue ex
0_u64
end
def self.stream_readf(stream_if_ctx : Void*, buf : UInt8*, buf_len : LibC::SizeT, fin : LibC::Int)
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
stream_ctx.io.write Slice.new(buf, buf_len)
buf_len
end
ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do
def self.on_new_conn(stream_if_ctx : Void*, c : LibLsquic::ConnT)
stream_if_ctx
end
def self.on_conn_closed(c : LibLsquic::ConnT)
Box.box(nil)
end
def self.on_new_stream(stream_if_ctx : Void*, s : LibLsquic::StreamT)
stream_ctx = LibLsquic.stream_conn(s)
.try { |c| LibLsquic.conn_get_ctx(c) }
.try { |c| Box(StreamCtx).unbox(c) }
@ -34,7 +38,7 @@ module QUIC
Box.box(stream_ctx)
end
ON_WRITE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
def self.on_write(s : LibLsquic::StreamT, stream_if_ctx : Void*)
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
headers = [] of LibLsquic::HttpHeader
@ -65,8 +69,7 @@ module QUIC
raise "Could not send headers" if LibLsquic.stream_send_headers(s, pointerof(http_headers), stream_ctx.request.body ? 0 : 1) != 0
if stream_ctx.request.body
body = stream_ctx.request.body.not_nil!.gets_to_end
if body = stream_ctx.request.body.try &.gets_to_end
LibLsquic.stream_write(s, body, body.bytesize)
LibLsquic.stream_flush(s)
end
@ -78,9 +81,9 @@ module QUIC
Box.box(stream_ctx)
end
ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
def self.on_read(s : LibLsquic::StreamT, stream_if_ctx : Void*)
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
bytes_read = LibLsquic.stream_readf(s, STREAM_READF, Box.box(stream_ctx))
bytes_read = LibLsquic.stream_readf(s, ->stream_readf, Box.box(stream_ctx))
if bytes_read > 0
# Nothing
@ -90,19 +93,20 @@ module QUIC
elsif LibLsquic.stream_is_rejected(s) == 1
LibLsquic.stream_close(s)
else
"Could not read response"
# raise "Could not read response"
end
Box.box(stream_ctx)
stream_if_ctx
end
ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
def self.on_close(s : LibLsquic::StreamT, stream_if_ctx : Void*)
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
stream_ctx.io.close
Box.box(stream_ctx)
GC.free stream_if_ctx
stream_if_ctx
end
EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do
def self.ea_packets_out(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt)
packets_out = 0
count.times do |i|
@ -114,6 +118,7 @@ module QUIC
socket.send(iov.iov_base.to_slice(iov.iov_len), to: socket.remote_address)
packets_out += 1
rescue ex
break
end
end
end
@ -138,6 +143,7 @@ module QUIC
@connect_timeout : Float64?
@read_timeout : Float64?
@socket : UDPSocket?
@stream_ctx : StreamCtx?
def initialize(@host : String, port = nil, tls : Bool | OpenSSL::SSL::Context::Client = false)
check_host_only(@host)
@ -153,6 +159,7 @@ module QUIC
@port = (port || 443).to_i
@stream_channel = Channel(StreamCtx?).new(20)
@stream_ctx = nil
@engine_open = false
end
@ -166,17 +173,17 @@ module QUIC
raise String.new(err_buf) if err_code != 0
stream_if = LibLsquic::StreamIf.new
stream_if.on_new_conn = ->(stream_if_ctx : Void*, c : LibLsquic::ConnT) { stream_if_ctx }
stream_if.on_conn_closed = ->(c : LibLsquic::ConnT) { Box.box(nil) }
stream_if.on_new_stream = ON_NEW_STREAM
stream_if.on_write = ON_WRITE
stream_if.on_read = ON_READ
stream_if.on_close = ON_CLOSE
stream_if.on_new_conn = ->QUIC::Client.on_new_conn(Void*, LibLsquic::ConnT)
stream_if.on_conn_closed = ->QUIC::Client.on_conn_closed(LibLsquic::ConnT)
stream_if.on_new_stream = ->QUIC::Client.on_new_stream(Void*, LibLsquic::StreamT)
stream_if.on_write = ->QUIC::Client.on_write(LibLsquic::StreamT, Void*)
stream_if.on_read = ->QUIC::Client.on_read(LibLsquic::StreamT, Void*)
stream_if.on_close = ->QUIC::Client.on_close(LibLsquic::StreamT, Void*)
engine_api = LibLsquic::EngineApi.new
engine_api.ea_settings = pointerof(engine_settings)
engine_api.ea_stream_if = pointerof(stream_if)
engine_api.ea_packets_out = EA_PACKETS_OUT
engine_api.ea_packets_out = ->QUIC::Client.ea_packets_out(Void*, LibLsquic::OutSpec*, LibC::UInt)
# logger_if = LibLsquic::LoggerIf.new
# logger_if.log_buf = ->(logger_ctx : Void*, msg_buf : LibC::Char*, msg_size : LibC::SizeT) { puts String.new(msg_buf); 0 }
@ -434,8 +441,9 @@ module QUIC
spawn run_engine if !@engine_open
reader, writer = IO::ChanneledPipe.new
stream_ctx = StreamCtx.new(request, writer)
@stream_channel.send stream_ctx
# See https://github.com/crystal-lang/crystal/blob/0.32.0/src/openssl/ssl/context.cr#L126
@stream_ctx = StreamCtx.new(request, writer)
@stream_channel.send @stream_ctx
reader
end
@ -449,7 +457,7 @@ module QUIC
private def self.default_one_shot_headers(headers)
headers ||= HTTP::Headers.new
headers["Connection"] ||= "close"
headers["connection"] ||= "close"
headers
end