This commit is contained in:
2025-12-02 19:53:03 -05:00
parent 41f4ee721b
commit aceb671ddc
4 changed files with 366 additions and 141 deletions

View File

@@ -32,20 +32,48 @@ pub const MessageType = enum {
};
pub const Message = union(enum) {
info: void,
/// TODO: REMOVE
not_real: void,
info: ServerInfo,
connect: Connect,
@"pub": Pub,
hpub: void,
sub: void,
sub: Sub,
unsub: void,
msg: void,
msg: Msg,
hmsg: void,
ping,
pong,
@"+ok": void,
@"-err": void,
const Connect = struct {
pub const ServerInfo = struct {
/// The unique identifier of the NATS server.
server_id: []const u8,
/// The name of the NATS server.
server_name: []const u8,
/// The version of NATS.
version: []const u8,
/// The version of golang the NATS server was built with.
go: []const u8 = "0.0.0",
/// The IP address used to start the NATS server,
/// by default this will be 0.0.0.0 and can be
/// configured with -client_advertise host:port.
host: []const u8 = "0.0.0.0",
/// The port number the NATS server is configured
/// to listen on.
port: u16 = 4222,
/// Whether the server supports headers.
headers: bool = false,
/// Maximum payload size, in bytes, that the server
/// will accept from the client.
max_payload: u64,
/// An integer indicating the protocol version of
/// the server. The server version 1.2.0 sets this
/// to 1 to indicate that it supports the "Echo"
/// feature.
proto: u32 = 1,
};
pub const Connect = struct {
verbose: bool = false,
pedantic: bool = false,
tls_required: bool = false,
@@ -63,9 +91,26 @@ pub const Message = union(enum) {
headers: ?bool = null,
nkey: ?[]const u8 = null,
};
const Pub = struct {
pub const Pub = struct {
/// The destination subject to publish to.
subject: []const u8,
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
reply_to: ?[]const u8 = null,
/// The message payload data.
payload: []const u8,
};
pub const Sub = struct {
/// The subject name to subscribe to.
subject: []const u8,
/// If specified, the subscriber will join this queue group.
queue_group: ?[]const u8,
/// A unique alphanumeric subscription ID, generated by the client.
sid: []const u8,
};
pub const Msg = struct {
subject: []const u8,
sid: []const u8,
reply_to: ?[]const u8,
payload: []const u8,
};
@@ -132,32 +177,7 @@ pub const Message = union(enum) {
try in.discardAll(1); // throw away space
// Parse subject
const subject: []const u8 = blk: {
// TODO: should be ARENA allocator
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
// Handle the first character
{
const byte = try in.takeByte();
if (byte == '.' or std.ascii.isWhitespace(byte))
return error.InvalidSubject;
try subject_list.append(alloc, byte);
}
while (in.takeByte() catch null) |byte| {
if (std.ascii.isWhitespace(byte)) break;
if (std.ascii.isAscii(byte)) {
if (byte == '.') {
const next_byte = try in.peekByte();
if (next_byte == '.' or std.ascii.isWhitespace(next_byte))
return error.InvalidSubject;
}
try subject_list.append(alloc, byte);
}
} else return error.InvalidStream;
break :blk subject_list.items;
};
const subject: []const u8 = try readSubject(alloc, in);
// Parse byte count
const byte_count = blk: {
@@ -186,12 +206,12 @@ pub const Message = union(enum) {
break :blk bytes;
};
std.debug.print("buffer: '{s}'\n", .{in.buffered()});
// return std.debug.panic("not implemented", .{});
return .{ .@"pub" = .{
.subject = subject,
.payload = payload,
} };
return .{
.@"pub" = .{
.subject = subject,
.payload = payload,
},
};
},
.ping => {
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
@@ -201,11 +221,69 @@ pub const Message = union(enum) {
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
return .pong;
},
.sub => {
std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
const subject = try readSubject(alloc, in);
const second = blk: {
// Drop whitespace
while (in.peekByte() catch null) |byte| {
if (std.ascii.isWhitespace(byte)) {
in.toss(1);
} else break;
} else return error.InvalidStream;
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32);
while (in.takeByte() catch null) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
} else return error.InvalidStream;
break :blk try acc.toOwnedSlice(alloc);
};
const queue_group = if ((try in.peekByte()) != '\r') second else null;
const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
return .{
.sub = .{
.subject = subject,
.queue_group = queue_group,
.sid = sid,
},
};
},
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
}
}
};
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
// TODO: should be ARENA allocator
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
// Handle the first character
{
const byte = try in.takeByte();
std.debug.assert(!std.ascii.isWhitespace(byte));
if (byte == '.')
return error.InvalidSubject;
try subject_list.append(alloc, byte);
}
while (in.takeByte() catch null) |byte| {
if (std.ascii.isWhitespace(byte)) break;
if (std.ascii.isAscii(byte)) {
if (byte == '.') {
const next_byte = try in.peekByte();
if (next_byte == '.' or std.ascii.isWhitespace(next_byte))
return error.InvalidSubject;
}
try subject_list.append(alloc, byte);
}
} else return error.InvalidStream;
return subject_list.toOwnedSlice(alloc);
}
fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T {
var reader: std.json.Reader = .init(alloc, in);
return std.json.innerParse(T, alloc, &reader, .{