gracefully exit

simplify code
clean up dead code
This commit is contained in:
2026-01-01 19:38:36 +00:00
parent 987dc492a6
commit fc68749669
3 changed files with 91 additions and 227 deletions

View File

@@ -25,25 +25,45 @@ fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void {
keep_running.store(false, .monotonic);
}
pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void {
// Configure the signal action
// const act = std.posix.Sigaction{
// .handler = .{ .handler = handleSigInt },
// .mask = std.posix.sigemptyset(),
// .flags = 0,
// };
// // Register the handler for SIGINT (Ctrl+C)
// std.posix.sigaction(std.posix.SIG.INT, &act, null);
var server: Server = .{
.info = server_config,
const act = std.posix.Sigaction{
.handler = .{ .handler = handleSigInt },
.mask = std.posix.sigemptyset(),
.flags = 0,
};
var threaded: std.Io.Threaded = .init(gpa, .{});
defer threaded.deinit();
const io = threaded.io();
// Register the handler for SIGINT (Ctrl+C)
std.posix.sigaction(std.posix.SIG.INT, &act, null);
{
var dba: std.heap.DebugAllocator(.{}) = .init;
dba.backing_allocator = alloc;
defer _ = dba.deinit();
const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else alloc;
var server: Server = .{
.info = server_config,
};
var threaded: std.Io.Threaded = .init(gpa, .{});
defer threaded.deinit();
const io = threaded.io();
var server_task = try io.concurrent(start, .{ &server, io, gpa });
defer server_task.cancel(io) catch {};
while (keep_running.load(.monotonic)) {
try io.sleep(.fromMilliseconds(1), .awake);
}
std.debug.print("\nShutting down...\n", .{});
server_task.cancel(io) catch {};
}
std.debug.print("Goodbye\n", .{});
}
pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host,
server.info.port,
@@ -52,46 +72,20 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
var id: usize = 0;
// Run until SIGINT is handled, then exit gracefully
while (keep_running.load(.monotonic)) : (id +%= 1) {
while (true) : (id +%= 1) {
std.debug.print("in server loop\n", .{});
if (server.clients.contains(id)) continue;
const stream = try tcp_server.accept(io);
std.debug.print("accepted connection\n", .{});
_ = io.concurrent(handleConnection, .{ &server, gpa, io, id, stream }) catch {
_ = io.concurrent(handleConnection, .{ server, gpa, io, id, stream }) catch {
std.debug.print("could not start concurrent handler for {d}\n", .{id});
stream.close(io);
};
}
std.debug.print("Exiting gracefully\n", .{});
}
fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
while (true) {
const msg = server.msg_queue.getOne(io) catch break;
defer msg.deinit(alloc);
for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, msg.subject)) {
const client = server.clients.get(subscription.client_id) orelse {
std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
continue;
};
client.send(io, .{ .msg = .{
.subject = msg.subject,
.sid = subscription.sid,
.reply_to = msg.reply_to,
.payload = msg.payload,
} }) catch continue;
}
}
}
}
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void {
// server.clients.lockPointers();
try server.clients.put(allocator, id, client);
// server.clients.unlockPointers();
}
fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void {
@@ -185,13 +179,6 @@ fn handleConnection(
}
}
// // Result is owned by the caller
// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState {
// var acc: std.ArrayList(ClientState) = .empty;
// return acc.toOwnedSlice();
// }
fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
@@ -200,7 +187,7 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
source_client.send(io, .@"-err") catch {};
source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
}
}
}
@@ -259,42 +246,3 @@ pub fn createId() []const u8 {
pub fn createName() []const u8 {
return "SERVERNAME";
}
// TESTING
// fn initTestServer() Server {
// return .{
// .info = .{
// .server_id = "ABCD",
// .server_name = "test server",
// .version = "0.1.2",
// .max_payload = 1234,
// },
// };
// }
// fn initTestClient(
// io: std.Io,
// allocator: std.mem.Allocator,
// id: usize,
// data_from: []const u8,
// ) !struct {
// Client,
// *std.Io.Reader,
// *std.Io.Writer,
// } {
// return .init(io, allocator, id, .{}, in, out);
// }
// test {
// const gpa = std.testing.allocator;
// const io = std.testing.io;
// const server = initTestServer();
// const client: Client = .init(
// io,
// gpa,
// 1,
// .{},
// );
// }