Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ pnpm-debug.log*
# Temporary
tmp
temp

# Playwright artifacts
test-results
playwright-report
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 103 additions & 0 deletions crates/echo-session-client/examples/publish_pulse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: Apache-2.0
// © James Ross Ω FLYING•ROBOTS <https://github.com/flyingrobots>
//! Minimal CLI publisher for exercising the session hub / WVP pipeline.
//!
//! This example connects to the Unix socket session hub, sends a handshake,
//! subscribes to a `warp_id`, then publishes:
//! - a snapshot at epoch 0
//! - N empty diffs (gapless epochs)
//!
//! It exists purely to make it easy to verify that `echo-session-service` +
//! `echo-session-ws-gateway` + `/dashboard` are alive without needing to run the
//! full GUI viewer.

use anyhow::{Context, Result};
use echo_session_proto::{
wire::encode_message, HandshakePayload, Message, RenderGraph, WarpDiff, WarpFrame, WarpId,
WarpSnapshot,
};
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::time::Duration;

fn main() -> Result<()> {
let mut args = std::env::args().skip(1);
let socket_path = args
.next()
.unwrap_or_else(|| "/tmp/echo-session.sock".to_string());
let warp_id: WarpId = args
.next()
.as_deref()
.unwrap_or("1")
.parse()
.context("parse warp_id")?;
let diffs: u64 = args
.next()
.as_deref()
.unwrap_or("5")
.parse()
.context("parse diffs")?;
let delay_ms: u64 = args
.next()
.as_deref()
.unwrap_or("250")
.parse()
.context("parse delay_ms")?;
Comment on lines +28 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider validating CLI arguments to fail fast on invalid inputs.

The argument parsing accepts any parseable value without validation. For example, diffs or delay_ms could be 0 or negative, leading to confusing behavior (immediate exit from the loop or potential panics).

🔎 Suggested validation
     let warp_id: WarpId = args
         .next()
         .as_deref()
         .unwrap_or("1")
         .parse()
-        .context("parse warp_id")?;
+        .context("parse warp_id")
+        .and_then(|id| {
+            if id == 0 {
+                anyhow::bail!("warp_id must be > 0")
+            }
+            Ok(id)
+        })?;
     let diffs: u64 = args
         .next()
         .as_deref()
         .unwrap_or("5")
         .parse()
-        .context("parse diffs")?;
+        .context("parse diffs")
+        .and_then(|n| {
+            if n == 0 {
+                anyhow::bail!("diffs must be > 0")
+            }
+            Ok(n)
+        })?;

While this is a test-only example, failing fast on nonsensical arguments improves debuggability.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In crates/echo-session-client/examples/publish_pulse.rs around lines 28–45, the
parsed CLI values (diffs and delay_ms, and optionally warp_id) are accepted
without semantic validation; add explicit checks after parsing to fail fast on
invalid values (e.g., ensure diffs > 0 and delay_ms > 0, and if WarpId must be
positive validate that too). When a check fails, return an error with context
(using anyhow::bail or mapping to a Context) that clearly states which argument
is invalid and why; this keeps parsing and validation separate and makes errors
actionable.


let mut stream =
UnixStream::connect(&socket_path).with_context(|| format!("connect {socket_path}"))?;

// Best-effort: reduce latency if supported.
let _ = stream.set_nonblocking(false);
Comment on lines +50 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Misleading comment: blocking mode doesn't reduce latency, and the call is redundant.

UnixStream is blocking by default. Calling set_nonblocking(false) is a no-op, and claiming it "reduces latency" is incorrect—blocking I/O can actually increase latency in some scenarios by preventing concurrent operations.

🔎 Suggested fix
-    // Best-effort: reduce latency if supported.
-    let _ = stream.set_nonblocking(false);
+    // Ensure blocking mode (default, but explicit for clarity).
+    let _ = stream.set_nonblocking(false);

Or simply remove the lines if the default is acceptable.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Best-effort: reduce latency if supported.
let _ = stream.set_nonblocking(false);
// Ensure blocking mode (default, but explicit for clarity).
let _ = stream.set_nonblocking(false);
🤖 Prompt for AI Agents
In crates/echo-session-client/examples/publish_pulse.rs around lines 50-51, the
comment and call to stream.set_nonblocking(false) are misleading and redundant
because UnixStream is blocking by default and forcing blocking won't reduce
latency; remove the set_nonblocking(false) call and either delete or reword the
comment to reflect that the stream is already blocking by default (or remove the
comment entirely).


let pkt = encode_message(
Message::Handshake(HandshakePayload {
client_version: 1,
capabilities: vec!["demo:pulse".into()],
agent_id: Some("echo-session-client-example:publish_pulse".into()),
session_meta: None,
}),
0,
)
.context("encode handshake")?;
stream.write_all(&pkt).context("write handshake")?;

let pkt = encode_message(Message::SubscribeWarp { warp_id }, 0).context("encode subscribe")?;
stream.write_all(&pkt).context("write subscribe")?;

let snapshot = WarpFrame::Snapshot(WarpSnapshot {
epoch: 0,
graph: RenderGraph::default(),
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: snapshot,
},
0,
)
.context("encode snapshot")?;
stream.write_all(&pkt).context("write snapshot")?;

for i in 0..diffs {
let diff = WarpFrame::Diff(WarpDiff {
from_epoch: i,
to_epoch: i.saturating_add(1),
ops: vec![],
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: diff,
},
0,
)
.context("encode diff")?;
stream.write_all(&pkt).context("write diff")?;
std::thread::sleep(Duration::from_millis(delay_ms));
}
Comment on lines +53 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider documenting why all timestamps are hardcoded to 0.

Every message uses ts: 0 (lines 60, 65, 78, 95) without explanation. For a deterministic test publisher, this is probably intentional, but a comment explaining the rationale would clarify intent.

🔎 Suggested addition

Add a comment near the first occurrence:

+    // Use timestamp 0 for all messages to keep test traffic deterministic.
     let pkt = encode_message(
         Message::Handshake(HandshakePayload {
             client_version: 1,
             capabilities: vec!["demo:pulse".into()],
             agent_id: Some("echo-session-client-example:publish_pulse".into()),
             session_meta: None,
         }),
         0,
     )

This helps future readers understand the design choice.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let pkt = encode_message(
Message::Handshake(HandshakePayload {
client_version: 1,
capabilities: vec!["demo:pulse".into()],
agent_id: Some("echo-session-client-example:publish_pulse".into()),
session_meta: None,
}),
0,
)
.context("encode handshake")?;
stream.write_all(&pkt).context("write handshake")?;
let pkt = encode_message(Message::SubscribeWarp { warp_id }, 0).context("encode subscribe")?;
stream.write_all(&pkt).context("write subscribe")?;
let snapshot = WarpFrame::Snapshot(WarpSnapshot {
epoch: 0,
graph: RenderGraph::default(),
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: snapshot,
},
0,
)
.context("encode snapshot")?;
stream.write_all(&pkt).context("write snapshot")?;
for i in 0..diffs {
let diff = WarpFrame::Diff(WarpDiff {
from_epoch: i,
to_epoch: i.saturating_add(1),
ops: vec![],
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: diff,
},
0,
)
.context("encode diff")?;
stream.write_all(&pkt).context("write diff")?;
std::thread::sleep(Duration::from_millis(delay_ms));
}
// Use timestamp 0 for all messages to keep test traffic deterministic.
let pkt = encode_message(
Message::Handshake(HandshakePayload {
client_version: 1,
capabilities: vec!["demo:pulse".into()],
agent_id: Some("echo-session-client-example:publish_pulse".into()),
session_meta: None,
}),
0,
)
.context("encode handshake")?;
stream.write_all(&pkt).context("write handshake")?;
let pkt = encode_message(Message::SubscribeWarp { warp_id }, 0).context("encode subscribe")?;
stream.write_all(&pkt).context("write subscribe")?;
let snapshot = WarpFrame::Snapshot(WarpSnapshot {
epoch: 0,
graph: RenderGraph::default(),
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: snapshot,
},
0,
)
.context("encode snapshot")?;
stream.write_all(&pkt).context("write snapshot")?;
for i in 0..diffs {
let diff = WarpFrame::Diff(WarpDiff {
from_epoch: i,
to_epoch: i.saturating_add(1),
ops: vec![],
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: diff,
},
0,
)
.context("encode diff")?;
stream.write_all(&pkt).context("write diff")?;
std::thread::sleep(Duration::from_millis(delay_ms));
}
🤖 Prompt for AI Agents
In crates/echo-session-client/examples/publish_pulse.rs around lines 53-100,
several messages set ts: 0 (notably at the handshake, subscribe, snapshot and
diff messages around lines 60, 65, 78, 95) with no explanation; add a short
comment at the first occurrence explaining that timestamps are intentionally
hardcoded to 0 (e.g., to produce a deterministic test publisher/reproducible
ordering and simplify tests) and note that this is deliberate and should be
replaced or randomized if used for real/production scenarios.


Ok(())
}
Comment on lines +23 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing rustdoc on main() violates strict documentation policy.

