Heap allocate client buffers

This commit is contained in:
2026-01-01 03:07:19 +00:00
parent 0233bc278c
commit 4fcb9e3943

View File

@@ -43,6 +43,7 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
defer threaded.deinit(); defer threaded.deinit();
const io = threaded.io(); const io = threaded.io();
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse( var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host, server.info.host,
server.info.port, server.info.port,
@@ -121,12 +122,14 @@ fn handleConnection(
const allocator = client_allocator.allocator(); const allocator = client_allocator.allocator();
defer stream.close(io); defer stream.close(io);
var w_buffer: [4096]u8 = undefined; const w_buffer: []u8 = try allocator.alloc(u8, 1024);
var writer = stream.writer(io, &w_buffer); defer allocator.free(w_buffer);
var writer = stream.writer(io, w_buffer);
const out = &writer.interface; const out = &writer.interface;
var r_buffer: [8192]u8 = undefined; const r_buffer: []u8 = try allocator.alloc(u8, 1024);
var reader = stream.reader(io, &r_buffer); defer allocator.free(r_buffer);
var reader = stream.reader(io, r_buffer);
const in = &reader.interface; const in = &reader.interface;
var client_state: ClientState = .init(null, in, out); var client_state: ClientState = .init(null, in, out);
@@ -147,12 +150,7 @@ fn handleConnection(
try client_state.send(io, .pong); try client_state.send(io, .pong);
}, },
.@"pub" => |pb| { .@"pub" => |pb| {
try server.publishMessage(io, server_allocator, pb); _ = io.async(publishMessage, .{ server, io, server_allocator, &client_state, pb });
if (client_state.connect) |c| {
if (c.verbose) {
try client_state.send(io, .@"+ok");
}
}
}, },
.sub => |sub| { .sub => |sub| {
try server.subscribe(io, server_allocator, id, sub); try server.subscribe(io, server_allocator, id, sub);
@@ -183,7 +181,14 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual); return std.mem.eql(u8, expected, actual);
} }
fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Message.Pub) !void { fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *ClientState, msg: Message.Pub) !void {
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
source_client.send(io, .@"-err") catch {};
}
}
}
defer msg.deinit(gpa); defer msg.deinit(gpa);
for (server.subscriptions.items) |subscription| { for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, msg.subject)) { if (subjectMatches(subscription.subject, msg.subject)) {
@@ -199,6 +204,11 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Mess
} }) catch continue; } }) catch continue;
} }
} }
if (source_client.connect) |c| {
if (c.verbose) {
source_client.send(io, .@"+ok") catch {};
}
}
} }
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {