From 300a96437a8f4666cfefb71691cf9a44b5ea0f6d Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Fri, 12 Jun 2026 15:40:22 -0700 Subject: [PATCH 1/3] Suppress spurious "Failed to detach context" log in OTel interceptor The tracing workflow interceptor attaches an OTel context at the start of execute_workflow/handle_query and detaches it in the finally. The workflow event loop runs portions of the workflow inside contextvars.copy_context().run(...), so the finally can execute in a copied contextvars.Context. A copy preserves the OTel context value (so the existing `context is get_current()` guard passes) but invalidates the attach token, so opentelemetry.context.detach logs "Failed to detach context". Because LogCapturer attaches to the process-global opentelemetry.context logger, that stray log also bled into test_opentelemetry_safe_detach when a tracing workflow tore down during its capture window, making it flaky. Route the best-effort detach through _safe_detach, which drops only that one log record via a scoped logging filter. It still calls opentelemetry.context.detach so attach/detach calls stay balanced, satisfying the leak invariant enforced by test_opentelemetry_context_restored_after_activity. --- .../contrib/opentelemetry/_interceptor.py | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/temporalio/contrib/opentelemetry/_interceptor.py b/temporalio/contrib/opentelemetry/_interceptor.py index eb22f8be6..433f81452 100644 --- a/temporalio/contrib/opentelemetry/_interceptor.py +++ b/temporalio/contrib/opentelemetry/_interceptor.py @@ -2,7 +2,9 @@ from __future__ import annotations +import contextvars import dataclasses +import logging from collections.abc import Callable, Iterator, Mapping, Sequence from contextlib import contextmanager from dataclasses import dataclass @@ -58,6 +60,33 @@ _ContextT = TypeVar("_ContextT", bound=nexusrpc.handler.OperationContext) +_otel_context_logger = logging.getLogger("opentelemetry.context") + + +class _SuppressDetachFailureFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return not record.getMessage().startswith("Failed to detach context") + + +def _safe_detach(token: contextvars.Token[Context]) -> None: + """Detach an OTel context token, suppressing OTel's spurious failure log. + + A detach can run inside a different ``contextvars.Context`` than the one the + token was created on -- the workflow event loop runs portions of the workflow + inside ``contextvars.copy_context().run(...)``. A copy preserves the OTel + context value but invalidates the token, so OTel's ``detach`` logs "Failed to + detach context". We still call ``opentelemetry.context.detach`` (rather than + e.g. restoring via ``attach``) so attach/detach calls stay balanced -- a leak + invariant the interceptor tests enforce -- and only drop that one log record. + """ + detach_filter = _SuppressDetachFailureFilter() + _otel_context_logger.addFilter(detach_filter) + try: + opentelemetry.context.detach(token) + finally: + _otel_context_logger.removeFilter(detach_filter) + + class TracingInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor): """Interceptor that supports client and worker OpenTelemetry span creation and propagation. @@ -568,7 +597,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: # on. As such we do a best effort detach to avoid using a mismatched # token. if context is opentelemetry.context.get_current(): - opentelemetry.context.detach(token) + _safe_detach(token) def handle_update_validator( self, input: temporalio.worker.HandleUpdateInput @@ -663,7 +692,7 @@ def _top_level_workflow_context( # on. As such we do a best effort detach to avoid using a mismatched # token. if context is opentelemetry.context.get_current(): - opentelemetry.context.detach(token) + _safe_detach(token) def _context_to_headers( self, headers: Mapping[str, temporalio.api.common.v1.Payload] From 578a078ebc3ac2718a92e785f21c8ebfcd7e2aeb Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Tue, 16 Jun 2026 15:12:03 -0700 Subject: [PATCH 2/3] Fix spurious "Failed to detach context" by restoring OTel context via attach The tracing interceptor attached an OTel context and detached it via opentelemetry.context.detach(token) in a finally. detach calls ContextVar.reset(token), which raises (and OTel logs "Failed to detach context") when the finally runs in a different contextvars.Context than the attach -- e.g. when the workflow event loop resumes inside contextvars.copy_context().run(...). A copy preserves the OTel context value (so the `context is get_current()` guard passes) but invalidates the token. Restore the previously-current context by re-attaching it instead of detaching the token. attach goes through ContextVar.set, which is valid in any contextvars.Context, so the failing reset is never reached and there is no spurious log to suppress. This replaces the _safe_detach logging-filter workaround entirely. Rework the leak test to model OTel's current-context ContextVar and assert it is restored to baseline, rather than counting attach/detach calls (which no longer balance once restoration goes through attach). Remove test_opentelemetry_safe_detach, which only exercised the deleted suppression. --- .../contrib/opentelemetry/_interceptor.py | 68 ++++++-------- .../opentelemetry/test_opentelemetry.py | 89 +++++-------------- 2 files changed, 50 insertions(+), 107 deletions(-) diff --git a/temporalio/contrib/opentelemetry/_interceptor.py b/temporalio/contrib/opentelemetry/_interceptor.py index 433f81452..a15f7670b 100644 --- a/temporalio/contrib/opentelemetry/_interceptor.py +++ b/temporalio/contrib/opentelemetry/_interceptor.py @@ -2,9 +2,7 @@ from __future__ import annotations -import contextvars import dataclasses -import logging from collections.abc import Callable, Iterator, Mapping, Sequence from contextlib import contextmanager from dataclasses import dataclass @@ -60,31 +58,24 @@ _ContextT = TypeVar("_ContextT", bound=nexusrpc.handler.OperationContext) -_otel_context_logger = logging.getLogger("opentelemetry.context") +def _restore_context(previous: Context | None, attached: Context | None) -> None: + """Restore ``previous`` as the current OTel context after attaching one. + Restores by re-attaching ``previous`` rather than detaching the attach + token. ``opentelemetry.context.detach`` resets a ``contextvars`` Token, which + fails (and logs "Failed to detach context") when the restore runs in a + different ``contextvars.Context`` than the attach did -- e.g. when the + workflow event loop resumes inside ``contextvars.copy_context().run(...)``. + A copy preserves the OTel context value (so the guard below still matches) + but invalidates the Token. ``attach`` goes through ``ContextVar.set``, which + is valid in any ``contextvars.Context``, so restoration never fails. -class _SuppressDetachFailureFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - return not record.getMessage().startswith("Failed to detach context") - - -def _safe_detach(token: contextvars.Token[Context]) -> None: - """Detach an OTel context token, suppressing OTel's spurious failure log. - - A detach can run inside a different ``contextvars.Context`` than the one the - token was created on -- the workflow event loop runs portions of the workflow - inside ``contextvars.copy_context().run(...)``. A copy preserves the OTel - context value but invalidates the token, so OTel's ``detach`` logs "Failed to - detach context". We still call ``opentelemetry.context.detach`` (rather than - e.g. restoring via ``attach``) so attach/detach calls stay balanced -- a leak - invariant the interceptor tests enforce -- and only drop that one log record. + The guard restores only when our attached context is still current, so we + don't clobber a context something else has since attached. A ``None`` + ``previous`` means nothing was attached, so there is nothing to restore. """ - detach_filter = _SuppressDetachFailureFilter() - _otel_context_logger.addFilter(detach_filter) - try: - opentelemetry.context.detach(token) - finally: - _otel_context_logger.removeFilter(detach_filter) + if previous is not None and attached is opentelemetry.context.get_current(): + opentelemetry.context.attach(previous) class TracingInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor): @@ -211,7 +202,9 @@ def _start_as_current_span( kind: opentelemetry.trace.SpanKind, context: Context | None = None, ) -> Iterator[None]: - token = opentelemetry.context.attach(context) if context else None + previous = opentelemetry.context.get_current() if context else None + if context: + opentelemetry.context.attach(context) try: with self.tracer.start_as_current_span( name, @@ -248,8 +241,7 @@ def _start_as_current_span( ) raise finally: - if token and context is opentelemetry.context.get_current(): - opentelemetry.context.detach(token) + _restore_context(previous, context) def _completed_workflow_span( self, params: _CompletedWorkflowSpanParams @@ -580,7 +572,8 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: # We need to put this interceptor on the context too context = self._set_on_context(context) # Run under context with new span - token = opentelemetry.context.attach(context) + previous = opentelemetry.context.get_current() + opentelemetry.context.attach(context) try: # This won't be created if there was no context header self._completed_span( @@ -592,12 +585,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: ) return await super().handle_query(input) finally: - # In some exceptional cases this finally is executed with a - # different contextvars.Context than the one the token was created - # on. As such we do a best effort detach to avoid using a mismatched - # token. - if context is opentelemetry.context.get_current(): - _safe_detach(token) + _restore_context(previous, context) def handle_update_validator( self, input: temporalio.worker.HandleUpdateInput @@ -668,7 +656,8 @@ def _top_level_workflow_context( success = False exception: Exception | None = None # Run under this context - token = opentelemetry.context.attach(context) + previous = opentelemetry.context.get_current() + opentelemetry.context.attach(context) try: yield None @@ -679,7 +668,7 @@ def _top_level_workflow_context( exception = err raise finally: - # Create a completed span before detaching context + # Create a completed span before restoring context if exception or (success and success_is_complete): self._completed_span( f"CompleteWorkflow:{temporalio.workflow.info().workflow_type}", @@ -687,12 +676,7 @@ def _top_level_workflow_context( kind=opentelemetry.trace.SpanKind.INTERNAL, ) - # In some exceptional cases this finally is executed with a - # different contextvars.Context than the one the token was created - # on. As such we do a best effort detach to avoid using a mismatched - # token. - if context is opentelemetry.context.get_current(): - _safe_detach(token) + _restore_context(previous, context) def _context_to_headers( self, headers: Mapping[str, temporalio.api.common.v1.Payload] diff --git a/tests/contrib/opentelemetry/test_opentelemetry.py b/tests/contrib/opentelemetry/test_opentelemetry.py index 71e2fa41d..93af64521 100644 --- a/tests/contrib/opentelemetry/test_opentelemetry.py +++ b/tests/contrib/opentelemetry/test_opentelemetry.py @@ -1,10 +1,7 @@ from __future__ import annotations import asyncio -import gc import logging -import queue -import threading import uuid from collections.abc import Callable, Generator, Iterable from concurrent.futures import ThreadPoolExecutor @@ -14,7 +11,6 @@ from typing import Any, cast import nexusrpc -import opentelemetry.context import pytest from opentelemetry import baggage, context from opentelemetry.sdk.trace import ReadableSpan, TracerProvider @@ -27,7 +23,6 @@ from temporalio.common import RetryPolicy, WorkflowIDConflictPolicy from temporalio.contrib.opentelemetry import ( TracingInterceptor, - TracingWorkflowInboundInterceptor, ) from temporalio.contrib.opentelemetry import workflow as otel_workflow from temporalio.exceptions import ( @@ -37,7 +32,6 @@ ) from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from tests.helpers import LogCapturer from tests.helpers.nexus import make_nexus_endpoint_name @@ -853,20 +847,31 @@ async def test_opentelemetry_context_restored_after_activity( activity: Callable[[], None], expect_failure: bool, ) -> None: - attach_count = 0 - detach_count = 0 original_attach = context.attach original_detach = context.detach + baseline = context.get_current() + + # Model OTel's single current-context ContextVar across every attach/detach + # so we can assert it returns to the baseline. The interceptor restores by + # re-attaching the previous context rather than detaching a token, so a + # token-pairing count no longer balances; the real leak invariant is that + # the current context is restored to where it started. + attach_count = 0 + modeled_current = baseline + previous_by_token: dict[int, context.Context] = {} - def tracked_attach(ctx): # type:ignore[reportMissingParameterType] - nonlocal attach_count + def tracked_attach(ctx: context.Context) -> Any: + nonlocal attach_count, modeled_current attach_count += 1 - return original_attach(ctx) + token = original_attach(ctx) + previous_by_token[id(token)] = modeled_current + modeled_current = ctx + return token - def tracked_detach(token): # type:ignore[reportMissingParameterType] - nonlocal detach_count - detach_count += 1 - return original_detach(token) + def tracked_detach(token: Any) -> None: + nonlocal modeled_current + modeled_current = previous_by_token.pop(id(token), baseline) + original_detach(token) context.attach = tracked_attach context.detach = tracked_detach @@ -892,10 +897,11 @@ def tracked_detach(token): # type:ignore[reportMissingParameterType] except Exception: assert expect_failure, "This test is not expeced to raise" - assert attach_count == detach_count, ( - f"Context leak detected: {attach_count} attaches vs {detach_count} detaches. " + assert attach_count > 0, "Expected at least one context attach" + assert modeled_current == baseline, ( + "Context leak detected: current context was not restored to baseline " + f"(modeled current={modeled_current!r}, baseline={baseline!r})" ) - assert attach_count > 0, "Expected at least one context attach/detach" finally: context.attach = original_attach @@ -986,50 +992,3 @@ async def test_opentelemetry_standalone_activity_tracing( assert start_activity_span.attributes is not None assert start_activity_span.attributes["temporalActivityID"] == activity_id assert start_activity_span.attributes["temporalActivityType"] == "tracing_activity" - - -def test_opentelemetry_safe_detach(): - class _fake_self: - def _load_workflow_context_carrier(*_args): - return None - - def _set_on_context(self, ctx: Any): - return opentelemetry.context.set_value("test-key", "test-value", ctx) - - def _completed_span(*args: Any, **_kwargs: Any): - pass - - # create a context manager and force enter to happen on this thread - context_manager = TracingWorkflowInboundInterceptor._top_level_workflow_context( - _fake_self(), # type: ignore - success_is_complete=True, - ) - context_manager.__enter__() - - # move reference to context manager into queue - q: queue.Queue = queue.Queue() - q.put(context_manager) - del context_manager - - def worker(): - # pull reference from queue and delete the last reference - context_manager = q.get() - del context_manager - # force gc - gc.collect() - - with LogCapturer().logs_captured(opentelemetry.context.logger) as capturer: - # run forced gc on other thread so exit happens there - t = threading.Thread(target=worker) - t.start() - t.join(timeout=5) - - def otel_context_error(record: logging.LogRecord) -> bool: - return ( - record.name == "opentelemetry.context" - and "Failed to detach context" in record.message - ) - - assert capturer.find(otel_context_error) is None, ( - "Detach from context message should not be logged" - ) From 9ef3841c4b33cadc25c264242a201b9c23382acb Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Tue, 16 Jun 2026 15:20:53 -0700 Subject: [PATCH 3/3] Fix mypy error in OTel leak test context.attach's parameter is named `context`; the leak test's tracked_attach wrapper named it `ctx`, which mypy rejects when assigning the wrapper to context.attach (parameter names are part of a callable's type). Match the name and annotate with the imported Context type. --- tests/contrib/opentelemetry/test_opentelemetry.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/contrib/opentelemetry/test_opentelemetry.py b/tests/contrib/opentelemetry/test_opentelemetry.py index 93af64521..a39071c4e 100644 --- a/tests/contrib/opentelemetry/test_opentelemetry.py +++ b/tests/contrib/opentelemetry/test_opentelemetry.py @@ -13,6 +13,7 @@ import nexusrpc import pytest from opentelemetry import baggage, context +from opentelemetry.context import Context from opentelemetry.sdk.trace import ReadableSpan, TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter @@ -858,14 +859,14 @@ async def test_opentelemetry_context_restored_after_activity( # the current context is restored to where it started. attach_count = 0 modeled_current = baseline - previous_by_token: dict[int, context.Context] = {} + previous_by_token: dict[int, Context] = {} - def tracked_attach(ctx: context.Context) -> Any: + def tracked_attach(context: Context) -> Any: nonlocal attach_count, modeled_current attach_count += 1 - token = original_attach(ctx) + token = original_attach(context) previous_by_token[id(token)] = modeled_current - modeled_current = ctx + modeled_current = context return token def tracked_detach(token: Any) -> None: