Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8746c48
feat: add permission auditor & fixer
altynai9128 Jan 7, 2026
2191e3f
style: apply black formatting and fix ruff warnings
altynai9128 Jan 7, 2026
990b727
feat: implement Permission Auditor & Fixer (Issue #446)
altynai9128 Jan 7, 2026
3eb0961
Add: new tests, updated old once for the full coverage
altynai9128 Jan 7, 2026
9905c8f
Add: added one more test feature
altynai9128 Jan 7, 2026
9106230
FIX: correct Path mocking in tests
altynai9128 Jan 7, 2026
bef7c42
fix: correct test for DockerPermissionHandler method
altynai9128 Jan 7, 2026
10a13c3
Fix: fixed docker tests again
altynai9128 Jan 7, 2026
c9aeed5
ix: address CodeRabbit review comments
altynai9128 Jan 7, 2026
b3e8926
fix: complete CodeRabbit review feedback
altynai9128 Jan 7, 2026
0508dce
fix: fixed the last coderabbit issue
altynai9128 Jan 7, 2026
99e9fbf
fix: final adjustments
altynai9128 Jan 7, 2026
8ac1364
Merge branch 'main' into permission-simple
Anshgrover23 Jan 7, 2026
e8d162a
fix: fixed conflicts
altynai9128 Jan 12, 2026
c5267c1
Merge branch 'permission-simple' of https://github.com/altynai9128/co…
altynai9128 Jan 12, 2026
a10561c
chore: remove out-of-scope files (fix_indents.py, test_hwprofiler.py)
altynai9128 Jan 12, 2026
cfad74e
Merge branch 'main' into permission-simple
altynai9128 Jan 12, 2026
0047451
Fix syntax error
altynai9128 Jan 12, 2026
40491d9
Merge branch 'permission-simple' of https://github.com/altynai9128/co…
altynai9128 Jan 12, 2026
beb2596
Fix syntax errors in cli.py
altynai9128 Jan 12, 2026
2ba682b
Fix syntax errors in cli.py
altynai9128 Jan 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 85 additions & 13 deletions cortex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
from cortex.llm.interpreter import CommandInterpreter
from cortex.network_config import NetworkConfig
from cortex.notification_manager import NotificationManager
from cortex.permissions.auditor_fixer import PermissionAuditor

PermissionManager = PermissionAuditor # Alias for compatibility
from cortex.stack_manager import StackManager
from cortex.validators import validate_api_key, validate_install_request

Expand Down Expand Up @@ -84,15 +87,17 @@ def docker_permissions(self, args: argparse.Namespace) -> int:
"warning",
)
try:
# Interactive confirmation prompt for administrative repair.
# Interactive confirmation prompt for administrative
# repair.
response = console.input(
"[bold cyan]Reclaim ownership using sudo? (y/n): [/bold cyan]"
)
if response.lower() not in ("y", "yes"):
cx_print("Operation cancelled", "info")
return 0
except (EOFError, KeyboardInterrupt):
# Graceful handling of terminal exit or manual interruption.
# Graceful handling of terminal exit or manual
# interruption.
console.print()
cx_print("Operation cancelled", "info")
return 0
Expand All @@ -116,10 +121,53 @@ def docker_permissions(self, args: argparse.Namespace) -> int:
cx_print(f"❌ {e}", "error")
return 1
except Exception as e:
# Safety net for unexpected runtime exceptions to prevent CLI crashes.
# Safety net for unexpected runtime exceptions to prevent CLI
# crashes.
cx_print(f"❌ Unexpected error: {e}", "error")
return 1

# --- Permission Auditor Command ---
def audit_permissions(self, args: argparse.Namespace) -> int:
"""Handle permission auditing and fixing.

Args:
args: Parsed command-line arguments containing path, fix, dry_run, docker, and verbose flags.

Returns:
int: 0 if no issues found or fixes successfully applied, 1 otherwise.
"""
try:
# Initialize manager
manager = PermissionManager(
verbose=getattr(args, "verbose", False),
dry_run=getattr(args, "dry_run", True),
docker_context=getattr(args, "docker", False),
)

