Probe for optimal network buffer size.

We want to match the underlying system socket buffer.
Filling this buffer minimizes the number of syscalls we do.
Larger would be a waste.

Also changed parser to use enums that more closely match the NATS
message types.
This commit is contained in:
2026-01-04 20:25:30 -05:00
parent e81bcda920
commit 69528a1b72
3 changed files with 159 additions and 97 deletions

View File

@@ -41,29 +41,29 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
for (0..len) |i| { for (0..len) |i| {
const msg = msgs[i]; const msg = msgs[i];
defer switch (msg) { defer switch (msg) {
.msg => |m| m.deinit(alloc), .MSG => |m| m.deinit(alloc),
.hmsg => |h| h.deinit(alloc), .HMSG => |h| h.deinit(alloc),
else => {}, else => {},
}; };
errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) { errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
.msg => |m| { .MSG => |m| {
m.deinit(alloc); m.deinit(alloc);
}, },
else => {}, else => {},
}; };
switch (msg) { switch (msg) {
.@"+ok" => { .@"+OK" => {
_ = try self.to_client.write("+OK\r\n"); _ = try self.to_client.write("+OK\r\n");
}, },
.pong => { .PONG => {
_ = try self.to_client.write("PONG\r\n"); _ = try self.to_client.write("PONG\r\n");
}, },
.info => |info| { .INFO => |info| {
_ = try self.to_client.write("INFO "); _ = try self.to_client.write("INFO ");
try std.json.Stringify.value(info, .{}, self.to_client); try std.json.Stringify.value(info, .{}, self.to_client);
_ = try self.to_client.write("\r\n"); _ = try self.to_client.write("\r\n");
}, },
.msg => |m| { .MSG => |m| {
@branchHint(.likely); @branchHint(.likely);
try self.to_client.print( try self.to_client.print(
"MSG {s} {s} {s} {d}\r\n{s}\r\n", "MSG {s} {s} {s} {d}\r\n{s}\r\n",
@@ -76,7 +76,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
}, },
); );
}, },
.hmsg => |hmsg| { .HMSG => |hmsg| {
@branchHint(.likely); @branchHint(.likely);
try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{ try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
hmsg.msg.subject, hmsg.msg.subject,
@@ -87,7 +87,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
hmsg.msg.payload, hmsg.msg.payload,
}); });
}, },
.@"-err" => |s| { .@"-ERR" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
}, },
else => |m| { else => |m| {

View File

@@ -4,6 +4,7 @@ const ArrayList = std.ArrayList;
const AutoHashMapUnmanaged = std.AutoHashMapUnmanaged; const AutoHashMapUnmanaged = std.AutoHashMapUnmanaged;
const Io = std.Io; const Io = std.Io;
const Dir = Io.Dir;
const Group = Io.Group; const Group = Io.Group;
const IpAddress = std.Io.net.IpAddress; const IpAddress = std.Io.net.IpAddress;
const Mutex = Io.Mutex; const Mutex = Io.Mutex;
@@ -21,6 +22,7 @@ pub const Subscription = struct {
subject: []const u8, subject: []const u8,
client_id: usize, client_id: usize,
sid: []const u8, sid: []const u8,
queue: *Queue(Message),
fn deinit(self: Subscription, alloc: Allocator) void { fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject); alloc.free(self.subject);
@@ -63,13 +65,24 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
var client_group: Group = .init; var client_group: Group = .init;
defer client_group.cancel(io); defer client_group.cancel(io);
const read_buffer_size, const write_buffer_size = getBufferSizes(io);
log.debug("read buf: {d} write buf: {d}", .{ read_buffer_size, write_buffer_size });
var id: usize = 0; var id: usize = 0;
while (true) : (id +%= 1) { while (true) : (id +%= 1) {
if (server.clients.contains(id)) continue; if (server.clients.contains(id)) continue;
log.debug("Accepting next client", .{}); log.debug("Accepting next client", .{});
const stream = try tcp_server.accept(io); const stream = try tcp_server.accept(io);
log.debug("Accepted connection {d}", .{id}); log.debug("Accepted connection {d}", .{id});
_ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch { _ = client_group.concurrent(io, handleConnectionInfallible, .{
server,
gpa,
io,
id,
stream,
read_buffer_size,
write_buffer_size,
}) catch {
log.err("Could not start concurrent handler for {d}", .{id}); log.err("Could not start concurrent handler for {d}", .{id});
stream.close(io); stream.close(io);
}; };
@@ -96,13 +109,29 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void {
} }
} }
fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void { fn handleConnectionInfallible(
handleConnection(server, server_allocator, io, id, stream) catch |err| { server: *Server,
server_allocator: Allocator,
io: Io,
id: usize,
stream: Stream,
r_buf_size: usize,
w_buf_size: usize,
) void {
handleConnection(server, server_allocator, io, id, stream, r_buf_size, w_buf_size) catch |err| {
log.err("Failed processing client {d}: {any}", .{ id, err }); log.err("Failed processing client {d}: {any}", .{ id, err });
}; };
} }
fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void { fn handleConnection(
server: *Server,
server_allocator: Allocator,
io: Io,
id: usize,
stream: Stream,
r_buf_size: usize,
w_buf_size: usize,
) !void {
defer stream.close(io); defer stream.close(io);
// TODO: use a client allocator for things that should only live for as long as the client? // TODO: use a client allocator for things that should only live for as long as the client?
@@ -111,26 +140,27 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
// messages when done processing them (usually outside the client process, ie: publish). // messages when done processing them (usually outside the client process, ie: publish).
// Set up client writer // Set up client writer
// TODO: how many bytes can fit in a network write syscall? cat /proc/sys/net/core/wmem_max const w_buffer: []u8 = try server_allocator.alloc(u8, w_buf_size);
var w_buffer: [212992]u8 = undefined; defer server_allocator.free(w_buffer);
var writer = stream.writer(io, &w_buffer); var writer = stream.writer(io, w_buffer);
const out = &writer.interface; const out = &writer.interface;
// Set up client reader // Set up client reader
// TODO: how many bytes can fit in a network read syscall? cat /proc/sys/net/core/rmem_max const r_buffer: []u8 = try server_allocator.alloc(u8, r_buf_size);
var r_buffer: [212992]u8 = undefined; defer server_allocator.free(r_buffer);
var reader = stream.reader(io, &r_buffer); var reader = stream.reader(io, r_buffer);
const in = &reader.interface; const in = &reader.interface;
// Set up buffer queue // Set up buffer queue
var qbuf: [r_buffer.len / @sizeOf(Message)]Message = undefined; const qbuf: []Message = try server_allocator.alloc(Message, r_buffer.len / @sizeOf(Message));
var queue: Queue(Message) = .init(&qbuf); defer server_allocator.free(qbuf);
var queue: Queue(Message) = .init(qbuf);
defer { defer {
queue.close(io); queue.close(io);
while (queue.getOne(io)) |msg| { while (queue.getOne(io)) |msg| {
switch (msg) { switch (msg) {
.msg => |m| m.deinit(server_allocator), .MSG => |m| m.deinit(server_allocator),
.hmsg => |h| h.deinit(server_allocator), .HMSG => |h| h.deinit(server_allocator),
else => {}, else => {},
} }
} else |_| {} } else |_| {}
@@ -144,7 +174,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
defer server.removeClient(io, server_allocator, id); defer server.removeClient(io, server_allocator, id);
// Do initial handshake with client // Do initial handshake with client
try queue.putOne(io, .{ .info = server.info }); try queue.putOne(io, .{ .INFO = server.info });
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator }); var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator });
defer client_task.cancel(io) catch {}; defer client_task.cancel(io) catch {};
@@ -152,27 +182,28 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
// Messages are owned by the server after they are received from the client // Messages are owned by the server after they are received from the client
while (client.next(server_allocator)) |msg| { while (client.next(server_allocator)) |msg| {
switch (msg) { switch (msg) {
.ping => { .PING => {
// Respond to ping with pong. // Respond to ping with pong.
try client.send(io, .pong); try client.send(io, .PONG);
}, },
.@"pub", .hpub => { .PUB, .HPUB => {
@branchHint(.likely);
defer switch (msg) { defer switch (msg) {
.@"pub" => |pb| pb.deinit(server_allocator), .PUB => |pb| pb.deinit(server_allocator),
.hpub => |hp| hp.deinit(server_allocator), .HPUB => |hp| hp.deinit(server_allocator),
else => unreachable, else => unreachable,
}; };
try server.publishMessage(io, server_allocator, &client, msg); try server.publishMessage(io, server_allocator, &client, msg);
}, },
.sub => |sub| { .SUB => |sub| {
defer sub.deinit(server_allocator); defer sub.deinit(server_allocator);
try server.subscribe(io, server_allocator, id, sub); try server.subscribe(io, server_allocator, id, &queue, sub);
}, },
.unsub => |unsub| { .UNSUB => |unsub| {
defer unsub.deinit(server_allocator); defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub); try server.unsubscribe(io, server_allocator, id, unsub);
}, },
.connect => |connect| { .CONNECT => |connect| {
if (client.connect) |*current| { if (client.connect) |*current| {
current.deinit(server_allocator); current.deinit(server_allocator);
} }
@@ -227,53 +258,46 @@ test subjectMatches {
} }
fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void { fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void {
errdefer { defer if (source_client.connect) |c| {
if (source_client.connect) |c| {
if (c.verbose) { if (c.verbose) {
source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {}; source_client.send(io, .@"+OK") catch {};
}
}
} }
};
const subject = switch (msg) { const subject = switch (msg) {
.@"pub" => |pb| pb.subject, .PUB => |pb| pb.subject,
.hpub => |hp| hp.@"pub".subject, .HPUB => |hp| hp.@"pub".subject,
else => unreachable, else => unreachable,
}; };
try server.subs_lock.lock(io); try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io); defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| { for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, subject)) { if (subjectMatches(subscription.subject, subject)) {
const client = server.clients.get(subscription.client_id) orelse {
log.debug("Trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
continue;
};
switch (msg) { switch (msg) {
.@"pub" => |pb| client.send(io, .{ .PUB => |pb| {
.msg = try pb.toMsg(alloc, subscription.sid), try subscription.queue.putOne(io, .{
}) catch |err| switch (err) { .MSG = try pb.toMsg(alloc, subscription.sid),
error.Canceled => return err, });
else => {},
}, },
.hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg( .HPUB => |hp| {
alloc, try subscription.queue.putOne(io, .{
subscription.sid, .HMSG = try hp.toHMsg(alloc, subscription.sid),
) }) catch |err| switch (err) { });
error.Canceled => return err,
else => {},
}, },
else => unreachable, else => unreachable,
} }
} }
} }
if (source_client.connect) |c| {
if (c.verbose) {
source_client.send(io, .@"+ok") catch {};
}
}
} }
fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Sub) !void { fn subscribe(
server: *Server,
io: Io,
gpa: Allocator,
id: usize,
queue: *Queue(Message),
msg: Message.Sub,
) !void {
try server.subs_lock.lock(io); try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io); defer server.subs_lock.unlock(io);
const subject = try gpa.dupe(u8, msg.subject); const subject = try gpa.dupe(u8, msg.subject);
@@ -284,10 +308,17 @@ fn subscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Su
.subject = subject, .subject = subject,
.client_id = id, .client_id = id,
.sid = sid, .sid = sid,
.queue = queue,
}); });
} }
fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.Unsub) !void { fn unsubscribe(
server: *Server,
io: Io,
gpa: Allocator,
id: usize,
msg: Message.Unsub,
) !void {
try server.subs_lock.lock(io); try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io); defer server.subs_lock.unlock(io);
const len = server.subscriptions.items.len; const len = server.subscriptions.items.len;
@@ -301,5 +332,36 @@ fn unsubscribe(server: *Server, io: Io, gpa: Allocator, id: usize, msg: Message.
} }
} }
const parseUnsigned = std.fmt.parseUnsigned;
fn getBufferSizes(io: Io) struct { usize, usize } {
const default_size = 4 * 1024;
const default = .{ default_size, default_size };
const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch {
log.err("couldn't open /proc/sys/net/core", .{});
return default;
};
var buf: [64]u8 = undefined;
const rmem_max = readBufferSize(io, dir, "rmem_max", &buf, default_size);
const wmem_max = readBufferSize(io, dir, "wmem_max", &buf, default_size);
return .{ rmem_max, wmem_max };
}
fn readBufferSize(io: Io, dir: anytype, filename: []const u8, buf: []u8, default: usize) usize {
const bytes = dir.readFile(io, filename, buf) catch |err| {
log.err("couldn't open {s}: {any}", .{ filename, err });
return default;
};
return parseUnsigned(usize, bytes[0 .. bytes.len - 1], 10) catch |err| {
log.err("couldn't parse {s}: {any}", .{ bytes[0 .. bytes.len - 1], err });
return default;
};
}
pub const default_id = "server-id-123"; pub const default_id = "server-id-123";
pub const default_name = "Zits Server"; pub const default_name = "Zits Server";

