Actually fast again???

way faster than before even??

coder@08714a4174bb:~$ nats bench pub foo -s localhost:4223
05:12:23 Starting Core NATS publisher benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, multi-subject-max=100,000, sleep=0s, subject=foo]
05:12:23 [1] Starting Core NATS publisher, publishing 100,000 messages
Finished      0s [====================================================================================] 100%

NATS Core NATS publisher stats: 574,666 msgs/sec ~ 70 MiB/sec ~ 1.74us

So cool.

src/server/client.zig JJ: M src/server/main.zig JJ: JJ: Lines starting with "JJ:" (like this one) will be 
removed.
This commit is contained in:
2026-01-01 05:12:51 +00:00
parent 5dea33367e
commit 45bd63dbe1
3 changed files with 56 additions and 35 deletions

View File

@@ -1,4 +1,5 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const zits = @import("zits"); const zits = @import("zits");
const yazap = @import("yazap"); const yazap = @import("yazap");
@@ -8,7 +9,7 @@ const Server = zits.Server;
pub fn main() !void { pub fn main() !void {
var dba: std.heap.DebugAllocator(.{}) = .init; var dba: std.heap.DebugAllocator(.{}) = .init;
defer _ = dba.deinit(); defer _ = dba.deinit();
const gpa = dba.allocator(); const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else std.heap.smp_allocator;
var app = yazap.App.init(gpa, "zits", "High performance NATS compatible client and server."); var app = yazap.App.init(gpa, "zits", "High performance NATS compatible client and server.");
defer app.deinit(); defer app.deinit();

View File

@@ -5,7 +5,8 @@ const Client = @This();
connect: ?Message.Connect, connect: ?Message.Connect,
write_lock: std.Io.Mutex, // Messages for this client to receive.
recv_queue: ?*std.Io.Queue(Message) = null,
from_client: *std.Io.Reader, from_client: *std.Io.Reader,
to_client: *std.Io.Writer, to_client: *std.Io.Writer,
@@ -17,17 +18,18 @@ pub fn init(
) Client { ) Client {
return .{ return .{
.connect = connect, .connect = connect,
.write_lock = .init,
.from_client = in, .from_client = in,
.to_client = out, .to_client = out,
}; };
} }
/// Return true if the value was put in the clients buffer to process, else false. pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void {
pub fn send(self: *Client, io: std.Io, msg: Message) !void { self.recv_queue = queue;
try self.write_lock.lock(io); var msgs: [16]Message = undefined;
defer self.write_lock.unlock(io); while (true) {
const len = try queue.get(io, &msgs, 1);
std.debug.assert(len <= msgs.len);
for (msgs[0..len]) |msg| {
switch (msg) { switch (msg) {
.@"+ok" => { .@"+ok" => {
try writeOk(self.to_client); try writeOk(self.to_client);
@@ -39,12 +41,21 @@ pub fn send(self: *Client, io: std.Io, msg: Message) !void {
try writeInfo(self.to_client, info); try writeInfo(self.to_client, info);
}, },
.msg => |m| { .msg => |m| {
defer m.deinit(alloc);
try writeMsg(self.to_client, m); try writeMsg(self.to_client, m);
}, },
else => { else => |m| {
std.debug.panic("unimplemented write", .{}); std.debug.panic("unimplemented write: {any}\n", .{m});
}, },
} }
}
}
}
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
if (self.recv_queue) |queue| {
try queue.putOne(io, msg);
} else @panic("Must start() the client before sending it messages.");
} }
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message { pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {

View File

@@ -1,4 +1,5 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const Message = @import("./message_parser.zig").Message; const Message = @import("./message_parser.zig").Message;
pub const ServerInfo = Message.ServerInfo; pub const ServerInfo = Message.ServerInfo;
@@ -120,16 +121,16 @@ fn handleConnection(
var client_allocator: std.heap.DebugAllocator(.{}) = .init; var client_allocator: std.heap.DebugAllocator(.{}) = .init;
client_allocator.backing_allocator = server_allocator; client_allocator.backing_allocator = server_allocator;
defer _ = client_allocator.deinit(); defer _ = client_allocator.deinit();
const allocator = client_allocator.allocator(); const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator;
// Set up client writer // Set up client writer
const w_buffer: []u8 = try allocator.alloc(u8, 1024); const w_buffer: []u8 = try allocator.alloc(u8, 1024 * 10);
defer allocator.free(w_buffer); defer allocator.free(w_buffer);
var writer = stream.writer(io, w_buffer); var writer = stream.writer(io, w_buffer);
const out = &writer.interface; const out = &writer.interface;
// Set up client reader // Set up client reader
const r_buffer: []u8 = try allocator.alloc(u8, 1024); const r_buffer: []u8 = try allocator.alloc(u8, 1024 * 10);
defer allocator.free(r_buffer); defer allocator.free(r_buffer);
var reader = stream.reader(io, r_buffer); var reader = stream.reader(io, r_buffer);
const in = &reader.interface; const in = &reader.interface;
@@ -139,6 +140,14 @@ fn handleConnection(
try server.addClient(server_allocator, id, &client); try server.addClient(server_allocator, id, &client);
defer server.removeClient(io, server_allocator, id); defer server.removeClient(io, server_allocator, id);
var qbuf: [1024]Message = undefined;
var queue: std.Io.Queue(Message) = .init(&qbuf);
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
defer client_task.cancel(io) catch {};
try io.sleep(std.Io.Duration.fromMilliseconds(5), .real);
// Do initial handshake with client // Do initial handshake with client
try client.send(io, .{ .info = server.info }); try client.send(io, .{ .info = server.info });
var connect_arena: std.heap.ArenaAllocator = .init(allocator); var connect_arena: std.heap.ArenaAllocator = .init(allocator);
@@ -153,7 +162,7 @@ fn handleConnection(
try client.send(io, .pong); try client.send(io, .pong);
}, },
.@"pub" => |pb| { .@"pub" => |pb| {
_ = io.async(publishMessage, .{ server, io, server_allocator, &client, pb }); try server.publishMessage(io, server_allocator, &client, pb);
}, },
.sub => |sub| { .sub => |sub| {
try server.subscribe(io, server_allocator, id, sub); try server.subscribe(io, server_allocator, id, sub);
@@ -167,7 +176,7 @@ fn handleConnection(
} }
} else |err| switch (err) { } else |err| switch (err) {
error.EndOfStream => { error.EndOfStream => {
std.debug.print("Client {d} disconnected", .{}); std.debug.print("Client {d} disconnected\n", .{id});
}, },
else => { else => {
return err; return err;
@@ -186,7 +195,7 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual); return std.mem.eql(u8, expected, actual);
} }
fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
errdefer { errdefer {
if (source_client.connect) |c| { if (source_client.connect) |c| {
if (c.verbose) { if (c.verbose) {
@@ -194,7 +203,7 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl
} }
} }
} }
defer msg.deinit(gpa); defer msg.deinit(alloc);
for (server.subscriptions.items) |subscription| { for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, msg.subject)) { if (subjectMatches(subscription.subject, msg.subject)) {
const client = server.clients.get(subscription.client_id) orelse { const client = server.clients.get(subscription.client_id) orelse {
@@ -202,10 +211,10 @@ fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_cl
continue; continue;
}; };
client.send(io, .{ .msg = .{ client.send(io, .{ .msg = .{
.subject = msg.subject, .subject = try alloc.dupe(u8, msg.subject),
.sid = subscription.sid, .sid = try alloc.dupe(u8, subscription.sid),
.reply_to = msg.reply_to, .reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
.payload = msg.payload, .payload = try alloc.dupe(u8, msg.payload),
} }) catch continue; } }) catch continue;
} }
} }