mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
Slower, but probably more correct
This commit is contained in:
@@ -97,16 +97,18 @@ fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *
|
|||||||
fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void {
|
fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void {
|
||||||
server.subs_lock.lockUncancelable(io);
|
server.subs_lock.lockUncancelable(io);
|
||||||
defer server.subs_lock.unlock(io);
|
defer server.subs_lock.unlock(io);
|
||||||
_ = server.clients.remove(id);
|
if (server.clients.remove(id)) {
|
||||||
const len = server.subscriptions.items.len;
|
const len = server.subscriptions.items.len;
|
||||||
for (0..len) |i| {
|
for (0..len) |from_end| {
|
||||||
const sub = server.subscriptions.items[len - i - 1];
|
const i = len - from_end - 1;
|
||||||
|
const sub = server.subscriptions.items[i];
|
||||||
if (sub.client_id == id) {
|
if (sub.client_id == id) {
|
||||||
allocator.free(sub.sid);
|
allocator.free(sub.sid);
|
||||||
allocator.free(sub.subject);
|
allocator.free(sub.subject);
|
||||||
_ = server.subscriptions.swapRemove(i);
|
_ = server.subscriptions.swapRemove(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleConnection(
|
fn handleConnection(
|
||||||
@@ -138,7 +140,7 @@ fn handleConnection(
|
|||||||
try server.addClient(server_allocator, id, &client);
|
try server.addClient(server_allocator, id, &client);
|
||||||
defer server.removeClient(io, server_allocator, id);
|
defer server.removeClient(io, server_allocator, id);
|
||||||
|
|
||||||
var qbuf: [1024]Message = undefined;
|
var qbuf: [16]Message = undefined;
|
||||||
var queue: std.Io.Queue(Message) = .init(&qbuf);
|
var queue: std.Io.Queue(Message) = .init(&qbuf);
|
||||||
|
|
||||||
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
|
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
|
||||||
@@ -207,7 +209,7 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
|
|||||||
for (server.subscriptions.items) |subscription| {
|
for (server.subscriptions.items) |subscription| {
|
||||||
if (subjectMatches(subscription.subject, msg.subject)) {
|
if (subjectMatches(subscription.subject, msg.subject)) {
|
||||||
const client = server.clients.get(subscription.client_id) orelse {
|
const client = server.clients.get(subscription.client_id) orelse {
|
||||||
std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
|
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
client.send(io, .{ .msg = .{
|
client.send(io, .{ .msg = .{
|
||||||
|
|||||||
Reference in New Issue
Block a user