Rework pub and hpub parse

support hpub in general, and properly support reply subjects
This commit is contained in:
2026-01-02 19:15:48 +00:00
parent cd5281030e
commit c38d13e911

View File

@@ -415,64 +415,82 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const subject: []const u8 = try readSubject(alloc, in); const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject); errdefer alloc.free(subject);
const second = blk: { const States = enum {
// Drop whitespace before_second,
while (in.peekByte()) |byte| { in_second,
after_second,
in_third,
in_end,
};
var second: std.ArrayList(u8) = .empty;
defer second.deinit(alloc);
var third: ?std.ArrayList(u8) = null;
defer if (third) |*t| t.deinit(alloc);
var payload: std.Io.Writer.Allocating = .init(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
// Drop whitespace
const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) { if (std.ascii.isWhitespace(byte)) {
in.toss(1); in.toss(1);
} else break; continue :sw .before_second;
} else |err| return err; }
continue :sw .in_second;
var acc: std.ArrayList(u8) = .empty; },
errdefer acc.deinit(alloc); .in_second => {
while (in.peekByte()) |byte| { const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) break; if (!std.ascii.isWhitespace(byte)) {
try acc.append(alloc, byte); try second.append(alloc, byte);
in.toss(1); in.toss(1);
} else |err| return err; continue :sw .in_second;
}
break :blk try acc.toOwnedSlice(alloc); continue :sw .after_second;
}; },
defer alloc.free(second); .after_second => {
const byte = try in.peekByte();
const byte_count: usize, const reply_to: ?[]const u8 = if (byte == '\r') {
if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
third = .empty;
continue :sw .in_third;
},
.in_third => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (std.ascii.isDigit(byte)) {
try third.?.append(alloc, byte);
in.toss(1);
continue :sw .in_third;
}
return error.InvalidStream;
},
.in_end => {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
break :blk .{ s, null }; },
} else |_| .{ }
blk: {
var byte_count_list: std.ArrayList(u8) = .empty; const reply_to: ?[]const u8, const bytes: usize =
defer byte_count_list.deinit(alloc); if (third) |t| .{
try in.discardAll(1); // discard space try alloc.dupe(u8, second.items),
while (in.peekByte()) |byte| { try std.fmt.parseUnsigned(usize, t.items, 10),
if (std.ascii.isWhitespace(byte)) { } else .{
try expectStreamBytes(in, "\r\n"); null,
break; try std.fmt.parseUnsigned(usize, second.items, 10),
}
defer in.toss(1);
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
},
try alloc.dupe(u8, second),
}; };
const payload = blk: { try in.streamExact(&payload.writer, bytes);
const bytes = try alloc.alloc(u8, byte_count); try expectStreamBytes(in, "\r\n");
errdefer alloc.free(bytes);
try in.readSliceAll(bytes);
try expectStreamBytes(in, "\r\n");
break :blk bytes;
};
return .{ return .{
.@"pub" = .{ .@"pub" = .{
.subject = subject, .subject = subject,
.payload = payload, .payload = try payload.toOwnedSlice(),
.reply_to = reply_to, .reply_to = reply_to,
}, },
}; };
@@ -515,7 +533,7 @@ test parsePub {
try std.testing.expectEqualDeep( try std.testing.expectEqualDeep(
Message{ .@"pub" = .{ Message{ .@"pub" = .{
.subject = "foo", .subject = "foo",
.reply_to = "reply.to", .reply_to = "5",
.payload = "bar", .payload = "bar",
} }, } },
res, res,
@@ -530,95 +548,175 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const subject: []const u8 = try readSubject(alloc, in); const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject); errdefer alloc.free(subject);
const second = blk: { const States = enum {
// Drop whitespace before_second,
while (in.peekByte()) |byte| { in_second,
after_second,
in_third,
after_third,
in_fourth,
in_end,
};
var second: std.ArrayList(u8) = .empty;
defer second.deinit(alloc);
var third: std.ArrayList(u8) = .empty;
defer third.deinit(alloc);
var fourth: ?std.ArrayList(u8) = null;
defer if (fourth) |*f| f.deinit(alloc);
var payload: std.Io.Writer.Allocating = .init(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
// Drop whitespace
const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) { if (std.ascii.isWhitespace(byte)) {
in.toss(1); in.toss(1);
} else break; continue :sw .before_second;
} else |err| return err; }
continue :sw .in_second;
var acc: std.ArrayList(u8) = .empty; },
errdefer acc.deinit(alloc); .in_second => {
while (in.peekByte()) |byte| { const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) break; if (!std.ascii.isWhitespace(byte)) {
try acc.append(alloc, byte); try second.append(alloc, byte);
in.toss(1); in.toss(1);
} else |err| return err; continue :sw .in_second;
}
break :blk try acc.toOwnedSlice(alloc); continue :sw .after_second;
}; },
errdefer alloc.free(second); .after_second => {
const byte = try in.peekByte();
const header_byte_count: usize, const reply_to: ?[]const u8 = if (byte == '\r') {
if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: { continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
third = .empty;
continue :sw .in_third;
},
.in_third => {
const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) {
try third.append(alloc, byte);
in.toss(1);
continue :sw .in_third;
}
continue :sw .after_third;
},
.after_third => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) {
in.toss(1);
continue :sw .after_third;
}
fourth = .empty;
continue :sw .in_fourth;
},
.in_fourth => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (std.ascii.isDigit(byte)) {
try fourth.?.append(alloc, byte);
in.toss(1);
continue :sw .in_fourth;
}
return error.InvalidStream;
},
.in_end => {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
break :blk .{ s, null }; },
} else |_| .{ }
blk: {
var byte_count_list: std.ArrayList(u8) = .empty; const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
defer byte_count_list.deinit(alloc); if (fourth) |f| .{
try in.discardAll(1); // discard space try alloc.dupe(u8, second.items),
while (in.peekByte()) |byte| { try std.fmt.parseUnsigned(usize, third.items, 10),
if (std.ascii.isWhitespace(byte)) { try std.fmt.parseUnsigned(usize, f.items, 10),
try expectStreamBytes(in, "\r\n"); } else .{
break; null,
} try std.fmt.parseUnsigned(usize, second.items, 10),
defer in.toss(1); try std.fmt.parseUnsigned(usize, third.items, 10),
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
},
second,
}; };
std.log.debug("buffered: '{s}'", .{in.buffered()}); try in.streamExact(&payload.writer, total_bytes);
try expectStreamBytes(in, "\r\n");
// Parse byte count
const byte_count = blk: {
var byte_count_list: std.ArrayList(u8) = .empty;
defer byte_count_list.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");
break;
}
defer in.toss(1);
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
};
const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count);
errdefer alloc.free(bytes);
try in.readSliceAll(bytes);
try expectStreamBytes(in, "\r\n");
break :blk bytes;
};
return .{ return .{
.hpub = .{ .hpub = .{
.header_bytes = header_byte_count, .header_bytes = header_bytes,
.@"pub" = .{ .@"pub" = .{
.subject = subject, .subject = subject,
.payload = payload, .payload = try payload.toOwnedSlice(),
.reply_to = reply_to, .reply_to = reply_to,
}, },
}, },
}; };
} }
test parseHPub {
{
var in: std.Io.Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try parseHPub(std.testing.allocator, &in);
defer res.hpub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.hpub = .{
.header_bytes = 22,
.@"pub" = .{
.subject = "foo",
.reply_to = null,
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
},
},
},
res,
);
}
{
var in: std.Io.Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try parseHPub(std.testing.allocator, &in);
defer res.hpub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.hpub = .{
.header_bytes = 22,
.@"pub" = .{
.subject = "foo",
.reply_to = "reply.to",
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
},
},
},
res,
);
}
{
var in: std.Io.Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try parseHPub(std.testing.allocator, &in);
defer res.hpub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.hpub = .{
.header_bytes = 22,
.@"pub" = .{
.subject = "foo",
.reply_to = "6",
.payload = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!",
},
},
},
res,
);
}
}
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
errdefer subject_list.deinit(alloc); errdefer subject_list.deinit(alloc);