starting zero alloc parsing

This commit is contained in:
2026-01-07 17:26:10 -05:00
parent e2a60c9427
commit 96a3705069
6 changed files with 1001 additions and 914 deletions

View File

@@ -11,29 +11,23 @@ pub const Msgs = union(enum) {
};
connect: ?Message.Connect,
// Used to own messages that we receive in our queues.
alloc: std.mem.Allocator,
// Messages for this client to receive.
recv_queue: *Queue(Message),
msg_queue: *Queue(Msgs),
// Byte queue for this client to receive.
recv_queue: *Queue(u8),
// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes).
recv_queue_write_lock: std.Io.Mutex = .init,
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
alloc: std.mem.Allocator,
recv_queue: *Queue(Message),
msg_queue: *Queue(Msgs),
recv_queue: *Queue(u8),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
return .{
.connect = connect,
.alloc = alloc,
.recv_queue = recv_queue,
.msg_queue = msg_queue,
.from_client = in,
.to_client = out,
};
@@ -47,97 +41,91 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
}
pub fn start(self: *Client, io: std.Io) !void {
var msgs_buf: [1024]Msgs = undefined;
var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
errdefer _ = recv_msgs_task.cancel(io) catch {};
var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
errdefer _ = recv_proto_task.cancel(io) catch {};
std.debug.assert(self.to_client.buffer.len > 0);
std.debug.assert(self.to_client.end == 0);
while (true) {
switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
.msgs => |len_err| {
@branchHint(.likely);
const msgs = msgs_buf[0..try len_err];
for (0..msgs.len) |i| {
const msg = msgs[i];
defer switch (msg) {
.MSG => |m| m.deinit(self.alloc),
.HMSG => |h| h.deinit(self.alloc),
};
errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
.MSG => |m| {
m.deinit(self.alloc);
},
.HMSG => |h| {
h.deinit(self.alloc);
},
};
switch (msg) {
.MSG => |m| {
try self.to_client.print(
"MSG {s} {s} {s} {d}\r\n",
.{
m.subject,
m.sid,
m.reply_to orelse "",
m.payload.len,
},
);
try m.payload.write(self.to_client);
try self.to_client.print("\r\n", .{});
},
.HMSG => |hmsg| {
try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
hmsg.msg.subject,
hmsg.msg.sid,
hmsg.msg.reply_to orelse "",
hmsg.header_bytes,
hmsg.msg.payload.len,
});
try hmsg.msg.payload.write(self.to_client);
try self.to_client.print("\r\n", .{});
},
}
}
recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
},
.proto => |msg_err| {
@branchHint(.unlikely);
const msg = try msg_err;
switch (msg) {
.@"+OK" => {
_ = try self.to_client.write("+OK\r\n");
},
.PONG => {
_ = try self.to_client.write("PONG\r\n");
},
.INFO => |info| {
_ = try self.to_client.write("INFO ");
try std.json.Stringify.value(info, .{}, self.to_client);
_ = try self.to_client.write("\r\n");
},
.@"-ERR" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
},
else => |m| {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
}
recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
},
}
self.to_client.end = try self.recv_queue.get(io, self.to_client.buffer, 1);
try self.to_client.flush();
}
// while (true) {
// switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
// .msgs => |len_err| {
// @branchHint(.likely);
// const msgs = msgs_buf[0..try len_err];
// for (0..msgs.len) |i| {
// const msg = msgs[i];
// defer switch (msg) {
// .MSG => |m| m.deinit(self.alloc),
// .HMSG => |h| h.deinit(self.alloc),
// };
// errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
// .MSG => |m| {
// m.deinit(self.alloc);
// },
// .HMSG => |h| {
// h.deinit(self.alloc);
// },
// };
// switch (msg) {
// .MSG => |m| {
// try self.to_client.print(
// "MSG {s} {s} {s} {d}\r\n",
// .{
// m.subject,
// m.sid,
// m.reply_to orelse "",
// m.payload.len,
// },
// );
// try m.payload.write(self.to_client);
// try self.to_client.print("\r\n", .{});
// },
// .HMSG => |hmsg| {
// try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
// hmsg.msg.subject,
// hmsg.msg.sid,
// hmsg.msg.reply_to orelse "",
// hmsg.header_bytes,
// hmsg.msg.payload.len,
// });
// try hmsg.msg.payload.write(self.to_client);
// try self.to_client.print("\r\n", .{});
// },
// }
// }
// recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
// },
// .proto => |msg_err| {
// @branchHint(.unlikely);
// const msg = try msg_err;
// switch (msg) {
// .@"+OK" => {
// _ = try self.to_client.write("+OK\r\n");
// },
// .PONG => {
// _ = try self.to_client.write("PONG\r\n");
// },
// .INFO => |info| {
// _ = try self.to_client.write("INFO ");
// try std.json.Stringify.value(info, .{}, self.to_client);
// _ = try self.to_client.write("\r\n");
// },
// .@"-ERR" => |s| {
// _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
// },
// else => |m| {
// std.debug.panic("unimplemented write: {any}\n", .{m});
// },
// }
// recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
// },
// }
// try self.to_client.flush();
// }
}
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
switch (msg) {
.MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
.HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
else => try self.recv_queue.putOne(io, msg),
}
pub fn send(self: *Client, io: std.Io, msg: []const u8) !void {
try self.recv_queue.putAll(io, msg);
}
test send {
@@ -148,19 +136,15 @@ test send {
var buf: [1024]u8 = undefined;
break :blk &buf;
});
var recv_queue: Queue(Message) = .init(&.{});
var msgs_queue: Queue(Msgs) = .init(blk: {
var buf: [1]Msgs = undefined;
break :blk &buf;
});
var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
var recv_queue: Queue(u8) = .init(&.{});
var client: Client = .init(null, &recv_queue, undefined, &to_client);
defer client.deinit(gpa);
var c_task = try io.concurrent(Client.start, .{ &client, io });
defer c_task.cancel(io) catch {};
{
try client.send(io, .PONG);
try client.send(io, "PONG\r\n");
// Wait for the concurrent client task to write to the writer
try io.sleep(.fromMilliseconds(1), .awake);
try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());