LMDBCollection

LMDBCollection[K, T] is the primary API facade for regular collections where each key maps to exactly one value.

Table of contents

  1. Overview
  2. Creating a collection
  3. Read operations
    1. fetch
    2. contains
    3. size
    4. Cursor navigation
  4. Write operations
    1. insert
    2. upsertOverwrite
    3. upsert
    4. update
    5. delete
    6. clear
  5. Streaming and collecting
    1. collect
    2. stream
    3. streamWithKeys
  6. Indexes
    1. withIndex
    2. withIndexFull
    3. rebuildIndexes
  7. Transactions
    1. readOnly
    2. readWrite
  8. Cross-collection transactions with lift
  9. Error types

Overview

A LMDBCollection[K, T] is obtained by calling LMDB.collectionCreate and it carries the collection name, the LMDB service reference, and any attached index updaters. The type parameters fix the key type K and value type T for all operations.

val products: LMDBCollection[UUID, Product] =
  LMDB.collectionCreate[UUID, Product]("products", failIfExists = false)

All operations return ZIO or ZStream effects with typed error channels. No exceptions leak through.


Creating a collection

LMDB.collectionCreate[K, T](name: String, failIfExists: Boolean = true): IO[..., LMDBCollection[K, T]]
Parameter Description
name Unique collection name within the database
failIfExists If true (default), fails if the collection already exists

Read operations

fetch

def fetch(key: K): IO[FetchErrors, Option[T]]

Returns the value for key, or None if the key does not exist.

col.fetch(myId).map {
  case Some(record) => println(s"Found: $record")
  case None         => println("Not found")
}

contains

def contains(key: K): IO[ContainsErrors, Boolean]

Returns true if key exists in the collection.

size

def size(): IO[SizeErrors, Long]

Returns the number of key-value pairs in the collection.

Cursor navigation

def head(): IO[FetchErrors, Option[(K, T)]]
def last(): IO[FetchErrors, Option[(K, T)]]
def next(afterThatKey: K): IO[FetchErrors, Option[(K, T)]]
def previous(beforeThatKey: K): IO[FetchErrors, Option[(K, T)]]

Keys are stored in lexicographic order. These operations navigate the sorted key space efficiently without loading the entire collection:

for {
  first  <- col.head()                    // (minKey, record)
  second <- first.flatMap((k,_) => col.next(k).option).getOrElse(None)
  last   <- col.last()                    // (maxKey, record)
} yield (first, second, last)

Write operations

insert

def insert(key: K, document: T): IO[InsertErrors | IndexErrors, Unit]

Strictly inserts document at key. Fails with StorageUserError.KeyAlreadyExists if the key is already present — the existing record is left untouched. The check and the write happen atomically in a single LMDB operation (MDB_NOOVERWRITE), so there is no race window between “does the key exist?” and “write”.

col.insert(id, Product(id, "Widget", 9.99))

Attached indexes are only updated when the insert succeeds.

upsertOverwrite

def upsertOverwrite(key: K, document: T): IO[UpsertErrors | IndexErrors, Unit]

Inserts document at key. If a record already exists for that key it is silently overwritten.

col.upsertOverwrite(id, Product(id, "Widget", 9.99))

upsert

def upsert(key: K, modifier: Option[T] => T): IO[UpsertErrors | IndexErrors, T]

Atomically inserts or updates a record. The modifier function receives Some(existing) if the key already exists, or None for a new key. Returns the new value.

col.upsert(id, {
  case Some(existing) => existing.copy(price = existing.price * 1.1) // 10% price increase
  case None           => Product(id, "New Product", 19.99)           // default on insert
})

update

def update(key: K, modifier: T => T): IO[UpdateErrors | IndexErrors, Option[T]]

Updates an existing record. If the key does not exist, returns None without creating a new entry.

col.update(id, _.copy(name = "Updated Name"))
// Returns Some(updatedRecord) or None if key was missing

delete

def delete(key: K): IO[DeleteErrors | IndexErrors, Option[T]]

Removes the record at key and returns it. Returns None if the key did not exist.

col.delete(id).map {
  case Some(r) => println(s"Deleted $r")
  case None    => println("Key was not found")
}

clear

def clear(): IO[ClearErrors | IndexErrors, Unit]

Removes all records from the collection. Also clears any attached indexes.


Streaming and collecting

collect

def collect(
  keyFilter: K => Boolean = _ => true,
  valueFilter: T => Boolean = _ => true,
  startAfter: Option[K] = None,
  backward: Boolean = false,
  limit: Option[Long] = None
): IO[CollectErrors, List[T]]

Loads matching records into memory. The keyFilter is applied before deserialization for efficiency.

// All records
col.collect()

// Records with keys starting with "A"
col.collect(keyFilter = _.startsWith("A"))

// First 100 active products, cheapest first (ascending by key)
col.collect(valueFilter = _.active, limit = Some(100))

// Page 2: records after the last key of page 1
col.collect(startAfter = Some(lastKeyFromPage1), limit = Some(25))

stream

def stream(
  keyFilter: K => Boolean = _ => true,
  startAfter: Option[K] = None,
  backward: Boolean = false
): ZStream[Any, StreamErrors, T]

Streams values lazily without loading everything into memory:

col.stream(backward = true)
   .take(10)
   .runCollect

streamWithKeys

def streamWithKeys(
  keyFilter: K => Boolean = _ => true,
  startAfter: Option[K] = None,
  backward: Boolean = false
): ZStream[Any, StreamErrors, (K, T)]

Like stream but yields (key, value) tuples.


Indexes

Attach an LMDBIndex to a collection so that it is automatically updated whenever the collection changes.

withIndex

def withIndex[IK](index: LMDBIndex[IK, K])(extractor: T => Iterable[IK]): LMDBCollection[K, T]

Links index to the collection. The extractor function derives the index keys from each record value. The collection primary key becomes the index target key automatically.

// Index products by category
val categoryIndex: LMDBIndex[String, UUID] = ...

val indexedProducts = products.withIndex(categoryIndex)(p => List(p.category))

// Now, every upsert/update/delete on indexedProducts also updates categoryIndex
indexedProducts.upsertOverwrite(id, Product(id, "Widget", "electronics", 9.99))

withIndexFull

def withIndexFull[IK, IV](index: LMDBIndex[IK, IV])(extractor: (K, T) => Iterable[(IK, IV)]): LMDBCollection[K, T]

Generic variant where the extractor receives both the collection key and the value, and produces (indexKey, indexValue) pairs. Use this when the index value should differ from the collection key.

rebuildIndexes

def rebuildIndexes(): IO[IndexErrors | StreamErrors | ClearErrors, Unit]

Clears and rebuilds all attached indexes from the current collection content. Useful when adding an index to an already-populated collection.

val indexedProducts = products.withIndex(categoryIndex)(_.category)
indexedProducts.rebuildIndexes()

Transactions

readOnly

def readOnly[R, E, A](f: LMDBCollectionReadOps[K, T] => ZIO[R, E, A]): ZIO[R, E | StorageSystemError, A]

Executes f within a single read-only transaction. Use this to read multiple keys consistently.

col.readOnly { tx =>
  for {
    first <- tx.head()
    last  <- tx.last()
    size  <- tx.size()
  } yield (first, last, size)
}

readWrite

def readWrite[R, E, A](f: LMDBCollectionWriteOps[K, T] => ZIO[R, E, A]): ZIO[R, E | StorageSystemError | NestedWriteTransactionError, A]

Executes f within a single read-write transaction. All writes inside f are applied atomically.

col.readWrite { tx =>
  tx.upsertOverwrite(id1, record1) *>
  tx.upsertOverwrite(id2, record2) *>
  tx.delete(id3)
}

LMDB allows only one active write transaction at a time. Attempting to open a nested write transaction will fail with NestedWriteTransactionError.


Cross-collection transactions with lift

The lift method adapts a collection facade to an already-open global transaction, enabling atomic writes across multiple collections:

lmdb.readWrite { ops =>
  val usersTx  = users.lift(ops)
  val ordersTx = orders.lift(ops)

  usersTx.upsertOverwrite(userId, user) *>
  ordersTx.upsertOverwrite(orderId, order)
}

See Transactions for a full guide.


Error types

Error type When it occurs
FetchErrors Key or collection not found, codec decode failure
ContainsErrors Storage error while checking key existence
SizeErrors Storage error while reading collection size
UpsertErrors Serialization failure or storage error on write
UpdateErrors Key not found for update, serialization/storage error
DeleteErrors Storage error on delete
ClearErrors Storage error on clear
IndexErrors Index update failure
CollectErrors Error during in-memory collection
StreamErrors Error during streaming
StorageSystemError Low-level LMDB error
NestedWriteTransactionError Attempt to open a write transaction inside a write transaction

ZIO-LMDB — Copyright © 2026 David Crosson. Licensed under the Apache 2.0 License.

This site uses Just the Docs, a documentation theme for Jekyll.