59 lines
1.7 KiB
Nim
59 lines
1.7 KiB
Nim
# Rumpk Disruptor Ring Buffer
|
|
# Lock-free O(1) inter-fiber communication
|
|
|
|
import std/atomics
|
|
|
|
type
|
|
Cursor* = object
|
|
value*: Atomic[int64]
|
|
|
|
RingBuffer*[T; N: static[int]] = object
|
|
## Lock-free circular buffer for zero-copy I/O
|
|
## N must be power of 2 for masking
|
|
data*: array[N, T]
|
|
head*: Cursor ## Written by producer
|
|
tail*: Cursor ## Read by consumer
|
|
mask: int64 ## N - 1 for fast modulo
|
|
|
|
proc init*[T; N: static[int]](ring: var RingBuffer[T, N]) =
|
|
## Initialize the ring buffer
|
|
ring.head.value.store(0)
|
|
ring.tail.value.store(0)
|
|
ring.mask = N - 1
|
|
|
|
proc push*[T; N: static[int]](ring: var RingBuffer[T, N]; item: T): bool =
|
|
## Push item to ring. Returns false if full.
|
|
let head = ring.head.value.load(moRelaxed)
|
|
let tail = ring.tail.value.load(moAcquire)
|
|
|
|
if head - tail >= N:
|
|
return false # Full
|
|
|
|
ring.data[head and ring.mask] = item
|
|
ring.head.value.store(head + 1, moRelease)
|
|
return true
|
|
|
|
proc pop*[T; N: static[int]](ring: var RingBuffer[T, N]): tuple[ok: bool; item: T] =
|
|
## Pop item from ring. Returns (false, default) if empty.
|
|
let tail = ring.tail.value.load(moRelaxed)
|
|
let head = ring.head.value.load(moAcquire)
|
|
|
|
if tail >= head:
|
|
return (false, default(T)) # Empty
|
|
|
|
let item = ring.data[tail and ring.mask]
|
|
ring.tail.value.store(tail + 1, moRelease)
|
|
return (true, item)
|
|
|
|
proc len*[T; N: static[int]](ring: RingBuffer[T, N]): int =
|
|
## Current number of items in ring
|
|
let head = ring.head.value.load(moRelaxed)
|
|
let tail = ring.tail.value.load(moRelaxed)
|
|
return int(head - tail)
|
|
|
|
proc isFull*[T; N: static[int]](ring: RingBuffer[T, N]): bool =
|
|
ring.len >= N
|
|
|
|
proc isEmpty*[T; N: static[int]](ring: RingBuffer[T, N]): bool =
|
|
ring.len == 0
|