support hpub

fixed issue where not all data was being sent
request reply has a performance issue but technically works
This commit is contained in:
2026-01-08 11:48:12 -05:00
parent 45feccbad8
commit d8488fde49
3 changed files with 186 additions and 859 deletions

View File

@@ -281,9 +281,15 @@ fn publishMessage(
}
};
_ = pub_or_hpub;
const hpubmsg = switch (pub_or_hpub) {
.@"pub" => {},
.hpub => try parse.hpub(source_client.from_client),
};
const msg = try parse.@"pub"(source_client.from_client);
const msg: Message.Pub = switch (pub_or_hpub) {
.@"pub" => try parse.@"pub"(source_client.from_client),
.hpub => hpubmsg.@"pub",
};
// const subject = switch (pub_or_hpub) {
// .PUB => |pb| pb.subject,
@@ -297,6 +303,10 @@ fn publishMessage(
var published_queue_sub_idxs: ArrayList(usize) = .empty;
defer published_queue_sub_idxs.deinit(alloc);
var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc);
defer line_writer_allocating.deinit();
var line_writer = &line_writer_allocating.writer;
subs: for (0..server.subscriptions.items.len) |i| {
const subscription = server.subscriptions.items[i];
if (subjectMatches(subscription.subject, msg.subject)) {
@@ -313,45 +323,29 @@ fn publishMessage(
try published_queue_sub_idxs.append(alloc, i);
}
const m = msg.toMsg(subscription.sid);
var msg_line_buf: [1024]u8 = undefined;
var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf);
line_writer_allocating.clearRetainingCapacity();
// try self.to_client.print(
// ,
// );
// try m.payload.write(self.to_client);
// try self.to_client.print("\r\n", .{});
try msg_line_writer.print(
"MSG {s} {s} {s} {d}\r\n",
.{
m.subject,
m.sid,
m.reply_to orelse "",
m.payload.len,
switch (pub_or_hpub) {
.@"pub" => _ = try line_writer.write("MSG "),
.hpub => _ = try line_writer.write("HMSG "),
}
try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid });
if (msg.reply_to) |reply_to| {
try line_writer.print("{s} ", .{reply_to});
}
switch (pub_or_hpub) {
.hpub => {
try line_writer.print("{d} ", .{hpubmsg.header_bytes});
},
);
else => {},
}
try line_writer.print("{d}\r\n", .{msg.payload.len});
try subscription.queue_lock.lock(io);
defer subscription.queue_lock.unlock(io);
try subscription.queue.putAll(io, msg_line_writer.buffered());
try subscription.queue.putAll(io, m.payload);
try subscription.queue.putAll(io, line_writer.buffered());
try subscription.queue.putAll(io, msg.payload);
try subscription.queue.putAll(io, "\r\n");
// switch (msg) {
// .PUB => |pb| {
// try subscription.queue.putOne(io, .{
// .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
// });
// },
// .HPUB => |hp| {
// try subscription.queue.putOne(io, .{
// .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
// });
// },
// else => unreachable,
// }
}
}