a_logging_context
Mar 5, 2025 · an implementation for logging variables within a given context.
a_logging_context.md
logging
Tiny, robust logging context helpers built on contextvars, plus a Scenario context manager (and decorator) that adds structured timing marks to your logs.
Works seamlessly across async and sync Python code.
# app.py
import logging

from logging_context import setup_logging_context
from my_json_formatter import JsonFormatter  # see below

setup_logging_context()

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)
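With that wiring in place, every record is rendered as one JSON object per line. For illustration only (the timestamp and logger name are hypothetical), a plain log call comes out roughly like this:

# somewhere in app.py, after the setup above
logger = logging.getLogger(__name__)
logger.info("service started")
# {"ts": "2025-03-05T12:00:00", "lvl": "INFO", "logger": "__main__", "msg": "service started"}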
using logging contexts
Attach ad-hoc fields that automatically propagate into log records.
import logging

from logging_context import logging_context, logging_context_sync

logger = logging.getLogger(__name__)

# sync
with logging_context_sync(request_id="abc123", component="ingestor"):
    logger.info("downloading")

# async (inside a coroutine)
async with logging_context(user_id="u_42", tenant="acme"):
    logger.info("processing")
All context fields are injected into log records automatically once setup_logging_context() has been called.
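Contexts nest: inner fields are merged on top of outer ones, and the previous context is restored on exit. A minimal sketch (field values are just examples):

import logging

from logging_context import logging_context_sync

logger = logging.getLogger(__name__)

with logging_context_sync(request_id="abc123"):
    with logging_context_sync(component="parser"):
        logger.info("parsing")       # carries request_id and component
    logger.info("done parsing")      # carries request_id only
logger.info("outside")               # carries no extra fields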
🎬 scenarios
Scenario enriches logs with metadata & timing marks.
import logging

from scenario import Scenario

logger = logging.getLogger(__name__)

with Scenario("ingest", extra={"component": "leecher"}) as s:
    s.mark("start")
    # work
    s.mark("downloaded")
    logger.info("batch done")

# logs → ingest::end total=XYZms
Async variant:
from scenario import Scenario

async def run():
    async with Scenario("assembly", extra={"component": "ChunkAssembler"}) as s:
        s.mark("start")
        await process()
        s.mark("chunked")
decorator sugar
Wrap a function in a scenario automatically.
from scenario import scenario

@scenario("resize", extra={"component": "imgproc"})
def resize_image(path: str):
    ...

@scenario("handle_request")
async def handle_request(req):
    ...
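Each call to a decorated function opens its own Scenario, so calling one from inside another scenario simply swaps the scenario fields for the duration of the inner call. A sketch (the path is hypothetical):

from scenario import Scenario, scenario

@scenario("resize", extra={"component": "imgproc"})
def resize_image(path: str):
    ...  # logs here carry scenario_name="resize"

with Scenario("ingest"):
    resize_image("/tmp/a.png")  # inner logs say "resize"; the outer "ingest" context is restored afterwards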
my_json_formatter.py
# my_json_formatter.py
import json, logging, time

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        base = {
            "ts": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(record.created)),
            "lvl": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        # Pull commonly useful contextual attributes if present
        ctx = {
            "scenario_id": getattr(record, "scenario_id", None),
            "scenario_short_id": getattr(record, "scenario_short_id", None),
            "scenario_name": getattr(record, "scenario_name", None),
            "scenario_last_mark": getattr(record, "scenario_last_mark", None),
            "scenario_last_mark_ms": getattr(record, "scenario_last_mark_ms", None),
            "scenario_total_ms": getattr(record, "scenario_total_ms", None),
            "scenario_marks": getattr(record, "scenario_marks", None),
        }
        # Merge record.__dict__ custom fields if you like, but avoid huge dumps
        base.update({k: v for k, v in ctx.items() if v is not None})
        return json.dumps(base, ensure_ascii=False)
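A quick way to eyeball the formatter's output without wiring up handlers is to build a bare record with logging.makeLogRecord (illustrative only; the ts value will differ):

import logging

from my_json_formatter import JsonFormatter

record = logging.makeLogRecord({
    "name": "demo",
    "levelname": "INFO",
    "msg": "hello",
    "scenario_name": "ingest",
    "scenario_total_ms": 42,
})
print(JsonFormatter().format(record))
# {"ts": "...", "lvl": "INFO", "logger": "demo", "msg": "hello", "scenario_name": "ingest", "scenario_total_ms": 42}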
logging_context.py
import contextlib
import contextvars
import logging
from typing import Any, Dict, Iterator
# Global ContextVar storing a dict of logging context fields.
LOGGING_CONTEXT: contextvars.ContextVar[Dict[str, Any]] = contextvars.ContextVar("LOGGING_CONTEXT", default={})
logger = logging.getLogger(__name__)
# Guard to avoid installing the context injection more than once
_context_injection_installed = False


def setup_logging_context() -> None:
    """
    Wrap the LogRecord factory so that LOGGING_CONTEXT fields are injected into
    every record, regardless of which logger or handler it passes through.
    (A filter added only to the root logger would not see records that propagate
    up from child loggers.)
    Safe to call multiple times; the factory is only wrapped once.
    """
    global _context_injection_installed
    if _context_injection_installed:
        return
    old_factory = logging.getLogRecordFactory()
    ctx_filter = _ContextFilter()

    def _factory(*args: Any, **kwargs: Any) -> logging.LogRecord:
        record = old_factory(*args, **kwargs)
        ctx_filter.filter(record)
        return record

    logging.setLogRecordFactory(_factory)
    _context_injection_installed = True
def get_logging_context() -> Dict[str, Any]:
    """Return the current task/thread logging context (a dict)."""
    return LOGGING_CONTEXT.get({})


@contextlib.contextmanager
def _enter_logging_context(**kwargs: Any) -> Iterator[None]:
    """
    Enter a logging context, merging provided fields with any existing context.
    Restores the previous context on exit.
    """
    merged = {**LOGGING_CONTEXT.get({}), **kwargs}
    token = LOGGING_CONTEXT.set(merged)
    try:
        yield
    finally:
        LOGGING_CONTEXT.reset(token)


@contextlib.contextmanager
def logging_context_sync(**kwargs: Any) -> Iterator[None]:
    """
    A context manager to set logging context variables for the current context.
    These variables are automatically removed when the context is exited.
    """
    with _enter_logging_context(**kwargs):
        yield


@contextlib.asynccontextmanager
async def logging_context_async(**kwargs: Any):
    """
    Async context manager to set logging context variables for the current task.
    These variables are automatically removed when the context is exited.
    """
    with _enter_logging_context(**kwargs):
        yield


# Optional alias to keep the shorter name in older call sites
logging_context = logging_context_async


class _ContextFilter(logging.Filter):
    """
    A logging filter to inject our context variables into each log record,
    while providing safe defaults for common fields to avoid KeyError in formatters.
    """

    _defaults = {
        "scenario_id": None,
        "scenario_short_id": None,
        "scenario_name": None,
        "scenario_marks": None,
        "scenario_last_mark": None,
        "scenario_last_mark_ms": None,
        "scenario_total_ms": None,
    }

    def filter(self, record: logging.LogRecord) -> bool:
        context = LOGGING_CONTEXT.get({})
        # Provide known defaults first (prevents KeyError in format strings).
        for k, v in self._defaults.items():
            if not hasattr(record, k):
                setattr(record, k, context.get(k, v))
        # Then inject any additional fields not already on the record.
        for key, value in context.items():
            if not hasattr(record, key):
                setattr(record, key, value)
        return True
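Because the context lives in a contextvars.ContextVar, each asyncio task works on its own copy, so concurrent requests don't bleed fields into each other. A small illustrative check (hypothetical script; plain-text format used here for brevity):

import asyncio
import logging

from logging_context import logging_context, setup_logging_context

logging.basicConfig(level=logging.INFO, format="%(message)s [req=%(request_id)s]")
setup_logging_context()
logger = logging.getLogger(__name__)

async def handle(req_id: str) -> None:
    async with logging_context(request_id=req_id):
        await asyncio.sleep(0)   # yield to the other task
        logger.info("working")   # still sees its own request_id

async def main() -> None:
    await asyncio.gather(handle("r1"), handle("r2"))

asyncio.run(main())
# working [req=r1]
# working [req=r2]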
scenario.py
import contextlib
import functools
import inspect
import logging
import time
import uuid
from typing import Any, Callable, Dict, Optional, TypeVar, cast

from logging_context import LOGGING_CONTEXT, _enter_logging_context

logger = logging.getLogger(__name__)

F = TypeVar("F", bound=Callable[..., Any])
class Scenario:
    """
    Scenario context manager that enriches log records with scenario metadata and timing marks.

    Usage:
        # Async
        async with Scenario(name="assembly", extra={"component": "ChunkAssembler"}) as scenario:
            scenario.mark("start")
            ...
            logger.info("processing")  # carries scenario fields
            scenario.mark("after_processing")

        # Sync
        with Scenario(name="ingest") as scenario:
            scenario.mark("downloaded")
            ...

    Fields injected into the logging context for all records inside the scenario:
    - scenario_id: unique id for this scenario instance (uuid4 hex)
    - scenario_short_id: 8-char prefix of scenario_id
    - scenario_name: user-specified logical name
    - scenario_marks: cumulative map of mark durations in milliseconds (copy-on-update)
    - scenario_last_mark: last mark label
    - scenario_last_mark_ms: millis since previous mark
    - scenario_total_ms: total millis from enter() to exit()
    - any extra fields provided via "extra" on construction
    """

    def __init__(self, name: str, *, extra: Optional[Dict[str, Any]] = None):
        self.name = name
        self.id = uuid.uuid4().hex
        self.short_id = self.id[:8]
        self._extra: Dict[str, Any] = dict(extra or {})
        self._marks: Dict[str, int] = {}
        self._last_mark_ns: Optional[int] = None
        self._start_ns: Optional[int] = None
        self._exit_stack: Optional[contextlib.ExitStack] = None

    # ---- internal helpers ----
    def _current_context(self) -> Dict[str, Any]:
        base = LOGGING_CONTEXT.get({})
        return {
            **self._extra,
            "scenario_id": self.id,
            "scenario_short_id": self.short_id,
            "scenario_name": self.name,
            "scenario_marks": dict(self._marks),
            "scenario_last_mark": base.get("scenario_last_mark") if self._last_mark_ns is not None else None,
            "scenario_last_mark_ms": base.get("scenario_last_mark_ms") if self._last_mark_ns is not None else None,
            "scenario_total_ms": None,
        }
    # ---- public API ----
    def mark(self, label: str) -> None:
        now_ns = time.perf_counter_ns()
        if self._last_mark_ns is None:
            delta_ms = 0
        else:
            delta_ms = int((now_ns - self._last_mark_ns) / 1_000_000)
        self._last_mark_ns = now_ns
        # Update marks and push a copy into the ContextVar for stability
        self._marks[label] = delta_ms
        current = LOGGING_CONTEXT.get({})
        updated = {
            **current,
            "scenario_last_mark": label,
            "scenario_last_mark_ms": delta_ms,
            "scenario_marks": dict(self._marks),
        }
        LOGGING_CONTEXT.set(updated)
        logger.info(f"{self.name}::{label} after {delta_ms} milliseconds")
    # ---- context manager plumbing ----
    def __enter__(self) -> "Scenario":
        self._exit_stack = contextlib.ExitStack()
        self._start_ns = time.perf_counter_ns()
        self._last_mark_ns = self._start_ns
        self._exit_stack.enter_context(_enter_logging_context(**self._current_context()))
        logger.info(f"{self.name}::begin")
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        try:
            if self._start_ns is not None:
                total_ms = int((time.perf_counter_ns() - self._start_ns) / 1_000_000)
                # Publish the final total into the context for any final logs
                current = LOGGING_CONTEXT.get({})
                LOGGING_CONTEXT.set({**current, "scenario_total_ms": total_ms})
                logger.info(f"{self.name}::end total={total_ms}ms")
        finally:
            if self._exit_stack is not None:
                self._exit_stack.close()
                self._exit_stack = None
        # Do not suppress exceptions
        return False

    async def __aenter__(self) -> "Scenario":
        return self.__enter__()

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return self.__exit__(exc_type, exc, tb)
# ---- decorator sugar ----
def scenario(name: str, *, extra: Optional[Dict[str, Any]] = None) -> Callable[[F], F]:
    """
    Decorator that wraps a function in a Scenario context.
    Works for both sync and async callables.
    """
    def decorator(fn: F) -> F:
        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def aw(*args, **kwargs):
                async with Scenario(name, extra=extra):
                    return await fn(*args, **kwargs)
            return cast(F, aw)
        else:
            @functools.wraps(fn)
            def sw(*args, **kwargs):
                with Scenario(name, extra=extra):
                    return fn(*args, **kwargs)
            return cast(F, sw)
    return decorator
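Putting the pieces together, a minimal end-to-end run (assuming the module names used throughout this page) looks like this:

# demo.py
import logging

from logging_context import setup_logging_context
from my_json_formatter import JsonFormatter
from scenario import Scenario

setup_logging_context()
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)

logger = logging.getLogger(__name__)

with Scenario("demo", extra={"component": "example"}) as s:
    s.mark("start")
    logger.info("doing work")
    s.mark("work_done")
# Emits JSON lines for demo::begin, each mark, "doing work", and demo::end,
# all carrying scenario_id, scenario_name, component, and the timing fields where set.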