Source code for thoth.shared.utils.logger

"""Structured Logging Utilities for GCP Cloud Logging and Grafana Loki.

This module provides a structured JSON logging framework that is compatible with:
- Google Cloud Logging (Cloud Run, GKE, Cloud Functions)
- Grafana Loki
- Any JSON-aware log aggregation system

Key Features:
- Structured JSON output with consistent field schema
- GCP Cloud Logging special fields (sourceLocation, trace, labels)
- Job/request correlation via JobLoggerAdapter
- Automatic sensitive data redaction
- Verbose source location (file, line, function)
- Metrics-ready numeric fields for dashboards

Example:
    >>> from thoth.shared.utils.logger import setup_logger, get_job_logger
    >>>
    >>> # Basic usage
    >>> logger = setup_logger("myapp")
    >>> logger.info("Server started", extra={"port": 8080})
    >>>
    >>> # Job-scoped logging
    >>> job_logger = get_job_logger(logger, job_id="job_123", source="handbook")
    >>> job_logger.info("Processing file", extra={"file_path": "docs/readme.md"})
"""

from collections.abc import MutableMapping
from contextvars import ContextVar
from datetime import UTC, datetime
import logging
import os
import re
from typing import Any, ClassVar, cast

from pythonjsonlogger.json import JsonFormatter as jsonlogger_JsonFormatter

# Trace ID for the current request/task. Written by set_trace_context() and
# read by get_trace_context(); defaults to None when no trace has been set.
_trace_context: ContextVar[str | None] = ContextVar("trace_context", default=None)

# GCP project ID used to build the "projects/<id>/traces/<trace>" log field.
# Written by set_trace_context() when a project ID is supplied; defaults to None.
_gcp_project_id: ContextVar[str | None] = ContextVar("gcp_project_id", default=None)


def set_trace_context(trace_id: str | None, project_id: str | None = None) -> None:
    """Record the trace ID (and optionally the project ID) for the current context.

    Intended to be called at the start of a request handler so that log
    records emitted afterwards can be correlated with the trace.

    Args:
        trace_id: Trace ID taken from the X-Cloud-Trace-Context header.
            Passing None stores None as the current trace.
        project_id: GCP project ID used to build the full trace resource
            name. A falsy value leaves any previously stored project ID
            untouched.
    """
    _trace_context.set(trace_id)
    if not project_id:
        # Nothing to store; keep whatever project ID is already in context.
        return
    _gcp_project_id.set(project_id)
def get_trace_context() -> str | None:
    """Return the trace ID stored for the current context, or None if unset."""
    current_trace = _trace_context.get()
    return current_trace
