From c098b6c93d0fe238524d181a82f704539f18e578 Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Thu, 22 Jan 2026 13:15:28 +0100 Subject: [PATCH] docs: add webhook implementation planning documents Research and design documents for implementing webhooks to enable real-time notifications when configs are deployed. Covers: - Industry patterns (Stripe, GitHub) - Architecture decision (Taskiq + Redis Streams) - API design and database schema - Implementation plan (~3-4 weeks) --- docs/design/webhooks-config-sync/README.md | 107 ++++ .../design/webhooks-config-sync/api-design.md | 569 ++++++++++++++++++ .../webhooks-config-sync/architecture.md | 316 ++++++++++ docs/design/webhooks-config-sync/context.md | 327 ++++++++++ docs/design/webhooks-config-sync/plan.md | 317 ++++++++++ docs/design/webhooks-config-sync/research.md | 157 +++++ docs/design/webhooks-config-sync/schema.md | 348 +++++++++++ 7 files changed, 2141 insertions(+) create mode 100644 docs/design/webhooks-config-sync/README.md create mode 100644 docs/design/webhooks-config-sync/api-design.md create mode 100644 docs/design/webhooks-config-sync/architecture.md create mode 100644 docs/design/webhooks-config-sync/context.md create mode 100644 docs/design/webhooks-config-sync/plan.md create mode 100644 docs/design/webhooks-config-sync/research.md create mode 100644 docs/design/webhooks-config-sync/schema.md diff --git a/docs/design/webhooks-config-sync/README.md b/docs/design/webhooks-config-sync/README.md new file mode 100644 index 0000000000..889e293cb2 --- /dev/null +++ b/docs/design/webhooks-config-sync/README.md @@ -0,0 +1,107 @@ +# Webhooks for Config Sync - Planning Workspace + +## Overview +Research and planning for implementing webhooks in Agenta to enable real-time notifications when configuration changes occur (e.g., config deployed to production). + +## Problem Statement +Users want automatic sync to GitHub when a PM saves/deploys a new config version. Currently, they must either: +- Fetch config via API at deploy time +- Run scheduled GitHub Actions to sync +- Use runtime fetch with caching + +**Gap**: No real-time notification when config changes. + +--- + +## Documents + +| File | Description | +|------|-------------| +| [research.md](./research.md) | Industry research: Stripe, GitHub, and other webhook patterns | +| [architecture.md](./architecture.md) | Architecture options analysis leveraging existing infrastructure | +| [api-design.md](./api-design.md) | Proposed API endpoints and payload formats | +| [schema.md](./schema.md) | Database schema proposal for webhooks | +| [context.md](./context.md) | Codebase context, existing infrastructure, integration points | +| [plan.md](./plan.md) | Implementation plan with phases and timeline | + +--- + +## Key Decisions + +### 1. Architecture: Taskiq + Redis Streams +**Why**: Agenta already uses this pattern for evaluations (`EvaluationsWorker`). Zero new infrastructure needed. + +### 2. Existing Infrastructure Leveraged +| Component | Purpose | +|-----------|---------| +| `redis-durable:6381` | Taskiq broker (Redis Streams) | +| PostgreSQL | Webhooks & deliveries tables | +| Cron container | Cleanup & stuck retry jobs | +| Taskiq | Task queue with built-in retries | + +### 3. MVP Scope +- Single event: `config.deployed` +- Project-level webhooks +- HMAC-SHA256 signatures +- Automatic retries (6 attempts) +- Delivery history (30 days) + +### 4. Security: HMAC-SHA256 (Industry Standard) +``` +X-Agenta-Signature: t=1705318200,v1=5d2c3b1a... 
+``` +Same pattern as Stripe and GitHub. + +--- + +## Quick Reference + +### Webhook Payload Format +```json +{ + "id": "event-uuid", + "timestamp": "2024-01-15T10:30:00.000Z", + "type": "config.deployed", + "api_version": "2024-01", + "project_id": "project-uuid", + "data": { + "application": { "id": "...", "name": "my-app" }, + "variant": { "id": "...", "slug": "gpt4", "version": 5 }, + "environment": { "id": "...", "name": "production" }, + "config": { "params": { ... } }, + "deployed_by": { "id": "...", "email": "..." } + } +} +``` + +### New Files Structure +``` +api/ +├── entrypoints/ +│ └── worker_webhooks.py # Worker entrypoint +├── oss/src/ +│ ├── routers/webhooks_router.py # CRUD API +│ ├── services/webhook_service.py # Event emission +│ ├── tasks/taskiq/webhooks/worker.py # Taskiq tasks +│ ├── models/api/webhook_models.py # Pydantic models +│ └── crons/ +│ ├── webhooks.txt # Crontab +│ ├── webhooks-cleanup.sh +│ └── webhooks-retry.sh +└── hosting/docker-compose/ # Add worker service +``` + +### Timeline +**~3-4 weeks** (13-17 dev days) + +--- + +## Status + +- [x] Research complete (Stripe, GitHub, and industry patterns) +- [x] Architecture decided (Taskiq + Redis Streams) +- [x] API design complete +- [x] Database schema designed +- [x] Implementation plan created +- [ ] RFC review +- [ ] Implementation started diff --git a/docs/design/webhooks-config-sync/api-design.md b/docs/design/webhooks-config-sync/api-design.md new file mode 100644 index 0000000000..d90e687bad --- /dev/null +++ b/docs/design/webhooks-config-sync/api-design.md @@ -0,0 +1,569 @@ +# API Design + +## Overview + +RESTful API for managing webhook subscriptions, plus internal service for event dispatch. + +--- + +## Webhook Management Endpoints + +### Base Path: `/api/projects/{project_id}/webhooks` + +### 1. List Webhooks + +```http +GET /api/projects/{project_id}/webhooks +``` + +**Response:** +```json +{ + "webhooks": [ + { + "id": "01234567-89ab-cdef-0123-456789abcdef", + "name": "GitHub Sync", + "url": "https://api.github.com/repos/org/repo/dispatches", + "event_types": ["config.deployed"], + "application_id": null, + "environment_name": "production", + "is_active": true, + "description": "Sync production configs to GitHub", + "created_at": "2024-01-15T10:30:00Z", + "updated_at": "2024-01-15T10:30:00Z" + } + ] +} +``` + +### 2. Create Webhook + +```http +POST /api/projects/{project_id}/webhooks +``` + +**Request:** +```json +{ + "name": "GitHub Sync", + "url": "https://api.github.com/repos/org/repo/dispatches", + "event_types": ["config.deployed"], + "application_id": null, + "environment_name": "production", + "description": "Sync production configs to GitHub", + "headers": { + "Authorization": "Bearer ghp_xxxxxxxxxxxx" + } +} +``` + +**Response:** +```json +{ + "id": "01234567-89ab-cdef-0123-456789abcdef", + "name": "GitHub Sync", + "url": "https://api.github.com/repos/org/repo/dispatches", + "secret": "whsec_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "event_types": ["config.deployed"], + "application_id": null, + "environment_name": "production", + "is_active": true, + "description": "Sync production configs to GitHub", + "headers": { + "Authorization": "Bearer ghp_xxxxxxxxxxxx" + }, + "created_at": "2024-01-15T10:30:00Z", + "updated_at": "2024-01-15T10:30:00Z" +} +``` + +**Note:** `secret` is only returned on creation. Store it securely. + +### 3. 
Get Webhook + +```http +GET /api/projects/{project_id}/webhooks/{webhook_id} +``` + +**Response:** +```json +{ + "id": "01234567-89ab-cdef-0123-456789abcdef", + "name": "GitHub Sync", + "url": "https://api.github.com/repos/org/repo/dispatches", + "event_types": ["config.deployed"], + "application_id": null, + "environment_name": "production", + "is_active": true, + "description": "Sync production configs to GitHub", + "created_at": "2024-01-15T10:30:00Z", + "updated_at": "2024-01-15T10:30:00Z" +} +``` + +**Note:** `secret` is NOT returned on get (security). + +### 4. Update Webhook + +```http +PATCH /api/projects/{project_id}/webhooks/{webhook_id} +``` + +**Request:** +```json +{ + "name": "GitHub Sync - Updated", + "is_active": false +} +``` + +**Response:** Updated webhook object + +### 5. Delete Webhook + +```http +DELETE /api/projects/{project_id}/webhooks/{webhook_id} +``` + +**Response:** `204 No Content` + +### 6. Regenerate Secret + +```http +POST /api/projects/{project_id}/webhooks/{webhook_id}/regenerate-secret +``` + +**Response:** +```json +{ + "secret": "whsec_yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy" +} +``` + +### 7. Test Webhook + +Send a test event to verify the endpoint works. + +```http +POST /api/projects/{project_id}/webhooks/{webhook_id}/test +``` + +**Response:** +```json +{ + "success": true, + "response_status": 200, + "response_time_ms": 145, + "message": "Test event delivered successfully" +} +``` + +Or on failure: +```json +{ + "success": false, + "response_status": 500, + "response_time_ms": 2000, + "error": "Connection timeout" +} +``` + +--- + +## Delivery History Endpoints + +### 8. List Deliveries + +```http +GET /api/projects/{project_id}/webhooks/{webhook_id}/deliveries?status=failed&limit=50 +``` + +**Query Parameters:** +- `status`: Filter by status (`pending`, `delivered`, `failed`) +- `limit`: Max items (default 50, max 100) +- `cursor`: Pagination cursor + +**Response:** +```json +{ + "deliveries": [ + { + "id": "fedcba98-7654-3210-fedc-ba9876543210", + "event_id": "11111111-2222-3333-4444-555555555555", + "event_type": "config.deployed", + "status": "delivered", + "attempts": 1, + "created_at": "2024-01-15T10:30:00Z", + "delivered_at": "2024-01-15T10:30:01Z", + "last_response_status": 200 + } + ], + "next_cursor": "abc123" +} +``` + +### 9. Get Delivery Details + +```http +GET /api/projects/{project_id}/webhooks/{webhook_id}/deliveries/{delivery_id} +``` + +**Response:** +```json +{ + "id": "fedcba98-7654-3210-fedc-ba9876543210", + "event_id": "11111111-2222-3333-4444-555555555555", + "event_type": "config.deployed", + "status": "failed", + "attempts": 3, + "max_attempts": 6, + "payload": { + "id": "...", + "type": "config.deployed", + "data": { ... } + }, + "created_at": "2024-01-15T10:30:00Z", + "scheduled_at": "2024-01-15T11:30:00Z", + "last_attempt_at": "2024-01-15T10:45:00Z", + "last_response_status": 500, + "last_response_body": "Internal Server Error", + "last_error": "HTTP 500" +} +``` + +### 10. 
Retry Delivery + +```http +POST /api/projects/{project_id}/webhooks/{webhook_id}/deliveries/{delivery_id}/retry +``` + +**Response:** +```json +{ + "id": "fedcba98-7654-3210-fedc-ba9876543210", + "status": "pending", + "scheduled_at": "2024-01-15T12:00:00Z" +} +``` + +--- + +## Webhook Payload Format + +### Standard Envelope + +All webhook payloads follow this structure: + +```json +{ + "id": "11111111-2222-3333-4444-555555555555", + "timestamp": "2024-01-15T10:30:00.000Z", + "type": "config.deployed", + "api_version": "2024-01", + "project_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "data": { + // Event-specific payload + } +} +``` + +### Event: `config.deployed` + +Fired when a config is deployed to an environment. + +```json +{ + "id": "11111111-2222-3333-4444-555555555555", + "timestamp": "2024-01-15T10:30:00.000Z", + "type": "config.deployed", + "api_version": "2024-01", + "project_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "data": { + "application": { + "id": "app-uuid", + "name": "my-chatbot" + }, + "variant": { + "id": "variant-uuid", + "slug": "gpt4-optimized", + "version": 5 + }, + "environment": { + "id": "env-uuid", + "name": "production" + }, + "config": { + "params": { + "model": "gpt-4", + "temperature": 0.7, + "system_prompt": "You are a helpful assistant..." + } + }, + "deployed_by": { + "id": "user-uuid", + "email": "pm@company.com" + }, + "commit_message": "Tuned temperature for better responses" + } +} +``` + +### Event: `config.committed` (Future) + +Fired when a new config version is saved (but not necessarily deployed). + +```json +{ + "id": "22222222-3333-4444-5555-666666666666", + "timestamp": "2024-01-15T10:30:00.000Z", + "type": "config.committed", + "api_version": "2024-01", + "project_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "data": { + "application": { + "id": "app-uuid", + "name": "my-chatbot" + }, + "variant": { + "id": "variant-uuid", + "slug": "gpt4-optimized", + "version": 5, + "previous_version": 4 + }, + "config": { + "params": { ... } + }, + "committed_by": { + "id": "user-uuid", + "email": "pm@company.com" + }, + "commit_message": "Tuned temperature" + } +} +``` + +--- + +## HTTP Headers + +### Request Headers (Outgoing Webhooks) + +```http +POST /your-webhook-endpoint HTTP/1.1 +Content-Type: application/json +User-Agent: Agenta-Webhooks/1.0 +X-Agenta-Delivery: fedcba98-7654-3210-fedc-ba9876543210 +X-Agenta-Event: config.deployed +X-Agenta-Signature: t=1705318200,v1=5d2c3b1a... 
+``` + +| Header | Description | +|--------|-------------| +| `Content-Type` | Always `application/json` | +| `User-Agent` | `Agenta-Webhooks/1.0` | +| `X-Agenta-Delivery` | Unique delivery ID (for idempotency) | +| `X-Agenta-Event` | Event type | +| `X-Agenta-Signature` | HMAC signature for verification | + +### Signature Format + +``` +X-Agenta-Signature: t=,v1= +``` + +**Verification:** +```python +import hmac +import hashlib + +def verify_signature(payload: bytes, header: str, secret: str) -> bool: + parts = dict(p.split("=") for p in header.split(",")) + timestamp = parts["t"] + signature = parts["v1"] + + message = f"{timestamp}.{payload.decode()}" + expected = hmac.new( + secret.encode(), + message.encode(), + hashlib.sha256 + ).hexdigest() + + return hmac.compare_digest(signature, expected) +``` + +--- + +## Pydantic Models + +```python +# api/oss/src/models/api/webhook_models.py + +from pydantic import BaseModel, HttpUrl, Field +from typing import Optional, List, Dict, Any +from uuid import UUID +from datetime import datetime +from enum import Enum + + +class WebhookEventType(str, Enum): + CONFIG_DEPLOYED = "config.deployed" + CONFIG_COMMITTED = "config.committed" + ALL = "*" + + +class WebhookCreateRequest(BaseModel): + name: str = Field(..., min_length=1, max_length=255) + url: HttpUrl + event_types: List[str] = Field(default=["*"]) + application_id: Optional[UUID] = None + environment_name: Optional[str] = None + description: Optional[str] = None + headers: Optional[Dict[str, str]] = None + + +class WebhookUpdateRequest(BaseModel): + name: Optional[str] = Field(None, min_length=1, max_length=255) + url: Optional[HttpUrl] = None + event_types: Optional[List[str]] = None + application_id: Optional[UUID] = None + environment_name: Optional[str] = None + description: Optional[str] = None + headers: Optional[Dict[str, str]] = None + is_active: Optional[bool] = None + + +class WebhookResponse(BaseModel): + id: UUID + name: str + url: str + event_types: List[str] + application_id: Optional[UUID] + environment_name: Optional[str] + is_active: bool + description: Optional[str] + created_at: datetime + updated_at: datetime + + +class WebhookWithSecretResponse(WebhookResponse): + secret: str + + +class WebhookDeliveryStatus(str, Enum): + PENDING = "pending" + DELIVERED = "delivered" + FAILED = "failed" + + +class WebhookDeliveryResponse(BaseModel): + id: UUID + event_id: UUID + event_type: str + status: WebhookDeliveryStatus + attempts: int + created_at: datetime + delivered_at: Optional[datetime] + last_response_status: Optional[int] + + +class WebhookDeliveryDetailResponse(WebhookDeliveryResponse): + max_attempts: int + payload: Dict[str, Any] + scheduled_at: datetime + last_attempt_at: Optional[datetime] + last_response_body: Optional[str] + last_error: Optional[str] + + +class WebhookTestResponse(BaseModel): + success: bool + response_status: Optional[int] + response_time_ms: Optional[int] + message: Optional[str] + error: Optional[str] +``` + +--- + +## Router Implementation Sketch + +```python +# api/oss/src/routers/webhooks_router.py + +from fastapi import APIRouter, Request, HTTPException +from typing import List + +router = APIRouter(prefix="/projects/{project_id}/webhooks", tags=["webhooks"]) + + +@router.get("/", response_model=List[WebhookResponse]) +async def list_webhooks(project_id: str, request: Request): + # Permission check + # Query webhooks for project + pass + + +@router.post("/", response_model=WebhookWithSecretResponse) +async def create_webhook( + project_id: str, 
+ payload: WebhookCreateRequest, + request: Request +): + # Permission check + # Validate URL (HTTPS required in prod) + # Generate secret + # Create webhook + pass + + +@router.get("/{webhook_id}", response_model=WebhookResponse) +async def get_webhook(project_id: str, webhook_id: str, request: Request): + pass + + +@router.patch("/{webhook_id}", response_model=WebhookResponse) +async def update_webhook( + project_id: str, + webhook_id: str, + payload: WebhookUpdateRequest, + request: Request +): + pass + + +@router.delete("/{webhook_id}") +async def delete_webhook(project_id: str, webhook_id: str, request: Request): + pass + + +@router.post("/{webhook_id}/regenerate-secret") +async def regenerate_secret(project_id: str, webhook_id: str, request: Request): + pass + + +@router.post("/{webhook_id}/test", response_model=WebhookTestResponse) +async def test_webhook(project_id: str, webhook_id: str, request: Request): + pass + + +@router.get("/{webhook_id}/deliveries", response_model=List[WebhookDeliveryResponse]) +async def list_deliveries( + project_id: str, + webhook_id: str, + status: Optional[str] = None, + limit: int = 50, + cursor: Optional[str] = None, + request: Request = None +): + pass + + +@router.post("/{webhook_id}/deliveries/{delivery_id}/retry") +async def retry_delivery( + project_id: str, + webhook_id: str, + delivery_id: str, + request: Request +): + pass +``` diff --git a/docs/design/webhooks-config-sync/architecture.md b/docs/design/webhooks-config-sync/architecture.md new file mode 100644 index 0000000000..91a942c8b1 --- /dev/null +++ b/docs/design/webhooks-config-sync/architecture.md @@ -0,0 +1,316 @@ +# Architecture Options Analysis + +## Overview + +Three architecture options for webhook delivery, evaluated against **existing Agenta infrastructure**. + +### Existing Infrastructure (Key Context) + +Agenta already has: + +1. **Two Redis Instances**: + - `redis-volatile` (port 6379) - Caching, non-persistent + - `redis-durable` (port 6381) - Persistent data (streams, task queues) + +2. **Worker Patterns**: + - **Redis Streams + Custom Worker** (`TracingWorker`) - High-throughput tracing ingestion + - **Taskiq + Redis Streams** (`EvaluationsWorker`) - Evaluation task queue with retries + +3. **Existing Streams**: + - `streams:tracing` - Tracing spans + - `queues:evaluations` - Evaluation tasks (via Taskiq) + +--- + +## Option A: Synchronous Delivery + +### How It Works +``` +User saves config → API handler → Deliver webhook → Return response +``` + +### Evaluation + +| Criteria | Rating | Notes | +|----------|--------|-------| +| Complexity | Low | Simple to implement | +| Reliability | Poor | Webhook failure = API failure | +| Latency Impact | High | User waits for webhook delivery | +| Infrastructure | None | Uses existing API servers | + +### Verdict: **Not Recommended** +- Blocks user request +- No retry capability +- Doesn't scale + +--- + +## Option B: Taskiq + Redis Streams (Recommended) + +### How It Works +Leverage the existing Taskiq pattern used for evaluations: + +``` +User saves config → API handler → Queue task (Taskiq) → Return response + ↓ + Worker → Deliver webhook (with retries) +``` + +### Why Taskiq? + +1. **Already integrated** - Used for `EvaluationsWorker` +2. **Redis Streams backend** - Uses `redis-durable` for persistence +3. **Built-in retry semantics** - Taskiq handles retries automatically +4. 
**Proven pattern** - Same approach as evaluation tasks + +### Implementation + +```python +# api/entrypoints/worker_webhooks.py + +from taskiq_redis import RedisStreamBroker +from oss.src.tasks.taskiq.webhooks.worker import WebhooksWorker + +# Create broker with durable Redis Streams +broker = RedisStreamBroker( + url=env.redis.uri_durable, + queue_name="queues:webhooks", + consumer_group_name="worker-webhooks", + idle_timeout=3600000, # 1 hour - allow for retries + socket_timeout=30, + socket_connect_timeout=30, +) + +webhooks_worker = WebhooksWorker(broker=broker) +``` + +### Task Definition + +```python +# api/oss/src/tasks/taskiq/webhooks/worker.py + +class WebhooksWorker: + def __init__(self, broker: AsyncBroker): + self.broker = broker + self._register_tasks() + + def _register_tasks(self): + @self.broker.task( + task_name="webhooks.deliver", + retry_on_error=True, + max_retries=6, + ) + async def deliver_webhook( + *, + delivery_id: UUID, + webhook_id: UUID, + event_type: str, + payload: dict, + attempt: int = 1, + ) -> dict: + """Deliver a single webhook with signature.""" + # Fetch webhook config + webhook = await db_manager.get_webhook(str(webhook_id)) + if not webhook or not webhook.is_active: + return {"status": "skipped", "reason": "webhook inactive"} + + # Sign payload + timestamp = int(time.time()) + signature = sign_payload(payload, webhook.secret, timestamp) + + # Deliver + try: + async with httpx.AsyncClient(timeout=10.0) as client: + response = await client.post( + webhook.url, + json=payload, + headers={ + "Content-Type": "application/json", + "User-Agent": "Agenta-Webhooks/1.0", + "X-Agenta-Delivery": str(delivery_id), + "X-Agenta-Event": event_type, + "X-Agenta-Signature": f"t={timestamp},v1={signature}", + }, + ) + response.raise_for_status() + + # Mark as delivered + await db_manager.mark_delivery_success( + str(delivery_id), + response_status=response.status_code, + ) + return {"status": "delivered", "code": response.status_code} + + except Exception as e: + # Mark attempt failure + await db_manager.mark_delivery_attempt_failed( + str(delivery_id), + attempt=attempt, + error=str(e), + ) + # Re-raise for Taskiq retry + raise + + self.deliver_webhook = deliver_webhook +``` + +### Evaluation + +| Criteria | Rating | Notes | +|----------|--------|-------| +| Complexity | Low | Follows existing pattern | +| Reliability | Excellent | Persistent, automatic retries | +| Latency Impact | None | Non-blocking | +| Infrastructure | None | Uses existing Redis + Taskiq | + +### Verdict: **Recommended** + +--- + +## Option C: Custom Redis Streams Worker + +### How It Works +Similar to `TracingWorker`, create a custom worker consuming from a Redis Stream: + +``` +User saves config → API handler → XADD to stream → Return response + ↓ + WebhookWorker (XREADGROUP) → Deliver +``` + +### When to Use +- High-throughput webhook delivery (thousands/sec) +- Need custom batching logic +- More control over consumer groups + +### Evaluation + +| Criteria | Rating | Notes | +|----------|--------|-------| +| Complexity | Medium | Need custom worker like TracingWorker | +| Reliability | Excellent | Consumer groups, persistent | +| Latency Impact | None | Non-blocking | +| Infrastructure | None | Uses existing Redis | + +### Verdict: **Defer to v2 if Taskiq doesn't scale** + +--- + +## Option D: PostgreSQL Queue (Previous Proposal) + +### Why NOT Recommended Now + +The previous proposal suggested using PostgreSQL as the queue. 
Given the existing infrastructure: + +| PostgreSQL Queue | Redis Streams (Taskiq) | +|------------------|------------------------| +| New pattern to maintain | Existing pattern | +| Polling-based | Push-based (XREADGROUP) | +| More DB load | Dedicated Redis | +| Custom retry logic | Taskiq handles retries | + +**Verdict: Use Redis Streams via Taskiq instead** + +--- + +## Recommendation: Option B (Taskiq + Redis Streams) + +### Architecture Diagram + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ API Server │ +├─────────────────────────────────────────────────────────────────┤ +│ POST /configs/deploy │ +│ 1. Save config to PostgreSQL │ +│ 2. Find matching webhooks │ +│ 3. For each webhook: queue task via Taskiq │ +│ await webhooks_worker.deliver_webhook.kiq(...) │ +│ 4. Return response to user │ +└─────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Redis (redis-durable:6381) │ +├─────────────────────────────────────────────────────────────────┤ +│ Stream: queues:webhooks │ +│ - Consumer group: worker-webhooks │ +│ - Messages: {delivery_id, webhook_id, payload, ...} │ +└─────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Webhook Worker (Taskiq) │ +│ (separate process, like worker_evaluations) │ +├─────────────────────────────────────────────────────────────────┤ +│ 1. Consume task from queue │ +│ 2. Fetch webhook config (URL, secret) │ +│ 3. Sign payload (HMAC-SHA256) │ +│ 4. POST to endpoint with timeout │ +│ 5. On success: mark delivered in PostgreSQL │ +│ 6. On failure: Taskiq retries automatically │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### File Structure + +``` +api/ +├── entrypoints/ +│ ├── worker_evaluations.py # Existing +│ ├── worker_tracing.py # Existing +│ └── worker_webhooks.py # NEW: Webhook worker entrypoint +├── oss/src/ +│ ├── tasks/ +│ │ └── taskiq/ +│ │ ├── evaluations/ # Existing +│ │ └── webhooks/ # NEW +│ │ ├── __init__.py +│ │ └── worker.py # WebhooksWorker class +│ └── services/ +│ └── webhook_service.py # NEW: emit_webhook_event() +``` + +### Retry Strategy (Taskiq Built-in) + +Taskiq supports `retry_on_error=True` and `max_retries`. For custom backoff: + +```python +@self.broker.task( + task_name="webhooks.deliver", + retry_on_error=True, + max_retries=6, +) +async def deliver_webhook(...): + # Calculate delay based on attempt + if attempt > 1: + delays = [60, 300, 900, 3600, 14400] # 1m, 5m, 15m, 1h, 4h + delay = delays[min(attempt - 2, len(delays) - 1)] + await asyncio.sleep(delay) + + # ... delivery logic +``` + +### Docker Compose Addition + +```yaml +# hosting/docker-compose/oss/docker-compose.dev.yml + +worker-webhooks: + <<: *worker-common + container_name: agenta-worker-webhooks + command: ["python", "-m", "entrypoints.worker_webhooks"] + depends_on: + redis-durable: + condition: service_healthy + postgres: + condition: service_healthy +``` + +### Benefits of This Approach + +1. **Zero new infrastructure** - Uses existing Redis and Taskiq +2. **Proven pattern** - Same as `worker_evaluations` +3. **Built-in features** - Retries, dead-letter, monitoring +4. **Scalable** - Can run multiple worker instances +5. 
**Observable** - Taskiq provides task status tracking diff --git a/docs/design/webhooks-config-sync/context.md b/docs/design/webhooks-config-sync/context.md new file mode 100644 index 0000000000..f6bc10fc08 --- /dev/null +++ b/docs/design/webhooks-config-sync/context.md @@ -0,0 +1,327 @@ +# Codebase Context & Integration Points + +## Overview + +This document maps the existing Agenta infrastructure and where webhook functionality should integrate. + +--- + +## Existing Infrastructure + +### 1. Redis (Two Instances) + +| Instance | Port | Purpose | URI Env Var | +|----------|------|---------|-------------| +| `redis-volatile` | 6379 | Caching, non-persistent | `REDIS_URI_VOLATILE` | +| `redis-durable` | 6381 | Streams, task queues, persistent | `REDIS_URI_DURABLE` | + +**Location**: `api/oss/src/utils/env.py` (lines 472-498) + +```python +class RedisConfig(BaseModel): + uri_volatile: str | None = os.getenv("REDIS_URI_VOLATILE") or "redis://redis-volatile:6379/0" + uri_durable: str | None = os.getenv("REDIS_URI_DURABLE") or "redis://redis-durable:6381/0" +``` + +### 2. Worker Patterns + +#### A. Taskiq + Redis Streams (Evaluations) + +**Location**: `api/entrypoints/worker_evaluations.py` + +```python +from taskiq_redis import RedisStreamBroker + +broker = RedisStreamBroker( + url=env.redis.uri_durable, + queue_name="queues:evaluations", + consumer_group_name="worker-evaluations", +) + +evaluations_worker = EvaluationsWorker(broker=broker) +``` + +**Run**: `python -m entrypoints.worker_evaluations` + +**Pattern**: +- Tasks defined in `api/oss/src/tasks/taskiq/evaluations/worker.py` +- Uses `@self.broker.task()` decorator +- Supports `retry_on_error=True`, `max_retries=N` +- Triggered via `.kiq()` (kick) method + +#### B. Custom Redis Streams Worker (Tracing) + +**Location**: `api/entrypoints/worker_tracing.py` + +```python +tracing_worker = TracingWorker( + service=tracing_service, + redis_client=redis_client, + stream_name="streams:tracing", + consumer_group="worker-tracing", +) +await tracing_worker.run() +``` + +**Pattern**: +- Custom consumer using `XREADGROUP` +- Batching, grouping, manual ACK/DEL +- Used for high-throughput ingestion + +### 3. Cron Jobs + +**Location**: `api/oss/src/crons/` and `api/ee/src/crons/` + +**Docker Setup**: `hosting/docker-compose/*/docker-compose.*.yml` + +```yaml +cron: + image: agenta-oss-dev-api:latest + command: cron -f + volumes: + - ../../../api/oss/src/crons/queries.sh:/queries.sh +``` + +**Pattern**: +- Cron container runs `cron -f` +- Shell scripts mounted as volumes +- Scripts make HTTP calls to internal API endpoints +- Crontab files (`*.txt`) define schedules + +**Existing Jobs**: + +| Job | Schedule | Purpose | Script | +|-----|----------|---------|--------| +| `queries` | `* * * * *` (every minute) | Refresh evaluation runs | `queries.sh` | +| `meters` (EE) | `15,45 * * * *` (twice/hour) | Report billing usage | `meters.sh` | +| `spans` (EE) | Varies | Span cleanup | `spans.sh` | + +**Example Cron Script Pattern**: +```bash +#!/bin/sh +AGENTA_AUTH_KEY=$(tr '\0' '\n' < /proc/1/environ | grep ^AGENTA_AUTH_KEY= | cut -d= -f2-) +curl -X POST -H "Authorization: Access ${AGENTA_AUTH_KEY}" \ + "http://api:8000/admin/some-endpoint" +``` + +### 4. PostgreSQL + +Used for persistent data storage. Webhooks tables would go here. + +**Migrations**: `api/oss/databases/postgres/migrations/core/` + +--- + +## Recommended Integration for Webhooks + +### A. 
Task Queue: Taskiq + Redis Streams + +Follow the `EvaluationsWorker` pattern: + +``` +api/ +├── entrypoints/ +│ └── worker_webhooks.py # NEW: Entrypoint +└── oss/src/tasks/taskiq/webhooks/ + ├── __init__.py + └── worker.py # NEW: WebhooksWorker +``` + +**Why Taskiq (not custom Redis Streams worker)**: +- Webhook delivery is discrete tasks, not high-throughput streams +- Taskiq provides built-in retry semantics +- Same pattern as evaluations = less cognitive load + +### B. Cron: Cleanup & Monitoring + +Add cron jobs for: + +1. **Cleanup old deliveries** (daily) + ```bash + # webhooks-cleanup.sh + curl -X POST "http://api:8000/admin/webhooks/cleanup?days=30" + ``` + +2. **Retry stuck deliveries** (every 5 min) + ```bash + # webhooks-retry.sh + curl -X POST "http://api:8000/admin/webhooks/retry-stuck" + ``` + +**Crontab** (`api/oss/src/crons/webhooks.txt`): +``` +*/5 * * * * root sh /webhooks-retry.sh >> /proc/1/fd/1 2>&1 +0 3 * * * root sh /webhooks-cleanup.sh >> /proc/1/fd/1 2>&1 +``` + +### C. Database: PostgreSQL + +Add tables via Alembic migration (see `schema.md`). + +--- + +## File Structure (Complete) + +``` +api/ +├── entrypoints/ +│ ├── worker_evaluations.py # Existing +│ ├── worker_tracing.py # Existing +│ └── worker_webhooks.py # NEW +├── oss/ +│ ├── src/ +│ │ ├── routers/ +│ │ │ ├── variants_router.py # MODIFY: Add event emission +│ │ │ └── webhooks_router.py # NEW: CRUD + history endpoints +│ │ ├── services/ +│ │ │ └── webhook_service.py # NEW: emit_webhook_event() +│ │ ├── tasks/ +│ │ │ └── taskiq/ +│ │ │ └── webhooks/ +│ │ │ ├── __init__.py # NEW +│ │ │ └── worker.py # NEW: WebhooksWorker +│ │ ├── models/ +│ │ │ ├── db_models.py # MODIFY: Add WebhookDB, WebhookDeliveryDB +│ │ │ └── api/ +│ │ │ └── webhook_models.py # NEW: Pydantic models +│ │ └── crons/ +│ │ ├── webhooks.txt # NEW: Crontab +│ │ ├── webhooks-cleanup.sh # NEW +│ │ └── webhooks-retry.sh # NEW +│ └── databases/postgres/migrations/core/versions/ +│ └── xxxx_add_webhooks.py # NEW: Alembic migration +└── hosting/docker-compose/ + └── oss/docker-compose.dev.yml # MODIFY: Add worker-webhooks service +``` + +--- + +## Integration Points in Existing Code + +### 1. Event Emission (variants_router.py) + +**Location**: `api/oss/src/routers/variants_router.py` + +**Function**: `configs_deploy()` (line ~785) + +```python +@router.post("/configs/deploy", ...) +async def configs_deploy(request: Request, ...): + config = await deploy_config(...) + + if not config: + raise HTTPException(status_code=404, detail="Config not found.") + + await invalidate_cache(project_id=request.state.project_id) + + # ADD: Emit webhook event + await emit_webhook_event( + project_id=request.state.project_id, + event_type="config.deployed", + data={ + "config": config.model_dump(), + "deployed_by": request.state.user_id, + } + ) + + return config +``` + +### 2. 
Webhook Worker Instantiation + +**Location**: `api/entrypoints/worker_webhooks.py` (NEW) + +Follow the pattern from `worker_evaluations.py`: + +```python +from taskiq_redis import RedisStreamBroker +from oss.src.tasks.taskiq.webhooks.worker import WebhooksWorker + +broker = RedisStreamBroker( + url=env.redis.uri_durable, + queue_name="queues:webhooks", + consumer_group_name="worker-webhooks", + idle_timeout=3600000, # 1 hour + socket_timeout=30, + socket_connect_timeout=30, +) + +webhooks_worker = WebhooksWorker(broker=broker) + +def main(): + args = WorkerArgs( + broker="entrypoints.worker_webhooks:broker", + modules=[], + fs_discover=False, + workers=1, + max_async_tasks=10, + ) + return run_worker(args) +``` + +### 3. Router Registration + +**Location**: `api/entrypoints/routers.py` or main app + +```python +from oss.src.routers import webhooks_router + +app.include_router( + webhooks_router.router, + prefix="/api/projects/{project_id}", + tags=["webhooks"], +) +``` + +### 4. Docker Compose + +**Location**: `hosting/docker-compose/oss/docker-compose.dev.yml` + +```yaml +worker-webhooks: + <<: *worker-common # or copy from worker-tracing + container_name: agenta-worker-webhooks + command: ["python", "-m", "entrypoints.worker_webhooks"] + depends_on: + redis-durable: + condition: service_healthy + postgres: + condition: service_healthy + restart: always + +cron: + volumes: + - ../../../api/oss/src/crons/queries.sh:/queries.sh + - ../../../api/oss/src/crons/webhooks-cleanup.sh:/webhooks-cleanup.sh # ADD + - ../../../api/oss/src/crons/webhooks-retry.sh:/webhooks-retry.sh # ADD +``` + +--- + +## Environment Variables + +No new env vars required for MVP. Uses existing: +- `REDIS_URI_DURABLE` - For Taskiq broker +- `AGENTA_AUTH_KEY` - For internal cron API calls + +Optional future env vars: +```bash +WEBHOOK_TIMEOUT=10 # HTTP timeout in seconds +WEBHOOK_MAX_RETRIES=6 # Max delivery attempts +WEBHOOK_REQUIRE_HTTPS=true # Require HTTPS endpoints +``` + +--- + +## Permissions (EE Mode) + +**Location**: `api/ee/src/models/shared_models.py` + +```python +class Permission(str, Enum): + # ... existing ... + VIEW_WEBHOOKS = "view_webhooks" + MANAGE_WEBHOOKS = "manage_webhooks" +``` + +Permission checks follow existing pattern in routers. diff --git a/docs/design/webhooks-config-sync/plan.md b/docs/design/webhooks-config-sync/plan.md new file mode 100644 index 0000000000..c07be606cf --- /dev/null +++ b/docs/design/webhooks-config-sync/plan.md @@ -0,0 +1,317 @@ +# Implementation Plan + +## Overview + +Phased approach leveraging **existing Agenta infrastructure**: +- **Taskiq + Redis Streams** for task queue (like evaluations) +- **Cron** for cleanup and stuck retry (like meters/queries) +- **PostgreSQL** for persistent storage + +--- + +## MVP Scope (v1) + +### Goal +Enable users to receive webhook notifications when a config is deployed to an environment. + +### In Scope +- Single event type: `config.deployed` +- Project-level webhooks (no app/environment filtering in MVP) +- CRUD API for webhook management +- HMAC-SHA256 signature verification +- Taskiq-based delivery with automatic retries +- Delivery history (last 30 days) +- Manual retry capability +- Test webhook endpoint +- Cron jobs for cleanup and stuck retry + +### Out of Scope (Defer to v2) +- Multiple event types (`config.committed`, `variant.created`, etc.) 
+- App/environment filtering +- Webhook management UI +- Advanced analytics/monitoring +- Native GitHub App integration + +--- + +## Phase 1: Database & Models (2-3 days) + +### Tasks +1. [ ] Create Alembic migration for `webhooks` and `webhook_deliveries` tables +2. [ ] Add SQLAlchemy models to `db_models.py` +3. [ ] Create Pydantic models in `webhook_models.py` +4. [ ] Add DB manager functions for webhook CRUD + +### Deliverables +``` +api/oss/databases/postgres/migrations/core/versions/ +└── xxxx_add_webhooks.py + +api/oss/src/models/ +├── db_models.py # ADD: WebhookDB, WebhookDeliveryDB +└── api/ + └── webhook_models.py # NEW +``` + +--- + +## Phase 2: Webhook Management API (2-3 days) + +### Tasks +1. [ ] Create `webhooks_router.py` with CRUD endpoints +2. [ ] Implement secret generation (secure random bytes) +3. [ ] Add URL validation (HTTPS required in production) +4. [ ] Add permission checks for EE mode +5. [ ] Register router in main app +6. [ ] Write unit tests + +### Endpoints +``` +GET /api/projects/{project_id}/webhooks +POST /api/projects/{project_id}/webhooks +GET /api/projects/{project_id}/webhooks/{webhook_id} +PATCH /api/projects/{project_id}/webhooks/{webhook_id} +DELETE /api/projects/{project_id}/webhooks/{webhook_id} +POST /api/projects/{project_id}/webhooks/{webhook_id}/regenerate-secret +``` + +### Deliverables +``` +api/oss/src/routers/ +└── webhooks_router.py # NEW +``` + +--- + +## Phase 3: Taskiq Worker (3-4 days) + +### Tasks +1. [ ] Create `WebhooksWorker` class following `EvaluationsWorker` pattern +2. [ ] Define `deliver_webhook` task with retry support +3. [ ] Implement HMAC signature generation +4. [ ] Implement HTTP delivery with timeout (10s) +5. [ ] Create worker entrypoint +6. [ ] Add to docker-compose +7. [ ] Write integration tests + +### Taskiq Task Definition +```python +@self.broker.task( + task_name="webhooks.deliver", + retry_on_error=True, + max_retries=6, +) +async def deliver_webhook( + delivery_id: UUID, + webhook_id: UUID, + event_type: str, + payload: dict, +) -> dict: + # Fetch webhook, sign payload, deliver, update status +``` + +### Deliverables +``` +api/ +├── entrypoints/ +│ └── worker_webhooks.py # NEW +└── oss/src/tasks/taskiq/webhooks/ + ├── __init__.py # NEW + └── worker.py # NEW + +hosting/docker-compose/oss/docker-compose.dev.yml # ADD worker-webhooks service +``` + +--- + +## Phase 4: Event Emission (2 days) + +### Tasks +1. [ ] Create `webhook_service.py` with `emit_webhook_event()` function +2. [ ] Integrate with `configs_deploy()` endpoint +3. [ ] Build webhook payload with proper structure +4. [ ] Queue Taskiq tasks for all matching webhooks +5. [ ] Write unit tests + +### Integration Point +```python +# In variants_router.py, configs_deploy() +from oss.src.services.webhook_service import emit_webhook_event + +await emit_webhook_event( + project_id=request.state.project_id, + event_type="config.deployed", + data={ + "config": config.model_dump(), + "deployed_by": request.state.user_id, + } +) +``` + +### Deliverables +``` +api/oss/src/services/ +└── webhook_service.py # NEW + +api/oss/src/routers/ +└── variants_router.py # MODIFY +``` + +--- + +## Phase 5: Cron Jobs (1 day) + +### Tasks +1. [ ] Create admin endpoints for cleanup and retry +2. [ ] Create cron shell scripts +3. [ ] Create crontab file +4. 
[ ] Update docker-compose volumes + +### Admin Endpoints +```python +# In admin_router.py or webhooks_router.py +@router.post("/admin/webhooks/cleanup") +async def cleanup_old_deliveries(days: int = 30): + """Delete deliveries older than N days.""" + +@router.post("/admin/webhooks/retry-stuck") +async def retry_stuck_deliveries(): + """Retry deliveries stuck in 'delivering' state > 1 hour.""" +``` + +### Cron Scripts +```bash +# webhooks-cleanup.sh (daily at 3am) +curl -X POST -H "Authorization: Access ${AGENTA_AUTH_KEY}" \ + "http://api:8000/admin/webhooks/cleanup?days=30" + +# webhooks-retry.sh (every 5 minutes) +curl -X POST -H "Authorization: Access ${AGENTA_AUTH_KEY}" \ + "http://api:8000/admin/webhooks/retry-stuck" +``` + +### Crontab +``` +# webhooks.txt +*/5 * * * * root sh /webhooks-retry.sh >> /proc/1/fd/1 2>&1 +0 3 * * * root sh /webhooks-cleanup.sh >> /proc/1/fd/1 2>&1 +``` + +### Deliverables +``` +api/oss/src/crons/ +├── webhooks.txt # NEW +├── webhooks-cleanup.sh # NEW +└── webhooks-retry.sh # NEW + +hosting/docker-compose/oss/docker-compose.dev.yml # ADD volumes to cron service +``` + +--- + +## Phase 6: Delivery History & Testing (2 days) + +### Tasks +1. [ ] Add delivery history endpoints to router +2. [ ] Implement test webhook endpoint +3. [ ] Add manual retry capability +4. [ ] Write integration tests for full flow + +### Additional Endpoints +``` +GET /api/projects/{project_id}/webhooks/{webhook_id}/deliveries +GET /api/projects/{project_id}/webhooks/{webhook_id}/deliveries/{delivery_id} +POST /api/projects/{project_id}/webhooks/{webhook_id}/deliveries/{delivery_id}/retry +POST /api/projects/{project_id}/webhooks/{webhook_id}/test +``` + +--- + +## Phase 7: Documentation & Rollout (1-2 days) + +### Tasks +1. [ ] Update API documentation +2. [ ] Write user-facing documentation +3. [ ] Add signature verification examples (Python, Node, Go) +4. [ ] Create changelog entry +5. [ ] Deploy to staging +6. [ ] Deploy to production + +--- + +## Timeline Summary + +| Phase | Duration | Dependencies | +|-------|----------|--------------| +| 1. Database & Models | 2-3 days | None | +| 2. Management API | 2-3 days | Phase 1 | +| 3. Taskiq Worker | 3-4 days | Phase 1 | +| 4. Event Emission | 2 days | Phase 3 | +| 5. Cron Jobs | 1 day | Phase 1 | +| 6. History & Testing | 2 days | Phase 2, 4 | +| 7. 
Documentation | 1-2 days | Phase 6 | + +**Total: ~13-17 days** (3-4 weeks with buffer) + +--- + +## Infrastructure Summary + +### Uses Existing +| Component | Existing | Purpose | +|-----------|----------|---------| +| Redis (durable) | `redis-durable:6381` | Taskiq broker | +| PostgreSQL | `postgres:5432` | Webhooks tables | +| Cron container | `cron` | Cleanup & retry jobs | +| Taskiq | `taskiq_redis` | Task queue | + +### New Additions +| Component | Type | Purpose | +|-----------|------|---------| +| `worker-webhooks` | Docker service | Taskiq worker process | +| `webhooks` table | PostgreSQL | Webhook subscriptions | +| `webhook_deliveries` table | PostgreSQL | Delivery queue/history | +| `queues:webhooks` | Redis Stream | Task queue | + +--- + +## v2 Roadmap (Future) + +### Additional Event Types +- `config.committed` - New config version saved +- `variant.created` / `variant.deleted` +- `evaluation.completed` + +### Filtering +- Filter by application +- Filter by environment +- Filter by labels/tags + +### UI +- Webhook management in dashboard +- Delivery logs viewer + +### Scale +- If volume grows significantly, evaluate Svix + +--- + +## Complexity Assessment + +| Aspect | Rating | Notes | +|--------|--------|-------| +| Overall | **Medium** | Follows existing patterns | +| Database | Low | Two simple tables | +| API | Low | Standard CRUD | +| Taskiq Worker | Low | Copy EvaluationsWorker pattern | +| Cron | Low | Copy existing scripts | +| Event Emission | Low | Single integration point | +| Testing | Medium | Need mock HTTP server | + +**Risk factors:** +- Taskiq retry behavior customization +- Ensuring delivery idempotency + +**Mitigation:** +- Start with Taskiq defaults, customize if needed +- Clear documentation on `X-Agenta-Delivery` header for consumers diff --git a/docs/design/webhooks-config-sync/research.md b/docs/design/webhooks-config-sync/research.md new file mode 100644 index 0000000000..40a9b0f8af --- /dev/null +++ b/docs/design/webhooks-config-sync/research.md @@ -0,0 +1,157 @@ +# Webhook Implementation Research + +## 1. Industry Best Practices + +### Stripe Webhooks + +**Signature Verification**: +- Header: `Stripe-Signature` +- Format: `t=,v1=` +- Uses HMAC-SHA256 with endpoint secret +- Timestamp prevents replay attacks + +**Delivery**: +- 10-second timeout +- Automatic retries with exponential backoff (up to 3 days) +- Idempotency via `X-GitHub-Delivery` (event ID) + +**Best Practices from Stripe**: +1. Return 2xx quickly, process async +2. Handle duplicate events (idempotency) +3. Verify signatures BEFORE processing +4. Only subscribe to needed events + +### GitHub Webhooks + +**Key Headers**: +- `X-GitHub-Event`: Event type +- `X-GitHub-Delivery`: Unique delivery ID (for idempotency) +- `X-Hub-Signature-256`: HMAC-SHA256 signature + +**Best Practices**: +1. Respond within 10 seconds +2. Use async processing (queue payloads) +3. Redeliver missed deliveries manually +4. Check event type AND action before processing + +**Retry Policy**: +- GitHub allows manual redelivery +- No automatic retries (by design) + +### Common Patterns Summary + +| Feature | Stripe | GitHub | +|---------|--------|--------| +| Signature | HMAC-SHA256 | HMAC-SHA256 | +| Signature Header | `Stripe-Signature` | `X-Hub-Signature-256` | +| Timestamp in Signature | Yes | No | +| Delivery ID | Yes | `X-GitHub-Delivery` | +| Auto Retries | Yes (exponential) | No | +| Timeout | 10s | 10s | + +Other platforms in the LLM tooling space follow similar patterns. 
HMAC-SHA256 with timestamps is the de facto standard for webhook security. + +--- + +## 2. Build vs Buy Analysis + +### Option: Use Svix (Webhook-as-a-Service) + +**Pros**: +- Handles retries, monitoring, security automatically +- Embeddable consumer portal +- SOC 2 Type II, HIPAA compliant +- Used by Brex, Clerk, Replicate + +**Cons**: +- Additional dependency and cost +- May be overkill for MVP +- Adds external service to infrastructure + +**Pricing**: Free tier available, scales with usage + +### Option: Build from Scratch + +**Pros**: +- Full control +- No external dependencies +- Simpler for MVP scope +- Can add Svix later if needed + +**Cons**: +- Must implement retries, monitoring, etc. +- More maintenance burden long-term + +### Recommendation +**Start with build-from-scratch for MVP**, then evaluate Svix if: +- Webhook volume grows significantly +- Need advanced features (consumer portal, detailed analytics) +- Want to offload maintenance + +--- + +## 3. Alternatives to Webhooks + +### Polling Endpoint +``` +GET /api/configs/changes?since=2024-01-15T10:00:00Z +``` + +**Pros**: Simple, no infrastructure +**Cons**: Latency, wasted requests, not real-time + +### Server-Sent Events (SSE) +**Pros**: Real-time, simpler than WebSockets +**Cons**: Connection management, not great for server-to-server + +### Native GitHub App +Instead of generic webhook → custom receiver → GitHub, build a native GitHub App integration. + +**Pros**: Better UX, no intermediate server needed +**Cons**: GitHub-specific, more complex to build + +### Recommendation +**Webhooks are the right choice** because: +1. Standard pattern that users expect +2. Works for any integration (not just GitHub) +3. Multiple LLM platforms already offer similar functionality, validating the use case + +--- + +## 4. Security Considerations + +### HMAC Signature Verification (Required) +```python +import hmac +import hashlib + +def verify_webhook(payload: str, signature_header: str, secret: str) -> bool: + # Parse "t=,s=" + parts = dict(p.split("=") for p in signature_header.split(",")) + timestamp = parts["t"] + received_sig = parts["s"] + + # Compute expected signature + message = f"{timestamp}.{payload}" + expected_sig = hmac.new( + secret.encode(), + message.encode(), + hashlib.sha256 + ).hexdigest() + + return hmac.compare_digest(received_sig, expected_sig) +``` + +### Additional Security Measures +1. **Timestamp validation**: Reject if timestamp > 5 minutes old (replay protection) +2. **HTTPS only**: Reject HTTP endpoints in production +3. **Secret rotation**: Allow regenerating signing secrets +4. **IP allowlist**: Optional feature for enterprise + +### Common Attack Vectors +| Attack | Mitigation | +|--------|------------| +| Replay attacks | Timestamp in signature | +| SSRF | Validate URLs, no internal IPs | +| Spoofing | HMAC signature verification | +| Denial of service | Rate limiting on sends | diff --git a/docs/design/webhooks-config-sync/schema.md b/docs/design/webhooks-config-sync/schema.md new file mode 100644 index 0000000000..d61a706f1a --- /dev/null +++ b/docs/design/webhooks-config-sync/schema.md @@ -0,0 +1,348 @@ +# Database Schema Proposal + +## Overview + +Two main tables needed: +1. **webhooks** - Webhook endpoint configurations +2. **webhook_deliveries** - Delivery queue and history + +Plus optional tables for v2: +3. **webhook_event_types** - Event type registry +4. **webhook_delivery_attempts** - Detailed attempt history + +--- + +## Core Tables + +### 1. 
webhooks (Webhook Subscriptions) + +```sql +CREATE TABLE webhooks ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), + + -- Ownership + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + + -- Configuration + name VARCHAR(255) NOT NULL, + url VARCHAR(2048) NOT NULL, + secret VARCHAR(255) NOT NULL, -- HMAC signing secret + + -- Filtering (which events to receive) + event_types JSONB NOT NULL DEFAULT '["*"]', -- ["config.deployed", "config.committed"] or ["*"] + + -- Optional filters + application_id UUID REFERENCES app_db(id) ON DELETE SET NULL, -- Filter by app + environment_name VARCHAR(255), -- Filter by environment (e.g., "production") + + -- Status + is_active BOOLEAN NOT NULL DEFAULT true, + + -- Metadata + description TEXT, + headers JSONB DEFAULT '{}', -- Custom headers to include + + -- Audit + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by_id UUID REFERENCES users(id) ON DELETE SET NULL, + + -- Constraints + CONSTRAINT webhooks_url_check CHECK (url ~ '^https?://'), + CONSTRAINT webhooks_event_types_check CHECK (jsonb_typeof(event_types) = 'array') +); + +-- Indexes +CREATE INDEX idx_webhooks_project_id ON webhooks(project_id); +CREATE INDEX idx_webhooks_project_active ON webhooks(project_id, is_active) WHERE is_active = true; +CREATE INDEX idx_webhooks_application_id ON webhooks(application_id) WHERE application_id IS NOT NULL; +``` + +### 2. webhook_deliveries (Delivery Queue & History) + +```sql +CREATE TYPE webhook_delivery_status AS ENUM ( + 'pending', -- Waiting to be delivered + 'delivering', -- Currently being delivered (prevents duplicate pickup) + 'delivered', -- Successfully delivered + 'failed' -- Failed after all retries exhausted +); + +CREATE TABLE webhook_deliveries ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v7(), + + -- Reference + webhook_id UUID NOT NULL REFERENCES webhooks(id) ON DELETE CASCADE, + + -- Event data + event_id UUID NOT NULL, -- Idempotency key + event_type VARCHAR(255) NOT NULL, + payload JSONB NOT NULL, + + -- Delivery state + status webhook_delivery_status NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 6, + + -- Scheduling + scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- Results + last_attempt_at TIMESTAMPTZ, + last_response_status INTEGER, + last_response_body TEXT, + last_error TEXT, + + -- Success tracking + delivered_at TIMESTAMPTZ, + + -- Audit + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- Constraints + CONSTRAINT webhook_deliveries_attempts_check CHECK (attempts >= 0) +); + +-- Indexes for queue processing +CREATE INDEX idx_webhook_deliveries_pending + ON webhook_deliveries(scheduled_at) + WHERE status = 'pending'; + +CREATE INDEX idx_webhook_deliveries_webhook_id + ON webhook_deliveries(webhook_id); + +CREATE INDEX idx_webhook_deliveries_event_id + ON webhook_deliveries(event_id); + +-- For cleanup/retention policies +CREATE INDEX idx_webhook_deliveries_created_at + ON webhook_deliveries(created_at); +``` + +--- + +## SQLAlchemy Models + +```python +# api/oss/src/models/db_models.py + +from enum import Enum as PyEnum + +class WebhookDeliveryStatus(PyEnum): + PENDING = "pending" + DELIVERING = "delivering" + DELIVERED = "delivered" + FAILED = "failed" + + +class WebhookDB(Base): + __tablename__ = "webhooks" + + id = Column( + UUID(as_uuid=True), + primary_key=True, + default=uuid.uuid7, + unique=True, + nullable=False, + ) + + # Ownership + project_id = Column( + 
UUID(as_uuid=True), + ForeignKey("projects.id", ondelete="CASCADE"), + nullable=False, + ) + + # Configuration + name = Column(String(255), nullable=False) + url = Column(String(2048), nullable=False) + secret = Column(String(255), nullable=False) # Encrypted at rest + + # Filtering + event_types = Column( + mutable_json_type(dbtype=JSONB, nested=True), + nullable=False, + default=["*"], + ) + application_id = Column( + UUID(as_uuid=True), + ForeignKey("app_db.id", ondelete="SET NULL"), + nullable=True, + ) + environment_name = Column(String(255), nullable=True) + + # Status + is_active = Column(Boolean, nullable=False, default=True) + + # Metadata + description = Column(String, nullable=True) + headers = Column( + mutable_json_type(dbtype=JSONB, nested=True), + nullable=True, + default={}, + ) + + # Audit + created_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False, + ) + updated_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False, + ) + created_by_id = Column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + + # Relationships + project = relationship("ProjectDB") + application = relationship("AppDB") + created_by = relationship("UserDB") + + +class WebhookDeliveryDB(Base): + __tablename__ = "webhook_deliveries" + + id = Column( + UUID(as_uuid=True), + primary_key=True, + default=uuid.uuid7, + unique=True, + nullable=False, + ) + + # Reference + webhook_id = Column( + UUID(as_uuid=True), + ForeignKey("webhooks.id", ondelete="CASCADE"), + nullable=False, + ) + + # Event data + event_id = Column(UUID(as_uuid=True), nullable=False) + event_type = Column(String(255), nullable=False) + payload = Column( + mutable_json_type(dbtype=JSONB, nested=True), + nullable=False, + ) + + # Delivery state + status = Column( + Enum(WebhookDeliveryStatus, name="webhook_delivery_status"), + nullable=False, + default=WebhookDeliveryStatus.PENDING, + ) + attempts = Column(Integer, nullable=False, default=0) + max_attempts = Column(Integer, nullable=False, default=6) + + # Scheduling + scheduled_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False, + ) + + # Results + last_attempt_at = Column(DateTime(timezone=True), nullable=True) + last_response_status = Column(Integer, nullable=True) + last_response_body = Column(String, nullable=True) + last_error = Column(String, nullable=True) + delivered_at = Column(DateTime(timezone=True), nullable=True) + + # Audit + created_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + nullable=False, + ) + + # Relationships + webhook = relationship("WebhookDB") +``` + +--- + +## Migration Script + +```python +# alembic/versions/xxxx_add_webhooks_tables.py + +def upgrade(): + # Create enum type + op.execute(""" + CREATE TYPE webhook_delivery_status AS ENUM ( + 'pending', 'delivering', 'delivered', 'failed' + ) + """) + + # Create webhooks table + op.create_table( + 'webhooks', + sa.Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid7), + sa.Column('project_id', UUID(as_uuid=True), sa.ForeignKey('projects.id', ondelete='CASCADE'), nullable=False), + sa.Column('name', sa.String(255), nullable=False), + sa.Column('url', sa.String(2048), nullable=False), + sa.Column('secret', sa.String(255), nullable=False), + sa.Column('event_types', JSONB, nullable=False, server_default='["*"]'), + sa.Column('application_id', UUID(as_uuid=True), 
sa.ForeignKey('app_db.id', ondelete='SET NULL'), nullable=True), + sa.Column('environment_name', sa.String(255), nullable=True), + sa.Column('is_active', sa.Boolean, nullable=False, server_default='true'), + sa.Column('description', sa.String, nullable=True), + sa.Column('headers', JSONB, nullable=True, server_default='{}'), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column('created_by_id', UUID(as_uuid=True), sa.ForeignKey('users.id', ondelete='SET NULL'), nullable=True), + ) + + op.create_index('idx_webhooks_project_id', 'webhooks', ['project_id']) + op.create_index('idx_webhooks_project_active', 'webhooks', ['project_id', 'is_active'], postgresql_where='is_active = true') + + # Create webhook_deliveries table + op.create_table( + 'webhook_deliveries', + sa.Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid7), + sa.Column('webhook_id', UUID(as_uuid=True), sa.ForeignKey('webhooks.id', ondelete='CASCADE'), nullable=False), + sa.Column('event_id', UUID(as_uuid=True), nullable=False), + sa.Column('event_type', sa.String(255), nullable=False), + sa.Column('payload', JSONB, nullable=False), + sa.Column('status', sa.Enum('pending', 'delivering', 'delivered', 'failed', name='webhook_delivery_status'), nullable=False, server_default='pending'), + sa.Column('attempts', sa.Integer, nullable=False, server_default='0'), + sa.Column('max_attempts', sa.Integer, nullable=False, server_default='6'), + sa.Column('scheduled_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column('last_attempt_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('last_response_status', sa.Integer, nullable=True), + sa.Column('last_response_body', sa.String, nullable=True), + sa.Column('last_error', sa.String, nullable=True), + sa.Column('delivered_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + ) + + op.create_index('idx_webhook_deliveries_pending', 'webhook_deliveries', ['scheduled_at'], postgresql_where="status = 'pending'") + op.create_index('idx_webhook_deliveries_webhook_id', 'webhook_deliveries', ['webhook_id']) + + +def downgrade(): + op.drop_table('webhook_deliveries') + op.drop_table('webhooks') + op.execute('DROP TYPE webhook_delivery_status') +``` + +--- + +## Data Retention + +Recommended retention policy for `webhook_deliveries`: + +```sql +-- Run daily via cron/scheduler +DELETE FROM webhook_deliveries +WHERE created_at < NOW() - INTERVAL '30 days' + AND status IN ('delivered', 'failed'); +``` + +Or use PostgreSQL's `pg_partman` for automatic partition management by date.
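+
+A minimal sketch of the `pg_partman` route, assuming the extension is installed and using pg_partman 4.x-style arguments (the `create_parent` signature differs across major versions, and native partitioning would also require declaring `webhook_deliveries` with `PARTITION BY RANGE (created_at)` rather than as the plain table above):
+
+```sql
+-- Install pg_partman into its own schema (common convention).
+CREATE SCHEMA IF NOT EXISTS partman;
+CREATE EXTENSION IF NOT EXISTS pg_partman SCHEMA partman;
+
+-- Register webhook_deliveries for daily partitions keyed on created_at.
+-- (Argument names/values follow pg_partman 4.x; check the installed version.)
+SELECT partman.create_parent(
+    p_parent_table := 'public.webhook_deliveries',
+    p_control      := 'created_at',
+    p_type         := 'native',
+    p_interval     := '1 day',
+    p_premake      := 7
+);
+
+-- Let pg_partman's maintenance job drop partitions older than 30 days.
+UPDATE partman.part_config
+SET retention            = '30 days',
+    retention_keep_table = false
+WHERE parent_table = 'public.webhook_deliveries';
+```
+
+Dropping a whole partition is effectively free compared to a bulk `DELETE`, which is the main reason to consider this once delivery volume grows.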