This commit is contained in:
2026-01-02 16:01:35 +00:00
parent 2be370e379
commit cd5281030e
4 changed files with 295 additions and 54 deletions

View File

@@ -57,6 +57,7 @@ pub fn main() !void {
.server_name = zits.Server.createName(), .server_name = zits.Server.createName(),
.version = "zits-master", .version = "zits-master",
.max_payload = 1048576, .max_payload = 1048576,
.headers = true,
}; };
if (serve_matches.getSingleValue("port")) |port| { if (serve_matches.getSingleValue("port")) |port| {
info.port = std.fmt.parseUnsigned(@TypeOf(info.port), port, 10) catch |err| std.process.fatal("Could not parse port ({s}): {}\n", .{ port, err }); info.port = std.fmt.parseUnsigned(@TypeOf(info.port), port, 10) catch |err| std.process.fatal("Could not parse port ({s}): {}\n", .{ port, err });

View File

@@ -76,6 +76,17 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io
}, },
); );
}, },
.hmsg => |hmsg| {
std.log.debug("Sending hmsg: {any}", .{hmsg});
try self.to_client.print("HMSG {s} {s} {s} {d} {d}\r\n{s}\r\n", .{
hmsg.msg.subject,
hmsg.msg.sid,
hmsg.msg.reply_to orelse "",
hmsg.header_bytes,
hmsg.msg.payload.len,
hmsg.msg.payload,
});
},
.@"-err" => |s| { .@"-err" => |s| {
_ = try self.to_client.print("-ERR '{s}'\r\n", .{s}); _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
}, },

View File