path = getattr(args, "path", ".")
dry_run_flag = getattr(args, "dry_run", False)
fix_flag = getattr(args, "fix", False)
apply_fixes = dry_run_flag or fix_flag

# Scan and fix
result = manager.scan_and_fix(path=path, apply_fixes=apply_fixes, dry_run=dry_run_flag)

# Print report
cx_print(result["report"], "info")

# Return exit code
issues_found = result.get("issues_found", 0)
fixes_applied = result.get("fixed", False)

if issues_found == 0 or fixes_applied:
return 0
else:
return 1

except Exception as e:
cx_print(f"Error during permission audit: {e}", "error")
return 1

def _debug(self, message: str):
"""Print debug info only in verbose mode"""
if self.verbose:
Expand Down Expand Up @@ -225,7 +273,8 @@ def notify(self, args):
elif args.notify_action == "enable":
mgr.config["enabled"] = True
# Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config,
# but keeping as is for a simple fix (or adding a save method to NotificationManager would be best).
# but keeping as is for a simple fix (or adding a save method to
# NotificationManager would be best).
mgr._save_config()
self._print_success("Notifications enabled")
return 0
Expand Down Expand Up @@ -334,8 +383,7 @@ def _handle_stack_install(self, manager: StackManager, args: argparse.Namespace)

if suggested_name != original_name:
cx_print(
f"💡 No GPU detected, using '{suggested_name}' instead of '{original_name}'",
"info",
f"💡 No GPU detected, using '{suggested_name}' instead of '{original_name}'", "info"
)

stack = manager.find_stack(suggested_name)
Expand Down Expand Up @@ -366,7 +414,7 @@ def _handle_stack_dry_run(self, stack: dict[str, Any], packages: list[str]) -> i
return 0

def _handle_stack_real_install(self, stack: dict[str, Any], packages: list[str]) -> int:
"""Install all packages in the stack."""
"""Install all packages in the stack"""
cx_print(f"\n🚀 Installing stack: {stack['name']}\n", "success")

