Monitoring
thoth.shared.monitoring
Monitoring and health check system for Thoth.
This module provides metrics tracking, health status monitoring, and alerting hooks for the ingestion pipeline and scheduled operations.
logger = setup_logger(__name__)
module-attribute
__all__ = ['HealthCheck', 'HealthStatus', 'Metrics', 'Monitor', 'create_default_health_checks']
module-attribute
HealthStatus
HealthCheck
dataclass
Represents a health check result.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | `str` | Name of the health check |
| `status` | `HealthStatus` | Health status result |
| `message` | `str` | Human-readable status message |
| `timestamp` | `datetime` | When the check was performed |
| `metadata` | `dict[str, Any]` | Additional check-specific data |
name: str
instance-attribute
status: HealthStatus
instance-attribute
message: str
instance-attribute
timestamp: datetime = field(default_factory=(lambda: datetime.now(UTC)))
class-attribute, instance-attribute
metadata: dict[str, Any] = field(default_factory=dict)
class-attribute, instance-attribute
__init__(name: str, status: HealthStatus, message: str, timestamp: datetime = (lambda: datetime.now(UTC))(), metadata: dict[str, Any] = dict()) -> None
to_dict() -> dict[str, Any]
Convert this health check result to a JSON-serializable dict.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with name, status (str), message, timestamp (ISO), metadata. |
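
The serialization described above can be sketched with a stand-in dataclass. This is not Thoth's implementation: the `HealthStatus` member names and values are assumptions (the reference does not list them), and `timezone.utc` is used in place of `UTC` so the sketch also runs on Python versions older than 3.11.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class HealthStatus(Enum):
    # Assumed members; the reference does not enumerate them.
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthCheck:
    name: str
    status: HealthStatus
    message: str
    # default_factory gives every instance its own timestamp and dict.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Enum and datetime are not JSON-serializable, so convert them to strings.
        return {
            "name": self.name,
            "status": self.status.value,
            "message": self.message,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }


check = HealthCheck("disk", HealthStatus.HEALTHY, "ok", metadata={"free_gb": 12})
payload = json.dumps(check.to_dict())
```

Because both `timestamp` and `metadata` use `default_factory`, two checks created back to back do not share a metadata dict.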
Metrics
dataclass
Tracks operational metrics.
Attributes:
| Name | Type | Description |
|---|---|---|
| `sync_count` | `int` | Total number of sync operations |
| `sync_success_count` | `int` | Number of successful syncs |
| `sync_failure_count` | `int` | Number of failed syncs |
| `last_sync_time` | `datetime \| None` | Timestamp of last sync attempt |
| `last_sync_duration` | `float` | Duration of last sync in seconds |
| `total_files_processed` | `int` | Cumulative files processed |
| `total_chunks_created` | `int` | Cumulative chunks created |
| `errors` | `list[dict[str, str]]` | List of recent error messages |
sync_count: int = 0
class-attribute, instance-attribute
sync_success_count: int = 0
class-attribute, instance-attribute
sync_failure_count: int = 0
class-attribute, instance-attribute
last_sync_time: datetime | None = None
class-attribute, instance-attribute
last_sync_duration: float = 0.0
class-attribute, instance-attribute
total_files_processed: int = 0
class-attribute, instance-attribute
total_chunks_created: int = 0
class-attribute, instance-attribute
errors: list[dict[str, str]] = field(default_factory=list)
class-attribute, instance-attribute
__init__(sync_count: int = 0, sync_success_count: int = 0, sync_failure_count: int = 0, last_sync_time: datetime | None = None, last_sync_duration: float = 0.0, total_files_processed: int = 0, total_chunks_created: int = 0, errors: list[dict[str, str]] = list()) -> None
to_dict() -> dict[str, Any]
Convert metrics to a JSON-serializable dict for APIs or export.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with sync counts, last sync time/duration, totals, error_count, recent_errors. |
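
The derived `error_count` and `recent_errors` entries might be produced along the following lines. This is a sketch on a cut-down stand-in class, not Thoth's code: the exact key names other than those two, and the choice of returning the last 10 errors, are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

RECENT_ERRORS = 10  # assumed window size; the reference does not state one


@dataclass
class MetricsSketch:
    sync_count: int = 0
    last_sync_time: Optional[datetime] = None
    errors: list[dict[str, str]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {
            "sync_count": self.sync_count,
            # datetime is not JSON-serializable; emit ISO 8601 text or None.
            "last_sync_time": (
                self.last_sync_time.isoformat() if self.last_sync_time else None
            ),
            "error_count": len(self.errors),
            # Only the tail of the error list is exported.
            "recent_errors": self.errors[-RECENT_ERRORS:],
        }


sketch = MetricsSketch(errors=[{"message": str(i)} for i in range(12)])
exported = sketch.to_dict()
```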
Monitor
Monitoring system for tracking metrics and health status.
This class provides centralized monitoring with thread-safe metric collection, health checks, and alerting capabilities.
Attributes:
| Name | Type | Description |
|---|---|---|
| `metrics` | | Current operational metrics |
| `health_checks` | `dict[str, Callable[[], HealthCheck]]` | Dictionary of registered health checks |
| `alert_callbacks` | `list[Callable[[str, dict[str, Any]], None]]` | List of functions to call on alerts |
| `logger` | | Logger instance |
metrics = Metrics()
instance-attribute
health_checks: dict[str, Callable[[], HealthCheck]] = {}
instance-attribute
alert_callbacks: list[Callable[[str, dict[str, Any]], None]] = []
instance-attribute
logger = logger_instance or setup_logger(__name__)
instance-attribute
max_errors = max_errors
instance-attribute
__init__(logger_instance: logging.Logger | None = None, max_errors: int = 100)
Initialize the monitoring system.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `logger_instance` | `Logger \| None` | Optional logger instance | `None` |
| `max_errors` | `int` | Maximum number of errors to retain | `100` |
record_sync_start() -> None
Record the start of a sync operation (thread-safe).
record_sync_success(files_processed: int, chunks_created: int, duration: float) -> None
Record a successful sync operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `files_processed` | `int` | Number of files processed | required |
| `chunks_created` | `int` | Number of chunks created | required |
| `duration` | `float` | Duration in seconds | required |
record_sync_failure(error: Exception) -> None
Record a failed sync operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | Exception that caused the failure | required |
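
The three `record_sync_*` methods can be illustrated with a lock-guarded stand-in. This is a minimal sketch under stated assumptions, not Thoth's implementation: the `_lock` attribute, the layout of each error entry, the choice to count the attempt in `record_sync_start`, and the trimming strategy for `max_errors` are all hypothetical.

```python
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class SketchMetrics:
    """Subset of the documented Metrics fields, enough for this demo."""
    sync_count: int = 0
    sync_success_count: int = 0
    sync_failure_count: int = 0
    last_sync_duration: float = 0.0
    total_files_processed: int = 0
    total_chunks_created: int = 0
    errors: list[dict[str, str]] = field(default_factory=list)


class SketchMonitor:
    def __init__(self, max_errors: int = 100) -> None:
        self.metrics = SketchMetrics()
        self.max_errors = max_errors
        self._lock = threading.Lock()  # hypothetical attribute name

    def record_sync_start(self) -> None:
        with self._lock:
            self.metrics.sync_count += 1  # assumption: the attempt is counted here

    def record_sync_success(
        self, files_processed: int, chunks_created: int, duration: float
    ) -> None:
        with self._lock:
            self.metrics.sync_success_count += 1
            self.metrics.total_files_processed += files_processed
            self.metrics.total_chunks_created += chunks_created
            self.metrics.last_sync_duration = duration

    def record_sync_failure(self, error: Exception) -> None:
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "type": type(error).__name__,
            "message": str(error),
        }
        with self._lock:
            self.metrics.sync_failure_count += 1
            self.metrics.errors.append(entry)
            # Drop the oldest entries so at most max_errors are retained.
            excess = len(self.metrics.errors) - self.max_errors
            if excess > 0:
                del self.metrics.errors[:excess]


monitor = SketchMonitor(max_errors=5)


def worker() -> None:
    for i in range(50):
        monitor.record_sync_start()
        if i % 2:
            monitor.record_sync_failure(RuntimeError(f"sync {i} failed"))
        else:
            monitor.record_sync_success(files_processed=3, chunks_created=12, duration=0.5)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Holding the lock around each read-modify-write keeps the counters consistent when several sync jobs report concurrently, and the error list never grows beyond `max_errors`.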
register_health_check(name: str, check_function: Callable[[], HealthCheck]) -> None
Register a health check function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Unique name for the health check | required |
| `check_function` | `Callable[[], HealthCheck]` | Function that returns a HealthCheck | required |
run_health_checks() -> dict[str, HealthCheck]
Run all registered health checks.
Returns:
| Type | Description |
|---|---|
| `dict[str, HealthCheck]` | Dictionary mapping check names to results |
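
Registering and running checks can be pictured as a name-to-callable registry. The sketch below is a stand-in, not Thoth's code: results are plain `(status, message)` tuples instead of `HealthCheck` objects, and converting an exception raised by a check into an unhealthy result is an assumption about how failures might be handled.

```python
from typing import Callable

CheckResult = tuple[str, str]  # (status, message); stand-in for HealthCheck


class CheckRegistry:
    def __init__(self) -> None:
        self.health_checks: dict[str, Callable[[], CheckResult]] = {}

    def register_health_check(
        self, name: str, check_function: Callable[[], CheckResult]
    ) -> None:
        # Registering under an existing name replaces the earlier check.
        self.health_checks[name] = check_function

    def run_health_checks(self) -> dict[str, CheckResult]:
        results: dict[str, CheckResult] = {}
        for name, check_function in self.health_checks.items():
            try:
                results[name] = check_function()
            except Exception as exc:
                # Assumed behaviour: a crashing check is reported, not propagated.
                results[name] = ("unhealthy", f"check raised: {exc}")
        return results


def failing_check() -> CheckResult:
    raise OSError("disk not mounted")


registry = CheckRegistry()
registry.register_health_check("always_ok", lambda: ("healthy", "ok"))
registry.register_health_check("disk", failing_check)
results = registry.run_health_checks()
```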
get_overall_health() -> HealthStatus
Determine overall system health based on all checks.
Returns:
| Type | Description |
|---|---|
| `HealthStatus` | Overall HealthStatus |
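
The reference does not say how individual results are combined. One common policy, shown here purely as an assumption, is "worst status wins": the overall status is the most severe status among all checks. The `HealthStatus` members, the severity ordering, and the result for an empty set of checks are all hypothetical.

```python
from enum import Enum
from typing import Iterable


class HealthStatus(Enum):
    # Assumed members; the reference does not enumerate them.
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


# Higher number means more severe.
_SEVERITY = {
    HealthStatus.HEALTHY: 0,
    HealthStatus.DEGRADED: 1,
    HealthStatus.UNHEALTHY: 2,
}


def overall_health(statuses: Iterable[HealthStatus]) -> HealthStatus:
    # With no checks registered, this sketch reports HEALTHY (an assumption).
    return max(statuses, key=_SEVERITY.__getitem__, default=HealthStatus.HEALTHY)


mixed = overall_health([HealthStatus.HEALTHY, HealthStatus.DEGRADED])
worst = overall_health([HealthStatus.DEGRADED, HealthStatus.UNHEALTHY, HealthStatus.HEALTHY])
empty = overall_health([])
```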
get_health_report() -> dict[str, Any]
Generate a comprehensive health report.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary containing overall health and individual checks |
get_metrics() -> dict[str, Any]
Get current metrics snapshot.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary containing current metrics |
add_alert_callback(callback: Callable[[str, dict[str, Any]], None]) -> None
Add a callback function for alerts.
The callback will be called with (alert_type, data) when alerts trigger.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `callback` | `Callable[[str, dict[str, Any]], None]` | Function to call on alerts | required |
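
The `(alert_type, data)` callback contract can be sketched with a small dispatcher. Only `add_alert_callback` and the callback signature come from the reference; the `trigger_alert` method, the `"sync_failure"` alert type, and the decision to log and continue when a callback raises are hypothetical.

```python
import logging
from typing import Any, Callable

AlertCallback = Callable[[str, dict[str, Any]], None]


class AlertDispatcher:
    def __init__(self) -> None:
        self.alert_callbacks: list[AlertCallback] = []
        self.logger = logging.getLogger(__name__)

    def add_alert_callback(self, callback: AlertCallback) -> None:
        self.alert_callbacks.append(callback)

    def trigger_alert(self, alert_type: str, data: dict[str, Any]) -> None:
        # Hypothetical method: calls every registered callback in order.
        for callback in self.alert_callbacks:
            try:
                callback(alert_type, data)
            except Exception:
                # One broken callback should not silence the others.
                self.logger.exception("alert callback failed for %s", alert_type)


received: list[tuple[str, dict[str, Any]]] = []


def broken_callback(alert_type: str, data: dict[str, Any]) -> None:
    raise RuntimeError("webhook unreachable")


dispatcher = AlertDispatcher()
dispatcher.add_alert_callback(broken_callback)
dispatcher.add_alert_callback(lambda alert_type, data: received.append((alert_type, data)))
dispatcher.trigger_alert("sync_failure", {"error": "timeout"})
```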
reset_metrics() -> None
Reset all metrics to initial values.
export_metrics(filepath: Path) -> None
Export metrics to a JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `filepath` | `Path` | Path to export file | required |
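
Exporting a metrics snapshot to JSON might look like the following. This is a standalone sketch: `export_metrics_sketch` is a hypothetical free function that takes the snapshot explicitly, whereas the documented method takes only `filepath`. Creating missing parent directories is also an assumption.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def export_metrics_sketch(metrics: dict[str, Any], filepath: Path) -> None:
    # Assumed convenience: create the target directory if it does not exist.
    filepath.parent.mkdir(parents=True, exist_ok=True)
    with filepath.open("w", encoding="utf-8") as fh:
        json.dump(metrics, fh, indent=2)


snapshot = {"sync_count": 3, "sync_success_count": 2, "last_sync_time": None}

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "reports" / "metrics.json"
    export_metrics_sketch(snapshot, target)
    restored = json.loads(target.read_text(encoding="utf-8"))
```

The snapshot must already be JSON-serializable, which is why the `to_dict()` methods convert enums and datetimes to strings first.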
get_success_rate() -> float
Calculate sync success rate.
Returns:
| Type | Description |
|---|---|
| `float` | Success rate as a percentage (0-100) |
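
A percentage in the 0-100 range suggests a calculation like the one below. Both the denominator (`sync_count` rather than successes plus failures) and the value returned when no syncs have run are assumptions; the reference specifies neither.

```python
def success_rate(sync_success_count: int, sync_count: int) -> float:
    if sync_count == 0:
        # Assumed convention: no syncs yet means a rate of 0.0, avoiding division by zero.
        return 0.0
    return sync_success_count / sync_count * 100.0


rate = success_rate(sync_success_count=3, sync_count=4)
idle_rate = success_rate(sync_success_count=0, sync_count=0)
```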
create_default_health_checks(vector_store_path: Path, repo_path: Path) -> dict[str, Callable[[], HealthCheck]]
Create default health check functions for common components.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `vector_store_path` | `Path` | Path to vector store database | required |
| `repo_path` | `Path` | Path to repository | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Callable[[], HealthCheck]]` | Dictionary of health check functions |
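
A factory of this shape typically closes over each path and returns zero-argument callables. The sketch below only tests whether each path exists, which is an assumption; the reference does not say what the default checks verify. The check names `"vector_store"` and `"repository"`, the helper `make_path_check`, and the dict-based result are hypothetical stand-ins.

```python
import tempfile
from pathlib import Path
from typing import Callable

CheckResult = dict[str, str]  # stand-in for HealthCheck


def make_path_check(name: str, path: Path) -> Callable[[], CheckResult]:
    # The closure captures name and path, so the check takes no arguments.
    def check() -> CheckResult:
        if path.exists():
            return {"name": name, "status": "healthy", "message": f"{path} exists"}
        return {"name": name, "status": "unhealthy", "message": f"{path} is missing"}

    return check


def create_path_health_checks(
    vector_store_path: Path, repo_path: Path
) -> dict[str, Callable[[], CheckResult]]:
    return {
        "vector_store": make_path_check("vector_store", vector_store_path),
        "repository": make_path_check("repository", repo_path),
    }


with tempfile.TemporaryDirectory() as tmp:
    checks = create_path_health_checks(Path(tmp) / "missing.db", Path(tmp))
    outcomes = {name: check() for name, check in checks.items()}
```

Because the returned dict maps names to callables, each entry can be passed directly to `register_health_check(name, check_function)`.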