mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
Add Payload type
stores short message buffers in a colocated array, overflowing to an allocated slice when needed.
This commit is contained in:
@@ -72,25 +72,27 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator) !void {
|
|||||||
switch (msg) {
|
switch (msg) {
|
||||||
.MSG => |m| {
|
.MSG => |m| {
|
||||||
try self.to_client.print(
|
try self.to_client.print(
|
||||||
"MSG {s} {s} {s} {d}\r\n{s}\r\n",
|
"MSG {s} {s} {s} {d}\r\n",
|
||||||
.{
|
.{
|
||||||
m.subject,
|
m.subject,
|
||||||
m.sid,
|
m.sid,
|
||||||
m.reply_to orelse "",
|
m.reply_to orelse "",
|
||||||
m.payload.len,
|
m.payload.len,
|
||||||
m.payload,
|
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
try m.payload.write(self.to_client);
|
||||||
|
try self.to_client.print("\r\n", .{});
|
||||||
},
|
},
|
||||||
.HMSG => |hmsg| {
|
.HMSG => |hmsg| {
|
||||||
try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
|
try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n", .{
|
||||||
hmsg.msg.subject,
|
hmsg.msg.subject,
|
||||||
hmsg.msg.sid,
|
hmsg.msg.sid,
|
||||||
hmsg.msg.reply_to orelse "",
|
hmsg.msg.reply_to orelse "",
|
||||||
hmsg.header_bytes,
|
hmsg.header_bytes,
|
||||||
hmsg.msg.payload.len,
|
hmsg.msg.payload.len,
|
||||||
hmsg.msg.payload,
|
|
||||||
});
|
});
|
||||||
|
try hmsg.msg.payload.write(self.to_client);
|
||||||
|
try self.to_client.print("\r\n", .{});
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ fn handleConnection(
|
|||||||
var recv_queue: Queue(Message) = .init(qbuf);
|
var recv_queue: Queue(Message) = .init(qbuf);
|
||||||
defer recv_queue.close(io);
|
defer recv_queue.close(io);
|
||||||
|
|
||||||
const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / (@sizeOf(Msgs) + 128));
|
const mbuf: []Msgs = try server_allocator.alloc(Msgs, w_buf_size / @sizeOf(Msgs));
|
||||||
defer server_allocator.free(mbuf);
|
defer server_allocator.free(mbuf);
|
||||||
var msgs_queue: Queue(Msgs) = .init(mbuf);
|
var msgs_queue: Queue(Msgs) = .init(mbuf);
|
||||||
defer {
|
defer {
|
||||||
|
|||||||
@@ -5,7 +5,8 @@ const ArrayList = std.ArrayList;
|
|||||||
const StaticStringMap = std.StaticStringMap;
|
const StaticStringMap = std.StaticStringMap;
|
||||||
|
|
||||||
const Io = std.Io;
|
const Io = std.Io;
|
||||||
const AllocatingWriter = Io.Writer.Allocating;
|
const Writer = Io.Writer;
|
||||||
|
const AllocatingWriter = Writer.Allocating;
|
||||||
const Reader = Io.Reader;
|
const Reader = Io.Reader;
|
||||||
|
|
||||||
const ascii = std.ascii;
|
const ascii = std.ascii;
|
||||||
@@ -17,6 +18,53 @@ const parseUnsigned = std.fmt.parseUnsigned;
|
|||||||
|
|
||||||
const log = std.log;
|
const log = std.log;
|
||||||
|
|
||||||
|
pub const Payload = struct {
|
||||||
|
len: u32,
|
||||||
|
short: [128]u8,
|
||||||
|
long: ?[]u8,
|
||||||
|
|
||||||
|
pub fn read(alloc: Allocator, in: *Reader, bytes: usize) !Payload {
|
||||||
|
var res: Payload = .{
|
||||||
|
.len = @intCast(bytes),
|
||||||
|
.short = undefined,
|
||||||
|
.long = null,
|
||||||
|
};
|
||||||
|
|
||||||
|
try in.readSliceAll(res.short[0..@min(bytes, res.short.len)]);
|
||||||
|
if (bytes > res.short.len) {
|
||||||
|
const long = try alloc.alloc(u8, bytes - res.short.len);
|
||||||
|
errdefer alloc.free(long);
|
||||||
|
try in.readSliceAll(long);
|
||||||
|
res.long = long;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(self: Payload, out: *Writer) !void {
|
||||||
|
std.debug.assert(out.buffer.len >= self.short.len);
|
||||||
|
std.debug.assert(self.len <= self.short.len or self.long != null);
|
||||||
|
try out.writeAll(self.short[0..@min(self.len, self.short.len)]);
|
||||||
|
if (self.long) |l| {
|
||||||
|
try out.writeAll(l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deinit(self: Payload, alloc: Allocator) void {
|
||||||
|
if (self.long) |l| {
|
||||||
|
alloc.free(l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dupe(self: Payload, alloc: Allocator) !Payload {
|
||||||
|
var res = self;
|
||||||
|
if (self.long) |l| {
|
||||||
|
res.long = try alloc.dupe(u8, l);
|
||||||
|
}
|
||||||
|
errdefer if (res.long) |l| alloc.free(l);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
|
pub const MessageType = @typeInfo(Message).@"union".tag_type.?;
|
||||||
|
|
||||||
pub const Message = union(enum) {
|
pub const Message = union(enum) {
|
||||||
@@ -118,11 +166,11 @@ pub const Message = union(enum) {
|
|||||||
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
|
/// The reply subject that subscribers can use to send a response back to the publisher/requestor.
|
||||||
reply_to: ?[]const u8 = null,
|
reply_to: ?[]const u8 = null,
|
||||||
/// The message payload data.
|
/// The message payload data.
|
||||||
payload: []const u8,
|
payload: Payload,
|
||||||
|
|
||||||
pub fn deinit(self: Pub, alloc: Allocator) void {
|
pub fn deinit(self: Pub, alloc: Allocator) void {
|
||||||
alloc.free(self.subject);
|
alloc.free(self.subject);
|
||||||
alloc.free(self.payload);
|
self.payload.deinit(alloc);
|
||||||
if (self.reply_to) |r| alloc.free(r);
|
if (self.reply_to) |r| alloc.free(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,6 +211,7 @@ pub const Message = union(enum) {
|
|||||||
pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
|
pub fn dupe(self: HMsg, alloc: Allocator) !HMsg {
|
||||||
var res = self;
|
var res = self;
|
||||||
res.msg = try self.msg.dupe(alloc);
|
res.msg = try self.msg.dupe(alloc);
|
||||||
|
errdefer alloc.free(res.msg);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -194,13 +243,13 @@ pub const Message = union(enum) {
|
|||||||
subject: []const u8,
|
subject: []const u8,
|
||||||
sid: []const u8,
|
sid: []const u8,
|
||||||
reply_to: ?[]const u8,
|
reply_to: ?[]const u8,
|
||||||
payload: []const u8,
|
payload: Payload,
|
||||||
|
|
||||||
pub fn deinit(self: Msg, alloc: Allocator) void {
|
pub fn deinit(self: Msg, alloc: Allocator) void {
|
||||||
alloc.free(self.subject);
|
alloc.free(self.subject);
|
||||||
alloc.free(self.sid);
|
alloc.free(self.sid);
|
||||||
if (self.reply_to) |r| alloc.free(r);
|
if (self.reply_to) |r| alloc.free(r);
|
||||||
alloc.free(self.payload);
|
self.payload.deinit(alloc);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dupe(self: Msg, alloc: Allocator) !Msg {
|
pub fn dupe(self: Msg, alloc: Allocator) !Msg {
|
||||||
@@ -211,7 +260,7 @@ pub const Message = union(enum) {
|
|||||||
errdefer alloc.free(res.sid);
|
errdefer alloc.free(res.sid);
|
||||||
res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
|
res.reply_to = if (self.reply_to) |r| try alloc.dupe(u8, r) else null;
|
||||||
errdefer if (res.reply_to) |r| alloc.free(r);
|
errdefer if (res.reply_to) |r| alloc.free(r);
|
||||||
res.payload = try alloc.dupe(u8, self.payload);
|
res.payload = try self.payload.dupe(alloc);
|
||||||
errdefer alloc.free(res.payload);
|
errdefer alloc.free(res.payload);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
@@ -613,8 +662,6 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
defer second.deinit(alloc);
|
defer second.deinit(alloc);
|
||||||
var third: ?ArrayList(u8) = null;
|
var third: ?ArrayList(u8) = null;
|
||||||
defer if (third) |*t| t.deinit(alloc);
|
defer if (third) |*t| t.deinit(alloc);
|
||||||
var payload: AllocatingWriter = .init(alloc);
|
|
||||||
errdefer payload.deinit();
|
|
||||||
|
|
||||||
sw: switch (@as(States, .before_second)) {
|
sw: switch (@as(States, .before_second)) {
|
||||||
.before_second => {
|
.before_second => {
|
||||||
@@ -671,13 +718,14 @@ fn parsePub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
try parseUnsigned(usize, second.items, 10),
|
try parseUnsigned(usize, second.items, 10),
|
||||||
};
|
};
|
||||||
|
|
||||||
try in.streamExact(&payload.writer, bytes);
|
const payload: Payload = try .read(alloc, in, bytes);
|
||||||
|
errdefer payload.deinit(alloc);
|
||||||
try expectStreamBytes(in, "\r\n");
|
try expectStreamBytes(in, "\r\n");
|
||||||
|
|
||||||
return .{
|
return .{
|
||||||
.PUB = .{
|
.PUB = .{
|
||||||
.subject = subject,
|
.subject = subject,
|
||||||
.payload = try payload.toOwnedSlice(),
|
.payload = payload,
|
||||||
.reply_to = reply_to,
|
.reply_to = reply_to,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@@ -763,8 +811,6 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
defer third.deinit(alloc);
|
defer third.deinit(alloc);
|
||||||
var fourth: ?ArrayList(u8) = null;
|
var fourth: ?ArrayList(u8) = null;
|
||||||
defer if (fourth) |*f| f.deinit(alloc);
|
defer if (fourth) |*f| f.deinit(alloc);
|
||||||
var payload: AllocatingWriter = .init(alloc);
|
|
||||||
errdefer payload.deinit();
|
|
||||||
|
|
||||||
sw: switch (@as(States, .before_second)) {
|
sw: switch (@as(States, .before_second)) {
|
||||||
.before_second => {
|
.before_second => {
|
||||||
@@ -843,7 +889,8 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
try parseUnsigned(usize, third.items, 10),
|
try parseUnsigned(usize, third.items, 10),
|
||||||
};
|
};
|
||||||
|
|
||||||
try in.streamExact(&payload.writer, total_bytes);
|
const payload: Payload = try .read(alloc, in, total_bytes);
|
||||||
|
errdefer payload.deinit(alloc);
|
||||||
try expectStreamBytes(in, "\r\n");
|
try expectStreamBytes(in, "\r\n");
|
||||||
|
|
||||||
return .{
|
return .{
|
||||||
@@ -851,7 +898,7 @@ fn parseHPub(alloc: Allocator, in: *Reader) !Message {
|
|||||||
.header_bytes = header_bytes,
|
.header_bytes = header_bytes,
|
||||||
.@"pub" = .{
|
.@"pub" = .{
|
||||||
.subject = subject,
|
.subject = subject,
|
||||||
.payload = try payload.toOwnedSlice(),
|
.payload = payload,
|
||||||
.reply_to = reply_to,
|
.reply_to = reply_to,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user