// libertaria-stack/l1-identity/qvl/storage.zig

//! QVL Persistent Storage Layer
//!
//! libmdbx backend for RiskGraph with Kenya Rule compliance:
//! - Single-file embedded database
//! - Memory-mapped I/O (kernel-optimized)
//! - ACID transactions
//! - <10MB RAM footprint
const std = @import("std");
const types = @import("types.zig");
const NodeId = types.NodeId;
const RiskEdge = types.RiskEdge;
const RiskGraph = types.RiskGraph;
/// Tunables for the storage environment; every field has a Kenya-Rule-friendly default.
pub const DBConfig = struct {
    /// Upper bound on simultaneous read transactions.
    max_readers: u32 = 64,
    /// Upper bound on named sub-databases (tables) in the environment.
    max_dbs: u32 = 8,
    /// Memory-map size in bytes, which limits how large the database file may grow
    /// (10 MiB, the Kenya Rule budget).
    map_size: usize = 10 << 20,
    /// Intended page size in bytes (4 KiB).
    /// NOTE(review): nothing in this file reads this field, so it currently has
    /// no effect on the environment — confirm whether it should be applied.
    page_size: u32 = 4 << 10,
};
/// Persistent graph storage on an LMDB-style environment (`lmdb` bindings).
///
/// On-disk layout, one named sub-database per concern:
/// - `nodes`        NodeId (native bytes)               -> presence marker `{1}`
/// - `edges`        (from, to) as little-endian u32 pair -> encoded RiskEdge (41 bytes)
/// - `adjacency`    from (native bytes) -> to   (DUPSORT; outgoing index)
/// - `adjacency_in` to   (native bytes) -> from (DUPSORT; incoming index)
/// - `metadata`     opened, but not read or written by any method of this type
///
/// Each mutating call (`addNode`, `addEdge`) runs in its own write transaction.
pub const PersistentGraph = struct {
    env: *lmdb.MDB_env,
    dbi_nodes: lmdb.MDB_dbi,
    dbi_edges: lmdb.MDB_dbi,
    /// Outgoing adjacency index: from -> to (sorted duplicates).
    dbi_adjacency: lmdb.MDB_dbi,
    /// Incoming adjacency index: to -> from (sorted duplicates). Kept in its own
    /// table so outgoing and incoming neighbour queries never mix.
    dbi_adjacency_in: lmdb.MDB_dbi,
    dbi_metadata: lmdb.MDB_dbi,
    allocator: std.mem.Allocator,

    const Self = @This();

    /// Encoded edge key size: `from` and `to` as little-endian u32.
    const edge_key_len = 4 + 4;
    /// Encoded edge value size: from, to (u32) + risk, timestamp, nonce (u64)
    /// + level (u8) + expires_at (u64).
    const edge_value_len = 4 + 4 + 8 + 8 + 8 + 1 + 8;

    /// Open the database file at `path`, creating it if needed.
    ///
    /// `path` names a single data file (`MDB_NOSUBDIR`). The library keeps its
    /// lock file beside it as `<path>-lock`. Writes use `MDB_NOSYNC |
    /// MDB_NOMETASYNC`, which gives up durability of the most recent commits on
    /// a crash in exchange for speed. `allocator` is stored in the result and is
    /// also used for a temporary NUL-terminated copy of `path`. Release the
    /// handle with `close`.
    pub fn open(path: []const u8, config: DBConfig, allocator: std.mem.Allocator) !Self {
        // The C API expects a NUL-terminated string; `path` is a plain slice.
        const path_z = try allocator.dupeZ(u8, path);
        defer allocator.free(path_z);

        var env: *lmdb.MDB_env = undefined;
        try lmdb.mdb_env_create(&env);
        // Once created, the env handle must be closed on every later failure.
        errdefer lmdb.mdb_env_close(env);

        try lmdb.mdb_env_set_maxreaders(env, config.max_readers);
        try lmdb.mdb_env_set_maxdbs(env, config.max_dbs);
        try lmdb.mdb_env_set_mapsize(env, config.map_size);

        const flags = lmdb.MDB_NOSUBDIR | lmdb.MDB_NOSYNC | lmdb.MDB_NOMETASYNC;
        try lmdb.mdb_env_open(env, path_z.ptr, flags, 0o644);

        // Named sub-databases have to be opened inside a write transaction.
        var txn: *lmdb.MDB_txn = undefined;
        try lmdb.mdb_txn_begin(env, null, 0, &txn);
        const graph = blk: {
            // Abort only while the txn is still live: a failed commit already
            // frees it, so the commit below is deliberately outside this scope.
            errdefer lmdb.mdb_txn_abort(txn);
            break :blk Self{
                .env = env,
                // NOTE(review): MDB_INTEGERKEY expects native unsigned int/size_t
                // keys; this assumes NodeId is such an integer (it is written as
                // u32 in edge keys) — confirm against types.zig.
                .dbi_nodes = try lmdb.mdb_dbi_open(txn, "nodes", lmdb.MDB_CREATE | lmdb.MDB_INTEGERKEY),
                .dbi_edges = try lmdb.mdb_dbi_open(txn, "edges", lmdb.MDB_CREATE),
                .dbi_adjacency = try lmdb.mdb_dbi_open(txn, "adjacency", lmdb.MDB_CREATE | lmdb.MDB_DUPSORT),
                .dbi_adjacency_in = try lmdb.mdb_dbi_open(txn, "adjacency_in", lmdb.MDB_CREATE | lmdb.MDB_DUPSORT),
                .dbi_metadata = try lmdb.mdb_dbi_open(txn, "metadata", lmdb.MDB_CREATE),
                .allocator = allocator,
            };
        };
        try lmdb.mdb_txn_commit(txn);
        return graph;
    }

    /// Close the environment. The handle must not be used afterwards.
    pub fn close(self: *Self) void {
        lmdb.mdb_env_close(self.env);
        self.* = undefined;
    }

    /// Record `node` as present (value is a one-byte marker).
    pub fn addNode(self: *Self, node: NodeId) !void {
        var txn: *lmdb.MDB_txn = undefined;
        try lmdb.mdb_txn_begin(self.env, null, 0, &txn);
        {
            // Abort only while the txn is live; commit frees it even on failure.
            errdefer lmdb.mdb_txn_abort(txn);
            try putBytes(txn, self.dbi_nodes, std.mem.asBytes(&node), &[_]u8{1});
        }
        try lmdb.mdb_txn_commit(txn);
    }

    /// Store `edge` and update the outgoing and incoming adjacency indexes in a
    /// single transaction.
    pub fn addEdge(self: *Self, edge: RiskEdge) !void {
        const edge_key = encodeEdgeKey(edge.from, edge.to);
        const edge_val = encodeEdgeValue(edge);

        var txn: *lmdb.MDB_txn = undefined;
        try lmdb.mdb_txn_begin(self.env, null, 0, &txn);
        {
            // Abort only while the txn is live; commit frees it even on failure.
            errdefer lmdb.mdb_txn_abort(txn);
            try putBytes(txn, self.dbi_edges, &edge_key, &edge_val);
            // from -> to, used by getOutgoing.
            try putBytes(txn, self.dbi_adjacency, std.mem.asBytes(&edge.from), std.mem.asBytes(&edge.to));
            // to -> from, used by getIncoming.
            try putBytes(txn, self.dbi_adjacency_in, std.mem.asBytes(&edge.to), std.mem.asBytes(&edge.from));
        }
        try lmdb.mdb_txn_commit(txn);
    }

    /// Return the targets of all edges leaving `from`. The caller owns the
    /// returned slice and frees it with `allocator`.
    pub fn getOutgoing(self: *Self, from: NodeId, allocator: std.mem.Allocator) ![]NodeId {
        return self.collectNeighbors(self.dbi_adjacency, from, allocator);
    }

    /// Return the sources of all edges entering `to`. The caller owns the
    /// returned slice and frees it with `allocator`.
    pub fn getIncoming(self: *Self, to: NodeId, allocator: std.mem.Allocator) ![]NodeId {
        return self.collectNeighbors(self.dbi_adjacency_in, to, allocator);
    }

    /// Return the stored edge `from -> to`, or null if there is none.
    /// Returns `error.CorruptRecord` if the stored value is too short.
    pub fn getEdge(self: *Self, from: NodeId, to: NodeId) !?RiskEdge {
        var txn: *lmdb.MDB_txn = undefined;
        try lmdb.mdb_txn_begin(self.env, null, lmdb.MDB_RDONLY, &txn);
        // Read-only transactions are released by aborting them.
        defer lmdb.mdb_txn_abort(txn);

        const key = encodeEdgeKey(from, to);
        var mdb_key = toVal(&key);
        var mdb_val: lmdb.MDB_val = undefined;
        const rc = lmdb.mdb_get(txn, self.dbi_edges, &mdb_key, &mdb_val);
        if (rc == lmdb.MDB_NOTFOUND) return null;
        if (rc != 0) return error.MDBError;
        // Decode while the txn is open: mv_data points into the database map.
        return try decodeEdgeValue(valBytes(mdb_val));
    }

    /// Build an in-memory RiskGraph from every stored edge.
    /// NOTE(review): only the `edges` table is read, so nodes added with
    /// `addNode` that have no edges are not restored — confirm this is intended.
    pub fn toRiskGraph(self: *Self, allocator: std.mem.Allocator) !RiskGraph {
        var graph = RiskGraph.init(allocator);
        errdefer graph.deinit();

        var txn: *lmdb.MDB_txn = undefined;
        try lmdb.mdb_txn_begin(self.env, null, lmdb.MDB_RDONLY, &txn);
        defer lmdb.mdb_txn_abort(txn);

        var cursor: *lmdb.MDB_cursor = undefined;
        try lmdb.mdb_cursor_open(txn, self.dbi_edges, &cursor);
        defer lmdb.mdb_cursor_close(cursor);

        var mdb_key: lmdb.MDB_val = undefined;
        var mdb_val: lmdb.MDB_val = undefined;
        while (true) {
            // MDB_NEXT on a fresh cursor starts at the first entry.
            const rc = lmdb.mdb_cursor_get(cursor, &mdb_key, &mdb_val, lmdb.MDB_NEXT);
            if (rc == lmdb.MDB_NOTFOUND) break; // clean end of table
            if (rc != 0) return error.MDBError; // real failure, don't truncate silently
            try graph.addEdge(try decodeEdgeValue(valBytes(mdb_val)));
        }
        return graph;
    }

    // Internal: collect every duplicate value stored under `node` in the
    // DUPSORT index `dbi`. The caller owns the returned slice.
    fn collectNeighbors(self: *Self, dbi: lmdb.MDB_dbi, node: NodeId, allocator: std.mem.Allocator) ![]NodeId {
        var txn: *lmdb.MDB_txn = undefined;
        try lmdb.mdb_txn_begin(self.env, null, lmdb.MDB_RDONLY, &txn);
        defer lmdb.mdb_txn_abort(txn);

        var cursor: *lmdb.MDB_cursor = undefined;
        try lmdb.mdb_cursor_open(txn, dbi, &cursor);
        defer lmdb.mdb_cursor_close(cursor);

        var result: std.ArrayListUnmanaged(NodeId) = .{};
        errdefer result.deinit(allocator);

        var mdb_key = toVal(std.mem.asBytes(&node));
        var mdb_val: lmdb.MDB_val = undefined;

        // MDB_SET_KEY lands on the first duplicate; MDB_NEXT_DUP walks the rest.
        var rc = lmdb.mdb_cursor_get(cursor, &mdb_key, &mdb_val, lmdb.MDB_SET_KEY);
        while (rc == 0) : (rc = lmdb.mdb_cursor_get(cursor, &mdb_key, &mdb_val, lmdb.MDB_NEXT_DUP)) {
            try result.append(allocator, try decodeNodeId(mdb_val));
        }
        // NOTFOUND means "no key" or "no more duplicates"; anything else is an error.
        if (rc != lmdb.MDB_NOTFOUND) return error.MDBError;
        return try result.toOwnedSlice(allocator);
    }

    // Internal: store `key -> value` in `dbi` within `txn` (put flags 0).
    fn putBytes(txn: *lmdb.MDB_txn, dbi: lmdb.MDB_dbi, key: []const u8, value: []const u8) !void {
        var mdb_key = toVal(key);
        var mdb_val = toVal(value);
        try lmdb.mdb_put(txn, dbi, &mdb_key, &mdb_val, 0);
    }

    // Internal: describe `bytes` as an MDB_val without copying. The C API's
    // pointers are non-const, but the key/data bytes passed here are only read,
    // so const is cast away at this boundary alone.
    fn toVal(bytes: []const u8) lmdb.MDB_val {
        return .{ .mv_size = bytes.len, .mv_data = @constCast(bytes.ptr) };
    }

    // Internal: view an MDB_val returned by the library as a byte slice. It is
    // valid only while the transaction that produced it is open.
    fn valBytes(val: lmdb.MDB_val) []const u8 {
        if (val.mv_size == 0) return &[_]u8{};
        const ptr: [*]const u8 = @ptrCast(val.mv_data);
        return ptr[0..val.mv_size];
    }

    // Internal: decode one adjacency value (NodeId in native byte order).
    fn decodeNodeId(val: lmdb.MDB_val) !NodeId {
        const bytes = valBytes(val);
        if (bytes.len != @sizeOf(NodeId)) return error.CorruptRecord;
        return std.mem.bytesToValue(NodeId, bytes[0..@sizeOf(NodeId)]);
    }

    // Internal: encode the edge key (from, to) as little-endian u32s. The array
    // is returned by value, so no pointer to a dead stack frame escapes.
    fn encodeEdgeKey(from: NodeId, to: NodeId) [edge_key_len]u8 {
        var buf: [edge_key_len]u8 = undefined;
        std.mem.writeInt(u32, buf[0..4], from, .little);
        std.mem.writeInt(u32, buf[4..8], to, .little);
        return buf;
    }

    // Internal: encode RiskEdge into its fixed 41-byte little-endian form
    // (returned by value).
    fn encodeEdgeValue(edge: RiskEdge) [edge_value_len]u8 {
        var buf: [edge_value_len]u8 = undefined;
        std.mem.writeInt(u32, buf[0..4], edge.from, .little);
        std.mem.writeInt(u32, buf[4..8], edge.to, .little);
        std.mem.writeInt(u64, buf[8..16], @bitCast(edge.risk), .little);
        std.mem.writeInt(u64, buf[16..24], edge.timestamp, .little);
        std.mem.writeInt(u64, buf[24..32], edge.nonce, .little);
        buf[32] = edge.level;
        std.mem.writeInt(u64, buf[33..41], edge.expires_at, .little);
        return buf;
    }

    // Internal: decode bytes produced by `encodeEdgeValue`. Shorter input is
    // rejected as corrupt; trailing bytes beyond 41 are ignored.
    fn decodeEdgeValue(bytes: []const u8) !RiskEdge {
        if (bytes.len < edge_value_len) return error.CorruptRecord;
        return RiskEdge{
            .from = std.mem.readInt(u32, bytes[0..4], .little),
            .to = std.mem.readInt(u32, bytes[4..8], .little),
            .risk = @as(f64, @bitCast(std.mem.readInt(u64, bytes[8..16], .little))),
            .timestamp = std.mem.readInt(u64, bytes[16..24], .little),
            .nonce = std.mem.readInt(u64, bytes[24..32], .little),
            .level = bytes[32],
            .expires_at = std.mem.readInt(u64, bytes[33..41], .little),
        };
    }
};
// ============================================================================
// TESTS
// ============================================================================
test "PersistentGraph: basic operations" {
    const allocator = std.testing.allocator;
    // Temporary single-file database plus the library's "-lock" sidecar.
    const path = "/tmp/test_qvl_db";
    const lock_path = path ++ "-lock";
    // Start clean in case an earlier run was interrupted before cleanup.
    std.fs.deleteFileAbsolute(path) catch {};
    std.fs.deleteFileAbsolute(lock_path) catch {};
    defer std.fs.deleteFileAbsolute(path) catch {};
    defer std.fs.deleteFileAbsolute(lock_path) catch {};

    var graph = try PersistentGraph.open(path, .{}, allocator);
    defer graph.close();

    // Add nodes
    try graph.addNode(0);
    try graph.addNode(1);
    try graph.addNode(2);

    // Add edges 0 -> 1 and 1 -> 2
    const ts = 1234567890;
    try graph.addEdge(.{
        .from = 0,
        .to = 1,
        .risk = -0.3,
        .timestamp = ts,
        .nonce = 0,
        .level = 3,
        .expires_at = ts + 86400,
    });
    try graph.addEdge(.{
        .from = 1,
        .to = 2,
        .risk = -0.3,
        .timestamp = ts,
        .nonce = 1,
        .level = 3,
        .expires_at = ts + 86400,
    });

    // Outgoing neighbours of 0: exactly {1}.
    const out0 = try graph.getOutgoing(0, allocator);
    defer allocator.free(out0);
    try std.testing.expectEqualSlices(NodeId, &[_]NodeId{1}, out0);

    // Node 1 has one outgoing edge (1 -> 2) and one incoming edge (0 -> 1);
    // the two directions must not leak into each other.
    const out1 = try graph.getOutgoing(1, allocator);
    defer allocator.free(out1);
    try std.testing.expectEqualSlices(NodeId, &[_]NodeId{2}, out1);

    const in1 = try graph.getIncoming(1, allocator);
    defer allocator.free(in1);
    try std.testing.expectEqualSlices(NodeId, &[_]NodeId{0}, in1);

    // Retrieve edge (expected value first, actual second).
    const edge = (try graph.getEdge(0, 1)) orelse return error.TestUnexpectedResult;
    try std.testing.expectEqual(@as(NodeId, 0), edge.from);
    try std.testing.expectEqual(@as(NodeId, 1), edge.to);
    try std.testing.expectApproxEqAbs(@as(f64, -0.3), edge.risk, 0.001);

    // Edges are directed: the reverse pair was never stored.
    try std.testing.expect((try graph.getEdge(1, 0)) == null);
}
test "PersistentGraph: Kenya Rule compliance" {
    const allocator = std.testing.allocator;
    const path = "/tmp/test_kenya_db";
    const lock_path = path ++ "-lock";
    // 10MB Kenya Rule budget, used both for the map and for the size check.
    const map_size: usize = 10 * 1024 * 1024;

    // Start clean in case an earlier run was interrupted before cleanup.
    std.fs.deleteFileAbsolute(path) catch {};
    std.fs.deleteFileAbsolute(lock_path) catch {};
    defer std.fs.deleteFileAbsolute(path) catch {};
    defer std.fs.deleteFileAbsolute(lock_path) catch {};

    var graph = try PersistentGraph.open(path, .{ .map_size = map_size }, allocator);
    defer graph.close();

    // Add 1000 nodes
    for (0..1000) |i| {
        try graph.addNode(@intCast(i));
    }

    // The data file can never exceed the map. It may equal it on platforms
    // that preallocate the whole map, so the bound is inclusive.
    const stat = try std.fs.cwd().statFile(path);
    try std.testing.expect(stat.size <= map_size);
}