diff --git a/cortex/cli.py b/cortex/cli.py index 9261a816..92b72e59 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -381,6 +381,54 @@ def _handle_stack_real_install(self, stack: dict[str, Any], packages: list[str]) console.print(f"Installed {len(packages)} packages") return 0 + # Run system health checks + def doctor(self) -> int: + """Run system health checks and diagnostics. + + Returns: + Exit code: 0 if healthy, 1 if warnings, 2 if failures + """ + from cortex.doctor import SystemDoctor + + doctor = SystemDoctor() + return doctor.run_checks() + + def script(self, args: argparse.Namespace) -> int: + """Handle cortex script commands""" + try: + from cortex.script_gen import ScriptGenerator + + generator = ScriptGenerator() + + if args.script_action == "generate": + generator.generate( + filename=args.filename, + stack=args.stack, + format=args.format, + dry_run=args.dry_run, + ) + return 0 + + elif args.script_action == "test": + generator.test(filename=args.filename, sandbox=args.sandbox) + return 0 + + elif args.script_action == "history": + limit = args.limit + generator.show_history(limit=limit) + return 0 + else: + console.print(f"[red]✗ Unknown script action: {args.script_action}[/red]") + return 1 + + except FileNotFoundError: + filename = getattr(args, "filename", "unknown") + console.print(f"[red]✗ Script file not found: {filename}[/red]") + return 1 + except Exception as e: + console.print(f"[red]✗ Script command failed: {str(e)}[/red]") + return 1 + # --- Sandbox Commands (Docker-based package testing) --- def sandbox(self, args: argparse.Namespace) -> int: """Handle `cortex sandbox` commands for Docker-based package testing.""" @@ -2041,6 +2089,11 @@ def show_rich_help(): table.add_row("docker permissions", "Fix Docker bind-mount permissions") table.add_row("sandbox ", "Test packages in Docker sandbox") table.add_row("doctor", "System health check") + table.add_row( + "script generate --stack ", "Generate installation scripts" + ) + table.add_row("script test ", "Validate script syntax") + table.add_row("script history", "View generation history") console.print(table) console.print() @@ -2130,6 +2183,49 @@ def main(): # Status command (includes comprehensive health checks) subparsers.add_parser("status", help="Show comprehensive system status and health checks") + # script generator command + script_parser = subparsers.add_parser( + "script", help="Generate installation scripts", aliases=["scripts"] + ) + script_subs = script_parser.add_subparsers(dest="script_action", required=True) + + # cortex script generate + generate_parser = script_subs.add_parser("generate", help="Generate installation script") + generate_parser.add_argument("filename", help="Output script filename (e.g., docker-setup.sh)") + generate_parser.add_argument( + "--stack", + "-s", + default="docker", + choices=["docker", "python", "nodejs", "ollama"], + help="Stack to install (default: docker)", + ) + generate_parser.add_argument( + "--format", + "-f", + default="bash", + choices=["bash", "ansible"], + help="Output format (default: bash)", + ) + generate_parser.add_argument( + "--dry-run", action="store_true", help="Preview script without writing to file" + ) + + # cortex script test + test_parser = script_subs.add_parser("test", help="Test generated script syntax") + test_parser.add_argument("filename", help="Script file to validate") + test_parser.add_argument( + "--no-sandbox", + dest="sandbox", + action="store_false", + help="Disable sandbox mode (sandbox is enabled by default)", + ) + test_parser.set_defaults(sandbox=True) + + # cortex script history + history_parser = script_subs.add_parser("history", help="Show script generation history") + history_parser.add_argument( + "--limit", "-l", type=int, default=10, help="Number of entries to show (default: 10)" + ) # Ask command ask_parser = subparsers.add_parser("ask", help="Ask a question about your system") ask_parser.add_argument("question", type=str, help="Natural language question") @@ -2522,6 +2618,10 @@ def main(): return cli.notify(args) elif args.command == "stack": return cli.stack(args) + elif args.command == "doctor": + return cli.doctor() + elif args.command == "script": + return cli.script(args) elif args.command == "sandbox": return cli.sandbox(args) elif args.command == "cache": diff --git a/cortex/script_gen.py b/cortex/script_gen.py new file mode 100644 index 00000000..53b43052 --- /dev/null +++ b/cortex/script_gen.py @@ -0,0 +1,324 @@ +""" +Generates standalone installation scripts for offline or automated deployments. +""" + +import subprocess +import sys +from datetime import datetime +from pathlib import Path +from typing import TypedDict + +from rich.console import Console + +console = Console() + + +class StackConfig(TypedDict): + packages: list[dict | str] + verification: str | None + + +STACK_DEPS: dict[str, StackConfig] = { + "docker": { + "packages": [ + {"name": "docker.io", "check_command": "docker"}, + {"name": "docker-compose", "check_command": "docker-compose"}, + ], + "verification": "docker --version", + }, + "python": { + "packages": [ + {"name": "python3", "check_command": "python3"}, + {"name": "python3-venv", "check_command": "python3 -m venv --help"}, + ], + "verification": "python3 --version", + }, + "nodejs": { + "packages": [ + {"name": "nodejs", "check_command": "node"}, + {"name": "npm", "check_command": "npm"}, + ], + "verification": "node --version", + }, + "ollama": { + "packages": [ + {"name": "ollama", "check_command": "ollama"}, + ], + "verification": "ollama --version", + }, +} + + +BASH_TEMPLATE = """#!/bin/bash +# Generated by Cortex Linux - {date} +# Stack: {stack} + +set -euo pipefail + +GREEN='\\033[0;32m' +YELLOW='\\033[1;33m' +RED='\\033[0;31m' +NC='\\033[0m' + +log_info() {{ echo -e "${{GREEN}}✓${{NC}} $1"; }} +log_warn() {{ echo -e "${{YELLOW}}⚠${{NC}} $1"; }} +log_error() {{ echo -e "${{RED}}✗${{NC}} $1"; }} + +command_exists() {{ + command -v "$1" >/dev/null 2>&1 +}} + +trap 'log_error "Failed at line $LINENO"' ERR + +log_info "Installing {stack}..." + +if ! sudo apt-get update >/dev/null 2>&1; then + log_error "Failed to update package manager" + exit 1 +fi + +{install_commands} + +log_info "Verifying {stack}..." +if ! {verify_commands}; then + log_error "Verification failed for {stack}" + exit 1 +fi +log_info "Complete!" +""" + + +class ScriptGenerator: + """Build installation scripts for supported stacks.""" + + def __init__(self) -> None: + """ + Initialize the ScriptGenerator. + + Sets up the output directory at ~/cortex/install-scripts and prepares + the history tracking file. + """ + self.cortex_dir = Path.home() / "cortex" / "install-scripts" + self.cortex_dir.mkdir(parents=True, exist_ok=True) + self.history_file = self.cortex_dir / "script_history.yaml" + + def generate( + self, filename: str, stack: str = "docker", format: str = "bash", dry_run: bool = False + ) -> None: + """ + Generate an installation script for the specified stack. + Args: + filename: Output script filename (absolute or relative to cortex_dir) + stack: Stack to install (docker, python, nodejs, ollama) + format: Output format (bash or ansible) + dry_run: If True, print content without writing file + Raises: + SystemExit: If stack or format is invalid, or file write fails + """ + if stack not in STACK_DEPS: + console.print(f"[red]✗ Unknown stack: {stack}[/red]") + console.print(f"Available stacks: {', '.join(STACK_DEPS.keys())}") + sys.exit(1) + + deps = STACK_DEPS[stack] + + if format == "bash": + install_cmds: list[str] = [] + for pkg in deps.get("packages", []): + if isinstance(pkg, dict): + pkg_name = pkg["name"] + check_command = pkg.get("check_command", pkg_name) + else: + pkg_name = pkg + check_command = pkg + + install_cmds.append( + f"""if ! command_exists {check_command}; then + log_info "Installing {pkg_name}..." + sudo apt-get install -y {pkg_name} >/dev/null 2>&1 +else + log_warn "{pkg_name} already installed" +fi""" + ) + + verify_commands = deps.get("verification", "true") + content = BASH_TEMPLATE.format( + date=datetime.now().isoformat(), + stack=stack, + install_commands="\n".join(install_cmds), + verify_commands=verify_commands, + ) + + elif format == "ansible": + pkg_lines: list[str] = [] + for pkg in deps.get("packages", []): + if isinstance(pkg, dict): + pkg_name = pkg.get("name") + else: + pkg_name = pkg + pkg_lines.append(f" - {pkg_name}") + pkg_list = "\n".join(pkg_lines) + + content = f"""--- +# Generated by Cortex Linux - {datetime.now().isoformat()} +# Stack: {stack} +- name: Install {stack} + hosts: localhost + gather_facts: yes + become: yes + tasks: + - name: Update apt cache + apt: + update_cache: yes + when: ansible_os_family == "Debian" + - name: Install packages + apt: + pkg: +{pkg_list} + state: present + when: ansible_os_family == "Debian" + - name: Verify installation + command: {deps.get("verification", "echo OK")} + register: version_check + changed_when: false +""" + else: + console.print(f"[red]✗ Unknown format: {format}[/red]") + sys.exit(1) + + if filename.startswith("/"): + path = Path(filename) + else: + path = self.cortex_dir / filename + + if dry_run: + console.print("[yellow]📋 DRY RUN - Script content:[/yellow]") + console.print("[dim]" + content + "[/dim]") + console.print(f"[yellow]Would write to: {path.absolute()}[/yellow]") + else: + try: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content) + path.chmod(0o755) + + if not path.exists(): + console.print(f"[red]✗ Failed to write script to {path.absolute()}[/red]") + sys.exit(1) + + console.print("[green]✓ Generated installation script[/green]") + console.print(f"[dim]Location: {path.absolute()}[/dim]") + console.print(f"[dim]Run: bash {path.absolute()}[/dim]") + + self._save_to_history(stack, format, str(path.absolute())) + except Exception as e: + console.print(f"[red]✗ Error writing script: {str(e)}[/red]") + sys.exit(1) + + def test(self, filename: str, sandbox: bool = True) -> None: + """ + Validate the syntax of a generated script. + Args: + filename: Script file to test (absolute or relative to cortex_dir) + sandbox: If True, run bash -n syntax check; if False, skip validation + Raises: + SystemExit: If script not found or syntax check fails + """ + if filename.startswith("/"): + path = Path(filename) + else: + path = self.cortex_dir / filename + + if not path.exists(): + console.print(f"[red]✗ Script not found: {filename}[/red]") + console.print(f"[dim]Expected: {path.absolute()}[/dim]") + sys.exit(1) + + console.print(f"[cyan]🧪 Testing script: {path.absolute()}[/cyan]") + + try: + if sandbox: + result = subprocess.run( + ["bash", "-n", str(path)], capture_output=True, text=True, timeout=10 + ) + if result.returncode != 0: + console.print("[red]✗ Syntax error in script:[/red]") + console.print(f"[dim]{result.stderr}[/dim]") + sys.exit(1) + + console.print("[green]✓ Syntax check passed[/green]") + console.print("[green]✓ Script is valid bash[/green]") + console.print("[green]✓ Ready for execution[/green]") + else: + console.print("[green]✓ Syntax validation passed[/green]") + + except subprocess.TimeoutExpired: + console.print("[red]✗ Script test timed out[/red]") + sys.exit(1) + except Exception as e: + console.print(f"[red]✗ Test error: {str(e)}[/red]") + sys.exit(1) + + def _save_to_history(self, stack: str, format: str, filename: str) -> None: + """Save generation entry to history.""" + try: + import yaml + except ImportError: + return + + entry = { + "timestamp": datetime.now().isoformat(), + "stack": stack, + "format": format, + "filename": filename, + } + + history: list[dict] = [] + if self.history_file.exists(): + try: + content = self.history_file.read_text() + if content.strip(): + history = yaml.safe_load(content) or [] + except Exception: + history = [] + + history.append(entry) + self.history_file.write_text(yaml.safe_dump(history, sort_keys=False)) + + def show_history(self, limit: int = 10) -> None: + """ + Display recent script generation history. + + Args: + limit: Maximum number of history entries to show (default: 10) + """ + try: + import yaml + except ImportError: + console.print("[yellow]⚠ YAML not available, cannot show history[/yellow]") + return + + if not self.history_file.exists(): + console.print("[yellow]⚠ No script history found[/yellow]") + return + + try: + data = yaml.safe_load(self.history_file.read_text()) or [] + except Exception: + console.print("[yellow]⚠ Error reading history[/yellow]") + return + + if not data: + console.print("[yellow]⚠ Script history is empty[/yellow]") + return + + console.print("[bold]📜 Script Generation History (latest first)[/bold]") + console.print() + + for i, entry in enumerate(reversed(data[-limit:]), 1): + timestamp = entry.get("timestamp", "?") + stack = entry.get("stack", "?") + fmt = entry.get("format", "?") + filename = entry.get("filename", "?") + + console.print(f"{i}. [cyan]{stack}[/cyan] ({fmt}) → {filename}") + console.print(f" [dim]{timestamp}[/dim]") diff --git a/tests/test_script_gen.py b/tests/test_script_gen.py new file mode 100644 index 00000000..265c4f49 --- /dev/null +++ b/tests/test_script_gen.py @@ -0,0 +1,226 @@ +""" +Complete test suite for cortex script generator +""" + +from pathlib import Path + +import pytest + +from cortex.script_gen import STACK_DEPS, ScriptGenerator + + +class TestScriptGenerator: + """Test suite for ScriptGenerator class""" + + @pytest.fixture + def generator(self): + """Create ScriptGenerator instance for each test""" + return ScriptGenerator() + + @pytest.fixture + def temp_script(self, tmp_path): + """Create temporary script file""" + script_file = tmp_path / "test_script.sh" + return script_file + + # ===== GENERATION TESTS ===== + + def test_generate_docker_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="bash") + assert temp_script.exists() + content = temp_script.read_text() + assert "#!/bin/bash" in content + assert "docker.io" in content + assert "docker-compose" in content + assert "set -euo pipefail" in content + + def test_generate_python_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="python", format="bash") + assert temp_script.exists() + content = temp_script.read_text() + assert "python3" in content + assert "python3 --version" in content + + def test_generate_nodejs_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="nodejs", format="bash") + assert temp_script.exists() + content = temp_script.read_text() + assert "nodejs" in content + assert "npm" in content + + def test_generate_ollama_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="ollama", format="bash") + assert temp_script.exists() + content = temp_script.read_text() + assert "ollama" in content + + def test_generate_docker_ansible(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="ansible") + assert temp_script.exists() + content = temp_script.read_text() + assert "---" in content + assert "name: Install docker" in content + assert "docker.io" in content + assert "docker-compose" in content + + def test_generate_python_ansible(self, generator, temp_script): + generator.generate(str(temp_script), stack="python", format="ansible") + assert temp_script.exists() + content = temp_script.read_text() + assert "---" in content + assert "python3" in content + + # ===== FILE OPERATIONS ===== + + def test_file_is_executable(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="bash") + assert temp_script.stat().st_mode & 0o111 # Check execute bit + + def test_file_contains_timestamp(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="bash") + content = temp_script.read_text() + assert "Generated by Cortex Linux" in content + assert "T" in content # ISO format timestamp + + def test_dry_run_no_write(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", dry_run=True) + assert not temp_script.exists() + + # ===== ERROR HANDLING ===== + + def test_invalid_stack(self, generator, temp_script): + with pytest.raises(SystemExit): + generator.generate(str(temp_script), stack="invalid_stack") + + def test_invalid_format(self, generator, temp_script): + with pytest.raises(SystemExit): + generator.generate(str(temp_script), stack="docker", format="invalid_format") + + # ===== IDEMPOTENCY TESTS ===== + def test_idempotent_check_docker(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + content = temp_script.read_text() + + # Check uses 'docker' (check_command), installs 'docker.io' (package) + assert "command_exists docker" in content + assert "Installing docker.io" in content + assert "Installing docker-compose" in content + assert temp_script.stat().st_mode & 0o111 # executable + + def test_idempotent_check_python(self, generator, temp_script): + generator.generate(str(temp_script), stack="python") + content = temp_script.read_text() + + assert "command_exists python3" in content + assert "Installing python3" in content + assert temp_script.stat().st_mode & 0o111 + + def test_idempotent_check_nodejs(self, generator, temp_script): + generator.generate(str(temp_script), stack="nodejs") + content = temp_script.read_text() + + # Check uses 'node' (check_command), installs 'nodejs' (package) + assert "command_exists node" in content + assert "Installing nodejs" in content + assert "Installing npm" in content + assert temp_script.stat().st_mode & 0o111 + + def test_idempotent_check_ollama(self, generator, temp_script): + generator.generate(str(temp_script), stack="ollama") + content = temp_script.read_text() + + assert "command_exists ollama" in content + assert "Installing ollama" in content + assert temp_script.stat().st_mode & 0o111 + + # ===== BASH BEST PRACTICES ===== + def test_strict_mode_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + content = temp_script.read_text() + assert "set -euo pipefail" in content + + def test_error_trap_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + content = temp_script.read_text() + assert "trap" in content + assert "log_error" in content + + def test_logging_functions_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + content = temp_script.read_text() + assert "log_info()" in content + assert "log_warn()" in content + assert "log_error()" in content + + def test_color_codes_bash(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + content = temp_script.read_text() + assert "GREEN=" in content + assert "YELLOW=" in content + assert "RED=" in content + + # ===== TESTING MODE ===== + def test_syntax_check_valid(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + # Should not raise exception + generator.test(str(temp_script), sandbox=True) + + def test_syntax_check_missing_file(self, generator, temp_script): + with pytest.raises(SystemExit): + generator.test("non_existent_file.sh") + + # ===== TEMPLATE VERIFICATION ===== + def test_verification_command_docker(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker") + content = temp_script.read_text() + assert "docker --version" in content + + def test_verification_command_python(self, generator, temp_script): + generator.generate(str(temp_script), stack="python") + content = temp_script.read_text() + assert "python3 --version" in content + + def test_verification_command_nodejs(self, generator, temp_script): + generator.generate(str(temp_script), stack="nodejs") + content = temp_script.read_text() + assert "node --version" in content + + def test_verification_command_ollama(self, generator, temp_script): + generator.generate(str(temp_script), stack="ollama") + content = temp_script.read_text() + assert "ollama --version" in content + + # ===== ANSIBLE SPECIFIC ===== + def test_ansible_yaml_header(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="ansible") + content = temp_script.read_text() + assert content.startswith("---") + + def test_ansible_hosts_localhost(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="ansible") + content = temp_script.read_text() + assert "hosts: localhost" in content + + def test_ansible_become(self, generator, temp_script): + generator.generate(str(temp_script), stack="docker", format="ansible") + content = temp_script.read_text() + assert "become: yes" in content + + # ===== MULTI-STACK COVERAGE ===== + @pytest.mark.parametrize("stack", ["docker", "python", "nodejs", "ollama"]) + def test_all_stacks_bash(self, generator, temp_script, stack): + generator.generate(str(temp_script), stack=stack, format="bash") + assert temp_script.exists() + content = temp_script.read_text() + assert f"Stack: {stack}" in content + + @pytest.mark.parametrize("stack", ["docker", "python", "nodejs", "ollama"]) + def test_all_stacks_ansible(self, generator, temp_script, stack): + generator.generate(str(temp_script), stack=stack, format="ansible") + assert temp_script.exists() + content = temp_script.read_text() + assert f"Install {stack}" in content + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "--tb=short"])