Skip to content

Examples

This page collects complete runnable examples using different parts of uRedis.


Single client

#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;

// Single-client round trip: connects one RedisClient to 127.0.0.1:15100,
// writes "foo" = "bar", reads it back and logs the outcome of every step.
// Any failed step is logged and ends the coroutine early.
task::Awaitable<void> example_single()
{
    usub::ulog::info("example_single: start");

    RedisConfig cfg;
    cfg.host = "127.0.0.1";
    cfg.port = 15100;

    RedisClient client{cfg};
    auto c = co_await client.connect();
    if (!c)
    {
        const auto& err = c.error();
        usub::ulog::error("example_single: connect failed, category={}, message={}",
                          static_cast<int>(err.category), err.message);
        co_return;
    }

    // Do not discard the SET result: a failed write would otherwise make the
    // GET below report a stale or missing value with no explanation.
    auto s = co_await client.set("foo", "bar");
    if (!s)
    {
        const auto& err = s.error();
        usub::ulog::error("example_single: SET foo failed, category={}, message={}",
                          static_cast<int>(err.category), err.message);
        co_return;
    }

    auto g = co_await client.get("foo");
    if (!g)
    {
        const auto& err = g.error();
        usub::ulog::error("example_single: GET foo failed, category={}, message={}",
                          static_cast<int>(err.category), err.message);
        co_return;
    }

    // The successful result wraps an optional: empty means the key is absent.
    if (g.value().has_value())
    {
        usub::ulog::info("example_single: GET foo -> '{}'", g.value().value());
    }
    else
    {
        usub::ulog::warn("example_single: GET foo -> (nil)");
    }

    usub::ulog::info("example_single: done");
    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };

    usub::ulog::init(log_cfg);

    usub::ulog::info("main(single): starting uvent");

    usub::Uvent uvent(4);
    usub::uvent::system::co_spawn(example_single());
    uvent.run();

    usub::ulog::info("main(single): uvent stopped");
    return 0;
}

Pool example

#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include "uredis/RedisPool.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;

// Pool example: opens an 8-connection RedisPool against 127.0.0.1:15100
// (db 0), issues INCRBY counter 1 through the pool and logs the new value.
task::Awaitable<void> example_pool()
{
    usub::ulog::info("example_pool: start");

    RedisPoolConfig pool_cfg;
    pool_cfg.host = "127.0.0.1";
    pool_cfg.port = 15100;
    pool_cfg.db   = 0;
    pool_cfg.size = 8;

    RedisPool pool{pool_cfg};

    // Bring the pool's connections up before issuing any command.
    auto connected = co_await pool.connect_all();
    if (!connected)
    {
        const auto& failure = connected.error();
        usub::ulog::error("example_pool: connect_all failed, category={}, message={}",
                          static_cast<int>(failure.category), failure.message);
        co_return;
    }

    auto reply = co_await pool.command("INCRBY", "counter", "1");
    if (!reply)
    {
        const auto& failure = reply.error();
        usub::ulog::error("example_pool: INCRBY failed, category={}, message={}",
                          static_cast<int>(failure.category), failure.message);
        co_return;
    }

    // Only read the reply as an integer when it actually is one.
    const RedisValue& counter = *reply;
    if (!counter.is_integer())
    {
        usub::ulog::warn("example_pool: INCRBY returned non-integer reply");
    }
    else
    {
        usub::ulog::info("example_pool: counter -> {}", counter.as_integer());
    }

    usub::ulog::info("example_pool: done");
    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };

    usub::ulog::init(log_cfg);

    usub::ulog::info("main(pool): starting uvent");

    usub::Uvent uvent(4);
    usub::uvent::system::co_spawn(example_pool());
    uvent.run();

    usub::ulog::info("main(pool): uvent stopped");
    return 0;
}

Pub/Sub (low-level)

#include "uvent/Uvent.h"
#include "uredis/RedisSubscriber.h"
#include "uredis/RedisClient.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;
namespace task   = usub::uvent::task;

