PgTransaction¶
PgTransaction provides coroutine-safe transactional operations for PostgreSQL using a dedicated pooled connection.
It wraps BEGIN / COMMIT / ROLLBACK, automatically manages connection reuse, supports nested subtransactions
(SAVEPOINT), and integrates reflection-based parameter/result mapping.
Overview¶
- Uses a dedicated connection from PgPool
- Configurable isolation / read-only / deferrable modes
- Executes async queries on the same pinned connection
- Commits or rolls back explicitly
- Automatically returns or retires connection after use
- Supports subtransactions via SAVEPOINT
- Supports pipelined execution via compile-time flag
- Reflect-aware helpers for SELECT and parameter binding
All methods are coroutine-awaitable (Awaitable).
Example¶
task::Awaitable<void> transfer_example()
{
    usub::pg::PgTransaction txn;
    if (!co_await txn.begin())
    {
        std::cout << "[ERROR] BEGIN failed\n";
        co_return;
    }
    auto r1 = co_await txn.query(
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2 RETURNING balance;",
        100, 1
    );
    auto r2 = co_await txn.query(
        "UPDATE accounts SET balance = balance + $1 WHERE id = $2 RETURNING balance;",
        100, 2
    );
    if (!r1.ok || !r2.ok)
    {
        co_await txn.rollback();
        co_return;
    }
    if (!co_await txn.commit())
    {
        std::cout << "[ERROR] COMMIT failed\n";
        co_return;
    }
    std::cout << "[OK] transfer complete\n";
}
Configuration¶
Use PgTransactionConfig to control isolation and access mode:
usub::pg::PgTransactionConfig cfg{
    .isolation = usub::pg::TxIsolationLevel::Serializable,
    .read_only = false,
    .deferrable = false
};
usub::pg::PgTransaction txn(&usub::pg::PgPool::instance(), cfg);
co_await txn.begin();
Generated SQL:
Other examples:
Supported isolation levels: READ COMMITTED, REPEATABLE READ, SERIALIZABLE.
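As a rough illustration of how the three configuration fields could translate into a BEGIN statement, here is a minimal sketch. The `TxOptions` struct, the `IsolationLevel` enum, and `build_begin_sql` are hypothetical names introduced for this example. They are not the library's types, and the exact SQL that PgTransaction emits may differ. Only the PostgreSQL transaction-mode syntax itself is standard.

```cpp
#include <string>

// Hypothetical mirror of the configuration fields shown above;
// these are NOT the library's own types.
enum class IsolationLevel { ReadCommitted, RepeatableRead, Serializable };

struct TxOptions {
    IsolationLevel isolation = IsolationLevel::ReadCommitted;
    bool read_only = false;
    bool deferrable = false;
};

// Build a BEGIN statement using PostgreSQL's documented transaction-mode syntax:
// BEGIN [ transaction_mode [, ...] ]
inline std::string build_begin_sql(const TxOptions& opt) {
    std::string sql = "BEGIN ISOLATION LEVEL ";
    switch (opt.isolation) {
        case IsolationLevel::ReadCommitted:  sql += "READ COMMITTED";  break;
        case IsolationLevel::RepeatableRead: sql += "REPEATABLE READ"; break;
        case IsolationLevel::Serializable:   sql += "SERIALIZABLE";    break;
    }
    sql += opt.read_only ? ", READ ONLY" : ", READ WRITE";
    // DEFERRABLE only has an effect when the transaction is also
    // SERIALIZABLE and READ ONLY, but the keyword is accepted regardless.
    sql += opt.deferrable ? ", DEFERRABLE" : ", NOT DEFERRABLE";
    sql += ";";
    return sql;
}
```

With the configuration from the example above (Serializable, read-write, not deferrable), this sketch produces `BEGIN ISOLATION LEVEL SERIALIZABLE, READ WRITE, NOT DEFERRABLE;`.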
Executing queries¶
All queries run on the pinned connection:
auto qr = co_await txn.query(
    "INSERT INTO logs(message) VALUES($1) RETURNING id;",
    "started"
);
if (!qr.ok)
{
    std::cout << "insert failed: " << qr.error << "\n";
    co_await txn.rollback();
}
Reflect-based queries¶
Reflection enables automatic struct/tuple expansion for parameters and decoding of results into C++ aggregates.
SELECT → std::vector<T> / std::optional<T>¶
struct UserRow
{
    int64_t id;
    std::string username; // maps from "name AS username"
    std::optional<std::string> password;
    std::vector<int> roles;
    std::vector<std::string> tags;
};
auto many = co_await txn.query_reflect<UserRow>(
    "SELECT id, name AS username, password, roles, tags FROM users ORDER BY id;"
);
auto one = co_await txn.query_reflect_one<UserRow>(
    "SELECT id, name AS username, password, roles, tags FROM users WHERE id = $1;",
    1
);
Aggregates or tuples → parameters (INSERT/UPDATE)¶
struct NewUser
{
    std::string name;
    std::optional<std::string> password;
    std::vector<int> roles;
    std::vector<std::string> tags;
};
NewUser nu{ "bob", std::nullopt, {1, 2}, {"vip"} };
auto ins = co_await txn.exec_reflect(
    "INSERT INTO users(name, password, roles, tags) VALUES ($1,$2,$3,$4);",
    nu
);
Mapping rules¶
- Fields matched by name; aliases (e.g. AS username) supported. If names unavailable, falls back to positional order.
- std::optional<T> ↔ NULL.
- Containers (std::vector, std::array, etc.) ↔ PostgreSQL arrays.
- Aggregates and tuples expand into $1..$N parameters.
- Pointers (except char*) are not supported.
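To make the parameter side of these rules concrete, the sketch below expands a `std::tuple` into an ordered list of text-format parameters. It is an illustration only and assumes C++17. `PgParam`, `to_param`, `array_elem`, `quote_elem`, and `expand` are hypothetical names, not the library's API, and the library's actual encoding (for example binary versus text format) is not described in this document. Expanding a plain aggregate such as NewUser additionally needs field reflection, which this sketch leaves out.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <tuple>
#include <vector>

// One text-format parameter; std::nullopt stands for SQL NULL.
using PgParam = std::optional<std::string>;

inline PgParam to_param(int v) { return std::to_string(v); }
inline PgParam to_param(const std::string& v) { return v; }

// Quote one array element, escaping backslashes and double quotes.
inline std::string quote_elem(const std::string& s) {
    std::string out = "\"";
    for (char c : s) {
        if (c == '"' || c == '\\') out += '\\';
        out += c;
    }
    return out + "\"";
}

inline std::string array_elem(int v) { return std::to_string(v); }
inline std::string array_elem(const std::string& v) { return quote_elem(v); }

// std::vector<T> -> PostgreSQL array literal such as {1,2} or {"vip"}.
template <class T>
PgParam to_param(const std::vector<T>& v) {
    std::string out = "{";
    for (std::size_t i = 0; i < v.size(); ++i) {
        if (i) out += ',';
        out += array_elem(v[i]);
    }
    return out + "}";
}

// std::optional<T> -> NULL when empty, otherwise the wrapped value.
template <class T>
PgParam to_param(const std::optional<T>& v) {
    if (!v) return std::nullopt;
    return to_param(*v);
}

// Expand a tuple into the ordered $1..$N parameter list.
template <class... Ts>
std::vector<PgParam> expand(const std::tuple<Ts...>& t) {
    return std::apply(
        [](const auto&... f) { return std::vector<PgParam>{to_param(f)...}; }, t);
}
```

Passing a tuple with the same values as the NewUser example yields four parameters in order: the text `bob`, a NULL, the array literal `{1,2}`, and the array literal `{"vip"}`.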
Pipelined query execution¶
Enable pipeline mode at compile time by passing the template flag <true>:
auto r1 = co_await txn.query<true>("INSERT INTO logs(msg) VALUES($1)", "A");
auto r2 = co_await txn.query<true>("INSERT INTO logs(msg) VALUES($1)", "B");
When <true>:
- Queries are queued without waiting for previous results.
- PQsendQueryParams calls are batched and flushed once.
- Results are consumed after an implicit sync.
Use only for independent statements.
| Template flag | Description |
|---|---|
| <false> | Default. Sequential per-query execution. |
| <true> | Pipeline mode. Batched async query submission. |
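The difference between the two modes can be pictured with a toy model that only counts flushes. This is not libpq code and not the library's implementation. `FakeConn`, `run_sequential`, and `run_pipelined` are hypothetical names, and the model ignores result handling and the sync step entirely.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a connection: it only records what was sent and how often
// the outgoing buffer was flushed.
struct FakeConn {
    std::vector<std::string> pending;  // statements queued but not yet sent
    std::vector<std::string> sent;     // statements delivered, in order
    std::size_t flushes = 0;

    void queue(std::string sql) { pending.push_back(std::move(sql)); }

    // Send everything queued so far in one write.
    void flush() {
        if (pending.empty()) return;
        for (auto& s : pending) sent.push_back(std::move(s));
        pending.clear();
        ++flushes;
    }
};

// Sequential mode: each statement is flushed before the next one is queued.
inline void run_sequential(FakeConn& c, const std::vector<std::string>& stmts) {
    for (const auto& s : stmts) {
        c.queue(s);
        c.flush();
    }
}

// Pipelined mode: queue every statement first, then flush once.
inline void run_pipelined(FakeConn& c, const std::vector<std::string>& stmts) {
    for (const auto& s : stmts) c.queue(s);
    c.flush();
}
```

In this model, both modes deliver the statements in the same order, but N statements cost N flushes sequentially and one flush when pipelined. Because a pipelined statement is queued before earlier results are known, it cannot depend on them, which is why the mode suits independent statements only.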
Commit and rollback¶
Helpers:
- co_await txn.abort(); sends ABORT
- co_await txn.finish(); rolls back if active, then cleans up
Connection lifecycle¶
Each transaction pins one connection:
- Acquired from pool (acquire_connection())
- Used exclusively until commit() / rollback()
- Released via release_connection_async() (which drains pending results before recycle)
Broken connections are retired automatically.
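The pin, release, and retire cycle can be sketched with a synchronous toy pool and an RAII guard. `Conn`, `ToyPool`, and `PinnedConn` are hypothetical stand-ins invented for this example. The real pool is asynchronous and also drains pending results before recycling, which this sketch does not model.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins; the real pool and connection types are not shown
// in this document.
struct Conn {
    int id;
    bool broken = false;
};

struct ToyPool {
    std::vector<Conn> idle;
    std::size_t retired = 0;

    Conn acquire() {
        assert(!idle.empty());  // sketch only: no waiting, no growth
        Conn c = idle.back();
        idle.pop_back();
        return c;
    }

    void release(Conn c) {
        if (c.broken) ++retired;   // broken connections never re-enter the pool
        else idle.push_back(c);    // healthy connections are recycled
    }
};

// Pins one connection for its lifetime and hands it back on destruction.
class PinnedConn {
public:
    explicit PinnedConn(ToyPool& pool) : pool_(pool), conn_(pool.acquire()) {}
    ~PinnedConn() { pool_.release(conn_); }
    PinnedConn(const PinnedConn&) = delete;
    PinnedConn& operator=(const PinnedConn&) = delete;

    Conn& get() { return conn_; }

private:
    ToyPool& pool_;
    Conn conn_;
};
```

While a `PinnedConn` is alive, its connection is absent from the idle list, so nothing else can use it. When the guard goes out of scope, a healthy connection returns to the idle list and one marked broken is counted as retired.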
Subtransactions (SAVEPOINT)¶
Nested transactions use PgSubtransaction.
auto sub = txn.make_subtx();
if (co_await sub.begin())
{
    struct Patch { int v; int id; };
    auto upd = co_await sub.exec_reflect(
        "UPDATE t SET v=$1 WHERE id=$2",
        Patch{42, 5}
    );
    struct Row { int id; int v; };
    auto one = co_await sub.query_reflect_one<Row>(
        "SELECT id, v FROM t WHERE id=$1",
        5
    );
    if (!upd.ok) co_await sub.rollback();
    else co_await sub.commit();
}
Semantics:
| Method | SQL issued |
|---|---|
| begin() | SAVEPOINT <name> |
| commit() | RELEASE SAVEPOINT <name> |
| rollback() | ROLLBACK TO SAVEPOINT <name> |
Subtransactions share the parent's connection. A failed subtransaction does not roll back the parent automatically.
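The savepoint semantics can be illustrated with an in-memory toy model: a key/value state plus a stack of snapshots. `ToyTx` and its methods are hypothetical and exist only for this example. They mirror PostgreSQL's documented SAVEPOINT behaviour, not the internals of PgSubtransaction.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy in-memory transaction: current state plus one snapshot per savepoint.
class ToyTx {
public:
    void set(const std::string& key, int value) { state_[key] = value; }
    bool has(const std::string& key) const { return state_.count(key) != 0; }
    int get(const std::string& key) const { return state_.at(key); }

    // SAVEPOINT <name>: snapshot the current state and return its handle.
    std::size_t savepoint() {
        snapshots_.push_back(state_);
        return snapshots_.size() - 1;
    }

    // ROLLBACK TO SAVEPOINT <name>: undo work done since the savepoint.
    // The savepoint itself stays defined; later savepoints are discarded.
    void rollback_to(std::size_t sp) {
        assert(sp < snapshots_.size());
        state_ = snapshots_[sp];
        snapshots_.resize(sp + 1);
    }

    // RELEASE SAVEPOINT <name>: keep the work, forget this savepoint
    // and any established after it.
    void release(std::size_t sp) {
        assert(sp < snapshots_.size());
        snapshots_.resize(sp);
    }

private:
    std::map<std::string, int> state_;
    std::vector<std::map<std::string, int>> snapshots_;
};
```

In this model, rolling back to a savepoint discards only the changes made after it. Writes made before the savepoint survive, and the outer transaction can keep working and still commit.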
Error model¶
- No exceptions: all results are structured (QueryResult)
- Connection failures automatically invalidate the transaction
- Using the transaction after rollback yields PgErrorCode::InvalidFuture
Summary¶
| Feature | Description |
|---|---|
| Dedicated connection | One PGconn per transaction |
| Configurable isolation | Serializable / Repeatable Read / Read Committed |
| Async operations | Coroutine-suspending only, non-blocking |
| Safe pool return | release_connection_async() drains before recycle |
| Auto invalidation | Broken connections retired automatically |
| Subtransactions | Nested SAVEPOINT support |
| Reflect helpers | query_reflect, query_reflect_one, exec_reflect |
| Pipeline execution | Compile-time <true> toggle for batched queries |