Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 116 additions & 64 deletions backend/app/core/langfuse/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ def __init__(
credentials: Optional[dict] = None,
session_id: Optional[str] = None,
response_id: Optional[str] = None,
):
) -> None:
self.session_id = session_id or str(uuid.uuid4())
self.langfuse: Optional[Langfuse] = None
self.trace: Optional[StatefulTraceClient] = None
self.generation: Optional[StatefulGenerationClient] = None
self._failed = False

has_credentials = (
credentials
Expand All @@ -31,38 +32,57 @@ def __init__(
)

if has_credentials:
self.langfuse = Langfuse(
public_key=credentials["public_key"],
secret_key=credentials["secret_key"],
host=credentials["host"],
enabled=True, # This ensures the client is active
)
try:
self.langfuse = Langfuse(
public_key=credentials["public_key"],
secret_key=credentials["secret_key"],
host=credentials["host"],
enabled=True, # This ensures the client is active
)
except Exception as e:
logger.warning(f"[LangfuseTracer] Failed to initialize: {e}")
self._failed = True
return

if response_id:
traces = self.langfuse.fetch_traces(tags=response_id).data
if traces:
self.session_id = traces[0].session_id
try:
traces = self.langfuse.fetch_traces(tags=response_id).data
if traces:
self.session_id = traces[0].session_id
except Exception as e:
logger.debug(f"[LangfuseTracer] Session resume failed: {e}")

logger.info(
f"[LangfuseTracer] Langfuse tracing enabled | session_id={self.session_id}"
f"[LangfuseTracer] Tracing enabled | session_id={self.session_id}"
)
else:
self.langfuse = Langfuse(enabled=False)
logger.warning("[LangfuseTracer] Tracing disabled - missing credentials")

Comment on lines +35 to +60
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Prefix __init__ logs with the function name.

Lines 43, 53, 55-59 should use [LangfuseTracer.__init__] to match the logging convention. As per coding guidelines, ...

♻️ Proposed fix
-                logger.warning(f"[LangfuseTracer] Failed to initialize: {e}")
+                logger.warning(f"[LangfuseTracer.__init__] Failed to initialize: {e}")
@@
-                    logger.debug(f"[LangfuseTracer] Session resume failed: {e}")
+                    logger.debug(f"[LangfuseTracer.__init__] Session resume failed: {e}")
@@
-            logger.info(
-                f"[LangfuseTracer] Tracing enabled | session_id={self.session_id}"
-            )
+            logger.info(
+                f"[LangfuseTracer.__init__] Tracing enabled | session_id={self.session_id}"
+            )
@@
-            logger.warning("[LangfuseTracer] Tracing disabled - missing credentials")
+            logger.warning("[LangfuseTracer.__init__] Tracing disabled - missing credentials")
🤖 Prompt for AI Agents
In `@backend/app/core/langfuse/langfuse.py` around lines 35 - 60, Update the log
messages in LangfuseTracer.__init__ to follow the logging convention by
prefixing them with "[LangfuseTracer.__init__]": change the logger.warning that
logs initialization failure (inside the except for Langfuse(...)), the
logger.debug in the except when fetch_traces/session resume fails, the
logger.info that announces tracing enabled with session_id, and the
logger.warning that reports tracing disabled due to missing credentials; keep
the same messages and exception interpolation but prepend the required
function-name tag so all logs from the constructor use
"[LangfuseTracer.__init__]".

def _langfuse_call(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
if self._failed:
return None
try:
return fn(*args, **kwargs)
except Exception as e:
logger.warning(
"[LangfuseTracer] Langfuse tracing disabled due to missing credentials"
f"[LangfuseTracer] {getattr(fn, '__name__', 'operation')} failed: {e}"
)
self._failed = True
return None

def start_trace(
self,
name: str,
input: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
tags: list[str] | None = None,
):
) -> None:
if self._failed or not self.langfuse:
return
metadata = metadata or {}
metadata["request_id"] = correlation_id.get() or "N/A"

self.trace = self.langfuse.trace(
self.trace = self._langfuse_call(
self.langfuse.trace,
name=name,
input=input,
metadata=metadata,
Expand All @@ -75,10 +95,11 @@ def start_generation(
name: str,
input: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
):
if not self.trace:
) -> None:
if self._failed or not self.trace:
return
self.generation = self.langfuse.generation(
self.generation = self._langfuse_call(
self.langfuse.generation,
name=name,
trace_id=self.trace.id,
input=input,
Expand All @@ -90,31 +111,40 @@ def end_generation(
output: Dict[str, Any],
usage: Optional[Dict[str, Any]] = None,
model: Optional[str] = None,
):
if self.generation:
self.generation.end(output=output, usage=usage, model=model)
) -> None:
if self._failed or not self.generation:
return
self._langfuse_call(
self.generation.end, output=output, usage=usage, model=model
)

def update_trace(self, tags: list[str], output: Dict[str, Any]):
if self.trace:
self.trace.update(tags=tags, output=output)
def update_trace(self, tags: list[str], output: Dict[str, Any]) -> None:
if self._failed or not self.trace:
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto as above

self._langfuse_call(self.trace.update, tags=tags, output=output)

def log_error(self, error_message: str, response_id: Optional[str] = None):
def log_error(self, error_message: str, response_id: Optional[str] = None) -> None:
if self._failed:
return
if self.generation:
self.generation.end(output={"error": error_message})
self._langfuse_call(self.generation.end, output={"error": error_message})
if self.trace:
self.trace.update(
self._langfuse_call(
self.trace.update,
tags=[response_id] if response_id else [],
output={"status": "failure", "error": error_message},
)

def flush(self):
self.langfuse.flush()
def flush(self) -> None:
if self._failed or not self.langfuse:
return
self._langfuse_call(self.langfuse.flush)


def observe_llm_execution(
session_id: str | None = None,
credentials: dict | None = None,
):
) -> Callable:
"""Decorator to add Langfuse observability to LLM provider execute methods.

Args:
Expand All @@ -135,7 +165,9 @@ def wrapper(
):
# Skip observability if no credentials provided
if not credentials:
logger.info("[Langfuse] No credentials - skipping observability")
logger.info(
"[observe_llm_execution] No credentials - skipping observability"
)
return func(completion_config, query, **kwargs)

try:
Expand All @@ -144,30 +176,56 @@ def wrapper(
secret_key=credentials.get("secret_key"),
host=credentials.get("host"),
)
logger.info(
f"[observe_llm_execution] Tracing enabled | session_id={session_id or 'auto'}"
)
except Exception as e:
logger.warning(f"[Langfuse] Failed to initialize client: {e}")
logger.warning(
f"[observe_llm_execution] Failed to initialize client: {e}"
)
return func(completion_config, query, **kwargs)

trace = langfuse.trace(
failed = False

def langfuse_call(fn, *args, **kwargs):
"""Execute Langfuse operation safely. First failure disables further calls."""
nonlocal failed
if failed:
return None
try:
return fn(*args, **kwargs)
except Exception as e:
logger.warning(
f"[observe_llm_execution] {getattr(fn, '__name__', 'operation')} failed: {e}"
)
failed = True
return None

trace = langfuse_call(
langfuse.trace,
name="unified-llm-call",
input=query.input,
tags=[completion_config.provider],
)

generation = trace.generation(
name=f"{completion_config.provider}-completion",
input=query.input,
model=completion_config.params.get("model"),
)
generation = None
if trace:
generation = langfuse_call(
trace.generation,
name=f"{completion_config.provider}-completion",
input=query.input,
model=completion_config.params.get("model"),
)

try:
# Execute the actual LLM call
response: LLMCallResponse | None
error: str | None
response, error = func(completion_config, query, **kwargs)
# Execute the actual LLM call
response: LLMCallResponse | None
error: str | None
response, error = func(completion_config, query, **kwargs)

if response:
generation.end(
if response:
if generation:
langfuse_call(
generation.end,
output={
"status": "success",
"output": response.response.output.text,
Expand All @@ -178,34 +236,28 @@ def wrapper(
},
model=response.response.model,
)

trace.update(
if trace:
langfuse_call(
trace.update,
output={
"status": "success",
"output": response.response.output.text,
},
session_id=session_id or response.response.conversation_id,
)
else:
error_msg = error or "Unknown error"
generation.end(output={"error": error_msg})
trace.update(
else:
error_msg = error or "Unknown error"
if generation:
langfuse_call(generation.end, output={"error": error_msg})
if trace:
langfuse_call(
trace.update,
output={"status": "failure", "error": error_msg},
session_id=session_id,
)

langfuse.flush()
return response, error

except Exception as e:
error_msg = str(e)
generation.end(output={"error": error_msg})
trace.update(
output={"status": "failure", "error": error_msg},
session_id=session_id,
)
langfuse.flush()
raise
langfuse_call(langfuse.flush)
return response, error

return wrapper

Expand Down
Empty file.
14 changes: 14 additions & 0 deletions backend/app/tests/core/test_langfuse/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Local conftest for LangfuseTracer unit tests.

These tests don't require database access, so we override the session-scoped
seed_baseline fixture to skip database seeding.
"""

import pytest


@pytest.fixture(scope="session", autouse=True)
def seed_baseline():
"""Override the global seed_baseline fixture to skip database seeding."""
yield
Loading