diff --git a/CLAUDE.md b/CLAUDE.md
index e0f9ea3e..7391dc77 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -116,6 +116,7 @@ npm run lint # Run ESLint
 ruff check .                          # Lint
 mypy .                                # Type check
 python test_security.py               # Security unit tests (12 tests)
+python test_detach.py                 # Detach/reattach tests (53 tests)
 python test_security_integration.py   # Integration tests (9 tests)
 python -m pytest test_client.py       # Client tests (20 tests)
 python -m pytest test_dependency_resolver.py  # Dependency resolver tests (12 tests)
@@ -175,6 +176,7 @@ Publishing: `npm publish` (triggers `prepublishOnly` which builds UI, then publi
 - `api/database.py` - SQLAlchemy models (Feature, Schedule, ScheduleOverride)
 - `api/dependency_resolver.py` - Cycle detection (Kahn's algorithm + DFS) and dependency validation
 - `api/migration.py` - JSON-to-SQLite migration utility
+- `detach.py` - Project detach/reattach functionality for Claude Code integration
 
 ### Project Registry
 
@@ -393,11 +395,62 @@ blocked_commands:
 **Files:**
 - `security.py` - Command validation logic and hardcoded blocklist
 - `test_security.py` - Unit tests for security system
+- `test_detach.py` - Unit tests for detach/reattach functionality (53 tests)
 - `test_security_integration.py` - Integration tests with real hooks
 - `examples/project_allowed_commands.yaml` - Project config example (all commented by default)
 - `examples/org_config.yaml` - Org config example (all commented by default)
 - `examples/README.md` - Comprehensive guide with use cases, testing, and troubleshooting
 
+#### Project Detach/Reattach
+
+The detach feature temporarily removes AutoForge files from a project, enabling Claude Code to run without AutoForge restrictions on completed projects.
+
+**CLI Usage:**
+
+```bash
+# Detach project (move AutoForge files to backup)
+python detach.py my-project
+
+# Reattach project (restore files from backup)
+python detach.py --reattach my-project
+
+# Check status
+python detach.py --status my-project
+
+# List all projects with detach status
+python detach.py --list
+
+# Preview detach operation (dry run)
+python detach.py --dry-run my-project
+
+# Exclude .playwright-mcp artifacts from backup
+python detach.py --no-artifacts my-project
+```
+
+**API Endpoints:**
+
+- `GET /api/projects/{name}/detach-status` - Check if project is detached
+- `POST /api/projects/{name}/detach` - Detach project (move files to backup)
+- `POST /api/projects/{name}/reattach` - Reattach project (restore from backup)
+
+**Security Features:**
+
+- Path traversal protection during restore (validates all paths stay within project directory)
+- Copy-then-delete backup approach (atomic operations prevent data loss on partial failures)
+- Lock file with PID/timestamp for stale lock recovery
+- Manifest version validation for forward compatibility
+
+**Files backed up:**
+
+- `.autoforge/` directory
+- `prompts/` directory (legacy location)
+- `.playwright-mcp/` directory (unless `--no-artifacts`)
+- `features.db`, `assistant.db` (and WAL files)
+- `CLAUDE.md`, `.claude_settings.json`, `.agent.lock`
+- Generated test files (`test-*.json`, `test-*.py`, etc.)
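+
+**API Example:**
+
+A minimal smoke test of the endpoints with `curl` (a sketch only; the host/port and project name are placeholders, adjust for your server):
+
+```bash
+# Check whether a project is detached (assumes the API serves on localhost:8000)
+curl http://localhost:8000/api/projects/my-project/detach-status
+
+# Detach, then restore
+curl -X POST http://localhost:8000/api/projects/my-project/detach
+curl -X POST http://localhost:8000/api/projects/my-project/reattach
+```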
+
+**Tests:** `test_detach.py` (53 tests including security tests)
+
 ### Vertex AI Configuration (Optional)
 
 Run coding agents via Google Cloud Vertex AI:
diff --git a/detach.py b/detach.py
new file mode 100644
index 00000000..71d6a14b
--- /dev/null
+++ b/detach.py
@@ -0,0 +1,1305 @@
+#!/usr/bin/env python3
+"""
+Project Detach/Reattach Module
+==============================
+
+Manages the separation of AutoForge files from project directories,
+allowing Claude Code to run without restrictions on completed projects.
+
+Features:
+- Detach: Moves AutoForge files to .autoforge-backup/
+- Reattach: Restores files from backup
+- Status: Checks detach state and backup info
+"""
+
+import argparse
+import hashlib
+import json
+import logging
+import os
+import shutil
+import sys
+import tempfile
+import tomllib
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import TypedDict
+
+from registry import get_project_path, list_registered_projects
+
+# Module logger
+logger = logging.getLogger(__name__)
+
+# Backup directory name
+BACKUP_DIR = ".autoforge-backup"
+PRE_REATTACH_BACKUP_DIR = ".pre-reattach-backup"
+MANIFEST_FILE = "manifest.json"
+DETACH_LOCK = ".autoforge-detach.lock"
+
+# Version for manifest format
+MANIFEST_VERSION = 1
+
+# Lock file timeout in seconds (5 minutes)
+LOCK_TIMEOUT_SECONDS = 300
+
+
+def get_autoforge_version() -> str:
+    """Get autoforge version from pyproject.toml, with fallback."""
+    try:
+        pyproject_path = Path(__file__).parent / "pyproject.toml"
+        if pyproject_path.exists():
+            with open(pyproject_path, "rb") as f:
+                data = tomllib.load(f)
+            version = data.get("project", {}).get("version", "1.0.0")
+            return str(version) if version is not None else "1.0.0"
+    except Exception as e:
+        logger.debug("Failed to read version from pyproject.toml: %s", e)
+    return "1.0.0"  # Fallback
+
+
+# AutoForge file patterns to detect and move
+# Directories (will be moved recursively)
+AUTOFORGE_DIRECTORIES = {
+    ".autoforge",
+    ".autocoder",  # Legacy fallback
+    "prompts",
+    ".playwright-mcp",
+}
+
+# Files with exact names
+AUTOFORGE_FILES = {
+    "features.db",
+    "features.db-shm",  # SQLite shared memory file
+    "features.db-wal",  # SQLite write-ahead log
+    "assistant.db",
+    "assistant.db-shm",  # SQLite shared memory file
+    "assistant.db-wal",  # SQLite write-ahead log
+    "CLAUDE.md",
+    ".claude_settings.json",
+    ".claude_assistant_settings.json",
+    ".agent.lock",
+    "claude-progress.txt",
+}
+
+# Glob patterns for generated files (searched in AUTOFORGE_DIRECTORIES only)
+AUTOFORGE_PATTERNS = [
+    "test-*.json",
+    "test-*.py",
+    "test-*.html",
+    "test-*.sql",  # SQL test files
+    "test-*.php",  # PHP test files
+    "create-*-test*.php",  # Test helper scripts (e.g., create-xss-test.php)
+    "rollback-*.json",  # Rollback test data
+    "generate-*.py",
+    "mark_feature*.py",
+    ".claude_settings.expand.*.json",
+]
+
+# Patterns for agent-generated files at ROOT level
+# More specific patterns to avoid false positives with user files like test-myfeature.py
+AUTOFORGE_ROOT_PATTERNS = [
+    "test-feature*.json",  # Feature test data
+    "test-feature*.py",  # Feature test scripts
+    "test-feature*.html",  # Feature test pages
+    "test-feature*.sql",  # Feature test SQL
+    "test-feature*.php",  # Feature test PHP
+    "generate-*.py",  # Generator scripts
+    "mark_feature*.py",  # Feature marking scripts
+    "rollback-*.json",  # Rollback data
+    "create-*-test*.php",  # Test helper scripts
+]
+
+
+class FileEntry(TypedDict):
+    """Type for manifest file entry."""
+    path: str
+    type: str  # "file", "directory", or "symlink"
+    size: int
+    checksum: str | None  # SHA-256 (legacy MD5 supported) for files, None for directories
+    file_count: int | None  # Number of files for directories
+
+
+class Manifest(TypedDict):
+    """Type for manifest.json structure."""
+    version: int
+    detached_at: str
+    project_name: str
+    autocoder_version: str
+    files: list[FileEntry]
+    total_size_bytes: int
+    file_count: int
+
+
+def compute_file_checksum(file_path: Path, algorithm: str = "sha256") -> str:
+    """Compute checksum for a file using specified algorithm.
+
+    Args:
+        file_path: Path to the file
+        algorithm: Hash algorithm to use ("sha256" or "md5")
+
+    Returns:
+        Hex digest of the file checksum
+    """
+    if algorithm == "md5":
+        hasher = hashlib.md5(usedforsecurity=False)
+    else:
+        hasher = hashlib.sha256()
+    with open(file_path, "rb") as f:
+        for chunk in iter(lambda: f.read(8192), b""):
+            hasher.update(chunk)
+    return hasher.hexdigest()
+
+
+def verify_checksum(file_path: Path, expected: str) -> bool:
+    """Verify file checksum with algorithm auto-detection (SHA-256 vs MD5).
+
+    Detects algorithm by checksum length: MD5=32 hex chars, SHA-256=64 hex chars.
+    This provides backward compatibility with older backups using MD5.
+
+    Args:
+        file_path: Path to the file to verify
+        expected: Expected checksum value
+
+    Returns:
+        True if checksum matches, False otherwise
+    """
+    # Detect algorithm by checksum length: MD5=32 hex chars, SHA-256=64 hex chars
+    if len(expected) == 32:
+        actual = compute_file_checksum(file_path, algorithm="md5")
+    else:
+        actual = compute_file_checksum(file_path, algorithm="sha256")
+    return actual == expected
+
+
+def get_directory_info(dir_path: Path) -> tuple[int, int]:
+    """Get total size and file count for a directory."""
+    total_size = 0
+    file_count = 0
+    for item in dir_path.rglob("*"):
+        if item.is_file():
+            total_size += item.stat().st_size
+            file_count += 1
+    return total_size, file_count
+
+
+def get_autoforge_files(project_dir: Path, include_artifacts: bool = True) -> list[Path]:
+    """
+    Detect all AutoForge files in a project directory.
+
+    Args:
+        project_dir: Path to the project directory
+        include_artifacts: Whether to include .playwright-mcp and other artifacts
+
+    Returns:
+        List of Path objects for AutoForge files/directories
+    """
+    files = []
+
+    # Check directories
+    for dir_name in AUTOFORGE_DIRECTORIES:
+        if not include_artifacts and dir_name == ".playwright-mcp":
+            continue
+        dir_path = project_dir / dir_name
+        if dir_path.exists():
+            files.append(dir_path)
+
+    # Check exact files
+    for file_name in AUTOFORGE_FILES:
+        file_path = project_dir / file_name
+        if file_path.exists():
+            files.append(file_path)
+
+    # Check glob patterns ONLY in AutoForge-owned directories
+    # to avoid accidentally moving user files like test-myfeature.py
+    for dir_name in AUTOFORGE_DIRECTORIES:
+        dir_path = project_dir / dir_name
+        if dir_path.exists() and dir_path.is_dir():
+            for pattern in AUTOFORGE_PATTERNS:
+                for match in dir_path.rglob(pattern):
+                    if match.exists() and match not in files:
+                        files.append(match)
+
+    # Check ROOT-safe patterns at project root level
+    # These are more specific patterns to avoid false positives
+    for pattern in AUTOFORGE_ROOT_PATTERNS:
+        for match in project_dir.glob(pattern):  # glob, not rglob - root level only
+            if match.exists() and match not in files:
+                files.append(match)
+
+    return sorted(files, key=lambda p: p.name)
+
+
+def is_project_detached(project_dir: Path) -> bool:
+    """Check if a project is currently detached."""
+    manifest_path = project_dir / BACKUP_DIR / MANIFEST_FILE
+    return manifest_path.exists()
+
+
+def get_project_detach_state(project_dir: Path, include_artifacts: bool = True) -> tuple[str, list[Path]]:
+    """
+    Determine the actual detach state of a project.
+
+    This function detects inconsistent states where both manifest AND files exist,
+    which can happen after a partial reattach operation.
+
+    Args:
+        project_dir: Path to the project directory
+        include_artifacts: Whether to include .playwright-mcp and other artifacts
+
+    Returns:
+        Tuple of (state, files) where state is one of:
+        - "detached": Manifest exists, no AutoForge files at root
+        - "attached": No manifest, files present at root
+        - "inconsistent": Both manifest and files exist (needs cleanup)
+        - "clean": No manifest, no AutoForge files
+    """
+    manifest_exists = is_project_detached(project_dir)
+    files = get_autoforge_files(project_dir, include_artifacts=include_artifacts)
+
+    if manifest_exists and files:
+        return "inconsistent", files
+    elif manifest_exists and not files:
+        return "detached", []
+    elif not manifest_exists and files:
+        return "attached", files
+    else:
+        return "clean", []
+
+
+def has_backup(project_dir: Path) -> bool:
+    """Check if a backup exists for a project."""
+    return is_project_detached(project_dir)
+
+
+def get_backup_info(project_dir: Path) -> Manifest | None:
+    """
+    Get manifest info from backup if it exists.
+
+    Returns:
+        Manifest dict or None if no backup exists
+    """
+    manifest_path = project_dir / BACKUP_DIR / MANIFEST_FILE
+    if not manifest_path.exists():
+        return None
+
+    try:
+        with open(manifest_path, "r", encoding="utf-8") as f:
+            data: Manifest = json.load(f)
+        return data
+    except (json.JSONDecodeError, OSError) as e:
+        logger.warning("Failed to read manifest: %s", e)
+        return None
+
+
+def acquire_detach_lock(project_dir: Path) -> bool:
+    """
+    Acquire lock for detach operations using atomic file creation.
+
+    Uses O_CREAT|O_EXCL for atomic lock creation to prevent TOCTOU race conditions.
+    Writes PID and timestamp to lock file for stale lock detection.
+
+    Returns:
+        True if lock acquired, False if already locked
+    """
+    lock_file = project_dir / DETACH_LOCK
+
+    def try_atomic_create() -> bool:
+        """Attempt atomic lock file creation. Returns True if successful."""
+        try:
+            fd = os.open(str(lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
+            try:
+                lock_data = {
+                    "pid": os.getpid(),
+                    "timestamp": datetime.now(timezone.utc).timestamp(),
+                }
+                os.write(fd, json.dumps(lock_data).encode("utf-8"))
+            finally:
+                os.close(fd)
+            return True
+        except FileExistsError:
+            return False
+        except OSError as e:
+            logger.error("Failed to create lock file: %s", e)
+            return False
+
+    # First attempt
+    if try_atomic_create():
+        return True
+
+    # Lock exists - check if stale/corrupted
+    try:
+        lock_data = json.loads(lock_file.read_text(encoding="utf-8"))
+        lock_pid = lock_data.get("pid")
+        lock_time = lock_data.get("timestamp", 0)
+
+        if lock_pid is not None:
+            try:
+                os.kill(lock_pid, 0)  # Check if process exists
+                # Process exists, check timeout
+                elapsed = datetime.now(timezone.utc).timestamp() - lock_time
+                if elapsed < LOCK_TIMEOUT_SECONDS:
+                    return False  # Valid lock held by another process
+                logger.warning("Removing stale lock (timeout): pid=%s", lock_pid)
+            except OSError:
+                # Process doesn't exist - stale lock
+                logger.warning("Removing stale lock (dead process): pid=%s", lock_pid)
+    except (json.JSONDecodeError, OSError, KeyError):
+        logger.warning("Removing corrupted lock file")
+
+    # Remove stale/corrupted lock and retry once
+    try:
+        lock_file.unlink()
+    except OSError:
+        pass
+
+    return try_atomic_create()
+
+
+def release_detach_lock(project_dir: Path) -> None:
+    """Release detach operation lock."""
+    lock_file = project_dir / DETACH_LOCK
+    lock_file.unlink(missing_ok=True)
+
+
+def create_backup(
+    project_dir: Path,
+    project_name: str,
+    files: list[Path],
+    dry_run: bool = False
+) -> Manifest:
+    """
+    Create backup of AutoForge files.
+
+    Uses copy-then-delete approach to prevent data loss on partial failures.
+
+    Args:
+        project_dir: Path to project directory
+        project_name: Name of the project
+        files: List of files/directories to backup
+        dry_run: If True, only simulate the operation
+
+    Returns:
+        Manifest describing the backup
+    """
+    backup_dir = project_dir / BACKUP_DIR
+
+    # Build manifest
+    manifest_files: list[FileEntry] = []
+    total_size = 0
+    total_file_count = 0
+
+    for file_path in files:
+        relative_path = file_path.relative_to(project_dir)
+
+        if file_path.is_symlink():
+            # Handle symlinks before is_dir() which follows symlinks
+            manifest_files.append({
+                "path": str(relative_path),
+                "type": "symlink",
+                "size": 0,
+                "checksum": None,
+                "file_count": None,
+            })
+            total_file_count += 1
+        elif file_path.is_dir():
+            size, count = get_directory_info(file_path)
+            manifest_files.append({
+                "path": str(relative_path),
+                "type": "directory",
+                "size": size,
+                "checksum": None,
+                "file_count": count,
+            })
+            total_size += size
+            total_file_count += count
+        else:
+            size = file_path.stat().st_size
+            checksum = compute_file_checksum(file_path) if not dry_run else "dry-run"
+            manifest_files.append({
+                "path": str(relative_path),
+                "type": "file",
+                "size": size,
+                "checksum": checksum,
+                "file_count": None,
+            })
+            total_size += size
+            total_file_count += 1
+
+    manifest: Manifest = {
+        "version": MANIFEST_VERSION,
+        "detached_at": datetime.now(timezone.utc).isoformat(),
+        "project_name": project_name,
+        "autocoder_version": get_autoforge_version(),
+        "files": manifest_files,
+        "total_size_bytes": total_size,
+        "file_count": total_file_count,
+    }
+
+    if dry_run:
+        return manifest
+
+    # Create backup directory
+    backup_dir.mkdir(parents=True, exist_ok=True)
+    phase = 1  # Track phase: 1=Copy, 2=Manifest, 3=Delete originals
+
+    try:
+        # Phase 1: Copy files to backup (preserves originals on failure)
+        for file_path in files:
+            relative_path = file_path.relative_to(project_dir)
+            dest_path = backup_dir / relative_path
+
+            # Ensure parent directory exists
+            dest_path.parent.mkdir(parents=True, exist_ok=True)
+
+            # Copy file/directory (handle symlinks explicitly)
+            if file_path.is_symlink():
+                # Preserve symlinks as symlinks
+                link_target = os.readlink(file_path)
+                dest_path.symlink_to(link_target)
+            elif file_path.is_dir():
+                shutil.copytree(file_path, dest_path, symlinks=True)
+            else:
+                shutil.copy2(file_path, dest_path)
+
+            logger.debug("Copied %s to backup", relative_path)
+
+        # Phase 2: Write manifest (before deleting originals)
+        phase = 2
+        manifest_path = backup_dir / MANIFEST_FILE
+        with open(manifest_path, "w", encoding="utf-8") as f:
+            json.dump(manifest, f, indent=2)
+
+        # Phase 3: Delete originals (only after successful copy + manifest)
+        phase = 3
+        logger.debug("Phase 3: Deleting %d original files", len(files))
+        for file_path in files:
+            if file_path.is_dir() and not file_path.is_symlink():
+                shutil.rmtree(file_path)
+            else:
+                file_path.unlink()
+            logger.debug("Removed original: %s", file_path.relative_to(project_dir))
+
+    except Exception as e:
+        # Cleanup partial backup on failure - but only for Phase 1/2
+        # Phase 3 failure means backup is valid, keep it for recovery
+        if phase < 3:
+            logger.error("Backup failed in phase %d: %s - cleaning up partial backup", phase, e)
+            if backup_dir.exists():
+                shutil.rmtree(backup_dir)
+        else:
+            logger.error("Delete originals failed in phase 3: %s - backup preserved for recovery", e)
+        raise
+
+    return manifest
+
+
+def restore_backup(project_dir: Path, verify_checksums: bool = False) -> tuple[bool, int, list[str]]:
+    """
+    Restore files from backup.
+
+    Uses copy-then-delete approach to prevent data loss on partial failures.
+    Detects and backs up conflicting user files before restore.
+
+    Args:
+        project_dir: Path to project directory
+        verify_checksums: If True, verify file checksums after restore
+
+    Returns:
+        Tuple of (success, files_restored, conflicts_backed_up)
+    """
+    backup_dir = project_dir / BACKUP_DIR
+    manifest_path = backup_dir / MANIFEST_FILE
+    project_dir_resolved = project_dir.resolve()
+
+    if not manifest_path.exists():
+        logger.error("No backup manifest found")
+        return False, 0, []
+
+    # Read manifest
+    try:
+        with open(manifest_path, "r", encoding="utf-8") as f:
+            manifest: Manifest = json.load(f)
+    except (json.JSONDecodeError, OSError) as e:
+        logger.error("Failed to read manifest: %s", e)
+        return False, 0, []
+
+    # Validate manifest structure
+    required_keys = {"version", "files", "detached_at"}
+    if not required_keys.issubset(manifest.keys()):
+        logger.error("Invalid manifest structure: missing required keys")
+        return False, 0, []
+
+    # Check manifest version compatibility
+    manifest_version = manifest.get("version", 1)
+    if manifest_version > MANIFEST_VERSION:
+        logger.error(
+            "Manifest version %d not supported (max: %d)",
+            manifest_version, MANIFEST_VERSION
+        )
+        return False, 0, []
+
+    # Detect and backup user files that would be overwritten
+    conflicts = detect_conflicts(project_dir, manifest)
+    if conflicts:
+        backup_conflicts(project_dir, conflicts)
+        logger.info("Backed up %d user files to %s", len(conflicts), PRE_REATTACH_BACKUP_DIR)
+
+    # Restore files
+    files_restored = 0
+    restored_entries: list[FileEntry] = []
+
+    for entry in manifest["files"]:
+        src_path = backup_dir / entry["path"]
+        dest_path = project_dir / entry["path"]
+
+        # SECURITY: Validate path to prevent path traversal attacks
+        try:
+            dest_resolved = dest_path.resolve()
+            # Ensure the resolved path is within the project directory
+            dest_resolved.relative_to(project_dir_resolved)
+        except ValueError:
+            logger.error("Path traversal detected: %s", entry["path"])
+            return False, 0, []
+
+        if not src_path.exists():
+            logger.warning("Backup file missing: %s", entry["path"])
+            continue
+
+        # Ensure parent directory exists
+        dest_path.parent.mkdir(parents=True, exist_ok=True)
+
+        # Atomic copy-then-replace: copy to temp, then atomically replace destination
+        temp_path: Path | None = None
+        try:
+            if src_path.is_symlink():
+                # Symlinks can be created atomically - remove existing first
+                if dest_path.exists() or dest_path.is_symlink():
+                    if dest_path.is_dir() and not dest_path.is_symlink():
+                        shutil.rmtree(dest_path)
+                    else:
+                        dest_path.unlink()
+                link_target = os.readlink(src_path)
+                dest_path.symlink_to(link_target)
+            elif src_path.is_dir():
+                # Directories: copy to temp location, then replace
+                temp_fd, temp_path_str = tempfile.mkstemp(
+                    dir=dest_path.parent,
+                    prefix=f".{dest_path.name}.",
+                    suffix=".tmp"
+                )
+                temp_path = Path(temp_path_str)
+                os.close(temp_fd)
+                # mkstemp creates a file, but we need a directory
+                temp_path.unlink()
+                shutil.copytree(src_path, temp_path, symlinks=True)
+
+                # Remove existing destination if needed
+                if dest_path.exists():
+                    if dest_path.is_dir() and not dest_path.is_symlink():
+                        shutil.rmtree(dest_path)
+                    else:
+                        dest_path.unlink()
+
+                os.replace(temp_path, dest_path)
+                temp_path = None  # Successfully moved, no cleanup needed
+            else:
+                # Files: copy to temp location, then atomically replace
+                temp_fd, temp_path_str = tempfile.mkstemp(
+                    dir=dest_path.parent,
+                    prefix=f".{dest_path.name}.",
+                    suffix=".tmp"
+                )
+                temp_path = Path(temp_path_str)
+                os.close(temp_fd)
+                shutil.copy2(src_path, temp_path)
+
+                # Remove existing destination if needed (handles dir where file should be)
+                if dest_path.exists():
+                    if dest_path.is_dir() and not dest_path.is_symlink():
+                        shutil.rmtree(dest_path)
+                    else:
+                        dest_path.unlink()
+
+                # Atomic replace
+                os.replace(temp_path, dest_path)
+                temp_path = None  # Successfully moved, no cleanup needed
+
+        except OSError as e:
+            logger.error("Failed to restore %s: %s", entry["path"], e)
+            # Clean up temp file/directory on failure
+            if temp_path and temp_path.exists():
+                try:
+                    if temp_path.is_dir():
+                        shutil.rmtree(temp_path)
+                    else:
+                        temp_path.unlink()
+                except OSError:
+                    pass
+            return False, files_restored, conflicts
+
+        # Verify checksum if requested and available
+        entry_checksum = entry.get("checksum")
+        if verify_checksums and entry_checksum and entry["type"] == "file":
+            if not verify_checksum(dest_path, entry_checksum):
+                logger.error(
+                    "Checksum mismatch for %s: expected %s",
+                    entry["path"], entry["checksum"]
+                )
+                return False, files_restored, conflicts
+
+        if entry["type"] == "directory":
+            files_restored += entry.get("file_count") or 0
+        else:
+            files_restored += 1
+
+        restored_entries.append(entry)
+        logger.debug("Restored %s", entry["path"])
+
+    # Only remove backup directory if ALL files were restored
+    expected_count = len(manifest["files"])
+    restored_count = len(restored_entries)
+
+    if restored_count == expected_count:
+        shutil.rmtree(backup_dir)
+        logger.info("Backup directory removed after successful restore")
+        return True, files_restored, conflicts
+    else:
+        # Partial restore - delete manifest to allow re-detach, but keep backup files
+        manifest_path = backup_dir / MANIFEST_FILE
+        manifest_path.unlink(missing_ok=True)
+        logger.warning(
+            "Partial restore: %d/%d files - manifest removed to allow re-detach, backup files preserved",
+            restored_count, expected_count
+        )
+        return False, files_restored, conflicts
+
+
+def update_gitignore(project_dir: Path) -> None:
+    """Add backup directories to .gitignore if not already present."""
+    gitignore_path = project_dir / ".gitignore"
+
+    patterns = [
+        (f"{BACKUP_DIR}/", "AutoForge backup (for reattach)"),
+        (f"{PRE_REATTACH_BACKUP_DIR}/", "User files backup (for detach)"),
+    ]
+
+    if gitignore_path.exists():
+        content = gitignore_path.read_text(encoding="utf-8")
+        lines = content.splitlines()
+
+        for pattern, comment in patterns:
+            if not any(line.strip() == pattern for line in lines):
+                with open(gitignore_path, "a", encoding="utf-8") as f:
+                    f.write(f"\n# {comment}\n{pattern}\n")
+                logger.info("Added %s to .gitignore", pattern)
+    else:
+        entries = "\n".join(f"# {comment}\n{pattern}" for pattern, comment in patterns)
+        gitignore_path.write_text(entries + "\n", encoding="utf-8")
+        logger.info("Created .gitignore with backup entries")
+
+
+def detect_conflicts(project_dir: Path, manifest: Manifest) -> list[str]:
+    """Return list of relative paths that exist in both backup and project.
+
+    These are files the user created/modified after detaching that would
+    be overwritten by restoring autoforge files.
+
+    Args:
+        project_dir: Path to the project directory
+        manifest: The backup manifest containing file entries
+
+    Returns:
+        List of relative path strings for conflicting files
+    """
+    conflicts = []
+    project_dir_resolved = project_dir.resolve()
+    for entry in manifest["files"]:
+        dest = project_dir / entry["path"]
+        # SECURITY: Validate path to prevent path traversal attacks
+        try:
+            dest.resolve().relative_to(project_dir_resolved)
+        except ValueError:
+            logger.error("Path traversal detected in manifest: %s", entry["path"])
+            continue  # Skip malicious path
+        if dest.exists():
+            conflicts.append(entry["path"])
+    return conflicts
+
+
+def backup_conflicts(project_dir: Path, conflicts: list[str]) -> Path:
+    """Backup conflicting user files to .pre-reattach-backup/ before restore.
+
+    If backup dir already exists, merges new conflicts (doesn't overwrite).
+
+    Args:
+        project_dir: Path to the project directory
+        conflicts: List of relative paths to backup
+
+    Returns:
+        Path to the backup directory
+    """
+    backup_dir = project_dir / PRE_REATTACH_BACKUP_DIR
+    backup_dir.mkdir(parents=True, exist_ok=True)
+    backup_dir_resolved = backup_dir.resolve()
+
+    for rel_path in conflicts:
+        src = project_dir / rel_path
+        dest = backup_dir / rel_path
+
+        # SECURITY: Validate path to prevent path traversal attacks
+        try:
+            dest.resolve().relative_to(backup_dir_resolved)
+        except ValueError:
+            logger.error("Path traversal detected in conflicts: %s", rel_path)
+            continue  # Skip malicious path
+
+        # Don't overwrite existing backups (merge mode)
+        if dest.exists():
+            logger.debug("Skipping existing backup: %s", rel_path)
+            continue
+
+        dest.parent.mkdir(parents=True, exist_ok=True)
+
+        if src.is_dir():
+            shutil.copytree(src, dest, symlinks=True)
+        else:
+            shutil.copy2(src, dest)
+
+        logger.debug("Backed up user file: %s", rel_path)
+
+    return backup_dir
+
+
+def restore_pre_reattach_backup(project_dir: Path) -> int:
+    """Restore user files from .pre-reattach-backup/ after detaching.
+
+    Includes path traversal protection.
+
+    Args:
+        project_dir: Path to the project directory
+
+    Returns:
+        Number of files restored
+    """
+    backup_dir = project_dir / PRE_REATTACH_BACKUP_DIR
+    if not backup_dir.exists():
+        return 0
+
+    project_dir_resolved = project_dir.resolve()
+    files_restored = 0
+    files_failed = 0
+
+    for item in backup_dir.rglob("*"):
+        if item.is_file():
+            rel_path = item.relative_to(backup_dir)
+            dest = project_dir / rel_path
+
+            # SECURITY: Path traversal protection
+            try:
+                dest.resolve().relative_to(project_dir_resolved)
+            except ValueError:
+                logger.error("Path traversal detected in pre-reattach backup: %s", rel_path)
+                continue  # Skip malicious path
+
+            try:
+                dest.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy2(item, dest)
+                files_restored += 1
+                logger.debug("Restored user file: %s", rel_path)
+            except OSError as e:
+                logger.error("Failed to restore user file %s: %s", rel_path, e)
+                files_failed += 1
+
+    # Only clean up backup directory if all files were restored successfully
+    if files_failed == 0:
+        shutil.rmtree(backup_dir)
+        logger.info("Removed %s after restoring %d files", PRE_REATTACH_BACKUP_DIR, files_restored)
+    else:
+        logger.warning("Kept %s - %d files failed to restore", PRE_REATTACH_BACKUP_DIR, files_failed)
+
+    return files_restored
+
+
+def _checkpoint_databases(project_dir: Path) -> None:
+    """Checkpoint SQLite databases to merge WAL files into main database.
+
+    This ensures -wal and -shm files are empty/minimal before backup,
+    preventing them from being recreated during the detach operation.
+    """
+    import sqlite3
+
+    for db_name in ["features.db", "assistant.db"]:
+        db_file = project_dir / db_name
+        if db_file.exists():
+            try:
+                conn = sqlite3.connect(str(db_file))
+                conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
+                conn.close()
+                logger.debug(f"Checkpointed {db_name}")
+            except Exception as e:
+                logger.warning(f"Failed to checkpoint {db_name}: {e}")
+
+
+def detach_project(
+    name_or_path: str,
+    force: bool = False,
+    include_artifacts: bool = True,
+    dry_run: bool = False
+) -> tuple[bool, str, Manifest | None, int]:
+    """
+    Detach a project by moving AutoForge files to backup.
+
+    Args:
+        name_or_path: Project name (from registry) or absolute path
+        force: Skip confirmations
+        include_artifacts: Include .playwright-mcp and other artifacts
+        dry_run: Only simulate, don't actually move files
+
+    Returns:
+        Tuple of (success, message, manifest, user_files_restored)
+    """
+    # Resolve project path
+    project_dir = get_project_path(name_or_path)
+    if project_dir is None:
+        # Try as path
+        project_dir = Path(name_or_path)
+        if not project_dir.exists():
+            return False, f"Project '{name_or_path}' not found in registry and path doesn't exist", None, 0
+
+    project_dir = Path(project_dir).resolve()
+    project_name = name_or_path
+
+    # Check project state
+    state, existing_files = get_project_detach_state(project_dir, include_artifacts)
+
+    if state == "detached":
+        return False, "Project is already detached. Use --reattach to restore.", None, 0
+    elif state == "inconsistent":
+        # Files exist but so does manifest - likely partial reattach
+        # Clean up old backup and proceed with fresh detach
+        if not force:
+            return False, (
+                "Inconsistent state detected: backup manifest exists but AutoForge files are also present. "
+                "This can happen after a partial reattach. Use --force to clean up and detach."
+            ), None, 0
+        # Force mode: remove old backup and proceed
+        backup_dir = project_dir / BACKUP_DIR
+        if not dry_run:
+            shutil.rmtree(backup_dir)
+            logger.info("Removed stale backup directory due to --force")
+    elif state == "clean":
+        return False, "No AutoForge files found in project.", None, 0
+    # state == "attached" -> proceed normally with existing_files
+
+    # Clean up orphaned backup directory (exists without manifest)
+    # This can happen after partial reattach removes manifest but keeps backup files
+    backup_dir = project_dir / BACKUP_DIR
+    if backup_dir.exists() and not (backup_dir / MANIFEST_FILE).exists():
+        if not dry_run:
+            shutil.rmtree(backup_dir)
+            logger.info("Removed orphaned backup directory (no manifest)")
+
+    # Check for agent lock
+    agent_lock = project_dir / ".agent.lock"
+    if agent_lock.exists() and not force:
+        return False, "Agent is currently running. Stop the agent first or use --force.", None, 0
+
+    # Acquire detach lock
+    if not dry_run and not acquire_detach_lock(project_dir):
+        return False, "Another detach operation is in progress.", None, 0
+
+    try:
+        # Use files from state detection if available, otherwise get them fresh
+        files = existing_files if existing_files else get_autoforge_files(project_dir, include_artifacts)
+        if not files:
+            return False, "No AutoForge files found in project.", None, 0
+
+        # Checkpoint databases to merge WAL files before backup
+        if not dry_run:
+            _checkpoint_databases(project_dir)
+
+        # Create backup
+        manifest = create_backup(project_dir, project_name, files, dry_run)
+
+        # Update .gitignore
+        if not dry_run:
+            update_gitignore(project_dir)
+
+        # Restore user files from pre-reattach backup if exists
+        user_files_restored = 0
+        if not dry_run:
+            user_files_restored = restore_pre_reattach_backup(project_dir)
+
+        action = "Would move" if dry_run else "Moved"
+        message = f"{action} {manifest['file_count']} files ({manifest['total_size_bytes'] / 1024 / 1024:.1f} MB) to backup"
+        if user_files_restored > 0:
+            message += f", restored {user_files_restored} user files"
+
+        return True, message, manifest, user_files_restored
+
+    finally:
+        if not dry_run:
+            release_detach_lock(project_dir)
+
+
+def _cleanup_orphaned_db_files(project_dir: Path, manifest: Manifest) -> list[str]:
+    """Remove database files that were recreated after detach.
+
+    When the UI/API accesses a detached project, it may recreate empty
+    database files. This function detects and removes them before restore.
+
+    Heuristic: If root file is smaller than backup file, it was recreated empty.
+
+    Args:
+        project_dir: Path to the project directory
+        manifest: Backup manifest containing original file info
+
+    Returns:
+        List of files that were cleaned up
+    """
+    cleaned = []
+
+    # Build map of backup database files with their sizes
+    backup_db_files = {}
+    for entry in manifest.get("files", []):
+        path = entry.get("path", "")
+        if path in ("features.db", "assistant.db"):
+            backup_db_files[path] = entry.get("size", 0)
+
+    for db_name in ["features.db", "assistant.db"]:
+        root_file = project_dir / db_name
+
+        # If root file exists but backup also has it, check if recreated
+        if root_file.exists() and db_name in backup_db_files:
+            root_size = root_file.stat().st_size
+            backup_size = backup_db_files[db_name]
+
+            # If root is much smaller than backup, it was likely recreated empty
+            # Empty SQLite DB is typically 4-8KB, real DB with features is much larger
+            if backup_size > 0 and root_size < backup_size:
+                try:
+                    root_file.unlink()
+                    cleaned.append(db_name)
+                    logger.info(f"Removed recreated {db_name} ({root_size}B < {backup_size}B backup)")
+                except OSError as e:
+                    logger.warning(f"Failed to remove orphaned {db_name}: {e}")
+
+        # Always clean WAL/SHM files at root - they should be in backup if needed
+        for ext in ["-shm", "-wal"]:
+            wal_file = project_dir / f"{db_name}{ext}"
+            if wal_file.exists():
+                try:
+                    wal_file.unlink()
+                    cleaned.append(f"{db_name}{ext}")
+                    logger.debug(f"Removed orphaned {db_name}{ext}")
+                except OSError as e:
+                    logger.warning(f"Failed to remove {db_name}{ext}: {e}")
+
+    return cleaned
+
+
+def reattach_project(name_or_path: str) -> tuple[bool, str, int, list[str]]:
+    """
+    Reattach a project by restoring AutoForge files from backup.
+
+    Args:
+        name_or_path: Project name (from registry) or absolute path
+
+    Returns:
+        Tuple of (success, message, files_restored, conflicts_backed_up)
+    """
+    # Resolve project path
+    project_dir = get_project_path(name_or_path)
+    if project_dir is None:
+        project_dir = Path(name_or_path)
+        if not project_dir.exists():
+            return False, f"Project '{name_or_path}' not found in registry and path doesn't exist", 0, []
+
+    project_dir = Path(project_dir).resolve()
+
+    # Check for agent lock - don't reattach while agent is running
+    agent_lock = project_dir / ".agent.lock"
+    if agent_lock.exists():
+        return False, "Agent is currently running. Stop the agent first.", 0, []
+
+    # Check if backup exists
+    if not has_backup(project_dir):
+        # Distinguish between "attached" (files at root) and "clean" (no files)
+        files_at_root = get_autoforge_files(project_dir)
+        if files_at_root:
+            return False, "Project is already attached. Nothing to restore.", 0, []
+        return False, "No backup found. Project is not detached.", 0, []
+    # Backup exists - proceed with restore (handles "detached" and "inconsistent" states)
+
+    # Acquire detach lock
+    if not acquire_detach_lock(project_dir):
+        return False, "Another detach operation is in progress.", 0, []
+
+    try:
+        # Read manifest for cleanup decision
+        manifest = get_backup_info(project_dir)
+
+        # Clean up orphaned database files that may have been recreated
+        # by the UI/API accessing the detached project
+        if manifest:
+            cleaned = _cleanup_orphaned_db_files(project_dir, manifest)
+            if cleaned:
+                logger.info(f"Cleaned up {len(cleaned)} orphaned files before restore: {cleaned}")
+
+        success, files_restored, conflicts = restore_backup(project_dir)
+        if success:
+            if conflicts:
+                return True, f"Restored {files_restored} files. {len(conflicts)} user files saved to {PRE_REATTACH_BACKUP_DIR}/", files_restored, conflicts
+            return True, f"Restored {files_restored} files from backup", files_restored, []
+        else:
+            return False, "Failed to restore backup", 0, []
+    finally:
+        release_detach_lock(project_dir)
+
+
+def get_detach_status(name_or_path: str) -> dict:
+    """
+    Get detach status for a project.
+
+    Args:
+        name_or_path: Project name (from registry) or absolute path
+
+    Returns:
+        Dict with status information including:
+        - state: "detached", "attached", "inconsistent", or "clean"
+        - is_detached: True if cleanly detached
+        - is_inconsistent: True if both manifest and files exist
+        - files_at_root: Number of AutoForge files at project root
+        - backup_exists: True if backup directory exists
+    """
+    project_dir = get_project_path(name_or_path)
+    if project_dir is None:
+        project_dir = Path(name_or_path)
+        if not project_dir.exists():
+            return {
+                "state": "error",
+                "is_detached": False,
+                "is_inconsistent": False,
+                "files_at_root": 0,
+                "backup_exists": False,
+                "error": f"Project '{name_or_path}' not found",
+            }
+
+    project_dir = Path(project_dir).resolve()
+    state, files = get_project_detach_state(project_dir)
+    backup_dir = project_dir / BACKUP_DIR
+    manifest = get_backup_info(project_dir) if backup_dir.exists() else None
+
+    return {
+        "state": state,
+        "is_detached": state == "detached",
+        "is_inconsistent": state == "inconsistent",
+        "files_at_root": len(files),
+        "backup_exists": backup_dir.exists(),
+        "backup_size": manifest["total_size_bytes"] if manifest else None,
+        "detached_at": manifest["detached_at"] if manifest else None,
+        "file_count": manifest["file_count"] if manifest else None,
+    }
+
+
+def list_projects_with_status() -> list[dict]:
+    """
+    List all registered projects with their detach status.
+
+    Returns:
+        List of project dicts with name, path, and is_detached
+    """
+    projects = list_registered_projects()
+    result = []
+
+    for name, info in projects.items():
+        project_dir = Path(info["path"])
+        if project_dir.exists():
+            result.append({
+                "name": name,
+                "path": info["path"],
+                "is_detached": is_project_detached(project_dir),
+            })
+
+    return sorted(result, key=lambda p: p["name"])
+
+
+def main() -> int:
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        description="Detach/Reattach AutoForge files from projects",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  python detach.py my-project              # Detach project
+  python detach.py --reattach my-project   # Reattach project
+  python detach.py --status my-project     # Check status
+  python detach.py --list                  # List all projects with status
+  python detach.py --dry-run my-project    # Preview detach operation
+        """,
+    )
+
+    parser.add_argument(
+        "project",
+        nargs="?",
+        help="Project name (from registry) or path",
+    )
+    parser.add_argument(
+        "--reattach",
+        action="store_true",
+        help="Reattach project (restore files from backup)",
+    )
+    parser.add_argument(
+        "--status",
+        action="store_true",
+        help="Show detach status for project",
+    )
+    parser.add_argument(
+        "--list",
+        action="store_true",
+        help="List all projects with detach status",
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Preview what would happen without making changes",
+    )
+    parser.add_argument(
+        "--force",
+        action="store_true",
+        help="Skip confirmations and safety checks",
+    )
+
+    # Mutually exclusive artifact options
+    artifact_group = parser.add_mutually_exclusive_group()
+    artifact_group.add_argument(
+        "--include-artifacts",
+        dest="include_artifacts",
+        action="store_true",
+        default=True,
+        help="Include artifacts (.playwright-mcp, screenshots) in backup (default)",
+    )
+    artifact_group.add_argument(
+        "--no-artifacts",
+        dest="include_artifacts",
+        action="store_false",
+        help="Exclude artifacts from backup",
+    )
+
+    parser.add_argument(
+        "-v", "--verbose",
+        action="store_true",
+        help="Enable verbose logging",
+    )
+
+    try:
+        args = parser.parse_args()
+    except SystemExit as e:
+        return e.code if e.code else 0
+
+    # Configure logging
+    logging.basicConfig(
+        level=logging.DEBUG if args.verbose else logging.INFO,
+        format="%(levelname)s: %(message)s",
+    )
+
+    try:
+        # Handle --list
+        if args.list:
+            projects = list_projects_with_status()
+            if not projects:
+                print("No projects registered.")
+                return 0
+
+            print("\nRegistered Projects:")
+            print("-" * 60)
+            for p in projects:
+                status_text = "DETACHED" if p["is_detached"] else "attached"
+                print(f"  [{status_text:8}] {p['name']}")
+                print(f"             {p['path']}")
+            print()
+            return 0
+
+        # All other commands require a project
+        if not args.project:
+            parser.print_help()
+            return 1
+
+        # Handle --status
+        if args.status:
+            status_info = get_detach_status(args.project)
+            if "error" in status_info:
+                print(f"Error: {status_info['error']}")
+                return 1
+
+            print(f"\nProject: {args.project}")
+            print("-" * 40)
+            state = status_info.get("state", "unknown")
+            if state == "detached":
+                print("  Status: DETACHED")
+                print(f"  Detached at: {status_info['detached_at']}")
+                backup_size = status_info['backup_size']
+                if backup_size is not None:
+                    print(f"  Backup size: {backup_size / 1024 / 1024:.1f} MB")
+                print(f"  Files in backup: {status_info['file_count']}")
+            elif state == "inconsistent":
+                print("  Status: INCONSISTENT (needs cleanup)")
+                print(f"  Files at root: {status_info['files_at_root']}")
+                print("  Backup manifest exists but AutoForge files also present.")
+                print("  Use --force to clean up and detach.")
+            elif state == "attached":
+                print("  Status: attached (AutoForge files present)")
+                print(f"  Files at root: {status_info['files_at_root']}")
+            else:
+                print("  Status: clean (no AutoForge files)")
+            print()
+            return 0
+
+        # Handle --reattach
+        if args.reattach:
+            print(f"\nReattaching project: {args.project}")
+            success, message, files_restored, conflicts = reattach_project(args.project)
+            print(f"  {message}")
+            if conflicts:
+                print(f"  ⚠ {len(conflicts)} user files backed up to {PRE_REATTACH_BACKUP_DIR}/")
+                for f in conflicts[:5]:
+                    print(f"    - {f}")
+                if len(conflicts) > 5:
+                    print(f"    ... and {len(conflicts) - 5} more")
+            return 0 if success else 1
+
+        # Handle detach (default)
+        if args.dry_run:
+            print(f"\nDRY RUN - Previewing detach for: {args.project}")
+        else:
+            print(f"\nDetaching project: {args.project}")
+
+        success, message, manifest, user_files_restored = detach_project(
+            args.project,
+            force=args.force,
+            include_artifacts=args.include_artifacts,
+            dry_run=args.dry_run,
+        )
+
+        print(f"  {message}")
+        if user_files_restored > 0:
+            print(f"  ✓ Restored {user_files_restored} user files from previous session")
+
+        if manifest and args.dry_run:
+            print("\n  Files to be moved:")
+            for entry in manifest["files"]:
+                size_str = f"{entry['size'] / 1024:.1f} KB"
+                if entry["type"] == "directory":
+                    print(f"    [DIR]  {entry['path']} ({entry['file_count']} files, {size_str})")
+                else:
+                    print(f"    [FILE] {entry['path']} ({size_str})")
+
+        return 0 if success else 1
+
+    except KeyboardInterrupt:
+        print("\n\nOperation cancelled.")
+        return 130  # Standard exit code for Ctrl+C
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/pyproject.toml b/pyproject.toml
index 698aa07a..9db862ff 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,7 @@
+[project]
+name = "autocoder"
+version = "1.0.0"
+
 [tool.ruff]
 line-length = 120
 target-version = "py311"
diff --git a/server/dependencies.py b/server/dependencies.py
new file mode 100644
index 00000000..40e691c8
--- /dev/null
+++ b/server/dependencies.py
@@ -0,0 +1,87 @@
+"""
+Server Dependencies
+===================
+
+FastAPI dependencies for common validation patterns.
+"""
+
+import sys
+from pathlib import Path
+
+from fastapi import HTTPException
+
+
+def _get_detach_module():
+    """Lazy import of detach module."""
+    root = Path(__file__).parent.parent
+    if str(root) not in sys.path:
+        sys.path.insert(0, str(root))
+    import detach
+    return detach
+
+
+def _get_registry_module():
+    """Lazy import of registry module."""
+    root = Path(__file__).parent.parent
+    if str(root) not in sys.path:
+        sys.path.insert(0, str(root))
+    from registry import get_project_path
+    return get_project_path
+
+
+def validate_project_not_detached(project_name: str) -> Path:
+    """Validate that a project is not detached.
+
+    This dependency ensures that database operations are not performed
+    on detached projects, which would cause empty database recreation.
+
+    Args:
+        project_name: The project name to validate
+
+    Returns:
+        Path to the project directory if accessible
+
+    Raises:
+        HTTPException 404: If project not found in registry
+        HTTPException 409: If project is detached (Conflict)
+    """
+    get_project_path = _get_registry_module()
+    detach = _get_detach_module()
+
+    project_dir = get_project_path(project_name)
+    if project_dir is None:
+        raise HTTPException(
+            status_code=404,
+            detail=f"Project '{project_name}' not found in registry"
+        )
+
+    project_dir = Path(project_dir)
+    if not project_dir.exists():
+        raise HTTPException(
+            status_code=404,
+            detail=f"Project directory not found: {project_dir}"
+        )
+
+    if detach.is_project_detached(project_dir):
+        raise HTTPException(
+            status_code=409,
+            detail=f"Project '{project_name}' is detached. Reattach to access features."
+        )
+
+    return project_dir
+
+
+def check_project_detached_for_background(project_dir: Path) -> bool:
+    """Check if a project is detached (for background services).
+
+    Unlike validate_project_not_detached, this doesn't raise exceptions.
+    It's meant for background services that should silently skip detached projects.
+
+    Args:
+        project_dir: Path to the project directory
+
+    Returns:
+        True if project is detached, False otherwise
+    """
+    detach = _get_detach_module()
+    return bool(detach.is_project_detached(project_dir))
diff --git a/server/routers/agent.py b/server/routers/agent.py
index 26605e4b..58e3038e 100644
--- a/server/routers/agent.py
+++ b/server/routers/agent.py
@@ -17,6 +17,16 @@
 from ..utils.validation import validate_project_name
 
 
+def _get_detach_module():
+    """Lazy import of detach module."""
+    import sys
+    root = Path(__file__).parent.parent.parent
+    if str(root) not in sys.path:
+        sys.path.insert(0, str(root))
+    import detach
+    return detach
+
+
 def _get_settings_defaults() -> tuple[bool, str, int, bool, int]:
     """Get defaults from global settings.
 
@@ -93,6 +103,16 @@ async def start_agent(
     request: AgentStartRequest = AgentStartRequest(),
 ):
     """Start the agent for a project."""
+    # Check detach status before starting agent
+    project_dir = _get_project_path(project_name)
+    if project_dir:
+        detach = _get_detach_module()
+        if detach.is_project_detached(project_dir):
+            raise HTTPException(
+                status_code=409,
+                detail=f"Project '{project_name}' is detached. Reattach to start agent."
+            )
+
     manager = get_project_manager(project_name)
 
     # Get defaults from global settings if not provided in request
diff --git a/server/routers/features.py b/server/routers/features.py
index 488c088c..e2cdf85e 100644
--- a/server/routers/features.py
+++ b/server/routers/features.py
@@ -12,6 +12,7 @@
 from fastapi import APIRouter, HTTPException
 
+from ..dependencies import validate_project_not_detached
 from ..schemas import (
     DependencyGraphEdge,
     DependencyGraphNode,
@@ -24,7 +25,6 @@
     FeatureResponse,
     FeatureUpdate,
 )
-from ..utils.project_helpers import get_project_path as _get_project_path
 from ..utils.validation import validate_project_name
 
 # Lazy imports to avoid circular dependencies
@@ -118,13 +118,9 @@ async def list_features(project_name: str):
     - done: passes=True
     """
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     from autoforge_paths import get_features_db_path
     db_file = get_features_db_path(project_dir)
@@ -169,13 +165,9 @@ async def list_features(project_name: str):
 async def create_feature(project_name: str, feature: FeatureCreate):
     """Create a new feature/test case manually."""
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     _, Feature = _get_db_classes()
@@ -235,13 +227,9 @@ async def create_features_bulk(project_name: str, bulk: FeatureBulkCreate):
         {"created": N, "features": [...]}
     """
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     if not bulk.features:
         return FeatureBulkCreateResponse(created=0, features=[])
@@ -314,13 +302,9 @@ async def get_dependency_graph(project_name: str):
     rendering with React Flow or similar graph libraries.
     """
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     from autoforge_paths import get_features_db_path
     db_file = get_features_db_path(project_dir)
@@ -380,13 +364,9 @@ async def get_dependency_graph(project_name: str):
 async def get_feature(project_name: str, feature_id: int):
     """Get details of a specific feature."""
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     from autoforge_paths import get_features_db_path
     db_file = get_features_db_path(project_dir)
@@ -420,13 +400,9 @@ async def update_feature(project_name: str, feature_id: int, update: FeatureUpda
     when the agent is stuck or implementing a feature incorrectly.
     """
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     _, Feature = _get_db_classes()
@@ -482,13 +458,9 @@ async def delete_feature(project_name: str, feature_id: int):
     dependencies that would permanently block features.
     """
     project_name = validate_project_name(project_name)
-    project_dir = _get_project_path(project_name)
-    if not project_dir:
-        raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry")
-
-    if not project_dir.exists():
-        raise HTTPException(status_code=404, detail="Project directory not found")
+    # Check detach status before accessing database
+    project_dir = validate_project_not_detached(project_name)
 
     _, Feature = _get_db_classes()
@@ -533,13 +505,9 @@ async def skip_feature(project_name: str, feature_id: int):
     so it will be processed last.
""" project_name = validate_project_name(project_name) - project_dir = _get_project_path(project_name) - - if not project_dir: - raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") - if not project_dir.exists(): - raise HTTPException(status_code=404, detail="Project directory not found") + # Check detach status before accessing database + project_dir = validate_project_not_detached(project_name) _, Feature = _get_db_classes() @@ -592,13 +560,8 @@ async def add_dependency(project_name: str, feature_id: int, dep_id: int): if feature_id == dep_id: raise HTTPException(status_code=400, detail="A feature cannot depend on itself") - project_dir = _get_project_path(project_name) - - if not project_dir: - raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") - - if not project_dir.exists(): - raise HTTPException(status_code=404, detail="Project directory not found") + # Check detach status before accessing database + project_dir = validate_project_not_detached(project_name) would_create_circular_dependency, MAX_DEPENDENCIES_PER_FEATURE = _get_dependency_resolver() _, Feature = _get_db_classes() @@ -644,13 +607,9 @@ async def add_dependency(project_name: str, feature_id: int, dep_id: int): async def remove_dependency(project_name: str, feature_id: int, dep_id: int): """Remove a dependency from a feature.""" project_name = validate_project_name(project_name) - project_dir = _get_project_path(project_name) - - if not project_dir: - raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") - if not project_dir.exists(): - raise HTTPException(status_code=404, detail="Project directory not found") + # Check detach status before accessing database + project_dir = validate_project_not_detached(project_name) _, Feature = _get_db_classes() @@ -683,13 +642,9 @@ async def set_dependencies(project_name: str, feature_id: int, update: Dependenc Validates: self-reference, existence of all dependencies, circular dependencies, max limit. 
""" project_name = validate_project_name(project_name) - project_dir = _get_project_path(project_name) - - if not project_dir: - raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found in registry") - if not project_dir.exists(): - raise HTTPException(status_code=404, detail="Project directory not found") + # Check detach status before accessing database + project_dir = validate_project_not_detached(project_name) dependency_ids = update.dependency_ids diff --git a/server/routers/projects.py b/server/routers/projects.py index 36f7ffdc..fc8a6a17 100644 --- a/server/routers/projects.py +++ b/server/routers/projects.py @@ -15,6 +15,8 @@ from fastapi import APIRouter, HTTPException from ..schemas import ( + DetachResponse, + DetachStatusResponse, ProjectCreate, ProjectDetail, ProjectPrompts, @@ -22,6 +24,7 @@ ProjectSettingsUpdate, ProjectStats, ProjectSummary, + ReattachResponse, ) # Lazy imports to avoid circular dependencies @@ -31,13 +34,14 @@ _scaffold_project_prompts: Callable[..., Any] | None = None _get_project_prompts_dir: Callable[..., Any] | None = None _count_passing_tests: Callable[..., Any] | None = None +_detach_module: Any = None def _init_imports(): """Lazy import of project-level modules.""" global _imports_initialized, _check_spec_exists global _scaffold_project_prompts, _get_project_prompts_dir - global _count_passing_tests + global _count_passing_tests, _detach_module if _imports_initialized: return @@ -47,6 +51,7 @@ def _init_imports(): if str(root) not in sys.path: sys.path.insert(0, str(root)) + import detach as detach_module from progress import count_passing_tests from prompts import get_project_prompts_dir, scaffold_project_prompts from start import check_spec_exists @@ -55,6 +60,7 @@ def _init_imports(): _scaffold_project_prompts = scaffold_project_prompts _get_project_prompts_dir = get_project_prompts_dir _count_passing_tests = count_passing_tests + _detach_module = detach_module _imports_initialized = True @@ -133,6 +139,7 @@ async def list_projects(): has_spec = _check_spec_exists(project_dir) stats = get_project_stats(project_dir) + is_detached = _detach_module.is_project_detached(project_dir) result.append(ProjectSummary( name=name, @@ -140,6 +147,7 @@ async def list_projects(): has_spec=has_spec, stats=stats, default_concurrency=info.get("default_concurrency", 3), + is_detached=is_detached, )) return result @@ -254,6 +262,7 @@ async def get_project(name: str): stats=stats, prompts_dir=str(prompts_dir), default_concurrency=get_project_concurrency(name), + is_detached=_detach_module.is_project_detached(project_dir), ) @@ -521,4 +530,150 @@ async def update_project_settings(name: str, settings: ProjectSettingsUpdate): stats=stats, prompts_dir=str(prompts_dir), default_concurrency=get_project_concurrency(name), + is_detached=_detach_module.is_project_detached(project_dir), + ) + + +# ============================================================================ +# Detach/Reattach Endpoints +# ============================================================================ + +@router.get("/{name}/detach-status", response_model=DetachStatusResponse) +def get_detach_status(name: str): + """Check if a project is detached and get backup info.""" + _init_imports() + (_, _, get_project_path, _, _, _, _) = _get_registry_functions() + + name = validate_project_name(name) + project_dir = get_project_path(name) + + if not project_dir: + raise HTTPException(status_code=404, detail=f"Project '{name}' not found") + + if not project_dir.exists(): + raise 
HTTPException(status_code=404, detail="Project directory not found") + + status = _detach_module.get_detach_status(name) + + return DetachStatusResponse( + is_detached=status["is_detached"], + backup_exists=status["backup_exists"], + backup_size=status.get("backup_size"), + detached_at=status.get("detached_at"), + file_count=status.get("file_count"), + ) + + +@router.post("/{name}/detach", response_model=DetachResponse) +def detach_project(name: str): + """ + Detach a project by moving Autocoder files to backup. + + This allows Claude Code to run without Autocoder restrictions. + Files can be restored later with reattach. + + Note: Using sync function because detach_project() performs blocking I/O. + FastAPI will run this in a threadpool automatically. + """ + _init_imports() + (_, _, get_project_path, _, _, _, _) = _get_registry_functions() + + name = validate_project_name(name) + project_dir = get_project_path(name) + + if not project_dir: + raise HTTPException(status_code=404, detail=f"Project '{name}' not found") + + if not project_dir.exists(): + raise HTTPException(status_code=404, detail="Project directory not found") + + # Note: Agent lock check is handled inside detach_project() to avoid TOCTOU race. + # The detach module will return an appropriate error message if agent is running. + + # Dispose cached database engines before detach to release file locks (Windows) + from api.database import dispose_engine as dispose_features_engine + from server.services.assistant_database import dispose_engine as dispose_assistant_engine + dispose_features_engine(project_dir) + dispose_assistant_engine(project_dir) + + assert _detach_module is not None + success, message, manifest, user_files_restored = _detach_module.detach_project( + name, + force=False, + include_artifacts=True, + dry_run=False, + ) + + if not success: + # Map common error messages to appropriate HTTP status codes + if "Agent is currently running" in message: + raise HTTPException(status_code=409, detail=message) + elif "already detached" in message: + raise HTTPException(status_code=409, detail=message) + elif "in progress" in message: + raise HTTPException(status_code=409, detail=message) + raise HTTPException(status_code=400, detail=message) + + return DetachResponse( + success=True, + files_moved=manifest["file_count"] if manifest else 0, + backup_size=manifest["total_size_bytes"] if manifest else 0, + backup_path=_detach_module.BACKUP_DIR, # Return relative path, not absolute + message=message, + user_files_restored=user_files_restored, + ) + + +@router.post("/{name}/reattach", response_model=ReattachResponse) +def reattach_project(name: str): + """ + Reattach a project by restoring Autocoder files from backup. + + This restores all Autocoder files and re-enables restrictions. + + Note: Using sync function because reattach_project() performs blocking I/O. + FastAPI will run this in a threadpool automatically. + """ + _init_imports() + (_, _, get_project_path, _, _, _, _) = _get_registry_functions() + + name = validate_project_name(name) + project_dir = get_project_path(name) + + if not project_dir: + raise HTTPException(status_code=404, detail=f"Project '{name}' not found") + + if not project_dir.exists(): + raise HTTPException(status_code=404, detail="Project directory not found") + + # Check if agent is running (consistent with detach endpoint) + lock_file = project_dir / ".agent.lock" + if lock_file.exists(): + raise HTTPException( + status_code=409, + detail="Cannot reattach while agent is running. 
Stop the agent first." + ) + + success, message, files_restored, conflicts = _detach_module.reattach_project(name) + + if not success: + # Map common error messages to appropriate HTTP status codes + if "in progress" in message: + raise HTTPException(status_code=409, detail=message) + elif "already attached" in message: + raise HTTPException(status_code=409, detail=message) + raise HTTPException(status_code=400, detail=message) + + # Dispose cached database engines so next request gets fresh connection + from api.database import dispose_engine as dispose_features_engine + from server.services.assistant_database import dispose_engine as dispose_assistant_engine + dispose_features_engine(project_dir) + dispose_assistant_engine(project_dir) + + return ReattachResponse( + success=True, + files_restored=files_restored, + message=message, + conflicts=conflicts, + conflicts_backup_path=_detach_module.PRE_REATTACH_BACKUP_DIR if conflicts else None, ) diff --git a/server/routers/review.py b/server/routers/review.py new file mode 100644 index 00000000..ff7041bd --- /dev/null +++ b/server/routers/review.py @@ -0,0 +1,385 @@ +""" +Review Agent API Router +======================= + +REST API endpoints for automatic code review. + +Endpoints: +- POST /api/review/run - Run code review on a project +- GET /api/review/reports/{project_name} - List review reports +- GET /api/review/reports/{project_name}/{filename} - Get specific report +- POST /api/review/create-features - Create features from review issues +""" + +import json +import logging +import sys +from pathlib import Path +from typing import Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field +from review_agent import ReviewAgent + +from registry import get_project_path + + +def _get_detach_module(): + """Lazy import of detach module.""" + root = Path(__file__).parent.parent.parent + if str(root) not in sys.path: + sys.path.insert(0, str(root)) + import detach + return detach + + +def _validate_project_not_detached(project_dir: Path, project_name: str) -> None: + """Validate project is not detached before database operations. + + Args: + project_dir: Path to the project directory + project_name: Name of the project (for error message) + + Raises: + HTTPException 409: If project is detached + """ + detach = _get_detach_module() + if detach.is_project_detached(project_dir): + raise HTTPException( + status_code=409, + detail=f"Project '{project_name}' is detached. Reattach to access features." 
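+            # What clients see (standard FastAPI error envelope); project name is illustrative:
+            #   HTTP 409  {"detail": "Project 'my-app' is detached. Reattach to access features."}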
+        )
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/api/review", tags=["review"])
+
+
+# ============================================================================
+# Request/Response Models
+# ============================================================================
+
+
+class RunReviewRequest(BaseModel):
+    """Request to run a code review."""
+
+    project_name: str = Field(..., description="Project name or path")
+    commits: Optional[list[str]] = Field(None, description="Specific commits to review")
+    files: Optional[list[str]] = Field(None, description="Specific files to review")
+    save_report: bool = Field(True, description="Whether to save the report")
+    checks: Optional[dict] = Field(
+        None,
+        description="Which checks to run (dead_code, naming, error_handling, security, complexity)",
+    )
+
+
+class ReviewIssueResponse(BaseModel):
+    """A review issue."""
+
+    category: str
+    severity: str
+    title: str
+    description: str
+    file_path: str
+    line_number: Optional[int] = None
+    code_snippet: Optional[str] = None
+    suggestion: Optional[str] = None
+
+
+class ReviewSummary(BaseModel):
+    """Summary of review results."""
+
+    total_issues: int
+    by_severity: dict
+    by_category: dict
+
+
+class RunReviewResponse(BaseModel):
+    """Response from running a review."""
+
+    project_dir: str
+    review_time: str
+    commits_reviewed: list[str]
+    files_reviewed: list[str]
+    issues: list[ReviewIssueResponse]
+    summary: ReviewSummary
+    report_path: Optional[str] = None
+
+
+class ReportListItem(BaseModel):
+    """A review report in the list."""
+
+    filename: str
+    review_time: str
+    total_issues: int
+    errors: int
+    warnings: int
+
+
+class ReportListResponse(BaseModel):
+    """List of review reports."""
+
+    reports: list[ReportListItem]
+    count: int
+
+
+class CreateFeaturesRequest(BaseModel):
+    """Request to create features from review issues."""
+
+    project_name: str = Field(..., description="Project name")
+    issues: list[dict] = Field(..., description="Issues to convert to features")
+
+
+class CreateFeaturesResponse(BaseModel):
+    """Response from creating features."""
+
+    created: int
+    features: list[dict]
+
+
+# ============================================================================
+# Helper Functions
+# ============================================================================
+
+
+def get_project_dir(project_name: str) -> Path:
+    """Get project directory from name or path."""
+    # Try to get from registry
+    project_path = get_project_path(project_name)
+    if project_path:
+        path = Path(project_path)
+        if path.exists() and path.is_dir():
+            return path
+        # Fall through to direct path check
+
+    # Check if it's a direct path
+    path = Path(project_name)
+    if path.exists() and path.is_dir():
+        return path
+
+    raise HTTPException(status_code=404, detail=f"Project not found: {project_name}")
+
+
+# ============================================================================
+# Endpoints
+# ============================================================================
+
+
+@router.post("/run", response_model=RunReviewResponse)
+def run_code_review(request: RunReviewRequest):
+    """
+    Run code review on a project.
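+
+    Example request (illustrative; fields from RunReviewRequest above, and
+    checks omitted from the dict default to enabled):
+
+        POST /api/review/run
+        {"project_name": "my-app", "checks": {"security": true, "complexity": false}}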
+
+    Analyzes code for common issues:
+    - Dead code (unused imports, variables)
+    - Naming convention violations
+    - Missing error handling
+    - Security vulnerabilities
+    - Code complexity
+    """
+    project_dir = get_project_dir(request.project_name)
+    project_dir_resolved = project_dir.resolve()
+
+    # Validate files to prevent path traversal attacks
+    validated_files = None
+    if request.files:
+        validated_files = []
+        for file_path in request.files:
+            # Resolve the full path and validate it's within project boundaries
+            full_path = (project_dir / file_path).resolve()
+            if not full_path.is_relative_to(project_dir_resolved):
+                raise HTTPException(
+                    status_code=400,
+                    detail=f"Invalid file path: path traversal detected in '{file_path}'",
+                )
+            # Store the relative path for the agent
+            validated_files.append(str(full_path.relative_to(project_dir_resolved)))
+
+    # Configure checks
+    check_config = request.checks or {}
+
+    try:
+        agent = ReviewAgent(
+            project_dir=project_dir,
+            check_dead_code=check_config.get("dead_code", True),
+            check_naming=check_config.get("naming", True),
+            check_error_handling=check_config.get("error_handling", True),
+            check_security=check_config.get("security", True),
+            check_complexity=check_config.get("complexity", True),
+        )
+
+        report = agent.review(
+            commits=request.commits,
+            files=validated_files,
+        )
+
+        report_path = None
+        if request.save_report:
+            saved_path = agent.save_report(report)
+            report_path = str(saved_path.relative_to(project_dir))
+
+        report_dict = report.to_dict()
+
+        return RunReviewResponse(
+            project_dir=report_dict["project_dir"],
+            review_time=report_dict["review_time"],
+            commits_reviewed=report_dict["commits_reviewed"],
+            files_reviewed=report_dict["files_reviewed"],
+            issues=[ReviewIssueResponse(**i) for i in report_dict["issues"]],
+            summary=ReviewSummary(**report_dict["summary"]),
+            report_path=report_path,
+        )
+
+    except Exception as e:
+        logger.error(f"Review failed for {project_dir}: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/reports/{project_name}", response_model=ReportListResponse)
+def list_reports(project_name: str):
+    """
+    List all review reports for a project.
+    """
+    project_dir = get_project_dir(project_name)
+    reports_dir = project_dir / ".autocoder" / "review-reports"
+
+    if not reports_dir.exists():
+        return ReportListResponse(reports=[], count=0)
+
+    reports = []
+    for report_file in sorted(reports_dir.glob("review_*.json"), reverse=True):
+        try:
+            with open(report_file) as f:
+                data = json.load(f)
+
+            summary = data.get("summary", {})
+            by_severity = summary.get("by_severity", {})
+
+            reports.append(
+                ReportListItem(
+                    filename=report_file.name,
+                    review_time=data.get("review_time", ""),
+                    total_issues=summary.get("total_issues", 0),
+                    errors=by_severity.get("error", 0),
+                    warnings=by_severity.get("warning", 0),
+                )
+            )
+        except Exception as e:
+            logger.warning(f"Error reading report {report_file}: {e}")
+            continue
+
+    return ReportListResponse(reports=reports, count=len(reports))
+
+
+@router.get("/reports/{project_name}/{filename}")
+def get_report(project_name: str, filename: str):
+    """
+    Get a specific review report.
+    """
+    # Validate filename FIRST to prevent path traversal
+    if ".." in filename or "/" in filename or "\\" in filename:
+        raise HTTPException(status_code=400, detail="Invalid filename")
+
+    project_dir = get_project_dir(project_name)
+    report_path = project_dir / ".autocoder" / "review-reports" / filename
+
+    if not report_path.exists():
+        raise HTTPException(status_code=404, detail=f"Report not found: {filename}")
+
+    try:
+        with open(report_path) as f:
+            return json.load(f)
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error reading report: {e}")
+
+
+@router.post("/create-features", response_model=CreateFeaturesResponse)
+def create_features_from_issues(request: CreateFeaturesRequest):
+    """
+    Create features from review issues.
+
+    Converts review issues into trackable features that can be assigned
+    to coding agents for resolution.
+    """
+    from api.database import Feature, create_database
+
+    project_dir = get_project_dir(request.project_name)
+
+    # Check detach status before accessing database
+    _validate_project_not_detached(project_dir, request.project_name)
+
+    db_path = project_dir / "features.db"
+
+    if not db_path.exists():
+        raise HTTPException(status_code=404, detail="Project database not found")
+
+    created_features = []
+    session = None
+
+    try:
+        _, SessionLocal = create_database(project_dir)
+        session = SessionLocal()
+
+        # Get max priority for ordering
+        max_priority = session.query(Feature.priority).order_by(Feature.priority.desc()).first()
+        current_priority = (max_priority[0] if max_priority else 0) + 1
+
+        for issue in request.issues:
+            # Create feature from issue
+            feature = Feature(
+                priority=current_priority,
+                category=issue.get("category", "Code Review"),
+                name=issue.get("name", issue.get("title", "Review Issue")),
+                description=issue.get("description", ""),
+                steps=json.dumps(issue.get("steps", ["Fix the identified issue"])),
+                passes=False,
+                in_progress=False,
+            )
+
+            session.add(feature)
+            current_priority += 1
+
+            created_features.append(
+                {
+                    "priority": feature.priority,
+                    "category": feature.category,
+                    "name": feature.name,
+                    "description": feature.description,
+                }
+            )
+
+        session.commit()
+
+        return CreateFeaturesResponse(
+            created=len(created_features),
+            features=created_features,
+        )
+
+    except Exception as e:
+        logger.error(f"Failed to create features: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+    finally:
+        if session is not None:
+            session.close()
+
+
+@router.delete("/reports/{project_name}/{filename}")
+def delete_report(project_name: str, filename: str):
+    """
+    Delete a specific review report.
+    """
+    # Validate filename FIRST to prevent path traversal (same order as get_report)
+    if ".." in filename or "/" in filename or "\\" in filename:
+        raise HTTPException(status_code=400, detail="Invalid filename")
+
+    project_dir = get_project_dir(project_name)
+    report_path = project_dir / ".autocoder" / "review-reports" / filename
+
+    if not report_path.exists():
+        raise HTTPException(status_code=404, detail=f"Report not found: {filename}")
+
+    try:
+        report_path.unlink()
+        return {"deleted": True, "filename": filename}
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error deleting report: {e}")
diff --git a/server/routers/schedules.py b/server/routers/schedules.py
index 1758f623..e35760a3 100644
--- a/server/routers/schedules.py
+++ b/server/routers/schedules.py
@@ -14,6 +14,8 @@
 from fastapi import APIRouter, HTTPException
 from sqlalchemy.orm import Session

+from ..dependencies import validate_project_not_detached
+
 # Schedule limits to prevent resource exhaustion
 MAX_SCHEDULES_PER_PROJECT = 50

@@ -24,7 +26,6 @@
     ScheduleResponse,
     ScheduleUpdate,
 )
-from ..utils.project_helpers import get_project_path as _get_project_path
 from ..utils.validation import validate_project_name

 if TYPE_CHECKING:
@@ -54,23 +55,17 @@ def _get_db_session(project_name: str) -> Generator[Tuple[Session, Path], None,
         with _get_db_session(project_name) as (db, project_path):
             # ... use db ...
         # db is automatically closed
+
+    Raises:
+        HTTPException 404: If project not found
+        HTTPException 409: If project is detached
     """
     from api.database import create_database

     project_name = validate_project_name(project_name)
-    project_path = _get_project_path(project_name)
-
-    if not project_path:
-        raise HTTPException(
-            status_code=404,
-            detail=f"Project '{project_name}' not found in registry"
-        )
-
-    if not project_path.exists():
-        raise HTTPException(
-            status_code=404,
-            detail=f"Project directory not found: {project_path}"
-        )
+    # Check detach status before accessing database
+    project_path = validate_project_not_detached(project_name)

     _, SessionLocal = create_database(project_path)
     db = SessionLocal()
diff --git a/server/schemas.py b/server/schemas.py
index 5f546e2b..080ae659 100644
--- a/server/schemas.py
+++ b/server/schemas.py
@@ -46,6 +46,7 @@ class ProjectSummary(BaseModel):
     has_spec: bool
     stats: ProjectStats
     default_concurrency: int = 3
+    is_detached: bool = False  # True if AutoForge files moved to backup


 class ProjectDetail(BaseModel):
@@ -56,6 +57,7 @@ class ProjectDetail(BaseModel):
     stats: ProjectStats
     prompts_dir: str
     default_concurrency: int = 3
+    is_detached: bool = False  # True if AutoForge files moved to backup


 class ProjectPrompts(BaseModel):
@@ -65,6 +67,38 @@
     coding_prompt: str = ""


+# ============================================================================
+# Detach/Reattach Schemas
+# ============================================================================
+
+class DetachResponse(BaseModel):
+    """Response schema for detach operation."""
+    success: bool
+    files_moved: int
+    backup_size: int
+    backup_path: str  # Relative path to backup directory (not absolute for security)
+    message: str = ""
+    user_files_restored: int = 0  # User files restored from pre-reattach backup
+
+
+class ReattachResponse(BaseModel):
+    """Response schema for reattach operation."""
+    success: bool
+    files_restored: int
+    message: str = ""
+    conflicts: list[str] = []  # List of user files that were backed up
+    conflicts_backup_path: str | None = None  # Path to backup dir if conflicts exist
+
+
+class DetachStatusResponse(BaseModel):
+    """Response schema for detach status check."""
+    is_detached: bool
+    backup_exists: bool
+    backup_size: int | None = None
+    detached_at: str | None = None
+    file_count: int | None = None
+
+
 class ProjectPromptsUpdate(BaseModel):
     """Request schema for updating project prompts."""
     app_spec: str | None = None
diff --git a/server/services/scheduler_service.py b/server/services/scheduler_service.py
index 3e0576d6..c75badf6 100644
--- a/server/services/scheduler_service.py
+++ b/server/services/scheduler_service.py
@@ -21,6 +21,21 @@
 logger = logging.getLogger(__name__)

+
+def _is_project_detached(project_dir: Path) -> bool:
+    """Check if a project is detached.
+
+    Used by background scheduler to skip detached projects silently.
+
+    Args:
+        project_dir: Path to the project directory
+
+    Returns:
+        True if project is detached, False otherwise
+    """
+    import detach
+    return detach.is_project_detached(project_dir)
+
 # Constants
 MAX_CRASH_RETRIES = 3
 CRASH_BACKOFF_BASE = 10  # seconds
@@ -94,6 +109,11 @@ async def _load_project_schedules(self, project_name: str, project_dir: Path) ->
         from api.database import Schedule, create_database
         from autoforge_paths import get_features_db_path

+        # Skip detached projects - don't access their database
+        if _is_project_detached(project_dir):
+            logger.debug(f"Skipping detached project '{project_name}' in schedule loading")
+            return 0
+
         db_path = get_features_db_path(project_dir)
         if not db_path.exists():
             return 0
@@ -212,6 +232,11 @@ async def _handle_scheduled_start(
         logger.info(f"Scheduled start triggered for {project_name} (schedule {schedule_id})")
         project_dir = Path(project_dir_str)

+        # Skip detached projects - don't access their database
+        if _is_project_detached(project_dir):
+            logger.info(f"Skipping scheduled start for detached project '{project_name}'")
+            return
+
         try:
             from api.database import Schedule, ScheduleOverride, create_database

@@ -258,6 +283,11 @@ async def _handle_scheduled_stop(
         logger.info(f"Scheduled stop triggered for {project_name} (schedule {schedule_id})")
         project_dir = Path(project_dir_str)

+        # Skip detached projects - don't access their database
+        if _is_project_detached(project_dir):
+            logger.info(f"Skipping scheduled stop for detached project '{project_name}'")
+            return
+
         try:
             from api.database import Schedule, ScheduleOverride, create_database

@@ -415,6 +445,11 @@ async def _stop_agent(self, project_name: str, project_dir: Path):

     async def handle_crash_during_window(self, project_name: str, project_dir: Path):
         """Called when agent crashes. Attempt restart with backoff."""
+        # Skip detached projects - don't access their database
+        if _is_project_detached(project_dir):
+            logger.info(f"Skipping crash recovery for detached project '{project_name}'")
+            return
+
         from api.database import Schedule, create_database

         _, SessionLocal = create_database(project_dir)
@@ -472,6 +507,11 @@ def _create_override_for_active_schedules(

        Uses atomic delete-then-create pattern to prevent race conditions.
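
        Illustrative timeline of that pattern (a single transaction is assumed):

            t0: DELETE existing ScheduleOverride rows for the schedule
            t1: INSERT the replacement override
            t2: COMMIT - concurrent readers see the old row or the new one,
                never both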
""" + # Skip detached projects - don't access their database + if _is_project_detached(project_dir): + logger.debug(f"Skipping override creation for detached project '{project_name}'") + return + from api.database import Schedule, ScheduleOverride, create_database try: @@ -567,6 +607,11 @@ async def _check_project_on_startup( self, project_name: str, project_dir: Path, now: datetime ): """Check if a project should be started on server startup.""" + # Skip detached projects - don't access their database + if _is_project_detached(project_dir): + logger.debug(f"Skipping startup check for detached project '{project_name}'") + return + from api.database import Schedule, ScheduleOverride, create_database from autoforge_paths import get_features_db_path diff --git a/start.py b/start.py index a47da397..890464d6 100644 --- a/start.py +++ b/start.py @@ -15,11 +15,8 @@ from dotenv import load_dotenv +import detach as detach_module from auth import is_auth_error, print_auth_error_help - -# Load environment variables from .env file if present -load_dotenv() - from prompts import ( get_project_prompts_dir, has_project_prompts, @@ -31,6 +28,9 @@ register_project, ) +# Load environment variables from .env file if present +load_dotenv() + def check_spec_exists(project_dir: Path) -> bool: """ @@ -88,6 +88,7 @@ def display_menu(projects: list[tuple[str, Path]]) -> None: if projects: print("[2] Continue existing project") + print("[3] Detach/Reattach project") print("[q] Quit") print() @@ -231,7 +232,9 @@ def run_spec_creation(project_dir: Path) -> bool: check=False, # Don't raise on non-zero exit cwd=str(Path(__file__).parent), # Run from project root stderr=subprocess.PIPE, - text=True + text=True, + encoding="utf-8", # Fix Windows CP1252 encoding issue (#138) + errors="replace", ) # Check for authentication errors in stderr @@ -369,6 +372,103 @@ def create_new_project_flow() -> tuple[str, Path] | None: return project_name, project_dir +def display_detach_menu(projects: list[tuple[str, Path]]) -> None: + """Display the detach/reattach menu.""" + print("\n" + "-" * 40) + print(" Detach/Reattach Projects") + print("-" * 40) + print("\nDetached projects have Autocoder files moved to backup,") + print("allowing Claude Code to run without restrictions.\n") + + for i, (name, path) in enumerate(projects, 1): + is_detached = detach_module.is_project_detached(path) + status = "DETACHED" if is_detached else "attached" + print(f" [{i}] [{status:8}] {name}") + + print("\n [b] Back to main menu") + print() + + +def handle_detach_reattach(projects: list[tuple[str, Path]]) -> None: + """Handle detach/reattach flow for projects.""" + while True: + display_detach_menu(projects) + + try: + choice = input("Select project number: ").strip().lower() + except KeyboardInterrupt: + print("\n\nCancelled.") + return + + if choice == 'b': + return + + try: + idx = int(choice) - 1 + if 0 <= idx < len(projects): + project_name, project_path = projects[idx] + handle_project_detach_action(project_name, project_path) + else: + print(f"Please enter a number between 1 and {len(projects)}") + except ValueError: + print("Invalid input. 
Enter a number or 'b' to go back.") + + +def handle_project_detach_action(project_name: str, project_path: Path) -> None: + """Handle detach or reattach action for a single project.""" + is_detached = detach_module.is_project_detached(project_path) + + print(f"\nProject: {project_name}") + print(f"Path: {project_path}") + print(f"Status: {'DETACHED' if is_detached else 'attached'}") + + if is_detached: + # Offer to reattach + print("\nThis project is detached. Autocoder files are in backup.") + try: + confirm = input("Reattach project (restore files)? [y/N]: ").strip().lower() + except KeyboardInterrupt: + print("\n\nCancelled.") + return + if confirm == 'y': + print("\nReattaching...") + success, message, _, conflicts = detach_module.reattach_project(project_name) + if success: + print(f" ✓ {message}") + if conflicts: + print(f" ⚠ {len(conflicts)} user files backed up to .pre-reattach-backup/") + for f in conflicts[:5]: # Show first 5 + print(f" - {f}") + if len(conflicts) > 5: + print(f" ... and {len(conflicts) - 5} more") + else: + print(f" ✗ {message}") + else: + # Offer to detach + print("\nThis project is attached. Autocoder files are present.") + print("Detaching will move files to backup and allow Claude Code full access.") + try: + confirm = input("Detach project? [y/N]: ").strip().lower() + except KeyboardInterrupt: + print("\n\nCancelled.") + return + if confirm == 'y': + print("\nDetaching...") + success, message, manifest, user_files_restored = detach_module.detach_project(project_name) + if success: + print(f" ✓ {message}") + if user_files_restored > 0: + print(f" ✓ Restored {user_files_restored} user files from previous session") + else: + print(f" ✗ {message}") + + try: + input("\nPress Enter to continue...") + except KeyboardInterrupt: + print("\n\nCancelled.") + return + + def run_agent(project_name: str, project_dir: Path) -> None: """Run the autonomous agent with the given project. @@ -403,7 +503,9 @@ def run_agent(project_name: str, project_dir: Path) -> None: cmd, check=False, stderr=subprocess.PIPE, - text=True + text=True, + encoding="utf-8", # Fix Windows CP1252 encoding issue (#138) + errors="replace", ) # Check for authentication errors @@ -451,6 +553,9 @@ def main() -> None: project_name, project_dir = selected run_agent(project_name, project_dir) + elif choice == '3' and projects: + handle_detach_reattach(projects) + else: print("Invalid option. Please try again.") diff --git a/test_detach.py b/test_detach.py new file mode 100644 index 00000000..63cef2b6 --- /dev/null +++ b/test_detach.py @@ -0,0 +1,1645 @@ +#!/usr/bin/env python3 +""" +Unit tests for detach.py module. + +Tests cover: +- File detection patterns +- Backup creation and manifest +- Restore functionality +- Edge cases (locked projects, missing backups, etc.) 
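+
+Run with `python test_detach.py` (see CLAUDE.md) or `python -m pytest test_detach.py`.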
+""" + +import json +import shutil +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +import detach + + +class TestGetAutoforgeFiles(unittest.TestCase): + """Tests for get_autoforge_files function.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_detects_autoforge_directory(self): + """Should detect .autoforge directory.""" + (self.project_dir / ".autoforge").mkdir() + files = detach.get_autoforge_files(self.project_dir) + self.assertEqual(len(files), 1) + self.assertEqual(files[0].name, ".autoforge") + + def test_detects_prompts_directory(self): + """Should detect prompts directory.""" + (self.project_dir / "prompts").mkdir() + files = detach.get_autoforge_files(self.project_dir) + self.assertEqual(len(files), 1) + self.assertEqual(files[0].name, "prompts") + + def test_detects_features_db(self): + """Should detect features.db file.""" + (self.project_dir / "features.db").touch() + files = detach.get_autoforge_files(self.project_dir) + self.assertEqual(len(files), 1) + self.assertEqual(files[0].name, "features.db") + + def test_detects_claude_md(self): + """Should detect CLAUDE.md file.""" + (self.project_dir / "CLAUDE.md").touch() + files = detach.get_autoforge_files(self.project_dir) + self.assertEqual(len(files), 1) + self.assertEqual(files[0].name, "CLAUDE.md") + + def test_detects_glob_patterns(self): + """Should detect files matching glob patterns in AutoForge directories. + + Patterns are only matched within .autoforge/, prompts/, and .playwright-mcp/ + to avoid accidentally moving user files like test-myfeature.py at root. 
+ """ + # Create AutoForge directory structure + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / ".autoforge" / "test-login.json").touch() + (self.project_dir / ".autoforge" / "test-api.py").touch() + (self.project_dir / ".autoforge" / "generate-data.py").touch() + files = detach.get_autoforge_files(self.project_dir) + # 1 directory + 3 pattern-matched files + self.assertEqual(len(files), 4) + names = {f.name for f in files} + self.assertIn(".autoforge", names) + self.assertIn("test-login.json", names) + self.assertIn("test-api.py", names) + self.assertIn("generate-data.py", names) + + def test_detects_sqlite_wal_files(self): + """Should detect SQLite WAL companion files.""" + (self.project_dir / "features.db").touch() + (self.project_dir / "features.db-shm").write_bytes(b"\x00" * 32768) + (self.project_dir / "features.db-wal").touch() + (self.project_dir / "assistant.db").touch() + (self.project_dir / "assistant.db-shm").write_bytes(b"\x00" * 32768) + (self.project_dir / "assistant.db-wal").touch() + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("features.db", names) + self.assertIn("features.db-shm", names) + self.assertIn("features.db-wal", names) + self.assertIn("assistant.db", names) + self.assertIn("assistant.db-shm", names) + self.assertIn("assistant.db-wal", names) + self.assertEqual(len(files), 6) + + def test_detects_sql_test_files(self): + """Should detect test-*.sql files in AutoForge directories.""" + (self.project_dir / "prompts").mkdir() + (self.project_dir / "prompts" / "test-feature153-create-page.sql").touch() + (self.project_dir / "prompts" / "test-database-migration.sql").touch() + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("prompts", names) + self.assertIn("test-feature153-create-page.sql", names) + self.assertIn("test-database-migration.sql", names) + self.assertEqual(len(files), 3) # 1 directory + 2 files + + def test_detects_php_test_files(self): + """Should detect test-*.php files in AutoForge directories.""" + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / ".autoforge" / "test-feature28-create-page.php").touch() + (self.project_dir / ".autoforge" / "test-api-endpoint.php").touch() + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn(".autoforge", names) + self.assertIn("test-feature28-create-page.php", names) + self.assertIn("test-api-endpoint.php", names) + self.assertEqual(len(files), 3) # 1 directory + 2 files + + def test_detects_test_helper_php_files(self): + """Should detect create-*-test*.php helper scripts in AutoForge directories.""" + (self.project_dir / ".playwright-mcp").mkdir() + (self.project_dir / ".playwright-mcp" / "create-xss-direct-test.php").touch() + (self.project_dir / ".playwright-mcp" / "create-xss-test-page.php").touch() + (self.project_dir / ".playwright-mcp" / "create-csrf-test.php").touch() + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn(".playwright-mcp", names) + self.assertIn("create-xss-direct-test.php", names) + self.assertIn("create-xss-test-page.php", names) + self.assertIn("create-csrf-test.php", names) + self.assertEqual(len(files), 4) # 1 directory + 3 files + + def test_detects_rollback_json_files(self): + """Should detect rollback-*.json files in AutoForge directories.""" + (self.project_dir / "prompts").mkdir() + (self.project_dir / "prompts" / 
"rollback-test-translated.json").touch() + (self.project_dir / "prompts" / "rollback-migration-v2.json").touch() + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("prompts", names) + self.assertIn("rollback-test-translated.json", names) + self.assertIn("rollback-migration-v2.json", names) + self.assertEqual(len(files), 3) # 1 directory + 2 files + + def test_does_not_capture_user_files_at_root(self): + """Should NOT capture generic user files matching patterns if at project root. + + This prevents accidentally moving user files like test-myfeature.py. + Generic patterns are only applied within AutoForge-owned directories. + More specific patterns (test-feature*.py) are allowed at root. + """ + # User files at project root - should NOT be captured + (self.project_dir / "test-myfeature.py").touch() + (self.project_dir / "test-user-data.json").touch() + + files = detach.get_autoforge_files(self.project_dir) + self.assertEqual(len(files), 0) + names = {f.name for f in files} + self.assertNotIn("test-myfeature.py", names) + self.assertNotIn("test-user-data.json", names) + + def test_detects_feature_test_files_at_root(self): + """Should detect test-feature*.py files at project root. + + These are agent-generated feature test files that should be moved. + """ + (self.project_dir / "test-feature184-missing-config.py").touch() + (self.project_dir / "test-feature182-log-archiving.py").touch() + (self.project_dir / "test-feature100-basic.json").touch() + + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("test-feature184-missing-config.py", names) + self.assertIn("test-feature182-log-archiving.py", names) + self.assertIn("test-feature100-basic.json", names) + self.assertEqual(len(files), 3) + + def test_detects_generate_files_at_root(self): + """Should detect generate-*.py files at project root.""" + (self.project_dir / "generate-100items.py").touch() + (self.project_dir / "generate-test-data.py").touch() + + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("generate-100items.py", names) + self.assertIn("generate-test-data.py", names) + self.assertEqual(len(files), 2) + + def test_detects_mark_feature_files_at_root(self): + """Should detect mark_feature*.py files at project root.""" + (self.project_dir / "mark_feature123.py").touch() + (self.project_dir / "mark_feature_passing.py").touch() + + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("mark_feature123.py", names) + self.assertIn("mark_feature_passing.py", names) + self.assertEqual(len(files), 2) + + def test_detects_rollback_json_at_root(self): + """Should detect rollback-*.json files at project root.""" + (self.project_dir / "rollback-test-translated.json").touch() + (self.project_dir / "rollback-migration.json").touch() + + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("rollback-test-translated.json", names) + self.assertIn("rollback-migration.json", names) + self.assertEqual(len(files), 2) + + def test_detects_create_test_php_at_root(self): + """Should detect create-*-test*.php files at project root.""" + (self.project_dir / "create-xss-test.php").touch() + (self.project_dir / "create-csrf-test-page.php").touch() + + files = detach.get_autoforge_files(self.project_dir) + names = {f.name for f in files} + self.assertIn("create-xss-test.php", names) + 
self.assertIn("create-csrf-test-page.php", names) + self.assertEqual(len(files), 2) + + def test_excludes_artifacts_when_disabled(self): + """Should exclude .playwright-mcp when include_artifacts=False.""" + (self.project_dir / ".playwright-mcp").mkdir() + (self.project_dir / "features.db").touch() + + # With artifacts + files_with = detach.get_autoforge_files(self.project_dir, include_artifacts=True) + names_with = {f.name for f in files_with} + self.assertIn(".playwright-mcp", names_with) + + # Without artifacts + files_without = detach.get_autoforge_files(self.project_dir, include_artifacts=False) + names_without = {f.name for f in files_without} + self.assertNotIn(".playwright-mcp", names_without) + self.assertIn("features.db", names_without) + + def test_returns_empty_for_non_autoforge_project(self): + """Should return empty list for projects without AutoForge files.""" + (self.project_dir / "src").mkdir() + (self.project_dir / "package.json").touch() + files = detach.get_autoforge_files(self.project_dir) + self.assertEqual(len(files), 0) + + def test_returns_sorted_results(self): + """Should return files sorted by name.""" + (self.project_dir / "prompts").mkdir() + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / "features.db").touch() + files = detach.get_autoforge_files(self.project_dir) + names = [f.name for f in files] + self.assertEqual(names, sorted(names)) + + +class TestBackupCreation(unittest.TestCase): + """Tests for create_backup function.""" + + def setUp(self): + """Create temporary project with AutoForge files.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create AutoForge files + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / ".autoforge" / "config.yaml").write_text("test: true") + (self.project_dir / "prompts").mkdir() + (self.project_dir / "prompts" / "app_spec.txt").write_text("spec content") + (self.project_dir / "features.db").write_bytes(b"SQLite database") + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_creates_backup_directory(self): + """Should create .autoforge-backup directory.""" + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + backup_dir = self.project_dir / detach.BACKUP_DIR + self.assertTrue(backup_dir.exists()) + + def test_moves_files_to_backup(self): + """Should move all files to backup directory.""" + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + backup_dir = self.project_dir / detach.BACKUP_DIR + + # Original locations should be gone + self.assertFalse((self.project_dir / ".autoforge").exists()) + self.assertFalse((self.project_dir / "prompts").exists()) + self.assertFalse((self.project_dir / "features.db").exists()) + + # Backup locations should exist + self.assertTrue((backup_dir / ".autoforge").exists()) + self.assertTrue((backup_dir / "prompts").exists()) + self.assertTrue((backup_dir / "features.db").exists()) + + def test_creates_manifest(self): + """Should create manifest.json with correct structure.""" + files = detach.get_autoforge_files(self.project_dir) + manifest = detach.create_backup(self.project_dir, "test-project", files) + + # Check manifest structure + self.assertEqual(manifest["version"], detach.MANIFEST_VERSION) + self.assertEqual(manifest["project_name"], "test-project") + self.assertIn("detached_at", manifest) + self.assertIn("files", manifest) + 
self.assertIn("total_size_bytes", manifest) + self.assertIn("file_count", manifest) + + # Check manifest file exists + manifest_path = self.project_dir / detach.BACKUP_DIR / detach.MANIFEST_FILE + self.assertTrue(manifest_path.exists()) + + def test_manifest_contains_checksums(self): + """Should include checksums for files.""" + files = detach.get_autoforge_files(self.project_dir) + manifest = detach.create_backup(self.project_dir, "test-project", files) + + for entry in manifest["files"]: + if entry["type"] == "file": + self.assertIsNotNone(entry["checksum"]) + else: + self.assertIsNone(entry["checksum"]) + + def test_dry_run_does_not_move_files(self): + """Dry run should not move files or create backup.""" + files = detach.get_autoforge_files(self.project_dir) + manifest = detach.create_backup(self.project_dir, "test-project", files, dry_run=True) + + # Original files should still exist + self.assertTrue((self.project_dir / ".autoforge").exists()) + self.assertTrue((self.project_dir / "prompts").exists()) + self.assertTrue((self.project_dir / "features.db").exists()) + + # Backup should not exist + backup_dir = self.project_dir / detach.BACKUP_DIR + self.assertFalse(backup_dir.exists()) + + # Manifest should still be returned + self.assertIsNotNone(manifest) + self.assertEqual(manifest["project_name"], "test-project") + + +class TestBackupRestore(unittest.TestCase): + """Tests for restore_backup function.""" + + def setUp(self): + """Create temporary project with backup.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create AutoForge files and backup them + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / ".autoforge" / "config.yaml").write_text("test: true") + (self.project_dir / "prompts").mkdir() + (self.project_dir / "prompts" / "app_spec.txt").write_text("spec content") + (self.project_dir / "features.db").write_bytes(b"SQLite database") + + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_restores_files_from_backup(self): + """Should restore all files from backup.""" + success, files_restored, conflicts = detach.restore_backup(self.project_dir) + + self.assertTrue(success) + self.assertEqual(files_restored, 3) # .autoforge, prompts, features.db + self.assertEqual(conflicts, []) # No conflicts expected + + # Files should be restored + self.assertTrue((self.project_dir / ".autoforge").exists()) + self.assertTrue((self.project_dir / "prompts").exists()) + self.assertTrue((self.project_dir / "features.db").exists()) + + def test_removes_backup_after_restore(self): + """Should remove backup directory after successful restore.""" + detach.restore_backup(self.project_dir) + + backup_dir = self.project_dir / detach.BACKUP_DIR + self.assertFalse(backup_dir.exists()) + + def test_restores_file_contents(self): + """Should restore correct file contents.""" + detach.restore_backup(self.project_dir) + + config_content = (self.project_dir / ".autoforge" / "config.yaml").read_text() + self.assertEqual(config_content, "test: true") + + spec_content = (self.project_dir / "prompts" / "app_spec.txt").read_text() + self.assertEqual(spec_content, "spec content") + + def test_fails_without_backup(self): + """Should fail gracefully if no backup exists.""" + # Remove backup + shutil.rmtree(self.project_dir / detach.BACKUP_DIR) + + success, files_restored, conflicts = 
detach.restore_backup(self.project_dir) + self.assertFalse(success) + self.assertEqual(files_restored, 0) + self.assertEqual(conflicts, []) + + def test_partial_restore_removes_manifest(self): + """Partial restore should remove manifest to allow re-detach.""" + # Remove one backup file to simulate partial restore + backup_dir = self.project_dir / detach.BACKUP_DIR + (backup_dir / "features.db").unlink() + + success, files_restored, conflicts = detach.restore_backup(self.project_dir) + + # Should fail (partial restore) + self.assertFalse(success) + # Manifest should be removed to allow re-detach + self.assertFalse((backup_dir / detach.MANIFEST_FILE).exists()) + # Backup directory should still exist (preserving remaining files) + self.assertTrue(backup_dir.exists()) + + +class TestDetachStatus(unittest.TestCase): + """Tests for status checking functions.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_is_project_detached_false(self): + """Should return False for non-detached project.""" + (self.project_dir / "features.db").touch() + self.assertFalse(detach.is_project_detached(self.project_dir)) + + def test_is_project_detached_true(self): + """Should return True for detached project.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + self.assertTrue(detach.is_project_detached(self.project_dir)) + + def test_has_backup(self): + """Should correctly detect backup existence.""" + self.assertFalse(detach.has_backup(self.project_dir)) + + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + self.assertTrue(detach.has_backup(self.project_dir)) + + def test_get_backup_info(self): + """Should return manifest info when backup exists.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + + manifest = { + "version": 1, + "project_name": "test", + "total_size_bytes": 1000, + "file_count": 5, + } + (backup_dir / detach.MANIFEST_FILE).write_text(json.dumps(manifest)) + + info = detach.get_backup_info(self.project_dir) + self.assertIsNotNone(info) + self.assertEqual(info["project_name"], "test") + self.assertEqual(info["total_size_bytes"], 1000) + + def test_get_backup_info_returns_none_without_backup(self): + """Should return None when no backup exists.""" + info = detach.get_backup_info(self.project_dir) + self.assertIsNone(info) + + @patch('detach.get_project_path') + def test_get_detach_status_reports_state(self, mock_get_path): + """Should report state field in detach status.""" + mock_get_path.return_value = self.project_dir + + # Attached state + (self.project_dir / "features.db").touch() + status = detach.get_detach_status("test-project") + self.assertEqual(status["state"], "attached") + self.assertFalse(status["is_detached"]) + self.assertFalse(status["is_inconsistent"]) + self.assertEqual(status["files_at_root"], 1) + + @patch('detach.get_project_path') + def test_get_detach_status_reports_inconsistent(self, mock_get_path): + """Should report inconsistent state in detach status.""" + mock_get_path.return_value = self.project_dir + + # Create both files at root AND backup manifest + (self.project_dir / "features.db").touch() + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / 
detach.MANIFEST_FILE).write_text("{}") + + status = detach.get_detach_status("test-project") + self.assertEqual(status["state"], "inconsistent") + self.assertFalse(status["is_detached"]) + self.assertTrue(status["is_inconsistent"]) + self.assertEqual(status["files_at_root"], 1) + self.assertTrue(status["backup_exists"]) + + +class TestProjectDetachState(unittest.TestCase): + """Tests for get_project_detach_state function.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_state_clean_no_files_no_manifest(self): + """Should return 'clean' when no files and no manifest.""" + state, files = detach.get_project_detach_state(self.project_dir) + self.assertEqual(state, "clean") + self.assertEqual(files, []) + + def test_state_attached_files_present(self): + """Should return 'attached' when files present, no manifest.""" + (self.project_dir / "features.db").touch() + (self.project_dir / ".autoforge").mkdir() + + state, files = detach.get_project_detach_state(self.project_dir) + self.assertEqual(state, "attached") + self.assertEqual(len(files), 2) + + def test_state_detached_manifest_only(self): + """Should return 'detached' when manifest exists, no files at root.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + state, files = detach.get_project_detach_state(self.project_dir) + self.assertEqual(state, "detached") + self.assertEqual(files, []) + + def test_state_inconsistent_both_exist(self): + """Should return 'inconsistent' when both manifest and files exist.""" + # Create backup with manifest + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + # Also create files at root (simulating partial reattach) + (self.project_dir / "features.db").touch() + (self.project_dir / ".autoforge").mkdir() + + state, files = detach.get_project_detach_state(self.project_dir) + self.assertEqual(state, "inconsistent") + self.assertEqual(len(files), 2) + + +class TestDetachProject(unittest.TestCase): + """Tests for detach_project function.""" + + def setUp(self): + """Create temporary project with AutoForge files.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create AutoForge files + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / "features.db").touch() + (self.project_dir / "prompts").mkdir() + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + @patch('detach.get_project_path') + def test_detach_by_path(self, mock_get_path): + """Should detach project by path.""" + mock_get_path.return_value = None + + success, message, manifest, user_files_restored = detach.detach_project(str(self.project_dir)) + + self.assertTrue(success) + self.assertIn("files", message) + self.assertIsNotNone(manifest) + self.assertEqual(user_files_restored, 0) + + @patch('detach.get_project_path') + def test_detach_by_name(self, mock_get_path): + """Should detach project by registry name.""" + mock_get_path.return_value = self.project_dir + + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertTrue(success) + self.assertIsNotNone(manifest) + self.assertEqual(user_files_restored, 0) + + @patch('detach.get_project_path') + def 
test_fails_if_already_detached(self, mock_get_path): + """Should fail if project is already detached (clean detach state).""" + mock_get_path.return_value = self.project_dir + + # Remove AutoForge files from root to simulate clean detach + shutil.rmtree(self.project_dir / ".autoforge") + (self.project_dir / "features.db").unlink() + shutil.rmtree(self.project_dir / "prompts") + + # Create backup (simulating files moved to backup) + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertFalse(success) + self.assertIn("already detached", message) + self.assertEqual(user_files_restored, 0) + + @patch('detach.get_project_path') + def test_fails_if_agent_running(self, mock_get_path): + """Should fail if agent is running (lock file exists).""" + mock_get_path.return_value = self.project_dir + + # Create agent lock + (self.project_dir / ".agent.lock").touch() + + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertFalse(success) + self.assertIn("Agent is currently running", message) + self.assertEqual(user_files_restored, 0) + + @patch('detach.get_project_path') + def test_force_bypasses_agent_check(self, mock_get_path): + """Should bypass agent check with force=True.""" + mock_get_path.return_value = self.project_dir + + # Create agent lock + (self.project_dir / ".agent.lock").touch() + + success, message, manifest, user_files_restored = detach.detach_project("test-project", force=True) + + self.assertTrue(success) + self.assertEqual(user_files_restored, 0) + + @patch('detach.get_project_path') + def test_fails_if_no_autoforge_files(self, mock_get_path): + """Should fail if no AutoForge files found.""" + # Remove AutoForge files + shutil.rmtree(self.project_dir / ".autoforge") + (self.project_dir / "features.db").unlink() + shutil.rmtree(self.project_dir / "prompts") + + mock_get_path.return_value = self.project_dir + + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertFalse(success) + self.assertIn("No AutoForge files found", message) + self.assertEqual(user_files_restored, 0) + + @patch('detach.get_project_path') + def test_fails_on_inconsistent_state_without_force(self, mock_get_path): + """Should fail on inconsistent state without --force.""" + mock_get_path.return_value = self.project_dir + + # Create backup with manifest (simulating previous partial reattach) + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + # AutoForge files also exist at root + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertFalse(success) + self.assertIn("Inconsistent state", message) + self.assertIn("--force", message) + + @patch('detach.get_project_path') + def test_force_cleans_inconsistent_state(self, mock_get_path): + """Should clean up old backup with --force on inconsistent state.""" + mock_get_path.return_value = self.project_dir + + # Create backup with manifest (simulating previous partial reattach) + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + (backup_dir / "old_features.db").write_bytes(b"old backup content") + + # AutoForge files also exist at root (from partial reattach) + success, message, manifest, 
user_files_restored = detach.detach_project( + "test-project", force=True + ) + + self.assertTrue(success) + self.assertIn("files", message) + # New backup should be created with fresh data + self.assertTrue((backup_dir / detach.MANIFEST_FILE).exists()) + + @patch('detach.get_project_path') + def test_cleans_orphaned_backup_directory(self, mock_get_path): + """Detach should clean up orphaned backup directory (no manifest). + + This can happen after partial reattach removes manifest but keeps + backup files due to restore failures. + """ + mock_get_path.return_value = self.project_dir + + # Create orphaned backup directory (simulates partial reattach) + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / ".autoforge").mkdir() + (backup_dir / "old_features.db").write_bytes(b"orphaned backup content") + # NO manifest.json - this is the orphaned state + + # Detach should succeed and clean up orphaned backup first + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertTrue(success) + self.assertIn("files", message) + + # Verify features.db moved to backup (not orphaned old backup) + self.assertFalse((self.project_dir / "features.db").exists()) + self.assertTrue((backup_dir / "features.db").exists()) + + # Verify orphaned files were cleaned up and replaced with new backup + self.assertFalse((backup_dir / "old_features.db").exists()) + self.assertTrue((backup_dir / detach.MANIFEST_FILE).exists()) + + +class TestReattachProject(unittest.TestCase): + """Tests for reattach_project function.""" + + def setUp(self): + """Create temporary project with backup.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create and backup AutoForge files + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / "features.db").write_bytes(b"test") + + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + @patch('detach.get_project_path') + def test_reattach_restores_files(self, mock_get_path): + """Should restore files from backup.""" + mock_get_path.return_value = self.project_dir + + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertTrue(success) + self.assertGreater(files_restored, 0) + self.assertEqual(conflicts, []) + self.assertTrue((self.project_dir / ".autoforge").exists()) + self.assertTrue((self.project_dir / "features.db").exists()) + + @patch('detach.get_project_path') + def test_reattach_fails_without_backup(self, mock_get_path): + """Should fail if no backup exists.""" + mock_get_path.return_value = self.project_dir + + # Remove backup + shutil.rmtree(self.project_dir / detach.BACKUP_DIR) + + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertFalse(success) + self.assertIn("No backup found", message) + self.assertEqual(conflicts, []) + + @patch('detach.get_project_path') + def test_reattach_fails_when_agent_running(self, mock_get_path): + """Should fail if agent lock exists.""" + mock_get_path.return_value = self.project_dir + + # Create agent lock file + (self.project_dir / ".agent.lock").touch() + + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertFalse(success) + self.assertIn("Agent is currently running", message) + self.assertEqual(files_restored, 0) + 
self.assertEqual(conflicts, []) + + @patch('detach.get_project_path') + def test_reattach_fails_when_already_attached(self, mock_get_path): + """Should fail if project is already attached (no backup, files at root).""" + mock_get_path.return_value = self.project_dir + + # Remove backup but keep files at root + shutil.rmtree(self.project_dir / detach.BACKUP_DIR) + (self.project_dir / "features.db").write_bytes(b"test") + (self.project_dir / ".autoforge").mkdir() + + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertFalse(success) + self.assertIn("already attached", message) + self.assertEqual(files_restored, 0) + self.assertEqual(conflicts, []) + + @patch('detach.get_project_path') + def test_reattach_handles_inconsistent_state_by_restoring(self, mock_get_path): + """Should restore from backup even if some files exist at root (user-created). + + This handles the case where user creates files while detached. + The conflicting files get backed up to .pre-reattach-backup/ before restore. + """ + mock_get_path.return_value = self.project_dir + + # Backup exists from setUp (with features.db and .autoforge in backup) + # Add files at root too (simulates user creating files while detached) + (self.project_dir / "features.db").write_bytes(b"user-created") + (self.project_dir / ".autoforge").mkdir() + + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + # Should succeed - user files get backed up, autoforge files restored + self.assertTrue(success) + self.assertIn("features.db", conflicts) # User file was backed up + self.assertGreater(files_restored, 0) + + +class TestGitignoreUpdate(unittest.TestCase): + """Tests for update_gitignore function.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_creates_gitignore_if_missing(self): + """Should create .gitignore if it doesn't exist.""" + detach.update_gitignore(self.project_dir) + + gitignore = self.project_dir / ".gitignore" + self.assertTrue(gitignore.exists()) + content = gitignore.read_text() + self.assertIn(detach.BACKUP_DIR, content) + + def test_appends_to_existing_gitignore(self): + """Should append to existing .gitignore.""" + gitignore = self.project_dir / ".gitignore" + gitignore.write_text("node_modules/\n") + + detach.update_gitignore(self.project_dir) + + content = gitignore.read_text() + self.assertIn("node_modules/", content) + self.assertIn(detach.BACKUP_DIR, content) + + def test_does_not_duplicate_entry(self): + """Should not add duplicate entry.""" + gitignore = self.project_dir / ".gitignore" + gitignore.write_text(f"{detach.BACKUP_DIR}/\n") + + detach.update_gitignore(self.project_dir) + + content = gitignore.read_text() + # Should only appear once + self.assertEqual(content.count(detach.BACKUP_DIR), 1) + + +class TestDetachLock(unittest.TestCase): + """Tests for detach lock functions.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_acquire_lock(self): + """Should acquire lock successfully.""" + result = detach.acquire_detach_lock(self.project_dir) + self.assertTrue(result) + self.assertTrue((self.project_dir / detach.DETACH_LOCK).exists()) + + 
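# The lock tests below assume acquire_detach_lock() follows roughly this
+    # stale-lock recovery scheme. This is an illustrative sketch only — the
+    # real logic lives in detach.py, and the exact error handling shown here
+    # is an assumption, not the actual implementation:
+    #
+    #     def acquire_detach_lock(project_dir: Path) -> bool:
+    #         lock = project_dir / DETACH_LOCK
+    #         if lock.exists():
+    #             try:
+    #                 data = json.loads(lock.read_text())
+    #                 os.kill(data["pid"], 0)  # raises ProcessLookupError if dead
+    #                 return False             # holder still alive -> refuse
+    #             except (ValueError, KeyError, ProcessLookupError):
+    #                 lock.unlink()            # corrupted or stale -> reclaim
+    #         lock.write_text(json.dumps(
+    #             {"pid": os.getpid(), "timestamp": time.time()}))
+    #         return True
+    #
+    # (The real implementation can additionally treat locks older than
+    # LOCK_TIMEOUT_SECONDS as stale via the stored timestamp.)
+
+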
def test_acquire_lock_writes_pid_and_timestamp(self): + """Should write PID and timestamp to lock file.""" + import os + detach.acquire_detach_lock(self.project_dir) + lock_content = json.loads((self.project_dir / detach.DETACH_LOCK).read_text()) + self.assertEqual(lock_content["pid"], os.getpid()) + self.assertIn("timestamp", lock_content) + + def test_acquire_lock_fails_if_locked_by_live_process(self): + """Should fail to acquire lock if already locked by live process.""" + import os + # Create lock with current process PID (which is alive) + lock_data = {"pid": os.getpid(), "timestamp": 9999999999} + (self.project_dir / detach.DETACH_LOCK).write_text(json.dumps(lock_data)) + result = detach.acquire_detach_lock(self.project_dir) + self.assertFalse(result) + + def test_acquire_lock_removes_stale_lock_dead_process(self): + """Should remove stale lock from dead process.""" + # Create lock with non-existent PID + lock_data = {"pid": 999999999, "timestamp": 9999999999} + (self.project_dir / detach.DETACH_LOCK).write_text(json.dumps(lock_data)) + result = detach.acquire_detach_lock(self.project_dir) + self.assertTrue(result) # Should succeed after removing stale lock + + def test_acquire_lock_removes_corrupted_lock(self): + """Should remove corrupted lock file.""" + (self.project_dir / detach.DETACH_LOCK).write_text("not valid json") + result = detach.acquire_detach_lock(self.project_dir) + self.assertTrue(result) + + def test_release_lock(self): + """Should release lock successfully.""" + (self.project_dir / detach.DETACH_LOCK).write_text("{}") + detach.release_detach_lock(self.project_dir) + self.assertFalse((self.project_dir / detach.DETACH_LOCK).exists()) + + def test_release_lock_handles_missing_file(self): + """Should handle missing lock file gracefully.""" + # Should not raise + detach.release_detach_lock(self.project_dir) + + +class TestSecurityPathTraversal(unittest.TestCase): + """Security tests for path traversal protection.""" + + def setUp(self): + """Create temporary project with backup.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create backup directory + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_restore_blocks_path_traversal(self): + """Should reject manifest with path traversal attempt.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + + # Create malicious manifest with path traversal + manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "malicious", + "autocoder_version": "1.0.0", + "files": [ + { + "path": "../../../etc/passwd", # Path traversal attempt + "type": "file", + "size": 100, + "checksum": None, + "file_count": None, + } + ], + "total_size_bytes": 100, + "file_count": 1, + } + (backup_dir / detach.MANIFEST_FILE).write_text(json.dumps(manifest)) + + # Note: We don't need to create the actual malicious file - the validation + # catches it during path resolution before attempting to access the source file + + success, _, _ = detach.restore_backup(self.project_dir) + self.assertFalse(success) + + def test_restore_blocks_absolute_path(self): + """Should reject manifest with absolute path.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + + manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "malicious", + "autocoder_version": "1.0.0", + "files": [ + { + "path": "/etc/passwd", # Absolute path + "type": "file", + 
"size": 100, + "checksum": None, + "file_count": None, + } + ], + "total_size_bytes": 100, + "file_count": 1, + } + (backup_dir / detach.MANIFEST_FILE).write_text(json.dumps(manifest)) + + success, _, _ = detach.restore_backup(self.project_dir) + self.assertFalse(success) + + def test_restore_rejects_unsupported_manifest_version(self): + """Should reject manifest with unsupported version.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + + manifest = { + "version": 999, # Future version + "detached_at": "2024-01-01T00:00:00Z", + "files": [], + "total_size_bytes": 0, + "file_count": 0, + } + (backup_dir / detach.MANIFEST_FILE).write_text(json.dumps(manifest)) + + success, _, _ = detach.restore_backup(self.project_dir) + self.assertFalse(success) + + def test_restore_rejects_invalid_manifest_structure(self): + """Should reject manifest with missing required keys.""" + backup_dir = self.project_dir / detach.BACKUP_DIR + + # Missing required keys + manifest = { + "version": 1, + # missing "files" and "detached_at" + } + (backup_dir / detach.MANIFEST_FILE).write_text(json.dumps(manifest)) + + success, _, _ = detach.restore_backup(self.project_dir) + self.assertFalse(success) + + +class TestGitignoreLineMatching(unittest.TestCase): + """Tests for gitignore line-based matching.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_does_not_match_comment(self): + """Should not match backup dir name in comment.""" + gitignore = self.project_dir / ".gitignore" + gitignore.write_text(f"# Ignore {detach.BACKUP_DIR} directory\nnode_modules/\n") + + detach.update_gitignore(self.project_dir) + + content = gitignore.read_text() + # Should have added the actual entry + lines = [line.strip() for line in content.splitlines()] + self.assertIn(f"{detach.BACKUP_DIR}/", lines) + + def test_does_not_match_path_substring(self): + """Should not match backup dir name as substring of path.""" + gitignore = self.project_dir / ".gitignore" + gitignore.write_text(f"some/path/{detach.BACKUP_DIR}/other\n") + + detach.update_gitignore(self.project_dir) + + content = gitignore.read_text() + # Should have added the standalone entry + lines = [line.strip() for line in content.splitlines()] + self.assertIn(f"{detach.BACKUP_DIR}/", lines) + + def test_matches_exact_entry(self): + """Should match exact entry and not duplicate.""" + gitignore = self.project_dir / ".gitignore" + gitignore.write_text(f"{detach.BACKUP_DIR}/\n") + + detach.update_gitignore(self.project_dir) + + content = gitignore.read_text() + # Should only appear once + self.assertEqual(content.count(f"{detach.BACKUP_DIR}/"), 1) + + +class TestBackupAtomicity(unittest.TestCase): + """Tests for atomic backup operations (copy-then-delete).""" + + def setUp(self): + """Create temporary project with files.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create AutoForge files (only regular files to test copy2) + (self.project_dir / "features.db").write_bytes(b"database content") + (self.project_dir / "CLAUDE.md").write_text("# Test") + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_backup_preserves_originals_on_copy_failure(self): + """Should preserve originals if copy fails.""" + files = detach.get_autoforge_files(self.project_dir) + + # Mock shutil.copy2 to fail on second 
file + original_copy2 = shutil.copy2 + call_count = [0] + + def failing_copy2(src, dst): + call_count[0] += 1 + if call_count[0] > 1: + raise OSError("Simulated copy failure") + return original_copy2(src, dst) + + with patch('detach.shutil.copy2', side_effect=failing_copy2): + with self.assertRaises(OSError): + detach.create_backup(self.project_dir, "test", files) + + # Original files should still exist + self.assertTrue((self.project_dir / "CLAUDE.md").exists()) + self.assertTrue((self.project_dir / "features.db").exists()) + + # Backup directory should be cleaned up + self.assertFalse((self.project_dir / detach.BACKUP_DIR).exists()) + + +class TestFileConflictDetection(unittest.TestCase): + """Tests for file conflict detection and backup during reattach.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_detect_conflicts_no_conflicts(self): + """Should return empty list when no conflicts exist.""" + manifest: detach.Manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "test", + "autocoder_version": "1.0.0", + "files": [ + {"path": "CLAUDE.md", "type": "file", "size": 100, "checksum": None, "file_count": None} + ], + "total_size_bytes": 100, + "file_count": 1, + } + conflicts = detach.detect_conflicts(self.project_dir, manifest) + self.assertEqual(conflicts, []) + + def test_detect_conflicts_with_file(self): + """Should detect conflicting file.""" + (self.project_dir / "CLAUDE.md").write_text("User content") + + manifest: detach.Manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "test", + "autocoder_version": "1.0.0", + "files": [ + {"path": "CLAUDE.md", "type": "file", "size": 100, "checksum": None, "file_count": None} + ], + "total_size_bytes": 100, + "file_count": 1, + } + conflicts = detach.detect_conflicts(self.project_dir, manifest) + self.assertEqual(conflicts, ["CLAUDE.md"]) + + def test_detect_conflicts_with_directory(self): + """Should detect conflicting directory.""" + (self.project_dir / "prompts").mkdir() + (self.project_dir / "prompts" / "user_file.txt").write_text("User file") + + manifest: detach.Manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "test", + "autocoder_version": "1.0.0", + "files": [ + {"path": "prompts", "type": "directory", "size": 100, "checksum": None, "file_count": 1} + ], + "total_size_bytes": 100, + "file_count": 1, + } + conflicts = detach.detect_conflicts(self.project_dir, manifest) + self.assertEqual(conflicts, ["prompts"]) + + def test_backup_conflicts_creates_backup(self): + """Should backup conflicting files to pre-reattach-backup dir.""" + (self.project_dir / "CLAUDE.md").write_text("User content") + conflicts = ["CLAUDE.md"] + + backup_path = detach.backup_conflicts(self.project_dir, conflicts) + + self.assertEqual(backup_path, self.project_dir / detach.PRE_REATTACH_BACKUP_DIR) + self.assertTrue((backup_path / "CLAUDE.md").exists()) + self.assertEqual((backup_path / "CLAUDE.md").read_text(), "User content") + + @patch('detach.get_project_path') + def test_reattach_with_conflicts_preserves_new_files(self, mock_get_path): + """Should backup user files when they conflict with autoforge files.""" + mock_get_path.return_value = self.project_dir + + # Create AutoForge files and backup them + (self.project_dir / "CLAUDE.md").write_text("AutoForge 
content") + (self.project_dir / "features.db").write_bytes(b"test") + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + # Simulate user creating CLAUDE.md while detached + (self.project_dir / "CLAUDE.md").write_text("User content after /init") + + # Reattach + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertTrue(success) + self.assertEqual(conflicts, ["CLAUDE.md"]) + self.assertIn("user files saved", message) + + # AutoForge content restored + self.assertEqual((self.project_dir / "CLAUDE.md").read_text(), "AutoForge content") + + # User content backed up + backup_path = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + self.assertTrue(backup_path.exists()) + self.assertEqual((backup_path / "CLAUDE.md").read_text(), "User content after /init") + + @patch('detach.get_project_path') + def test_reattach_no_conflicts_no_backup(self, mock_get_path): + """Should not create backup directory when no conflicts exist.""" + mock_get_path.return_value = self.project_dir + + # Create AutoForge files and backup them + (self.project_dir / "CLAUDE.md").write_text("AutoForge content") + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + # No user files created (no conflict) + + # Reattach + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertTrue(success) + self.assertEqual(conflicts, []) + self.assertNotIn("user files", message) + + # No pre-reattach backup should exist + backup_path = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + self.assertFalse(backup_path.exists()) + + def test_restore_pre_reattach_backup(self): + """Should restore user files from pre-reattach backup.""" + # Create pre-reattach backup + backup_dir = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + backup_dir.mkdir() + (backup_dir / "CLAUDE.md").write_text("User content") + (backup_dir / "nested").mkdir() + (backup_dir / "nested" / "file.txt").write_text("Nested user file") + + # Restore + files_restored = detach.restore_pre_reattach_backup(self.project_dir) + + self.assertEqual(files_restored, 2) + self.assertEqual((self.project_dir / "CLAUDE.md").read_text(), "User content") + self.assertEqual((self.project_dir / "nested" / "file.txt").read_text(), "Nested user file") + + # Backup directory should be removed + self.assertFalse(backup_dir.exists()) + + def test_restore_pre_reattach_backup_path_traversal(self): + """Should skip files with path traversal in pre-reattach backup.""" + backup_dir = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + backup_dir.mkdir() + + # Create a normal file + (backup_dir / "safe.txt").write_text("Safe content") + + # Note: Path traversal protection is tested by the restore function + # which validates each path before restoring. We can't easily create + # a malicious backup file, but the validation logic ensures safety. 
+ + # Restore should only restore safe.txt + files_restored = detach.restore_pre_reattach_backup(self.project_dir) + self.assertEqual(files_restored, 1) + self.assertEqual((self.project_dir / "safe.txt").read_text(), "Safe content") + + @patch('detach.get_project_path') + def test_detach_restores_user_files(self, mock_get_path): + """Should restore user files from pre-reattach backup on detach.""" + mock_get_path.return_value = self.project_dir + + # Create pre-reattach backup (from previous reattach) + backup_dir = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + backup_dir.mkdir() + (backup_dir / "CLAUDE.md").write_text("User content from previous session") + + # Create AutoForge files + (self.project_dir / ".autoforge").mkdir() + (self.project_dir / "features.db").touch() + + # Detach + success, message, manifest, user_files_restored = detach.detach_project("test-project") + + self.assertTrue(success) + self.assertEqual(user_files_restored, 1) + self.assertIn("restored 1 user files", message) + + # User file restored + self.assertEqual((self.project_dir / "CLAUDE.md").read_text(), "User content from previous session") + + # Pre-reattach backup cleaned up + self.assertFalse(backup_dir.exists()) + + @patch('detach.get_project_path') + def test_full_cycle_preserves_both_files(self, mock_get_path): + """Full cycle: detach -> create user file -> reattach -> detach preserves both.""" + mock_get_path.return_value = self.project_dir + + # Initial state: AutoForge files + (self.project_dir / "CLAUDE.md").write_text("AutoForge CLAUDE.md") + (self.project_dir / "features.db").touch() + + # Step 1: Detach + success, msg, manifest, user_restored = detach.detach_project("test-project") + self.assertTrue(success) + self.assertEqual(user_restored, 0) # No user files to restore initially + + # Step 2: User creates their own CLAUDE.md (e.g., via /init) + (self.project_dir / "CLAUDE.md").write_text("User CLAUDE.md from /init") + + # Step 3: Reattach + success, msg, files_restored, conflicts = detach.reattach_project("test-project") + self.assertTrue(success) + self.assertEqual(conflicts, ["CLAUDE.md"]) # User file was backed up + + # AutoForge content restored + self.assertEqual((self.project_dir / "CLAUDE.md").read_text(), "AutoForge CLAUDE.md") + + # User content in pre-reattach backup + self.assertEqual( + (self.project_dir / detach.PRE_REATTACH_BACKUP_DIR / "CLAUDE.md").read_text(), + "User CLAUDE.md from /init" + ) + + # Step 4: Detach again + success, msg, manifest, user_restored = detach.detach_project("test-project") + self.assertTrue(success) + self.assertEqual(user_restored, 1) # User file restored + + # User content back in place + self.assertEqual((self.project_dir / "CLAUDE.md").read_text(), "User CLAUDE.md from /init") + + # Pre-reattach backup cleaned up + self.assertFalse((self.project_dir / detach.PRE_REATTACH_BACKUP_DIR).exists()) + + # Verify features.db is in backup, not in project root (main bug fix verification) + backup_dir = self.project_dir / detach.BACKUP_DIR + self.assertFalse((self.project_dir / "features.db").exists()) + self.assertTrue((backup_dir / "features.db").exists()) + + @patch('detach.get_project_path') + def test_reattach_merges_existing_pre_reattach_backup(self, mock_get_path): + """Should merge new conflicts with existing pre-reattach backup.""" + mock_get_path.return_value = self.project_dir + + # Create existing pre-reattach backup + backup_dir = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + backup_dir.mkdir() + (backup_dir / 
"old_user_file.txt").write_text("Old user file") + + # Create AutoForge files and backup + (self.project_dir / "CLAUDE.md").write_text("AutoForge CLAUDE.md") + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test", files) + + # User creates new CLAUDE.md + (self.project_dir / "CLAUDE.md").write_text("New user CLAUDE.md") + + # Reattach - should merge, not overwrite + success, msg, files_restored, conflicts = detach.reattach_project("test") + self.assertTrue(success) + self.assertEqual(conflicts, ["CLAUDE.md"]) + + # Both files should exist in backup + self.assertEqual((backup_dir / "old_user_file.txt").read_text(), "Old user file") + self.assertEqual((backup_dir / "CLAUDE.md").read_text(), "New user CLAUDE.md") + + def test_backup_conflicts_does_not_overwrite_existing(self): + """Backup should not overwrite existing files (merge mode).""" + # Create pre-reattach backup with existing file + backup_dir = self.project_dir / detach.PRE_REATTACH_BACKUP_DIR + backup_dir.mkdir() + (backup_dir / "CLAUDE.md").write_text("Original backup") + + # Create conflicting file + (self.project_dir / "CLAUDE.md").write_text("New content") + + # Backup conflicts + detach.backup_conflicts(self.project_dir, ["CLAUDE.md"]) + + # Original backup should be preserved + self.assertEqual((backup_dir / "CLAUDE.md").read_text(), "Original backup") + + +class TestGitignorePreReattachBackup(unittest.TestCase): + """Tests for .pre-reattach-backup/ in .gitignore.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_gitignore_includes_both_backup_dirs(self): + """Should add both backup directories to .gitignore.""" + detach.update_gitignore(self.project_dir) + + gitignore = self.project_dir / ".gitignore" + content = gitignore.read_text() + lines = content.splitlines() + + # Both patterns should be present as standalone lines + self.assertTrue(any(line.strip() == f"{detach.BACKUP_DIR}/" for line in lines)) + self.assertTrue(any(line.strip() == f"{detach.PRE_REATTACH_BACKUP_DIR}/" for line in lines)) + + def test_gitignore_appends_missing_patterns(self): + """Should append only missing patterns to existing .gitignore.""" + gitignore = self.project_dir / ".gitignore" + gitignore.write_text(f"{detach.BACKUP_DIR}/\n") + + detach.update_gitignore(self.project_dir) + + content = gitignore.read_text() + # BACKUP_DIR should appear once, PRE_REATTACH_BACKUP_DIR should be added + self.assertEqual(content.count(f"{detach.BACKUP_DIR}/"), 1) + self.assertIn(f"{detach.PRE_REATTACH_BACKUP_DIR}/", content) + + +class TestOrphanedDbCleanup(unittest.TestCase): + """Tests for orphaned database file cleanup during reattach.""" + + def setUp(self): + """Create temporary project with backup.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + # Create AutoForge files with realistic database + (self.project_dir / ".autoforge").mkdir() + # Create a features.db that's larger than an empty one (simulate real data) + (self.project_dir / "features.db").write_bytes(b"x" * 120000) # 120KB - realistic size + (self.project_dir / "features.db-wal").write_bytes(b"wal") + (self.project_dir / "features.db-shm").write_bytes(b"shm") + + # Create backup + files = detach.get_autoforge_files(self.project_dir) + detach.create_backup(self.project_dir, "test-project", files) + + def 
tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + @patch('detach.get_project_path') + def test_cleanup_removes_recreated_small_db(self, mock_get_path): + """Should remove recreated empty database before restore.""" + mock_get_path.return_value = self.project_dir + + # Simulate API recreating empty features.db after detach + (self.project_dir / "features.db").write_bytes(b"x" * 4096) # 4KB - empty SQLite + + # Reattach should clean up the small file and restore the large one + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertTrue(success) + # The restored file should be the large one from backup + restored_size = (self.project_dir / "features.db").stat().st_size + self.assertEqual(restored_size, 120000) + + @patch('detach.get_project_path') + def test_cleanup_removes_orphan_wal_files(self, mock_get_path): + """Should remove orphaned WAL/SHM files before restore.""" + mock_get_path.return_value = self.project_dir + + # Simulate orphaned WAL files created by API + (self.project_dir / "features.db-wal").write_bytes(b"orphan wal") + (self.project_dir / "features.db-shm").write_bytes(b"orphan shm") + (self.project_dir / "assistant.db-wal").write_bytes(b"orphan wal") + + # Reattach should clean up and restore + success, message, files_restored, conflicts = detach.reattach_project("test-project") + + self.assertTrue(success) + # WAL files should be from backup (or not exist if not in backup) + # In this case they were in original, so should be restored + self.assertTrue((self.project_dir / "features.db-wal").exists()) + + def test_cleanup_helper_function_directly(self): + """Test _cleanup_orphaned_db_files directly.""" + # Create manifest + manifest: detach.Manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "test", + "autocoder_version": "1.0.0", + "files": [ + {"path": "features.db", "type": "file", "size": 120000, "checksum": "abc", "file_count": None} + ], + "total_size_bytes": 120000, + "file_count": 1, + } + + # Create small recreated file at root + (self.project_dir / "features.db").write_bytes(b"x" * 4096) + (self.project_dir / "features.db-wal").write_bytes(b"wal") + + # Run cleanup + cleaned = detach._cleanup_orphaned_db_files(self.project_dir, manifest) + + # Both should be cleaned + self.assertIn("features.db", cleaned) + self.assertIn("features.db-wal", cleaned) + self.assertFalse((self.project_dir / "features.db").exists()) + self.assertFalse((self.project_dir / "features.db-wal").exists()) + + def test_cleanup_preserves_large_user_db(self): + """Should NOT remove database if it's larger than backup (user modifications).""" + manifest: detach.Manifest = { + "version": 1, + "detached_at": "2024-01-01T00:00:00Z", + "project_name": "test", + "autocoder_version": "1.0.0", + "files": [ + {"path": "features.db", "type": "file", "size": 50000, "checksum": "abc", "file_count": None} + ], + "total_size_bytes": 50000, + "file_count": 1, + } + + # Create larger file at root (user added data) + (self.project_dir / "features.db").write_bytes(b"x" * 100000) + + # Run cleanup + cleaned = detach._cleanup_orphaned_db_files(self.project_dir, manifest) + + # features.db should NOT be cleaned (it's larger) + self.assertNotIn("features.db", cleaned) + self.assertTrue((self.project_dir / "features.db").exists()) + + +class TestServerDependencies(unittest.TestCase): + """Tests for server/dependencies.py validation functions.""" + + def setUp(self): + """Create temporary project 
directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_validate_project_not_detached_raises_on_detached(self): + """Should raise HTTPException 409 for detached project.""" + # Import here to avoid issues if server not set up + try: + from fastapi import HTTPException + + from server.dependencies import validate_project_not_detached + except ImportError: + self.skipTest("Server dependencies not available") + + # Create detached state (backup with manifest) + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + with patch('server.dependencies._get_registry_module') as mock_registry: + mock_registry.return_value = lambda x: self.project_dir + + with self.assertRaises(HTTPException) as ctx: + validate_project_not_detached("test-project") + + self.assertEqual(ctx.exception.status_code, 409) + self.assertIn("detached", ctx.exception.detail) + + def test_validate_project_not_detached_passes_for_attached(self): + """Should return project_dir for attached project.""" + try: + from server.dependencies import validate_project_not_detached + except ImportError: + self.skipTest("Server dependencies not available") + + # Create attached state (files at root, no backup) + (self.project_dir / "features.db").touch() + + with patch('server.dependencies._get_registry_module') as mock_registry: + mock_registry.return_value = lambda x: self.project_dir + + result = validate_project_not_detached("test-project") + self.assertEqual(result, self.project_dir) + + def test_check_project_detached_for_background_returns_bool(self): + """Background check should return bool, not raise.""" + try: + from server.dependencies import check_project_detached_for_background + except ImportError: + self.skipTest("Server dependencies not available") + + # Attached state + (self.project_dir / "features.db").touch() + result = check_project_detached_for_background(self.project_dir) + self.assertFalse(result) + + # Detached state + backup_dir = self.project_dir / detach.BACKUP_DIR + backup_dir.mkdir() + (backup_dir / detach.MANIFEST_FILE).write_text("{}") + + result = check_project_detached_for_background(self.project_dir) + self.assertTrue(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/ui/src/App.tsx b/ui/src/App.tsx index ef916f30..74de17ec 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -77,22 +77,26 @@ function App() { const queryClient = useQueryClient() const { data: projects, isLoading: projectsLoading } = useProjects() - const { data: features } = useFeatures(selectedProject) + + // Get selected project data FIRST (needed for isDetached check) + const selectedProjectData = projects?.find(p => p.name === selectedProject) + // While projects are loading, treat as detached to prevent premature API calls + const isDetached = projectsLoading ? true : (selectedProjectData?.is_detached ?? false) + const hasSpec = selectedProjectData?.has_spec ?? 
true + + // Now use features with detach flag + const { data: features } = useFeatures(selectedProject, isDetached) const { data: settings } = useSettings() useAgentStatus(selectedProject) // Keep polling for status updates const wsState = useProjectWebSocket(selectedProject) const { theme, setTheme, darkMode, toggleDarkMode, themes } = useTheme() - // Get has_spec from the selected project - const selectedProjectData = projects?.find(p => p.name === selectedProject) - const hasSpec = selectedProjectData?.has_spec ?? true - - // Fetch graph data when in graph view + // Fetch graph data when in graph view (disabled when detached) const { data: graphData } = useQuery({ queryKey: ['dependencyGraph', selectedProject], queryFn: () => getDependencyGraph(selectedProject!), - enabled: !!selectedProject && viewMode === 'graph', - refetchInterval: 5000, // Refresh every 5 seconds + enabled: !!selectedProject && viewMode === 'graph' && !isDetached, + refetchInterval: isDetached ? false : 5000, // Refresh every 5 seconds }) // Persist view mode to localStorage @@ -286,6 +290,7 @@ function App() { projectName={selectedProject} status={wsState.agentStatus} defaultConcurrency={selectedProjectData?.default_concurrency} + isDetached={isDetached} /> setShowScheduleModal(false)} + isDetached={isDetached} /> ) diff --git a/ui/src/components/ConfirmDialog.tsx b/ui/src/components/ConfirmDialog.tsx index 6d04893c..e080f697 100644 --- a/ui/src/components/ConfirmDialog.tsx +++ b/ui/src/components/ConfirmDialog.tsx @@ -23,6 +23,7 @@ interface ConfirmDialogProps { message: ReactNode confirmLabel?: string cancelLabel?: string + loadingLabel?: string // Custom label shown during loading (default: confirmLabel + "...") variant?: 'danger' | 'warning' isLoading?: boolean onConfirm: () => void @@ -35,6 +36,7 @@ export function ConfirmDialog({ message, confirmLabel = 'Confirm', cancelLabel = 'Cancel', + loadingLabel, variant = 'danger', isLoading = false, onConfirm, @@ -69,7 +71,7 @@ export function ConfirmDialog({ onClick={onConfirm} disabled={isLoading} > - {isLoading ? 'Deleting...' : confirmLabel} + {isLoading ? 
(loadingLabel || `${confirmLabel}...`) : confirmLabel} diff --git a/ui/src/components/ProjectSelector.tsx b/ui/src/components/ProjectSelector.tsx index 59738952..440f40af 100644 --- a/ui/src/components/ProjectSelector.tsx +++ b/ui/src/components/ProjectSelector.tsx @@ -1,9 +1,9 @@ import { useState } from 'react' -import { ChevronDown, Plus, FolderOpen, Loader2, Trash2 } from 'lucide-react' +import { ChevronDown, Plus, FolderOpen, Loader2, Trash2, Unlink, Link2 } from 'lucide-react' import type { ProjectSummary } from '../lib/types' import { NewProjectModal } from './NewProjectModal' import { ConfirmDialog } from './ConfirmDialog' -import { useDeleteProject } from '../hooks/useProjects' +import { useDeleteProject, useDetachProject, useReattachProject } from '../hooks/useProjects' import { Button } from '@/components/ui/button' import { Badge } from '@/components/ui/badge' import { @@ -32,8 +32,12 @@ export function ProjectSelector({ const [isOpen, setIsOpen] = useState(false) const [showNewProjectModal, setShowNewProjectModal] = useState(false) const [projectToDelete, setProjectToDelete] = useState(null) + const [projectToDetach, setProjectToDetach] = useState(null) + const [projectToReattach, setProjectToReattach] = useState(null) const deleteProject = useDeleteProject() + const detachProject = useDetachProject() + const reattachProject = useReattachProject() const handleProjectCreated = (projectName: string) => { onSelectProject(projectName) @@ -65,6 +69,44 @@ export function ProjectSelector({ setProjectToDelete(null) } + const handleDetachClick = (e: React.MouseEvent, projectName: string) => { + e.stopPropagation() + e.preventDefault() + setProjectToDetach(projectName) + } + + const handleReattachClick = (e: React.MouseEvent, projectName: string) => { + e.stopPropagation() + e.preventDefault() + setProjectToReattach(projectName) + } + + const handleConfirmDetach = async () => { + if (!projectToDetach) return + + try { + const result = await detachProject.mutateAsync(projectToDetach) + console.log(`Project detached: ${result.files_moved} files moved to backup`) + setProjectToDetach(null) + } catch (error) { + console.error('Failed to detach project:', error) + setProjectToDetach(null) + } + } + + const handleConfirmReattach = async () => { + if (!projectToReattach) return + + try { + const result = await reattachProject.mutateAsync(projectToReattach) + console.log(`Project reattached: ${result.files_restored} files restored`) + setProjectToReattach(null) + } catch (error) { + console.error('Failed to reattach project:', error) + setProjectToReattach(null) + } + } + const selectedProjectData = projects.find(p => p.name === selectedProject) return ( @@ -108,23 +150,51 @@ export function ProjectSelector({ onSelectProject(project.name) }} > - - - {project.name} - {project.stats.total > 0 && ( - + + + {project.name} + {project.is_detached && ( + + DETACHED + + )} + {project.stats.total > 0 && !project.is_detached && ( + {project.stats.passing}/{project.stats.total} )} - +
+ {project.is_detached ? ( + + ) : ( + + )} + +
))} @@ -170,6 +240,32 @@ export function ProjectSelector({ onConfirm={handleConfirmDelete} onCancel={handleCancelDelete} /> + + {/* Detach Confirmation Dialog */} + setProjectToDetach(null)} + /> + + {/* Reattach Confirmation Dialog */} + setProjectToReattach(null)} + /> ) } diff --git a/ui/src/components/ScheduleModal.tsx b/ui/src/components/ScheduleModal.tsx index 58851242..f301b220 100644 --- a/ui/src/components/ScheduleModal.tsx +++ b/ui/src/components/ScheduleModal.tsx @@ -41,14 +41,15 @@ interface ScheduleModalProps { projectName: string isOpen: boolean onClose: () => void + isDetached?: boolean } -export function ScheduleModal({ projectName, isOpen, onClose }: ScheduleModalProps) { +export function ScheduleModal({ projectName, isOpen, onClose, isDetached = false }: ScheduleModalProps) { const modalRef = useRef(null) const firstFocusableRef = useRef(null) // Queries and mutations - const { data: schedulesData, isLoading } = useSchedules(projectName) + const { data: schedulesData, isLoading } = useSchedules(projectName, isDetached) const createSchedule = useCreateSchedule(projectName) const deleteSchedule = useDeleteSchedule(projectName) const toggleSchedule = useToggleSchedule(projectName) diff --git a/ui/src/hooks/useProjects.ts b/ui/src/hooks/useProjects.ts index e4154544..3c980274 100644 --- a/ui/src/hooks/useProjects.ts +++ b/ui/src/hooks/useProjects.ts @@ -4,7 +4,7 @@ import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query' import * as api from '../lib/api' -import type { FeatureCreate, FeatureUpdate, ModelsResponse, ProjectSettingsUpdate, ProvidersResponse, Settings, SettingsUpdate } from '../lib/types' +import type { FeatureCreate, FeatureUpdate, ModelsResponse, ProjectSettingsUpdate, ProjectSummary, ProvidersResponse, Settings, SettingsUpdate } from '../lib/types' // ============================================================================ // Projects @@ -75,16 +75,67 @@ export function useUpdateProjectSettings(projectName: string) { }) } +export function useDetachProject() { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: (name: string) => api.detachProject(name), + onSuccess: (_data, name) => { + // Optimistically set is_detached=true in projects cache to prevent race condition + // This ensures isDetached becomes true immediately before the refetch completes + queryClient.setQueryData(['projects'], (oldData: ProjectSummary[] | undefined) => { + if (!oldData) return oldData + return oldData.map(p => p.name === name ? 
{ ...p, is_detached: true } : p) + }) + + // Clear features data immediately (prevents stale cache) + queryClient.setQueryData(['features', name], null) + queryClient.setQueryData(['dependencyGraph', name], null) + queryClient.setQueryData(['schedules', name], null) + queryClient.setQueryData(['nextRun', name], null) + + // Invalidate to refresh + queryClient.invalidateQueries({ queryKey: ['projects'] }) + queryClient.invalidateQueries({ queryKey: ['project', name] }) + queryClient.invalidateQueries({ queryKey: ['features', name] }) + queryClient.invalidateQueries({ queryKey: ['schedules', name] }) + queryClient.invalidateQueries({ queryKey: ['nextRun', name] }) + }, + }) +} + +export function useReattachProject() { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: (name: string) => api.reattachProject(name), + onSuccess: (_data, name) => { + // Optimistically set is_detached=false in projects cache + queryClient.setQueryData(['projects'], (oldData: ProjectSummary[] | undefined) => { + if (!oldData) return oldData + return oldData.map(p => p.name === name ? { ...p, is_detached: false } : p) + }) + + // Invalidate to refresh all data + queryClient.invalidateQueries({ queryKey: ['projects'] }) + queryClient.invalidateQueries({ queryKey: ['project', name] }) + queryClient.invalidateQueries({ queryKey: ['features', name] }) + queryClient.invalidateQueries({ queryKey: ['schedules', name] }) + queryClient.invalidateQueries({ queryKey: ['nextRun', name] }) + }, + }) +} + // ============================================================================ // Features // ============================================================================ -export function useFeatures(projectName: string | null) { +export function useFeatures(projectName: string | null, isDetached: boolean = false) { return useQuery({ queryKey: ['features', projectName], queryFn: () => api.listFeatures(projectName!), - enabled: !!projectName, - refetchInterval: 5000, // Refetch every 5 seconds for real-time updates + enabled: !!projectName && !isDetached, + refetchInterval: isDetached ? false : 5000, // Refetch every 5 seconds for real-time updates }) } diff --git a/ui/src/hooks/useSchedules.ts b/ui/src/hooks/useSchedules.ts index 45411b0e..3622584c 100644 --- a/ui/src/hooks/useSchedules.ts +++ b/ui/src/hooks/useSchedules.ts @@ -13,11 +13,11 @@ import type { ScheduleCreate, ScheduleUpdate } from '../lib/types' /** * Hook to fetch all schedules for a project. */ -export function useSchedules(projectName: string | null) { +export function useSchedules(projectName: string | null, isDetached: boolean = false) { return useQuery({ queryKey: ['schedules', projectName], queryFn: () => api.listSchedules(projectName!), - enabled: !!projectName, + enabled: !!projectName && !isDetached, }) } @@ -102,11 +102,11 @@ export function useToggleSchedule(projectName: string) { * Hook to fetch the next scheduled run for a project. * Polls every 30 seconds to keep status up-to-date. */ -export function useNextScheduledRun(projectName: string | null) { +export function useNextScheduledRun(projectName: string | null, isDetached: boolean = false) { return useQuery({ queryKey: ['nextRun', projectName], queryFn: () => api.getNextScheduledRun(projectName!), - enabled: !!projectName, - refetchInterval: 30000, // Refresh every 30 seconds + enabled: !!projectName && !isDetached, + refetchInterval: isDetached ? 
false : 30000, // Refresh every 30 seconds (disabled while detached)
   })
 }
diff --git a/ui/src/lib/api.ts b/ui/src/lib/api.ts
index 10b577b4..fc096afc 100644
--- a/ui/src/lib/api.ts
+++ b/ui/src/lib/api.ts
@@ -33,6 +33,9 @@ import type {
   ScheduleUpdate,
   ScheduleListResponse,
   NextRunResponse,
+  DetachResponse,
+  ReattachResponse,
+  DetachStatusResponse,
 } from './types'
 
 const API_BASE = '/api'
 
@@ -129,6 +132,26 @@ export async function resetProject(
   })
 }
 
+// ============================================================================
+// Detach/Reattach API
+// ============================================================================
+
+export async function detachProject(name: string): Promise<DetachResponse> {
+  return fetchJSON(`/projects/${encodeURIComponent(name)}/detach`, {
+    method: 'POST',
+  })
+}
+
+export async function reattachProject(name: string): Promise<ReattachResponse> {
+  return fetchJSON(`/projects/${encodeURIComponent(name)}/reattach`, {
+    method: 'POST',
+  })
+}
+
+export async function getDetachStatus(name: string): Promise<DetachStatusResponse> {
+  return fetchJSON(`/projects/${encodeURIComponent(name)}/detach-status`)
+}
+
 // ============================================================================
 // Features API
 // ============================================================================
diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts
index ba8eab94..cd165263 100644
--- a/ui/src/lib/types.ts
+++ b/ui/src/lib/types.ts
@@ -16,12 +16,39 @@ export interface ProjectSummary {
   has_spec: boolean
   stats: ProjectStats
   default_concurrency: number
+  is_detached: boolean // True if AutoForge files moved to backup
 }
 
 export interface ProjectDetail extends ProjectSummary {
   prompts_dir: string
 }
 
+// Detach/Reattach types
+export interface DetachResponse {
+  success: boolean
+  files_moved: number
+  backup_size: number
+  backup_path: string
+  message: string
+  user_files_restored: number // Always returned by backend
+}
+
+export interface ReattachResponse {
+  success: boolean
+  files_restored: number
+  message: string
+  conflicts: string[] // Always returned by backend (empty array if none)
+  conflicts_backup_path: string | null // null if no conflicts
+}
+
+export interface DetachStatusResponse {
+  is_detached: boolean
+  backup_exists: boolean
+  backup_size?: number
+  detached_at?: string
+  file_count?: number
+}
+
 // Filesystem types
 export interface DriveInfo {
   letter: string