Files
zits/src/Server/Client.zig

146 lines
4.2 KiB
Zig

const message = @import("message.zig");
const parse = message.parse;
const Message = message.Message;
const std = @import("std");
const Queue = std.Io.Queue;
const Client = @This();
/// Optional `Message.Connect` value associated with this client.
/// When present it is released by `deinit`.
connect: ?Message.Connect,
/// Byte queue for this client to receive.
/// `send` puts bytes onto it and `start` takes them off again.
recv_queue: *Queue(u8),
// Only necessary to hold this lock for writing to the queue (to avoid interleaving message writes).
// NOTE(review): `send` in this file does not take this lock itself; presumably
// callers that may write concurrently are expected to hold it — confirm.
recv_queue_write_lock: std.Io.Mutex = .init,
/// Reader that `next` parses incoming control messages from.
from_client: *std.Io.Reader,
/// Writer whose buffer `start` fills from `recv_queue` and then flushes.
to_client: *std.Io.Writer,
/// Build a `Client` from its connection info, receive queue, and I/O endpoints.
///
/// `in` is stored as `from_client` and `out` as `to_client`. The
/// `recv_queue_write_lock` field keeps its declared default value.
/// Nothing is allocated or copied; the pointers are stored as given.
pub fn init(
    connect: ?Message.Connect,
    recv_queue: *Queue(u8),
    in: *std.Io.Reader,
    out: *std.Io.Writer,
) Client {
    const client: Client = .{
        .to_client = out,
        .from_client = in,
        .recv_queue = recv_queue,
        .connect = connect,
    };
    return client;
}
/// Release the stored `connect` value with `alloc`, if there is one.
///
/// Does nothing when `connect` is null. No other field of the client is
/// touched here.
pub fn deinit(self: Client, alloc: std.mem.Allocator) void {
    const held = self.connect orelse return;
    held.deinit(alloc);
}
/// Pump bytes from `recv_queue` into `to_client` until an error occurs.
///
/// Asserts up front that the writer has a non-empty buffer and that nothing
/// is buffered in it yet. Each iteration then reads from the queue straight
/// into the writer's buffer (passing 1 as the minimum count), records the
/// number of bytes received as the writer's `end`, and flushes.
///
/// The loop has no normal exit: the function only returns by propagating an
/// error from the queue read or from the flush.
pub fn start(self: *Client, io: std.Io) !void {
    std.debug.assert(self.to_client.buffer.len > 0);
    std.debug.assert(self.to_client.end == 0);

    while (true) {
        const writer = self.to_client;
        const received = try self.recv_queue.get(io, writer.buffer, 1);
        // The bytes already sit in the writer's own buffer; setting `end`
        // marks them as buffered so that `flush` sends them on.
        writer.end = received;
        try writer.flush();
    }
}
/// Queue `msg` for this client by putting all of its bytes onto `recv_queue`.
///
/// Any error from the queue is returned to the caller.
///
/// NOTE(review): `recv_queue_write_lock` is not taken here; presumably callers
/// that may send concurrently are expected to hold it around this call — confirm.
pub fn send(self: *Client, io: std.Io, msg: []const u8) !void {
    const queue = self.recv_queue;
    return queue.putAll(io, msg);
}
// Checks that bytes handed to `send` arrive in the client's writer while
// `start` is running concurrently.
test send {
    const io = std.testing.io;
    const gpa = std.testing.allocator;

    // The writer's backing storage has to outlive the writer, so it is declared
    // in the test's own scope instead of inside a labeled block whose local
    // would be dead by the time the pointer is used.
    var to_client_buf: [1024]u8 = undefined;
    var to_client: std.Io.Writer = .fixed(&to_client_buf);

    var recv_queue: Queue(u8) = .init(&.{});
    var client: Client = .init(null, &recv_queue, undefined, &to_client);
    defer client.deinit(gpa);

    var c_task = try io.concurrent(Client.start, .{ &client, io });
    // `start` loops forever, so the task is cancelled on exit; its result is
    // deliberately discarded.
    defer c_task.cancel(io) catch {};

    {
        try client.send(io, "PONG\r\n");
        // Wait for the concurrent client task to write to the writer
        try io.sleep(.fromMilliseconds(1), .awake);
        try std.testing.expectEqualSlices(u8, "PONG\r\n", to_client.buffered());
    }

    // Discard what was captured so the next check starts from an empty writer.
    to_client.end = 0;

    {
        // `send` takes raw bytes, so the MSG frame is passed in wire form.
        const wire = "MSG subject 1 reply 7\r\npayload\r\n";
        try client.send(io, wire);
        try io.sleep(.fromMilliseconds(1), .awake);
        try std.testing.expectEqualSlices(u8, wire, to_client.buffered());
    }
}
/// Read the next control message from this client.
///
/// Hands `from_client` to `parse.control` and returns its result unchanged,
/// including any error it reports.
pub fn next(self: *Client) !message.Control {
    const reader = self.from_client;
    return parse.control(reader);
}
// Feeds a CONNECT line followed by a PING through a fixed in-memory reader and
// checks what `next` yields for each.
test next {
    const gpa = std.testing.allocator;
    var from_client: std.Io.Reader = .fixed(
        "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_r" ++
            "equired\":false,\"name\":\"NATS CLI Version v0.2." ++
            "4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"prot" ++
            "ocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\n" ++
            "PING\r\n",
    );
    // `init` takes (connect, recv_queue, in, out). Only the reader is exercised
    // by `next`, so the queue and the writer are left undefined.
    var client: Client = .init(null, undefined, &from_client, undefined);
    {
        // Simulate stream
        {
            // `next` takes no arguments besides the client itself.
            const msg = try client.next();
            try std.testing.expectEqual(.CONNECT, std.meta.activeTag(msg));
            defer msg.CONNECT.deinit(gpa);
            // NOTE(review): `next` is declared to return `message.Control` while
            // the expectation below is spelled as `Message{...}` — confirm the
            // two are the same type (or update the expected value's type).
            try std.testing.expectEqualDeep(Message{
                .CONNECT = .{
                    .verbose = false,
                    .pedantic = false,
                    .tls_required = false,
                    .name = "NATS CLI Version v0.2.4",
                    .lang = "go",
                    .version = "1.43.0",
                    .protocol = 1,
                    .echo = true,
                    .headers = true,
                    .no_responders = true,
                },
            }, msg);
        }
        {
            const msg = try client.next();
            try std.testing.expectEqual(.PING, std.meta.activeTag(msg));
        }
    }
}