Monitoring

thoth.shared.monitoring

Monitoring and health check system for Thoth.

This module provides metrics tracking, health status monitoring, and alerting hooks for the ingestion pipeline and scheduled operations.

logger = setup_logger(__name__) module-attribute

__all__ = ['HealthCheck', 'HealthStatus', 'Metrics', 'Monitor', 'create_default_health_checks'] module-attribute

HealthStatus

Enumeration of possible health statuses.

HEALTHY = 'healthy' class-attribute instance-attribute

DEGRADED = 'degraded' class-attribute instance-attribute

UNHEALTHY = 'unhealthy' class-attribute instance-attribute

UNKNOWN = 'unknown' class-attribute instance-attribute

HealthCheck dataclass

Represents a health check result.

Attributes:

name (str): Name of the health check
status (HealthStatus): Health status result
message (str): Human-readable status message
timestamp (datetime): When the check was performed
metadata (dict[str, Any]): Additional check-specific data

name: str instance-attribute

status: HealthStatus instance-attribute

message: str instance-attribute

timestamp: datetime = field(default_factory=(lambda: datetime.now(UTC))) class-attribute instance-attribute

metadata: dict[str, Any] = field(default_factory=dict) class-attribute instance-attribute

__init__(name: str, status: HealthStatus, message: str, timestamp: datetime = (lambda: datetime.now(UTC))(), metadata: dict[str, Any] = dict()) -> None

to_dict() -> dict[str, Any]

Convert this health check result to a JSON-serializable dict.

Returns:

dict[str, Any]: Dict with name, status (str), message, timestamp (ISO), metadata.
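The documented fields and the `to_dict()` contract can be illustrated with stand-in classes. This is a sketch built only from the attribute list above, not Thoth's actual source. The dictionary keys are assumed to match the field names, and the `disk` check with its `used_pct` metadata is made up for illustration.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class HealthStatus(Enum):
    """Stand-in mirroring the four documented statuses."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Stand-in with the documented fields and defaults."""

    name: str
    status: HealthStatus
    message: str
    # timezone.utc is used so the sketch also runs on Python versions before 3.11.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Assumed shape: enum value as a string, timestamp as ISO 8601 text.
        return {
            "name": self.name,
            "status": self.status.value,
            "message": self.message,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }


# Hypothetical check result; the name and metadata are illustrative only.
check = HealthCheck(
    name="disk",
    status=HealthStatus.DEGRADED,
    message="Disk usage above 80%",
    metadata={"used_pct": 83},
)
payload = json.dumps(check.to_dict())  # serializes without a custom encoder
```

Converting the enum and the timestamp to strings is what makes the result safe to pass to `json.dumps` directly.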

Metrics dataclass

Tracks operational metrics.

Attributes:

sync_count (int): Total number of sync operations
sync_success_count (int): Number of successful syncs
sync_failure_count (int): Number of failed syncs
last_sync_time (datetime | None): Timestamp of last sync attempt
last_sync_duration (float): Duration of last sync in seconds
total_files_processed (int): Cumulative files processed
total_chunks_created (int): Cumulative chunks created
errors (list[dict[str, str]]): List of recent error messages

sync_count: int = 0 class-attribute instance-attribute

sync_success_count: int = 0 class-attribute instance-attribute

sync_failure_count: int = 0 class-attribute instance-attribute

last_sync_time: datetime | None = None class-attribute instance-attribute

last_sync_duration: float = 0.0 class-attribute instance-attribute

total_files_processed: int = 0 class-attribute instance-attribute

total_chunks_created: int = 0 class-attribute instance-attribute

errors: list[dict[str, str]] = field(default_factory=list) class-attribute instance-attribute

__init__(sync_count: int = 0, sync_success_count: int = 0, sync_failure_count: int = 0, last_sync_time: datetime | None = None, last_sync_duration: float = 0.0, total_files_processed: int = 0, total_chunks_created: int = 0, errors: list[dict[str, str]] = list()) -> None

to_dict() -> dict[str, Any]

Convert metrics to a JSON-serializable dict for APIs or export.

Returns:

dict[str, Any]: Dict with sync counts, last sync time/duration, totals, error_count, recent_errors.

Monitor

Monitoring system for tracking metrics and health status.

This class provides centralized monitoring with thread-safe metric collection, health checks, and alerting capabilities.

Attributes:

metrics (Metrics): Current operational metrics
health_checks (dict[str, Callable[[], HealthCheck]]): Dictionary of registered health checks
alert_callbacks (list[Callable[[str, dict[str, Any]], None]]): List of functions to call on alerts
logger: Logger instance

metrics = Metrics() instance-attribute

health_checks: dict[str, Callable[[], HealthCheck]] = {} instance-attribute

alert_callbacks: list[Callable[[str, dict[str, Any]], None]] = [] instance-attribute

logger = logger_instance or setup_logger(__name__) instance-attribute

max_errors = max_errors instance-attribute

__init__(logger_instance: logging.Logger | None = None, max_errors: int = 100)

Initialize the monitoring system.

Parameters:

logger_instance (Logger | None): Optional logger instance. Default: None.
max_errors (int): Maximum number of errors to retain. Default: 100.

record_sync_start() -> None

Record the start of a sync operation (thread-safe).

record_sync_success(files_processed: int, chunks_created: int, duration: float) -> None

Record a successful sync operation.

Parameters:

files_processed (int): Number of files processed. Required.
chunks_created (int): Number of chunks created. Required.
duration (float): Duration in seconds. Required.

record_sync_failure(error: Exception) -> None

Record a failed sync operation.

Parameters:

error (Exception): Exception that caused the failure. Required.
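The three `record_sync_*` methods can be pictured with a small stand-in that guards its counters with a lock and caps the error list at `max_errors`. This is only a sketch under stated assumptions: the class name `SyncRecorder`, the `_lock` attribute, the keys of each error entry, and the choice to increment `sync_count` in `record_sync_start()` are not taken from Thoth's source.

```python
import threading
from datetime import datetime, timezone


class SyncRecorder:
    """Hypothetical stand-in for the sync-recording part of Monitor."""

    def __init__(self, max_errors: int = 100) -> None:
        self.max_errors = max_errors
        self.sync_count = 0
        self.sync_success_count = 0
        self.sync_failure_count = 0
        self.last_sync_time = None
        self.last_sync_duration = 0.0
        self.total_files_processed = 0
        self.total_chunks_created = 0
        self.errors = []  # list of dict[str, str]
        self._lock = threading.Lock()  # assumed locking strategy

    def record_sync_start(self) -> None:
        with self._lock:
            self.sync_count += 1
            self.last_sync_time = datetime.now(timezone.utc)

    def record_sync_success(self, files_processed: int, chunks_created: int, duration: float) -> None:
        with self._lock:
            self.sync_success_count += 1
            self.total_files_processed += files_processed
            self.total_chunks_created += chunks_created
            self.last_sync_duration = duration

    def record_sync_failure(self, error: Exception) -> None:
        with self._lock:
            self.sync_failure_count += 1
            self.errors.append(
                {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "type": type(error).__name__,
                    "message": str(error),
                }
            )
            # Drop the oldest entries so at most max_errors are retained.
            overflow = len(self.errors) - self.max_errors
            if overflow > 0:
                del self.errors[:overflow]


recorder = SyncRecorder(max_errors=2)
recorder.record_sync_start()
recorder.record_sync_success(files_processed=10, chunks_created=42, duration=1.5)
for i in range(3):
    recorder.record_sync_start()
    recorder.record_sync_failure(RuntimeError(f"boom {i}"))
```

With `max_errors=2`, the third failure pushes out the oldest entry, so only the two most recent errors remain.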

register_health_check(name: str, check_function: Callable[[], HealthCheck]) -> None

Register a health check function.

Parameters:

name (str): Unique name for the health check. Required.
check_function (Callable[[], HealthCheck]): Function that returns a HealthCheck. Required.

run_health_checks() -> dict[str, HealthCheck]

Run all registered health checks.

Returns:

dict[str, HealthCheck]: Dictionary mapping check names to results
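Registration and execution can be sketched as a name-to-callable registry. The stand-in below is an assumption-laden illustration, not Thoth's implementation: `CheckRegistry` is a hypothetical name, the `HealthCheck` stand-in carries only three of the documented fields, and turning a raised exception into an `UNHEALTHY` result is a design choice made for this sketch that the documentation does not confirm.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Reduced stand-in: timestamp and metadata are omitted for brevity."""

    name: str
    status: HealthStatus
    message: str


class CheckRegistry:
    """Hypothetical stand-in for the health-check part of Monitor."""

    def __init__(self) -> None:
        self.health_checks: dict[str, Callable[[], HealthCheck]] = {}

    def register_health_check(self, name: str, check_function: Callable[[], HealthCheck]) -> None:
        # Registering the same name twice replaces the earlier function.
        self.health_checks[name] = check_function

    def run_health_checks(self) -> dict[str, HealthCheck]:
        results: dict[str, HealthCheck] = {}
        for name, check_function in self.health_checks.items():
            try:
                results[name] = check_function()
            except Exception as exc:
                # Assumption: a crashing check is reported rather than propagated.
                results[name] = HealthCheck(name, HealthStatus.UNHEALTHY, f"Check raised: {exc}")
        return results


def always_ok() -> HealthCheck:
    return HealthCheck("always_ok", HealthStatus.HEALTHY, "ok")


def broken() -> HealthCheck:
    raise RuntimeError("connection refused")


registry = CheckRegistry()
registry.register_health_check("always_ok", always_ok)
registry.register_health_check("broken", broken)
results = registry.run_health_checks()
```

Catching exceptions per check keeps one faulty probe from hiding the results of the others.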

get_overall_health() -> HealthStatus

Determine overall system health based on all checks.

Returns:

HealthStatus: Overall HealthStatus
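The documentation does not say how individual results are combined. One common approach, shown here purely as an assumption, is to report the worst status seen. The severity ordering, the `overall_health` helper, and the `UNKNOWN` result for an empty input are all choices made for this sketch.

```python
from enum import Enum
from typing import Iterable


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


# Assumed ordering: higher numbers are worse.
_SEVERITY = {
    HealthStatus.HEALTHY: 0,
    HealthStatus.UNKNOWN: 1,
    HealthStatus.DEGRADED: 2,
    HealthStatus.UNHEALTHY: 3,
}


def overall_health(statuses: Iterable[HealthStatus]) -> HealthStatus:
    """Return the most severe status, or UNKNOWN when there is nothing to judge."""
    collected = list(statuses)
    if not collected:
        return HealthStatus.UNKNOWN
    return max(collected, key=lambda status: _SEVERITY[status])


mixed = overall_health([HealthStatus.HEALTHY, HealthStatus.DEGRADED])
failing = overall_health([HealthStatus.DEGRADED, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY])
empty = overall_health([])
```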

get_health_report() -> dict[str, Any]

Generate a comprehensive health report.

Returns:

dict[str, Any]: Dictionary containing overall health and individual checks

get_metrics() -> dict[str, Any]

Get current metrics snapshot.

Returns:

dict[str, Any]: Dictionary containing current metrics

add_alert_callback(callback: Callable[[str, dict[str, Any]], None]) -> None

Add a callback function for alerts.

The callback will be called with (alert_type, data) when alerts trigger.

Parameters:

callback (Callable[[str, dict[str, Any]], None]): Function to call on alerts. Required.
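The `(alert_type, data)` callback contract can be sketched with a small dispatcher. Everything beyond that signature is an assumption here: the `AlertDispatcher` class, its `trigger_alert` method, the `"sync_failure"` alert type, and the decision to log and skip a callback that raises are illustrative only.

```python
import logging
from typing import Any, Callable

AlertCallback = Callable[[str, dict[str, Any]], None]


class AlertDispatcher:
    """Hypothetical stand-in for the alerting part of Monitor."""

    def __init__(self) -> None:
        self.alert_callbacks: list[AlertCallback] = []
        self.logger = logging.getLogger(__name__)

    def add_alert_callback(self, callback: AlertCallback) -> None:
        self.alert_callbacks.append(callback)

    def trigger_alert(self, alert_type: str, data: dict[str, Any]) -> None:
        for callback in self.alert_callbacks:
            try:
                callback(alert_type, data)
            except Exception as exc:
                # Assumption: one failing callback must not block the others.
                self.logger.warning("Alert callback failed: %s", exc)


received: list[tuple[str, dict[str, Any]]] = []


def faulty_callback(alert_type: str, data: dict[str, Any]) -> None:
    raise ValueError("pager service unreachable")


def collecting_callback(alert_type: str, data: dict[str, Any]) -> None:
    received.append((alert_type, data))


dispatcher = AlertDispatcher()
dispatcher.add_alert_callback(faulty_callback)
dispatcher.add_alert_callback(collecting_callback)
dispatcher.trigger_alert("sync_failure", {"error": "timeout"})
```

Callbacks run in registration order, so the collecting callback still receives the alert after the faulty one fails.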

reset_metrics() -> None

Reset all metrics to initial values.

export_metrics(filepath: Path) -> None

Export metrics to a JSON file.

Parameters:

filepath (Path): Path to export file. Required.
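Exporting amounts to writing a metrics snapshot as JSON. The sketch below is a standalone function rather than the documented method, so it takes the snapshot as an extra argument. Creating missing parent directories and the exact snapshot keys are assumptions for illustration.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def export_metrics(metrics: dict[str, Any], filepath: Path) -> None:
    """Write a metrics snapshot to filepath as indented JSON."""
    # Assumption: missing parent directories are created on demand.
    filepath.parent.mkdir(parents=True, exist_ok=True)
    filepath.write_text(json.dumps(metrics, indent=2), encoding="utf-8")


# Illustrative snapshot; the real keys come from Metrics.to_dict().
snapshot = {
    "sync_count": 4,
    "sync_success_count": 3,
    "sync_failure_count": 1,
    "last_sync_duration": 1.5,
    "error_count": 1,
}

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "reports" / "metrics.json"
    export_metrics(snapshot, target)
    loaded = json.loads(target.read_text(encoding="utf-8"))
```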

get_success_rate() -> float

Calculate sync success rate.

Returns:

float: Success rate as a percentage (0-100)
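A percentage in the 0-100 range suggests a calculation along the following lines. The denominator (`sync_count`) and the value returned when no sync has run yet are assumptions, since the documentation specifies neither.

```python
def success_rate(sync_success_count: int, sync_count: int) -> float:
    """Return successful syncs as a percentage of all syncs."""
    if sync_count == 0:
        # Assumption: no syncs yet is reported as 0.0 rather than raising.
        return 0.0
    return sync_success_count / sync_count * 100


rate = success_rate(sync_success_count=3, sync_count=4)
no_data = success_rate(sync_success_count=0, sync_count=0)
```

Guarding the zero case avoids a `ZeroDivisionError` on a freshly created or reset monitor.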

create_default_health_checks(vector_store_path: Path, repo_path: Path) -> dict[str, Callable[[], HealthCheck]]

Create default health check functions for common components.

Parameters:

vector_store_path (Path): Path to vector store database. Required.
repo_path (Path): Path to repository. Required.

Returns:

dict[str, Callable[[], HealthCheck]]: Dictionary of health check functions
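A factory of this shape typically returns closures that each capture one path. The sketch below assumes the checks only test whether the path exists. The check names `vector_store` and `repository`, the helper `create_path_checks`, and the reduced stand-in types are hypothetical, and the real default checks may inspect more than existence.

```python
import tempfile
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Callable


class HealthStatus(Enum):
    # Only the two statuses this sketch needs.
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthCheck:
    """Reduced stand-in: timestamp and metadata are omitted for brevity."""

    name: str
    status: HealthStatus
    message: str


def create_path_checks(vector_store_path: Path, repo_path: Path) -> dict[str, Callable[[], HealthCheck]]:
    """Build zero-argument checks that each close over one path."""

    def make_check(name: str, path: Path) -> Callable[[], HealthCheck]:
        def check() -> HealthCheck:
            if path.exists():
                return HealthCheck(name, HealthStatus.HEALTHY, f"{path} exists")
            return HealthCheck(name, HealthStatus.UNHEALTHY, f"{path} not found")

        return check

    return {
        "vector_store": make_check("vector_store", vector_store_path),
        "repository": make_check("repository", repo_path),
    }


with tempfile.TemporaryDirectory() as tmp:
    repo = Path(tmp)  # exists for the duration of the block
    missing_store = repo / "vector_store.db"  # never created
    checks = create_path_checks(missing_store, repo)
    results = {name: check() for name, check in checks.items()}
```

Each returned function matches the `Callable[[], HealthCheck]` shape, so it could be passed to `register_health_check` unchanged.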