From 8a4323809620217fd72be3ff351ead72f561b0fd Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Wed, 21 Jan 2026 19:11:15 +0530 Subject: [PATCH 1/4] Langfuse: Add comprehensive error handling and tests - Added try-catch blocks to prevent Langfuse failures from blocking main flow - Ensure tracer is always initialized (disabled if no credentials) - Added graceful degradation for all tracer methods - Created comprehensive test suite covering all failure scenarios - Tests verify OpenAI responses work regardless of Langfuse state Fixes issue where Langfuse connection failures could block API responses. --- backend/app/core/langfuse/langfuse.py | 220 +++-- .../app/tests/core/test_langfuse/__init__.py | 0 .../app/tests/core/test_langfuse/conftest.py | 14 + .../test_langfuse/test_langfuse_tracer.py | 849 ++++++++++++++++++ 4 files changed, 1008 insertions(+), 75 deletions(-) create mode 100644 backend/app/tests/core/test_langfuse/__init__.py create mode 100644 backend/app/tests/core/test_langfuse/conftest.py create mode 100644 backend/app/tests/core/test_langfuse/test_langfuse_tracer.py diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py index 287790131..1f3560a83 100644 --- a/backend/app/core/langfuse/langfuse.py +++ b/backend/app/core/langfuse/langfuse.py @@ -19,7 +19,7 @@ def __init__( response_id: Optional[str] = None, ): self.session_id = session_id or str(uuid.uuid4()) - self.langfuse: Optional[Langfuse] = None + self.langfuse: Langfuse = Langfuse(enabled=False) self.trace: Optional[StatefulTraceClient] = None self.generation: Optional[StatefulGenerationClient] = None @@ -29,28 +29,45 @@ def __init__( and "secret_key" in credentials and "host" in credentials ) + if not has_credentials: + logger.warning( + "[LangfuseTracer] Missing Langfuse credentials; tracing will be disabled" + ) + return - if has_credentials: + try: self.langfuse = Langfuse( public_key=credentials["public_key"], secret_key=credentials["secret_key"], host=credentials["host"], enabled=True, # This ensures the client is active ) + except Exception as e: + self.langfuse = Langfuse(enabled=False) + logger.error( + f"[LangfuseTracer] Error initializing Langfuse client: {e}", + exc_info=True, + ) + return - if response_id: + if response_id: + try: traces = self.langfuse.fetch_traces(tags=response_id).data if traces: self.session_id = traces[0].session_id - logger.info( - f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}" - ) - else: - self.langfuse = Langfuse(enabled=False) - logger.warning( - "[LangfuseTracer] Langfuse tracing disabled due to missing credentials" - ) + logger.info( + f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}" + ) + except Exception as e: + logger.warning( + f"[LangfuseTracer] Error fetching traces for response_id={response_id}: {e}", + exc_info=True, + ) + + logger.info( + f"[LangfuseTracer] Langfuse tracing initialized | session_id={self.session_id}" + ) def start_trace( self, @@ -59,16 +76,23 @@ def start_trace( metadata: Optional[Dict[str, Any]] = None, tags: list[str] | None = None, ): - metadata = metadata or {} - metadata["request_id"] = correlation_id.get() or "N/A" - - self.trace = self.langfuse.trace( - name=name, - input=input, - metadata=metadata, - session_id=self.session_id, - tags=tags, - ) + if not self.langfuse.enabled: + return + + try: + metadata = metadata or {} + metadata["request_id"] = correlation_id.get() or "N/A" + + self.trace = self.langfuse.trace( + name=name, + input=input, + metadata=metadata, + session_id=self.session_id, + tags=tags, + ) + except Exception as e: + logger.error(f"[LangfuseTracer] Failed to start trace: {e}") + self.trace = None def start_generation( self, @@ -76,14 +100,18 @@ def start_generation( input: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, ): - if not self.trace: + if not self.langfuse.enabled or not self.trace: return - self.generation = self.langfuse.generation( - name=name, - trace_id=self.trace.id, - input=input, - metadata=metadata or {}, - ) + try: + self.generation = self.langfuse.generation( + name=name, + trace_id=self.trace.id, + input=input, + metadata=metadata or {}, + ) + except Exception as e: + logger.error(f"[LangfuseTracer] Failed to start generation: {e}") + self.generation = None def end_generation( self, @@ -91,24 +119,44 @@ def end_generation( usage: Optional[Dict[str, Any]] = None, model: Optional[str] = None, ): - if self.generation: + if not self.langfuse.enabled or not self.generation: + return + + try: self.generation.end(output=output, usage=usage, model=model) + except Exception as e: + logger.error(f"[LangfuseTracer] Failed to end generation: {e}") def update_trace(self, tags: list[str], output: Dict[str, Any]): - if self.trace: - self.trace.update(tags=tags, output=output) + if not self.langfuse.enabled or not self.trace: + return + try: + if self.trace: + self.trace.update(tags=tags, output=output) + except Exception as e: + logger.error(f"[LangfuseTracer] Failed to update trace: {e}") def log_error(self, error_message: str, response_id: Optional[str] = None): - if self.generation: - self.generation.end(output={"error": error_message}) - if self.trace: - self.trace.update( - tags=[response_id] if response_id else [], - output={"status": "failure", "error": error_message}, - ) + if not self.langfuse.enabled: + return + try: + if self.generation: + self.generation.end(output={"error": error_message}) + if self.trace: + self.trace.update( + tags=[response_id] if response_id else [], + output={"status": "failure", "error": error_message}, + ) + except Exception as e: + logger.error(f"[LangfuseTracer] Failed to log error: {e}") def flush(self): - self.langfuse.flush() + if not self.langfuse.enabled: + return + try: + self.langfuse.flush() + except Exception as e: + logger.error(f"[LangfuseTracer] Failed to flush Langfuse client: {e}") def observe_llm_execution( @@ -138,6 +186,13 @@ def wrapper( logger.info("[Langfuse] No credentials - skipping observability") return func(completion_config, query, **kwargs) + def safe_langfuse_op(op: Callable, *args, **kwargs): + try: + return op(*args, **kwargs) + except Exception as e: + logger.warning(f"[Langfuse] Operation failed: {e}") + return None + try: langfuse = Langfuse( public_key=credentials.get("public_key"), @@ -148,17 +203,22 @@ def wrapper( logger.warning(f"[Langfuse] Failed to initialize client: {e}") return func(completion_config, query, **kwargs) - trace = langfuse.trace( + trace = safe_langfuse_op( + langfuse.trace, name="unified-llm-call", input=query.input, tags=[completion_config.provider], ) - generation = trace.generation( - name=f"{completion_config.provider}-completion", - input=query.input, - model=completion_config.params.get("model"), - ) + generation = None + + if trace: + generation = safe_langfuse_op( + trace.generation, + name=f"{completion_config.provider}-completion", + input=query.input, + model=completion_config.params.get("model"), + ) try: # Execute the actual LLM call @@ -167,44 +227,54 @@ def wrapper( response, error = func(completion_config, query, **kwargs) if response: - generation.end( - output={ - "status": "success", - "output": response.response.output.text, - }, - usage_details={ - "input": response.usage.input_tokens, - "output": response.usage.output_tokens, - }, - model=response.response.model, - ) + if generation: + safe_langfuse_op( + generation.end, + output={ + "status": "success", + "output": response.response.output.text, + }, + usage_details={ + "input": response.usage.input_tokens, + "output": response.usage.output_tokens, + }, + model=response.response.model, + ) - trace.update( - output={ - "status": "success", - "output": response.response.output.text, - }, - session_id=session_id or response.response.conversation_id, - ) + if trace: + safe_langfuse_op( + trace.update, + output={ + "status": "success", + "output": response.response.output.text, + }, + session_id=session_id or response.response.conversation_id, + ) else: error_msg = error or "Unknown error" - generation.end(output={"error": error_msg}) - trace.update( - output={"status": "failure", "error": error_msg}, - session_id=session_id, - ) + if generation: + safe_langfuse_op(generation.end, output={"error": error_msg}) + if trace: + safe_langfuse_op( + trace.update, + output={"status": "failure", "error": error_msg}, + session_id=session_id, + ) - langfuse.flush() + safe_langfuse_op(langfuse.flush) return response, error except Exception as e: error_msg = str(e) - generation.end(output={"error": error_msg}) - trace.update( - output={"status": "failure", "error": error_msg}, - session_id=session_id, - ) - langfuse.flush() + if generation: + safe_langfuse_op(generation.end, output={"error": error_msg}) + if trace: + safe_langfuse_op( + trace.update, + output={"status": "failure", "error": error_msg}, + session_id=session_id, + ) + safe_langfuse_op(langfuse.flush) raise return wrapper diff --git a/backend/app/tests/core/test_langfuse/__init__.py b/backend/app/tests/core/test_langfuse/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/app/tests/core/test_langfuse/conftest.py b/backend/app/tests/core/test_langfuse/conftest.py new file mode 100644 index 000000000..9821f2467 --- /dev/null +++ b/backend/app/tests/core/test_langfuse/conftest.py @@ -0,0 +1,14 @@ +""" +Local conftest for LangfuseTracer unit tests. + +These tests don't require database access, so we override the session-scoped +seed_baseline fixture to skip database seeding. +""" + +import pytest + + +@pytest.fixture(scope="session", autouse=True) +def seed_baseline(): + """Override the global seed_baseline fixture to skip database seeding.""" + yield diff --git a/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py b/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py new file mode 100644 index 000000000..0ed3ec062 --- /dev/null +++ b/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py @@ -0,0 +1,849 @@ +""" +Unit tests for LangfuseTracer class. + +Tests all scenarios to ensure: +1. OpenAI responses are ALWAYS generated regardless of Langfuse state +2. Langfuse failures never block the main execution flow +3. All edge cases are handled gracefully + +Scenarios covered: +- No credentials (None) +- Incomplete credentials (missing keys) +- Langfuse initialization exception +- fetch_traces failure (non-critical) +- Runtime exceptions in all methods +- Full working tracer +""" + +import pytest +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +from openai import OpenAIError + +from app.core.langfuse.langfuse import LangfuseTracer +from app.models import Assistant, ResponsesAPIRequest +from app.services.response.response import generate_response, process_response + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def valid_credentials() -> dict: + """Valid Langfuse credentials.""" + return { + "public_key": "pk-test-123", + "secret_key": "sk-test-456", + "host": "https://langfuse.example.com", + } + + +@pytest.fixture +def assistant_mock() -> Assistant: + """Mock assistant for generate_response tests.""" + return Assistant( + id=123, + assistant_id="asst_test123", + name="Test Assistant", + model="gpt-4", + temperature=0.7, + instructions="You are a helpful assistant.", + vector_store_ids=["vs1"], + max_num_results=5, + project_id=1, + organization_id=1, + ) + + +# ============================================================================= +# Test: LangfuseTracer.__init__ +# ============================================================================= + + +class TestLangfuseTracerInit: + """Tests for LangfuseTracer initialization scenarios.""" + + def test_init_no_credentials_disables_langfuse(self) -> None: + """Scenario #1: No credentials - tracer should be disabled.""" + tracer = LangfuseTracer(credentials=None) + + assert tracer.langfuse.enabled is False + assert tracer.trace is None + assert tracer.generation is None + + def test_init_empty_credentials_disables_langfuse(self) -> None: + """Empty dict credentials - tracer should be disabled.""" + tracer = LangfuseTracer(credentials={}) + + assert tracer.langfuse.enabled is False + + def test_init_missing_public_key_disables_langfuse(self) -> None: + """Missing public_key - tracer should be disabled.""" + tracer = LangfuseTracer( + credentials={ + "secret_key": "sk-test", + "host": "https://example.com", + } + ) + + assert tracer.langfuse.enabled is False + + def test_init_missing_secret_key_disables_langfuse(self) -> None: + """Missing secret_key - tracer should be disabled.""" + tracer = LangfuseTracer( + credentials={ + "public_key": "pk-test", + "host": "https://example.com", + } + ) + + assert tracer.langfuse.enabled is False + + def test_init_missing_host_disables_langfuse(self) -> None: + """Missing host - tracer should be disabled.""" + tracer = LangfuseTracer( + credentials={ + "public_key": "pk-test", + "secret_key": "sk-test", + } + ) + + assert tracer.langfuse.enabled is False + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_init_langfuse_exception_disables_tracer( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """Scenario #3: Langfuse init throws exception - tracer should be disabled.""" + disabled_mock = MagicMock() + disabled_mock.enabled = False + + # First call: default disabled, second call: raises exception, third: fallback disabled + mock_langfuse_class.side_effect = [ + disabled_mock, + Exception("Connection failed"), + disabled_mock, + ] + + tracer = LangfuseTracer(credentials=valid_credentials) + + assert tracer.langfuse.enabled is False + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_init_fetch_traces_fails_but_tracer_still_enabled( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """Scenario #4: fetch_traces fails - tracer should STILL be enabled.""" + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.fetch_traces.side_effect = Exception("Network error") + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials, response_id="resp-123") + + # Key assertion: tracer is STILL enabled despite fetch_traces failure + assert tracer.langfuse.enabled is True + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_init_success_resumes_session_from_traces( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """Successful init with existing traces - should resume session.""" + existing_trace = MagicMock() + existing_trace.session_id = "existing-session-456" + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.fetch_traces.return_value.data = [existing_trace] + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials, response_id="resp-123") + + assert tracer.session_id == "existing-session-456" + + +# ============================================================================= +# Test: Methods when Langfuse is disabled +# ============================================================================= + + +class TestLangfuseTracerMethodsDisabled: + """Tests that all methods are no-ops when Langfuse is disabled.""" + + def test_start_trace_when_disabled_is_noop(self) -> None: + """start_trace should do nothing when disabled.""" + tracer = LangfuseTracer(credentials=None) + + # Should not raise any exception + tracer.start_trace(name="test", input={"question": "hello"}) + + assert tracer.trace is None + + def test_start_generation_when_disabled_is_noop(self) -> None: + """start_generation should do nothing when disabled.""" + tracer = LangfuseTracer(credentials=None) + + tracer.start_generation(name="test", input={"question": "hello"}) + + assert tracer.generation is None + + def test_end_generation_when_disabled_is_noop(self) -> None: + """end_generation should do nothing when disabled.""" + tracer = LangfuseTracer(credentials=None) + + # Should not raise any exception + tracer.end_generation(output={"response": "world"}) + + def test_update_trace_when_disabled_is_noop(self) -> None: + """update_trace should do nothing when disabled.""" + tracer = LangfuseTracer(credentials=None) + + # Should not raise any exception + tracer.update_trace(tags=["test"], output={"status": "success"}) + + def test_log_error_when_disabled_is_noop(self) -> None: + """log_error should do nothing when disabled.""" + tracer = LangfuseTracer(credentials=None) + + # Should not raise any exception + tracer.log_error(error_message="Test error", response_id="resp-123") + + def test_flush_when_disabled_is_noop(self) -> None: + """flush should do nothing when disabled.""" + tracer = LangfuseTracer(credentials=None) + + # Should not raise any exception + tracer.flush() + + +# ============================================================================= +# Test: Methods when Langfuse is enabled but operations fail +# ============================================================================= + + +class TestLangfuseTracerMethodsFailure: + """Tests that method failures are caught and don't propagate.""" + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_start_trace_exception_is_caught( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """start_trace exception should be caught, not propagated.""" + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.side_effect = Exception("Trace creation failed") + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + + # Should NOT raise exception + tracer.start_trace(name="test", input={"q": "hello"}) + + assert tracer.trace is None + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_start_generation_exception_is_caught( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """start_generation exception should be caught, not propagated.""" + mock_trace = MagicMock(id="trace-123") + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.side_effect = Exception("Generation failed") + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + tracer.start_trace(name="test", input={"q": "hello"}) + + # Should NOT raise exception + tracer.start_generation(name="gen", input={"q": "hello"}) + + assert tracer.generation is None + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_end_generation_exception_is_caught( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """end_generation exception should be caught, not propagated.""" + mock_trace = MagicMock(id="trace-123") + mock_generation = MagicMock() + mock_generation.end.side_effect = Exception("End failed") + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.return_value = mock_generation + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + tracer.start_trace(name="test", input={"q": "hello"}) + tracer.start_generation(name="gen", input={"q": "hello"}) + + # Should NOT raise exception + tracer.end_generation(output={"response": "world"}) + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_update_trace_exception_is_caught( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """update_trace exception should be caught, not propagated.""" + mock_trace = MagicMock(id="trace-123") + mock_trace.update.side_effect = Exception("Update failed") + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + tracer.start_trace(name="test", input={"q": "hello"}) + + # Should NOT raise exception + tracer.update_trace(tags=["test"], output={"status": "success"}) + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_flush_exception_is_caught( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """flush exception should be caught, not propagated.""" + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.flush.side_effect = Exception("Flush failed") + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + + # Should NOT raise exception + tracer.flush() + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_log_error_exception_is_caught( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """log_error exception should be caught, not propagated.""" + mock_trace = MagicMock(id="trace-123") + mock_trace.update.side_effect = Exception("Log error failed") + mock_generation = MagicMock() + mock_generation.end.side_effect = Exception("End failed") + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.return_value = mock_generation + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + tracer.start_trace(name="test", input={"q": "hello"}) + tracer.start_generation(name="gen", input={"q": "hello"}) + + # Should NOT raise exception + tracer.log_error(error_message="Test error", response_id="resp-123") + + +# ============================================================================= +# Test: Integration with generate_response +# ============================================================================= + + +class TestGenerateResponseWithTracer: + """Tests that generate_response works in all tracer scenarios.""" + + def test_generate_response_with_no_credentials( + self, assistant_mock: Assistant + ) -> None: + """Scenario #1: No credentials - OpenAI should still work.""" + mock_client = MagicMock() + tracer = LangfuseTracer(credentials=None) + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="What is 2+2?", + ) + + response, error = generate_response( + tracer=tracer, + client=mock_client, + assistant=assistant_mock, + request=request, + ancestor_id=None, + ) + + # OpenAI client was called despite disabled tracer + mock_client.responses.create.assert_called_once() + assert error is None + + def test_generate_response_with_incomplete_credentials( + self, assistant_mock: Assistant + ) -> None: + """Scenario #2: Incomplete credentials - OpenAI should still work.""" + mock_client = MagicMock() + tracer = LangfuseTracer(credentials={"incomplete": True}) + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="What is 2+2?", + ) + + response, error = generate_response( + tracer=tracer, + client=mock_client, + assistant=assistant_mock, + request=request, + ancestor_id=None, + ) + + mock_client.responses.create.assert_called_once() + assert error is None + + def test_generate_response_openai_error_with_disabled_tracer( + self, assistant_mock: Assistant + ) -> None: + """OpenAI error with disabled tracer - should handle gracefully.""" + mock_client = MagicMock() + mock_client.responses.create.side_effect = OpenAIError("API failed") + + tracer = LangfuseTracer(credentials=None) + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="What is 2+2?", + ) + + response, error = generate_response( + tracer=tracer, + client=mock_client, + assistant=assistant_mock, + request=request, + ancestor_id=None, + ) + + assert response is None + assert error is not None + assert "API failed" in error + + +# ============================================================================= +# Test: Successful tracing flow +# ============================================================================= + + +class TestLangfuseTracerSuccess: + """Tests for successful tracer operations.""" + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_full_tracing_flow_success( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """Test complete tracing flow when everything works.""" + mock_trace = MagicMock(id="trace-123") + mock_generation = MagicMock() + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.return_value = mock_generation + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + + # Full flow + tracer.start_trace(name="test", input={"q": "hello"}) + tracer.start_generation(name="gen", input={"q": "hello"}) + tracer.end_generation( + output={"response": "world"}, + usage={"input": 10, "output": 20, "total": 30}, + model="gpt-4", + ) + tracer.update_trace(tags=["resp-123"], output={"status": "success"}) + tracer.flush() + + # Verify all methods were called + enabled_mock.trace.assert_called_once() + enabled_mock.generation.assert_called_once() + mock_generation.end.assert_called_once() + mock_trace.update.assert_called_once() + enabled_mock.flush.assert_called_once() + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_start_generation_without_trace_is_noop( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """start_generation should be no-op if trace doesn't exist.""" + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.side_effect = Exception("Trace failed") + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + tracer.start_trace(name="test", input={"q": "hello"}) # This fails + + assert tracer.trace is None + + # This should be a no-op since trace is None + tracer.start_generation(name="gen", input={"q": "hello"}) + + assert tracer.generation is None + # generation method should not be called since trace is None + enabled_mock.generation.assert_not_called() + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_end_generation_without_generation_is_noop( + self, mock_langfuse_class: MagicMock, valid_credentials: dict + ) -> None: + """end_generation should be no-op if generation doesn't exist.""" + mock_trace = MagicMock(id="trace-123") + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.side_effect = Exception("Generation failed") + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + tracer = LangfuseTracer(credentials=valid_credentials) + tracer.start_trace(name="test", input={"q": "hello"}) + tracer.start_generation(name="gen", input={"q": "hello"}) # This fails + + assert tracer.generation is None + + # This should be a no-op since generation is None + tracer.end_generation(output={"response": "world"}) + # No exception should be raised + + +# ============================================================================= +# Test: generate_response with ENABLED tracer +# ============================================================================= + + +class TestGenerateResponseWithEnabledTracer: + """Tests for generate_response with ENABLED tracer.""" + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_generate_response_openai_error_with_enabled_tracer( + self, + mock_langfuse_class: MagicMock, + valid_credentials: dict, + assistant_mock: Assistant, + ) -> None: + """OpenAI error with enabled tracer - tracer.log_error should be called.""" + mock_trace = MagicMock(id="trace-123") + mock_generation = MagicMock() + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.return_value = mock_generation + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + mock_client = MagicMock() + mock_client.responses.create.side_effect = OpenAIError("API failed") + + tracer = LangfuseTracer(credentials=valid_credentials) + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="What is 2+2?", + ) + + response, error = generate_response( + tracer=tracer, + client=mock_client, + assistant=assistant_mock, + request=request, + ancestor_id=None, + ) + + # OpenAI failed + assert response is None + assert error is not None + assert "API failed" in error + + # Tracer methods were called before the error + enabled_mock.trace.assert_called_once() + enabled_mock.generation.assert_called_once() + + @patch("app.core.langfuse.langfuse.Langfuse") + def test_generate_response_success_with_enabled_tracer( + self, + mock_langfuse_class: MagicMock, + valid_credentials: dict, + assistant_mock: Assistant, + ) -> None: + """Full success flow with enabled tracer.""" + mock_trace = MagicMock(id="trace-123") + mock_generation = MagicMock() + + enabled_mock = MagicMock() + enabled_mock.enabled = True + enabled_mock.trace.return_value = mock_trace + enabled_mock.generation.return_value = mock_generation + + disabled_mock = MagicMock() + disabled_mock.enabled = False + + mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + + # Mock successful OpenAI response + mock_response = MagicMock() + mock_response.id = "resp-456" + mock_response.output_text = "The answer is 4" + mock_response.model = "gpt-4" + mock_response.usage.input_tokens = 10 + mock_response.usage.output_tokens = 5 + mock_response.usage.total_tokens = 15 + + mock_client = MagicMock() + mock_client.responses.create.return_value = mock_response + + tracer = LangfuseTracer(credentials=valid_credentials) + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="What is 2+2?", + ) + + response, error = generate_response( + tracer=tracer, + client=mock_client, + assistant=assistant_mock, + request=request, + ancestor_id=None, + ) + + # OpenAI succeeded + assert response is not None + assert error is None + + # All tracer methods were called + enabled_mock.trace.assert_called_once() + enabled_mock.generation.assert_called_once() + mock_generation.end.assert_called_once() + mock_trace.update.assert_called_once() + + +# ============================================================================= +# Test: Integration tests for process_response +# ============================================================================= + + +class TestProcessResponseIntegration: + """Integration tests for process_response with various tracer scenarios.""" + + @patch("app.services.response.response.persist_conversation") + @patch("app.services.response.response.get_conversation_by_ancestor_id") + @patch("app.services.response.response.Session") + @patch("app.services.response.response.get_openai_client") + @patch("app.services.response.response.get_assistant_by_id") + @patch("app.services.response.response.get_provider_credential") + @patch("app.services.response.response.JobCrud") + def test_process_response_with_no_langfuse_credentials( + self, + mock_job_crud: MagicMock, + mock_get_credential: MagicMock, + mock_get_assistant: MagicMock, + mock_get_client: MagicMock, + mock_session: MagicMock, + mock_get_conversation: MagicMock, + mock_persist: MagicMock, + assistant_mock: Assistant, + ) -> None: + """process_response works when langfuse_credentials is None.""" + # Setup mocks + mock_get_credential.return_value = None # No Langfuse credentials + mock_get_assistant.return_value = assistant_mock + mock_get_conversation.return_value = None + + mock_response = MagicMock() + mock_response.id = "resp-123" + mock_response.output_text = "Answer" + mock_response.model = "gpt-4" + mock_response.usage.input_tokens = 10 + mock_response.usage.output_tokens = 5 + mock_response.usage.total_tokens = 15 + mock_response.previous_response_id = None + + mock_client = MagicMock() + mock_client.responses.create.return_value = mock_response + mock_get_client.return_value = mock_client + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="Test question", + ) + + result = process_response( + request=request, + project_id=1, + organization_id=1, + job_id=uuid4(), + task_id="task-123", + task_instance=None, + ) + + # OpenAI was called + mock_client.responses.create.assert_called_once() + # Response succeeded + assert result.success is True + + @patch("app.services.response.response.persist_conversation") + @patch("app.services.response.response.get_conversation_by_ancestor_id") + @patch("app.services.response.response.Session") + @patch("app.services.response.response.get_openai_client") + @patch("app.services.response.response.get_assistant_by_id") + @patch("app.services.response.response.get_provider_credential") + @patch("app.services.response.response.JobCrud") + @patch("app.core.langfuse.langfuse.Langfuse") + def test_process_response_with_langfuse_init_failure( + self, + mock_langfuse_class: MagicMock, + mock_job_crud: MagicMock, + mock_get_credential: MagicMock, + mock_get_assistant: MagicMock, + mock_get_client: MagicMock, + mock_session: MagicMock, + mock_get_conversation: MagicMock, + mock_persist: MagicMock, + assistant_mock: Assistant, + valid_credentials: dict, + ) -> None: + """process_response works even when Langfuse init fails.""" + # Langfuse init fails + disabled_mock = MagicMock() + disabled_mock.enabled = False + mock_langfuse_class.side_effect = [ + disabled_mock, # Default disabled + Exception("Langfuse connection failed"), # Init fails + disabled_mock, # Fallback disabled + ] + + mock_get_credential.return_value = valid_credentials + mock_get_assistant.return_value = assistant_mock + mock_get_conversation.return_value = None + + mock_response = MagicMock() + mock_response.id = "resp-123" + mock_response.output_text = "Answer" + mock_response.model = "gpt-4" + mock_response.usage.input_tokens = 10 + mock_response.usage.output_tokens = 5 + mock_response.usage.total_tokens = 15 + mock_response.previous_response_id = None + + mock_client = MagicMock() + mock_client.responses.create.return_value = mock_response + mock_get_client.return_value = mock_client + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="Test question", + ) + + result = process_response( + request=request, + project_id=1, + organization_id=1, + job_id=uuid4(), + task_id="task-123", + task_instance=None, + ) + + # OpenAI was still called despite Langfuse failure + mock_client.responses.create.assert_called_once() + assert result.success is True + + @patch("app.services.response.response.Session") + @patch("app.services.response.response.get_openai_client") + @patch("app.services.response.response.get_assistant_by_id") + @patch("app.services.response.response.get_provider_credential") + @patch("app.services.response.response.JobCrud") + @patch("app.services.response.response._fail_job") + def test_process_response_openai_error_with_disabled_tracer( + self, + mock_fail_job: MagicMock, + mock_job_crud: MagicMock, + mock_get_credential: MagicMock, + mock_get_assistant: MagicMock, + mock_get_client: MagicMock, + mock_session: MagicMock, + assistant_mock: Assistant, + ) -> None: + """process_response handles OpenAI error with disabled tracer.""" + mock_get_credential.return_value = None # No Langfuse + mock_get_assistant.return_value = assistant_mock + + mock_client = MagicMock() + mock_client.responses.create.side_effect = OpenAIError("API failed") + mock_get_client.return_value = mock_client + + # Mock _fail_job to return a failure response + mock_fail_job.return_value = MagicMock(success=False, error="API failed") + + request = ResponsesAPIRequest( + assistant_id="asst_test123", + question="Test question", + ) + + result = process_response( + request=request, + project_id=1, + organization_id=1, + job_id=uuid4(), + task_id="task-123", + task_instance=None, + ) + + # Response failed gracefully + assert result.success is False From a4c0dbdbebe3732c0e167274edf4ac3fb8c52990 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Fri, 23 Jan 2026 22:12:39 +0530 Subject: [PATCH 2/4] Langfuse: Refactor error handling to use wrapper function Address PR review feedback - replace multiple try/catch blocks with single _langfuse_call wrapper that auto-disables on first failure. --- backend/app/core/langfuse/langfuse.py | 265 +++++------- .../test_langfuse/test_langfuse_tracer.py | 403 +++--------------- 2 files changed, 183 insertions(+), 485 deletions(-) diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py index 1f3560a83..d78b64464 100644 --- a/backend/app/core/langfuse/langfuse.py +++ b/backend/app/core/langfuse/langfuse.py @@ -19,9 +19,10 @@ def __init__( response_id: Optional[str] = None, ): self.session_id = session_id or str(uuid.uuid4()) - self.langfuse: Langfuse = Langfuse(enabled=False) + self.langfuse: Optional[Langfuse] = None self.trace: Optional[StatefulTraceClient] = None self.generation: Optional[StatefulGenerationClient] = None + self._failed = False has_credentials = ( credentials @@ -29,45 +30,44 @@ def __init__( and "secret_key" in credentials and "host" in credentials ) - if not has_credentials: + + if has_credentials: + try: + self.langfuse = Langfuse( + public_key=credentials["public_key"], + secret_key=credentials["secret_key"], + host=credentials["host"], + enabled=True, # This ensures the client is active + ) + except Exception: + self._failed = True + return + + if response_id: + try: + traces = self.langfuse.fetch_traces(tags=response_id).data + if traces: + self.session_id = traces[0].session_id + except Exception: + pass # Non-critical: session resume is optional + + logger.info( + f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}" + ) + else: logger.warning( - "[LangfuseTracer] Missing Langfuse credentials; tracing will be disabled" + "[LangfuseTracer] Langfuse tracing disabled due to missing credentials" ) - return + def _langfuse_call(self, fn, *args, **kwargs): + if self._failed: + return None try: - self.langfuse = Langfuse( - public_key=credentials["public_key"], - secret_key=credentials["secret_key"], - host=credentials["host"], - enabled=True, # This ensures the client is active - ) + return fn(*args, **kwargs) except Exception as e: - self.langfuse = Langfuse(enabled=False) - logger.error( - f"[LangfuseTracer] Error initializing Langfuse client: {e}", - exc_info=True, - ) - return - - if response_id: - try: - traces = self.langfuse.fetch_traces(tags=response_id).data - if traces: - self.session_id = traces[0].session_id - - logger.info( - f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}" - ) - except Exception as e: - logger.warning( - f"[LangfuseTracer] Error fetching traces for response_id={response_id}: {e}", - exc_info=True, - ) - - logger.info( - f"[LangfuseTracer] Langfuse tracing initialized | session_id={self.session_id}" - ) + logger.warning(f"[LangfuseTracer] {getattr(fn, '__name__', 'operation')} failed: {e}") + self._failed = True + return None def start_trace( self, @@ -76,23 +76,18 @@ def start_trace( metadata: Optional[Dict[str, Any]] = None, tags: list[str] | None = None, ): - if not self.langfuse.enabled: + if self._failed or not self.langfuse: return - - try: - metadata = metadata or {} - metadata["request_id"] = correlation_id.get() or "N/A" - - self.trace = self.langfuse.trace( - name=name, - input=input, - metadata=metadata, - session_id=self.session_id, - tags=tags, - ) - except Exception as e: - logger.error(f"[LangfuseTracer] Failed to start trace: {e}") - self.trace = None + metadata = metadata or {} + metadata["request_id"] = correlation_id.get() or "N/A" + self.trace = self._langfuse_call( + self.langfuse.trace, + name=name, + input=input, + metadata=metadata, + session_id=self.session_id, + tags=tags, + ) def start_generation( self, @@ -100,18 +95,15 @@ def start_generation( input: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, ): - if not self.langfuse.enabled or not self.trace: + if self._failed or not self.trace: return - try: - self.generation = self.langfuse.generation( - name=name, - trace_id=self.trace.id, - input=input, - metadata=metadata or {}, - ) - except Exception as e: - logger.error(f"[LangfuseTracer] Failed to start generation: {e}") - self.generation = None + self.generation = self._langfuse_call( + self.langfuse.generation, + name=name, + trace_id=self.trace.id, + input=input, + metadata=metadata or {}, + ) def end_generation( self, @@ -119,44 +111,31 @@ def end_generation( usage: Optional[Dict[str, Any]] = None, model: Optional[str] = None, ): - if not self.langfuse.enabled or not self.generation: + if self._failed or not self.generation: return - - try: - self.generation.end(output=output, usage=usage, model=model) - except Exception as e: - logger.error(f"[LangfuseTracer] Failed to end generation: {e}") + self._langfuse_call(self.generation.end, output=output, usage=usage, model=model) def update_trace(self, tags: list[str], output: Dict[str, Any]): - if not self.langfuse.enabled or not self.trace: + if self._failed or not self.trace: return - try: - if self.trace: - self.trace.update(tags=tags, output=output) - except Exception as e: - logger.error(f"[LangfuseTracer] Failed to update trace: {e}") + self._langfuse_call(self.trace.update, tags=tags, output=output) def log_error(self, error_message: str, response_id: Optional[str] = None): - if not self.langfuse.enabled: + if self._failed: return - try: - if self.generation: - self.generation.end(output={"error": error_message}) - if self.trace: - self.trace.update( - tags=[response_id] if response_id else [], - output={"status": "failure", "error": error_message}, - ) - except Exception as e: - logger.error(f"[LangfuseTracer] Failed to log error: {e}") + if self.generation: + self._langfuse_call(self.generation.end, output={"error": error_message}) + if self.trace: + self._langfuse_call( + self.trace.update, + tags=[response_id] if response_id else [], + output={"status": "failure", "error": error_message}, + ) def flush(self): - if not self.langfuse.enabled: + if self._failed or not self.langfuse: return - try: - self.langfuse.flush() - except Exception as e: - logger.error(f"[LangfuseTracer] Failed to flush Langfuse client: {e}") + self._langfuse_call(self.langfuse.flush) def observe_llm_execution( @@ -186,24 +165,34 @@ def wrapper( logger.info("[Langfuse] No credentials - skipping observability") return func(completion_config, query, **kwargs) - def safe_langfuse_op(op: Callable, *args, **kwargs): - try: - return op(*args, **kwargs) - except Exception as e: - logger.warning(f"[Langfuse] Operation failed: {e}") - return None - try: langfuse = Langfuse( public_key=credentials.get("public_key"), secret_key=credentials.get("secret_key"), host=credentials.get("host"), ) + logger.info( + f"[Langfuse] Tracing enabled | session_id={session_id or 'auto'}" + ) except Exception as e: logger.warning(f"[Langfuse] Failed to initialize client: {e}") return func(completion_config, query, **kwargs) - trace = safe_langfuse_op( + failed = False + + def langfuse_call(fn, *args, **kwargs): + """Execute Langfuse operation safely. First failure disables further calls.""" + nonlocal failed + if failed: + return None + try: + return fn(*args, **kwargs) + except Exception as e: + logger.warning(f"[Langfuse] {getattr(fn, '__name__', 'operation')} failed: {e}") + failed = True + return None + + trace = langfuse_call( langfuse.trace, name="unified-llm-call", input=query.input, @@ -211,71 +200,49 @@ def safe_langfuse_op(op: Callable, *args, **kwargs): ) generation = None - if trace: - generation = safe_langfuse_op( + generation = langfuse_call( trace.generation, name=f"{completion_config.provider}-completion", input=query.input, model=completion_config.params.get("model"), ) - try: - # Execute the actual LLM call - response: LLMCallResponse | None - error: str | None - response, error = func(completion_config, query, **kwargs) - - if response: - if generation: - safe_langfuse_op( - generation.end, - output={ - "status": "success", - "output": response.response.output.text, - }, - usage_details={ - "input": response.usage.input_tokens, - "output": response.usage.output_tokens, - }, - model=response.response.model, - ) - - if trace: - safe_langfuse_op( - trace.update, - output={ - "status": "success", - "output": response.response.output.text, - }, - session_id=session_id or response.response.conversation_id, - ) - else: - error_msg = error or "Unknown error" - if generation: - safe_langfuse_op(generation.end, output={"error": error_msg}) - if trace: - safe_langfuse_op( - trace.update, - output={"status": "failure", "error": error_msg}, - session_id=session_id, - ) + # Execute the actual LLM call + response: LLMCallResponse | None + error: str | None + response, error = func(completion_config, query, **kwargs) - safe_langfuse_op(langfuse.flush) - return response, error - - except Exception as e: - error_msg = str(e) + if response: + if generation: + langfuse_call( + generation.end, + output={"status": "success", "output": response.response.output.text}, + usage_details={ + "input": response.usage.input_tokens, + "output": response.usage.output_tokens, + }, + model=response.response.model, + ) + if trace: + langfuse_call( + trace.update, + output={"status": "success", "output": response.response.output.text}, + session_id=session_id or response.response.conversation_id, + ) + else: + error_msg = error or "Unknown error" if generation: - safe_langfuse_op(generation.end, output={"error": error_msg}) + langfuse_call(generation.end, output={"error": error_msg}) if trace: - safe_langfuse_op( + langfuse_call( trace.update, output={"status": "failure", "error": error_msg}, session_id=session_id, ) - safe_langfuse_op(langfuse.flush) - raise + + langfuse_call(langfuse.flush) + return response, error return wrapper diff --git a/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py b/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py index 0ed3ec062..552a1f698 100644 --- a/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py +++ b/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py @@ -1,19 +1,4 @@ -""" -Unit tests for LangfuseTracer class. - -Tests all scenarios to ensure: -1. OpenAI responses are ALWAYS generated regardless of Langfuse state -2. Langfuse failures never block the main execution flow -3. All edge cases are handled gracefully - -Scenarios covered: -- No credentials (None) -- Incomplete credentials (missing keys) -- Langfuse initialization exception -- fetch_traces failure (non-critical) -- Runtime exceptions in all methods -- Full working tracer -""" +"""Unit tests for LangfuseTracer class.""" import pytest from unittest.mock import MagicMock, patch @@ -26,14 +11,8 @@ from app.services.response.response import generate_response, process_response -# ============================================================================= -# Fixtures -# ============================================================================= - - @pytest.fixture def valid_credentials() -> dict: - """Valid Langfuse credentials.""" return { "public_key": "pk-test-123", "secret_key": "sk-test-456", @@ -43,7 +22,6 @@ def valid_credentials() -> dict: @pytest.fixture def assistant_mock() -> Assistant: - """Mock assistant for generate_response tests.""" return Assistant( id=123, assistant_id="asst_test123", @@ -58,180 +36,100 @@ def assistant_mock() -> Assistant: ) -# ============================================================================= -# Test: LangfuseTracer.__init__ -# ============================================================================= - - class TestLangfuseTracerInit: - """Tests for LangfuseTracer initialization scenarios.""" + """Tests for LangfuseTracer initialization.""" - def test_init_no_credentials_disables_langfuse(self) -> None: - """Scenario #1: No credentials - tracer should be disabled.""" + def test_no_credentials_sets_langfuse_to_none(self) -> None: tracer = LangfuseTracer(credentials=None) - - assert tracer.langfuse.enabled is False + assert tracer.langfuse is None assert tracer.trace is None assert tracer.generation is None - def test_init_empty_credentials_disables_langfuse(self) -> None: - """Empty dict credentials - tracer should be disabled.""" + def test_empty_credentials_sets_langfuse_to_none(self) -> None: tracer = LangfuseTracer(credentials={}) + assert tracer.langfuse is None - assert tracer.langfuse.enabled is False + def test_missing_public_key_sets_langfuse_to_none(self) -> None: + tracer = LangfuseTracer(credentials={"secret_key": "sk", "host": "https://x.com"}) + assert tracer.langfuse is None - def test_init_missing_public_key_disables_langfuse(self) -> None: - """Missing public_key - tracer should be disabled.""" - tracer = LangfuseTracer( - credentials={ - "secret_key": "sk-test", - "host": "https://example.com", - } - ) + def test_missing_secret_key_sets_langfuse_to_none(self) -> None: + tracer = LangfuseTracer(credentials={"public_key": "pk", "host": "https://x.com"}) + assert tracer.langfuse is None - assert tracer.langfuse.enabled is False - - def test_init_missing_secret_key_disables_langfuse(self) -> None: - """Missing secret_key - tracer should be disabled.""" - tracer = LangfuseTracer( - credentials={ - "public_key": "pk-test", - "host": "https://example.com", - } - ) - - assert tracer.langfuse.enabled is False - - def test_init_missing_host_disables_langfuse(self) -> None: - """Missing host - tracer should be disabled.""" - tracer = LangfuseTracer( - credentials={ - "public_key": "pk-test", - "secret_key": "sk-test", - } - ) - - assert tracer.langfuse.enabled is False + def test_missing_host_sets_langfuse_to_none(self) -> None: + tracer = LangfuseTracer(credentials={"public_key": "pk", "secret_key": "sk"}) + assert tracer.langfuse is None @patch("app.core.langfuse.langfuse.Langfuse") - def test_init_langfuse_exception_disables_tracer( + def test_langfuse_exception_sets_langfuse_to_none( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """Scenario #3: Langfuse init throws exception - tracer should be disabled.""" - disabled_mock = MagicMock() - disabled_mock.enabled = False - - # First call: default disabled, second call: raises exception, third: fallback disabled - mock_langfuse_class.side_effect = [ - disabled_mock, - Exception("Connection failed"), - disabled_mock, - ] - + mock_langfuse_class.side_effect = Exception("Connection failed") tracer = LangfuseTracer(credentials=valid_credentials) - - assert tracer.langfuse.enabled is False + assert tracer.langfuse is None @patch("app.core.langfuse.langfuse.Langfuse") - def test_init_fetch_traces_fails_but_tracer_still_enabled( + def test_fetch_traces_failure_keeps_tracer_enabled( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """Scenario #4: fetch_traces fails - tracer should STILL be enabled.""" enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.fetch_traces.side_effect = Exception("Network error") - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials, response_id="resp-123") - # Key assertion: tracer is STILL enabled despite fetch_traces failure + assert tracer.langfuse is not None assert tracer.langfuse.enabled is True @patch("app.core.langfuse.langfuse.Langfuse") - def test_init_success_resumes_session_from_traces( + def test_resumes_session_from_existing_traces( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """Successful init with existing traces - should resume session.""" existing_trace = MagicMock() existing_trace.session_id = "existing-session-456" enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.fetch_traces.return_value.data = [existing_trace] - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials, response_id="resp-123") assert tracer.session_id == "existing-session-456" -# ============================================================================= -# Test: Methods when Langfuse is disabled -# ============================================================================= - - class TestLangfuseTracerMethodsDisabled: - """Tests that all methods are no-ops when Langfuse is disabled.""" + """Tests that methods are no-ops when Langfuse is disabled.""" - def test_start_trace_when_disabled_is_noop(self) -> None: - """start_trace should do nothing when disabled.""" + def test_start_trace_is_noop(self) -> None: tracer = LangfuseTracer(credentials=None) - - # Should not raise any exception tracer.start_trace(name="test", input={"question": "hello"}) - assert tracer.trace is None - def test_start_generation_when_disabled_is_noop(self) -> None: - """start_generation should do nothing when disabled.""" + def test_start_generation_is_noop(self) -> None: tracer = LangfuseTracer(credentials=None) - tracer.start_generation(name="test", input={"question": "hello"}) - assert tracer.generation is None - def test_end_generation_when_disabled_is_noop(self) -> None: - """end_generation should do nothing when disabled.""" + def test_end_generation_is_noop(self) -> None: tracer = LangfuseTracer(credentials=None) - - # Should not raise any exception tracer.end_generation(output={"response": "world"}) - def test_update_trace_when_disabled_is_noop(self) -> None: - """update_trace should do nothing when disabled.""" + def test_update_trace_is_noop(self) -> None: tracer = LangfuseTracer(credentials=None) - - # Should not raise any exception tracer.update_trace(tags=["test"], output={"status": "success"}) - def test_log_error_when_disabled_is_noop(self) -> None: - """log_error should do nothing when disabled.""" + def test_log_error_is_noop(self) -> None: tracer = LangfuseTracer(credentials=None) - - # Should not raise any exception tracer.log_error(error_message="Test error", response_id="resp-123") - def test_flush_when_disabled_is_noop(self) -> None: - """flush should do nothing when disabled.""" + def test_flush_is_noop(self) -> None: tracer = LangfuseTracer(credentials=None) - - # Should not raise any exception tracer.flush() -# ============================================================================= -# Test: Methods when Langfuse is enabled but operations fail -# ============================================================================= - - class TestLangfuseTracerMethodsFailure: """Tests that method failures are caught and don't propagate.""" @@ -239,19 +137,12 @@ class TestLangfuseTracerMethodsFailure: def test_start_trace_exception_is_caught( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """start_trace exception should be caught, not propagated.""" enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.trace.side_effect = Exception("Trace creation failed") - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) - - # Should NOT raise exception tracer.start_trace(name="test", input={"q": "hello"}) assert tracer.trace is None @@ -260,23 +151,15 @@ def test_start_trace_exception_is_caught( def test_start_generation_exception_is_caught( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """start_generation exception should be caught, not propagated.""" mock_trace = MagicMock(id="trace-123") - enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.side_effect = Exception("Generation failed") - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) tracer.start_trace(name="test", input={"q": "hello"}) - - # Should NOT raise exception tracer.start_generation(name="gen", input={"q": "hello"}) assert tracer.generation is None @@ -285,7 +168,6 @@ def test_start_generation_exception_is_caught( def test_end_generation_exception_is_caught( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """end_generation exception should be caught, not propagated.""" mock_trace = MagicMock(id="trace-123") mock_generation = MagicMock() mock_generation.end.side_effect = Exception("End failed") @@ -294,66 +176,45 @@ def test_end_generation_exception_is_caught( enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.return_value = mock_generation - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) tracer.start_trace(name="test", input={"q": "hello"}) tracer.start_generation(name="gen", input={"q": "hello"}) - - # Should NOT raise exception tracer.end_generation(output={"response": "world"}) @patch("app.core.langfuse.langfuse.Langfuse") def test_update_trace_exception_is_caught( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """update_trace exception should be caught, not propagated.""" mock_trace = MagicMock(id="trace-123") mock_trace.update.side_effect = Exception("Update failed") enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) tracer.start_trace(name="test", input={"q": "hello"}) - - # Should NOT raise exception tracer.update_trace(tags=["test"], output={"status": "success"}) @patch("app.core.langfuse.langfuse.Langfuse") def test_flush_exception_is_caught( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """flush exception should be caught, not propagated.""" enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.flush.side_effect = Exception("Flush failed") - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) - - # Should NOT raise exception tracer.flush() @patch("app.core.langfuse.langfuse.Langfuse") def test_log_error_exception_is_caught( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """log_error exception should be caught, not propagated.""" mock_trace = MagicMock(id="trace-123") mock_trace.update.side_effect = Exception("Log error failed") mock_generation = MagicMock() @@ -363,40 +224,22 @@ def test_log_error_exception_is_caught( enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.return_value = mock_generation - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) tracer.start_trace(name="test", input={"q": "hello"}) tracer.start_generation(name="gen", input={"q": "hello"}) - - # Should NOT raise exception tracer.log_error(error_message="Test error", response_id="resp-123") -# ============================================================================= -# Test: Integration with generate_response -# ============================================================================= - - class TestGenerateResponseWithTracer: - """Tests that generate_response works in all tracer scenarios.""" + """Tests that generate_response works regardless of tracer state.""" - def test_generate_response_with_no_credentials( - self, assistant_mock: Assistant - ) -> None: - """Scenario #1: No credentials - OpenAI should still work.""" + def test_with_no_credentials(self, assistant_mock: Assistant) -> None: mock_client = MagicMock() tracer = LangfuseTracer(credentials=None) - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="What is 2+2?", - ) - + request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") response, error = generate_response( tracer=tracer, client=mock_client, @@ -405,22 +248,14 @@ def test_generate_response_with_no_credentials( ancestor_id=None, ) - # OpenAI client was called despite disabled tracer mock_client.responses.create.assert_called_once() assert error is None - def test_generate_response_with_incomplete_credentials( - self, assistant_mock: Assistant - ) -> None: - """Scenario #2: Incomplete credentials - OpenAI should still work.""" + def test_with_incomplete_credentials(self, assistant_mock: Assistant) -> None: mock_client = MagicMock() tracer = LangfuseTracer(credentials={"incomplete": True}) - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="What is 2+2?", - ) - + request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") response, error = generate_response( tracer=tracer, client=mock_client, @@ -432,20 +267,12 @@ def test_generate_response_with_incomplete_credentials( mock_client.responses.create.assert_called_once() assert error is None - def test_generate_response_openai_error_with_disabled_tracer( - self, assistant_mock: Assistant - ) -> None: - """OpenAI error with disabled tracer - should handle gracefully.""" + def test_openai_error_with_disabled_tracer(self, assistant_mock: Assistant) -> None: mock_client = MagicMock() mock_client.responses.create.side_effect = OpenAIError("API failed") - tracer = LangfuseTracer(credentials=None) - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="What is 2+2?", - ) - + request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") response, error = generate_response( tracer=tracer, client=mock_client, @@ -459,19 +286,13 @@ def test_generate_response_openai_error_with_disabled_tracer( assert "API failed" in error -# ============================================================================= -# Test: Successful tracing flow -# ============================================================================= - - class TestLangfuseTracerSuccess: """Tests for successful tracer operations.""" @patch("app.core.langfuse.langfuse.Langfuse") - def test_full_tracing_flow_success( + def test_full_tracing_flow( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """Test complete tracing flow when everything works.""" mock_trace = MagicMock(id="trace-123") mock_generation = MagicMock() @@ -479,15 +300,9 @@ def test_full_tracing_flow_success( enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.return_value = mock_generation - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) - - # Full flow tracer.start_trace(name="test", input={"q": "hello"}) tracer.start_generation(name="gen", input={"q": "hello"}) tracer.end_generation( @@ -498,7 +313,6 @@ def test_full_tracing_flow_success( tracer.update_trace(tags=["resp-123"], output={"status": "success"}) tracer.flush() - # Verify all methods were called enabled_mock.trace.assert_called_once() enabled_mock.generation.assert_called_once() mock_generation.end.assert_called_once() @@ -509,72 +323,49 @@ def test_full_tracing_flow_success( def test_start_generation_without_trace_is_noop( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """start_generation should be no-op if trace doesn't exist.""" enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.trace.side_effect = Exception("Trace failed") - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) - tracer.start_trace(name="test", input={"q": "hello"}) # This fails - + tracer.start_trace(name="test", input={"q": "hello"}) # Fails assert tracer.trace is None - # This should be a no-op since trace is None tracer.start_generation(name="gen", input={"q": "hello"}) - assert tracer.generation is None - # generation method should not be called since trace is None enabled_mock.generation.assert_not_called() @patch("app.core.langfuse.langfuse.Langfuse") def test_end_generation_without_generation_is_noop( self, mock_langfuse_class: MagicMock, valid_credentials: dict ) -> None: - """end_generation should be no-op if generation doesn't exist.""" mock_trace = MagicMock(id="trace-123") enabled_mock = MagicMock() enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.side_effect = Exception("Generation failed") - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock tracer = LangfuseTracer(credentials=valid_credentials) tracer.start_trace(name="test", input={"q": "hello"}) - tracer.start_generation(name="gen", input={"q": "hello"}) # This fails - + tracer.start_generation(name="gen", input={"q": "hello"}) # Fails assert tracer.generation is None - # This should be a no-op since generation is None - tracer.end_generation(output={"response": "world"}) - # No exception should be raised - - -# ============================================================================= -# Test: generate_response with ENABLED tracer -# ============================================================================= + tracer.end_generation(output={"response": "world"}) # No exception class TestGenerateResponseWithEnabledTracer: - """Tests for generate_response with ENABLED tracer.""" + """Tests for generate_response with enabled tracer.""" @patch("app.core.langfuse.langfuse.Langfuse") - def test_generate_response_openai_error_with_enabled_tracer( + def test_openai_error_still_calls_tracer( self, mock_langfuse_class: MagicMock, valid_credentials: dict, assistant_mock: Assistant, ) -> None: - """OpenAI error with enabled tracer - tracer.log_error should be called.""" mock_trace = MagicMock(id="trace-123") mock_generation = MagicMock() @@ -582,21 +373,13 @@ def test_generate_response_openai_error_with_enabled_tracer( enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.return_value = mock_generation - - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] + mock_langfuse_class.return_value = enabled_mock mock_client = MagicMock() mock_client.responses.create.side_effect = OpenAIError("API failed") tracer = LangfuseTracer(credentials=valid_credentials) - - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="What is 2+2?", - ) + request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") response, error = generate_response( tracer=tracer, @@ -606,23 +389,18 @@ def test_generate_response_openai_error_with_enabled_tracer( ancestor_id=None, ) - # OpenAI failed assert response is None - assert error is not None assert "API failed" in error - - # Tracer methods were called before the error enabled_mock.trace.assert_called_once() enabled_mock.generation.assert_called_once() @patch("app.core.langfuse.langfuse.Langfuse") - def test_generate_response_success_with_enabled_tracer( + def test_success_calls_all_tracer_methods( self, mock_langfuse_class: MagicMock, valid_credentials: dict, assistant_mock: Assistant, ) -> None: - """Full success flow with enabled tracer.""" mock_trace = MagicMock(id="trace-123") mock_generation = MagicMock() @@ -630,13 +408,8 @@ def test_generate_response_success_with_enabled_tracer( enabled_mock.enabled = True enabled_mock.trace.return_value = mock_trace enabled_mock.generation.return_value = mock_generation + mock_langfuse_class.return_value = enabled_mock - disabled_mock = MagicMock() - disabled_mock.enabled = False - - mock_langfuse_class.side_effect = [disabled_mock, enabled_mock] - - # Mock successful OpenAI response mock_response = MagicMock() mock_response.id = "resp-456" mock_response.output_text = "The answer is 4" @@ -649,11 +422,7 @@ def test_generate_response_success_with_enabled_tracer( mock_client.responses.create.return_value = mock_response tracer = LangfuseTracer(credentials=valid_credentials) - - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="What is 2+2?", - ) + request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") response, error = generate_response( tracer=tracer, @@ -663,24 +432,16 @@ def test_generate_response_success_with_enabled_tracer( ancestor_id=None, ) - # OpenAI succeeded assert response is not None assert error is None - - # All tracer methods were called enabled_mock.trace.assert_called_once() enabled_mock.generation.assert_called_once() mock_generation.end.assert_called_once() mock_trace.update.assert_called_once() -# ============================================================================= -# Test: Integration tests for process_response -# ============================================================================= - - class TestProcessResponseIntegration: - """Integration tests for process_response with various tracer scenarios.""" + """Integration tests for process_response.""" @patch("app.services.response.response.persist_conversation") @patch("app.services.response.response.get_conversation_by_ancestor_id") @@ -689,7 +450,7 @@ class TestProcessResponseIntegration: @patch("app.services.response.response.get_assistant_by_id") @patch("app.services.response.response.get_provider_credential") @patch("app.services.response.response.JobCrud") - def test_process_response_with_no_langfuse_credentials( + def test_works_without_langfuse_credentials( self, mock_job_crud: MagicMock, mock_get_credential: MagicMock, @@ -700,9 +461,7 @@ def test_process_response_with_no_langfuse_credentials( mock_persist: MagicMock, assistant_mock: Assistant, ) -> None: - """process_response works when langfuse_credentials is None.""" - # Setup mocks - mock_get_credential.return_value = None # No Langfuse credentials + mock_get_credential.return_value = None mock_get_assistant.return_value = assistant_mock mock_get_conversation.return_value = None @@ -719,11 +478,7 @@ def test_process_response_with_no_langfuse_credentials( mock_client.responses.create.return_value = mock_response mock_get_client.return_value = mock_client - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="Test question", - ) - + request = ResponsesAPIRequest(assistant_id="asst_test123", question="Test question") result = process_response( request=request, project_id=1, @@ -733,9 +488,7 @@ def test_process_response_with_no_langfuse_credentials( task_instance=None, ) - # OpenAI was called mock_client.responses.create.assert_called_once() - # Response succeeded assert result.success is True @patch("app.services.response.response.persist_conversation") @@ -746,7 +499,7 @@ def test_process_response_with_no_langfuse_credentials( @patch("app.services.response.response.get_provider_credential") @patch("app.services.response.response.JobCrud") @patch("app.core.langfuse.langfuse.Langfuse") - def test_process_response_with_langfuse_init_failure( + def test_works_when_langfuse_init_fails( self, mock_langfuse_class: MagicMock, mock_job_crud: MagicMock, @@ -759,16 +512,7 @@ def test_process_response_with_langfuse_init_failure( assistant_mock: Assistant, valid_credentials: dict, ) -> None: - """process_response works even when Langfuse init fails.""" - # Langfuse init fails - disabled_mock = MagicMock() - disabled_mock.enabled = False - mock_langfuse_class.side_effect = [ - disabled_mock, # Default disabled - Exception("Langfuse connection failed"), # Init fails - disabled_mock, # Fallback disabled - ] - + mock_langfuse_class.side_effect = Exception("Langfuse connection failed") mock_get_credential.return_value = valid_credentials mock_get_assistant.return_value = assistant_mock mock_get_conversation.return_value = None @@ -786,11 +530,7 @@ def test_process_response_with_langfuse_init_failure( mock_client.responses.create.return_value = mock_response mock_get_client.return_value = mock_client - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="Test question", - ) - + request = ResponsesAPIRequest(assistant_id="asst_test123", question="Test question") result = process_response( request=request, project_id=1, @@ -800,7 +540,6 @@ def test_process_response_with_langfuse_init_failure( task_instance=None, ) - # OpenAI was still called despite Langfuse failure mock_client.responses.create.assert_called_once() assert result.success is True @@ -810,7 +549,7 @@ def test_process_response_with_langfuse_init_failure( @patch("app.services.response.response.get_provider_credential") @patch("app.services.response.response.JobCrud") @patch("app.services.response.response._fail_job") - def test_process_response_openai_error_with_disabled_tracer( + def test_handles_openai_error_gracefully( self, mock_fail_job: MagicMock, mock_job_crud: MagicMock, @@ -820,22 +559,15 @@ def test_process_response_openai_error_with_disabled_tracer( mock_session: MagicMock, assistant_mock: Assistant, ) -> None: - """process_response handles OpenAI error with disabled tracer.""" - mock_get_credential.return_value = None # No Langfuse + mock_get_credential.return_value = None mock_get_assistant.return_value = assistant_mock mock_client = MagicMock() mock_client.responses.create.side_effect = OpenAIError("API failed") mock_get_client.return_value = mock_client - - # Mock _fail_job to return a failure response mock_fail_job.return_value = MagicMock(success=False, error="API failed") - request = ResponsesAPIRequest( - assistant_id="asst_test123", - question="Test question", - ) - + request = ResponsesAPIRequest(assistant_id="asst_test123", question="Test question") result = process_response( request=request, project_id=1, @@ -845,5 +577,4 @@ def test_process_response_openai_error_with_disabled_tracer( task_instance=None, ) - # Response failed gracefully assert result.success is False From d96232d587ef983c33715c4996aa3fed3371ad54 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Fri, 23 Jan 2026 22:30:35 +0530 Subject: [PATCH 3/4] Formatted the code and added the fixture for test_langfuse_tracer.py --- backend/app/core/langfuse/langfuse.py | 22 +++-- .../test_langfuse/test_langfuse_tracer.py | 82 +++++++++++++------ 2 files changed, 72 insertions(+), 32 deletions(-) diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py index d78b64464..fc2adc0fb 100644 --- a/backend/app/core/langfuse/langfuse.py +++ b/backend/app/core/langfuse/langfuse.py @@ -65,7 +65,9 @@ def _langfuse_call(self, fn, *args, **kwargs): try: return fn(*args, **kwargs) except Exception as e: - logger.warning(f"[LangfuseTracer] {getattr(fn, '__name__', 'operation')} failed: {e}") + logger.warning( + f"[LangfuseTracer] {getattr(fn, '__name__', 'operation')} failed: {e}" + ) self._failed = True return None @@ -113,7 +115,9 @@ def end_generation( ): if self._failed or not self.generation: return - self._langfuse_call(self.generation.end, output=output, usage=usage, model=model) + self._langfuse_call( + self.generation.end, output=output, usage=usage, model=model + ) def update_trace(self, tags: list[str], output: Dict[str, Any]): if self._failed or not self.trace: @@ -188,7 +192,9 @@ def langfuse_call(fn, *args, **kwargs): try: return fn(*args, **kwargs) except Exception as e: - logger.warning(f"[Langfuse] {getattr(fn, '__name__', 'operation')} failed: {e}") + logger.warning( + f"[Langfuse] {getattr(fn, '__name__', 'operation')} failed: {e}" + ) failed = True return None @@ -217,7 +223,10 @@ def langfuse_call(fn, *args, **kwargs): if generation: langfuse_call( generation.end, - output={"status": "success", "output": response.response.output.text}, + output={ + "status": "success", + "output": response.response.output.text, + }, usage_details={ "input": response.usage.input_tokens, "output": response.usage.output_tokens, @@ -227,7 +236,10 @@ def langfuse_call(fn, *args, **kwargs): if trace: langfuse_call( trace.update, - output={"status": "success", "output": response.response.output.text}, + output={ + "status": "success", + "output": response.response.output.text, + }, session_id=session_id or response.response.conversation_id, ) else: diff --git a/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py b/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py index 552a1f698..020d17ffa 100644 --- a/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py +++ b/backend/app/tests/core/test_langfuse/test_langfuse_tracer.py @@ -13,27 +13,35 @@ @pytest.fixture def valid_credentials() -> dict: - return { - "public_key": "pk-test-123", - "secret_key": "sk-test-456", - "host": "https://langfuse.example.com", - } + def _create(**overrides) -> dict: + defaults = { + "public_key": "pk-test-123", + "secret_key": "sk-test-456", + "host": "https://langfuse.example.com", + } + return {**defaults, **overrides} + + return _create() @pytest.fixture def assistant_mock() -> Assistant: - return Assistant( - id=123, - assistant_id="asst_test123", - name="Test Assistant", - model="gpt-4", - temperature=0.7, - instructions="You are a helpful assistant.", - vector_store_ids=["vs1"], - max_num_results=5, - project_id=1, - organization_id=1, - ) + def _create(**overrides) -> Assistant: + defaults = { + "id": 123, + "assistant_id": "asst_test123", + "name": "Test Assistant", + "model": "gpt-4", + "temperature": 0.7, + "instructions": "You are a helpful assistant.", + "vector_store_ids": ["vs1"], + "max_num_results": 5, + "project_id": 1, + "organization_id": 1, + } + return Assistant(**{**defaults, **overrides}) + + return _create() class TestLangfuseTracerInit: @@ -50,11 +58,15 @@ def test_empty_credentials_sets_langfuse_to_none(self) -> None: assert tracer.langfuse is None def test_missing_public_key_sets_langfuse_to_none(self) -> None: - tracer = LangfuseTracer(credentials={"secret_key": "sk", "host": "https://x.com"}) + tracer = LangfuseTracer( + credentials={"secret_key": "sk", "host": "https://x.com"} + ) assert tracer.langfuse is None def test_missing_secret_key_sets_langfuse_to_none(self) -> None: - tracer = LangfuseTracer(credentials={"public_key": "pk", "host": "https://x.com"}) + tracer = LangfuseTracer( + credentials={"public_key": "pk", "host": "https://x.com"} + ) assert tracer.langfuse is None def test_missing_host_sets_langfuse_to_none(self) -> None: @@ -239,7 +251,9 @@ def test_with_no_credentials(self, assistant_mock: Assistant) -> None: mock_client = MagicMock() tracer = LangfuseTracer(credentials=None) - request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="What is 2+2?" + ) response, error = generate_response( tracer=tracer, client=mock_client, @@ -255,7 +269,9 @@ def test_with_incomplete_credentials(self, assistant_mock: Assistant) -> None: mock_client = MagicMock() tracer = LangfuseTracer(credentials={"incomplete": True}) - request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="What is 2+2?" + ) response, error = generate_response( tracer=tracer, client=mock_client, @@ -272,7 +288,9 @@ def test_openai_error_with_disabled_tracer(self, assistant_mock: Assistant) -> N mock_client.responses.create.side_effect = OpenAIError("API failed") tracer = LangfuseTracer(credentials=None) - request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="What is 2+2?" + ) response, error = generate_response( tracer=tracer, client=mock_client, @@ -379,7 +397,9 @@ def test_openai_error_still_calls_tracer( mock_client.responses.create.side_effect = OpenAIError("API failed") tracer = LangfuseTracer(credentials=valid_credentials) - request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="What is 2+2?" + ) response, error = generate_response( tracer=tracer, @@ -422,7 +442,9 @@ def test_success_calls_all_tracer_methods( mock_client.responses.create.return_value = mock_response tracer = LangfuseTracer(credentials=valid_credentials) - request = ResponsesAPIRequest(assistant_id="asst_test123", question="What is 2+2?") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="What is 2+2?" + ) response, error = generate_response( tracer=tracer, @@ -478,7 +500,9 @@ def test_works_without_langfuse_credentials( mock_client.responses.create.return_value = mock_response mock_get_client.return_value = mock_client - request = ResponsesAPIRequest(assistant_id="asst_test123", question="Test question") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="Test question" + ) result = process_response( request=request, project_id=1, @@ -530,7 +554,9 @@ def test_works_when_langfuse_init_fails( mock_client.responses.create.return_value = mock_response mock_get_client.return_value = mock_client - request = ResponsesAPIRequest(assistant_id="asst_test123", question="Test question") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="Test question" + ) result = process_response( request=request, project_id=1, @@ -567,7 +593,9 @@ def test_handles_openai_error_gracefully( mock_get_client.return_value = mock_client mock_fail_job.return_value = MagicMock(success=False, error="API failed") - request = ResponsesAPIRequest(assistant_id="asst_test123", question="Test question") + request = ResponsesAPIRequest( + assistant_id="asst_test123", question="Test question" + ) result = process_response( request=request, project_id=1, From 7e7b0bcc78014fb46381f127537bbe72d4412fe5 Mon Sep 17 00:00:00 2001 From: Prashant Vasudevan <71649489+vprashrex@users.noreply.github.com> Date: Fri, 23 Jan 2026 22:39:41 +0530 Subject: [PATCH 4/4] made the loggers consistent --- backend/app/core/langfuse/langfuse.py | 43 ++++++++++++++------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py index fc2adc0fb..d70d07e09 100644 --- a/backend/app/core/langfuse/langfuse.py +++ b/backend/app/core/langfuse/langfuse.py @@ -17,7 +17,7 @@ def __init__( credentials: Optional[dict] = None, session_id: Optional[str] = None, response_id: Optional[str] = None, - ): + ) -> None: self.session_id = session_id or str(uuid.uuid4()) self.langfuse: Optional[Langfuse] = None self.trace: Optional[StatefulTraceClient] = None @@ -39,7 +39,8 @@ def __init__( host=credentials["host"], enabled=True, # This ensures the client is active ) - except Exception: + except Exception as e: + logger.warning(f"[LangfuseTracer] Failed to initialize: {e}") self._failed = True return @@ -48,18 +49,16 @@ def __init__( traces = self.langfuse.fetch_traces(tags=response_id).data if traces: self.session_id = traces[0].session_id - except Exception: - pass # Non-critical: session resume is optional + except Exception as e: + logger.debug(f"[LangfuseTracer] Session resume failed: {e}") logger.info( - f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}" + f"[LangfuseTracer] Tracing enabled | session_id={self.session_id}" ) else: - logger.warning( - "[LangfuseTracer] Langfuse tracing disabled due to missing credentials" - ) + logger.warning("[LangfuseTracer] Tracing disabled - missing credentials") - def _langfuse_call(self, fn, *args, **kwargs): + def _langfuse_call(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: if self._failed: return None try: @@ -77,7 +76,7 @@ def start_trace( input: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, tags: list[str] | None = None, - ): + ) -> None: if self._failed or not self.langfuse: return metadata = metadata or {} @@ -96,7 +95,7 @@ def start_generation( name: str, input: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, - ): + ) -> None: if self._failed or not self.trace: return self.generation = self._langfuse_call( @@ -112,19 +111,19 @@ def end_generation( output: Dict[str, Any], usage: Optional[Dict[str, Any]] = None, model: Optional[str] = None, - ): + ) -> None: if self._failed or not self.generation: return self._langfuse_call( self.generation.end, output=output, usage=usage, model=model ) - def update_trace(self, tags: list[str], output: Dict[str, Any]): + def update_trace(self, tags: list[str], output: Dict[str, Any]) -> None: if self._failed or not self.trace: return self._langfuse_call(self.trace.update, tags=tags, output=output) - def log_error(self, error_message: str, response_id: Optional[str] = None): + def log_error(self, error_message: str, response_id: Optional[str] = None) -> None: if self._failed: return if self.generation: @@ -136,7 +135,7 @@ def log_error(self, error_message: str, response_id: Optional[str] = None): output={"status": "failure", "error": error_message}, ) - def flush(self): + def flush(self) -> None: if self._failed or not self.langfuse: return self._langfuse_call(self.langfuse.flush) @@ -145,7 +144,7 @@ def flush(self): def observe_llm_execution( session_id: str | None = None, credentials: dict | None = None, -): +) -> Callable: """Decorator to add Langfuse observability to LLM provider execute methods. Args: @@ -166,7 +165,9 @@ def wrapper( ): # Skip observability if no credentials provided if not credentials: - logger.info("[Langfuse] No credentials - skipping observability") + logger.info( + "[observe_llm_execution] No credentials - skipping observability" + ) return func(completion_config, query, **kwargs) try: @@ -176,10 +177,12 @@ def wrapper( host=credentials.get("host"), ) logger.info( - f"[Langfuse] Tracing enabled | session_id={session_id or 'auto'}" + f"[observe_llm_execution] Tracing enabled | session_id={session_id or 'auto'}" ) except Exception as e: - logger.warning(f"[Langfuse] Failed to initialize client: {e}") + logger.warning( + f"[observe_llm_execution] Failed to initialize client: {e}" + ) return func(completion_config, query, **kwargs) failed = False @@ -193,7 +196,7 @@ def langfuse_call(fn, *args, **kwargs): return fn(*args, **kwargs) except Exception as e: logger.warning( - f"[Langfuse] {getattr(fn, '__name__', 'operation')} failed: {e}" + f"[observe_llm_execution] {getattr(fn, '__name__', 'operation')} failed: {e}" ) failed = True return None