From cecfef66bbfb176a1245b7d0bda3380345141b58 Mon Sep 17 00:00:00 2001 From: Zachary Meadows Date: Fri, 14 Nov 2025 09:47:50 -0800 Subject: [PATCH 1/4] bug(net): loop vs. switch break in try_flush_rx_buffer() --- src/net/channel.ixx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/net/channel.ixx b/src/net/channel.ixx index 89614aa..68d6038 100644 --- a/src/net/channel.ixx +++ b/src/net/channel.ixx @@ -154,7 +154,7 @@ private: tx_buf_.consume(send_rc); bytes_sent += send_rc; if (tx_buf_.empty()) { - break; + return bytes_sent; } } else if (send_rc == -1) { @@ -163,14 +163,14 @@ private: case EWOULDBLOCK: #endif case EAGAIN: { - break; + return bytes_sent; } case EINTR: { continue; } default: { handle_error_event(); - break; + return bytes_sent; } } } From a303a08d0cd09b9b9b3cd8c28bccb80f61ab8569 Mon Sep 17 00:00:00 2001 From: Zachary Meadows Date: Fri, 14 Nov 2025 11:12:18 -0800 Subject: [PATCH 2/4] bug(net): ignore epoll_ctl DEL errors when called for unregistered client fd --- src/net/reactor.ixx | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/net/reactor.ixx b/src/net/reactor.ixx index c894a6f..e3cb206 100644 --- a/src/net/reactor.ixx +++ b/src/net/reactor.ixx @@ -235,7 +235,10 @@ void Reactor::close_channel(Channel* channel) noexcept struct epoll_event ev{}; if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, &ev) == -1) { - TSKV_LOG_WARN("epoll_ctl DEL failed for fd={}", client_fd); + // TODO[@zmeadows][P1]: structure things such that this check isn't necessary + if (errno != ENOENT) { // Ignore "not in set" errors + TSKV_LOG_WARN("epoll_ctl DEL failed for fd={}", client_fd); + } } channel->detach(); From 5e22d8d884a58630f128e5575bb121e426a9615c Mon Sep 17 00:00:00 2001 From: Zachary Meadows Date: Thu, 20 Nov 2025 15:45:17 -0800 Subject: [PATCH 3/4] docs: update README.md to reflect new project direction --- README.md | 243 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 199 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 6c44124..cf74e8a 100644 --- a/README.md +++ b/README.md @@ -1,49 +1,204 @@ -# ⏳︎ tskv — Time-Series Key-Value Store +# ⏳︎ tskv — Time-Window Time-Series Key-Value Cache -**TL;DR:** Single-node, crash-safe time-series KV store with a non-blocking TCP server (Linux **epoll**) and LSM-style storage: **Write-Ahead Log (WAL) → memtable → immutable SSTables**, plus a background compaction worker. Built with **C++23 modules**; no third-party libraries. +**TL;DR:** `tskv` is a single-node, in-memory **time-window cache** for time-series data, with: + +- A simple binary protocol over a non-blocking TCP server (Linux **epoll**) +- A bounded sliding retention window (e.g., "last N minutes") +- Time-partitioned in-memory segments for efficient expiration +- Modern **C++23** with modules +- No third-party libraries + +Stage-0 (`v1.0`) focuses on a small, understandable core: a hot time window with clear semantics and basic observability, leaving WAL, multi-threading, and advanced features for future versions. --- ## ◎ Goals -- Demonstrate disciplined systems design in modern C++. -- Show clear **durability** boundaries (WAL append / optional sync) and **read-after-write** visibility. -- Keep **backpressure** and buffers **bounded** for predictable latency. -- Favor **correctness + measurable performance** over feature breadth. - -## ⛶ Architecture -- **Write path:** append to **WAL** → (optional `fdatasync`) → apply to **memtable** → periodic **flush** to **SSTable** (immutable, sorted). -- **Read path:** **memtable** first → then newest-to-oldest **SSTables**; per-file **Bloom filter** to skip negatives; index to jump to the right block. -- **Compaction:** merge overlapping SSTables, keep newest versions, drop obsolete ones; install via **manifest** with durable rename. - -## ⚑ Roadmap (high-level) -- [x] v0.1 — Bootstrap: README, CLI, PR template -- [x] v0.2 — Non-blocking TCP + epoll echo; clean shutdown -- [ ] v0.3 — Framing: header + length; PING/PONG -- [ ] v0.4 — Connection state: RX/TX rings; backpressure cap -- [ ] v0.5 — Engine queues: SPSC/MPSC; dispatcher -- [ ] v0.6 — WAL v1: append+CRC; sync policy flag -- [ ] v0.7 — Recovery: replay WAL; torn-tail safe -- [ ] v0.8 — Memtable v0: std::map; PUT/GET end-to-end -- [ ] v0.9 — SSTable v1: writer/reader; mmap; footer -- [ ] v0.10 — Manifest: live tables; durable rename -- [ ] v0.11 — Wire-through: GET/PUT via SST path -- [ ] v0.12 — Bloom filters: per-SST; bits/key tuning -- [ ] v0.13 — Memtable v1: skiplist + iterator -- [ ] v0.14 — SCAN RPC: streaming RESP; writev batches -- [ ] v0.15 — Concurrency: N I/O, M engine; fairness -- [ ] v0.16 — Metrics: counters + p50/p95/p99 endpoint -- [ ] v0.17 — Compaction v1: merge + manifest install -- [ ] v0.18 — Chaos tests: disk-full; kill-9 loops -- [ ] v0.19 — Perf pass: micro/macro benches; notes -- [ ] v1.0 — Polish: docs, demo.sh, ASan/UBSan; release - -## ∷ C++ Module Layout -- `tskv.common.*` — logging, metrics, ring buffers, fs helpers -- `tskv.net.*` — socket (non-blocking), reactor (**epoll**, edge-triggered), connection, rpc -- `tskv.kv.*` — engine, wal, memtable, sstable, manifest, compaction, filters - -## ∑ Metrics (planned) -- **net:** connections_open, rx_bytes_total, tx_bytes_total, backpressure_events_total -- **rpc:** put_total, get_total, scan_total, errors_total -- **wal/sstable:** appends_total, fsync_total, files_total, bloom_negative_total -- **latency:** p50/p95/p99 for GET & PUT + +- Provide a compact, readable example of a **time-window time-series cache**: + - Bounded memory via a fixed retention window + - Explicit "visible data" contract: recent data only +- Demonstrate disciplined systems design in **modern C++**: + - C++23 modules + - Non-blocking I/O with **epoll** +- Favor **correctness + clear invariants** over feature breadth: + - Simple, explicit write and read paths + - Straightforward retention / expiration logic +- Keep latency and resource usage **predictable**: + - No unbounded growth from infinite history + - Easy-to-reason-about hot path + +--- + +## ⛶ Architecture (Stage-0) + +### Data model + +- Keys are time-series identifiers (e.g. `cpu.user`, `service=api,host=foo`). +- Each write unit is a tuple `(series_id, timestamp, value)`. +- The store maintains only data within a **sliding time window**: + - `timestamp >= now - WINDOW` + - Older data is considered expired and is eventually dropped. + +### In-memory layout + +- Data is stored in **time-partitioned segments**: + - Each segment covers a fixed time slice (e.g. 1–10 seconds). + - Segments are organized in time order (oldest → newest). + - New writes go to the current "tail" segment. +- A periodic retention pass drops the oldest segments whose time range is fully outside the configured window. + +This keeps memory and data size bounded by `WINDOW`, not by total insert volume. + +### Network path + +- Single-node server using **non-blocking TCP** and **epoll**. +- Simple length-prefixed framing for requests and responses. +- Initial RPCs: + - `PING / PONG` for connectivity checks. + - `PUT_TS_AT(series_id, timestamp, value)` to append a point. + - `GET_TS_LATEST(series_id)` to fetch the latest point in-window. + - `GET_TS_RANGE(series_id, from_ts, to_ts)` to read points for a series over a time range (clipped to the window). + +--- + +## ⚑ Roadmap + +### Implemented + +- [x] **v0.1 — Bootstrap** + - README, minimal CLI stub, basic project layout + - PR template and basic coding conventions + +- [x] **v0.2 — Non-blocking TCP** + - Non-blocking server with **epoll** + - Basic echo handler for manual testing + - Clean shutdown path + +### Planned to v1.0 (Stage-0) + +- [ ] **v0.3 — Framing + basic RPCs** + - Length-prefixed request/response framing + - `PING` / `PONG` and error responses + - Skeleton handlers for time-series commands + +- [ ] **v0.4 — In-memory window store v0** + - Global `WINDOW` config (e.g. last N minutes) + - Single in-memory container for `(series_id, timestamp, value)` + - Simple, periodic cleanup of expired entries + - `PUT_TS_AT` + `GET_TS_LATEST` end-to-end + +- [ ] **v0.5 — Time-partitioned segments** + - Replace the single container with fixed-duration segments + - Append writes to the current segment + - Drop whole segments when they fall completely out of window + - Basic `GET_TS_RANGE(series_id, from_ts, to_ts)` over segments + +- [ ] **v0.6 — Window-aware introspection** + - `WINDOW_INFO` RPC: + - Window size, number of segments + - Approximate memory usage + - Debug dump of segments and series counts + +- [ ] **v0.7 — Metrics** + - Simple counters and gauges: + - `ts_put_total`, `ts_get_latest_total`, `ts_range_total`, `ts_errors_total` + - `window_segments`, `window_series_approx`, `window_points_approx` + - Text or simple binary metrics endpoint/command + +- [ ] **v0.8 — Indexing pass (per-segment)** + - Optional per-segment index: + - `series_id -> offsets` + - Speed up `GET_TS_LATEST` and `GET_TS_RANGE` without scanning all entries + - Microbenchmarks for lookup vs. scan + +- [ ] **v0.9 — Reliability + perf pass** + - Basic property tests for window semantics: + - Writes with timestamps < `now - WINDOW` are never visible + - Writes with timestamps in the window remain visible + - Simple load generator for PUT/GET/GET_TS_RANGE + - First round of notes on throughput/latency + +- [ ] **v1.0 — Stage-0 release** + - Minimal but complete time-window cache: + - Protocol, window store, segments, indexing, basic metrics + - Documentation: + - Architecture overview + - Wire protocol reference + - Example usage script (`demo.sh`) + - Sanitizers in CI (ASan/UBSan) and a small test suite + +--- + +## ∷ C++ Module Layout (Stage-0) + +- `tskv.common.*` + - Logging, basic metrics types, time helpers + - Small ring buffers / utility containers +- `tskv.net.*` + - Socket wrapper (non-blocking) + - Reactor (**epoll**, edge-triggered) + - Connection and RPC framing +- `tskv.window.*` + - In-memory segments + - Window management (retention, expiration) + - Query execution (latest / range) + - Simple per-segment indexing + +--- + +## ∑ Metrics (Stage-0, planned) + +- **net:** + - `net_connections_open` + - `net_rx_bytes_total` + - `net_tx_bytes_total` + +- **rpc:** + - `rpc_ping_total` + - `ts_X_total` (for all RPCs `X`) + - `rpc_errors_total` + +- **window:** + - `window_segments` + - `window_points_approx` + - `window_series_approx` + +- **latency (sample-based, coarse):** + - p50 / p95 / p99 for: + - `PUT_TS_AT` + - `GET_TS_LATEST` + - `GET_TS_RANGE` + +--- + +## After v1.0 (ideas, not committed) + +These are potential extensions beyond the stage-0 scope: + +- **Durability for the hot window** + - WAL segments per time-partitioned segment + - Startup replay to reconstruct the last N minutes after a crash + +- **Multi-threading** + - Split I/O and engine into separate threads + - Shard data across multiple engine threads by series id + + **Protocol extensions** + - `PUT_TS_BATCH` for multiple irregular/sparse timestamps + - `PUT_TS_STEPS` for multiple regularly spaced timestamps + - `GET_TS_AT` for exact-timestamp lookups + +- **Richer time-series semantics** + - Per-series TTL overrides + - Server-side aggregates: + - Windowed `SUM` / `AVG` / `MIN` / `MAX` RPCs + +- **Advanced observability** + - More detailed metrics (per-series or per-connection) + - Debug RPCs to inspect segment contents, hot keys, etc. + +- **Replication / HA experiments** + - Simple follower replication for the time window + - Eventually-consistent read replicas + +Stage-0 (`v1.0`) stays deliberately small: a single-node, in-memory time-window cache with a clear contract and straightforward implementation. Everything else can grow out of that foundation. From ed524e506654b7c22f571dbe63c37653c7830f1e Mon Sep 17 00:00:00 2001 From: Zachary Meadows Date: Fri, 5 Dec 2025 08:29:05 -0800 Subject: [PATCH 4/4] chore: factor out accept_client function --- include/tskv/common/logging.hpp | 2 ++ src/net/channel.ixx | 1 - src/net/reactor.ixx | 35 +++------------------------ src/net/socket.ixx | 42 +++++++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/include/tskv/common/logging.hpp b/include/tskv/common/logging.hpp index 3bb84ec..604be52 100644 --- a/include/tskv/common/logging.hpp +++ b/include/tskv/common/logging.hpp @@ -48,6 +48,7 @@ #define TSKV_SET_LOG_LEVEL(level_name) \ ::tskv::common::set_log_level(::tskv::common::LogLevel::level_name) +// calls std::abort #define TSKV_DEMAND(expr, fmt, ...) \ do { \ if (!(expr)) [[unlikely]] { \ @@ -56,6 +57,7 @@ } \ } while (false) +// calls std::exit #define TSKV_REQUIRE(expr, fmt, ...) \ do { \ if (!(expr)) [[unlikely]] { \ diff --git a/src/net/channel.ixx b/src/net/channel.ixx index 68d6038..3aaa6a5 100644 --- a/src/net/channel.ixx +++ b/src/net/channel.ixx @@ -9,7 +9,6 @@ module; #include #include #include -#include #include #include #include diff --git a/src/net/reactor.ixx b/src/net/reactor.ixx index e3cb206..0e3652e 100644 --- a/src/net/reactor.ixx +++ b/src/net/reactor.ixx @@ -5,6 +5,7 @@ module; #include #include #include +#include #include #include #include @@ -281,38 +282,8 @@ void Reactor::on_channel_event(Channel* channel, std::uint32_t eve template void Reactor::on_listener_event() noexcept { - while (true) { - sockaddr_storage client_addr{}; - socklen_t client_addr_size = sizeof client_addr; - - int client_fd = accept4( - listener_fd_, (sockaddr*)&client_addr, &client_addr_size, SOCK_NONBLOCK | SOCK_CLOEXEC); - - if (client_fd == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - // done processing new channels, try again later - return; - } - - TSKV_LOG_WARN("failed to accept new channel: errno={}", errno); - - switch (errno) { - case EMFILE: - metrics::inc_counter<"net.accept_error.emfile">(); - break; - case ENFILE: - metrics::inc_counter<"net.accept_error.enfile">(); - break; - case ENOBUFS: - metrics::inc_counter<"net.accept_error.enobufs">(); - break; - default: - metrics::inc_counter<"net.accept_error.other">(); - break; - } - - return; - } + while (std::optional o_client_fd = accept_client(listener_fd_)) { + const int client_fd = *o_client_fd; Channel* channel = pool_.acquire(client_fd); channel->attach(client_fd); diff --git a/src/net/socket.ixx b/src/net/socket.ixx index 8cb0e12..a6a7bb4 100644 --- a/src/net/socket.ixx +++ b/src/net/socket.ixx @@ -5,8 +5,10 @@ module; #include #include #include +#include #include #include +#include #include #include #include @@ -16,9 +18,49 @@ module; export module tskv.net.socket; import tskv.common.logging; +import tskv.common.metrics; + +namespace metrics = tskv::common::metrics; export namespace tskv::net { +std::optional accept_client(int listener_fd) +{ + sockaddr_storage client_addr{}; + socklen_t client_addr_size = sizeof client_addr; + + const int client_fd = + accept4(listener_fd, (sockaddr*)&client_addr, &client_addr_size, SOCK_NONBLOCK | SOCK_CLOEXEC); + + if (client_fd == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // done processing new channels, try again later + return {}; + } + + TSKV_LOG_WARN("failed to accept new channel: errno={}", errno); + + switch (errno) { + case EMFILE: + metrics::inc_counter<"net.accept_error.emfile">(); + break; + case ENFILE: + metrics::inc_counter<"net.accept_error.enfile">(); + break; + case ENOBUFS: + metrics::inc_counter<"net.accept_error.enobufs">(); + break; + default: + metrics::inc_counter<"net.accept_error.other">(); + break; + } + + return {}; + } + + return client_fd; +} + int start_listener(const char* const host, std::uint16_t port) { addrinfo hints{};