Parsing cleanup

This commit is contained in:
2025-12-29 04:48:05 +00:00
parent 4bf5ddca15
commit b9d0672971

View File

@@ -170,7 +170,7 @@ pub const Message = union(MessageType) {
while (in.peekByte()) |byte| { while (in.peekByte()) |byte| {
if (std.ascii.isUpper(byte)) { if (std.ascii.isUpper(byte)) {
try operation_string.appendBounded(byte); try operation_string.appendBounded(byte);
try in.discardAll(1); in.toss(1);
} else break; } else break;
} else |err| return err; } else |err| return err;
@@ -190,7 +190,7 @@ pub const Message = union(MessageType) {
// Should read the next JSON object to the fixed buffer writer. // Should read the next JSON object to the fixed buffer writer.
_ = try in.streamDelimiter(&connect_string_writer, '}'); _ = try in.streamDelimiter(&connect_string_writer, '}');
try connect_string_writer.writeByte('}'); try connect_string_writer.writeByte('}');
std.debug.assert(std.mem.eql(u8, try in.take(3), "}\r\n")); // discard '}\r\n' try assertStreamBytes(in, "}\r\n"); // discard '}\r\n'
// TODO: should be CONNECTION allocator // TODO: should be CONNECTION allocator
const res = try std.json.parseFromSliceLeaky(Connect, connect_allocator, connect_string_writer.buffered(), .{ .allocate = .alloc_always }); const res = try std.json.parseFromSliceLeaky(Connect, connect_allocator, connect_string_writer.buffered(), .{ .allocate = .alloc_always });
@@ -206,12 +206,12 @@ pub const Message = union(MessageType) {
// Parse byte count // Parse byte count
const byte_count = blk: { const byte_count = blk: {
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
while (in.takeByte()) |byte| { while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) { if (std.ascii.isWhitespace(byte)) {
std.debug.assert(byte == '\r'); try assertStreamBytes(in, "\r\n");
std.debug.assert(try in.takeByte() == '\n');
break; break;
} }
defer in.toss(1);
if (std.ascii.isDigit(byte)) { if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte); try byte_count_list.append(alloc, byte);
@@ -226,7 +226,7 @@ pub const Message = union(MessageType) {
const payload = blk: { const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count); const bytes = try alloc.alloc(u8, byte_count);
try in.readSliceAll(bytes); try in.readSliceAll(bytes);
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); try assertStreamBytes(in, "\r\n");
break :blk bytes; break :blk bytes;
}; };
@@ -238,15 +238,18 @@ pub const Message = union(MessageType) {
}; };
}, },
.ping => { .ping => {
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); try assertStreamBytes(in, "\r\n");
return .ping; return .ping;
}, },
.pong => { .pong => {
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); try assertStreamBytes(in, "\r\n");
return .pong; return .pong;
}, },
.sub => { .sub => {
try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte())); if (!std.ascii.isWhitespace(try in.takeByte())) {
@branchHint(.unlikely);
return error.InvalidStream;
}
const subject = try readSubject(alloc, in); const subject = try readSubject(alloc, in);
const second = blk: { const second = blk: {
// Drop whitespace // Drop whitespace
@@ -266,8 +269,7 @@ pub const Message = union(MessageType) {
}; };
const queue_group = if ((try in.peekByte()) != '\r') second else null; const queue_group = if ((try in.peekByte()) != '\r') second else null;
const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second; const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
std.debug.print("SID is '{s}'\n", .{sid}); try assertStreamBytes(in, "\r\n");
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
return .{ return .{
.sub = .{ .sub = .{
.subject = subject, .subject = subject,
@@ -277,7 +279,10 @@ pub const Message = union(MessageType) {
}; };
}, },
.unsub => { .unsub => {
try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte())); if (!std.ascii.isWhitespace(try in.takeByte())) {
@branchHint(.unlikely);
return error.InvalidStream;
}
// Parse byte count // Parse byte count
const sid = blk: { const sid = blk: {
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8); var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8);
@@ -290,7 +295,7 @@ pub const Message = union(MessageType) {
}; };
if ((try in.peekByte()) == '\r') { if ((try in.peekByte()) == '\r') {
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n")); try assertStreamBytes(in, "\r\n");
return .{ return .{
.unsub = .{ .unsub = .{
.sid = sid, .sid = sid,
@@ -300,10 +305,9 @@ pub const Message = union(MessageType) {
in.toss(1); in.toss(1);
const max_msgs = blk: { const max_msgs = blk: {
var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64); var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
while (in.takeByte()) |byte| { while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) { if (std.ascii.isWhitespace(byte)) {
std.debug.assert(byte == '\r'); try assertStreamBytes(in, "\r\n");
std.debug.assert(try in.takeByte() == '\n');
break; break;
} }
@@ -337,9 +341,8 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
// Handle the first character // Handle the first character
{ {
const byte = try in.takeByte(); const byte = try in.takeByte();
std.debug.assert(!std.ascii.isWhitespace(byte)); if (std.ascii.isWhitespace(byte) or byte == '.')
if (byte == '.') return error.InvalidStream;
return error.InvalidSubject;
try subject_list.append(alloc, byte); try subject_list.append(alloc, byte);
} }
@@ -386,8 +389,8 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub {
}; };
} }
inline fn assertStreamBytes(cond: bool) !void { inline fn assertStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void {
if (!cond) { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely); @branchHint(.unlikely);
return error.InvalidStream; return error.InvalidStream;
} }