Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 199 additions & 44 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,49 +1,204 @@
# ⏳︎ tskv — Time-Series Key-Value Store
# ⏳︎ tskv — Time-Window Time-Series Key-Value Cache

**TL;DR:** Single-node, crash-safe time-series KV store with a non-blocking TCP server (Linux **epoll**) and LSM-style storage: **Write-Ahead Log (WAL) → memtable → immutable SSTables**, plus a background compaction worker. Built with **C++23 modules**; no third-party libraries.
**TL;DR:** `tskv` is a single-node, in-memory **time-window cache** for time-series data, with:

- A simple binary protocol over a non-blocking TCP server (Linux **epoll**)
- A bounded sliding retention window (e.g., "last N minutes")
- Time-partitioned in-memory segments for efficient expiration
- Modern **C++23** with modules
- No third-party libraries

Stage-0 (`v1.0`) focuses on a small, understandable core: a hot time window with clear semantics and basic observability, leaving WAL, multi-threading, and advanced features for future versions.

---

## ◎ Goals
- Demonstrate disciplined systems design in modern C++.
- Show clear **durability** boundaries (WAL append / optional sync) and **read-after-write** visibility.
- Keep **backpressure** and buffers **bounded** for predictable latency.
- Favor **correctness + measurable performance** over feature breadth.

## ⛶ Architecture
- **Write path:** append to **WAL** → (optional `fdatasync`) → apply to **memtable** → periodic **flush** to **SSTable** (immutable, sorted).
- **Read path:** **memtable** first → then newest-to-oldest **SSTables**; per-file **Bloom filter** to skip negatives; index to jump to the right block.
- **Compaction:** merge overlapping SSTables, keep newest versions, drop obsolete ones; install via **manifest** with durable rename.

## ⚑ Roadmap (high-level)
- [x] v0.1 — Bootstrap: README, CLI, PR template
- [x] v0.2 — Non-blocking TCP + epoll echo; clean shutdown
- [ ] v0.3 — Framing: header + length; PING/PONG
- [ ] v0.4 — Connection state: RX/TX rings; backpressure cap
- [ ] v0.5 — Engine queues: SPSC/MPSC; dispatcher
- [ ] v0.6 — WAL v1: append+CRC; sync policy flag
- [ ] v0.7 — Recovery: replay WAL; torn-tail safe
- [ ] v0.8 — Memtable v0: std::map; PUT/GET end-to-end
- [ ] v0.9 — SSTable v1: writer/reader; mmap; footer
- [ ] v0.10 — Manifest: live tables; durable rename
- [ ] v0.11 — Wire-through: GET/PUT via SST path
- [ ] v0.12 — Bloom filters: per-SST; bits/key tuning
- [ ] v0.13 — Memtable v1: skiplist + iterator
- [ ] v0.14 — SCAN RPC: streaming RESP; writev batches
- [ ] v0.15 — Concurrency: N I/O, M engine; fairness
- [ ] v0.16 — Metrics: counters + p50/p95/p99 endpoint
- [ ] v0.17 — Compaction v1: merge + manifest install
- [ ] v0.18 — Chaos tests: disk-full; kill-9 loops
- [ ] v0.19 — Perf pass: micro/macro benches; notes
- [ ] v1.0 — Polish: docs, demo.sh, ASan/UBSan; release

## ∷ C++ Module Layout
- `tskv.common.*` — logging, metrics, ring buffers, fs helpers
- `tskv.net.*` — socket (non-blocking), reactor (**epoll**, edge-triggered), connection, rpc
- `tskv.kv.*` — engine, wal, memtable, sstable, manifest, compaction, filters

## ∑ Metrics (planned)
- **net:** connections_open, rx_bytes_total, tx_bytes_total, backpressure_events_total
- **rpc:** put_total, get_total, scan_total, errors_total
- **wal/sstable:** appends_total, fsync_total, files_total, bloom_negative_total
- **latency:** p50/p95/p99 for GET & PUT

- Provide a compact, readable example of a **time-window time-series cache**:
- Bounded memory via a fixed retention window
- Explicit "visible data" contract: recent data only
- Demonstrate disciplined systems design in **modern C++**:
- C++23 modules
- Non-blocking I/O with **epoll**
- Favor **correctness + clear invariants** over feature breadth:
- Simple, explicit write and read paths
- Straightforward retention / expiration logic
- Keep latency and resource usage **predictable**:
- No unbounded growth from infinite history
- Easy-to-reason-about hot path

---

## ⛶ Architecture (Stage-0)

### Data model

- Keys are time-series identifiers (e.g. `cpu.user`, `service=api,host=foo`).
- Each write unit is a tuple `(series_id, timestamp, value)`.
- The store maintains only data within a **sliding time window**:
- `timestamp >= now - WINDOW`
- Older data is considered expired and is eventually dropped.

### In-memory layout

- Data is stored in **time-partitioned segments**:
- Each segment covers a fixed time slice (e.g. 1–10 seconds).
- Segments are organized in time order (oldest → newest).
- New writes go to the current "tail" segment.
- A periodic retention pass drops the oldest segments whose time range is fully outside the configured window.

This keeps memory and data size bounded by `WINDOW`, not by total insert volume.

### Network path

- Single-node server using **non-blocking TCP** and **epoll**.
- Simple length-prefixed framing for requests and responses.
- Initial RPCs:
- `PING / PONG` for connectivity checks.
- `PUT_TS_AT(series_id, timestamp, value)` to append a point.
- `GET_TS_LATEST(series_id)` to fetch the latest point in-window.
- `GET_TS_RANGE(series_id, from_ts, to_ts)` to read points for a series over a time range (clipped to the window).

---

## ⚑ Roadmap

### Implemented

- [x] **v0.1 — Bootstrap**
- README, minimal CLI stub, basic project layout
- PR template and basic coding conventions

- [x] **v0.2 — Non-blocking TCP**
- Non-blocking server with **epoll**
- Basic echo handler for manual testing
- Clean shutdown path

### Planned to v1.0 (Stage-0)

- [ ] **v0.3 — Framing + basic RPCs**
- Length-prefixed request/response framing
- `PING` / `PONG` and error responses
- Skeleton handlers for time-series commands

- [ ] **v0.4 — In-memory window store v0**
- Global `WINDOW` config (e.g. last N minutes)
- Single in-memory container for `(series_id, timestamp, value)`
- Simple, periodic cleanup of expired entries
- `PUT_TS_AT` + `GET_TS_LATEST` end-to-end

- [ ] **v0.5 — Time-partitioned segments**
- Replace the single container with fixed-duration segments
- Append writes to the current segment
- Drop whole segments when they fall completely out of window
- Basic `GET_TS_RANGE(series_id, from_ts, to_ts)` over segments

- [ ] **v0.6 — Window-aware introspection**
- `WINDOW_INFO` RPC:
- Window size, number of segments
- Approximate memory usage
- Debug dump of segments and series counts

- [ ] **v0.7 — Metrics**
- Simple counters and gauges:
- `ts_put_total`, `ts_get_latest_total`, `ts_range_total`, `ts_errors_total`
- `window_segments`, `window_series_approx`, `window_points_approx`
- Text or simple binary metrics endpoint/command

- [ ] **v0.8 — Indexing pass (per-segment)**
- Optional per-segment index:
- `series_id -> offsets`
- Speed up `GET_TS_LATEST` and `GET_TS_RANGE` without scanning all entries
- Microbenchmarks for lookup vs. scan

- [ ] **v0.9 — Reliability + perf pass**
- Basic property tests for window semantics:
- Writes with timestamps < `now - WINDOW` are never visible
- Writes with timestamps in the window remain visible
- Simple load generator for PUT/GET/GET_TS_RANGE
- First round of notes on throughput/latency

- [ ] **v1.0 — Stage-0 release**
- Minimal but complete time-window cache:
- Protocol, window store, segments, indexing, basic metrics
- Documentation:
- Architecture overview
- Wire protocol reference
- Example usage script (`demo.sh`)
- Sanitizers in CI (ASan/UBSan) and a small test suite

---

## ∷ C++ Module Layout (Stage-0)

- `tskv.common.*`
- Logging, basic metrics types, time helpers
- Small ring buffers / utility containers
- `tskv.net.*`
- Socket wrapper (non-blocking)
- Reactor (**epoll**, edge-triggered)
- Connection and RPC framing
- `tskv.window.*`
- In-memory segments
- Window management (retention, expiration)
- Query execution (latest / range)
- Simple per-segment indexing

---

## ∑ Metrics (Stage-0, planned)

- **net:**
- `net_connections_open`
- `net_rx_bytes_total`
- `net_tx_bytes_total`

- **rpc:**
- `rpc_ping_total`
- `ts_X_total` (for all RPCs `X`)
- `rpc_errors_total`

- **window:**
- `window_segments`
- `window_points_approx`
- `window_series_approx`

- **latency (sample-based, coarse):**
- p50 / p95 / p99 for:
- `PUT_TS_AT`
- `GET_TS_LATEST`
- `GET_TS_RANGE`

