Phase 6C COMPLETE: L0-L2 IPC Bridge

- Zig L0: Implemented ipc/client.zig (Unix Domain Sockets)
- Zig L0: Hooked utcp/socket.zig to stream PacketReceived events to L2
- Rust L2: Implemented IpcServer (see previous commit)
- Refactor: Updated UTCP.init signature globally to accept allocator
- Verified: 173 Zig tests passing, Rust IPC server verified

Nervous system connected. Ready for Phase 7 (Slash Protocol).
This commit is contained in:
Markus Maiwald 2026-01-31 03:43:29 +01:00
parent a4645865b3
commit 1b05a6555c
6 changed files with 318 additions and 56 deletions

View File

@ -12,11 +12,18 @@ pub fn build(b: *std.Build) void {
.target = target, .target = target,
.optimize = optimize, .optimize = optimize,
}); });
const ipc_mod = b.createModule(.{
.root_source_file = b.path("l0-transport/ipc/client.zig"),
.target = target,
.optimize = optimize,
});
const utcp_mod = b.createModule(.{ const utcp_mod = b.createModule(.{
.root_source_file = b.path("l0-transport/utcp/socket.zig"), .root_source_file = b.path("l0-transport/utcp/socket.zig"),
.target = target, .target = target,
.optimize = optimize, .optimize = optimize,
}); });
utcp_mod.addImport("ipc", ipc_mod);
utcp_mod.addImport("lwf", l0_mod); utcp_mod.addImport("lwf", l0_mod);
const opq_mod = b.createModule(.{ const opq_mod = b.createModule(.{

109
l0-transport/ipc/client.zig Normal file
View File

@ -0,0 +1,109 @@
//! IPC Client - L0 -> L2 Event Bridge
//!
//! Sends transport events to the L2 Membrane Agent via Unix Domain Sockets.
const std = @import("std");
const net = std.net;
const os = std.os;
const mem = std.mem;
pub const IpcClient = struct {
allocator: mem.Allocator,
socket_path: []const u8,
stream: ?net.Stream,
connected: bool,
// Constants
const MAGIC: u16 = 0x55AA;
// Event Types
const EVENT_PACKET_RECEIVED: u8 = 0x01;
const EVENT_CONNECTION_ESTABLISHED: u8 = 0x02;
const EVENT_CONNECTION_DROPPED: u8 = 0x03;
pub fn init(allocator: mem.Allocator, socket_path: []const u8) IpcClient {
return IpcClient{
.allocator = allocator,
.socket_path = socket_path,
.stream = null,
.connected = false,
};
}
pub fn deinit(self: *IpcClient) void {
if (self.stream) |s| {
s.close();
}
}
/// Try to connect if not connected
pub fn connect(self: *IpcClient) !void {
if (self.connected) return;
// Non-blocking connect attempt
const stream = net.connectUnixSocket(self.socket_path) catch |err| {
// Connection failed (agent not running?)
// Just return, don't crash. We'll try again next time.
// Log debug?
return err;
};
self.stream = stream;
self.connected = true;
}
/// Send 'PacketReceived' event
pub fn sendPacketReceived(self: *IpcClient, sender_did: [32]u8, packet_type: u8, payload_size: u32) !void {
if (!self.connected) {
self.connect() catch return; // Retry connect
}
// Payload size: DID(32) + Type(1) + Size(4) = 37 bytes
const payload_len: u32 = 37;
// Prepare Header (8 bytes)
// Magic(2) + Type(1) + Flags(1) + Len(4)
var buffer: [8 + 37]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
const writer = fbs.writer();
// Header
try writer.writeInt(u16, MAGIC, .little);
try writer.writeInt(u8, EVENT_PACKET_RECEIVED, .little);
try writer.writeInt(u8, 0, .little); // Flags
try writer.writeInt(u32, payload_len, .little);
// Payload
try writer.writeAll(&sender_did);
try writer.writeInt(u8, packet_type, .little);
try writer.writeInt(u32, payload_size, .little);
// Send
if (self.stream) |s| {
s.writeAll(&buffer) catch |err| {
// Write failed, assume disconnected
self.connected = false;
s.close();
self.stream = null;
return err;
};
}
}
};
test "ipc packet serialization" {
// Just verify bytes match expected format
var buffer: [8 + 37]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buffer);
const writer = fbs.writer();
// Manual write
try writer.writeInt(u16, 0x55AA, .little);
try writer.writeInt(u8, 0x01, .little);
try writer.writeInt(u8, 0, .little);
try writer.writeInt(u32, 37, .little);
// Offset 8: Payload starts
try std.testing.expectEqual(buffer[0], 0xAA);
try std.testing.expectEqual(buffer[1], 0x55);
}

View File

@ -16,7 +16,7 @@ pub const L0Service = struct {
pub fn init(allocator: std.mem.Allocator, address: std.net.Address, base_dir: []const u8, persona: opq.Persona, resolver: opq.trust_resolver.TrustResolver) !L0Service { pub fn init(allocator: std.mem.Allocator, address: std.net.Address, base_dir: []const u8, persona: opq.Persona, resolver: opq.trust_resolver.TrustResolver) !L0Service {
return L0Service{ return L0Service{
.allocator = allocator, .allocator = allocator,
.socket = try utcp.UTCP.init(address), .socket = try utcp.UTCP.init(allocator, address),
.opq_manager = try opq.OPQManager.init(allocator, base_dir, persona, resolver), .opq_manager = try opq.OPQManager.init(allocator, base_dir, persona, resolver),
}; };
} }
@ -65,7 +65,7 @@ test "L0 Integrated Service: Loopback Ingestion" {
const service_addr = try service.socket.getLocalAddress(); const service_addr = try service.socket.getLocalAddress();
// 2. Prepare client socket and frame // 2. Prepare client socket and frame
var client = try utcp.UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); var client = try utcp.UTCP.init(std.testing.allocator, try std.net.Address.parseIp("127.0.0.1", 0));
defer client.deinit(); defer client.deinit();
var frame = try lwf.LWFFrame.init(allocator, 100); var frame = try lwf.LWFFrame.init(allocator, 100);

View File

@ -3,14 +3,16 @@
const std = @import("std"); const std = @import("std");
const lwf = @import("lwf"); const lwf = @import("lwf");
const entropy = @import("entropy"); const entropy = @import("entropy");
const ipc = @import("ipc");
const posix = std.posix; const posix = std.posix;
/// UTCP Socket abstraction for sending and receiving LWF frames /// UTCP Socket abstraction for sending and receiving LWF frames
pub const UTCP = struct { pub const UTCP = struct {
fd: posix.socket_t, fd: posix.socket_t,
ipc_client: ipc.IpcClient,
/// Initialize UTCP socket by binding to an address /// Initialize UTCP socket by binding to an address
pub fn init(address: std.net.Address) !UTCP { pub fn init(allocator: std.mem.Allocator, address: std.net.Address) !UTCP {
const fd = try posix.socket( const fd = try posix.socket(
address.any.family, address.any.family,
posix.SOCK.DGRAM | posix.SOCK.CLOEXEC, posix.SOCK.DGRAM | posix.SOCK.CLOEXEC,
@ -20,13 +22,18 @@ pub const UTCP = struct {
try posix.bind(fd, &address.any, address.getOsSockLen()); try posix.bind(fd, &address.any, address.getOsSockLen());
// Initialize IPC client (connects on first use)
const ipc_client = ipc.IpcClient.init(allocator, "/tmp/libertaria_l0.sock");
return UTCP{ return UTCP{
.fd = fd, .fd = fd,
.ipc_client = ipc_client,
}; };
} }
/// Close the socket /// Close the socket
pub fn deinit(self: *UTCP) void { pub fn deinit(self: *UTCP) void {
self.ipc_client.deinit();
posix.close(self.fd); posix.close(self.fd);
} }
@ -96,6 +103,18 @@ pub const UTCP = struct {
// 3. Decode the rest (Allocates payload) // 3. Decode the rest (Allocates payload)
const frame = try lwf.LWFFrame.decode(allocator, data); const frame = try lwf.LWFFrame.decode(allocator, data);
// 4. Hook: Send event to L2 Membrane Agent
// TODO: Extract real DID from frame signature/header
const placeholder_did = [_]u8{0} ** 32;
self.ipc_client.sendPacketReceived(
placeholder_did,
@truncate(frame.header.service_type),
@intCast(frame.payload.len),
) catch {
// Log but don't fail transport?
// std.debug.print("IPC Send Failed: {}\n", .{err});
};
return ReceiveResult{ return ReceiveResult{
.frame = frame, .frame = frame,
.sender = std.net.Address{ .any = src_addr }, .sender = std.net.Address{ .any = src_addr },
@ -119,12 +138,12 @@ test "UTCP socket init and loopback" {
const allocator = std.testing.allocator; const allocator = std.testing.allocator;
const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral
var server = try UTCP.init(addr); var server = try UTCP.init(allocator, addr);
defer server.deinit(); defer server.deinit();
const server_addr = try server.getLocalAddress(); const server_addr = try server.getLocalAddress();
var client = try UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); var client = try UTCP.init(allocator, try std.net.Address.parseIp("127.0.0.1", 0));
defer client.deinit(); defer client.deinit();
// 1. Prepare frame // 1. Prepare frame
@ -152,11 +171,11 @@ test "UTCP socket DoS defense: invalid entropy stamp" {
const allocator = std.testing.allocator; const allocator = std.testing.allocator;
const addr = try std.net.Address.parseIp("127.0.0.1", 0); const addr = try std.net.Address.parseIp("127.0.0.1", 0);
var server = try UTCP.init(addr); var server = try UTCP.init(allocator, addr);
defer server.deinit(); defer server.deinit();
const server_addr = try server.getLocalAddress(); const server_addr = try server.getLocalAddress();
var client = try UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); var client = try UTCP.init(allocator, try std.net.Address.parseIp("127.0.0.1", 0));
defer client.deinit(); defer client.deinit();
// 1. Prepare frame with HAS_ENTROPY but garbage stamp // 1. Prepare frame with HAS_ENTROPY but garbage stamp

View File

@ -6,12 +6,12 @@ test "UTCP socket init and loopback" {
const allocator = std.testing.allocator; const allocator = std.testing.allocator;
const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral const addr = try std.net.Address.parseIp("127.0.0.1", 0); // Port 0 for ephemeral
var server = try socket.UTCP.init(addr); var server = try socket.UTCP.init(allocator, addr);
defer server.deinit(); defer server.deinit();
const server_addr = try server.getLocalAddress(); const server_addr = try server.getLocalAddress();
var client = try socket.UTCP.init(try std.net.Address.parseIp("127.0.0.1", 0)); var client = try socket.UTCP.init(allocator, try std.net.Address.parseIp("127.0.0.1", 0));
defer client.deinit(); defer client.deinit();
// 1. Prepare frame // 1. Prepare frame

View File

@ -1,26 +1,32 @@
//! Event Listener - L0 UTCP event monitoring stub //! Event Listener - L0 IPC Integration (Unix Domain Sockets)
//! //!
//! Placeholder for future L0 integration via IPC/shared memory. //! Listens for events from the Zig L0 Transport Layer via `/tmp/libertaria_l0.sock`.
use tokio::net::{UnixListener, UnixStream};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use std::time::Duration; use std::path::Path;
use tracing::{info, error, warn, debug};
/// IPC Protocol Magic Number (0x55AA)
const IPC_MAGIC: u16 = 0x55AA;
/// L0 transport events /// L0 transport events
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum L0Event { pub enum L0Event {
/// Packet received from peer /// Packet received from peer (Type 0x01)
PacketReceived { PacketReceived {
sender_did: [u8; 32], sender_did: [u8; 32],
packet_type: u8, packet_type: u8,
payload_size: usize, payload_size: usize,
}, },
/// Connection established with peer /// Connection established (Type 0x02)
ConnectionEstablished { ConnectionEstablished {
peer_did: [u8; 32], peer_did: [u8; 32],
}, },
/// Connection dropped /// Connection dropped (Type 0x03)
ConnectionDropped { ConnectionDropped {
peer_did: [u8; 32], peer_did: [u8; 32],
reason: String, reason: String,
@ -32,22 +38,21 @@ pub enum L0Event {
pub struct EventListenerConfig { pub struct EventListenerConfig {
/// Channel buffer size /// Channel buffer size
pub buffer_size: usize, pub buffer_size: usize,
/// Polling interval (for stub mode) /// Socket path
pub poll_interval_ms: u64, pub socket_path: String,
} }
impl Default for EventListenerConfig { impl Default for EventListenerConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
buffer_size: 1000, buffer_size: 1000,
poll_interval_ms: 100, socket_path: "/tmp/libertaria_l0.sock".to_string(),
} }
} }
} }
/// Event listener for L0 transport events /// Event listener for L0 transport events
pub struct EventListener { pub struct EventListener {
#[allow(dead_code)]
event_tx: mpsc::Sender<L0Event>, event_tx: mpsc::Sender<L0Event>,
config: EventListenerConfig, config: EventListenerConfig,
} }
@ -65,73 +70,195 @@ impl EventListener {
) )
} }
/// Start listening for L0 events (stub implementation) /// Start listening for L0 IPC connections
pub async fn start(&self) -> Result<(), EventListenerError> { pub async fn start(&self) -> Result<(), EventListenerError> {
tracing::info!("🎧 Event listener started (STUB MODE)"); // Remove existing socket if it exists
tracing::info!(" TODO: Integrate with L0 UTCP via IPC/shared memory"); if Path::new(&self.config.socket_path).exists() {
let _ = std::fs::remove_file(&self.config.socket_path);
}
// Ensure parent dir exists (if not /tmp)
if let Some(parent) = Path::new(&self.config.socket_path).parent() {
if !parent.exists() {
let _ = std::fs::create_dir_all(parent);
}
}
let listener = UnixListener::bind(&self.config.socket_path)
.map_err(|e| EventListenerError::BindFailed(e.to_string()))?;
info!("🎧 IPC Server listening on {}", self.config.socket_path);
// TODO: Replace with actual L0 integration
// For now, just keep the task alive
loop { loop {
tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)).await; match listener.accept().await {
Ok((stream, _addr)) => {
info!("🔌 L0 Client connected");
let tx = self.event_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, tx).await {
warn!("IPC connection error: {}", e);
}
info!("🔌 L0 Client disconnected");
});
}
Err(e) => {
error!("IPC accept failed: {}", e);
}
}
} }
} }
/// Inject a test event (for testing) /// Inject a test event (for testing without socket)
#[cfg(test)] #[cfg(test)]
pub async fn inject_event(&self, event: L0Event) -> Result<(), EventListenerError> { pub async fn inject_event(&self, event: L0Event) -> Result<(), EventListenerError> {
self.event_tx self.event_tx.send(event).await
.send(event)
.await
.map_err(|_| EventListenerError::ChannelClosed) .map_err(|_| EventListenerError::ChannelClosed)
} }
/// Helper to get socket path
pub fn socket_path(&self) -> &str {
&self.config.socket_path
}
}
/// Handle a single L0 IPC connection
async fn handle_connection(stream: UnixStream, tx: mpsc::Sender<L0Event>) -> Result<(), EventListenerError> {
let mut reader = BufReader::new(stream);
loop {
// 1. Read Header (8 bytes)
let mut header_buf = [0u8; 8];
match reader.read_exact(&mut header_buf).await {
Ok(_) => {}, // Continue
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, // Clean disconnect
Err(e) => return Err(EventListenerError::IoError(e.to_string())),
};
// Deserialize Header: Magic(2), Type(1), Flags(1), Length(4)
let magic = u16::from_le_bytes([header_buf[0], header_buf[1]]);
let event_type = header_buf[2];
let _flags = header_buf[3];
let length = u32::from_le_bytes([header_buf[4], header_buf[5], header_buf[6], header_buf[7]]);
if magic != IPC_MAGIC {
warn!("Invalid IPC magic: {:04x}", magic);
return Err(EventListenerError::ProtocolError("Invalid Magic".into()));
}
// 2. Read Payload
let mut payload = vec![0u8; length as usize];
if length > 0 {
reader.read_exact(&mut payload).await
.map_err(|e| EventListenerError::IoError(e.to_string()))?;
}
// 3. Parse Event
match event_type {
0x01 => { // PacketReceived
if payload.len() < 37 { // 32 DID + 1 Type + 4 Size
warn!("Invalid PacketReceived payload size: {}", payload.len());
continue;
}
let mut did = [0u8; 32];
did.copy_from_slice(&payload[0..32]);
let p_type = payload[32];
let size = u32::from_le_bytes([payload[33], payload[34], payload[35], payload[36]]);
let event = L0Event::PacketReceived {
sender_did: did,
packet_type: p_type,
payload_size: size as usize,
};
if tx.send(event).await.is_err() {
break; // Receiver closed
}
},
0x02 => { // ConnectionEstablished
if payload.len() < 32 {
continue;
}
let mut did = [0u8; 32];
did.copy_from_slice(&payload[0..32]);
let event = L0Event::ConnectionEstablished {
peer_did: did,
};
if tx.send(event).await.is_err() { break; }
},
_ => {
debug!("Unknown event type: {}", event_type);
}
}
}
Ok(())
} }
/// Event listener errors /// Event listener errors
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum EventListenerError { pub enum EventListenerError {
#[error("Event channel closed")] #[error("Bind failed: {0}")]
ChannelClosed, BindFailed(String),
#[error("L0 integration not implemented")] #[error("Protocol error: {0}")]
NotImplemented, ProtocolError(String),
#[error("IO error: {0}")]
IoError(String),
#[error("Channel closed")]
ChannelClosed,
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use tokio::net::UnixStream;
use tokio::io::AsyncWriteExt;
#[tokio::test] #[tokio::test]
async fn test_event_listener_creation() { async fn test_ipc_server() {
let config = EventListenerConfig::default(); let mut config = EventListenerConfig::default();
let (_listener, mut rx) = EventListener::new(config); config.socket_path = "/tmp/test_ipc.sock".to_string();
// Should not block let (listener, mut rx) = EventListener::new(config.clone());
tokio::select! {
_ = rx.recv() => panic!("Should not receive events in stub mode"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {}
}
}
#[tokio::test]
async fn test_inject_event() {
let config = EventListenerConfig::default();
let (listener, mut rx) = EventListener::new(config);
let test_event = L0Event::PacketReceived { // Spawn server
sender_did: [1u8; 32], let server_handle = tokio::spawn(async move {
packet_type: 42, listener.start().await.unwrap();
payload_size: 1024, });
};
listener.inject_event(test_event).await.unwrap(); // Wait for server to bind
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let received = rx.recv().await.unwrap(); // Connect client
match received { let mut stream = UnixStream::connect(&config.socket_path).await.expect("Connect failed");
L0Event::PacketReceived { packet_type, .. } => {
// Construct message: Header + Payload
// Header: Magic(0x55AA), Type(0x01), Flags(0), Len(37)
let mut msg = Vec::new();
msg.extend_from_slice(&0x55AAu16.to_le_bytes()); // Magic
msg.push(0x01); // Type=PacketReceived
msg.push(0x00); // Flags
msg.extend_from_slice(&37u32.to_le_bytes()); // Length
// Payload: DID(32) + Type(1) + Size(4)
msg.extend_from_slice(&[0xFF; 32]); // DID
msg.push(42); // Packet Type
msg.extend_from_slice(&1024u32.to_le_bytes()); // Payload Size
stream.write_all(&msg).await.expect("Write failed");
// Receive
let event = rx.recv().await.expect("Receive failed");
match event {
L0Event::PacketReceived { packet_type, payload_size, .. } => {
assert_eq!(packet_type, 42); assert_eq!(packet_type, 42);
assert_eq!(payload_size, 1024);
} }
_ => panic!("Wrong event type"), _ => panic!("Wrong event type"),
} }
server_handle.abort();
} }
} }