1 change: 1 addition & 0 deletions pyproject.toml
@@ -54,6 +54,7 @@ dev = [
"toml>=0.10.2,<0.11",
"twine>=4.0.1,<5",
"maturin>=1.8.2",
"openinference-instrumentation-openai-agents>=0.1.0",
"pytest-cov>=6.1.1",
"httpx>=0.28.1",
"pytest-pretty>=1.3.0",
194 changes: 194 additions & 0 deletions temporalio/contrib/openai_agents/README.md
@@ -536,6 +536,200 @@ SQLite storage is not suited to a distributed environment.
| :--------------- | :-------: |
| OpenAI platform | Yes |

## OpenTelemetry Integration

This integration provides seamless export of OpenAI agent telemetry to OpenTelemetry (OTEL) endpoints for observability and monitoring. The integration automatically handles workflow replay semantics, ensuring spans are only exported when workflows actually complete.

### Quick Start

To enable OTEL telemetry export, simply provide exporters to the `OpenAIAgentsPlugin` or `AgentEnvironment`:

```python
from datetime import timedelta

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

from temporalio.client import Client
from temporalio.contrib.openai_agents import ModelActivityParameters, OpenAIAgentsPlugin

# Your OTEL endpoint configuration
exporters = [
    OTLPSpanExporter(endpoint="http://localhost:4317"),
    # Add multiple exporters for different endpoints as needed
]

# For production applications
client = await Client.connect(
    "localhost:7233",
    plugins=[
        OpenAIAgentsPlugin(
            otel_exporters=exporters,  # Enable OTEL integration
            model_params=ModelActivityParameters(
                start_to_close_timeout=timedelta(seconds=30)
            ),
        ),
    ],
)

# For testing
from temporalio.contrib.openai_agents.testing import AgentEnvironment

async with AgentEnvironment(
    model=my_test_model,
    otel_exporters=exporters,  # Enable OTEL integration for tests
) as env:
    client = env.applied_on_client(base_client)
```

### Features

- **Multiple Exporters**: Send telemetry to multiple OTEL endpoints simultaneously
- **Replay-Safe**: Spans are only exported when workflows actually complete, not during replays (see the sketch after this list)
- **Deterministic IDs**: Consistent span and trace IDs across workflow replays for reliable correlation
- **Automatic Setup**: No manual instrumentation required - just provide exporters
- **Graceful Degradation**: Runs without the OTEL dependencies installed, as long as no exporters are configured
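
Replay safety is implemented with a span processor that drops spans while a workflow is replaying. The plugin wires the real processor up for you; the following is only a simplified sketch of the mechanism:

```python
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

from temporalio import workflow


class ReplaySafeSpanProcessor(SimpleSpanProcessor):
    """Illustrative sketch: export spans only during first-time execution."""

    def on_end(self, span: ReadableSpan) -> None:
        # During replay the workflow code re-executes from history, so
        # exporting here would emit duplicate telemetry.
        if workflow.in_workflow() and workflow.unsafe.is_replaying():
            return
        super().on_end(span)
```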

### Dependencies

OTEL integration requires additional dependencies:

```bash
pip install openinference-instrumentation-openai-agents opentelemetry-sdk
```

Choose the appropriate OTEL exporter for your monitoring system:

```bash
# For OTLP (works with most OTEL collectors and monitoring systems)
pip install opentelemetry-exporter-otlp

# Console output (development/debugging): ConsoleSpanExporter ships with
# opentelemetry-sdk, so no extra package is needed

# Other exporters available for specific systems
pip install opentelemetry-exporter-<your-system>
```

### Example: Multiple Exporters

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter

exporters = [
    # Production monitoring system
    OTLPSpanExporter(
        endpoint="https://your-monitoring-system:4317",
        headers={"api-key": "your-api-key"},
    ),

    # Secondary monitoring endpoint
    OTLPSpanExporter(endpoint="https://backup-collector:4317"),

    # Development debugging
    ConsoleSpanExporter(),
]

plugin = OpenAIAgentsPlugin(otel_exporters=exporters)
```

### Error Handling

If you provide OTEL exporters but the required dependencies are not installed, you'll receive a clear error message:

```
ImportError: OTEL dependencies not available. Install with: pip install openinference-instrumentation-openai-agents opentelemetry-sdk
```
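
If some of your deployments run without the OTEL extras, one option is to construct the plugin conditionally. A minimal sketch, where the endpoint and the fallback behavior are illustrative assumptions:

```python
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

try:
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

    exporters = [OTLPSpanExporter(endpoint="http://localhost:4317")]
except ImportError:
    # OTEL extras not installed; fall back to running without telemetry export
    exporters = None

plugin = (
    OpenAIAgentsPlugin(otel_exporters=exporters)
    if exporters is not None
    else OpenAIAgentsPlugin()
)
```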

### Direct OpenTelemetry API Calls in Workflows

When using direct OpenTelemetry API calls within workflows (e.g., `opentelemetry.trace.get_tracer(__name__).start_as_current_span()`), you need to ensure proper context bridging and sandbox configuration.

#### Sandbox Configuration

Workflows run in a sandbox that restricts module access. To use direct OTEL API calls, you must explicitly allow OpenTelemetry passthrough:

```python
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions

# Configure worker with OpenTelemetry passthrough
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    workflow_runner=SandboxedWorkflowRunner(
        SandboxRestrictions.default.with_passthrough_modules("opentelemetry")
    ),
)
```

#### Context Bridging Pattern

Direct OTEL spans must be created within an active OpenAI Agents SDK span to ensure proper parenting:

```python
import opentelemetry.trace
from agents import custom_span
from temporalio import workflow

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        # Start an SDK span first to establish OTEL context bridge
        with custom_span("Workflow coordination"):
            # Now direct OTEL spans will be properly parented
            tracer = opentelemetry.trace.get_tracer(__name__)
            with tracer.start_as_current_span("Custom workflow span"):
                # Your workflow logic here
                result = await self.do_work()
                return result
```

#### Why This Pattern is Required

- **OpenInference instrumentation** bridges OpenAI Agents SDK spans to OpenTelemetry context
- **Direct OTEL API calls** without an active SDK span become root spans with no parent (see the counter-example below)
- **SDK spans** (`custom_span()`) establish the context bridge that allows subsequent direct OTEL spans to inherit proper trace parenting
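
For contrast, a hypothetical workflow that creates a direct OTEL span with no SDK span active ends up with an orphaned root span (`do_work` is an illustrative placeholder):

```python
import opentelemetry.trace
from temporalio import workflow


@workflow.defn
class UnparentedWorkflow:
    @workflow.run
    async def run(self) -> str:
        # Anti-pattern: no SDK span is active, so this span starts a new
        # root trace disconnected from the workflow's trace hierarchy.
        tracer = opentelemetry.trace.get_tracer(__name__)
        with tracer.start_as_current_span("Orphaned span"):
            return await self.do_work()
```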

#### Complete Example

```python
import opentelemetry.trace
from agents import custom_span
from temporalio import workflow
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions

@workflow.defn
class TracedWorkflow:
    @workflow.run
    async def run(self) -> str:
        # Establish OTEL context with SDK span
        with custom_span("Main workflow"):
            # Create direct OTEL spans for fine-grained tracing
            tracer = opentelemetry.trace.get_tracer(__name__)

            with tracer.start_as_current_span("Data processing"):
                data = await self.process_data()

            with tracer.start_as_current_span("Business logic"):
                result = await self.execute_business_logic(data)

            return result

# Worker configuration
worker = Worker(
    client,
    task_queue="traced-workflows",
    workflows=[TracedWorkflow],
    workflow_runner=SandboxedWorkflowRunner(
        SandboxRestrictions.default.with_passthrough_modules("opentelemetry")
    ),
)
```

This ensures your direct OTEL spans are properly parented within the trace hierarchy that the OpenAI Agents SDK establishes for your workflow.

If no OTEL exporters are provided, the integration works normally without any OTEL setup.

### Voice

| Mode | Supported |
106 changes: 106 additions & 0 deletions temporalio/contrib/openai_agents/_otel.py
@@ -0,0 +1,106 @@
"""OpenTelemetry integration for OpenAI Agents in Temporal workflows.

This module provides utilities for properly exporting OpenAI agent telemetry
to OpenTelemetry endpoints from within Temporal workflows, handling workflow
replay semantics correctly.
"""

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.trace import INVALID_SPAN_ID, INVALID_TRACE_ID

from temporalio import workflow


class TemporalIdGenerator(IdGenerator):
    """OpenTelemetry ID generator that provides deterministic IDs for Temporal workflows.

    This generator ensures that span and trace IDs are deterministic when running
    within Temporal workflows by using the workflow's deterministic random source.
    This is crucial for maintaining consistency across workflow replays.
    """

    def __init__(self):
        """Initialize the ID generator with empty trace and span pools."""
        self.traces = []
        self.spans = []

    def generate_span_id(self) -> int:
        """Generate a deterministic span ID.

        Uses the workflow's deterministic random source when in a workflow context,
        otherwise falls back to system random.

        Returns:
            A 64-bit span ID that is guaranteed not to be INVALID_SPAN_ID.
        """
        if workflow.in_workflow():
            get_rand_bits = workflow.random().getrandbits
        else:
            import random

            get_rand_bits = random.getrandbits

        if len(self.spans) > 0:
            return self.spans.pop()

        span_id = get_rand_bits(64)
        while span_id == INVALID_SPAN_ID:
            span_id = get_rand_bits(64)
        return span_id

    def generate_trace_id(self) -> int:
        """Generate a deterministic trace ID.

        Uses the workflow's deterministic random source when in a workflow context,
        otherwise falls back to system random.

        Returns:
            A 128-bit trace ID that is guaranteed not to be INVALID_TRACE_ID.
        """
        if workflow.in_workflow():
            get_rand_bits = workflow.random().getrandbits
        else:
            import random

            get_rand_bits = random.getrandbits

        if len(self.traces) > 0:
            return self.traces.pop()

        trace_id = get_rand_bits(128)
        while trace_id == INVALID_TRACE_ID:
            trace_id = get_rand_bits(128)
        return trace_id


class TemporalSpanProcessor(SimpleSpanProcessor):
    """A span processor that handles Temporal workflow replay semantics.

    This processor ensures that spans are only exported when workflows actually
    complete, not during intermediate replays. This is crucial for maintaining
    correct telemetry data when using OpenAI agents within Temporal workflows.

    Example usage:
        from opentelemetry.sdk import trace as trace_sdk
        from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
        from temporalio.contrib.openai_agents._otel import TemporalIdGenerator, TemporalSpanProcessor
        from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor

        exporter = InMemorySpanExporter()
        provider = trace_sdk.TracerProvider(id_generator=TemporalIdGenerator())
        provider.add_span_processor(TemporalSpanProcessor(exporter))
        OpenAIAgentsInstrumentor().instrument(tracer_provider=provider)
    """

    def on_end(self, span: ReadableSpan) -> None:
        """Handle span end events, skipping export during workflow replay.

        Args:
            span: The span that has ended.
        """
        if workflow.in_workflow() and workflow.unsafe.is_replaying():
            # Skip exporting spans during workflow replay to avoid duplicate telemetry
            return
        super().on_end(span)