mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 11:44:48 +00:00
Use Subscription.send
This commit is contained in:
@@ -205,7 +205,6 @@ fn handleConnection(
|
|||||||
@branchHint(.likely);
|
@branchHint(.likely);
|
||||||
// log.debug("received a pub msg", .{});
|
// log.debug("received a pub msg", .{});
|
||||||
server.publishMessage(io, rand, server_allocator, &client, .@"pub") catch |err| switch (err) {
|
server.publishMessage(io, rand, server_allocator, &client, .@"pub") catch |err| switch (err) {
|
||||||
error.WriteFailed => return writer.err.?,
|
|
||||||
error.ReadFailed => return reader.err.?,
|
error.ReadFailed => return reader.err.?,
|
||||||
error.EndOfStream => return error.ClientDisconnected,
|
error.EndOfStream => return error.ClientDisconnected,
|
||||||
else => |e| return e,
|
else => |e| return e,
|
||||||
@@ -214,7 +213,6 @@ fn handleConnection(
|
|||||||
.HPUB => {
|
.HPUB => {
|
||||||
@branchHint(.likely);
|
@branchHint(.likely);
|
||||||
server.publishMessage(io, rand, server_allocator, &client, .hpub) catch |err| switch (err) {
|
server.publishMessage(io, rand, server_allocator, &client, .hpub) catch |err| switch (err) {
|
||||||
error.WriteFailed => return writer.err.?,
|
|
||||||
error.ReadFailed => return reader.err.?,
|
error.ReadFailed => return reader.err.?,
|
||||||
error.EndOfStream => return error.ClientDisconnected,
|
error.EndOfStream => return error.ClientDisconnected,
|
||||||
else => |e| return e,
|
else => |e| return e,
|
||||||
@@ -325,10 +323,6 @@ fn publishMessage(
|
|||||||
var published_queue_groups: ArrayList([]const u8) = .empty;
|
var published_queue_groups: ArrayList([]const u8) = .empty;
|
||||||
defer published_queue_groups.deinit(alloc);
|
defer published_queue_groups.deinit(alloc);
|
||||||
|
|
||||||
var line_writer_allocating: std.Io.Writer.Allocating = .init(alloc);
|
|
||||||
defer line_writer_allocating.deinit();
|
|
||||||
var line_writer = &line_writer_allocating.writer;
|
|
||||||
|
|
||||||
subs: for (0..server.subscriptions.items.len) |i| {
|
subs: for (0..server.subscriptions.items.len) |i| {
|
||||||
var subscription = server.subscriptions.items[i];
|
var subscription = server.subscriptions.items[i];
|
||||||
if (subjectMatches(subscription.subject, msg.subject)) {
|
if (subjectMatches(subscription.subject, msg.subject)) {
|
||||||
@@ -342,25 +336,51 @@ fn publishMessage(
|
|||||||
try published_queue_groups.append(alloc, sg);
|
try published_queue_groups.append(alloc, sg);
|
||||||
}
|
}
|
||||||
|
|
||||||
line_writer_allocating.clearRetainingCapacity();
|
// The rest of this loop is setting up a slice of byte slices to simultaneously
|
||||||
|
// send to the underlying queue.
|
||||||
|
// Each "chunk" is a section of the message to be sent.
|
||||||
|
// The chunk_count starts off at the minimum number of chunks per message, and
|
||||||
|
// then increases as branches add additional chunks.
|
||||||
|
// The msg_chunks_buf.len is the maximum number of chunks in a message.
|
||||||
|
// Each of the appendBounded calls has their error marked as unreachable,
|
||||||
|
// because it is an error for there to be more appendBounded calls than chunks
|
||||||
|
// in the chunks buf.
|
||||||
|
// The reason for this strategy is to avoid any intermediary allocations between
|
||||||
|
// the publishers read buffer, and the subscribers write buffer.
|
||||||
|
var chunk_count: usize = 7;
|
||||||
|
var msg_chunks_buf: [10][]const u8 = undefined;
|
||||||
|
var msg_chunks: ArrayList([]const u8) = .initBuffer(&msg_chunks_buf);
|
||||||
|
|
||||||
switch (pub_or_hpub) {
|
switch (pub_or_hpub) {
|
||||||
.@"pub" => _ = try line_writer.write("MSG "),
|
.@"pub" => _ = msg_chunks.appendBounded("MSG ") catch unreachable,
|
||||||
.hpub => _ = try line_writer.write("HMSG "),
|
.hpub => _ = msg_chunks.appendBounded("HMSG ") catch unreachable,
|
||||||
}
|
}
|
||||||
try line_writer.print("{s} {s} ", .{ msg.subject, subscription.sid });
|
msg_chunks.appendBounded(msg.subject) catch unreachable;
|
||||||
|
msg_chunks.appendBounded(" ") catch unreachable;
|
||||||
|
msg_chunks.appendBounded(subscription.sid) catch unreachable;
|
||||||
|
msg_chunks.appendBounded(" ") catch unreachable;
|
||||||
if (msg.reply_to) |reply_to| {
|
if (msg.reply_to) |reply_to| {
|
||||||
try line_writer.print("{s} ", .{reply_to});
|
chunk_count += 2;
|
||||||
|
msg_chunks.appendBounded(reply_to) catch unreachable;
|
||||||
|
msg_chunks.appendBounded(" ") catch unreachable;
|
||||||
}
|
}
|
||||||
switch (pub_or_hpub) {
|
switch (pub_or_hpub) {
|
||||||
.hpub => {
|
.hpub => {
|
||||||
try line_writer.print("{d} ", .{hpubmsg.header_bytes});
|
chunk_count += 1;
|
||||||
|
var hlen_buf: [std.fmt.count("{d} ", .{std.math.maxInt(usize)})]u8 = undefined;
|
||||||
|
msg_chunks.appendBounded(
|
||||||
|
std.fmt.bufPrint(&hlen_buf, "{d} ", .{hpubmsg.header_bytes}) catch unreachable,
|
||||||
|
) catch unreachable;
|
||||||
},
|
},
|
||||||
else => {},
|
else => {},
|
||||||
}
|
}
|
||||||
try line_writer.print("{d}\r\n", .{msg.payload.len - 2});
|
var len_buf: [std.fmt.count("{d}\r\n", .{std.math.maxInt(usize)})]u8 = undefined;
|
||||||
|
msg_chunks.appendBounded(
|
||||||
|
std.fmt.bufPrint(&len_buf, "{d}\r\n", .{msg.payload.len - 2}) catch unreachable,
|
||||||
|
) catch unreachable;
|
||||||
|
msg_chunks.appendBounded(msg.payload) catch unreachable;
|
||||||
|
|
||||||
try subscription.send(io, &.{ line_writer.buffered(), msg.payload });
|
try subscription.send(io, msg_chunks.items[0..chunk_count]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user