@@ -83,6 +83,10 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
server.info.port, server.info.port,
), io, .{ .reuse_address = true }); ), io, .{ .reuse_address = true });
defer tcp_server.deinit(io); defer tcp_server.deinit(io);
std.log.debug("Server headers: {s}", .{if (server.info.headers) "true" else "false"});
std.log.debug("Server max payload: {d}", .{server.info.max_payload});
std.log.info("Server ID: {s}", .{server.info.server_id});
std.log.info("Server name: {s}", .{server.info.server_name});
std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port }); std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port });
var client_group: std.Io.Group = .init; var client_group: std.Io.Group = .init;
@@ -185,9 +189,13 @@ fn handleConnection(
// Respond to ping with pong. // Respond to ping with pong.
try client.send(io, .pong); try client.send(io, .pong);
}, },
.@"pub" => |pb| { .@"pub", .hpub => {
defer pb.deinit(server_allocator); defer switch (msg) {
try server.publishMessage(io, server_allocator, &client, pb); .@"pub" => |pb| pb.deinit(server_allocator),
.hpub => |hp| hp.deinit(server_allocator),
else => unreachable,
};
try server.publishMessage(io, server_allocator, &client, msg);
}, },
.sub => |sub| { .sub => |sub| {
try server.subscribe(io, server_allocator, id, sub); try server.subscribe(io, server_allocator, id, sub);
@@ -248,7 +256,7 @@ test subjectMatches {
try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz")); try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz"));
} }
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message) !void {
errdefer { errdefer {
if (source_client.connect) |c| { if (source_client.connect) |c| {
if (c.verbose) { if (c.verbose) {
@@ -256,20 +264,36 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
} }
} }
} }
const subject = switch (msg) {
.@"pub" => |pb| pb.subject,
.hpub => |hp| hp.@"pub".subject,
else => unreachable,
};
try server.subs_lock.lock(io); try server.subs_lock.lock(io);
defer server.subs_lock.unlock(io); defer server.subs_lock.unlock(io);
for (server.subscriptions.items) |subscription| { for (server.subscriptions.items) |subscription| {
if (subjectMatches(subscription.subject, msg.subject)) { if (subjectMatches(subscription.subject, subject)) {
const client = server.clients.get(subscription.client_id) orelse { const client = server.clients.get(subscription.client_id) orelse {
std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id}); std.debug.print("trying to publish to a client that no longer exists: {d}\n", .{subscription.client_id});
continue; continue;
}; };
client.send(io, .{
.msg = try msg.toMsg(alloc, subscription.sid), switch (msg) {
.@"pub" => |pb| client.send(io, .{
.msg = try pb.toMsg(alloc, subscription.sid),
}) catch |err| switch (err) { }) catch |err| switch (err) {
error.Canceled => return err, error.Canceled => return err,
else => {}, else => {},
}; },
.hpub => |hp| client.send(io, .{ .hmsg = try hp.toHMsg(
alloc,
subscription.sid,
) }) catch |err| switch (err) {
error.Canceled => return err,
else => {},
},
else => unreachable,
}
} }
} }
if (source_client.connect) |c| { if (source_client.connect) |c| {

View File

@@ -35,11 +35,11 @@ pub const Message = union(MessageType) {
info: ServerInfo, info: ServerInfo,
connect: Connect, connect: Connect,
@"pub": Pub, @"pub": Pub,
hpub: void, hpub: HPub,
sub: Sub, sub: Sub,
unsub: Unsub, unsub: Unsub,
msg: Msg, msg: Msg,
hmsg: void, hmsg: HMsg,
ping, ping,
pong, pong,
@"+ok": void, @"+ok": void,
@@ -148,6 +148,36 @@ pub const Message = union(MessageType) {
return res.dupe(alloc); return res.dupe(alloc);
} }
}; };
pub const HPub = struct {
header_bytes: usize,
@"pub": Pub,
pub fn deinit(self: HPub, alloc: std.mem.Allocator) void {
self.@"pub".deinit(alloc);
}
pub fn toHMsg(self: HPub, alloc: std.mem.Allocator, sid: []const u8) !HMsg {
return .{
.header_bytes = self.header_bytes,
.msg = try self.@"pub".toMsg(alloc, sid),
};
}
};
pub const HMsg = struct {
header_bytes: usize,
msg: Msg,
pub fn deinit(self: HMsg, alloc: std.mem.Allocator) void {
self.msg.deinit(alloc);
}
pub fn dupe(self: HMsg, alloc: std.mem.Allocator) !HMsg {
var res = self;
res.msg = try self.msg.dupe(alloc);
return res;
}
};
pub const Sub = struct { pub const Sub = struct {
/// The subject name to subscribe to. /// The subject name to subscribe to.
subject: []const u8, subject: []const u8,
@@ -235,6 +265,7 @@ pub const Message = union(MessageType) {
return error.InvalidOperation; return error.InvalidOperation;
}; };
std.log.debug("parsing {s}", .{operation_string.items});
switch (operation) { switch (operation) {
.connect => { .connect => {
// for storing the json string // for storing the json string
@@ -265,47 +296,10 @@ pub const Message = union(MessageType) {
return .{ .connect = try res.dupe(alloc) }; return .{ .connect = try res.dupe(alloc) };
}, },
.@"pub" => { .@"pub" => {
try in.discardAll(1); // throw away space return parsePub(alloc, in);
// Parse subject
const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject);
// Parse byte count
const byte_count = blk: {
var byte_count_list: std.ArrayList(u8) = try .initCapacity(alloc, 64);
defer byte_count_list.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");
break;
}
defer in.toss(1);
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
};
const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count);
errdefer alloc.free(bytes);
try in.readSliceAll(bytes);
try expectStreamBytes(in, "\r\n");
break :blk bytes;
};
return .{
.@"pub" = .{
.subject = subject,
.payload = payload,
}, },
}; .hpub => {
return parseHPub(alloc, in);
}, },
.ping => { .ping => {
try expectStreamBytes(in, "\r\n"); try expectStreamBytes(in, "\r\n");
@@ -414,6 +408,217 @@ pub const Message = union(MessageType) {
} }
}; };
fn parsePub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject);
const second = blk: {
// Drop whitespace
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
in.toss(1);
} else break;
} else |err| return err;
var acc: std.ArrayList(u8) = .empty;
errdefer acc.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
in.toss(1);
} else |err| return err;
break :blk try acc.toOwnedSlice(alloc);
};
defer alloc.free(second);
const byte_count: usize, const reply_to: ?[]const u8 =
if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
try expectStreamBytes(in, "\r\n");
break :blk .{ s, null };
} else |_| .{
blk: {
var byte_count_list: std.ArrayList(u8) = .empty;
defer byte_count_list.deinit(alloc);
try in.discardAll(1); // discard space
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");
break;
}
defer in.toss(1);
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
},
try alloc.dupe(u8, second),
};
const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count);
errdefer alloc.free(bytes);
try in.readSliceAll(bytes);
try expectStreamBytes(in, "\r\n");
break :blk bytes;
};
return .{
.@"pub" = .{
.subject = subject,
.payload = payload,
.reply_to = reply_to,
},
};
}
test parsePub {
{
var in: std.Io.Reader = .fixed(" foo 3\r\nbar\r\n");
var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{ .@"pub" = .{
.subject = "foo",
.reply_to = null,
.payload = "bar",
} },
res,
);
}
{
var in: std.Io.Reader = .fixed(" foo reply.to 3\r\nbar\r\n");
var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{ .@"pub" = .{
.subject = "foo",
.reply_to = "reply.to",
.payload = "bar",
} },
res,
);
}
// numeric reply subject
{
var in: std.Io.Reader = .fixed(" foo 5 3\r\nbar\r\n");
var res = try parsePub(std.testing.allocator, &in);
defer res.@"pub".deinit(std.testing.allocator);
try std.testing.expectEqualDeep(
Message{ .@"pub" = .{
.subject = "foo",
.reply_to = "reply.to",
.payload = "bar",
} },
res,
);
}
}
fn parseHPub(alloc: std.mem.Allocator, in: *std.Io.Reader) !Message {
try in.discardAll(1); // throw away space
// Parse subject
const subject: []const u8 = try readSubject(alloc, in);
errdefer alloc.free(subject);
const second = blk: {
// Drop whitespace
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
in.toss(1);
} else break;
} else |err| return err;
var acc: std.ArrayList(u8) = .empty;
errdefer acc.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) break;
try acc.append(alloc, byte);
in.toss(1);
} else |err| return err;
break :blk try acc.toOwnedSlice(alloc);
};
errdefer alloc.free(second);
const header_byte_count: usize, const reply_to: ?[]const u8 =
if (std.fmt.parseUnsigned(usize, second, 10)) |s| blk: {
try expectStreamBytes(in, "\r\n");
break :blk .{ s, null };
} else |_| .{
blk: {
var byte_count_list: std.ArrayList(u8) = .empty;
defer byte_count_list.deinit(alloc);
try in.discardAll(1); // discard space
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");
break;
}
defer in.toss(1);
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
},
second,
};
std.log.debug("buffered: '{s}'", .{in.buffered()});
// Parse byte count
const byte_count = blk: {
var byte_count_list: std.ArrayList(u8) = .empty;
defer byte_count_list.deinit(alloc);
while (in.peekByte()) |byte| {
if (std.ascii.isWhitespace(byte)) {
try expectStreamBytes(in, "\r\n");
break;
}
defer in.toss(1);
if (std.ascii.isDigit(byte)) {
try byte_count_list.append(alloc, byte);
} else {
return error.InvalidStream;
}
} else |err| return err;
break :blk try std.fmt.parseUnsigned(usize, byte_count_list.items, 10);
};
const payload = blk: {
const bytes = try alloc.alloc(u8, byte_count);
errdefer alloc.free(bytes);
try in.readSliceAll(bytes);
try expectStreamBytes(in, "\r\n");
break :blk bytes;
};
return .{
.hpub = .{
.header_bytes = header_byte_count,
.@"pub" = .{
.subject = subject,
.payload = payload,
.reply_to = reply_to,
},
},
};
}
fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 { fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024); var subject_list: std.ArrayList(u8) = try .initCapacity(alloc, 1024);
errdefer subject_list.deinit(alloc); errdefer subject_list.deinit(alloc);