# Rumpk Disruptor Ring Buffer # Lock-free O(1) inter-fiber communication import std/atomics type Cursor* = object value*: Atomic[int64] RingBuffer*[T; N: static[int]] = object ## Lock-free circular buffer for zero-copy I/O ## N must be power of 2 for masking data*: array[N, T] head*: Cursor ## Written by producer tail*: Cursor ## Read by consumer mask: int64 ## N - 1 for fast modulo proc init*[T; N: static[int]](ring: var RingBuffer[T, N]) = ## Initialize the ring buffer ring.head.value.store(0) ring.tail.value.store(0) ring.mask = N - 1 proc push*[T; N: static[int]](ring: var RingBuffer[T, N]; item: T): bool = ## Push item to ring. Returns false if full. let head = ring.head.value.load(moRelaxed) let tail = ring.tail.value.load(moAcquire) if head - tail >= N: return false # Full ring.data[head and ring.mask] = item ring.head.value.store(head + 1, moRelease) return true proc pop*[T; N: static[int]](ring: var RingBuffer[T, N]): tuple[ok: bool; item: T] = ## Pop item from ring. Returns (false, default) if empty. let tail = ring.tail.value.load(moRelaxed) let head = ring.head.value.load(moAcquire) if tail >= head: return (false, default(T)) # Empty let item = ring.data[tail and ring.mask] ring.tail.value.store(tail + 1, moRelease) return (true, item) proc len*[T; N: static[int]](ring: RingBuffer[T, N]): int = ## Current number of items in ring let head = ring.head.value.load(moRelaxed) let tail = ring.tail.value.load(moRelaxed) return int(head - tail) proc isFull*[T; N: static[int]](ring: RingBuffer[T, N]): bool = ring.len >= N proc isEmpty*[T; N: static[int]](ring: RingBuffer[T, N]): bool = ring.len == 0