cleanup imports

This commit is contained in:
2026-01-03 05:53:23 +00:00
parent bd9829f684
commit dcd09e2f10

View File

@@ -1,4 +1,21 @@
const std = @import("std"); const std = @import("std");
const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const ArrayList = std.ArrayList;
const StaticStringMap = std.StaticStringMap;
const Io = std.Io;
const AllocatingWriter = Io.Writer.Allocating;
const Reader = Io.Reader;
const ascii = std.ascii;
const isDigit = std.ascii.isDigit;
const isUpper = std.ascii.isUpper;
const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;
const log = std.log;
pub const MessageType = @typeInfo(Message).@"union".tag_type.?; pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
@@ -60,7 +77,7 @@ pub const Message = union(enum) {
headers: ?bool = null, headers: ?bool = null,
nkey: ?[]const u8 = null, nkey: ?[]const u8 = null,
pub fn deinit(self: Connect, alloc: std.mem.Allocator) void { pub fn deinit(self: Connect, alloc: Allocator) void {
if (self.auth_token) |a| alloc.free(a); if (self.auth_token) |a| alloc.free(a);
if (self.user) |u| alloc.free(u); if (self.user) |u| alloc.free(u);
if (self.pass) |p| alloc.free(p); if (self.pass) |p| alloc.free(p);
@@ -72,7 +89,7 @@ pub const Message = union(enum) {
if (self.nkey) |n| alloc.free(n); if (self.nkey) |n| alloc.free(n);
} }
pub fn dupe(self: Connect, alloc: std.mem.Allocator) !Connect { pub fn dupe(self: Connect, alloc: Allocator) !Connect {
var res = self; var res = self;
res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null; res.auth_token = if (self.auth_token) |a| try alloc.dupe(u8, a) else null;
errdefer if (res.auth_token) |a| alloc.free(a); errdefer if (res.auth_token) |a| alloc.free(a);
@@ -103,13 +120,13 @@ pub const Message = union(enum) {
/// The message payload data. /// The message payload data.
payload: []const u8, payload: []const u8,
pub fn deinit(self: Pub, alloc: std.mem.Allocator) void { pub fn deinit(self: Pub, alloc: Allocator) void {
alloc.free(self.subject); alloc.free(self.subject);
alloc.free(self.payload); alloc.free(self.payload);
if (self.reply_to) |r| alloc.free(r); if (self.reply_to) |r| alloc.free(r);
} }
pub fn toMsg(self: Pub, alloc: std.mem.Allocator, sid: []const u8) !Msg { pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
const res: Msg = .{ const res: Msg = .{
.subject = self.subject, .subject = self.subject,
.sid = sid, .sid = sid,
@@ -123,11 +140,11 @@ pub const Message = union(enum) {
header_bytes: usize, header_bytes: usize,
@"pub": Pub, @"pub": Pub,
pub fn deinit(self: HPub, alloc: std.mem.Allocator) void { pub fn deinit(self: HPub, alloc: Allocator) void {
self.@"pub".deinit(alloc); self.@"pub".deinit(alloc);
} }
pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg { pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg {
return .{ return .{
.header_bytes = self.header_bytes, .header_bytes = self.header_bytes,
.msg = try self.@"pub".toMsg(alloc, sid), .msg = try self.@"pub".toMsg(alloc, sid),
@@ -139,11 +156,11 @@ pub const Message = union(enum) {
header_bytes: usize, header_bytes: usize,
msg: Msg, msg: Msg,
pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void { pub fn deinit(self: HMsg, alloc: Allocator) void {
self.msg.deinit(alloc); self.msg.deinit(alloc);
} }
pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg { pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
var res = self; var res = self;
res.msg = try self.msg.dupe(alloc); res.msg = try self.msg.dupe(alloc);
return res; return res;
@@ -157,7 +174,7 @@ pub const Message = union(enum) {
/// A unique alphanumeric subscription ID, generated by the client. /// A unique alphanumeric subscription ID, generated by the client.
sid: []const u8, sid: []const u8,
pub fn deinit(self: Sub, alloc: std.mem.Allocator) void { pub fn deinit(self: Sub, alloc: Allocator) void {
alloc.free(self.subject); alloc.free(self.subject);
alloc.free(self.sid); alloc.free(self.sid);
if (self.queue_group) |q| alloc.free(q); if (self.queue_group) |q| alloc.free(q);
@@ -169,7 +186,7 @@ pub const Message = union(enum) {
/// A number of messages to wait for before automatically unsubscribing. /// A number of messages to wait for before automatically unsubscribing.
max_msgs: ?usize = null, max_msgs: ?usize = null,
pub fn deinit(self: Unsub, alloc: std.mem.Allocator) void { pub fn deinit(self: Unsub, alloc: Allocator) void {
alloc.free(self.sid); alloc.free(self.sid);
} }
}; };
@@ -179,14 +196,14 @@ pub const Message = union(enum) {
reply_to: ?[]const u8, reply_to: ?[]const u8,
payload: []const u8, payload: []const u8,
pub fn deinit(self: Msg, alloc: std.mem.Allocator) void { pub fn deinit(self: Msg, alloc: Allocator) void {
alloc.free(self.subject); alloc.free(self.subject);
alloc.free(self.sid); alloc.free(self.sid);
if (self.reply_to) |r| alloc.free(r); if (self.reply_to) |r| alloc.free(r);
alloc.free(self.payload); alloc.free(self.payload);
} }
pub fn dupe(self: Msg, alloc: std.mem.Allocator) !Msg { pub fn dupe(self: Msg, alloc: Allocator) !Msg {
var res: Msg = undefined; var res: Msg = undefined;
res.subject = try alloc.dupe(u8, self.subject); res.subject = try alloc.dupe(u8, self.subject);
errdefer alloc.free(res.subject); errdefer alloc.free(res.subject);
@@ -200,7 +217,7 @@ pub const Message = union(enum) {
} }
}; };
const client_types = std.StaticStringMap(MessageType).initComptime( const client_types = StaticStringMap(MessageType).initComptime(
.{ .{
// {"INFO", .info}, // {"INFO", .info},
.{ "CONNECT", .connect }, .{ "CONNECT", .connect },
@@ -223,8 +240,8 @@ pub const Message = union(enum) {
pub const parse = parseStaticStringMap; pub const parse = parseStaticStringMap;
/// An error should be handled by cleaning up this connection. /// An error should be handled by cleaning up this connection.
pub fn next(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { pub fn next(alloc: Allocator, in: *Reader) !Message {
var operation_string: std.ArrayList(u8) = blk: { var operation_string: ArrayList(u8) = blk: {
comptime var buf_len = 0; comptime var buf_len = 0;
comptime { comptime {
for (client_types.keys()) |key| { for (client_types.keys()) |key| {
@@ -236,28 +253,28 @@ pub const Message = union(enum) {
}; };
while (in.peekByte()) |byte| { while (in.peekByte()) |byte| {
if (std.ascii.isUpper(byte)) { if (isUpper(byte)) {
try operation_string.appendBounded(byte); try operation_string.appendBounded(byte);
in.toss(1); in.toss(1);
} else break; } else break;
} else |err| return err; } else |err| return err;
const operation = parse(operation_string.items) orelse { const operation = parse(operation_string.items) orelse {
std.log.err("Invalid operation: '{s}'", .{operation_string.items}); log.err("Invalid operation: '{s}'", .{operation_string.items});
return error.InvalidOperation; return error.InvalidOperation;
}; };
errdefer std.log.err("Failed to parse {s}", .{operation_string.items}); errdefer log.err("Failed to parse {s}", .{operation_string.items});
switch (operation) { switch (operation) {
.connect => { .connect => {
// for storing the json string // for storing the json string
var connect_string_writer_allocating: std.Io.Writer.Allocating = .init(alloc); var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit(); defer connect_string_writer_allocating.deinit();
var connect_string_writer = &connect_string_writer_allocating.writer; var connect_string_writer = &connect_string_writer_allocating.writer;
// for parsing the json string // for parsing the json string
var connect_arena_allocator: std.heap.ArenaAllocator = .init(alloc); var connect_arena_allocator: ArenaAllocator = .init(alloc);
defer connect_arena_allocator.deinit(); defer connect_arena_allocator.deinit();
const connect_allocator = connect_arena_allocator.allocator(); const connect_allocator = connect_arena_allocator.allocator();
@@ -307,7 +324,7 @@ pub const Message = union(enum) {
} }
}; };
fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { fn parseSub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub); const subject = try readSubject(alloc, in, .sub);
@@ -319,15 +336,15 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end, in_end,
}; };
var second: std.ArrayList(u8) = .empty; var second: ArrayList(u8) = .empty;
errdefer second.deinit(alloc); errdefer second.deinit(alloc);
var third: ?std.ArrayList(u8) = null; var third: ?ArrayList(u8) = null;
errdefer if (third) |*t| t.deinit(alloc); errdefer if (third) |*t| t.deinit(alloc);
sw: switch (@as(States, .before_second)) { sw: switch (@as(States, .before_second)) {
.before_second => { .before_second => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) { if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .before_second; continue :sw .before_second;
} }
@@ -335,7 +352,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}, },
.in_second => { .in_second => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) { if (!isWhitespace(byte)) {
try second.append(alloc, byte); try second.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_second; continue :sw .in_second;
@@ -346,7 +363,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) { } else if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .after_second; continue :sw .after_second;
} }
@@ -377,11 +394,13 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
} }
test parseSub { test parseSub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
{ {
var in: std.Io.Reader = .fixed(" foo 1\r\n"); var in: Reader = .fixed(" foo 1\r\n");
var res = try parseSub(std.testing.allocator, &in); var res = try parseSub(alloc, &in);
defer res.sub.deinit(std.testing.allocator); defer res.sub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.sub = .{ .sub = .{
.subject = "foo", .subject = "foo",
@@ -393,10 +412,10 @@ test parseSub {
); );
} }
{ {
var in: std.Io.Reader = .fixed(" foo 1\r\n"); var in: Reader = .fixed(" foo 1\r\n");
var res = try parseSub(std.testing.allocator, &in); var res = try parseSub(alloc, &in);
defer res.sub.deinit(std.testing.allocator); defer res.sub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.sub = .{ .sub = .{
.subject = "foo", .subject = "foo",
@@ -408,10 +427,10 @@ test parseSub {
); );
} }
{ {
var in: std.Io.Reader = .fixed(" foo q 1\r\n"); var in: Reader = .fixed(" foo q 1\r\n");
var res = try parseSub(std.testing.allocator, &in); var res = try parseSub(alloc, &in);
defer res.sub.deinit(std.testing.allocator); defer res.sub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.sub = .{ .sub = .{
.subject = "foo", .subject = "foo",
@@ -423,10 +442,10 @@ test parseSub {
); );
} }
{ {
var in: std.Io.Reader = .fixed(" 1 q 1\r\n"); var in: Reader = .fixed(" 1 q 1\r\n");
var res = try parseSub(std.testing.allocator, &in); var res = try parseSub(alloc, &in);
defer res.sub.deinit(std.testing.allocator); defer res.sub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.sub = .{ .sub = .{
.subject = "1", .subject = "1",
@@ -438,10 +457,10 @@ test parseSub {
); );
} }
{ {
var in: std.Io.Reader = .fixed(" $SRV.PING 4\r\n"); var in: Reader = .fixed(" $SRV.PING 4\r\n");
var res = try parseSub(std.testing.allocator, &in); var res = try parseSub(alloc, &in);
defer res.sub.deinit(std.testing.allocator); defer res.sub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.sub = .{ .sub = .{
.subject = "$SRV.PING", .subject = "$SRV.PING",
@@ -453,10 +472,10 @@ test parseSub {
); );
} }
{ {
var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n"); var in: Reader = .fixed(" foo.echo q 10\r\n");
var res = try parseSub(std.testing.allocator, &in); var res = try parseSub(alloc, &in);
defer res.sub.deinit(std.testing.allocator); defer res.sub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.sub = .{ .sub = .{
.subject = "foo.echo", .subject = "foo.echo",
@@ -469,7 +488,7 @@ test parseSub {
} }
} }
fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { fn parseUnsub(alloc: Allocator, in: *Reader) !Message {
const States = enum { const States = enum {
before_first, before_first,
in_first, in_first,
@@ -478,15 +497,15 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end, in_end,
}; };
var first: std.ArrayList(u8) = .empty; var first: ArrayList(u8) = .empty;
errdefer first.deinit(alloc); errdefer first.deinit(alloc);
var second: ?std.ArrayList(u8) = null; var second: ?ArrayList(u8) = null;
defer if (second) |*s| s.deinit(alloc); defer if (second) |*s| s.deinit(alloc);
sw: switch (@as(States, .before_first)) { sw: switch (@as(States, .before_first)) {
.before_first => { .before_first => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) { if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .before_first; continue :sw .before_first;
} }
@@ -494,7 +513,7 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}, },
.in_first => { .in_first => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) { if (!isWhitespace(byte)) {
try first.append(alloc, byte); try first.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_first; continue :sw .in_first;
@@ -505,7 +524,7 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) { } else if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .after_first; continue :sw .after_first;
} }
@@ -529,17 +548,20 @@ fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
return .{ return .{
.unsub = .{ .unsub = .{
.sid = try first.toOwnedSlice(alloc), .sid = try first.toOwnedSlice(alloc),
.max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null, .max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
}, },
}; };
} }
test parseUnsub { test parseUnsub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{ {
var in: std.Io.Reader = .fixed(" 1\r\n"); var in: Reader = .fixed(" 1\r\n");
var res = try parseUnsub(std.testing.allocator, &in); var res = try parseUnsub(alloc, &in);
defer res.unsub.deinit(std.testing.allocator); defer res.unsub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.unsub = .{ .unsub = .{
.sid = "1", .sid = "1",
@@ -548,14 +570,14 @@ test parseUnsub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
{ {
var in: std.Io.Reader = .fixed(" 1 1\r\n"); var in: Reader = .fixed(" 1 1\r\n");
var res = try parseUnsub(std.testing.allocator, &in); var res = try parseUnsub(alloc, &in);
defer res.unsub.deinit(std.testing.allocator); defer res.unsub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.unsub = .{ .unsub = .{
.sid = "1", .sid = "1",
@@ -564,11 +586,11 @@ test parseUnsub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
} }
fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { fn parsePub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space try in.discardAll(1); // throw away space
// Parse subject // Parse subject
@@ -583,18 +605,18 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end, in_end,
}; };
var second: std.ArrayList(u8) = .empty; var second: ArrayList(u8) = .empty;
defer second.deinit(alloc); defer second.deinit(alloc);
var third: ?std.ArrayList(u8) = null; var third: ?ArrayList(u8) = null;
defer if (third) |*t| t.deinit(alloc); defer if (third) |*t| t.deinit(alloc);
var payload: std.Io.Writer.Allocating = .init(alloc); var payload: AllocatingWriter = .init(alloc);
errdefer payload.deinit(); errdefer payload.deinit();
sw: switch (@as(States, .before_second)) { sw: switch (@as(States, .before_second)) {
.before_second => { .before_second => {
// Drop whitespace // Drop whitespace
const byte = try in.peekByte(); const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) { if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .before_second; continue :sw .before_second;
} }
@@ -602,7 +624,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}, },
.in_second => { .in_second => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) { if (!isWhitespace(byte)) {
try second.append(alloc, byte); try second.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_second; continue :sw .in_second;
@@ -613,7 +635,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) { } else if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .after_second; continue :sw .after_second;
} }
@@ -624,7 +646,7 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isDigit(byte)) { } else if (isDigit(byte)) {
try third.?.append(alloc, byte); try third.?.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_third; continue :sw .in_third;
@@ -639,10 +661,10 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const reply_to: ?[]const u8, const bytes: usize = const reply_to: ?[]const u8, const bytes: usize =
if (third) |t| .{ if (third) |t| .{
try alloc.dupe(u8, second.items), try alloc.dupe(u8, second.items),
try std.fmt.parseUnsigned(usize, t.items, 10), try parseUnsigned(usize, t.items, 10),
} else .{ } else .{
null, null,
try std.fmt.parseUnsigned(usize, second.items, 10), try parseUnsigned(usize, second.items, 10),
}; };
try in.streamExact(&payload.writer, bytes); try in.streamExact(&payload.writer, bytes);
@@ -658,11 +680,14 @@ fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
} }
test parsePub { test parsePub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{ {
var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n"); var in: Reader = .fixed(" foo 3\r\nbar\r\n");
var res = try parsePub(std.testing.allocator, &in); var res = try parsePub(alloc, &in);
defer res.@"pub".deinit(std.testing.allocator); defer res.@"pub".deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.@"pub" = .{ .@"pub" = .{
.subject = "foo", .subject = "foo",
@@ -672,14 +697,14 @@ test parsePub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
{ {
var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n"); var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
var res = try parsePub(std.testing.allocator, &in); var res = try parsePub(alloc, &in);
defer res.@"pub".deinit(std.testing.allocator); defer res.@"pub".deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.@"pub" = .{ .@"pub" = .{
.subject = "foo", .subject = "foo",
@@ -689,15 +714,15 @@ test parsePub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
// numeric reply subject // numeric reply subject
{ {
var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n"); var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
var res = try parsePub(std.testing.allocator, &in); var res = try parsePub(alloc, &in);
defer res.@"pub".deinit(std.testing.allocator); defer res.@"pub".deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.@"pub" = .{ .@"pub" = .{
.subject = "foo", .subject = "foo",
@@ -707,11 +732,11 @@ test parsePub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
} }
fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { fn parseHPub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space try in.discardAll(1); // throw away space
// Parse subject // Parse subject
@@ -728,20 +753,20 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
in_end, in_end,
}; };
var second: std.ArrayList(u8) = .empty; var second: ArrayList(u8) = .empty;
defer second.deinit(alloc); defer second.deinit(alloc);
var third: std.ArrayList(u8) = .empty; var third: ArrayList(u8) = .empty;
defer third.deinit(alloc); defer third.deinit(alloc);
var fourth: ?std.ArrayList(u8) = null; var fourth: ?ArrayList(u8) = null;
defer if (fourth) |*f| f.deinit(alloc); defer if (fourth) |*f| f.deinit(alloc);
var payload: std.Io.Writer.Allocating = .init(alloc); var payload: AllocatingWriter = .init(alloc);
errdefer payload.deinit(); errdefer payload.deinit();
sw: switch (@as(States, .before_second)) { sw: switch (@as(States, .before_second)) {
.before_second => { .before_second => {
// Drop whitespace // Drop whitespace
const byte = try in.peekByte(); const byte = try in.peekByte();
if (std.ascii.isWhitespace(byte)) { if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .before_second; continue :sw .before_second;
} }
@@ -749,7 +774,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}, },
.in_second => { .in_second => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) { if (!isWhitespace(byte)) {
try second.append(alloc, byte); try second.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_second; continue :sw .in_second;
@@ -760,7 +785,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) { } else if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .after_second; continue :sw .after_second;
} }
@@ -769,7 +794,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
}, },
.in_third => { .in_third => {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (!std.ascii.isWhitespace(byte)) { if (!isWhitespace(byte)) {
try third.append(alloc, byte); try third.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_third; continue :sw .in_third;
@@ -780,7 +805,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isWhitespace(byte)) { } else if (isWhitespace(byte)) {
in.toss(1); in.toss(1);
continue :sw .after_third; continue :sw .after_third;
} }
@@ -791,7 +816,7 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const byte = try in.peekByte(); const byte = try in.peekByte();
if (byte == '\r') { if (byte == '\r') {
continue :sw .in_end; continue :sw .in_end;
} else if (std.ascii.isDigit(byte)) { } else if (isDigit(byte)) {
try fourth.?.append(alloc, byte); try fourth.?.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_fourth; continue :sw .in_fourth;
@@ -806,12 +831,12 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize = const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
if (fourth) |f| .{ if (fourth) |f| .{
try alloc.dupe(u8, second.items), try alloc.dupe(u8, second.items),
try std.fmt.parseUnsigned(usize, third.items, 10), try parseUnsigned(usize, third.items, 10),
try std.fmt.parseUnsigned(usize, f.items, 10), try parseUnsigned(usize, f.items, 10),
} else .{ } else .{
null, null,
try std.fmt.parseUnsigned(usize, second.items, 10), try parseUnsigned(usize, second.items, 10),
try std.fmt.parseUnsigned(usize, third.items, 10), try parseUnsigned(usize, third.items, 10),
}; };
try in.streamExact(&payload.writer, total_bytes); try in.streamExact(&payload.writer, total_bytes);
@@ -830,11 +855,14 @@ fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
} }
test parseHPub { test parseHPub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{ {
var in: std.Io.Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try parseHPub(std.testing.allocator, &in); var res = try parseHPub(alloc, &in);
defer res.hpub.deinit(std.testing.allocator); defer res.hpub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.hpub = .{ .hpub = .{
.header_bytes = 22, .header_bytes = 22,
@@ -847,14 +875,14 @@ test parseHPub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
{ {
var in: std.Io.Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try parseHPub(std.testing.allocator, &in); var res = try parseHPub(alloc, &in);
defer res.hpub.deinit(std.testing.allocator); defer res.hpub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.hpub = .{ .hpub = .{
.header_bytes = 22, .header_bytes = 22,
@@ -867,14 +895,14 @@ test parseHPub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
{ {
var in: std.Io.Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"); var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try parseHPub(std.testing.allocator, &in); var res = try parseHPub(alloc, &in);
defer res.hpub.deinit(std.testing.allocator); defer res.hpub.deinit(alloc);
try std.testing.expectEqualDeep( try expectEqualDeep(
Message{ Message{
.hpub = .{ .hpub = .{
.header_bytes = 22, .header_bytes = 22,
@@ -887,18 +915,18 @@ test parseHPub {
}, },
res, res,
); );
try std.testing.expectEqual(0, in.buffered().len); try expectEqual(0, in.buffered().len);
} }
} }
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 { fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
var subject_list: std.ArrayList(u8) = .empty; var subject_list: ArrayList(u8) = .empty;
errdefer subject_list.deinit(alloc); errdefer subject_list.deinit(alloc);
// Handle the first character // Handle the first character
{ {
const byte = try in.takeByte(); const byte = try in.takeByte();
if (std.ascii.isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>'))) if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
return error.InvalidStream; return error.InvalidStream;
try subject_list.append(alloc, byte); try subject_list.append(alloc, byte);
@@ -907,37 +935,33 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub
switch (pub_or_sub) { switch (pub_or_sub) {
.sub => { .sub => {
while (in.takeByte()) |byte| { while (in.takeByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break; if (isWhitespace(byte)) break;
if (std.ascii.isAscii(byte)) {
if (byte == '.') { if (byte == '.') {
const next_byte = try in.peekByte(); const next_byte = try in.peekByte();
if (next_byte == '.' or std.ascii.isWhitespace(next_byte)) if (next_byte == '.' or isWhitespace(next_byte))
return error.InvalidStream; return error.InvalidStream;
} else if (byte == '>') { } else if (byte == '>') {
const next_byte = try in.takeByte(); const next_byte = try in.takeByte();
if (!std.ascii.isWhitespace(next_byte)) if (!isWhitespace(next_byte))
return error.InvalidStream; return error.InvalidStream;
} else if (byte == '*') { } else if (byte == '*') {
const next_byte = try in.peekByte(); const next_byte = try in.peekByte();
if (next_byte != '.' and !std.ascii.isWhitespace(next_byte)) if (next_byte != '.' and !isWhitespace(next_byte))
return error.InvalidStream; return error.InvalidStream;
} }
try subject_list.append(alloc, byte); try subject_list.append(alloc, byte);
}
} else |err| return err; } else |err| return err;
}, },
.@"pub" => { .@"pub" => {
while (in.takeByte()) |byte| { while (in.takeByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break; if (isWhitespace(byte)) break;
if (std.ascii.isAscii(byte)) {
if (byte == '*' or byte == '>') return error.InvalidStream; if (byte == '*' or byte == '>') return error.InvalidStream;
if (byte == '.') { if (byte == '.') {
const next_byte = try in.peekByte(); const next_byte = try in.peekByte();
if (next_byte == '.' or std.ascii.isWhitespace(next_byte)) if (next_byte == '.' or isWhitespace(next_byte))
return error.InvalidStream; return error.InvalidStream;
} }
try subject_list.append(alloc, byte); try subject_list.append(alloc, byte);
}
} else |err| return err; } else |err| return err;
}, },
} }
@@ -945,7 +969,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader, comptime pub_or_sub
return subject_list.toOwnedSlice(alloc); return subject_list.toOwnedSlice(alloc);
} }
inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void { inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) { if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely); @branchHint(.unlikely);
return error.InvalidStream; return error.InvalidStream;
@@ -953,12 +977,14 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void
} }
test "parsing a stream" { test "parsing a stream" {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++ const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++
"lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++ "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++
".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++ ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++
"\nPUB hi 3\r\nfoo\r\n"; "\nPUB hi 3\r\nfoo\r\n";
var reader: std.Io.Reader = .fixed(input); var reader: Reader = .fixed(input);
var arena: std.heap.ArenaAllocator = .init(std.testing.allocator); var arena: ArenaAllocator = .init(alloc);
defer arena.deinit(); defer arena.deinit();
const gpa = arena.allocator(); const gpa = arena.allocator();
@@ -977,7 +1003,7 @@ test "parsing a stream" {
.no_responders = true, .no_responders = true,
} }; } };
try std.testing.expectEqualDeep(expected, msg); try expectEqualDeep(expected, msg);
} }
{ {
const msg: Message = try Message.next(gpa, &reader); const msg: Message = try Message.next(gpa, &reader);
@@ -985,6 +1011,6 @@ test "parsing a stream" {
.subject = "hi", .subject = "hi",
.payload = "foo", .payload = "foo",
} }; } };
try std.testing.expectEqualDeep(expected, msg); try expectEqualDeep(expected, msg);
} }
} }