diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..a62b26f1 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2001,6 +2001,66 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: console.print(f"Error: {result.error_message}", style="red") return 1 + def systemd(self, args: argparse.Namespace) -> int: + """Handle systemd service helper commands. + + Args: + args: Parsed command-line arguments. + + Returns: + int: Exit code (0 for success, 1 for error). + """ + from cortex.systemd_helper import ( + run_deps_command, + run_diagnose_command, + run_generate_command, + run_status_command, + ) + + action = getattr(args, "systemd_action", None) + + if action == "status": + try: + run_status_command(args.service) + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + elif action == "diagnose": + try: + run_diagnose_command(args.service, lines=args.lines) + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + elif action == "deps": + try: + run_deps_command(args.service) + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + elif action == "generate": + try: + run_generate_command() + return 0 + except RuntimeError as e: + console.print(f"[red]Error: {e}[/red]") + return 1 + + else: + console.print("[bold]Systemd Service Helper[/bold]") + console.print() + console.print("Commands:") + console.print(" status - Explain service status in plain English") + console.print(" diagnose - Diagnose why a service failed") + console.print(" deps - Show service dependencies") + console.print(" generate - Interactive unit file generator") + return 0 + # -------------------------- @@ -2040,6 +2100,7 @@ def show_rich_help(): table.add_row("stack ", "Install the stack") table.add_row("docker permissions", "Fix Docker bind-mount permissions") table.add_row("sandbox ", "Test packages in Docker sandbox") + table.add_row("systemd ", "Systemd service helper") table.add_row("doctor", "System health check") console.print(table) @@ -2270,6 +2331,37 @@ def main(): sandbox_exec_parser.add_argument("cmd", nargs="+", help="Command to execute") # -------------------------- + # --- Systemd Service Helper Commands --- + systemd_parser = subparsers.add_parser( + "systemd", help="Systemd service helper with plain-English explanations" + ) + systemd_subs = systemd_parser.add_subparsers(dest="systemd_action", help="Systemd actions") + + # systemd status + systemd_status_parser = systemd_subs.add_parser( + "status", help="Explain a service's status in plain English" + ) + systemd_status_parser.add_argument("service", help="Service name to check") + + # systemd diagnose [--lines N] + systemd_diagnose_parser = systemd_subs.add_parser( + "diagnose", help="Diagnose why a service failed" + ) + systemd_diagnose_parser.add_argument("service", help="Service name to diagnose") + systemd_diagnose_parser.add_argument( + "--lines", "-n", type=int, default=50, help="Number of log lines to analyze" + ) + + # systemd deps + systemd_deps_parser = systemd_subs.add_parser( + "deps", help="Show service dependencies as a visual tree" + ) + systemd_deps_parser.add_argument("service", help="Service name") + + # systemd generate + systemd_subs.add_parser("generate", help="Interactive wizard to generate a systemd unit file") + # -------------------------- + # --- Environment Variable Management Commands --- env_parser = subparsers.add_parser("env", help="Manage environment variables") env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") @@ -2531,6 +2623,8 @@ def main(): return 1 elif args.command == "env": return cli.env(args) + elif args.command == "systemd": + return cli.systemd(args) else: parser.print_help() return 1 diff --git a/cortex/systemd_helper.py b/cortex/systemd_helper.py new file mode 100644 index 00000000..fcee2db3 --- /dev/null +++ b/cortex/systemd_helper.py @@ -0,0 +1,685 @@ +""" +Systemd Service Helper for Cortex Linux. +Provides plain-English explanations, unit file generation, +failure diagnostics, and dependency visualization. +""" + +import re +import subprocess +from dataclasses import dataclass +from enum import Enum + +from rich.panel import Panel +from rich.prompt import Confirm, Prompt +from rich.tree import Tree + +from cortex.branding import console + +# Timeout constants (in seconds) +SYSTEMCTL_TIMEOUT = 10 +JOURNALCTL_TIMEOUT = 30 +VERSION_CHECK_TIMEOUT = 5 + +# Valid service name pattern +SERVICE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_@.-]+$") + + +class ServiceState(Enum): + """Possible states of a systemd service.""" + + RUNNING = "running" + STOPPED = "stopped" + FAILED = "failed" + INACTIVE = "inactive" + ACTIVATING = "activating" + DEACTIVATING = "deactivating" + UNKNOWN = "unknown" + + +@dataclass +class ServiceStatus: + """Parsed status information for a systemd service. + + Attributes: + name: The service name (without .service suffix). + state: Current state of the service. + description: Human-readable description of the service. + load_state: Whether the unit file is loaded. + active_state: Active state (active, inactive, failed). + sub_state: Sub-state providing more detail. + pid: Process ID if running. + memory: Memory usage if available. + cpu: CPU time if available. + started_at: When the service started. + exit_code: Exit code if service failed. + main_pid_code: The exit status of the main process. + """ + + name: str + state: ServiceState + description: str = "" + load_state: str = "" + active_state: str = "" + sub_state: str = "" + pid: int | None = None + memory: str = "" + cpu: str = "" + started_at: str = "" + exit_code: int | None = None + main_pid_code: str = "" + + +def _validate_service_name(service_name: str) -> str: + """Validate and normalize a service name. + + Args: + service_name: The service name to validate. + + Returns: + Normalized service name with .service suffix. + + Raises: + ValueError: If the service name is invalid. + """ + if not service_name: + raise ValueError("Service name cannot be empty") + + # Remove .service suffix for validation + base_name = service_name.replace(".service", "") + + if not SERVICE_NAME_PATTERN.match(base_name): + raise ValueError( + f"Invalid service name: {service_name}. " + "Service names can only contain letters, numbers, underscores, @, dots, and hyphens." + ) + + # Normalize with .service suffix + if not service_name.endswith(".service"): + return f"{service_name}.service" + return service_name + + +class SystemdHelper: + """ + Helper for managing and understanding systemd services. + + Provides plain-English explanations of service status, + generates unit files from simple descriptions, diagnoses + failures, and visualizes service dependencies. + """ + + def __init__(self) -> None: + """Initialize the SystemdHelper.""" + self._check_systemd_available() + + def _check_systemd_available(self) -> None: + """Check if systemd is available on the system. + + Raises: + RuntimeError: If systemd is not available. + """ + try: + result = subprocess.run( + ["systemctl", "--version"], + capture_output=True, + text=True, + timeout=VERSION_CHECK_TIMEOUT, + ) + if result.returncode != 0: + raise RuntimeError("systemd is not available on this system") + except FileNotFoundError: + raise RuntimeError("systemctl command not found - is systemd installed?") + except subprocess.TimeoutExpired: + raise RuntimeError("Timeout checking systemd availability") + + def get_service_status(self, service_name: str) -> ServiceStatus: + """Get the status of a systemd service. + + Args: + service_name: Name of the service (with or without .service suffix). + + Returns: + ServiceStatus object with parsed information. + + Raises: + ValueError: If the service name is invalid. + RuntimeError: If the status cannot be retrieved. + """ + service_name = _validate_service_name(service_name) + + try: + result = subprocess.run( + ["systemctl", "show", service_name, "--no-pager"], + capture_output=True, + text=True, + timeout=SYSTEMCTL_TIMEOUT, + ) + if result.returncode != 0: + raise RuntimeError( + f"Failed to get service status for {service_name}: {result.stderr.strip()}" + ) + except FileNotFoundError: + raise RuntimeError("systemctl command not found - is systemd installed?") + except subprocess.TimeoutExpired: + raise RuntimeError(f"Timeout getting service status for {service_name}") + + properties = {} + for line in result.stdout.strip().split("\n"): + if "=" in line: + key, _, value = line.partition("=") + properties[key] = value + + # Determine state + active_state = properties.get("ActiveState", "unknown") + sub_state = properties.get("SubState", "") + + if active_state == "active" and sub_state == "running": + state = ServiceState.RUNNING + elif active_state == "failed": + state = ServiceState.FAILED + elif active_state == "inactive": + state = ServiceState.INACTIVE + elif active_state == "activating": + state = ServiceState.ACTIVATING + elif active_state == "deactivating": + state = ServiceState.DEACTIVATING + else: + state = ServiceState.UNKNOWN + + # Parse PID + pid = None + main_pid = properties.get("MainPID", "0") + if main_pid.isdigit() and int(main_pid) > 0: + pid = int(main_pid) + + # Parse exit code + exit_code = None + exec_main_status = properties.get("ExecMainStatus", "0") + if exec_main_status.isdigit(): + exit_code = int(exec_main_status) + + return ServiceStatus( + name=service_name.replace(".service", ""), + state=state, + description=properties.get("Description", ""), + load_state=properties.get("LoadState", ""), + active_state=active_state, + sub_state=sub_state, + pid=pid, + memory=properties.get("MemoryCurrent", ""), + cpu=properties.get("CPUUsageNSec", ""), + started_at=properties.get("ActiveEnterTimestamp", ""), + exit_code=exit_code, + main_pid_code=properties.get("ExecMainCode", ""), + ) + + def explain_status(self, service_name: str) -> str: + """Explain a service's status in plain English. + + Args: + service_name: Name of the service to explain. + + Returns: + Human-readable explanation of the service status. + """ + status = self.get_service_status(service_name) + + explanations = [] + + # Main status + if status.state == ServiceState.RUNNING: + explanations.append(f"[green]'{status.name}' is running normally.[/green]") + if status.pid: + explanations.append(f" Process ID: {status.pid}") + if status.started_at: + explanations.append(f" Started: {status.started_at}") + elif status.state == ServiceState.FAILED: + explanations.append(f"[red]'{status.name}' has failed![/red]") + if status.exit_code and status.exit_code != 0: + explanations.append(f" Exit code: {status.exit_code}") + explanations.append(self._explain_exit_code(status.exit_code)) + elif status.state == ServiceState.INACTIVE: + explanations.append(f"[yellow]'{status.name}' is not running (inactive).[/yellow]") + explanations.append(" The service is stopped but can be started.") + elif status.state == ServiceState.ACTIVATING: + explanations.append(f"[cyan]'{status.name}' is starting up...[/cyan]") + elif status.state == ServiceState.DEACTIVATING: + explanations.append(f"[cyan]'{status.name}' is shutting down...[/cyan]") + else: + explanations.append(f"[yellow]'{status.name}' is in an unknown state.[/yellow]") + + if status.description: + explanations.append(f" Description: {status.description}") + + return "\n".join(explanations) + + def _explain_exit_code(self, exit_code: int) -> str: + """Provide explanation for common exit codes. + + Args: + exit_code: The exit code to explain. + + Returns: + Human-readable explanation of the exit code. + """ + # Common exit codes + common_codes = { + 1: " Likely cause: General error or misconfiguration", + 2: " Likely cause: Misuse of command, invalid arguments, or bad configuration", + 126: " Likely cause: Command found but not executable (permission issue)", + 127: " Likely cause: Command not found (check if binary exists)", + 128: " Likely cause: Invalid exit argument", + 130: " Likely cause: Script terminated by Ctrl+C (SIGINT)", + 137: " Likely cause: Process killed (SIGKILL) - possibly out of memory", + 139: " Likely cause: Segmentation fault (SIGSEGV) - program crash", + 141: " Likely cause: Broken pipe (SIGPIPE) - write to closed connection", + 142: " Likely cause: Alarm timeout (SIGALRM)", + 143: " Likely cause: Process terminated (SIGTERM) - graceful shutdown requested", + 255: " Likely cause: Exit status out of range or SSH error", + } + + if exit_code in common_codes: + return common_codes[exit_code] + + # Exit codes 128+N mean killed by signal N + if exit_code > 128: + signal_num = exit_code - 128 + return f" Likely cause: Process killed by signal {signal_num}" + + return f" Exit code {exit_code} - check service logs for details" + + def diagnose_failure(self, service_name: str, lines: int = 50) -> str: + """Diagnose why a service failed using journal logs. + + Args: + service_name: Name of the service to diagnose. + lines: Number of log lines to analyze (1-1000). + + Returns: + Diagnostic report with actionable advice. + """ + # Validate lines parameter + lines = max(1, min(1000, lines)) + + status = self.get_service_status(service_name) + + report = [] + report.append(f"[bold]Diagnostic Report for '{service_name}'[/bold]\n") + + # Get status explanation + report.append(self.explain_status(service_name)) + report.append("") + + # Get recent logs with proper error handling + try: + result = subprocess.run( + [ + "journalctl", + "-u", + service_name, + "-n", + str(lines), + "--no-pager", + "--since", + "1 hour ago", + ], + capture_output=True, + text=True, + timeout=JOURNALCTL_TIMEOUT, + ) + if result.returncode != 0: + report.append(f"[red]Failed to read logs: {result.stderr.strip()}[/red]") + return "\n".join(report) + logs = result.stdout + except FileNotFoundError: + report.append("[red]journalctl command not found - cannot read logs[/red]") + return "\n".join(report) + except subprocess.TimeoutExpired: + report.append("[yellow]Timeout reading logs - journal may be very large[/yellow]") + logs = "" + + # Analyze logs for common issues + report.append("[bold]Log Analysis:[/bold]") + + issues_found = [] + + # Check for common error patterns + error_patterns = [ + ( + r"permission denied", + "Permission issue detected", + "Try: Check file permissions and user/group settings in the unit file", + ), + ( + r"address already in use", + "Port conflict detected", + "Try: Check what's using the port with 'ss -tlnp | grep '", + ), + ( + r"no such file or directory", + "Missing file or directory", + "Try: Verify all paths in the unit file exist", + ), + ( + r"connection refused", + "Network connection issue", + "Try: Check if the target service is running and accessible", + ), + ( + r"out of memory|cannot allocate|oom", + "Memory issue", + "Try: Increase available memory or add memory limits to prevent OOM", + ), + ( + r"timeout|timed out", + "Timeout occurred", + "Try: Increase TimeoutStartSec/TimeoutStopSec in the unit file", + ), + ( + r"dependency|requires|wants", + "Dependency issue", + "Try: Check if required services are running with 'systemctl status '", + ), + ] + + for pattern, issue, advice in error_patterns: + if re.search(pattern, logs, re.IGNORECASE): + issues_found.append((issue, advice)) + + if issues_found: + for issue, advice in issues_found: + report.append(f" [red]! {issue}[/red]") + report.append(f" {advice}") + else: + report.append(" No common error patterns detected in logs.") + report.append(" Review the full logs below for details.") + + report.append("") + report.append("[bold]Recent Logs:[/bold]") + # Show last 20 lines of logs + log_lines = logs.strip().split("\n")[-20:] if logs else [] + if not log_lines or (len(log_lines) == 1 and not log_lines[0]): + report.append(" [dim]No logs found for this service[/dim]") + else: + for line in log_lines: + if "error" in line.lower() or "fail" in line.lower(): + report.append(f" [red]{line}[/red]") + elif "warn" in line.lower(): + report.append(f" [yellow]{line}[/yellow]") + else: + report.append(f" {line}") + + return "\n".join(report) + + def show_dependencies(self, service_name: str) -> Tree: + """Show service dependencies as a visual tree. + + Args: + service_name: Name of the service. + + Returns: + Rich Tree object showing dependencies. + """ + service_name = _validate_service_name(service_name) + + tree = Tree(f"[bold cyan]{service_name}[/bold cyan]") + + try: + result = subprocess.run( + ["systemctl", "list-dependencies", service_name, "--no-pager"], + capture_output=True, + text=True, + timeout=SYSTEMCTL_TIMEOUT, + ) + except FileNotFoundError: + tree.add("[red]systemctl command not found[/red]") + return tree + except subprocess.TimeoutExpired: + tree.add("[yellow]Timed out fetching dependencies[/yellow]") + return tree + + if result.returncode != 0: + tree.add(f"[red]Error: {result.stderr.strip()}[/red]") + return tree + + lines = result.stdout.strip().split("\n")[1:] # Skip header + + # Parse the tree structure with error recovery + current_nodes = {0: tree} + + for line in lines: + if not line.strip(): + continue + + try: + # Count indentation (handle various tree characters) + indent = 0 + for char in line: + if char in " │├└─●○": + indent += 1 + else: + break + + # Get the service name - remove tree drawing characters + clean_line = re.sub(r"[│├└─●○\s]+", "", line).strip() + if not clean_line: + continue + + # Calculate tree level (approximately 2 chars per level) + level = indent // 2 + + # Color based on unit type + if ".target" in clean_line: + styled = f"[blue]{clean_line}[/blue]" + elif ".socket" in clean_line: + styled = f"[magenta]{clean_line}[/magenta]" + elif ".service" in clean_line: + styled = f"[green]{clean_line}[/green]" + elif ".slice" in clean_line: + styled = f"[cyan]{clean_line}[/cyan]" + else: + styled = clean_line + + # Add to tree with fallback to root + parent_level = max(0, level - 1) + if parent_level in current_nodes: + new_node = current_nodes[parent_level].add(styled) + current_nodes[level] = new_node + else: + # Fallback to root if parent not found + new_node = tree.add(styled) + current_nodes[level] = new_node + except Exception: + # Skip malformed lines + continue + + return tree + + def generate_unit_file( + self, + description: str, + exec_start: str, + service_name: str | None = None, + user: str | None = None, + working_dir: str | None = None, + restart: bool = True, + after: list[str] | None = None, + environment: dict[str, str] | None = None, + ) -> str: + """Generate a systemd unit file from simple parameters. + + Args: + description: Human-readable description of the service. + exec_start: Command to run when starting the service. + service_name: Name for the service (optional). + user: User to run the service as (optional). + working_dir: Working directory for the service (optional). + restart: Whether to restart on failure (default True). + after: List of units to start after (optional). + environment: Environment variables to set (optional). + + Returns: + Complete systemd unit file content as a string. + """ + lines = [] + + # [Unit] section + lines.append("[Unit]") + lines.append(f"Description={description}") + + if after: + lines.append(f"After={' '.join(after)}") + else: + lines.append("After=network.target") + + lines.append("") + + # [Service] section + lines.append("[Service]") + lines.append("Type=simple") + lines.append(f"ExecStart={exec_start}") + + if user: + lines.append(f"User={user}") + + if working_dir: + lines.append(f"WorkingDirectory={working_dir}") + + if environment: + for key, value in environment.items(): + # Escape special characters for systemd Environment= + # Must escape: backslash, double quote, dollar sign, backtick + # Must remove: newlines (not supported in Environment=) + needs_quoting = any(c in value for c in ' "$`\n\\') + if needs_quoting: + # Remove newlines (not supported) + escaped_value = value.replace("\n", " ") + # Escape in correct order: backslash first, then others + escaped_value = escaped_value.replace("\\", "\\\\") + escaped_value = escaped_value.replace("$", "\\$") + escaped_value = escaped_value.replace("`", "\\`") + escaped_value = escaped_value.replace('"', '\\"') + lines.append(f'Environment={key}="{escaped_value}"') + else: + lines.append(f"Environment={key}={value}") + + if restart: + lines.append("Restart=on-failure") + lines.append("RestartSec=5") + + lines.append("") + + # [Install] section + lines.append("[Install]") + lines.append("WantedBy=multi-user.target") + lines.append("") + + return "\n".join(lines) + + def interactive_unit_generator(self) -> str: + """Interactive CLI wizard for generating a unit file. + + Asks the user simple questions and generates a complete + systemd unit file based on their answers. + + Returns: + Generated unit file content as a string. + """ + console.print("\n[bold cyan]Systemd Unit File Generator[/bold cyan]") + console.print("Answer a few questions to create your service file.\n") + + # Service name + service_name = Prompt.ask("[cyan]Service name[/cyan]", default="my-service") + + # Description + description = Prompt.ask( + "[cyan]What does this service do?[/cyan]", default="My custom service" + ) + + # Command + exec_start = Prompt.ask("[cyan]Command to run[/cyan]", default="/usr/local/bin/my-app") + + # User + run_as_root = Confirm.ask("[cyan]Run as root?[/cyan]", default=False) + user = None + if not run_as_root: + user = Prompt.ask("[cyan]Run as which user?[/cyan]", default="nobody") + + # Working directory + has_workdir = Confirm.ask("[cyan]Set a working directory?[/cyan]", default=False) + working_dir = None + if has_workdir: + working_dir = Prompt.ask( + "[cyan]Working directory path[/cyan]", default="/var/lib/my-service" + ) + + # Restart on failure + restart = Confirm.ask("[cyan]Restart automatically on failure?[/cyan]", default=True) + + # Start on boot + start_on_boot = Confirm.ask("[cyan]Start automatically on boot?[/cyan]", default=True) + + # Generate the unit file + unit_content = self.generate_unit_file( + description=description, + exec_start=exec_start, + service_name=service_name, + user=user, + working_dir=working_dir, + restart=restart, + ) + + console.print("\n[bold green]Generated Unit File:[/bold green]\n") + console.print(Panel(unit_content, title=f"{service_name}.service", border_style="green")) + + # Installation instructions + console.print("\n[bold]Installation Instructions:[/bold]") + console.print(f"1. Save to: sudo tee /etc/systemd/system/{service_name}.service") + console.print("2. Reload systemd: sudo systemctl daemon-reload") + if start_on_boot: + console.print(f"3. Enable service: sudo systemctl enable {service_name}") + console.print(f"4. Start service: sudo systemctl start {service_name}") + console.print(f"5. Check status: sudo systemctl status {service_name}") + + return unit_content + + +def run_status_command(service_name: str) -> None: + """Run the status explanation command. + + Args: + service_name: Name of the service to check. + """ + helper = SystemdHelper() + explanation = helper.explain_status(service_name) + console.print(Panel(explanation, title="Service Status", border_style="cyan")) + + +def run_diagnose_command(service_name: str, lines: int = 50) -> None: + """Run the diagnostic command for a failed service. + + Args: + service_name: Name of the service to diagnose. + lines: Number of log lines to analyze. + """ + helper = SystemdHelper() + report = helper.diagnose_failure(service_name, lines=lines) + console.print(report) + + +def run_deps_command(service_name: str) -> None: + """Show dependencies for a service. + + Args: + service_name: Name of the service. + """ + helper = SystemdHelper() + tree = helper.show_dependencies(service_name) + console.print("\n[bold]Service Dependencies:[/bold]") + console.print(tree) + + +def run_generate_command() -> None: + """Run the interactive unit file generator.""" + helper = SystemdHelper() + helper.interactive_unit_generator() diff --git a/demo_systemd.cast b/demo_systemd.cast new file mode 100644 index 00000000..8589b86b --- /dev/null +++ b/demo_systemd.cast @@ -0,0 +1,122 @@ +{"version": 2, "width": 100, "height": 30, "timestamp": 1768059791, "title": "Cortex Systemd Helper Demo", "env": {"SHELL": "/bin/bash", "TERM": "xterm-256color"}} +[0.5, "o", "\u001b[1;36m=== Cortex Systemd Helper Demo ===\u001b[0m\r\n\r\n"] +[0.8, "o", "\u001b[1;33m1. Plain-English service status explanation:\u001b[0m\r\n"] +[1.1, "o", "\r\n\u001b[32m$\u001b[0m "] +[1.1300000000000001, "o", "c"] +[1.1600000000000001, "o", "o"] +[1.1900000000000002, "o", "r"] +[1.2200000000000002, "o", "t"] +[1.2500000000000002, "o", "e"] +[1.2800000000000002, "o", "x"] +[1.3100000000000003, "o", " "] +[1.3400000000000003, "o", "s"] +[1.3700000000000003, "o", "y"] +[1.4000000000000004, "o", "s"] +[1.4300000000000004, "o", "t"] +[1.4600000000000004, "o", "e"] +[1.4900000000000004, "o", "m"] +[1.5200000000000005, "o", "d"] +[1.5500000000000005, "o", " "] +[1.5800000000000005, "o", "s"] +[1.6100000000000005, "o", "t"] +[1.6400000000000006, "o", "a"] +[1.6700000000000006, "o", "t"] +[1.7000000000000006, "o", "u"] +[1.7300000000000006, "o", "s"] +[1.7600000000000007, "o", " "] +[1.7900000000000007, "o", "s"] +[1.8200000000000007, "o", "s"] +[1.8500000000000008, "o", "h"] +[1.9500000000000008, "o", "\r\n"] +[1.9700000000000009, "o", "\u256d\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Service Status \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u256e\r\n"] +[1.9900000000000009, "o", "\u2502 'ssh' is running normally. \u2502\r\n"] +[2.0100000000000007, "o", "\u2502 Process ID: 2470 \u2502\r\n"] +[2.0300000000000007, "o", "\u2502 Started: Sat 2025-10-25 14:02:02 CDT \u2502\r\n"] +[2.0500000000000007, "o", "\u2502 Description: OpenBSD Secure Shell server \u2502\r\n"] +[2.0700000000000007, "o", "\u2570\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u256f\r\n"] +[2.0900000000000007, "o", "\r\n"] +[3.3900000000000006, "o", "\r\n\u001b[1;33m2. Visual dependency tree:\u001b[0m\r\n"] +[3.6900000000000004, "o", "\r\n\u001b[32m$\u001b[0m "] +[3.72, "o", "c"] +[3.75, "o", "o"] +[3.78, "o", "r"] +[3.8099999999999996, "o", "t"] +[3.8399999999999994, "o", "e"] +[3.869999999999999, "o", "x"] +[3.899999999999999, "o", " "] +[3.929999999999999, "o", "s"] +[3.9599999999999986, "o", "y"] +[3.9899999999999984, "o", "s"] +[4.019999999999999, "o", "t"] +[4.049999999999999, "o", "e"] +[4.079999999999999, "o", "m"] +[4.109999999999999, "o", "d"] +[4.14, "o", " "] +[4.17, "o", "d"] +[4.2, "o", "e"] +[4.23, "o", "p"] +[4.260000000000001, "o", "s"] +[4.290000000000001, "o", " "] +[4.320000000000001, "o", "s"] +[4.350000000000001, "o", "y"] +[4.380000000000002, "o", "s"] +[4.410000000000002, "o", "t"] +[4.440000000000002, "o", "e"] +[4.470000000000002, "o", "m"] +[4.500000000000003, "o", "d"] +[4.530000000000003, "o", "-"] +[4.560000000000003, "o", "j"] +[4.590000000000003, "o", "o"] +[4.620000000000004, "o", "u"] +[4.650000000000004, "o", "r"] +[4.680000000000004, "o", "n"] +[4.710000000000004, "o", "a"] +[4.740000000000005, "o", "l"] +[4.770000000000005, "o", "d"] +[4.8700000000000045, "o", "\r\n"] +[4.890000000000004, "o", "\r\n"] +[4.910000000000004, "o", "Service Dependencies:\r\n"] +[4.930000000000003, "o", "systemd-journald.service\r\n"] +[4.950000000000003, "o", "\u2514\u2500\u2500 \u25cf-.mount\r\n"] +[4.970000000000002, "o", " \u2514\u2500\u2500 \u25cfsystem.slice\r\n"] +[4.990000000000002, "o", " \u2514\u2500\u2500 \u25cfsystemd-journald-dev-log.socket\r\n"] +[5.010000000000002, "o", " \u2514\u2500\u2500 \u25cfsystemd-journald.socket\r\n"] +[5.030000000000001, "o", "\r\n"] +[6.330000000000001, "o", "\r\n\u001b[1;33m3. Available commands:\u001b[0m\r\n"] +[6.630000000000001, "o", "\r\n\u001b[32m$\u001b[0m "] +[6.660000000000001, "o", "c"] +[6.690000000000001, "o", "o"] +[6.7200000000000015, "o", "r"] +[6.750000000000002, "o", "t"] +[6.780000000000002, "o", "e"] +[6.810000000000002, "o", "x"] +[6.8400000000000025, "o", " "] +[6.870000000000003, "o", "s"] +[6.900000000000003, "o", "y"] +[6.930000000000003, "o", "s"] +[6.9600000000000035, "o", "t"] +[6.990000000000004, "o", "e"] +[7.020000000000004, "o", "m"] +[7.050000000000004, "o", "d"] +[7.0800000000000045, "o", " "] +[7.110000000000005, "o", "-"] +[7.140000000000005, "o", "-"] +[7.170000000000005, "o", "h"] +[7.2000000000000055, "o", "e"] +[7.230000000000006, "o", "l"] +[7.260000000000006, "o", "p"] +[7.360000000000006, "o", "\r\n"] +[7.380000000000005, "o", "usage: cortex systemd [-h] {status,diagnose,deps,generate} ...\r\n"] +[7.400000000000005, "o", "\r\n"] +[7.420000000000004, "o", "positional arguments:\r\n"] +[7.440000000000004, "o", " {status,diagnose,deps,generate}\r\n"] +[7.4600000000000035, "o", " Systemd actions\r\n"] +[7.480000000000003, "o", " status Explain a service's status in plain English\r\n"] +[7.500000000000003, "o", " diagnose Diagnose why a service failed\r\n"] +[7.520000000000002, "o", " deps Show service dependencies as a visual tree\r\n"] +[7.540000000000002, "o", " generate Interactive wizard to generate a systemd unit file\r\n"] +[7.560000000000001, "o", "\r\n"] +[7.580000000000001, "o", "options:\r\n"] +[7.6000000000000005, "o", " -h, --help show this help message and exit\r\n"] +[7.62, "o", "\r\n"] +[9.120000000000001, "o", "\r\n\u001b[1;32m=== Demo Complete ===\u001b[0m\r\n"] diff --git a/demo_systemd.sh b/demo_systemd.sh new file mode 100644 index 00000000..55761d18 --- /dev/null +++ b/demo_systemd.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Demo script for cortex systemd helper +# Run with: asciinema rec demo.cast && bash demo_systemd.sh + +echo "=== Cortex Systemd Helper Demo ===" +echo "" +sleep 1 + +echo "1. Check service status with plain-English explanation:" +echo " $ cortex systemd status ssh" +sleep 1 +cortex systemd status ssh +sleep 2 + +echo "" +echo "2. View service dependencies as a visual tree:" +echo " $ cortex systemd deps ssh" +sleep 1 +cortex systemd deps ssh +sleep 2 + +echo "" +echo "3. Diagnose a failed service (showing what it would look like):" +echo " $ cortex systemd diagnose some-failed-service" +sleep 1 +cortex systemd diagnose ssh 2>/dev/null || echo "[Would show diagnosis for failed services]" +sleep 2 + +echo "" +echo "4. Generate a systemd unit file interactively:" +echo " $ cortex systemd generate" +echo " [Interactive wizard would start here]" +sleep 2 + +echo "" +echo "=== Demo Complete ===" diff --git a/tests/test_systemd_helper.py b/tests/test_systemd_helper.py new file mode 100644 index 00000000..93db7d96 --- /dev/null +++ b/tests/test_systemd_helper.py @@ -0,0 +1,789 @@ +""" +Unit tests for cortex/systemd_helper.py - Systemd Service Helper. +Tests the SystemdHelper class used by 'cortex systemd' commands. +""" + +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from cortex.systemd_helper import ( + JOURNALCTL_TIMEOUT, + SERVICE_NAME_PATTERN, + SYSTEMCTL_TIMEOUT, + ServiceState, + ServiceStatus, + SystemdHelper, + _validate_service_name, + run_deps_command, + run_diagnose_command, + run_generate_command, + run_status_command, +) + + +class TestServiceStateEnum: + """Test the ServiceState enum values.""" + + def test_all_states_exist(self): + assert ServiceState.RUNNING.value == "running" + assert ServiceState.STOPPED.value == "stopped" + assert ServiceState.FAILED.value == "failed" + assert ServiceState.INACTIVE.value == "inactive" + assert ServiceState.ACTIVATING.value == "activating" + assert ServiceState.DEACTIVATING.value == "deactivating" + assert ServiceState.UNKNOWN.value == "unknown" + + +class TestServiceStatusDataclass: + """Test the ServiceStatus dataclass.""" + + def test_basic_creation(self): + status = ServiceStatus(name="nginx", state=ServiceState.RUNNING) + assert status.name == "nginx" + assert status.state == ServiceState.RUNNING + assert status.description == "" + assert status.pid is None + assert status.exit_code is None + + def test_full_creation(self): + status = ServiceStatus( + name="nginx", + state=ServiceState.RUNNING, + description="A high performance web server", + load_state="loaded", + active_state="active", + sub_state="running", + pid=1234, + memory="50M", + cpu="100ns", + started_at="2024-01-01 12:00:00", + exit_code=0, + main_pid_code="exited", + ) + assert status.pid == 1234 + assert status.memory == "50M" + assert status.started_at == "2024-01-01 12:00:00" + + +class TestValidateServiceName: + """Test service name validation.""" + + def test_valid_simple_name(self): + assert _validate_service_name("nginx") == "nginx.service" + + def test_valid_with_suffix(self): + assert _validate_service_name("nginx.service") == "nginx.service" + + def test_valid_with_hyphen(self): + assert _validate_service_name("my-service") == "my-service.service" + + def test_valid_with_underscore(self): + assert _validate_service_name("my_service") == "my_service.service" + + def test_valid_with_dots(self): + assert _validate_service_name("my.service.name") == "my.service.name.service" + + def test_valid_with_at_sign(self): + # Instantiated services like getty@tty1 + assert _validate_service_name("getty@tty1") == "getty@tty1.service" + + def test_valid_with_numbers(self): + assert _validate_service_name("service123") == "service123.service" + + def test_invalid_empty(self): + with pytest.raises(ValueError, match="cannot be empty"): + _validate_service_name("") + + def test_invalid_shell_chars(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("nginx; rm -rf /") + + def test_invalid_path_separator(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("../../etc/passwd") + + def test_invalid_dollar_sign(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("$HOME/service") + + def test_invalid_backtick(self): + with pytest.raises(ValueError, match="Invalid service name"): + _validate_service_name("`whoami`") + + +class TestServiceNamePattern: + """Test the SERVICE_NAME_PATTERN regex.""" + + def test_valid_patterns(self): + valid_names = [ + "nginx", + "my-service", + "my_service", + "service123", + "My.Service", + "getty@tty1", + "user-1000", + "a", + "ABC123", + ] + for name in valid_names: + assert SERVICE_NAME_PATTERN.match(name), f"{name} should be valid" + + def test_invalid_patterns(self): + invalid_names = [ + "", + " nginx", + "nginx ", + "nginx;rm", + "../test", + "$HOME", + "`cmd`", + "service\nname", + "service\tname", + ] + for name in invalid_names: + # Empty string doesn't match, others shouldn't match the full pattern + if name: + match = SERVICE_NAME_PATTERN.match(name) + # Should either not match or not match the full string + assert match is None or match.group() != name, f"{name!r} should be invalid" + + +class TestSystemdHelperInit: + """Test SystemdHelper initialization.""" + + def test_init_success(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + helper = SystemdHelper() + assert helper is not None + + def test_init_systemd_not_available(self): + mock_result = MagicMock(returncode=1) + with patch("subprocess.run", return_value=mock_result): + with pytest.raises(RuntimeError, match="systemd is not available"): + SystemdHelper() + + def test_init_systemctl_not_found(self): + with patch("subprocess.run", side_effect=FileNotFoundError): + with pytest.raises(RuntimeError, match="systemctl command not found"): + SystemdHelper() + + def test_init_timeout(self): + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 5)): + with pytest.raises(RuntimeError, match="Timeout"): + SystemdHelper() + + +class TestGetServiceStatus: + """Test get_service_status method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_get_status_running(self): + helper = self._create_helper() + mock_output = """ActiveState=active +SubState=running +Description=The NGINX HTTP Server +MainPID=1234 +LoadState=loaded +MemoryCurrent=52428800 +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.name == "nginx" + assert status.state == ServiceState.RUNNING + assert status.pid == 1234 + assert status.description == "The NGINX HTTP Server" + + def test_get_status_failed(self): + helper = self._create_helper() + mock_output = """ActiveState=failed +SubState=failed +ExecMainStatus=1 +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.state == ServiceState.FAILED + assert status.exit_code == 1 + + def test_get_status_inactive(self): + helper = self._create_helper() + mock_output = """ActiveState=inactive +SubState=dead +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.state == ServiceState.INACTIVE + + def test_get_status_activating(self): + helper = self._create_helper() + mock_output = """ActiveState=activating +SubState=start +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.state == ServiceState.ACTIVATING + + def test_get_status_deactivating(self): + helper = self._create_helper() + mock_output = """ActiveState=deactivating +SubState=stop-sigterm +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.state == ServiceState.DEACTIVATING + + def test_get_status_invalid_service_name(self): + helper = self._create_helper() + with pytest.raises(ValueError, match="Invalid service name"): + helper.get_service_status("nginx; rm -rf /") + + def test_get_status_command_fails(self): + helper = self._create_helper() + mock_result = MagicMock(returncode=1, stderr="Unit not found") + with patch("subprocess.run", return_value=mock_result): + with pytest.raises(RuntimeError, match="Failed to get service status"): + helper.get_service_status("nginx") + + def test_get_status_filenotfound(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=FileNotFoundError): + with pytest.raises(RuntimeError, match="systemctl command not found"): + helper.get_service_status("nginx") + + def test_get_status_timeout(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 10)): + with pytest.raises(RuntimeError, match="Timeout"): + helper.get_service_status("nginx") + + +class TestExplainStatus: + """Test explain_status method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_explain_running(self): + helper = self._create_helper() + mock_output = """ActiveState=active +SubState=running +Description=Web Server +MainPID=1234 +ActiveEnterTimestamp=Mon 2024-01-01 12:00:00 UTC +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + explanation = helper.explain_status("nginx") + + assert "running normally" in explanation + assert "1234" in explanation + + def test_explain_failed(self): + helper = self._create_helper() + mock_output = """ActiveState=failed +SubState=failed +ExecMainStatus=137 +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + explanation = helper.explain_status("nginx") + + assert "failed" in explanation + assert "137" in explanation + assert "SIGKILL" in explanation or "killed" in explanation.lower() + + def test_explain_inactive(self): + helper = self._create_helper() + mock_output = """ActiveState=inactive +SubState=dead +""" + mock_result = MagicMock(returncode=0, stdout=mock_output) + with patch("subprocess.run", return_value=mock_result): + explanation = helper.explain_status("nginx") + + assert "not running" in explanation or "inactive" in explanation + + +class TestExplainExitCode: + """Test _explain_exit_code method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_common_codes(self): + helper = self._create_helper() + + # Test specific exit codes + assert "General error" in helper._explain_exit_code(1) + assert ( + "Misuse" in helper._explain_exit_code(2) + or "invalid" in helper._explain_exit_code(2).lower() + ) + assert ( + "not executable" in helper._explain_exit_code(126) + or "permission" in helper._explain_exit_code(126).lower() + ) + assert "not found" in helper._explain_exit_code(127) + assert ( + "SIGKILL" in helper._explain_exit_code(137) + or "killed" in helper._explain_exit_code(137).lower() + ) + assert "Segmentation" in helper._explain_exit_code( + 139 + ) or "SIGSEGV" in helper._explain_exit_code(139) + assert ( + "SIGTERM" in helper._explain_exit_code(143) + or "terminated" in helper._explain_exit_code(143).lower() + ) + + def test_signal_calculation(self): + helper = self._create_helper() + # Exit codes > 128 are signal codes (128 + signal number) + explanation = helper._explain_exit_code(129) # 128 + 1 = SIGHUP + assert "signal" in explanation.lower() + + def test_unknown_code(self): + helper = self._create_helper() + explanation = helper._explain_exit_code(42) + assert "42" in explanation + + +class TestDiagnoseFailure: + """Test diagnose_failure method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_diagnose_with_permission_error(self): + helper = self._create_helper() + + # First call: systemctl show + status_output = """ActiveState=failed +SubState=failed +ExecMainStatus=1 +""" + # Second call: journalctl + log_output = ( + "Jan 01 12:00:00 server myapp[1234]: Error: permission denied opening /var/log/app.log" + ) + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + return result + + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") + + assert "Permission issue" in report + + def test_diagnose_with_port_conflict(self): + helper = self._create_helper() + + status_output = "ActiveState=failed\nSubState=failed\n" + log_output = "Error: address already in use :8080" + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + return result + + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") + + assert "Port conflict" in report + + def test_diagnose_lines_validation(self): + helper = self._create_helper() + + status_output = "ActiveState=active\nSubState=running\n" + log_output = "" + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + # Verify lines parameter is clamped + if "-n" in cmd: + idx = cmd.index("-n") + lines_val = int(cmd[idx + 1]) + assert 1 <= lines_val <= 1000 + return result + + with patch("subprocess.run", side_effect=mock_run): + # Test negative value gets clamped to 1 + helper.diagnose_failure("myapp", lines=-10) + # Test huge value gets clamped to 1000 + helper.diagnose_failure("myapp", lines=999999) + + def test_diagnose_journalctl_not_found(self): + helper = self._create_helper() + + status_output = "ActiveState=failed\nSubState=failed\n" + + def mock_run(cmd, *args, **kwargs): + if "journalctl" in cmd: + raise FileNotFoundError() + result = MagicMock() + result.returncode = 0 + result.stdout = status_output + return result + + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") + + assert "journalctl command not found" in report + + def test_diagnose_journalctl_timeout(self): + helper = self._create_helper() + + status_output = "ActiveState=failed\nSubState=failed\n" + + def mock_run(cmd, *args, **kwargs): + if "journalctl" in cmd: + raise subprocess.TimeoutExpired("journalctl", 30) + result = MagicMock() + result.returncode = 0 + result.stdout = status_output + return result + + with patch("subprocess.run", side_effect=mock_run): + report = helper.diagnose_failure("myapp") + + assert "Timeout" in report + + +class TestShowDependencies: + """Test show_dependencies method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_show_deps_success(self): + helper = self._create_helper() + + # First call: init check, Second call: dependencies + deps_output = """nginx.service +├─system.slice +│ └─-.slice +└─network.target + └─network-pre.target +""" + mock_result = MagicMock(returncode=0, stdout=deps_output) + with patch("subprocess.run", return_value=mock_result): + tree = helper.show_dependencies("nginx") + + # Tree should exist + assert tree is not None + + def test_show_deps_invalid_service_name(self): + helper = self._create_helper() + with pytest.raises(ValueError, match="Invalid service name"): + helper.show_dependencies("nginx; rm -rf /") + + def test_show_deps_filenotfound(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=FileNotFoundError): + tree = helper.show_dependencies("nginx") + # Should return a tree with error message + assert tree is not None + + def test_show_deps_timeout(self): + helper = self._create_helper() + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("cmd", 10)): + tree = helper.show_dependencies("nginx") + # Should return a tree with timeout message + assert tree is not None + + +class TestGenerateUnitFile: + """Test generate_unit_file method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_generate_basic(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Test Service", + exec_start="/usr/bin/myapp", + ) + + assert "[Unit]" in unit + assert "Description=My Test Service" in unit + assert "[Service]" in unit + assert "ExecStart=/usr/bin/myapp" in unit + assert "[Install]" in unit + assert "WantedBy=multi-user.target" in unit + + def test_generate_with_user(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + user="nobody", + ) + + assert "User=nobody" in unit + + def test_generate_with_working_dir(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + working_dir="/var/lib/myapp", + ) + + assert "WorkingDirectory=/var/lib/myapp" in unit + + def test_generate_with_restart(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + restart=True, + ) + + assert "Restart=on-failure" in unit + assert "RestartSec=5" in unit + + def test_generate_without_restart(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + restart=False, + ) + + assert "Restart=" not in unit + + def test_generate_with_after(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + after=["postgresql.service", "redis.service"], + ) + + assert "After=postgresql.service redis.service" in unit + + def test_generate_with_simple_environment(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + environment={"FOO": "bar", "DEBUG": "true"}, + ) + + assert "Environment=FOO=bar" in unit + assert "Environment=DEBUG=true" in unit + + def test_generate_with_escaped_environment(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="My Service", + exec_start="/usr/bin/myapp", + environment={ + "PATH_WITH_SPACE": "/some path/with spaces", + "DOLLAR_VAR": "$HOME/test", + "BACKTICK_VAR": "`whoami`", + "QUOTE_VAR": 'say "hello"', + "NEWLINE_VAR": "line1\nline2", + "BACKSLASH_VAR": "C:\\Windows\\Path", + }, + ) + + # Check proper escaping + assert 'Environment=PATH_WITH_SPACE="/some path/with spaces"' in unit + assert r'Environment=DOLLAR_VAR="\$HOME/test"' in unit + assert r'Environment=BACKTICK_VAR="\`whoami\`"' in unit + assert r'Environment=QUOTE_VAR="say \"hello\""' in unit + # Newlines should be replaced with spaces + assert "line1 line2" in unit + # Backslashes should be escaped + assert r"\\" in unit + + +class TestInteractiveUnitGenerator: + """Test interactive_unit_generator method.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_interactive_generator(self): + helper = self._create_helper() + + # Mock user inputs + with patch("cortex.systemd_helper.Prompt.ask") as mock_prompt: + with patch("cortex.systemd_helper.Confirm.ask") as mock_confirm: + with patch("cortex.systemd_helper.console.print"): + mock_prompt.side_effect = [ + "my-service", # service name + "My Test Service", # description + "/usr/bin/myapp", # command + "appuser", # user + "/var/lib/myapp", # working dir + ] + mock_confirm.side_effect = [ + False, # run as root + True, # set working dir + True, # restart on failure + True, # start on boot + ] + + unit = helper.interactive_unit_generator() + + assert "Description=My Test Service" in unit + assert "ExecStart=/usr/bin/myapp" in unit + + +class TestRunCommands: + """Test the run_* command functions.""" + + def test_run_status_command(self): + status_output = "ActiveState=active\nSubState=running\n" + mock_result = MagicMock(returncode=0, stdout=status_output) + + with patch("subprocess.run", return_value=mock_result): + with patch("cortex.systemd_helper.console.print"): + run_status_command("nginx") + + def test_run_diagnose_command(self): + status_output = "ActiveState=failed\nSubState=failed\n" + log_output = "Some logs here" + + def mock_run(cmd, *args, **kwargs): + result = MagicMock() + result.returncode = 0 + if "show" in cmd: + result.stdout = status_output + else: + result.stdout = log_output + return result + + with patch("subprocess.run", side_effect=mock_run): + with patch("cortex.systemd_helper.console.print"): + run_diagnose_command("nginx", lines=50) + + def test_run_deps_command(self): + deps_output = "nginx.service\n├─system.slice\n" + mock_result = MagicMock(returncode=0, stdout=deps_output) + + with patch("subprocess.run", return_value=mock_result): + with patch("cortex.systemd_helper.console.print"): + run_deps_command("nginx") + + def test_run_generate_command(self): + mock_result = MagicMock(returncode=0) + + with patch("subprocess.run", return_value=mock_result): + with patch("cortex.systemd_helper.Prompt.ask") as mock_prompt: + with patch("cortex.systemd_helper.Confirm.ask") as mock_confirm: + with patch("cortex.systemd_helper.console.print"): + mock_prompt.side_effect = [ + "test-service", + "Test Service", + "/usr/bin/test", + "testuser", + "/var/lib/test", + ] + mock_confirm.side_effect = [False, True, True, True] + run_generate_command() + + +class TestConstants: + """Test module constants.""" + + def test_timeout_constants_reasonable(self): + assert SYSTEMCTL_TIMEOUT > 0 + assert SYSTEMCTL_TIMEOUT <= 60 + assert JOURNALCTL_TIMEOUT > 0 + assert JOURNALCTL_TIMEOUT <= 120 + + +class TestEdgeCases: + """Test edge cases and error handling.""" + + def _create_helper(self): + mock_result = MagicMock(returncode=0) + with patch("subprocess.run", return_value=mock_result): + return SystemdHelper() + + def test_empty_systemctl_output(self): + helper = self._create_helper() + mock_result = MagicMock(returncode=0, stdout="") + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + assert status.state == ServiceState.UNKNOWN + + def test_malformed_systemctl_output(self): + helper = self._create_helper() + mock_result = MagicMock(returncode=0, stdout="no equals sign here\n") + + with patch("subprocess.run", return_value=mock_result): + status = helper.get_service_status("nginx") + + # Should handle gracefully + assert status.state == ServiceState.UNKNOWN + + def test_environment_with_all_special_chars(self): + helper = self._create_helper() + unit = helper.generate_unit_file( + description="Test", + exec_start="/bin/test", + environment={ + "COMPLEX": 'Value with $var, `cmd`, "quotes", \\backslash, and\nnewline', + }, + ) + + # Should produce valid output without crashes + assert "Environment=COMPLEX=" in unit + # Critical characters should be escaped + assert r"\$" in unit + assert r"\`" in unit + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])