Support queue groups

This commit is contained in:
2026-01-06 13:42:30 -05:00
parent 81a93654a1
commit c676a8218e

View File

@@ -26,6 +26,7 @@ pub const Subscription = struct {
subject: []const u8,
client_id: usize,
sid: []const u8,
queue_group: ?[]const u8,
queue: *Queue(Msgs),
// used to alloc messages in the queue
alloc: Allocator,
@@ -33,6 +34,7 @@ pub const Subscription = struct {
fn deinit(self: Subscription, alloc: Allocator) void {
alloc.free(self.subject);
alloc.free(self.sid);
if (self.queue_group) |g| alloc.free(g);
}
};
@@ -201,12 +203,12 @@ fn handleConnection(
.PUB => |pb| {
@branchHint(.likely);
defer pb.deinit(server_allocator);
try server.publishMessage(io, &client, msg);
try server.publishMessage(io, server_allocator, &client, msg);
},
.HPUB => |hp| {
@branchHint(.likely);
defer hp.deinit(server_allocator);
try server.publishMessage(io, &client, msg);
try server.publishMessage(io, server_allocator, &client, msg);
},
.SUB => |sub| {
defer sub.deinit(server_allocator);
@@ -270,7 +272,13 @@ test subjectMatches {
try expect(subjectMatches("foo.>", "foo.bar.baz"));
}
fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message) !void {
fn publishMessage(
server: *Server,
io: Io,
alloc: Allocator,
source_client: *Client,
msg: Message,
) !void {
defer if (source_client.connect) |c| {
if (c.verbose) {
source_client.send(io, .@"+OK") catch {};
@@ -284,8 +292,26 @@ fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message)
};
try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| {
var published_queue_groups: ArrayList([]const u8) = .empty;
defer published_queue_groups.deinit(alloc);
var published_queue_sub_idxs: ArrayList(usize) = .empty;
defer published_queue_sub_idxs.deinit(alloc);
subs: for (0..server.subscriptions.items.len) |i| {
const subscription = server.subscriptions.items[i];
if (subjectMatches(subscription.subject, subject)) {
if (subscription.queue_group) |sg| {
for (published_queue_groups.items) |g| {
if (eql(u8, g, sg)) {
continue :subs;
}
}
// Don't republish to the same queue
try published_queue_groups.append(alloc, sg);
// Move this index to the end of the subscription list,
// to prioritize other subscriptions in the queue next time.
try published_queue_sub_idxs.append(alloc, i);
}
switch (msg) {
.PUB => |pb| {
try subscription.queue.putOne(io, .{
@@ -301,6 +327,11 @@ fn publishMessage(server: *Server, io: Io, source_client: *Client, msg: Message)
}
}
}
for (0..published_queue_sub_idxs.items.len) |from_end| {
const i = published_queue_sub_idxs.items.len - from_end - 1;
server.subscriptions.appendAssumeCapacity(server.subscriptions.orderedRemove(i));
}
}
fn subscribe(
@@ -317,10 +348,13 @@ fn subscribe(
errdefer gpa.free(subject);
const sid = try gpa.dupe(u8, msg.sid);
errdefer gpa.free(sid);
const queue_group = if (msg.queue_group) |q| try gpa.dupe(u8, q) else null;
errdefer if (queue_group) |q| gpa.free(q);
try server.subscriptions.append(gpa, .{
.subject = subject,
.client_id = id,
.sid = sid,
.queue_group = queue_group,
.queue = client.msg_queue,
.alloc = client.alloc,
});