Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cortex/approval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from enum import Enum


class ApprovalMode(str, Enum):
"""
Defines how much autonomy Cortex has when performing actions.
This is the single source of truth for approval modes.
"""

SUGGEST = "suggest"
AUTO_EDIT = "auto-edit"
FULL_AUTO = "full-auto"

@classmethod
def from_string(cls, value: str) -> "ApprovalMode":
"""
Convert user/config input into ApprovalMode.
Raises ValueError for invalid modes.
"""
try:
return cls(value)
except ValueError as exc:
valid = ", ".join([m.value for m in cls])
raise ValueError(f"Invalid approval mode '{value}'. Valid modes are: {valid}") from exc
74 changes: 74 additions & 0 deletions cortex/approval_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from abc import ABC, abstractmethod
from typing import Literal

from cortex.approval import ApprovalMode

ActionType = Literal[
"show",
"file_edit",
"shell_command",
]


class ApprovalPolicy(ABC):
"""
Base class for approval policies.
Defines what actions are allowed under each approval mode.
"""

def __init__(self, mode: ApprovalMode):
self.mode = mode

@abstractmethod
def allow(self, action: ActionType) -> bool:
"""Return True if the action is allowed."""
pass

@abstractmethod
def requires_confirmation(self, action: ActionType) -> bool:
"""Return True if the action requires user confirmation."""
pass


# ------------------ Policies ------------------


class SuggestPolicy(ApprovalPolicy):
def allow(self, action: ActionType) -> bool:
return action == "show"

def requires_confirmation(self, action: ActionType) -> bool:
return False


class AutoEditPolicy(ApprovalPolicy):
def allow(self, action: ActionType) -> bool:
return action in {"show", "file_edit", "shell_command"}

def requires_confirmation(self, action: ActionType) -> bool:
return action == "shell_command"


class FullAutoPolicy(ApprovalPolicy):
def allow(self, action: ActionType) -> bool:
return True

def requires_confirmation(self, action: ActionType) -> bool:
return False


# ------------------ Factory ------------------


def get_approval_policy(mode: ApprovalMode) -> ApprovalPolicy:
"""
Factory method to get the correct approval policy.
"""
if mode == ApprovalMode.SUGGEST:
return SuggestPolicy(mode)
if mode == ApprovalMode.AUTO_EDIT:
return AutoEditPolicy(mode)
if mode == ApprovalMode.FULL_AUTO:
return FullAutoPolicy(mode)

raise ValueError(f"Unsupported approval mode: {mode}")
54 changes: 54 additions & 0 deletions cortex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from typing import TYPE_CHECKING, Any

from cortex.api_key_detector import auto_detect_api_key, setup_api_key
from cortex.approval import ApprovalMode
from cortex.approval_policy import get_approval_policy
from cortex.ask import AskHandler
from cortex.branding import VERSION, console, cx_header, cx_print, show_banner
from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus
Expand All @@ -24,6 +26,7 @@
from cortex.network_config import NetworkConfig
from cortex.notification_manager import NotificationManager
from cortex.stack_manager import StackManager
from cortex.user_preferences import UserPreferences
from cortex.validators import validate_api_key, validate_install_request

if TYPE_CHECKING:
Expand All @@ -41,6 +44,8 @@ def __init__(self, verbose: bool = False):
self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
self.spinner_idx = 0
self.verbose = verbose
prefs = UserPreferences.load()
self.approval_policy = get_approval_policy(prefs.approval_mode)
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

self.approval_policy appears unused in CortexCLI.

The approval policy is loaded and stored, but the install() method creates a new InstallationCoordinator without passing this policy (line 734). The coordinator then defaults to FULL_AUTO, ignoring the user's configured preference.

Either:

  1. Pass the policy to the coordinator: InstallationCoordinator(..., approval_policy=self.approval_policy)
  2. Remove the unused attribute if not intended
         coordinator = InstallationCoordinator(
             commands=commands,
             descriptions=[f"Step {i + 1}" for i in range(len(commands))],
             timeout=300,
             stop_on_error=True,
             progress_callback=progress_callback,
+            approval_policy=self.approval_policy,
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
prefs = UserPreferences.load()
self.approval_policy = get_approval_policy(prefs.approval_mode)
coordinator = InstallationCoordinator(
commands=commands,
descriptions=[f"Step {i + 1}" for i in range(len(commands))],
timeout=300,
stop_on_error=True,
progress_callback=progress_callback,
approval_policy=self.approval_policy,
)
🧰 Tools
🪛 GitHub Actions: CI

[error] 42-42: Undefined name 'get_approval_policy' (F821) detected by ruff during 'ruff check . --output-format=github'.

🪛 GitHub Actions: Cortex Automation

[error] 42-42: F821 Undefined name 'get_approval_policy'.

🪛 GitHub Check: lint

[failure] 42-42: Ruff (F821)
cortex/cli.py:42:32: F821 Undefined name get_approval_policy

🪛 GitHub Check: Lint

[failure] 42-42: Ruff (F821)
cortex/cli.py:42:32: F821 Undefined name get_approval_policy

🤖 Prompt for AI Agents
In cortex/cli.py around lines 41-42, self.approval_policy is set but never used;
update the install() method (around the InstallationCoordinator instantiation at
~line 734) to pass the loaded policy into the coordinator by adding
approval_policy=self.approval_policy to its constructor call so the user's
preference is respected, or if the attribute is unnecessary remove the
assignment and any unused references to self.approval_policy.


# Define a method to handle Docker-specific permission repairs
def docker_permissions(self, args: argparse.Namespace) -> int:
Expand Down Expand Up @@ -1078,6 +1083,45 @@ def env(self, args: argparse.Namespace) -> int:
traceback.print_exc()
return 1

def config(self, args: argparse.Namespace) -> int:
"""Handle configuration commands."""
action = getattr(args, "config_action", None)

if not action:
self._print_error("Please specify a subcommand (set)")
return 1

try:
if action == "set":
return self._config_set(args)
else:
self._print_error(f"Unknown config subcommand: {action}")
return 1
except ValueError as e:
self._print_error(str(e))
return 1
except Exception as e:
self._print_error(f"Unexpected error: {e}")
if self.verbose:
import traceback

traceback.print_exc()
return 1

def _config_set(self, args: argparse.Namespace) -> int:
key = args.key
value = args.value

if key != "approval-mode":
raise ValueError(f"Unknown config key '{key}'. Supported keys: approval-mode")

prefs = UserPreferences.load()
prefs.approval_mode = ApprovalMode.from_string(value)
prefs.save()

console.print(f"[green]✔ Approval mode set to '{prefs.approval_mode.value}'[/green]")
return 0

def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int:
"""Set an environment variable."""
app = args.app
Expand Down Expand Up @@ -2127,6 +2171,14 @@ def main():
# Wizard command
wizard_parser = subparsers.add_parser("wizard", help="Configure API key interactively")

# Config command
config_parser = subparsers.add_parser("config", help="Manage Cortex configuration")
config_subs = config_parser.add_subparsers(dest="config_action", help="Config actions")

config_set_parser = config_subs.add_parser("set", help="Set a configuration value")
config_set_parser.add_argument("key", help="Configuration key")
config_set_parser.add_argument("value", help="Configuration value")

# Status command (includes comprehensive health checks)
subparsers.add_parser("status", help="Show comprehensive system status and health checks")

Expand Down Expand Up @@ -2531,6 +2583,8 @@ def main():
return 1
elif args.command == "env":
return cli.env(args)
elif args.command == "config":
return cli.config(args)
else:
parser.print_help()
return 1
Expand Down
6 changes: 6 additions & 0 deletions cortex/confirm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def confirm_action(message: str) -> bool:
try:
response = input(f"{message} [y/N]: ").strip().lower()
except EOFError:
return False
return response in {"y", "yes"}
87 changes: 65 additions & 22 deletions cortex/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from enum import Enum
from typing import Any

from cortex.approval import ApprovalMode
from cortex.approval_policy import get_approval_policy
from cortex.confirm import confirm_action
from cortex.user_preferences import UserPreferences
from cortex.validators import DANGEROUS_PATTERNS

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,6 +64,7 @@ def __init__(
enable_rollback: bool = False,
log_file: str | None = None,
progress_callback: Callable[[int, int, InstallationStep], None] | None = None,
approval_policy=None,
):
"""Initialize an installation run with optional logging and rollback."""
self.timeout = timeout
Expand All @@ -79,6 +84,20 @@ def __init__(
]

self.rollback_commands: list[str] = []
self._explicit_policy = approval_policy is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Design flaw: _explicit_policy flag undermines the approval system.

The _explicit_policy flag (line 87) creates a two-tier system where approval checks are only enforced when a policy is explicitly passed to the constructor. This design has several problems:

  1. Breaks primary use case: The PR objectives specify CLI/config-based approval control. The flag bypasses approval for config-based usage, breaking the main feature.

  2. No backward compatibility needed: This is a new feature, not a modification of existing behavior. There's no legacy behavior to preserve.

  3. Security by obscurity: Developers must know to explicitly pass a policy to get security guarantees, rather than security being the default.

  4. Inconsistent application: The flag is checked in _execute_command but not in _rollback or verify_installation, creating unpredictable behavior.

Recommendation: Remove the _explicit_policy flag entirely and always enforce the loaded approval policy. If programmatic bypass is truly needed for specific scenarios (e.g., internal testing), that should be achieved through a dedicated bypass mode or factory method, not by silently disabling security checks.

🔎 Proposed fix to remove _explicit_policy flag
         self.rollback_commands: list[str] = []
-        self._explicit_policy = approval_policy is not None
         # 🔐 Load approval policy once
         if approval_policy is not None:
             self.approval_policy = approval_policy
         else:
-            approval_mode = ApprovalMode.FULL_AUTO
+            approval_mode = ApprovalMode.SUGGEST
             try:
                 prefs = UserPreferences.load()
                 if hasattr(prefs, "approval_mode"):
                     approval_mode = prefs.approval_mode
             except Exception:
                 pass

             self.approval_policy = get_approval_policy(approval_mode)

Then remove all references to self._explicit_policy throughout the file (as shown in previous fix suggestions).

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In cortex/coordinator.py around line 87, the _explicit_policy flag undermines
approval enforcement; remove the flag and all logic that conditions security
checks on it so the loaded approval policy is always enforced. Delete the
attribute assignment, remove all references/branches that check
self._explicit_policy in methods like _execute_command, _rollback,
verify_installation (and any other methods), and ensure approval checks call the
policy unconditionally; if a programmatic bypass is required later, implement a
distinct explicit bypass mode or factory method instead of a silent flag. Update
or add unit tests to reflect the always-on policy behavior and adjust
constructor/initialization docs accordingly.

# 🔐 Load approval policy once
if approval_policy is not None:
self.approval_policy = approval_policy
else:
approval_mode = ApprovalMode.FULL_AUTO
try:
prefs = UserPreferences.load()
if hasattr(prefs, "approval_mode"):
approval_mode = prefs.approval_mode
except Exception:
pass

self.approval_policy = get_approval_policy(approval_mode)
Comment on lines +87 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Unsafe default: FULL_AUTO contradicts safe-by-default principles.

Lines 92-98 default to ApprovalMode.FULL_AUTO when preference loading fails. This is the most permissive mode and contradicts the retrieved learnings that specify "Dry-run by default for all installations" and "No silent sudo execution - require explicit user confirmation."

Additionally, the broad except Exception at line 97 silently swallows all errors (including file system issues, JSON decode errors, etc.) and falls back to the most dangerous mode.

Based on learnings, the safe default should be SUGGEST mode (most restrictive), not FULL_AUTO.

🔎 Proposed fix to default to SUGGEST mode
         self._explicit_policy = approval_policy is not None
         # 🔐 Load approval policy once
         if approval_policy is not None:
             self.approval_policy = approval_policy
         else:
-            approval_mode = ApprovalMode.FULL_AUTO
+            approval_mode = ApprovalMode.SUGGEST
             try:
                 prefs = UserPreferences.load()
                 if hasattr(prefs, "approval_mode"):
                     approval_mode = prefs.approval_mode
             except Exception:
+                # Fail safe: default to most restrictive mode
                 pass

             self.approval_policy = get_approval_policy(approval_mode)
🤖 Prompt for AI Agents
In cortex/coordinator.py around lines 87 to 100, change the unsafe fallback and
broad exception handling: set the initial fallback approval_mode to
ApprovalMode.SUGGEST instead of FULL_AUTO, and replace the blanket "except
Exception:" with handling only expected errors from loading preferences (e.g.,
FileNotFoundError, JSONDecodeError, ValueError) so other unexpected errors
surface; on preference-load failure for those expected cases, silently fall back
to SUGGEST (or optionally log the load error), and for unexpected exceptions let
them propagate or be logged explicitly before falling back.


