Use client allocator to own incoming messages to a client

This commit is contained in:
2026-01-06 10:03:03 -05:00
parent 318d467f5c
commit 6e9f6998bd
2 changed files with 40 additions and 29 deletions

View File

@@ -10,6 +10,8 @@ pub const Msgs = union(enum) {
}; };
connect: ?Message.Connect, connect: ?Message.Connect,
// Used to own messages that we receive in our queues.
alloc: std.mem.Allocator,
// Messages for this client to receive. // Messages for this client to receive.
recv_queue: *Queue(Message), recv_queue: *Queue(Message),
@@ -20,6 +22,7 @@ to_client: *std.Io.Writer,
pub fn init( pub fn init(
connect: ?Message.Connect, connect: ?Message.Connect,
alloc: std.mem.Allocator,
recv_queue: *Queue(Message), recv_queue: *Queue(Message),
msg_queue: *Queue(Msgs), msg_queue: *Queue(Msgs),
in: *std.Io.Reader, in: *std.Io.Reader,
@@ -27,6 +30,7 @@ pub fn init(
) Client { ) Client {
return .{ return .{
.connect = connect, .connect = connect,
.alloc = alloc,
.recv_queue = recv_queue, .recv_queue = recv_queue,
.msg_queue = msg_queue, .msg_queue = msg_queue,
.from_client = in, .from_client = in,
@@ -41,7 +45,7 @@ pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
self.* = undefined; self.* = undefined;
} }
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void { pub fn start(self: *Client, io: std.Io) !void {
var msgs_buf: [1024]Msgs = undefined; var msgs_buf: [1024]Msgs = undefined;
var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable"); var recv_msgs_task = io.concurrent(Queue(Msgs).get, .{ self.msg_queue, io, &msgs_buf, 1 }) catch @panic("Concurrency unavailable");
@@ -58,15 +62,15 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
for (0..msgs.len) |i| { for (0..msgs.len) |i| {
const msg = msgs[i]; const msg = msgs[i];
defer switch (msg) { defer switch (msg) {
.MSG => |m| m.deinit(alloc), .MSG => |m| m.deinit(self.alloc),
.HMSG => |h| h.deinit(alloc), .HMSG => |h| h.deinit(self.alloc),
}; };
errdefer for (msgs[i + 1 ..]) |mg| switch (mg) { errdefer for (msgs[i + 1 ..]) |mg| switch (mg) {
.MSG => |m| { .MSG => |m| {
m.deinit(alloc); m.deinit(self.alloc);
}, },
.HMSG => |h| { .HMSG => |h| {
h.deinit(alloc); h.deinit(self.alloc);
}, },
}; };
switch (msg) { switch (msg) {

View File

@@ -19,11 +19,16 @@ pub const Client = @import("./Client.zig");
const Msgs = Client.Msgs; const Msgs = Client.Msgs;
const Server = @This(); const Server = @This();
const builtin = @import("builtin");
const safe_build = builtin.mode == .Debug or builtin.mode == .ReleaseSafe;
pub const Subscription = struct { pub const Subscription = struct {
subject: []const u8, subject: []const u8,
client_id: usize, client_id: usize,
sid: []const u8, sid: []const u8,
queue: *Queue(Msgs), queue: *Queue(Msgs),
// used to alloc messages in the queue
alloc: Allocator,
fn deinit(self: Subscription, alloc: Allocator) void { fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject); alloc.free(self.subject);
@@ -47,6 +52,7 @@ pub fn deinit(server: *Server, io: Io, alloc: Allocator) void {
for (server.subscriptions.items) |sub| { for (server.subscriptions.items) |sub| {
sub.deinit(alloc); sub.deinit(alloc);
} }
// TODO drain subscription queues
server.subscriptions.deinit(alloc); server.subscriptions.deinit(alloc);
server.clients.deinit(alloc); server.clients.deinit(alloc);
} }
@@ -135,44 +141,44 @@ fn handleConnection(
) !void { ) !void {
defer stream.close(io); defer stream.close(io);
// TODO: use a client allocator for things that should only live for as long as the client? var dba: std.heap.DebugAllocator(.{}) = .init;
// I had this before, but it seemed to have made lifetimes harder to track. dba.backing_allocator = server_allocator;
// Messages made sense to parse using a client allocator, but that makes it hard to free defer _ = dba.deinit();
// messages when done processing them (usually outside the client process, ie: publish). const alloc = if (safe_build) dba.allocator() else server_allocator;
// Set up client writer // Set up client writer
const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size); const w_buffer: []u8 = try alloc.alloc(u8, w_buf_size);
defer server_allocator.free(w_buffer); defer alloc.free(w_buffer);
var writer = stream.writer(io, w_buffer); var writer = stream.writer(io, w_buffer);
const out = &writer.interface; const out = &writer.interface;
// Set up client reader // Set up client reader
const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size); const r_buffer: []u8 = try alloc.alloc(u8, r_buf_size);
defer server_allocator.free(r_buffer); defer alloc.free(r_buffer);
var reader = stream.reader(io, r_buffer); var reader = stream.reader(io, r_buffer);
const in = &reader.interface; const in = &reader.interface;
// Set up buffer queue // Set up buffer queue
const qbuf: []Message = try server_allocator.alloc(Message, 16); const qbuf: []Message = try alloc.alloc(Message, 16);
defer server_allocator.free(qbuf); defer alloc.free(qbuf);
var recv_queue: Queue(Message) = .init(qbuf); var recv_queue: Queue(Message) = .init(qbuf);
defer recv_queue.close(io); defer recv_queue.close(io);
const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / @sizeOf(Msgs)); const mbuf: []Msgs = try alloc.alloc(Msgs, w_buf_size / @sizeOf(Msgs));
defer server_allocator.free(mbuf); defer alloc.free(mbuf);
var msgs_queue: Queue(Msgs) = .init(mbuf); var msgs_queue: Queue(Msgs) = .init(mbuf);
defer { defer {
msgs_queue.close(io); msgs_queue.close(io);
while (msgs_queue.getOne(io)) |msg| { while (msgs_queue.getOne(io)) |msg| {
switch (msg) { switch (msg) {
.MSG => |m| m.deinit(server_allocator), .MSG => |m| m.deinit(alloc),
.HMSG => |h| h.deinit(server_allocator), .HMSG => |h| h.deinit(alloc),
} }
} else |_| {} } else |_| {}
} }
// Create client // Create client
var client: Client = .init(null, &recv_queue, &msgs_queue, in, out); var client: Client = .init(null, alloc, &recv_queue, &msgs_queue, in, out);
defer client.deinit(server_allocator); defer client.deinit(server_allocator);
try server.addClient(server_allocator, id, &client); try server.addClient(server_allocator, id, &client);
@@ -182,7 +188,7 @@ fn handleConnection(
// try recv_queue.putOne(io, .PONG); // try recv_queue.putOne(io, .PONG);
try recv_queue.putOne(io, .{ .INFO = server.info }); try recv_queue.putOne(io, .{ .INFO = server.info });
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); var client_task = try io.concurrent(Client.start, .{ &client, io });
defer client_task.cancel(io) catch {}; defer client_task.cancel(io) catch {};
// Messages are owned by the server after they are received from the client // Messages are owned by the server after they are received from the client
@@ -195,16 +201,16 @@ fn handleConnection(
.PUB => |pb| { .PUB => |pb| {
@branchHint(.likely); @branchHint(.likely);
defer pb.deinit(server_allocator); defer pb.deinit(server_allocator);
try server.publishMessage(io, server_allocator, &client, msg); try server.publishMessage(io, &client, msg);
}, },
.HPUB => |hp| { .HPUB => |hp| {
@branchHint(.likely); @branchHint(.likely);
defer hp.deinit(server_allocator); defer hp.deinit(server_allocator);
try server.publishMessage(io, server_allocator, &client, msg); try server.publishMessage(io, &client, msg);
}, },
.SUB => |sub| { .SUB => |sub| {
defer sub.deinit(server_allocator); defer sub.deinit(server_allocator);
try server.subscribe(io, server_allocator, id, &msgs_queue, sub); try server.subscribe(io, server_allocator, client, id, sub);
}, },
.UNSUB => |unsub| { .UNSUB => |unsub| {
defer unsub.deinit(server_allocator); defer unsub.deinit(server_allocator);
@@ -264,7 +270,7 @@ test subjectMatches {
try expect(subjectMatches("foo.>", "foo.bar.baz")); try expect(subjectMatches("foo.>", "foo.bar.baz"));
} }
fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) !void {
defer if (source_client.connect) |c| { defer if (source_client.connect) |c| {
if (c.verbose) { if (c.verbose) {
source_client.send(io, .@"+OK") catch {}; source_client.send(io, .@"+OK") catch {};
@@ -283,12 +289,12 @@ fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Cli
switch (msg) { switch (msg) {
.PUB => |pb| { .PUB => |pb| {
try subscription.queue.putOne(io, .{ try subscription.queue.putOne(io, .{
.MSG = try pb.toMsg(alloc, subscription.sid), .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
}); });
}, },
.HPUB => |hp| { .HPUB => |hp| {
try subscription.queue.putOne(io, .{ try subscription.queue.putOne(io, .{
.HMSG = try hp.toHMsg(alloc, subscription.sid), .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
}); });
}, },
else => unreachable, else => unreachable,
@@ -301,8 +307,8 @@ fn subscribe(
server: *Server, server: *Server,
io: Io, io: Io,
gpa: Allocator, gpa: Allocator,
client: Client,
id: usize, id: usize,
queue: *Queue(Msgs),
msg: Message.Sub, msg: Message.Sub,
) !void { ) !void {
try server.subs_lock.lock(io); try server.subs_lock.lock(io);
@@ -315,7 +321,8 @@ fn subscribe(
.subject = subject, .subject = subject,
.client_id = id, .client_id = id,
.sid = sid, .sid = sid,
.queue = queue, .queue = client.msg_queue,
.alloc = client.alloc,
}); });
} }