From 605b9347a4395e14456e46d971c2b4c55bca9aca Mon Sep 17 00:00:00 2001 From: Luna Date: Wed, 21 Dec 2022 22:13:09 -0300 Subject: [PATCH] attempt to make things threaded i made them worse --- src/main.zig | 145 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 118 insertions(+), 27 deletions(-) diff --git a/src/main.zig b/src/main.zig index 1227401..2a5070b 100644 --- a/src/main.zig +++ b/src/main.zig @@ -18,6 +18,112 @@ const WantedMode = enum { send, recv }; const AddrTuple = struct { from: network.EndPoint, to: network.EndPoint }; const Mode = union(enum) { send: AddrTuple, recv: AddrTuple }; +const BufferQueue = std.atomic.Queue([]u8); + +const Context = struct { + frame_allocator: std.mem.Allocator, + frames: BufferQueue, + from_sock: network.Socket, + to: network.EndPoint, + to_sock: network.Socket, + + const Self = @This(); + + pub fn createNode(self: *Self, data: []const u8) !*BufferQueue.Node { + var queue_node = try self.frame_allocator.create(BufferQueue.Node); + var node_data = try self.frame_allocator.dupe(u8, data); + queue_node.data = node_data; + return queue_node; + } + pub fn destroyNode(self: *Self, node: *BufferQueue.Node) void { + self.frame_allocator.free(node.data); + self.frame_allocator.destroy(node); + } +}; + +const SINGLE_BUFFER_SIZE = 1157; +const BURST_SIZE = 50; +const MAX_JITTER_NS = 50 * std.time.ns_per_ms; + +const BurstFrame = struct { + data: [SINGLE_BUFFER_SIZE]u8, + message: []u8, +}; + +fn receiverThread(ctx: *Context) !void { + const reader = ctx.from_sock.reader(); + + var frame_count: usize = 0; + var last_frame_timestamp_ns: i128 = 0; + var state: enum { Normal, Burst } = .Normal; + + var burst_buffers: [BURST_SIZE]BurstFrame = undefined; + var burst_count: usize = 0; + + while (true) : (frame_count += 1) { + var frame: [SINGLE_BUFFER_SIZE]u8 = undefined; + const received_bytes = try reader.read(&frame); + const frame_timestamp_ns = std.time.nanoTimestamp(); + const message = frame[0..received_bytes]; + + switch (state) { + .Burst => { + logger.info("burst: {d}", .{burst_count}); + // turn incoming frame into a BurstFrame + burst_buffers[burst_count].data = frame; + burst_buffers[burst_count].message = burst_buffers[burst_count].data[0..frame.len]; + burst_count += 1; + + // once we reached the limit, dump them all, switch to Normal + if (burst_count > (burst_buffers.len - 1)) { + for (burst_buffers) |burst_buffer| { + ctx.frames.put(try ctx.createNode(burst_buffer.message)); + } + + last_frame_timestamp_ns = frame_timestamp_ns; + state = .Normal; + } + }, + .Normal => { + const delta = frame_timestamp_ns - last_frame_timestamp_ns; + + if (delta < MAX_JITTER_NS) { + // within acceptable jitter, send frame immediately to queue + last_frame_timestamp_ns = frame_timestamp_ns; + ctx.frames.put(try ctx.createNode(message)); + } else { + // we need to fill some more buffers before we can send this frame + logger.warn("unacceptable jitter detected {d}ms", .{delta}); + burst_count = 0; + state = .Burst; + } + }, + } + } +} + +fn senderThread(ctx: *Context) !void { + while (true) { + const node = (ctx.frames.get()) orelse { + // if no incoming frames, wait + std.time.sleep(100 * std.time.ns_per_ms); + continue; + }; + defer ctx.destroyNode(node); + + const received_bytes = node.data.len; + const sent_bytes = try ctx.to_sock.sendTo(ctx.to, node.data); + if (sent_bytes != received_bytes) { + logger.warn( + "tried to send {d} bytes, actually sent {d}", + .{ received_bytes, sent_bytes }, + ); + } + + std.time.sleep(375); + } +} + pub fn main() !void { try network.init(); defer network.deinit(); @@ -47,33 +153,18 @@ pub fn main() !void { var to_sock = try network.Socket.create(.ipv4, .udp); defer to_sock.close(); - const BUFSIZE = 4; - const Buffer = struct { - raw_data: [2048]u8, - message: []u8, + var queue = BufferQueue.init(); + var ctx = Context{ + .frame_allocator = allocator, + .frames = queue, + .from_sock = from_sock, + .to = to, + .to_sock = to_sock, }; + var receiver_thread = try std.Thread.spawn(.{}, receiverThread, .{&ctx}); + var sender_thread = try std.Thread.spawn(.{}, senderThread, .{&ctx}); - var receive_buffers: [BUFSIZE]Buffer = undefined; - var receive_buffer_index: usize = 0; - - const reader = from_sock.reader(); - - while (true) : (receive_buffer_index += 1) { - const array_index = receive_buffer_index % BUFSIZE; - const raw_data = &receive_buffers[array_index].raw_data; - const received_bytes = try reader.read(raw_data); - receive_buffers[array_index].message = raw_data[0..received_bytes]; - - if (receive_buffer_index > BUFSIZE) { - const index_to_send = (receive_buffer_index - BUFSIZE - 1) % BUFSIZE; - const bytes_to_send = receive_buffers[index_to_send].message; - const sent_bytes = try to_sock.sendTo(to, bytes_to_send); - if (sent_bytes != received_bytes) { - logger.warn( - "tried to send {d} bytes, actually sent {d}", - .{ received_bytes, sent_bytes }, - ); - } - } - } + receiver_thread.join(); + sender_thread.join(); + logger.info("end", .{}); }