mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 19:54:48 +00:00
some cleanup and freeing
This commit is contained in:
@@ -68,9 +68,8 @@ pub const ClientState = struct {
|
||||
}
|
||||
|
||||
/// Return true if the value was put in the clients buffer to process, else false.
|
||||
pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable || std.Io.QueueClosedError)!bool {
|
||||
pub fn send(self: *ClientState, io: std.Io, msg: Message) (std.Io.Cancelable || std.Io.QueueClosedError)!void {
|
||||
try self.recv_queue.putOne(io, msg);
|
||||
return true;
|
||||
}
|
||||
|
||||
pub fn next(self: *ClientState, allocator: std.mem.Allocator) !Message {
|
||||
|
||||
@@ -111,22 +111,35 @@ fn handleConnection(
|
||||
switch (msg) {
|
||||
.ping => {
|
||||
// Respond to ping with pong.
|
||||
for (0..5) |_| {
|
||||
if (try client_state.send(io, .pong)) {
|
||||
break;
|
||||
}
|
||||
} else {}
|
||||
try client_state.send(io, .pong);
|
||||
},
|
||||
.@"pub" => |@"pub"| {
|
||||
try server.publishMessage(io, @"pub");
|
||||
.@"pub" => |pb| {
|
||||
defer {
|
||||
allocator.free(pb.payload);
|
||||
allocator.free(pb.subject);
|
||||
if (pb.reply_to) |r| {
|
||||
allocator.free(r);
|
||||
}
|
||||
}
|
||||
try server.publishMessage(io, pb);
|
||||
if (client_state.connect.connect.verbose) {
|
||||
_ = try client_state.send(io, .@"+ok");
|
||||
try client_state.send(io, .@"+ok");
|
||||
}
|
||||
},
|
||||
.sub => |sub| {
|
||||
defer {
|
||||
allocator.free(sub.subject);
|
||||
allocator.free(sub.sid);
|
||||
if (sub.queue_group) |q| {
|
||||
allocator.free(q);
|
||||
}
|
||||
}
|
||||
try server.subscribe(allocator, id, sub);
|
||||
},
|
||||
.unsub => |unsub| {
|
||||
defer {
|
||||
allocator.free(unsub.sid);
|
||||
}
|
||||
try server.unsubscribe(id, unsub);
|
||||
},
|
||||
else => |e| {
|
||||
|
||||
@@ -206,6 +206,7 @@ pub const Message = union(MessageType) {
|
||||
// Parse byte count
|
||||
const byte_count = blk: {
|
||||
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
|
||||
defer byte_count_list.deinit(alloc);
|
||||
while (in.peekByte()) |byte| {
|
||||
if (std.ascii.isWhitespace(byte)) {
|
||||
try expectStreamBytes(in, "\r\n");
|
||||
|
||||
Reference in New Issue
Block a user