---

## After v1.0 (ideas, not committed)

These are potential extensions beyond the stage-0 scope:

- **Durability for the hot window**
- WAL segments per time-partitioned segment
- Startup replay to reconstruct the last N minutes after a crash

- **Multi-threading**
- Split I/O and engine into separate threads
- Shard data across multiple engine threads by series id

**Protocol extensions**
- `PUT_TS_BATCH` for multiple irregular/sparse timestamps
- `PUT_TS_STEPS` for multiple regularly spaced timestamps
- `GET_TS_AT` for exact-timestamp lookups

- **Richer time-series semantics**
- Per-series TTL overrides
- Server-side aggregates:
- Windowed `SUM` / `AVG` / `MIN` / `MAX` RPCs

- **Advanced observability**
- More detailed metrics (per-series or per-connection)
- Debug RPCs to inspect segment contents, hot keys, etc.

- **Replication / HA experiments**
- Simple follower replication for the time window
- Eventually-consistent read replicas

Stage-0 (`v1.0`) stays deliberately small: a single-node, in-memory time-window cache with a clear contract and straightforward implementation. Everything else can grow out of that foundation.
2 changes: 2 additions & 0 deletions include/tskv/common/logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#define TSKV_SET_LOG_LEVEL(level_name) \
::tskv::common::set_log_level(::tskv::common::LogLevel::level_name)

// calls std::abort
#define TSKV_DEMAND(expr, fmt, ...) \
do { \
if (!(expr)) [[unlikely]] { \
Expand All @@ -56,6 +57,7 @@
} \
} while (false)

// calls std::exit
#define TSKV_REQUIRE(expr, fmt, ...) \
do { \
if (!(expr)) [[unlikely]] { \
Expand Down
7 changes: 3 additions & 4 deletions src/net/channel.ixx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module;
#include <cstring>
#include <limits>
#include <memory>
#include <netdb.h>
#include <numeric>
#include <span>
#include <sys/epoll.h>
Expand Down Expand Up @@ -154,7 +153,7 @@ private:
tx_buf_.consume(send_rc);
bytes_sent += send_rc;
if (tx_buf_.empty()) {
break;
return bytes_sent;
}
}
else if (send_rc == -1) {
Expand All @@ -163,14 +162,14 @@ private:
case EWOULDBLOCK:
#endif
case EAGAIN: {
break;
return bytes_sent;
}
case EINTR: {
continue;
}
default: {
handle_error_event();
break;
return bytes_sent;
}
}
}
Expand Down
40 changes: 7 additions & 33 deletions src/net/reactor.ixx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module;
#include <cstring>
#include <errno.h>
#include <fcntl.h>
#include <optional>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
Expand Down Expand Up @@ -235,7 +236,10 @@ void Reactor<Proto>::close_channel(Channel<Proto>* channel) noexcept

struct epoll_event ev{};
if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, &ev) == -1) {
TSKV_LOG_WARN("epoll_ctl DEL failed for fd={}", client_fd);
// TODO[@zmeadows][P1]: structure things such that this check isn't necessary
if (errno != ENOENT) { // Ignore "not in set" errors
TSKV_LOG_WARN("epoll_ctl DEL failed for fd={}", client_fd);
}
}

channel->detach();
Expand Down Expand Up @@ -278,38 +282,8 @@ void Reactor<Proto>::on_channel_event(Channel<Proto>* channel, std::uint32_t eve
template <Protocol Proto>
void Reactor<Proto>::on_listener_event() noexcept
{
while (true) {
sockaddr_storage client_addr{};
socklen_t client_addr_size = sizeof client_addr;

int client_fd = accept4(
listener_fd_, (sockaddr*)&client_addr, &client_addr_size, SOCK_NONBLOCK | SOCK_CLOEXEC);

if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// done processing new channels, try again later
return;
}

TSKV_LOG_WARN("failed to accept new channel: errno={}", errno);

switch (errno) {
case EMFILE:
metrics::inc_counter<"net.accept_error.emfile">();
break;
case ENFILE:
metrics::inc_counter<"net.accept_error.enfile">();
break;
case ENOBUFS:
metrics::inc_counter<"net.accept_error.enobufs">();
break;
default:
metrics::inc_counter<"net.accept_error.other">();
break;
}

return;
}
while (std::optional<int> o_client_fd = accept_client(listener_fd_)) {
const int client_fd = *o_client_fd;

Channel<Proto>* channel = pool_.acquire(client_fd);
channel->attach(client_fd);
Expand Down
Loading