From 7f2ecbfcdca1e0e6aac41eea267462a23ce399b1 Mon Sep 17 00:00:00 2001 From: Omar Roth Date: Sat, 23 Nov 2019 17:35:46 -0500 Subject: [PATCH] Handle engine in separate fiber --- src/lsquic.cr | 3 +- src/lsquic/client.cr | 411 +++++++++++++++++++--------------------- src/lsquic/liblsquic.cr | 10 +- 3 files changed, 198 insertions(+), 226 deletions(-) diff --git a/src/lsquic.cr b/src/lsquic.cr index 2091de0..8de68de 100644 --- a/src/lsquic.cr +++ b/src/lsquic.cr @@ -1,6 +1,5 @@ require "./lsquic/*" module QUIC - VERSION = "0.1.0" - QUIC_VERSION = "#{LibLsquic::MAJOR_VERSION}.#{LibLsquic::MINOR_VERSION}.#{LibLsquic::PATCH_VERSION}" + VERSION = "#{LibLsquic::MAJOR_VERSION}.#{LibLsquic::MINOR_VERSION}.#{LibLsquic::PATCH_VERSION}" end diff --git a/src/lsquic/client.cr b/src/lsquic/client.cr index 0857dc7..9ae7494 100644 --- a/src/lsquic/client.cr +++ b/src/lsquic/client.cr @@ -1,32 +1,132 @@ require "http" require "socket" -struct QUIC::PeerCtx - property socket : UDPSocket - - def initialize(@socket) - end - - def local_address - @socket.local_address - end - - def remote_address - @socket.remote_address - end -end - struct QUIC::StreamCtx - property requests : Array(HTTP::Request) - property io : IO + property request : HTTP::Request + property channel : Channel(IO::FileDescriptor) + property reader : IO::FileDescriptor? + property writer : IO::FileDescriptor? - def initialize - @requests = [] of HTTP::Request - @io = IO::Memory.new + def initialize(@request : HTTP::Request) + @channel = Channel(IO::FileDescriptor).new end end class QUIC::Client + ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do + stream_ctx = LibLsquic.stream_conn(s) + .try { |c| LibLsquic.conn_get_ctx(c) } + .try { |c| Box(StreamCtx).unbox(c) } + + if LibLsquic.stream_is_pushed(s) != 0 + return Box.box(stream_ctx) + end + + reader, writer = IO.pipe + stream_ctx.reader = reader + stream_ctx.writer = writer + + stream_ctx.channel.send reader + + LibLsquic.stream_wantwrite(s, 1) + Box.box(stream_ctx) + end + + ON_WRITE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) + + headers = [] of LibLsquic::HttpHeader + (stream_ctx.request.headers.to_a.sort_by { |k, v| {":authority", ":path", ":scheme", ":method"}.index(k) || -1 }).reverse.each do |tuple| + name, values = tuple + name = name.downcase + + values.each do |value| + name_vec = LibLsquic::Iovec.new + name_vec.iov_base = name.to_slice + name_vec.iov_len = name.bytesize + + value_vec = LibLsquic::Iovec.new + value_vec.iov_base = value.to_slice + value_vec.iov_len = value.bytesize + + header = LibLsquic::HttpHeader.new + header.name = name_vec + header.value = value_vec + + headers << header + end + end + + http_headers = LibLsquic::HttpHeaders.new + http_headers.count = headers.size + http_headers.headers = headers.to_unsafe + + raise "Could not send headers" if LibLsquic.stream_send_headers(s, pointerof(http_headers), stream_ctx.request.body ? 0 : 1) != 0 + + if stream_ctx.request.body + body = stream_ctx.request.body.not_nil!.gets_to_end + LibLsquic.stream_write(s, body, body.bytesize) + LibLsquic.stream_flush(s) + end + + LibLsquic.stream_shutdown(s, 1) + LibLsquic.stream_wantwrite(s, 0) + LibLsquic.stream_wantread(s, 1) + + Box.box(stream_ctx) + end + + ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) + + buffer = Bytes.new(0x600) + bytes_read = LibLsquic.stream_read(s, buffer, buffer.size) + + if bytes_read > 0 + if stream_ctx.writer && !stream_ctx.writer.try &.closed? + stream_ctx.writer.try &.write buffer[0, bytes_read] + else + LibLsquic.stream_shutdown(s, 0) + LibLsquic.stream_wantread(s, 0) + end + elsif bytes_read == 0 + LibLsquic.stream_shutdown(s, 0) + LibLsquic.stream_wantread(s, 0) + elsif LibLsquic.stream_is_rejected(s) == 1 + LibLsquic.stream_close(s) + else + "Could not read response" + end + + Box.box(stream_ctx) + end + + ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) + stream_ctx.writer.try &.close + + Box.box(stream_ctx) + end + + EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do + packets_out = 0 + + count.times do |i| + spec = specs[i] + socket = Box(UDPSocket).unbox(spec.peer_ctx) + spec.iovlen.times do |j| + iov = spec.iov[j] + begin + socket.send(iov.iov_base.to_slice(iov.iov_len), to: socket.remote_address) + packets_out += 1 + rescue ex + end + end + end + + packets_out + end + ENGINE_FLAGS = LibLsquic::LSENG_HTTP LibLsquic.global_init(ENGINE_FLAGS & LibLsquic::LSENG_SERVER ? LibLsquic::GLOBAL_SERVER : LibLsquic::GLOBAL_CLIENT) @@ -37,15 +137,7 @@ class QUIC::Client getter port : Int32 getter! tls : OpenSSL::SSL::Context::Client - @conn : LibLsquic::ConnT | Nil - @engine : LibLsquic::EngineT | Nil - @engine : LibLsquic::EngineT | Nil - @engine_api : LibLsquic::EngineApi - @engine_settings : LibLsquic::EngineSettings - @peer_ctx : PeerCtx | Nil - @stream_ctx : StreamCtx - @stream_if : LibLsquic::StreamIf - + @stream_channel : Channel(StreamCtx?) @dns_timeout : Float64? @connect_timeout : Float64? @read_timeout : Float64? @@ -63,124 +155,70 @@ class QUIC::Client end @port = (port || 443).to_i + @stream_channel = Channel(StreamCtx?).new(20) + spawn run_engine + end - LibLsquic.engine_init_settings(out @engine_settings, ENGINE_FLAGS) - @engine_settings.es_ua = "Chrome/78.0.3904.97 Linux x86_64" - @engine_settings.es_ecn = 0 + def run_engine + LibLsquic.engine_init_settings(out engine_settings, ENGINE_FLAGS) + engine_settings.es_ua = "Chrome/78.0.3904.97 Linux x86_64" + engine_settings.es_ecn = 0 err_buf = Bytes.new(0x100) - err_code = LibLsquic.engine_check_settings(pointerof(@engine_settings), ENGINE_FLAGS, err_buf, err_buf.size) + err_code = LibLsquic.engine_check_settings(pointerof(engine_settings), ENGINE_FLAGS, err_buf, err_buf.size) raise String.new(err_buf) if err_code != 0 - @stream_if = LibLsquic::StreamIf.new - @stream_if.on_new_conn = ->(stream_if_ctx : Void*, c : LibLsquic::ConnT) { stream_if_ctx } - @stream_if.on_conn_closed = ->(c : LibLsquic::ConnT) do - Box.box(nil) + stream_if = LibLsquic::StreamIf.new + stream_if.on_new_conn = ->(stream_if_ctx : Void*, c : LibLsquic::ConnT) { stream_if_ctx } + stream_if.on_conn_closed = ->(c : LibLsquic::ConnT) { Box.box(nil) } + stream_if.on_new_stream = ON_NEW_STREAM + stream_if.on_write = ON_WRITE + stream_if.on_read = ON_READ + stream_if.on_close = ON_CLOSE + + engine_api = LibLsquic::EngineApi.new + engine_api.ea_settings = pointerof(engine_settings) + engine_api.ea_stream_if = pointerof(stream_if) + engine_api.ea_packets_out = EA_PACKETS_OUT + + # logger_if = LibLsquic::LoggerIf.new + # logger_if.log_buf = ->(logger_ctx : Void*, msg_buf : LibC::Char*, msg_size : LibC::SizeT) { puts String.new(msg_buf); 0 } + # LibLsquic.logger_init(pointerof(logger_if), nil, LibLsquic::LoggerTimestampStyle::LltsHhmmssms) + # LibLsquic.set_log_level("debug") + + engine = LibLsquic.engine_new(ENGINE_FLAGS, pointerof(engine_api)) + hostname = host.starts_with?('[') && host.ends_with?(']') ? host[1..-2] : host + + socket = UDPSocket.new + socket.bind Socket::IPAddress.new("0.0.0.0", 0) + socket.read_timeout = @read_timeout if @read_timeout + Socket::Addrinfo.udp(@host, @port, timeout: @dns_timeout) do |addrinfo| + socket.connect(addrinfo, timeout: @connect_timeout) do |error| + error + end + end + socket.sync = false + engine_closed = false + + conn = LibLsquic.engine_connect(engine, LibLsquic::Version::Lsqver046, socket.local_address, socket.remote_address, Box.box(socket), nil, hostname, 0, nil, 0, nil, 0) + spawn do + while stream_ctx = @stream_channel.receive + LibLsquic.conn_set_ctx(conn, Box.box(stream_ctx)) + LibLsquic.conn_make_stream(conn) + LibLsquic.engine_process_conns(engine) + end + LibLsquic.conn_close(conn) + LibLsquic.engine_process_conns(engine) + LibLsquic.engine_destroy(engine) + engine_closed = true end - @stream_if.on_new_stream = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do - if LibLsquic.stream_is_pushed(s) != 0 - return stream_if_ctx - end - - LibLsquic.stream_wantwrite(s, 1) - stream_if_ctx - end - - @stream_if.on_write = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do - request = Box(StreamCtx).unbox(stream_if_ctx).requests.shift - raise "No request" if !request - - headers = [] of LibLsquic::HttpHeader - (request.headers.to_a.sort_by { |k, v| {":authority", ":path", ":scheme", ":method"}.index(k) || -1 }).reverse.each do |tuple| - name, values = tuple - name = name.downcase - - values.each do |value| - name_vec = LibLsquic::Iovec.new - name_vec.iov_base = name.to_slice - name_vec.iov_len = name.bytesize - - value_vec = LibLsquic::Iovec.new - value_vec.iov_base = value.to_slice - value_vec.iov_len = value.bytesize - - header = LibLsquic::HttpHeader.new - header.name = name_vec - header.value = value_vec - - headers << header - end - end - - http_headers = LibLsquic::HttpHeaders.new - http_headers.count = headers.size - http_headers.headers = headers.to_unsafe - - # For payload, last argument is 0 - raise "Could not send headers" if LibLsquic.stream_send_headers(s, pointerof(http_headers), request.body ? 0 : 1) != 0 - - if request.body - body = request.body.not_nil!.gets_to_end - LibLsquic.stream_write(s, body, body.bytesize) - LibLsquic.stream_flush(s) - end - - LibLsquic.stream_shutdown(s, 1) - LibLsquic.stream_wantwrite(s, 0) - LibLsquic.stream_wantread(s, 1) - - stream_if_ctx - end - - @stream_if.on_read = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do - stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) - - buffer = Bytes.new(0x200) - bytes_read = LibLsquic.stream_read(s, buffer, buffer.size) - if bytes_read > 0 - stream_ctx.io.write buffer[0, bytes_read] - elsif bytes_read == 0 - LibLsquic.stream_shutdown(s, 0) - elsif LibLsquic.stream_is_rejected(s) - LibLsquic.stream_close(s) - else - raise "Could not read stream" - end - - stream_if_ctx - end - - # TODO: Allow engine to break with existing connections - @stream_if.on_close = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do - LibLsquic.conn_close(LibLsquic.stream_conn(s)) - stream_if_ctx - end - - @engine_api = LibLsquic::EngineApi.new - @engine_api.ea_settings = pointerof(@engine_settings) - @engine_api.ea_stream_if = pointerof(@stream_if) - - @stream_ctx = StreamCtx.new - @engine_api.ea_stream_if_ctx = Box.box(@stream_ctx) # TODO - - @engine_api.ea_packets_out = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do - packets_out = 0 - - count.times do |i| - spec = specs[i] - peer_ctx = Box(PeerCtx).unbox(spec.peer_ctx) - spec.iovlen.times do |j| - iov = spec.iov[j] - begin - peer_ctx.socket.send(iov.iov_base.to_slice(iov.iov_len), to: peer_ctx.remote_address) - packets_out += 1 - rescue ex - end - end - end - - packets_out + buffer = Bytes.new(0x600) + loop do + bytes_read = socket.read buffer + break if engine_closed + LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0 + LibLsquic.engine_process_conns(engine) end end @@ -343,14 +381,12 @@ class QUIC::Client end private def exec_internal_single(request) - send_request(request) - @stream_ctx.io.rewind - - HTTP::Client::Response.from_io?(@stream_ctx.io, ignore_body: request.ignore_body?) + io = send_request(request) + HTTP::Client::Response.from_io?(io, ignore_body: request.ignore_body?) end private def handle_response(response) - close # unless response.keep_alive? + # close unless response.keep_alive? response end @@ -365,24 +401,13 @@ class QUIC::Client if response return handle_response(response) { yield response } end - - # Server probably closed the connection, so retry once - close - request.body.try &.rewind - exec_internal_single(request) do |response| - if response - return handle_response(response) do - yield response - end - end - end end raise "Unexpected end of http response" end private def exec_internal_single(request) - send_request(request) - HTTP::Client::Response.from_io?(@stream_ctx.io, ignore_body: request.ignore_body?) do |response| + io = send_request(request) + HTTP::Client::Response.from_io?(io, ignore_body: request.ignore_body?) do |response| yield response end end @@ -390,7 +415,7 @@ class QUIC::Client private def handle_response(response) value = yield response.body_io?.try &.close - close # unless response.keep_alive? + # close unless response.keep_alive? value end @@ -398,10 +423,11 @@ class QUIC::Client set_defaults request run_before_request_callbacks(request) - @stream_ctx.requests << request - LibLsquic.conn_make_stream(conn) + stream_ctx = StreamCtx.new(request) - run_engine + @stream_channel.send stream_ctx + reader = stream_ctx.channel.receive + reader end private def set_defaults(request) @@ -448,69 +474,18 @@ class QUIC::Client end end + def destroy_engine + @stream_channel.send nil + end + def close - # @conn.try { |c| LibLsquic.conn_close(c) } - @conn = nil + # TODO end private def new_request(method, path, headers, body : BodyType) HTTP::Request.new(method, path, headers, body) end - private def engine - engine = @engine - return engine if engine - - engine = LibLsquic.engine_new(ENGINE_FLAGS, pointerof(@engine_api)) - @engine = engine - end - - def run_engine - buffer = Bytes.new(0x600) - - loop do - LibLsquic.engine_process_conns(engine) - - if LibLsquic.engine_earliest_adv_tick(engine, out diff) == 0 - break - # else - # sleep (diff / 1000000).seconds - # sleep (diff % 1000000).microseconds - end - - bytes_read = peer_ctx.socket.read(buffer) - LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, peer_ctx.local_address, peer_ctx.remote_address, Box.box(peer_ctx), 0) - end - end - - private def peer_ctx - peer_ctx = @peer_ctx - return peer_ctx if peer_ctx - - hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host - socket = UDPSocket.new - socket.bind Socket::IPAddress.new("0.0.0.0", 0) - socket.read_timeout = @read_timeout if @read_timeout - Socket::Addrinfo.udp(host, port, timeout: @dns_timeout) do |addrinfo| - socket.connect(addrinfo, timeout: @connect_timeout) do |error| - error - end - end - socket.sync = false - - peer_ctx = PeerCtx.new(socket) - @peer_ctx = peer_ctx - end - - def conn - conn = @conn - return conn if conn - - hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host - conn = LibLsquic.engine_connect(engine, LibLsquic::Version::Lsqver046, peer_ctx.local_address, peer_ctx.remote_address, Box.box(peer_ctx), nil, hostname, 0, nil, 0, nil, 0) - @conn = conn - end - private def host_header if (@tls && @port != 443) || (!@tls && @port != 80) "#{@host}:#{@port}" diff --git a/src/lsquic/liblsquic.cr b/src/lsquic/liblsquic.cr index 817fdf1..fccd151 100644 --- a/src/lsquic/liblsquic.cr +++ b/src/lsquic/liblsquic.cr @@ -108,9 +108,7 @@ lib LibLsquic end type ConnT = Void* - type ConnCtxT = Void* type StreamT = Void* - type StreamCtxT = Void* enum HskStatus LsqHskFail = 0 LsqHskOk = 1 @@ -271,7 +269,7 @@ lib LibLsquic fun engine_send_unsent_packets = lsquic_engine_send_unsent_packets(engine : EngineT) fun engine_destroy = lsquic_engine_destroy(x0 : EngineT) fun conn_n_avail_streams = lsquic_conn_n_avail_streams(x0 : ConnT) : LibC::UInt - fun conn_make_stream = lsquic_conn_make_stream(x0 : ConnT) : ConnCtxT + fun conn_make_stream = lsquic_conn_make_stream(x0 : ConnT) : Void* fun conn_n_pending_streams = lsquic_conn_n_pending_streams(x0 : ConnT) : LibC::UInt fun conn_cancel_pending_streams = lsquic_conn_cancel_pending_streams(x0 : ConnT, n : LibC::UInt) : LibC::UInt fun conn_going_away = lsquic_conn_going_away(x0 : ConnT) @@ -304,7 +302,7 @@ lib LibLsquic fun conn_get_server_cert_chain = lsquic_conn_get_server_cert_chain(x0 : ConnT) : StackStX509* fun stream_id = lsquic_stream_id(s : StreamT) : StreamIdT alias StreamIdT = Uint64T - fun stream_get_ctx = lsquic_stream_get_ctx(s : StreamT) : StreamCtxT + fun stream_get_ctx = lsquic_stream_get_ctx(s : StreamT) : Void* fun stream_is_pushed = lsquic_stream_is_pushed(s : StreamT) : LibC::Int fun stream_is_rejected = lsquic_stream_is_rejected(s : StreamT) : LibC::Int fun stream_refuse_push = lsquic_stream_refuse_push(s : StreamT) : LibC::Int @@ -350,8 +348,8 @@ lib LibLsquic fun engine_cooldown = lsquic_engine_cooldown(x0 : EngineT) fun hsk_getssl = lsquic_hsk_getssl(conn : ConnT) : SslSt* alias SslSt = Void - fun conn_get_ctx = lsquic_conn_get_ctx(x0 : ConnT) : ConnCtxT - fun conn_set_ctx = lsquic_conn_set_ctx(x0 : ConnT, x1 : ConnCtxT) + fun conn_get_ctx = lsquic_conn_get_ctx(x0 : ConnT) : Void* + fun conn_set_ctx = lsquic_conn_set_ctx(x0 : ConnT, x1 : Void*) fun conn_get_peer_ctx = lsquic_conn_get_peer_ctx(x0 : ConnT, local_sa : LibC::Sockaddr*) : Void* fun conn_abort = lsquic_conn_abort(x0 : ConnT) fun get_alt_svc_versions = lsquic_get_alt_svc_versions(versions : LibC::UInt) : LibC::Char*