mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
Not sure why; it seems like I'm using the right allocators everywhere. Need to take another pass at this later.
282 lines
9.0 KiB
Zig
282 lines
9.0 KiB
Zig
const std = @import("std");
|
|
const builtin = @import("builtin");
|
|
const Message = @import("./message_parser.zig").Message;
|
|
pub const ServerInfo = Message.ServerInfo;
|
|
|
|
const Client = @import("./client.zig");
|
|
const Server = @This();
|
|
|
|
/// One subscription registered by a connected client.
///
/// `subject` and `sid` are heap strings: every code path in this file that
/// drops a `Subscription` frees both of them with the server allocator.
const Subscription = struct {
    /// Subject the client subscribed to; compared against published subjects.
    subject: []const u8,
    /// Key of the owning client in the server's `clients` map.
    client_id: usize,
    /// Subscription id chosen by the client; copied into every message
    /// delivered for this subscription.
    sid: []const u8,
};
|
|
|
|
/// Configuration for this server; `start` listens on `info.host` /
/// `info.port`, and the whole value is handed to each client task.
info: ServerInfo,

/// Connected clients keyed by connection id. The pointers refer to `Client`
/// values that live in the frame of the connection handler that owns them.
clients: std.AutoHashMapUnmanaged(usize, *Client) = .empty,

/// Held by `deinit`, `removeClient`, `publishMessage`, `subscribe` and
/// `unsubscribe` while they touch `subscriptions`.
/// NOTE(review): `addClient` and the `clients.contains` check in `start`
/// access `clients` without taking this lock.
subs_lock: std.Io.Mutex = .init,

/// All active subscriptions, in no particular order (entries are removed
/// with `swapRemove`).
subscriptions: std.ArrayList(Subscription) = .empty,

