mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
starting minimizing the number of queue puts
This commit is contained in:
@@ -347,8 +347,8 @@ fn publishMessage(
|
||||
// in the chunks buf.
|
||||
// The reason for this strategy is to avoid any intermediary allocations between
|
||||
// the publishers read buffer, and the subscribers write buffer.
|
||||
var chunk_count: usize = 7;
|
||||
var msg_chunks_buf: [10][]const u8 = undefined;
|
||||
var chunk_count: usize = 6;
|
||||
var msg_chunks_buf: [9][]const u8 = undefined;
|
||||
var msg_chunks: ArrayList([]const u8) = .initBuffer(&msg_chunks_buf);
|
||||
|
||||
switch (pub_or_hpub) {
|
||||
@@ -364,20 +364,10 @@ fn publishMessage(
|
||||
msg_chunks.appendBounded(reply_to) catch unreachable;
|
||||
msg_chunks.appendBounded(" ") catch unreachable;
|
||||
}
|
||||
switch (pub_or_hpub) {
|
||||
.hpub => {
|
||||
chunk_count += 1;
|
||||
var hlen_buf: [std.fmt.count("{d} ", .{std.math.maxInt(usize)})]u8 = undefined;
|
||||
msg_chunks.appendBounded(
|
||||
std.fmt.bufPrint(&hlen_buf, "{d} ", .{hpubmsg.header_bytes}) catch unreachable,
|
||||
) catch unreachable;
|
||||
},
|
||||
else => {},
|
||||
if (pub_or_hpub == .hpub) {
|
||||
chunk_count += 1;
|
||||
msg_chunks.appendBounded(hpubmsg.header_bytes) catch unreachable;
|
||||
}
|
||||
var len_buf: [std.fmt.count("{d}\r\n", .{std.math.maxInt(usize)})]u8 = undefined;
|
||||
msg_chunks.appendBounded(
|
||||
std.fmt.bufPrint(&len_buf, "{d}\r\n", .{msg.payload.len - 2}) catch unreachable,
|
||||
) catch unreachable;
|
||||
msg_chunks.appendBounded(msg.payload) catch unreachable;
|
||||
|
||||
try subscription.send(io, msg_chunks.items[0..chunk_count]);
|
||||
|
||||
@@ -120,7 +120,7 @@ pub const Message = union(enum) {
|
||||
}
|
||||
};
|
||||
pub const HPub = struct {
|
||||
header_bytes: usize,
|
||||
header_bytes: []const u8,
|
||||
@"pub": Pub,
|
||||
|
||||
pub fn deinit(self: HPub, alloc: Allocator) void {
|
||||
@@ -136,7 +136,7 @@ pub const Message = union(enum) {
|
||||
};
|
||||
|
||||
pub const HMsg = struct {
|
||||
header_bytes: usize,
|
||||
header_bytes: []const u8,
|
||||
msg: Msg,
|
||||
|
||||
pub fn deinit(self: HMsg, alloc: Allocator) void {
|
||||
|
||||
@@ -145,21 +145,22 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
|
||||
try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len);
|
||||
continue;
|
||||
}
|
||||
in.toss(iter.index + "\r\n".len);
|
||||
in.toss(iter.index - second.len);
|
||||
return .{
|
||||
.subject = subject,
|
||||
.reply_to = null,
|
||||
.payload = in.take(bytes + 2) catch unreachable,
|
||||
.payload = in.take(bytes + 5) catch unreachable,
|
||||
};
|
||||
}
|
||||
|
||||
switch (in.buffered()[iter.index]) {
|
||||
'\t', ' ' => {
|
||||
const reply_to = second;
|
||||
const bytes = parseUnsigned(usize, iter.next() orelse {
|
||||
const bytes_str = iter.next() orelse {
|
||||
try in.fillMore();
|
||||
continue;
|
||||
}, 10) catch return error.InvalidStream;
|
||||
};
|
||||
const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;
|
||||
|
||||
if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') {
|
||||
try in.fillMore();
|
||||
@@ -171,11 +172,11 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
|
||||
continue;
|
||||
}
|
||||
|
||||
in.toss(iter.index + "\r\n".len);
|
||||
in.toss(iter.index - bytes_str.len);
|
||||
return .{
|
||||
.subject = subject,
|
||||
.reply_to = reply_to,
|
||||
.payload = in.take(bytes + 2) catch unreachable,
|
||||
.payload = in.take(bytes + 5) catch unreachable,
|
||||
};
|
||||
},
|
||||
else => {},
|
||||
@@ -195,7 +196,7 @@ test @"pub" {
|
||||
Message.Pub{
|
||||
.subject = "foo",
|
||||
.reply_to = "bar",
|
||||
.payload = "hi\r\n",
|
||||
.payload = "2\r\nhi\r\n",
|
||||
},
|
||||
try @"pub"(&in.interface),
|
||||
);
|
||||
@@ -210,7 +211,7 @@ test @"pub" {
|
||||
Message.Pub{
|
||||
.subject = "foo",
|
||||
.reply_to = null,
|
||||
.payload = "hi\r\n",
|
||||
.payload = "2\r\nhi\r\n",
|
||||
},
|
||||
try @"pub"(&in.interface),
|
||||
);
|
||||
@@ -226,7 +227,7 @@ test @"pub" {
|
||||
Message.Pub{
|
||||
.subject = "foo",
|
||||
.reply_to = null,
|
||||
.payload = "hi\r\n",
|
||||
.payload = "2\r\nhi\r\n",
|
||||
},
|
||||
try @"pub"(&in.interface),
|
||||
);
|
||||
@@ -243,7 +244,7 @@ test @"pub" {
|
||||
Message.Pub{
|
||||
.subject = "foo",
|
||||
.reply_to = null,
|
||||
.payload = "hi\r\n",
|
||||
.payload = "2\r\nhi\r\n",
|
||||
},
|
||||
try @"pub"(&in.interface),
|
||||
);
|
||||
@@ -261,7 +262,7 @@ test @"pub" {
|
||||
Message.Pub{
|
||||
.subject = "foo",
|
||||
.reply_to = null,
|
||||
.payload = "hi\r\n",
|
||||
.payload = "2\r\nhi\r\n",
|
||||
},
|
||||
try @"pub"(&in.interface),
|
||||
);
|
||||
@@ -540,7 +541,7 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
|
||||
const header_bytes_str = second;
|
||||
const total_bytes_str = third;
|
||||
|
||||
const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
|
||||
_ = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
|
||||
const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
|
||||
|
||||
if (in.buffered().len < iter.index + total_bytes + 4) {
|
||||
@@ -548,14 +549,13 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 4 bytes for CRLF on either side of headers and payload.
|
||||
in.toss(iter.index + 2);
|
||||
in.toss(iter.index - total_bytes_str.len);
|
||||
return .{
|
||||
.header_bytes = header_bytes,
|
||||
.header_bytes = header_bytes_str.ptr[0 .. header_bytes_str.len + 1],
|
||||
.@"pub" = .{
|
||||
.subject = subject,
|
||||
.reply_to = null,
|
||||
.payload = in.take(total_bytes + 2) catch unreachable,
|
||||
.payload = in.take(total_bytes + total_bytes_str.len + 4) catch unreachable,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -565,7 +565,7 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
|
||||
const header_bytes_str = third;
|
||||
if (iter.next()) |total_bytes_str| {
|
||||
if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
|
||||
const header_bytes = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
|
||||
_ = parseUnsigned(usize, header_bytes_str, 10) catch return error.InvalidStream;
|
||||
const total_bytes = parseUnsigned(usize, total_bytes_str, 10) catch return error.InvalidStream;
|
||||
|
||||
if (in.buffered().len < iter.index + total_bytes + 4) {
|
||||
@@ -573,14 +573,13 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 4 bytes for CRLF on either side of headers and payload.
|
||||
in.toss(iter.index + 2);
|
||||
in.toss(iter.index - total_bytes_str.len);
|
||||
return .{
|
||||
.header_bytes = header_bytes,
|
||||
.header_bytes = header_bytes_str.ptr[0 .. header_bytes_str.len + 1],
|
||||
.@"pub" = .{
|
||||
.subject = subject,
|
||||
.reply_to = reply_to,
|
||||
.payload = in.take(total_bytes + 2) catch unreachable,
|
||||
.payload = in.take(total_bytes + total_bytes_str.len + 4) catch unreachable,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -599,16 +598,17 @@ test hpub {
|
||||
var in: std.testing.Reader = .init(&buf, &.{
|
||||
.{ .buffer = " foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" },
|
||||
});
|
||||
const res = try hpub(&in.interface);
|
||||
try std.testing.expectEqualDeep(
|
||||
Message.HPub{
|
||||
.header_bytes = 22,
|
||||
.header_bytes = "22 ",
|
||||
.@"pub" = .{
|
||||
.subject = "foo",
|
||||
.reply_to = null,
|
||||
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
|
||||
.payload = "33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
|
||||
},
|
||||
},
|
||||
try hpub(&in.interface),
|
||||
res,
|
||||
);
|
||||
try std.testing.expectEqualSlices(u8, "", in.interface.buffered());
|
||||
}
|
||||
@@ -619,11 +619,11 @@ test hpub {
|
||||
});
|
||||
try std.testing.expectEqualDeep(
|
||||
Message.HPub{
|
||||
.header_bytes = 22,
|
||||
.header_bytes = "22 ",
|
||||
.@"pub" = .{
|
||||
.subject = "foo",
|
||||
.reply_to = "reply",
|
||||
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
|
||||
.payload = "33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
|
||||
},
|
||||
},
|
||||
try hpub(&in.interface),
|
||||
|
||||
Reference in New Issue
Block a user