libertaria-stack/l1-identity/qvl/gossip.zig

276 lines
9.0 KiB
Zig

//! RFC-0120 Extension: Aleph-Style Gossip
//!
//! Probabilistic flooding for trust signal propagation.
//! Handles intermittent connectivity (Kenya Rule) via:
//! - Erasure-tolerant message references
//! - Coverage tracking for partition detection
//! - Entropy-stamped messages for replay protection
//!
//! Design: Each gossip message references k random prior messages,
//! creating a DAG structure resilient to packet loss.
const std = @import("std");
const types = @import("types.zig");
const NodeId = types.NodeId;
const RiskGraph = types.RiskGraph;
/// Gossip message with DAG references.
pub const GossipMessage = struct {
/// Unique message ID (hash of content + entropy)
id: u64,
/// Sender node index
sender: NodeId,
/// References to prior messages (DAG structure)
refs: []const u64,
/// Payload type
msg_type: MessageType,
/// Entropy stamp for temporal ordering (RFC-0100)
entropy_stamp: u64,
/// Message payload
payload: []const u8,
pub const MessageType = enum(u8) {
trust_vouch = 0, // New trust edge
trust_revoke = 1, // Edge removal
reputation_update = 2, // Score change
heartbeat = 3, // Liveness check
};
/// Compute message ID from content.
pub fn computeId(sender: NodeId, entropy_stamp: u64, payload: []const u8) u64 {
var hasher = std.hash.Wyhash.init(0);
hasher.update(std.mem.asBytes(&sender));
hasher.update(std.mem.asBytes(&entropy_stamp));
hasher.update(payload);
return hasher.final();
}
};
/// Gossip state tracker for a node.
pub const GossipState = struct {
allocator: std.mem.Allocator,
/// Recent message IDs (for reference sampling)
recent_messages: std.ArrayListUnmanaged(u64),
/// Seen message IDs (for deduplication)
seen_messages: std.AutoHashMapUnmanaged(u64, void),
/// Coverage tracking: which nodes have we heard from recently
heard_from: std.AutoHashMapUnmanaged(NodeId, u64), // node -> last_entropy_stamp
/// Configuration
config: Config,
pub const Config = struct {
/// Number of prior messages to reference
ref_k: usize = 3,
/// Maximum recent messages to track
max_recent: usize = 100,
/// Probability of forwarding (1.0 - drop_prob)
forward_prob: f64 = 0.7,
/// Coverage window (entropy stamp delta)
coverage_window: u64 = 60_000_000_000, // 60 seconds in nanoseconds
};
pub fn init(allocator: std.mem.Allocator, config: Config) GossipState {
return .{
.allocator = allocator,
.recent_messages = .{},
.seen_messages = .{},
.heard_from = .{},
.config = config,
};
}
pub fn deinit(self: *GossipState) void {
self.recent_messages.deinit(self.allocator);
self.seen_messages.deinit(self.allocator);
self.heard_from.deinit(self.allocator);
}
/// Check if message is new (not seen before).
pub fn isNewMessage(self: *GossipState, msg_id: u64) !bool {
if (self.seen_messages.get(msg_id)) |_| {
return false;
}
try self.seen_messages.put(self.allocator, msg_id, {});
return true;
}
/// Record a message as seen.
pub fn recordMessage(self: *GossipState, msg: *const GossipMessage) !void {
// Add to seen set
try self.seen_messages.put(self.allocator, msg.id, {});
// Add to recent messages (for future refs)
if (self.recent_messages.items.len >= self.config.max_recent) {
_ = self.recent_messages.orderedRemove(0);
}
try self.recent_messages.append(self.allocator, msg.id);
// Update heard_from
try self.heard_from.put(self.allocator, msg.sender, msg.entropy_stamp);
}
/// Sample k random references from recent messages.
pub fn sampleRefs(self: *GossipState, rand: std.Random, allocator: std.mem.Allocator) ![]u64 {
const k = @min(self.config.ref_k, self.recent_messages.items.len);
if (k == 0) return &[_]u64{};
var refs = try allocator.alloc(u64, k);
var selected = std.AutoHashMapUnmanaged(usize, void){};
defer selected.deinit(allocator);
var i: usize = 0;
while (i < k) {
const idx = rand.intRangeLessThan(usize, 0, self.recent_messages.items.len);
if (selected.get(idx)) |_| continue;
try selected.put(allocator, idx, {});
refs[i] = self.recent_messages.items[idx];
i += 1;
}
return refs;
}
/// Compute coverage ratio: fraction of nodes heard from recently.
pub fn computeCoverage(self: *const GossipState, total_nodes: usize, current_entropy: u64) f64 {
if (total_nodes == 0) return 1.0;
var active_count: usize = 0;
var it = self.heard_from.iterator();
while (it.next()) |entry| {
const last_stamp = entry.value_ptr.*;
if (current_entropy - last_stamp <= self.config.coverage_window) {
active_count += 1;
}
}
return @as(f64, @floatFromInt(active_count)) / @as(f64, @floatFromInt(total_nodes));
}
};
/// Gossip result after flooding.
pub const FloodResult = struct {
/// Number of neighbors that received the message
sent_count: usize,
/// Total neighbors attempted
total_neighbors: usize,
/// Coverage after flood
coverage: f64,
};
/// Probabilistic flood of a gossip message to neighbors.
pub fn floodMessage(
graph: *const RiskGraph,
sender: NodeId,
message: *const GossipMessage,
state: *GossipState,
rand: std.Random,
// In real impl, this would be a transport callback
// send_fn: *const fn(NodeId, []const u8) void,
) FloodResult {
var sent_count: usize = 0;
const neighbors = graph.neighbors(sender);
for (neighbors) |edge_idx| {
// In real impl: extract neighbor ID and send
_ = edge_idx; // Will be used when UTCP transport is integrated
// Probabilistic drop (simulates lossy network)
if (rand.float(f64) <= state.config.forward_prob) {
// In real impl: send_fn(neighbor, serialize(message));
// TODO: Integrate with UTCP transport layer
sent_count += 1;
}
}
const coverage = state.computeCoverage(graph.nodeCount(), message.entropy_stamp);
return FloodResult{
.sent_count = sent_count,
.total_neighbors = neighbors.len,
.coverage = coverage,
};
}
/// Create a new gossip message.
pub fn createMessage(
sender: NodeId,
msg_type: GossipMessage.MessageType,
payload: []const u8,
entropy_stamp: u64,
state: *GossipState,
rand: std.Random,
allocator: std.mem.Allocator,
) !GossipMessage {
const refs = try state.sampleRefs(rand, allocator);
const id = GossipMessage.computeId(sender, entropy_stamp, payload);
return GossipMessage{
.id = id,
.sender = sender,
.refs = refs,
.msg_type = msg_type,
.entropy_stamp = entropy_stamp,
.payload = payload,
};
}
// ============================================================================
// TESTS
// ============================================================================
test "GossipState: message deduplication" {
const allocator = std.testing.allocator;
var state = GossipState.init(allocator, .{});
defer state.deinit();
const msg_id: u64 = 12345;
// First time: new
try std.testing.expect(try state.isNewMessage(msg_id));
// Second time: duplicate
try std.testing.expect(!(try state.isNewMessage(msg_id)));
}
test "GossipState: coverage tracking" {
const allocator = std.testing.allocator;
var state = GossipState.init(allocator, .{ .coverage_window = 1000 });
defer state.deinit();
const now: u64 = 5000;
// Record messages from 2 nodes
try state.heard_from.put(allocator, 0, now - 500); // Recent
try state.heard_from.put(allocator, 1, now - 2000); // Stale
const coverage = state.computeCoverage(3, now);
// 1 out of 3 nodes heard from recently
try std.testing.expectApproxEqAbs(coverage, 0.333, 0.01);
}
test "GossipState: reference sampling" {
const allocator = std.testing.allocator;
var state = GossipState.init(allocator, .{ .ref_k = 2 });
defer state.deinit();
// Add some recent messages
try state.recent_messages.append(allocator, 100);
try state.recent_messages.append(allocator, 200);
try state.recent_messages.append(allocator, 300);
var prng = std.Random.DefaultPrng.init(42);
const refs = try state.sampleRefs(prng.random(), allocator);
defer allocator.free(refs);
try std.testing.expectEqual(refs.len, 2);
}
test "GossipMessage: ID computation" {
const id1 = GossipMessage.computeId(0, 1000, "hello");
const id2 = GossipMessage.computeId(0, 1000, "hello");
const id3 = GossipMessage.computeId(0, 1001, "hello");
try std.testing.expectEqual(id1, id2); // Same input, same ID
try std.testing.expect(id1 != id3); // Different entropy, different ID
}