//! RFC-0020: OPQ (Offline Packet Queue) - Segmented WAL Storage
//!
//! This module implements the "Mechanism" of the OPQ:
//! a resilient, segmented Write-Ahead Log (WAL) for persisting LWF frames.
//!
//! Segmented Architecture:
//! - Data is split into fixed-size segments (e.g., 4MB).
//! - Only one "Active" segment is writable at a time.
//! - Completed segments are "Finalized" and become immutable.
//! - Pruning works by deleting entire segment files (extremely fast).

const std = @import("std");
const lwf = @import("lwf");

pub const SEGMENT_MAGIC: [4]u8 = "LOPQ".*;
pub const SEGMENT_VERSION: u8 = 1;
pub const DEFAULT_SEGMENT_SIZE: usize = 4 * 1024 * 1024; // 4MB

pub const SegmentHeader = struct {
    magic: [4]u8 = SEGMENT_MAGIC,
    version: u8 = SEGMENT_VERSION,
    reserved: [3]u8 = [_]u8{0} ** 3,
    segment_id: u64,
    segment_seq: u32,
    created_at: i64,

    /// Number of bytes the header occupies on disk. The header is written and
    /// read via std.mem.asBytes, so this is the in-memory size of the struct
    /// (28 bytes of fields plus any alignment padding).
    pub const SIZE = @sizeOf(SegmentHeader);
};
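
// Illustrative sketch (added for clarity, not part of the original RFC code):
// a default-initialized header carries the module's magic and version, and
// SegmentHeader.SIZE matches the byte count std.mem.asBytes() serializes,
// which is exactly what rotateSegment() writes and prune() reads back.
test "SegmentHeader: defaults and serialized size (illustrative)" {
    const header = SegmentHeader{
        .segment_id = 1,
        .segment_seq = 1,
        .created_at = 0,
    };
    try std.testing.expect(std.mem.eql(u8, &header.magic, &SEGMENT_MAGIC));
    try std.testing.expectEqual(SEGMENT_VERSION, header.version);
    try std.testing.expectEqual(@as(usize, SegmentHeader.SIZE), std.mem.asBytes(&header).len);
}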

pub const WALStore = struct {
    allocator: std.mem.Allocator,
    base_dir_path: []const u8,
    max_segment_size: usize,

    active_segment: ?std.fs.File = null,
    active_segment_id: u64 = 0,
    active_segment_seq: u32 = 0,
    current_offset: usize = 0,

    pub fn init(allocator: std.mem.Allocator, base_dir: []const u8, max_size: usize) !WALStore {
        // Ensure the base directory exists.
        std.fs.cwd().makePath(base_dir) catch |err| {
            if (err != error.PathAlreadyExists) return err;
        };

        return WALStore{
            .allocator = allocator,
            .base_dir_path = try allocator.dupe(u8, base_dir),
            .max_segment_size = max_size,
        };
    }

    pub fn deinit(self: *WALStore) void {
        if (self.active_segment) |file| {
            file.close();
        }
        self.allocator.free(self.base_dir_path);
    }

    /// Append a frame to the active segment, rotating to a new segment first
    /// if the frame would push the active segment past max_segment_size.
    pub fn appendFrame(self: *WALStore, frame: *const lwf.LWFFrame) !void {
        const frame_size = frame.header.payload_len + lwf.LWFHeader.SIZE + lwf.LWFTrailer.SIZE;

        // Check if we need a new segment.
        if (self.active_segment == null or self.current_offset + frame_size > self.max_segment_size) {
            try self.rotateSegment();
        }

        const file = self.active_segment.?;
        const encoded = try frame.encode(self.allocator);
        defer self.allocator.free(encoded);

        try file.writeAll(encoded);
        self.current_offset += encoded.len;
    }

    fn rotateSegment(self: *WALStore) !void {
        if (self.active_segment) |file| {
            file.close();
            self.active_segment = null;
        }

        self.active_segment_id = @as(u64, @intCast(std.time.timestamp()));
        self.active_segment_seq += 1;

        var name_buf: [64]u8 = undefined;
        const name = try std.fmt.bufPrint(&name_buf, "segment_{d}_{d}.opq", .{
            self.active_segment_id,
            self.active_segment_seq,
        });

        var dir = try std.fs.cwd().openDir(self.base_dir_path, .{});
        defer dir.close();

        const file = try dir.createFile(name, .{ .read = true });

        // Write the segment header.
        const header = SegmentHeader{
            .segment_id = self.active_segment_id,
            .segment_seq = self.active_segment_seq,
            .created_at = std.time.timestamp(),
        };
        const header_bytes = std.mem.asBytes(&header);
        try file.writeAll(header_bytes);

        self.active_segment = file;
        self.current_offset = SegmentHeader.SIZE;
    }

    /// Prune segments older than the given TTL (in seconds).
    /// The active segment is never pruned.
    pub fn prune(self: *WALStore, max_age_seconds: i64) !usize {
        var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true });
        defer dir.close();

        var iterator = dir.iterate();
        const now = std.time.timestamp();
        var pruned_count: usize = 0;

        while (try iterator.next()) |entry| {
            if (entry.kind != .file) continue;
            if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;

            // Read the segment header to get created_at rather than parsing
            // the timestamp/ID out of the segment_{id}_{seq}.opq file name.
            const file = try dir.openFile(entry.name, .{});
            defer file.close();

            var header: SegmentHeader = undefined;
            const bytes_read = try file.readAll(std.mem.asBytes(&header));
            if (bytes_read < SegmentHeader.SIZE) continue;

            if (now - header.created_at > max_age_seconds) {
                // Never delete the active segment.
                if (header.segment_id == self.active_segment_id and
                    header.segment_seq == self.active_segment_seq) continue;

                try dir.deleteFile(entry.name);
                pruned_count += 1;
            }
        }

        return pruned_count;
    }

    /// Calculate total disk usage of all .opq files in base_dir.
    pub fn getDiskUsage(self: *WALStore) !u64 {
        var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true });
        defer dir.close();

        var iterator = dir.iterate();
        var total_size: u64 = 0;

        while (try iterator.next()) |entry| {
            if (entry.kind != .file) continue;
            if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;

            const stat = try dir.statFile(entry.name);
            total_size += stat.size;
        }

        return total_size;
    }

    /// Prune oldest segments until total usage is below target_bytes.
    /// The active segment is never pruned.
    pub fn pruneToSize(self: *WALStore, target_bytes: u64) !usize {
        var dir = try std.fs.cwd().openDir(self.base_dir_path, .{ .iterate = true });
        defer dir.close();

        // 1. Collect all segment files with their creation timestamps.
        const SegmentFile = struct {
            name: [64]u8,
            len: usize,
            created_at: i64,
        };
        var segments = std.ArrayList(SegmentFile).empty;
        defer segments.deinit(self.allocator);

        var iterator = dir.iterate();
        var total_size: u64 = 0;

        while (try iterator.next()) |entry| {
            if (entry.kind != .file) continue;
            if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;

            const file = try dir.openFile(entry.name, .{});
            var header: SegmentHeader = undefined;
            const bytes_read = file.readAll(std.mem.asBytes(&header)) catch 0;
            file.close();
            if (bytes_read < SegmentHeader.SIZE) continue;

            const stat = try dir.statFile(entry.name);
            total_size += stat.size;

            var name_buf: [64]u8 = undefined;
            @memcpy(name_buf[0..entry.name.len], entry.name);

            try segments.append(self.allocator, .{
                .name = name_buf,
                .len = entry.name.len,
                .created_at = header.created_at,
            });
        }

        if (total_size <= target_bytes) return 0;

        // 2. Sort by created_at (oldest first).
        const sortFn = struct {
            fn lessThan(_: void, a: SegmentFile, b: SegmentFile) bool {
                return a.created_at < b.created_at;
            }
        }.lessThan;
        std.sort.pdq(SegmentFile, segments.items, {}, sortFn);

        // 3. Delete oldest segments until we are under the quota.
        var pruned_count: usize = 0;
        for (segments.items) |seg| {
            if (total_size <= target_bytes) break;

            const name = seg.name[0..seg.len];

            // Safety: never delete the active segment. Re-read the header so
            // we can compare its id/seq against the currently active segment.
            const file = try dir.openFile(name, .{});
            var header: SegmentHeader = undefined;
            _ = try file.readAll(std.mem.asBytes(&header));
            file.close();

            if (header.segment_id == self.active_segment_id and
                header.segment_seq == self.active_segment_seq) continue;

            const stat = try dir.statFile(name);
            try dir.deleteFile(name);
            total_size -= stat.size;
            pruned_count += 1;
        }

        return pruned_count;
    }
};
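
// Illustrative usage sketch (added for clarity, not part of the original RFC
// code): drives the full lifecycle of the store: init, append, TTL-based
// prune, and quota-based prune. The directory name, TTL, and quota values are
// arbitrary, and the frame is constructed the same way as in the tests below.
test "OPQ WAL Store: usage sketch (illustrative)" {
    const allocator = std.testing.allocator;
    const test_dir = "test_opq_usage_sketch";

    std.fs.cwd().deleteTree(test_dir) catch {};
    defer std.fs.cwd().deleteTree(test_dir) catch {};

    var wal = try WALStore.init(allocator, test_dir, DEFAULT_SEGMENT_SIZE);
    defer wal.deinit();

    var frame = try lwf.LWFFrame.init(allocator, 32);
    defer frame.deinit(allocator);
    @memset(frame.payload, 'X');
    frame.header.payload_len = 32;
    frame.updateChecksum();

    try wal.appendFrame(&frame);

    // Nothing is old enough for a 24h TTL, and a generous quota keeps the
    // single active segment in place.
    _ = try wal.prune(24 * 60 * 60);
    _ = try wal.pruneToSize(DEFAULT_SEGMENT_SIZE);
    try std.testing.expect((try wal.getDiskUsage()) > 0);
}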

test "OPQ WAL Store: Append and Rotate" {
    const allocator = std.testing.allocator;
    const test_dir = "test_opq_wal";

    // Clean up if a previous run failed.
    std.fs.cwd().deleteTree(test_dir) catch {};
    defer std.fs.cwd().deleteTree(test_dir) catch {};

    var wal = try WALStore.init(allocator, test_dir, 1024); // Small size to force rotation
    defer wal.deinit();

    // 1. Create a frame
    var frame = try lwf.LWFFrame.init(allocator, 100);
    defer frame.deinit(allocator);
    @memset(frame.payload, 'A');
    frame.header.payload_len = 100;
    frame.updateChecksum();

    // 2. Append multiple frames to trigger rotation.
    // Frame size is approx 100 + 72 + 36 = 208 bytes, so a 1024-byte segment
    // holds about 4 frames (plus the segment header).
    var i: usize = 0;
    while (i < 10) : (i += 1) {
        try wal.appendFrame(&frame);
    }

    // 3. Verify that more than one segment file was created.
    var dir = try std.fs.cwd().openDir(test_dir, .{ .iterate = true });
    defer dir.close();

    var iterator = dir.iterate();
    var file_count: usize = 0;
    while (try iterator.next()) |entry| {
        if (std.mem.endsWith(u8, entry.name, ".opq")) {
            file_count += 1;
        }
    }

    try std.testing.expect(file_count > 1);
}

test "OPQ WAL Store: Pruning" {
    const allocator = std.testing.allocator;
    const test_dir = "test_opq_pruning";

    std.fs.cwd().deleteTree(test_dir) catch {};
    defer std.fs.cwd().deleteTree(test_dir) catch {};

    var wal = try WALStore.init(allocator, test_dir, 1024 * 1024);
    defer wal.deinit();

    var frame = try lwf.LWFFrame.init(allocator, 10);
    defer frame.deinit(allocator);
    try wal.appendFrame(&frame);

    // We cannot easily age segments in a unit test without mocking time, so
    // instead we verify that prune() leaves the only (active) segment alone
    // even with a TTL of zero.
    const pruned = try wal.prune(0);
    try std.testing.expect(pruned == 0); // The active segment must not be pruned
}

test "OPQ WAL Store: Space-based Pruning" {
    const allocator = std.testing.allocator;
    const test_dir = "test_opq_quota";

    std.fs.cwd().deleteTree(test_dir) catch {};
    defer std.fs.cwd().deleteTree(test_dir) catch {};

    var wal = try WALStore.init(allocator, test_dir, 500); // Very small segments
    defer wal.deinit();

    var frame = try lwf.LWFFrame.init(allocator, 100);
    defer frame.deinit(allocator);
    @memset(frame.payload, 'B');
    frame.header.payload_len = 100;
    frame.updateChecksum();

    // Append 4 frames (should create multiple segments).
    var i: usize = 0;
    while (i < 4) : (i += 1) {
        try wal.appendFrame(&frame);
    }

    const usage_before = try wal.getDiskUsage();
    try std.testing.expect(usage_before > 0);

    // Prune to a small size (should keep only the active segment).
    const pruned = try wal.pruneToSize(100);
    try std.testing.expect(pruned > 0);

    const usage_after = try wal.getDiskUsage();
    try std.testing.expect(usage_after < usage_before);
}
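
// Illustrative sketch (added for clarity, not part of the original RFC code):
// after a single append there is exactly one segment on disk, and its header
// reads back through std.mem.asBytes with the expected magic and version. The
// frame is constructed the same way as in the "Pruning" test above.
test "OPQ WAL Store: segment header round-trip (illustrative)" {
    const allocator = std.testing.allocator;
    const test_dir = "test_opq_header";

    std.fs.cwd().deleteTree(test_dir) catch {};
    defer std.fs.cwd().deleteTree(test_dir) catch {};

    var wal = try WALStore.init(allocator, test_dir, 1024 * 1024);
    defer wal.deinit();

    var frame = try lwf.LWFFrame.init(allocator, 10);
    defer frame.deinit(allocator);
    try wal.appendFrame(&frame);

    var dir = try std.fs.cwd().openDir(test_dir, .{ .iterate = true });
    defer dir.close();

    var iterator = dir.iterate();
    var segment_count: usize = 0;
    while (try iterator.next()) |entry| {
        if (!std.mem.endsWith(u8, entry.name, ".opq")) continue;
        segment_count += 1;

        const file = try dir.openFile(entry.name, .{});
        defer file.close();

        var header: SegmentHeader = undefined;
        const bytes_read = try file.readAll(std.mem.asBytes(&header));
        try std.testing.expect(bytes_read >= SegmentHeader.SIZE);
        try std.testing.expect(std.mem.eql(u8, &header.magic, &SEGMENT_MAGIC));
        try std.testing.expectEqual(SEGMENT_VERSION, header.version);
    }
    try std.testing.expectEqual(@as(usize, 1), segment_count);
}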