Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion api_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@

from cloud_pipelines_backend import api_router
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation.request_middleware import RequestContextMiddleware
from cloud_pipelines_backend.instrumentation import logging_context

# Application singleton; routes are attached by api_router elsewhere in the file.
app = fastapi.FastAPI(
    title="Cloud Pipelines API",
    version="0.0.1",
    # Use one shared schema for models that appear in both requests and
    # responses instead of generating separate "-Input"/"-Output" variants.
    separate_input_output_schemas=False,
)

# Add request context middleware for automatic request_id generation
app.add_middleware(RequestContextMiddleware)


@app.exception_handler(Exception)
def handle_error(request: fastapi.Request, exc: BaseException):
    """Catch-all handler: report any unhandled exception as a 503 JSON response.

    The response body carries the formatted traceback under the "exception"
    key, and the response is tagged with the current request_id (when one was
    set by RequestContextMiddleware) so client-side and server-side logs can
    be correlated.

    NOTE(review): returning the full traceback exposes internal details to
    clients -- confirm this is intended only for trusted/non-production
    deployments.
    """
    # format_exception returns a list of strings; it is serialized as a JSON array.
    exception_str = traceback.format_exception(type(exc), exc, exc.__traceback__)
    response = fastapi.responses.JSONResponse(
        status_code=503,
        content={"exception": exception_str},
    )
    # Add request_id to error responses for traceability
    request_id = logging_context.get_context_metadata("request_id")
    if request_id:
        response.headers["x-tangle-request-id"] = request_id
    return response


DEFAULT_DATABASE_URI = "sqlite:///db.sqlite"
Expand Down
15 changes: 13 additions & 2 deletions cloud_pipelines_backend/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from . import component_library_api_server as components_api
from . import database_ops
from . import errors
from .instrumentation import logging_context

if typing.TYPE_CHECKING:
from .launchers import interfaces as launcher_interfaces
Expand Down Expand Up @@ -95,17 +96,27 @@ def _setup_routes_internal(

@app.exception_handler(errors.ItemNotFoundError)
def handle_not_found_error(request: fastapi.Request, exc: errors.ItemNotFoundError):
    """Map ItemNotFoundError to a 404 JSON response with the error message.

    The response is tagged with the current request_id (when one was set by
    RequestContextMiddleware) so error responses can be correlated with logs.
    """
    response = fastapi.responses.JSONResponse(
        status_code=404,
        content={"message": str(exc)},
    )
    # Add request_id to error responses for traceability
    request_id = logging_context.get_context_metadata("request_id")
    if request_id:
        response.headers["x-tangle-request-id"] = request_id
    return response

@app.exception_handler(errors.PermissionError)
def handle_permission_error(request: fastapi.Request, exc: errors.PermissionError):
    """Map the project's PermissionError to a 403 JSON response.

    NOTE(review): `errors.PermissionError` shares its name with the builtin
    PermissionError -- verify the errors module intends that shadowing, since
    a raised builtin PermissionError would NOT be caught by this handler.

    The response is tagged with the current request_id (when one was set by
    RequestContextMiddleware) so error responses can be correlated with logs.
    """
    response = fastapi.responses.JSONResponse(
        status_code=403,
        content={"message": str(exc)},
    )
    # Add request_id to error responses for traceability
    request_id = logging_context.get_context_metadata("request_id")
    if request_id:
        response.headers["x-tangle-request-id"] = request_id
    return response

get_user_details_dependency = fastapi.Depends(user_details_getter)

Expand Down
111 changes: 111 additions & 0 deletions cloud_pipelines_backend/instrumentation/logging_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Logging context management for distributed tracing and execution tracking.

This module provides utilities for managing arbitrary metadata in the logging context.
This metadata is automatically added to all log records for better filtering and correlation.

Common metadata keys:
- request_id: From API requests - groups all logs from a single API call
- pipeline_run_id: From PipelineRun.id - tracks the entire pipeline run
- execution_id: From ExecutionNode.id - tracks individual execution nodes
- container_execution_id: From ContainerExecution.id - tracks running containers
- user_id: User who initiated the operation
- Any other metadata you want to track in logs

Usage:
# Set metadata in context
with logging_context(request_id="abc123", user_id="user@example.com"):
logger.info("Processing") # Both fields in logs
"""

import contextvars
from contextlib import contextmanager
from typing import Any, Optional


# Single context variable holding all metadata as one dict.
# The shared `default={}` is safe here because every writer below follows a
# copy-on-write discipline (copies, then sets a new dict); the default dict
# itself is never mutated.
_context_metadata: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
    "context_metadata", default={}
)


def set_context_metadata(key: str, value: Any) -> None:
    """Set a metadata value in the current context.

    Args:
        key: The metadata key (e.g., 'execution_id', 'request_id', 'user_id')
        value: The value to set
    """
    # Copy-on-write: never mutate the currently-stored dict in place, since it
    # may be shared with an outer context (see logging_context below).
    updated = dict(_context_metadata.get())
    updated[key] = value
    _context_metadata.set(updated)


def get_context_metadata(key: str) -> Optional[Any]:
    """Get a metadata value from the current context.

    Args:
        key: The metadata key to retrieve

    Returns:
        The metadata value or None if not set
    """
    return _context_metadata.get().get(key)


def get_all_context_metadata() -> dict[str, Any]:
    """Get all metadata from the current context.

    Returns:
        A copy of the context metadata dictionary (safe for callers to mutate)
    """
    return dict(_context_metadata.get())


def clear_context_metadata() -> None:
    """Clear all metadata from the current context."""
    _context_metadata.set({})


@contextmanager
def logging_context(**metadata: Any):
    """Context manager for setting arbitrary metadata that is automatically cleared.

    This is the recommended way to set logging context. It ensures metadata is
    always restored to its previous state, even if an exception occurs.

    You can pass any keyword arguments, and they will be available in log records.
    Common keys include: request_id, pipeline_run_id, execution_id,
    container_execution_id, user_id. Keys with a None value are skipped.

    Args:
        **metadata: Arbitrary keyword arguments to add to the context

    Example with IDs:
        >>> with logging_context(pipeline_run_id="run123", execution_id="exec456"):
        ...     logger.info("Processing execution")  # Will include both IDs

    Example for API requests:
        >>> request_id = generate_request_id()
        >>> with logging_context(request_id=request_id):
        ...     logger.info("Handling API request")
    """
    # Merge all non-None values in a single copy instead of one
    # copy-and-set round trip per key.
    merged = dict(_context_metadata.get())
    merged.update((key, value) for key, value in metadata.items() if value is not None)
    token = _context_metadata.set(merged)
    try:
        yield
    finally:
        # Token-based reset restores exactly the state present on entry,
        # discarding any sets made inside the body (same observable result as
        # re-setting the saved dict, but uses the ContextVar API as intended).
        _context_metadata.reset(token)

64 changes: 64 additions & 0 deletions cloud_pipelines_backend/instrumentation/request_middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Request context middleware for FastAPI applications.

This middleware automatically generates a request_id for each incoming HTTP request,
sets it in the logging context for the duration of the request, and includes it in
the response headers.
"""

import logging
import secrets

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from . import logging_context

logger = logging.getLogger(__name__)


def generate_request_id() -> str:
    """Create a fresh request ID in OpenTelemetry trace-id format.

    OpenTelemetry trace IDs are 128-bit (16-byte) values rendered as 32
    lowercase hexadecimal characters; request IDs use the same shape so they
    stay compatible with that format.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    # 16 cryptographically random bytes, hex-encoded to 32 characters.
    return secrets.token_bytes(16).hex()


class RequestContextMiddleware(BaseHTTPMiddleware):
    """Attach a fresh request_id to each request's logging context and response.

    For every incoming request this middleware:
    1. Generates a new request_id (32-character hex string)
    2. Stores it in the logging context under the 'request_id' key
    3. Echoes it back in the 'x-tangle-request-id' response header
    4. Drops it from the context once the request completes

    As a result, every log record emitted while the request is being handled
    carries the same request_id.
    """

    async def dispatch(self, request: Request, call_next) -> Response:
        """Handle one request under a freshly generated request_id.

        Args:
            request: The incoming HTTP request
            call_next: The next middleware or route handler

        Returns:
            The HTTP response, tagged with the request_id header
        """
        new_request_id = generate_request_id()

        # Everything logged while handling this request inherits the id;
        # the context manager restores the previous context on exit.
        with logging_context.logging_context(request_id=new_request_id):
            response = await call_next(request)
            # Surface the id to the client so its logs can be correlated
            # with server-side logs.
            response.headers["x-tangle-request-id"] = new_request_id
            return response
Loading