Logger

`thoth.shared.utils.logger`
Structured Logging Utilities for GCP Cloud Logging and Grafana Loki.
This module provides a structured JSON logging framework that is compatible with:

- Google Cloud Logging (Cloud Run, GKE, Cloud Functions)
- Grafana Loki
- Any JSON-aware log aggregation system

Key Features:

- Structured JSON output with a consistent field schema
- GCP Cloud Logging special fields (sourceLocation, trace, labels)
- Job/request correlation via JobLoggerAdapter
- Automatic sensitive data redaction
- Verbose source location (file, line, function)
- Metrics-ready numeric fields for dashboards
Example

```python
from thoth.shared.utils.logger import setup_logger, get_job_logger

# Basic usage
logger = setup_logger("myapp")
logger.info("Server started", extra={"port": 8080})

# Job-scoped logging
job_logger = get_job_logger(logger, job_id="job_123", source="handbook")
job_logger.info("Processing file", extra={"file_path": "docs/readme.md"})
```
`SensitiveDataFormatter = SimpleFormatter` *(module-attribute)*
GCPStructuredFormatter
JSON formatter compatible with GCP Cloud Logging and Grafana Loki.
This formatter produces structured JSON logs with:

- Standard fields (timestamp, severity, message, logger)
- Verbose source location (pathname, filename, lineno, funcName)
- GCP special fields (sourceLocation, trace, labels)
- Custom context fields (job_id, source, operation, etc.)
- Automatic sensitive data redaction

The output is compatible with:

- GCP Cloud Logging (jsonPayload with special field recognition)
- Grafana Loki (JSON parsing and label extraction)
- Any JSON-aware log aggregation system
Example output

```json
{
  "timestamp": "2026-01-30T10:15:30.123456Z",
  "severity": "INFO",
  "message": "Processing file",
  "logger": "thoth.ingestion.pipeline",
  "pathname": "/app/thoth/ingestion/pipeline.py",
  "filename": "pipeline.py",
  "lineno": 456,
  "funcName": "_process_file",
  "module": "pipeline",
  "logging.googleapis.com/sourceLocation": {
    "file": "thoth/ingestion/pipeline.py",
    "line": "456",
    "function": "_process_file"
  },
  "job_id": "job_xyz789",
  "source": "handbook"
}
```
`SENSITIVE_KEYWORDS` *(class-attribute)*

```python
SENSITIVE_KEYWORDS: list[str] = [
    'password', 'passwd', 'pwd', 'secret', 'token', 'apikey',
    'api_key', 'auth', 'authorization', 'credential', 'key',
    'private', 'session', 'cookie', 'jwt', 'bearer', 'oauth',
]
```
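Presumably, extra fields whose names match one of these keywords are redacted before the record is emitted; an illustrative call (the exact placeholder string the formatter substitutes is not documented here, so the comment is an assumption):

```python
from thoth.shared.utils.logger import setup_logger

logger = setup_logger("thoth.auth")
logger.info(
    "User login",
    extra={"username": "alice", "api_key": "sk-test-123"},
)
# "username" passes through unchanged; "api_key" matches a sensitive
# keyword, so its value is replaced with a redaction placeholder
# (the exact string is an implementation detail).
```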
`__init__(*args: Any, **kwargs: Any) -> None`

Initialize the formatter with GCP-compatible settings.
`add_fields(log_record: dict[str, Any], record: logging.LogRecord, message_dict: dict[str, Any]) -> None`

Add custom fields to the JSON log record.

python-json-logger calls this method to populate the log record; all custom fields are added here.
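Because the formatter follows python-json-logger's add_fields contract, extra fields can be layered on in a subclass. A hypothetical sketch (the DeployFormatter name and "deployment" field are illustrative, not part of this module):

```python
import logging
from typing import Any

from thoth.shared.utils.logger import GCPStructuredFormatter

class DeployFormatter(GCPStructuredFormatter):
    """Hypothetical subclass that stamps every record with a label."""

    def add_fields(
        self,
        log_record: dict[str, Any],
        record: logging.LogRecord,
        message_dict: dict[str, Any],
    ) -> None:
        super().add_fields(log_record, record, message_dict)
        log_record["deployment"] = "staging"  # placeholder value
```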
SimpleFormatter

Simple text formatter for local development/debugging.

Uses a human-readable format without JSON structure. Still includes sensitive data redaction.
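The usual way to get this formatter is via the simple flag on setup_logger(), documented below:

```python
from thoth.shared.utils.logger import setup_logger

# Human-readable text output for local development
logger = setup_logger("myapp", simple=True)
logger.debug("No JSON structure, but redaction still applies")
```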
JobLoggerAdapter
Logger adapter that automatically includes job context in all log messages.
This adapter enriches log messages with job-specific context like job_id, source, and collection. Use this when processing a specific job to ensure all logs can be correlated.
Example

```python
base_logger = setup_logger("thoth.worker")
job_logger = JobLoggerAdapter(base_logger, job_id="job_123", source="handbook")
job_logger.info("Processing started")
job_logger.info("File processed", extra={"file_path": "readme.md"})
```
`__init__(logger: logging.Logger, job_id: str, source: str | None = None, collection: str | None = None, **extra_context: Any) -> None`

Initialize the job logger adapter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `logger` | `Logger` | The base logger to wrap | *required* |
| `job_id` | `str` | Unique identifier for the job/run | *required* |
| `source` | `str \| None` | Source being processed (e.g., "handbook", "dnd") | `None` |
| `collection` | `str \| None` | Collection name being used | `None` |
| `**extra_context` | `Any` | Additional context to include in all logs | `{}` |
`process(msg: str, kwargs: MutableMapping[str, Any]) -> tuple[str, MutableMapping[str, Any]]`

Process the log message to include job context.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `msg` | `str` | The log message | *required* |
| `kwargs` | `MutableMapping[str, Any]` | Keyword arguments for the log call | *required* |

Returns:

| Type | Description |
|---|---|
| `tuple[str, MutableMapping[str, Any]]` | Tuple of (message, kwargs) with context added to extra |
`with_operation(operation: str) -> JobLoggerAdapter`

Create a child logger for a specific operation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `operation` | `str` | The operation name (e.g., "chunking", "embedding", "storing") | *required* |

Returns:

| Type | Description |
|---|---|
| `JobLoggerAdapter` | A new JobLoggerAdapter with the operation context added |
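A usage sketch for staged pipelines, built on the documented operation names:

```python
from thoth.shared.utils.logger import get_job_logger, setup_logger

logger = setup_logger("thoth.worker")
job_logger = get_job_logger(logger, job_id="job_123", source="handbook")

# Each pipeline stage gets its own child adapter; every record then
# carries both the job context and the current operation.
chunk_logger = job_logger.with_operation("chunking")
chunk_logger.info("Document chunked", extra={"chunks_created": 15})

embed_logger = job_logger.with_operation("embedding")
embed_logger.info("Chunks embedded")
```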
SecureLogger
Legacy SecureLogger class for backward compatibility.
New code should use setup_logger(), which returns a standard Logger with GCPStructuredFormatter attached.
This class is maintained for backward compatibility with existing code that checks isinstance(logger, SecureLogger).
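A minimal sketch of the legacy interface; the %-style call assumes that "safe formatting" means standard lazy %-interpolation, as in the stdlib:

```python
from thoth.shared.utils.logger import SecureLogger

# Legacy construction, kept only for isinstance() compatibility;
# new code should call setup_logger() instead.
legacy = SecureLogger("thoth.legacy")
legacy.info("User %s logged in", "alice")
```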
`SENSITIVE_KEYWORDS: list[str] = GCPStructuredFormatter.SENSITIVE_KEYWORDS` *(class-attribute)*
`__init__(name: str, level: int = logging.NOTSET) -> None`

Initialize the SecureLogger.
`debug(msg: Any, *args: Any, **kwargs: Any) -> None`

Log a debug message with safe formatting.

`info(msg: Any, *args: Any, **kwargs: Any) -> None`

Log an info message with safe formatting.

`warning(msg: Any, *args: Any, **kwargs: Any) -> None`

Log a warning message with safe formatting.

`error(msg: Any, *args: Any, **kwargs: Any) -> None`

Log an error message with safe formatting.

`critical(msg: Any, *args: Any, **kwargs: Any) -> None`

Log a critical message with safe formatting.
`set_trace_context(trace_id: str | None, project_id: str | None = None) -> None`

Set the trace context for the current request/task.

Call this at the start of each request handler to enable log correlation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `trace_id` | `str \| None` | The trace ID from the X-Cloud-Trace-Context header | *required* |
| `project_id` | `str \| None` | GCP project ID for constructing the full trace URL | `None` |
`get_trace_context() -> str | None`

Get the current trace context.
`extract_trace_id_from_header(header_value: str | None) -> str | None`

Extract the trace ID from an X-Cloud-Trace-Context header.

The header format is: `TRACE_ID/SPAN_ID;o=TRACE_TRUE`

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `header_value` | `str \| None` | The X-Cloud-Trace-Context header value | *required* |

Returns:

| Type | Description |
|---|---|
| `str \| None` | The trace ID portion, or None if the header is missing/invalid |
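A sketch wiring the two trace helpers together at the start of a request handler; the headers mapping and project ID are placeholders, and the sample header value follows the documented `TRACE_ID/SPAN_ID;o=TRACE_TRUE` format:

```python
from thoth.shared.utils.logger import (
    extract_trace_id_from_header,
    set_trace_context,
    setup_logger,
)

logger = setup_logger("thoth.api")

def handle_request(headers: dict[str, str]) -> None:
    # e.g. headers["X-Cloud-Trace-Context"] ==
    #   "105445aa7843bc8bf206b12000100000/1;o=1"
    trace_id = extract_trace_id_from_header(headers.get("X-Cloud-Trace-Context"))
    set_trace_context(trace_id, project_id="my-gcp-project")  # placeholder project
    logger.info("Request received")  # now correlated with the trace
```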
`setup_logger(name: str, level: int = logging.INFO, simple: bool = False, json_output: bool | None = None) -> logging.Logger`

Create and configure a logger with structured JSON output.
This function creates a logger that outputs structured JSON logs compatible with GCP Cloud Logging and Grafana Loki. By default, it auto-detects whether to use JSON output based on the environment.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the logger (typically `__name__`) | *required* |
| `level` | `int` | Logging level (default: INFO) | `INFO` |
| `simple` | `bool` | If True, use simple text format instead of JSON (for local dev) | `False` |
| `json_output` | `bool \| None` | Explicit control over JSON output. If None, auto-detects: True in Cloud Run (GCS_BUCKET_NAME set); True if LOG_FORMAT=json; False otherwise (local development) | `None` |

Returns:

| Type | Description |
|---|---|
| `Logger` | Configured logger instance |
Example

```python
logger = setup_logger(__name__)
logger.info("Server started", extra={"port": 8080})

# With job context
logger.info("Processing", extra={"job_id": "abc123", "source": "handbook"})
```
`get_job_logger(base_logger: logging.Logger, job_id: str, source: str | None = None, collection: str | None = None, **extra_context: Any) -> JobLoggerAdapter`

Create a job-scoped logger adapter.
This is the recommended way to create loggers for job processing. All log messages will automatically include the job context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `base_logger` | `Logger` | The base logger (from setup_logger) | *required* |
| `job_id` | `str` | Unique identifier for the job | *required* |
| `source` | `str \| None` | Source being processed (e.g., "handbook") | `None` |
| `collection` | `str \| None` | Collection name | `None` |
| `**extra_context` | `Any` | Additional context fields | `{}` |

Returns:

| Type | Description |
|---|---|
| `JobLoggerAdapter` | JobLoggerAdapter with job context |
Example

```python
logger = setup_logger("thoth.worker")
job_logger = get_job_logger(logger, job_id="job_123", source="handbook")
job_logger.info("Starting ingestion")
job_logger.info(
    "Processed file",
    extra={"file_path": "readme.md", "chunks_created": 15},
)
```
`configure_root_logger(level: int = logging.INFO, json_output: bool | None = None) -> None`

Configure the root logger for the application.

Call this once at application startup to configure global logging behavior.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `level` | `int` | Root logging level | `INFO` |
| `json_output` | `bool \| None` | Whether to use JSON output (auto-detects if None) | `None` |
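A minimal startup sketch; it assumes third-party loggers propagate to the root handler as in the stdlib, so they inherit the structured output:

```python
import logging

from thoth.shared.utils.logger import configure_root_logger, setup_logger

# One-time global configuration at application startup.
configure_root_logger(level=logging.INFO)

# Application loggers are still created per module.
logger = setup_logger(__name__)
logger.info("Application started")
```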