mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
Avoid queues completely
This is quite slow
This commit is contained in:
@@ -17,8 +17,6 @@ clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
|
|||||||
subs_lock: std.Io.Mutex = .init,
|
subs_lock: std.Io.Mutex = .init,
|
||||||
subscriptions: std.ArrayList(Subscription) = .empty,
|
subscriptions: std.ArrayList(Subscription) = .empty,
|
||||||
|
|
||||||
msg_queue: std.Io.Queue(Message.Pub),
|
|
||||||
|
|
||||||
var keep_running = std.atomic.Value(bool).init(true);
|
var keep_running = std.atomic.Value(bool).init(true);
|
||||||
|
|
||||||
fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void {
|
fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void {
|
||||||
@@ -37,22 +35,14 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
|
|||||||
// // Register the handler for SIGINT (Ctrl+C)
|
// // Register the handler for SIGINT (Ctrl+C)
|
||||||
// std.posix.sigaction(std.posix.SIG.INT, &act, null);
|
// std.posix.sigaction(std.posix.SIG.INT, &act, null);
|
||||||
|
|
||||||
// 64 mb buffer for messages
|
|
||||||
const queue_buf = try gpa.alloc(Message.Pub, 1024 * 1024);
|
|
||||||
defer gpa.free(queue_buf);
|
|
||||||
|
|
||||||
var server: Server = .{
|
var server: Server = .{
|
||||||
.info = server_config,
|
.info = server_config,
|
||||||
.msg_queue = .init(queue_buf),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
var threaded: std.Io.Threaded = .init(gpa, .{});
|
var threaded: std.Io.Threaded = .init(gpa, .{});
|
||||||
defer threaded.deinit();
|
defer threaded.deinit();
|
||||||
const io = threaded.io();
|
const io = threaded.io();
|
||||||
|
|
||||||
var msgProcess = try io.concurrent(processMsgs, .{ &server, io, gpa });
|
|
||||||
defer msgProcess.cancel(io) catch {};
|
|
||||||
|
|
||||||
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
|
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
|
||||||
server.info.host,
|
server.info.host,
|
||||||
server.info.port,
|
server.info.port,
|
||||||
@@ -75,9 +65,9 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
|
|||||||
std.debug.print("Exiting gracefully\n", .{});
|
std.debug.print("Exiting gracefully\n", .{});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) !void {
|
fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
|
||||||
while (true) {
|
while (true) {
|
||||||
const msg = try server.msg_queue.getOne(io);
|
const msg = server.msg_queue.getOne(io) catch break;
|
||||||
defer msg.deinit(alloc);
|
defer msg.deinit(alloc);
|
||||||
|
|
||||||
for (server.subscriptions.items) |subscription| {
|
for (server.subscriptions.items) |subscription| {
|
||||||
@@ -86,12 +76,12 @@ fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) !void {
|
|||||||
std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
|
std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
try client.send(io, .{ .msg = .{
|
client.send(io, .{ .msg = .{
|
||||||
.subject = msg.subject,
|
.subject = msg.subject,
|
||||||
.sid = subscription.sid,
|
.sid = subscription.sid,
|
||||||
.reply_to = msg.reply_to,
|
.reply_to = msg.reply_to,
|
||||||
.payload = msg.payload,
|
.payload = msg.payload,
|
||||||
} });
|
} }) catch continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -157,8 +147,7 @@ fn handleConnection(
|
|||||||
try client_state.send(io, .pong);
|
try client_state.send(io, .pong);
|
||||||
},
|
},
|
||||||
.@"pub" => |pb| {
|
.@"pub" => |pb| {
|
||||||
// Do not free pb, server.publishMessage takes ownership.
|
try server.publishMessage(io, server_allocator, pb);
|
||||||
try server.publishMessage(io, pb);
|
|
||||||
if (client_state.connect) |c| {
|
if (client_state.connect) |c| {
|
||||||
if (c.verbose) {
|
if (c.verbose) {
|
||||||
try client_state.send(io, .@"+ok");
|
try client_state.send(io, .@"+ok");
|
||||||
@@ -194,12 +183,22 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
|
|||||||
return std.mem.eql(u8, expected, actual);
|
return std.mem.eql(u8, expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void {
|
fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, msg: Message.Pub) !void {
|
||||||
try server.msg_queue.putOne(io, .{
|
defer msg.deinit(gpa);
|
||||||
.payload = msg.payload,
|
for (server.subscriptions.items) |subscription| {
|
||||||
.reply_to = msg.reply_to,
|
if (subjectMatches(subscription.subject, msg.subject)) {
|
||||||
.subject = msg.subject,
|
const client = server.clients.get(subscription.client_id) orelse {
|
||||||
});
|
std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
client.send(io, .{ .msg = .{
|
||||||
|
.subject = msg.subject,
|
||||||
|
.sid = subscription.sid,
|
||||||
|
.reply_to = msg.reply_to,
|
||||||
|
.payload = msg.payload,
|
||||||
|
} }) catch continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
|
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
|
||||||
|
|||||||
Reference in New Issue
Block a user