mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
things are running quite smoothly!!!
coder@08714a4174bb:~$ nats bench sub foo -s localhost:4223 03:28:04 Starting Core NATS subscriber benchmark [clients=1, msg-size=128 B, msgs=100,000, multi-subject=false, subject=foo] 03:28:04 [1] Starting Core NATS subscriber, expecting 100,000 messages Finished 6s [====================================================================================] 100% NATS Core NATS subscriber stats: 14,691 msgs/sec ~ 1.8 MiB/sec ~ 68.06us
This commit is contained in:
@@ -13,6 +13,8 @@ const Subscription = struct {
|
|||||||
|
|
||||||
info: ServerInfo,
|
info: ServerInfo,
|
||||||
clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
|
clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
|
||||||
|
|
||||||
|
subs_lock: std.Thread.Mutex = .{},
|
||||||
subscriptions: std.ArrayList(Subscription) = .empty,
|
subscriptions: std.ArrayList(Subscription) = .empty,
|
||||||
|
|
||||||
var keep_running = std.atomic.Value(bool).init(true);
|
var keep_running = std.atomic.Value(bool).init(true);
|
||||||
@@ -70,10 +72,18 @@ fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void {
|
fn removeClient(server: *Server, allocator: std.mem.Allocator, id: usize) void {
|
||||||
// TODO: implement
|
server.subs_lock.lock();
|
||||||
_ = server;
|
defer server.subs_lock.unlock();
|
||||||
_ = allocator;
|
_ = server.clients.remove(id);
|
||||||
_ = id;
|
const len = server.subscriptions.items.len;
|
||||||
|
for (0..len) |i| {
|
||||||
|
const sub = server.subscriptions.items[len - i - 1];
|
||||||
|
if (sub.client_id == id) {
|
||||||
|
allocator.free(sub.sid);
|
||||||
|
allocator.free(sub.subject);
|
||||||
|
_ = server.subscriptions.swapRemove(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleConnection(
|
fn handleConnection(
|
||||||
@@ -109,8 +119,8 @@ fn handleConnection(
|
|||||||
try client_state.start(io);
|
try client_state.start(io);
|
||||||
defer client_state.deinit(io, allocator);
|
defer client_state.deinit(io, allocator);
|
||||||
|
|
||||||
try server.addClient(allocator, id, &client_state);
|
try server.addClient(server_allocator, id, &client_state);
|
||||||
defer server.removeClient(allocator, id);
|
defer server.removeClient(server_allocator, id);
|
||||||
|
|
||||||
var msg_arena: std.heap.ArenaAllocator = .init(allocator);
|
var msg_arena: std.heap.ArenaAllocator = .init(allocator);
|
||||||
defer msg_arena.deinit();
|
defer msg_arena.deinit();
|
||||||
@@ -194,6 +204,8 @@ fn publishMessage(server: *Server, io: std.Io, msg: Message.Pub) !void {
|
|||||||
|
|
||||||
fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
|
fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
|
||||||
std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg});
|
std.debug.print("Recieved SUBSCRIBE message: {any}\n\n", .{msg});
|
||||||
|
server.subs_lock.lock();
|
||||||
|
defer server.subs_lock.unlock();
|
||||||
try server.subscriptions.append(gpa, .{
|
try server.subscriptions.append(gpa, .{
|
||||||
.subject = try gpa.dupe(u8, msg.subject),
|
.subject = try gpa.dupe(u8, msg.subject),
|
||||||
.client_id = id,
|
.client_id = id,
|
||||||
@@ -202,9 +214,11 @@ fn subscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Su
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn unsubscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void {
|
fn unsubscribe(server: *Server, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void {
|
||||||
|
server.subs_lock.lock();
|
||||||
|
defer server.subs_lock.unlock();
|
||||||
const len = server.subscriptions.items.len;
|
const len = server.subscriptions.items.len;
|
||||||
for (0..len) |i| {
|
for (0..len) |i| {
|
||||||
const sub = server.subscriptions.items[len - i];
|
const sub = server.subscriptions.items[len - i - 1];
|
||||||
if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) {
|
if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) {
|
||||||
gpa.free(sub.sid);
|
gpa.free(sub.sid);
|
||||||
gpa.free(sub.subject);
|
gpa.free(sub.subject);
|
||||||
|
|||||||
Reference in New Issue
Block a user