diff --git a/README.md b/README.md
index 38f3696d5..021e64975 100644
--- a/README.md
+++ b/README.md
@@ -1018,10 +1018,21 @@ By default the sandbox completely reloads non-standard-library and non-Temporal
 the sandbox quicker and use less memory when importing known-side-effect-free modules, they can be marked as
 passthrough modules.
 
-**For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be
+**Passthrough modules are about import-time behavior and determinism, not just about whether a module is third-party.**
+Any module that is side-effect-free on import and makes only deterministic calls can be passed through, including
+first-party application modules, third-party libraries, and non-workflow-specific code. The key criteria are:
+1. The module has no import-time side effects (e.g., file I/O, network calls, or generating random values)
+2. Calls made from the module are deterministic within workflow code
+
+**For performance and behavior reasons, users are encouraged to pass through all non-workflow imports whose calls will be
 deterministic.** In particular, this advice extends to modules containing the activities to be referenced in workflows,
 and modules containing dataclasses and Pydantic models, which can be particularly expensive to import.
 
+**Note on datetime and similar stdlib modules:** If you need to use non-deterministic functions like `datetime.date.today()`,
+do **not** mark `datetime` as passthrough. Instead, use `with_child_unrestricted()` to allow specific invalid members
+(see [Invalid Module Members](#invalid-module-members) below). Passthrough affects import reloading, while
+`invalid_module_members` controls which calls are allowed at runtime.
+
 One way to pass through a module is at import time in the workflow file using the `imports_passed_through` context
 manager like so:
 
@@ -1071,7 +1082,7 @@ Note, some calls from the module may still be checked for invalid calls at runti
 
 `SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already
 has a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To
-remove this restriction:
+remove this restriction and allow a specific call like `datetime.date.today()`:
 
 ```python
 my_restrictions = dataclasses.replace(
@@ -1083,6 +1094,12 @@ my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
 ```
 
+**This is the correct approach for allowing non-deterministic stdlib calls.** Do not use passthrough modules for this
+purpose: as noted above, passthrough controls import reloading, while `invalid_module_members` controls which calls
+are allowed at runtime.
+
+For a complete example showing both passthrough modules and unrestricted invalid members, see the
+[pydantic_converter sample worker configuration](https://github.com/temporalio/samples-python/blob/4303a9b15f4ddc4cd770bc0ba33afef90a25d3ae/pydantic_converter/worker.py#L45-L65).
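+
+As an inline illustration, the two mechanisms can be combined in a single restrictions object. The following is a
+minimal sketch only: `my_activities` and `my_models` are hypothetical placeholders for your own modules that are
+side-effect-free on import and deterministic.
+
+```python
+my_restrictions = dataclasses.replace(
+    # Passthrough: skip sandbox re-imports for known-safe, non-workflow modules
+    SandboxRestrictions.default.with_passthrough_modules("my_activities", "my_models"),
+    # Invalid members: allow one specific, otherwise-restricted runtime call
+    invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
+        "datetime", "date", "today"
+    ),
+)
+my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
+```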
+
 Restrictions can also be added by `|`'ing together matchers, for example to restrict the `datetime.date` class from
 being used altogether:
 
@@ -1129,7 +1146,7 @@ To mitigate this, users should:
 
 * Define workflows in files that have as few non-standard-library imports as possible
 * Alter the max workflow cache and/or max concurrent workflows settings if memory grows too large
-* Set third-party libraries as passthrough modules if they are known to be side-effect free
+* Set non-workflow imports as passthrough modules if they are known to be side-effect-free on import and deterministic
 
 ###### Extending Restricted Classes
 
diff --git a/temporalio/_log_utils.py b/temporalio/_log_utils.py
new file mode 100644
index 000000000..b97d87fa1
--- /dev/null
+++ b/temporalio/_log_utils.py
@@ -0,0 +1,108 @@
+"""Internal utilities for Temporal logging.
+
+This module is internal and may change at any time.
+"""
+
+from __future__ import annotations
+
+import json
+from collections.abc import Mapping, MutableMapping
+from typing import Any, Literal
+
+TemporalLogExtraMode = Literal["dict", "flatten", "json"]
+"""Mode controlling how Temporal context is added to log record extra.
+
+Values:
+    dict: (default) Add context as a nested dictionary under a single key.
+        This is the original behavior. Suitable for logging handlers that
+        support nested structures.
+    flatten: Add each context field as a separate top-level key with a
+        namespaced prefix. Values that are not ``None`` or primitives
+        (str/int/float/bool) are converted to strings. This mode is
+        recommended for OpenTelemetry and other logging pipelines that
+        require flat, scalar attributes.
+    json: Add context as a JSON string under a single key. Useful when
+        downstream systems expect string values but you want structured data.
+"""
+
+
+def _apply_temporal_context_to_extra(
+    extra: MutableMapping[str, Any],
+    *,
+    key: str,
+    prefix: str,
+    ctx: Mapping[str, Any],
+    mode: TemporalLogExtraMode,
+) -> None:
+    """Apply temporal context to log record extra based on the configured mode.
+
+    Args:
+        extra: The mutable extra dict to update.
+        key: The key to use for dict/json modes (e.g., "temporal_workflow").
+        prefix: The prefix to use for flatten mode keys (e.g., "temporal.workflow").
+        ctx: The context mapping containing temporal fields.
+        mode: The mode controlling how context is added.
+    """
+    if mode == "dict":
+        extra[key] = dict(ctx)
+    elif mode == "json":
+        # Copy to a plain dict so any Mapping serializes as a JSON object
+        extra[key] = json.dumps(dict(ctx), separators=(",", ":"), default=str)
+    elif mode == "flatten":
+        for k, v in ctx.items():
+            # Ensure value is a primitive type safe for OTel attributes
+            if not isinstance(v, (str, int, float, bool, type(None))):
+                v = str(v)
+            extra[f"{prefix}.{k}"] = v
+    else:
+        # Fallback to dict for any unknown mode (shouldn't happen with typing)
+        extra[key] = dict(ctx)
+
+
+def _update_temporal_context_in_extra(
+    extra: MutableMapping[str, Any],
+    *,
+    key: str,
+    prefix: str,
+    update_ctx: Mapping[str, Any],
+    mode: TemporalLogExtraMode,
+) -> None:
+    """Update existing temporal context in extra with additional fields.
+
+    This is used when adding update info to existing workflow context.
+
+    Args:
+        extra: The mutable extra dict to update.
+        key: The key used for dict/json modes (e.g., "temporal_workflow").
+        prefix: The prefix used for flatten mode keys (e.g., "temporal.workflow").
+        update_ctx: Additional context fields to add/update.
+        mode: The mode controlling how context is added.
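+
+    Example (illustrative sketch of ``dict`` mode; this is an internal
+    helper and the values are hypothetical)::
+
+        extra = {"temporal_workflow": {"workflow_id": "wf-001"}}
+        _update_temporal_context_in_extra(
+            extra,
+            key="temporal_workflow",
+            prefix="temporal.workflow",
+            update_ctx={"update_id": "upd-001"},
+            mode="dict",
+        )
+        # extra["temporal_workflow"] ==
+        # {"workflow_id": "wf-001", "update_id": "upd-001"}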
+    """
+    if mode == "dict":
+        extra.setdefault(key, {}).update(update_ctx)
+    elif mode == "json":
+        # For JSON mode, we need to parse, update, and re-serialize
+        existing = extra.get(key)
+        if existing is not None:
+            try:
+                existing_dict = json.loads(existing)
+                existing_dict.update(update_ctx)
+                extra[key] = json.dumps(
+                    existing_dict, separators=(",", ":"), default=str
+                )
+            except (json.JSONDecodeError, TypeError, AttributeError):
+                # If parsing or updating fails (e.g., the existing value is not
+                # a JSON object), create a new JSON object with just update_ctx
+                extra[key] = json.dumps(
+                    dict(update_ctx), separators=(",", ":"), default=str
+                )
+        else:
+            extra[key] = json.dumps(
+                dict(update_ctx), separators=(",", ":"), default=str
+            )
+    elif mode == "flatten":
+        for k, v in update_ctx.items():
+            # Ensure value is a primitive type safe for OTel attributes
+            if not isinstance(v, (str, int, float, bool, type(None))):
+                v = str(v)
+            extra[f"{prefix}.{k}"] = v
+    else:
+        # Fallback to dict for any unknown mode
+        extra.setdefault(key, {}).update(update_ctx)
diff --git a/temporalio/activity.py b/temporalio/activity.py
index ff46bdea8..dcaaee374 100644
--- a/temporalio/activity.py
+++ b/temporalio/activity.py
@@ -32,6 +32,7 @@
 import temporalio.common
 import temporalio.converter
 
+from ._log_utils import TemporalLogExtraMode, _apply_temporal_context_to_extra
 from .types import CallableType
 
 if TYPE_CHECKING:
@@ -500,6 +501,11 @@ class LoggerAdapter(logging.LoggerAdapter):
         value will be added to the ``extra`` dictionary with the entire
         activity info, making it present on the ``LogRecord.__dict__`` for use
         by others. Default is False.
+        temporal_extra_mode: Controls how activity context is added to log
+            ``extra``. Default is ``"dict"`` (the original nested-dictionary
+            behavior). Set to ``"flatten"`` for OpenTelemetry compatibility
+            (scalar attributes with ``temporal.activity.`` prefix), or
+            ``"json"`` for a single JSON string value.
     """
 
     def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
@@ -508,6 +514,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
         self.activity_info_on_message = True
         self.activity_info_on_extra = True
         self.full_activity_info_on_extra = False
+        self.temporal_extra_mode: TemporalLogExtraMode = "dict"
 
     def process(
         self, msg: Any, kwargs: MutableMapping[str, Any]
@@ -525,7 +532,13 @@ def process(
         if self.activity_info_on_extra:
             # Extra can be absent or None, this handles both
             extra = kwargs.get("extra", None) or {}
-            extra["temporal_activity"] = context.logger_details
+            _apply_temporal_context_to_extra(
+                extra,
+                key="temporal_activity",
+                prefix="temporal.activity",
+                ctx=context.logger_details,
+                mode=self.temporal_extra_mode,
+            )
             kwargs["extra"] = extra
         if self.full_activity_info_on_extra:
             # Extra can be absent or None, this handles both
diff --git a/temporalio/workflow.py b/temporalio/workflow.py
index 1fbcc0717..1e66a8793 100644
--- a/temporalio/workflow.py
+++ b/temporalio/workflow.py
@@ -61,6 +61,11 @@
 import temporalio.workflow
 from temporalio.nexus._util import ServiceHandlerT
 
+from ._log_utils import (
+    TemporalLogExtraMode,
+    _apply_temporal_context_to_extra,
+    _update_temporal_context_in_extra,
+)
 from .types import (
     AnyType,
     CallableAsyncNoParam,
@@ -1569,6 +1574,11 @@ class LoggerAdapter(logging.LoggerAdapter):
         use by others. Default is False.
         log_during_replay: Boolean for whether logs should occur during
             replay. Default is False.
+        temporal_extra_mode: Controls how workflow context is added to log
+            ``extra``. Default is ``"dict"`` (the original nested-dictionary behavior).
Set to + ``"flatten"`` for OpenTelemetry compatibility (scalar attributes + with ``temporal.workflow.`` prefix), or ``"json"`` for a single JSON + string value. Values added to ``extra`` are merged with the ``extra`` dictionary from a logging call, with values from the logging call taking precedence. I.e. the @@ -1582,6 +1592,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N self.workflow_info_on_extra = True self.full_workflow_info_on_extra = False self.log_during_replay = False + self.temporal_extra_mode: TemporalLogExtraMode = "dict" self.disable_sandbox = False def process( @@ -1602,7 +1613,13 @@ def process( if self.workflow_info_on_message: msg_extra.update(workflow_details) if self.workflow_info_on_extra: - extra["temporal_workflow"] = workflow_details + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=workflow_details, + mode=self.temporal_extra_mode, + ) if self.full_workflow_info_on_extra: extra["workflow_info"] = runtime.workflow_info() update_info = current_update_info() @@ -1611,7 +1628,13 @@ def process( if self.workflow_info_on_message: msg_extra.update(update_details) if self.workflow_info_on_extra: - extra.setdefault("temporal_workflow", {}).update(update_details) + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + update_ctx=update_details, + mode=self.temporal_extra_mode, + ) kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})} if msg_extra: diff --git a/tests/test_log_utils.py b/tests/test_log_utils.py new file mode 100644 index 000000000..3292be227 --- /dev/null +++ b/tests/test_log_utils.py @@ -0,0 +1,478 @@ +"""Tests for Temporal logging utilities and LoggerAdapter modes.""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +import pytest + +from temporalio._log_utils import ( + _apply_temporal_context_to_extra, + _update_temporal_context_in_extra, +) + + +class TestApplyTemporalContextToExtra: + """Tests for _apply_temporal_context_to_extra helper.""" + + @pytest.fixture + def sample_context(self) -> dict[str, Any]: + return { + "attempt": 1, + "namespace": "default", + "run_id": "abc123", + "task_queue": "test-queue", + "workflow_id": "wf-001", + "workflow_type": "TestWorkflow", + } + + def test_dict_mode_adds_nested_dict(self, sample_context: dict[str, Any]) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=sample_context, + mode="dict", + ) + + assert "temporal_workflow" in extra + assert extra["temporal_workflow"] == sample_context + # Verify it's a copy, not the same object + assert extra["temporal_workflow"] is not sample_context + + def test_json_mode_adds_json_string(self, sample_context: dict[str, Any]) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=sample_context, + mode="json", + ) + + assert "temporal_workflow" in extra + assert isinstance(extra["temporal_workflow"], str) + # Verify it's valid JSON + parsed = json.loads(extra["temporal_workflow"]) + assert parsed == sample_context + + def test_flatten_mode_adds_prefixed_keys( + self, sample_context: dict[str, Any] + ) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=sample_context, + mode="flatten", + ) + + # Should NOT have 
the nested dict key + assert "temporal_workflow" not in extra + + # Should have individual prefixed keys + assert extra["temporal.workflow.attempt"] == 1 + assert extra["temporal.workflow.namespace"] == "default" + assert extra["temporal.workflow.run_id"] == "abc123" + assert extra["temporal.workflow.task_queue"] == "test-queue" + assert extra["temporal.workflow.workflow_id"] == "wf-001" + assert extra["temporal.workflow.workflow_type"] == "TestWorkflow" + + def test_flatten_mode_converts_non_primitives_to_string(self) -> None: + ctx = { + "string_val": "hello", + "int_val": 42, + "float_val": 3.14, + "bool_val": True, + "none_val": None, + "list_val": [1, 2, 3], + "dict_val": {"nested": "value"}, + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_test", + prefix="temporal.test", + ctx=ctx, + mode="flatten", + ) + + # Primitives should remain as-is + assert extra["temporal.test.string_val"] == "hello" + assert extra["temporal.test.int_val"] == 42 + assert extra["temporal.test.float_val"] == 3.14 + assert extra["temporal.test.bool_val"] is True + assert extra["temporal.test.none_val"] is None + + # Non-primitives should be converted to strings + assert extra["temporal.test.list_val"] == "[1, 2, 3]" + assert extra["temporal.test.dict_val"] == "{'nested': 'value'}" + + def test_flatten_mode_does_not_produce_dict_values( + self, sample_context: dict[str, Any] + ) -> None: + """Verify no value in flattened extra is a dict (OTel compatibility).""" + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=sample_context, + mode="flatten", + ) + + for key, value in extra.items(): + if key.startswith("temporal."): + assert not isinstance( + value, dict + ), f"Key {key} has dict value, not OTel-safe" + + +class TestUpdateTemporalContextInExtra: + """Tests for _update_temporal_context_in_extra helper.""" + + @pytest.fixture + def initial_context(self) -> dict[str, Any]: + return { + "workflow_id": "wf-001", + "workflow_type": "TestWorkflow", + } + + @pytest.fixture + def update_context(self) -> dict[str, Any]: + return { + "update_id": "upd-001", + "update_name": "my_update", + } + + def test_dict_mode_updates_existing_dict( + self, initial_context: dict[str, Any], update_context: dict[str, Any] + ) -> None: + extra: dict[str, Any] = {} + # First apply initial context + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=initial_context, + mode="dict", + ) + # Then update + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + update_ctx=update_context, + mode="dict", + ) + + assert "temporal_workflow" in extra + assert extra["temporal_workflow"]["workflow_id"] == "wf-001" + assert extra["temporal_workflow"]["workflow_type"] == "TestWorkflow" + assert extra["temporal_workflow"]["update_id"] == "upd-001" + assert extra["temporal_workflow"]["update_name"] == "my_update" + + def test_json_mode_updates_json_string( + self, initial_context: dict[str, Any], update_context: dict[str, Any] + ) -> None: + extra: dict[str, Any] = {} + # First apply initial context + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=initial_context, + mode="json", + ) + # Then update + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + update_ctx=update_context, + mode="json", + ) + + assert 
"temporal_workflow" in extra + parsed = json.loads(extra["temporal_workflow"]) + assert parsed["workflow_id"] == "wf-001" + assert parsed["update_id"] == "upd-001" + + def test_flatten_mode_adds_prefixed_update_keys( + self, initial_context: dict[str, Any], update_context: dict[str, Any] + ) -> None: + extra: dict[str, Any] = {} + # First apply initial context + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=initial_context, + mode="flatten", + ) + # Then update + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + update_ctx=update_context, + mode="flatten", + ) + + assert extra["temporal.workflow.workflow_id"] == "wf-001" + assert extra["temporal.workflow.workflow_type"] == "TestWorkflow" + assert extra["temporal.workflow.update_id"] == "upd-001" + assert extra["temporal.workflow.update_name"] == "my_update" + + +class TestActivityPrefixes: + """Tests for activity-specific prefixes.""" + + @pytest.fixture + def activity_context(self) -> dict[str, Any]: + return { + "activity_id": "act-001", + "activity_type": "TestActivity", + "attempt": 1, + "namespace": "default", + "task_queue": "test-queue", + "workflow_id": "wf-001", + "workflow_run_id": "run-001", + "workflow_type": "TestWorkflow", + } + + def test_activity_flatten_uses_activity_prefix( + self, activity_context: dict[str, Any] + ) -> None: + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_activity", + prefix="temporal.activity", + ctx=activity_context, + mode="flatten", + ) + + assert "temporal_activity" not in extra + assert extra["temporal.activity.activity_id"] == "act-001" + assert extra["temporal.activity.activity_type"] == "TestActivity" + assert extra["temporal.activity.attempt"] == 1 + + +class TestNamespaceSafety: + """Tests to verify flattened keys use proper namespacing.""" + + def test_flattened_keys_do_not_conflict_with_logrecord_attrs(self) -> None: + """Verify temporal prefixed keys won't conflict with LogRecord attributes.""" + # Standard LogRecord attributes that we must not overwrite + logrecord_attrs = { + "name", + "msg", + "args", + "created", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "exc_info", + "exc_text", + "thread", + "threadName", + "message", + } + + ctx = { + "msecs": 999, # Same name as LogRecord attr + "name": "workflow-name", # Same name as LogRecord attr + "workflow_id": "wf-001", + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=ctx, + mode="flatten", + ) + + # None of our flattened keys should be standard LogRecord attrs + for key in extra.keys(): + assert ( + key not in logrecord_attrs + ), f"Key {key} conflicts with LogRecord attribute" + + # All our keys should have the temporal prefix + for key in extra.keys(): + assert key.startswith( + "temporal." + ), f"Key {key} doesn't have temporal prefix" + + +class TestFlattenModeOTelSafety: + """Critical tests to verify flatten mode is fully OTel-safe.""" + + def test_flatten_mode_produces_zero_dicts_for_temporal_keys(self) -> None: + """Verify flatten mode has zero dict values for any temporal keys. + + This is the critical OTel compatibility requirement - nested dicts + cause OTel pipelines to reject or drop the attributes. 
+ """ + ctx = { + "workflow_id": "wf-001", + "workflow_type": "TestWorkflow", + "run_id": "run-001", + "namespace": "default", + "task_queue": "test-queue", + "attempt": 1, + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=ctx, + mode="flatten", + ) + + # Assert no dict-valued keys exist at all + for key, value in extra.items(): + assert not isinstance(value, dict), ( + f"Flatten mode violation: {key}={type(value).__name__} " + f"(expected primitive, got dict)" + ) + + # Assert legacy nested keys don't exist in flatten mode + assert ( + "temporal_workflow" not in extra + ), "Legacy nested key 'temporal_workflow' should not exist in flatten mode" + assert ( + "temporal_activity" not in extra + ), "Legacy nested key 'temporal_activity' should not exist in flatten mode" + + def test_flatten_mode_activity_produces_zero_dicts(self) -> None: + """Verify activity flatten mode has zero dict values.""" + ctx = { + "activity_id": "act-001", + "activity_type": "TestActivity", + "attempt": 1, + "namespace": "default", + "task_queue": "test-queue", + "workflow_id": "wf-001", + "workflow_run_id": "run-001", + "workflow_type": "TestWorkflow", + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_activity", + prefix="temporal.activity", + ctx=ctx, + mode="flatten", + ) + + # Assert no dict-valued keys exist + for key, value in extra.items(): + assert not isinstance( + value, dict + ), f"Flatten mode violation: {key}={type(value).__name__}" + + # Assert legacy nested key doesn't exist + assert "temporal_activity" not in extra + + def test_flatten_mode_with_update_produces_zero_dicts(self) -> None: + """Verify flatten mode with update info still produces zero dicts.""" + workflow_ctx = { + "workflow_id": "wf-001", + "workflow_type": "TestWorkflow", + } + update_ctx = { + "update_id": "upd-001", + "update_name": "my_update", + } + extra: dict[str, Any] = {} + + # Apply workflow context + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=workflow_ctx, + mode="flatten", + ) + # Apply update context + _update_temporal_context_in_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + update_ctx=update_ctx, + mode="flatten", + ) + + # Assert no dict-valued keys exist after both operations + for key, value in extra.items(): + assert not isinstance( + value, dict + ), f"Flatten mode violation after update: {key}={type(value).__name__}" + + # Assert legacy nested key doesn't exist + assert "temporal_workflow" not in extra + + # Verify all expected keys are present as flat primitives + assert extra["temporal.workflow.workflow_id"] == "wf-001" + assert extra["temporal.workflow.update_id"] == "upd-001" + + +class TestLogRecordAccessibility: + """Tests to verify flattened attributes are accessible on LogRecord.__dict__.""" + + def test_flattened_attrs_accessible_via_record_dict(self) -> None: + """Verify that flattened attributes can be accessed via LogRecord.__dict__.""" + ctx = { + "workflow_id": "wf-001", + "attempt": 1, + } + extra: dict[str, Any] = {} + _apply_temporal_context_to_extra( + extra, + key="temporal_workflow", + prefix="temporal.workflow", + ctx=ctx, + mode="flatten", + ) + + # Create a log record with our extra data + record = logging.LogRecord( + name="test", + level=logging.INFO, + pathname="", + lineno=0, + msg="test message", + args=(), + exc_info=None, + ) + # Add our extra to the record + for 
key, value in extra.items(): + setattr(record, key, value) + + # Verify accessibility via __dict__ + assert record.__dict__["temporal.workflow.workflow_id"] == "wf-001" + assert record.__dict__["temporal.workflow.attempt"] == 1 + + # Verify all values are primitive (OTel-safe) + for key in extra.keys(): + value = record.__dict__[key] + assert isinstance( + value, (str, int, float, bool, type(None)) + ), f"Value for {key} is not primitive: {type(value)}" diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index e66a42dc0..8c2e8bf52 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1042,6 +1042,115 @@ async def say_hello(name: str) -> str: assert records[-1].__dict__["temporal_activity"]["activity_type"] == "say_hello" +async def test_activity_logging_flatten_mode( + client: Client, + worker: ExternalWorker, + shared_state_manager: SharedStateManager, +): + """Test that activity logger flatten mode produces OTel-safe scalar attributes.""" + + @activity.defn + async def say_hello_flatten(name: str) -> str: + activity.logger.info(f"Called with arg: {name}") + return f"Hello, {name}!" + + # Save original mode and set to flatten + original_mode = activity.logger.temporal_extra_mode + activity.logger.temporal_extra_mode = "flatten" + + handler = logging.handlers.QueueHandler(queue.Queue()) + activity.logger.base_logger.addHandler(handler) + prev_level = activity.logger.base_logger.level + activity.logger.base_logger.setLevel(logging.INFO) + try: + result = await _execute_workflow_with_activity( + client, + worker, + say_hello_flatten, + "Temporal", + shared_state_manager=shared_state_manager, + ) + finally: + activity.logger.base_logger.removeHandler(handler) + activity.logger.base_logger.setLevel(prev_level) + activity.logger.temporal_extra_mode = original_mode + + assert result.result == "Hello, Temporal!" + records: list[logging.LogRecord] = list(handler.queue.queue) # type: ignore + assert len(records) > 0 + record = records[-1] + + # Should NOT have nested dict + assert "temporal_activity" not in record.__dict__ + + # Should have flattened keys with temporal.activity prefix + assert record.__dict__["temporal.activity.activity_type"] == "say_hello_flatten" + assert "temporal.activity.activity_id" in record.__dict__ + assert "temporal.activity.workflow_id" in record.__dict__ + assert "temporal.activity.workflow_run_id" in record.__dict__ + assert "temporal.activity.namespace" in record.__dict__ + assert "temporal.activity.task_queue" in record.__dict__ + assert record.__dict__["temporal.activity.attempt"] == 1 + + # Verify all temporal.activity.* values are primitives (OTel-safe) + for key, value in record.__dict__.items(): + if key.startswith("temporal.activity."): + assert isinstance( + value, (str, int, float, bool, type(None)) + ), f"Key {key} has non-primitive value: {type(value)}" + + +async def test_activity_logging_json_mode( + client: Client, + worker: ExternalWorker, + shared_state_manager: SharedStateManager, +): + """Test that activity logger json mode produces a JSON string value.""" + import json as json_module + + @activity.defn + async def say_hello_json(name: str) -> str: + activity.logger.info(f"Called with arg: {name}") + return f"Hello, {name}!" 
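+
+    # Illustrative (hypothetical payload): in json mode the handler should see
+    # a single string value such as
+    # '{"activity_id":"1","activity_type":"say_hello_json","attempt":1,...}',
+    # which the assertions below parse back into a dict.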
+ + # Save original mode and set to json + original_mode = activity.logger.temporal_extra_mode + activity.logger.temporal_extra_mode = "json" + + handler = logging.handlers.QueueHandler(queue.Queue()) + activity.logger.base_logger.addHandler(handler) + prev_level = activity.logger.base_logger.level + activity.logger.base_logger.setLevel(logging.INFO) + try: + result = await _execute_workflow_with_activity( + client, + worker, + say_hello_json, + "Temporal", + shared_state_manager=shared_state_manager, + ) + finally: + activity.logger.base_logger.removeHandler(handler) + activity.logger.base_logger.setLevel(prev_level) + activity.logger.temporal_extra_mode = original_mode + + assert result.result == "Hello, Temporal!" + records: list[logging.LogRecord] = list(handler.queue.queue) # type: ignore + assert len(records) > 0 + record = records[-1] + + # Should have temporal_activity as a JSON string + assert "temporal_activity" in record.__dict__ + json_str = record.__dict__["temporal_activity"] + assert isinstance(json_str, str) + + # Should be valid JSON + parsed = json_module.loads(json_str) + assert parsed["activity_type"] == "say_hello_json" + assert "activity_id" in parsed + assert "workflow_id" in parsed + + async def test_activity_worker_shutdown( client: Client, worker: ExternalWorker, diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 4d9299f09..eb46ac3ff 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2064,6 +2064,98 @@ async def test_workflow_logging(client: Client): assert capturer.find_log("Signal: finish") +async def test_workflow_logging_flatten_mode(client: Client): + """Test that flatten mode produces OTel-safe scalar attributes.""" + # Save original mode and set to flatten + original_mode = workflow.logger.temporal_extra_mode + workflow.logger.temporal_extra_mode = "flatten" + + try: + with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer: + async with new_worker( + client, LoggingWorkflow, max_cached_workflows=0 + ) as worker: + handle = await client.start_workflow( + LoggingWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + await handle.signal(LoggingWorkflow.my_signal, "signal 1") + await handle.execute_update( + LoggingWorkflow.my_update, "update 1", id="update-1" + ) + await handle.signal(LoggingWorkflow.my_signal, "finish") + await handle.result() + + # Check signal log record + record = capturer.find_log("Signal: signal 1") + assert record is not None + + # Should NOT have nested dict + assert "temporal_workflow" not in record.__dict__ + + # Should have flattened keys with temporal.workflow prefix + assert record.__dict__["temporal.workflow.workflow_type"] == "LoggingWorkflow" + assert "temporal.workflow.workflow_id" in record.__dict__ + assert "temporal.workflow.run_id" in record.__dict__ + assert "temporal.workflow.namespace" in record.__dict__ + assert "temporal.workflow.task_queue" in record.__dict__ + assert record.__dict__["temporal.workflow.attempt"] == 1 + + # Verify all temporal.workflow.* values are primitives (OTel-safe) + for key, value in record.__dict__.items(): + if key.startswith("temporal.workflow."): + assert isinstance( + value, (str, int, float, bool, type(None)) + ), f"Key {key} has non-primitive value: {type(value)}" + + # Check update log record has flattened update fields + update_record = capturer.find_log("Update: update 1") + assert update_record is not None + assert update_record.__dict__["temporal.workflow.update_id"] == 
"update-1" + assert update_record.__dict__["temporal.workflow.update_name"] == "my_update" + finally: + workflow.logger.temporal_extra_mode = original_mode + + +async def test_workflow_logging_json_mode(client: Client): + """Test that json mode produces a JSON string value.""" + # Save original mode and set to json + original_mode = workflow.logger.temporal_extra_mode + workflow.logger.temporal_extra_mode = "json" + + try: + with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer: + async with new_worker( + client, LoggingWorkflow, max_cached_workflows=0 + ) as worker: + handle = await client.start_workflow( + LoggingWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + await handle.signal(LoggingWorkflow.my_signal, "signal 1") + await handle.signal(LoggingWorkflow.my_signal, "finish") + await handle.result() + + # Check log record + record = capturer.find_log("Signal: signal 1") + assert record is not None + + # Should have temporal_workflow as a JSON string + assert "temporal_workflow" in record.__dict__ + json_str = record.__dict__["temporal_workflow"] + assert isinstance(json_str, str) + + # Should be valid JSON + parsed = json.loads(json_str) + assert parsed["workflow_type"] == "LoggingWorkflow" + assert "workflow_id" in parsed + assert "run_id" in parsed + finally: + workflow.logger.temporal_extra_mode = original_mode + + @activity.defn async def task_fail_once_activity() -> None: if activity.info().attempt == 1: