attempt to make things threaded

i made them worse
This commit is contained in:
Luna 2022-12-21 22:13:09 -03:00
parent f3fe9b60d2
commit 605b9347a4
1 changed files with 118 additions and 27 deletions

View File

@ -18,6 +18,112 @@ const WantedMode = enum { send, recv };
const AddrTuple = struct { from: network.EndPoint, to: network.EndPoint }; const AddrTuple = struct { from: network.EndPoint, to: network.EndPoint };
const Mode = union(enum) { send: AddrTuple, recv: AddrTuple }; const Mode = union(enum) { send: AddrTuple, recv: AddrTuple };
const BufferQueue = std.atomic.Queue([]u8);
const Context = struct {
frame_allocator: std.mem.Allocator,
frames: BufferQueue,
from_sock: network.Socket,
to: network.EndPoint,
to_sock: network.Socket,
const Self = @This();
pub fn createNode(self: *Self, data: []const u8) !*BufferQueue.Node {
var queue_node = try self.frame_allocator.create(BufferQueue.Node);
var node_data = try self.frame_allocator.dupe(u8, data);
queue_node.data = node_data;
return queue_node;
}
pub fn destroyNode(self: *Self, node: *BufferQueue.Node) void {
self.frame_allocator.free(node.data);
self.frame_allocator.destroy(node);
}
};
const SINGLE_BUFFER_SIZE = 1157;
const BURST_SIZE = 50;
const MAX_JITTER_NS = 50 * std.time.ns_per_ms;
const BurstFrame = struct {
data: [SINGLE_BUFFER_SIZE]u8,
message: []u8,
};
fn receiverThread(ctx: *Context) !void {
const reader = ctx.from_sock.reader();
var frame_count: usize = 0;
var last_frame_timestamp_ns: i128 = 0;
var state: enum { Normal, Burst } = .Normal;
var burst_buffers: [BURST_SIZE]BurstFrame = undefined;
var burst_count: usize = 0;
while (true) : (frame_count += 1) {
var frame: [SINGLE_BUFFER_SIZE]u8 = undefined;
const received_bytes = try reader.read(&frame);
const frame_timestamp_ns = std.time.nanoTimestamp();
const message = frame[0..received_bytes];
switch (state) {
.Burst => {
logger.info("burst: {d}", .{burst_count});
// turn incoming frame into a BurstFrame
burst_buffers[burst_count].data = frame;
burst_buffers[burst_count].message = burst_buffers[burst_count].data[0..frame.len];
burst_count += 1;
// once we reached the limit, dump them all, switch to Normal
if (burst_count > (burst_buffers.len - 1)) {
for (burst_buffers) |burst_buffer| {
ctx.frames.put(try ctx.createNode(burst_buffer.message));
}
last_frame_timestamp_ns = frame_timestamp_ns;
state = .Normal;
}
},
.Normal => {
const delta = frame_timestamp_ns - last_frame_timestamp_ns;
if (delta < MAX_JITTER_NS) {
// within acceptable jitter, send frame immediately to queue
last_frame_timestamp_ns = frame_timestamp_ns;
ctx.frames.put(try ctx.createNode(message));
} else {
// we need to fill some more buffers before we can send this frame
logger.warn("unacceptable jitter detected {d}ms", .{delta});
burst_count = 0;
state = .Burst;
}
},
}
}
}
fn senderThread(ctx: *Context) !void {
while (true) {
const node = (ctx.frames.get()) orelse {
// if no incoming frames, wait
std.time.sleep(100 * std.time.ns_per_ms);
continue;
};
defer ctx.destroyNode(node);
const received_bytes = node.data.len;
const sent_bytes = try ctx.to_sock.sendTo(ctx.to, node.data);
if (sent_bytes != received_bytes) {
logger.warn(
"tried to send {d} bytes, actually sent {d}",
.{ received_bytes, sent_bytes },
);
}
std.time.sleep(375);
}
}
pub fn main() !void { pub fn main() !void {
try network.init(); try network.init();
defer network.deinit(); defer network.deinit();
@ -47,33 +153,18 @@ pub fn main() !void {
var to_sock = try network.Socket.create(.ipv4, .udp); var to_sock = try network.Socket.create(.ipv4, .udp);
defer to_sock.close(); defer to_sock.close();
const BUFSIZE = 4; var queue = BufferQueue.init();
const Buffer = struct { var ctx = Context{
raw_data: [2048]u8, .frame_allocator = allocator,
message: []u8, .frames = queue,
.from_sock = from_sock,
.to = to,
.to_sock = to_sock,
}; };
var receiver_thread = try std.Thread.spawn(.{}, receiverThread, .{&ctx});
var sender_thread = try std.Thread.spawn(.{}, senderThread, .{&ctx});
var receive_buffers: [BUFSIZE]Buffer = undefined; receiver_thread.join();
var receive_buffer_index: usize = 0; sender_thread.join();
logger.info("end", .{});
const reader = from_sock.reader();
while (true) : (receive_buffer_index += 1) {
const array_index = receive_buffer_index % BUFSIZE;
const raw_data = &receive_buffers[array_index].raw_data;
const received_bytes = try reader.read(raw_data);
receive_buffers[array_index].message = raw_data[0..received_bytes];
if (receive_buffer_index > BUFSIZE) {
const index_to_send = (receive_buffer_index - BUFSIZE - 1) % BUFSIZE;
const bytes_to_send = receive_buffers[index_to_send].message;
const sent_bytes = try to_sock.sendTo(to, bytes_to_send);
if (sent_bytes != received_bytes) {
logger.warn(
"tried to send {d} bytes, actually sent {d}",
.{ received_bytes, sent_bytes },
);
}
}
}
} }