diff --git a/src/lsquic/channeled_pipe.cr b/src/lsquic/channeled_pipe.cr new file mode 100644 index 0000000..dc46915 --- /dev/null +++ b/src/lsquic/channeled_pipe.cr @@ -0,0 +1,90 @@ +# Based on https://github.com/anykeyh/channeled_pipe/blob/master/src/channeled_pipe/channeled_pipe.cr +class IO::ChanneledPipe < IO + BUFFER_SIZE = 8192 + + include IO::Buffered + + @channel : Channel(Bytes?) + @direction : Symbol + @buffer : Bytes? + + getter? closed = false + + protected def initialize(@channel, @direction) + end + + def unbuffered_read(slice : Bytes) + raise "Cannot read from write side" if @direction == :w + return 0 if @channel.closed? && !@buffer + + buffer = @buffer + + if buffer + bytes_read = {slice.size, buffer.size}.min + slice.copy_from(buffer.to_unsafe, bytes_read) + + if buffer.size == bytes_read + @buffer = nil + else + @buffer = buffer[bytes_read, buffer.size - bytes_read] + end + + return bytes_read + else + buffer = @channel.receive + + if buffer + bytes_read = {slice.size, buffer.size}.min + slice.copy_from(buffer.to_unsafe, bytes_read) + + if buffer.size > bytes_read + @buffer = buffer[bytes_read, buffer.size - bytes_read] + end + + return bytes_read + else + @channel.close + return 0 + end + end + end + + def unbuffered_write(slice : Bytes) + raise "Write not allowed on read side" if @direction == :r + raise "Closed stream" if @closed + @channel.send slice.clone + slice.size + end + + def close_channel + @channel.close + end + + def unbuffered_flush + # Nothing + end + + def unbuffered_rewind + raise IO::Error.new("Can't rewind") + end + + def unbuffered_close + return if @closed + @closed = true + @channel.send nil + end + + def self.new(mem = BUFFER_SIZE) + mem = BUFFER_SIZE if mem <= 0 + + capacity = (mem / BUFFER_SIZE) + + ((mem % BUFFER_SIZE != 0) ? 1 : 0) + + channel = Channel(Bytes?).new(capacity: mem) + + { + ChanneledPipe.new(channel, :r), + ChanneledPipe.new(channel, :w), + } + end +end diff --git a/src/lsquic/client.cr b/src/lsquic/client.cr index 3aa8343..1f3477a 100644 --- a/src/lsquic/client.cr +++ b/src/lsquic/client.cr @@ -1,90 +1,85 @@ require "http" require "socket" -struct QUIC::StreamCtx - property request : HTTP::Request - property channel : Channel(IO::FileDescriptor) - property reader : IO::FileDescriptor? - property writer : IO::FileDescriptor? +module QUIC + class StreamCtx + property request : HTTP::Request + property io : IO::ChanneledPipe - def initialize(@request : HTTP::Request) - @channel = Channel(IO::FileDescriptor).new + def initialize(@request, @io) + end end -end -class QUIC::Client - ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do - stream_ctx = LibLsquic.stream_conn(s) - .try { |c| LibLsquic.conn_get_ctx(c) } - .try { |c| Box(StreamCtx).unbox(c) } - - if LibLsquic.stream_is_pushed(s) != 0 - return Box.box(stream_ctx) + class Client + STREAM_READF = ->(stream_if_ctx : Void*, buf : UInt8*, buf_len : LibC::SizeT, fin : LibC::Int) do + stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) + bytes_read = stream_ctx.io.unbuffered_write Slice.new(buf, buf_len) + bytes_read.to_u64 end - reader, writer = IO.pipe - stream_ctx.reader = reader - stream_ctx.writer = writer + ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do + stream_ctx = LibLsquic.stream_conn(s) + .try { |c| LibLsquic.conn_get_ctx(c) } + .try { |c| Box(StreamCtx).unbox(c) } - stream_ctx.channel.send reader - - LibLsquic.stream_wantwrite(s, 1) - Box.box(stream_ctx) - end - - ON_WRITE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do - stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) - - headers = [] of LibLsquic::HttpHeader - (stream_ctx.request.headers.to_a.sort_by { |k, v| {":authority", ":path", ":scheme", ":method"}.index(k) || -1 }).reverse.each do |tuple| - name, values = tuple - name = name.downcase - - values.each do |value| - name_vec = LibLsquic::Iovec.new - name_vec.iov_base = name.to_slice - name_vec.iov_len = name.bytesize - - value_vec = LibLsquic::Iovec.new - value_vec.iov_base = value.to_slice - value_vec.iov_len = value.bytesize - - header = LibLsquic::HttpHeader.new - header.name = name_vec - header.value = value_vec - - headers << header + if LibLsquic.stream_is_pushed(s) != 0 + return Box.box(stream_ctx) end + + LibLsquic.stream_wantwrite(s, 1) + Box.box(stream_ctx) end - http_headers = LibLsquic::HttpHeaders.new - http_headers.count = headers.size - http_headers.headers = headers.to_unsafe - - raise "Could not send headers" if LibLsquic.stream_send_headers(s, pointerof(http_headers), stream_ctx.request.body ? 0 : 1) != 0 - - if stream_ctx.request.body - body = stream_ctx.request.body.not_nil!.gets_to_end - LibLsquic.stream_write(s, body, body.bytesize) - LibLsquic.stream_flush(s) - end - - LibLsquic.stream_shutdown(s, 1) - LibLsquic.stream_wantwrite(s, 0) - LibLsquic.stream_wantread(s, 1) - - Box.box(stream_ctx) - end - - ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do - begin + ON_WRITE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) - buffer = Bytes.new(0x600) - bytes_read = LibLsquic.stream_read(s, buffer, buffer.size) + headers = [] of LibLsquic::HttpHeader + (stream_ctx.request.headers.to_a.sort_by { |k, v| {":authority", ":path", ":scheme", ":method"}.index(k) || -1 }).reverse.each do |tuple| + name, values = tuple + name = name.downcase + + values.each do |value| + name_vec = LibLsquic::Iovec.new + name_vec.iov_base = name.to_slice + name_vec.iov_len = name.bytesize + + value_vec = LibLsquic::Iovec.new + value_vec.iov_base = value.to_slice + value_vec.iov_len = value.bytesize + + header = LibLsquic::HttpHeader.new + header.name = name_vec + header.value = value_vec + + headers << header + end + end + + http_headers = LibLsquic::HttpHeaders.new + http_headers.count = headers.size + http_headers.headers = headers.to_unsafe + + raise "Could not send headers" if LibLsquic.stream_send_headers(s, pointerof(http_headers), stream_ctx.request.body ? 0 : 1) != 0 + + if stream_ctx.request.body + body = stream_ctx.request.body.not_nil!.gets_to_end + LibLsquic.stream_write(s, body, body.bytesize) + LibLsquic.stream_flush(s) + end + + LibLsquic.stream_shutdown(s, 1) + LibLsquic.stream_wantwrite(s, 0) + LibLsquic.stream_wantread(s, 1) + + Box.box(stream_ctx) + end + + ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do + stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) + bytes_read = LibLsquic.stream_readf(s, STREAM_READF, Box.box(stream_ctx)) if bytes_read > 0 - stream_ctx.writer.try &.write buffer[0, bytes_read] + # Nothing elsif bytes_read == 0 LibLsquic.stream_shutdown(s, 0) LibLsquic.stream_wantread(s, 0) @@ -93,218 +88,209 @@ class QUIC::Client else "Could not read response" end - Box.box(stream_ctx) - rescue ex - LibLsquic.stream_shutdown(s, 0) - LibLsquic.stream_wantread(s, 0) + Box.box(stream_ctx) end - end - ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do - begin + ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do stream_ctx = Box(StreamCtx).unbox(stream_if_ctx) - stream_ctx.writer.try &.close - Box.box(stream_ctx) - rescue ex + stream_ctx.io.close Box.box(stream_ctx) end - end - EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do - packets_out = 0 + EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do + packets_out = 0 - count.times do |i| - spec = specs[i] - socket = Box(UDPSocket).unbox(spec.peer_ctx) - spec.iovlen.times do |j| - iov = spec.iov[j] - begin - socket.send(iov.iov_base.to_slice(iov.iov_len), to: socket.remote_address) - packets_out += 1 - rescue ex + count.times do |i| + spec = specs[i] + socket = Box(UDPSocket).unbox(spec.peer_ctx) + spec.iovlen.times do |j| + iov = spec.iov[j] + begin + socket.send(iov.iov_base.to_slice(iov.iov_len), to: socket.remote_address) + packets_out += 1 + rescue ex + end end end + + packets_out end - packets_out - end + ENGINE_FLAGS = LibLsquic::LSENG_HTTP + LibLsquic.global_init(ENGINE_FLAGS & LibLsquic::LSENG_SERVER ? LibLsquic::GLOBAL_SERVER : LibLsquic::GLOBAL_CLIENT) - ENGINE_FLAGS = LibLsquic::LSENG_HTTP - LibLsquic.global_init(ENGINE_FLAGS & LibLsquic::LSENG_SERVER ? LibLsquic::GLOBAL_SERVER : LibLsquic::GLOBAL_CLIENT) + # The set of possible valid body types. + alias BodyType = String | Bytes | IO | Nil - # The set of possible valid body types. - alias BodyType = String | Bytes | IO | Nil + getter host : String + getter port : Int32 + getter! tls : OpenSSL::SSL::Context::Client - getter host : String - getter port : Int32 - getter! tls : OpenSSL::SSL::Context::Client + @stream_channel : Channel(StreamCtx?) + @dns_timeout : Float64? + @connect_timeout : Float64? + @read_timeout : Float64? - @stream_channel : Channel(StreamCtx?) - @dns_timeout : Float64? - @connect_timeout : Float64? - @read_timeout : Float64? + def initialize(@host : String, port = nil, tls : Bool | OpenSSL::SSL::Context::Client = false) + check_host_only(@host) - def initialize(@host : String, port = nil, tls : Bool | OpenSSL::SSL::Context::Client = false) - check_host_only(@host) + @tls = case tls + when true + OpenSSL::SSL::Context::Client.new + when OpenSSL::SSL::Context::Client + tls + when false + nil + end - @tls = case tls - when true - OpenSSL::SSL::Context::Client.new - when OpenSSL::SSL::Context::Client - tls - when false - nil - end + @port = (port || 443).to_i + @stream_channel = Channel(StreamCtx?).new(20) + spawn run_engine + end - @port = (port || 443).to_i - @stream_channel = Channel(StreamCtx?).new(20) - spawn run_engine - end + def run_engine + LibLsquic.engine_init_settings(out engine_settings, ENGINE_FLAGS) + engine_settings.es_ua = "Chrome/78.0.3904.97 Linux x86_64" + engine_settings.es_ecn = 0 - def run_engine - LibLsquic.engine_init_settings(out engine_settings, ENGINE_FLAGS) - engine_settings.es_ua = "Chrome/78.0.3904.97 Linux x86_64" - engine_settings.es_ecn = 0 + err_buf = Bytes.new(0x100) + err_code = LibLsquic.engine_check_settings(pointerof(engine_settings), ENGINE_FLAGS, err_buf, err_buf.size) + raise String.new(err_buf) if err_code != 0 - err_buf = Bytes.new(0x100) - err_code = LibLsquic.engine_check_settings(pointerof(engine_settings), ENGINE_FLAGS, err_buf, err_buf.size) - raise String.new(err_buf) if err_code != 0 + stream_if = LibLsquic::StreamIf.new + stream_if.on_new_conn = ->(stream_if_ctx : Void*, c : LibLsquic::ConnT) { stream_if_ctx } + stream_if.on_conn_closed = ->(c : LibLsquic::ConnT) { Box.box(nil) } + stream_if.on_new_stream = ON_NEW_STREAM + stream_if.on_write = ON_WRITE + stream_if.on_read = ON_READ + stream_if.on_close = ON_CLOSE - stream_if = LibLsquic::StreamIf.new - stream_if.on_new_conn = ->(stream_if_ctx : Void*, c : LibLsquic::ConnT) { stream_if_ctx } - stream_if.on_conn_closed = ->(c : LibLsquic::ConnT) { Box.box(nil) } - stream_if.on_new_stream = ON_NEW_STREAM - stream_if.on_write = ON_WRITE - stream_if.on_read = ON_READ - stream_if.on_close = ON_CLOSE + engine_api = LibLsquic::EngineApi.new + engine_api.ea_settings = pointerof(engine_settings) + engine_api.ea_stream_if = pointerof(stream_if) + engine_api.ea_packets_out = EA_PACKETS_OUT - engine_api = LibLsquic::EngineApi.new - engine_api.ea_settings = pointerof(engine_settings) - engine_api.ea_stream_if = pointerof(stream_if) - engine_api.ea_packets_out = EA_PACKETS_OUT + # logger_if = LibLsquic::LoggerIf.new + # logger_if.log_buf = ->(logger_ctx : Void*, msg_buf : LibC::Char*, msg_size : LibC::SizeT) { puts String.new(msg_buf); 0 } + # LibLsquic.logger_init(pointerof(logger_if), nil, LibLsquic::LoggerTimestampStyle::LltsHhmmssms) + # LibLsquic.set_log_level("debug") - # logger_if = LibLsquic::LoggerIf.new - # logger_if.log_buf = ->(logger_ctx : Void*, msg_buf : LibC::Char*, msg_size : LibC::SizeT) { puts String.new(msg_buf); 0 } - # LibLsquic.logger_init(pointerof(logger_if), nil, LibLsquic::LoggerTimestampStyle::LltsHhmmssms) - # LibLsquic.set_log_level("debug") + engine = LibLsquic.engine_new(ENGINE_FLAGS, pointerof(engine_api)) + hostname = host.starts_with?('[') && host.ends_with?(']') ? host[1..-2] : host - engine = LibLsquic.engine_new(ENGINE_FLAGS, pointerof(engine_api)) - hostname = host.starts_with?('[') && host.ends_with?(']') ? host[1..-2] : host - - socket = UDPSocket.new - socket.bind Socket::IPAddress.new("0.0.0.0", 0) - socket.read_timeout = @read_timeout if @read_timeout - Socket::Addrinfo.udp(@host, @port, timeout: @dns_timeout) do |addrinfo| - socket.connect(addrinfo, timeout: @connect_timeout) do |error| - error + socket = UDPSocket.new + socket.bind Socket::IPAddress.new("0.0.0.0", 0) + socket.read_timeout = @read_timeout if @read_timeout + Socket::Addrinfo.udp(@host, @port, timeout: @dns_timeout) do |addrinfo| + socket.connect(addrinfo, timeout: @connect_timeout) do |error| + error + end end - end - socket.sync = false - engine_closed = false + socket.sync = false + engine_closed = false - conn = LibLsquic.engine_connect(engine, LibLsquic::Version::Lsqver046, socket.local_address, socket.remote_address, Box.box(socket), nil, hostname, 0, nil, 0, nil, 0) - spawn do - while stream_ctx = @stream_channel.receive - LibLsquic.conn_set_ctx(conn, Box.box(stream_ctx)) - LibLsquic.conn_make_stream(conn) + conn = LibLsquic.engine_connect(engine, LibLsquic::Version::Lsqver046, socket.local_address, socket.remote_address, Box.box(socket), nil, hostname, 0, nil, 0, nil, 0) + spawn do + while stream_ctx = @stream_channel.receive + LibLsquic.conn_set_ctx(conn, Box.box(stream_ctx)) + LibLsquic.conn_make_stream(conn) + LibLsquic.engine_process_conns(engine) + end + engine_closed = true + LibLsquic.engine_destroy(engine) + end + + buffer = Bytes.new(0x600) + loop do + bytes_read = socket.read buffer + break if engine_closed + LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0 LibLsquic.engine_process_conns(engine) end - LibLsquic.conn_close(conn) - LibLsquic.engine_process_conns(engine) - LibLsquic.engine_destroy(engine) - engine_closed = true + socket.close end - buffer = Bytes.new(0x600) - loop do - bytes_read = socket.read buffer - break if engine_closed - LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0 - LibLsquic.engine_process_conns(engine) - end - end - - private def check_host_only(string : String) - # When parsing a URI with just a host - # we end up with a URI with just a path - uri = URI.parse(string) - if uri.scheme || uri.host || uri.port || uri.query || uri.user || uri.password || uri.path.includes?('/') + private def check_host_only(string : String) + # When parsing a URI with just a host + # we end up with a URI with just a path + uri = URI.parse(string) + if uri.scheme || uri.host || uri.port || uri.query || uri.user || uri.password || uri.path.includes?('/') + raise_invalid_host(string) + end + rescue URI::Error raise_invalid_host(string) end - rescue URI::Error - raise_invalid_host(string) - end - private def raise_invalid_host(string : String) - raise ArgumentError.new("The string passed to create an HTTP::Client must be just a host, not #{string.inspect}") - end - - def self.new(uri : URI, tls = nil) - tls = tls_flag(uri, tls) - host = validate_host(uri) - new(host, uri.port, tls) - end - - def self.new(uri : URI, tls = nil) - tls = tls_flag(uri, tls) - host = validate_host(uri) - client = new(host, uri.port, tls) - begin - yield client - ensure - client.close + private def raise_invalid_host(string : String) + raise ArgumentError.new("The string passed to create an HTTP::Client must be just a host, not #{string.inspect}") end - end - def self.new(host : String, port = nil, tls = false) - client = new(host, port, tls) - begin - yield client - ensure - client.close + def self.new(uri : URI, tls = nil) + tls = tls_flag(uri, tls) + host = validate_host(uri) + new(host, uri.port, tls) end - end - # Configures this client to perform basic authentication in every - # request. - def basic_auth(username, password) - header = "Basic #{Base64.strict_encode("#{username}:#{password}")}" - before_request do |request| - request.headers["Authorization"] = header + def self.new(uri : URI, tls = nil) + tls = tls_flag(uri, tls) + host = validate_host(uri) + client = new(host, uri.port, tls) + begin + yield client + ensure + client.close + end end - end - def read_timeout=(read_timeout : Number) - @read_timeout = read_timeout.to_f - end + def self.new(host : String, port = nil, tls = false) + client = new(host, port, tls) + begin + yield client + ensure + client.close + end + end - def read_timeout=(read_timeout : Time::Span) - self.read_timeout = read_timeout.total_seconds - end + # Configures this client to perform basic authentication in every + # request. + def basic_auth(username, password) + header = "Basic #{Base64.strict_encode("#{username}:#{password}")}" + before_request do |request| + request.headers["Authorization"] = header + end + end - def connect_timeout=(connect_timeout : Number) - @connect_timeout = connect_timeout.to_f - end + def read_timeout=(read_timeout : Number) + @read_timeout = read_timeout.to_f + end - def connect_timeout=(connect_timeout : Time::Span) - self.connect_timeout = connect_timeout.total_seconds - end + def read_timeout=(read_timeout : Time::Span) + self.read_timeout = read_timeout.total_seconds + end - def dns_timeout=(dns_timeout : Number) - @dns_timeout = dns_timeout.to_f - end + def connect_timeout=(connect_timeout : Number) + @connect_timeout = connect_timeout.to_f + end - def dns_timeout=(dns_timeout : Time::Span) - self.dns_timeout = dns_timeout.total_seconds - end + def connect_timeout=(connect_timeout : Time::Span) + self.connect_timeout = connect_timeout.total_seconds + end - def before_request(&callback : HTTP::Request ->) - before_request = @before_request ||= [] of (HTTP::Request ->) - before_request << callback - end + def dns_timeout=(dns_timeout : Number) + @dns_timeout = dns_timeout.to_f + end - {% for method in %w(get post put head delete patch options) %} + def dns_timeout=(dns_timeout : Time::Span) + self.dns_timeout = dns_timeout.total_seconds + end + + def before_request(&callback : HTTP::Request ->) + before_request = @before_request ||= [] of (HTTP::Request ->) + before_request << callback + end + + {% for method in %w(get post put head delete patch options) %} def {{method.id}}(path, headers : HTTP::Headers? = nil, body : BodyType = nil) : HTTP::Client::Response exec {{method.upcase}}, path, headers, body end @@ -366,189 +352,183 @@ class QUIC::Client end {% end %} - def exec(request : HTTP::Request) : HTTP::Client::Response - exec_internal(request) - end - - private def exec_internal(request) - response = exec_internal_single(request) - return handle_response(response) if response - - # Server probably closed the connection, so retry one - close - request.body.try &.rewind - response = exec_internal_single(request) - return handle_response(response) if response - - raise "Unexpected end of http response" - end - - private def exec_internal_single(request) - io = send_request(request) - HTTP::Client::Response.from_io?(io, ignore_body: request.ignore_body?) - end - - private def handle_response(response) - # close unless response.keep_alive? - response - end - - def exec(request : HTTP::Request, &block) - exec_internal(request) do |response| - yield response + def exec(request : HTTP::Request) : HTTP::Client::Response + exec_internal(request) end - end - private def exec_internal(request, &block : HTTP::Client::Response -> T) : T forall T - exec_internal_single(request) do |response| - if response - return handle_response(response) { yield response } - end + private def exec_internal(request) + response = exec_internal_single(request) + return handle_response(response) if response + + raise "Unexpected end of http response" end - raise "Unexpected end of http response" - end - private def exec_internal_single(request) - io = send_request(request) - HTTP::Client::Response.from_io?(io, ignore_body: request.ignore_body?) do |response| - yield response + private def exec_internal_single(request) + io = send_request(request) + HTTP::Client::Response.from_io?(io, ignore_body: request.ignore_body?) end - end - private def handle_response(response) - value = yield - response.body_io?.try &.close - # close unless response.keep_alive? - value - end - - private def send_request(request) - set_defaults request - run_before_request_callbacks(request) - - stream_ctx = StreamCtx.new(request) - - @stream_channel.send stream_ctx - reader = stream_ctx.channel.receive - reader - end - - private def set_defaults(request) - request.headers[":method"] ||= request.method - request.headers[":scheme"] ||= "https" - request.headers[":path"] ||= request.resource - request.headers[":authority"] ||= host_header - request.headers["user-agent"] ||= "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36" - end - - private def self.default_one_shot_headers(headers) - headers ||= HTTP::Headers.new - headers["Connection"] ||= "close" - headers - end - - private def run_before_request_callbacks(request) - @before_request.try &.each &.call(request) - end - - def exec(method : String, path, headers : HTTP::Headers? = nil, body : BodyType = nil) : HTTP::Client::Response - exec new_request method, path, headers, body - end - - def exec(method : String, path, headers : HTTP::Headers? = nil, body : BodyType = nil) - exec(new_request(method, path, headers, body)) do |response| - yield response + private def handle_response(response) + # close unless response.keep_alive? + response end - end - def self.exec(method, url : String | URI, headers : HTTP::Headers? = nil, body : BodyType = nil, tls = nil) : HTTP::Client::Response - headers = default_one_shot_headers(headers) - exec(url, tls) do |client, path| - client.exec method, path, headers, body - end - end - - def self.exec(method, url : String | URI, headers : HTTP::Headers? = nil, body : BodyType = nil, tls = nil) - headers = default_one_shot_headers(headers) - exec(url, tls) do |client, path| - client.exec(method, path, headers, body) do |response| + def exec(request : HTTP::Request, &block) + exec_internal(request) do |response| yield response end end - end - def destroy_engine - @stream_channel.send nil - end - - def close - # TODO - end - - private def new_request(method, path, headers, body : BodyType) - HTTP::Request.new(method, path, headers, body) - end - - private def host_header - if (@tls && @port != 443) || (!@tls && @port != 80) - "#{@host}:#{@port}" - else - @host - end - end - - private def self.exec(string : String, tls = nil) - uri = URI.parse(string) - - unless uri.scheme && uri.host - # Assume http if no scheme and host are specified - uri = URI.parse("http://#{string}") - end - - exec(uri, tls) do |client, path| - yield client, path - end - end - - protected def self.tls_flag(uri, context : OpenSSL::SSL::Context::Client?) - scheme = uri.scheme - case {scheme, context} - when {nil, _} - raise ArgumentError.new("Missing scheme: #{uri}") - when {"http", nil} - false - when {"http", OpenSSL::SSL::Context::Client} - raise ArgumentError.new("TLS context given for HTTP URI") - when {"https", nil} - true - when {"https", OpenSSL::SSL::Context::Client} - context - else - raise ArgumentError.new "Unsupported scheme: #{scheme}" - end - end - - protected def self.validate_host(uri) - host = uri.host - return host if host && !host.empty? - - raise ArgumentError.new %(Request URI must have host (URI is: #{uri})) - end - - private def self.exec(uri : URI, tls = nil) - tls = tls_flag(uri, tls) - host = validate_host(uri) - - port = uri.port - path = uri.full_path - user = uri.user - password = uri.password - - HTTP::Client.new(host, port, tls) do |client| - if user && password - client.basic_auth(user, password) + private def exec_internal(request, &block : HTTP::Client::Response -> T) : T forall T + exec_internal_single(request) do |response| + if response + return handle_response(response) { yield response } + end + end + raise "Unexpected end of http response" + end + + private def exec_internal_single(request) + io = send_request(request) + HTTP::Client::Response.from_io?(io, ignore_body: request.ignore_body?) do |response| + yield response + end + end + + private def handle_response(response) + value = yield + response.body_io?.try &.close + # close unless response.keep_alive? + value + end + + private def send_request(request) + set_defaults request + run_before_request_callbacks(request) + + reader, writer = IO::ChanneledPipe.new + stream_ctx = StreamCtx.new(request, writer) + @stream_channel.send stream_ctx + reader + end + + private def set_defaults(request) + request.headers[":method"] ||= request.method + request.headers[":scheme"] ||= "https" + request.headers[":path"] ||= request.resource + request.headers[":authority"] ||= host_header + request.headers["user-agent"] ||= "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36" + end + + private def self.default_one_shot_headers(headers) + headers ||= HTTP::Headers.new + headers["Connection"] ||= "close" + headers + end + + private def run_before_request_callbacks(request) + @before_request.try &.each &.call(request) + end + + def exec(method : String, path, headers : HTTP::Headers? = nil, body : BodyType = nil) : HTTP::Client::Response + exec new_request method, path, headers, body + end + + def exec(method : String, path, headers : HTTP::Headers? = nil, body : BodyType = nil) + exec(new_request(method, path, headers, body)) do |response| + yield response + end + end + + def self.exec(method, url : String | URI, headers : HTTP::Headers? = nil, body : BodyType = nil, tls = nil) : HTTP::Client::Response + headers = default_one_shot_headers(headers) + exec(url, tls) do |client, path| + client.exec method, path, headers, body + end + end + + def self.exec(method, url : String | URI, headers : HTTP::Headers? = nil, body : BodyType = nil, tls = nil) + headers = default_one_shot_headers(headers) + exec(url, tls) do |client, path| + client.exec(method, path, headers, body) do |response| + yield response + end + end + end + + def destroy_engine + @stream_channel.send nil + end + + def close + # TODO + end + + private def new_request(method, path, headers, body : BodyType) + HTTP::Request.new(method, path, headers, body) + end + + private def host_header + if (@tls && @port != 443) || (!@tls && @port != 80) + "#{@host}:#{@port}" + else + @host + end + end + + private def self.exec(string : String, tls = nil) + uri = URI.parse(string) + + unless uri.scheme && uri.host + # Assume http if no scheme and host are specified + uri = URI.parse("http://#{string}") + end + + exec(uri, tls) do |client, path| + yield client, path + end + end + + protected def self.tls_flag(uri, context : OpenSSL::SSL::Context::Client?) + scheme = uri.scheme + case {scheme, context} + when {nil, _} + raise ArgumentError.new("Missing scheme: #{uri}") + when {"http", nil} + false + when {"http", OpenSSL::SSL::Context::Client} + raise ArgumentError.new("TLS context given for HTTP URI") + when {"https", nil} + true + when {"https", OpenSSL::SSL::Context::Client} + context + else + raise ArgumentError.new "Unsupported scheme: #{scheme}" + end + end + + protected def self.validate_host(uri) + host = uri.host + return host if host && !host.empty? + + raise ArgumentError.new %(Request URI must have host (URI is: #{uri})) + end + + private def self.exec(uri : URI, tls = nil) + tls = tls_flag(uri, tls) + host = validate_host(uri) + + port = uri.port + path = uri.full_path + user = uri.user + password = uri.password + + HTTP::Client.new(host, port, tls) do |client| + if user && password + client.basic_auth(user, password) + end + yield client, path end - yield client, path end end end diff --git a/src/lsquic/liblsquic.cr b/src/lsquic/liblsquic.cr index fccd151..fbee2d0 100644 --- a/src/lsquic/liblsquic.cr +++ b/src/lsquic/liblsquic.cr @@ -67,10 +67,8 @@ lib LibLsquic id : Uint64T end - alias X__Uint8T = UInt8 - alias Uint8T = X__Uint8T - alias X__Uint64T = LibC::ULong - alias Uint64T = X__Uint64T + alias Uint8T = UInt8 + alias Uint64T = LibC::ULong alias Engine = Void alias Conn = Void alias ConnCtx = Void @@ -163,8 +161,7 @@ lib LibLsquic es_cc_algo : LibC::UInt end - alias X__Uint32T = LibC::UInt - alias Uint32T = X__Uint32T + alias Uint32T = LibC::UInt fun engine_init_settings = lsquic_engine_init_settings(x0 : EngineSettings*, engine_flags : LibC::UInt) fun engine_check_settings = lsquic_engine_check_settings(settings : EngineSettings*, engine_flags : LibC::UInt, err_buf : LibC::Char*, err_buf_sz : LibC::SizeT) : LibC::Int @@ -183,8 +180,7 @@ lib LibLsquic shi_lookup : (Void*, Void*, LibC::UInt, Void**, LibC::UInt* -> LibC::Int) end - alias X__TimeT = LibC::Long - alias TimeT = X__TimeT + alias TimeT = LibC::Long struct PackoutMemIf pmi_allocate : (Void*, Void*, LibC::UShort, LibC::Char -> Void*) @@ -276,8 +272,7 @@ lib LibLsquic fun conn_close = lsquic_conn_close(x0 : ConnT) fun stream_wantread = lsquic_stream_wantread(s : StreamT, is_want : LibC::Int) : LibC::Int fun stream_read = lsquic_stream_read(s : StreamT, buf : Void*, len : LibC::SizeT) : SsizeT - alias X__SsizeT = LibC::Long - alias SsizeT = X__SsizeT + alias SsizeT = LibC::Long fun stream_readv = lsquic_stream_readv(s : StreamT, x1 : Iovec*, iovcnt : LibC::Int) : SsizeT fun stream_readf = lsquic_stream_readf(s : StreamT, readf : (Void*, UInt8*, LibC::SizeT, LibC::Int -> LibC::SizeT), ctx : Void*) : SsizeT fun stream_wantwrite = lsquic_stream_wantwrite(s : StreamT, is_want : LibC::Int) : LibC::Int