using usub::ulog::info;
using usub::ulog::warn;
using usub::ulog::error;

// Subscriber shared between coroutines of this example: subscriber_coro()
// creates it, control_coro() later uses it to unsubscribe. It stays null
// until subscriber_coro() has run far enough to assign it.
static std::shared_ptr<RedisSubscriber> g_subscriber;

// Creates the shared RedisSubscriber, connects it to 127.0.0.1:15100,
// subscribes to channel "events" and pattern "events.*", then idles forever.
// Received messages are printed by the two callbacks registered below.
task::Awaitable<void> subscriber_coro()
{
    info("subscriber_coro: start");

    RedisConfig cfg;
    cfg.host = "127.0.0.1";
    cfg.port = 15100;

    // Published through the global so control_coro() can reach it.
    g_subscriber = std::make_shared<RedisSubscriber>(cfg);

    auto conn = co_await g_subscriber->connect();
    if (!conn)
    {
        const auto& failure = conn.error();
        error("subscriber_coro: connect failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("subscriber_coro: connected");

    // Handler for the exact-channel subscription.
    auto on_channel_message = [](const std::string& chan, const std::string& body)
    {
        std::printf("[SUB] channel='%s' payload='%s'\n",
                    chan.c_str(), body.c_str());
    };

    // Handler for the pattern subscription.
    auto on_pattern_message = [](const std::string& chan, const std::string& body)
    {
        std::printf("[PSUB] channel='%s' payload='%s'\n",
                    chan.c_str(), body.c_str());
    };

    // SUBSCRIBE events
    auto sub_res = co_await g_subscriber->subscribe("events", on_channel_message);
    if (!sub_res)
    {
        const auto& failure = sub_res.error();
        error("subscriber_coro: SUBSCRIBE events failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("subscriber_coro: subscribed to 'events'");

    // PSUBSCRIBE events.*
    auto psub_res = co_await g_subscriber->psubscribe("events.*", on_pattern_message);
    if (!psub_res)
    {
        const auto& failure = psub_res.error();
        error("subscriber_coro: PSUBSCRIBE events.* failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("subscriber_coro: psubscribed to 'events.*'");

    info("subscriber_coro: waiting for messages...");
    using namespace std::chrono_literals;

    // Keep the coroutine alive indefinitely, waking once per second.
    for (;;)
    {
        co_await system::this_coroutine::sleep_for(1s);
    }
    co_return; // not reached: the loop above has no exit
}

// Publishes "event_1" .. "event_5" to channel "events" through a dedicated
// RedisClient, pausing 500ms after each message, and logs every reply.
task::Awaitable<void> publisher_coro()
{
    info("publisher_coro: start");

    RedisConfig cfg;
    cfg.host = "127.0.0.1";
    cfg.port = 15100;

    RedisClient client{cfg};
    auto conn = co_await client.connect();
    if (!conn)
    {
        const auto& failure = conn.error();
        error("publisher_coro: connect failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("publisher_coro: connected");

    using namespace std::chrono_literals;

    for (int seq = 1; seq <= 5; ++seq)
    {
        std::string payload = "event_" + std::to_string(seq);

        // PUBLISH <channel> <payload>, passed as a span of string views.
        std::string_view publish_args[2] = {"events", payload};
        const std::span<const std::string_view> args_view(publish_args, 2);

        auto reply = co_await client.command("PUBLISH", args_view);
        if (!reply)
        {
            const auto& failure = reply.error();
            error("publisher_coro: PUBLISH failed, category={}, message={}",
                  static_cast<int>(failure.category), failure.message);
            co_return;
        }

        // An integer reply is logged as the subscriber count; anything else
        // is reported as unexpected.
        const RedisValue& receivers = *reply;
        if (!receivers.is_integer())
        {
            warn("publisher_coro: PUBLISH events '{}' -> unexpected reply type", payload);
        }
        else
        {
            info("publisher_coro: PUBLISH events '{}' -> {} subscribers",
                 payload, receivers.as_integer());
        }

        co_await system::this_coroutine::sleep_for(500ms);
    }

    info("publisher_coro: done");
    co_return;
}

// Waits 3 seconds, then removes both subscriptions from the shared subscriber
// (channel "events" and pattern "events.*"). A failure of the first call is
// logged and does not prevent the second call.
task::Awaitable<void> control_coro()
{
    using namespace std::chrono_literals;

    info("control_coro: waiting before unsubscribe...");
    co_await system::this_coroutine::sleep_for(3s);

    // Nothing to do if the global subscriber was never assigned.
    if (!g_subscriber)
    {
        warn("control_coro: subscriber not initialized");
        co_return;
    }

    // UNSUBSCRIBE events
    auto channel_res = co_await g_subscriber->unsubscribe("events");
    if (channel_res)
    {
        info("control_coro: UNSUBSCRIBE events ok");
    }
    else
    {
        const auto& failure = channel_res.error();
        error("control_coro: UNSUBSCRIBE events failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
    }

    // PUNSUBSCRIBE events.*
    auto pattern_res = co_await g_subscriber->punsubscribe("events.*");
    if (pattern_res)
    {
        info("control_coro: PUNSUBSCRIBE events.* ok");
    }
    else
    {
        const auto& failure = pattern_res.error();
        error("control_coro: PUNSUBSCRIBE events.* failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
    }

    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };

    usub::ulog::init(log_cfg);

    info("main(pubsub): starting uvent");

    usub::Uvent uvent(3);
    system::co_spawn(subscriber_coro());
    system::co_spawn(publisher_coro());
    system::co_spawn(control_coro());
    uvent.run();

    info("main(pubsub): uvent stopped");
    return 0;
}

RedisBus

#include "uvent/Uvent.h"
#include "uredis/RedisBus.h"
#include <ulog/ulog.h>

// See the RedisBus section for the full example, which shows:
//  - RedisBus::run() running in a background coroutine
//  - a user coroutine making subscribe/psubscribe and publish calls.

Reflection helpers

#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include "uredis/RedisReflect.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;

using usub::ulog::info;
using usub::ulog::error;

// Sample record that reflect_example() writes with hset_struct() and reads
// back with hget_struct<User>().
// NOTE(review): the reflection helpers presumably derive hash field names from
// these member names — confirm before renaming or reordering members (the
// designated initializer in reflect_example() also depends on this order).
struct User
{
    int64_t id;
    std::string name;
    bool active;
    // Optional: reflect_example() prints "<null>" when no age is present.
    std::optional<int64_t> age;
};

// Reflection round trip: stores a User under key "user:42" with hset_struct(),
// loads it back with hget_struct<User>() and logs the loaded fields.
task::Awaitable<void> reflect_example()
{
    info("reflect_example: start");

    RedisConfig cfg;
    cfg.host = "127.0.0.1";
    cfg.port = 15100;

    RedisClient client{cfg};
    auto conn = co_await client.connect();
    if (!conn)
    {
        const auto& failure = conn.error();
        error("reflect_example: connect failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("reflect_example: connected");

    using namespace usub::uredis::reflect;

    User sample{.id = 42, .name = "Kirill", .active = true, .age = 30};

    // Write the struct; the successful result is logged as the field count.
    auto stored = co_await hset_struct(client, "user:42", sample);
    if (!stored)
    {
        const auto& failure = stored.error();
        error("reflect_example: hset_struct failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("reflect_example: hset_struct user:42 fields={}", stored.value());

    auto fetched = co_await hget_struct<User>(client, "user:42");
    if (!fetched)
    {
        const auto& failure = fetched.error();
        error("reflect_example: hget_struct failed, category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }

    // A successful call can still carry an empty optional, logged as (nil).
    if (fetched.value().has_value())
    {
        const User& restored = *fetched.value();

        // Render the optional age up front so the log call stays flat.
        std::string age_text("<null>");
        if (restored.age.has_value())
        {
            age_text = std::to_string(*restored.age);
        }

        info("reflect_example: hget_struct user:42 -> id={} name='{}' active={} age={}",
             restored.id,
             restored.name,
             restored.active,
             age_text);
    }
    else
    {
        info("reflect_example: hget_struct user:42 -> (nil)");
    }

    info("reflect_example: done");
    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };

    usub::ulog::init(log_cfg);

    info("main(reflect): starting uvent");

    usub::Uvent uvent(2);
    usub::uvent::system::co_spawn(reflect_example());
    uvent.run();

    info("main(reflect): uvent stopped");
    return 0;
}

Sentinel

#include "uvent/Uvent.h"
#include "uredis/RedisSentinelPool.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;

using usub::ulog::info;
using usub::ulog::error;

// Sentinel example: configures a RedisSentinelPool for master "mymaster" with
// one sentinel at 127.0.0.1:26379, connects, issues INCRBY counter 1 through
// the pool and logs the new value. Any failed step is logged and ends the
// coroutine early.
task::Awaitable<void> example_sentinel_pool()
{
    info("example_sentinel_pool: start");

    RedisSentinelConfig scfg;
    scfg.master_name = "mymaster";
    scfg.sentinels.push_back(
        RedisSentinelNode{
            .host = "127.0.0.1",
            .port = 26379
        });
    scfg.base_redis.db = 0;
    scfg.base_redis.io_timeout_ms = 5000;
    scfg.pool_size = 8;

    RedisSentinelPool pool{scfg};

    auto c = co_await pool.connect();
    if (!c)
    {
        const auto& err = c.error();
        error("example_sentinel_pool: connect failed, category={}, message={}",
              static_cast<int>(err.category), err.message);
        co_return;
    }

    auto r = co_await pool.command("INCRBY", "counter", "1");
    if (!r)
    {
        const auto& err = r.error();
        error("example_sentinel_pool: INCRBY failed, category={}, message={}",
              static_cast<int>(err.category), err.message);
        co_return;
    }

    // Check the reply type before reading it as an integer, as the plain pool
    // example does, instead of calling as_integer() unconditionally.
    const RedisValue& v = *r;
    if (v.is_integer())
    {
        info("example_sentinel_pool: counter -> {}", v.as_integer());
    }
    else
    {
        // warn is fully qualified: this example only pulls in info and error.
        usub::ulog::warn("example_sentinel_pool: INCRBY returned non-integer reply");
    }

    info("example_sentinel_pool: done");
    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };
    usub::ulog::init(log_cfg);

    usub::Uvent uvent(4);
    usub::uvent::system::co_spawn(example_sentinel_pool());
    uvent.run();
}

Cluster

#include "uvent/Uvent.h"
#include "uredis/RedisClusterClient.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;
namespace task = usub::uvent::task;

using usub::ulog::info;
using usub::ulog::error;

// Cluster example: connects a RedisClusterClient using two seed nodes
// (127.0.0.1:7000 and :7001), runs SET user:42 Kirill followed by
// GET user:42, and logs the result. Any failed step is logged and ends the
// coroutine early.
task::Awaitable<void> cluster_example()
{
    info("cluster_example: start");

    RedisClusterConfig cfg;
    cfg.seeds = {
        {"127.0.0.1", 7000},
        {"127.0.0.1", 7001},
    };
    cfg.max_redirections = 8;

    RedisClusterClient cluster{cfg};

    auto c = co_await cluster.connect();
    if (!c)
    {
        const auto& e = c.error();
        error("cluster_example: connect failed: category={} message={}",
              static_cast<int>(e.category), e.message);
        co_return;
    }

    auto r1 = co_await cluster.command("SET", "user:42", "Kirill");
    if (!r1)
    {
        const auto& e = r1.error();
        error("cluster_example: SET failed: category={} message={}",
              static_cast<int>(e.category), e.message);
        co_return;
    }

    auto r2 = co_await cluster.command("GET", "user:42");
    if (!r2)
    {
        const auto& e = r2.error();
        error("cluster_example: GET failed: category={} message={}",
              static_cast<int>(e.category), e.message);
        co_return;
    }

    const RedisValue& v = *r2;
    if (v.is_bulk_string() || v.is_simple_string())
    {
        info("cluster_example: GET user:42 -> '{}'", v.as_string());
    }
    else
    {
        // Report a non-string reply instead of finishing silently.
        // warn is fully qualified: this example only pulls in info and error.
        usub::ulog::warn("cluster_example: GET user:42 -> non-string reply");
    }

    info("cluster_example: done");
    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };
    usub::ulog::init(log_cfg);

    info("main(cluster): starting uvent");

    usub::Uvent uvent(4);
    usub::uvent::system::co_spawn(cluster_example());
    uvent.run();

    info("main(cluster): uvent stopped");
    return 0;
}

Redlock

#include "uvent/Uvent.h"
#include "uredis/RedisClient.h"
#include "uredis/RedisRedlock.h"
#include <ulog/ulog.h>

using namespace usub::uvent;
using namespace usub::uredis;

namespace task   = usub::uvent::task;
namespace system = usub::uvent::system;

using usub::ulog::info;
using usub::ulog::warn;
using usub::ulog::error;

// Redlock demo worker. For each of `iterations` rounds it takes the lock
// "lock:demo:counter", reads "demo:counter", writes back the value plus one
// and releases the lock.
//
//   id          - worker number, used only in log messages
//   redlock     - shared lock manager used for lock()/unlock()
//   data_client - shared client used for the GET/SET on the counter
//   iterations  - number of rounds; a round whose lock attempt fails is
//                 skipped after a 50ms pause and still counts as a round
//
// A failed GET or SET releases the lock and ends the worker.
task::Awaitable<void> worker_coro(
    int id,
    std::shared_ptr<RedisRedlock> redlock,
    std::shared_ptr<RedisClient> data_client,
    int iterations)
{
    using namespace std::chrono_literals;

    info("worker[{}]: start, iterations={}", id, iterations);

    for (int round = 0; round < iterations; ++round)
    {
        auto acquired = co_await redlock->lock("lock:demo:counter");
        if (!acquired)
        {
            const auto& failure = acquired.error();
            warn("worker[{}]: lock failed (iter={}): category={}, message={}",
                 id, round, static_cast<int>(failure.category), failure.message);
            co_await system::this_coroutine::sleep_for(50ms);
            continue;
        }

        auto lock_handle = acquired.value();

        // Read the current counter while holding the lock.
        auto fetched = co_await data_client->get("demo:counter");
        if (!fetched)
        {
            const auto& failure = fetched.error();
            error("worker[{}]: GET demo:counter failed: category={}, message={}",
                  id, static_cast<int>(failure.category), failure.message);
            co_await redlock->unlock(lock_handle);
            co_return;
        }

        // A missing key or an unparsable value both count as zero.
        int64_t before = 0;
        if (auto& stored_text = fetched.value(); stored_text.has_value())
        {
            try
            {
                before = std::stoll(stored_text.value());
            }
            catch (...)
            {
                warn("worker[{}]: invalid counter value='{}', reset to 0",
                     id, stored_text.value());
                before = 0;
            }
        }

        int64_t after = before + 1;
        std::string after_text = std::to_string(after);

        auto written = co_await data_client->set("demo:counter", after_text);
        if (!written)
        {
            const auto& failure = written.error();
            error("worker[{}]: SET demo:counter failed: category={}, message={}",
                  id, static_cast<int>(failure.category), failure.message);
            co_await redlock->unlock(lock_handle);
            co_return;
        }

        info("worker[{}]: iter={} counter {} -> {}",
             id, round, before, after);

        // An unlock failure is only reported; the worker keeps going.
        auto released = co_await redlock->unlock(lock_handle);
        if (!released)
        {
            const auto& failure = released.error();
            warn("worker[{}]: unlock failed: category={}, message={}",
                 id, static_cast<int>(failure.category), failure.message);
        }

        co_await system::this_coroutine::sleep_for(10ms);
    }

    info("worker[{}]: done", id);
    co_return;
}

// Redlock demo driver: configures three lock nodes (127.0.0.1:15100..15102),
// connects the lock manager and a separate data client (to the first node),
// resets "demo:counter" to 0, spawns 4 workers of 10 iterations each, waits
// 3 seconds and logs the final counter value.
task::Awaitable<void> redlock_demo()
{
    info("redlock_demo: start");

    RedlockConfig rcfg;
    rcfg.ttl_ms        = 2000;
    rcfg.retry_count   = 5;
    rcfg.retry_delay_ms = 100;

    // Three nodes that differ only in port; each is a copy of the first.
    {
        RedisConfig first_node;
        first_node.host = "127.0.0.1";
        first_node.port = 15100;
        rcfg.nodes.push_back(first_node);

        RedisConfig second_node = first_node;
        second_node.port = 15101;
        rcfg.nodes.push_back(second_node);

        RedisConfig third_node = first_node;
        third_node.port = 15102;
        rcfg.nodes.push_back(third_node);
    }

    auto redlock = std::make_shared<RedisRedlock>(rcfg);
    auto nodes_up = co_await redlock->connect_all();
    if (!nodes_up)
    {
        const auto& failure = nodes_up.error();
        error("redlock_demo: connect_all failed: category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("redlock_demo: redlock nodes connected");

    // The counter itself lives on the first lock node.
    RedisConfig data_cfg = rcfg.nodes.front();
    auto data_client = std::make_shared<RedisClient>(data_cfg);
    auto data_up = co_await data_client->connect();
    if (!data_up)
    {
        const auto& failure = data_up.error();
        error("redlock_demo: data_client connect failed: category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("redlock_demo: data_client connected");

    auto reset = co_await data_client->set("demo:counter", "0");
    if (!reset)
    {
        const auto& failure = reset.error();
        error("redlock_demo: reset demo:counter failed: category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }
    info("redlock_demo: demo:counter reset to 0");

    const int worker_count          = 4;
    const int iterations_per_worker = 10;

    // Workers share the lock manager and the data client via shared_ptr.
    for (int worker_id = 0; worker_id < worker_count; ++worker_id)
    {
        system::co_spawn(
            worker_coro(worker_id, redlock, data_client, iterations_per_worker));
    }

    // NOTE(review): this is a fixed 3s wait, not a join — the workers are
    // presumably expected to finish within it; the final value read below may
    // be lower than expected if they have not.
    using namespace std::chrono_literals;
    co_await system::this_coroutine::sleep_for(3s);

    auto final_read = co_await data_client->get("demo:counter");
    if (!final_read)
    {
        const auto& failure = final_read.error();
        error("redlock_demo: final GET failed: category={}, message={}",
              static_cast<int>(failure.category), failure.message);
        co_return;
    }

    if (!final_read.value().has_value())
    {
        warn("redlock_demo: final demo:counter -> (nil)");
    }
    else
    {
        info("redlock_demo: final demo:counter = '{}'", final_read.value().value());
    }

    info("redlock_demo: done");
    co_return;
}

int main()
{
    usub::ulog::ULogInit log_cfg{
        .trace_path = nullptr,
        .debug_path = nullptr,
        .info_path = nullptr,
        .warn_path = nullptr,
        .error_path = nullptr,
        .flush_interval_ns = 2'000'000ULL,
        .queue_capacity = 16384,
        .batch_size = 512,
        .enable_color_stdout = true,
        .max_file_size_bytes = 10 * 1024 * 1024,
        .max_files = 3,
        .json_mode = false,
        .track_metrics = true
    };

    usub::ulog::init(log_cfg);

    info("main(redlock): starting uvent");

    usub::Uvent uvent(4);
    system::co_spawn(redlock_demo());
    uvent.run();

    info("main(redlock): uvent stopped");
    return 0;
}