From 381752aebbd7e82937cf5cd6bd3a4f6948b28398 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Tue, 20 Jan 2026 10:18:56 +0900 Subject: [PATCH] feat: add task supervisor for graceful shutdown --- Cargo.toml | 2 +- crates/node/Cargo.toml | 1 + crates/node/src/lib.rs | 2 + crates/node/src/main.rs | 84 +++++-- crates/node/src/node.rs | 105 ++++++++- crates/node/src/supervisor.rs | 415 ++++++++++++++++++++++++++++++++++ 6 files changed, 580 insertions(+), 29 deletions(-) create mode 100644 crates/node/src/supervisor.rs diff --git a/Cargo.toml b/Cargo.toml index 906bbbe..2767793 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ repository = "https://github.com/decipherhub/cipherbft" [workspace.dependencies] # Async runtime tokio = { version = "1.35", features = ["full"] } -tokio-util = { version = "0.7", features = ["codec"] } +tokio-util = { version = "0.7", features = ["codec", "rt"] } async-trait = "0.1" # ABCI & Tendermint types diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 2fce971..98f28a4 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -24,6 +24,7 @@ alloy-primitives = { version = "1", features = ["serde"] } # Async runtime tokio = { workspace = true, features = ["full", "signal"] } +tokio-util = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index a21a11e..fa75990 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -11,6 +11,7 @@ pub mod genesis_bootstrap; pub mod key_cli; pub mod network; pub mod node; +pub mod supervisor; pub mod util; pub use client_config::ClientConfig; @@ -26,3 +27,4 @@ pub use genesis_bootstrap::{ }; pub use key_cli::{execute_keys_command, KeysCommand}; pub use node::Node; +pub use supervisor::{NodeSupervisor, ShutdownError}; diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 0f443dc..00470a3 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -8,8 +8,8 @@ use cipherbft_crypto::{BlsKeyPair, BlsSecretKey, Ed25519KeyPair, Ed25519SecretKe use cipherd::key_cli::KeyringBackendArg; use cipherd::{ execute_keys_command, generate_local_configs, GenesisGenerator, GenesisGeneratorConfig, - GenesisLoader, KeysCommand, Node, NodeConfig, ValidatorKeyFile, CIPHERD_HOME_ENV, - DEFAULT_HOME_DIR, + GenesisLoader, KeysCommand, Node, NodeConfig, NodeSupervisor, ValidatorKeyFile, + CIPHERD_HOME_ENV, DEFAULT_HOME_DIR, }; use clap::{Parser, Subcommand}; use std::path::PathBuf; @@ -720,7 +720,24 @@ async fn cmd_start( let mut node = Node::new(config, bls_keypair, ed25519_keypair)?; node.bootstrap_validators_from_genesis(&genesis)?; - node.run().await?; + // Create a supervisor for structured task management and graceful shutdown + let supervisor = NodeSupervisor::new(); + + // Set up signal handling for graceful shutdown + let shutdown_supervisor = supervisor.clone(); + tokio::spawn(async move { + if let Err(e) = tokio::signal::ctrl_c().await { + tracing::error!("Failed to listen for Ctrl+C: {}", e); + return; + } + info!("Received Ctrl+C, initiating graceful shutdown..."); + if let Err(e) = shutdown_supervisor.shutdown().await { + tracing::warn!("Shutdown warning: {}", e); + } + }); + + // Run node with the supervisor for coordinated task management + node.run_with_supervisor(supervisor).await?; Ok(()) } @@ -908,11 +925,13 @@ async fn cmd_testnet_start(num_validators: usize, duration: u64) -> Result<()> { }) .collect(); - // Create and start nodes - let mut handles = Vec::new(); + // 
Create a shared supervisor for coordinated task management across all nodes + // This ensures graceful shutdown propagates to all validators simultaneously + let supervisor = NodeSupervisor::new(); + // Create and start nodes under the shared supervisor for tc in test_configs { - let validator_id = tc + let _validator_id = tc .config .validator_id .expect("test config should have validator_id"); @@ -924,33 +943,56 @@ async fn cmd_testnet_start(num_validators: usize, duration: u64) -> Result<()> { node.add_validator(*vid, bls_pubkey.clone(), ed25519_pubkey.clone()); } - let handle = tokio::spawn(async move { - if let Err(e) = node.run().await { - tracing::error!("Node {:?} error: {}", validator_id, e); - } - }); - - handles.push(handle); + // Spawn each node under the shared supervisor for coordinated lifecycle management + let node_supervisor = supervisor.clone(); + supervisor.spawn( + // Note: Using a static string for task name as required by spawn() + "validator-node", + async move { + node.run_with_supervisor(node_supervisor).await?; + Ok(()) + }, + ); // Stagger node startup slightly tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; } - info!("All {} nodes started", num_validators); + info!( + "All {} nodes started under shared supervisor", + num_validators + ); - // Run for specified duration or until Ctrl+C - if duration > 0 { + // Wait for shutdown trigger (duration or Ctrl+C) + let shutdown_reason = if duration > 0 { info!("Running for {} seconds...", duration); - tokio::time::sleep(tokio::time::Duration::from_secs(duration)).await; - info!("Duration elapsed, shutting down..."); + tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_secs(duration)) => { + "duration elapsed" + } + _ = tokio::signal::ctrl_c() => { + "Ctrl+C received" + } + } } else { info!("Press Ctrl+C to stop..."); tokio::signal::ctrl_c().await?; - info!("Received shutdown signal..."); + "Ctrl+C received" + }; + + info!("Shutdown triggered: {}", shutdown_reason); + + // Initiate graceful shutdown through the supervisor + // This propagates cancellation to all nodes in the correct order + info!( + "Initiating coordinated shutdown of all {} validators...", + num_validators + ); + if let Err(e) = supervisor.shutdown().await { + tracing::warn!("Shutdown warning: {}", e); } - // All handles will be dropped, tasks will be cancelled - info!("Testnet stopped"); + info!("Testnet stopped gracefully"); Ok(()) } diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index e1831dc..c4ad56f 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1,8 +1,24 @@ //! Node runner - ties Primary, Workers, and Network together +//! +//! # Task Supervision +//! +//! The node uses a [`NodeSupervisor`] for structured task management: +//! - All background tasks are tracked and can be cancelled gracefully +//! - Shutdown follows a specific order to ensure clean state +//! - Critical task failures trigger coordinated shutdown +//! +//! # Shutdown Order +//! +//! 1. Stop accepting new network connections +//! 2. Drain in-flight consensus rounds (via cancellation signal) +//! 3. Flush pending storage writes +//! 4. Close database connections +//! 5. 
Exit
 
 use crate::config::NodeConfig;
 use crate::execution_bridge::ExecutionBridge;
 use crate::network::TcpPrimaryNetwork;
+use crate::supervisor::NodeSupervisor;
 use crate::util::validator_id_from_bls;
 use anyhow::{Context, Result};
 use cipherbft_consensus::{
@@ -27,6 +43,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, warn};
 
 /// Validator public key information for both DCL and Consensus layers
@@ -266,13 +283,34 @@ impl Node {
         Ok(self)
     }
 
-    /// Run the node
+    /// Run the node with a default supervisor.
+    ///
+    /// Creates a new [`NodeSupervisor`] internally. To trigger shutdown externally
+    /// (e.g., on Ctrl+C), construct a supervisor and call [`Self::run_with_supervisor`].
     pub async fn run(self) -> Result<()> {
+        let supervisor = NodeSupervisor::new();
+        self.run_with_supervisor(supervisor).await
+    }
+
+    /// Run the node with a provided supervisor.
+    ///
+    /// This allows external control over task supervision, useful for:
+    /// - Testing with custom shutdown timing
+    /// - Coordinating multiple nodes in a single process
+    /// - Custom shutdown ordering
+    ///
+    /// # Arguments
+    ///
+    /// * `supervisor` - The supervisor that manages task lifecycle
+    pub async fn run_with_supervisor(self, supervisor: NodeSupervisor) -> Result<()> {
         info!("Starting node with validator ID: {:?}", self.validator_id);
 
         // Create data directory if needed
         std::fs::create_dir_all(&self.config.data_dir)?;
 
+        // Get cancellation token for graceful shutdown
+        let cancel_token = supervisor.cancellation_token();
+
         // Create channels for Primary
         let (primary_incoming_tx, mut primary_incoming_rx) =
             mpsc::channel::<(ValidatorId, DclMessage)>(1000);
@@ -295,16 +333,25 @@ impl Node {
             )
         })?;
 
-        // Connect to peers (with retry)
-        tokio::spawn({
+        // Connect to peers (with retry) - supervised task
+        supervisor.spawn_cancellable("peer-connector", {
             let network = Arc::clone(&primary_network);
-            async move {
+            move |token| async move {
                 // Initial delay to let other nodes start
                 tokio::time::sleep(Duration::from_secs(1)).await;
                 loop {
-                    network.connect_to_all_peers().await;
-                    tokio::time::sleep(Duration::from_secs(5)).await;
+                    // Connect immediately, then retry every 5s until cancelled
+                    network.connect_to_all_peers().await;
+                    tokio::select! {
+                        biased;
+                        _ = token.cancelled() => {
+                            info!("Peer connector shutting down");
+                            break;
+                        }
+                        _ = tokio::time::sleep(Duration::from_secs(5)) => {}
+                    }
                 }
+                Ok(())
             }
         });
@@ -478,9 +525,53 @@ impl Node {
         // Clone execution bridge for use in event loop
         let execution_bridge = self.execution_bridge.clone();
 
-        // Main event loop
+        // Run the main event loop with graceful shutdown support
+        let result = Self::run_event_loop(
+            cancel_token,
+            &mut primary_incoming_rx,
+            &mut primary_handle,
+            &cut_tx,
+            &mut decided_rx,
+            execution_bridge,
+        )
+        .await;
+
+        // Graceful shutdown sequence
+        info!("Shutting down node components...");
+
+        // Step 1: Signal Primary to stop (it will stop accepting new batches)
+        info!("Stopping Primary...");
+        primary_handle.shutdown().await;
+
+        // Step 2: Wait for supervisor to complete all tracked tasks
+        info!("Waiting for supervised tasks to complete...");
+        if let Err(e) = supervisor.shutdown().await {
+            warn!("Some tasks did not complete cleanly: {}", e);
+        }
+
+        info!("Node shutdown complete");
+        result
+    }
+
+    /// Internal event loop that handles messages and can be cancelled.
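
+    ///
+    /// Returns `Ok(())` once the supervisor's cancellation token fires, at which
+    /// point the caller runs the ordered shutdown sequence above.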

+    async fn run_event_loop(
+        cancel_token: CancellationToken,
+        primary_incoming_rx: &mut mpsc::Receiver<(ValidatorId, DclMessage)>,
+        primary_handle: &mut cipherbft_data_chain::primary::PrimaryHandle,
+        cut_tx: &mpsc::Sender<Cut>,
+        decided_rx: &mut mpsc::Receiver<(ConsensusHeight, Cut)>,
+        execution_bridge: Option<Arc<ExecutionBridge>>,
+    ) -> Result<()> {
         loop {
             tokio::select! {
+                biased;
+
+                // Check for shutdown signal first (high priority)
+                _ = cancel_token.cancelled() => {
+                    info!("Received shutdown signal, exiting event loop");
+                    return Ok(());
+                }
+
                 // Incoming network messages -> forward to Primary
                 Some((from, msg)) = primary_incoming_rx.recv() => {
                     debug!("Received message from {:?}: {:?}", from, msg.type_name());
diff --git a/crates/node/src/supervisor.rs b/crates/node/src/supervisor.rs
new file mode 100644
index 0000000..0765aad
--- /dev/null
+++ b/crates/node/src/supervisor.rs
@@ -0,0 +1,415 @@
+//! Task Supervision for CipherBFT Node
+//!
+//! This module implements a structured task supervision tree similar to Erlang/OTP,
+//! using tokio-util's `TaskTracker` and `CancellationToken` for:
+//!
+//! - **Coordinated Shutdown**: All tasks receive cancellation signals in a controlled order
+//! - **Failure Propagation**: If one critical task fails, others are notified
+//! - **Resource Cleanup**: Ensures storage is flushed before process exit
+//!
+//! # Shutdown Order
+//!
+//! 1. Stop accepting new network connections
+//! 2. Drain in-flight consensus rounds
+//! 3. Flush pending storage writes
+//! 4. Close database connections
+//! 5. Exit
+//!
+//! # Example
+//!
+//! ```ignore
+//! let supervisor = NodeSupervisor::new();
+//!
+//! // Spawn tasks under supervision
+//! supervisor.spawn("network", async move {
+//!     // Network task
+//!     Ok(())
+//! });
+//!
+//! // Graceful shutdown
+//! supervisor.shutdown().await;
+//! ```
+
+use std::future::Future;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio_util::sync::CancellationToken;
+use tokio_util::task::TaskTracker;
+use tracing::{error, info, warn};
+
+/// Default timeout for graceful shutdown before forcing termination
+const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// Result type for supervised tasks
+pub type SupervisedResult = Result<(), anyhow::Error>;
+
+/// A supervisor that manages the lifecycle of all node tasks.

+///
+/// Implements a hierarchical task management strategy:
+/// - All spawned tasks are tracked by `TaskTracker`
+/// - Cancellation is signaled via `CancellationToken`
+/// - Shutdown waits for all tasks to complete gracefully
+#[derive(Clone)]
+pub struct NodeSupervisor {
+    /// Tracks all spawned tasks
+    tracker: TaskTracker,
+    /// Token for signaling cancellation to all tasks
+    token: CancellationToken,
+    /// Flag indicating shutdown has been initiated
+    shutting_down: Arc<AtomicBool>,
+    /// Timeout for graceful shutdown
+    shutdown_timeout: Duration,
+}
+
+impl Default for NodeSupervisor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl NodeSupervisor {
+    /// Create a new supervisor with default settings
+    pub fn new() -> Self {
+        Self {
+            tracker: TaskTracker::new(),
+            token: CancellationToken::new(),
+            shutting_down: Arc::new(AtomicBool::new(false)),
+            shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
+        }
+    }
+
+    /// Create a new supervisor with custom shutdown timeout
+    pub fn with_timeout(timeout: Duration) -> Self {
+        Self {
+            tracker: TaskTracker::new(),
+            token: CancellationToken::new(),
+            shutting_down: Arc::new(AtomicBool::new(false)),
+            shutdown_timeout: timeout,
+        }
+    }
+
+    /// Get a clone of the cancellation token for use in tasks
+    pub fn cancellation_token(&self) -> CancellationToken {
+        self.token.clone()
+    }
+
+    /// Check if shutdown has been initiated
+    pub fn is_shutting_down(&self) -> bool {
+        self.shutting_down.load(Ordering::SeqCst)
+    }
+
+    /// Spawn a supervised task.
+    ///
+    /// The task will be:
+    /// - Tracked by the supervisor
+    /// - Cancelled when shutdown is initiated
+    /// - Logged on completion or failure
+    ///
+    /// # Arguments
+    ///
+    /// * `name` - Human-readable name for logging
+    /// * `future` - The async task to run
+    pub fn spawn<F>(&self, name: &'static str, future: F)
+    where
+        F: Future<Output = SupervisedResult> + Send + 'static,
+    {
+        let token = self.token.clone();
+        let shutting_down = self.shutting_down.clone();
+
+        self.tracker.spawn(async move {
+            tokio::select! {
+                biased;
+
+                // Check for cancellation first
+                _ = token.cancelled() => {
+                    info!("[{}] Received shutdown signal, stopping gracefully", name);
+                }
+
+                // Run the actual task
+                result = future => {
+                    match result {
+                        Ok(()) => {
+                            info!("[{}] Task completed successfully", name);
+                        }
+                        Err(e) => {
+                            // Check if this is during shutdown - if so, log as info, not error
+                            if shutting_down.load(Ordering::SeqCst) {
+                                info!("[{}] Task stopped during shutdown: {}", name, e);
+                            } else {
+                                error!("[{}] Task failed: {:?}", name, e);
+                            }
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /// Spawn a supervised task that should complete quickly on cancellation.
+    ///
+    /// Unlike `spawn`, this variant expects the task to handle cancellation
+    /// internally and complete promptly when cancelled.
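
+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of the expected closure shape (illustrative only;
+    /// `poll_work` is a placeholder, not a real API):
+    ///
+    /// ```ignore
+    /// supervisor.spawn_cancellable("poller", |token| async move {
+    ///     loop {
+    ///         tokio::select! {
+    ///             _ = token.cancelled() => break,
+    ///             _ = poll_work() => {}
+    ///         }
+    ///     }
+    ///     Ok(())
+    /// });
+    /// ```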

+    ///
+    /// # Arguments
+    ///
+    /// * `name` - Human-readable name for logging
+    /// * `f` - Closure that receives a cancellation token and returns the task future
+    pub fn spawn_cancellable<F, Fut>(&self, name: &'static str, f: F)
+    where
+        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
+        Fut: Future<Output = SupervisedResult> + Send + 'static,
+    {
+        let token = self.token.clone();
+        let shutting_down = self.shutting_down.clone();
+
+        self.tracker.spawn(async move {
+            let result = f(token).await;
+            match result {
+                Ok(()) => {
+                    info!("[{}] Task completed successfully", name);
+                }
+                Err(e) => {
+                    if shutting_down.load(Ordering::SeqCst) {
+                        info!("[{}] Task stopped during shutdown: {}", name, e);
+                    } else {
+                        error!("[{}] Task failed: {:?}", name, e);
+                    }
+                }
+            }
+        });
+    }
+
+    /// Spawn a critical task that triggers full shutdown on failure.
+    ///
+    /// If this task fails unexpectedly (not during shutdown), it will
+    /// initiate shutdown of all other tasks.
+    ///
+    /// # Arguments
+    ///
+    /// * `name` - Human-readable name for logging
+    /// * `future` - The async task to run
+    pub fn spawn_critical<F>(&self, name: &'static str, future: F)
+    where
+        F: Future<Output = SupervisedResult> + Send + 'static,
+    {
+        let token = self.token.clone();
+        let shutting_down = self.shutting_down.clone();
+        let self_token = self.token.clone();
+
+        self.tracker.spawn(async move {
+            tokio::select! {
+                biased;
+
+                _ = token.cancelled() => {
+                    info!("[{}] Critical task received shutdown signal", name);
+                }
+
+                result = future => {
+                    match result {
+                        Ok(()) => {
+                            info!("[{}] Critical task completed successfully", name);
+                        }
+                        Err(e) => {
+                            if !shutting_down.load(Ordering::SeqCst) {
+                                error!("[{}] CRITICAL TASK FAILED: {:?}", name, e);
+                                error!("Initiating emergency shutdown due to critical task failure");
+                                // Trigger shutdown of all tasks
+                                self_token.cancel();
+                            } else {
+                                info!("[{}] Critical task stopped during shutdown: {}", name, e);
+                            }
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /// Initiate graceful shutdown.
+    ///
+    /// This will:
+    /// 1. Close the task tracker to prevent new tasks
+    /// 2. Signal all tasks to stop via the cancellation token
+    /// 3. Wait for all tasks to complete (with timeout)
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` if all tasks completed within the timeout,
+    /// or `Err` if the timeout was exceeded (tasks may still be running).
+    pub async fn shutdown(&self) -> Result<(), ShutdownError> {
+        if self.shutting_down.swap(true, Ordering::SeqCst) {
+            // Already shutting down
+            warn!("Shutdown already in progress");
+            return Ok(());
+        }
+
+        info!("Initiating graceful shutdown...");
+
+        // Step 1: Close the tracker to prevent new tasks
+        self.tracker.close();
+
+        // Step 2: Signal all tasks to stop
+        self.token.cancel();
+
+        // Step 3: Wait for all tasks with timeout
+        let wait_result = tokio::time::timeout(self.shutdown_timeout, self.tracker.wait()).await;
+
+        match wait_result {
+            Ok(()) => {
+                info!("All tasks terminated gracefully");
+                Ok(())
+            }
+            Err(_) => {
+                error!(
+                    "Shutdown timeout ({:?}) exceeded, some tasks may still be running",
+                    self.shutdown_timeout
+                );
+                Err(ShutdownError::Timeout)
+            }
+        }
+    }
+
+    /// Initiate shutdown and wait indefinitely for all tasks to complete.
+    ///
+    /// Use this only when you're certain all tasks will eventually stop.
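
+    ///
+    /// For example (illustrative), a test harness whose tasks all exit promptly
+    /// on cancellation can skip the timeout entirely:
+    ///
+    /// ```ignore
+    /// supervisor.shutdown_and_wait().await;
+    /// ```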

+    pub async fn shutdown_and_wait(&self) {
+        if self.shutting_down.swap(true, Ordering::SeqCst) {
+            warn!("Shutdown already in progress");
+            return;
+        }
+
+        info!("Initiating shutdown (waiting indefinitely)...");
+
+        self.tracker.close();
+        self.token.cancel();
+        self.tracker.wait().await;
+
+        info!("All tasks terminated");
+    }
+
+    /// Wait for all tracked tasks to complete without initiating shutdown.
+    ///
+    /// This is useful for waiting on tasks that are expected to complete
+    /// naturally (e.g., after receiving a result).
+    pub async fn wait(&self) {
+        self.tracker.wait().await;
+    }
+
+    /// Get the number of currently tracked tasks.
+    ///
+    /// Note: This is approximate due to the async nature of task spawning/completion.
+    pub fn task_count(&self) -> usize {
+        self.tracker.len()
+    }
+}
+
+/// Errors that can occur during shutdown
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ShutdownError {
+    /// Shutdown timed out waiting for tasks to complete
+    Timeout,
+}
+
+impl std::fmt::Display for ShutdownError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ShutdownError::Timeout => write!(f, "shutdown timeout exceeded"),
+        }
+    }
+}
+
+impl std::error::Error for ShutdownError {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::atomic::AtomicU32;
+    use tokio::time::sleep;
+
+    #[tokio::test]
+    async fn test_supervisor_spawn_and_shutdown() {
+        let supervisor = NodeSupervisor::with_timeout(Duration::from_secs(5));
+        let counter = Arc::new(AtomicU32::new(0));
+
+        // Spawn some tasks
+        for _ in 0..3 {
+            let counter = counter.clone();
+            supervisor.spawn("test-task", async move {
+                counter.fetch_add(1, Ordering::SeqCst);
+                sleep(Duration::from_millis(100)).await;
+                Ok(())
+            });
+        }
+
+        // Give tasks time to start
+        sleep(Duration::from_millis(50)).await;
+
+        // Shutdown should wait for tasks
+        let result = supervisor.shutdown().await;
+        assert!(result.is_ok());
+
+        // All tasks should have incremented counter
+        assert_eq!(counter.load(Ordering::SeqCst), 3);
+    }
+
+    #[tokio::test]
+    async fn test_supervisor_cancellation() {
+        let supervisor = NodeSupervisor::new();
+        let completed = Arc::new(AtomicBool::new(false));
+        let cancelled = Arc::new(AtomicBool::new(false));
+
+        let cancelled_clone = cancelled.clone();
+        let completed_clone = completed.clone();
+        supervisor.spawn_cancellable("cancellable-task", move |token| {
+            let cancelled = cancelled_clone;
+            let completed = completed_clone;
+            async move {
+                tokio::select! {
+                    _ = token.cancelled() => {
+                        cancelled.store(true, Ordering::SeqCst);
+                    }
+                    _ = sleep(Duration::from_secs(60)) => {
+                        // Should not be reached; shutdown cancels the task first
+                        completed.store(true, Ordering::SeqCst);
+                    }
+                }
+                Ok(())
+            }
+        });
+
+        // Give task time to start
+        sleep(Duration::from_millis(50)).await;
+
+        // Initiate shutdown
+        let result = supervisor.shutdown().await;
+        assert!(result.is_ok());
+
+        // Task should have been cancelled
+        assert!(cancelled.load(Ordering::SeqCst));
+        assert!(!completed.load(Ordering::SeqCst));
+    }
+
+    #[tokio::test]
+    async fn test_critical_task_failure_triggers_shutdown() {
+        let supervisor = NodeSupervisor::new();
+        let other_task_cancelled = Arc::new(AtomicBool::new(false));
+
+        // Spawn a task that will observe the cancellation
+        let cancelled_clone = other_task_cancelled.clone();
+        let token = supervisor.cancellation_token();
+        tokio::spawn(async move {
+            token.cancelled().await;
+            cancelled_clone.store(true, Ordering::SeqCst);
+        });
+
+        // Spawn a critical task that fails
+        supervisor.spawn_critical("failing-critical", async move {
+            sleep(Duration::from_millis(50)).await;
+            Err(anyhow::anyhow!("Critical failure!"))
+        });
+
+        // Wait a bit for the critical task to fail and trigger shutdown
+        sleep(Duration::from_millis(200)).await;
+
+        // The other task should have been cancelled
+        assert!(other_task_cancelled.load(Ordering::SeqCst));
+    }
+}
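
For reference, the supervisor API added above composes as follows. This is an
illustrative sketch, not part of the patch; the task names and the 10-second
timeout are placeholders:

    use std::time::Duration;
    use cipherd::{NodeSupervisor, ShutdownError};

    async fn run_supervised() -> Result<(), ShutdownError> {
        let supervisor = NodeSupervisor::with_timeout(Duration::from_secs(10));

        // Ordinary supervised task: cancelled (and logged) when shutdown starts.
        supervisor.spawn("metrics", async move { Ok(()) });

        // Critical task: an unexpected Err cancels every other supervised task.
        supervisor.spawn_critical("consensus", async move { Ok(()) });

        // Close the tracker, signal cancellation, and wait up to the timeout.
        supervisor.shutdown().await
    }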