This commit is contained in:
2026-01-02 02:13:13 +00:00
parent 9ee8317cb0
commit 1e3c21f150
2 changed files with 102 additions and 56 deletions

View File

@@ -5,7 +5,9 @@ const Client = @This();
connect: ?Message.Connect,
// Messages for this client to receive.
// Messages to send to this client.
send_queue: ?*std.Io.Queue(Message) = null,
// Messages received from this client.
recv_queue: ?*std.Io.Queue(Message) = null,
from_client: *std.Io.Reader,
@@ -23,70 +25,102 @@ pub fn init(
};
}
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void {
self.recv_queue = queue;
pub fn start(
self: *Client,
io: std.Io,
alloc: std.mem.Allocator,
send_queue: *std.Io.Queue(Message),
recv_queue: *std.Io.Queue(Message),
) !void {
self.send_queue = send_queue;
self.recv_queue = recv_queue;
var recvL = try io.concurrent(startSendLoop, .{ self, io, alloc });
defer recvL.cancel(io) catch {};
var sendL = try io.concurrent(startRecvLoop, .{ self, io, alloc });
defer sendL.cancel(io) catch {};
// Wait for one of the tasks to cancel.
_ = try io.select(.{
.recv = &recvL,
.send = &sendL,
});
}
fn startSendLoop(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
var msgs: [8]Message = undefined;
while (true) {
const len = try queue.get(io, &msgs, 1);
std.debug.assert(len <= msgs.len);
for (0..len) |i| {
const msg = msgs[i];
defer switch (msg) {
.msg => |m| m.deinit(alloc),
else => {},
};
errdefer {
for (msgs[i + 1 .. len]) |mg| switch (mg) {
.msg => |m| {
m.deinit(alloc);
},
if (self.send_queue) |queue| {
while (true) {
const len = try queue.get(io, &msgs, 1);
std.debug.assert(len <= msgs.len);
for (0..len) |i| {
const msg = msgs[i];
defer switch (msg) {
.msg => |m| m.deinit(alloc),
else => {},
};
}
switch (msg) {
.@"+ok" => {
_ = try self.to_client.write("+OK\r\n");
},
.pong => {
_ = try self.to_client.write("PONG\r\n");
},
.info => |info| {
_ = try self.to_client.write("INFO ");
try std.json.Stringify.value(info, .{}, self.to_client);
_ = try self.to_client.write("\r\n");
},
.msg => |m| {
try self.to_client.print(
"MSG {s} {s} {s} {d}\r\n{s}\r\n",
.{
m.subject,
m.sid,
m.reply_to orelse "",
m.payload.len,
m.payload,
errdefer {
for (msgs[i + 1 .. len]) |mg| switch (mg) {
.msg => |m| {
m.deinit(alloc);
},
);
},
.@"-err" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
},
else => |m| {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
else => {},
};
}
switch (msg) {
.@"+ok" => {
_ = try self.to_client.write("+OK\r\n");
},
.pong => {
_ = try self.to_client.write("PONG\r\n");
},
.info => |info| {
_ = try self.to_client.write("INFO ");
try std.json.Stringify.value(info, .{}, self.to_client);
_ = try self.to_client.write("\r\n");
},
.msg => |m| {
try self.to_client.print(
"MSG {s} {s} {s} {d}\r\n{s}\r\n",
.{
m.subject,
m.sid,
m.reply_to orelse "",
m.payload.len,
m.payload,
},
);
},
.@"-err" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
},
else => |m| {
std.debug.panic("unimplemented write: {any}\n", .{m});
},
}
}
try self.to_client.flush();
}
try self.to_client.flush();
}
} else unreachable;
}
fn startRecvLoop(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
if (self.recv_queue) |queue| {
while (Message.next(alloc, self.from_client)) |msg| {
try queue.putOne(io, msg);
} else |_| {}
} else unreachable;
}
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
if (self.recv_queue) |queue| {
if (self.send_queue) |queue| {
try queue.putOne(io, msg);
} else @panic("Must start() the client before sending it messages.");
}
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
return Message.next(allocator, self.from_client);
return;
}
test {

View File

@@ -160,11 +160,11 @@ fn handleConnection(
try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id);
var qbuf: [8]Message = undefined;
var queue: std.Io.Queue(Message) = .init(&qbuf);
var sdqbuf: [8]Message = undefined;
var send_queue: std.Io.Queue(Message) = .init(&sdqbuf);
defer {
queue.close(io);
while (queue.getOne(io)) |msg| {
send_queue.close(io);
while (send_queue.getOne(io)) |msg| {
switch (msg) {
.msg => |m| m.deinit(server_allocator),
else => {},
@@ -172,7 +172,19 @@ fn handleConnection(
} else |_| {}
}
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
var rcqbuf: [8]Message = undefined;
var recv_queue: std.Io.Queue(Message) = .init(&rcqbuf);
defer {
recv_queue.close(io);
while (recv_queue.getOne(io)) |msg| {
switch (msg) {
.msg => |m| m.deinit(server_allocator),
else => {},
}
} else |_| {}
}
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &send_queue, &recv_queue });
defer client_task.cancel(io) catch {};
try io.sleep(std.Io.Duration.fromMilliseconds(5), .real);