View File

@@ -20,18 +20,18 @@ const log = std.log;
pub const MessageType = @typeInfo(Message).@"union".tag_type.?; pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
pub const Message = union(enum) { pub const Message = union(enum) {
info: ServerInfo, INFO: ServerInfo,
connect: Connect, CONNECT: Connect,
@"pub": Pub, PUB: Pub,
hpub: HPub, HPUB: HPub,
sub: Sub, SUB: Sub,
unsub: Unsub, UNSUB: Unsub,
msg: Msg, MSG: Msg,
hmsg: HMsg, HMSG: HMsg,
ping, PING,
pong, PONG,
@"+ok": void, @"+OK": void,
@"-err": []const u8, @"-ERR": []const u8,
pub const ServerInfo = struct { pub const ServerInfo = struct {
/// The unique identifier of the NATS server. /// The unique identifier of the NATS server.
server_id: []const u8, server_id: []const u8,
@@ -220,15 +220,15 @@ pub const Message = union(enum) {
const client_types = StaticStringMap(MessageType).initComptime( const client_types = StaticStringMap(MessageType).initComptime(
.{ .{
// {"INFO", .info}, // {"INFO", .info},
.{ "CONNECT", .connect }, .{ @tagName(.CONNECT), .CONNECT },
.{ "PUB", .@"pub" }, .{ @tagName(.PUB), .PUB },
.{ "HPUB", .hpub }, .{ @tagName(.HPUB), .HPUB },
.{ "SUB", .sub }, .{ @tagName(.SUB), .SUB },
.{ "UNSUB", .unsub }, .{ @tagName(.UNSUB), .UNSUB },
// {"MSG", .msg}, // {"MSG", .msg},
// {"HMSG", .hmsg}, // {"HMSG", .hmsg},
.{ "PING", .ping }, .{ @tagName(.PING), .PING },
.{ "PONG", .pong }, .{ @tagName(.PONG), .PONG },
// {"+OK", .@"+ok"}, // {"+OK", .@"+ok"},
// {"-ERR", .@"-err"}, // {"-ERR", .@"-err"},
}, },
@@ -267,7 +267,7 @@ pub const Message = union(enum) {
errdefer log.err("Failed to parse {s}", .{operation_string.items}); errdefer log.err("Failed to parse {s}", .{operation_string.items});
switch (operation) { switch (operation) {
.connect => { .CONNECT => {
// for storing the json string // for storing the json string
var connect_string_writer_allocating: AllocatingWriter = .init(alloc); var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit(); defer connect_string_writer_allocating.deinit();
@@ -295,28 +295,28 @@ pub const Message = union(enum) {
.{ .allocate = .alloc_always }, .{ .allocate = .alloc_always },
); );
return .{ .connect = try res.dupe(alloc) }; return .{ .CONNECT = try res.dupe(alloc) };
}, },
.@"pub" => { .PUB => {
@branchHint(.likely); @branchHint(.likely);
return parsePub(alloc, in); return parsePub(alloc, in);
}, },
.hpub => { .HPUB => {
@branchHint(.likely); @branchHint(.likely);
return parseHPub(alloc, in); return parseHPub(alloc, in);
}, },
.ping => { .PING => {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
return .ping; return .PING;
}, },
.pong => { .PONG => {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
return .pong; return .PONG;
}, },
.sub => { .SUB => {
return parseSub(alloc, in); return parseSub(alloc, in);
}, },
.unsub => { .UNSUB => {
return parseUnsub(alloc, in); return parseUnsub(alloc, in);
}, },
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
@@ -385,7 +385,7 @@ fn parseSub(alloc: Allocator, in: *Reader) !Message {
} }
return .{ return .{
.sub = .{ .SUB = .{
.subject = subject, .subject = subject,
.queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null, .queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
.sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc), .sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
@@ -546,7 +546,7 @@ fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
} }
return .{ return .{
.unsub = .{ .UNSUB = .{
.sid = try first.toOwnedSlice(alloc), .sid = try first.toOwnedSlice(alloc),
.max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null, .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
}, },
@@ -671,7 +671,7 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
return .{ return .{
.@"pub" = .{ .PUB = .{
.subject = subject, .subject = subject,
.payload = try payload.toOwnedSlice(), .payload = try payload.toOwnedSlice(),
.reply_to = reply_to, .reply_to = reply_to,
@@ -843,7 +843,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
return .{ return .{
.hpub = .{ .HPUB = .{
.header_bytes = header_bytes, .header_bytes = header_bytes,
.@"pub" = .{ .@"pub" = .{
.subject = subject, .subject = subject,