mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
Use memorypool for hot messages
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
const Message = @import("message_parser.zig").Message;
|
const Message = @import("message_parser.zig").Message;
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
const HotMessageManager = @import("message_parser.zig").HotMessageManager;
|
||||||
|
|
||||||
const Client = @This();
|
const Client = @This();
|
||||||
|
|
||||||
@@ -26,7 +27,7 @@ pub fn init(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
|
pub fn deinit(self: *Client, alloc: std.mem.Allocator) void {
|
||||||
if (self.connect) |c| {
|
if (self.connect) |*c| {
|
||||||
c.deinit(alloc);
|
c.deinit(alloc);
|
||||||
}
|
}
|
||||||
self.* = undefined;
|
self.* = undefined;
|
||||||
@@ -41,13 +42,20 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
|
|||||||
for (0..len) |i| {
|
for (0..len) |i| {
|
||||||
const msg = msgs[i];
|
const msg = msgs[i];
|
||||||
defer switch (msg) {
|
defer switch (msg) {
|
||||||
.msg => |m| m.deinit(alloc),
|
.msg => {
|
||||||
.hmsg => |h| h.deinit(alloc),
|
msg.msg.deinit(alloc);
|
||||||
|
},
|
||||||
|
.hmsg => {
|
||||||
|
msg.hmsg.msg.deinit(alloc);
|
||||||
|
},
|
||||||
else => {},
|
else => {},
|
||||||
};
|
};
|
||||||
errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
|
errdefer for (msgs[i + 1 .. len]) |mg| switch (mg) {
|
||||||
.msg => |m| {
|
.msg => {
|
||||||
m.deinit(alloc);
|
mg.msg.deinit(alloc);
|
||||||
|
},
|
||||||
|
.hmsg => {
|
||||||
|
mg.hmsg.msg.deinit(alloc);
|
||||||
},
|
},
|
||||||
else => {},
|
else => {},
|
||||||
};
|
};
|
||||||
@@ -103,8 +111,12 @@ pub fn send(self: *Client, io: std.Io, msg: Message) !void {
|
|||||||
try self.recv_queue.putOne(io, msg);
|
try self.recv_queue.putOne(io, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
|
pub fn next(
|
||||||
return Message.next(allocator, self.from_client);
|
self: *Client,
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
hot_msg_manager: *HotMessageManager,
|
||||||
|
) !Message {
|
||||||
|
return Message.next(allocator, self.from_client, hot_msg_manager);
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ const Stream = std.Io.net.Stream;
|
|||||||
const message_parser = @import("./message_parser.zig");
|
const message_parser = @import("./message_parser.zig");
|
||||||
pub const MessageType = message_parser.MessageType;
|
pub const MessageType = message_parser.MessageType;
|
||||||
pub const Message = message_parser.Message;
|
pub const Message = message_parser.Message;
|
||||||
|
const HotMessageManager = message_parser.HotMessageManager;
|
||||||
const ServerInfo = Message.ServerInfo;
|
const ServerInfo = Message.ServerInfo;
|
||||||
pub const Client = @import("./Client.zig");
|
pub const Client = @import("./Client.zig");
|
||||||
const Server = @This();
|
const Server = @This();
|
||||||
@@ -49,11 +50,15 @@ pub fn deinit(server: *Server, io: Io, alloc: Allocator) void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
|
pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
|
||||||
|
var hot_msg_manager: HotMessageManager = .{ .io = io };
|
||||||
|
defer hot_msg_manager.deinit(gpa);
|
||||||
|
|
||||||
var tcp_server = try IpAddress.listen(try IpAddress.parse(
|
var tcp_server = try IpAddress.listen(try IpAddress.parse(
|
||||||
server.info.host,
|
server.info.host,
|
||||||
server.info.port,
|
server.info.port,
|
||||||
), io, .{ .reuse_address = true });
|
), io, .{ .reuse_address = true });
|
||||||
defer tcp_server.deinit(io);
|
defer tcp_server.deinit(io);
|
||||||
|
|
||||||
log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"});
|
log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"});
|
||||||
log.debug("Server max payload: {d}", .{server.info.max_payload});
|
log.debug("Server max payload: {d}", .{server.info.max_payload});
|
||||||
log.info("Server ID: {s}", .{server.info.server_id});
|
log.info("Server ID: {s}", .{server.info.server_id});
|
||||||
@@ -69,7 +74,14 @@ pub fn start(server: *Server, io: Io, gpa: Allocator) !void {
|
|||||||
log.debug("Accepting next client", .{});
|
log.debug("Accepting next client", .{});
|
||||||
const stream = try tcp_server.accept(io);
|
const stream = try tcp_server.accept(io);
|
||||||
log.debug("Accepted connection {d}", .{id});
|
log.debug("Accepted connection {d}", .{id});
|
||||||
_ = client_group.concurrent(io, handleConnectionInfallible, .{ server, gpa, io, id, stream }) catch {
|
_ = client_group.concurrent(io, handleConnectionInfallible, .{
|
||||||
|
server,
|
||||||
|
gpa,
|
||||||
|
io,
|
||||||
|
id,
|
||||||
|
stream,
|
||||||
|
&hot_msg_manager,
|
||||||
|
}) catch {
|
||||||
log.err("Could not start concurrent handler for {d}", .{id});
|
log.err("Could not start concurrent handler for {d}", .{id});
|
||||||
stream.close(io);
|
stream.close(io);
|
||||||
};
|
};
|
||||||
@@ -96,13 +108,27 @@ fn removeClient(server: *Server, io: Io, allocator: Allocator, id: usize) void {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleConnectionInfallible(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) void {
|
fn handleConnectionInfallible(
|
||||||
handleConnection(server, server_allocator, io, id, stream) catch |err| {
|
server: *Server,
|
||||||
|
server_allocator: Allocator,
|
||||||
|
io: Io,
|
||||||
|
id: usize,
|
||||||
|
stream: Stream,
|
||||||
|
hot_msg_manager: *HotMessageManager,
|
||||||
|
) void {
|
||||||
|
handleConnection(server, server_allocator, io, id, stream, hot_msg_manager) catch |err| {
|
||||||
log.err("Failed processing client {d}: {any}", .{ id, err });
|
log.err("Failed processing client {d}: {any}", .{ id, err });
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: usize, stream: Stream) !void {
|
fn handleConnection(
|
||||||
|
server: *Server,
|
||||||
|
server_allocator: Allocator,
|
||||||
|
io: Io,
|
||||||
|
id: usize,
|
||||||
|
stream: Stream,
|
||||||
|
hot_msg_manager: *HotMessageManager,
|
||||||
|
) !void {
|
||||||
defer stream.close(io);
|
defer stream.close(io);
|
||||||
|
|
||||||
// TODO: use a client allocator for things that should only live for as long as the client?
|
// TODO: use a client allocator for things that should only live for as long as the client?
|
||||||
@@ -129,8 +155,14 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
|
|||||||
queue.close(io);
|
queue.close(io);
|
||||||
while (queue.getOne(io)) |msg| {
|
while (queue.getOne(io)) |msg| {
|
||||||
switch (msg) {
|
switch (msg) {
|
||||||
.msg => |m| m.deinit(server_allocator),
|
.msg => {
|
||||||
.hmsg => |h| h.deinit(server_allocator),
|
msg.msg.deinit(server_allocator);
|
||||||
|
hot_msg_manager.destroyMsg(msg.msg);
|
||||||
|
},
|
||||||
|
.hmsg => {
|
||||||
|
msg.hmsg.msg.deinit(server_allocator);
|
||||||
|
hot_msg_manager.destroyMsg(msg.hmsg.msg);
|
||||||
|
},
|
||||||
else => {},
|
else => {},
|
||||||
}
|
}
|
||||||
} else |_| {}
|
} else |_| {}
|
||||||
@@ -150,7 +182,7 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
|
|||||||
defer client_task.cancel(io) catch {};
|
defer client_task.cancel(io) catch {};
|
||||||
|
|
||||||
// Messages are owned by the server after they are received from the client
|
// Messages are owned by the server after they are received from the client
|
||||||
while (client.next(server_allocator)) |msg| {
|
while (client.next(server_allocator, hot_msg_manager)) |msg| {
|
||||||
switch (msg) {
|
switch (msg) {
|
||||||
.ping => {
|
.ping => {
|
||||||
// Respond to ping with pong.
|
// Respond to ping with pong.
|
||||||
@@ -158,18 +190,18 @@ fn handleConnection(server: *Server, server_allocator: Allocator, io: Io, id: us
|
|||||||
},
|
},
|
||||||
.@"pub", .hpub => {
|
.@"pub", .hpub => {
|
||||||
defer switch (msg) {
|
defer switch (msg) {
|
||||||
.@"pub" => |pb| pb.deinit(server_allocator),
|
.@"pub" => msg.@"pub".deinit(server_allocator),
|
||||||
.hpub => |hp| hp.deinit(server_allocator),
|
.hpub => msg.hpub.@"pub".deinit(server_allocator),
|
||||||
else => unreachable,
|
else => unreachable,
|
||||||
};
|
};
|
||||||
try server.publishMessage(io, server_allocator, &client, msg);
|
try server.publishMessage(io, server_allocator, hot_msg_manager, &client, msg);
|
||||||
},
|
},
|
||||||
.sub => |sub| {
|
.sub => |sub| {
|
||||||
defer sub.deinit(server_allocator);
|
// defer sub.deinit(server_allocator);
|
||||||
try server.subscribe(io, server_allocator, id, sub);
|
try server.subscribe(io, server_allocator, id, sub);
|
||||||
},
|
},
|
||||||
.unsub => |unsub| {
|
.unsub => |unsub| {
|
||||||
defer unsub.deinit(server_allocator);
|
// defer unsub.deinit(server_allocator);
|
||||||
try server.unsubscribe(io, server_allocator, id, unsub);
|
try server.unsubscribe(io, server_allocator, id, unsub);
|
||||||
},
|
},
|
||||||
.connect => |connect| {
|
.connect => |connect| {
|
||||||
@@ -226,7 +258,14 @@ test subjectMatches {
|
|||||||
try expect(subjectMatches("foo.>", "foo.bar.baz"));
|
try expect(subjectMatches("foo.>", "foo.bar.baz"));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Client, msg: Message) !void {
|
fn publishMessage(
|
||||||
|
server: *Server,
|
||||||
|
io: Io,
|
||||||
|
alloc: Allocator,
|
||||||
|
hot_msg_manager: *HotMessageManager,
|
||||||
|
source_client: *Client,
|
||||||
|
msg: Message,
|
||||||
|
) !void {
|
||||||
errdefer {
|
errdefer {
|
||||||
if (source_client.connect) |c| {
|
if (source_client.connect) |c| {
|
||||||
if (c.verbose) {
|
if (c.verbose) {
|
||||||
@@ -250,13 +289,14 @@ fn publishMessage(server: *Server, io: Io, alloc: Allocator, source_client: *Cli
|
|||||||
|
|
||||||
switch (msg) {
|
switch (msg) {
|
||||||
.@"pub" => |pb| client.send(io, .{
|
.@"pub" => |pb| client.send(io, .{
|
||||||
.msg = try pb.toMsg(alloc, subscription.sid),
|
.msg = try pb.toMsg(alloc, hot_msg_manager, subscription.sid),
|
||||||
}) catch |err| switch (err) {
|
}) catch |err| switch (err) {
|
||||||
error.Canceled => return err,
|
error.Canceled => return err,
|
||||||
else => {},
|
else => {},
|
||||||
},
|
},
|
||||||
.hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
|
.hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
|
||||||
alloc,
|
alloc,
|
||||||
|
hot_msg_manager,
|
||||||
subscription.sid,
|
subscription.sid,
|
||||||
) }) catch |err| switch (err) {
|
) }) catch |err| switch (err) {
|
||||||
error.Canceled => return err,
|
error.Canceled => return err,
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ const StaticStringMap = std.StaticStringMap;
|
|||||||
|
|
||||||
const Io = std.Io;
|
const Io = std.Io;
|
||||||
const AllocatingWriter = Io.Writer.Allocating;
|
const AllocatingWriter = Io.Writer.Allocating;
|
||||||
|
const Mutex = Io.Mutex;
|
||||||
const Reader = Io.Reader;
|
const Reader = Io.Reader;
|
||||||
|
|
||||||
const ascii = std.ascii;
|
const ascii = std.ascii;
|
||||||
@@ -22,11 +23,11 @@ pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
|
|||||||
pub const Message = union(enum) {
|
pub const Message = union(enum) {
|
||||||
info: ServerInfo,
|
info: ServerInfo,
|
||||||
connect: Connect,
|
connect: Connect,
|
||||||
@"pub": Pub,
|
@"pub": *Pub,
|
||||||
hpub: HPub,
|
hpub: HPub,
|
||||||
sub: Sub,
|
sub: Sub,
|
||||||
unsub: Unsub,
|
unsub: Unsub,
|
||||||
msg: Msg,
|
msg: *Msg,
|
||||||
hmsg: HMsg,
|
hmsg: HMsg,
|
||||||
ping,
|
ping,
|
||||||
pong,
|
pong,
|
||||||
@@ -77,7 +78,7 @@ pub const Message = union(enum) {
|
|||||||
headers: ?bool = null,
|
headers: ?bool = null,
|
||||||
nkey: ?[]const u8 = null,
|
nkey: ?[]const u8 = null,
|
||||||
|
|
||||||
pub fn deinit(self: Connect, alloc: Allocator) void {
|
pub fn deinit(self: *Connect, alloc: Allocator) void {
|
||||||
if (self.auth_token) |a| alloc.free(a);
|
if (self.auth_token) |a| alloc.free(a);
|
||||||
if (self.user) |u| alloc.free(u);
|
if (self.user) |u| alloc.free(u);
|
||||||
if (self.pass) |p| alloc.free(p);
|
if (self.pass) |p| alloc.free(p);
|
||||||
@@ -87,6 +88,7 @@ pub const Message = union(enum) {
|
|||||||
if (self.sig) |s| alloc.free(s);
|
if (self.sig) |s| alloc.free(s);
|
||||||
if (self.jwt) |j| alloc.free(j);
|
if (self.jwt) |j| alloc.free(j);
|
||||||
if (self.nkey) |n| alloc.free(n);
|
if (self.nkey) |n| alloc.free(n);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dupe(self: Connect, alloc: Allocator) !Connect {
|
pub fn dupe(self: Connect, alloc: Allocator) !Connect {
|
||||||
@@ -118,46 +120,58 @@ pub const Message = union(enum) {
|
|||||||
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
|
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
|
||||||
reply_to: ?[]const u8 = null,
|
reply_to: ?[]const u8 = null,
|
||||||
/// The message payload data.
|
/// The message payload data.
|
||||||
payload: []const u8,
|
payload_len: usize,
|
||||||
|
payload: [128]u8,
|
||||||
|
payload_extra: []const u8 = &.{},
|
||||||
|
|
||||||
pub fn deinit(self: Pub, alloc: Allocator) void {
|
pub fn deinit(self: *Pub, alloc: Allocator) void {
|
||||||
alloc.free(self.subject);
|
alloc.free(self.subject);
|
||||||
alloc.free(self.payload);
|
alloc.free(self.payload_extra);
|
||||||
if (self.reply_to) |r| alloc.free(r);
|
if (self.reply_to) |r| alloc.free(r);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn toMsg(self: Pub, alloc: Allocator, sid: []const u8) !Msg {
|
pub fn toMsg(
|
||||||
|
self: Pub,
|
||||||
|
alloc: Allocator,
|
||||||
|
hot_msg_manager: *HotMessageManager,
|
||||||
|
sid: []const u8,
|
||||||
|
) !*Msg {
|
||||||
const res: Msg = .{
|
const res: Msg = .{
|
||||||
.subject = self.subject,
|
.subject = self.subject,
|
||||||
.sid = sid,
|
.sid = sid,
|
||||||
.reply_to = self.reply_to,
|
.reply_to = self.reply_to,
|
||||||
|
.payload_len = self.payload_len,
|
||||||
.payload = self.payload,
|
.payload = self.payload,
|
||||||
|
.payload_extra = self.payload_extra,
|
||||||
};
|
};
|
||||||
return res.dupe(alloc);
|
return res.dupe(alloc, hot_msg_manager);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
pub const HPub = struct {
|
pub const HPub = struct {
|
||||||
header_bytes: usize,
|
header_bytes: usize,
|
||||||
@"pub": Pub,
|
@"pub": *Pub,
|
||||||
|
|
||||||
pub fn deinit(self: HPub, alloc: Allocator) void {
|
pub fn deinit(self: *HPub, alloc: Allocator) void {
|
||||||
self.@"pub".deinit(alloc);
|
self.@"pub".deinit(alloc);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn toHMsg(self: HPub, alloc: Allocator, sid: []const u8) !HMsg {
|
pub fn toHMsg(self: HPub, alloc: Allocator, hot_msg_manager: *HotMessageManager, sid: []const u8) !HMsg {
|
||||||
return .{
|
return .{
|
||||||
.header_bytes = self.header_bytes,
|
.header_bytes = self.header_bytes,
|
||||||
.msg = try self.@"pub".toMsg(alloc, sid),
|
.msg = try self.@"pub".toMsg(alloc, hot_msg_manager, sid),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const HMsg = struct {
|
pub const HMsg = struct {
|
||||||
header_bytes: usize,
|
header_bytes: usize,
|
||||||
msg: Msg,
|
msg: *Msg,
|
||||||
|
|
||||||
pub fn deinit(self: HMsg, alloc: Allocator) void {
|
pub fn deinit(self: *HMsg, alloc: Allocator) void {
|
||||||
self.msg.deinit(alloc);
|
self.msg.deinit(alloc);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
|
pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
|
||||||
@@ -174,10 +188,11 @@ pub const Message = union(enum) {
|
|||||||
/// A unique alphanumeric subscription ID, generated by the client.
|
/// A unique alphanumeric subscription ID, generated by the client.
|
||||||
sid: []const u8,
|
sid: []const u8,
|
||||||
|
|
||||||
pub fn deinit(self: Sub, alloc: Allocator) void {
|
pub fn deinit(self: *Sub, alloc: Allocator) void {
|
||||||
alloc.free(self.subject);
|
alloc.free(self.subject);
|
||||||
alloc.free(self.sid);
|
alloc.free(self.sid);
|
||||||
if (self.queue_group) |q| alloc.free(q);
|
if (self.queue_group) |q| alloc.free(q);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
pub const Unsub = struct {
|
pub const Unsub = struct {
|
||||||
@@ -186,33 +201,39 @@ pub const Message = union(enum) {
|
|||||||
/// A number of messages to wait for before automatically unsubscribing.
|
/// A number of messages to wait for before automatically unsubscribing.
|
||||||
max_msgs: ?usize = null,
|
max_msgs: ?usize = null,
|
||||||
|
|
||||||
pub fn deinit(self: Unsub, alloc: Allocator) void {
|
pub fn deinit(self: *Unsub, alloc: Allocator) void {
|
||||||
alloc.free(self.sid);
|
alloc.free(self.sid);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
pub const Msg = struct {
|
pub const Msg = struct {
|
||||||
subject: []const u8,
|
subject: []const u8,
|
||||||
sid: []const u8,
|
sid: []const u8,
|
||||||
reply_to: ?[]const u8,
|
reply_to: ?[]const u8,
|
||||||
payload: []const u8,
|
payload_len: usize,
|
||||||
|
payload: [128]u8,
|
||||||
|
payload_extra: []const u8,
|
||||||
|
|
||||||
pub fn deinit(self: Msg, alloc: Allocator) void {
|
pub fn deinit(self: *Msg, alloc: Allocator) void {
|
||||||
alloc.free(self.subject);
|
alloc.free(self.subject);
|
||||||
alloc.free(self.sid);
|
alloc.free(self.sid);
|
||||||
if (self.reply_to) |r| alloc.free(r);
|
if (self.reply_to) |r| alloc.free(r);
|
||||||
alloc.free(self.payload);
|
alloc.free(self.payload_extra);
|
||||||
|
self.* = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dupe(self: Msg, alloc: Allocator) !Msg {
|
pub fn dupe(self: Msg, alloc: Allocator, hot_msg_manager: *HotMessageManager) !*Msg {
|
||||||
var res: Msg = undefined;
|
var res = try hot_msg_manager.createMsg(alloc);
|
||||||
res.subject = try alloc.dupe(u8, self.subject);
|
res.subject = try alloc.dupe(u8, self.subject);
|
||||||
errdefer alloc.free(res.subject);
|
errdefer alloc.free(res.subject);
|
||||||
res.sid = try alloc.dupe(u8, self.sid);
|
res.sid = try alloc.dupe(u8, self.sid);
|
||||||
errdefer alloc.free(res.sid);
|
errdefer alloc.free(res.sid);
|
||||||
res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
|
res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
|
||||||
errdefer if (res.reply_to) |r| alloc.free(r);
|
errdefer if (res.reply_to) |r| alloc.free(r);
|
||||||
res.payload = try alloc.dupe(u8, self.payload);
|
res.payload_len = self.payload_len;
|
||||||
errdefer alloc.free(res.payload);
|
@memcpy(&res.payload, &self.payload);
|
||||||
|
res.payload_extra = try alloc.dupe(u8, self.payload_extra);
|
||||||
|
errdefer alloc.free(res.payload_extra);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -240,7 +261,11 @@ pub const Message = union(enum) {
|
|||||||
pub const parse = parseStaticStringMap;
|
pub const parse = parseStaticStringMap;
|
||||||
|
|
||||||
/// An error should be handled by cleaning up this connection.
|
/// An error should be handled by cleaning up this connection.
|
||||||
pub fn next(alloc: Allocator, in: *Reader) !Message {
|
pub fn next(
|
||||||
|
alloc: Allocator,
|
||||||
|
in: *Reader,
|
||||||
|
hot_msg_manager: *HotMessageManager,
|
||||||
|
) !Message {
|
||||||
var operation_string: ArrayList(u8) = blk: {
|
var operation_string: ArrayList(u8) = blk: {
|
||||||
comptime var buf_len = 0;
|
comptime var buf_len = 0;
|
||||||
comptime {
|
comptime {
|
||||||
@@ -299,11 +324,11 @@ pub const Message = union(enum) {
|
|||||||
},
|
},
|
||||||
.@"pub" => {
|
.@"pub" => {
|
||||||
@branchHint(.likely);
|
@branchHint(.likely);
|
||||||
return parsePub(alloc, in);
|
return parsePub(alloc, in, hot_msg_manager);
|
||||||
},
|
},
|
||||||
.hpub => {
|
.hpub => {
|
||||||
@branchHint(.likely);
|
@branchHint(.likely);
|
||||||
return parseHPub(alloc, in);
|
return parseHPub(alloc, in, hot_msg_manager);
|
||||||
},
|
},
|
||||||
.ping => {
|
.ping => {
|
||||||
try expectStreamBytes(in, "\r\n");
|
try expectStreamBytes(in, "\r\n");
|
||||||
@@ -590,12 +615,15 @@ test parseUnsub {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parsePub(alloc: Allocator, in: *Reader) !Message {
|
fn parsePub(alloc: Allocator, in: *Reader, hot_msg_manager: *HotMessageManager) !Message {
|
||||||
try in.discardAll(1); // throw away space
|
try in.discardAll(1); // throw away space
|
||||||
|
|
||||||
|
const res = try hot_msg_manager.createPub(alloc);
|
||||||
|
errdefer hot_msg_manager.destroyPub(res);
|
||||||
|
|
||||||
// Parse subject
|
// Parse subject
|
||||||
const subject: []const u8 = try readSubject(alloc, in, .@"pub");
|
res.*.subject = try readSubject(alloc, in, .@"pub");
|
||||||
errdefer alloc.free(subject);
|
errdefer alloc.free(res.*.subject);
|
||||||
|
|
||||||
const States = enum {
|
const States = enum {
|
||||||
before_second,
|
before_second,
|
||||||
@@ -609,8 +637,6 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
defer second.deinit(alloc);
|
defer second.deinit(alloc);
|
||||||
var third: ?ArrayList(u8) = null;
|
var third: ?ArrayList(u8) = null;
|
||||||
defer if (third) |*t| t.deinit(alloc);
|
defer if (third) |*t| t.deinit(alloc);
|
||||||
var payload: AllocatingWriter = .init(alloc);
|
|
||||||
errdefer payload.deinit();
|
|
||||||
|
|
||||||
sw: switch (@as(States, .before_second)) {
|
sw: switch (@as(States, .before_second)) {
|
||||||
.before_second => {
|
.before_second => {
|
||||||
@@ -658,25 +684,26 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
const reply_to: ?[]const u8, const bytes: usize =
|
res.*.reply_to, res.*.payload_len =
|
||||||
if (third) |t| .{
|
if (third) |t| .{
|
||||||
try alloc.dupe(u8, second.items),
|
try second.toOwnedSlice(alloc),
|
||||||
try parseUnsigned(usize, t.items, 10),
|
try parseUnsigned(usize, t.items, 10),
|
||||||
} else .{
|
} else .{
|
||||||
null,
|
null,
|
||||||
try parseUnsigned(usize, second.items, 10),
|
try parseUnsigned(usize, second.items, 10),
|
||||||
};
|
};
|
||||||
|
|
||||||
try in.streamExact(&payload.writer, bytes);
|
try in.readSliceAll(res.*.payload[0..@min(res.*.payload.len, res.*.payload_len)]);
|
||||||
|
{
|
||||||
|
const remaining_payload_bytes = res.*.payload_len - res.*.payload.len;
|
||||||
|
var payload: AllocatingWriter = .init(alloc);
|
||||||
|
errdefer payload.deinit();
|
||||||
|
try in.streamExact(&payload.writer, remaining_payload_bytes);
|
||||||
|
res.*.payload_extra = try payload.toOwnedSlice();
|
||||||
|
}
|
||||||
try expectStreamBytes(in, "\r\n");
|
try expectStreamBytes(in, "\r\n");
|
||||||
|
|
||||||
return .{
|
return .{ .@"pub" = res };
|
||||||
.@"pub" = .{
|
|
||||||
.subject = subject,
|
|
||||||
.payload = try payload.toOwnedSlice(),
|
|
||||||
.reply_to = reply_to,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test parsePub {
|
test parsePub {
|
||||||
@@ -736,12 +763,15 @@ test parsePub {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
fn parseHPub(alloc: Allocator, in: *Reader, hot_msg_manager: *HotMessageManager) !Message {
|
||||||
try in.discardAll(1); // throw away space
|
try in.discardAll(1); // throw away space
|
||||||
|
|
||||||
|
const nested_pub = try hot_msg_manager.createPub(alloc);
|
||||||
|
errdefer hot_msg_manager.destroyPub(nested_pub);
|
||||||
|
|
||||||
// Parse subject
|
// Parse subject
|
||||||
const subject: []const u8 = try readSubject(alloc, in, .@"pub");
|
nested_pub.*.subject = try readSubject(alloc, in, .@"pub");
|
||||||
errdefer alloc.free(subject);
|
errdefer alloc.free(nested_pub.*.subject);
|
||||||
|
|
||||||
const States = enum {
|
const States = enum {
|
||||||
before_second,
|
before_second,
|
||||||
@@ -759,8 +789,6 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
defer third.deinit(alloc);
|
defer third.deinit(alloc);
|
||||||
var fourth: ?ArrayList(u8) = null;
|
var fourth: ?ArrayList(u8) = null;
|
||||||
defer if (fourth) |*f| f.deinit(alloc);
|
defer if (fourth) |*f| f.deinit(alloc);
|
||||||
var payload: AllocatingWriter = .init(alloc);
|
|
||||||
errdefer payload.deinit();
|
|
||||||
|
|
||||||
sw: switch (@as(States, .before_second)) {
|
sw: switch (@as(States, .before_second)) {
|
||||||
.before_second => {
|
.before_second => {
|
||||||
@@ -828,7 +856,12 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
const reply_to: ?[]const u8, const header_bytes: usize, const total_bytes: usize =
|
var res: Message.HPub = .{
|
||||||
|
.header_bytes = 0,
|
||||||
|
.@"pub" = nested_pub,
|
||||||
|
};
|
||||||
|
|
||||||
|
res.@"pub".*.reply_to, res.header_bytes, res.@"pub".*.payload_len =
|
||||||
if (fourth) |f| .{
|
if (fourth) |f| .{
|
||||||
try alloc.dupe(u8, second.items),
|
try alloc.dupe(u8, second.items),
|
||||||
try parseUnsigned(usize, third.items, 10),
|
try parseUnsigned(usize, third.items, 10),
|
||||||
@@ -839,19 +872,17 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
try parseUnsigned(usize, third.items, 10),
|
try parseUnsigned(usize, third.items, 10),
|
||||||
};
|
};
|
||||||
|
|
||||||
try in.streamExact(&payload.writer, total_bytes);
|
try in.readSliceAll(res.@"pub".*.payload[0..@min(res.@"pub".*.payload.len, res.@"pub".*.payload_len)]);
|
||||||
|
{
|
||||||
|
const remaining_payload_bytes = res.@"pub".*.payload_len - res.@"pub".*.payload.len;
|
||||||
|
var payload: AllocatingWriter = .init(alloc);
|
||||||
|
errdefer payload.deinit();
|
||||||
|
try in.streamExact(&payload.writer, remaining_payload_bytes);
|
||||||
|
res.@"pub".*.payload_extra = try payload.toOwnedSlice();
|
||||||
|
}
|
||||||
try expectStreamBytes(in, "\r\n");
|
try expectStreamBytes(in, "\r\n");
|
||||||
|
|
||||||
return .{
|
return .{ .hpub = res };
|
||||||
.hpub = .{
|
|
||||||
.header_bytes = header_bytes,
|
|
||||||
.@"pub" = .{
|
|
||||||
.subject = subject,
|
|
||||||
.payload = try payload.toOwnedSlice(),
|
|
||||||
.reply_to = reply_to,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test parseHPub {
|
test parseHPub {
|
||||||
@@ -976,6 +1007,44 @@ inline fn expectStreamBytes(reader: *Reader, expected: []const u8) !void {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const HotMessageManager = struct {
|
||||||
|
pub_pool_lock: Mutex = .init,
|
||||||
|
pub_pool: std.heap.MemoryPool(Message.Pub) = .empty,
|
||||||
|
msg_pool_lock: Mutex = .init,
|
||||||
|
msg_pool: std.heap.MemoryPool(Message.Msg) = .empty,
|
||||||
|
// used for locking
|
||||||
|
io: Io,
|
||||||
|
|
||||||
|
pub fn deinit(self: *HotMessageManager, alloc: Allocator) void {
|
||||||
|
self.pub_pool.deinit(alloc);
|
||||||
|
self.msg_pool.deinit(alloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn createPub(self: *HotMessageManager, alloc: Allocator) !*Message.Pub {
|
||||||
|
try self.pub_pool_lock.lock(self.io);
|
||||||
|
defer self.pub_pool_lock.unlock(self.io);
|
||||||
|
return self.pub_pool.create(alloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destroyPub(self: *HotMessageManager, msg: *Message.Pub) void {
|
||||||
|
self.pub_pool_lock.lockUncancelable(self.io);
|
||||||
|
defer self.pub_pool_lock.unlock(self.io);
|
||||||
|
self.pub_pool.destroy(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn createMsg(self: *HotMessageManager, alloc: Allocator) !*Message.Msg {
|
||||||
|
try self.msg_pool_lock.lock(self.io);
|
||||||
|
defer self.msg_pool_lock.unlock(self.io);
|
||||||
|
return self.msg_pool.create(alloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn destroyMsg(self: *HotMessageManager, msg: *Message.Msg) void {
|
||||||
|
self.msg_pool_lock.lockUncancelable(self.io);
|
||||||
|
defer self.msg_pool_lock.unlock(self.io);
|
||||||
|
self.msg_pool.destroy(msg);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
test "parsing a stream" {
|
test "parsing a stream" {
|
||||||
const alloc = std.testing.allocator;
|
const alloc = std.testing.allocator;
|
||||||
const expectEqualDeep = std.testing.expectEqualDeep;
|
const expectEqualDeep = std.testing.expectEqualDeep;
|
||||||
|
|||||||
Reference in New Issue
Block a user