mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
Restructuring
Add a bunch of tests for the client
This commit is contained in:
@@ -11,16 +11,18 @@ const Mutex = Io.Mutex;
|
|||||||
const Queue = Io.Queue;
|
const Queue = Io.Queue;
|
||||||
const Stream = std.Io.net.Stream;
|
const Stream = std.Io.net.Stream;
|
||||||
|
|
||||||
const message_parser = @import("./message_parser.zig");
|
pub const Client = @import("./Server/Client.zig");
|
||||||
|
|
||||||
|
const message_parser = @import("./Server/message_parser.zig");
|
||||||
|
|
||||||
pub const MessageType = message_parser.MessageType;
|
pub const MessageType = message_parser.MessageType;
|
||||||
pub const Message = message_parser.Message;
|
pub const Message = message_parser.Message;
|
||||||
const ServerInfo = Message.ServerInfo;
|
const ServerInfo = Message.ServerInfo;
|
||||||
pub const Client = @import("./Client.zig");
|
|
||||||
const Msgs = Client.Msgs;
|
const Msgs = Client.Msgs;
|
||||||
const Server = @This();
|
const Server = @This();
|
||||||
|
|
||||||
const builtin = @import("builtin");
|
const builtin = @import("builtin");
|
||||||
const safe_build = builtin.mode == .Debug or builtin.mode == .ReleaseSafe;
|
|
||||||
|
|
||||||
pub const Subscription = struct {
|
pub const Subscription = struct {
|
||||||
subject: []const u8,
|
subject: []const u8,
|
||||||
@@ -39,7 +41,7 @@ pub const Subscription = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const eql = std.mem.eql;
|
const eql = std.mem.eql;
|
||||||
const log = std.log;
|
const log = std.log.scoped(.zits);
|
||||||
const panic = std.debug.panic;
|
const panic = std.debug.panic;
|
||||||
|
|
||||||
info: ServerInfo,
|
info: ServerInfo,
|
||||||
@@ -147,7 +149,10 @@ fn handleConnection(
|
|||||||
var dba: std.heap.DebugAllocator(.{}) = .init;
|
var dba: std.heap.DebugAllocator(.{}) = .init;
|
||||||
dba.backing_allocator = server_allocator;
|
dba.backing_allocator = server_allocator;
|
||||||
defer _ = dba.deinit();
|
defer _ = dba.deinit();
|
||||||
const alloc = if (safe_build) dba.allocator() else server_allocator;
|
const alloc = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe)
|
||||||
|
dba.allocator()
|
||||||
|
else
|
||||||
|
server_allocator;
|
||||||
|
|
||||||
// Set up client writer
|
// Set up client writer
|
||||||
const w_buffer: []u8 = try alloc.alloc(u8, w_buf_size);
|
const w_buffer: []u8 = try alloc.alloc(u8, w_buf_size);
|
||||||
@@ -132,122 +132,109 @@ pub fn start(self: *Client, io: std.Io) !void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
|
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
|
||||||
try self.recv_queue.putOne(io, msg);
|
switch (msg) {
|
||||||
|
.MSG => |m| try self.msg_queue.putOne(io, .{ .MSG = m }),
|
||||||
|
.HMSG => |m| try self.msg_queue.putOne(io, .{ .HMSG = m }),
|
||||||
|
else => try self.recv_queue.putOne(io, msg),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test send {
|
||||||
|
const io = std.testing.io;
|
||||||
|
const gpa = std.testing.allocator;
|
||||||
|
|
||||||
|
var to_client: std.Io.Writer = .fixed(blk: {
|
||||||
|
var buf: [1024]u8 = undefined;
|
||||||
|
break :blk &buf;
|
||||||
|
});
|
||||||
|
var recv_queue: Queue(Message) = .init(&.{});
|
||||||
|
var msgs_queue: Queue(Msgs) = .init(blk: {
|
||||||
|
var buf: [1]Msgs = undefined;
|
||||||
|
break :blk &buf;
|
||||||
|
});
|
||||||
|
var client: Client = .init(null, gpa, &recv_queue, &msgs_queue, undefined, &to_client);
|
||||||
|
defer client.deinit(gpa);
|
||||||
|
|
||||||
|
var c_task = try io.concurrent(Client.start, .{ &client, io });
|
||||||
|
defer c_task.cancel(io) catch {};
|
||||||
|
|
||||||
|
{
|
||||||
|
try client.send(io, .PONG);
|
||||||
|
// Wait for the concurrent client task to write to the writer
|
||||||
|
try io.sleep(.fromMilliseconds(1), .awake);
|
||||||
|
try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
|
||||||
|
}
|
||||||
|
|
||||||
|
to_client.end = 0;
|
||||||
|
|
||||||
|
{
|
||||||
|
const payload = "payload";
|
||||||
|
const msg: Message.Msg = .{
|
||||||
|
.sid = "1",
|
||||||
|
.subject = "subject",
|
||||||
|
.reply_to = "reply",
|
||||||
|
.payload = .{
|
||||||
|
.len = payload.len,
|
||||||
|
.short = blk: {
|
||||||
|
var buf: [128]u8 = undefined;
|
||||||
|
@memcpy(buf[0..payload.len], payload);
|
||||||
|
break :blk buf;
|
||||||
|
},
|
||||||
|
.long = null,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
try client.send(io, .{
|
||||||
|
// msg must be owned by the allocator the client uses
|
||||||
|
.MSG = try msg.dupe(gpa),
|
||||||
|
});
|
||||||
|
try io.sleep(.fromMilliseconds(1), .awake);
|
||||||
|
try std.testing.expectEqualSlices(u8, "MSG subject 1 reply 7\r\npayload\r\n", to_client.buffered());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
|
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
|
||||||
return Message.next(allocator, self.from_client);
|
return Message.next(allocator, self.from_client);
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test next {
|
||||||
const io = std.testing.io;
|
|
||||||
const gpa = std.testing.allocator;
|
const gpa = std.testing.allocator;
|
||||||
|
|
||||||
var from_client: std.Io.Reader = .fixed(
|
var from_client: std.Io.Reader = .fixed(
|
||||||
"CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
|
"CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++
|
||||||
|
"equired\":false,\"name\":\"NATS CLI Version v0.2." ++
|
||||||
|
"4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++
|
||||||
|
"ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
|
||||||
"PING\r\n",
|
"PING\r\n",
|
||||||
);
|
);
|
||||||
var from_client_buf: [1024]Message = undefined;
|
|
||||||
var from_client_queue: std.Io.Queue(Message) = .init(&from_client_buf);
|
var client: Client = .init(null, undefined, undefined, undefined, &from_client, undefined);
|
||||||
|
|
||||||
{
|
{
|
||||||
// Simulate stream
|
// Simulate stream
|
||||||
while (Message.next(gpa, &from_client)) |msg| {
|
|
||||||
try from_client_queue.putOne(io, msg);
|
|
||||||
} else |err| switch (err) {
|
|
||||||
error.EndOfStream => from_client_queue.close(io),
|
|
||||||
else => return err,
|
|
||||||
}
|
|
||||||
|
|
||||||
while (from_client_queue.getOne(io)) |msg| {
|
{
|
||||||
switch (msg) {
|
const msg = try client.next(gpa);
|
||||||
.connect => |*c| {
|
try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
|
||||||
std.debug.print("Message: {any}\n", .{msg});
|
defer msg.CONNECT.deinit(gpa);
|
||||||
c.deinit(gpa);
|
try std.testing.expectEqualDeep(Message{
|
||||||
|
.CONNECT = .{
|
||||||
|
.verbose = false,
|
||||||
|
.pedantic = false,
|
||||||
|
.tls_required = false,
|
||||||
|
.name = "NATS CLI Version v0.2.4",
|
||||||
|
.lang = "go",
|
||||||
|
.version = "1.43.0",
|
||||||
|
.protocol = 1,
|
||||||
|
.echo = true,
|
||||||
|
.headers = true,
|
||||||
|
.no_responders = true,
|
||||||
},
|
},
|
||||||
else => {
|
}, msg);
|
||||||
std.debug.print("Message: {any}\n", .{msg});
|
|
||||||
},
|
|
||||||
}
|
|
||||||
} else |_| {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
from_client_queue = .init(&from_client_buf);
|
{
|
||||||
// Reset the reader to process it again.
|
const msg = try client.next(gpa);
|
||||||
from_client.seek = 0;
|
try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
|
||||||
|
}
|
||||||
// {
|
}
|
||||||
// const SemiClient = struct {
|
|
||||||
// q: std.Io.Queue(Message),
|
|
||||||
|
|
||||||
// fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
|
|
||||||
// defer std.debug.print("done parse\n", .{});
|
|
||||||
// while (Message.next(gpa, in)) |msg| {
|
|
||||||
// self.q.putOne(ioh, msg) catch return;
|
|
||||||
// } else |_| {}
|
|
||||||
// }
|
|
||||||
|
|
||||||
// fn next(self: *@This(), ioh: std.Io) !Message {
|
|
||||||
// return self.q.getOne(ioh);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// fn printAll(self: *@This(), ioh: std.Io) void {
|
|
||||||
// defer std.debug.print("done print\n", .{});
|
|
||||||
// while (self.next(ioh)) |*msg| {
|
|
||||||
// std.debug.print("Client msg: {any}\n", .{msg});
|
|
||||||
// switch (msg.*) {
|
|
||||||
// .connect => |c| {
|
|
||||||
// c.deinit(gpa);
|
|
||||||
// },
|
|
||||||
// else => {},
|
|
||||||
// }
|
|
||||||
// } else |_| {}
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
|
|
||||||
// var c: SemiClient = .{ .q = from_client_queue };
|
|
||||||
// var group: std.Io.Group = .init;
|
|
||||||
// defer group.wait(io);
|
|
||||||
|
|
||||||
// group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
|
|
||||||
// @panic("could not start printAll\n");
|
|
||||||
// };
|
|
||||||
|
|
||||||
// group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
|
|
||||||
// @panic("could not start printAll\n");
|
|
||||||
// };
|
|
||||||
// }
|
|
||||||
|
|
||||||
////////
|
|
||||||
|
|
||||||
// const connect = (Message.next(gpa, &from_client) catch unreachable).connect;
|
|
||||||
|
|
||||||
// var to_client_alloc: std.Io.Writer.Allocating = .init(gpa);
|
|
||||||
// defer to_client_alloc.deinit();
|
|
||||||
// var to_client = to_client_alloc.writer;
|
|
||||||
|
|
||||||
// var client: ClientState = try .init(io, gpa, 0, connect, &from_client, &to_client);
|
|
||||||
// defer client.deinit(gpa);
|
|
||||||
|
|
||||||
// {
|
|
||||||
// var get_next = io.concurrent(ClientState.next, .{ &client, io }) catch unreachable;
|
|
||||||
// defer if (get_next.cancel(io)) |_| {} else |_| @panic("fail");
|
|
||||||
|
|
||||||
// var timeout = io.concurrent(std.Io.sleep, .{ io, .fromMilliseconds(1000), .awake }) catch unreachable;
|
|
||||||
// defer timeout.cancel(io) catch {};
|
|
||||||
|
|
||||||
// switch (try io.select(.{
|
|
||||||
// .get_next = &get_next,
|
|
||||||
// .timeout = &timeout,
|
|
||||||
// })) {
|
|
||||||
// .get_next => |next| {
|
|
||||||
// std.debug.print("next is {any}\n", .{next});
|
|
||||||
// try std.testing.expect((next catch |err| return err) == .ping);
|
|
||||||
// },
|
|
||||||
// .timeout => {
|
|
||||||
// std.debug.print("reached timeout\n", .{});
|
|
||||||
// return error.TestUnexpectedResult;
|
|
||||||
// },
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
26
src/main.zig
26
src/main.zig
@@ -6,6 +6,8 @@ const yazap = @import("yazap");
|
|||||||
const Message = zits.MessageParser.Message;
|
const Message = zits.MessageParser.Message;
|
||||||
const Server = zits.Server;
|
const Server = zits.Server;
|
||||||
|
|
||||||
|
const serverSubcommand = @import("./subcommand/server.zig").main;
|
||||||
|
|
||||||
pub fn main() !void {
|
pub fn main() !void {
|
||||||
var dba: std.heap.DebugAllocator(.{}) = .init;
|
var dba: std.heap.DebugAllocator(.{}) = .init;
|
||||||
defer _ = dba.deinit();
|
defer _ = dba.deinit();
|
||||||
@@ -67,7 +69,7 @@ pub fn main() !void {
|
|||||||
info.server_name = name;
|
info.server_name = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
try @import("./server/main.zig").main(gpa, info);
|
try serverSubcommand(gpa, info);
|
||||||
return;
|
return;
|
||||||
} else if (matches.subcommandMatches("pub")) |_| {
|
} else if (matches.subcommandMatches("pub")) |_| {
|
||||||
std.debug.print("Unimplemented\n", .{});
|
std.debug.print("Unimplemented\n", .{});
|
||||||
@@ -76,3 +78,25 @@ pub fn main() !void {
|
|||||||
|
|
||||||
try app.displayHelp(io);
|
try app.displayHelp(io);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const std_options: std.Options = .{
|
||||||
|
// By default, in safe build modes, the standard library will attach a segfault handler to the program to
|
||||||
|
// print a helpful stack trace if a segmentation fault occurs. Here, we can disable this, or even enable
|
||||||
|
// it in unsafe build modes.
|
||||||
|
.enable_segfault_handler = true,
|
||||||
|
// This is the logging function used by `std.log`.
|
||||||
|
.logFn = myLogFn,
|
||||||
|
};
|
||||||
|
|
||||||
|
fn myLogFn(
|
||||||
|
comptime level: std.log.Level,
|
||||||
|
comptime scope: @EnumLiteral(),
|
||||||
|
comptime format: []const u8,
|
||||||
|
args: anytype,
|
||||||
|
) void {
|
||||||
|
if (scope == .zits) {
|
||||||
|
std.log.defaultLog(level, std.log.default_log_scope, format, args);
|
||||||
|
} else {
|
||||||
|
std.log.defaultLog(level, scope, format, args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,3 +1 @@
|
|||||||
const MessageParser = @import("server/message_parser.zig");
|
pub const Server = @import("Server.zig");
|
||||||
|
|
||||||
pub const Server = @import("server/Server.zig");
|
|
||||||
|
|||||||
Reference in New Issue
Block a user