mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
slow again :(
This commit is contained in:
@@ -44,6 +44,9 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io
|
|||||||
defer m.deinit(alloc);
|
defer m.deinit(alloc);
|
||||||
try writeMsg(self.to_client, m);
|
try writeMsg(self.to_client, m);
|
||||||
},
|
},
|
||||||
|
.@"-err" => |s| {
|
||||||
|
try writeErr(self.to_client, s);
|
||||||
|
},
|
||||||
else => |m| {
|
else => |m| {
|
||||||
std.debug.panic("unimplemented write: {any}\n", .{m});
|
std.debug.panic("unimplemented write: {any}\n", .{m});
|
||||||
},
|
},
|
||||||
@@ -71,6 +74,11 @@ fn writeOk(out: *std.Io.Writer) !void {
|
|||||||
try out.flush();
|
try out.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn writeErr(out: *std.Io.Writer, msg: []const u8) !void {
|
||||||
|
_ = try out.print("-ERR '{s}'\r\n", .{msg});
|
||||||
|
try out.flush();
|
||||||
|
}
|
||||||
|
|
||||||
fn writePong(out: *std.Io.Writer) !void {
|
fn writePong(out: *std.Io.Writer) !void {
|
||||||
_ = try out.write("PONG\r\n");
|
_ = try out.write("PONG\r\n");
|
||||||
try out.flush();
|
try out.flush();
|
||||||
|
|||||||
@@ -162,7 +162,23 @@ fn handleConnection(
|
|||||||
try client.send(io, .pong);
|
try client.send(io, .pong);
|
||||||
},
|
},
|
||||||
.@"pub" => |pb| {
|
.@"pub" => |pb| {
|
||||||
try server.publishMessage(io, server_allocator, &client, pb);
|
var pub_task = io.async(publishMessage, .{ server, io, server_allocator, &client, pb });
|
||||||
|
defer pub_task.cancel(io) catch {};
|
||||||
|
|
||||||
|
var timeout_task = io.async(std.Io.sleep, .{ io, .fromMilliseconds(200), .real });
|
||||||
|
defer timeout_task.cancel(io) catch {};
|
||||||
|
|
||||||
|
switch (try io.select(.{
|
||||||
|
.publish = &pub_task,
|
||||||
|
.timeout = &timeout_task,
|
||||||
|
})) {
|
||||||
|
.publish => {
|
||||||
|
timeout_task.cancel(io) catch {};
|
||||||
|
},
|
||||||
|
.timeout => {
|
||||||
|
pub_task.cancel(io) catch {};
|
||||||
|
},
|
||||||
|
}
|
||||||
},
|
},
|
||||||
.sub => |sub| {
|
.sub => |sub| {
|
||||||
try server.subscribe(io, server_allocator, id, sub);
|
try server.subscribe(io, server_allocator, id, sub);
|
||||||
@@ -196,33 +212,35 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
|
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
|
||||||
errdefer {
|
// errdefer {
|
||||||
if (source_client.connect) |c| {
|
// if (source_client.connect) |c| {
|
||||||
if (c.verbose) {
|
// if (c.verbose) {
|
||||||
source_client.send(io, .@"-err") catch {};
|
// source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
{
|
||||||
|
defer msg.deinit(alloc);
|
||||||
|
try server.subs_lock.lock(io);
|
||||||
|
defer server.subs_lock.unlock(io);
|
||||||
|
for (server.subscriptions.items) |subscription| {
|
||||||
|
if (subjectMatches(subscription.subject, msg.subject)) {
|
||||||
|
const client = server.clients.get(subscription.client_id) orelse {
|
||||||
|
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
client.send(io, .{ .msg = .{
|
||||||
|
.subject = try alloc.dupe(u8, msg.subject),
|
||||||
|
.sid = try alloc.dupe(u8, subscription.sid),
|
||||||
|
.reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
|
||||||
|
.payload = try alloc.dupe(u8, msg.payload),
|
||||||
|
} }) catch continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
defer msg.deinit(alloc);
|
|
||||||
try server.subs_lock.lock(io);
|
|
||||||
defer server.subs_lock.unlock(io);
|
|
||||||
for (server.subscriptions.items) |subscription| {
|
|
||||||
if (subjectMatches(subscription.subject, msg.subject)) {
|
|
||||||
const client = server.clients.get(subscription.client_id) orelse {
|
|
||||||
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
client.send(io, .{ .msg = .{
|
|
||||||
.subject = try alloc.dupe(u8, msg.subject),
|
|
||||||
.sid = try alloc.dupe(u8, subscription.sid),
|
|
||||||
.reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null,
|
|
||||||
.payload = try alloc.dupe(u8, msg.payload),
|
|
||||||
} }) catch continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (source_client.connect) |c| {
|
if (source_client.connect) |c| {
|
||||||
if (c.verbose) {
|
if (c.verbose) {
|
||||||
source_client.send(io, .@"+ok") catch {};
|
try source_client.send(io, .@"+ok");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ pub const Message = union(MessageType) {
|
|||||||
ping,
|
ping,
|
||||||
pong,
|
pong,
|
||||||
@"+ok": void,
|
@"+ok": void,
|
||||||
@"-err": void,
|
@"-err": []const u8,
|
||||||
pub const ServerInfo = struct {
|
pub const ServerInfo = struct {
|
||||||
/// The unique identifier of the NATS server.
|
/// The unique identifier of the NATS server.
|
||||||
server_id: []const u8,
|
server_id: []const u8,
|
||||||
|
|||||||
Reference in New Issue
Block a user