mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 19:54:48 +00:00
made some progress on subscriptions
This commit is contained in:
@@ -38,7 +38,7 @@ pub const Message = union(MessageType) {
|
||||
@"pub": Pub,
|
||||
hpub: void,
|
||||
sub: Sub,
|
||||
unsub: void,
|
||||
unsub: Unsub,
|
||||
msg: Msg,
|
||||
hmsg: void,
|
||||
ping,
|
||||
@@ -117,6 +117,12 @@ pub const Message = union(MessageType) {
|
||||
/// A unique alphanumeric subscription ID, generated by the client.
|
||||
sid: []const u8,
|
||||
};
|
||||
pub const Unsub = struct {
|
||||
/// The unique alphanumeric subscription ID of the subject to unsubscribe from.
|
||||
sid: []const u8,
|
||||
/// A number of messages to wait for before automatically unsubscribing.
|
||||
max_msgs: ?usize = null,
|
||||
};
|
||||
pub const Msg = struct {
|
||||
subject: []const u8,
|
||||
sid: []const u8,
|
||||
@@ -225,7 +231,7 @@ pub const Message = union(MessageType) {
|
||||
}
|
||||
} else return error.InvalidStream;
|
||||
|
||||
break :blk try std.fmt.parseUnsigned(u64, byte_count_list.items, 10);
|
||||
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
|
||||
};
|
||||
|
||||
const payload = blk: {
|
||||
@@ -280,6 +286,55 @@ pub const Message = union(MessageType) {
|
||||
},
|
||||
};
|
||||
},
|
||||
.unsub => {
|
||||
std.debug.assert(std.ascii.isWhitespace(try in.takeByte()));
|
||||
// Parse byte count
|
||||
const sid = blk: {
|
||||
var acc: std.ArrayList(u8) = try .initCapacity(alloc, 8);
|
||||
while (in.peekByte() catch null) |byte| {
|
||||
if (std.ascii.isWhitespace(byte)) break;
|
||||
try acc.append(alloc, byte);
|
||||
in.toss(1);
|
||||
} else return error.InvalidStream;
|
||||
break :blk try acc.toOwnedSlice(alloc);
|
||||
};
|
||||
|
||||
if ((try in.peekByte()) == '\r') {
|
||||
std.debug.assert(std.mem.eql(u8, try in.take(2), "\r\n"));
|
||||
return .{
|
||||
.unsub = .{
|
||||
.sid = sid,
|
||||
},
|
||||
};
|
||||
} else if (std.ascii.isWhitespace(try in.peekByte())) {
|
||||
in.toss(1);
|
||||
const max_msgs = blk: {
|
||||
var max_msgs_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
|
||||
while (in.takeByte() catch null) |byte| {
|
||||
if (std.ascii.isWhitespace(byte)) {
|
||||
std.debug.assert(byte == '\r');
|
||||
std.debug.assert(try in.takeByte() == '\n');
|
||||
break;
|
||||
}
|
||||
|
||||
if (std.ascii.isDigit(byte)) {
|
||||
try max_msgs_list.append(alloc, byte);
|
||||
} else {
|
||||
return error.InvalidStream;
|
||||
}
|
||||
} else return error.InvalidStream;
|
||||
|
||||
break :blk try std.fmt.parseUnsigned(usize, max_msgs_list.items, 10);
|
||||
};
|
||||
|
||||
return .{
|
||||
.unsub = .{
|
||||
.sid = sid,
|
||||
.max_msgs = max_msgs,
|
||||
},
|
||||
};
|
||||
} else return error.InvalidStream;
|
||||
},
|
||||
else => |msg| std.debug.panic("Not implemented: {}\n", .{msg}),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user