-
Notifications
You must be signed in to change notification settings - Fork 8
Langfuse: Add comprehensive error handling and tests #551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vprashrex
wants to merge
6
commits into
main
Choose a base branch
from
feature/langfuse-error-handling
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+738
−64
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
8a43238
Langfuse: Add comprehensive error handling and tests
vprashrex f9ba4d0
Merge remote-tracking branch 'origin/main' into feature/langfuse-erro…
vprashrex 68e4e60
Merge branch 'main' into feature/langfuse-error-handling
AkhileshNegi a4c0dbd
Langfuse: Refactor error handling to use wrapper function
vprashrex d96232d
Formatted the code and added the fixture for test_langfuse_tracer.py
vprashrex 7e7b0bc
made the loggers consistent
vprashrex File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,11 +17,12 @@ def __init__( | |
| credentials: Optional[dict] = None, | ||
| session_id: Optional[str] = None, | ||
| response_id: Optional[str] = None, | ||
| ): | ||
| ) -> None: | ||
| self.session_id = session_id or str(uuid.uuid4()) | ||
| self.langfuse: Optional[Langfuse] = None | ||
| self.trace: Optional[StatefulTraceClient] = None | ||
| self.generation: Optional[StatefulGenerationClient] = None | ||
| self._failed = False | ||
|
|
||
| has_credentials = ( | ||
| credentials | ||
|
|
@@ -31,38 +32,57 @@ def __init__( | |
| ) | ||
|
|
||
| if has_credentials: | ||
| self.langfuse = Langfuse( | ||
| public_key=credentials["public_key"], | ||
| secret_key=credentials["secret_key"], | ||
| host=credentials["host"], | ||
| enabled=True, # This ensures the client is active | ||
| ) | ||
| try: | ||
| self.langfuse = Langfuse( | ||
| public_key=credentials["public_key"], | ||
| secret_key=credentials["secret_key"], | ||
| host=credentials["host"], | ||
| enabled=True, # This ensures the client is active | ||
| ) | ||
| except Exception as e: | ||
| logger.warning(f"[LangfuseTracer] Failed to initialize: {e}") | ||
| self._failed = True | ||
| return | ||
|
|
||
| if response_id: | ||
| traces = self.langfuse.fetch_traces(tags=response_id).data | ||
| if traces: | ||
| self.session_id = traces[0].session_id | ||
| try: | ||
| traces = self.langfuse.fetch_traces(tags=response_id).data | ||
| if traces: | ||
| self.session_id = traces[0].session_id | ||
| except Exception as e: | ||
| logger.debug(f"[LangfuseTracer] Session resume failed: {e}") | ||
|
|
||
| logger.info( | ||
| f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}" | ||
| f"[LangfuseTracer] Tracing enabled | session_id={self.session_id}" | ||
| ) | ||
| else: | ||
| self.langfuse = Langfuse(enabled=False) | ||
| logger.warning("[LangfuseTracer] Tracing disabled - missing credentials") | ||
|
|
||
| def _langfuse_call(self, fn: Callable, *args: Any, **kwargs: Any) -> Any: | ||
| if self._failed: | ||
| return None | ||
| try: | ||
| return fn(*args, **kwargs) | ||
| except Exception as e: | ||
| logger.warning( | ||
| "[LangfuseTracer] Langfuse tracing disabled due to missing credentials" | ||
| f"[LangfuseTracer] {getattr(fn, '__name__', 'operation')} failed: {e}" | ||
| ) | ||
| self._failed = True | ||
| return None | ||
|
|
||
| def start_trace( | ||
| self, | ||
| name: str, | ||
| input: Dict[str, Any], | ||
| metadata: Optional[Dict[str, Any]] = None, | ||
| tags: list[str] | None = None, | ||
| ): | ||
| ) -> None: | ||
| if self._failed or not self.langfuse: | ||
| return | ||
| metadata = metadata or {} | ||
| metadata["request_id"] = correlation_id.get() or "N/A" | ||
|
|
||
| self.trace = self.langfuse.trace( | ||
| self.trace = self._langfuse_call( | ||
| self.langfuse.trace, | ||
| name=name, | ||
| input=input, | ||
| metadata=metadata, | ||
|
|
@@ -75,10 +95,11 @@ def start_generation( | |
| name: str, | ||
| input: Dict[str, Any], | ||
| metadata: Optional[Dict[str, Any]] = None, | ||
| ): | ||
| if not self.trace: | ||
| ) -> None: | ||
| if self._failed or not self.trace: | ||
| return | ||
| self.generation = self.langfuse.generation( | ||
| self.generation = self._langfuse_call( | ||
| self.langfuse.generation, | ||
| name=name, | ||
| trace_id=self.trace.id, | ||
| input=input, | ||
|
|
@@ -90,31 +111,40 @@ def end_generation( | |
| output: Dict[str, Any], | ||
| usage: Optional[Dict[str, Any]] = None, | ||
| model: Optional[str] = None, | ||
| ): | ||
| if self.generation: | ||
| self.generation.end(output=output, usage=usage, model=model) | ||
| ) -> None: | ||
| if self._failed or not self.generation: | ||
| return | ||
| self._langfuse_call( | ||
| self.generation.end, output=output, usage=usage, model=model | ||
| ) | ||
|
|
||
| def update_trace(self, tags: list[str], output: Dict[str, Any]): | ||
| if self.trace: | ||
| self.trace.update(tags=tags, output=output) | ||
| def update_trace(self, tags: list[str], output: Dict[str, Any]) -> None: | ||
| if self._failed or not self.trace: | ||
| return | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto as above |
||
| self._langfuse_call(self.trace.update, tags=tags, output=output) | ||
|
|
||
| def log_error(self, error_message: str, response_id: Optional[str] = None): | ||
| def log_error(self, error_message: str, response_id: Optional[str] = None) -> None: | ||
| if self._failed: | ||
| return | ||
| if self.generation: | ||
| self.generation.end(output={"error": error_message}) | ||
| self._langfuse_call(self.generation.end, output={"error": error_message}) | ||
| if self.trace: | ||
| self.trace.update( | ||
| self._langfuse_call( | ||
| self.trace.update, | ||
| tags=[response_id] if response_id else [], | ||
| output={"status": "failure", "error": error_message}, | ||
| ) | ||
|
|
||
| def flush(self): | ||
| self.langfuse.flush() | ||
| def flush(self) -> None: | ||
| if self._failed or not self.langfuse: | ||
| return | ||
| self._langfuse_call(self.langfuse.flush) | ||
|
|
||
|
|
||
| def observe_llm_execution( | ||
| session_id: str | None = None, | ||
| credentials: dict | None = None, | ||
| ): | ||
| ) -> Callable: | ||
| """Decorator to add Langfuse observability to LLM provider execute methods. | ||
|
|
||
| Args: | ||
|
|
@@ -135,7 +165,9 @@ def wrapper( | |
| ): | ||
| # Skip observability if no credentials provided | ||
| if not credentials: | ||
| logger.info("[Langfuse] No credentials - skipping observability") | ||
| logger.info( | ||
| "[observe_llm_execution] No credentials - skipping observability" | ||
| ) | ||
| return func(completion_config, query, **kwargs) | ||
|
|
||
| try: | ||
|
|
@@ -144,30 +176,56 @@ def wrapper( | |
| secret_key=credentials.get("secret_key"), | ||
| host=credentials.get("host"), | ||
| ) | ||
| logger.info( | ||
| f"[observe_llm_execution] Tracing enabled | session_id={session_id or 'auto'}" | ||
| ) | ||
| except Exception as e: | ||
| logger.warning(f"[Langfuse] Failed to initialize client: {e}") | ||
| logger.warning( | ||
| f"[observe_llm_execution] Failed to initialize client: {e}" | ||
| ) | ||
| return func(completion_config, query, **kwargs) | ||
|
|
||
| trace = langfuse.trace( | ||
| failed = False | ||
|
|
||
| def langfuse_call(fn, *args, **kwargs): | ||
| """Execute Langfuse operation safely. First failure disables further calls.""" | ||
| nonlocal failed | ||
| if failed: | ||
| return None | ||
| try: | ||
| return fn(*args, **kwargs) | ||
| except Exception as e: | ||
| logger.warning( | ||
| f"[observe_llm_execution] {getattr(fn, '__name__', 'operation')} failed: {e}" | ||
| ) | ||
| failed = True | ||
| return None | ||
|
|
||
| trace = langfuse_call( | ||
| langfuse.trace, | ||
| name="unified-llm-call", | ||
| input=query.input, | ||
| tags=[completion_config.provider], | ||
| ) | ||
|
|
||
| generation = trace.generation( | ||
| name=f"{completion_config.provider}-completion", | ||
| input=query.input, | ||
| model=completion_config.params.get("model"), | ||
| ) | ||
| generation = None | ||
| if trace: | ||
| generation = langfuse_call( | ||
| trace.generation, | ||
| name=f"{completion_config.provider}-completion", | ||
| input=query.input, | ||
| model=completion_config.params.get("model"), | ||
| ) | ||
|
|
||
| try: | ||
| # Execute the actual LLM call | ||
| response: LLMCallResponse | None | ||
| error: str | None | ||
| response, error = func(completion_config, query, **kwargs) | ||
| # Execute the actual LLM call | ||
| response: LLMCallResponse | None | ||
| error: str | None | ||
| response, error = func(completion_config, query, **kwargs) | ||
|
|
||
| if response: | ||
| generation.end( | ||
| if response: | ||
| if generation: | ||
| langfuse_call( | ||
| generation.end, | ||
| output={ | ||
| "status": "success", | ||
| "output": response.response.output.text, | ||
|
|
@@ -178,34 +236,28 @@ def wrapper( | |
| }, | ||
| model=response.response.model, | ||
| ) | ||
|
|
||
| trace.update( | ||
| if trace: | ||
| langfuse_call( | ||
| trace.update, | ||
| output={ | ||
| "status": "success", | ||
| "output": response.response.output.text, | ||
| }, | ||
| session_id=session_id or response.response.conversation_id, | ||
| ) | ||
| else: | ||
| error_msg = error or "Unknown error" | ||
| generation.end(output={"error": error_msg}) | ||
| trace.update( | ||
| else: | ||
| error_msg = error or "Unknown error" | ||
| if generation: | ||
| langfuse_call(generation.end, output={"error": error_msg}) | ||
| if trace: | ||
| langfuse_call( | ||
| trace.update, | ||
| output={"status": "failure", "error": error_msg}, | ||
| session_id=session_id, | ||
| ) | ||
|
|
||
| langfuse.flush() | ||
| return response, error | ||
|
|
||
| except Exception as e: | ||
| error_msg = str(e) | ||
| generation.end(output={"error": error_msg}) | ||
| trace.update( | ||
| output={"status": "failure", "error": error_msg}, | ||
| session_id=session_id, | ||
| ) | ||
| langfuse.flush() | ||
| raise | ||
| langfuse_call(langfuse.flush) | ||
| return response, error | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| """ | ||
| Local conftest for LangfuseTracer unit tests. | ||
|
|
||
| These tests don't require database access, so we override the session-scoped | ||
| seed_baseline fixture to skip database seeding. | ||
| """ | ||
|
|
||
| import pytest | ||
|
|
||
|
|
||
| @pytest.fixture(scope="session", autouse=True) | ||
| def seed_baseline(): | ||
| """Override the global seed_baseline fixture to skip database seeding.""" | ||
| yield |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefix
__init__logs with the function name.Lines 43, 53, 55-59 should use
[LangfuseTracer.__init__]to match the logging convention. As per coding guidelines, ...♻️ Proposed fix
🤖 Prompt for AI Agents