Examples¶
This page collects complete runnable examples using different parts of uRedis.
Single client¶
#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
task::Awaitable<void> example_single()
{
usub::ulog::info("example_single: start");
RedisConfig cfg;
cfg.host = "127.0.0.1";
cfg.port = 15100;
RedisClient client{cfg};
auto c = co_await client.connect();
if (!c)
{
const auto& err = c.error();
usub::ulog::error("example_single: connect failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
co_await client.set("foo", "bar");
auto g = co_await client.get("foo");
if (g && g.value().has_value())
{
usub::ulog::info("example_single: GET foo -> '{}'", g.value().value());
}
usub::ulog::info("example_single: done");
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
usub::ulog::info("main(single): starting uvent");
usub::Uvent uvent(4);
usub::uvent::system::co_spawn(example_single());
uvent.run();
usub::ulog::info("main(single): uvent stopped");
return 0;
}
Pool example¶
#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include "uredis/RedisPool.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
task::Awaitable<void> example_pool()
{
usub::ulog::info("example_pool: start");
RedisPoolConfig pcfg;
pcfg.host = "127.0.0.1";
pcfg.port = 15100;
pcfg.db = 0;
pcfg.size = 8;
RedisPool pool{pcfg};
auto rc = co_await pool.connect_all();
if (!rc)
{
const auto& err = rc.error();
usub::ulog::error("example_pool: connect_all failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
auto r = co_await pool.command("INCRBY", "counter", "1");
if (!r)
{
const auto& err = r.error();
usub::ulog::error("example_pool: INCRBY failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
const RedisValue& v = *r;
if (v.is_integer())
{
usub::ulog::info("example_pool: counter -> {}", v.as_integer());
}
else
{
usub::ulog::warn("example_pool: INCRBY returned non-integer reply");
}
usub::ulog::info("example_pool: done");
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
usub::ulog::info("main(pool): starting uvent");
usub::Uvent uvent(4);
usub::uvent::system::co_spawn(example_pool());
uvent.run();
usub::ulog::info("main(pool): uvent stopped");
return 0;
}
Pub/Sub (low-level)¶
#include "uvent/Uvent.h"
#include "uredis/RedisSubscriber.h"
#include "uredis/RedisClient.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
using usub::ulog::info;
using usub::ulog::warn;
using usub::ulog::error;
static std::shared_ptr<RedisSubscriber> g_subscriber;
task::Awaitable<void> subscriber_coro()
{
info("subscriber_coro: start");
RedisConfig cfg;
cfg.host = "127.0.0.1";
cfg.port = 15100;
g_subscriber = std::make_shared<RedisSubscriber>(cfg);
auto c = co_await g_subscriber->connect();
if (!c)
{
const auto& err = c.error();
error("subscriber_coro: connect failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("subscriber_coro: connected");
// SUBSCRIBE events
auto r1 = co_await g_subscriber->subscribe(
"events",
[](const std::string& channel, const std::string& payload)
{
std::printf("[SUB] channel='%s' payload='%s'\n",
channel.c_str(), payload.c_str());
});
if (!r1)
{
const auto& err = r1.error();
error("subscriber_coro: SUBSCRIBE events failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("subscriber_coro: subscribed to 'events'");
// PSUBSCRIBE events.*
auto r2 = co_await g_subscriber->psubscribe(
"events.*",
[](const std::string& channel, const std::string& payload)
{
std::printf("[PSUB] channel='%s' payload='%s'\n",
channel.c_str(), payload.c_str());
});
if (!r2)
{
const auto& err = r2.error();
error("subscriber_coro: PSUBSCRIBE events.* failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("subscriber_coro: psubscribed to 'events.*'");
info("subscriber_coro: waiting for messages...");
using namespace std::chrono_literals;
while (true)
{
co_await system::this_coroutine::sleep_for(1s);
}
co_return;
}
task::Awaitable<void> publisher_coro()
{
info("publisher_coro: start");
RedisConfig cfg;
cfg.host = "127.0.0.1";
cfg.port = 15100;
RedisClient client{cfg};
auto c = co_await client.connect();
if (!c)
{
const auto& err = c.error();
error("publisher_coro: connect failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("publisher_coro: connected");
using namespace std::chrono_literals;
for (int i = 1; i <= 5; ++i)
{
std::string payload = "event_" + std::to_string(i);
std::string_view args_arr[2] = {"events", payload};
auto resp = co_await client.command(
"PUBLISH",
std::span<const std::string_view>(args_arr, 2)
);
if (!resp)
{
const auto& err = resp.error();
error("publisher_coro: PUBLISH failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
const RedisValue& v = *resp;
if (v.is_integer())
{
info("publisher_coro: PUBLISH events '{}' -> {} subscribers",
payload, v.as_integer());
}
else
{
warn("publisher_coro: PUBLISH events '{}' -> unexpected reply type", payload);
}
co_await system::this_coroutine::sleep_for(500ms);
}
info("publisher_coro: done");
co_return;
}
task::Awaitable<void> control_coro()
{
using namespace std::chrono_literals;
info("control_coro: waiting before unsubscribe...");
co_await system::this_coroutine::sleep_for(3s);
if (!g_subscriber)
{
warn("control_coro: subscriber not initialized");
co_return;
}
// UNSUBSCRIBE events
{
auto r = co_await g_subscriber->unsubscribe("events");
if (!r)
{
const auto& err = r.error();
error("control_coro: UNSUBSCRIBE events failed, category={}, message={}",
static_cast<int>(err.category), err.message);
}
else
{
info("control_coro: UNSUBSCRIBE events ok");
}
}
// PUNSUBSCRIBE events.*
{
auto r = co_await g_subscriber->punsubscribe("events.*");
if (!r)
{
const auto& err = r.error();
error("control_coro: PUNSUBSCRIBE events.* failed, category={}, message={}",
static_cast<int>(err.category), err.message);
}
else
{
info("control_coro: PUNSUBSCRIBE events.* ok");
}
}
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
info("main(pubsub): starting uvent");
usub::Uvent uvent(3);
system::co_spawn(subscriber_coro());
system::co_spawn(publisher_coro());
system::co_spawn(control_coro());
uvent.run();
info("main(pubsub): uvent stopped");
return 0;
}
RedisBus¶
#include "uvent/Uvent.h"
#include "uredis/RedisBus.h"
#include <ulog/ulog.h>
// See the RedisBus section for full example:
// - RedisBus::run() in a background coroutine
// - user coroutine with subscribe/psubscribe and publish calls.
Reflection helpers¶
#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include "uredis/RedisReflect.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
using usub::ulog::info;
using usub::ulog::error;
struct User
{
int64_t id;
std::string name;
bool active;
std::optional<int64_t> age;
};
task::Awaitable<void> reflect_example()
{
info("reflect_example: start");
RedisConfig cfg;
cfg.host = "127.0.0.1";
cfg.port = 15100;
RedisClient client{cfg};
auto c = co_await client.connect();
if (!c)
{
const auto& err = c.error();
error("reflect_example: connect failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("reflect_example: connected");
using namespace usub::uredis::reflect;
User u{.id = 42, .name = "Kirill", .active = true, .age = 30};
auto hset_res = co_await hset_struct(client, "user:42", u);
if (!hset_res)
{
const auto& err = hset_res.error();
error("reflect_example: hset_struct failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("reflect_example: hset_struct user:42 fields={}", hset_res.value());
auto loaded = co_await hget_struct<User>(client, "user:42");
if (!loaded)
{
const auto& err = loaded.error();
error("reflect_example: hget_struct failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
if (!loaded.value().has_value())
{
info("reflect_example: hget_struct user:42 -> (nil)");
}
else
{
const User& u2 = *loaded.value();
info("reflect_example: hget_struct user:42 -> id={} name='{}' active={} age={}",
u2.id,
u2.name,
u2.active,
u2.age.has_value() ? std::to_string(*u2.age) : std::string("<null>"));
}
info("reflect_example: done");
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
info("main(reflect): starting uvent");
usub::Uvent uvent(2);
usub::uvent::system::co_spawn(reflect_example());
uvent.run();
info("main(reflect): uvent stopped");
return 0;
}
Sentinel¶
#include "uvent/Uvent.h"
#include "uredis/RedisSentinelPool.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
using usub::ulog::info;
using usub::ulog::error;
task::Awaitable<void> example_sentinel_pool()
{
info("example_sentinel_pool: start");
RedisSentinelConfig scfg;
scfg.master_name = "mymaster";
scfg.sentinels.push_back(
RedisSentinelNode{
.host = "127.0.0.1",
.port = 26379
});
scfg.base_redis.db = 0;
scfg.base_redis.io_timeout_ms = 5000;
scfg.pool_size = 8;
RedisSentinelPool pool{scfg};
auto c = co_await pool.connect();
if (!c)
{
const auto& err = c.error();
error("example_sentinel_pool: connect failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
auto r = co_await pool.command("INCRBY", "counter", "1");
if (!r)
{
const auto& err = r.error();
error("example_sentinel_pool: INCRBY failed, category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
const RedisValue& v = *r;
info("example_sentinel_pool: counter -> {}", v.as_integer());
info("example_sentinel_pool: done");
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
usub::Uvent uvent(4);
usub::uvent::system::co_spawn(example_sentinel_pool());
uvent.run();
}
Cluster¶
#include "uvent/Uvent.h"
#include "uredis/RedisClusterClient.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
using usub::ulog::info;
using usub::ulog::error;
task::Awaitable<void> cluster_example()
{
RedisClusterConfig cfg;
cfg.seeds = {
{"127.0.0.1", 7000},
{"127.0.0.1", 7001},
};
cfg.max_redirections = 8;
RedisClusterClient cluster{cfg};
auto c = co_await cluster.connect();
if (!c)
{
const auto& e = c.error();
error("cluster_example: connect failed: category={} message={}",
static_cast<int>(e.category), e.message);
co_return;
}
auto r1 = co_await cluster.command("SET", "user:42", "Kirill");
if (!r1)
{
const auto& e = r1.error();
error("cluster_example: SET failed: category={} message={}",
static_cast<int>(e.category), e.message);
co_return;
}
auto r2 = co_await cluster.command("GET", "user:42");
if (!r2)
{
const auto& e = r2.error();
error("cluster_example: GET failed: category={} message={}",
static_cast<int>(e.category), e.message);
co_return;
}
const RedisValue& v = *r2;
if (v.is_bulk_string() || v.is_simple_string())
{
info("cluster_example: GET user:42 -> '{}'", v.as_string());
}
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
info("main(cluster): starting uvent");
usub::Uvent uvent(4);
usub::uvent::system::co_spawn(cluster_example());
uvent.run();
info("main(cluster): uvent stopped");
return 0;
}
Redlock¶
#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include "uredis/RedisRedlock.h"
#include <ulog/ulog.h>
using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;
namespace system = usub::uvent::system;
using usub::ulog::info;
using usub::ulog::warn;
using usub::ulog::error;
task::Awaitable<void> worker_coro(
int id,
std::shared_ptr<RedisRedlock> redlock,
std::shared_ptr<RedisClient> data_client,
int iterations)
{
using namespace std::chrono_literals;
info("worker[{}]: start, iterations={}", id, iterations);
for (int i = 0; i < iterations; ++i)
{
auto lock_res = co_await redlock->lock("lock:demo:counter");
if (!lock_res)
{
const auto& err = lock_res.error();
warn("worker[{}]: lock failed (iter={}): category={}, message={}",
id, i, static_cast<int>(err.category), err.message);
co_await system::this_coroutine::sleep_for(50ms);
continue;
}
auto handle = lock_res.value();
auto g = co_await data_client->get("demo:counter");
if (!g)
{
const auto& err = g.error();
error("worker[{}]: GET demo:counter failed: category={}, message={}",
id, static_cast<int>(err.category), err.message);
co_await redlock->unlock(handle);
co_return;
}
int64_t current = 0;
if (g.value().has_value())
{
try
{
current = std::stoll(g.value().value());
}
catch (...)
{
warn("worker[{}]: invalid counter value='{}', reset to 0",
id, g.value().value());
current = 0;
}
}
int64_t next = current + 1;
std::string next_str = std::to_string(next);
auto s = co_await data_client->set("demo:counter", next_str);
if (!s)
{
const auto& err = s.error();
error("worker[{}]: SET demo:counter failed: category={}, message={}",
id, static_cast<int>(err.category), err.message);
co_await redlock->unlock(handle);
co_return;
}
info("worker[{}]: iter={} counter {} -> {}",
id, i, current, next);
auto u = co_await redlock->unlock(handle);
if (!u)
{
const auto& err = u.error();
warn("worker[{}]: unlock failed: category={}, message={}",
id, static_cast<int>(err.category), err.message);
}
co_await system::this_coroutine::sleep_for(10ms);
}
info("worker[{}]: done", id);
co_return;
}
task::Awaitable<void> redlock_demo()
{
info("redlock_demo: start");
RedlockConfig rcfg;
rcfg.ttl_ms = 2000;
rcfg.retry_count = 5;
rcfg.retry_delay_ms = 100;
{
RedisConfig n1;
n1.host = "127.0.0.1";
n1.port = 15100;
RedisConfig n2 = n1;
n2.port = 15101;
RedisConfig n3 = n1;
n3.port = 15102;
rcfg.nodes.push_back(n1);
rcfg.nodes.push_back(n2);
rcfg.nodes.push_back(n3);
}
auto redlock = std::make_shared<RedisRedlock>(rcfg);
auto redlock_conn = co_await redlock->connect_all();
if (!redlock_conn)
{
const auto& err = redlock_conn.error();
error("redlock_demo: connect_all failed: category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("redlock_demo: redlock nodes connected");
RedisConfig data_cfg = rcfg.nodes.front();
auto data_client = std::make_shared<RedisClient>(data_cfg);
auto dc = co_await data_client->connect();
if (!dc)
{
const auto& err = dc.error();
error("redlock_demo: data_client connect failed: category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("redlock_demo: data_client connected");
auto r = co_await data_client->set("demo:counter", "0");
if (!r)
{
const auto& err = r.error();
error("redlock_demo: reset demo:counter failed: category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
info("redlock_demo: demo:counter reset to 0");
const int workers = 4;
const int iterations = 10;
for (int i = 0; i < workers; ++i)
{
system::co_spawn(
worker_coro(i, redlock, data_client, iterations));
}
using namespace std::chrono_literals;
co_await system::this_coroutine::sleep_for(3s);
auto g = co_await data_client->get("demo:counter");
if (!g)
{
const auto& err = g.error();
error("redlock_demo: final GET failed: category={}, message={}",
static_cast<int>(err.category), err.message);
co_return;
}
if (g.value().has_value())
{
info("redlock_demo: final demo:counter = '{}'", g.value().value());
}
else
{
warn("redlock_demo: final demo:counter -> (nil)");
}
info("redlock_demo: done");
co_return;
}
int main()
{
usub::ulog::ULogInit log_cfg{
.trace_path = nullptr,
.debug_path = nullptr,
.info_path = nullptr,
.warn_path = nullptr,
.error_path = nullptr,
.flush_interval_ns = 2'000'000ULL,
.queue_capacity = 16384,
.batch_size = 512,
.enable_color_stdout = true,
.max_file_size_bytes = 10 * 1024 * 1024,
.max_files = 3,
.json_mode = false,
.track_metrics = true
};
usub::ulog::init(log_cfg);
info("main(redlock): starting uvent");
usub::Uvent uvent(4);
system::co_spawn(redlock_demo());
uvent.run();
info("main(redlock): uvent stopped");
return 0;
}