Support subject patterns

clean up some tests
This commit is contained in:
2026-01-02 15:19:19 +00:00
parent 539255adb1
commit 2be370e379
2 changed files with 71 additions and 69 deletions

View File

@@ -112,28 +112,17 @@ test {
{ {
// Simulate stream // Simulate stream
while (Message.next(gpa, &from_client)) |msg| { while (Message.next(gpa, &from_client)) |msg| {
switch (msg) {
.eos => {
try from_client_queue.putOne(io, msg); try from_client_queue.putOne(io, msg);
break;
},
else => {
try from_client_queue.putOne(io, msg);
},
}
} else |err| switch (err) { } else |err| switch (err) {
error.EndOfStream => try from_client_queue.close(io), error.EndOfStream => from_client_queue.close(io),
else => return err, else => return err,
} }
while (from_client_queue.getOne(io)) |msg| { while (from_client_queue.getOne(io)) |msg| {
switch (msg) { switch (msg) {
.eos => {
break;
},
.connect => |*c| { .connect => |*c| {
std.debug.print("Message: {any}\n", .{msg}); std.debug.print("Message: {any}\n", .{msg});
c.deinit(); c.deinit(gpa);
}, },
else => { else => {
std.debug.print("Message: {any}\n", .{msg}); std.debug.print("Message: {any}\n", .{msg});
@@ -146,63 +135,47 @@ test {
// Reset the reader to process it again. // Reset the reader to process it again.
from_client.seek = 0; from_client.seek = 0;
{ // {
const SemiClient = struct { // const SemiClient = struct {
q: std.Io.Queue(Message), // q: std.Io.Queue(Message),
fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void { // fn parseClientInput(self: *@This(), ioh: std.Io, in: *std.Io.Reader) void {
defer std.debug.print("done parse\n", .{}); // defer std.debug.print("done parse\n", .{});
while (Message.next(gpa, in)) |msg| { // while (Message.next(gpa, in)) |msg| {
switch (msg) { // self.q.putOne(ioh, msg) catch return;
.eos => { // } else |_| {}
self.q.putOne(ioh, msg) catch return; // }
break;
},
else => {
self.q.putOne(ioh, msg) catch return;
},
}
} else |_| {}
}
fn next(self: *@This(), ioh: std.Io) !Message { // fn next(self: *@This(), ioh: std.Io) !Message {
return self.q.getOne(ioh); // return self.q.getOne(ioh);
} // }
fn printAll(self: *@This(), ioh: std.Io) void { // fn printAll(self: *@This(), ioh: std.Io) void {
defer std.debug.print("done print\n", .{}); // defer std.debug.print("done print\n", .{});
while (self.next(ioh)) |*msg| { // while (self.next(ioh)) |*msg| {
switch (msg.*) { // std.debug.print("Client msg: {any}\n", .{msg});
.eos => |_| { // switch (msg.*) {
break; // .connect => |c| {
}, // c.deinit(gpa);
else => {}, // },
} // else => {},
std.debug.print("Client msg: {any}\n", .{msg}); // }
switch (msg.*) { // } else |_| {}
.connect => |c| { // }
// c.allocator.deinit(); // };
c.deinit();
// @constCast(c).deinit();
},
else => {},
}
} else |_| {}
}
};
var c: SemiClient = .{ .q = from_client_queue }; // var c: SemiClient = .{ .q = from_client_queue };
var group: std.Io.Group = .init; // var group: std.Io.Group = .init;
defer group.wait(io); // defer group.wait(io);
group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch { // group.concurrent(io, SemiClient.printAll, .{ &c, io }) catch {
@panic("could not start printAll\n"); // @panic("could not start printAll\n");
}; // };
group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch { // group.concurrent(io, SemiClient.parseClientInput, .{ &c, io, &from_client }) catch {
@panic("could not start printAll\n"); // @panic("could not start printAll\n");
}; // };
} // }
//////// ////////

View File

@@ -83,7 +83,7 @@ pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
server.info.port, server.info.port,
), io, .{ .reuse_address = true }); ), io, .{ .reuse_address = true });
defer tcp_server.deinit(io); defer tcp_server.deinit(io);
std.log.info("Server listening on {s}:{d}", .{server.info.host, server.info.port}); std.log.info("Server listening on {s}:{d}", .{ server.info.host, server.info.port });
var client_group: std.Io.Group = .init; var client_group: std.Io.Group = .init;
defer client_group.cancel(io); defer client_group.cancel(io);
@@ -147,12 +147,12 @@ fn handleConnection(
//const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator; //const allocator = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) client_allocator.allocator() else server_allocator;
// Set up client writer // Set up client writer
var w_buffer: [1024*16]u8 = undefined; var w_buffer: [1024 * 16]u8 = undefined;
var writer = stream.writer(io, &w_buffer); var writer = stream.writer(io, &w_buffer);
const out = &writer.interface; const out = &writer.interface;
// Set up client reader // Set up client reader
var r_buffer: [1024*16]u8 = undefined; var r_buffer: [1024 * 16]u8 = undefined;
var reader = stream.reader(io, &r_buffer); var reader = stream.reader(io, &r_buffer);
const in = &reader.interface; const in = &reader.interface;
@@ -216,7 +216,36 @@ fn handleConnection(
} }
fn subjectMatches(sub_subject: []const u8, pub_subject: []const u8) bool { fn subjectMatches(sub_subject: []const u8, pub_subject: []const u8) bool {
return std.mem.eql(u8, sub_subject, pub_subject); // TODO: assert that sub_subject and pub_subject are valid.
var sub_iter = std.mem.splitScalar(u8, sub_subject, '.');
var pub_iter = std.mem.splitScalar(u8, pub_subject, '.');
while (sub_iter.next()) |st| {
const pt = pub_iter.next() orelse return false;
if (std.mem.eql(u8, st, ">")) return true;
if (!std.mem.eql(u8, st, "*") and !std.mem.eql(u8, st, pt)) {
return false;
}
}
return pub_iter.next() == null;
}
test subjectMatches {
try std.testing.expect(subjectMatches("foo", "foo"));
try std.testing.expect(!subjectMatches("foo", "bar"));
try std.testing.expect(subjectMatches("foo.*", "foo.bar"));
try std.testing.expect(!subjectMatches("foo.*", "foo"));
try std.testing.expect(!subjectMatches("foo.>", "foo"));
// the wildcard subscriptions foo.*.quux and foo.> both match foo.bar.quux, but only the latter matches foo.bar.baz.
try std.testing.expect(subjectMatches("foo.*.quux", "foo.bar.quux"));
try std.testing.expect(subjectMatches("foo.>", "foo.bar.quux"));
try std.testing.expect(!subjectMatches("foo.*.quux", "foo.bar.baz"));
try std.testing.expect(subjectMatches("foo.>", "foo.bar.baz"));
} }
fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void { fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {