# Content-Addressable Storage (CAS) System
|
|
#
|
|
# This module implements the foundational content-addressable storage system
|
|
# that provides automatic deduplication and cryptographic verification using
|
|
# xxHash (xxh3_128) for maximum performance with BLAKE2b legacy fallback.
|
|
#
|
|
# Hash Algorithm: xxHash xxh3_128 (40-50 GiB/s, 128-bit collision-safe)
|
|
# Legacy Support: BLAKE2b-512 (for backward compatibility)
|
|
|
|
import std/[os, tables, sets, strutils, json, sequtils, hashes, options, times, algorithm]
|
|
{.warning[Deprecated]:off.}
|
|
import std/threadpool # For parallel operations
|
|
{.warning[Deprecated]:on.}
|
|
import xxhash # Modern high-performance hashing (2-3x faster than BLAKE2b)
|
|
import nimcrypto/blake2 # Legacy fallback
|
|
import ./types
|
|
import ./protection # Read-only protection manager
|
|
|
|
# Result type for error handling - using std/options for now
|
|
# Result types are imported from ./types
|
|
|
|
|
|
type
  FormatType* = enum
    ## Package formats whose references are tracked separately in the CAS.
    NPK, NIP, NEXTER

  CasManager* = ref object
    ## Central handle for the content-addressable store: storage roots,
    ## in-memory caches, reference counts, pin sets and protection state.
    userCasPath*: string    ## ~/.nip/cas/ (legacy, will migrate to ~/.local/share/nexus/cas/)
    systemCasPath*: string  ## /var/lib/nip/cas/ (legacy, will migrate to /var/lib/nexus/cas/)
    rootPath*: string       ## ~/.local/share/nexus/cas (unified storage root)
    chunksPath*: string     ## cas/chunks/
    indexPath*: string      ## cas/cas-index.kdl
    refsPath*: string       ## cas/refs/
    auditLog*: string       ## cas/audit.log
    compression*: bool      ## Enable zstd compression (NOTE: zstd not wired up yet)
    compressionLevel*: int  ## zstd compression level (1-22, default 19)
    pinSets*: Table[string, HashSet[string]]  ## Named pin sets for GC protection
    refCounts*: Table[string, int]            ## Reference counts for deduplication
    # Task 12.2: In-memory cache for frequently accessed entries
    cache*: Table[string, seq[byte]]  ## Hash -> cached data
    cacheMaxSize*: int64              ## Maximum cache size in bytes
    cacheCurrentSize*: int64          ## Current cache size in bytes
    cacheHits*: int                   ## Cache hit counter
    cacheMisses*: int                 ## Cache miss counter
    # Task 35: Performance Optimizations
    indexCache*: Option[CasIndex]            ## Cached CAS index
    manifestCache*: Table[string, JsonNode]  ## Cache for parsed chunk manifests
    existenceCache*: Table[string, string]   ## Cache for object existence (Hash -> Path)

    # Reference tracking per format
    formatRefs*: Table[FormatType, Table[string, HashSet[string]]]  ## Format -> Package -> Hashes
    # Read-only protection
    protectionManager*: ProtectionManager  ## Manages read-only protection

  CasIndex* = object
    ## Summary record persisted in cas-index.kdl.
    version*: string
    totalChunks*: int64
    totalSize*: int64
    lastUpdated*: DateTime

  CasObject* = object
    ## Metadata returned for a stored object.
    hash*: string            ## Multihash (xxh3-* by default, blake2b-* for legacy)
    size*: int64             ## Original uncompressed size
    compressedSize*: int64   ## Compressed size (if compression enabled)
    compressed*: bool        ## Whether object is stored compressed
    chunks*: seq[ChunkRef]   ## For large files with chunk-level deduplication
    refCount*: int           ## Reference count for this object

  ChunkRef* = object
    ## One chunk of a large (chunked) file.
    hash*: string   ## xxHash xxh3_128 hash of chunk (blake2b-* for legacy)
    offset*: int64  ## Offset in original file
    size*: int      ## Size of chunk

  CasStats* = object
    ## Aggregate store statistics.
    objectCount*: int         ## Total number of objects
    totalSize*: int64         ## Total uncompressed size
    compressedSize*: int64    ## Total compressed size on disk
    compressionRatio*: float  ## Compression ratio
    hitRate*: float           ## Cache hit rate (deprecated - use cacheHitRate)
    pinSets*: int             ## Number of pin sets
    # Task 12.2: Cache statistics
    cacheSize*: int64         ## Current cache size in bytes
    cacheMaxSize*: int64      ## Maximum cache size in bytes
    cacheHits*: int           ## Number of cache hits
    cacheMisses*: int         ## Number of cache misses
    cacheHitRate*: float      ## Cache hit rate (0.0 to 1.0)

  DeduplicationStats* = object
    ## Cross-format deduplication accounting.
    totalLogicalSize*: int64    ## Sum of sizes of all referenced objects (as if separate)
    totalPhysicalSize*: int64   ## Actual size on disk (deduplicated)
    deduplicationRatio*: float  ## logical / physical
    sharedChunks*: int          ## Number of chunks shared by >1 package/format
    savings*: int64             ## Bytes saved (logical - physical)
    formatOverlap*: Table[string, int]  ## Overlap count between formats (e.g. "NPK-NIP" -> 5)

  CasError* = object of NimPakError
    ## CAS-specific error; `objectHash` identifies the object (or path) involved.
    objectHash*: string
|
|
const
  CHUNK_SIZE = 64 * 1024        ## 64 KiB chunks for large-file deduplication
  SHARD_BITS = 2                ## Use first 2 hex chars for sharding (256 shards)
  MAX_INLINE_SIZE = 1024 * 1024 ## 1 MiB - files larger than this use chunking
|
|
|
proc calculateXxh3*(data: string): string =
  ## Hash `data` with xxHash XXH3-128 and return the multihash form:
  ## "xxh3-" followed by 32 lowercase hex chars (low 64 bits first).
  ## This is the DEFAULT and RECOMMENDED CAS hash (40-50 GiB/s).
  let h = XXH3_128bits(data)
  let loHex = h.lo.toHex(16).toLowerAscii()
  let hiHex = h.hi.toHex(16).toLowerAscii()
  result = "xxh3-" & loHex & hiHex
|
|
|
|
proc calculateXxh3*(data: seq[byte]): string =
  ## Hash a byte sequence with XXH3-128 (multihash "xxh3-..." form).
  ## The bytes are copied into a string because the string overload
  ## performs the actual hashing.
  var buf = newString(data.len)
  for i, b in data:
    buf[i] = char(b)
  result = calculateXxh3(buf)
|
|
|
|
proc calculateBlake2b*(data: seq[byte]): string =
  ## LEGACY FALLBACK: BLAKE2b-512 digest in multihash form ("blake2b-" & hex).
  ## New objects should use calculateXxh3 instead.
  result = "blake2b-" & $blake2_512.digest(data)
|
|
|
|
proc calculateBlake3*(data: seq[byte]): string =
  ## FUTURE ENHANCEMENT - real BLAKE3 requires a C FFI wrapper.
  ## Until then this emits a BLAKE2b-512 digest under the "blake3-" prefix;
  ## these values are NOT interoperable with genuine BLAKE3 output.
  result = "blake3-" & $blake2_512.digest(data)
|
|
|
|
proc calculateFileHash*(filePath: string): Result[string, CasError] =
  ## Hash a file's contents with XXH3-128 (the default CAS algorithm,
  ## 2-3x faster than BLAKE2b). Returns FileReadError when the file
  ## cannot be read; `objectHash` then carries the offending path.
  var content: string
  try:
    content = readFile(filePath)
  except IOError as e:
    return err[string, CasError](CasError(
      code: FileReadError,
      msg: "Failed to read file for hashing: " & e.msg,
      objectHash: filePath
    ))
  return ok[string, CasError](calculateXxh3(content))
|
|
|
|
proc calculateBlake2b*(filePath: string): Result[string, CasError] =
  ## LEGACY FALLBACK: BLAKE2b-512 hash of a file's contents.
  ## New code should call calculateFileHash (xxHash) instead.
  try:
    let raw = readFile(filePath)
    let bytes = raw.toOpenArrayByte(0, raw.len - 1).toSeq()
    return ok[string, CasError](calculateBlake2b(bytes))
  except IOError as e:
    return err[string, CasError](CasError(
      code: FileReadError,
      msg: "Failed to read file for hashing: " & e.msg,
      objectHash: filePath
    ))
|
|
|
|
proc getShardPath(casPath: string, hash: string): string =
  ## Sharded directory for `hash`: <casPath>/objects/<first SHARD_BITS hex chars>,
  ## skipping any multihash algorithm prefix ("xxh3-", "blake2b-", ...).
  let sep = hash.find('-')
  let start = if sep < 0: 0 else: sep + 1
  result = casPath / "objects" / hash[start ..< start + SHARD_BITS]

proc getObjectPath*(casPath: string, hash: string): string =
  ## Full on-disk path of the object file for `hash`.
  ## The filename is the hash with its algorithm prefix stripped.
  let sep = hash.find('-')
  let name = if sep < 0: hash else: hash[(sep + 1) .. ^1]
  result = getShardPath(casPath, hash) / name
|
|
|
|
proc ensureDirectories(cas: CasManager) =
  ## Ensure the unified ~/.local/share/nexus/cas layout exists, and seed the
  ## index and audit-log files on first run.
  for dir in [cas.rootPath,
              cas.chunksPath,
              cas.refsPath,
              cas.refsPath / "npks",
              cas.refsPath / "nips",
              cas.refsPath / "nexters",
              cas.rootPath / "temp"]:
    createDir(dir)

  # Seed an empty KDL index if none exists yet.
  if not fileExists(cas.indexPath):
    writeFile(cas.indexPath, """cas_index {
  version "1.0"
  total_chunks 0
  total_size 0
}
""")

  # Seed an empty audit log if none exists yet.
  if not fileExists(cas.auditLog):
    writeFile(cas.auditLog, "")
|
|
|
|
proc initCasManager*(userHome: string = "", systemPath: string = ""): CasManager =
  ## Initialize a CAS manager rooted at ~/.local/share/nexus/cas (unified
  ## storage). `userHome` / `systemPath` override the defaults (mainly for
  ## tests). Legacy ~/.nip/cas and /var/lib/nip/cas paths are kept for
  ## read-side backward compatibility. Creates the directory layout on disk.
  let homeDir = if userHome.len > 0: userHome else: getHomeDir()
  let rootPath = homeDir / ".local" / "share" / "nexus" / "cas"

  # Legacy paths for backward compatibility
  let userPath = homeDir / ".nip" / "cas"
  let sysPath = if systemPath.len > 0: systemPath else: "/var/lib/nip/cas"

  result = CasManager(
    rootPath: rootPath,
    chunksPath: rootPath / "chunks",
    indexPath: rootPath / "cas-index.kdl",
    refsPath: rootPath / "refs",
    auditLog: rootPath / "audit.log",
    userCasPath: userPath,   # Legacy
    systemCasPath: sysPath,  # Legacy
    compression: true,
    compressionLevel: 19,  # Maximum compression (zstd -19); zstd not wired up yet
    pinSets: initTable[string, HashSet[string]](),
    refCounts: initTable[string, int](),
    # Task 12.2: Initialize cache with 100MB default size
    cache: initTable[string, seq[byte]](),
    cacheMaxSize: 100 * 1024 * 1024,  # 100MB
    cacheCurrentSize: 0,
    cacheHits: 0,
    cacheMisses: 0,
    # Task 35: Initialize caches
    indexCache: none(CasIndex),
    manifestCache: initTable[string, JsonNode](),
    existenceCache: initTable[string, string](),
    # Initialize format-specific reference tracking
    formatRefs: initTable[FormatType, Table[string, HashSet[string]]](),
    # Initialize protection manager
    protectionManager: newProtectionManager(rootPath)
  )

  # One (initially empty) package table per format.
  for formatType in FormatType:
    result.formatRefs[formatType] = initTable[string, HashSet[string]]()

  result.ensureDirectories()
|
|
|
|
# Task 12.2: Cache management functions
|
|
|
|
proc addToCache*(cas: CasManager, hash: string, data: seq[byte]) =
  ## Insert `data` under `hash`, evicting entries until it fits.
  ## Payloads larger than the whole cache are simply not cached.
  ## Eviction picks the first key Table iteration yields — effectively
  ## arbitrary, not true LRU.
  let incoming = data.len.int64
  if incoming > cas.cacheMaxSize:
    return

  # Evict until there is room (or nothing is left to evict).
  while cas.cache.len > 0 and
        cas.cacheCurrentSize + incoming > cas.cacheMaxSize:
    var victim = ""
    for key in cas.cache.keys:
      victim = key
      break
    if victim.len > 0:
      cas.cacheCurrentSize -= cas.cache[victim].len.int64
      cas.cache.del(victim)

  cas.cache[hash] = data
  cas.cacheCurrentSize += incoming
|
|
|
|
proc getFromCache*(cas: CasManager, hash: string): Option[seq[byte]] =
  ## Look `hash` up in the in-memory cache, updating hit/miss counters.
  if hash notin cas.cache:
    cas.cacheMisses.inc
    return none(seq[byte])
  cas.cacheHits.inc
  return some(cas.cache[hash])
|
|
|
|
proc clearCache*(cas: CasManager) =
  ## Drop every cached entry and reset the size accounting.
  ## Hit/miss counters are intentionally left untouched.
  cas.cache.clear()
  cas.cacheCurrentSize = 0
|
|
|
|
proc getCacheHitRate*(cas: CasManager): float =
  ## Fraction of cache lookups served from memory (0.0 .. 1.0);
  ## 0.0 before any lookup has happened.
  let lookups = cas.cacheHits + cas.cacheMisses
  result =
    if lookups == 0: 0.0
    else: cas.cacheHits.float / lookups.float
|
|
|
|
# Compression functions removed for now - will be added back when zstd dependency is available
|
|
|
|
proc getRefCountPath(cas: CasManager, hash: string): string =
  ## Path of the on-disk reference-count file for `hash`.
  ## The algorithm prefix ("xxh3-", "blake2b-", ...) is stripped when present.
  ## FIX: the original used `hash.split('-')[1]`, which raised IndexDefect for
  ## an unprefixed hash; getShardPath/getObjectPath both tolerate that case,
  ## so this helper now does too (prefixed hashes behave exactly as before).
  let sep = hash.find('-')
  let hashPart = if sep >= 0: hash[(sep + 1) .. ^1] else: hash
  result = cas.refsPath / hashPart & ".refcount"
|
|
|
|
proc getFormatRefPath(cas: CasManager, formatType: FormatType, packageName: string): string =
  ## Per-package reference file: refs/{npks|nips|nexters}/{package}.refs.
  let subdir =
    case formatType
    of NPK: "npks"
    of NIP: "nips"
    of NEXTER: "nexters"
  result = cas.refsPath / subdir / packageName & ".refs"
|
|
|
|
proc loadRefCount(cas: CasManager, hash: string): int =
  ## Load the persisted reference count for `hash` from its .refcount file.
  ## Returns 0 when the file is missing, unreadable, or holds a non-integer.
  let refPath = cas.getRefCountPath(hash)
  if not fileExists(refPath):
    return 0
  try:
    result = parseInt(readFile(refPath).strip())
  except ValueError, IOError:
    # FIX: was a bare `except:`, which also swallows Defects (programmer
    # bugs). Narrowed to the two failures that can actually occur here:
    # corrupt file content (ValueError from parseInt) and read errors.
    result = 0
|
|
|
|
proc saveRefCount(cas: CasManager, hash: string, count: int): VoidResult[CasError] =
  ## Persist `count` to the hash's on-disk .refcount file.
  try:
    writeFile(cas.getRefCountPath(hash), $count)
    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to save reference count: " & e.msg,
      objectHash: hash
    ))
|
|
|
|
proc addReference*(cas: CasManager, hash: string, formatType: FormatType, packageName: string): VoidResult[CasError] =
  ## Record that `packageName` (of format `formatType`) references chunk `hash`.
  ## Persists the package's reference file (refs/{type}/{package}.refs), bumps
  ## the global on-disk reference count, and appends an audit-log entry.
  ##
  ## NOTE(review): the global count is incremented on every call, even when
  ## `hash` was already in the package's set — repeated adds from the same
  ## package inflate the count. Confirm whether callers rely on per-call
  ## increments before changing that.
  try:
    # Ensure the nested format -> package -> hashes tables exist.
    if not cas.formatRefs.hasKey(formatType):
      cas.formatRefs[formatType] = initTable[string, HashSet[string]]()
    if not cas.formatRefs[formatType].hasKey(packageName):
      cas.formatRefs[formatType][packageName] = initHashSet[string]()

    cas.formatRefs[formatType][packageName].incl(hash)

    # Lazily hydrate the in-memory count from disk, then increment.
    if not cas.refCounts.hasKey(hash):
      cas.refCounts[hash] = cas.loadRefCount(hash)
    cas.refCounts[hash].inc

    # Persist the package's full hash list.
    let refPath = cas.getFormatRefPath(formatType, packageName)
    createDir(refPath.parentDir())
    let hashes = toSeq(cas.formatRefs[formatType][packageName])
    writeFile(refPath, hashes.join("\n"))

    # Persist the global reference count.
    let saveResult = cas.saveRefCount(hash, cas.refCounts[hash])
    if not saveResult.isOk:
      return saveResult

    # Append to the audit log. FIX: the original leaked the file handle when
    # write() raised (the IOError was caught, but close() was never reached).
    let timestamp = now().format("yyyy-MM-dd'T'HH:mm:ss'Z'")
    let logEntry = "[" & timestamp & "] ADD_REF hash=" & hash & " format=" & $formatType & " package=" & packageName & "\n"
    let logFile = open(cas.auditLog, fmAppend)
    try:
      logFile.write(logEntry)
    finally:
      logFile.close()

    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to add reference: " & e.msg,
      objectHash: hash
    ))
|
|
|
|
proc removeReference*(cas: CasManager, hash: string, formatType: FormatType, packageName: string): VoidResult[CasError] =
  ## Remove `packageName`'s reference to chunk `hash` for the given format.
  ## Rewrites (or deletes, when empty) the package's reference file, decrements
  ## the global reference count (clamped at zero), and appends an audit entry.
  try:
    # Drop the hash from the package's set, if tracked at all.
    if cas.formatRefs.hasKey(formatType) and cas.formatRefs[formatType].hasKey(packageName):
      cas.formatRefs[formatType][packageName].excl(hash)

      # Rewrite or remove the on-disk reference file.
      let refPath = cas.getFormatRefPath(formatType, packageName)
      if cas.formatRefs[formatType][packageName].len > 0:
        let hashes = toSeq(cas.formatRefs[formatType][packageName])
        writeFile(refPath, hashes.join("\n"))
      else:
        # Last reference from this package is gone: clean up entirely.
        if fileExists(refPath):
          removeFile(refPath)
        cas.formatRefs[formatType].del(packageName)

    # Lazily hydrate the in-memory count, then decrement (never below zero).
    if not cas.refCounts.hasKey(hash):
      cas.refCounts[hash] = cas.loadRefCount(hash)

    if cas.refCounts[hash] > 0:
      cas.refCounts[hash].dec
      let saveResult = cas.saveRefCount(hash, cas.refCounts[hash])
      if not saveResult.isOk:
        return saveResult

    # Append to the audit log. FIX: the original leaked the file handle when
    # write() raised (the IOError was caught, but close() was never reached).
    let timestamp = now().format("yyyy-MM-dd'T'HH:mm:ss'Z'")
    let logEntry = "[" & timestamp & "] REMOVE_REF hash=" & hash & " format=" & $formatType & " package=" & packageName & "\n"
    let logFile = open(cas.auditLog, fmAppend)
    try:
      logFile.write(logEntry)
    finally:
      logFile.close()

    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to remove reference: " & e.msg,
      objectHash: hash
    ))
|
|
|
|
proc incrementRefCount*(cas: CasManager, hash: string): VoidResult[CasError] =
  ## Bump the global reference count for `hash` and persist it
  ## (legacy API — format-aware callers should use addReference).
  if hash notin cas.refCounts:
    cas.refCounts[hash] = cas.loadRefCount(hash)
  cas.refCounts[hash].inc
  result = cas.saveRefCount(hash, cas.refCounts[hash])
|
|
|
|
proc decrementRefCount*(cas: CasManager, hash: string): VoidResult[CasError] =
  ## Lower the global reference count for `hash`, clamping at zero.
  ## When the count is already zero nothing is written and ok is returned.
  if hash notin cas.refCounts:
    cas.refCounts[hash] = cas.loadRefCount(hash)
  if cas.refCounts[hash] <= 0:
    return ok(CasError)
  cas.refCounts[hash].dec
  result = cas.saveRefCount(hash, cas.refCounts[hash])
|
|
|
|
proc getRefCount*(cas: CasManager, hash: string): int =
  ## Current reference count for `hash`, loaded from disk on first access
  ## and memoized in `refCounts` thereafter.
  if hash notin cas.refCounts:
    cas.refCounts[hash] = cas.loadRefCount(hash)
  result = cas.refCounts[hash]
|
|
|
|
proc objectExists*(cas: CasManager, hash: string): bool =
  ## True when the object is present in any CAS root (unified, legacy user,
  ## legacy system). The resolved path is memoized in `existenceCache`;
  ## stale entries (file deleted since caching) are evicted on lookup.
  if hash in cas.existenceCache:
    if fileExists(cas.existenceCache[hash]):
      return true
    cas.existenceCache.del(hash)  # cached path no longer valid

  for root in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
    let candidate = getObjectPath(root, hash)
    if fileExists(candidate):
      cas.existenceCache[hash] = candidate
      return true

  result = false
|
|
|
|
proc storeObject*(cas: CasManager, data: openArray[byte]): Result[CasObject, CasError] =
  ## Store `data` in the CAS and return its metadata.
  ## Content-addressed by XXH3-128; storing identical bytes a second time is
  ## deduplicated — only the reference count is bumped. New objects always
  ## land in the unified root (legacy roots are read-only for lookup).
  try:
    cas.ensureDirectories()

    # Use xxHash xxh3_128 as DEFAULT (40-50 GiB/s, 2-3x faster than BLAKE2b)
    let hash = calculateXxh3(@data)
    let originalSize = data.len.int64

    # Deduplication path: object already stored somewhere.
    if cas.objectExists(hash):
      # Increment reference count for existing object
      let incResult = cas.incrementRefCount(hash)
      if not incResult.isOk:
        return err[CasObject, CasError](incResult.errValue)

      # Locate the object in any CAS root (unified first, then legacy).
      var objPath = ""
      for basePath in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
        let path = getObjectPath(basePath, hash)
        if fileExists(path):
          objPath = path
          break

      if objPath.len > 0:
        let info = getFileInfo(objPath)
        let refCount = cas.getRefCount(hash)
        let obj = CasObject(
          hash: hash,
          size: originalSize,
          compressedSize: info.size,
          # NOTE(review): reports the manager's compression setting, not
          # whether THIS object was actually stored compressed — revisit
          # when zstd support lands.
          compressed: cas.compression,
          refCount: refCount
        )
        return ok[CasObject, CasError](obj)
      else:
        # Defensive: objectExists said yes but no path resolved (e.g. a
        # concurrent deletion between the two checks).
        return err[CasObject, CasError](CasError(
          code: ObjectNotFound,
          msg: "Object exists but path not found: " & hash
        ))

    # New-object path: write into the unified root.
    let objPath = getObjectPath(cas.rootPath, hash)
    # Temp file named by the bare hash part. NOTE(review): two concurrent
    # writers of the same content share this temp name — confirm the caller
    # guarantees single-writer usage.
    let tempPath = cas.rootPath / "temp" / hash.split('-')[1]

    # Ensure the shard directory exists, creating it on-demand
    createDir(objPath.parentDir)

    var finalData: seq[byte]
    var compressed = false
    var compressedSize = originalSize

    # TODO: Implement zstd compression.
    # When zstd is available the logic will be:
    #   if cas.compression: finalData = zstd.compress(data, cas.compressionLevel)
    #   else: finalData = @data
    finalData = @data
    compressed = false
    compressedSize = originalSize

    # Write to a temp file first, then atomic rename into place.
    writeFile(tempPath, finalData)
    moveFile(tempPath, objPath)

    # First reference for the brand-new object.
    let incResult = cas.incrementRefCount(hash)
    if not incResult.isOk:
      return err[CasObject, CasError](incResult.errValue)

    let obj = CasObject(
      hash: hash,
      size: originalSize,
      compressedSize: compressedSize,
      compressed: compressed,
      refCount: 1
    )
    return ok[CasObject, CasError](obj)

  except IOError as e:
    return err[CasObject, CasError](CasError(
      code: FileWriteError,
      msg: "Failed to store object: " & e.msg
    ))
  except Exception as e:
    # Broad catch is deliberate here: storage must degrade to an error Result
    # rather than let unexpected exceptions escape to callers.
    return err[CasObject, CasError](CasError(
      code: UnknownError,
      msg: "Unexpected error storing object: " & e.msg
    ))
|
|
|
|
proc createSymlink*(cas: CasManager, hash: string, targetPath: string): VoidResult[CasError] =
  ## Create a symlink at `targetPath` pointing at the stored object for
  ## `hash`, giving transparent file-system access to CAS content.
  ## Any existing file or symlink at `targetPath` is replaced.
  ## Returns ObjectNotFound when the hash is absent from every CAS root.
  try:
    # Resolve the object in any CAS root (unified first, then legacy).
    var objPath = ""
    for basePath in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
      let path = getObjectPath(basePath, hash)
      if fileExists(path):
        objPath = path
        break

    if objPath.len == 0:
      return VoidResult[CasError](isOk: false, errValue: CasError(
        code: ObjectNotFound,
        msg: "Object not found for symlink creation: " & hash,
        objectHash: hash
      ))

    # Make sure the target's parent directory exists.
    let parentDir = targetPath.parentDir()
    if not dirExists(parentDir):
      createDir(parentDir)

    # Replace any existing file/symlink at the target location.
    if fileExists(targetPath) or symlinkExists(targetPath):
      removeFile(targetPath)

    # os.createSymlink(src, dest) — the two-arg overload, not a recursive call.
    createSymlink(objPath, targetPath)

    return ok(CasError)

  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to create symlink: " & e.msg,
      objectHash: hash
    ))
  except OSError as e:
    # createSymlink/removeFile raise OSError (e.g. permissions, missing dir).
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to create symlink: " & e.msg,
      objectHash: hash
    ))
|
|
|
|
proc retrieveObject*(cas: CasManager, hash: string): Result[seq[byte], CasError] =
  ## Fetch an object's raw bytes by hash.
  ## Checks the in-memory cache first (updating hit/miss counters), then the
  ## unified root and both legacy roots. Disk hits are added to the cache.
  try:
    # Task 12.2: cache lookup first.
    let cachedData = cas.getFromCache(hash)
    if cachedData.isSome:
      return ok[seq[byte], CasError](cachedData.get())

    # Try unified root first, then legacy paths
    for basePath in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
      let objPath = getObjectPath(basePath, hash)
      if fileExists(objPath):
        let data = readFile(objPath)
        let byteData = data.toOpenArrayByte(0, data.len - 1).toSeq()

        # Task 12.2: cache for future lookups.
        cas.addToCache(hash, byteData)

        # TODO: zstd decompression — needs the CasObject metadata to know
        # whether this object is compressed; currently assumed uncompressed.
        return ok[seq[byte], CasError](byteData)

    return err[seq[byte], CasError](CasError(
      code: ObjectNotFound,
      msg: "Object not found: " & hash,
      objectHash: hash
    ))

  except IOError as e:
    return err[seq[byte], CasError](CasError(
      code: FileReadError,
      msg: "Failed to read object: " & e.msg,
      objectHash: hash
    ))
|
|
|
|
proc storeFile*(cas: CasManager, filePath: string): Result[CasObject, CasError] =
  ## Store a file in the CAS.
  ## Files up to MAX_INLINE_SIZE (1 MiB) are stored as a single object.
  ## Larger files are split into CHUNK_SIZE (64 KiB) pieces — each stored
  ## (and deduplicated) separately — plus a JSON manifest object listing
  ## every chunk's hash/offset/size and the total size.
  ## NOTE(review): for chunked files the returned CasObject carries the
  ## MANIFEST's hash and size, not the original file's — confirm callers
  ## expect that before relying on `size`.
  try:
    let fileInfo = getFileInfo(filePath)

    if fileInfo.size <= MAX_INLINE_SIZE:
      # Small file: one inline object.
      let data = readFile(filePath)
      return cas.storeObject(data.toOpenArrayByte(0, data.len - 1))
    else:
      # Large file: chunk-level deduplication.
      var chunks: seq[ChunkRef] = @[]
      let file = open(filePath, fmRead)
      defer: file.close()

      var offset = 0'i64
      var buffer = newSeq[byte](CHUNK_SIZE)

      while true:
        let bytesRead = file.readBytes(buffer, 0, CHUNK_SIZE)
        if bytesRead == 0:
          break

        # Store this chunk as its own CAS object.
        let chunkData = buffer[0..<bytesRead]
        let chunkResult = cas.storeObject(chunkData)
        if chunkResult.isErr:
          return err[CasObject, CasError](chunkResult.getError())

        chunks.add(ChunkRef(
          hash: chunkResult.get().hash,
          offset: offset,
          size: bytesRead
        ))

        offset += bytesRead.int64

      # Store the chunk manifest (JSON) as a regular CAS object.
      let manifest = %*{
        "chunks": chunks.mapIt(%*{
          "hash": it.hash,
          "offset": it.offset,
          "size": it.size
        }),
        "total_size": fileInfo.size
      }

      let manifestData = $manifest
      let manifestResult = cas.storeObject(manifestData.toOpenArrayByte(0, manifestData.len - 1))
      if manifestResult.isErr:
        return err[CasObject, CasError](manifestResult.getError())

      var obj = manifestResult.get()
      obj.chunks = chunks
      return ok[CasObject, CasError](obj)

  except IOError as e:
    return err[CasObject, CasError](CasError(
      code: FileReadError,
      msg: "Failed to read file: " & e.msg
    ))
|
|
|
|
proc computeHash*(cas: CasManager, data: seq[byte]): string =
  ## Hash `data` with the store's default algorithm
  ## (XXH3-128, multihash "xxh3-..." form).
  result = calculateXxh3(data)
|
|
|
|
proc newCasManager*(userPath: string, systemPath: string): CasManager =
  ## Create a CAS manager with explicit base paths (unified storage under
  ## `userPath`/.local/share/nexus/cas). Unlike initCasManager, this variant
  ## also enables read-only protection on the store after setup.
  let rootPath = userPath / ".local" / "share" / "nexus" / "cas"

  result = CasManager(
    rootPath: rootPath,
    chunksPath: rootPath / "chunks",
    indexPath: rootPath / "cas-index.kdl",
    refsPath: rootPath / "refs",
    auditLog: rootPath / "audit.log",
    userCasPath: userPath / ".nip" / "cas",  # Legacy
    systemCasPath: systemPath,               # Legacy
    compression: true,
    compressionLevel: 19,  # Maximum compression; zstd not wired up yet
    pinSets: initTable[string, HashSet[string]](),
    refCounts: initTable[string, int](),
    # Task 12.2: Initialize cache
    cache: initTable[string, seq[byte]](),
    cacheMaxSize: 100 * 1024 * 1024,  # 100MB
    cacheCurrentSize: 0,
    cacheHits: 0,
    cacheMisses: 0,
    # Task 35: Initialize caches
    indexCache: none(CasIndex),
    manifestCache: initTable[string, JsonNode](),
    existenceCache: initTable[string, string](),
    formatRefs: initTable[FormatType, Table[string, HashSet[string]]](),
    protectionManager: newProtectionManager(rootPath)
  )

  # One (initially empty) package table per format.
  for formatType in FormatType:
    result.formatRefs[formatType] = initTable[string, HashSet[string]]()

  result.ensureDirectories()
  # Best-effort: failure to set read-only protection is not fatal here.
  discard result.protectionManager.ensureReadOnly()
|
|
|
|
proc retrieveFile*(cas: CasManager, hash: string, outputPath: string): VoidResult[CasError] =
  ## Retrieve an object and write it to `outputPath`, transparently
  ## reassembling chunked files: when the stored bytes parse as a JSON object
  ## with a "chunks" key, each chunk is fetched and concatenated in manifest
  ## order; otherwise the bytes are written verbatim.
  ## NOTE(review): a plain stored file whose content happens to be JSON with a
  ## "chunks" key would be misinterpreted as a manifest — confirm this cannot
  ## occur for the data stored through storeFile's callers.
  let objResult = cas.retrieveObject(hash)
  if objResult.isErr:
    return VoidResult[CasError](isOk: false, errValue: objResult.getError())

  let data = objResult.get()

  # Task 35: consult (and populate) the parsed-manifest cache.
  var manifest: JsonNode = nil
  if cas.manifestCache.hasKey(hash):
    manifest = cas.manifestCache[hash]
  else:
    # Attempt to parse as JSON; non-JSON objects are regular data.
    try:
      # cast[string](seq[byte]) reinterprets the bytes as a string without
      # copying — relies on seq/string layout compatibility.
      manifest = parseJson(cast[string](data))
      # Cache the successfully parsed manifest for future retrievals.
      cas.manifestCache[hash] = manifest
    except JsonParsingError:
      manifest = nil

  if manifest != nil and manifest.kind == JObject and manifest.hasKey("chunks"):
    # Manifest: reconstruct the file from its chunks, in listed order.
    try:
      let outputFile = open(outputPath, fmWrite)
      defer: outputFile.close()

      for chunkNode in manifest["chunks"]:
        let chunkHash = chunkNode["hash"].getStr()
        let chunkResult = cas.retrieveObject(chunkHash)
        if chunkResult.isErr:
          # A missing chunk makes the file unreconstructable.
          return VoidResult[CasError](isOk: false, errValue: chunkResult.getError())

        let chunkData = chunkResult.get()
        if chunkData.len > 0:
          discard outputFile.writeBuffer(unsafeAddr chunkData[0], chunkData.len)
      return ok(CasError)
    except IOError as e:
      return VoidResult[CasError](isOk: false, errValue: CasError(
        code: FileWriteError,
        msg: "Failed to write reconstructed file: " & e.msg
      ))
  else:
    # Not a manifest: write the raw object bytes directly.
    try:
      writeFile(outputPath, cast[string](data))
      return ok(CasError)
    except IOError as e:
      return VoidResult[CasError](isOk: false, errValue: CasError(
        code: FileWriteError,
        msg: "Failed to write object file: " & e.msg
      ))
|
|
|
|
proc pinObject*(cas: CasManager, hash: string, pinName: string): VoidResult[CasError] =
  ## Add `hash` to the named pin set and persist the whole set under
  ## rootPath/pins/<pinName>. Pinned objects are protected from GC.
  try:
    if pinName notin cas.pinSets:
      cas.pinSets[pinName] = initHashSet[string]()
    cas.pinSets[pinName].incl(hash)

    # Persist: one hash per line.
    let pinsDir = cas.rootPath / "pins"
    createDir(pinsDir)
    writeFile(pinsDir / pinName, cas.pinSets[pinName].toSeq().join("\n"))

    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to persist pin set: " & e.msg
    ))
|
|
|
|
proc unpinObject*(cas: CasManager, hash: string, pinName: string): VoidResult[CasError] =
  ## Remove `hash` from the named pin set. When the set becomes empty it is
  ## deleted from memory and disk; otherwise the file is rewritten.
  try:
    if pinName in cas.pinSets:
      cas.pinSets[pinName].excl(hash)

      let pinPath = cas.rootPath / "pins" / pinName
      if cas.pinSets[pinName].len == 0:
        # Last pin gone: drop the set entirely.
        if fileExists(pinPath):
          removeFile(pinPath)
        cas.pinSets.del(pinName)
      else:
        writeFile(pinPath, cas.pinSets[pinName].toSeq().join("\n"))

    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to update pin set: " & e.msg
    ))
|
|
|
|
proc hasFormatPackage*(cas: CasManager, formatType: FormatType, packageName: string): bool =
  ## Whether any references are tracked for this package/format pair.
  result = cas.formatRefs.hasKey(formatType) and
           cas.formatRefs[formatType].hasKey(packageName)
|
|
|
|
proc getFormatPackageHashes*(cas: CasManager, formatType: FormatType, packageName: string): HashSet[string] =
  ## Hashes referenced by `packageName` within `formatType`;
  ## an empty set when the format or package is unknown.
  if cas.formatRefs.hasKey(formatType) and
     cas.formatRefs[formatType].hasKey(packageName):
    result = cas.formatRefs[formatType][packageName]
  else:
    result = initHashSet[string]()
|
|
|
|
proc loadFormatReferences*(cas: CasManager): VoidResult[CasError] =
  ## Load all per-package reference files (refs/{npks|nips|nexters}/*.refs)
  ## from disk into `formatRefs`. Missing format directories are skipped;
  ## each .refs file holds one hash per line.
  try:
    for formatType in FormatType:
      # Directory name per format (mirrors getFormatRefPath).
      let formatDir = case formatType
        of NPK: "npks"
        of NIP: "nips"
        of NEXTER: "nexters"

      let refsDir = cas.refsPath / formatDir
      if not dirExists(refsDir):
        continue

      for refFile in walkDir(refsDir):
        if refFile.kind == pcFile and refFile.path.endsWith(".refs"):
          # Package name = filename minus the ".refs" suffix.
          let packageName = extractFilename(refFile.path).replace(".refs", "")
          let content = readFile(refFile.path).strip()

          if content.len > 0:
            let hashes = content.split('\n')
            if not cas.formatRefs.hasKey(formatType):
              cas.formatRefs[formatType] = initTable[string, HashSet[string]]()
            # NOTE: replaces (does not merge with) any in-memory set.
            cas.formatRefs[formatType][packageName] = hashes.toHashSet()

    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileReadError,
      msg: "Failed to load format references: " & e.msg
    ))
|
|
|
|
proc loadPinSets*(cas: CasManager): VoidResult[CasError] =
  ## Load pin sets (one file per set, one hash per line) from disk.
  ## FIX: `pinObject`/`unpinObject` persist pin sets under rootPath/"pins",
  ## but this proc only read the legacy userCasPath/"pins" directory, so
  ## saved pins were never reloaded. Both locations are now scanned: the
  ## unified root first, with legacy entries merged in afterwards.
  try:
    for pinsDir in [cas.rootPath / "pins", cas.userCasPath / "pins"]:
      if not dirExists(pinsDir):
        continue

      for pinFile in walkDir(pinsDir):
        if pinFile.kind == pcFile:
          let pinName = extractFilename(pinFile.path)
          let content = readFile(pinFile.path).strip()

          if content.len > 0:
            let hashes = content.split('\n')
            if cas.pinSets.hasKey(pinName):
              # Same set name in both locations: merge rather than overwrite.
              for h in hashes:
                cas.pinSets[pinName].incl(h)
            else:
              cas.pinSets[pinName] = hashes.toHashSet()

    return ok(CasError)

  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileReadError,
      msg: "Failed to load pin sets: " & e.msg
    ))
|
|
|
|
proc getAllPinnedObjects(cas: CasManager): HashSet[string] =
  ## Union of every pin set — the full collection of hashes that must
  ## survive garbage collection.
  result = initHashSet[string]()
  for hashes in cas.pinSets.values:
    result.incl(hashes)
|
|
|
|
# Task 12.4: Parallel garbage collection worker
proc gcWorker(basePath: string, shardDir: string, protectedObjects: HashSet[string],
              cas: ptr CasManager): int {.thread.} =
  ## Worker thread for parallel garbage collection.
  ##
  ## Scans one shard directory and removes every object file that is
  ## neither protected (pinned / externally reachable) nor referenced
  ## (refcount <= 0). Returns the number of objects removed.
  var removedCount = 0
  try:
    for objFile in walkDir(shardDir):
      if objFile.kind == pcFile:
        let filename = extractFilename(objFile.path)
        # Reconstruct the multihash prefix the same way listObjects does:
        # BLAKE2b-512 digests are 128 hex chars, xxh3_128 digests are 32.
        # (Previously this always assumed "blake2b-", which made refcount
        # lookups miss for xxh3 objects and risked deleting live data.)
        let hash =
          if filename.len >= 100: "blake2b-" & filename
          else: "xxh3-" & filename

        # Pinned or externally reachable objects are never collected.
        if hash in protectedObjects:
          continue

        # Only delete objects nothing references anymore, along with
        # their reference-count sidecar file.
        if cas[].getRefCount(hash) <= 0:
          removeFile(objFile.path)
          let refPath = cas[].getRefCountPath(hash)
          if fileExists(refPath):
            removeFile(refPath)
          removedCount.inc
  except CatchableError:
    # Best-effort worker: ignore recoverable errors (I/O races etc.) so
    # one bad shard cannot abort the GC pass, but let Defects propagate
    # instead of hiding programmer bugs (the old bare `except:` hid both).
    discard

  return removedCount
|
|
|
|
proc garbageCollect*(cas: CasManager, reachableHashes: HashSet[string] = initHashSet[string]()): Result[int, CasError] =
  ## Remove unreferenced objects from CAS (respects reference counts).
  ##
  ## An object survives when it is in `reachableHashes`, in any pin set,
  ## or still has a positive reference count. Returns the number of
  ## objects deleted, or a FileWriteError on I/O failure.
  try:
    var removedCount = 0
    let pinnedObjects = cas.getAllPinnedObjects()
    let protectedObjects = reachableHashes.union(pinnedObjects)

    # Scan unified root, user legacy, and system legacy CAS
    for basePath in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
      let objectsDir = basePath / "objects"
      if not dirExists(objectsDir):
        continue

      for shardDir in walkDir(objectsDir):
        if shardDir.kind == pcDir:
          for objFile in walkDir(shardDir.path):
            if objFile.kind == pcFile:
              let filename = extractFilename(objFile.path)
              # Reconstruct the multihash prefix with the same length
              # heuristic listObjects uses: 128 hex chars => BLAKE2b-512,
              # otherwise xxh3_128. The previous unconditional "blake2b-"
              # prefix made refcount lookups fail for xxh3 objects and
              # could delete objects that were still referenced.
              let hash =
                if filename.len >= 100: "blake2b-" & filename
                else: "xxh3-" & filename

              # Check if object is protected by pins or reachable hashes
              if hash in protectedObjects:
                continue

              # Check reference count
              if cas.getRefCount(hash) <= 0:
                # Remove object and its reference count file
                removeFile(objFile.path)
                let refPath = cas.getRefCountPath(hash)
                if fileExists(refPath):
                  removeFile(refPath)
                removedCount.inc

    return ok[int, CasError](removedCount)

  except IOError as e:
    return err[int, CasError](CasError(
      code: FileWriteError,
      msg: "Failed during garbage collection: " & e.msg
    ))
|
|
|
|
proc garbageCollectParallel*(cas: CasManager, reachableHashes: HashSet[string] = initHashSet[string]()): Result[int, CasError] =
  ## Remove unreferenced objects from CAS using parallel processing.
  ## Task 12.4: one worker is spawned per shard directory; their removal
  ## counts are summed once every FlowVar has completed.
  ##
  ## NOTE(review): workers receive `addr cas` — a pointer to a GC-managed
  ## ref. This is only tolerable because every future is awaited below
  ## before the proc returns, keeping `cas` alive; confirm getRefCount /
  ## getRefCountPath are safe to call concurrently before relying on this.
  try:
    let pinnedObjects = cas.getAllPinnedObjects()
    let protectedObjects = reachableHashes.union(pinnedObjects)

    var futures: seq[FlowVar[int]] = @[]
    var casPtr = addr cas

    # Scan unified root, user legacy, and system legacy CAS
    for basePath in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
      let objectsDir = basePath / "objects"
      if not dirExists(objectsDir):
        continue

      # Spawn parallel workers for each shard directory
      for shardDir in walkDir(objectsDir):
        if shardDir.kind == pcDir:
          futures.add(spawn gcWorker(basePath, shardDir.path, protectedObjects, casPtr))

    # Wait for all workers to complete and sum results
    var totalRemoved = 0
    for future in futures:
      totalRemoved += ^future

    return ok[int, CasError](totalRemoved)

  except CatchableError as e:
    # Catch only recoverable errors; the previous `except Exception` also
    # swallowed Defects (assertion failures, out-of-bounds), masking bugs.
    return err[int, CasError](CasError(
      code: FileWriteError,
      msg: "Failed during parallel garbage collection: " & e.msg
    ))
|
|
|
|
proc getStats*(cas: CasManager): CasStats =
  ## Collect CAS statistics: object counts and sizes across all storage
  ## roots, pin-set count, and in-memory cache metrics.
  result = CasStats()

  try:
    for root in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
      let objectsDir = root / "objects"
      if not dirExists(objectsDir):
        continue

      for shard in walkDir(objectsDir):
        if shard.kind != pcDir:
          continue
        for obj in walkDir(shard.path):
          if obj.kind != pcFile:
            continue
          let size = getFileInfo(obj.path).size
          result.objectCount.inc
          result.compressedSize += size
          # Compression is not implemented yet, so logical size equals
          # on-disk size.
          result.totalSize += size

    result.compressionRatio =
      if result.compressedSize > 0:
        result.totalSize.float / result.compressedSize.float
      else:
        1.0

    result.pinSets = cas.pinSets.len

    # Task 12.2: cache statistics
    result.cacheSize = cas.cacheCurrentSize
    result.cacheMaxSize = cas.cacheMaxSize
    result.cacheHits = cas.cacheHits
    result.cacheMisses = cas.cacheMisses
    result.cacheHitRate = cas.getCacheHitRate()
    result.hitRate = result.cacheHitRate # Deprecated field

  except IOError as e:
    # On I/O errors (permissions, files deleted mid-scan) return whatever
    # was collected so far — better than crashing.
    echo "Could not fully calculate stats due to IO error: " & e.msg
|
|
|
|
proc removeObject*(cas: CasManager, hash: string): VoidResult[CasError] =
  ## Logically remove an object: drop one reference. The object file
  ## itself is only deleted later, during garbage collection.
  cas.decrementRefCount(hash)
|
|
|
|
proc verifyObject*(cas: CasManager, hash: string): Result[bool, CasError] =
  ## Re-hash the stored object and compare against `hash`.
  ## Returns ok(true) on a content match, ok(false) on mismatch, and
  ## propagates retrieval errors unchanged.
  let dataResult = cas.retrieveObject(hash)
  if dataResult.isErr:
    return err[bool, CasError](dataResult.getError())

  let data = dataResult.get()

  # The multihash prefix selects the algorithm: anything not explicitly
  # "blake2b-" (legacy) is hashed with the default xxh3 — this includes
  # "xxh3-" as well as unknown prefixes, exactly as before.
  let recomputed =
    if hash.startsWith("blake2b-"):
      calculateBlake2b(data)
    else:
      calculateXxh3(data)

  return ok[bool, CasError](recomputed == hash)
|
|
|
|
proc listObjects*(cas: CasManager): seq[string] =
  ## List all unique objects in CAS.
  ## Deduplicates through a HashSet so the scan stays linear even when
  ## objects appear in more than one storage root.
  var seen = initHashSet[string]()

  for root in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
    let objectsDir = root / "objects"
    if not dirExists(objectsDir):
      continue

    for shard in walkDir(objectsDir):
      if shard.kind != pcDir:
        continue
      for obj in walkDir(shard.path):
        if obj.kind != pcFile:
          continue
        let filename = extractFilename(obj.path)
        # The on-disk name carries no algorithm tag, so infer it from
        # digest length: BLAKE2b-512 is 128 hex chars, xxh3_128 is 32.
        if filename.len >= 100:
          seen.incl("blake2b-" & filename)
        else:
          seen.incl("xxh3-" & filename)

  result = toSeq(seen)
|
|
proc getDeduplicationStats*(cas: CasManager): Result[DeduplicationStats, CasError] =
  ## Calculate cross-format deduplication statistics (Task 34).
  ##
  ## Logical size counts every reference to an object; physical size counts
  ## each stored object once. `savings` is their difference and
  ## `deduplicationRatio` their quotient. `formatOverlap` counts objects
  ## shared between formats, keyed by a sorted "FMT-FMT" string.
  var stats = DeduplicationStats()
  stats.formatOverlap = initTable[string, int]()

  try:
    # Ensure references are loaded
    let loadResult = cas.loadFormatReferences()
    if not loadResult.isOk:
      return err[DeduplicationStats, CasError](loadResult.errValue)

    # hash -> set of formats referencing it
    var hashFormats = initTable[string, HashSet[FormatType]]()
    # hash -> total reference count across all formats/packages
    var hashRefCounts = initTable[string, int]()

    for formatType, packages in cas.formatRefs:
      for packageName, hashes in packages:
        for hash in hashes:
          # mgetOrPut replaces the manual hasKey/init dance and returns a
          # mutable view, so incl/inc act on the stored entry directly.
          hashFormats.mgetOrPut(hash, initHashSet[FormatType]()).incl(formatType)
          hashRefCounts.mgetOrPut(hash, 0).inc

    # Calculate sizes and overlaps
    for hash, formats in hashFormats:
      # Physical size: locate the object in any CAS root.
      var objSize = 0'i64
      var found = false
      for basePath in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
        let objPath = getObjectPath(basePath, hash)
        if fileExists(objPath):
          objSize = getFileInfo(objPath).size
          found = true
          break

      if not found:
        # Referenced but missing object: skip rather than skew the stats
        # with zero-sized entries.
        continue

      let refCount = hashRefCounts[hash]

      stats.totalPhysicalSize += objSize
      stats.totalLogicalSize += objSize * refCount

      if refCount > 1:
        stats.sharedChunks.inc

      # Calculate format overlaps
      if formats.len > 1:
        # Sort format names so e.g. {NPK, NIP} always keys as "NIP-NPK".
        var formatList: seq[string] = @[]
        for f in formats: formatList.add($f)
        formatList.sort()
        let overlapKey = formatList.join("-")
        stats.formatOverlap.mgetOrPut(overlapKey, 0).inc

    stats.savings = stats.totalLogicalSize - stats.totalPhysicalSize

    stats.deduplicationRatio =
      if stats.totalPhysicalSize > 0:
        stats.totalLogicalSize.float / stats.totalPhysicalSize.float
      else:
        1.0

    return ok[DeduplicationStats, CasError](stats)

  except CatchableError as e:
    # Catch recoverable errors only; the previous `except Exception` also
    # swallowed Defects, hiding genuine bugs behind an UnknownError result.
    return err[DeduplicationStats, CasError](CasError(
      code: UnknownError,
      msg: "Failed to calculate deduplication stats: " & e.msg
    ))
|
|
proc loadIndex*(cas: CasManager): VoidResult[CasError] =
  ## Load the CAS index from disk into `cas.indexCache`.
  ##
  ## The index is a small KDL document parsed by hand (the KDL parser is
  ## not imported in this module yet):
  ##   cas_index {
  ##     version "1.0"
  ##     total_chunks 123
  ##     total_size 456
  ##   }
  ## A missing index file yields a fresh empty index instead of an error.
  try:
    if fileExists(cas.indexPath):
      let content = readFile(cas.indexPath)

      var index = CasIndex(version: "1.0", totalChunks: 0, totalSize: 0, lastUpdated: now())

      for line in content.splitLines():
        let parts = line.strip().splitWhitespace()
        if parts.len >= 2:
          case parts[0]
          of "version": index.version = parts[1].replace("\"", "")
          of "total_chunks": index.totalChunks = parseInt(parts[1])
          of "total_size": index.totalSize = parseBiggestInt(parts[1])
          else: discard # `case` on a string requires an else branch; ignore unknown keys

      cas.indexCache = some(index)
    else:
      # Initialize empty index
      cas.indexCache = some(CasIndex(version: "1.0", totalChunks: 0, totalSize: 0, lastUpdated: now()))

    return ok(CasError)
  except CatchableError as e:
    # IOError (readFile) and ValueError (parseInt/parseBiggestInt) are both
    # CatchableError; `except Exception` would also have trapped Defects.
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileReadError,
      msg: "Failed to load CAS index: " & e.msg
    ))
|
|
|
|
proc saveIndex*(cas: CasManager): VoidResult[CasError] =
  ## Save cached CAS index to disk
  ##
  ## Serializes `cas.indexCache` to the KDL file at `cas.indexPath`.
  ## No-op (success) when nothing has been loaded into the cache yet.
  ## Note: loadIndex strips each line before parsing, so the interior
  ## indentation of the template below does not affect the round-trip.
  if cas.indexCache.isNone:
    return ok(CasError)

  let index = cas.indexCache.get()
  # strutils `%` substitutes $1..$4 with version, chunk count, total size,
  # and the stringified last-updated timestamp.
  let content = """cas_index {
  version "$1"
  total_chunks $2
  total_size $3
  last_updated "$4"
}
""" % [index.version, $index.totalChunks, $index.totalSize, $index.lastUpdated]

  try:
    writeFile(cas.indexPath, content)
    return ok(CasError)
  except IOError as e:
    return VoidResult[CasError](isOk: false, errValue: CasError(
      code: FileWriteError,
      msg: "Failed to save CAS index: " & e.msg
    ))
|
|
|
|
proc updateIndex*(cas: CasManager, addedSize: int64, addedChunks: int = 1) =
  ## Record newly stored data in the CAS index and persist it.
  ## Lazily loads the index on first use; load/save failures are ignored
  ## here (best effort), as in the original.
  if cas.indexCache.isNone:
    discard cas.loadIndex()

  if cas.indexCache.isSome:
    var idx = cas.indexCache.get()
    idx.totalChunks += addedChunks
    idx.totalSize += addedSize
    idx.lastUpdated = now()
    cas.indexCache = some(idx)
    # Persisted on every update for safety; batch this if it becomes hot.
    discard cas.saveIndex()
|
|
|
|
proc objectExistsCached*(cas: CasManager, hash: string): bool =
  ## Existence check with a hash -> path cache in front of the disk scan.
  ## Stale entries (file deleted since caching) are evicted before falling
  ## back to probing every storage root.
  if hash in cas.existenceCache:
    if fileExists(cas.existenceCache[hash]):
      return true
    # Cached path no longer exists: drop it and re-probe the disk.
    cas.existenceCache.del(hash)

  for root in [cas.rootPath, cas.userCasPath, cas.systemCasPath]:
    let candidate = getObjectPath(root, hash)
    if fileExists(candidate):
      cas.existenceCache[hash] = candidate
      return true

  false
|
|
|
|
proc storeFileParallel*(cas: CasManager, filePath: string): Result[CasObject, CasError] =
  ## Store a large file as chunks plus a JSON manifest object.
  ##
  ## Files up to MAX_INLINE_SIZE are delegated to the sequential storeFile.
  ## Larger files are split into CHUNK_SIZE pieces; each chunk is stored
  ## via storeObject (which hashes and deduplicates it), then a manifest
  ## listing {hash, offset, size} per chunk is stored as the root object.
  ##
  ## NOTE(review): the former "parallel" pass spawned calculateXxh3 per
  ## chunk and then discarded every result because storeObject re-hashes
  ## anyway; that pure double work has been removed. Reintroduce real
  ## parallelism once storeObject can accept a pre-computed hash.
  try:
    let fileInfo = getFileInfo(filePath)

    if fileInfo.size <= MAX_INLINE_SIZE:
      return cas.storeFile(filePath) # Fallback to sequential for small files

    # Read the file chunk by chunk (I/O bound).
    var chunkDataList: seq[seq[byte]] = @[]
    let file = open(filePath, fmRead)
    defer: file.close() # Previously leaked if a read below raised

    var buffer = newSeq[byte](CHUNK_SIZE)
    while true:
      let bytesRead = file.readBytes(buffer, 0, CHUNK_SIZE)
      if bytesRead == 0: break
      chunkDataList.add(buffer[0 ..< bytesRead])

    # Store each chunk and record its placement within the file.
    var chunks: seq[ChunkRef] = @[]
    var offset = 0'i64
    for data in chunkDataList:
      let storeRes = cas.storeObject(data)
      if storeRes.isErr:
        return err[CasObject, CasError](storeRes.getError())
      chunks.add(ChunkRef(hash: storeRes.get().hash, offset: offset, size: data.len))
      offset += data.len.int64

    # Store the manifest describing how to reassemble the file.
    let manifest = %*{
      "chunks": chunks.mapIt(%*{
        "hash": it.hash,
        "offset": it.offset,
        "size": it.size
      }),
      "total_size": fileInfo.size
    }

    let manifestData = $manifest
    let manifestResult = cas.storeObject(manifestData.toOpenArrayByte(0, manifestData.len - 1))
    if manifestResult.isErr:
      return err[CasObject, CasError](manifestResult.getError())

    var obj = manifestResult.get()
    obj.chunks = chunks

    # Update index: every chunk plus the manifest object.
    cas.updateIndex(fileInfo.size, chunks.len + 1) # +1 for manifest

    return ok[CasObject, CasError](obj)

  except CatchableError as e:
    # Recoverable errors only; `except Exception` also swallowed Defects.
    return err[CasObject, CasError](CasError(
      code: UnknownError,
      msg: "Parallel store failed: " & e.msg
    ))
|