@classmethod
def from_plan(
Expand Down Expand Up @@ -160,12 +179,31 @@ def _validate_command(self, command: str) -> tuple:
return True, None

def _execute_command(self, step: InstallationStep) -> bool:
step.status = StepStatus.RUNNING
from cortex.approval import ApprovalMode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Remove redundant import.

ApprovalMode is already imported at line 12. This local import is unnecessary.

🔎 Proposed fix
     def _execute_command(self, step: InstallationStep) -> bool:
-        from cortex.approval import ApprovalMode
-
         step.start_time = time.time()
🤖 Prompt for AI Agents
In cortex/coordinator.py around line 182, remove the redundant local import
"from cortex.approval import ApprovalMode" (it's already imported at line 12);
delete that single-line import and ensure imports remain organized (run
linters/tests to confirm no remaining references depend on the local import).


# 🚫 SUGGEST MODE: skip execution (ONLY when explicitly enabled)
if self._explicit_policy and self.approval_policy.mode == ApprovalMode.SUGGEST:
step.start_time = time.time()
step.end_time = step.start_time
step.status = StepStatus.SKIPPED
step.error = "Execution skipped in suggest mode"
self._log("Suggest mode: execution skipped")
return False # not a failure, but did not execute

# Normal execution starts here
step.start_time = time.time()

step.status = StepStatus.RUNNING
self._log(f"Executing: {step.command}")

# Validate command before execution
if self.approval_policy.requires_confirmation("shell_command"):
if not confirm_action("Execute planned shell commands?"):
step.status = StepStatus.FAILED
step.error = "User declined execution"
step.end_time = time.time()
self._log("Execution declined by user")
return False

# Validate command
is_valid, error = self._validate_command(step.command)
if not is_valid:
step.status = StepStatus.FAILED
Expand All @@ -175,11 +213,12 @@ def _execute_command(self, step: InstallationStep) -> bool:
return False

try:
# Use shell=True carefully - commands are validated first
# For complex shell commands (pipes, redirects), shell=True is needed
# Simple commands could use shlex.split() with shell=False
result = subprocess.run(
step.command, shell=True, capture_output=True, text=True, timeout=self.timeout
step.command,
shell=True,
capture_output=True,
text=True,
timeout=self.timeout,
)

step.return_code = result.returncode
Expand All @@ -191,10 +230,10 @@ def _execute_command(self, step: InstallationStep) -> bool:
step.status = StepStatus.SUCCESS
self._log(f"Success: {step.command}")
return True
else:
step.status = StepStatus.FAILED
self._log(f"Failed: {step.command} (exit code: {result.returncode})")
return False

step.status = StepStatus.FAILED
self._log(f"Failed: {step.command} (exit code: {result.returncode})")
return False

except subprocess.TimeoutExpired:
step.status = StepStatus.FAILED
Expand All @@ -207,7 +246,7 @@ def _execute_command(self, step: InstallationStep) -> bool:
step.status = StepStatus.FAILED
step.error = str(e)
step.end_time = time.time()
self._log(f"Error: {step.command} - {str(e)}")
self._log(f"Error: {step.command} - {e}")
return False

def _rollback(self):
Expand All @@ -219,7 +258,12 @@ def _rollback(self):
for cmd in reversed(self.rollback_commands):
try:
self._log(f"Rollback: {cmd}")
subprocess.run(cmd, shell=True, capture_output=True, timeout=self.timeout)
subprocess.run(
cmd,
shell=True,
capture_output=True,
timeout=self.timeout,
)
Comment on lines 259 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's find the coordinator.py file and examine the relevant sections
find . -name "coordinator.py" -type f

Repository: cortexlinux/cortex

Length of output: 85


🏁 Script executed:

# Get the full context around lines 267-274 and understand the _rollback method
if [ -f "cortex/coordinator.py" ]; then
  echo "=== Lines 250-290 of _rollback method ==="
  sed -n '250,290p' cortex/coordinator.py | cat -n
fi

Repository: cortexlinux/cortex

Length of output: 1760


🏁 Script executed:

# Search for approval policy checks in the file
if [ -f "cortex/coordinator.py" ]; then
  echo "=== Searching for approval policy references ==="
  rg "approval|_explicit_policy|SUGGEST|APPROVE" cortex/coordinator.py -n -A 2 -B 2
fi

Repository: cortexlinux/cortex

Length of output: 2571


🏁 Script executed:

# Find the _execute_command method to understand how approval is checked
if [ -f "cortex/coordinator.py" ]; then
  echo "=== Looking for _execute_command method ==="
  rg "_execute_command" cortex/coordinator.py -n -A 10 -B 2
fi

Repository: cortexlinux/cortex

Length of output: 1181


🏁 Script executed:

# Look for verify_installation method mentioned in the review comment
if [ -f "cortex/coordinator.py" ]; then
  echo "=== Looking for verify_installation method ==="
  rg "def verify_installation" cortex/coordinator.py -n -A 20
fi

Repository: cortexlinux/cortex

Length of output: 1218


🏁 Script executed:

# Check for any docstrings or comments about rollback design
if [ -f "cortex/coordinator.py" ]; then
  echo "=== Checking context and docstrings ==="
  sed -n '255,280p' cortex/coordinator.py | cat -n
fi

Repository: cortexlinux/cortex

Length of output: 1161


Missing approval policy enforcement in rollback and verify_installation methods.

The _rollback method (lines 267-274) executes rollback commands without checking the approval policy, creating inconsistent behavior:

  • _execute_command respects the approval policy when explicitly set (skipping execution in SUGGEST mode, blocking if not allowed)
  • _rollback and verify_installation unconditionally execute without any policy checks

If rollback should always execute regardless of approval mode (recommended for safety), document this design decision with a comment explaining why rollback is exempt from approval checks. If it should respect the policy for consistency, apply approval checks similar to _execute_command.

🤖 Prompt for AI Agents
In cortex/coordinator.py around lines 267-274 (and the verify_installation
method), the rollback and verification paths unconditionally invoke
subprocess.run and therefore bypass the approval-policy semantics enforced by
_execute_command; either (preferred for consistency) add the same
approval-policy checks used in _execute_command (i.e., consult the approval
mode, skip execution in SUGGEST mode and raise or block when policy disallows)
before running rollback/verify commands, or if rollback must always run for
safety, add a clear comment above these calls documenting that rollback/verify
are intentionally exempt from approval checks and why; apply the chosen approach
to both _rollback and verify_installation to ensure consistent behavior and
logging.

except Exception as e:
self._log(f"Rollback failed: {cmd} - {str(e)}")

Expand All @@ -228,7 +272,6 @@ def add_rollback_command(self, command: str):
self.rollback_commands.append(command)

def execute(self) -> InstallationResult:
"""Run each installation step and capture structured results."""
start_time = time.time()
failed_step_index = None

Expand All @@ -242,9 +285,10 @@ def execute(self) -> InstallationResult:

if not success:
failed_step_index = i

if self.stop_on_error:
for remaining_step in self.steps[i + 1 :]:
remaining_step.status = StepStatus.SKIPPED
for remaining in self.steps[i + 1 :]:
remaining.status = StepStatus.SKIPPED

if self.enable_rollback:
self._rollback()
Expand All @@ -263,10 +307,11 @@ def execute(self) -> InstallationResult:
total_duration = time.time() - start_time
all_success = all(s.status == StepStatus.SUCCESS for s in self.steps)

if all_success:
self._log("Installation completed successfully")
else:
self._log("Installation completed with errors")
self._log(
"Installation completed successfully"
if all_success
else "Installation completed with errors"
)

return InstallationResult(
success=all_success,
Expand All @@ -279,9 +324,7 @@ def execute(self) -> InstallationResult:
)

def verify_installation(self, verify_commands: list[str]) -> dict[str, bool]:
"""Execute verification commands and return per-command success."""
verification_results = {}

self._log("Starting verification...")

for cmd in verify_commands:
Expand Down
Loading
Loading