[docs] def extract_trace_id_from_header(header_value: str | None) -> str | None: """Extract trace ID from X-Cloud-Trace-Context header. The header format is: TRACE_ID/SPAN_ID;o=TRACE_TRUE Args: header_value: The X-Cloud-Trace-Context header value Returns: The trace ID portion, or None if header is missing/invalid """ if not header_value: return None # Extract just the trace ID (before the /) return header_value.split("/")[0] if "/" in header_value else header_value
class GCPStructuredFormatter(jsonlogger_JsonFormatter):
    """JSON formatter compatible with GCP Cloud Logging and Grafana Loki.

    This formatter produces structured JSON logs with:
    - Standard fields (timestamp, severity, message, logger)
    - Verbose source location (pathname, filename, lineno, funcName)
    - GCP special fields (sourceLocation, trace, labels)
    - Custom context fields (job_id, source, operation, etc.)
    - Automatic sensitive data redaction in the message and error fields

    The output is compatible with:
    - GCP Cloud Logging (jsonPayload with special field recognition)
    - Grafana Loki (JSON parsing and label extraction)
    - Any JSON-aware log aggregation system

    Example output:
        {
            "timestamp": "2026-01-30T10:15:30.123456+00:00",
            "severity": "INFO",
            "message": "Processing file",
            "logger": "thoth.ingestion.pipeline",
            "pathname": "/app/thoth/ingestion/pipeline.py",
            "filename": "pipeline.py",
            "lineno": 456,
            "funcName": "_process_file",
            "module": "pipeline",
            "logging.googleapis.com/sourceLocation": {
                "file": "thoth/ingestion/pipeline.py",
                "line": "456",
                "function": "_process_file"
            },
            "job_id": "job_xyz789",
            "source": "handbook"
        }
    """

    # Keywords that indicate sensitive information
    SENSITIVE_KEYWORDS: ClassVar[list[str]] = [
        "password",
        "passwd",
        "pwd",
        "secret",
        "token",
        "apikey",
        "api_key",
        "auth",
        "authorization",
        "credential",
        "key",
        "private",
        "session",
        "cookie",
        "jwt",
        "bearer",
        "oauth",
    ]

    # Compiled regex patterns for sensitive data redaction. Built by the first
    # instance that is constructed and shared by every instance afterwards.
    _redaction_patterns: ClassVar[list[tuple[re.Pattern[str], str]]] = []

    # Record attributes (set via extra={} or JobLoggerAdapter) that are copied
    # into the output unchanged: job context first, then numeric metrics.
    _PASSTHROUGH_FIELDS: ClassVar[tuple[str, ...]] = (
        "job_id",
        "source",
        "collection",
        "operation",
        "batch_id",
        "request_id",
        "files_processed",
        "chunks_created",
        "duration_ms",
        "total_files",
        "successful",
        "failed",
        "documents_count",
    )

    # Record attributes copied with redaction applied to string values.
    _ERROR_FIELDS: ClassVar[tuple[str, ...]] = (
        "error_type",
        "error_message",
        "file_path",
        "stack_trace",
    )

    # Record attributes mirrored into the GCP labels map.
    _LABEL_FIELDS: ClassVar[tuple[str, ...]] = ("job_id", "source", "operation")

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the formatter and compile the shared redaction patterns.

        Args:
            *args: Positional arguments forwarded to the base JSON formatter.
            **kwargs: Keyword arguments forwarded to the base JSON formatter.
        """
        super().__init__(*args, **kwargs)

        # Compile redaction patterns once for the whole class
        if not GCPStructuredFormatter._redaction_patterns:
            keywords_pattern = "|".join(self.SENSITIVE_KEYWORDS)
            GCPStructuredFormatter._redaction_patterns = [
                (
                    re.compile(rf"\b({keywords_pattern})(\s+is\s+)(\S+)", re.IGNORECASE),
                    r"\1\2[REDACTED]",
                ),
                (
                    re.compile(rf"\b({keywords_pattern})(\s*:\s+)(\S+)", re.IGNORECASE),
                    r"\1\2[REDACTED]",
                ),
                (
                    re.compile(rf"\b({keywords_pattern})(\s*=\s*)(\S+)", re.IGNORECASE),
                    r"\1\2[REDACTED]",
                ),
            ]

    def _redact_sensitive_data(self, message: str) -> str:
        """Replace values following a sensitive keyword with ``[REDACTED]``.

        Matches ``<keyword> is <value>``, ``<keyword>: <value>`` and
        ``<keyword>=<value>`` (case-insensitive).
        """
        for pattern, replacement in self._redaction_patterns:
            message = pattern.sub(replacement, message)
        return message

    def _copy_record_fields(
        self,
        log_record: dict[str, Any],
        record: logging.LogRecord,
        fields: tuple[str, ...],
        redact: bool = False,
    ) -> None:
        """Copy the named attributes that are set and not None from record to log_record.

        When ``redact`` is true, string values are passed through
        ``_redact_sensitive_data``; non-string values are copied unchanged.
        """
        for field in fields:
            value = getattr(record, field, None)
            if value is None:
                continue
            if redact and isinstance(value, str):
                value = self._redact_sensitive_data(value)
            log_record[field] = value

    def add_fields(
        self,
        log_record: dict[str, Any],
        record: logging.LogRecord,
        message_dict: dict[str, Any],
    ) -> None:
        """Add custom fields to the JSON log record.

        This method is called by python-json-logger to populate the log
        record. All custom fields are added here, after the base class has
        added its own.

        Args:
            log_record: The output dictionary, modified in place.
            record: The LogRecord being formatted.
            message_dict: Passed through to the base implementation.
        """
        super().add_fields(log_record, record, message_dict)

        # === Standard Fields ===
        # Use the time the record was created, not the time it is formatted,
        # so queued/buffered handlers still report when the event happened.
        log_record["timestamp"] = datetime.fromtimestamp(record.created, tz=UTC).isoformat()
        log_record["severity"] = record.levelname
        log_record["logger"] = record.name

        # Redact sensitive data from message
        if "message" in log_record:
            log_record["message"] = self._redact_sensitive_data(str(log_record["message"]))

        # === Verbose Source Location ===
        log_record["pathname"] = record.pathname
        log_record["filename"] = record.filename
        log_record["lineno"] = record.lineno
        log_record["funcName"] = record.funcName
        log_record["module"] = record.module

        # === GCP Special Fields ===
        # sourceLocation: shorten the path to start at "thoth/" when possible
        relative_path = record.pathname
        if "/thoth/" in relative_path:
            relative_path = "thoth/" + relative_path.split("/thoth/", 1)[1]

        log_record["logging.googleapis.com/sourceLocation"] = {
            "file": relative_path,
            "line": str(record.lineno),
            "function": record.funcName,
        }

        # Trace correlation: needs both a trace ID and a project ID; the
        # project ID falls back to the GCP_PROJECT_ID environment variable.
        trace_id = get_trace_context()
        project_id = _gcp_project_id.get() or os.getenv("GCP_PROJECT_ID")
        if trace_id and project_id:
            log_record["logging.googleapis.com/trace"] = f"projects/{project_id}/traces/{trace_id}"

        # === Job Context + Metrics Fields (from extra) ===
        self._copy_record_fields(log_record, record, self._PASSTHROUGH_FIELDS)

        # === Error Context (from extra), with redaction of string values ===
        self._copy_record_fields(log_record, record, self._ERROR_FIELDS, redact=True)

        # === GCP Labels (for filtering) ===
        # Only truthy values become labels; label values are always strings.
        labels: dict[str, str] = {}
        for field in self._LABEL_FIELDS:
            value = getattr(record, field, None)
            if value:
                labels[field] = str(value)
        if labels:
            log_record["logging.googleapis.com/labels"] = labels

        # === Process/Thread Info ===
        log_record["process"] = record.process
        log_record["processName"] = record.processName
        log_record["thread"] = record.thread
        log_record["threadName"] = record.threadName

        # Remove None values to keep logs clean
        keys_to_remove = [k for k, v in log_record.items() if v is None]
        for key in keys_to_remove:
            del log_record[key]
class SimpleFormatter(logging.Formatter):
    """Plain-text formatter for local development and debugging.

    Produces a human-readable line instead of JSON, and applies the same
    keyword-based redaction to the fully formatted output.
    """

    # Shares the keyword list defined on GCPStructuredFormatter
    SENSITIVE_KEYWORDS: ClassVar[list[str]] = GCPStructuredFormatter.SENSITIVE_KEYWORDS

    def __init__(self, fmt: str | None = None, **kwargs: Any) -> None:
        """Set up the format string and compile the redaction patterns.

        Args:
            fmt: Format string; a default including time, logger name,
                level, function and line number is used when None.
            **kwargs: Forwarded to ``logging.Formatter``.
        """
        default_fmt = "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
        super().__init__(default_fmt if fmt is None else fmt, **kwargs)

        # One pattern per separator style: "<kw> is <v>", "<kw>: <v>", "<kw>=<v>"
        alternation = "|".join(self.SENSITIVE_KEYWORDS)
        separators = (r"(\s+is\s+)", r"(\s*:\s+)", r"(\s*=\s*)")
        self._patterns: list[tuple[re.Pattern[str], str]] = [
            (
                re.compile(rf"\b({alternation}){separator}(\S+)", re.IGNORECASE),
                r"\1\2[REDACTED]",
            )
            for separator in separators
        ]

    def format(self, record: logging.LogRecord) -> str:
        """Format the record, then redact sensitive values in the resulting text."""
        text = super().format(record)
        for regex, substitute in self._patterns:
            text = regex.sub(substitute, text)
        return text
[docs] class JobLoggerAdapter(logging.LoggerAdapter): """Logger adapter that automatically includes job context in all log messages. This adapter enriches log messages with job-specific context like job_id, source, and collection. Use this when processing a specific job to ensure all logs can be correlated. Example: >>> base_logger = setup_logger("thoth.worker") >>> job_logger = JobLoggerAdapter(base_logger, job_id="job_123", source="handbook") >>> job_logger.info("Processing started") >>> job_logger.info("File processed", extra={"file_path": "readme.md"}) """
[docs] def __init__( self, logger: logging.Logger, job_id: str, source: str | None = None, collection: str | None = None, **extra_context: Any, ) -> None: """Initialize the job logger adapter. Args: logger: The base logger to wrap job_id: Unique identifier for the job/run source: Source being processed (e.g., "handbook", "dnd") collection: Collection name being used **extra_context: Additional context to include in all logs """ context = { "job_id": job_id, "source": source, "collection": collection, **extra_context, } # Remove None values context = {k: v for k, v in context.items() if v is not None} super().__init__(logger, context)
[docs] def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]: """Process the log message to include job context. Args: msg: The log message kwargs: Keyword arguments for the log call Returns: Tuple of (message, kwargs) with context added to extra """ # Merge our context into extra extra = kwargs.get("extra", {}) if self.extra: extra.update(self.extra) kwargs["extra"] = extra return msg, kwargs
[docs] def with_operation(self, operation: str) -> "JobLoggerAdapter": """Create a child logger for a specific operation. Args: operation: The operation name (e.g., "chunking", "embedding", "storing") Returns: A new JobLoggerAdapter with the operation context added """ current_extra = self.extra or {} return JobLoggerAdapter( self.logger, job_id=cast("str", current_extra.get("job_id", "unknown")), source=cast("str | None", current_extra.get("source")), collection=cast("str | None", current_extra.get("collection")), operation=operation, )
# Legacy alias for backward compatibility
class SecureLogger(logging.Logger):
    """Legacy SecureLogger class for backward compatibility.

    New code should use setup_logger() which returns a standard Logger
    with GCPStructuredFormatter attached.

    This class is maintained for backward compatibility with existing code
    that checks isinstance(logger, SecureLogger).

    The level methods pre-format the message with its arguments and fall
    back to the unformatted message when formatting fails.
    """

    SENSITIVE_KEYWORDS: ClassVar[list[str]] = GCPStructuredFormatter.SENSITIVE_KEYWORDS

    def __init__(self, name: str, level: int = logging.NOTSET) -> None:
        """Initialize the SecureLogger.

        Args:
            name: Logger name.
            level: Initial logging level.
        """
        super().__init__(name, level)

    def _safe_format(self, msg: Any, args: tuple[Any, ...]) -> str:
        """Safely format a message with %-style arguments.

        Args:
            msg: The message or format string; non-strings are converted
                with ``str()`` and never formatted.
            args: Positional arguments for the format string. A single
                non-empty dict is used as a mapping for ``%(name)s``
                placeholders, as the standard library does.

        Returns:
            The formatted message, or ``str(msg)`` if formatting fails.
        """
        if not args or not isinstance(msg, str):
            return str(msg)

        # logger.info("%(a)s", {"a": 1}) arrives as args == ({"a": 1},)
        fmt_args: Any = args
        if len(args) == 1 and isinstance(args[0], dict) and args[0]:
            fmt_args = args[0]

        try:
            return msg % fmt_args
        except (TypeError, ValueError, KeyError):
            return str(msg)

    @staticmethod
    def _skip_wrapper_frame(kwargs: dict[str, Any]) -> dict[str, Any]:
        """Raise ``stacklevel`` by one so the record points at the real caller.

        Without this, the wrapper methods below are the nearest non-logging
        frame, and every record would report this class as its source
        location (pathname, lineno, funcName).
        """
        kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
        return kwargs

    def debug(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """Log a debug message with safe formatting."""
        super().debug(self._safe_format(msg, args), **self._skip_wrapper_frame(kwargs))

    def info(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """Log an info message with safe formatting."""
        super().info(self._safe_format(msg, args), **self._skip_wrapper_frame(kwargs))

    def warning(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """Log a warning message with safe formatting."""
        super().warning(self._safe_format(msg, args), **self._skip_wrapper_frame(kwargs))

    def error(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """Log an error message with safe formatting."""
        super().error(self._safe_format(msg, args), **self._skip_wrapper_frame(kwargs))

    def critical(self, msg: Any, *args: Any, **kwargs: Any) -> None:
        """Log a critical message with safe formatting."""
        super().critical(self._safe_format(msg, args), **self._skip_wrapper_frame(kwargs))
# Legacy alias SensitiveDataFormatter = SimpleFormatter
def setup_logger(
    name: str,
    level: int = logging.INFO,
    simple: bool = False,
    json_output: bool | None = None,
) -> logging.Logger:
    """Return a configured logger, creating it on first use.

    If a logger with this name already has handlers it is returned as-is
    (only its level is updated), and ``simple`` / ``json_output`` are
    ignored. Otherwise a new SecureLogger with a single stream handler is
    created and registered under ``name``.

    Args:
        name: Name of the logger (typically __name__)
        level: Logging level (default: INFO)
        simple: If True, force the plain-text format regardless of
            ``json_output`` and the environment
        json_output: Explicit control over JSON output. If None, JSON is
            used when either:
            - both GCS_BUCKET_NAME and GCP_PROJECT_ID are set, or
            - LOG_FORMAT equals "json" (case-insensitive)
            and plain text is used otherwise.

    Returns:
        Configured logger instance

    Example:
        >>> logger = setup_logger(__name__)
        >>> logger.info("Server started", extra={"port": 8080})
        >>> # With job context
        >>> logger.info("Processing", extra={"job_id": "abc123", "source": "handbook"})
    """
    # Reuse a logger that has already been configured
    configured = logging.getLogger(name)
    if configured.handlers:
        if configured.level != level:
            configured.setLevel(level)
        return configured

    # Decide on the output format: simple=True wins, then the explicit
    # json_output flag, then environment auto-detection.
    if simple:
        use_json = False
    elif json_output is None:
        cloud_env = bool(os.getenv("GCS_BUCKET_NAME") and os.getenv("GCP_PROJECT_ID"))
        requested_json = os.getenv("LOG_FORMAT", "").lower() == "json"
        use_json = cloud_env or requested_json
    else:
        use_json = json_output

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(GCPStructuredFormatter() if use_json else SimpleFormatter())

    # SecureLogger is used so legacy isinstance() checks keep working
    new_logger = SecureLogger(name, level)
    new_logger.addHandler(stream_handler)

    # Register the instance so later logging.getLogger(name) calls return it
    logging.Logger.manager.loggerDict[name] = new_logger
    return new_logger
def get_job_logger(
    base_logger: logging.Logger,
    job_id: str,
    source: str | None = None,
    collection: str | None = None,
    **extra_context: Any,
) -> JobLoggerAdapter:
    """Wrap a logger in a JobLoggerAdapter carrying the given job context.

    Every message logged through the returned adapter includes the job
    context fields.

    Args:
        base_logger: The base logger (from setup_logger)
        job_id: Unique identifier for the job
        source: Source being processed (e.g., "handbook")
        collection: Collection name
        **extra_context: Additional context fields

    Returns:
        JobLoggerAdapter with job context

    Example:
        >>> logger = setup_logger("thoth.worker")
        >>> job_logger = get_job_logger(logger, job_id="job_123", source="handbook")
        >>> job_logger.info("Starting ingestion")
        >>> job_logger.info("Processed file", extra={"file_path": "readme.md", "chunks_created": 15})
    """
    adapter_kwargs: dict[str, Any] = {
        "job_id": job_id,
        "source": source,
        "collection": collection,
        **extra_context,
    }
    return JobLoggerAdapter(base_logger, **adapter_kwargs)
def configure_root_logger(level: int = logging.INFO, json_output: bool | None = None) -> None:
    """Configure the root logger for the application.

    Call this once at application startup to configure global logging
    behavior. Any handlers already attached to the root logger are removed
    and closed, then a single stream handler is installed.

    Args:
        level: Root logging level
        json_output: Whether to use JSON output. If None, JSON is used when
            both GCS_BUCKET_NAME and GCP_PROJECT_ID are set or when
            LOG_FORMAT equals "json" (case-insensitive).
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(level)

    # Remove existing handlers. Iterate over a copy because removeHandler
    # mutates the list, and close each handler so its resources (e.g. open
    # log files) are released, as logging.basicConfig(force=True) does.
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        handler.close()

    # Auto-detect JSON output mode
    if json_output is None:
        in_cloud_run = bool(os.getenv("GCS_BUCKET_NAME") and os.getenv("GCP_PROJECT_ID"))
        explicit_json = os.getenv("LOG_FORMAT", "").lower() == "json"
        json_output = in_cloud_run or explicit_json

    # Add new handler
    new_handler = logging.StreamHandler()
    new_handler.setFormatter(GCPStructuredFormatter() if json_output else SimpleFormatter())
    root_logger.addHandler(new_handler)