const std = @import("std"); const network = @import("network"); const logger = std.log.scoped(.udp2udp2udp); fn incomingAddr(args_it: anytype) !network.EndPoint { const addr_string = args_it.next().?; const port_string = args_it.next().?; const port = try std.fmt.parseInt(u16, port_string, 10); const zig_addr = try std.net.Address.parseIp(addr_string, port); const socklen = zig_addr.getOsSockLen(); return network.EndPoint.fromSocketAddress(&zig_addr.any, socklen); } const WantedMode = enum { send, recv }; const AddrTuple = struct { from: network.EndPoint, to: network.EndPoint }; const Mode = union(enum) { send: AddrTuple, recv: AddrTuple }; const BufferQueue = std.atomic.Queue([]u8); const Context = struct { frame_allocator: std.mem.Allocator, frames: BufferQueue, from_sock: network.Socket, to: network.EndPoint, to_sock: network.Socket, const Self = @This(); pub fn createNode(self: *Self, data: []const u8) !*BufferQueue.Node { var queue_node = try self.frame_allocator.create(BufferQueue.Node); var node_data = try self.frame_allocator.dupe(u8, data); queue_node.data = node_data; return queue_node; } pub fn destroyNode(self: *Self, node: *BufferQueue.Node) void { self.frame_allocator.free(node.data); self.frame_allocator.destroy(node); } }; const SINGLE_BUFFER_SIZE = 1157; const BURST_SIZE = 50; const MAX_JITTER_NS = 50 * std.time.ns_per_ms; const BurstFrame = struct { data: [SINGLE_BUFFER_SIZE]u8, message: []u8, }; fn receiverThread(ctx: *Context) !void { const reader = ctx.from_sock.reader(); var frame_count: usize = 0; var last_frame_timestamp_ns: i128 = 0; var state: enum { Normal, Burst } = .Normal; var burst_buffers: [BURST_SIZE]BurstFrame = undefined; var burst_count: usize = 0; while (true) : (frame_count += 1) { var frame: [SINGLE_BUFFER_SIZE]u8 = undefined; const received_bytes = try reader.read(&frame); const frame_timestamp_ns = std.time.nanoTimestamp(); const message = frame[0..received_bytes]; switch (state) { .Burst => { logger.info("burst: {d}", .{burst_count}); // turn incoming frame into a BurstFrame burst_buffers[burst_count].data = frame; burst_buffers[burst_count].message = burst_buffers[burst_count].data[0..frame.len]; burst_count += 1; // once we reached the limit, dump them all, switch to Normal if (burst_count > (burst_buffers.len - 1)) { for (burst_buffers) |burst_buffer| { ctx.frames.put(try ctx.createNode(burst_buffer.message)); } last_frame_timestamp_ns = frame_timestamp_ns; state = .Normal; } }, .Normal => { const delta = frame_timestamp_ns - last_frame_timestamp_ns; if (delta < MAX_JITTER_NS) { // within acceptable jitter, send frame immediately to queue last_frame_timestamp_ns = frame_timestamp_ns; ctx.frames.put(try ctx.createNode(message)); } else { // we need to fill some more buffers before we can send this frame logger.warn("unacceptable jitter detected {d}ms", .{delta}); burst_count = 0; state = .Burst; } }, } } } fn senderThread(ctx: *Context) !void { while (true) { const node = (ctx.frames.get()) orelse { // if no incoming frames, wait std.time.sleep(100 * std.time.ns_per_ms); continue; }; defer ctx.destroyNode(node); const received_bytes = node.data.len; const sent_bytes = try ctx.to_sock.sendTo(ctx.to, node.data); if (sent_bytes != received_bytes) { logger.warn( "tried to send {d} bytes, actually sent {d}", .{ received_bytes, sent_bytes }, ); } std.time.sleep(375); } } pub fn main() !void { try network.init(); defer network.deinit(); var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer { _ = gpa.deinit(); } const allocator = gpa.allocator(); var args_it = try std.process.argsWithAllocator(allocator); defer args_it.deinit(); _ = args_it.skip(); const from = try incomingAddr(&args_it); const to = try incomingAddr(&args_it); std.log.info("taking udp from: {s}", .{from}); std.log.info("spitting udp to: {s}", .{to}); logger.info("init udp", .{}); var from_sock = try network.Socket.create(.ipv4, .udp); defer from_sock.close(); try from_sock.enablePortReuse(true); try from_sock.bind(from); var to_sock = try network.Socket.create(.ipv4, .udp); defer to_sock.close(); var queue = BufferQueue.init(); var ctx = Context{ .frame_allocator = allocator, .frames = queue, .from_sock = from_sock, .to = to, .to_sock = to_sock, }; var receiver_thread = try std.Thread.spawn(.{}, receiverThread, .{&ctx}); var sender_thread = try std.Thread.spawn(.{}, senderThread, .{&ctx}); receiver_thread.join(); sender_thread.join(); logger.info("end", .{}); }