Cats-Effect Integration
The duck4s-cats-effect module provides a purely functional interface for DuckDB built on cats-effect and fs2. Connections are managed as Resource[IO, DuckDBConnection], all JDBC calls are wrapped in IO.blocking, and errors are raised as DuckDBException rather than returned as Either values.
Installation
SBT
libraryDependencies += "com.softinio" %% "duck4s-cats-effect" % "0.1.4"
Mill
def ivyDeps = Agg(
  ivy"com.softinio::duck4s-cats-effect::0.1.4"
)
Imports
import cats.effect.{IO, Resource, IOApp}
import fs2.Stream
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*
Connection Management
Use DuckDBIO.connect() to acquire a connection as a Resource. The connection is closed automatically when the resource scope exits.
// In-memory database (default)
val program: IO[Unit] =
  DuckDBIO.connect().use { conn =>
    conn.executeUpdateIO("CREATE TABLE t (id INTEGER)").void
  }

// Persistent database
val config = DuckDBConfig.persistent("/path/to/database.db")
val persistentProgram: IO[Unit] =
  DuckDBIO.connect(config).use { conn =>
    conn.executeUpdateIO("CREATE TABLE t (id INTEGER)").void
  }
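Because the connection is an ordinary Resource, setup work can be attached to acquisition with the standard Resource combinators. The following is a minimal sketch rather than a documented duck4s pattern: connectionWithSchema is a hypothetical name, and it assumes DuckDBConnection is brought into scope by the wildcard imports listed under Imports.

```scala
import cats.effect.{IO, Resource}
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

// Hypothetical helper: an in-memory connection whose table already exists
// by the time `use` hands the connection to the caller.
// Assumption: DuckDBConnection is visible through the wildcard imports above.
val connectionWithSchema: Resource[IO, DuckDBConnection] =
  DuckDBIO
    .connect()
    .evalTap(conn => conn.executeUpdateIO("CREATE TABLE t (id INTEGER)").void)

// Callers see a ready-to-use connection; it is still closed when `use` exits.
val insertOne: IO[Unit] =
  connectionWithSchema.use { conn =>
    conn.executeUpdateIO("INSERT INTO t VALUES (1)").void
  }
```

If the CREATE TABLE statement fails, the error is raised from use and the connection is still released.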
Thread safety: DuckDB connections are not thread-safe. Do not share a single DuckDBConnection across fibers. Each fiber should use its own connection, either via a separate DuckDBIO.connect() scope or by calling conn.duplicate().
Executing Queries
DDL and Updates
executeUpdateIO runs INSERT, UPDATE, DELETE, and DDL statements, returning the affected row count:
DuckDBIO.connect().use { conn =>
  for
    _ <- conn.executeUpdateIO("CREATE TABLE users (id INTEGER, name VARCHAR, age INTEGER)")
    n <- conn.executeUpdateIO("INSERT INTO users VALUES (1, 'Alice', 25), (2, 'Bob', 30)")
    _ <- IO(println(s"Inserted $n rows"))
  yield ()
}
SELECT Queries
executeQueryIO returns a DuckDBResultSet wrapped in IO. You are responsible for closing the result set:
DuckDBIO.connect().use { conn =>
  for
    _ <- conn.executeUpdateIO("CREATE TABLE users (id INTEGER, name VARCHAR)")
    _ <- conn.executeUpdateIO("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
    rs <- conn.executeQueryIO("SELECT * FROM users ORDER BY id")
    // Iterating and closing are blocking JDBC calls, so run them on the blocking pool
    _ <- IO.blocking {
      while rs.next() do
        println(s"${rs.getInt("id")}: ${rs.getString("name")}")
      rs.close()
    }
  yield ()
}
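When the result set is closed by hand, a failure while reading rows can skip the close call. One way to guard against that, sketched here with the standard cats-effect Resource.make rather than any dedicated duck4s helper, is to treat the result set itself as a resource. The name readUsers is hypothetical, and the sketch assumes getInt returns a plain Int, as the interpolation above suggests.

```scala
import cats.effect.{IO, Resource}
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

// Hypothetical example value: collects all users into a List
val readUsers: IO[List[(Int, String)]] =
  DuckDBIO.connect().use { conn =>
    for
      _ <- conn.executeUpdateIO("CREATE TABLE users (id INTEGER, name VARCHAR)")
      _ <- conn.executeUpdateIO("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')")
      users <- Resource
        // The release action runs whether the `use` block succeeds, fails, or is canceled
        .make(conn.executeQueryIO("SELECT * FROM users ORDER BY id"))(rs => IO.blocking(rs.close()))
        .use { rs =>
          IO.blocking {
            val rows = List.newBuilder[(Int, String)]
            while rs.next() do rows += ((rs.getInt("id"), rs.getString("name")))
            rows.result()
          }
        }
    yield users
  }
```

The rows are copied into a List before the result set is released, so nothing refers to the closed cursor afterwards.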
Streaming Results with fs2
DuckDBIO.stream streams rows from a SELECT query as an fs2.Stream[IO, A]. The result set is opened and closed within the stream's resource scope, so no manual cleanup is needed.
DuckDBIO.connect().use { conn =>
  for
    _ <- conn.executeUpdateIO("CREATE TABLE products (id INTEGER, name VARCHAR, price DOUBLE)")
    _ <- conn.executeUpdateIO("""
      INSERT INTO products VALUES
        (1, 'Widget', 9.99),
        (2, 'Gadget', 24.99),
        (3, 'Doohickey', 4.99)
    """)
    names <- DuckDBIO
      .stream(conn, "SELECT name, price FROM products WHERE price > 5.0") { rs =>
        s"${rs.getString("name")} ($$${rs.getDouble("price")})"
      }
      .compile.toList
    _ <- IO(names.foreach(println))
  yield ()
}
The row mapper function receives the DuckDBResultSet positioned at each row and should extract the values synchronously.
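As an illustration of the row mapper, the sketch below copies each row into a case class and then applies an ordinary fs2 operator to the resulting stream. ProductRow and twoCheapest are hypothetical names introduced for this example, and getInt is assumed to return a plain Int.

```scala
import cats.effect.IO
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

// Hypothetical domain type for this example, not part of duck4s
final case class ProductRow(id: Int, name: String, price: Double)

val twoCheapest: IO[List[ProductRow]] =
  DuckDBIO.connect().use { conn =>
    for
      _ <- conn.executeUpdateIO("CREATE TABLE products (id INTEGER, name VARCHAR, price DOUBLE)")
      _ <- conn.executeUpdateIO(
        "INSERT INTO products VALUES (1, 'Widget', 9.99), (2, 'Gadget', 24.99), (3, 'Doohickey', 4.99)"
      )
      products <- DuckDBIO
        .stream(conn, "SELECT id, name, price FROM products ORDER BY price") { rs =>
          // Copy every needed column while the cursor is still on this row
          ProductRow(rs.getInt("id"), rs.getString("name"), rs.getDouble("price"))
        }
        .take(2) // keep only the first two mapped rows
        .compile
        .toList
    yield products
  }
```

Mapping to an immutable value inside the mapper means downstream stream stages never touch the result set, which advances to the next row as the stream is pulled.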
Prepared Statements
One-shot with automatic cleanup
withPreparedStatementIO prepares a statement, runs a block, then closes it automatically:
DuckDBIO.connect().use { conn =>
  for
    _ <- conn.executeUpdateIO("CREATE TABLE items (id INTEGER, label VARCHAR)")
    count <- conn.withPreparedStatementIO("INSERT INTO items VALUES (?, ?)") { stmt =>
      for
        _ <- IO.blocking(stmt.setInt(1, 42))
        _ <- IO.blocking(stmt.setString(2, "example"))
        n <- IO.blocking(stmt.executeUpdate())
      yield n
    }
    _ <- IO(println(s"Inserted $count row(s)"))
  yield ()
}
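A prepared statement can also be reused for several rows inside a single withPreparedStatementIO block. The sketch below mirrors the calls shown above and uses traverse_ from cats to run them once per element. The names items and insertAndReadLabels are hypothetical, and the example makes no assumption about what the statement methods return beyond what the example above relies on.

```scala
import cats.effect.IO
import cats.syntax.all.*
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

// Hypothetical input rows for this example
val items: List[(Int, String)] = List(1 -> "first", 2 -> "second", 3 -> "third")

val insertAndReadLabels: IO[List[String]] =
  DuckDBIO.connect().use { conn =>
    for
      _ <- conn.executeUpdateIO("CREATE TABLE items (id INTEGER, label VARCHAR)")
      _ <- conn.withPreparedStatementIO("INSERT INTO items VALUES (?, ?)") { stmt =>
        // Rebind both parameters and execute once per element, sequentially
        items.traverse_ { case (id, label) =>
          IO.blocking(stmt.setInt(1, id)) >>
            IO.blocking(stmt.setString(2, label)) >>
            IO.blocking(stmt.executeUpdate()).void
        }
      }
      labels <- DuckDBIO
        .stream(conn, "SELECT label FROM items ORDER BY id")(_.getString("label"))
        .compile
        .toList
    yield labels
  }
```

The inserts run one after another on the same statement, which keeps to the rule that a connection is not shared across fibers.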
Manual lifecycle
prepareStatementIO returns an IO[DuckDBPreparedStatement] when you need to manage the lifecycle yourself:
import cats.effect.Resource
DuckDBIO.connect().use { conn =>
  Resource
    .make(conn.prepareStatementIO("SELECT * FROM items WHERE id = ?"))(stmt =>
      IO.blocking(stmt.close())
    )
    .use { stmt =>
      IO.blocking(stmt.setInt(1, 42)) >>
        IO.blocking(stmt.executeQuery()).flatMap { rs =>
          IO {
            while rs.next() do println(rs.getString("label"))
            rs.close()
          }
        }
    }
}
Batch Operations
One-shot with automatic cleanup
withBatchIO prepares a batch, runs a block, then closes it automatically:
DuckDBIO.connect().use { conn =>
  for
    _ <- conn.executeUpdateIO("CREATE TABLE employees (id INTEGER, name VARCHAR, salary DOUBLE)")
    result <- conn.withBatchIO("INSERT INTO employees VALUES (?, ?, ?)") { batch =>
      for
        _ <- IO.blocking(batch.addBatch((1, "Alice", 75000.0), (2, "Bob", 80000.0), (3, "Charlie", 85000.0)))
        r <- IO.blocking(batch.executeBatch())
      yield r
    }
    _ <- IO(println(s"Inserted ${result.successCount} row(s), ${result.failureCount} failure(s)"))
  yield ()
}
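In practice the rows usually come from a collection rather than from literals. The following sketch queues one row per element and then executes the batch once. It assumes addBatch accepts a single tuple per call, as in the manual-lifecycle example in this section; employees and loadEmployees are hypothetical names. The stored row count is read back with a query instead of relying on the batch result.

```scala
import cats.effect.IO
import cats.syntax.all.*
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

// Hypothetical input data; it could equally come from a file or an API
val employees: List[(Int, String, Double)] =
  List((1, "Alice", 75000.0), (2, "Bob", 80000.0), (3, "Charlie", 85000.0))

val loadEmployees: IO[Int] =
  DuckDBIO.connect().use { conn =>
    for
      _ <- conn.executeUpdateIO("CREATE TABLE employees (id INTEGER, name VARCHAR, salary DOUBLE)")
      _ <- conn.withBatchIO("INSERT INTO employees VALUES (?, ?, ?)") { batch =>
        // Queue one row per element, then send the whole batch in one call
        employees.traverse_(row => IO.blocking(batch.addBatch(row))) >>
          IO.blocking(batch.executeBatch())
      }
      stored <- DuckDBIO
        .stream(conn, "SELECT CAST(COUNT(*) AS INTEGER) AS n FROM employees")(_.getInt("n"))
        .compile
        .lastOrError
    yield stored
  }
```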
Manual lifecycle
prepareBatchIO returns an IO[DuckDBBatch] for manual control:
import cats.effect.Resource
DuckDBIO.connect().use { conn =>
  Resource
    .make(conn.prepareBatchIO("INSERT INTO employees VALUES (?, ?, ?)"))(b =>
      IO.blocking(b.close())
    )
    .use { batch =>
      IO.blocking(batch.addBatch((4, "Dana", 90000.0))) >>
        IO.blocking(batch.executeBatch()).void
    }
}
Transactions
withTransactionIO disables auto-commit, runs a block, and commits on success or rolls back on any raised error. Auto-commit is restored in a guarantee finalizer regardless of outcome.
DuckDBIO.connect().use { conn =>
  for
    _ <- conn.executeUpdateIO("CREATE TABLE accounts (id INTEGER, balance DOUBLE)")
    _ <- conn.executeUpdateIO("INSERT INTO accounts VALUES (1, 1000.0), (2, 500.0)")
    _ <- conn.withTransactionIO { txConn =>
      for
        _ <- txConn.executeUpdateIO("UPDATE accounts SET balance = balance - 200 WHERE id = 1")
        _ <- txConn.executeUpdateIO("UPDATE accounts SET balance = balance + 200 WHERE id = 2")
      yield ()
    }
  yield ()
}
If either update raises an error, the whole transaction is rolled back automatically.
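To see the rollback in action, the sketch below debits one account and then raises an error inside the transaction block. It is an illustration built only from the calls shown on this page. The simulated failure and the name balancesAfterFailedTransfer are hypothetical. If rollback behaves as described, both balances read afterwards should be the original ones.

```scala
import cats.effect.IO
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

val balancesAfterFailedTransfer: IO[List[(Int, Double)]] =
  DuckDBIO.connect().use { conn =>
    for
      _ <- conn.executeUpdateIO("CREATE TABLE accounts (id INTEGER, balance DOUBLE)")
      _ <- conn.executeUpdateIO("INSERT INTO accounts VALUES (1, 1000.0), (2, 500.0)")
      // `attempt` captures the raised error so the program can continue and inspect the table
      outcome <- conn.withTransactionIO { txConn =>
        txConn.executeUpdateIO("UPDATE accounts SET balance = balance - 200 WHERE id = 1") >>
          IO.raiseError[Unit](new RuntimeException("simulated failure after the debit"))
      }.attempt
      _ <- IO(println(s"Transaction outcome: $outcome"))
      balances <- DuckDBIO
        .stream(conn, "SELECT id, balance FROM accounts ORDER BY id") { rs =>
          rs.getInt("id") -> rs.getDouble("balance")
        }
        .compile
        .toList
    yield balances
  }
```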
Error Handling
The cats-effect module raises errors as DuckDBException (a RuntimeException subclass) rather than returning Either. Use standard cats-effect error handling:
import com.softinio.duck4s.effect.DuckDBException
DuckDBIO.connect().use { conn =>
  conn
    .executeQueryIO("SELECT * FROM nonexistent_table")
    .handleErrorWith {
      case ex: DuckDBException =>
        IO(println(s"DuckDB error: ${ex.getMessage}")) >> IO(println(s"Underlying: ${ex.error}"))
      case ex =>
        IO(println(s"Unexpected error: ${ex.getMessage}"))
    }
    .void
}
You can also use attempt to recover an Either[Throwable, A]:
DuckDBIO.connect().use { conn =>
  conn.executeQueryIO("SELECT * FROM nonexistent_table").attempt.flatMap {
    case Right(rs) => IO(rs.close())
    case Left(ex) => IO(println(s"Failed: ${ex.getMessage}"))
  }
}
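Applications often translate database failures into their own error type at the boundary. The sketch below shows one way to do that with attempt: only DuckDBException is converted, and anything else is re-raised unchanged. QueryFailed and checkQuery are hypothetical names introduced for this example and are not part of duck4s.

```scala
import cats.effect.IO
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*

// Hypothetical application-level error, not part of duck4s
final case class QueryFailed(sql: String, cause: Throwable)
    extends RuntimeException(s"Query failed: $sql", cause)

val sql = "SELECT * FROM nonexistent_table"

val checkQuery: IO[Either[QueryFailed, Unit]] =
  DuckDBIO.connect().use { conn =>
    conn.executeQueryIO(sql).attempt.flatMap {
      // Success: close the result set and report Right
      case Right(rs) => IO.blocking(rs.close()).as(Right(()))
      // Database failure: convert to the application's own error value
      case Left(ex: DuckDBException) => IO.pure(Left(QueryFailed(sql, ex)))
      // Anything else is unexpected, so let it propagate
      case Left(other) => IO.raiseError(other)
    }
  }
```

Keeping the original exception as the cause preserves the DuckDB message and the underlying error for logging.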
Complete Example
import cats.effect.{IO, IOApp}
import com.softinio.duck4s.algebra.*
import com.softinio.duck4s.effect.*
object Main extends IOApp.Simple:
  def run: IO[Unit] =
    DuckDBIO.connect().use { conn =>
      for
        _ <- conn.executeUpdateIO("""
          CREATE TABLE orders (
            id INTEGER,
            product VARCHAR,
            qty INTEGER,
            price DOUBLE
          )
        """)
        _ <- conn.withBatchIO("INSERT INTO orders VALUES (?, ?, ?, ?)") { batch =>
          for
            _ <- IO.blocking(batch.addBatch(
              (1, "Widget", 10, 9.99),
              (2, "Gadget", 5, 24.99),
              (3, "Doohickey", 20, 4.99)
            ))
            r <- IO.blocking(batch.executeBatch())
            _ <- IO(println(s"Inserted ${r.successCount} order(s)"))
          yield ()
        }
        total <- DuckDBIO
          .stream(conn, "SELECT product, qty * price AS subtotal FROM orders") { rs =>
            rs.getString("product") -> rs.getDouble("subtotal")
          }
          .evalTap { case (product, subtotal) => IO(println(f"$product%-12s $$${subtotal}%.2f")) }
          .map(_._2)
          .compile.fold(0.0)(_ + _)
        _ <- IO(println(f"Total: $$${total}%.2f"))
      yield ()
    }
Next Steps
- Getting Started - Core synchronous API
- Core API Documentation - Core API reference
- Cats-Effect API Documentation - Cats-effect module API reference
- cats-effect documentation
- fs2 documentation