3 Commits

Author SHA1 Message Date
5b3e322b60 Move connection writer into connection
Make chunk size a part of the connection type
2026-01-25 13:35:08 -05:00
62f7c98619 Start building connection writer 2026-01-25 01:14:15 -05:00
7f7f162dc2 wait for child to die 2026-01-25 01:14:13 -05:00
4 changed files with 216 additions and 78 deletions

View File

@@ -137,7 +137,7 @@ pub fn connect(self: Client, io: Io, payload: []const u8) !SaprusConnection {
const RawSocket = @import("./RawSocket.zig"); const RawSocket = @import("./RawSocket.zig");
const SaprusMessage = @import("message.zig").Message; const SaprusMessage = @import("message.zig").Message;
const SaprusConnection = @import("Connection.zig"); const SaprusConnection = @import("Connection.zig").Default;
const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp; const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp;
const std = @import("std"); const std = @import("std");

View File

@@ -1,18 +1,22 @@
socket: RawSocket, pub fn Chunked(comptime cs: usize) type {
headers: EthIpUdp, return struct {
connection: SaprusMessage, socket: RawSocket,
headers: EthIpUdp,
connection: SaprusMessage,
const Connection = @This(); const Self = @This();
pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Connection { pub const chunk_size = cs;
pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Self {
return .{ return .{
.socket = socket, .socket = socket,
.headers = headers, .headers = headers,
.connection = connection, .connection = connection,
}; };
} }
pub fn next(self: Connection, io: Io, buf: []u8) ![]const u8 { pub fn next(self: Self, io: Io, buf: []u8) ![]const u8 {
_ = io; _ = io;
log.debug("Awaiting connection message", .{}); log.debug("Awaiting connection message", .{});
const res = try self.socket.receive(buf); const res = try self.socket.receive(buf);
@@ -23,9 +27,9 @@ pub fn next(self: Connection, io: Io, buf: []u8) ![]const u8 {
log.debug("Payload was {s}", .{connection_res.payload}); log.debug("Payload was {s}", .{connection_res.payload});
return connection_res.payload; return connection_res.payload;
} }
pub fn send(self: *Connection, io: Io, buf: []const u8) !void { pub fn send(self: *Self, io: Io, buf: []const u8) !void {
const io_source: std.Random.IoSource = .{ .io = io }; const io_source: std.Random.IoSource = .{ .io = io };
const rand = io_source.interface(); const rand = io_source.interface();
@@ -39,7 +43,7 @@ pub fn send(self: *Connection, io: Io, buf: []const u8) !void {
self.headers.setPayloadLen(connection_bytes.len); self.headers.setPayloadLen(connection_bytes.len);
var msg_buf: [2048]u8 = undefined; var msg_buf: [2048]u8 = undefined;
var msg_w: Writer = .fixed(&msg_buf); var msg_w: Io.Writer = .fixed(&msg_buf);
try msg_w.writeAll(&self.headers.toBytes()); try msg_w.writeAll(&self.headers.toBytes());
try msg_w.writeAll(connection_bytes); try msg_w.writeAll(connection_bytes);
const full_msg = msg_w.buffered(); const full_msg = msg_w.buffered();
@@ -47,11 +51,93 @@ pub fn send(self: *Connection, io: Io, buf: []const u8) !void {
try self.socket.send(full_msg); try self.socket.send(full_msg);
log.debug("Sent {} byte connection message", .{full_msg.len}); log.debug("Sent {} byte connection message", .{full_msg.len});
}
pub const Writer = struct {
connection: *Self,
io: Io,
interface: Io.Writer,
err: ?anyerror,
pub fn init(io: Io, connection: *Self, buf: []u8) Writer {
return .{
.connection = connection,
.io = io,
.interface = .{
.vtable = &.{
.drain = drain,
},
.buffer = buf,
},
.err = null,
};
}
pub fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
_ = splat;
const self: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
var res: usize = 0;
// Get buffered data from the writer
const buffered = io_w.buffered();
var buf_offset: usize = 0;
// Process buffered data in chunks
while (buf_offset < buffered.len) {
const current_chunk_size = @min(chunk_size, buffered.len - buf_offset);
const chunk = buffered[buf_offset..][0..current_chunk_size];
// Base64 encode the chunk
var encoded_buf: [chunk_size * 2]u8 = undefined;
const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk);
// Send encoded chunk
self.connection.send(self.io, encoded[0..encoded_len]) catch |err| {
self.err = err;
return error.WriteFailed;
};
self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo");
buf_offset += current_chunk_size;
res += current_chunk_size;
}
// Process data slices
for (data) |slice| {
var slice_offset: usize = 0;
while (slice_offset < slice.len) {
const current_chunk_size = @min(chunk_size, slice.len - slice_offset);
const chunk = slice[slice_offset..][0..current_chunk_size];
// Base64 encode the chunk
var encoded_buf: [chunk_size * 2]u8 = undefined;
const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk);
// Send encoded chunk
self.connection.send(self.io, encoded[0..encoded_len]) catch |err| {
self.err = err;
return error.WriteFailed;
};
self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo");
slice_offset += current_chunk_size;
res += current_chunk_size;
}
}
return res;
}
};
};
} }
pub const Default = Chunked(RawSocket.max_payload_len);
const std = @import("std"); const std = @import("std");
const Io = std.Io; const Io = std.Io;
const Writer = std.Io.Writer;
const log = std.log; const log = std.log;

View File

@@ -46,7 +46,7 @@ export fn zaprus_connect(
const c: ?*zaprus.Client = @ptrCast(@alignCast(client)); const c: ?*zaprus.Client = @ptrCast(@alignCast(client));
const zc = c orelse return null; const zc = c orelse return null;
const connection = alloc.create(zaprus.Connection) catch return null; const connection = alloc.create(zaprus.Connection.Default) catch return null;
connection.* = zc.connect(io, payload[0..payload_len]) catch { connection.* = zc.connect(io, payload[0..payload_len]) catch {
alloc.destroy(connection); alloc.destroy(connection);
return null; return null;
@@ -55,7 +55,7 @@ export fn zaprus_connect(
} }
export fn zaprus_deinit_connection(connection: ?*ZaprusConnection) void { export fn zaprus_deinit_connection(connection: ?*ZaprusConnection) void {
const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection)); const c: ?*zaprus.Connection.Default = @ptrCast(@alignCast(connection));
if (c) |zc| { if (c) |zc| {
alloc.destroy(zc); alloc.destroy(zc);
} }
@@ -67,7 +67,7 @@ export fn zaprus_connection_next(
capacity: usize, capacity: usize,
out_len: *usize, out_len: *usize,
) c_int { ) c_int {
const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection)); const c: ?*zaprus.Connection.Default = @ptrCast(@alignCast(connection));
const zc = c orelse return 1; const zc = c orelse return 1;
const result = zc.next(io, out[0..capacity]) catch return 1; const result = zc.next(io, out[0..capacity]) catch return 1;
@@ -80,7 +80,7 @@ export fn zaprus_connection_send(
payload: [*c]const u8, payload: [*c]const u8,
payload_len: usize, payload_len: usize,
) c_int { ) c_int {
const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection)); const c: ?*zaprus.Connection.Default = @ptrCast(@alignCast(connection));
const zc = c orelse return 1; const zc = c orelse return 1;
zc.send(io, payload[0..payload_len]) catch return 1; zc.send(io, payload[0..payload_len]) catch return 1;

View File

@@ -124,8 +124,8 @@ pub fn main(init: std.process.Init) !void {
return; return;
} }
var init_con_buf: [SaprusClient.max_payload_len]u8 = undefined; var con_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
var w: Writer = .fixed(&init_con_buf); var w: Writer = .fixed(&con_buf);
try w.print("{b64}", .{flags.connect.?}); try w.print("{b64}", .{flags.connect.?});
if (flags.connect != null) { if (flags.connect != null) {
@@ -142,6 +142,8 @@ pub fn main(init: std.process.Init) !void {
log.debug("Connection started", .{}); log.debug("Connection started", .{});
var connection_writer: zaprus.Connection.Default.Writer = .init(init.io, &connection, &con_buf);
next_message: while (true) { next_message: while (true) {
var res_buf: [2048]u8 = undefined; var res_buf: [2048]u8 = undefined;
try client.socket.setTimeout(if (is_debug) 60 else 600, 0); try client.socket.setTimeout(if (is_debug) 60 else 600, 0);
@@ -157,36 +159,18 @@ pub fn main(init: std.process.Init) !void {
continue; continue;
}; };
const child = std.process.spawn(init.io, .{ var child = std.process.spawn(init.io, .{
.argv = &.{ "bash", "-c", connection_payload }, .argv = &.{ "bash", "-c", connection_payload },
.stdout = .pipe, .stdout = .pipe,
.stderr = .pipe,
}) catch continue; }) catch continue;
var child_stdout: std.ArrayList(u8) = .empty; var child_output_buf: [SaprusClient.max_payload_len]u8 = undefined;
defer child_stdout.deinit(init.gpa); var child_output_reader = child.stdout.?.reader(init.io, &child_output_buf);
var child_stderr: std.ArrayList(u8) = .empty;
defer child_stderr.deinit(init.gpa);
child.collectOutput(init.gpa, &child_stdout, &child_stderr, std.math.maxInt(usize)) catch |err| { _ = child_output_reader.interface.stream(
log.debug("Failed to collect output: {t}", .{err}); &connection_writer.interface,
continue; .limited(@TypeOf(connection_writer.connection.*).chunk_size * 10),
}; ) catch continue :next_message;
var cmd_output_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
var cmd_output: Writer = .fixed(&cmd_output_buf);
var cmd_output_window_iter = std.mem.window(u8, child_stdout.items, SaprusClient.max_payload_len, SaprusClient.max_payload_len);
while (cmd_output_window_iter.next()) |chunk| {
cmd_output.end = 0;
// Unreachable because the cmd_output_buf is twice the size of the chunk.
cmd_output.print("{b64}", .{chunk}) catch unreachable;
connection.send(init.io, cmd_output.buffered()) catch |err| {
log.debug("Failed to send connection chunk: {t}", .{err});
continue :next_message;
};
try init.io.sleep(.fromMilliseconds(40), .boot);
}
} }
} }
} }
@@ -194,6 +178,74 @@ pub fn main(init: std.process.Init) !void {
unreachable; unreachable;
} }
// const ConnectionWriter = struct {
// connection: *zaprus.Connection,
// io: std.Io,
// interface: Writer,
// err: ?anyerror,
// pub fn init(io: std.Io, connection: *zaprus.Connection) ConnectionWriter {
// return .{
// .connection = connection,
// .io = io,
// .interface = .{},
// };
// }
// pub fn drain(io_w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize {
// var res: usize = 0;
// const w: *ConnectionWriter = @alignCast(@fieldParentPtr("interface", io_w));
// var buffered_reader: std.Io.Reader = .fixed(io_w.buffered());
// const io = w.io;
// // Collect the output in chunks
// var output_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
// var output_writer: Writer = .fixed(&output_buf);
// while (buffered_reader.end - buffered_reader.seek > SaprusClient.max_payload_len) {
// output_writer.end = 0;
// output_writer.print("{b64}", .{&buffered_reader.takeArray(SaprusClient.max_payload_len)});
// self.connection.send(io, output_writer.buffered()) catch |err| {
// self.err = err;
// return error.WriteFailed;
// };
// res += SaprusClient.max_payload_len;
// }
// // accumulate the remainder of buffered and the data slices before writing b64 to the output_writer
// var output_acc_buf: [SaprusClient.max_payload_len]u8 = undefined;
// var output_acc_w: Writer = .fixed(&output_acc_buf);
// // We can write the rest of buffered_reader to the output_writer because we know after
// // the previous loop the maximum length of the remaining data is SaprusClient.max_payload_len.
// output_writer.end = 0;
// res += output_acc_w.write(buffered_reader.buffered()) catch unreachable;
// for (data[0 .. data.len - 1]) |chunk| {
// if (chunk.len < SaprusClient.max_payload_len - output_acc_w.end) {
// res += output_acc_w.write(chunk) catch unreachable;
// continue;
// }
// var chunk_reader: std.Io.Reader = .fixed(chunk);
// while (chunk_reader.end - chunk_reader.seek > 0) {
// res += chunk_reader.stream(
// &output_acc_w,
// .limited(SaprusClient.max_payload_len - output_acc_w.end),
// ) catch unreachable;
// if (SaprusClient.max_payload_len - output_acc_w.end == 0) {
// output_writer.print("{b64}", .{output_acc_w.buffered()});
// output_acc_w.end = 0;
// self.connection.send(io, output_writer.buffered()) catch |err| {
// self.err = err;
// return error.WriteFailed;
// };
// output_writer.end = 0;
// }
// }
// }
// return res;
// }
// };
fn parseDest(in: ?[]const u8) [4]u8 { fn parseDest(in: ?[]const u8) [4]u8 {
if (in) |dest| { if (in) |dest| {
if (dest.len <= 4) { if (dest.len <= 4) {