Add a warning log when producers are stalled

This commit is contained in:
2026-01-12 18:08:29 -05:00
parent 7bcc0c19aa
commit 9f690fe27a

View File

@@ -154,6 +154,8 @@ fn handleConnection(
) !void { ) !void {
defer stream.close(io); defer stream.close(io);
const clock: std.Io.Clock = .real;
var dba: std.heap.DebugAllocator(.{}) = .init; var dba: std.heap.DebugAllocator(.{}) = .init;
dba.backing_allocator = server_allocator; dba.backing_allocator = server_allocator;
defer _ = dba.deinit(); defer _ = dba.deinit();
@@ -208,20 +210,29 @@ fn handleConnection(
}, },
.PUB => { .PUB => {
@branchHint(.likely); @branchHint(.likely);
// log.debug("received a pub msg", .{}); const before = try clock.now(io);
server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) { server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .@"pub") catch |err| switch (err) {
error.ReadFailed => return reader.err.?, error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected, error.EndOfStream => return error.ClientDisconnected,
else => |e| return e, else => |e| return e,
}; };
const pub_ns: i64 = @intCast(before.durationTo(try clock.now(io)).toNanoseconds());
if (pub_ns > 5 * std.time.ns_per_ms) {
log.warn("Producer was stalled for a total of for {D}", .{pub_ns});
}
}, },
.HPUB => { .HPUB => {
@branchHint(.likely); @branchHint(.likely);
const before = try clock.now(io);
server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) { server.publishMessage(io, rand, server_allocator, msg_write_buf, &client, .hpub) catch |err| switch (err) {
error.ReadFailed => return reader.err.?, error.ReadFailed => return reader.err.?,
error.EndOfStream => return error.ClientDisconnected, error.EndOfStream => return error.ClientDisconnected,
else => |e| return e, else => |e| return e,
}; };
const pub_ns: i64 = @intCast(before.durationTo(try clock.now(io)).toNanoseconds());
if (pub_ns > 5 * std.time.ns_per_ms) {
log.warn("Producer was stalled for a total of for {D}", .{pub_ns});
}
}, },
.SUB => { .SUB => {
server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) { server.subscribe(io, server_allocator, &client, id) catch |err| switch (err) {