mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 03:34:48 +00:00
WAY FASTER but doesn't send all?
Seems to not flush the last message
This commit is contained in:
141
src/Server.zig
141
src/Server.zig
@@ -13,10 +13,11 @@ const Stream = std.Io.net.Stream;
|
||||
|
||||
pub const Client = @import("./Server/Client.zig");
|
||||
|
||||
pub const parse = @import("./Server/parse.zig");
|
||||
pub const message = @import("./Server/message.zig");
|
||||
const parse = message.parse;
|
||||
|
||||
const MessageType = parse.MessageType;
|
||||
const Message = parse.Message;
|
||||
const MessageType = message.Control;
|
||||
const Message = message.Message;
|
||||
const ServerInfo = Message.ServerInfo;
|
||||
|
||||
const Msgs = Client.Msgs;
|
||||
@@ -29,9 +30,8 @@ const Subscription = struct {
|
||||
client_id: usize,
|
||||
sid: []const u8,
|
||||
queue_group: ?[]const u8,
|
||||
queue: *Queue(Msgs),
|
||||
// used to alloc messages in the queue
|
||||
alloc: Allocator,
|
||||
queue_lock: *Mutex,
|
||||
queue: *Queue(u8),
|
||||
|
||||
fn deinit(self: Subscription, alloc: Allocator) void {
|
||||
alloc.free(self.subject);
|
||||
@@ -168,66 +168,57 @@ fn handleConnection(
|
||||
const in = &reader.interface;
|
||||
|
||||
// Set up buffer queue
|
||||
const qbuf: []Message = try alloc.alloc(Message, 16);
|
||||
const qbuf: []u8 = try alloc.alloc(u8, r_buf_size);
|
||||
defer alloc.free(qbuf);
|
||||
var recv_queue: Queue(Message) = .init(qbuf);
|
||||
var recv_queue: Queue(u8) = .init(qbuf);
|
||||
defer recv_queue.close(io);
|
||||
|
||||
const mbuf: []Msgs = try alloc.alloc(Msgs, w_buf_size / @sizeOf(Msgs));
|
||||
defer alloc.free(mbuf);
|
||||
var msgs_queue: Queue(Msgs) = .init(mbuf);
|
||||
defer {
|
||||
msgs_queue.close(io);
|
||||
while (msgs_queue.getOne(io)) |msg| {
|
||||
switch (msg) {
|
||||
.MSG => |m| m.deinit(alloc),
|
||||
.HMSG => |h| h.deinit(alloc),
|
||||
}
|
||||
} else |_| {}
|
||||
}
|
||||
|
||||
// Create client
|
||||
var client: Client = .init(null, alloc, &recv_queue, &msgs_queue, in, out);
|
||||
var client: Client = .init(null, &recv_queue, in, out);
|
||||
defer client.deinit(server_allocator);
|
||||
|
||||
try server.addClient(server_allocator, id, &client);
|
||||
defer server.removeClient(io, server_allocator, id);
|
||||
|
||||
// Do initial handshake with client
|
||||
// try recv_queue.putOne(io, .PONG);
|
||||
try recv_queue.putOne(io, .{ .INFO = server.info });
|
||||
_ = try out.write("INFO ");
|
||||
try std.json.Stringify.value(server.info, .{}, out);
|
||||
_ = try out.write("\r\n");
|
||||
try out.flush();
|
||||
|
||||
var client_task = try io.concurrent(Client.start, .{ &client, io });
|
||||
defer client_task.cancel(io) catch {};
|
||||
|
||||
while (client.next(server_allocator)) |ctrl| {
|
||||
while (client.next()) |ctrl| {
|
||||
switch (ctrl) {
|
||||
.PING => {
|
||||
// Respond to ping with pong.
|
||||
try client.recv_queue_write_lock.lock(io);
|
||||
defer client.recv_queue_write_lock.unlock(io);
|
||||
try client.send(io, "PONG\r\n");
|
||||
_ = try client.from_client.take(2);
|
||||
try client.recv_queue.putAll(io, "PONG\r\n");
|
||||
// try client.send(io, "PONG\r\n");
|
||||
},
|
||||
.PUB => {
|
||||
@branchHint(.likely);
|
||||
try server.publishMessage(io, server_allocator, &client, msg);
|
||||
// log.debug("received a pub msg", .{});
|
||||
try server.publishMessage(io, server_allocator, &client, .@"pub");
|
||||
},
|
||||
.HPUB => {
|
||||
@branchHint(.likely);
|
||||
try server.publishMessage(io, server_allocator, &client, msg);
|
||||
try server.publishMessage(io, server_allocator, &client, .hpub);
|
||||
},
|
||||
.SUB => {
|
||||
try server.subscribe(io, server_allocator, client, id, sub);
|
||||
try server.subscribe(io, server_allocator, &client, id);
|
||||
},
|
||||
.UNSUB => {
|
||||
defer unsub.deinit(server_allocator);
|
||||
try server.unsubscribe(io, server_allocator, id, unsub);
|
||||
try server.unsubscribe(io, server_allocator, client, id);
|
||||
},
|
||||
.CONNECT => {
|
||||
if (client.connect) |*current| {
|
||||
current.deinit(server_allocator);
|
||||
}
|
||||
client.connect = connect;
|
||||
client.connect = try parse.connect(server_allocator, client.from_client);
|
||||
},
|
||||
else => |e| {
|
||||
panic("Unimplemented message: {any}\n", .{e});
|
||||
@@ -279,19 +270,26 @@ fn publishMessage(
|
||||
io: Io,
|
||||
alloc: Allocator,
|
||||
source_client: *Client,
|
||||
msg: Message,
|
||||
comptime pub_or_hpub: enum { @"pub", hpub },
|
||||
) !void {
|
||||
defer if (source_client.connect) |c| {
|
||||
if (c.verbose) {
|
||||
source_client.send(io, .@"+OK") catch {};
|
||||
if (source_client.recv_queue_write_lock.lock(io)) |_| {
|
||||
defer source_client.recv_queue_write_lock.unlock(io);
|
||||
source_client.recv_queue.putAll(io, "+OK\r\n") catch {};
|
||||
} else |_| {}
|
||||
}
|
||||
};
|
||||
|
||||
const subject = switch (msg) {
|
||||
.PUB => |pb| pb.subject,
|
||||
.HPUB => |hp| hp.@"pub".subject,
|
||||
else => unreachable,
|
||||
};
|
||||
_ = pub_or_hpub;
|
||||
|
||||
const msg = try parse.@"pub"(source_client.from_client);
|
||||
|
||||
// const subject = switch (pub_or_hpub) {
|
||||
// .PUB => |pb| pb.subject,
|
||||
// .HPUB => |hp| hp.@"pub".subject,
|
||||
// else => unreachable,
|
||||
// };
|
||||
try server.subs_lock.lock(io);
|
||||
defer server.subs_lock.unlock(io);
|
||||
var published_queue_groups: ArrayList([]const u8) = .empty;
|
||||
@@ -301,7 +299,7 @@ fn publishMessage(
|
||||
|
||||
subs: for (0..server.subscriptions.items.len) |i| {
|
||||
const subscription = server.subscriptions.items[i];
|
||||
if (subjectMatches(subscription.subject, subject)) {
|
||||
if (subjectMatches(subscription.subject, msg.subject)) {
|
||||
if (subscription.queue_group) |sg| {
|
||||
for (published_queue_groups.items) |g| {
|
||||
if (eql(u8, g, sg)) {
|
||||
@@ -314,19 +312,46 @@ fn publishMessage(
|
||||
// to prioritize other subscriptions in the queue next time.
|
||||
try published_queue_sub_idxs.append(alloc, i);
|
||||
}
|
||||
switch (msg) {
|
||||
.PUB => |pb| {
|
||||
try subscription.queue.putOne(io, .{
|
||||
.MSG = try pb.toMsg(subscription.alloc, subscription.sid),
|
||||
});
|
||||
|
||||
const m = msg.toMsg(subscription.sid);
|
||||
var msg_line_buf: [1024]u8 = undefined;
|
||||
var msg_line_writer: std.Io.Writer = .fixed(&msg_line_buf);
|
||||
|
||||
// try self.to_client.print(
|
||||
// ,
|
||||
|
||||
// );
|
||||
// try m.payload.write(self.to_client);
|
||||
// try self.to_client.print("\r\n", .{});
|
||||
try msg_line_writer.print(
|
||||
"MSG {s} {s} {s} {d}\r\n",
|
||||
.{
|
||||
m.subject,
|
||||
m.sid,
|
||||
m.reply_to orelse "",
|
||||
m.payload.len,
|
||||
},
|
||||
.HPUB => |hp| {
|
||||
try subscription.queue.putOne(io, .{
|
||||
.HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
|
||||
});
|
||||
},
|
||||
else => unreachable,
|
||||
}
|
||||
);
|
||||
|
||||
try subscription.queue_lock.lock(io);
|
||||
defer subscription.queue_lock.unlock(io);
|
||||
try subscription.queue.putAll(io, msg_line_writer.buffered());
|
||||
try subscription.queue.putAll(io, m.payload);
|
||||
try subscription.queue.putAll(io, "\r\n");
|
||||
|
||||
// switch (msg) {
|
||||
// .PUB => |pb| {
|
||||
// try subscription.queue.putOne(io, .{
|
||||
// .MSG = try pb.toMsg(subscription.alloc, subscription.sid),
|
||||
// });
|
||||
// },
|
||||
// .HPUB => |hp| {
|
||||
// try subscription.queue.putOne(io, .{
|
||||
// .HMSG = try hp.toHMsg(subscription.alloc, subscription.sid),
|
||||
// });
|
||||
// },
|
||||
// else => unreachable,
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -340,10 +365,11 @@ fn subscribe(
|
||||
server: *Server,
|
||||
io: Io,
|
||||
gpa: Allocator,
|
||||
client: Client,
|
||||
client: *Client,
|
||||
id: usize,
|
||||
msg: Message.Sub,
|
||||
// msg: Message.Sub,
|
||||
) !void {
|
||||
const msg = try parse.sub(client.from_client);
|
||||
try server.subs_lock.lock(io);
|
||||
defer server.subs_lock.unlock(io);
|
||||
const subject = try gpa.dupe(u8, msg.subject);
|
||||
@@ -357,8 +383,8 @@ fn subscribe(
|
||||
.client_id = id,
|
||||
.sid = sid,
|
||||
.queue_group = queue_group,
|
||||
.queue = client.msg_queue,
|
||||
.alloc = client.alloc,
|
||||
.queue_lock = &client.recv_queue_write_lock,
|
||||
.queue = client.recv_queue,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -366,9 +392,10 @@ fn unsubscribe(
|
||||
server: *Server,
|
||||
io: Io,
|
||||
gpa: Allocator,
|
||||
client: Client,
|
||||
id: usize,
|
||||
msg: Message.Unsub,
|
||||
) !void {
|
||||
const msg = try parse.unsub(client.from_client);
|
||||
try server.subs_lock.lock(io);
|
||||
defer server.subs_lock.unlock(io);
|
||||
const len = server.subscriptions.items.len;
|
||||
|
||||
Reference in New Issue
Block a user