mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
97 mbps !!! super fast
dosen't flush every message, pulls batches from the queue to send, and flushes at the end of each batch. batches are a min of 1 message, but may be more.
This commit is contained in:
@@ -25,7 +25,7 @@ pub fn init(
|
|||||||
|
|
||||||
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void {
|
pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io.Queue(Message)) !void {
|
||||||
self.recv_queue = queue;
|
self.recv_queue = queue;
|
||||||
var msgs: [16]Message = undefined;
|
var msgs: [8]Message = undefined;
|
||||||
while (true) {
|
while (true) {
|
||||||
const len = try queue.get(io, &msgs, 1);
|
const len = try queue.get(io, &msgs, 1);
|
||||||
std.debug.assert(len <= msgs.len);
|
std.debug.assert(len <= msgs.len);
|
||||||
@@ -49,6 +49,7 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try self.to_client.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -68,19 +69,20 @@ pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
|
|||||||
|
|
||||||
fn writeOk(out: *std.Io.Writer) !void {
|
fn writeOk(out: *std.Io.Writer) !void {
|
||||||
_ = try out.write("+OK\r\n");
|
_ = try out.write("+OK\r\n");
|
||||||
try out.flush();
|
}
|
||||||
|
|
||||||
|
fn writeErr(out: *std.Io.Writer, msg: []const u8) !void {
|
||||||
|
_ = try out.print("-ERR '{s}'\r\n", .{msg});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn writePong(out: *std.Io.Writer) !void {
|
fn writePong(out: *std.Io.Writer) !void {
|
||||||
_ = try out.write("PONG\r\n");
|
_ = try out.write("PONG\r\n");
|
||||||
try out.flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void {
|
pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void {
|
||||||
_ = try out.write("INFO ");
|
_ = try out.write("INFO ");
|
||||||
try std.json.Stringify.value(info, .{}, out);
|
try std.json.Stringify.value(info, .{}, out);
|
||||||
_ = try out.write("\r\n");
|
_ = try out.write("\r\n");
|
||||||
try out.flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void {
|
fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void {
|
||||||
@@ -94,7 +96,6 @@ fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void {
|
|||||||
msg.payload,
|
msg.payload,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
try out.flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
|
|||||||
@@ -126,12 +126,12 @@ fn handleConnection(
|
|||||||
const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator;
|
const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator;
|
||||||
|
|
||||||
// Set up client writer
|
// Set up client writer
|
||||||
var w_buffer: [256]u8 = undefined;
|
var w_buffer: [1024]u8 = undefined;
|
||||||
var writer = stream.writer(io, &w_buffer);
|
var writer = stream.writer(io, &w_buffer);
|
||||||
const out = &writer.interface;
|
const out = &writer.interface;
|
||||||
|
|
||||||
// Set up client reader
|
// Set up client reader
|
||||||
var r_buffer: [256]u8 = undefined;
|
var r_buffer: [1024]u8 = undefined;
|
||||||
var reader = stream.reader(io, &r_buffer);
|
var reader = stream.reader(io, &r_buffer);
|
||||||
const in = &reader.interface;
|
const in = &reader.interface;
|
||||||
|
|
||||||
@@ -140,8 +140,9 @@ fn handleConnection(
|
|||||||
try server.addClient(server_allocator, id, &client);
|
try server.addClient(server_allocator, id, &client);
|
||||||
defer server.removeClient(io, server_allocator, id);
|
defer server.removeClient(io, server_allocator, id);
|
||||||
|
|
||||||
var qbuf: [16]Message = undefined;
|
var qbuf: [8]Message = undefined;
|
||||||
var queue: std.Io.Queue(Message) = .init(&qbuf);
|
var queue: std.Io.Queue(Message) = .init(&qbuf);
|
||||||
|
defer queue.close(io);
|
||||||
|
|
||||||
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
|
var client_task = try io.concurrent(Client.start, .{ &client, io, server_allocator, &queue });
|
||||||
defer client_task.cancel(io) catch {};
|
defer client_task.cancel(io) catch {};
|
||||||
|
|||||||
Reference in New Issue
Block a user