Simplified queue access

Also correctly move resetting the task to the end instead of defer.
We don't want to reset the task in the case of an error, so shouldn't
use defer.
This commit is contained in:
2026-01-05 13:56:31 -05:00
parent 80d14f7303
commit 1d2af4a69a

View File

@@ -1,5 +1,6 @@
const Message = @import("message_parser.zig").Message;
const std = @import("std");
const Queue = std.Io.Queue;
const Client = @This();
@@ -11,16 +12,16 @@ pub const Msgs = union(enum) {
connect: ?Message.Connect,
// Messages for this client to receive.
recv_queue: *std.Io.Queue(Message),
msg_queue: *std.Io.Queue(Msgs),
recv_queue: *Queue(Message),
msg_queue: *Queue(Msgs),
from_client: *std.Io.Reader,
to_client: *std.Io.Writer,
pub fn init(
connect: ?Message.Connect,
recv_queue: *std.Io.Queue(Message),
msg_queue: *std.Io.Queue(Msgs),
recv_queue: *Queue(Message),
msg_queue: *Queue(Msgs),
in: *std.Io.Reader,
out: *std.Io.Writer,
) Client {
@@ -43,18 +44,17 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
var msgs_buf: [1024]Msgs = undefined;
var recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch @panic("Concurrency unavailable");
var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
errdefer _ = recv_msgs_task.cancel(io) catch {};
var recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
var recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
errdefer _ = recv_proto_task.cancel(io) catch {};
while (true) {
switch (try io.select(.{ .msgs = &recv_msgs_task, .proto = &recv_proto_task })) {
.msgs => |msgs_err| {
.msgs => |len_err| {
@branchHint(.likely);
defer recv_msgs_task = io.concurrent(recvMsgsMsg, .{ self, io, &msgs_buf }) catch unreachable;
const msgs = try msgs_err;
const msgs = msgs_buf[0..try len_err];
for (0..msgs.len) |i| {
const msg = msgs[i];
defer switch (msg) {
@@ -96,10 +96,10 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
},
}
}
recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch unreachable;
},
.proto => |msg_err| {
@branchHint(.unlikely);
defer recv_proto_task = io.concurrent(recvProtoMsg, .{ self, io }) catch unreachable;
const msg = try msg_err;
switch (msg) {
.@"+OK" => {
@@ -120,21 +120,13 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
}
recv_proto_task = io.concurrent(Queue(Message).getOne, .{ self.recv_queue, io }) catch unreachable;
},
}
try self.to_client.flush();
}
}
fn recvProtoMsg(self: *Client, io: std.Io) !Message {
return self.recv_queue.getOne(io);
}
fn recvMsgsMsg(self: *Client, io: std.Io, buf: []Msgs) ![]Msgs {
const len = try self.msg_queue.get(io, buf, 1);
return buf[0..len];
}
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
try self.recv_queue.putOne(io, msg);
}