**README.md** (23 changes: 20 additions & 3 deletions)

@@ -1018,10 +1018,21 @@ By default the sandbox completely reloads non-standard-library and non-Temporal
the sandbox quicker and use less memory when importing known-side-effect-free modules, they can be marked
as passthrough modules.

-**For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be
+**Passthrough modules are about import-time behavior and determinism, not just about whether a module is third-party.**

**Reviewer comment (Contributor):** There seem to be a number of unrelated readme changes.

+Any module that is side-effect-free on import and makes only deterministic calls can be passed through, including
+first-party application modules, third-party libraries, and non-workflow-specific code. The key criteria are:
+1. The module does not have import-time side effects (e.g., file I/O, network calls, random values)
+2. Calls made from the module are deterministic within workflow code

+**For performance and behavior reasons, users are encouraged to pass through all non-workflow imports whose calls will be
deterministic.** In particular, this advice extends to modules containing the activities to be referenced in workflows,
and modules containing dataclasses and Pydantic models, which can be particularly expensive to import.

+**Note on datetime and similar stdlib modules:** If you need to use non-deterministic functions like `datetime.date.today()`,
+do **not** mark `datetime` as passthrough. Instead, use `with_child_unrestricted()` to allow specific invalid members
+(see [Invalid Module Members](#invalid-module-members) below). Passthrough affects import reloading, while
+`invalid_module_members` controls which calls are allowed at runtime.

One way to pass through a module is at import time in the workflow file using the `imports_passed_through` context
manager like so:
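
The example itself is collapsed in this diff view; a minimal sketch of the documented pattern, assuming the standard `temporalio.workflow.unsafe.imports_passed_through` API and a hypothetical `my_app.activities` module:

```python
# Sketch: pass modules through at import time so the sandbox reuses them
# from the host process instead of reloading them for each workflow run.
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic  # assumption: an expensive, import-side-effect-free third-party module
    from my_app import activities  # hypothetical application module
```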

@@ -1071,7 +1082,7 @@ Note, some calls from the module may still be checked for invalid calls at runtime

`SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already
has a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To
-remove this restriction:
+remove this restriction and allow a specific call like `datetime.date.today()`:

```python
my_restrictions = dataclasses.replace(
    # ... (interior collapsed in the diff view; hunk @@ -1083,6 +1094,12 @@ resumes here)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```
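
The interior of that block is collapsed above. A sketch of the full pattern, assuming it follows the `with_child_unrestricted()` API described here (import paths are assumptions):

```python
# Sketch: allow datetime.date.today() at runtime by removing it from the
# default invalid-module-members matcher (do NOT pass datetime through).
import dataclasses

from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
    SandboxedWorkflowRunner,
    SandboxRestrictions,
)

my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
        "datetime", "date", "today"
    ),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```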

+**This is the correct approach for allowing non-deterministic stdlib calls.** Do not use passthrough modules for this
+purpose—passthrough controls import reloading, while `invalid_module_members` controls runtime call restrictions.

+For a complete example showing both passthrough modules and unrestricted invalid members, see the
+[pydantic_converter sample worker configuration](https://github.com/temporalio/samples-python/blob/4303a9b15f4ddc4cd770bc0ba33afef90a25d3ae/pydantic_converter/worker.py#L45-L65).

Restrictions can also be added by `|`'ing together matchers, for example to restrict the `datetime.date` class from
being used altogether:
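
The example block is collapsed below; a minimal sketch, assuming the `SandboxMatcher` API from `temporalio.worker.workflow_sandbox`:

```python
# Sketch: OR a matcher onto the defaults to forbid any use of datetime.date.
import dataclasses

from temporalio.worker.workflow_sandbox import SandboxMatcher, SandboxRestrictions

my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default
    | SandboxMatcher(children={"datetime": SandboxMatcher(use={"date"})}),
)
```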

@@ -1129,7 +1146,7 @@ To mitigate this, users should:

* Define workflows in files that have as few non-standard-library imports as possible
* Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
-* Set third-party libraries as passthrough modules if they are known to be side-effect free
+* Set non-workflow imports as passthrough modules if they are known to be side-effect free on import and deterministic

###### Extending Restricted Classes

**temporalio/_log_utils.py** (108 changes: 108 additions & 0 deletions)

@@ -0,0 +1,108 @@
"""Internal utilities for Temporal logging.
This module is internal and may change at any time.
"""

from __future__ import annotations

import json
from collections.abc import Mapping, MutableMapping
from typing import Any, Literal

TemporalLogExtraMode = Literal["dict", "flatten", "json"]
"""Mode controlling how Temporal context is added to log record extra.
Values:
dict: (default) Add context as a nested dictionary under a single key.
This is the original behavior. Suitable for logging handlers that
support nested structures.
flatten: Add each context field as a separate top-level key with a
namespaced prefix. Values that are not primitives (str/int/float/bool)
are converted to strings. This mode is recommended for OpenTelemetry
and other logging pipelines that require flat, scalar attributes.
json: Add context as a JSON string under a single key. Useful when
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually have any known use case for json mode?

downstream systems expect string values but you want structured data.
"""


def _apply_temporal_context_to_extra(
    extra: MutableMapping[str, Any],
    *,
    key: str,
    prefix: str,
    ctx: Mapping[str, Any],
    mode: TemporalLogExtraMode,
) -> None:
"""Apply temporal context to log record extra based on the configured mode.
Args:
extra: The mutable extra dict to update.
key: The key to use for dict/json modes (e.g., "temporal_workflow").
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a great reason to distinguish key and prefix. Do you have any compelling argument not to use the same one for both?

prefix: The prefix to use for flatten mode keys (e.g., "temporal.workflow").
ctx: The context mapping containing temporal fields.
mode: The mode controlling how context is added.
"""
    if mode == "dict":
        extra[key] = dict(ctx)
    elif mode == "json":
        extra[key] = json.dumps(ctx, separators=(",", ":"), default=str)
    elif mode == "flatten":
        for k, v in ctx.items():
            # Ensure value is a primitive type safe for OTel attributes
            if not isinstance(v, (str, int, float, bool, type(None))):
                v = str(v)
            extra[f"{prefix}.{k}"] = v
    else:
        # Fallback to dict for any unknown mode (shouldn't happen with typing)
        extra[key] = dict(ctx)
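
As a quick illustration of what the modes above produce (a sketch using only the function just shown; the context field names are hypothetical):

```python
# Sketch: effect of "flatten" vs "json" mode on a log record's extra dict.
ctx = {"workflow_id": "wf-1", "attempt": 2}

extra: dict = {}
_apply_temporal_context_to_extra(
    extra, key="temporal_workflow", prefix="temporal.workflow", ctx=ctx, mode="flatten"
)
# extra == {"temporal.workflow.workflow_id": "wf-1", "temporal.workflow.attempt": 2}

extra = {}
_apply_temporal_context_to_extra(
    extra, key="temporal_workflow", prefix="temporal.workflow", ctx=ctx, mode="json"
)
# extra == {"temporal_workflow": '{"workflow_id":"wf-1","attempt":2}'}
```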


def _update_temporal_context_in_extra(
    extra: MutableMapping[str, Any],
    *,
    key: str,
    prefix: str,
    update_ctx: Mapping[str, Any],
    mode: TemporalLogExtraMode,
) -> None:
"""Update existing temporal context in extra with additional fields.
This is used when adding update info to existing workflow context.
Args:
extra: The mutable extra dict to update.
key: The key used for dict/json modes (e.g., "temporal_workflow").
prefix: The prefix used for flatten mode keys (e.g., "temporal.workflow").
update_ctx: Additional context fields to add/update.
mode: The mode controlling how context is added.
"""
    if mode == "dict":
        extra.setdefault(key, {}).update(update_ctx)
    elif mode == "json":
        # For JSON mode, we need to parse, update, and re-serialize
        existing = extra.get(key)
        if existing is not None:
            try:
                existing_dict = json.loads(existing)
                existing_dict.update(update_ctx)
                extra[key] = json.dumps(
                    existing_dict, separators=(",", ":"), default=str
                )
            except (json.JSONDecodeError, TypeError):
                # If parsing fails, just create a new JSON object with update_ctx
                extra[key] = json.dumps(
                    dict(update_ctx), separators=(",", ":"), default=str
                )
        else:
            extra[key] = json.dumps(
                dict(update_ctx), separators=(",", ":"), default=str
            )
    elif mode == "flatten":
        for k, v in update_ctx.items():
            # Ensure value is a primitive type safe for OTel attributes
            if not isinstance(v, (str, int, float, bool, type(None))):
                v = str(v)
            extra[f"{prefix}.{k}"] = v
    else:
        # Fallback to dict for any unknown mode
        extra.setdefault(key, {}).update(update_ctx)
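
And a sketch of the update path in `json` mode, which round-trips the existing value through `json.loads`/`json.dumps` (field names are hypothetical):

```python
# Sketch: merging update info into already-serialized JSON context.
extra = {"temporal_workflow": '{"workflow_id":"wf-1"}'}
_update_temporal_context_in_extra(
    extra,
    key="temporal_workflow",
    prefix="temporal.workflow",
    update_ctx={"update_id": "u-7"},
    mode="json",
)
# extra == {"temporal_workflow": '{"workflow_id":"wf-1","update_id":"u-7"}'}
```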
**temporalio/activity.py** (15 changes: 14 additions & 1 deletion)

@@ -32,6 +32,7 @@
import temporalio.common
import temporalio.converter

+from ._log_utils import TemporalLogExtraMode, _apply_temporal_context_to_extra
from .types import CallableType

if TYPE_CHECKING:
@@ -500,6 +501,11 @@ class LoggerAdapter(logging.LoggerAdapter):
            value will be added to the ``extra`` dictionary with the entire
            activity info, making it present on the ``LogRecord.__dict__`` for
            use by others. Default is False.
+        temporal_extra_mode: Controls how activity context is added to log
+            ``extra``. Default is ``"dict"`` (current behavior). Set to
+            ``"flatten"`` for OpenTelemetry compatibility (scalar attributes
+            with ``temporal.activity.`` prefix), or ``"json"`` for a single JSON
+            string value.
    """

    def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
@@ -508,6 +514,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
        self.activity_info_on_message = True
        self.activity_info_on_extra = True
        self.full_activity_info_on_extra = False
+        self.temporal_extra_mode: TemporalLogExtraMode = "dict"

    def process(
        self, msg: Any, kwargs: MutableMapping[str, Any]
@@ -525,7 +532,13 @@ def process(
            if self.activity_info_on_extra:
                # Extra can be absent or None, this handles both
                extra = kwargs.get("extra", None) or {}
-                extra["temporal_activity"] = context.logger_details
+                _apply_temporal_context_to_extra(
+                    extra,
+                    key="temporal_activity",
+                    prefix="temporal.activity",
+                    ctx=context.logger_details,
+                    mode=self.temporal_extra_mode,
+                )
                kwargs["extra"] = extra
            if self.full_activity_info_on_extra:
                # Extra can be absent or None, this handles both
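
A sketch of how a worker process might opt into the new mode (the `temporal_extra_mode` attribute is the addition in this PR; `temporalio.activity.logger` is the existing module-level adapter):

```python
# Sketch: flatten Temporal context into scalar, OTel-friendly attributes.
import logging

import temporalio.activity

logging.basicConfig(level=logging.INFO)
temporalio.activity.logger.temporal_extra_mode = "flatten"

# Records logged via temporalio.activity.logger inside an activity would then
# carry keys like "temporal.activity.activity_id" (field name assumed from
# logger_details) instead of a nested "temporal_activity" dict.
```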
**temporalio/workflow.py** (27 changes: 25 additions & 2 deletions)

@@ -61,6 +61,11 @@
import temporalio.workflow
from temporalio.nexus._util import ServiceHandlerT

+from ._log_utils import (
+    TemporalLogExtraMode,
+    _apply_temporal_context_to_extra,
+    _update_temporal_context_in_extra,
+)
from .types import (
    AnyType,
    CallableAsyncNoParam,
@@ -1569,6 +1574,11 @@ class LoggerAdapter(logging.LoggerAdapter):
            use by others. Default is False.
        log_during_replay: Boolean for whether logs should occur during replay.
            Default is False.
+        temporal_extra_mode: Controls how workflow context is added to log
+            ``extra``. Default is ``"dict"`` (current behavior). Set to
+            ``"flatten"`` for OpenTelemetry compatibility (scalar attributes
+            with ``temporal.workflow.`` prefix), or ``"json"`` for a single JSON
+            string value.

    Values added to ``extra`` are merged with the ``extra`` dictionary from a
    logging call, with values from the logging call taking precedence. I.e. the
@@ -1582,6 +1592,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
        self.workflow_info_on_extra = True
        self.full_workflow_info_on_extra = False
        self.log_during_replay = False
+        self.temporal_extra_mode: TemporalLogExtraMode = "dict"
        self.disable_sandbox = False

    def process(
@@ -1602,7 +1613,13 @@ def process(
            if self.workflow_info_on_message:
                msg_extra.update(workflow_details)
            if self.workflow_info_on_extra:
-                extra["temporal_workflow"] = workflow_details
+                _apply_temporal_context_to_extra(
+                    extra,
+                    key="temporal_workflow",
+                    prefix="temporal.workflow",
+                    ctx=workflow_details,
+                    mode=self.temporal_extra_mode,
+                )
            if self.full_workflow_info_on_extra:
                extra["workflow_info"] = runtime.workflow_info()
            update_info = current_update_info()
@@ -1611,7 +1628,13 @@ def process(
                if self.workflow_info_on_message:
                    msg_extra.update(update_details)
                if self.workflow_info_on_extra:
-                    extra.setdefault("temporal_workflow", {}).update(update_details)
+                    _update_temporal_context_in_extra(
+                        extra,
+                        key="temporal_workflow",
+                        prefix="temporal.workflow",
+                        update_ctx=update_details,
+                        mode=self.temporal_extra_mode,
+                    )

kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
if msg_extra:
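
Similarly for workflows, a sketch of opting the module-level `temporalio.workflow.logger` adapter into `json` mode (again, the attribute itself is this PR's addition):

```python
# Sketch: serialize workflow context as a single compact JSON string.
import temporalio.workflow

temporalio.workflow.logger.temporal_extra_mode = "json"

# Records from workflow.logger.info(...) would then carry
# extra["temporal_workflow"] as a JSON string rather than a nested dict.
```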