# Batch into a single LLM request
Expand Down Expand Up @@ -740,7 +788,7 @@ def parallel_log_callback(message: str, level: str = "info"):
success, parallel_tasks = asyncio.run(
run_parallel_install(
commands=commands,
descriptions=[f"Step {i + 1}" for i in range(len(commands))],
descriptions=[f"Step {i +1}" for i in range(len(commands))],
timeout=300,
stop_on_error=True,
log_callback=parallel_log_callback,
Expand Down Expand Up @@ -812,7 +860,7 @@ def parallel_log_callback(message: str, level: str = "info"):

coordinator = InstallationCoordinator(
commands=commands,
descriptions=[f"Step {i + 1}" for i in range(len(commands))],
descriptions=[f"Step {i +1}" for i in range(len(commands))],
timeout=300,
stop_on_error=True,
progress_callback=progress_callback,
Expand Down Expand Up @@ -1240,7 +1288,8 @@ def _env_import(self, env_mgr: EnvironmentManager, args: argparse.Namespace) ->
else:
cx_print("No variables imported", "info")

# Return success (0) even with partial errors - some vars imported successfully
# Return success (0) even with partial errors - some vars imported
# successfully
return 0

except FileNotFoundError:
Expand Down Expand Up @@ -1421,7 +1470,7 @@ def _env_audit(self, args: argparse.Namespace) -> int:
# Sort by number of sources (most definitions first)
sorted_vars = sorted(audit.variables.items(), key=lambda x: len(x[1]), reverse=True)
for var_name, sources in sorted_vars[:20]: # Limit to top 20
console.print(f"\n [cyan]{var_name}[/cyan] ({len(sources)} definition(s))")
console.print(f"\n [cyan]{var_name}[/cyan] ({len(sources)} definition(s))")
for src in sources:
console.print(f" [dim]{src.file}:{src.line_number}[/dim]")
# Show truncated value
Expand Down Expand Up @@ -2086,7 +2135,8 @@ def main():

if temp_args.command in NETWORK_COMMANDS:
# Now detect network (only when needed)
network.detect(check_quality=True) # Include quality check for these commands
# Include quality check for these commands
network.detect(check_quality=True)
network.auto_configure()

except Exception as e:
Expand Down Expand Up @@ -2274,7 +2324,8 @@ def main():
env_parser = subparsers.add_parser("env", help="Manage environment variables")
env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions")

# env set <app> <KEY> <VALUE> [--encrypt] [--type TYPE] [--description DESC]
# env set <app> <KEY> <VALUE> [--encrypt] [--type TYPE] [--description
# DESC]
env_set_parser = env_subs.add_parser("set", help="Set an environment variable")
env_set_parser.add_argument("app", help="Application name")
env_set_parser.add_argument("key", help="Variable name")
Expand Down Expand Up @@ -2361,6 +2412,25 @@ def main():
"--encrypt-keys", help="Comma-separated list of keys to encrypt"
)

# --- Audit Permissions Command ---
audit_parser = subparsers.add_parser(
"audit-permissions", help="Audit and fix dangerous file permissions"
)
audit_parser.add_argument(
"path", nargs="?", default=".", help="Path to scan (default: current directory)"
)

fix_group = audit_parser.add_mutually_exclusive_group()
fix_group.add_argument("--fix", action="store_true", help="Apply safe fixes")
fix_group.add_argument(
"--dry-run", action="store_true", help="Show what would be fixed without changes"
)

audit_parser.add_argument(
"--docker", action="store_true", help="Consider Docker container UID mappings"
)
audit_parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")

# --- Shell Environment Analyzer Commands ---
# env audit - show all shell variables with sources
env_audit_parser = env_subs.add_parser(
Expand Down Expand Up @@ -2531,6 +2601,8 @@ def main():
return 1
elif args.command == "env":
return cli.env(args)
elif args.command == "audit-permissions":
return cli.audit_permissions(args)
else:
parser.print_help()
return 1
Expand Down
71 changes: 71 additions & 0 deletions cortex/permissions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Permission Auditor & Fixer module.
"""

from typing import Any


class PermissionAuditor:
"""Proxy class for PermissionAuditor with lazy loading."""

def __new__(cls, *args, **kwargs):
from .auditor_fixer import PermissionAuditor as RealPermissionAuditor

return RealPermissionAuditor(*args, **kwargs)


class DockerPermissionHandler:
"""Proxy class for DockerPermissionHandler with lazy loading."""

def __new__(cls, *args, **kwargs):
from .docker_handler import DockerPermissionHandler as RealDockerPermissionHandler

return RealDockerPermissionHandler(*args, **kwargs)


PermissionManager = PermissionAuditor
PermissionFixer = PermissionAuditor


def scan_path(path: str) -> Any:
"""
Simplified interface to scan a path for permission issues.

Args:
path: Directory path to scan

Returns:
Scan results from PermissionAuditor.scan_directory()
"""
auditor = PermissionAuditor()
return auditor.scan_directory(path)


def analyze_permissions(path: str) -> dict[str, Any]:
"""
Analyze permissions and return detailed report.

Args:
path: Directory path to analyze

Returns:
Dictionary with scan results and analysis
"""
auditor = PermissionAuditor()
scan = auditor.scan_directory(path)
return {
"path": path,
"auditor": auditor,
"scan": scan,
"issues_count": len(scan.get("world_writable", [])) + len(scan.get("dangerous", [])),
}


__all__ = [
"PermissionAuditor",
"DockerPermissionHandler",
"PermissionManager",
"PermissionFixer",
"scan_path",
"analyze_permissions",
]
Loading
Loading