From 311590cdfb0ca359f208293ba457148ef81f3699 Mon Sep 17 00:00:00 2001 From: Pavani Manchala Date: Sat, 3 Jan 2026 12:37:07 +0530 Subject: [PATCH 1/2] feat: add tiered approval modes withexecution policy, CLI support, and tests --- cortex/approval.py | 24 ++++++ cortex/approval_policy.py | 74 +++++++++++++++++++ cortex/cli.py | 54 ++++++++++++++ cortex/confirm.py | 6 ++ cortex/coordinator.py | 40 +++++++++- cortex/user_preferences.py | 55 ++++++++++++++ docs/approval_modes.md | 37 ++++++++++ tests/test_approval_policy.py | 24 ++++++ .../test_installation_coordinator_approval.py | 22 ++++++ 9 files changed, 335 insertions(+), 1 deletion(-) create mode 100644 cortex/approval.py create mode 100644 cortex/approval_policy.py create mode 100644 cortex/confirm.py create mode 100644 cortex/user_preferences.py create mode 100644 docs/approval_modes.md create mode 100644 tests/test_approval_policy.py create mode 100644 tests/test_installation_coordinator_approval.py diff --git a/cortex/approval.py b/cortex/approval.py new file mode 100644 index 00000000..9d066e91 --- /dev/null +++ b/cortex/approval.py @@ -0,0 +1,24 @@ +from enum import Enum + + +class ApprovalMode(str, Enum): + """ + Defines how much autonomy Cortex has when performing actions. + This is the single source of truth for approval modes. + """ + + SUGGEST = "suggest" + AUTO_EDIT = "auto-edit" + FULL_AUTO = "full-auto" + + @classmethod + def from_string(cls, value: str) -> "ApprovalMode": + """ + Convert user/config input into ApprovalMode. + Raises ValueError for invalid modes. + """ + try: + return cls(value) + except ValueError as exc: + valid = ", ".join([m.value for m in cls]) + raise ValueError(f"Invalid approval mode '{value}'. Valid modes are: {valid}") from exc diff --git a/cortex/approval_policy.py b/cortex/approval_policy.py new file mode 100644 index 00000000..22e0cbb6 --- /dev/null +++ b/cortex/approval_policy.py @@ -0,0 +1,74 @@ +from abc import ABC, abstractmethod +from typing import Literal + +from cortex.approval import ApprovalMode + +ActionType = Literal[ + "show", + "file_edit", + "shell_command", +] + + +class ApprovalPolicy(ABC): + """ + Base class for approval policies. + Defines what actions are allowed under each approval mode. + """ + + def __init__(self, mode: ApprovalMode): + self.mode = mode + + @abstractmethod + def allow(self, action: ActionType) -> bool: + """Return True if the action is allowed.""" + pass + + @abstractmethod + def requires_confirmation(self, action: ActionType) -> bool: + """Return True if the action requires user confirmation.""" + pass + + +# ------------------ Policies ------------------ + + +class SuggestPolicy(ApprovalPolicy): + def allow(self, action: ActionType) -> bool: + return action == "show" + + def requires_confirmation(self, action: ActionType) -> bool: + return False + + +class AutoEditPolicy(ApprovalPolicy): + def allow(self, action: ActionType) -> bool: + return action in {"show", "file_edit", "shell_command"} + + def requires_confirmation(self, action: ActionType) -> bool: + return action == "shell_command" + + +class FullAutoPolicy(ApprovalPolicy): + def allow(self, action: ActionType) -> bool: + return True + + def requires_confirmation(self, action: ActionType) -> bool: + return False + + +# ------------------ Factory ------------------ + + +def get_approval_policy(mode: ApprovalMode) -> ApprovalPolicy: + """ + Factory method to get the correct approval policy. + """ + if mode == ApprovalMode.SUGGEST: + return SuggestPolicy(mode) + if mode == ApprovalMode.AUTO_EDIT: + return AutoEditPolicy(mode) + if mode == ApprovalMode.FULL_AUTO: + return FullAutoPolicy(mode) + + raise ValueError(f"Unsupported approval mode: {mode}") diff --git a/cortex/cli.py b/cortex/cli.py index 7d248002..2f5cb493 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -6,6 +6,8 @@ from datetime import datetime from typing import Any +from cortex.approval import ApprovalMode +from cortex.approval_policy import get_approval_policy from cortex.ask import AskHandler from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus @@ -22,6 +24,7 @@ from cortex.network_config import NetworkConfig from cortex.notification_manager import NotificationManager from cortex.stack_manager import StackManager +from cortex.user_preferences import UserPreferences from cortex.validators import validate_api_key, validate_install_request # Suppress noisy log messages in normal operation @@ -36,6 +39,8 @@ def __init__(self, verbose: bool = False): self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] self.spinner_idx = 0 self.verbose = verbose + prefs = UserPreferences.load() + self.approval_policy = get_approval_policy(prefs.approval_mode) def _debug(self, message: str): """Print debug info only in verbose mode""" @@ -976,6 +981,45 @@ def env(self, args: argparse.Namespace) -> int: traceback.print_exc() return 1 + def config(self, args: argparse.Namespace) -> int: + """Handle configuration commands.""" + action = getattr(args, "config_action", None) + + if not action: + self._print_error("Please specify a subcommand (set)") + return 1 + + try: + if action == "set": + return self._config_set(args) + else: + self._print_error(f"Unknown config subcommand: {action}") + return 1 + except ValueError as e: + self._print_error(str(e)) + return 1 + except Exception as e: + self._print_error(f"Unexpected error: {e}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 + + def _config_set(self, args: argparse.Namespace) -> int: + key = args.key + value = args.value + + if key != "approval-mode": + raise ValueError(f"Unknown config key '{key}'. Supported keys: approval-mode") + + prefs = UserPreferences.load() + prefs.approval_mode = ApprovalMode.from_string(value) + prefs.save() + + console.print(f"[green]✔ Approval mode set to '{prefs.approval_mode.value}'[/green]") + return 0 + def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: """Set an environment variable.""" app = args.app @@ -1624,6 +1668,14 @@ def main(): # Wizard command wizard_parser = subparsers.add_parser("wizard", help="Configure API key interactively") + # Config command + config_parser = subparsers.add_parser("config", help="Manage Cortex configuration") + config_subs = config_parser.add_subparsers(dest="config_action", help="Config actions") + + config_set_parser = config_subs.add_parser("set", help="Set a configuration value") + config_set_parser.add_argument("key", help="Configuration key") + config_set_parser.add_argument("value", help="Configuration value") + # Status command (includes comprehensive health checks) subparsers.add_parser("status", help="Show comprehensive system status and health checks") @@ -1903,6 +1955,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "config": + return cli.config(args) else: parser.print_help() return 1 diff --git a/cortex/confirm.py b/cortex/confirm.py new file mode 100644 index 00000000..9722f3eb --- /dev/null +++ b/cortex/confirm.py @@ -0,0 +1,6 @@ +def confirm_action(message: str) -> bool: + try: + response = input(f"{message} [y/N]: ").strip().lower() + except EOFError: + return False + return response in {"y", "yes"} diff --git a/cortex/coordinator.py b/cortex/coordinator.py index ac19bf80..7ed74bc4 100644 --- a/cortex/coordinator.py +++ b/cortex/coordinator.py @@ -9,6 +9,10 @@ from enum import Enum from typing import Any +from cortex.approval import ApprovalMode +from cortex.approval_policy import get_approval_policy +from cortex.confirm import confirm_action +from cortex.user_preferences import UserPreferences from cortex.validators import DANGEROUS_PATTERNS logger = logging.getLogger(__name__) @@ -60,6 +64,7 @@ def __init__( enable_rollback: bool = False, log_file: str | None = None, progress_callback: Callable[[int, int, InstallationStep], None] | None = None, + approval_policy=None, ): """Initialize an installation run with optional logging and rollback.""" self.timeout = timeout @@ -79,6 +84,12 @@ def __init__( ] self.rollback_commands: list[str] = [] + # 🔐 Load approval policy once + if approval_policy is not None: + self.approval_policy = approval_policy + else: + # Default behavior for library/tests: full-auto + self.approval_policy = get_approval_policy(ApprovalMode.FULL_AUTO) @classmethod def from_plan( @@ -164,6 +175,19 @@ def _execute_command(self, step: InstallationStep) -> bool: step.start_time = time.time() self._log(f"Executing: {step.command}") + # 🔐 Approval check: shell command execution + if not self.approval_policy.allow("shell_command"): + step.status = StepStatus.SKIPPED + step.error = "Shell execution disabled by approval policy" + self._log("Blocked by approval policy") + return False + + if self.approval_policy.requires_confirmation("shell_command"): + if not confirm_action("Execute planned shell commands?"): + step.status = StepStatus.SKIPPED + step.error = "User declined execution" + self._log("Execution declined by user") + return False # Validate command before execution is_valid, error = self._validate_command(step.command) @@ -217,9 +241,19 @@ def _rollback(self): self._log("Starting rollback...") for cmd in reversed(self.rollback_commands): + # 🔐 Approval check: rollback shell command + if not self.approval_policy.allow("shell_command"): + self._log("Rollback blocked by approval policy") + return + try: self._log(f"Rollback: {cmd}") - subprocess.run(cmd, shell=True, capture_output=True, timeout=self.timeout) + subprocess.run( + cmd, + shell=True, + capture_output=True, + timeout=self.timeout, + ) except Exception as e: self._log(f"Rollback failed: {cmd} - {str(e)}") @@ -285,6 +319,10 @@ def verify_installation(self, verify_commands: list[str]) -> dict[str, bool]: self._log("Starting verification...") for cmd in verify_commands: + if not self.approval_policy.allow("shell_command"): + verification_results[cmd] = False + self._log("Verification blocked by approval policy") + continue try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) success = result.returncode == 0 diff --git a/cortex/user_preferences.py b/cortex/user_preferences.py new file mode 100644 index 00000000..4e576371 --- /dev/null +++ b/cortex/user_preferences.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any, dict + +from cortex.approval import ApprovalMode + +# Default config location (adjust ONLY if your project already defines this elsewhere) +DEFAULT_CONFIG_PATH = Path.home() / ".cortex" / "config.json" + + +@dataclass +class UserPreferences: + """ + Stores persistent user configuration for Cortex. + """ + + approval_mode: ApprovalMode = ApprovalMode.SUGGEST + + @classmethod + def load(cls, path: Path = DEFAULT_CONFIG_PATH) -> UserPreferences: + """ + Load user preferences from disk. + Falls back to defaults if config does not exist. + """ + if not path.exists(): + return cls() + + try: + with path.open("r", encoding="utf-8") as f: + data: dict[str, Any] = json.load(f) + except (json.JSONDecodeError, OSError): + # Corrupt config → fail safe + return cls() + + return cls( + approval_mode=ApprovalMode.from_string( + data.get("approval_mode", ApprovalMode.SUGGEST.value) + ) + ) + + def save(self, path: Path = DEFAULT_CONFIG_PATH) -> None: + """ + Persist user preferences to disk. + """ + path.parent.mkdir(parents=True, exist_ok=True) + + data = { + "approval_mode": self.approval_mode.value, + } + + with path.open("w", encoding="utf-8") as f: + json.dump(data, f, indent=2) diff --git a/docs/approval_modes.md b/docs/approval_modes.md new file mode 100644 index 00000000..aeb10187 --- /dev/null +++ b/docs/approval_modes.md @@ -0,0 +1,37 @@ +# Approval Modes + +## Overview + +Cortex supports **tiered approval modes** that control how and when system actions +(such as shell commands) are executed. +This feature improves safety, transparency, and flexibility for different workflows. + +Approval mode is a **persistent user setting** and applies across all Cortex commands. + +--- + +## Available Modes + +| Mode | Description | Execution Behavior | +|------|------------|--------------------| +| `suggest` | Plan-only mode | Commands are generated but **never executed** | +| `auto-edit` | Confirmed execution | Commands execute **only after user confirmation** | +| `full-auto` | Fully automatic | Commands execute **without prompts** | + +--- + +## Setting the Approval Mode + +Use the CLI to set the approval mode: + +```bash +cortex config set approval-mode suggest +cortex config set approval-mode auto-edit +cortex config set approval-mode full-auto +``` + +commands are executed only when --execute flag is provided + +```bash +cortex config set approval-mode full-auto,cortex install pandas --execute +``` \ No newline at end of file diff --git a/tests/test_approval_policy.py b/tests/test_approval_policy.py new file mode 100644 index 00000000..e41333e7 --- /dev/null +++ b/tests/test_approval_policy.py @@ -0,0 +1,24 @@ +from cortex.approval import ApprovalMode +from cortex.approval_policy import get_approval_policy + + +def test_suggest_policy(): + policy = get_approval_policy(ApprovalMode.SUGGEST) + + assert policy.allow("show") + assert not policy.allow("shell_command") + assert not policy.requires_confirmation("shell_command") + + +def test_auto_edit_policy(): + policy = get_approval_policy(ApprovalMode.AUTO_EDIT) + + assert policy.allow("shell_command") + assert policy.requires_confirmation("shell_command") + + +def test_full_auto_policy(): + policy = get_approval_policy(ApprovalMode.FULL_AUTO) + + assert policy.allow("shell_command") + assert not policy.requires_confirmation("shell_command") diff --git a/tests/test_installation_coordinator_approval.py b/tests/test_installation_coordinator_approval.py new file mode 100644 index 00000000..66427330 --- /dev/null +++ b/tests/test_installation_coordinator_approval.py @@ -0,0 +1,22 @@ +from cortex.approval import ApprovalMode +from cortex.coordinator import InstallationCoordinator +from cortex.user_preferences import UserPreferences + + +def test_suggest_mode_blocks_execution(monkeypatch): + # Force suggest mode + monkeypatch.setattr( + UserPreferences, + "load", + lambda: type("Prefs", (), {"approval_mode": ApprovalMode.SUGGEST})(), + ) + + coordinator = InstallationCoordinator( + commands=["echo hello"], + descriptions=["test command"], + ) + + result = coordinator.execute() + + assert not result.success + assert result.steps[0].status.value == "skipped" From 34647654a4eb5456f05b470c48f65f49457bf516 Mon Sep 17 00:00:00 2001 From: Pavani Manchala Date: Sat, 3 Jan 2026 22:43:07 +0530 Subject: [PATCH 2/2] feature modified --- cortex/coordinator.py | 71 +++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 46 deletions(-) diff --git a/cortex/coordinator.py b/cortex/coordinator.py index 44824f7b..db673ae8 100644 --- a/cortex/coordinator.py +++ b/cortex/coordinator.py @@ -84,6 +84,7 @@ def __init__( ] self.rollback_commands: list[str] = [] + self._explicit_policy = approval_policy is not None # 🔐 Load approval policy once if approval_policy is not None: self.approval_policy = approval_policy @@ -180,28 +181,20 @@ def _validate_command(self, command: str) -> tuple: def _execute_command(self, step: InstallationStep) -> bool: from cortex.approval import ApprovalMode - step.status = StepStatus.RUNNING - step.start_time = time.time() - - self._log(f"Executing: {step.command}") - - # 🚫 Suggest mode must NEVER execute commands - if self.approval_policy.mode == ApprovalMode.SUGGEST: + # 🚫 SUGGEST MODE: skip execution (ONLY when explicitly enabled) + if self._explicit_policy and self.approval_policy.mode == ApprovalMode.SUGGEST: + step.start_time = time.time() + step.end_time = step.start_time step.status = StepStatus.SKIPPED step.error = "Execution skipped in suggest mode" - step.end_time = step.start_time self._log("Suggest mode: execution skipped") - return False + return False # not a failure, but did not execute - # 🔐 Approval check: shell command execution - if not self.approval_policy.allow("shell_command"): - step.status = StepStatus.FAILED - step.error = "Shell execution disabled by approval policy" - step.end_time = step.start_time - self._log(f"Execution blocked by approval policy: {step.command}") - return False + # Normal execution starts here + step.start_time = time.time() + step.status = StepStatus.RUNNING + self._log(f"Executing: {step.command}") - # 🔐 Approval check: confirmation required if self.approval_policy.requires_confirmation("shell_command"): if not confirm_action("Execute planned shell commands?"): step.status = StepStatus.FAILED @@ -210,7 +203,7 @@ def _execute_command(self, step: InstallationStep) -> bool: self._log("Execution declined by user") return False - # Validate command before execution + # Validate command is_valid, error = self._validate_command(step.command) if not is_valid: step.status = StepStatus.FAILED @@ -237,10 +230,10 @@ def _execute_command(self, step: InstallationStep) -> bool: step.status = StepStatus.SUCCESS self._log(f"Success: {step.command}") return True - else: - step.status = StepStatus.FAILED - self._log(f"Failed: {step.command} (exit code: {result.returncode})") - return False + + step.status = StepStatus.FAILED + self._log(f"Failed: {step.command} (exit code: {result.returncode})") + return False except subprocess.TimeoutExpired: step.status = StepStatus.FAILED @@ -253,7 +246,7 @@ def _execute_command(self, step: InstallationStep) -> bool: step.status = StepStatus.FAILED step.error = str(e) step.end_time = time.time() - self._log(f"Error: {step.command} - {str(e)}") + self._log(f"Error: {step.command} - {e}") return False def _rollback(self): @@ -263,11 +256,6 @@ def _rollback(self): self._log("Starting rollback...") for cmd in reversed(self.rollback_commands): - # 🔐 Approval check: rollback shell command - if not self.approval_policy.allow("shell_command"): - self._log("Rollback blocked by approval policy") - return - try: self._log(f"Rollback: {cmd}") subprocess.run( @@ -284,7 +272,6 @@ def add_rollback_command(self, command: str): self.rollback_commands.append(command) def execute(self) -> InstallationResult: - """Run each installation step and capture structured results.""" start_time = time.time() failed_step_index = None @@ -298,9 +285,10 @@ def execute(self) -> InstallationResult: if not success: failed_step_index = i + if self.stop_on_error: - for remaining_step in self.steps[i + 1 :]: - remaining_step.status = StepStatus.SKIPPED + for remaining in self.steps[i + 1 :]: + remaining.status = StepStatus.SKIPPED if self.enable_rollback: self._rollback() @@ -317,16 +305,13 @@ def execute(self) -> InstallationResult: ) total_duration = time.time() - start_time - # ❗ Suggest mode should never report success - if self.approval_policy.mode == ApprovalMode.SUGGEST: - all_success = False - else: - all_success = all(s.status == StepStatus.SUCCESS for s in self.steps) + all_success = all(s.status == StepStatus.SUCCESS for s in self.steps) - if all_success: - self._log("Installation completed successfully") - else: - self._log("Installation completed with errors") + self._log( + "Installation completed successfully" + if all_success + else "Installation completed with errors" + ) return InstallationResult( success=all_success, @@ -339,16 +324,10 @@ def execute(self) -> InstallationResult: ) def verify_installation(self, verify_commands: list[str]) -> dict[str, bool]: - """Execute verification commands and return per-command success.""" verification_results = {} - self._log("Starting verification...") for cmd in verify_commands: - if not self.approval_policy.allow("shell_command"): - verification_results[cmd] = False - self._log("Verification blocked by approval policy") - continue try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) success = result.returncode == 0