Restructuring parser

Adding tests fore everything
This commit is contained in:
2026-01-03 02:33:24 +00:00
parent f99b44fdb2
commit 9e32d014c2

View File

@@ -168,6 +168,10 @@ pub const Message = union(enum) {
sid: []const u8, sid: []const u8,
/// A number of messages to wait for before automatically unsubscribing. /// A number of messages to wait for before automatically unsubscribing.
max_msgs: ?usize = null, max_msgs: ?usize = null,
pub fn deinit(self: Unsub, alloc: std.mem.Allocator) void {
alloc.free(self.sid);
}
}; };
pub const Msg = struct { pub const Msg = struct {
subject: []const u8, subject: []const u8,
@@ -231,6 +235,8 @@ pub const Message = union(enum) {
break :blk .initBuffer(&buf); break :blk .initBuffer(&buf);
}; };
std.log.debug("buffered: '{s}'", .{in.buffered()});
while (in.peekByte()) |byte| { while (in.peekByte()) |byte| {
if (std.ascii.isUpper(byte)) { if (std.ascii.isUpper(byte)) {
try operation_string.appendBounded(byte); try operation_string.appendBounded(byte);
@@ -239,9 +245,12 @@ pub const Message = union(enum) {
} else |err| return err; } else |err| return err;
const operation = parse(operation_string.items) orelse { const operation = parse(operation_string.items) orelse {
std.log.err("Invalid operation: '{s}'", .{operation_string.items});
return error.InvalidOperation; return error.InvalidOperation;
}; };
errdefer std.log.err("Failed to parse {s}", .{operation_string.items});
switch (operation) { switch (operation) {
.connect => { .connect => {
// for storing the json string // for storing the json string
@@ -290,104 +299,239 @@ pub const Message = union(enum) {
return .pong; return .pong;
}, },
.sub => { .sub => {
if (!std.ascii.isWhitespace(try in.takeByte())) { return parseSub(alloc, in);
@branchHint(.unlikely);
return error.InvalidStream;
}
const subject = try readSubject(alloc, in, .sub);
errdefer alloc.free(subject);
const second = blk: {
// Drop whitespace
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
in.toss(1);
} else break;
} else |err| return err;
var acc: std.ArrayList(u8) = .empty;
errdefer acc.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
in.toss(1);
} else |err| return err;
break :blk try acc.toOwnedSlice(alloc);
};
errdefer alloc.free(second);
const queue_group = if ((try in.peekByte()) != '\r') second else null;
// We do not need an errdefer free for queue group, because it will only be second (already has errdefer free) or null.
const sid = if (queue_group) |_| try alloc.dupe(u8, try in.takeDelimiterExclusive('\r')) else second;
// if queue_group is null, that means sid is second, and already has an errdefer free.
errdefer if (queue_group) |_| alloc.free(sid);
try expectStreamBytes(in, "\r\n");
return .{
.sub = .{
.subject = subject,
.queue_group = queue_group,
.sid = sid,
},
};
}, },
.unsub => { .unsub => {
if (!std.ascii.isWhitespace(try in.takeByte())) { return parseUnsub(alloc, in);
@branchHint(.unlikely);
return error.InvalidStream;
}
// Parse sid
const sid = blk: {
var acc: std.ArrayList(u8) = .empty;
errdefer acc.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
in.toss(1);
} else |err| return err;
break :blk try acc.toOwnedSlice(alloc);
};
errdefer alloc.free(sid);
if ((try in.peekByte()) == '\r') {
try expectStreamBytes(in, "\r\n");
return .{
.unsub = .{
.sid = sid,
},
};
} else if (std.ascii.isWhitespace(try in.peekByte())) {
in.toss(1);
const max_msgs = blk: {
var max_msgs_list: std.ArrayList(u8) = .empty;
errdefer max_msgs_list.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");
break;
}
if (std.ascii.isDigit(byte)) {
try max_msgs_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
};
return .{
.unsub = .{
.sid = sid,
.max_msgs = max_msgs,
},
};
} else return error.InvalidStream;
}, },
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}), else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
} }
} }
}; };
fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub);
const States = enum {
before_second,
in_second,
after_second,
in_third,
in_end,
};
var second: std.ArrayList(u8) = .empty;
errdefer second.deinit(alloc);
var third: ?std.ArrayList(u8) = null;
errdefer if (third) |*t| t.deinit(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
continue :sw .in_second;
},
.in_second => {
const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) {
try second.append(alloc, byte);
in.toss(1);
continue :sw .in_second;
}
continue :sw .after_second;
},
.after_second => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
third = .empty;
continue :sw .in_third;
},
.in_third => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
}
try third.?.append(alloc, byte);
in.toss(1);
},
.in_end => {
try expectStreamBytes(in, "\r\n");
},
}
return .{
.sub = .{
.subject = subject,
.queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
.sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
},
};
}
test parseSub {
{
var in: std.Io.Reader = .fixed(" foo 1\r\n");
var res = try parseSub(std.testing.allocator, &in);
defer res.sub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.sub = .{
.subject = "foo",
.queue_group = null,
.sid = "1",
},
},
res,
);
}
{
var in: std.Io.Reader = .fixed(" foo 1\r\n");
var res = try parseSub(std.testing.allocator, &in);
defer res.sub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.sub = .{
.subject = "foo",
.queue_group = null,
.sid = "1",
},
},
res,
);
}
{
var in: std.Io.Reader = .fixed(" foo q 1\r\n");
var res = try parseSub(std.testing.allocator, &in);
defer res.sub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.sub = .{
.subject = "foo",
.queue_group = "q",
.sid = "1",
},
},
res,
);
}
{
var in: std.Io.Reader = .fixed(" 1 q 1\r\n");
var res = try parseSub(std.testing.allocator, &in);
defer res.sub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.sub = .{
.subject = "1",
.queue_group = "q",
.sid = "1",
},
},
res,
);
}
{
var in: std.Io.Reader = .fixed(" $SRV.PING 4\r\n");
var res = try parseSub(std.testing.allocator, &in);
defer res.sub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.sub = .{
.subject = "$SRV.PING",
.queue_group = null,
.sid = "4",
},
},
res,
);
}
}
fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space
var first: std.ArrayList(u8) = .empty;
errdefer first.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try first.append(alloc, byte);
in.toss(1);
} else |err| return err;
while (in.peekByte()) |byte| {
if (!std.ascii.isWhitespace(byte) or byte == '\r') break;
in.toss(1);
} else |err| return err;
if (try in.peekByte() == '\r') {
try expectStreamBytes(in, "\r\n");
return .{
.unsub = .{
.sid = try first.toOwnedSlice(alloc),
},
};
} else {
var second: std.ArrayList(u8) = .empty;
defer second.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try second.append(alloc, byte);
in.toss(1);
} else |err| return err;
try expectStreamBytes(in, "\r\n");
return .{
.unsub = .{
.max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10),
.sid = try first.toOwnedSlice(alloc),
},
};
}
}
test parseUnsub {
{
var in: std.Io.Reader = .fixed(" 1\r\n");
var res = try parseUnsub(std.testing.allocator, &in);
defer res.unsub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.unsub = .{
.sid = "1",
.max_msgs = null,
},
},
res,
);
try std.testing.expectEqual(0, in.buffered().len);
}
{
var in: std.Io.Reader = .fixed(" 1 1\r\n");
var res = try parseUnsub(std.testing.allocator, &in);
defer res.unsub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.unsub = .{
.sid = "1",
.max_msgs = 1,
},
},
res,
);
try std.testing.expectEqual(0, in.buffered().len);
}
}
fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space try in.discardAll(1); // throw away space
@@ -483,13 +627,16 @@ test parsePub {
var res = try parsePub(std.testing.allocator, &in); var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator); defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep( try std.testing.expectEqualDeep(
Message{ .@"pub" = .{ Message{
.@"pub" = .{
.subject = "foo", .subject = "foo",
.reply_to = null, .reply_to = null,
.payload = "bar", .payload = "bar",
} }, },
},
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len);
} }
{ {
@@ -497,13 +644,16 @@ test parsePub {
var res = try parsePub(std.testing.allocator, &in); var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator); defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep( try std.testing.expectEqualDeep(
Message{ .@"pub" = .{ Message{
.@"pub" = .{
.subject = "foo", .subject = "foo",
.reply_to = "reply.to", .reply_to = "reply.to",
.payload = "bar", .payload = "bar",
} }, },
},
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len);
} }
// numeric reply subject // numeric reply subject
@@ -512,13 +662,16 @@ test parsePub {
var res = try parsePub(std.testing.allocator, &in); var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator); defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep( try std.testing.expectEqualDeep(
Message{ .@"pub" = .{ Message{
.@"pub" = .{
.subject = "foo", .subject = "foo",
.reply_to = "5", .reply_to = "5",
.payload = "bar", .payload = "bar",
} }, },
},
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len);
} }
} }
@@ -658,6 +811,7 @@ test parseHPub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len);
} }
{ {
@@ -677,6 +831,7 @@ test parseHPub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len);
} }
{ {
@@ -696,6 +851,7 @@ test parseHPub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len);
} }
} }