diff --git a/cycode/cli/app.py b/cycode/cli/app.py index 3ef0b322..e838519e 100644 --- a/cycode/cli/app.py +++ b/cycode/cli/app.py @@ -9,7 +9,7 @@ from typer.completion import install_callback, show_callback from cycode import __version__ -from cycode.cli.apps import ai_remediation, auth, configure, ignore, report, report_import, scan, status +from cycode.cli.apps import ai_guardrails, ai_remediation, auth, configure, ignore, report, report_import, scan, status if sys.version_info >= (3, 10): from cycode.cli.apps import mcp @@ -45,6 +45,7 @@ add_completion=False, # we add it manually to control the rich help panel ) +app.add_typer(ai_guardrails.app) app.add_typer(ai_remediation.app) app.add_typer(auth.app) app.add_typer(configure.app) diff --git a/cycode/cli/apps/ai_guardrails/__init__.py b/cycode/cli/apps/ai_guardrails/__init__.py new file mode 100644 index 00000000..f8486ed4 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/__init__.py @@ -0,0 +1,19 @@ +import typer + +from cycode.cli.apps.ai_guardrails.install_command import install_command +from cycode.cli.apps.ai_guardrails.scan.scan_command import scan_command +from cycode.cli.apps.ai_guardrails.status_command import status_command +from cycode.cli.apps.ai_guardrails.uninstall_command import uninstall_command + +app = typer.Typer(name='ai-guardrails', no_args_is_help=True, hidden=True) + +app.command(hidden=True, name='install', short_help='Install AI guardrails hooks for supported IDEs.')(install_command) +app.command(hidden=True, name='uninstall', short_help='Remove AI guardrails hooks from supported IDEs.')( + uninstall_command +) +app.command(hidden=True, name='status', short_help='Show AI guardrails hook installation status.')(status_command) +app.command( + hidden=True, + name='scan', + short_help='Scan content from AI IDE hooks for secrets (reads JSON from stdin).', +)(scan_command) diff --git a/cycode/cli/apps/ai_guardrails/command_utils.py b/cycode/cli/apps/ai_guardrails/command_utils.py new file mode 100644 index 00000000..e010f0a2 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/command_utils.py @@ -0,0 +1,66 @@ +"""Common utilities for AI guardrails commands.""" + +import os +from pathlib import Path +from typing import Optional + +import typer +from rich.console import Console + +from cycode.cli.apps.ai_guardrails.consts import AIIDEType + +console = Console() + + +def validate_and_parse_ide(ide: str) -> AIIDEType: + """Validate IDE parameter and convert to AIIDEType enum. + + Args: + ide: IDE name string (e.g., 'cursor') + + Returns: + AIIDEType enum value + + Raises: + typer.Exit: If IDE is invalid + """ + try: + return AIIDEType(ide.lower()) + except ValueError: + valid_ides = ', '.join([ide_type.value for ide_type in AIIDEType]) + console.print( + f'[red]Error:[/] Invalid IDE "{ide}". Supported IDEs: {valid_ides}', + style='bold red', + ) + raise typer.Exit(1) from None + + +def validate_scope(scope: str, allowed_scopes: tuple[str, ...] = ('user', 'repo')) -> None: + """Validate scope parameter. + + Args: + scope: Scope string to validate + allowed_scopes: Tuple of allowed scope values + + Raises: + typer.Exit: If scope is invalid + """ + if scope not in allowed_scopes: + scopes_list = ', '.join(f'"{s}"' for s in allowed_scopes) + console.print(f'[red]Error:[/] Invalid scope. Use {scopes_list}.', style='bold red') + raise typer.Exit(1) + + +def resolve_repo_path(scope: str, repo_path: Optional[Path]) -> Optional[Path]: + """Resolve repository path, defaulting to current directory for repo scope. + + Args: + scope: The command scope ('user' or 'repo') + repo_path: Provided repo path or None + + Returns: + Resolved Path for repo scope, None for user scope + """ + if scope == 'repo' and repo_path is None: + return Path(os.getcwd()) + return repo_path diff --git a/cycode/cli/apps/ai_guardrails/consts.py b/cycode/cli/apps/ai_guardrails/consts.py new file mode 100644 index 00000000..21d89a3f --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/consts.py @@ -0,0 +1,78 @@ +"""Constants for AI guardrails hooks management. + +Currently supports: +- Cursor + +To add a new IDE (e.g., Claude Code): +1. Add new value to AIIDEType enum +2. Create _get__hooks_dir() function with platform-specific paths +3. Add entry to IDE_CONFIGS dict with IDE-specific hook event names +4. Unhide --ide option in commands (install, uninstall, status) +""" + +import platform +from enum import Enum +from pathlib import Path +from typing import NamedTuple + + +class AIIDEType(str, Enum): + """Supported AI IDE types.""" + + CURSOR = 'cursor' + + +class IDEConfig(NamedTuple): + """Configuration for an AI IDE.""" + + name: str + hooks_dir: Path + repo_hooks_subdir: str # Subdirectory in repo for hooks (e.g., '.cursor') + hooks_file_name: str + hook_events: list[str] # List of supported hook event names for this IDE + + +def _get_cursor_hooks_dir() -> Path: + """Get Cursor hooks directory based on platform.""" + if platform.system() == 'Darwin': + return Path.home() / '.cursor' + if platform.system() == 'Windows': + return Path.home() / 'AppData' / 'Roaming' / 'Cursor' + # Linux + return Path.home() / '.config' / 'Cursor' + + +# IDE-specific configurations +IDE_CONFIGS: dict[AIIDEType, IDEConfig] = { + AIIDEType.CURSOR: IDEConfig( + name='Cursor', + hooks_dir=_get_cursor_hooks_dir(), + repo_hooks_subdir='.cursor', + hooks_file_name='hooks.json', + hook_events=['beforeSubmitPrompt', 'beforeReadFile', 'beforeMCPExecution'], + ), +} + +# Default IDE +DEFAULT_IDE = AIIDEType.CURSOR + +# Command used in hooks +CYCODE_SCAN_PROMPT_COMMAND = 'cycode ai-guardrails scan' + + +def get_hooks_config(ide: AIIDEType) -> dict: + """Get the hooks configuration for a specific IDE. + + Args: + ide: The AI IDE type + + Returns: + Dict with hooks configuration for the specified IDE + """ + config = IDE_CONFIGS[ide] + hooks = {event: [{'command': CYCODE_SCAN_PROMPT_COMMAND}] for event in config.hook_events} + + return { + 'version': 1, + 'hooks': hooks, + } diff --git a/cycode/cli/apps/ai_guardrails/hooks_manager.py b/cycode/cli/apps/ai_guardrails/hooks_manager.py new file mode 100644 index 00000000..42f879f6 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/hooks_manager.py @@ -0,0 +1,200 @@ +""" +Hooks manager for AI guardrails. + +Handles installation, removal, and status checking of AI IDE hooks. +Supports multiple IDEs: Cursor, Claude Code (future). +""" + +import json +from pathlib import Path +from typing import Optional + +from cycode.cli.apps.ai_guardrails.consts import ( + CYCODE_SCAN_PROMPT_COMMAND, + DEFAULT_IDE, + IDE_CONFIGS, + AIIDEType, + get_hooks_config, +) +from cycode.logger import get_logger + +logger = get_logger('AI Guardrails Hooks') + + +def get_hooks_path(scope: str, repo_path: Optional[Path] = None, ide: AIIDEType = DEFAULT_IDE) -> Path: + """Get the hooks.json path for the given scope and IDE. + + Args: + scope: 'user' for user-level hooks, 'repo' for repository-level hooks + repo_path: Repository path (required if scope is 'repo') + ide: The AI IDE type (default: Cursor) + """ + config = IDE_CONFIGS[ide] + if scope == 'repo' and repo_path: + return repo_path / config.repo_hooks_subdir / config.hooks_file_name + return config.hooks_dir / config.hooks_file_name + + +def load_hooks_file(hooks_path: Path) -> Optional[dict]: + """Load existing hooks.json file.""" + if not hooks_path.exists(): + return None + try: + content = hooks_path.read_text(encoding='utf-8') + return json.loads(content) + except Exception as e: + logger.debug('Failed to load hooks file', exc_info=e) + return None + + +def save_hooks_file(hooks_path: Path, hooks_config: dict) -> bool: + """Save hooks.json file.""" + try: + hooks_path.parent.mkdir(parents=True, exist_ok=True) + hooks_path.write_text(json.dumps(hooks_config, indent=2), encoding='utf-8') + return True + except Exception as e: + logger.error('Failed to save hooks file', exc_info=e) + return False + + +def is_cycode_hook_entry(entry: dict) -> bool: + """Check if a hook entry is from cycode-cli.""" + command = entry.get('command', '') + return CYCODE_SCAN_PROMPT_COMMAND in command + + +def install_hooks( + scope: str = 'user', repo_path: Optional[Path] = None, ide: AIIDEType = DEFAULT_IDE +) -> tuple[bool, str]: + """ + Install Cycode AI guardrails hooks. + + Args: + scope: 'user' for user-level hooks, 'repo' for repository-level hooks + repo_path: Repository path (required if scope is 'repo') + ide: The AI IDE type (default: Cursor) + + Returns: + Tuple of (success, message) + """ + hooks_path = get_hooks_path(scope, repo_path, ide) + + # Load existing hooks or create new + existing = load_hooks_file(hooks_path) or {'version': 1, 'hooks': {}} + existing.setdefault('version', 1) + existing.setdefault('hooks', {}) + + # Get IDE-specific hooks configuration + hooks_config = get_hooks_config(ide) + + # Add/update Cycode hooks + for event, entries in hooks_config['hooks'].items(): + existing['hooks'].setdefault(event, []) + + # Remove any existing Cycode entries for this event + existing['hooks'][event] = [e for e in existing['hooks'][event] if not is_cycode_hook_entry(e)] + + # Add new Cycode entries + for entry in entries: + existing['hooks'][event].append(entry) + + # Save + if save_hooks_file(hooks_path, existing): + return True, f'AI guardrails hooks installed: {hooks_path}' + return False, f'Failed to install hooks to {hooks_path}' + + +def uninstall_hooks( + scope: str = 'user', repo_path: Optional[Path] = None, ide: AIIDEType = DEFAULT_IDE +) -> tuple[bool, str]: + """ + Remove Cycode AI guardrails hooks. + + Args: + scope: 'user' for user-level hooks, 'repo' for repository-level hooks + repo_path: Repository path (required if scope is 'repo') + ide: The AI IDE type (default: Cursor) + + Returns: + Tuple of (success, message) + """ + hooks_path = get_hooks_path(scope, repo_path, ide) + + existing = load_hooks_file(hooks_path) + if existing is None: + return True, f'No hooks file found at {hooks_path}' + + # Remove Cycode entries from all events + modified = False + for event in list(existing.get('hooks', {}).keys()): + original_count = len(existing['hooks'][event]) + existing['hooks'][event] = [e for e in existing['hooks'][event] if not is_cycode_hook_entry(e)] + if len(existing['hooks'][event]) != original_count: + modified = True + # Remove empty event lists + if not existing['hooks'][event]: + del existing['hooks'][event] + + if not modified: + return True, 'No Cycode hooks found to remove' + + # Save or delete if empty + if not existing.get('hooks'): + try: + hooks_path.unlink() + return True, f'Removed hooks file: {hooks_path}' + except Exception as e: + logger.debug('Failed to delete hooks file', exc_info=e) + return False, f'Failed to remove hooks file: {hooks_path}' + + if save_hooks_file(hooks_path, existing): + return True, f'Cycode hooks removed from: {hooks_path}' + return False, f'Failed to update hooks file: {hooks_path}' + + +def get_hooks_status(scope: str = 'user', repo_path: Optional[Path] = None, ide: AIIDEType = DEFAULT_IDE) -> dict: + """ + Get the status of AI guardrails hooks. + + Args: + scope: 'user' for user-level hooks, 'repo' for repository-level hooks + repo_path: Repository path (required if scope is 'repo') + ide: The AI IDE type (default: Cursor) + + Returns: + Dict with status information + """ + hooks_path = get_hooks_path(scope, repo_path, ide) + + status = { + 'scope': scope, + 'ide': ide.value, + 'ide_name': IDE_CONFIGS[ide].name, + 'hooks_path': str(hooks_path), + 'file_exists': hooks_path.exists(), + 'cycode_installed': False, + 'hooks': {}, + } + + existing = load_hooks_file(hooks_path) + if existing is None: + return status + + # Check each hook event for this IDE + ide_config = IDE_CONFIGS[ide] + has_cycode_hooks = False + for event in ide_config.hook_events: + entries = existing.get('hooks', {}).get(event, []) + cycode_entries = [e for e in entries if is_cycode_hook_entry(e)] + if cycode_entries: + has_cycode_hooks = True + status['hooks'][event] = { + 'total_entries': len(entries), + 'cycode_entries': len(cycode_entries), + 'enabled': len(cycode_entries) > 0, + } + + status['cycode_installed'] = has_cycode_hooks + + return status diff --git a/cycode/cli/apps/ai_guardrails/install_command.py b/cycode/cli/apps/ai_guardrails/install_command.py new file mode 100644 index 00000000..6186752d --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/install_command.py @@ -0,0 +1,78 @@ +"""Install command for AI guardrails hooks.""" + +from pathlib import Path +from typing import Annotated, Optional + +import typer + +from cycode.cli.apps.ai_guardrails.command_utils import ( + console, + resolve_repo_path, + validate_and_parse_ide, + validate_scope, +) +from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS +from cycode.cli.apps.ai_guardrails.hooks_manager import install_hooks +from cycode.cli.utils.sentry import add_breadcrumb + + +def install_command( + ctx: typer.Context, + scope: Annotated[ + str, + typer.Option( + '--scope', + '-s', + help='Installation scope: "user" for all projects, "repo" for current repository only.', + ), + ] = 'user', + ide: Annotated[ + str, + typer.Option( + '--ide', + help='IDE to install hooks for (e.g., "cursor"). Defaults to cursor.', + ), + ] = 'cursor', + repo_path: Annotated[ + Optional[Path], + typer.Option( + '--repo-path', + help='Repository path for repo-scoped installation (defaults to current directory).', + exists=True, + file_okay=False, + dir_okay=True, + resolve_path=True, + ), + ] = None, +) -> None: + """Install AI guardrails hooks for supported IDEs. + + This command configures the specified IDE to use Cycode for scanning prompts, file reads, + and MCP tool calls for secrets before they are sent to AI models. + + Examples: + cycode ai-guardrails install # Install for all projects (user scope) + cycode ai-guardrails install --scope repo # Install for current repo only + cycode ai-guardrails install --ide cursor # Install for Cursor IDE + cycode ai-guardrails install --scope repo --repo-path /path/to/repo + """ + add_breadcrumb('ai-guardrails-install') + + # Validate inputs + validate_scope(scope) + repo_path = resolve_repo_path(scope, repo_path) + ide_type = validate_and_parse_ide(ide) + ide_name = IDE_CONFIGS[ide_type].name + success, message = install_hooks(scope, repo_path, ide=ide_type) + + if success: + console.print(f'[green]✓[/] {message}') + console.print() + console.print('[bold]Next steps:[/]') + console.print(f'1. Restart {ide_name} to activate the hooks') + console.print('2. (Optional) Customize policy in ~/.cycode/ai-guardrails.yaml') + console.print() + console.print('[dim]The hooks will scan prompts, file reads, and MCP tool calls for secrets.[/]') + else: + console.print(f'[red]✗[/] {message}', style='bold red') + raise typer.Exit(1) diff --git a/cycode/cli/apps/ai_guardrails/scan/__init__.py b/cycode/cli/apps/ai_guardrails/scan/__init__.py new file mode 100644 index 00000000..47349e78 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/__init__.py @@ -0,0 +1 @@ +# Prompt scan command for AI guardrails (hooks) diff --git a/cycode/cli/apps/ai_guardrails/scan/consts.py b/cycode/cli/apps/ai_guardrails/scan/consts.py new file mode 100644 index 00000000..007892a8 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/consts.py @@ -0,0 +1,48 @@ +""" +Constants and default configuration for AI guardrails. + +These defaults can be overridden by: +1. User-level config: ~/.cycode/ai-guardrails.yaml +2. Repo-level config: /.cycode/ai-guardrails.yaml +""" + +# Policy file name +POLICY_FILE_NAME = 'ai-guardrails.yaml' + +# Default policy configuration +DEFAULT_POLICY = { + 'version': 1, + 'mode': 'block', # block | warn + 'fail_open': True, # allow if scan fails/timeouts + 'secrets': { + 'scan_type': 'secret', + 'timeout_ms': 30000, + 'max_bytes': 200000, + }, + 'prompt': { + 'enabled': True, + 'action': 'block', + }, + 'file_read': { + 'enabled': True, + 'action': 'block', + 'deny_globs': [ + '.env', + '.env.*', + '*.pem', + '*.p12', + '*.key', + '.aws/**', + '.ssh/**', + '*kubeconfig*', + '.npmrc', + '.netrc', + ], + 'scan_content': True, + }, + 'mcp': { + 'enabled': True, + 'action': 'block', + 'scan_arguments': True, + }, +} diff --git a/cycode/cli/apps/ai_guardrails/scan/handlers.py b/cycode/cli/apps/ai_guardrails/scan/handlers.py new file mode 100644 index 00000000..95e9d606 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/handlers.py @@ -0,0 +1,341 @@ +""" +Hook handlers for AI IDE events. + +Each handler receives a unified payload from an IDE, applies policy rules, +and returns a response that either allows or blocks the action. +""" + +import json +import os +from multiprocessing.pool import ThreadPool +from multiprocessing.pool import TimeoutError as PoolTimeoutError +from typing import Callable, Optional + +import typer + +from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload +from cycode.cli.apps.ai_guardrails.scan.policy import get_policy_value +from cycode.cli.apps.ai_guardrails.scan.response_builders import get_response_builder +from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType, AIHookOutcome, BlockReason +from cycode.cli.apps.ai_guardrails.scan.utils import is_denied_path, truncate_utf8 +from cycode.cli.apps.scan.code_scanner import _get_scan_documents_thread_func +from cycode.cli.apps.scan.scan_parameters import get_scan_parameters +from cycode.cli.cli_types import ScanTypeOption, SeverityOption +from cycode.cli.models import Document +from cycode.cli.utils.progress_bar import DummyProgressBar, ScanProgressBarSection +from cycode.cli.utils.scan_utils import build_violation_summary +from cycode.logger import get_logger + +logger = get_logger('AI Guardrails') + + +def handle_before_submit_prompt(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> dict: + """ + Handle beforeSubmitPrompt hook. + + Scans prompt text for secrets before it's sent to the AI model. + Returns {"continue": False} to block, {"continue": True} to allow. + """ + ai_client = ctx.obj['ai_security_client'] + ide = payload.ide_provider + response_builder = get_response_builder(ide) + + prompt_config = get_policy_value(policy, 'prompt', default={}) + ai_client.create_conversation(payload) + if not get_policy_value(prompt_config, 'enabled', default=True): + ai_client.create_event(payload, AiHookEventType.PROMPT, AIHookOutcome.ALLOWED) + return response_builder.allow_prompt() + + mode = get_policy_value(policy, 'mode', default='block') + prompt = payload.prompt or '' + max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) + timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) + clipped = truncate_utf8(prompt, max_bytes) + + scan_id = None + block_reason = None + outcome = AIHookOutcome.ALLOWED + + try: + violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) + + if ( + violation_summary + and get_policy_value(prompt_config, 'action', default='block') == 'block' + and mode == 'block' + ): + outcome = AIHookOutcome.BLOCKED + block_reason = BlockReason.SECRETS_IN_PROMPT + user_message = f'{violation_summary}. Remove secrets before sending.' + response = response_builder.deny_prompt(user_message) + else: + if violation_summary: + outcome = AIHookOutcome.WARNED + response = response_builder.allow_prompt() + return response + except Exception as e: + outcome = ( + AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED + ) + block_reason = BlockReason.SCAN_FAILURE if outcome == AIHookOutcome.BLOCKED else None + raise e + finally: + ai_client.create_event( + payload, + AiHookEventType.PROMPT, + outcome, + scan_id=scan_id, + block_reason=block_reason, + ) + + +def handle_before_read_file(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> dict: + """ + Handle beforeReadFile hook. + + Blocks sensitive files (via deny_globs) and scans file content for secrets. + Returns {"permission": "deny"} to block, {"permission": "allow"} to allow. + """ + ai_client = ctx.obj['ai_security_client'] + ide = payload.ide_provider + response_builder = get_response_builder(ide) + + file_read_config = get_policy_value(policy, 'file_read', default={}) + ai_client.create_conversation(payload) + if not get_policy_value(file_read_config, 'enabled', default=True): + ai_client.create_event(payload, AiHookEventType.FILE_READ, AIHookOutcome.ALLOWED) + return response_builder.allow_permission() + + mode = get_policy_value(policy, 'mode', default='block') + file_path = payload.file_path or '' + action = get_policy_value(file_read_config, 'action', default='block') + + scan_id = None + block_reason = None + outcome = AIHookOutcome.ALLOWED + + try: + # Check path-based denylist first + if is_denied_path(file_path, policy) and action == 'block': + outcome = AIHookOutcome.BLOCKED + block_reason = BlockReason.SENSITIVE_PATH + user_message = f'Cycode blocked sending {file_path} to the AI (sensitive path policy).' + return response_builder.deny_permission( + user_message, + 'This file path is classified as sensitive; do not read/send it to the model.', + ) + + # Scan file content if enabled + if get_policy_value(file_read_config, 'scan_content', default=True): + violation_summary, scan_id = _scan_path_for_secrets(ctx, file_path, policy) + if violation_summary and action == 'block' and mode == 'block': + outcome = AIHookOutcome.BLOCKED + block_reason = BlockReason.SECRETS_IN_FILE + user_message = f'Cycode blocked reading {file_path}. {violation_summary}' + return response_builder.deny_permission( + user_message, + 'Secrets detected; do not send this file to the model.', + ) + if violation_summary: + outcome = AIHookOutcome.WARNED + return response_builder.allow_permission() + + return response_builder.allow_permission() + except Exception as e: + outcome = ( + AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED + ) + block_reason = BlockReason.SCAN_FAILURE if outcome == AIHookOutcome.BLOCKED else None + raise e + finally: + ai_client.create_event( + payload, + AiHookEventType.FILE_READ, + outcome, + scan_id=scan_id, + block_reason=block_reason, + ) + + +def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> dict: + """ + Handle beforeMCPExecution hook. + + Scans tool arguments for secrets before MCP tool execution. + Returns {"permission": "deny"} to block, {"permission": "ask"} to warn, + {"permission": "allow"} to allow. + """ + ai_client = ctx.obj['ai_security_client'] + ide = payload.ide_provider + response_builder = get_response_builder(ide) + + mcp_config = get_policy_value(policy, 'mcp', default={}) + ai_client.create_conversation(payload) + if not get_policy_value(mcp_config, 'enabled', default=True): + ai_client.create_event(payload, AiHookEventType.MCP_EXECUTION, AIHookOutcome.ALLOWED) + return response_builder.allow_permission() + + mode = get_policy_value(policy, 'mode', default='block') + tool = payload.mcp_tool_name or 'unknown' + args = payload.mcp_arguments or {} + args_text = args if isinstance(args, str) else json.dumps(args) + max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) + timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) + clipped = truncate_utf8(args_text, max_bytes) + action = get_policy_value(mcp_config, 'action', default='block') + + scan_id = None + block_reason = None + outcome = AIHookOutcome.ALLOWED + + try: + if get_policy_value(mcp_config, 'scan_arguments', default=True): + violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) + if violation_summary: + if mode == 'block' and action == 'block': + outcome = AIHookOutcome.BLOCKED + block_reason = BlockReason.SECRETS_IN_MCP_ARGS + user_message = f'Cycode blocked MCP tool call "{tool}". {violation_summary}' + return response_builder.deny_permission( + user_message, + 'Do not pass secrets to tools. Use secret references (name/id) instead.', + ) + outcome = AIHookOutcome.WARNED + return response_builder.ask_permission( + f'{violation_summary} in MCP tool call "{tool}". Allow execution?', + 'Possible secrets detected in tool arguments; proceed with caution.', + ) + + return response_builder.allow_permission() + except Exception as e: + outcome = ( + AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED + ) + block_reason = BlockReason.SCAN_FAILURE if outcome == AIHookOutcome.BLOCKED else None + raise e + finally: + ai_client.create_event( + payload, + AiHookEventType.MCP_EXECUTION, + outcome, + scan_id=scan_id, + block_reason=block_reason, + ) + + +def get_handler_for_event(event_type: str) -> Optional[Callable[[typer.Context, AIHookPayload, dict], dict]]: + """Get the appropriate handler function for a canonical event type. + + Args: + event_type: Canonical event type string (from AiHookEventType enum) + + Returns: + Handler function or None if event type is not recognized + """ + handlers = { + AiHookEventType.PROMPT.value: handle_before_submit_prompt, + AiHookEventType.FILE_READ.value: handle_before_read_file, + AiHookEventType.MCP_EXECUTION.value: handle_before_mcp_execution, + } + return handlers.get(event_type) + + +def _setup_scan_context(ctx: typer.Context) -> typer.Context: + """Set up minimal context for scan_documents without progress bars or printing.""" + + # Set up minimal required context + ctx.obj['progress_bar'] = DummyProgressBar([ScanProgressBarSection]) + ctx.obj['sync'] = True # Synchronous scan + ctx.obj['scan_type'] = ScanTypeOption.SECRET # AI guardrails always scans for secrets + ctx.obj['severity_threshold'] = SeverityOption.INFO # Report all severities + + # Set command name for scan logic + ctx.info_name = 'ai_guardrails' + + return ctx + + +def _perform_scan( + ctx: typer.Context, documents: list[Document], scan_parameters: dict, timeout_seconds: float +) -> tuple[Optional[str], Optional[str]]: + """ + Perform a scan on documents and extract results. + + Returns tuple of (violation_summary, scan_id) if secrets found, (None, scan_id) if clean. + Raises exception if scan fails or times out (triggers fail_open policy). + """ + if not documents: + return None, None + + # Get the thread function for scanning + scan_batch_thread_func = _get_scan_documents_thread_func( + ctx, is_git_diff=False, is_commit_range=False, scan_parameters=scan_parameters + ) + + # Use ThreadPool.apply_async with timeout to abort if scan takes too long + # This uses the same ThreadPool mechanism as run_parallel_batched_scan but with timeout support + with ThreadPool(processes=1) as pool: + result = pool.apply_async(scan_batch_thread_func, (documents,)) + try: + scan_id, error, local_scan_result = result.get(timeout=timeout_seconds) + except PoolTimeoutError: + logger.debug('Scan timed out after %s seconds', timeout_seconds) + raise RuntimeError(f'Scan timed out after {timeout_seconds} seconds') from None + + # Check if scan failed - raise exception to trigger fail_open policy + if error: + raise RuntimeError(error.message) + + if not local_scan_result: + return None, None + + scan_id = local_scan_result.scan_id + + # Check if there are any detections + if local_scan_result.detections_count > 0: + violation_summary = build_violation_summary([local_scan_result]) + return violation_summary, scan_id + + return None, scan_id + + +def _scan_text_for_secrets(ctx: typer.Context, text: str, timeout_ms: int) -> tuple[Optional[str], Optional[str]]: + """ + Scan text content for secrets using Cycode CLI. + + Returns tuple of (violation_summary, scan_id) if secrets found, (None, scan_id) if clean. + Raises exception on error or timeout. + """ + if not text: + return None, None + + document = Document(path='prompt-content.txt', content=text, is_git_diff_format=False) + scan_ctx = _setup_scan_context(ctx) + timeout_seconds = timeout_ms / 1000.0 + return _perform_scan(scan_ctx, [document], get_scan_parameters(scan_ctx, None), timeout_seconds) + + +def _scan_path_for_secrets(ctx: typer.Context, file_path: str, policy: dict) -> tuple[Optional[str], Optional[str]]: + """ + Scan a file path for secrets. + + Returns tuple of (violation_summary, scan_id) if secrets found, (None, scan_id) if clean. + Raises exception on error or timeout. + """ + if not file_path or not os.path.exists(file_path): + return None, None + + with open(file_path, encoding='utf-8', errors='replace') as f: + content = f.read() + + # Truncate content based on policy max_bytes + max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) + content = truncate_utf8(content, max_bytes) + + # Get timeout from policy + timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) + timeout_seconds = timeout_ms / 1000.0 + + document = Document(path=os.path.basename(file_path), content=content, is_git_diff_format=False) + scan_ctx = _setup_scan_context(ctx) + return _perform_scan(scan_ctx, [document], get_scan_parameters(scan_ctx, (file_path,)), timeout_seconds) diff --git a/cycode/cli/apps/ai_guardrails/scan/payload.py b/cycode/cli/apps/ai_guardrails/scan/payload.py new file mode 100644 index 00000000..83787348 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/payload.py @@ -0,0 +1,72 @@ +"""Unified payload object for AI hook events from different tools.""" + +from dataclasses import dataclass +from typing import Optional + +from cycode.cli.apps.ai_guardrails.scan.types import CURSOR_EVENT_MAPPING + + +@dataclass +class AIHookPayload: + """Unified payload object that normalizes field names from different AI tools.""" + + # Event identification + event_name: str # Canonical event type (e.g., 'prompt', 'file_read', 'mcp_execution') + conversation_id: Optional[str] = None + generation_id: Optional[str] = None + + # User and IDE information + ide_user_email: Optional[str] = None + model: Optional[str] = None + ide_provider: str = None # e.g., 'cursor', 'claude-code' + ide_version: Optional[str] = None + + # Event-specific data + prompt: Optional[str] = None # For prompt events + file_path: Optional[str] = None # For file_read events + mcp_server_name: Optional[str] = None # For mcp_execution events + mcp_tool_name: Optional[str] = None # For mcp_execution events + mcp_arguments: Optional[dict] = None # For mcp_execution events + + @classmethod + def from_cursor_payload(cls, payload: dict) -> 'AIHookPayload': + """Create AIHookPayload from Cursor IDE payload. + + Maps Cursor-specific event names to canonical event types. + """ + cursor_event_name = payload.get('hook_event_name', '') + # Map Cursor event name to canonical type, fallback to original if not found + canonical_event = CURSOR_EVENT_MAPPING.get(cursor_event_name, cursor_event_name) + + return cls( + event_name=canonical_event, + conversation_id=payload.get('conversation_id'), + generation_id=payload.get('generation_id'), + ide_user_email=payload.get('user_email'), + model=payload.get('model'), + ide_provider='cursor', + ide_version=payload.get('cursor_version'), + prompt=payload.get('prompt', ''), + file_path=payload.get('file_path') or payload.get('path'), + mcp_server_name=payload.get('command'), # MCP server name + mcp_tool_name=payload.get('tool_name') or payload.get('tool'), + mcp_arguments=payload.get('arguments') or payload.get('tool_input') or payload.get('input'), + ) + + @classmethod + def from_payload(cls, payload: dict, tool: str = 'cursor') -> 'AIHookPayload': + """Create AIHookPayload from any tool's payload. + + Args: + payload: The raw payload from the IDE + tool: The IDE/tool name (e.g., 'cursor') + + Returns: + AIHookPayload instance + + Raises: + ValueError: If the tool is not supported + """ + if tool == 'cursor': + return cls.from_cursor_payload(payload) + raise ValueError(f'Unsupported IDE/tool: {tool}.') diff --git a/cycode/cli/apps/ai_guardrails/scan/policy.py b/cycode/cli/apps/ai_guardrails/scan/policy.py new file mode 100644 index 00000000..f40d77c0 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/policy.py @@ -0,0 +1,85 @@ +""" +Policy loading and configuration management for AI guardrails. + +Policies are loaded and merged in order (later overrides earlier): +1. Built-in defaults (consts.DEFAULT_POLICY) +2. User-level config (~/.cycode/ai-guardrails.yaml) +3. Repo-level config (/.cycode/ai-guardrails.yaml) +""" + +import json +from pathlib import Path +from typing import Any, Optional + +import yaml + +from cycode.cli.apps.ai_guardrails.scan.consts import DEFAULT_POLICY, POLICY_FILE_NAME + + +def deep_merge(base: dict, override: dict) -> dict: + """Deep merge two dictionaries, with override taking precedence.""" + result = base.copy() + for key, value in override.items(): + if key in result and isinstance(result[key], dict) and isinstance(value, dict): + result[key] = deep_merge(result[key], value) + else: + result[key] = value + return result + + +def load_yaml_file(path: Path) -> Optional[dict]: + """Load a YAML or JSON config file.""" + if not path.exists(): + return None + try: + content = path.read_text(encoding='utf-8') + if path.suffix in ('.yaml', '.yml'): + return yaml.safe_load(content) + return json.loads(content) + except Exception: + return None + + +def load_defaults() -> dict: + """Load built-in defaults.""" + return DEFAULT_POLICY.copy() + + +def get_policy_value(policy: dict, *keys: str, default: Any = None) -> Any: + """Get a nested value from the policy dict.""" + current = policy + for key in keys: + if not isinstance(current, dict): + return default + current = current.get(key) + if current is None: + return default + return current + + +def load_policy(workspace_root: Optional[str] = None) -> dict: + """ + Load policy by merging configs in order of precedence. + + Merge order: defaults <- user config <- repo config + + Args: + workspace_root: Workspace root path for repo-level config lookup. + """ + # Start with defaults + policy = load_defaults() + + # Merge user-level config (if exists) + user_policy_path = Path.home() / '.cycode' / POLICY_FILE_NAME + user_config = load_yaml_file(user_policy_path) + if user_config: + policy = deep_merge(policy, user_config) + + # Merge repo-level config (if exists) - highest precedence + if workspace_root: + repo_policy_path = Path(workspace_root) / '.cycode' / POLICY_FILE_NAME + repo_config = load_yaml_file(repo_policy_path) + if repo_config: + policy = deep_merge(policy, repo_config) + + return policy diff --git a/cycode/cli/apps/ai_guardrails/scan/response_builders.py b/cycode/cli/apps/ai_guardrails/scan/response_builders.py new file mode 100644 index 00000000..867965c3 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/response_builders.py @@ -0,0 +1,86 @@ +""" +Response builders for different AI IDE hooks. + +Each IDE has its own response format for hooks. This module provides +an abstract interface and concrete implementations for each supported IDE. +""" + +from abc import ABC, abstractmethod + + +class IDEResponseBuilder(ABC): + """Abstract base class for IDE-specific response builders.""" + + @abstractmethod + def allow_permission(self) -> dict: + """Build response to allow file read or MCP execution.""" + + @abstractmethod + def deny_permission(self, user_message: str, agent_message: str) -> dict: + """Build response to deny file read or MCP execution.""" + + @abstractmethod + def ask_permission(self, user_message: str, agent_message: str) -> dict: + """Build response to ask user for permission (warn mode).""" + + @abstractmethod + def allow_prompt(self) -> dict: + """Build response to allow prompt submission.""" + + @abstractmethod + def deny_prompt(self, user_message: str) -> dict: + """Build response to deny prompt submission.""" + + +class CursorResponseBuilder(IDEResponseBuilder): + """Response builder for Cursor IDE hooks. + + Cursor hook response formats: + - beforeSubmitPrompt: {"continue": bool, "user_message": str} + - beforeReadFile: {"permission": str, "user_message": str, "agent_message": str} + - beforeMCPExecution: {"permission": str, "user_message": str, "agent_message": str} + """ + + def allow_permission(self) -> dict: + """Allow file read or MCP execution.""" + return {'permission': 'allow'} + + def deny_permission(self, user_message: str, agent_message: str) -> dict: + """Deny file read or MCP execution.""" + return {'permission': 'deny', 'user_message': user_message, 'agent_message': agent_message} + + def ask_permission(self, user_message: str, agent_message: str) -> dict: + """Ask user for permission (warn mode).""" + return {'permission': 'ask', 'user_message': user_message, 'agent_message': agent_message} + + def allow_prompt(self) -> dict: + """Allow prompt submission.""" + return {'continue': True} + + def deny_prompt(self, user_message: str) -> dict: + """Deny prompt submission.""" + return {'continue': False, 'user_message': user_message} + + +# Registry of response builders by IDE name +_RESPONSE_BUILDERS: dict[str, IDEResponseBuilder] = { + 'cursor': CursorResponseBuilder(), +} + + +def get_response_builder(ide: str = 'cursor') -> IDEResponseBuilder: + """Get the response builder for a specific IDE. + + Args: + ide: The IDE name (e.g., 'cursor', 'claude-code') + + Returns: + IDEResponseBuilder instance for the specified IDE + + Raises: + ValueError: If the IDE is not supported + """ + builder = _RESPONSE_BUILDERS.get(ide.lower()) + if not builder: + raise ValueError(f'Unsupported IDE: {ide}. Supported IDEs: {list(_RESPONSE_BUILDERS.keys())}') + return builder diff --git a/cycode/cli/apps/ai_guardrails/scan/scan_command.py b/cycode/cli/apps/ai_guardrails/scan/scan_command.py new file mode 100644 index 00000000..e08bb4de --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/scan_command.py @@ -0,0 +1,134 @@ +""" +Scan command for AI guardrails. + +This command handles AI IDE hooks by reading JSON from stdin and outputting +a JSON response to stdout. It scans prompts, file reads, and MCP tool calls +for secrets before they are sent to AI models. + +Supports multiple IDEs with different hook event types. The specific hook events +supported depend on the IDE being used (e.g., Cursor supports beforeSubmitPrompt, +beforeReadFile, beforeMCPExecution). +""" + +import sys +from typing import Annotated + +import click +import typer + +from cycode.cli.apps.ai_guardrails.scan.handlers import get_handler_for_event +from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload +from cycode.cli.apps.ai_guardrails.scan.policy import load_policy +from cycode.cli.apps.ai_guardrails.scan.response_builders import get_response_builder +from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType +from cycode.cli.apps.ai_guardrails.scan.utils import output_json, safe_json_parse +from cycode.cli.exceptions.custom_exceptions import HttpUnauthorizedError +from cycode.cli.utils.get_api_client import get_ai_security_manager_client, get_scan_cycode_client +from cycode.cli.utils.sentry import add_breadcrumb +from cycode.logger import get_logger + +logger = get_logger('AI Guardrails') + + +def _get_auth_error_message(error: Exception) -> str: + """Get user-friendly message for authentication errors.""" + if isinstance(error, click.ClickException): + # Missing credentials + return f'{error.message} Please run `cycode configure` to set up your credentials.' + + if isinstance(error, HttpUnauthorizedError): + # Invalid/expired credentials + return ( + 'Unable to authenticate to Cycode. Your credentials are invalid or have expired. ' + 'Please run `cycode configure` to update your credentials.' + ) + + # Fallback + return 'Authentication failed. Please run `cycode configure` to set up your credentials.' + + +def _initialize_clients(ctx: typer.Context) -> None: + """Initialize API clients. + + May raise click.ClickException if credentials are missing, + or HttpUnauthorizedError if credentials are invalid. + """ + scan_client = get_scan_cycode_client(ctx) + ctx.obj['client'] = scan_client + + ai_security_client = get_ai_security_manager_client(ctx) + ctx.obj['ai_security_client'] = ai_security_client + + +def scan_command( + ctx: typer.Context, + ide: Annotated[ + str, + typer.Option( + '--ide', + help='IDE that sent the payload (e.g., "cursor"). Defaults to cursor.', + hidden=True, + ), + ] = 'cursor', +) -> None: + """Scan content from AI IDE hooks for secrets. + + This command reads a JSON payload from stdin containing hook event data + and outputs a JSON response to stdout indicating whether to allow or block the action. + + The hook event type is determined from the event field in the payload (field name + varies by IDE). Each IDE may support different hook events for scanning prompts, + file access, and tool executions. + + Example usage (from IDE hooks configuration): + { "command": "cycode ai-guardrails scan" } + """ + add_breadcrumb('ai-guardrails-scan') + + stdin_data = sys.stdin.read().strip() + payload = safe_json_parse(stdin_data) + + tool = ide.lower() + response_builder = get_response_builder(tool) + + if not payload: + logger.debug('Empty or invalid JSON payload received') + output_json(response_builder.allow_prompt()) + return + + unified_payload = AIHookPayload.from_payload(payload, tool=tool) + event_name = unified_payload.event_name + logger.debug('Processing AI guardrails hook', extra={'event_name': event_name, 'tool': tool}) + + workspace_roots = payload.get('workspace_roots', ['.']) + policy = load_policy(workspace_roots[0]) + + try: + _initialize_clients(ctx) + + handler = get_handler_for_event(event_name) + if handler is None: + logger.debug('Unknown hook event, allowing by default', extra={'event_name': event_name}) + output_json(response_builder.allow_prompt()) + return + + response = handler(ctx, unified_payload, policy) + logger.debug('Hook handler completed', extra={'event_name': event_name, 'response': response}) + output_json(response) + + except (click.ClickException, HttpUnauthorizedError) as e: + error_message = _get_auth_error_message(e) + if event_name == AiHookEventType.PROMPT: + output_json(response_builder.deny_prompt(error_message)) + return + output_json(response_builder.deny_permission(error_message, 'Authentication required')) + + except Exception as e: + logger.error('Hook handler failed', exc_info=e) + if policy.get('fail_open', True): + output_json(response_builder.allow_prompt()) + return + if event_name == AiHookEventType.PROMPT: + output_json(response_builder.deny_prompt('Cycode guardrails error - blocking due to fail-closed policy')) + return + output_json(response_builder.deny_permission('Cycode guardrails error', 'Blocking due to fail-closed policy')) diff --git a/cycode/cli/apps/ai_guardrails/scan/types.py b/cycode/cli/apps/ai_guardrails/scan/types.py new file mode 100644 index 00000000..095ca61b --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/types.py @@ -0,0 +1,54 @@ +"""Type definitions for AI guardrails.""" + +import sys + +if sys.version_info >= (3, 11): + from enum import StrEnum +else: + from enum import Enum + + class StrEnum(str, Enum): + def __str__(self) -> str: + return self.value + + +class AiHookEventType(StrEnum): + """Canonical event types for AI guardrails. + + These are IDE-agnostic event types. Each IDE's specific event names + are mapped to these canonical types using the mapping dictionaries below. + """ + + PROMPT = 'Prompt' + FILE_READ = 'FileRead' + MCP_EXECUTION = 'McpExecution' + + +# IDE-specific event name mappings to canonical types +CURSOR_EVENT_MAPPING = { + 'beforeSubmitPrompt': AiHookEventType.PROMPT, + 'beforeReadFile': AiHookEventType.FILE_READ, + 'beforeMCPExecution': AiHookEventType.MCP_EXECUTION, +} + + +class AIHookOutcome(StrEnum): + """Outcome of an AI hook event evaluation.""" + + ALLOWED = 'allowed' + BLOCKED = 'blocked' + WARNED = 'warned' + + +class BlockReason(StrEnum): + """Reason why an AI hook event was blocked. + + These are categorical reasons sent to the backend for tracking/analytics, + separate from the detailed user-facing messages. + """ + + SECRETS_IN_PROMPT = 'secrets_in_prompt' + SECRETS_IN_FILE = 'secrets_in_file' + SECRETS_IN_MCP_ARGS = 'secrets_in_mcp_args' + SENSITIVE_PATH = 'sensitive_path' + SCAN_FAILURE = 'scan_failure' diff --git a/cycode/cli/apps/ai_guardrails/scan/utils.py b/cycode/cli/apps/ai_guardrails/scan/utils.py new file mode 100644 index 00000000..e14c1c02 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/scan/utils.py @@ -0,0 +1,72 @@ +""" +Utility functions for AI guardrails. + +Includes JSON parsing, path matching, and text handling utilities. +""" + +import json +import os +from pathlib import Path + +from cycode.cli.apps.ai_guardrails.scan.policy import get_policy_value + + +def safe_json_parse(s: str) -> dict: + """Parse JSON string, returning empty dict on failure.""" + try: + return json.loads(s) if s else {} + except (json.JSONDecodeError, TypeError): + return {} + + +def truncate_utf8(text: str, max_bytes: int) -> str: + """Truncate text to max bytes while preserving valid UTF-8.""" + if not text: + return '' + encoded = text.encode('utf-8') + if len(encoded) <= max_bytes: + return text + return encoded[:max_bytes].decode('utf-8', errors='ignore') + + +def normalize_path(file_path: str) -> str: + """Normalize path to prevent traversal attacks.""" + if not file_path: + return '' + normalized = os.path.normpath(file_path) + # Reject paths that attempt to escape outside bounds + if normalized.startswith('..'): + return '' + return normalized + + +def matches_glob(file_path: str, pattern: str) -> bool: + """Check if file path matches a glob pattern. + + Case-insensitive matching for cross-platform compatibility. + """ + normalized = normalize_path(file_path) + if not normalized or not pattern: + return False + + path = Path(normalized) + # Try case-sensitive first + if path.match(pattern): + return True + + # Then try case-insensitive by lowercasing both path and pattern + path_lower = Path(normalized.lower()) + return path_lower.match(pattern.lower()) + + +def is_denied_path(file_path: str, policy: dict) -> bool: + """Check if file path is in the denylist.""" + if not file_path: + return False + globs = get_policy_value(policy, 'file_read', 'deny_globs', default=[]) + return any(matches_glob(file_path, g) for g in globs) + + +def output_json(obj: dict) -> None: + """Write JSON response to stdout (for IDE to read).""" + print(json.dumps(obj), end='') # noqa: T201 diff --git a/cycode/cli/apps/ai_guardrails/status_command.py b/cycode/cli/apps/ai_guardrails/status_command.py new file mode 100644 index 00000000..0a9801b5 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/status_command.py @@ -0,0 +1,92 @@ +"""Status command for AI guardrails hooks.""" + +import os +from pathlib import Path +from typing import Annotated, Optional + +import typer +from rich.table import Table + +from cycode.cli.apps.ai_guardrails.command_utils import console, validate_and_parse_ide, validate_scope +from cycode.cli.apps.ai_guardrails.hooks_manager import get_hooks_status +from cycode.cli.utils.sentry import add_breadcrumb + + +def status_command( + ctx: typer.Context, + scope: Annotated[ + str, + typer.Option( + '--scope', + '-s', + help='Check scope: "user", "repo", or "all" for both.', + ), + ] = 'all', + ide: Annotated[ + str, + typer.Option( + '--ide', + help='IDE to check status for (e.g., "cursor"). Defaults to cursor.', + ), + ] = 'cursor', + repo_path: Annotated[ + Optional[Path], + typer.Option( + '--repo-path', + help='Repository path for repo-scoped status (defaults to current directory).', + exists=True, + file_okay=False, + dir_okay=True, + resolve_path=True, + ), + ] = None, +) -> None: + """Show AI guardrails hook installation status. + + Displays the current status of Cycode AI guardrails hooks for the specified IDE. + + Examples: + cycode ai-guardrails status # Show both user and repo status + cycode ai-guardrails status --scope user # Show only user-level status + cycode ai-guardrails status --scope repo # Show only repo-level status + cycode ai-guardrails status --ide cursor # Check status for Cursor IDE + """ + add_breadcrumb('ai-guardrails-status') + + # Validate inputs (status allows 'all' scope) + validate_scope(scope, allowed_scopes=('user', 'repo', 'all')) + if repo_path is None: + repo_path = Path(os.getcwd()) + ide_type = validate_and_parse_ide(ide) + + scopes_to_check = ['user', 'repo'] if scope == 'all' else [scope] + + for check_scope in scopes_to_check: + status = get_hooks_status(check_scope, repo_path if check_scope == 'repo' else None, ide=ide_type) + + console.print() + console.print(f'[bold]{check_scope.upper()} SCOPE[/]') + console.print(f'Path: {status["hooks_path"]}') + + if not status['file_exists']: + console.print('[dim]No hooks.json file found[/]') + continue + + if status['cycode_installed']: + console.print('[green]✓ Cycode AI guardrails: INSTALLED[/]') + else: + console.print('[yellow]○ Cycode AI guardrails: NOT INSTALLED[/]') + + # Show hook details + table = Table(show_header=True, header_style='bold') + table.add_column('Hook Event') + table.add_column('Cycode Enabled') + table.add_column('Total Hooks') + + for event, info in status['hooks'].items(): + enabled = '[green]Yes[/]' if info['enabled'] else '[dim]No[/]' + table.add_row(event, enabled, str(info['total_entries'])) + + console.print(table) + + console.print() diff --git a/cycode/cli/apps/ai_guardrails/uninstall_command.py b/cycode/cli/apps/ai_guardrails/uninstall_command.py new file mode 100644 index 00000000..23315693 --- /dev/null +++ b/cycode/cli/apps/ai_guardrails/uninstall_command.py @@ -0,0 +1,73 @@ +"""Uninstall command for AI guardrails hooks.""" + +from pathlib import Path +from typing import Annotated, Optional + +import typer + +from cycode.cli.apps.ai_guardrails.command_utils import ( + console, + resolve_repo_path, + validate_and_parse_ide, + validate_scope, +) +from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS +from cycode.cli.apps.ai_guardrails.hooks_manager import uninstall_hooks +from cycode.cli.utils.sentry import add_breadcrumb + + +def uninstall_command( + ctx: typer.Context, + scope: Annotated[ + str, + typer.Option( + '--scope', + '-s', + help='Uninstall scope: "user" for user-level hooks, "repo" for repository-level hooks.', + ), + ] = 'user', + ide: Annotated[ + str, + typer.Option( + '--ide', + help='IDE to uninstall hooks from (e.g., "cursor"). Defaults to cursor.', + ), + ] = 'cursor', + repo_path: Annotated[ + Optional[Path], + typer.Option( + '--repo-path', + help='Repository path for repo-scoped uninstallation (defaults to current directory).', + exists=True, + file_okay=False, + dir_okay=True, + resolve_path=True, + ), + ] = None, +) -> None: + """Remove AI guardrails hooks from supported IDEs. + + This command removes Cycode hooks from the IDE's hooks configuration. + Other hooks (if any) will be preserved. + + Examples: + cycode ai-guardrails uninstall # Remove user-level hooks + cycode ai-guardrails uninstall --scope repo # Remove repo-level hooks + cycode ai-guardrails uninstall --ide cursor # Uninstall from Cursor IDE + """ + add_breadcrumb('ai-guardrails-uninstall') + + # Validate inputs + validate_scope(scope) + repo_path = resolve_repo_path(scope, repo_path) + ide_type = validate_and_parse_ide(ide) + ide_name = IDE_CONFIGS[ide_type].name + success, message = uninstall_hooks(scope, repo_path, ide=ide_type) + + if success: + console.print(f'[green]✓[/] {message}') + console.print() + console.print(f'[dim]Restart {ide_name} for changes to take effect.[/]') + else: + console.print(f'[red]✗[/] {message}', style='bold red') + raise typer.Exit(1) diff --git a/cycode/cli/apps/scan/code_scanner.py b/cycode/cli/apps/scan/code_scanner.py index d3e325f3..3ffefd0f 100644 --- a/cycode/cli/apps/scan/code_scanner.py +++ b/cycode/cli/apps/scan/code_scanner.py @@ -91,7 +91,7 @@ def _should_use_sync_flow(command_scan_type: str, scan_type: str, sync_option: b if not sync_option and scan_type != consts.IAC_SCAN_TYPE: return False - if command_scan_type not in {'path', 'repository'}: + if command_scan_type not in {'path', 'repository', 'ai_guardrails'}: return False if scan_type == consts.IAC_SCAN_TYPE: diff --git a/cycode/cli/cli_types.py b/cycode/cli/cli_types.py index 63a1cb36..bd88faea 100644 --- a/cycode/cli/cli_types.py +++ b/cycode/cli/cli_types.py @@ -86,6 +86,10 @@ def get_member_color(name: str) -> str: def get_member_emoji(name: str) -> str: return _SEVERITY_EMOJIS.get(name.lower(), _SEVERITY_DEFAULT_EMOJI) + @staticmethod + def get_member_unicode_emoji(name: str) -> str: + return _SEVERITY_UNICODE_EMOJIS.get(name.lower(), _SEVERITY_DEFAULT_UNICODE_EMOJI) + def __rich__(self) -> str: color = self.get_member_color(self.value) return f'[{color}]{self.value.upper()}[/]' @@ -117,3 +121,12 @@ def __rich__(self) -> str: SeverityOption.HIGH.value: ':red_circle:', SeverityOption.CRITICAL.value: ':exclamation_mark:', # double_exclamation_mark is not red } + +_SEVERITY_DEFAULT_UNICODE_EMOJI = '⚪' +_SEVERITY_UNICODE_EMOJIS = { + SeverityOption.INFO.value: '🔵', + SeverityOption.LOW.value: '🟡', + SeverityOption.MEDIUM.value: '🟠', + SeverityOption.HIGH.value: '🔴', + SeverityOption.CRITICAL.value: '❗', +} diff --git a/cycode/cli/utils/get_api_client.py b/cycode/cli/utils/get_api_client.py index 5c712288..b69666d3 100644 --- a/cycode/cli/utils/get_api_client.py +++ b/cycode/cli/utils/get_api_client.py @@ -3,11 +3,17 @@ import click from cycode.cli.user_settings.credentials_manager import CredentialsManager -from cycode.cyclient.client_creator import create_import_sbom_client, create_report_client, create_scan_client +from cycode.cyclient.client_creator import ( + create_ai_security_manager_client, + create_import_sbom_client, + create_report_client, + create_scan_client, +) if TYPE_CHECKING: import typer + from cycode.cyclient.ai_security_manager_client import AISecurityManagerClient from cycode.cyclient.import_sbom_client import ImportSbomClient from cycode.cyclient.report_client import ReportClient from cycode.cyclient.scan_client import ScanClient @@ -19,7 +25,7 @@ def _get_cycode_client( client_secret: Optional[str], hide_response_log: bool, id_token: Optional[str] = None, -) -> Union['ScanClient', 'ReportClient']: +) -> Union['ScanClient', 'ReportClient', 'ImportSbomClient', 'AISecurityManagerClient']: if client_id and id_token: return create_client_func(client_id, None, hide_response_log, id_token) @@ -62,6 +68,13 @@ def get_import_sbom_cycode_client(ctx: 'typer.Context', hide_response_log: bool return _get_cycode_client(create_import_sbom_client, client_id, client_secret, hide_response_log, id_token) +def get_ai_security_manager_client(ctx: 'typer.Context', hide_response_log: bool = True) -> 'AISecurityManagerClient': + client_id = ctx.obj.get('client_id') + client_secret = ctx.obj.get('client_secret') + id_token = ctx.obj.get('id_token') + return _get_cycode_client(create_ai_security_manager_client, client_id, client_secret, hide_response_log, id_token) + + def _get_configured_credentials() -> tuple[str, str]: credentials_manager = CredentialsManager() return credentials_manager.get_credentials() diff --git a/cycode/cli/utils/scan_utils.py b/cycode/cli/utils/scan_utils.py index 1332a7cf..be86716b 100644 --- a/cycode/cli/utils/scan_utils.py +++ b/cycode/cli/utils/scan_utils.py @@ -1,9 +1,12 @@ import os +from collections import defaultdict from typing import TYPE_CHECKING, Optional from uuid import UUID, uuid4 import typer +from cycode.cli.cli_types import SeverityOption + if TYPE_CHECKING: from cycode.cli.models import LocalScanResult from cycode.cyclient.models import ScanConfiguration @@ -33,3 +36,24 @@ def generate_unique_scan_id() -> UUID: return UUID(os.environ['PYTEST_TEST_UNIQUE_ID']) return uuid4() + + +def build_violation_summary(local_scan_results: list['LocalScanResult']) -> str: + """Build violation summary string with severity breakdown and emojis.""" + detections_count = 0 + severity_counts = defaultdict(int) + + for local_scan_result in local_scan_results: + for document_detections in local_scan_result.document_detections: + for detection in document_detections.detections: + if detection.severity: + detections_count += 1 + severity_counts[SeverityOption(detection.severity)] += 1 + + severity_parts = [] + for severity in reversed(SeverityOption): + emoji = SeverityOption.get_member_unicode_emoji(severity) + count = severity_counts[severity] + severity_parts.append(f'{emoji} {severity.upper()} - {count}') + + return f'Cycode found {detections_count} violations: {" | ".join(severity_parts)}' diff --git a/cycode/cyclient/ai_security_manager_client.py b/cycode/cyclient/ai_security_manager_client.py new file mode 100644 index 00000000..627e2b33 --- /dev/null +++ b/cycode/cyclient/ai_security_manager_client.py @@ -0,0 +1,86 @@ +"""Client for AI Security Manager service.""" + +from typing import TYPE_CHECKING, Optional + +from cycode.cli.exceptions.custom_exceptions import HttpUnauthorizedError +from cycode.cyclient.cycode_client_base import CycodeClientBase +from cycode.cyclient.logger import logger + +if TYPE_CHECKING: + from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload + from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType, AIHookOutcome, BlockReason + from cycode.cyclient.ai_security_manager_service_config import AISecurityManagerServiceConfigBase + + +class AISecurityManagerClient: + """Client for interacting with AI Security Manager service.""" + + _CONVERSATIONS_PATH = 'v4/ai-security/interactions/conversations' + _EVENTS_PATH = 'v4/ai-security/interactions/events' + + def __init__(self, client: CycodeClientBase, service_config: 'AISecurityManagerServiceConfigBase') -> None: + self.client = client + self.service_config = service_config + + def _build_endpoint_path(self, path: str) -> str: + """Build the full endpoint path including service name/port.""" + service_name = self.service_config.get_service_name() + if service_name: + return f'{service_name}/{path}' + return path + + def create_conversation(self, payload: 'AIHookPayload') -> Optional[str]: + """Creates an AI conversation from hook payload.""" + conversation_id = payload.conversation_id + if not conversation_id: + return None + + body = { + 'id': conversation_id, + 'ide_user_email': payload.ide_user_email, + 'model': payload.model, + 'ide_provider': payload.ide_provider, + 'ide_version': payload.ide_version, + } + + try: + self.client.post(self._build_endpoint_path(self._CONVERSATIONS_PATH), body=body) + except HttpUnauthorizedError: + # Authentication error - re-raise so prompt_command can catch it + raise + except Exception as e: + logger.debug('Failed to create conversation', exc_info=e) + # Don't fail the hook if tracking fails (non-auth errors) + + return conversation_id + + def create_event( + self, + payload: 'AIHookPayload', + event_type: 'AiHookEventType', + outcome: 'AIHookOutcome', + scan_id: Optional[str] = None, + block_reason: Optional['BlockReason'] = None, + ) -> None: + """Create an AI hook event from hook payload.""" + conversation_id = payload.conversation_id + if not conversation_id: + logger.debug('No conversation ID available, skipping event creation') + return + + body = { + 'conversation_id': conversation_id, + 'event_type': event_type, + 'outcome': outcome, + 'generation_id': payload.generation_id, + 'block_reason': block_reason, + 'cli_scan_id': scan_id, + 'mcp_server_name': payload.mcp_server_name, + 'mcp_tool_name': payload.mcp_tool_name, + } + + try: + self.client.post(self._build_endpoint_path(self._EVENTS_PATH), body=body) + except Exception as e: + logger.debug('Failed to create AI hook event', exc_info=e) + # Don't fail the hook if tracking fails diff --git a/cycode/cyclient/ai_security_manager_service_config.py b/cycode/cyclient/ai_security_manager_service_config.py new file mode 100644 index 00000000..60d7f2dd --- /dev/null +++ b/cycode/cyclient/ai_security_manager_service_config.py @@ -0,0 +1,27 @@ +"""Service configuration for AI Security Manager.""" + + +class AISecurityManagerServiceConfigBase: + """Base class for AI Security Manager service configuration.""" + + def get_service_name(self) -> str: + """Get the service name or port for URL construction. + + In dev mode, returns the port number. + In production, returns the service name. + """ + raise NotImplementedError + + +class DevAISecurityManagerServiceConfig(AISecurityManagerServiceConfigBase): + """Dev configuration for AI Security Manager.""" + + def get_service_name(self) -> str: + return '5163/api' + + +class DefaultAISecurityManagerServiceConfig(AISecurityManagerServiceConfigBase): + """Production configuration for AI Security Manager.""" + + def get_service_name(self) -> str: + return '' diff --git a/cycode/cyclient/client_creator.py b/cycode/cyclient/client_creator.py index 01ab6b59..c26795c7 100644 --- a/cycode/cyclient/client_creator.py +++ b/cycode/cyclient/client_creator.py @@ -1,5 +1,10 @@ from typing import Optional +from cycode.cyclient.ai_security_manager_client import AISecurityManagerClient +from cycode.cyclient.ai_security_manager_service_config import ( + DefaultAISecurityManagerServiceConfig, + DevAISecurityManagerServiceConfig, +) from cycode.cyclient.config import dev_mode from cycode.cyclient.config_dev import DEV_CYCODE_API_URL from cycode.cyclient.cycode_dev_based_client import CycodeDevBasedClient @@ -49,3 +54,18 @@ def create_import_sbom_client( else: client = CycodeTokenBasedClient(client_id, client_secret) return ImportSbomClient(client) + + +def create_ai_security_manager_client( + client_id: str, client_secret: Optional[str] = None, _: bool = False, id_token: Optional[str] = None +) -> AISecurityManagerClient: + if dev_mode: + client = CycodeDevBasedClient(DEV_CYCODE_API_URL) + service_config = DevAISecurityManagerServiceConfig() + else: + if id_token: + client = CycodeOidcBasedClient(client_id, id_token) + else: + client = CycodeTokenBasedClient(client_id, client_secret) + service_config = DefaultAISecurityManagerServiceConfig() + return AISecurityManagerClient(client, service_config) diff --git a/tests/cli/commands/ai_guardrails/__init__.py b/tests/cli/commands/ai_guardrails/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cli/commands/ai_guardrails/scan/__init__.py b/tests/cli/commands/ai_guardrails/scan/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cli/commands/ai_guardrails/scan/test_handlers.py b/tests/cli/commands/ai_guardrails/scan/test_handlers.py new file mode 100644 index 00000000..58dfe195 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/scan/test_handlers.py @@ -0,0 +1,361 @@ +"""Tests for AI guardrails handlers.""" + +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest +import typer + +from cycode.cli.apps.ai_guardrails.scan.handlers import ( + handle_before_mcp_execution, + handle_before_read_file, + handle_before_submit_prompt, +) +from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload +from cycode.cli.apps.ai_guardrails.scan.types import AIHookOutcome, BlockReason + + +@pytest.fixture +def mock_ctx() -> MagicMock: + """Create a mock Typer context.""" + ctx = MagicMock(spec=typer.Context) + ctx.obj = { + 'ai_security_client': MagicMock(), + 'scan_type': 'secret', + } + return ctx + + +@pytest.fixture +def mock_payload() -> AIHookPayload: + """Create a mock AIHookPayload.""" + return AIHookPayload( + event_name='prompt', + conversation_id='test-conv-id', + generation_id='test-gen-id', + ide_user_email='test@example.com', + model='gpt-4', + ide_provider='cursor', + ide_version='1.0.0', + prompt='Test prompt', + ) + + +@pytest.fixture +def default_policy() -> dict[str, Any]: + """Create a default policy dict.""" + return { + 'mode': 'block', + 'fail_open': True, + 'secrets': {'max_bytes': 200000}, + 'prompt': {'enabled': True, 'action': 'block'}, + 'file_read': {'enabled': True, 'action': 'block', 'scan_content': True, 'deny_globs': []}, + 'mcp': {'enabled': True, 'action': 'block', 'scan_arguments': True}, + } + + +# Tests for handle_before_submit_prompt + + +def test_handle_before_submit_prompt_disabled( + mock_ctx: MagicMock, mock_payload: AIHookPayload, default_policy: dict[str, Any] +) -> None: + """Test that disabled prompt scanning allows the prompt.""" + default_policy['prompt']['enabled'] = False + + result = handle_before_submit_prompt(mock_ctx, mock_payload, default_policy) + + assert result == {'continue': True} + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_submit_prompt_no_secrets( + mock_scan: MagicMock, mock_ctx: MagicMock, mock_payload: AIHookPayload, default_policy: dict[str, Any] +) -> None: + """Test that prompt with no secrets is allowed.""" + mock_scan.return_value = (None, 'scan-id-123') + + result = handle_before_submit_prompt(mock_ctx, mock_payload, default_policy) + + assert result == {'continue': True} + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + # outcome is arg[2], scan_id and block_reason are kwargs + assert call_args.args[2] == AIHookOutcome.ALLOWED + assert call_args.kwargs['scan_id'] == 'scan-id-123' + assert call_args.kwargs['block_reason'] is None + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_submit_prompt_with_secrets_blocked( + mock_scan: MagicMock, mock_ctx: MagicMock, mock_payload: AIHookPayload, default_policy: dict[str, Any] +) -> None: + """Test that prompt with secrets is blocked.""" + mock_scan.return_value = ('Found 1 secret: API key', 'scan-id-456') + + result = handle_before_submit_prompt(mock_ctx, mock_payload, default_policy) + + assert result['continue'] is False + assert 'Found 1 secret: API key' in result['user_message'] + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.BLOCKED + assert call_args.kwargs['block_reason'] == BlockReason.SECRETS_IN_PROMPT + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_submit_prompt_with_secrets_warned( + mock_scan: MagicMock, mock_ctx: MagicMock, mock_payload: AIHookPayload, default_policy: dict[str, Any] +) -> None: + """Test that prompt with secrets in warn mode is allowed.""" + default_policy['prompt']['action'] = 'warn' + mock_scan.return_value = ('Found 1 secret: API key', 'scan-id-789') + + result = handle_before_submit_prompt(mock_ctx, mock_payload, default_policy) + + assert result == {'continue': True} + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.WARNED + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_submit_prompt_scan_failure_fail_open( + mock_scan: MagicMock, mock_ctx: MagicMock, mock_payload: AIHookPayload, default_policy: dict[str, Any] +) -> None: + """Test that scan failure with fail_open=True allows the prompt.""" + mock_scan.side_effect = RuntimeError('Scan failed') + default_policy['fail_open'] = True + + with pytest.raises(RuntimeError): + handle_before_submit_prompt(mock_ctx, mock_payload, default_policy) + + # Event should be tracked even on exception + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.ALLOWED + # When fail_open=True, no block_reason since action is allowed + assert call_args.kwargs['block_reason'] is None + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_submit_prompt_scan_failure_fail_closed( + mock_scan: MagicMock, mock_ctx: MagicMock, mock_payload: AIHookPayload, default_policy: dict[str, Any] +) -> None: + """Test that scan failure with fail_open=False blocks the prompt.""" + mock_scan.side_effect = RuntimeError('Scan failed') + default_policy['fail_open'] = False + + with pytest.raises(RuntimeError): + handle_before_submit_prompt(mock_ctx, mock_payload, default_policy) + + # Event should be tracked even on exception + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.BLOCKED + assert call_args.kwargs['block_reason'] == BlockReason.SCAN_FAILURE + + +# Tests for handle_before_read_file + + +def test_handle_before_read_file_disabled(mock_ctx: MagicMock, default_policy: dict[str, Any]) -> None: + """Test that disabled file read scanning allows the file.""" + default_policy['file_read']['enabled'] = False + payload = AIHookPayload( + event_name='file_read', + ide_provider='cursor', + file_path='/path/to/file.txt', + ) + + result = handle_before_read_file(mock_ctx, payload, default_policy) + + assert result == {'permission': 'allow'} + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers.is_denied_path') +def test_handle_before_read_file_sensitive_path( + mock_is_denied: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that sensitive path is blocked.""" + mock_is_denied.return_value = True + payload = AIHookPayload( + event_name='file_read', + ide_provider='cursor', + file_path='/path/to/.env', + ) + + result = handle_before_read_file(mock_ctx, payload, default_policy) + + assert result['permission'] == 'deny' + assert '.env' in result['user_message'] + mock_ctx.obj['ai_security_client'].create_event.assert_called_once() + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.BLOCKED + assert call_args.kwargs['block_reason'] == BlockReason.SENSITIVE_PATH + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers.is_denied_path') +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_path_for_secrets') +def test_handle_before_read_file_no_secrets( + mock_scan: MagicMock, mock_is_denied: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that file with no secrets is allowed.""" + mock_is_denied.return_value = False + mock_scan.return_value = (None, 'scan-id-123') + payload = AIHookPayload( + event_name='file_read', + ide_provider='cursor', + file_path='/path/to/file.txt', + ) + + result = handle_before_read_file(mock_ctx, payload, default_policy) + + assert result == {'permission': 'allow'} + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.ALLOWED + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers.is_denied_path') +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_path_for_secrets') +def test_handle_before_read_file_with_secrets( + mock_scan: MagicMock, mock_is_denied: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that file with secrets is blocked.""" + mock_is_denied.return_value = False + mock_scan.return_value = ('Found 1 secret: password', 'scan-id-456') + payload = AIHookPayload( + event_name='file_read', + ide_provider='cursor', + file_path='/path/to/file.txt', + ) + + result = handle_before_read_file(mock_ctx, payload, default_policy) + + assert result['permission'] == 'deny' + assert 'Found 1 secret: password' in result['user_message'] + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.BLOCKED + assert call_args.kwargs['block_reason'] == BlockReason.SECRETS_IN_FILE + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers.is_denied_path') +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_path_for_secrets') +def test_handle_before_read_file_scan_disabled( + mock_scan: MagicMock, mock_is_denied: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that file is allowed when content scanning is disabled.""" + mock_is_denied.return_value = False + default_policy['file_read']['scan_content'] = False + payload = AIHookPayload( + event_name='file_read', + ide_provider='cursor', + file_path='/path/to/file.txt', + ) + + result = handle_before_read_file(mock_ctx, payload, default_policy) + + assert result == {'permission': 'allow'} + mock_scan.assert_not_called() + + +# Tests for handle_before_mcp_execution + + +def test_handle_before_mcp_execution_disabled(mock_ctx: MagicMock, default_policy: dict[str, Any]) -> None: + """Test that disabled MCP scanning allows the execution.""" + default_policy['mcp']['enabled'] = False + payload = AIHookPayload( + event_name='mcp_execution', + ide_provider='cursor', + mcp_tool_name='test_tool', + mcp_arguments={'arg1': 'value1'}, + ) + + result = handle_before_mcp_execution(mock_ctx, payload, default_policy) + + assert result == {'permission': 'allow'} + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_mcp_execution_no_secrets( + mock_scan: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that MCP execution with no secrets is allowed.""" + mock_scan.return_value = (None, 'scan-id-123') + payload = AIHookPayload( + event_name='mcp_execution', + ide_provider='cursor', + mcp_tool_name='test_tool', + mcp_arguments={'arg1': 'value1'}, + ) + + result = handle_before_mcp_execution(mock_ctx, payload, default_policy) + + assert result == {'permission': 'allow'} + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.ALLOWED + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_mcp_execution_with_secrets_blocked( + mock_scan: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that MCP execution with secrets is blocked.""" + mock_scan.return_value = ('Found 1 secret: token', 'scan-id-456') + payload = AIHookPayload( + event_name='mcp_execution', + ide_provider='cursor', + mcp_tool_name='test_tool', + mcp_arguments={'arg1': 'secret_token_12345'}, + ) + + result = handle_before_mcp_execution(mock_ctx, payload, default_policy) + + assert result['permission'] == 'deny' + assert 'Found 1 secret: token' in result['user_message'] + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.BLOCKED + assert call_args.kwargs['block_reason'] == BlockReason.SECRETS_IN_MCP_ARGS + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_mcp_execution_with_secrets_warned( + mock_scan: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that MCP execution with secrets in warn mode asks permission.""" + mock_scan.return_value = ('Found 1 secret: token', 'scan-id-789') + default_policy['mcp']['action'] = 'warn' + payload = AIHookPayload( + event_name='mcp_execution', + ide_provider='cursor', + mcp_tool_name='test_tool', + mcp_arguments={'arg1': 'secret_token_12345'}, + ) + + result = handle_before_mcp_execution(mock_ctx, payload, default_policy) + + assert result['permission'] == 'ask' + assert 'Found 1 secret: token' in result['user_message'] + call_args = mock_ctx.obj['ai_security_client'].create_event.call_args + assert call_args.args[2] == AIHookOutcome.WARNED + + +@patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') +def test_handle_before_mcp_execution_scan_disabled( + mock_scan: MagicMock, mock_ctx: MagicMock, default_policy: dict[str, Any] +) -> None: + """Test that MCP execution is allowed when argument scanning is disabled.""" + default_policy['mcp']['scan_arguments'] = False + payload = AIHookPayload( + event_name='mcp_execution', + ide_provider='cursor', + mcp_tool_name='test_tool', + mcp_arguments={'arg1': 'value1'}, + ) + + result = handle_before_mcp_execution(mock_ctx, payload, default_policy) + + assert result == {'permission': 'allow'} + mock_scan.assert_not_called() diff --git a/tests/cli/commands/ai_guardrails/scan/test_payload.py b/tests/cli/commands/ai_guardrails/scan/test_payload.py new file mode 100644 index 00000000..9d14dda3 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/scan/test_payload.py @@ -0,0 +1,135 @@ +"""Tests for AI hook payload normalization.""" + +import pytest + +from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload +from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType + + +def test_from_cursor_payload_prompt_event() -> None: + """Test conversion of Cursor beforeSubmitPrompt payload.""" + cursor_payload = { + 'hook_event_name': 'beforeSubmitPrompt', + 'conversation_id': 'conv-123', + 'generation_id': 'gen-456', + 'user_email': 'user@example.com', + 'model': 'gpt-4', + 'cursor_version': '0.42.0', + 'prompt': 'Test prompt', + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + + assert unified.event_name == AiHookEventType.PROMPT + assert unified.conversation_id == 'conv-123' + assert unified.generation_id == 'gen-456' + assert unified.ide_user_email == 'user@example.com' + assert unified.model == 'gpt-4' + assert unified.ide_provider == 'cursor' + assert unified.ide_version == '0.42.0' + assert unified.prompt == 'Test prompt' + + +def test_from_cursor_payload_file_read_event() -> None: + """Test conversion of Cursor beforeReadFile payload.""" + cursor_payload = { + 'hook_event_name': 'beforeReadFile', + 'conversation_id': 'conv-123', + 'file_path': '/path/to/secret.env', + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + + assert unified.event_name == AiHookEventType.FILE_READ + assert unified.file_path == '/path/to/secret.env' + assert unified.ide_provider == 'cursor' + + +def test_from_cursor_payload_mcp_execution_event() -> None: + """Test conversion of Cursor beforeMCPExecution payload.""" + cursor_payload = { + 'hook_event_name': 'beforeMCPExecution', + 'conversation_id': 'conv-123', + 'command': 'GitLab', + 'tool_name': 'discussion_list', + 'arguments': {'resource_type': 'merge_request', 'parent_id': 'organization/repo', 'resource_id': '4'}, + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + + assert unified.event_name == AiHookEventType.MCP_EXECUTION + assert unified.mcp_server_name == 'GitLab' + assert unified.mcp_tool_name == 'discussion_list' + assert unified.mcp_arguments == { + 'resource_type': 'merge_request', + 'parent_id': 'organization/repo', + 'resource_id': '4', + } + + +def test_from_cursor_payload_with_alternative_field_names() -> None: + """Test that alternative field names are handled (path vs file_path, etc.).""" + cursor_payload = { + 'hook_event_name': 'beforeReadFile', + 'path': '/alternative/path.txt', # Alternative to file_path + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + assert unified.file_path == '/alternative/path.txt' + + cursor_payload = { + 'hook_event_name': 'beforeMCPExecution', + 'tool': 'my_tool', # Alternative to tool_name + 'tool_input': {'key': 'value'}, # Alternative to arguments + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + assert unified.mcp_tool_name == 'my_tool' + assert unified.mcp_arguments == {'key': 'value'} + + +def test_from_cursor_payload_unknown_event() -> None: + """Test that unknown event names are passed through as-is.""" + cursor_payload = { + 'hook_event_name': 'unknownEvent', + 'conversation_id': 'conv-123', + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + # Unknown events fall back to original name + assert unified.event_name == 'unknownEvent' + + +def test_from_payload_cursor() -> None: + """Test from_payload dispatcher with Cursor tool.""" + cursor_payload = { + 'hook_event_name': 'beforeSubmitPrompt', + 'prompt': 'test', + } + + unified = AIHookPayload.from_payload(cursor_payload, tool='cursor') + assert unified.event_name == AiHookEventType.PROMPT + assert unified.ide_provider == 'cursor' + + +def test_from_payload_unsupported_tool() -> None: + """Test from_payload raises ValueError for unsupported tools.""" + payload = {'hook_event_name': 'someEvent'} + + with pytest.raises(ValueError, match='Unsupported IDE/tool: unsupported'): + AIHookPayload.from_payload(payload, tool='unsupported') + + +def test_from_cursor_payload_empty_fields() -> None: + """Test handling of empty/missing fields.""" + cursor_payload = { + 'hook_event_name': 'beforeSubmitPrompt', + # Most fields missing + } + + unified = AIHookPayload.from_cursor_payload(cursor_payload) + + assert unified.event_name == AiHookEventType.PROMPT + assert unified.conversation_id is None + assert unified.prompt == '' # Default to empty string + assert unified.ide_provider == 'cursor' diff --git a/tests/cli/commands/ai_guardrails/scan/test_policy.py b/tests/cli/commands/ai_guardrails/scan/test_policy.py new file mode 100644 index 00000000..bbe884b0 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/scan/test_policy.py @@ -0,0 +1,199 @@ +"""Tests for AI guardrails policy loading and management.""" + +from pathlib import Path +from typing import Optional +from unittest.mock import MagicMock, patch + +from pyfakefs.fake_filesystem import FakeFilesystem + +from cycode.cli.apps.ai_guardrails.scan.policy import ( + deep_merge, + get_policy_value, + load_defaults, + load_policy, + load_yaml_file, +) + + +def test_deep_merge_simple() -> None: + """Test deep merging two simple dictionaries.""" + base = {'a': 1, 'b': 2} + override = {'b': 3, 'c': 4} + result = deep_merge(base, override) + + assert result == {'a': 1, 'b': 3, 'c': 4} + + +def test_deep_merge_nested() -> None: + """Test deep merging nested dictionaries.""" + base = {'level1': {'level2': {'key1': 'value1', 'key2': 'value2'}}} + override = {'level1': {'level2': {'key2': 'override2', 'key3': 'value3'}}} + result = deep_merge(base, override) + + assert result == {'level1': {'level2': {'key1': 'value1', 'key2': 'override2', 'key3': 'value3'}}} + + +def test_deep_merge_override_with_non_dict() -> None: + """Test that non-dict overrides replace the base value entirely.""" + base = {'key': {'nested': 'value'}} + override = {'key': 'simple_value'} + result = deep_merge(base, override) + + assert result == {'key': 'simple_value'} + + +def test_load_yaml_file_nonexistent(fs: FakeFilesystem) -> None: + """Test loading a non-existent file returns None.""" + result = load_yaml_file(Path('/fake/nonexistent.yaml')) + assert result is None + + +def test_load_yaml_file_valid_yaml(fs: FakeFilesystem) -> None: + """Test loading a valid YAML file.""" + fs.create_file('/fake/config.yaml', contents='mode: block\nfail_open: true\n') + + result = load_yaml_file(Path('/fake/config.yaml')) + assert result == {'mode': 'block', 'fail_open': True} + + +def test_load_yaml_file_valid_json(fs: FakeFilesystem) -> None: + """Test loading a valid JSON file.""" + fs.create_file('/fake/config.json', contents='{"mode": "block", "fail_open": true}') + + result = load_yaml_file(Path('/fake/config.json')) + assert result == {'mode': 'block', 'fail_open': True} + + +def test_load_yaml_file_invalid_yaml(fs: FakeFilesystem) -> None: + """Test loading an invalid YAML file returns None.""" + fs.create_file('/fake/invalid.yaml', contents='{ invalid yaml content [') + + result = load_yaml_file(Path('/fake/invalid.yaml')) + assert result is None + + +def test_load_defaults() -> None: + """Test that load_defaults returns a dict with expected keys.""" + defaults = load_defaults() + + assert isinstance(defaults, dict) + assert 'mode' in defaults + assert 'fail_open' in defaults + assert 'prompt' in defaults + assert 'file_read' in defaults + assert 'mcp' in defaults + + +def test_get_policy_value_single_key() -> None: + """Test getting a single-level value.""" + policy = {'mode': 'block', 'fail_open': True} + + assert get_policy_value(policy, 'mode') == 'block' + assert get_policy_value(policy, 'fail_open') is True + + +def test_get_policy_value_nested_keys() -> None: + """Test getting a nested value.""" + policy = {'prompt': {'enabled': True, 'action': 'block'}} + + assert get_policy_value(policy, 'prompt', 'enabled') is True + assert get_policy_value(policy, 'prompt', 'action') == 'block' + + +def test_get_policy_value_missing_key() -> None: + """Test that missing keys return the default value.""" + policy = {'mode': 'block'} + + assert get_policy_value(policy, 'nonexistent', default='default_value') == 'default_value' + + +def test_get_policy_value_deeply_nested() -> None: + """Test getting deeply nested values.""" + policy = {'level1': {'level2': {'level3': 'value'}}} + + assert get_policy_value(policy, 'level1', 'level2', 'level3') == 'value' + assert get_policy_value(policy, 'level1', 'level2', 'missing', default='def') == 'def' + + +def test_get_policy_value_non_dict_in_path() -> None: + """Test that non-dict values in path return default.""" + policy = {'key': 'string_value'} + + # Trying to access nested key on non-dict should return default + assert get_policy_value(policy, 'key', 'nested', default='default') == 'default' + + +@patch('cycode.cli.apps.ai_guardrails.scan.policy.load_yaml_file') +def test_load_policy_defaults_only(mock_load: MagicMock) -> None: + """Test loading policy with only defaults (no user or repo config).""" + mock_load.return_value = None # No user or repo config + + policy = load_policy() + + assert 'mode' in policy + assert 'fail_open' in policy + + +@patch('pathlib.Path.home') +def test_load_policy_with_user_config(mock_home: MagicMock, fs: FakeFilesystem) -> None: + """Test loading policy with user config override.""" + mock_home.return_value = Path('/home/testuser') + + # Create user config in fake filesystem + fs.create_file('/home/testuser/.cycode/ai-guardrails.yaml', contents='mode: warn\nfail_open: false\n') + + policy = load_policy() + + # User config should override defaults + assert policy['mode'] == 'warn' + assert policy['fail_open'] is False + + +@patch('cycode.cli.apps.ai_guardrails.scan.policy.load_yaml_file') +def test_load_policy_with_repo_config(mock_load: MagicMock) -> None: + """Test loading policy with repo config (highest precedence).""" + repo_path = Path('/fake/repo') + repo_config = repo_path / '.cycode' / 'ai-guardrails.yaml' + + def side_effect(path: Path) -> Optional[dict]: + if path == repo_config: + return {'mode': 'block', 'prompt': {'enabled': False}} + return None + + mock_load.side_effect = side_effect + + policy = load_policy(str(repo_path)) + + # Repo config should have highest precedence + assert policy['mode'] == 'block' + assert policy['prompt']['enabled'] is False + + +@patch('pathlib.Path.home') +def test_load_policy_precedence(mock_home: MagicMock, fs: FakeFilesystem) -> None: + """Test that policy precedence is: defaults < user < repo.""" + mock_home.return_value = Path('/home/testuser') + + # Create user config + fs.create_file('/home/testuser/.cycode/ai-guardrails.yaml', contents='mode: warn\nfail_open: false\n') + + # Create repo config + fs.create_file('/fake/repo/.cycode/ai-guardrails.yaml', contents='mode: block\n') + + policy = load_policy('/fake/repo') + + # mode should come from repo (highest precedence) + assert policy['mode'] == 'block' + # fail_open should come from user config (repo doesn't override it) + assert policy['fail_open'] is False + + +@patch('cycode.cli.apps.ai_guardrails.scan.policy.load_yaml_file') +def test_load_policy_none_workspace_root(mock_load: MagicMock) -> None: + """Test that None workspace_root is handled correctly.""" + mock_load.return_value = None + + policy = load_policy(None) + + # Should only load defaults (no repo config) + assert 'mode' in policy diff --git a/tests/cli/commands/ai_guardrails/scan/test_response_builders.py b/tests/cli/commands/ai_guardrails/scan/test_response_builders.py new file mode 100644 index 00000000..86e87ca7 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/scan/test_response_builders.py @@ -0,0 +1,79 @@ +"""Tests for IDE response builders.""" + +import pytest + +from cycode.cli.apps.ai_guardrails.scan.response_builders import ( + CursorResponseBuilder, + IDEResponseBuilder, + get_response_builder, +) + + +def test_cursor_response_builder_allow_permission() -> None: + """Test Cursor allow permission response.""" + builder = CursorResponseBuilder() + response = builder.allow_permission() + + assert response == {'permission': 'allow'} + + +def test_cursor_response_builder_deny_permission() -> None: + """Test Cursor deny permission response with messages.""" + builder = CursorResponseBuilder() + response = builder.deny_permission('User message', 'Agent message') + + assert response == { + 'permission': 'deny', + 'user_message': 'User message', + 'agent_message': 'Agent message', + } + + +def test_cursor_response_builder_ask_permission() -> None: + """Test Cursor ask permission response for warnings.""" + builder = CursorResponseBuilder() + response = builder.ask_permission('Warning message', 'Agent warning') + + assert response == { + 'permission': 'ask', + 'user_message': 'Warning message', + 'agent_message': 'Agent warning', + } + + +def test_cursor_response_builder_allow_prompt() -> None: + """Test Cursor allow prompt response.""" + builder = CursorResponseBuilder() + response = builder.allow_prompt() + + assert response == {'continue': True} + + +def test_cursor_response_builder_deny_prompt() -> None: + """Test Cursor deny prompt response with message.""" + builder = CursorResponseBuilder() + response = builder.deny_prompt('Secrets detected') + + assert response == {'continue': False, 'user_message': 'Secrets detected'} + + +def test_get_response_builder_cursor() -> None: + """Test getting Cursor response builder.""" + builder = get_response_builder('cursor') + + assert isinstance(builder, CursorResponseBuilder) + assert isinstance(builder, IDEResponseBuilder) + + +def test_get_response_builder_unsupported() -> None: + """Test that unsupported IDE raises ValueError.""" + with pytest.raises(ValueError, match='Unsupported IDE: unknown'): + get_response_builder('unknown') + + +def test_cursor_response_builder_is_singleton() -> None: + """Test that getting the same builder returns the same instance.""" + builder1 = get_response_builder('cursor') + builder2 = get_response_builder('cursor') + + assert builder1 is builder2 diff --git a/tests/cli/commands/ai_guardrails/scan/test_utils.py b/tests/cli/commands/ai_guardrails/scan/test_utils.py new file mode 100644 index 00000000..ce84c609 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/scan/test_utils.py @@ -0,0 +1,113 @@ +"""Tests for AI guardrails utility functions.""" + +from cycode.cli.apps.ai_guardrails.scan.utils import ( + is_denied_path, + matches_glob, + normalize_path, +) + + +def test_normalize_path_rejects_escape() -> None: + """Test that paths attempting to escape are rejected.""" + path = '../../../etc/passwd' + result = normalize_path(path) + + assert result == '' + + +def test_normalize_path_empty() -> None: + """Test normalizing empty path.""" + result = normalize_path('') + + assert result == '' + + +def test_matches_glob_simple() -> None: + """Test simple glob pattern matching.""" + assert matches_glob('secret.env', '*.env') is True + assert matches_glob('secret.txt', '*.env') is False + + +def test_matches_glob_recursive() -> None: + """Test recursive glob pattern with **.""" + assert matches_glob('path/to/secret.env', '**/*.env') is True + # Note: '**/*.env' requires at least one path separator, so 'secret.env' won't match + assert matches_glob('secret.env', '*.env') is True # Use non-recursive pattern instead + assert matches_glob('path/to/file.txt', '**/*.env') is False + + +def test_matches_glob_directory() -> None: + """Test matching files in specific directories.""" + assert matches_glob('.env', '.env') is True + assert matches_glob('config/.env', '**/.env') is True + assert matches_glob('other/file', '**/.env') is False + + +def test_matches_glob_case_insensitive() -> None: + """Test that glob matching handles case variations.""" + # Case-insensitive matching for cross-platform compatibility + assert matches_glob('secret.env', '*.env') is True + assert matches_glob('SECRET.ENV', '*.env') is True # Uppercase path matches lowercase pattern + assert matches_glob('Secret.Env', '*.env') is True # Mixed case matches + assert matches_glob('secret.env', '*.ENV') is True # Lowercase path matches uppercase pattern + assert matches_glob('SECRET.ENV', '*.ENV') is True # Both uppercase match + + +def test_matches_glob_empty_inputs() -> None: + """Test glob matching with empty inputs.""" + assert matches_glob('', '*.env') is False + assert matches_glob('file.env', '') is False + assert matches_glob('', '') is False + + +def test_matches_glob_with_traversal_attempt() -> None: + """Test that path traversal is normalized before matching.""" + # Path traversal attempts should be normalized + assert matches_glob('../secret.env', '*.env') is False + + +def test_is_denied_path_with_deny_globs() -> None: + """Test path denial with deny_globs policy.""" + policy = {'file_read': {'deny_globs': ['*.env', '.git/*', '**/secrets/*']}} + + assert is_denied_path('.env', policy) is True + # Note: Path.match('*.env') matches paths ending with .env, including nested paths + assert is_denied_path('config/.env', policy) is True # Matches *.env + assert is_denied_path('.git/config', policy) is True # Matches .git/* + assert is_denied_path('app/secrets/api_keys.txt', policy) is True # Matches **/secrets/* + assert is_denied_path('app/config.yaml', policy) is False + + +def test_is_denied_path_nested_patterns() -> None: + """Test denial with various nesting patterns.""" + policy = {'file_read': {'deny_globs': ['*.key', '**/*.key', 'config/*.env']}} + + # *.key matches .key files at root level, **/*.key for nested + assert is_denied_path('private.key', policy) is True + assert is_denied_path('app/private.key', policy) is True + # config/*.env only matches .env files directly in config/ + assert is_denied_path('config/app.env', policy) is True + assert is_denied_path('config/sub/app.env', policy) is False # Not direct child + assert is_denied_path('app/config.yaml', policy) is False + + +def test_is_denied_path_empty_globs() -> None: + """Test that empty deny_globs list denies nothing.""" + policy = {'file_read': {'deny_globs': []}} + + assert is_denied_path('.env', policy) is False + assert is_denied_path('any/path', policy) is False + + +def test_is_denied_path_no_policy() -> None: + """Test denial with missing policy configuration.""" + policy = {} + + assert is_denied_path('.env', policy) is False + + +def test_is_denied_path_empty_path() -> None: + """Test denial check with empty path.""" + policy = {'file_read': {'deny_globs': ['*.env']}} + + assert is_denied_path('', policy) is False diff --git a/tests/cli/commands/ai_guardrails/test_command_utils.py b/tests/cli/commands/ai_guardrails/test_command_utils.py new file mode 100644 index 00000000..4f0ef55e --- /dev/null +++ b/tests/cli/commands/ai_guardrails/test_command_utils.py @@ -0,0 +1,57 @@ +"""Tests for AI guardrails command utilities.""" + +import pytest +import typer + +from cycode.cli.apps.ai_guardrails.command_utils import ( + validate_and_parse_ide, + validate_scope, +) +from cycode.cli.apps.ai_guardrails.consts import AIIDEType + + +def test_validate_and_parse_ide_valid() -> None: + """Test parsing valid IDE names.""" + assert validate_and_parse_ide('cursor') == AIIDEType.CURSOR + assert validate_and_parse_ide('CURSOR') == AIIDEType.CURSOR + assert validate_and_parse_ide('CuRsOr') == AIIDEType.CURSOR + + +def test_validate_and_parse_ide_invalid() -> None: + """Test that invalid IDE raises typer.Exit.""" + with pytest.raises(typer.Exit) as exc_info: + validate_and_parse_ide('invalid_ide') + assert exc_info.value.exit_code == 1 + + +def test_validate_scope_valid_default() -> None: + """Test validating valid scope with default allowed scopes.""" + # Should not raise any exception + validate_scope('user') + validate_scope('repo') + + +def test_validate_scope_invalid_default() -> None: + """Test that invalid scope raises typer.Exit with default allowed scopes.""" + with pytest.raises(typer.Exit) as exc_info: + validate_scope('invalid') + assert exc_info.value.exit_code == 1 + + with pytest.raises(typer.Exit) as exc_info: + validate_scope('all') # 'all' not in default allowed scopes + assert exc_info.value.exit_code == 1 + + +def test_validate_scope_valid_custom() -> None: + """Test validating scope with custom allowed scopes.""" + # Should not raise any exception + validate_scope('user', allowed_scopes=('user', 'repo', 'all')) + validate_scope('repo', allowed_scopes=('user', 'repo', 'all')) + validate_scope('all', allowed_scopes=('user', 'repo', 'all')) + + +def test_validate_scope_invalid_custom() -> None: + """Test that invalid scope raises typer.Exit with custom allowed scopes.""" + with pytest.raises(typer.Exit) as exc_info: + validate_scope('invalid', allowed_scopes=('user', 'repo', 'all')) + assert exc_info.value.exit_code == 1