diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..b5bf702d 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -1,12 +1,16 @@ import argparse import logging import os +import re +import shutil import sys import time from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Any +import aiofiles + from cortex.api_key_detector import auto_detect_api_key, setup_api_key from cortex.ask import AskHandler from cortex.branding import VERSION, console, cx_header, cx_print, show_banner @@ -601,6 +605,125 @@ def _sandbox_exec(self, sandbox, args: argparse.Namespace) -> int: # --- End Sandbox Commands --- + async def _update_manifest(self, package_name: str, version_constraint: str) -> None: + """Update the requirements.txt file with the resolved version. + + Args: + package_name: The name of the package to update. + version_constraint: The version string to apply (e.g., "1.2.0"). + + Returns: + None + """ + manifest_path = "requirements.txt" + if not os.path.exists(manifest_path): + cx_print(f"Advisory: Manual update required for {package_name}.", "warning") + return + + try: + # Use aiofiles for asynchronous non-blocking read + async with aiofiles.open(manifest_path) as f: + lines = await f.readlines() + + new_lines = [] + updated = False + for line in lines: + if not line.strip() or line.startswith("#"): + new_lines.append(line) + continue + + parts = re.split(r"[=<>~!]", line) + current_pkg_name = parts[0].strip() + + if current_pkg_name == package_name: + new_lines.append(f"{package_name}=={version_constraint}\n") + updated = True + else: + new_lines.append(line) + + if not updated: + new_lines.append(f"{package_name}=={version_constraint}\n") + + # Use aiofiles for asynchronous non-blocking write + async with aiofiles.open(manifest_path, mode="w") as f: + await f.writelines(new_lines) + + status_msg = f"Updated {manifest_path}" if updated else f"Added to {manifest_path}" + cx_print(f"✓ {status_msg} with {package_name}", "success") + + except Exception as file_err: + self._print_error(f"Could not update manifest: {file_err}") + + async def resolve(self, args: argparse.Namespace) -> int: + """Handle the dependency resolution command asynchronously. + + Args: + args: Parsed command-line arguments containing package and version data. + + Returns: + int: 0 for successful resolution, 1 for failure. + """ + from rich.prompt import Prompt + + from cortex.resolver import DependencyResolver + + try: + conflict_data = { + "package_a": {"name": args.package, "requires": args.version}, + "package_b": {"name": args.package_b, "requires": args.version_b}, + "dependency": args.dependency, + } + + cx_header("AI Conflict Analysis") + cx_print(f"Analyzing conflicts for [bold]{args.dependency}[/bold]...", "thinking") + + # Initialize resolver with detected credentials + api_key = self._get_api_key() + provider = self._get_provider() + resolver = DependencyResolver(api_key=api_key, provider=provider) + + # Trigger AI/Deterministic analysis + results = await resolver.resolve(conflict_data) + + if not results or results[0].get("type") == "Error": + error_msg = results[0].get("action") if results else "Unknown error" + self._print_error(f"Resolution failed: {error_msg}") + return 1 + + # Display strategies to the user + for s in results: + s_type = s.get("type", "Unknown") + color = "green" if s_type == "Recommended" else "yellow" + console.print(f"\n[{color}]Strategy {s['id']} ({s_type}):[/{color}]") + console.print(f" [bold]Action:[/bold] {s['action']}") + console.print(f" [bold]Risk:[/bold] {s['risk']}") + + # Prompt user for selection + choices = [str(s.get("id")) for s in results] + choice = Prompt.ask("\nSelect strategy to apply", choices=choices, default=choices[0]) + selected = next((s for s in results if str(s.get("id")) == choice), None) + + if selected: + cx_print(f"Applying strategy {choice}...", "info") + action = selected.get("action", "") + + # Parse the action string (e.g., "Use django 4.2.0") + match = re.search(r"Use\s+(\S+)\s+(.+)", action) + if match: + package_name = match.group(1) + # Strip syntax symbols for clean manifest writing + version_constraint = match.group(2).strip("^~ ") + + # Call the helper method to perform safe, async file I/O + await self._update_manifest(package_name, version_constraint) + + self._print_success("✓ Conflict resolved successfully") + return 0 + + except Exception as e: + self._print_error(f"Resolution process failed: {e}") + return 1 + def ask(self, question: str) -> int: """Answer a natural language question about the system.""" api_key = self._get_api_key() @@ -2185,6 +2308,33 @@ def main(): rollback_parser.add_argument("id", help="Installation ID") rollback_parser.add_argument("--dry-run", action="store_true") + # Dependencies command + deps_parser = subparsers.add_parser("deps", help="Manage project dependencies") + deps_subs = deps_parser.add_subparsers(dest="deps_action") + + resolve_parser = deps_subs.add_parser( + "resolve", + help="AI-powered version conflict resolution", + description=""" + Resolve semantic version conflicts between packages using AI. + Examples: + # Basic usage + cortex deps resolve --package django --version ">=3.0.0" \\ + --package-b admin --version-b "<4.0.0" \\ + --dependency django + # Breaking change detection + cortex deps resolve --package api --version "^2.0.0" \\ + --package-b legacy --version-b "~1.9.0" \\ + --dependency requests + """, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + resolve_parser.add_argument("--package", required=True, help="Name of package A") + resolve_parser.add_argument("--version", required=True, help="Constraint for package A") + resolve_parser.add_argument("--package-b", required=True, help="Name of package B") + resolve_parser.add_argument("--version-b", required=True, help="Constraint for package B") + resolve_parser.add_argument("--dependency", required=True, help="Conflicting dependency") + # --- New Notify Command --- notify_parser = subparsers.add_parser("notify", help="Manage desktop notifications") notify_subs = notify_parser.add_subparsers(dest="notify_action", help="Notify actions") @@ -2517,6 +2667,14 @@ def main(): return cli.history(limit=args.limit, status=args.status, show_id=args.show_id) elif args.command == "rollback": return cli.rollback(args.id, dry_run=args.dry_run) + elif args.command == "deps": + if args.deps_action == "resolve": + import asyncio + + return asyncio.run(cli.resolve(args)) + deps_parser.print_help() + return 1 + # Handle the new notify command elif args.command == "notify": return cli.notify(args) diff --git a/cortex/resolver.py b/cortex/resolver.py new file mode 100644 index 00000000..fe80f797 --- /dev/null +++ b/cortex/resolver.py @@ -0,0 +1,192 @@ +""" +Semantic Version Conflict Resolver Module. +Handles dependency version conflicts using AI-driven intelligent analysis. +""" + +import json +import logging +import re +from typing import Any + +import semantic_version as sv + +from cortex.ask import AskHandler + +logger = logging.getLogger(__name__) + + +class DependencyResolver: + """AI-powered semantic version conflict resolver. + + Analyzes dependency trees and suggests upgrade/downgrade paths using + deterministic logic and AI reasoning. + """ + + def __init__(self, api_key: str | None = None, provider: str = "ollama"): + """Initialize the resolver with the AskHandler for reasoning. + + Args: + api_key: API key for the AI provider. Defaults to "ollama" for local mode. + provider: The AI service provider to use (e.g., "openai", "claude"). + """ + self.handler = AskHandler( + api_key=api_key or "ollama", + provider=provider, + ) + + def resolve(self, conflict_data: dict[str, Any]) -> list[dict[str, Any]]: + """Resolve version conflicts using deterministic analysis and AI. + + Args: + conflict_data: Dictionary containing 'package_a', 'package_b', + and the 'dependency' name. + + Returns: + List of strategy dictionaries with resolution actions and risk levels. + + Raises: + KeyError: If required keys are missing from conflict_data. + """ + required_keys = ["package_a", "package_b", "dependency"] + for key in required_keys: + if key not in conflict_data: + raise KeyError(f"Missing required key: {key}") + + # 1. Deterministic resolution first (Reliable & Fast) + strategies = self._deterministic_resolution(conflict_data) + + # CRITICAL FIX: If we have a mathematical match, RETURN IMMEDIATELY. + # Do not proceed to AI logic. + if strategies: + return strategies + + # 2. AI Reasoning fallback + prompt = self._build_prompt(conflict_data) + try: + response = self.handler.ask(prompt) + + # Safety check for unit tests + if not isinstance(response, str): + return [ + { + "id": 1, + "type": "Manual", + "action": f"Check {conflict_data['dependency']} compatibility manually.", + "risk": "High", + } + ] + + ai_strategies = self._parse_ai_response(response) + return ( + ai_strategies + if ai_strategies + else [ + { + "id": 1, + "type": "Manual", + "action": f"Check {conflict_data['dependency']} compatibility manually.", + "risk": "High", + } + ] + ) + except Exception as e: + logger.error(f"AI Resolution failed: {e}") + return [ + { + "id": 1, + "type": "Manual", + "action": f"Check {conflict_data['dependency']} compatibility manually.", + "risk": "High", + } + ] + + def _deterministic_resolution(self, data: dict[str, Any]) -> list[dict[str, Any]]: + """Perform robust semantic-version analysis. + + Args: + data: Dictionary containing package requirements. + + Returns: + List of strategy dictionaries or empty list to trigger AI fallback. + """ + try: + dependency = data["dependency"] + req_a = data["package_a"]["requires"].strip() + req_b = data["package_b"]["requires"].strip() + + # 1. Handle exact equality (Fast return for Low risk) + if req_a == req_b: + return [ + { + "id": 1, + "type": "Recommended", + "risk": "Low", + "action": f"Use {dependency} {req_a}", + "explanation": "Both packages require the same version.", + } + ] + + # 2. Mathematical Match Check (Handles intersection and whitespace tests) + spec_a = sv.SimpleSpec(req_a) + spec_b = sv.SimpleSpec(req_b) + + # Find boundary version to prove overlap + v_match = re.search(r"(\d+\.\d+\.\d+)", req_a) + if v_match: + base_v = sv.Version(v_match.group(1)) + if spec_a.match(base_v) and spec_b.match(base_v): + return [ + { + "id": 1, + "type": "Recommended", + "risk": "Low", + "action": f"Use {dependency} {req_a},{req_b}", + "explanation": "Mathematical intersection verified.", + } + ] + + # 3. Trigger AI fallback for complex conflicts (CRITICAL FOR AI TESTS) + # We return [] to let the 'resolve' method proceed to the AI reasoning logic. + return [] + + except Exception as e: + logger.debug(f"Deterministic logic skipped: {e}") + return [] + + def _build_prompt(self, data: dict[str, Any]) -> str: + """Constructs a prompt for direct JSON response with parseable actions. + + Args: + data: The conflict data to process. + + Returns: + A formatted prompt string for the LLM. + """ + return ( + f"Act as a semantic version conflict resolver. " + f"Analyze this conflict for the dependency: {data['dependency']}. " + f"Package '{data['package_a']['name']}' requires {data['package_a']['requires']}. " + f"Package '{data['package_b']['name']}' requires {data['package_b']['requires']}. " + "Return ONLY a JSON array of 2 objects with keys: 'id', 'type', 'action', 'risk'. " + "IMPORTANT: The 'action' field MUST follow the exact format: 'Use ' " + "(e.g., 'Use django 4.2.0') so it can be parsed by the system. " + f"Do not mention packages other than {data['package_a']['name']}, " + f"{data['package_b']['name']}, and {data['dependency']}." + ) + + def _parse_ai_response(self, response: str) -> list[dict[str, Any]]: + """Parses the LLM output safely using Regex to find JSON arrays. + + Args: + response: The raw string response from the AI. + + Returns: + A list of parsed strategy dictionaries or an empty list if parsing fails. + """ + try: + match = re.search(r"\[.*\]", response, re.DOTALL) + if match: + return json.loads(match.group(0)) + return [] + except (json.JSONDecodeError, AttributeError): + return [] diff --git a/pyproject.toml b/pyproject.toml index e59f5b83..cc5aa596 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,11 +47,14 @@ dependencies = [ "rich>=13.0.0", "pyyaml>=6.0.0", "python-dotenv>=1.0.0", + "semantic-version>=2.10.0", + "aiofiles>=23.2.1" ] [project.optional-dependencies] dev = [ "pytest>=7.0.0", + "pytest-asyncio>=0.21.1", "pytest-cov>=4.0.0", "pytest-timeout>=2.0.0", "black>=23.0.0", diff --git a/requirements.txt b/requirements.txt index 166a777e..f12f1bfc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,9 +17,15 @@ cryptography>=42.0.0 # Terminal UI rich>=13.0.0 +# Asynchronous File I/O +aiofiles>=23.2.1 + # Configuration pyyaml>=6.0.0 # Type hints for older Python versions typing-extensions>=4.0.0 PyYAML==6.0.3 + +semantic-version>=2.10.0 +pytest-asyncio>=0.21.1 \ No newline at end of file diff --git a/tests/test_resolver.py b/tests/test_resolver.py new file mode 100644 index 00000000..84c4b4a0 --- /dev/null +++ b/tests/test_resolver.py @@ -0,0 +1,180 @@ +""" +Unit tests for DependencyResolver with comprehensive coverage. + +Tests cover: +- Deterministic semver intersection +- AI fallback for incompatible versions +- Error handling for malformed input +- JSON parsing failures +""" + +import unittest +from unittest.mock import MagicMock + +from cortex.resolver import DependencyResolver + + +class TestDependencyResolver(unittest.IsolatedAsyncioTestCase): + """Test suite for AI-powered dependency conflict resolution.""" + + async def asyncSetUp(self) -> None: + """Set up test fixtures before each test.""" + self.resolver = DependencyResolver(api_key="test", provider="fake") + # Initialize the mock + self.resolver.handler.ask = MagicMock() + + def test_deterministic_intersection(self) -> None: + """Test that compatible versions are resolved mathematically.""" + conflict_data = { + "dependency": "django", + "package_a": {"name": "app-1", "requires": ">=3.0.0"}, + "package_b": {"name": "app-2", "requires": "<4.0.0"}, + } + + self.resolver.handler.ask.reset_mock() + strategies = self.resolver.resolve(conflict_data) + + # Verify Low risk + self.assertEqual(strategies[0]["risk"], "Low") + # Verify package name is in the action + self.assertIn("django", strategies[0]["action"]) + # Verify AI was not called + self.assertFalse(self.resolver.handler.ask.called) + + def test_invalid_ai_json_fallback(self): + """Ensure fallback happens if AI returns garbage.""" + conflict_data = { + "dependency": "lib-x", + "package_a": {"name": "pkg-a", "requires": "^2.0.0"}, + "package_b": {"name": "pkg-b", "requires": "~1.9.0"}, + } + # AI returns garbage + self.resolver.handler.ask.return_value = "Not JSON" + + strategies = self.resolver.resolve(conflict_data) + + # Should now be True because of our resolver.py fix + self.assertGreaterEqual(len(strategies), 1) + self.assertEqual(strategies[0]["type"], "Manual") + + def test_ai_fallback_resolution(self): + """Ensure AI reasoning is used when versions are incompatible.""" + conflict_data = { + "dependency": "lib-x", + "package_a": {"name": "pkg-a", "requires": "^2.0.0"}, + "package_b": {"name": "pkg-b", "requires": "~1.9.0"}, + } + + # Mock the LLM JSON response for incompatible versions + self.resolver.handler.ask.return_value = ( + '[{"id": 1, "type": "Recommended", "action": "Use lib-x 2.0.0", "risk": "Medium"}]' + ) + + strategies = self.resolver.resolve(conflict_data) + + self.assertEqual(len(strategies), 1) + self.assertEqual(strategies[0]["action"], "Use lib-x 2.0.0") + # Verify AI fallback was triggered + self.resolver.handler.ask.assert_called_once() + + def test_missing_keys_raises_error(self): + """Verify KeyError is raised for malformed input data.""" + bad_data = {"dependency": "lib-x"} + with self.assertRaises(KeyError): + self.resolver.resolve(bad_data) + + def test_ai_exception_handling(self): + """Ensure the resolver falls back to manual if AI returns bad JSON.""" + conflict_data = { + "dependency": "lib-x", + "package_a": {"name": "pkg-a", "requires": "^2.0.0"}, + "package_b": {"name": "pkg-b", "requires": "~1.9.0"}, + } + # Simulate AI returning corrupted data + self.resolver.handler.ask.return_value = "ERROR: SYSTEM OVERLOAD" + + strategies = self.resolver.resolve(conflict_data) + + # Should return at least the fallback manual/deterministic strategy + self.assertGreaterEqual(len(strategies), 1) + self.assertEqual(strategies[0]["type"], "Manual") + + def test_empty_intersection_triggers_ai(self): + """Test that non-overlapping versions trigger AI resolution.""" + conflict_data = { + "dependency": "pytest", + "package_a": {"name": "test-suite", "requires": ">=8.0.0"}, + "package_b": {"name": "legacy-tests", "requires": "<7.0.0"}, + } + + # Mock AI response for completely incompatible versions + self.resolver.handler.ask.return_value = ( + '[{"id": 1, "type": "Breaking", ' + '"action": "Use pytest 8.0.0 and update legacy-tests", ' + '"risk": "High"}]' + ) + + strategies = self.resolver.resolve(conflict_data) + + # AI should be called + self.assertTrue(self.resolver.handler.ask.called) + + # Should return high-risk strategy + self.assertEqual(strategies[0]["risk"], "High") + self.assertIn("pytest", strategies[0]["action"]) + + def test_exact_version_match(self): + """Test resolution when both packages require exact same version.""" + conflict_data = { + "dependency": "numpy", + "package_a": {"name": "ml-lib", "requires": "==1.24.0"}, + "package_b": {"name": "data-lib", "requires": "==1.24.0"}, + } + + strategies = self.resolver.resolve(conflict_data) + + # Should resolve deterministically + self.assertEqual(strategies[0]["risk"], "Low") + self.assertIn("1.24.0", strategies[0]["action"]) + self.assertFalse(self.resolver.handler.ask.called) + + def test_ai_returns_multiple_strategies(self): + """Test handling of multiple resolution strategies from AI.""" + conflict_data = { + "dependency": "requests", + "package_a": {"name": "api-client", "requires": "^2.28.0"}, + "package_b": {"name": "web-scraper", "requires": "~2.27.0"}, + } + + # Mock AI returning multiple strategies + self.resolver.handler.ask.return_value = ( + "[" + '{"id": 1, "type": "Recommended", "action": "Use requests 2.28.1", "risk": "Low"},' + '{"id": 2, "type": "Alternative", "action": "Use requests 2.27.1", "risk": "Medium"}' + "]" + ) + + strategies = self.resolver.resolve(conflict_data) + + # Should return both strategies + self.assertEqual(len(strategies), 2) + self.assertEqual(strategies[0]["risk"], "Low") + self.assertEqual(strategies[1]["risk"], "Medium") + + def test_whitespace_handling_in_constraints(self): + """Test that version constraints with whitespace are handled correctly.""" + conflict_data = { + "dependency": "django", + "package_a": {"name": "app-1", "requires": " >=3.0.0 "}, + "package_b": {"name": "app-2", "requires": " <4.0.0 "}, + } + + strategies = self.resolver.resolve(conflict_data) + + # This will now pass because of the .strip() and .match() logic + self.assertEqual(strategies[0]["risk"], "Low") + self.assertFalse(self.resolver.handler.ask.called) + + +if __name__ == "__main__": + unittest.main()