This commit is contained in:
2025-11-29 18:30:39 -05:00
parent c6dfcc541d
commit bd9ed88e5c
4 changed files with 161 additions and 144 deletions

View File

@@ -3,6 +3,7 @@ const zits = @import("zits");
const clap = @import("clap"); const clap = @import("clap");
const Message = zits.MessageParser.Message; const Message = zits.MessageParser.Message;
const Server = zits.Server;
const SubCommands = enum { const SubCommands = enum {
help, help,
@@ -23,7 +24,7 @@ const main_params = clap.parseParamsComptime(
// To pass around arguments returned by clap, `clap.Result` and `clap.ResultEx` can be used to // To pass around arguments returned by clap, `clap.Result` and `clap.ResultEx` can be used to
// get the return type of `clap.parse` and `clap.parseEx`. // get the return type of `clap.parse` and `clap.parseEx`.
const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers); pub const MainArgs = clap.ResultEx(clap.Help, &main_params, main_parsers);
pub fn main() !void { pub fn main() !void {
var dba: std.heap.DebugAllocator(.{}) = .init; var dba: std.heap.DebugAllocator(.{}) = .init;
@@ -51,6 +52,7 @@ pub fn main() !void {
return err; return err;
}; };
defer res.deinit(); defer res.deinit();
std.debug.print("res: {any}\n", .{res});
if (res.args.help != 0) if (res.args.help != 0)
return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}); return clap.helpToFile(.stderr(), clap.Help, &main_params, .{});
@@ -58,138 +60,44 @@ pub fn main() !void {
const command = res.positionals[0] orelse return error.MissingCommand; const command = res.positionals[0] orelse return error.MissingCommand;
switch (command) { switch (command) {
.help => return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}), .help => return clap.helpToFile(.stderr(), clap.Help, &main_params, .{}),
.serve => try serverMain(gpa, &iter, res), .serve => try Server.main(gpa, &iter, res),
.@"pub" => unreachable, .@"pub" => unreachable,
} }
} }
const ServerInfo = struct { // fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void {
/// The unique identifier of the NATS server. // _ = iter;
server_id: []const u8, // _ = main_args;
/// The name of the NATS server.
server_name: []const u8,
/// The version of NATS.
version: []const u8,
/// The version of golang the NATS server was built with.
go: []const u8 = "0.0.0",
/// The IP address used to start the NATS server,
/// by default this will be 0.0.0.0 and can be
/// configured with -client_advertise host:port.
host: []const u8 = "0.0.0.0",
/// The port number the NATS server is configured
/// to listen on.
port: u16 = 6868,
/// Whether the server supports headers.
headers: bool = false,
/// Maximum payload size, in bytes, that the server
/// will accept from the client.
max_payload: u64,
/// An integer indicating the protocol version of
/// the server. The server version 1.2.0 sets this
/// to 1 to indicate that it supports the "Echo"
/// feature.
proto: u32 = 1,
};
fn serverMain(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: MainArgs) !void { // var threaded: std.Io.Threaded = .init(gpa);
_ = iter; // defer threaded.deinit();
_ = main_args; // const io = threaded.io();
var threaded: std.Io.Threaded = .init(gpa); // const info: ServerInfo = .{
defer threaded.deinit(); // .server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C",
const io = threaded.io(); // .server_name = "bar",
// .version = "2.11.8",
// .go = "go1.24.6",
// .headers = true,
// .max_payload = 1048576,
// };
const info: ServerInfo = .{ // var server = try std.Io.net.IpAddress.listen(
.server_id = "NBEK5DBBB4ZO5LTBGPXACZSB2QUTODC6GGN5NLOSPIGSRFWJID4XU52C", // .{
.server_name = "bar", // .ip4 = .{
.version = "2.11.8", // .bytes = .{ 0, 0, 0, 0 },
.go = "go1.24.6", // .port = info.port,
.headers = true, // },
.max_payload = 1048576, // },
}; // io,
// .{},
// );
// defer server.deinit(io);
var server = try std.Io.net.IpAddress.listen( // var group: std.Io.Group = .init;
.{ // defer group.wait(io);
.ip4 = .{ // for (0..5) |_| {
.bytes = .{ 0, 0, 0, 0 }, // const stream = try server.accept(io);
.port = info.port, // group.async(io, handleConnection, .{ gpa, io, stream, info });
}, // }
}, // }
io,
.{},
);
defer server.deinit(io);
var group: std.Io.Group = .init;
defer group.wait(io);
for (0..5) |_| {
const stream = try server.accept(io);
group.async(io, handleConnection, .{ gpa, io, stream, info });
}
}
fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
defer stream.close(io);
var w_buffer: [1024]u8 = undefined;
var writer = stream.writer(io, &w_buffer);
const out = &writer.interface;
var r_buffer: [8192]u8 = undefined;
var reader = stream.reader(io, &r_buffer);
const in = &reader.interface;
processClient(allocator, in, out, info) catch |err| {
std.debug.panic("Error processing client: {}\n", .{err});
};
}
fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
try writeInfo(out, info);
var client_state_arena: std.heap.ArenaAllocator = .init(gpa);
defer client_state_arena.deinit();
const client_state = (try Message.next(client_state_arena.allocator(), in)).connect;
_ = client_state;
var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa);
defer message_parsing_arena.deinit();
const message_parsing_allocator = message_parsing_arena.allocator();
while (true) {
defer _ = message_parsing_arena.reset(.retain_capacity);
const next_message = Message.next(message_parsing_allocator, in) catch |err| {
switch (err) {
error.EndOfStream => {
break;
},
else => {
return err;
},
}
};
switch (next_message) {
.connect => |connect| {
std.debug.panic("Connection message after already connected: {any}\n", .{connect});
},
.ping => try writePong(out),
.@"pub" => try writeOk(out),
else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}),
}
}
}
fn writeOk(out: *std.Io.Writer) !void {
_ = try out.write("+OK\r\n");
try out.flush();
}
fn writePong(out: *std.Io.Writer) !void {
_ = try out.write("PONG\r\n");
try out.flush();
}
fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
_ = try out.write("INFO ");
try std.json.Stringify.value(info, .{}, out);
_ = try out.write("\r\n");
try out.flush();
}

