Logger

thoth.shared.utils.logger

Structured Logging Utilities for GCP Cloud Logging and Grafana Loki.

This module provides a structured JSON logging framework that is compatible with:

- Google Cloud Logging (Cloud Run, GKE, Cloud Functions)
- Grafana Loki
- Any JSON-aware log aggregation system

Key Features:

- Structured JSON output with consistent field schema
- GCP Cloud Logging special fields (sourceLocation, trace, labels)
- Job/request correlation via JobLoggerAdapter
- Automatic sensitive data redaction
- Verbose source location (file, line, function)
- Metrics-ready numeric fields for dashboards

Example

```python
from thoth.shared.utils.logger import setup_logger, get_job_logger

# Basic usage
logger = setup_logger("myapp")
logger.info("Server started", extra={"port": 8080})

# Job-scoped logging
job_logger = get_job_logger(logger, job_id="job_123", source="handbook")
job_logger.info("Processing file", extra={"file_path": "docs/readme.md"})
```

SensitiveDataFormatter = SimpleFormatter module-attribute

GCPStructuredFormatter

JSON formatter compatible with GCP Cloud Logging and Grafana Loki.

This formatter produces structured JSON logs with:

- Standard fields (timestamp, severity, message, logger)
- Verbose source location (pathname, filename, lineno, funcName)
- GCP special fields (sourceLocation, trace, labels)
- Custom context fields (job_id, source, operation, etc.)
- Automatic sensitive data redaction

The output is compatible with:

- GCP Cloud Logging (jsonPayload with special field recognition)
- Grafana Loki (JSON parsing and label extraction)
- Any JSON-aware log aggregation system

Example output

{ "timestamp": "2026-01-30T10:15:30.123456Z", "severity": "INFO", "message": "Processing file", "logger": "thoth.ingestion.pipeline", "pathname": "/app/thoth/ingestion/pipeline.py", "filename": "pipeline.py", "lineno": 456, "funcName": "_process_file", "module": "pipeline", "logging.googleapis.com/sourceLocation": { "file": "thoth/ingestion/pipeline.py", "line": "456", "function": "_process_file" }, "job_id": "job_xyz789", "source": "handbook" }

SENSITIVE_KEYWORDS: list[str] = ['password', 'passwd', 'pwd', 'secret', 'token', 'apikey', 'api_key', 'auth', 'authorization', 'credential', 'key', 'private', 'session', 'cookie', 'jwt', 'bearer', 'oauth'] class-attribute

__init__(*args: Any, **kwargs: Any) -> None

Initialize the formatter with GCP-compatible settings.

add_fields(log_record: dict[str, Any], record: logging.LogRecord, message_dict: dict[str, Any]) -> None

Add custom fields to the JSON log record.

This method is called by python-json-logger to populate the log record. We add all our custom fields here.
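This is not the module's actual implementation, but a minimal sketch of the python-json-logger override pattern that add_fields follows, producing the fields shown in the example output above:

```python
import logging
from typing import Any

from pythonjsonlogger import jsonlogger

class SketchFormatter(jsonlogger.JsonFormatter):
    def add_fields(
        self,
        log_record: dict[str, Any],
        record: logging.LogRecord,
        message_dict: dict[str, Any],
    ) -> None:
        # Let python-json-logger populate its default fields first.
        super().add_fields(log_record, record, message_dict)
        # GCP Cloud Logging reads "severity" rather than "levelname".
        log_record["severity"] = record.levelname
        # GCP special field for clickable source locations in the console.
        log_record["logging.googleapis.com/sourceLocation"] = {
            "file": record.pathname,
            "line": str(record.lineno),
            "function": record.funcName,
        }
```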

SimpleFormatter

Simple text formatter for local development/debugging.

Uses a human-readable format without JSON structure. Still includes sensitive data redaction.

SENSITIVE_KEYWORDS: list[str] = GCPStructuredFormatter.SENSITIVE_KEYWORDS class-attribute

__init__(fmt: str | None = None, **kwargs: Any) -> None

Initialize with default format if not provided.

format(record: logging.LogRecord) -> str

Format the record with sensitive data redaction.
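A quick way to get this formatter during local development is the simple flag on setup_logger() (documented below); the exact redaction placeholder is implementation-defined:

```python
# Human-readable text logs for local debugging; redaction still applies,
# since "api_key" is in SENSITIVE_KEYWORDS.
logger = setup_logger("myapp.dev", simple=True)
logger.info("Connecting to API", extra={"api_key": "sk-12345"})  # value is redacted
```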

JobLoggerAdapter

Logger adapter that automatically includes job context in all log messages.

This adapter enriches log messages with job-specific context like job_id, source, and collection. Use this when processing a specific job to ensure all logs can be correlated.

Example

```python
base_logger = setup_logger("thoth.worker")
job_logger = JobLoggerAdapter(base_logger, job_id="job_123", source="handbook")
job_logger.info("Processing started")
job_logger.info("File processed", extra={"file_path": "readme.md"})
```

__init__(logger: logging.Logger, job_id: str, source: str | None = None, collection: str | None = None, **extra_context: Any) -> None

Initialize the job logger adapter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `logger` | `Logger` | The base logger to wrap | *required* |
| `job_id` | `str` | Unique identifier for the job/run | *required* |
| `source` | `str \| None` | Source being processed (e.g., "handbook", "dnd") | `None` |
| `collection` | `str \| None` | Collection name being used | `None` |
| `**extra_context` | `Any` | Additional context to include in all logs | `{}` |

process(msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]

Process the log message to include job context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `msg` | `str` | The log message | *required* |
| `kwargs` | `MutableMapping[str, Any]` | Keyword arguments for the log call | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[str, MutableMapping[str, Any]]` | Tuple of `(message, kwargs)` with context added to `extra` |
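Not the module's literal code, but a sketch of the standard logging.LoggerAdapter.process pattern this method follows; which side wins on key conflicts between per-call extra and the adapter's context is an assumption here:

```python
import logging
from collections.abc import MutableMapping
from typing import Any

class ContextAdapter(logging.LoggerAdapter):
    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> tuple[str, MutableMapping[str, Any]]:
        # Merge the adapter's stored context into the per-call extra dict.
        extra = dict(kwargs.get("extra") or {})
        kwargs["extra"] = {**self.extra, **extra}  # per-call values win (assumed)
        return msg, kwargs
```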

with_operation(operation: str) -> JobLoggerAdapter

Create a child logger for a specific operation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation` | `str` | The operation name (e.g., "chunking", "embedding", "storing") | *required* |

Returns:

| Type | Description |
| --- | --- |
| `JobLoggerAdapter` | A new JobLoggerAdapter with the operation context added |
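For example, to tag every log line in a pipeline stage:

```python
logger = setup_logger("thoth.worker")
job_logger = get_job_logger(logger, job_id="job_123", source="handbook")

chunk_logger = job_logger.with_operation("chunking")
chunk_logger.info("Chunked document", extra={"chunks_created": 15})

embed_logger = job_logger.with_operation("embedding")
embed_logger.info("Embedded chunks")
```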

SecureLogger

Legacy SecureLogger class for backward compatibility.

New code should use setup_logger() which returns a standard Logger with GCPStructuredFormatter attached.

This class is maintained for backward compatibility with existing code that checks isinstance(logger, SecureLogger).

SENSITIVE_KEYWORDS: list[str] = GCPStructuredFormatter.SENSITIVE_KEYWORDS class-attribute

__init__(name: str, level: int = logging.NOTSET) -> None

Initialize the SecureLogger.

debug(msg: Any, *args: Any, **kwargs: Any) -> None

Log a debug message with safe formatting.

info(msg: Any, *args: Any, **kwargs: Any) -> None

Log an info message with safe formatting.

warning(msg: Any, *args: Any, **kwargs: Any) -> None

Log a warning message with safe formatting.

error(msg: Any, *args: Any, **kwargs: Any) -> None

Log an error message with safe formatting.

critical(msg: Any, *args: Any, **kwargs: Any) -> None

Log a critical message with safe formatting.
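A minimal example of the legacy path this class keeps working; direct construction like this is illustrative of old call sites:

```python
from thoth.shared.utils.logger import SecureLogger

logger = SecureLogger("thoth.legacy")
if isinstance(logger, SecureLogger):  # the check this class preserves
    logger.info("Still supported")
```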

set_trace_context(trace_id: str | None, project_id: str | None = None) -> None

Set the trace context for the current request/task.

Call this at the start of each request handler to enable log correlation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `trace_id` | `str \| None` | The trace ID from the X-Cloud-Trace-Context header | *required* |
| `project_id` | `str \| None` | GCP project ID for constructing the full trace URL | `None` |

get_trace_context() -> str | None

Get the current trace context.

extract_trace_id_from_header(header_value: str | None) -> str | None

Extract trace ID from X-Cloud-Trace-Context header.

The header format is: TRACE_ID/SPAN_ID;o=TRACE_TRUE

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `header_value` | `str \| None` | The X-Cloud-Trace-Context header value | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str \| None` | The trace ID portion, or None if the header is missing or invalid |
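Putting the two trace helpers together at the start of a request handler; the handler shape and the project ID below are illustrative, not part of the module:

```python
from thoth.shared.utils.logger import (
    extract_trace_id_from_header,
    set_trace_context,
)

def on_request(headers: dict[str, str]) -> None:
    # Header looks like TRACE_ID/SPAN_ID;o=TRACE_TRUE.
    raw = headers.get("X-Cloud-Trace-Context")
    trace_id = extract_trace_id_from_header(raw)
    # "my-gcp-project" is a placeholder for your GCP project ID.
    set_trace_context(trace_id, project_id="my-gcp-project")
```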

setup_logger(name: str, level: int = logging.INFO, simple: bool = False, json_output: bool | None = None) -> logging.Logger

Create and configure a logger with structured JSON output.

This function creates a logger that outputs structured JSON logs compatible with GCP Cloud Logging and Grafana Loki. By default, it auto-detects whether to use JSON output based on the environment.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the logger (typically `__name__`) | *required* |
| `level` | `int` | Logging level (default: INFO) | `INFO` |
| `simple` | `bool` | If True, use simple text format instead of JSON (for local dev) | `False` |
| `json_output` | `bool \| None` | Explicit control over JSON output. If `None`, auto-detects: `True` in Cloud Run (`GCS_BUCKET_NAME` set); `True` if `LOG_FORMAT=json`; `False` otherwise (local development) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Logger` | Configured logger instance |

Example

```python
logger = setup_logger(__name__)
logger.info("Server started", extra={"port": 8080})

# With job context
logger.info("Processing", extra={"job_id": "abc123", "source": "handbook"})
```

get_job_logger(base_logger: logging.Logger, job_id: str, source: str | None = None, collection: str | None = None, **extra_context: Any) -> JobLoggerAdapter

Create a job-scoped logger adapter.

This is the recommended way to create loggers for job processing. All log messages will automatically include the job context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `base_logger` | `Logger` | The base logger (from setup_logger) | *required* |
| `job_id` | `str` | Unique identifier for the job | *required* |
| `source` | `str \| None` | Source being processed (e.g., "handbook") | `None` |
| `collection` | `str \| None` | Collection name | `None` |
| `**extra_context` | `Any` | Additional context fields | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `JobLoggerAdapter` | JobLoggerAdapter with job context |

Example

```python
logger = setup_logger("thoth.worker")
job_logger = get_job_logger(logger, job_id="job_123", source="handbook")
job_logger.info("Starting ingestion")
job_logger.info("Processed file", extra={"file_path": "readme.md", "chunks_created": 15})
```

configure_root_logger(level: int = logging.INFO, json_output: bool | None = None) -> None

Configure the root logger for the application.

Call this once at application startup to configure global logging behavior.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `level` | `int` | Root logging level | `INFO` |
| `json_output` | `bool \| None` | Whether to use JSON output (auto-detects if `None`) | `None` |
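A typical startup sequence, calling this once before creating module loggers:

```python
import logging

from thoth.shared.utils.logger import configure_root_logger, setup_logger

# Configure global logging behavior once at application startup.
configure_root_logger(level=logging.INFO)

# Module-level loggers then inherit the configured behavior.
logger = setup_logger(__name__)
logger.info("Application started")
```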