mirror of
https://gitea.invidious.io/iv-org/lsquic.cr
synced 2024-08-15 00:43:31 +00:00
Replace IO::FileDescriptor with IO::ChanneledPipe
This commit is contained in:
parent
0c0d8748a7
commit
016f733109
3 changed files with 481 additions and 416 deletions
90
src/lsquic/channeled_pipe.cr
Normal file
90
src/lsquic/channeled_pipe.cr
Normal file
|
@ -0,0 +1,90 @@
|
||||||
|
# Based on https://github.com/anykeyh/channeled_pipe/blob/master/src/channeled_pipe/channeled_pipe.cr
|
||||||
|
class IO::ChanneledPipe < IO
|
||||||
|
BUFFER_SIZE = 8192
|
||||||
|
|
||||||
|
include IO::Buffered
|
||||||
|
|
||||||
|
@channel : Channel(Bytes?)
|
||||||
|
@direction : Symbol
|
||||||
|
@buffer : Bytes?
|
||||||
|
|
||||||
|
getter? closed = false
|
||||||
|
|
||||||
|
protected def initialize(@channel, @direction)
|
||||||
|
end
|
||||||
|
|
||||||
|
def unbuffered_read(slice : Bytes)
|
||||||
|
raise "Cannot read from write side" if @direction == :w
|
||||||
|
return 0 if @channel.closed? && !@buffer
|
||||||
|
|
||||||
|
buffer = @buffer
|
||||||
|
|
||||||
|
if buffer
|
||||||
|
bytes_read = {slice.size, buffer.size}.min
|
||||||
|
slice.copy_from(buffer.to_unsafe, bytes_read)
|
||||||
|
|
||||||
|
if buffer.size == bytes_read
|
||||||
|
@buffer = nil
|
||||||
|
else
|
||||||
|
@buffer = buffer[bytes_read, buffer.size - bytes_read]
|
||||||
|
end
|
||||||
|
|
||||||
|
return bytes_read
|
||||||
|
else
|
||||||
|
buffer = @channel.receive
|
||||||
|
|
||||||
|
if buffer
|
||||||
|
bytes_read = {slice.size, buffer.size}.min
|
||||||
|
slice.copy_from(buffer.to_unsafe, bytes_read)
|
||||||
|
|
||||||
|
if buffer.size > bytes_read
|
||||||
|
@buffer = buffer[bytes_read, buffer.size - bytes_read]
|
||||||
|
end
|
||||||
|
|
||||||
|
return bytes_read
|
||||||
|
else
|
||||||
|
@channel.close
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def unbuffered_write(slice : Bytes)
|
||||||
|
raise "Write not allowed on read side" if @direction == :r
|
||||||
|
raise "Closed stream" if @closed
|
||||||
|
@channel.send slice.clone
|
||||||
|
slice.size
|
||||||
|
end
|
||||||
|
|
||||||
|
def close_channel
|
||||||
|
@channel.close
|
||||||
|
end
|
||||||
|
|
||||||
|
def unbuffered_flush
|
||||||
|
# Nothing
|
||||||
|
end
|
||||||
|
|
||||||
|
def unbuffered_rewind
|
||||||
|
raise IO::Error.new("Can't rewind")
|
||||||
|
end
|
||||||
|
|
||||||
|
def unbuffered_close
|
||||||
|
return if @closed
|
||||||
|
@closed = true
|
||||||
|
@channel.send nil
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.new(mem = BUFFER_SIZE)
|
||||||
|
mem = BUFFER_SIZE if mem <= 0
|
||||||
|
|
||||||
|
capacity = (mem / BUFFER_SIZE) +
|
||||||
|
((mem % BUFFER_SIZE != 0) ? 1 : 0)
|
||||||
|
|
||||||
|
channel = Channel(Bytes?).new(capacity: mem)
|
||||||
|
|
||||||
|
{
|
||||||
|
ChanneledPipe.new(channel, :r),
|
||||||
|
ChanneledPipe.new(channel, :w),
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,18 +1,22 @@
|
||||||
require "http"
|
require "http"
|
||||||
require "socket"
|
require "socket"
|
||||||
|
|
||||||
struct QUIC::StreamCtx
|
module QUIC
|
||||||
|
class StreamCtx
|
||||||
property request : HTTP::Request
|
property request : HTTP::Request
|
||||||
property channel : Channel(IO::FileDescriptor)
|
property io : IO::ChanneledPipe
|
||||||
property reader : IO::FileDescriptor?
|
|
||||||
property writer : IO::FileDescriptor?
|
|
||||||
|
|
||||||
def initialize(@request : HTTP::Request)
|
def initialize(@request, @io)
|
||||||
@channel = Channel(IO::FileDescriptor).new
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class Client
|
||||||
|
STREAM_READF = ->(stream_if_ctx : Void*, buf : UInt8*, buf_len : LibC::SizeT, fin : LibC::Int) do
|
||||||
|
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
|
||||||
|
bytes_read = stream_ctx.io.unbuffered_write Slice.new(buf, buf_len)
|
||||||
|
bytes_read.to_u64
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
class QUIC::Client
|
|
||||||
ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do
|
ON_NEW_STREAM = ->(stream_if_ctx : Void*, s : LibLsquic::StreamT) do
|
||||||
stream_ctx = LibLsquic.stream_conn(s)
|
stream_ctx = LibLsquic.stream_conn(s)
|
||||||
.try { |c| LibLsquic.conn_get_ctx(c) }
|
.try { |c| LibLsquic.conn_get_ctx(c) }
|
||||||
|
@ -22,12 +26,6 @@ class QUIC::Client
|
||||||
return Box.box(stream_ctx)
|
return Box.box(stream_ctx)
|
||||||
end
|
end
|
||||||
|
|
||||||
reader, writer = IO.pipe
|
|
||||||
stream_ctx.reader = reader
|
|
||||||
stream_ctx.writer = writer
|
|
||||||
|
|
||||||
stream_ctx.channel.send reader
|
|
||||||
|
|
||||||
LibLsquic.stream_wantwrite(s, 1)
|
LibLsquic.stream_wantwrite(s, 1)
|
||||||
Box.box(stream_ctx)
|
Box.box(stream_ctx)
|
||||||
end
|
end
|
||||||
|
@ -77,14 +75,11 @@ class QUIC::Client
|
||||||
end
|
end
|
||||||
|
|
||||||
ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
|
ON_READ = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
|
||||||
begin
|
|
||||||
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
|
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
|
||||||
|
bytes_read = LibLsquic.stream_readf(s, STREAM_READF, Box.box(stream_ctx))
|
||||||
buffer = Bytes.new(0x600)
|
|
||||||
bytes_read = LibLsquic.stream_read(s, buffer, buffer.size)
|
|
||||||
|
|
||||||
if bytes_read > 0
|
if bytes_read > 0
|
||||||
stream_ctx.writer.try &.write buffer[0, bytes_read]
|
# Nothing
|
||||||
elsif bytes_read == 0
|
elsif bytes_read == 0
|
||||||
LibLsquic.stream_shutdown(s, 0)
|
LibLsquic.stream_shutdown(s, 0)
|
||||||
LibLsquic.stream_wantread(s, 0)
|
LibLsquic.stream_wantread(s, 0)
|
||||||
|
@ -93,22 +88,14 @@ class QUIC::Client
|
||||||
else
|
else
|
||||||
"Could not read response"
|
"Could not read response"
|
||||||
end
|
end
|
||||||
|
|
||||||
Box.box(stream_ctx)
|
Box.box(stream_ctx)
|
||||||
rescue ex
|
|
||||||
LibLsquic.stream_shutdown(s, 0)
|
|
||||||
LibLsquic.stream_wantread(s, 0)
|
|
||||||
Box.box(stream_ctx)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
|
ON_CLOSE = ->(s : LibLsquic::StreamT, stream_if_ctx : Void*) do
|
||||||
begin
|
|
||||||
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
|
stream_ctx = Box(StreamCtx).unbox(stream_if_ctx)
|
||||||
stream_ctx.writer.try &.close
|
stream_ctx.io.close
|
||||||
Box.box(stream_ctx)
|
Box.box(stream_ctx)
|
||||||
rescue ex
|
|
||||||
Box.box(stream_ctx)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do
|
EA_PACKETS_OUT = ->(peer_ctx : Void*, specs : LibLsquic::OutSpec*, count : LibC::UInt) do
|
||||||
|
@ -210,10 +197,8 @@ class QUIC::Client
|
||||||
LibLsquic.conn_make_stream(conn)
|
LibLsquic.conn_make_stream(conn)
|
||||||
LibLsquic.engine_process_conns(engine)
|
LibLsquic.engine_process_conns(engine)
|
||||||
end
|
end
|
||||||
LibLsquic.conn_close(conn)
|
|
||||||
LibLsquic.engine_process_conns(engine)
|
|
||||||
LibLsquic.engine_destroy(engine)
|
|
||||||
engine_closed = true
|
engine_closed = true
|
||||||
|
LibLsquic.engine_destroy(engine)
|
||||||
end
|
end
|
||||||
|
|
||||||
buffer = Bytes.new(0x600)
|
buffer = Bytes.new(0x600)
|
||||||
|
@ -223,6 +208,7 @@ class QUIC::Client
|
||||||
LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0
|
LibLsquic.engine_packet_in(engine, buffer[0, bytes_read], bytes_read, socket.local_address, socket.remote_address, Box.box(socket), 0) if bytes_read != 0
|
||||||
LibLsquic.engine_process_conns(engine)
|
LibLsquic.engine_process_conns(engine)
|
||||||
end
|
end
|
||||||
|
socket.close
|
||||||
end
|
end
|
||||||
|
|
||||||
private def check_host_only(string : String)
|
private def check_host_only(string : String)
|
||||||
|
@ -374,12 +360,6 @@ class QUIC::Client
|
||||||
response = exec_internal_single(request)
|
response = exec_internal_single(request)
|
||||||
return handle_response(response) if response
|
return handle_response(response) if response
|
||||||
|
|
||||||
# Server probably closed the connection, so retry one
|
|
||||||
close
|
|
||||||
request.body.try &.rewind
|
|
||||||
response = exec_internal_single(request)
|
|
||||||
return handle_response(response) if response
|
|
||||||
|
|
||||||
raise "Unexpected end of http response"
|
raise "Unexpected end of http response"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -426,10 +406,9 @@ class QUIC::Client
|
||||||
set_defaults request
|
set_defaults request
|
||||||
run_before_request_callbacks(request)
|
run_before_request_callbacks(request)
|
||||||
|
|
||||||
stream_ctx = StreamCtx.new(request)
|
reader, writer = IO::ChanneledPipe.new
|
||||||
|
stream_ctx = StreamCtx.new(request, writer)
|
||||||
@stream_channel.send stream_ctx
|
@stream_channel.send stream_ctx
|
||||||
reader = stream_ctx.channel.receive
|
|
||||||
reader
|
reader
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -551,4 +530,5 @@ class QUIC::Client
|
||||||
yield client, path
|
yield client, path
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -67,10 +67,8 @@ lib LibLsquic
|
||||||
id : Uint64T
|
id : Uint64T
|
||||||
end
|
end
|
||||||
|
|
||||||
alias X__Uint8T = UInt8
|
alias Uint8T = UInt8
|
||||||
alias Uint8T = X__Uint8T
|
alias Uint64T = LibC::ULong
|
||||||
alias X__Uint64T = LibC::ULong
|
|
||||||
alias Uint64T = X__Uint64T
|
|
||||||
alias Engine = Void
|
alias Engine = Void
|
||||||
alias Conn = Void
|
alias Conn = Void
|
||||||
alias ConnCtx = Void
|
alias ConnCtx = Void
|
||||||
|
@ -163,8 +161,7 @@ lib LibLsquic
|
||||||
es_cc_algo : LibC::UInt
|
es_cc_algo : LibC::UInt
|
||||||
end
|
end
|
||||||
|
|
||||||
alias X__Uint32T = LibC::UInt
|
alias Uint32T = LibC::UInt
|
||||||
alias Uint32T = X__Uint32T
|
|
||||||
fun engine_init_settings = lsquic_engine_init_settings(x0 : EngineSettings*, engine_flags : LibC::UInt)
|
fun engine_init_settings = lsquic_engine_init_settings(x0 : EngineSettings*, engine_flags : LibC::UInt)
|
||||||
fun engine_check_settings = lsquic_engine_check_settings(settings : EngineSettings*, engine_flags : LibC::UInt, err_buf : LibC::Char*, err_buf_sz : LibC::SizeT) : LibC::Int
|
fun engine_check_settings = lsquic_engine_check_settings(settings : EngineSettings*, engine_flags : LibC::UInt, err_buf : LibC::Char*, err_buf_sz : LibC::SizeT) : LibC::Int
|
||||||
|
|
||||||
|
@ -183,8 +180,7 @@ lib LibLsquic
|
||||||
shi_lookup : (Void*, Void*, LibC::UInt, Void**, LibC::UInt* -> LibC::Int)
|
shi_lookup : (Void*, Void*, LibC::UInt, Void**, LibC::UInt* -> LibC::Int)
|
||||||
end
|
end
|
||||||
|
|
||||||
alias X__TimeT = LibC::Long
|
alias TimeT = LibC::Long
|
||||||
alias TimeT = X__TimeT
|
|
||||||
|
|
||||||
struct PackoutMemIf
|
struct PackoutMemIf
|
||||||
pmi_allocate : (Void*, Void*, LibC::UShort, LibC::Char -> Void*)
|
pmi_allocate : (Void*, Void*, LibC::UShort, LibC::Char -> Void*)
|
||||||
|
@ -276,8 +272,7 @@ lib LibLsquic
|
||||||
fun conn_close = lsquic_conn_close(x0 : ConnT)
|
fun conn_close = lsquic_conn_close(x0 : ConnT)
|
||||||
fun stream_wantread = lsquic_stream_wantread(s : StreamT, is_want : LibC::Int) : LibC::Int
|
fun stream_wantread = lsquic_stream_wantread(s : StreamT, is_want : LibC::Int) : LibC::Int
|
||||||
fun stream_read = lsquic_stream_read(s : StreamT, buf : Void*, len : LibC::SizeT) : SsizeT
|
fun stream_read = lsquic_stream_read(s : StreamT, buf : Void*, len : LibC::SizeT) : SsizeT
|
||||||
alias X__SsizeT = LibC::Long
|
alias SsizeT = LibC::Long
|
||||||
alias SsizeT = X__SsizeT
|
|
||||||
fun stream_readv = lsquic_stream_readv(s : StreamT, x1 : Iovec*, iovcnt : LibC::Int) : SsizeT
|
fun stream_readv = lsquic_stream_readv(s : StreamT, x1 : Iovec*, iovcnt : LibC::Int) : SsizeT
|
||||||
fun stream_readf = lsquic_stream_readf(s : StreamT, readf : (Void*, UInt8*, LibC::SizeT, LibC::Int -> LibC::SizeT), ctx : Void*) : SsizeT
|
fun stream_readf = lsquic_stream_readf(s : StreamT, readf : (Void*, UInt8*, LibC::SizeT, LibC::Int -> LibC::SizeT), ctx : Void*) : SsizeT
|
||||||
fun stream_wantwrite = lsquic_stream_wantwrite(s : StreamT, is_want : LibC::Int) : LibC::Int
|
fun stream_wantwrite = lsquic_stream_wantwrite(s : StreamT, is_want : LibC::Int) : LibC::Int
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue