Major restructuring

This makes things much easier to use as a library
This commit is contained in:
2026-01-06 21:56:39 -05:00
parent cc03631838
commit 4896928352
7 changed files with 648 additions and 694 deletions

835
src/Server/parse.zig Normal file
View File

@@ -0,0 +1,835 @@
const std = @import("std");
const ArenaAllocator = std.heap.ArenaAllocator;
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
const Reader = std.Io.Reader;
const Writer = std.Io.Writer;
const AllocatingWriter = std.Io.Writer.Allocating;
const StaticStringMap = std.StaticStringMap;
const log = std.log;
const isDigit = std.ascii.isDigit;
const isUpper = std.ascii.isUpper;
const isWhitespace = std.ascii.isWhitespace;
const parseUnsigned = std.fmt.parseUnsigned;
const message = @import("./parse/message.zig");
pub const Message = message.Message;
pub const Payload = @import("./parse/Payload.zig");
const client_types = StaticStringMap(message.Control).initComptime(
.{
// {"INFO", .info},
.{ @tagName(.CONNECT), .CONNECT },
.{ @tagName(.PUB), .PUB },
.{ @tagName(.HPUB), .HPUB },
.{ @tagName(.SUB), .SUB },
.{ @tagName(.UNSUB), .UNSUB },
// {"MSG", .msg},
// {"HMSG", .hmsg},
.{ @tagName(.PING), .PING },
.{ @tagName(.PONG), .PONG },
// {"+OK", .@"+ok"},
// {"-ERR", .@"-err"},
},
);
fn parseStaticStringMap(input: []const u8) ?message.Control {
return client_types.get(input);
}
/// Parse a string into its associated MessageType.
const parse = parseStaticStringMap;
/// Get the next Message from the input stream.
pub fn next(alloc: Allocator, in: *Reader) !Message {
var operation_string: ArrayList(u8) = blk: {
comptime var buf_len = 0;
comptime {
for (client_types.keys()) |key| {
buf_len = @max(buf_len, key.len);
}
}
var buf: [buf_len]u8 = undefined;
break :blk .initBuffer(&buf);
};
while (in.peekByte()) |byte| {
if (isUpper(byte)) {
try operation_string.appendBounded(byte);
in.toss(1);
} else break;
} else |err| return err;
const operation = parse(operation_string.items) orelse {
log.err("Invalid operation: '{s}'", .{operation_string.items});
return error.InvalidOperation;
};
errdefer log.err("Failed to parse {s}", .{operation_string.items});
switch (operation) {
.CONNECT => return connect(alloc, in),
.PUB => {
@branchHint(.likely);
return @"pub"(alloc, in);
},
.HPUB => {
@branchHint(.likely);
return hpub(alloc, in);
},
.PING => {
try expectStreamBytes(in, "\r\n");
return .PING;
},
.PONG => {
try expectStreamBytes(in, "\r\n");
return .PONG;
},
.SUB => return sub(alloc, in),
.UNSUB => return unsub(alloc, in),
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
}
}
pub fn connect(alloc: Allocator, in: *Reader) !Message {
// for storing the json string
var connect_string_writer_allocating: AllocatingWriter = .init(alloc);
defer connect_string_writer_allocating.deinit();
var connect_string_writer = &connect_string_writer_allocating.writer;
// for parsing the json string
var connect_arena_allocator: ArenaAllocator = .init(alloc);
defer connect_arena_allocator.deinit();
const connect_allocator = connect_arena_allocator.allocator();
try in.discardAll(1); // throw away space
// Should read the next JSON object to the fixed buffer writer.
_ = try in.streamDelimiter(connect_string_writer, '}');
try connect_string_writer.writeByte('}');
try expectStreamBytes(in, "}\r\n"); // discard '}\r\n'
const connect_str = try connect_string_writer_allocating.toOwnedSlice();
defer alloc.free(connect_str);
const res = try std.json.parseFromSliceLeaky(
Message.Connect,
connect_allocator,
connect_str,
.{ .allocate = .alloc_always },
);
return .{ .CONNECT = try res.dupe(alloc) };
}
pub fn sub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
const subject = try readSubject(alloc, in, .sub);
const States = enum {
before_second,
in_second,
after_second,
in_third,
in_end,
};
var second: ArrayList(u8) = .empty;
errdefer second.deinit(alloc);
var third: ?ArrayList(u8) = null;
errdefer if (third) |*t| t.deinit(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
const byte = try in.peekByte();
if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
continue :sw .in_second;
},
.in_second => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .after_second;
},
.after_second => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
third = .empty;
continue :sw .in_third;
},
.in_third => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .in_end;
},
.in_end => {
try expectStreamBytes(in, "\r\n");
},
}
return .{
.SUB = .{
.subject = subject,
.queue_group = if (third) |_| try second.toOwnedSlice(alloc) else null,
.sid = if (third) |*t| try t.toOwnedSlice(alloc) else try second.toOwnedSlice(alloc),
},
};
}
pub fn unsub(alloc: Allocator, in: *Reader) !Message {
const States = enum {
before_first,
in_first,
after_first,
in_second,
in_end,
};
var first: ArrayList(u8) = .empty;
errdefer first.deinit(alloc);
var second: ?ArrayList(u8) = null;
defer if (second) |*s| s.deinit(alloc);
sw: switch (@as(States, .before_first)) {
.before_first => {
const byte = try in.peekByte();
if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_first;
}
continue :sw .in_first;
},
.in_first => {
const byte = try in.peekByte();
if (!isWhitespace(byte)) {
try first.append(alloc, byte);
in.toss(1);
continue :sw .in_first;
}
continue :sw .after_first;
},
.after_first => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_first;
}
second = .empty;
continue :sw .in_second;
},
.in_second => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
}
try second.?.append(alloc, byte);
in.toss(1);
continue :sw .in_second;
},
.in_end => {
try expectStreamBytes(in, "\r\n");
},
}
return .{
.UNSUB = .{
.sid = try first.toOwnedSlice(alloc),
.max_msgs = if (second) |s| try parseUnsigned(usize, s.items, 10) else null,
},
};
}
pub fn @"pub"(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
const subject: []const u8 = try readSubject(alloc, in, .@"pub");
errdefer alloc.free(subject);
const States = enum {
before_second,
in_second,
after_second,
in_third,
in_end,
};
var second: ArrayList(u8) = .empty;
defer second.deinit(alloc);
var third: ?ArrayList(u8) = null;
defer if (third) |*t| t.deinit(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
// Drop whitespace
const byte = try in.peekByte();
if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
continue :sw .in_second;
},
.in_second => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .after_second;
},
.after_second => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
third = .empty;
continue :sw .in_third;
},
.in_third => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try third.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .in_end;
},
.in_end => {
try expectStreamBytes(in, "\r\n");
},
}
const reply_to: ?[]const u8, const bytes: usize =
if (third) |t| .{
try alloc.dupe(u8, second.items),
try parseUnsigned(usize, t.items, 10),
} else .{
null,
try parseUnsigned(usize, second.items, 10),
};
const payload: Payload = try .read(alloc, in, bytes);
errdefer payload.deinit(alloc);
try expectStreamBytes(in, "\r\n");
return .{
.PUB = .{
.subject = subject,
.payload = payload,
.reply_to = reply_to,
},
};
}
pub fn hpub(alloc: Allocator, in: *Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
const subject: []const u8 = try readSubject(alloc, in, .@"pub");
errdefer alloc.free(subject);
const States = enum {
before_second,
in_second,
after_second,
in_third,
after_third,
in_fourth,
in_end,
};
var second: ArrayList(u8) = .empty;
defer second.deinit(alloc);
var third: ArrayList(u8) = .empty;
defer third.deinit(alloc);
var fourth: ?ArrayList(u8) = null;
defer if (fourth) |*f| f.deinit(alloc);
sw: switch (@as(States, .before_second)) {
.before_second => {
// Drop whitespace
const byte = try in.peekByte();
if (isWhitespace(byte)) {
in.toss(1);
continue :sw .before_second;
}
continue :sw .in_second;
},
.in_second => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try second.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .after_second;
},
.after_second => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_second;
}
third = .empty;
continue :sw .in_third;
},
.in_third => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try third.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .after_third;
},
.after_third => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
} else if (isWhitespace(byte)) {
in.toss(1);
continue :sw .after_third;
}
fourth = .empty;
continue :sw .in_fourth;
},
.in_fourth => {
for (1..in.buffer.len) |i| {
try in.fill(i + 1);
if (isWhitespace(in.buffered()[i])) {
@memcpy(try fourth.?.addManyAsSlice(alloc, i), in.buffered()[0..i]);
in.toss(i);
break;
}
} else return error.EndOfStream;
continue :sw .in_end;
},
.in_end => {
try expectStreamBytes(in, "\r\n");
},
}
const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
if (fourth) |f| .{
try alloc.dupe(u8, second.items),
try parseUnsigned(usize, third.items, 10),
try parseUnsigned(usize, f.items, 10),
} else .{
null,
try parseUnsigned(usize, second.items, 10),
try parseUnsigned(usize, third.items, 10),
};
const payload: Payload = try .read(alloc, in, total_bytes);
errdefer payload.deinit(alloc);
try expectStreamBytes(in, "\r\n");
return .{
.HPUB = .{
.header_bytes = header_bytes,
.@"pub" = .{
.subject = subject,
.payload = payload,
.reply_to = reply_to,
},
},
};
}
fn readSubject(alloc: Allocator, in: *Reader, comptime pub_or_sub: enum { @"pub", sub }) ![]const u8 {
var subject_list: ArrayList(u8) = .empty;
errdefer subject_list.deinit(alloc);
// Handle the first character
{
const byte = try in.takeByte();
if (isWhitespace(byte) or byte == '.' or (pub_or_sub == .@"pub" and (byte == '*' or byte == '>')))
return error.InvalidStream;
try subject_list.append(alloc, byte);
}
switch (pub_or_sub) {
.sub => {
while (in.takeByte()) |byte| {
if (isWhitespace(byte)) break;
if (byte == '.') {
const next_byte = try in.peekByte();
if (next_byte == '.' or isWhitespace(next_byte))
return error.InvalidStream;
} else if (byte == '>') {
const next_byte = try in.takeByte();
if (!isWhitespace(next_byte))
return error.InvalidStream;
} else if (byte == '*') {
const next_byte = try in.peekByte();
if (next_byte != '.' and !isWhitespace(next_byte))
return error.InvalidStream;
}
try subject_list.append(alloc, byte);
} else |err| return err;
},
.@"pub" => {
while (in.takeByte()) |byte| {
if (isWhitespace(byte)) break;
if (byte == '*' or byte == '>') return error.InvalidStream;
if (byte == '.') {
const next_byte = try in.peekByte();
if (next_byte == '.' or isWhitespace(next_byte))
return error.InvalidStream;
}
try subject_list.append(alloc, byte);
} else |err| return err;
},
}
return subject_list.toOwnedSlice(alloc);
}
inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely);
return error.InvalidStream;
}
}
test sub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
{
var in: Reader = .fixed(" foo 1\r\n");
var res = try sub(alloc, &in);
defer res.SUB.deinit(alloc);
try expectEqualDeep(
Message{
.SUB = .{
.subject = "foo",
.queue_group = null,
.sid = "1",
},
},
res,
);
}
{
var in: Reader = .fixed(" foo 1\r\n");
var res = try sub(alloc, &in);
defer res.SUB.deinit(alloc);
try expectEqualDeep(
Message{
.SUB = .{
.subject = "foo",
.queue_group = null,
.sid = "1",
},
},
res,
);
}
{
var in: Reader = .fixed(" foo q 1\r\n");
var res = try sub(alloc, &in);
defer res.SUB.deinit(alloc);
try expectEqualDeep(
Message{
.SUB = .{
.subject = "foo",
.queue_group = "q",
.sid = "1",
},
},
res,
);
}
{
var in: Reader = .fixed(" 1 q 1\r\n");
var res = try sub(alloc, &in);
defer res.SUB.deinit(alloc);
try expectEqualDeep(
Message{
.SUB = .{
.subject = "1",
.queue_group = "q",
.sid = "1",
},
},
res,
);
}
{
var in: Reader = .fixed(" $SRV.PING 4\r\n");
var res = try sub(alloc, &in);
defer res.SUB.deinit(alloc);
try expectEqualDeep(
Message{
.SUB = .{
.subject = "$SRV.PING",
.queue_group = null,
.sid = "4",
},
},
res,
);
}
{
var in: Reader = .fixed(" foo.echo q 10\r\n");
var res = try sub(alloc, &in);
defer res.SUB.deinit(alloc);
try expectEqualDeep(
Message{
.SUB = .{
.subject = "foo.echo",
.queue_group = "q",
.sid = "10",
},
},
res,
);
}
}
test unsub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{
var in: Reader = .fixed(" 1\r\n");
var res = try unsub(alloc, &in);
defer res.UNSUB.deinit(alloc);
try expectEqualDeep(
Message{
.UNSUB = .{
.sid = "1",
.max_msgs = null,
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
{
var in: Reader = .fixed(" 1 1\r\n");
var res = try unsub(alloc, &in);
defer res.UNSUB.deinit(alloc);
try expectEqualDeep(
Message{
.UNSUB = .{
.sid = "1",
.max_msgs = 1,
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
}
test @"pub" {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{
var in: Reader = .fixed(" foo 3\r\nbar\r\n");
var res = try @"pub"(alloc, &in);
defer res.PUB.deinit(alloc);
try expectEqualDeep(
Message{
.PUB = .{
.subject = "foo",
.reply_to = null,
.payload = .{
.len = 3,
.short = blk: {
var s: [128]u8 = undefined;
@memcpy(s[0..3], "bar");
break :blk s;
},
.long = null,
},
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
{
var in: Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
var res = try @"pub"(alloc, &in);
defer res.PUB.deinit(alloc);
try expectEqualDeep(
Message{
.PUB = .{
.subject = "foo",
.reply_to = "reply.to",
.payload = .{
.len = 3,
.short = blk: {
var s: [128]u8 = undefined;
@memcpy(s[0..3], "bar");
break :blk s;
},
.long = null,
},
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
// numeric reply subject
{
var in: Reader = .fixed(" foo 5 3\r\nbar\r\n");
var res = try @"pub"(alloc, &in);
defer res.PUB.deinit(alloc);
try expectEqualDeep(
Message{
.PUB = .{
.subject = "foo",
.reply_to = "5",
.payload = .{
.len = 3,
.short = blk: {
var s: [128]u8 = undefined;
@memcpy(s[0..3], "bar");
break :blk s;
},
.long = null,
},
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
}
test hpub {
const alloc = std.testing.allocator;
const expectEqualDeep = std.testing.expectEqualDeep;
const expectEqual = std.testing.expectEqual;
{
var in: Reader = .fixed(" foo 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try hpub(alloc, &in);
defer res.HPUB.deinit(alloc);
try expectEqualDeep(
Message{
.HPUB = .{
.header_bytes = 22,
.@"pub" = .{
.subject = "foo",
.reply_to = null,
.payload = .{
.len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
.short = blk: {
var s: [128]u8 = undefined;
const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
@memcpy(s[0..str.len], str);
break :blk s;
},
.long = null,
},
},
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
{
var in: Reader = .fixed(" foo reply.to 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try hpub(alloc, &in);
defer res.HPUB.deinit(alloc);
try expectEqualDeep(
Message{
.HPUB = .{
.header_bytes = 22,
.@"pub" = .{
.subject = "foo",
.reply_to = "reply.to",
.payload = .{
.len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
.short = blk: {
var s: [128]u8 = undefined;
const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
@memcpy(s[0..str.len], str);
break :blk s;
},
.long = null,
},
},
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
{
var in: Reader = .fixed(" foo 6 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n");
var res = try hpub(alloc, &in);
defer res.HPUB.deinit(alloc);
try expectEqualDeep(
Message{
.HPUB = .{
.header_bytes = 22,
.@"pub" = .{
.subject = "foo",
.reply_to = "6",
.payload = .{
.len = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!".len,
.short = blk: {
var s: [128]u8 = undefined;
const str = "NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!";
@memcpy(s[0..str.len], str);
break :blk s;
},
.long = null,
},
},
},
},
res,
);
try expectEqual(0, in.buffered().len);
}
}