a_logging_context

Mar 5, 2025ยทan implementation for logging variables within a given context.

a_logging_context.md

logging

Tiny, robust logging context helpers built on contextvars, plus a Scenario context manager (and decorator) that adds structured timing marks to your logs.
Works seamlessly across async and sync Python code.

# app.py
import logging
from logging_context import setup_logging_context
from my_json_formatter import JsonFormatter  # see below

setup_logging_context()

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)

example json formatter

# my_json_formatter.py
import json, logging, time

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        base = {
            "ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(record.created)),
            "lvl": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        ctx = {
            "scenario_id": getattr(record, "scenario_id", None),
            "scenario_short_id": getattr(record, "scenario_short_id", None),
            "scenario_name": getattr(record, "scenario_name", None),
            "scenario_last_mark": getattr(record, "scenario_last_mark", None),
            "scenario_last_mark_ms": getattr(record, "scenario_last_mark_ms", None),
            "scenario_total_ms": getattr(record, "scenario_total_ms", None),
            "scenario_marks": getattr(record, "scenario_marks", None),
        }
        base.update({k: v for k, v in ctx.items() if v is not None})
        return json.dumps(base, ensure_ascii=False)

using logging contexts

Attach ad-hoc fields that automatically propagate into log records.

from logging_context import logging_context_sync, logging_context

# sync
with logging_context_sync(request_id="abc123", component="ingestor"):
    logger.info("downloading")

# async
async with logging_context(user_id="u_42", tenant="acme"):
    logger.info("processing")

All context vars are injected automatically by the setup_logging_context() filter.

๐ŸŽฌ scenarios

Scenario enriches logs with metadata & timing marks.

import logging
from scenario import Scenario

logger = logging.getLogger(__name__)

with Scenario("ingest", extra={"component": "leecher"}) as s:
    s.mark("start")
    # work
    s.mark("downloaded")
    logger.info("batch done")
# logs โ†’ ingest::end total=XYZms

Async variant:

from scenario import Scenario

async def run():
    async with Scenario("assembly", extra={"component": "ChunkAssembler"}) as s:
        s.mark("start")
        await process()
        s.mark("chunked")

decorator sugar

Wrap a function in a scenario automatically.

from scenario import scenario

@scenario("resize", extra={"component": "imgproc"})
def resize_image(path: str):
    ...

@scenario("handle_request")
async def handle_request(req):
    ...

json_formatter.py

# my_json_formatter.py
import json, logging, time

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        base = {
            "ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(record.created)),
            "lvl": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        # Pull commonly useful contextual attributes if present
        ctx = {
            "scenario_id": getattr(record, "scenario_id", None),
            "scenario_short_id": getattr(record, "scenario_short_id", None),
            "scenario_name": getattr(record, "scenario_name", None),
            "scenario_last_mark": getattr(record, "scenario_last_mark", None),
            "scenario_last_mark_ms": getattr(record, "scenario_last_mark_ms", None),
            "scenario_total_ms": getattr(record, "scenario_total_ms", None),
            "scenario_marks": getattr(record, "scenario_marks", None),
        }
        # Merge record.__dict__ custom fields if you like, but avoid huge dumps
        base.update({k: v for k, v in ctx.items() if v is not None})
        return json.dumps(base, ensure_ascii=False)

logging_context.py

import contextlib
import contextvars
import logging
from typing import Any, Dict, Iterator

# Global ContextVar storing a dict of logging context fields.
LOGGING_CONTEXT: contextvars.ContextVar[Dict[str, Any]] = contextvars.ContextVar("LOGGING_CONTEXT", default={})

logger = logging.getLogger(__name__)

# Guard to avoid adding duplicate filters on repeated init calls
_added_ctx_filter = False


def setup_logging_context() -> None:
    """
    Install a logging filter that injects LOGGING_CONTEXT fields into each record.
    Safe to call multiple times; the filter is only added once.
    """
    global _added_ctx_filter
    if _added_ctx_filter:
        return
    logging.getLogger().addFilter(_ContextFilter())
    _added_ctx_filter = True


def get_logging_context() -> Dict[str, Any]:
    """Return the current task/thread logging context (a dict)."""
    return LOGGING_CONTEXT.get({})


@contextlib.contextmanager
def _enter_logging_context(**kwargs: Any) -> Iterator[None]:
    """
    Enter a logging context, merging provided fields with any existing context.
    Restores the previous context on exit.
    """
    merged = {**LOGGING_CONTEXT.get({}), **kwargs}
    token = LOGGING_CONTEXT.set(merged)
    try:
        yield
    finally:
        LOGGING_CONTEXT.reset(token)


@contextlib.contextmanager
def logging_context_sync(**kwargs: Any) -> Iterator[None]:
    """
    A context manager to set logging context variables for the current context.
    These variables are automatically removed when the context is exited.
    """
    with _enter_logging_context(**kwargs):
        yield


@contextlib.asynccontextmanager
async def logging_context_async(**kwargs: Any):
    """
    Async context manager to set logging context variables for the current task.
    These variables are automatically removed when the context is exited.
    """
    with _enter_logging_context(**kwargs):
        yield


# Optional alias to keep the shorter name in older call sites
logging_context = logging_context_async


class _ContextFilter(logging.Filter):
    """
    A logging filter to inject our context variables into each log record,
    while providing safe defaults for common fields to avoid KeyError in formatters.
    """

    _defaults = {
        "scenario_id": None,
        "scenario_short_id": None,
        "scenario_name": None,
        "scenario_marks": None,
        "scenario_last_mark": None,
        "scenario_last_mark_ms": None,
        "scenario_total_ms": None,
    }

    def filter(self, record: logging.LogRecord) -> bool:
        context = LOGGING_CONTEXT.get({})
        # Provide known defaults first (prevents KeyError in format strings).
        for k, v in self._defaults.items():
            if not hasattr(record, k):
                setattr(record, k, context.get(k, v))
        # Then inject any additional fields not already on the record.
        for key, value in context.items():
            if not hasattr(record, key):
                setattr(record, key, value)
        return True

scenario.py

import contextlib
import logging
import time
import types
import functools
import inspect
import uuid
from typing import Any, Dict, Callable, Optional, TypeVar, Awaitable

from logging_context import LOGGING_CONTEXT, _enter_logging_context

logger = logging.getLogger(__name__)

F = TypeVar("F", bound=Callable[..., Any])


class Scenario:
    """
    Scenario context manager that enriches log records with scenario metadata and timing marks.

    Usage:
        # Async
        async with Scenario(name="assembly", extra={"component": "ChunkAssembler"}) as scenario:
            scenario.mark("start")
            ...
            logger.info("processing")  # carries scenario fields
            scenario.mark("after_processing")

        # Sync
        with Scenario(name="ingest") as scenario:
            scenario.mark("downloaded")
            ...

    Fields injected into the logging context for all records inside the scenario:
      - scenario_id:          unique id for this scenario instance (uuid4 hex)
      - scenario_short_id:    8-char prefix of scenario_id
      - scenario_name:        user-specified logical name
      - scenario_marks:       cumulative map of mark durations in milliseconds (copy-on-update)
      - scenario_last_mark:   last mark label
      - scenario_last_mark_ms:millis since previous mark
      - scenario_total_ms:    total millis from enter() to exit()
      - any extra fields provided via "extra" on construction
    """

    def __init__(self, name: str, *, extra: Optional[Dict[str, Any]] = None):
        self.name = name
        self.id = uuid.uuid4().hex
        self.short_id = self.id[:8]
        self._extra: Dict[str, Any] = dict(extra or {})
        self._marks: Dict[str, int] = {}
        self._last_mark_ns: Optional[int] = None
        self._start_ns: Optional[int] = None
        self._exit_stack: Optional[contextlib.ExitStack] = None

    # ---- internal helpers ----

    def _current_context(self) -> Dict[str, Any]:
        base = LOGGING_CONTEXT.get({})
        return {
            **self._extra,
            "scenario_id": self.id,
            "scenario_short_id": self.short_id,
            "scenario_name": self.name,
            "scenario_marks": dict(self._marks),
            "scenario_last_mark": base.get("scenario_last_mark") if self._last_mark_ns is not None else None,
            "scenario_last_mark_ms": base.get("scenario_last_mark_ms") if self._last_mark_ns is not None else None,
            "scenario_total_ms": None,
        }

    # ---- public API ----

    def mark(self, label: str) -> None:
        now_ns = time.perf_counter_ns()
        if self._last_mark_ns is None:
            delta_ms = 0
        else:
            delta_ms = int((now_ns - self._last_mark_ns) / 1_000_000)
        self._last_mark_ns = now_ns

        # Update marks and push a copy into the ContextVar for stability
        self._marks[label] = delta_ms
        current = LOGGING_CONTEXT.get({})
        updated = {
            **current,
            "scenario_last_mark": label,
            "scenario_last_mark_ms": delta_ms,
            "scenario_marks": dict(self._marks),
        }
        LOGGING_CONTEXT.set(updated)
        logger.info(f"{self.name}::{label} after {delta_ms} milliseconds")

    # ---- context manager plumbing ----

    def __enter__(self) -> "Scenario":
        self._exit_stack = contextlib.ExitStack()
        self._start_ns = time.perf_counter_ns()
        self._last_mark_ns = self._start_ns
        self._exit_stack.enter_context(_enter_logging_context(**self._current_context()))
        logger.info(f"{self.name}::begin")
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            if self._start_ns is not None:
                total_ms = int((time.perf_counter_ns() - self._start_ns) / 1_000_000)
                # publish final total into context for any final logs
                current = LOGGING_CONTEXT.get({})
                LOGGING_CONTEXT.set({**current, "scenario_total_ms": total_ms})
                logger.info(f"{self.name}::end total={total_ms}ms")
        finally:
            if self._exit_stack is not None:
                self._exit_stack.close()
            self._exit_stack = None
        # Do not suppress exceptions
        return False

    async def __aenter__(self) -> "Scenario":
        return self.__enter__()

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return self.__exit__(exc_type, exc, tb)


# ---- decorator sugar ----

def scenario(name: str, *, extra: Optional[Dict[str, Any]] = None) -> Callable[[F], F]:
    """
    Decorator that wraps a function in a Scenario context.
    Works for both sync and async callables.
    """

    def decorator(fn: F) -> F:
        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def aw(*args, **kwargs):
                async with Scenario(name, extra=extra):
                    return await fn(*args, **kwargs)
            return types.cast(F, aw)
        else:
            @functools.wraps(fn)
            def sw(*args, **kwargs):
                with Scenario(name, extra=extra):
                    return fn(*args, **kwargs)
            return types.cast(F, sw)

    return decorator