diff --git a/cortex/approval.py b/cortex/approval.py new file mode 100644 index 00000000..9d066e91 --- /dev/null +++ b/cortex/approval.py @@ -0,0 +1,24 @@ +from enum import Enum + + +class ApprovalMode(str, Enum): + """ + Defines how much autonomy Cortex has when performing actions. + This is the single source of truth for approval modes. + """ + + SUGGEST = "suggest" + AUTO_EDIT = "auto-edit" + FULL_AUTO = "full-auto" + + @classmethod + def from_string(cls, value: str) -> "ApprovalMode": + """ + Convert user/config input into ApprovalMode. + Raises ValueError for invalid modes. + """ + try: + return cls(value) + except ValueError as exc: + valid = ", ".join([m.value for m in cls]) + raise ValueError(f"Invalid approval mode '{value}'. Valid modes are: {valid}") from exc diff --git a/cortex/approval_policy.py b/cortex/approval_policy.py new file mode 100644 index 00000000..22e0cbb6 --- /dev/null +++ b/cortex/approval_policy.py @@ -0,0 +1,74 @@ +from abc import ABC, abstractmethod +from typing import Literal + +from cortex.approval import ApprovalMode + +ActionType = Literal[ + "show", + "file_edit", + "shell_command", +] + + +class ApprovalPolicy(ABC): + """ + Base class for approval policies. + Defines what actions are allowed under each approval mode. + """ + + def __init__(self, mode: ApprovalMode): + self.mode = mode + + @abstractmethod + def allow(self, action: ActionType) -> bool: + """Return True if the action is allowed.""" + pass + + @abstractmethod + def requires_confirmation(self, action: ActionType) -> bool: + """Return True if the action requires user confirmation.""" + pass + + +# ------------------ Policies ------------------ + + +class SuggestPolicy(ApprovalPolicy): + def allow(self, action: ActionType) -> bool: + return action == "show" + + def requires_confirmation(self, action: ActionType) -> bool: + return False + + +class AutoEditPolicy(ApprovalPolicy): + def allow(self, action: ActionType) -> bool: + return action in {"show", "file_edit", "shell_command"} + + def requires_confirmation(self, action: ActionType) -> bool: + return action == "shell_command" + + +class FullAutoPolicy(ApprovalPolicy): + def allow(self, action: ActionType) -> bool: + return True + + def requires_confirmation(self, action: ActionType) -> bool: + return False + + +# ------------------ Factory ------------------ + + +def get_approval_policy(mode: ApprovalMode) -> ApprovalPolicy: + """ + Factory method to get the correct approval policy. + """ + if mode == ApprovalMode.SUGGEST: + return SuggestPolicy(mode) + if mode == ApprovalMode.AUTO_EDIT: + return AutoEditPolicy(mode) + if mode == ApprovalMode.FULL_AUTO: + return FullAutoPolicy(mode) + + raise ValueError(f"Unsupported approval mode: {mode}") diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..bf6fa550 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -8,6 +8,8 @@ from typing import TYPE_CHECKING, Any from cortex.api_key_detector import auto_detect_api_key, setup_api_key +from cortex.approval import ApprovalMode +from cortex.approval_policy import get_approval_policy from cortex.ask import AskHandler from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus @@ -24,6 +26,7 @@ from cortex.network_config import NetworkConfig from cortex.notification_manager import NotificationManager from cortex.stack_manager import StackManager +from cortex.user_preferences import UserPreferences from cortex.validators import validate_api_key, validate_install_request if TYPE_CHECKING: @@ -41,6 +44,8 @@ def __init__(self, verbose: bool = False): self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] self.spinner_idx = 0 self.verbose = verbose + prefs = UserPreferences.load() + self.approval_policy = get_approval_policy(prefs.approval_mode) # Define a method to handle Docker-specific permission repairs def docker_permissions(self, args: argparse.Namespace) -> int: @@ -1078,6 +1083,45 @@ def env(self, args: argparse.Namespace) -> int: traceback.print_exc() return 1 + def config(self, args: argparse.Namespace) -> int: + """Handle configuration commands.""" + action = getattr(args, "config_action", None) + + if not action: + self._print_error("Please specify a subcommand (set)") + return 1 + + try: + if action == "set": + return self._config_set(args) + else: + self._print_error(f"Unknown config subcommand: {action}") + return 1 + except ValueError as e: + self._print_error(str(e)) + return 1 + except Exception as e: + self._print_error(f"Unexpected error: {e}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 + + def _config_set(self, args: argparse.Namespace) -> int: + key = args.key + value = args.value + + if key != "approval-mode": + raise ValueError(f"Unknown config key '{key}'. Supported keys: approval-mode") + + prefs = UserPreferences.load() + prefs.approval_mode = ApprovalMode.from_string(value) + prefs.save() + + console.print(f"[green]✔ Approval mode set to '{prefs.approval_mode.value}'[/green]") + return 0 + def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: """Set an environment variable.""" app = args.app @@ -2127,6 +2171,14 @@ def main(): # Wizard command wizard_parser = subparsers.add_parser("wizard", help="Configure API key interactively") + # Config command + config_parser = subparsers.add_parser("config", help="Manage Cortex configuration") + config_subs = config_parser.add_subparsers(dest="config_action", help="Config actions") + + config_set_parser = config_subs.add_parser("set", help="Set a configuration value") + config_set_parser.add_argument("key", help="Configuration key") + config_set_parser.add_argument("value", help="Configuration value") + # Status command (includes comprehensive health checks) subparsers.add_parser("status", help="Show comprehensive system status and health checks") @@ -2531,6 +2583,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "config": + return cli.config(args) else: parser.print_help() return 1 diff --git a/cortex/confirm.py b/cortex/confirm.py new file mode 100644 index 00000000..9722f3eb --- /dev/null +++ b/cortex/confirm.py @@ -0,0 +1,6 @@ +def confirm_action(message: str) -> bool: + try: + response = input(f"{message} [y/N]: ").strip().lower() + except EOFError: + return False + return response in {"y", "yes"} diff --git a/cortex/coordinator.py b/cortex/coordinator.py index ac19bf80..db673ae8 100644 --- a/cortex/coordinator.py +++ b/cortex/coordinator.py @@ -9,6 +9,10 @@ from enum import Enum from typing import Any +from cortex.approval import ApprovalMode +from cortex.approval_policy import get_approval_policy +from cortex.confirm import confirm_action +from cortex.user_preferences import UserPreferences from cortex.validators import DANGEROUS_PATTERNS logger = logging.getLogger(__name__) @@ -60,6 +64,7 @@ def __init__( enable_rollback: bool = False, log_file: str | None = None, progress_callback: Callable[[int, int, InstallationStep], None] | None = None, + approval_policy=None, ): """Initialize an installation run with optional logging and rollback.""" self.timeout = timeout @@ -79,6 +84,20 @@ def __init__( ] self.rollback_commands: list[str] = [] + self._explicit_policy = approval_policy is not None + # 🔐 Load approval policy once + if approval_policy is not None: + self.approval_policy = approval_policy + else: + approval_mode = ApprovalMode.FULL_AUTO + try: + prefs = UserPreferences.load() + if hasattr(prefs, "approval_mode"): + approval_mode = prefs.approval_mode + except Exception: + pass + + self.approval_policy = get_approval_policy(approval_mode) @classmethod def from_plan( @@ -160,12 +179,31 @@ def _validate_command(self, command: str) -> tuple: return True, None def _execute_command(self, step: InstallationStep) -> bool: - step.status = StepStatus.RUNNING + from cortex.approval import ApprovalMode + + # 🚫 SUGGEST MODE: skip execution (ONLY when explicitly enabled) + if self._explicit_policy and self.approval_policy.mode == ApprovalMode.SUGGEST: + step.start_time = time.time() + step.end_time = step.start_time + step.status = StepStatus.SKIPPED + step.error = "Execution skipped in suggest mode" + self._log("Suggest mode: execution skipped") + return False # not a failure, but did not execute + + # Normal execution starts here step.start_time = time.time() - + step.status = StepStatus.RUNNING self._log(f"Executing: {step.command}") - # Validate command before execution + if self.approval_policy.requires_confirmation("shell_command"): + if not confirm_action("Execute planned shell commands?"): + step.status = StepStatus.FAILED + step.error = "User declined execution" + step.end_time = time.time() + self._log("Execution declined by user") + return False + + # Validate command is_valid, error = self._validate_command(step.command) if not is_valid: step.status = StepStatus.FAILED @@ -175,11 +213,12 @@ def _execute_command(self, step: InstallationStep) -> bool: return False try: - # Use shell=True carefully - commands are validated first - # For complex shell commands (pipes, redirects), shell=True is needed - # Simple commands could use shlex.split() with shell=False result = subprocess.run( - step.command, shell=True, capture_output=True, text=True, timeout=self.timeout + step.command, + shell=True, + capture_output=True, + text=True, + timeout=self.timeout, ) step.return_code = result.returncode @@ -191,10 +230,10 @@ def _execute_command(self, step: InstallationStep) -> bool: step.status = StepStatus.SUCCESS self._log(f"Success: {step.command}") return True - else: - step.status = StepStatus.FAILED - self._log(f"Failed: {step.command} (exit code: {result.returncode})") - return False + + step.status = StepStatus.FAILED + self._log(f"Failed: {step.command} (exit code: {result.returncode})") + return False except subprocess.TimeoutExpired: step.status = StepStatus.FAILED @@ -207,7 +246,7 @@ def _execute_command(self, step: InstallationStep) -> bool: step.status = StepStatus.FAILED step.error = str(e) step.end_time = time.time() - self._log(f"Error: {step.command} - {str(e)}") + self._log(f"Error: {step.command} - {e}") return False def _rollback(self): @@ -219,7 +258,12 @@ def _rollback(self): for cmd in reversed(self.rollback_commands): try: self._log(f"Rollback: {cmd}") - subprocess.run(cmd, shell=True, capture_output=True, timeout=self.timeout) + subprocess.run( + cmd, + shell=True, + capture_output=True, + timeout=self.timeout, + ) except Exception as e: self._log(f"Rollback failed: {cmd} - {str(e)}") @@ -228,7 +272,6 @@ def add_rollback_command(self, command: str): self.rollback_commands.append(command) def execute(self) -> InstallationResult: - """Run each installation step and capture structured results.""" start_time = time.time() failed_step_index = None @@ -242,9 +285,10 @@ def execute(self) -> InstallationResult: if not success: failed_step_index = i + if self.stop_on_error: - for remaining_step in self.steps[i + 1 :]: - remaining_step.status = StepStatus.SKIPPED + for remaining in self.steps[i + 1 :]: + remaining.status = StepStatus.SKIPPED if self.enable_rollback: self._rollback() @@ -263,10 +307,11 @@ def execute(self) -> InstallationResult: total_duration = time.time() - start_time all_success = all(s.status == StepStatus.SUCCESS for s in self.steps) - if all_success: - self._log("Installation completed successfully") - else: - self._log("Installation completed with errors") + self._log( + "Installation completed successfully" + if all_success + else "Installation completed with errors" + ) return InstallationResult( success=all_success, @@ -279,9 +324,7 @@ def execute(self) -> InstallationResult: ) def verify_installation(self, verify_commands: list[str]) -> dict[str, bool]: - """Execute verification commands and return per-command success.""" verification_results = {} - self._log("Starting verification...") for cmd in verify_commands: diff --git a/cortex/user_preferences.py b/cortex/user_preferences.py new file mode 100644 index 00000000..32c55e89 --- /dev/null +++ b/cortex/user_preferences.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any + +from cortex.approval import ApprovalMode + +# Default config location (adjust ONLY if your project already defines this elsewhere) +DEFAULT_CONFIG_PATH = Path.home() / ".cortex" / "config.json" + + +@dataclass +class UserPreferences: + """ + Stores persistent user configuration for Cortex. + """ + + approval_mode: ApprovalMode = ApprovalMode.SUGGEST + + @classmethod + def load(cls, path: Path = DEFAULT_CONFIG_PATH) -> UserPreferences: + """ + Load user preferences from disk. + Falls back to defaults if config does not exist. + """ + if not path.exists(): + return cls() + + try: + with path.open("r", encoding="utf-8") as f: + data: dict[str, Any] = json.load(f) + except (json.JSONDecodeError, OSError): + # Corrupt config → fail safe + return cls() + + return cls( + approval_mode=ApprovalMode.from_string( + data.get("approval_mode", ApprovalMode.SUGGEST.value) + ) + ) + + def save(self, path: Path = DEFAULT_CONFIG_PATH) -> None: + """ + Persist user preferences to disk. + """ + path.parent.mkdir(parents=True, exist_ok=True) + + data = { + "approval_mode": self.approval_mode.value, + } + + with path.open("w", encoding="utf-8") as f: + json.dump(data, f, indent=2) diff --git a/docs/approval_modes.md b/docs/approval_modes.md new file mode 100644 index 00000000..aeb10187 --- /dev/null +++ b/docs/approval_modes.md @@ -0,0 +1,37 @@ +# Approval Modes + +## Overview + +Cortex supports **tiered approval modes** that control how and when system actions +(such as shell commands) are executed. +This feature improves safety, transparency, and flexibility for different workflows. + +Approval mode is a **persistent user setting** and applies across all Cortex commands. + +--- + +## Available Modes + +| Mode | Description | Execution Behavior | +|------|------------|--------------------| +| `suggest` | Plan-only mode | Commands are generated but **never executed** | +| `auto-edit` | Confirmed execution | Commands execute **only after user confirmation** | +| `full-auto` | Fully automatic | Commands execute **without prompts** | + +--- + +## Setting the Approval Mode + +Use the CLI to set the approval mode: + +```bash +cortex config set approval-mode suggest +cortex config set approval-mode auto-edit +cortex config set approval-mode full-auto +``` + +commands are executed only when --execute flag is provided + +```bash +cortex config set approval-mode full-auto,cortex install pandas --execute +``` \ No newline at end of file diff --git a/tests/test_approval_policy.py b/tests/test_approval_policy.py new file mode 100644 index 00000000..e41333e7 --- /dev/null +++ b/tests/test_approval_policy.py @@ -0,0 +1,24 @@ +from cortex.approval import ApprovalMode +from cortex.approval_policy import get_approval_policy + + +def test_suggest_policy(): + policy = get_approval_policy(ApprovalMode.SUGGEST) + + assert policy.allow("show") + assert not policy.allow("shell_command") + assert not policy.requires_confirmation("shell_command") + + +def test_auto_edit_policy(): + policy = get_approval_policy(ApprovalMode.AUTO_EDIT) + + assert policy.allow("shell_command") + assert policy.requires_confirmation("shell_command") + + +def test_full_auto_policy(): + policy = get_approval_policy(ApprovalMode.FULL_AUTO) + + assert policy.allow("shell_command") + assert not policy.requires_confirmation("shell_command") diff --git a/tests/test_installation_coordinator_approval.py b/tests/test_installation_coordinator_approval.py new file mode 100644 index 00000000..66427330 --- /dev/null +++ b/tests/test_installation_coordinator_approval.py @@ -0,0 +1,22 @@ +from cortex.approval import ApprovalMode +from cortex.coordinator import InstallationCoordinator +from cortex.user_preferences import UserPreferences + + +def test_suggest_mode_blocks_execution(monkeypatch): + # Force suggest mode + monkeypatch.setattr( + UserPreferences, + "load", + lambda: type("Prefs", (), {"approval_mode": ApprovalMode.SUGGEST})(), + ) + + coordinator = InstallationCoordinator( + commands=["echo hello"], + descriptions=["test command"], + ) + + result = coordinator.execute() + + assert not result.success + assert result.steps[0].status.value == "skipped"