mirror of
https://git.robbyzambito.me/zits
synced 2026-02-04 11:44:48 +00:00
Rename ClientState to Client
This commit is contained in:
@@ -1,59 +1,59 @@
|
|||||||
const Message = @import("message_parser.zig").Message;
|
const Message = @import("message_parser.zig").Message;
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
|
|
||||||
pub const ClientState = struct {
|
const Client = @This();
|
||||||
|
|
||||||
|
connect: ?Message.Connect,
|
||||||
|
|
||||||
|
write_lock: std.Io.Mutex,
|
||||||
|
|
||||||
|
from_client: *std.Io.Reader,
|
||||||
|
to_client: *std.Io.Writer,
|
||||||
|
|
||||||
|
pub fn init(
|
||||||
connect: ?Message.Connect,
|
connect: ?Message.Connect,
|
||||||
|
in: *std.Io.Reader,
|
||||||
|
out: *std.Io.Writer,
|
||||||
|
) Client {
|
||||||
|
return .{
|
||||||
|
.connect = connect,
|
||||||
|
.write_lock = .init,
|
||||||
|
.from_client = in,
|
||||||
|
.to_client = out,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
write_lock: std.Io.Mutex,
|
/// Return true if the value was put in the clients buffer to process, else false.
|
||||||
|
pub fn send(self: *Client, io: std.Io, msg: Message) !void {
|
||||||
|
try self.write_lock.lock(io);
|
||||||
|
defer self.write_lock.unlock(io);
|
||||||
|
|
||||||
from_client: *std.Io.Reader,
|
switch (msg) {
|
||||||
to_client: *std.Io.Writer,
|
.@"+ok" => {
|
||||||
|
try writeOk(self.to_client);
|
||||||
pub fn init(
|
},
|
||||||
connect: ?Message.Connect,
|
.pong => {
|
||||||
in: *std.Io.Reader,
|
try writePong(self.to_client);
|
||||||
out: *std.Io.Writer,
|
},
|
||||||
) ClientState {
|
.info => |info| {
|
||||||
return .{
|
try writeInfo(self.to_client, info);
|
||||||
.connect = connect,
|
},
|
||||||
.write_lock = .init,
|
.msg => |m| {
|
||||||
.from_client = in,
|
try writeMsg(self.to_client, m);
|
||||||
.to_client = out,
|
},
|
||||||
};
|
else => {
|
||||||
|
std.debug.panic("unimplemented write", .{});
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Return true if the value was put in the clients buffer to process, else false.
|
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
|
||||||
pub fn send(self: *ClientState, io: std.Io, msg: Message) !void {
|
// std.debug.print("in client awaiting next message\n", .{});
|
||||||
try self.write_lock.lock(io);
|
// errdefer std.debug.print("actually it was canceled\n", .{});
|
||||||
defer self.write_lock.unlock(io);
|
// defer std.debug.print("client returning next message!\n", .{});
|
||||||
|
return Message.next(allocator, self.from_client);
|
||||||
switch (msg) {
|
// return self.send_queue.getOne(io);
|
||||||
.@"+ok" => {
|
}
|
||||||
try writeOk(self.to_client);
|
|
||||||
},
|
|
||||||
.pong => {
|
|
||||||
try writePong(self.to_client);
|
|
||||||
},
|
|
||||||
.info => |info| {
|
|
||||||
try writeInfo(self.to_client, info);
|
|
||||||
},
|
|
||||||
.msg => |m| {
|
|
||||||
try writeMsg(self.to_client, m);
|
|
||||||
},
|
|
||||||
else => {
|
|
||||||
std.debug.panic("unimplemented write", .{});
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn next(self: *ClientState, allocator: std.mem.Allocator) !Message {
|
|
||||||
// std.debug.print("in client awaiting next message\n", .{});
|
|
||||||
// errdefer std.debug.print("actually it was canceled\n", .{});
|
|
||||||
// defer std.debug.print("client returning next message!\n", .{});
|
|
||||||
return Message.next(allocator, self.from_client);
|
|
||||||
// return self.send_queue.getOne(io);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
fn writeOk(out: *std.Io.Writer) !void {
|
fn writeOk(out: *std.Io.Writer) !void {
|
||||||
_ = try out.write("+OK\r\n");
|
_ = try out.write("+OK\r\n");
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ const std = @import("std");
|
|||||||
const Message = @import("./message_parser.zig").Message;
|
const Message = @import("./message_parser.zig").Message;
|
||||||
pub const ServerInfo = Message.ServerInfo;
|
pub const ServerInfo = Message.ServerInfo;
|
||||||
|
|
||||||
const ClientState = @import("./client.zig").ClientState;
|
const Client = @import("./client.zig");
|
||||||
const Server = @This();
|
const Server = @This();
|
||||||
|
|
||||||
const Subscription = struct {
|
const Subscription = struct {
|
||||||
@@ -12,7 +12,7 @@ const Subscription = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
info: ServerInfo,
|
info: ServerInfo,
|
||||||
clients: std.AutoHashMapUnmanaged(usize, *ClientState) = .empty,
|
clients: std.AutoHashMapUnmanaged(usize, *Client) = .empty,
|
||||||
|
|
||||||
subs_lock: std.Io.Mutex = .init,
|
subs_lock: std.Io.Mutex = .init,
|
||||||
subscriptions: std.ArrayList(Subscription) = .empty,
|
subscriptions: std.ArrayList(Subscription) = .empty,
|
||||||
@@ -43,7 +43,6 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
|
|||||||
defer threaded.deinit();
|
defer threaded.deinit();
|
||||||
const io = threaded.io();
|
const io = threaded.io();
|
||||||
|
|
||||||
|
|
||||||
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
|
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
|
||||||
server.info.host,
|
server.info.host,
|
||||||
server.info.port,
|
server.info.port,
|
||||||
@@ -88,7 +87,7 @@ fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *ClientState) !void {
|
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void {
|
||||||
// server.clients.lockPointers();
|
// server.clients.lockPointers();
|
||||||
try server.clients.put(allocator, id, client);
|
try server.clients.put(allocator, id, client);
|
||||||
// server.clients.unlockPointers();
|
// server.clients.unlockPointers();
|
||||||
@@ -132,25 +131,25 @@ fn handleConnection(
|
|||||||
var reader = stream.reader(io, r_buffer);
|
var reader = stream.reader(io, r_buffer);
|
||||||
const in = &reader.interface;
|
const in = &reader.interface;
|
||||||
|
|
||||||
var client_state: ClientState = .init(null, in, out);
|
var client: Client = .init(null, in, out);
|
||||||
try client_state.send(io, .{ .info = server.info });
|
try client.send(io, .{ .info = server.info });
|
||||||
|
|
||||||
var connect_arena: std.heap.ArenaAllocator = .init(allocator);
|
var connect_arena: std.heap.ArenaAllocator = .init(allocator);
|
||||||
defer connect_arena.deinit();
|
defer connect_arena.deinit();
|
||||||
client_state.connect = (Message.next(connect_arena.allocator(), in) catch return).connect;
|
client.connect = (Message.next(connect_arena.allocator(), in) catch return).connect;
|
||||||
|
|
||||||
try server.addClient(server_allocator, id, &client_state);
|
try server.addClient(server_allocator, id, &client);
|
||||||
defer server.removeClient(io, server_allocator, id);
|
defer server.removeClient(io, server_allocator, id);
|
||||||
|
|
||||||
// Messages are owned by the server after they are received from the client
|
// Messages are owned by the server after they are received from the client
|
||||||
while (client_state.next(server_allocator)) |msg| {
|
while (client.next(server_allocator)) |msg| {
|
||||||
switch (msg) {
|
switch (msg) {
|
||||||
.ping => {
|
.ping => {
|
||||||
// Respond to ping with pong.
|
// Respond to ping with pong.
|
||||||
try client_state.send(io, .pong);
|
try client.send(io, .pong);
|
||||||
},
|
},
|
||||||
.@"pub" => |pb| {
|
.@"pub" => |pb| {
|
||||||
_ = io.async(publishMessage, .{ server, io, server_allocator, &client_state, pb });
|
_ = io.async(publishMessage, .{ server, io, server_allocator, &client, pb });
|
||||||
},
|
},
|
||||||
.sub => |sub| {
|
.sub => |sub| {
|
||||||
try server.subscribe(io, server_allocator, id, sub);
|
try server.subscribe(io, server_allocator, id, sub);
|
||||||
@@ -181,7 +180,7 @@ fn subjectMatches(expected: []const u8, actual: []const u8) bool {
|
|||||||
return std.mem.eql(u8, expected, actual);
|
return std.mem.eql(u8, expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *ClientState, msg: Message.Pub) !void {
|
fn publishMessage(server: *Server, io: std.Io, gpa: std.mem.Allocator, source_client: *Client, msg: Message.Pub) !void {
|
||||||
errdefer {
|
errdefer {
|
||||||
if (source_client.connect) |c| {
|
if (source_client.connect) |c| {
|
||||||
if (c.verbose) {
|
if (c.verbose) {
|
||||||
|
|||||||
Reference in New Issue
Block a user