Per coding guidelines: "Every public API across crates must carry rustdoc comments... Run cargo clippy --all-targets -- -D missing_docs before every PR." While examples are often exempted, the guideline is absolute and covers --all-targets.

🔎 Proposed addition
+/// CLI publisher that connects to the session hub and publishes a snapshot + gapless diffs.
+///
+/// Usage: `publish_pulse [socket_path] [warp_id] [diffs] [delay_ms]`
+///
+/// Defaults: `/tmp/echo-session.sock`, warp_id=1, diffs=5, delay_ms=250.
 fn main() -> Result<()> {

As per coding guidelines, rustdoc is mandatory even for examples when building with --all-targets.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn main() -> Result<()> {
let mut args = std::env::args().skip(1);
let socket_path = args
.next()
.unwrap_or_else(|| "/tmp/echo-session.sock".to_string());
let warp_id: WarpId = args
.next()
.as_deref()
.unwrap_or("1")
.parse()
.context("parse warp_id")?;
let diffs: u64 = args
.next()
.as_deref()
.unwrap_or("5")
.parse()
.context("parse diffs")?;
let delay_ms: u64 = args
.next()
.as_deref()
.unwrap_or("250")
.parse()
.context("parse delay_ms")?;
let mut stream =
UnixStream::connect(&socket_path).with_context(|| format!("connect {socket_path}"))?;
// Best-effort: reduce latency if supported.
let _ = stream.set_nonblocking(false);
let pkt = encode_message(
Message::Handshake(HandshakePayload {
client_version: 1,
capabilities: vec!["demo:pulse".into()],
agent_id: Some("echo-session-client-example:publish_pulse".into()),
session_meta: None,
}),
0,
)
.context("encode handshake")?;
stream.write_all(&pkt).context("write handshake")?;
let pkt = encode_message(Message::SubscribeWarp { warp_id }, 0).context("encode subscribe")?;
stream.write_all(&pkt).context("write subscribe")?;
let snapshot = WarpFrame::Snapshot(WarpSnapshot {
epoch: 0,
graph: RenderGraph::default(),
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: snapshot,
},
0,
)
.context("encode snapshot")?;
stream.write_all(&pkt).context("write snapshot")?;
for i in 0..diffs {
let diff = WarpFrame::Diff(WarpDiff {
from_epoch: i,
to_epoch: i.saturating_add(1),
ops: vec![],
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: diff,
},
0,
)
.context("encode diff")?;
stream.write_all(&pkt).context("write diff")?;
std::thread::sleep(Duration::from_millis(delay_ms));
}
Ok(())
}
/// CLI publisher that connects to the session hub and publishes a snapshot + gapless diffs.
///
/// Usage: `publish_pulse [socket_path] [warp_id] [diffs] [delay_ms]`
///
/// Defaults: `/tmp/echo-session.sock`, warp_id=1, diffs=5, delay_ms=250.
fn main() -> Result<()> {
let mut args = std::env::args().skip(1);
let socket_path = args
.next()
.unwrap_or_else(|| "/tmp/echo-session.sock".to_string());
let warp_id: WarpId = args
.next()
.as_deref()
.unwrap_or("1")
.parse()
.context("parse warp_id")?;
let diffs: u64 = args
.next()
.as_deref()
.unwrap_or("5")
.parse()
.context("parse diffs")?;
let delay_ms: u64 = args
.next()
.as_deref()
.unwrap_or("250")
.parse()
.context("parse delay_ms")?;
let mut stream =
UnixStream::connect(&socket_path).with_context(|| format!("connect {socket_path}"))?;
// Best-effort: reduce latency if supported.
let _ = stream.set_nonblocking(false);
let pkt = encode_message(
Message::Handshake(HandshakePayload {
client_version: 1,
capabilities: vec!["demo:pulse".into()],
agent_id: Some("echo-session-client-example:publish_pulse".into()),
session_meta: None,
}),
0,
)
.context("encode handshake")?;
stream.write_all(&pkt).context("write handshake")?;
let pkt = encode_message(Message::SubscribeWarp { warp_id }, 0).context("encode subscribe")?;
stream.write_all(&pkt).context("write subscribe")?;
let snapshot = WarpFrame::Snapshot(WarpSnapshot {
epoch: 0,
graph: RenderGraph::default(),
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: snapshot,
},
0,
)
.context("encode snapshot")?;
stream.write_all(&pkt).context("write snapshot")?;
for i in 0..diffs {
let diff = WarpFrame::Diff(WarpDiff {
from_epoch: i,
to_epoch: i.saturating_add(1),
ops: vec![],
state_hash: None,
});
let pkt = encode_message(
Message::WarpStream {
warp_id,
frame: diff,
},
0,
)
.context("encode diff")?;
stream.write_all(&pkt).context("write diff")?;
std::thread::sleep(Duration::from_millis(delay_ms));
}
Ok(())
}
🤖 Prompt for AI Agents
crates/echo-session-client/examples/publish_pulse.rs lines 23-103: the review
flags that main() lacks a rustdoc comment and fails the repository's strict
missing_docs policy; add a concise rustdoc (triple-slash ///) immediately above
fn main describing the example's purpose, expected CLI arguments and their
defaults (socket path, warp_id, diffs, delay_ms), and a brief usage note so the
example is documented and satisfies cargo clippy --all-targets -- -D
missing_docs.

2 changes: 2 additions & 0 deletions crates/echo-session-ws-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ anyhow = "1"
axum = { version = "0.8", features = ["ws"] }
axum-server = { version = "0.8", features = ["tls-rustls"] }
clap = { version = "4", features = ["derive"] }
echo-session-proto = { version = "0.1.0", path = "../echo-session-proto" }
futures-util = "0.3"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "time"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
7 changes: 7 additions & 0 deletions crates/echo-session-ws-gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,17 @@ cargo run -p echo-session-ws-gateway -- \
--tls-cert cert.pem --tls-key key.pem
```

Then open:

- `http://localhost:8787/dashboard` (session dashboard)
- `http://localhost:8787/api/metrics` (JSON metrics)
- `ws://localhost:8787/ws` (WebSocket endpoint)

## Features

- Binary WS frames → JS-ABI packets over UDS
- Payload guard (8 MiB default)
- Built-in hub observer for `/dashboard` metrics (disable with `--no-observer`; configure with `--observe-warp`)
- Optional origin allowlist
- Optional TLS (rustls)
- Ping/pong keepalive
Loading
Loading