View File

@@ -1 +1,2 @@
pub const MessageParser = @import("server/message_parser.zig"); pub const MessageParser = @import("server/message_parser.zig");
pub const Server = @import("server/main.zig");

View File

@@ -1,18 +1,17 @@
const Message = @import("message_parser.zig").Message;
const std = @import("std");
const ClientState = struct { const ClientState = struct {
verbose: bool = false, id: u32,
pedantic: bool = false, /// Used to back `connect` strings.
tls_required: bool = false, string_buffer: [4096]u8,
auth_token: ?[]const u8 = null, connect: Message.Connect,
user: ?[]const u8 = null, send_queue: std.Io.Queue(Message) = blk: {
pass: ?[]const u8 = null, var send_queue_buffer: [1024]Message = undefined;
name: ?[]const u8 = null, break :blk .init(&send_queue_buffer);
lang: []const u8, },
version: []const u8, recv_queue: std.Io.Queue(Message) = blk: {
protocol: u32, var recv_queue_buffer: [1024]Message = undefined;
echo: ?bool = null, break :blk .init(&recv_queue_buffer);
sig: ?[]const u8 = null, },
jwt: ?[]const u8 = null,
no_responders: ?bool = null,
headers: ?bool = null,
nkey: ?[]const u8 = null,
}; };

109
src/server/main.zig Normal file
View File

@@ -0,0 +1,109 @@
const std = @import("std");
const Message = @import("./message_parser.zig");
const ClientState = @import("./client.zig");
const ServerInfo = struct {
/// The unique identifier of the NATS server.
server_id: []const u8,
/// The name of the NATS server.
server_name: []const u8,
/// The version of NATS.
version: []const u8,
/// The version of golang the NATS server was built with.
go: []const u8 = "0.0.0",
/// The IP address used to start the NATS server,
/// by default this will be 0.0.0.0 and can be
/// configured with -client_advertise host:port.
host: []const u8 = "0.0.0.0",
/// The port number the NATS server is configured
/// to listen on.
port: u16 = 6868,
/// Whether the server supports headers.
headers: bool = false,
/// Maximum payload size, in bytes, that the server
/// will accept from the client.
max_payload: u64,
/// An integer indicating the protocol version of
/// the server. The server version 1.2.0 sets this
/// to 1 to indicate that it supports the "Echo"
/// feature.
proto: u32 = 1,
};
server_info: ServerInfo,
clients: std.AutoHashMapUnmanaged(u64, ClientState) = .empty,
/// Map of subjects to client IDs that are subscribed to that subject.
subscriptions: std.StringHashMapUnmanaged(std.ArrayList(u64)),
pub fn main(gpa: std.mem.Allocator, iter: *std.process.ArgIterator, main_args: anytype) !void {
_ = gpa;
_ = iter;
_ = main_args;
}
fn handleConnection(allocator: std.mem.Allocator, io: std.Io, stream: std.Io.net.Stream, info: ServerInfo) void {
defer stream.close(io);
var w_buffer: [1024]u8 = undefined;
var writer = stream.writer(io, &w_buffer);
const out = &writer.interface;
var r_buffer: [8192]u8 = undefined;
var reader = stream.reader(io, &r_buffer);
const in = &reader.interface;
processClient(allocator, in, out, info) catch |err| {
std.debug.panic("Error processing client: {}\n", .{err});
};
}
fn processClient(gpa: std.mem.Allocator, in: *std.Io.Reader, out: *std.Io.Writer, info: ServerInfo) !void {
try writeInfo(out, info);
var client_state_arena: std.heap.ArenaAllocator = .init(gpa);
defer client_state_arena.deinit();
const client_state = (try Message.next(client_state_arena.allocator(), in)).connect;
_ = client_state;
var message_parsing_arena: std.heap.ArenaAllocator = .init(gpa);
defer message_parsing_arena.deinit();
const message_parsing_allocator = message_parsing_arena.allocator();
while (true) {
defer _ = message_parsing_arena.reset(.retain_capacity);
const next_message = Message.next(message_parsing_allocator, in) catch |err| {
switch (err) {
error.EndOfStream => {
break;
},
else => {
return err;
},
}
};
switch (next_message) {
.connect => |connect| {
std.debug.panic("Connection message after already connected: {any}\n", .{connect});
},
.ping => try writePong(out),
.@"pub" => try writeOk(out),
else => |msg| std.debug.panic("Message type not implemented: {any}\n", .{msg}),
}
}
}
fn writeOk(out: *std.Io.Writer) !void {
_ = try out.write("+OK\r\n");
try out.flush();
}
fn writePong(out: *std.Io.Writer) !void {
_ = try out.write("PONG\r\n");
try out.flush();
}
fn writeInfo(out: *std.Io.Writer, info: ServerInfo) !void {
_ = try out.write("INFO ");
try std.json.Stringify.value(info, .{}, out);
_ = try out.write("\r\n");
try out.flush();
}