mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
publish works
starting to use errors instead of unreachable for stream parsing
This commit is contained in:
@@ -109,17 +109,6 @@ pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void {
|
fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void {
|
||||||
std.debug.print("PRINTING MESSAGE\n\n\n\n", .{});
|
|
||||||
std.debug.print(
|
|
||||||
"MSG {s} {s} {s} {d}\r\n{s}\r\n-\n\n\n",
|
|
||||||
.{
|
|
||||||
msg.subject,
|
|
||||||
msg.sid,
|
|
||||||
msg.reply_to orelse "",
|
|
||||||
msg.payload.len,
|
|
||||||
msg.payload,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
try out.print(
|
try out.print(
|
||||||
"MSG {s} {s} {s} {d}\r\n{s}\r\n",
|
"MSG {s} {s} {s} {d}\r\n{s}\r\n",
|
||||||
.{
|
.{
|
||||||
|
|||||||
@@ -180,6 +180,7 @@ fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
|
fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
|
||||||
|
std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg});
|
||||||
var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty;
|
var subs_for_subject: std.AutoHashMapUnmanaged(usize, []const u8) = if (server.subscriptions.fetchRemove(msg.subject)) |s| s.value else .empty;
|
||||||
try subs_for_subject.put(gpa, id, msg.sid);
|
try subs_for_subject.put(gpa, id, msg.sid);
|
||||||
try server.subscriptions.put(gpa, msg.subject, subs_for_subject);
|
try server.subscriptions.put(gpa, msg.subject, subs_for_subject);
|
||||||
|
|||||||
@@ -206,7 +206,7 @@ pub const Message = union(MessageType) {
|
|||||||
// Parse byte count
|
// Parse byte count
|
||||||
const byte_count = blk: {
|
const byte_count = blk: {
|
||||||
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
|
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
|
||||||
while (in.takeByte() catch null) |byte| {
|
while (in.takeByte()) |byte| {
|
||||||
if (std.ascii.isWhitespace(byte)) {
|
if (std.ascii.isWhitespace(byte)) {
|
||||||
std.debug.assert(byte == '\r');
|
std.debug.assert(byte == '\r');
|
||||||
std.debug.assert(try in.takeByte() == '\n');
|
std.debug.assert(try in.takeByte() == '\n');
|
||||||
@@ -218,7 +218,7 @@ pub const Message = union(MessageType) {
|
|||||||
} else {
|
} else {
|
||||||
return error.InvalidStream;
|
return error.InvalidStream;
|
||||||
}
|
}
|
||||||
} else return error.InvalidStream;
|
} else |err| return err;
|
||||||
|
|
||||||
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
|
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
|
||||||
};
|
};
|
||||||
@@ -226,7 +226,7 @@ pub const Message = union(MessageType) {
|
|||||||
const payload = blk: {
|
const payload = blk: {
|
||||||
const bytes = try alloc.alloc(u8, byte_count);
|
const bytes = try alloc.alloc(u8, byte_count);
|
||||||
try in.readSliceAll(bytes);
|
try in.readSliceAll(bytes);
|
||||||
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
|
||||||
break :blk bytes;
|
break :blk bytes;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -238,34 +238,35 @@ pub const Message = union(MessageType) {
|
|||||||
};
|
};
|
||||||
},
|
},
|
||||||
.ping => {
|
.ping => {
|
||||||
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
|
||||||
return .ping;
|
return .ping;
|
||||||
},
|
},
|
||||||
.pong => {
|
.pong => {
|
||||||
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
|
||||||
return .pong;
|
return .pong;
|
||||||
},
|
},
|
||||||
.sub => {
|
.sub => {
|
||||||
std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
|
try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte()));
|
||||||
const subject = try readSubject(alloc, in);
|
const subject = try readSubject(alloc, in);
|
||||||
const second = blk: {
|
const second = blk: {
|
||||||
// Drop whitespace
|
// Drop whitespace
|
||||||
while (in.peekByte() catch null) |byte| {
|
while (in.peekByte()) |byte| {
|
||||||
if (std.ascii.isWhitespace(byte)) {
|
if (std.ascii.isWhitespace(byte)) {
|
||||||
in.toss(1);
|
in.toss(1);
|
||||||
} else break;
|
} else break;
|
||||||
} else return error.InvalidStream;
|
} else |err| return err;
|
||||||
|
|
||||||
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32);
|
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 32);
|
||||||
while (in.takeByte() catch null) |byte| {
|
while (in.takeByte()) |byte| {
|
||||||
if (std.ascii.isWhitespace(byte)) break;
|
if (std.ascii.isWhitespace(byte)) break;
|
||||||
try acc.append(alloc, byte);
|
try acc.append(alloc, byte);
|
||||||
} else return error.InvalidStream;
|
} else |err| return err;
|
||||||
|
|
||||||
break :blk try acc.toOwnedSlice(alloc);
|
break :blk try acc.toOwnedSlice(alloc);
|
||||||
};
|
};
|
||||||
const queue_group = if ((try in.peekByte()) != '\r') second else null;
|
const queue_group = if ((try in.peekByte()) != '\r') second else null;
|
||||||
const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
|
const sid = if (queue_group) |_| try in.takeDelimiterExclusive('\r') else second;
|
||||||
|
std.debug.print("SID is '{s}'\n", .{sid});
|
||||||
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
||||||
return .{
|
return .{
|
||||||
.sub = .{
|
.sub = .{
|
||||||
@@ -276,20 +277,20 @@ pub const Message = union(MessageType) {
|
|||||||
};
|
};
|
||||||
},
|
},
|
||||||
.unsub => {
|
.unsub => {
|
||||||
std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
|
try assertStreamBytes(std.ascii.isWhitespace(try in.takeByte()));
|
||||||
// Parse byte count
|
// Parse byte count
|
||||||
const sid = blk: {
|
const sid = blk: {
|
||||||
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8);
|
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8);
|
||||||
while (in.peekByte() catch null) |byte| {
|
while (in.peekByte()) |byte| {
|
||||||
if (std.ascii.isWhitespace(byte)) break;
|
if (std.ascii.isWhitespace(byte)) break;
|
||||||
try acc.append(alloc, byte);
|
try acc.append(alloc, byte);
|
||||||
in.toss(1);
|
in.toss(1);
|
||||||
} else return error.InvalidStream;
|
} else |err| return err;
|
||||||
break :blk try acc.toOwnedSlice(alloc);
|
break :blk try acc.toOwnedSlice(alloc);
|
||||||
};
|
};
|
||||||
|
|
||||||
if ((try in.peekByte()) == '\r') {
|
if ((try in.peekByte()) == '\r') {
|
||||||
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
try assertStreamBytes(std.mem.eql(u8, try in.take(2), "\r\n"));
|
||||||
return .{
|
return .{
|
||||||
.unsub = .{
|
.unsub = .{
|
||||||
.sid = sid,
|
.sid = sid,
|
||||||
@@ -299,7 +300,7 @@ pub const Message = union(MessageType) {
|
|||||||
in.toss(1);
|
in.toss(1);
|
||||||
const max_msgs = blk: {
|
const max_msgs = blk: {
|
||||||
var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
|
var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
|
||||||
while (in.takeByte() catch null) |byte| {
|
while (in.takeByte()) |byte| {
|
||||||
if (std.ascii.isWhitespace(byte)) {
|
if (std.ascii.isWhitespace(byte)) {
|
||||||
std.debug.assert(byte == '\r');
|
std.debug.assert(byte == '\r');
|
||||||
std.debug.assert(try in.takeByte() == '\n');
|
std.debug.assert(try in.takeByte() == '\n');
|
||||||
@@ -311,7 +312,7 @@ pub const Message = union(MessageType) {
|
|||||||
} else {
|
} else {
|
||||||
return error.InvalidStream;
|
return error.InvalidStream;
|
||||||
}
|
}
|
||||||
} else return error.InvalidStream;
|
} else |err| return err;
|
||||||
|
|
||||||
break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
|
break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
|
||||||
};
|
};
|
||||||
@@ -343,7 +344,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
|
|||||||
try subject_list.append(alloc, byte);
|
try subject_list.append(alloc, byte);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (in.takeByte() catch null) |byte| {
|
while (in.takeByte()) |byte| {
|
||||||
if (std.ascii.isWhitespace(byte)) break;
|
if (std.ascii.isWhitespace(byte)) break;
|
||||||
if (std.ascii.isAscii(byte)) {
|
if (std.ascii.isAscii(byte)) {
|
||||||
if (byte == '.') {
|
if (byte == '.') {
|
||||||
@@ -353,7 +354,7 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
|
|||||||
}
|
}
|
||||||
try subject_list.append(alloc, byte);
|
try subject_list.append(alloc, byte);
|
||||||
}
|
}
|
||||||
} else return error.InvalidStream;
|
} else |err| return err;
|
||||||
return subject_list.toOwnedSlice(alloc);
|
return subject_list.toOwnedSlice(alloc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -385,6 +386,13 @@ fn parsePub(in: *std.Io.Reader) !Message.Pub {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline fn assertStreamBytes(cond: bool) !void {
|
||||||
|
if (!cond) {
|
||||||
|
@branchHint(.unlikely);
|
||||||
|
return error.InvalidStream;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// try returning error in debug mode, only null in release?
|
// try returning error in debug mode, only null in release?
|
||||||
// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message {
|
// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message {
|
||||||
// const message_type: MessageType = blk: {
|
// const message_type: MessageType = blk: {
|
||||||
|
|||||||
Reference in New Issue
Block a user