/// Process-wide run flag: cleared by the SIGINT handler and polled by `main`.
var keep_running = std.atomic.Value(bool).init(true);
|
|
|
|
/// Signal handler installed for SIGINT by `main`.
///
/// It only clears `keep_running`; the polling loop in `main` notices the
/// change and performs the actual shutdown. The signal number is not needed.
fn handleSigInt(_: std.os.linux.SIG) callconv(.c) void {
    keep_running.store(false, .monotonic);
}
|
|
|
|
/// Releases everything the server still owns.
///
/// Frees the `sid` and `subject` strings of every remaining subscription,
/// then releases the backing storage of both the subscription list and the
/// client map. `alloc` must be the allocator those were allocated with.
/// The subscription lock is held for the duration of the call.
pub fn deinit(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
    server.subs_lock.lockUncancelable(io);
    defer server.subs_lock.unlock(io);

    for (server.subscriptions.items) |*entry| {
        alloc.free(entry.sid);
        alloc.free(entry.subject);
    }
    server.subscriptions.shrinkAndFree(alloc, 0);

    server.clients.clearAndFree(alloc);
}
|
|
|
|
/// Runs the server until SIGINT is received.
///
/// Installs the SIGINT handler, starts `start` as a concurrent task, and
/// then polls `keep_running` once per millisecond. When the flag is cleared
/// the server task is cancelled and all resources are released.
///
/// In Debug and ReleaseSafe builds every allocation goes through a
/// `DebugAllocator` layered on top of `alloc`; other build modes use `alloc`
/// directly.
pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void {
    // Route SIGINT (Ctrl+C) to handleSigInt.
    const sigint_action: std.posix.Sigaction = .{
        .handler = .{ .handler = handleSigInt },
        .mask = std.posix.sigemptyset(),
        .flags = 0,
    };
    std.posix.sigaction(std.posix.SIG.INT, &sigint_action, null);

    // Inner scope so that every defer below has run before "Goodbye".
    {
        var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
        debug_allocator.backing_allocator = alloc;
        defer _ = debug_allocator.deinit();
        const gpa = switch (builtin.mode) {
            .Debug, .ReleaseSafe => debug_allocator.allocator(),
            .ReleaseFast, .ReleaseSmall => alloc,
        };

        var threaded: std.Io.Threaded = .init(gpa, .{});
        defer threaded.deinit();
        const io = threaded.io();

        var server: Server = .{ .info = server_config };
        defer server.deinit(io, gpa);

        var server_task = try io.concurrent(start, .{ &server, io, gpa });
        // Covers the early-exit path when the sleep below fails.
        defer server_task.cancel(io) catch {};

        // Wait for the signal handler to clear the run flag.
        while (true) {
            if (!keep_running.load(.monotonic)) break;
            try io.sleep(.fromMilliseconds(1), .awake);
        }

        std.debug.print("\nShutting down...\n", .{});
        server_task.cancel(io) catch {};
    }
    std.debug.print("Goodbye\n", .{});
}
|
|
|
|
/// Accept loop: listens on `server.info.host` / `server.info.port` and
/// spawns one concurrent connection handler per accepted stream.
///
/// The loop has no normal exit; the function returns only when parsing the
/// address, listening, or accepting fails. On exit the listener is closed
/// and every handler still in the group is cancelled.
///
/// Connection ids come from a wrapping counter; ids that are still present
/// in `server.clients` are skipped.
pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
    const address = try std.Io.net.IpAddress.parse(server.info.host, server.info.port);
    var listener = try address.listen(io, .{ .reuse_address = true });
    defer listener.deinit(io);

    var handlers: std.Io.Group = .init;
    defer handlers.cancel(io);

    var next_id: usize = 0;
    while (true) : (next_id +%= 1) {
        std.debug.print("in server loop\n", .{});
        // Never hand out an id that a live client is still registered under.
        if (server.clients.contains(next_id)) continue;

        const connection = try listener.accept(io);
        std.debug.print("accepted connection\n", .{});

        _ = handlers.concurrent(
            io,
            handleConnectionInfallible,
            .{ server, gpa, io, next_id, connection },
        ) catch {
            // The handler never started, so the stream is still ours to close.
            std.debug.print("could not start concurrent handler for {d}\n", .{next_id});
            connection.close(io);
        };
    }
}
|
|
|
|
/// Registers `client` under `id` in the server's client map, replacing any
/// existing entry with the same id. Fails only if the map cannot grow.
///
/// NOTE(review): unlike `removeClient`, this does not take `subs_lock`
/// before mutating `clients` — confirm whether that is intentional.
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void {
    return server.clients.put(allocator, id, client);
}
|
|
|
|
/// Unregisters client `id` and drops all of its subscriptions.
///
/// Does nothing if `id` is not in the client map. Otherwise every
/// subscription owned by that client has its `sid` and `subject` freed with
/// `allocator` and is removed from the list. Runs under `subs_lock`.
fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void {
    server.subs_lock.lockUncancelable(io);
    defer server.subs_lock.unlock(io);

    if (!server.clients.remove(id)) return;

    // Walk from the back: swapRemove moves the last element into the freed
    // slot, and that element has already been visited.
    var i = server.subscriptions.items.len;
    while (i > 0) {
        i -= 1;
        const entry = server.subscriptions.items[i];
        if (entry.client_id != id) continue;

        allocator.free(entry.sid);
        allocator.free(entry.subject);
        _ = server.subscriptions.swapRemove(i);
    }
}
|
|
|
|
/// Error-free wrapper around `handleConnection`, for use as a task entry
/// point that must return `void`.
///
/// Any error from the handler ends that connection only. It is reported on
/// stderr instead of being dropped silently, so failed connections can be
/// diagnosed.
fn handleConnectionInfallible(
    server: *Server,
    server_allocator: std.mem.Allocator,
    io: std.Io,
    id: usize,
    stream: std.Io.net.Stream,
) void {
    handleConnection(server, server_allocator, io, id, stream) catch |err| {
        std.debug.print("client {d} handler exited with error: {s}\n", .{ id, @errorName(err) });
    };
}
|
|
|
|
/// Serves a single client connection until it disconnects or an error occurs.
///
/// Sets up buffered reader/writer interfaces on `stream`, registers a
/// `Client` under `id`, starts `Client.start` as a concurrent task fed by a
/// bounded message queue, and then dispatches every message returned by
/// `client.next`. Returns normally on `error.EndOfStream`; every other error
/// is propagated.
///
/// Cleanup runs in reverse order of setup: cancel the client task, close and
/// drain the queue, unregister the client (dropping its subscriptions),
/// deinit the client, close the stream.
fn handleConnection(
    server: *Server,
    server_allocator: std.mem.Allocator,
    io: std.Io,
    id: usize,
    stream: std.Io.net.Stream,
) !void {
    defer stream.close(io);

    // A per-connection DebugAllocator layered on server_allocator used to be
    // set up here; it is disabled, so everything below allocates from
    // server_allocator directly.

    // Outgoing side of the connection.
    var send_buf: [1024]u8 = undefined;
    var stream_writer = stream.writer(io, &send_buf);
    const out = &stream_writer.interface;

    // Incoming side of the connection.
    var recv_buf: [1024]u8 = undefined;
    var stream_reader = stream.reader(io, &recv_buf);
    const in = &stream_reader.interface;

    var client: Client = .init(null, in, out);
    defer client.deinit(server_allocator);

    try server.addClient(server_allocator, id, &client);
    defer server.removeClient(io, server_allocator, id);

    var queue_storage: [8]Message = undefined;
    var queue: std.Io.Queue(Message) = .init(&queue_storage);
    defer {
        // Close first, then release whatever was still waiting in the queue.
        queue.close(io);
        while (true) {
            const pending = queue.getOne(io) catch break;
            switch (pending) {
                .msg => |m| m.deinit(server_allocator),
                else => {},
            }
        }
    }

    var client_task = try io.concurrent(
        Client.start,
        .{ &client, io, server_allocator, &queue, server.info },
    );
    defer client_task.cancel(io) catch {};

    // Messages are owned by the server once they have been received.
    while (true) {
        const incoming = client.next(server_allocator) catch |err| switch (err) {
            error.EndOfStream => {
                std.debug.print("Client {d} disconnected\n", .{id});
                return;
            },
            else => return err,
        };

        switch (incoming) {
            // A ping is answered with a pong.
            .ping => try client.send(io, .pong),
            .@"pub" => |pb| {
                defer pb.deinit(server_allocator);
                try server.publishMessage(io, server_allocator, &client, pb);
            },
            .sub => |sub| try server.subscribe(io, server_allocator, id, sub),
            .unsub => |unsub| try server.unsubscribe(io, server_allocator, id, unsub),
            .connect => |connect| {
                // Release the previous connect options before replacing them.
                if (client.connect) |*current| {
                    current.deinit(server_allocator);
                }
                client.connect = connect;
            },
            else => |e| {
                std.debug.panic("Unimplemented message: {any}\n", .{e});
            },
        }
    }
}
|
|
|
|
/// Reports whether a message published on `pub_subject` should be delivered
/// to a subscription on `sub_subject`.
///
/// Subjects are split into tokens on `.` and compared token by token:
/// - `*` in the subscription matches exactly one non-empty token;
/// - `>` as the final subscription token matches one or more remaining
///   tokens (the first of which must be non-empty);
/// - any other token must be byte-for-byte equal.
///
/// A subject always matches itself, so plain subjects without wildcards
/// behave exactly like a byte comparison.
fn subjectMatches(sub_subject: []const u8, pub_subject: []const u8) bool {
    var sub_tokens = std.mem.splitScalar(u8, sub_subject, '.');
    var pub_tokens = std.mem.splitScalar(u8, pub_subject, '.');

    while (sub_tokens.next()) |sub_token| {
        // The published subject ran out of tokens before the pattern did.
        const pub_token = pub_tokens.next() orelse return false;

        // `>` is only a wildcard in the last position; elsewhere it is an
        // ordinary token.
        const is_tail_wildcard = std.mem.eql(u8, sub_token, ">") and sub_tokens.peek() == null;
        if (is_tail_wildcard) return pub_token.len > 0;

        if (std.mem.eql(u8, sub_token, "*")) {
            if (pub_token.len == 0) return false;
            continue;
        }

        if (!std.mem.eql(u8, sub_token, pub_token)) return false;
    }

    // Every pattern token matched; the published subject must end here too.
    return pub_tokens.next() == null;
}
|
|
|
|
/// Delivers `msg` to every subscription whose subject matches `msg.subject`.
///
/// Each recipient gets its own heap copy of the subject, subscription id,
/// optional reply subject and payload, allocated from `alloc`. Subscriptions
/// whose client is no longer registered are skipped, as are recipients whose
/// `send` fails. `msg` itself is only read; the caller keeps ownership.
///
/// When the publishing client connected in verbose mode it receives `+OK`
/// on success, or `-ERR Slow Consumer` if this function fails (lock
/// acquisition or allocation failure).
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
    errdefer {
        if (source_client.connect) |c| {
            if (c.verbose) {
                source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
            }
        }
    }

    try server.subs_lock.lock(io);
    defer server.subs_lock.unlock(io);

    for (server.subscriptions.items) |subscription| {
        if (!subjectMatches(subscription.subject, msg.subject)) continue;

        const client = server.clients.get(subscription.client_id) orelse {
            std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
            continue;
        };

        // Copy the fields one at a time so that a failing allocation releases
        // the copies already made for this recipient instead of leaking them.
        const subject = try alloc.dupe(u8, msg.subject);
        errdefer alloc.free(subject);
        const sid = try alloc.dupe(u8, subscription.sid);
        errdefer alloc.free(sid);
        const reply_to = if (msg.reply_to) |r| try alloc.dupe(u8, r) else null;
        errdefer if (reply_to) |r| alloc.free(r);
        const payload = try alloc.dupe(u8, msg.payload);

        // NOTE(review): when send fails the four copies above are not freed
        // here. Whether that leaks depends on whether Client.send takes
        // ownership on failure — confirm against client.zig.
        client.send(io, .{ .msg = .{
            .subject = subject,
            .sid = sid,
            .reply_to = reply_to,
            .payload = payload,
        } }) catch continue;
    }

    if (source_client.connect) |c| {
        if (c.verbose) {
            source_client.send(io, .@"+ok") catch {};
        }
    }
}
|
|
|
|
/// Records a subscription for client `id`.
///
/// Takes ownership of `msg.subject` and `msg.sid`, which must have been
/// allocated with `gpa`. On success they live in the subscription list until
/// the subscription is removed. On failure (lock acquisition or list growth)
/// they are freed here, because the caller has already handed them over and
/// does not free them on the error path.
///
/// NOTE(review): any other heap-owned fields of `Message.Sub` are not stored
/// and not freed by this function — confirm against message_parser.zig.
fn subscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Sub) !void {
    errdefer {
        gpa.free(msg.sid);
        gpa.free(msg.subject);
    }

    try server.subs_lock.lock(io);
    defer server.subs_lock.unlock(io);

    try server.subscriptions.append(gpa, .{
        .subject = msg.subject,
        .client_id = id,
        .sid = msg.sid,
    });
}
|
|
|
|
/// Removes every subscription of client `id` whose subscription id equals
/// `msg.sid`, freeing the stored `sid` and `subject` strings with `gpa`.
///
/// Runs under `subs_lock`; fails only if the lock cannot be acquired.
/// `msg` is only read.
fn unsubscribe(server: *Server, io: std.Io, gpa: std.mem.Allocator, id: usize, msg: Message.Unsub) !void {
    try server.subs_lock.lock(io);
    defer server.subs_lock.unlock(io);

    // Walk from the back so that swapRemove only ever moves an element that
    // has already been inspected. The index that is inspected must be the
    // index that is removed: removing a different slot would leave an entry
    // with freed strings in the list and leak the entry that was dropped.
    var i = server.subscriptions.items.len;
    while (i > 0) {
        i -= 1;
        const sub = server.subscriptions.items[i];
        if (sub.client_id == id and std.mem.eql(u8, sub.sid, msg.sid)) {
            gpa.free(sub.sid);
            gpa.free(sub.subject);
            _ = server.subscriptions.swapRemove(i);
        }
    }
}
|
|
|
|
/// Returns the server id. Currently a fixed placeholder string with static
/// lifetime; the caller must not free it.
pub fn createId() []const u8 {
    const placeholder_id: []const u8 = "SERVERID";
    return placeholder_id;
}
|
|
|
|
/// Returns the server name. Currently a fixed placeholder string with static
/// lifetime; the caller must not free it.
pub fn createName() []const u8 {
    const placeholder_name: []const u8 = "SERVERNAME";
    return placeholder_name;
}
|