-
-
Notifications
You must be signed in to change notification settings - Fork 49
feat: add tiered approval modes withexecution policy, CLI support, an… #459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
311590c
7a9f249
3464765
ffe9247
7e442d9
ede1f9a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| from enum import Enum | ||
|
|
||
|
|
||
| class ApprovalMode(str, Enum): | ||
| """ | ||
| Defines how much autonomy Cortex has when performing actions. | ||
| This is the single source of truth for approval modes. | ||
| """ | ||
|
|
||
| SUGGEST = "suggest" | ||
| AUTO_EDIT = "auto-edit" | ||
| FULL_AUTO = "full-auto" | ||
|
|
||
| @classmethod | ||
| def from_string(cls, value: str) -> "ApprovalMode": | ||
| """ | ||
| Convert user/config input into ApprovalMode. | ||
| Raises ValueError for invalid modes. | ||
| """ | ||
| try: | ||
| return cls(value) | ||
| except ValueError as exc: | ||
| valid = ", ".join([m.value for m in cls]) | ||
| raise ValueError(f"Invalid approval mode '{value}'. Valid modes are: {valid}") from exc |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| from abc import ABC, abstractmethod | ||
| from typing import Literal | ||
|
|
||
| from cortex.approval import ApprovalMode | ||
|
|
||
| ActionType = Literal[ | ||
| "show", | ||
| "file_edit", | ||
| "shell_command", | ||
| ] | ||
|
|
||
|
|
||
| class ApprovalPolicy(ABC): | ||
| """ | ||
| Base class for approval policies. | ||
| Defines what actions are allowed under each approval mode. | ||
| """ | ||
|
|
||
| def __init__(self, mode: ApprovalMode): | ||
| self.mode = mode | ||
|
|
||
| @abstractmethod | ||
| def allow(self, action: ActionType) -> bool: | ||
| """Return True if the action is allowed.""" | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def requires_confirmation(self, action: ActionType) -> bool: | ||
| """Return True if the action requires user confirmation.""" | ||
| pass | ||
|
|
||
|
|
||
| # ------------------ Policies ------------------ | ||
|
|
||
|
|
||
| class SuggestPolicy(ApprovalPolicy): | ||
| def allow(self, action: ActionType) -> bool: | ||
| return action == "show" | ||
|
|
||
| def requires_confirmation(self, action: ActionType) -> bool: | ||
| return False | ||
|
|
||
|
|
||
| class AutoEditPolicy(ApprovalPolicy): | ||
| def allow(self, action: ActionType) -> bool: | ||
| return action in {"show", "file_edit", "shell_command"} | ||
|
|
||
| def requires_confirmation(self, action: ActionType) -> bool: | ||
| return action == "shell_command" | ||
|
|
||
|
|
||
| class FullAutoPolicy(ApprovalPolicy): | ||
| def allow(self, action: ActionType) -> bool: | ||
| return True | ||
|
|
||
| def requires_confirmation(self, action: ActionType) -> bool: | ||
| return False | ||
|
|
||
|
|
||
| # ------------------ Factory ------------------ | ||
|
|
||
|
|
||
| def get_approval_policy(mode: ApprovalMode) -> ApprovalPolicy: | ||
| """ | ||
| Factory method to get the correct approval policy. | ||
| """ | ||
| if mode == ApprovalMode.SUGGEST: | ||
| return SuggestPolicy(mode) | ||
| if mode == ApprovalMode.AUTO_EDIT: | ||
| return AutoEditPolicy(mode) | ||
| if mode == ApprovalMode.FULL_AUTO: | ||
| return FullAutoPolicy(mode) | ||
|
|
||
| raise ValueError(f"Unsupported approval mode: {mode}") | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,6 +8,8 @@ | |||||||||||||||||||||
| from typing import TYPE_CHECKING, Any | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| from cortex.api_key_detector import auto_detect_api_key, setup_api_key | ||||||||||||||||||||||
| from cortex.approval import ApprovalMode | ||||||||||||||||||||||
| from cortex.approval_policy import get_approval_policy | ||||||||||||||||||||||
| from cortex.ask import AskHandler | ||||||||||||||||||||||
| from cortex.branding import VERSION, console, cx_header, cx_print, show_banner | ||||||||||||||||||||||
| from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus | ||||||||||||||||||||||
|
|
@@ -24,6 +26,7 @@ | |||||||||||||||||||||
| from cortex.network_config import NetworkConfig | ||||||||||||||||||||||
| from cortex.notification_manager import NotificationManager | ||||||||||||||||||||||
| from cortex.stack_manager import StackManager | ||||||||||||||||||||||
| from cortex.user_preferences import UserPreferences | ||||||||||||||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||
| from cortex.validators import validate_api_key, validate_install_request | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if TYPE_CHECKING: | ||||||||||||||||||||||
|
|
@@ -41,6 +44,8 @@ def __init__(self, verbose: bool = False): | |||||||||||||||||||||
| self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] | ||||||||||||||||||||||
| self.spinner_idx = 0 | ||||||||||||||||||||||
| self.verbose = verbose | ||||||||||||||||||||||
| prefs = UserPreferences.load() | ||||||||||||||||||||||
| self.approval_policy = get_approval_policy(prefs.approval_mode) | ||||||||||||||||||||||
|
Comment on lines
+47
to
+48
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The approval policy is loaded and stored, but the Either:
coordinator = InstallationCoordinator(
commands=commands,
descriptions=[f"Step {i + 1}" for i in range(len(commands))],
timeout=300,
stop_on_error=True,
progress_callback=progress_callback,
+ approval_policy=self.approval_policy,
)📝 Committable suggestion
Suggested change
🧰 Tools🪛 GitHub Actions: CI[error] 42-42: Undefined name 'get_approval_policy' (F821) detected by ruff during 'ruff check . --output-format=github'. 🪛 GitHub Actions: Cortex Automation[error] 42-42: F821 Undefined name 'get_approval_policy'. 🪛 GitHub Check: lint[failure] 42-42: Ruff (F821) 🪛 GitHub Check: Lint[failure] 42-42: Ruff (F821) 🤖 Prompt for AI Agents |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # Define a method to handle Docker-specific permission repairs | ||||||||||||||||||||||
| def docker_permissions(self, args: argparse.Namespace) -> int: | ||||||||||||||||||||||
|
|
@@ -1078,6 +1083,45 @@ def env(self, args: argparse.Namespace) -> int: | |||||||||||||||||||||
| traceback.print_exc() | ||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def config(self, args: argparse.Namespace) -> int: | ||||||||||||||||||||||
| """Handle configuration commands.""" | ||||||||||||||||||||||
| action = getattr(args, "config_action", None) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if not action: | ||||||||||||||||||||||
| self._print_error("Please specify a subcommand (set)") | ||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| try: | ||||||||||||||||||||||
| if action == "set": | ||||||||||||||||||||||
| return self._config_set(args) | ||||||||||||||||||||||
| else: | ||||||||||||||||||||||
| self._print_error(f"Unknown config subcommand: {action}") | ||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
| except ValueError as e: | ||||||||||||||||||||||
| self._print_error(str(e)) | ||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||
| self._print_error(f"Unexpected error: {e}") | ||||||||||||||||||||||
| if self.verbose: | ||||||||||||||||||||||
| import traceback | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| traceback.print_exc() | ||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def _config_set(self, args: argparse.Namespace) -> int: | ||||||||||||||||||||||
| key = args.key | ||||||||||||||||||||||
| value = args.value | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if key != "approval-mode": | ||||||||||||||||||||||
| raise ValueError(f"Unknown config key '{key}'. Supported keys: approval-mode") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| prefs = UserPreferences.load() | ||||||||||||||||||||||
| prefs.approval_mode = ApprovalMode.from_string(value) | ||||||||||||||||||||||
| prefs.save() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| console.print(f"[green]✔ Approval mode set to '{prefs.approval_mode.value}'[/green]") | ||||||||||||||||||||||
| return 0 | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: | ||||||||||||||||||||||
| """Set an environment variable.""" | ||||||||||||||||||||||
| app = args.app | ||||||||||||||||||||||
|
|
@@ -2127,6 +2171,14 @@ def main(): | |||||||||||||||||||||
| # Wizard command | ||||||||||||||||||||||
| wizard_parser = subparsers.add_parser("wizard", help="Configure API key interactively") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # Config command | ||||||||||||||||||||||
| config_parser = subparsers.add_parser("config", help="Manage Cortex configuration") | ||||||||||||||||||||||
| config_subs = config_parser.add_subparsers(dest="config_action", help="Config actions") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| config_set_parser = config_subs.add_parser("set", help="Set a configuration value") | ||||||||||||||||||||||
| config_set_parser.add_argument("key", help="Configuration key") | ||||||||||||||||||||||
| config_set_parser.add_argument("value", help="Configuration value") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # Status command (includes comprehensive health checks) | ||||||||||||||||||||||
| subparsers.add_parser("status", help="Show comprehensive system status and health checks") | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -2531,6 +2583,8 @@ def main(): | |||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
| elif args.command == "env": | ||||||||||||||||||||||
| return cli.env(args) | ||||||||||||||||||||||
| elif args.command == "config": | ||||||||||||||||||||||
| return cli.config(args) | ||||||||||||||||||||||
| else: | ||||||||||||||||||||||
| parser.print_help() | ||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| def confirm_action(message: str) -> bool: | ||
| try: | ||
| response = input(f"{message} [y/N]: ").strip().lower() | ||
| except EOFError: | ||
| return False | ||
| return response in {"y", "yes"} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,10 @@ | |
| from enum import Enum | ||
| from typing import Any | ||
|
|
||
| from cortex.approval import ApprovalMode | ||
| from cortex.approval_policy import get_approval_policy | ||
| from cortex.confirm import confirm_action | ||
| from cortex.user_preferences import UserPreferences | ||
| from cortex.validators import DANGEROUS_PATTERNS | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -60,6 +64,7 @@ def __init__( | |
| enable_rollback: bool = False, | ||
| log_file: str | None = None, | ||
| progress_callback: Callable[[int, int, InstallationStep], None] | None = None, | ||
| approval_policy=None, | ||
| ): | ||
| """Initialize an installation run with optional logging and rollback.""" | ||
| self.timeout = timeout | ||
|
|
@@ -79,6 +84,20 @@ def __init__( | |
| ] | ||
|
|
||
| self.rollback_commands: list[str] = [] | ||
| self._explicit_policy = approval_policy is not None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Design flaw: _explicit_policy flag undermines the approval system. The
Recommendation: Remove the 🔎 Proposed fix to remove _explicit_policy flag self.rollback_commands: list[str] = []
- self._explicit_policy = approval_policy is not None
# 🔐 Load approval policy once
if approval_policy is not None:
self.approval_policy = approval_policy
else:
- approval_mode = ApprovalMode.FULL_AUTO
+ approval_mode = ApprovalMode.SUGGEST
try:
prefs = UserPreferences.load()
if hasattr(prefs, "approval_mode"):
approval_mode = prefs.approval_mode
except Exception:
pass
self.approval_policy = get_approval_policy(approval_mode)Then remove all references to
🤖 Prompt for AI Agents |
||
| # 🔐 Load approval policy once | ||
| if approval_policy is not None: | ||
| self.approval_policy = approval_policy | ||
| else: | ||
| approval_mode = ApprovalMode.FULL_AUTO | ||
| try: | ||
| prefs = UserPreferences.load() | ||
| if hasattr(prefs, "approval_mode"): | ||
| approval_mode = prefs.approval_mode | ||
| except Exception: | ||
| pass | ||
|
|
||
| self.approval_policy = get_approval_policy(approval_mode) | ||
|
Comment on lines
+87
to
+100
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unsafe default: FULL_AUTO contradicts safe-by-default principles. Lines 92-98 default to Additionally, the broad Based on learnings, the safe default should be 🔎 Proposed fix to default to SUGGEST mode self._explicit_policy = approval_policy is not None
# 🔐 Load approval policy once
if approval_policy is not None:
self.approval_policy = approval_policy
else:
- approval_mode = ApprovalMode.FULL_AUTO
+ approval_mode = ApprovalMode.SUGGEST
try:
prefs = UserPreferences.load()
if hasattr(prefs, "approval_mode"):
approval_mode = prefs.approval_mode
except Exception:
+ # Fail safe: default to most restrictive mode
pass
self.approval_policy = get_approval_policy(approval_mode)🤖 Prompt for AI Agents |
||
|
|
||
| @classmethod | ||
| def from_plan( | ||
|
|
@@ -160,12 +179,31 @@ def _validate_command(self, command: str) -> tuple: | |
| return True, None | ||
|
|
||
| def _execute_command(self, step: InstallationStep) -> bool: | ||
| step.status = StepStatus.RUNNING | ||
| from cortex.approval import ApprovalMode | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Remove redundant import.
🔎 Proposed fix def _execute_command(self, step: InstallationStep) -> bool:
- from cortex.approval import ApprovalMode
-
step.start_time = time.time()🤖 Prompt for AI Agents |
||
|
|
||
| # 🚫 SUGGEST MODE: skip execution (ONLY when explicitly enabled) | ||
| if self._explicit_policy and self.approval_policy.mode == ApprovalMode.SUGGEST: | ||
| step.start_time = time.time() | ||
| step.end_time = step.start_time | ||
| step.status = StepStatus.SKIPPED | ||
| step.error = "Execution skipped in suggest mode" | ||
| self._log("Suggest mode: execution skipped") | ||
| return False # not a failure, but did not execute | ||
|
|
||
| # Normal execution starts here | ||
| step.start_time = time.time() | ||
|
|
||
| step.status = StepStatus.RUNNING | ||
| self._log(f"Executing: {step.command}") | ||
|
|
||
| # Validate command before execution | ||
| if self.approval_policy.requires_confirmation("shell_command"): | ||
| if not confirm_action("Execute planned shell commands?"): | ||
| step.status = StepStatus.FAILED | ||
| step.error = "User declined execution" | ||
| step.end_time = time.time() | ||
| self._log("Execution declined by user") | ||
| return False | ||
|
|
||
| # Validate command | ||
| is_valid, error = self._validate_command(step.command) | ||
| if not is_valid: | ||
| step.status = StepStatus.FAILED | ||
|
|
@@ -175,11 +213,12 @@ def _execute_command(self, step: InstallationStep) -> bool: | |
| return False | ||
|
|
||
| try: | ||
| # Use shell=True carefully - commands are validated first | ||
| # For complex shell commands (pipes, redirects), shell=True is needed | ||
| # Simple commands could use shlex.split() with shell=False | ||
| result = subprocess.run( | ||
| step.command, shell=True, capture_output=True, text=True, timeout=self.timeout | ||
| step.command, | ||
| shell=True, | ||
| capture_output=True, | ||
| text=True, | ||
| timeout=self.timeout, | ||
| ) | ||
|
|
||
| step.return_code = result.returncode | ||
|
|
@@ -191,10 +230,10 @@ def _execute_command(self, step: InstallationStep) -> bool: | |
| step.status = StepStatus.SUCCESS | ||
| self._log(f"Success: {step.command}") | ||
| return True | ||
| else: | ||
| step.status = StepStatus.FAILED | ||
| self._log(f"Failed: {step.command} (exit code: {result.returncode})") | ||
| return False | ||
|
|
||
| step.status = StepStatus.FAILED | ||
| self._log(f"Failed: {step.command} (exit code: {result.returncode})") | ||
| return False | ||
|
|
||
| except subprocess.TimeoutExpired: | ||
| step.status = StepStatus.FAILED | ||
|
|
@@ -207,7 +246,7 @@ def _execute_command(self, step: InstallationStep) -> bool: | |
| step.status = StepStatus.FAILED | ||
| step.error = str(e) | ||
| step.end_time = time.time() | ||
| self._log(f"Error: {step.command} - {str(e)}") | ||
| self._log(f"Error: {step.command} - {e}") | ||
| return False | ||
|
|
||
| def _rollback(self): | ||
|
|
@@ -219,7 +258,12 @@ def _rollback(self): | |
| for cmd in reversed(self.rollback_commands): | ||
| try: | ||
| self._log(f"Rollback: {cmd}") | ||
| subprocess.run(cmd, shell=True, capture_output=True, timeout=self.timeout) | ||
| subprocess.run( | ||
| cmd, | ||
| shell=True, | ||
| capture_output=True, | ||
| timeout=self.timeout, | ||
| ) | ||
|
Comment on lines
259
to
+266
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: # First, let's find the coordinator.py file and examine the relevant sections
find . -name "coordinator.py" -type fRepository: cortexlinux/cortex Length of output: 85 🏁 Script executed: # Get the full context around lines 267-274 and understand the _rollback method
if [ -f "cortex/coordinator.py" ]; then
echo "=== Lines 250-290 of _rollback method ==="
sed -n '250,290p' cortex/coordinator.py | cat -n
fiRepository: cortexlinux/cortex Length of output: 1760 🏁 Script executed: # Search for approval policy checks in the file
if [ -f "cortex/coordinator.py" ]; then
echo "=== Searching for approval policy references ==="
rg "approval|_explicit_policy|SUGGEST|APPROVE" cortex/coordinator.py -n -A 2 -B 2
fiRepository: cortexlinux/cortex Length of output: 2571 🏁 Script executed: # Find the _execute_command method to understand how approval is checked
if [ -f "cortex/coordinator.py" ]; then
echo "=== Looking for _execute_command method ==="
rg "_execute_command" cortex/coordinator.py -n -A 10 -B 2
fiRepository: cortexlinux/cortex Length of output: 1181 🏁 Script executed: # Look for verify_installation method mentioned in the review comment
if [ -f "cortex/coordinator.py" ]; then
echo "=== Looking for verify_installation method ==="
rg "def verify_installation" cortex/coordinator.py -n -A 20
fiRepository: cortexlinux/cortex Length of output: 1218 🏁 Script executed: # Check for any docstrings or comments about rollback design
if [ -f "cortex/coordinator.py" ]; then
echo "=== Checking context and docstrings ==="
sed -n '255,280p' cortex/coordinator.py | cat -n
fiRepository: cortexlinux/cortex Length of output: 1161 Missing approval policy enforcement in rollback and verify_installation methods. The
If rollback should always execute regardless of approval mode (recommended for safety), document this design decision with a comment explaining why rollback is exempt from approval checks. If it should respect the policy for consistency, apply approval checks similar to 🤖 Prompt for AI Agents |
||
| except Exception as e: | ||
| self._log(f"Rollback failed: {cmd} - {str(e)}") | ||
|
|
||
|
|
@@ -228,7 +272,6 @@ def add_rollback_command(self, command: str): | |
| self.rollback_commands.append(command) | ||
|
|
||
| def execute(self) -> InstallationResult: | ||
| """Run each installation step and capture structured results.""" | ||
| start_time = time.time() | ||
| failed_step_index = None | ||
|
|
||
|
|
@@ -242,9 +285,10 @@ def execute(self) -> InstallationResult: | |
|
|
||
| if not success: | ||
| failed_step_index = i | ||
|
|
||
| if self.stop_on_error: | ||
| for remaining_step in self.steps[i + 1 :]: | ||
| remaining_step.status = StepStatus.SKIPPED | ||
| for remaining in self.steps[i + 1 :]: | ||
| remaining.status = StepStatus.SKIPPED | ||
|
|
||
| if self.enable_rollback: | ||
| self._rollback() | ||
|
|
@@ -263,10 +307,11 @@ def execute(self) -> InstallationResult: | |
| total_duration = time.time() - start_time | ||
| all_success = all(s.status == StepStatus.SUCCESS for s in self.steps) | ||
|
|
||
| if all_success: | ||
| self._log("Installation completed successfully") | ||
| else: | ||
| self._log("Installation completed with errors") | ||
| self._log( | ||
| "Installation completed successfully" | ||
| if all_success | ||
| else "Installation completed with errors" | ||
| ) | ||
|
|
||
| return InstallationResult( | ||
| success=all_success, | ||
|
|
@@ -279,9 +324,7 @@ def execute(self) -> InstallationResult: | |
| ) | ||
|
|
||
| def verify_installation(self, verify_commands: list[str]) -> dict[str, bool]: | ||
| """Execute verification commands and return per-command success.""" | ||
| verification_results = {} | ||
|
|
||
| self._log("Starting verification...") | ||
|
|
||
| for cmd in verify_commands: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.