parsing cleanup

This commit is contained in:
2026-01-09 15:43:40 -05:00
parent f4b545f852
commit 0ebc39b5e8
2 changed files with 64 additions and 62 deletions

View File

@@ -195,7 +195,7 @@ fn handleConnection(
// Respond to ping with pong. // Respond to ping with pong.
try client.recv_queue_write_lock.lock(io); try client.recv_queue_write_lock.lock(io);
defer client.recv_queue_write_lock.unlock(io); defer client.recv_queue_write_lock.unlock(io);
_ = try client.from_client.take(2); _ = try client.from_client.take(2); // throw out \r\n
try client.recv_queue.putAll(io, "PONG\r\n"); try client.recv_queue.putAll(io, "PONG\r\n");
}, },
.PUB => { .PUB => {
@@ -355,13 +355,12 @@ fn publishMessage(
}, },
else => {}, else => {},
} }
try line_writer.print("{d}\r\n", .{msg.payload.len}); try line_writer.print("{d}\r\n", .{msg.payload.len - 2});
try subscription.queue_lock.lock(io); try subscription.queue_lock.lock(io);
defer subscription.queue_lock.unlock(io); defer subscription.queue_lock.unlock(io);
try subscription.queue.putAll(io, line_writer.buffered()); try subscription.queue.putAll(io, line_writer.buffered());
try subscription.queue.putAll(io, msg.payload); try subscription.queue.putAll(io, msg.payload);
try subscription.queue.putAll(io, "\r\n");
} }
} }
@@ -426,7 +425,7 @@ fn unsubscribe(
/// Try to match the kernel socket buffers to maximize /// Try to match the kernel socket buffers to maximize
/// the amount of data we push through each syscall. /// the amount of data we push through each syscall.
fn getBufferSizes(io: Io) @Tuple(&.{ usize, usize }) { fn getBufferSizes(io: Io) @Tuple(&.{ usize, usize }) {
const default_size = 4 * 1024; const default_size = 8 * 1024 * 1024;
const default = .{ default_size, default_size }; const default = .{ default_size, default_size };
const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch { const dir = Dir.openDirAbsolute(io, "/proc/sys/net/core", .{}) catch {

View File

@@ -126,54 +126,59 @@ pub fn @"pub"(in: *Reader) Error!Message.Pub {
// See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1 // See: https://docs.nats.io/reference/reference-protocols/nats-protocol#syntax-1
while (true) { while (true) {
var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r"); var iter = std.mem.tokenizeAny(u8, in.buffered(), " \t\r\n");
const subject = iter.next() orelse {
if (iter.next()) |subject| { try in.fillMore();
if (iter.next()) |second| { continue;
if (in.buffered().len > iter.index) { };
if (in.buffered()[iter.index] == '\r') { const second = iter.next() orelse {
const bytes_str = second; try in.fillMore();
const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream; continue;
};
if (in.buffered().len < iter.index + bytes + 4) { if (in.buffered().len == iter.index) {
try in.fill(iter.index + bytes + 4); try in.fillMore();
// Fill may shift buffer, so we have to retokenize it. continue;
continue; }
} if (in.buffered()[iter.index] == '\r') {
const bytes = parseUnsigned(usize, second, 10) catch return error.InvalidStream;
// 4 bytes for CRLF on either side of the payload. if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) {
in.toss(iter.index + 2); try in.fill(iter.index + bytes + "\r\n".len + "\r\n".len);
defer in.toss(2); continue;
return .{
.subject = subject,
.reply_to = null,
.payload = in.take(bytes) catch unreachable,
};
}
const reply_to = second;
if (iter.next()) |bytes_str| {
const bytes = parseUnsigned(usize, bytes_str, 10) catch return error.InvalidStream;
if (in.buffered().len > iter.index and in.buffered()[iter.index] == '\r') {
if (in.buffered().len < iter.index + bytes + 4) {
try in.fill(iter.index + bytes + 4);
// Fill may shift buffer, so we have to retokenize it.
continue;
}
// 4 bytes for CRLF on either side of the payload.
in.toss(iter.index + 2);
defer in.toss(2);
return .{
.subject = subject,
.reply_to = reply_to,
.payload = in.take(bytes) catch unreachable,
};
}
}
}
} }
in.toss(iter.index + "\r\n".len);
return .{
.subject = subject,
.reply_to = null,
.payload = in.take(bytes + 2) catch unreachable,
};
}
switch (in.buffered()[iter.index]) {
'\t', ' ' => {
const reply_to = second;
const bytes = parseUnsigned(usize, iter.next() orelse {
try in.fillMore();
continue;
}, 10) catch return error.InvalidStream;
if (in.buffered().len == iter.index or in.buffered()[iter.index] != '\r') {
try in.fillMore();
continue;
}
if (in.buffered().len < iter.index + bytes + "\r\n".len + "\r\n".len) {
try in.fillMore();
continue;
}
in.toss(iter.index + "\r\n".len);
return .{
.subject = subject,
.reply_to = reply_to,
.payload = in.take(bytes + 2) catch unreachable,
};
},
else => {},
} }
try in.fillMore(); try in.fillMore();
@@ -190,7 +195,7 @@ test @"pub" {
Message.Pub{ Message.Pub{
.subject = "foo", .subject = "foo",
.reply_to = "bar", .reply_to = "bar",
.payload = "hi", .payload = "hi\r\n",
}, },
try @"pub"(&in.interface), try @"pub"(&in.interface),
); );
@@ -205,7 +210,7 @@ test @"pub" {
Message.Pub{ Message.Pub{
.subject = "foo", .subject = "foo",
.reply_to = null, .reply_to = null,
.payload = "hi", .payload = "hi\r\n",
}, },
try @"pub"(&in.interface), try @"pub"(&in.interface),
); );
@@ -221,7 +226,7 @@ test @"pub" {
Message.Pub{ Message.Pub{
.subject = "foo", .subject = "foo",
.reply_to = null, .reply_to = null,
.payload = "hi", .payload = "hi\r\n",
}, },
try @"pub"(&in.interface), try @"pub"(&in.interface),
); );
@@ -238,7 +243,7 @@ test @"pub" {
Message.Pub{ Message.Pub{
.subject = "foo", .subject = "foo",
.reply_to = null, .reply_to = null,
.payload = "hi", .payload = "hi\r\n",
}, },
try @"pub"(&in.interface), try @"pub"(&in.interface),
); );
@@ -256,7 +261,7 @@ test @"pub" {
Message.Pub{ Message.Pub{
.subject = "foo", .subject = "foo",
.reply_to = null, .reply_to = null,
.payload = "hi", .payload = "hi\r\n",
}, },
try @"pub"(&in.interface), try @"pub"(&in.interface),
); );
@@ -545,13 +550,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
// 4 bytes for CRLF on either side of headers and payload. // 4 bytes for CRLF on either side of headers and payload.
in.toss(iter.index + 2); in.toss(iter.index + 2);
defer in.toss(2);
return .{ return .{
.header_bytes = header_bytes, .header_bytes = header_bytes,
.@"pub" = .{ .@"pub" = .{
.subject = subject, .subject = subject,
.reply_to = null, .reply_to = null,
.payload = in.take(total_bytes) catch unreachable, .payload = in.take(total_bytes + 2) catch unreachable,
}, },
}; };
} }
@@ -571,13 +575,12 @@ pub fn hpub(in: *Reader) Error!Message.HPub {
// 4 bytes for CRLF on either side of headers and payload. // 4 bytes for CRLF on either side of headers and payload.
in.toss(iter.index + 2); in.toss(iter.index + 2);
defer in.toss(2);
return .{ return .{
.header_bytes = header_bytes, .header_bytes = header_bytes,
.@"pub" = .{ .@"pub" = .{
.subject = subject, .subject = subject,
.reply_to = reply_to, .reply_to = reply_to,
.payload = in.take(total_bytes) catch unreachable, .payload = in.take(total_bytes + 2) catch unreachable,
}, },
}; };
} }
@@ -602,7 +605,7 @@ test hpub {
.@"pub" = .{ .@"pub" = .{
.subject = "foo", .subject = "foo",
.reply_to = null, .reply_to = null,
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
}, },
}, },
try hpub(&in.interface), try hpub(&in.interface),
@@ -620,7 +623,7 @@ test hpub {
.@"pub" = .{ .@"pub" = .{
.subject = "foo", .subject = "foo",
.reply_to = "reply", .reply_to = "reply",
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!", .payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n",
}, },
}, },
try hpub(&in.interface), try hpub(&in.interface),