Fix parse errors, ownership errors.

This commit is contained in:
2026-01-03 03:16:51 +00:00
parent 9e32d014c2
commit a4ec798521
2 changed files with 91 additions and 48 deletions

View File

@@ -10,6 +10,11 @@ const Subscription = struct {
subject: []const u8, subject: []const u8,
client_id: usize, client_id: usize,
sid: []const u8, sid: []const u8,
fn deinit(self: Subscription, alloc: std.mem.Allocator) void {
alloc.free(self.subject);
alloc.free(self.sid);
}
}; };
info: ServerInfo, info: ServerInfo,
@@ -29,12 +34,10 @@ pub fn deinit(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
server.subs_lock.lockUncancelable(io); server.subs_lock.lockUncancelable(io);
defer server.subs_lock.unlock(io); defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |sub| { for (server.subscriptions.items) |sub| {
alloc.free(sub.sid); sub.deinit(alloc);
alloc.free(sub.subject);
} }
server.subscriptions.shrinkAndFree(alloc, 0); server.subscriptions.deinit(alloc);
server.clients.deinit(alloc);
server.clients.clearAndFree(alloc);
} }
pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void { pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void {
@@ -118,8 +121,7 @@ fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: u
const i = len - from_end - 1; const i = len - from_end - 1;
const sub = server.subscriptions.items[i]; const sub = server.subscriptions.items[i];
if (sub.client_id == id) { if (sub.client_id == id) {
allocator.free(sub.sid); sub.deinit(allocator);
allocator.free(sub.subject);
_ = server.subscriptions.swapRemove(i); _ = server.subscriptions.swapRemove(i);
} }
} }
@@ -205,9 +207,11 @@ fn handleConnection(
try server.publishMessage(io, server_allocator, &client, msg); try server.publishMessage(io, server_allocator, &client, msg);
}, },
.sub => |sub| { .sub => |sub| {
defer sub.deinit(server_allocator);
try server.subscribe(io, server_allocator, id, sub); try server.subscribe(io, server_allocator, id, sub);
}, },
.unsub => |unsub| { .unsub => |unsub| {
defer unsub.deinit(server_allocator);
try server.unsubscribe(io, server_allocator, id, unsub); try server.unsubscribe(io, server_allocator, id, unsub);
}, },
.connect => |connect| { .connect => |connect| {
@@ -313,10 +317,14 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void { fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
try server.subs_lock.lock(io); try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io); defer server.subs_lock.unlock(io);
const subject = try gpa.dupe(u8, msg.subject);
errdefer gpa.free(subject);
const sid = try gpa.dupe(u8, msg.sid);
errdefer gpa.free(sid);
try server.subscriptions.append(gpa, .{ try server.subscriptions.append(gpa, .{
.subject = msg.subject, .subject = subject,
.client_id = id, .client_id = id,
.sid = msg.sid, .sid = sid,
}); });
} }
@@ -328,8 +336,7 @@ fn unsubscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, m
const i = len - from_end - 1; const i = len - from_end - 1;
const sub = server.subscriptions.items[i]; const sub = server.subscriptions.items[i];
if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) { if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) {
gpa.free(sub.sid); sub.deinit(gpa);
gpa.free(sub.subject);
_ = server.subscriptions.swapRemove(i); _ = server.subscriptions.swapRemove(i);
} }
} }

View File

@@ -235,8 +235,6 @@ pub const Message = union(enum) {
break :blk .initBuffer(&buf); break :blk .initBuffer(&buf);
}; };
std.log.debug("buffered: '{s}'", .{in.buffered()});
while (in.peekByte()) |byte| { while (in.peekByte()) |byte| {
if (std.ascii.isUpper(byte)) { if (std.ascii.isUpper(byte)) {
try operation_string.appendBounded(byte); try operation_string.appendBounded(byte);
@@ -362,6 +360,7 @@ fn parseSub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
} }
try third.?.append(alloc, byte); try third.?.append(alloc, byte);
in.toss(1); in.toss(1);
continue :sw .in_third;
}, },
.in_end => { .in_end => {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
@@ -453,49 +452,86 @@ test parseSub {
res, res,
); );
} }
{
var in: std.Io.Reader = .fixed(" foo.echo q 10\r\n");
var res = try parseSub(std.testing.allocator, &in);
defer res.sub.deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{
.sub = .{
.subject = "foo.echo",
.queue_group = "q",
.sid = "10",
},
},
res,
);
}
} }
fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message { fn parseUnsub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space const States = enum {
before_first,
in_first,
after_first,
in_second,
in_end,
};
var first: std.ArrayList(u8) = .empty; var first: std.ArrayList(u8) = .empty;
errdefer first.deinit(alloc); errdefer first.deinit(alloc);
var second: ?std.ArrayList(u8) = null;
defer if (second) |*s| s.deinit(alloc);
while (in.peekByte()) |byte| { sw: switch (@as(States, .before_first)) {
if (std.ascii.isWhitespace(byte)) break; .before_first => {
try first.append(alloc, byte); const byte = try in.peekByte();
in.toss(1); if (std.ascii.isWhitespace(byte)) {
} else |err| return err; in.toss(1);
continue :sw .before_first;
while (in.peekByte()) |byte| { }
if (!std.ascii.isWhitespace(byte) or byte == '\r') break; continue :sw .in_first;
in.toss(1); },
} else |err| return err; .in_first => {
const byte = try in.peekByte();
if (try in.peekByte() == '\r') { if (!std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n"); try first.append(alloc, byte);
return .{ in.toss(1);
.unsub = .{ continue :sw .in_first;
.sid = try first.toOwnedSlice(alloc), }
}, continue :sw .after_first;
}; },
} else { .after_first => {
var second: std.ArrayList(u8) = .empty; const byte = try in.peekByte();
defer second.deinit(alloc); if (byte == '\r') {
continue :sw .in_end;
while (in.peekByte()) |byte| { } else if (std.ascii.isWhitespace(byte)) {
if (std.ascii.isWhitespace(byte)) break; in.toss(1);
try second.append(alloc, byte); continue :sw .after_first;
}
second = .empty;
continue :sw .in_second;
},
.in_second => {
const byte = try in.peekByte();
if (byte == '\r') {
continue :sw .in_end;
}
try second.?.append(alloc, byte);
in.toss(1); in.toss(1);
} else |err| return err; continue :sw .in_second;
},
try expectStreamBytes(in, "\r\n"); .in_end => {
return .{ try expectStreamBytes(in, "\r\n");
.unsub = .{ },
.max_msgs = try std.fmt.parseUnsigned(usize, second.items, 10),
.sid = try first.toOwnedSlice(alloc),
},
};
} }
return .{
.unsub = .{
.sid = try first.toOwnedSlice(alloc),
.max_msgs = if (second) |s| try std.fmt.parseUnsigned(usize, s.items, 10) else null,
},
};
} }
test parseUnsub { test parseUnsub {