# Source code for thoth.shared.monitoring

"""Monitoring and health check system for Thoth.

This module provides metrics tracking, health status monitoring, and alerting
hooks for the ingestion pipeline and scheduled operations.
"""

from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
import json
from pathlib import Path
import shutil
import threading
from typing import Any

from thoth.shared.utils.logger import logging, setup_logger

# Module-level logger obtained from the project's setup_logger helper.
# Monitor instances do not use this object directly: Monitor.__init__ takes an
# optional logger and otherwise calls setup_logger(__name__) itself.
logger = setup_logger(__name__)

# Names exported by `from thoth.shared.monitoring import *`.
__all__ = [
    "HealthCheck",
    "HealthStatus",
    "Metrics",
    "Monitor",
    "create_default_health_checks",
]


class HealthStatus(Enum):
    """Closed set of health states reported by a check or by the whole system.

    Each member's value is the lowercase string emitted when a status is
    serialized (for example by ``HealthCheck.to_dict``).
    """

    # Everything is working as expected.
    HEALTHY = "healthy"
    # Still usable, but something needs attention.
    DEGRADED = "degraded"
    # A check failed outright.
    UNHEALTHY = "unhealthy"
    # The state could not be determined.
    UNKNOWN = "unknown"
@dataclass
class HealthCheck:
    """Outcome of running a single health check.

    Attributes:
        name: Name of the health check.
        status: Resulting health status.
        message: Human-readable description of the result.
        timestamp: When the check was performed; defaults to the current
            UTC time at construction.
        metadata: Extra check-specific data; defaults to an empty dict.
    """

    name: str
    status: HealthStatus
    message: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize this result into a JSON-friendly dictionary.

        Returns:
            Dict with ``name``, ``status`` (the enum's string value),
            ``message``, ``timestamp`` (ISO 8601 string) and ``metadata``
            (the same dict object held by this instance, not a copy).
        """
        serialized: dict[str, Any] = dict(
            name=self.name,
            status=self.status.value,
            message=self.message,
            timestamp=self.timestamp.isoformat(),
            metadata=self.metadata,
        )
        return serialized
[docs] @dataclass class Metrics: """Tracks operational metrics. Attributes: sync_count: Total number of sync operations sync_success_count: Number of successful syncs sync_failure_count: Number of failed syncs last_sync_time: Timestamp of last sync attempt last_sync_duration: Duration of last sync in seconds total_files_processed: Cumulative files processed total_chunks_created: Cumulative chunks created errors: List of recent error messages """ sync_count: int = 0 sync_success_count: int = 0 sync_failure_count: int = 0 last_sync_time: datetime | None = None last_sync_duration: float = 0.0 total_files_processed: int = 0 total_chunks_created: int = 0 errors: list[dict[str, str]] = field(default_factory=list)
[docs] def to_dict(self) -> dict[str, Any]: """Convert metrics to a JSON-serializable dict for APIs or export. Returns: Dict with sync counts, last sync time/duration, totals, error_count, recent_errors. """ return { "sync_count": self.sync_count, "sync_success_count": self.sync_success_count, "sync_failure_count": self.sync_failure_count, "last_sync_time": (self.last_sync_time.isoformat() if self.last_sync_time else None), "last_sync_duration": self.last_sync_duration, "total_files_processed": self.total_files_processed, "total_chunks_created": self.total_chunks_created, "error_count": len(self.errors), "recent_errors": self.errors[-10:], # Last 10 errors }
class Monitor:
    """Monitoring system for tracking metrics and health status.

    Metric updates and reads are serialized through an internal
    ``threading.Lock``. The health-check registry and the alert-callback list
    are plain containers that are not guarded by that lock; they are
    snapshotted before iteration so that registering a check or callback while
    a run is in progress cannot break the loop.

    Logging and alert dispatch happen outside the lock. The lock is not
    reentrant, so running callbacks while holding it would deadlock any
    callback that calls back into ``get_metrics`` or ``get_success_rate``.

    Attributes:
        metrics: Current operational metrics.
        health_checks: Registered health check functions, keyed by name.
        alert_callbacks: Functions invoked whenever an alert is triggered.
        logger: Logger used for all monitor output.
        max_errors: Maximum number of error entries retained in ``metrics``.
    """

    def __init__(
        self,
        logger_instance: logging.Logger | None = None,
        max_errors: int = 100,
    ):
        """Initialize the monitoring system.

        Args:
            logger_instance: Logger to use; when omitted (or falsy) one is
                created with ``setup_logger(__name__)``.
            max_errors: Maximum number of errors to retain. Values of zero or
                less mean no error entries are kept.
        """
        self.metrics = Metrics()
        self.health_checks: dict[str, Callable[[], HealthCheck]] = {}
        self.alert_callbacks: list[Callable[[str, dict[str, Any]], None]] = []
        self.logger = logger_instance or setup_logger(__name__)
        self.max_errors = max_errors
        self._lock = threading.Lock()

    def record_sync_start(self) -> None:
        """Record the start of a sync operation (thread-safe)."""
        with self._lock:
            self.metrics.sync_count += 1
            self.metrics.last_sync_time = datetime.now(UTC)
        self.logger.debug("Sync operation started")

    def record_sync_success(
        self,
        files_processed: int,
        chunks_created: int,
        duration: float,
    ) -> None:
        """Record a successful sync operation.

        Args:
            files_processed: Number of files processed.
            chunks_created: Number of chunks created.
            duration: Duration in seconds.
        """
        with self._lock:
            self.metrics.sync_success_count += 1
            self.metrics.last_sync_duration = duration
            self.metrics.total_files_processed += files_processed
            self.metrics.total_chunks_created += chunks_created
        self.logger.info(
            f"Sync success recorded: {files_processed} files, {chunks_created} chunks, {duration:.2f}s"
        )

    def record_sync_failure(self, error: Exception) -> None:
        """Record a failed sync operation and raise a ``sync_failure`` alert.

        Args:
            error: Exception that caused the failure.
        """
        error_info = {
            "timestamp": datetime.now(UTC).isoformat(),
            "type": type(error).__name__,
            "message": str(error),
        }
        with self._lock:
            self.metrics.sync_failure_count += 1
            errors = self.metrics.errors
            errors.append(error_info)
            # Drop the oldest entries by count. Slicing with
            # ``errors[-max_errors:]`` would keep the entire list when
            # max_errors is 0, because ``-0`` is just ``0``.
            overflow = len(errors) - self.max_errors
            if overflow > 0:
                del errors[:overflow]
        self.logger.error(f"Sync failure recorded: {error}")
        # Dispatched after the lock is released so callbacks may read metrics.
        self._trigger_alert("sync_failure", error_info)

    def register_health_check(
        self,
        name: str,
        check_function: Callable[[], HealthCheck],
    ) -> None:
        """Register a health check function.

        Registering under an existing name replaces the previous function.

        Args:
            name: Unique name for the health check.
            check_function: Zero-argument callable that returns a HealthCheck.
        """
        self.health_checks[name] = check_function
        self.logger.debug(f"Registered health check: {name}")

    def run_health_checks(self) -> dict[str, HealthCheck]:
        """Run all registered health checks.

        A check whose result is UNHEALTHY triggers a ``health_check_failed``
        alert. A check that raises is logged and reported as UNKNOWN instead
        of propagating the exception.

        Returns:
            Dictionary mapping check names to results.
        """
        results: dict[str, HealthCheck] = {}
        # Iterate over a snapshot: a check that registers another check would
        # otherwise raise "dictionary changed size during iteration".
        for name, check_func in list(self.health_checks.items()):
            try:
                result = check_func()
                results[name] = result
                if result.status == HealthStatus.UNHEALTHY:
                    self._trigger_alert(
                        "health_check_failed",
                        result.to_dict(),
                    )
            except Exception as e:
                self.logger.exception(f"Health check '{name}' failed")
                results[name] = HealthCheck(
                    name=name,
                    status=HealthStatus.UNKNOWN,
                    message=f"Check failed with error: {e}",
                )
        return results

    @staticmethod
    def _aggregate_status(results: dict[str, HealthCheck]) -> HealthStatus:
        """Reduce individual check results to one overall status.

        The worst status wins, in the order UNHEALTHY, DEGRADED, UNKNOWN.
        An empty result set is reported as UNKNOWN.
        """
        if not results:
            return HealthStatus.UNKNOWN

        seen = {check.status for check in results.values()}
        for severity in (
            HealthStatus.UNHEALTHY,
            HealthStatus.DEGRADED,
            HealthStatus.UNKNOWN,
        ):
            if severity in seen:
                return severity
        return HealthStatus.HEALTHY

    def get_overall_health(self) -> HealthStatus:
        """Determine overall system health based on all checks.

        Runs every registered check (with the same alerting side effects as
        ``run_health_checks``) and aggregates the results.

        Returns:
            Overall HealthStatus; UNKNOWN when no checks are registered.
        """
        return self._aggregate_status(self.run_health_checks())

    def get_health_report(self) -> dict[str, Any]:
        """Generate a comprehensive health report.

        The checks are executed exactly once. The overall status is derived
        from the same results that are listed under ``checks``, so the two
        always agree and each unhealthy check alerts only once per report.

        Returns:
            Dictionary with ``overall_status``, ``timestamp`` (ISO 8601) and
            ``checks`` (check name -> serialized HealthCheck).
        """
        results = self.run_health_checks()
        overall_status = self._aggregate_status(results)

        return {
            "overall_status": overall_status.value,
            "timestamp": datetime.now(UTC).isoformat(),
            "checks": {name: check.to_dict() for name, check in results.items()},
        }

    def get_metrics(self) -> dict[str, Any]:
        """Get current metrics snapshot.

        Returns:
            Dictionary containing current metrics.
        """
        with self._lock:
            return self.metrics.to_dict()

    def add_alert_callback(
        self,
        callback: Callable[[str, dict[str, Any]], None],
    ) -> None:
        """Add a callback function for alerts.

        The callback is invoked as ``callback(alert_type, alert_info)``, where
        ``alert_info`` is a dict with the keys ``"type"``, ``"timestamp"``
        (ISO 8601) and ``"data"`` (the alert-specific payload).

        Args:
            callback: Function to call on alerts.
        """
        self.alert_callbacks.append(callback)
        self.logger.debug("Alert callback registered")

    def _trigger_alert(self, alert_type: str, data: dict[str, Any]) -> None:
        """Trigger an alert by calling all registered callbacks.

        Exceptions raised by a callback are logged and do not prevent the
        remaining callbacks from running.

        Args:
            alert_type: Type of alert (e.g., "sync_failure").
            data: Additional data about the alert.
        """
        alert_info = {
            "type": alert_type,
            "timestamp": datetime.now(UTC).isoformat(),
            "data": data,
        }

        self.logger.warning(f"Alert triggered: {alert_type}")

        # Snapshot so a callback that registers another callback does not
        # extend the loop that is currently running.
        for callback in tuple(self.alert_callbacks):
            try:
                callback(alert_type, alert_info)
            except Exception:
                self.logger.exception("Error in alert callback")

    def reset_metrics(self) -> None:
        """Reset all metrics to initial values."""
        with self._lock:
            self.metrics = Metrics()
        self.logger.info("Metrics reset")

    def export_metrics(self, filepath: Path) -> None:
        """Export metrics to a JSON file.

        Args:
            filepath: Path to export file. The parent directory must already
                exist; errors from opening or writing the file propagate.
        """
        metrics_data = self.get_metrics()

        # Explicit encoding keeps the output independent of the platform locale.
        with filepath.open("w", encoding="utf-8") as f:
            json.dump(metrics_data, f, indent=2)

        self.logger.info(f"Metrics exported to {filepath}")

    def get_success_rate(self) -> float:
        """Calculate sync success rate.

        Returns:
            Success rate as a percentage (0-100); 0.0 when no sync has been
            started yet.
        """
        with self._lock:
            if self.metrics.sync_count == 0:
                return 0.0
            return (self.metrics.sync_success_count / self.metrics.sync_count) * 100
def create_default_health_checks(
    vector_store_path: Path,
    repo_path: Path,
    *,
    critical_percent_free: float = 5.0,
    warning_percent_free: float = 15.0,
) -> dict[str, Callable[[], HealthCheck]]:
    """Create default health check functions for common components.

    The returned callables close over the given paths and thresholds and
    evaluate the filesystem each time they are called.

    Args:
        vector_store_path: Path to vector store database.
        repo_path: Path to repository.
        critical_percent_free: Free-space percentage below which the disk
            check reports UNHEALTHY.
        warning_percent_free: Free-space percentage below which the disk
            check reports DEGRADED.

    Returns:
        Dictionary with the keys ``"vector_store"``, ``"repository"`` and
        ``"disk_space"``, each mapping to a zero-argument function that
        returns a HealthCheck.
    """

    def check_vector_store() -> HealthCheck:
        """Check that the vector store path exists and is a directory."""
        try:
            if not vector_store_path.exists():
                return HealthCheck(
                    name="vector_store",
                    status=HealthStatus.UNHEALTHY,
                    message="Vector store does not exist",
                )

            if not vector_store_path.is_dir():
                return HealthCheck(
                    name="vector_store",
                    status=HealthStatus.UNHEALTHY,
                    message="Vector store path is not a directory",
                )

            return HealthCheck(
                name="vector_store",
                status=HealthStatus.HEALTHY,
                message="Vector store is accessible",
                metadata={"path": str(vector_store_path)},
            )
        except (OSError, ValueError) as e:
            return HealthCheck(
                name="vector_store",
                status=HealthStatus.UNHEALTHY,
                message=f"Error checking vector store: {e}",
            )

    def check_repository() -> HealthCheck:
        """Check that the repository path exists and contains ``.git``."""
        try:
            # A missing checkout is only DEGRADED, whereas an existing
            # directory without ``.git`` is UNHEALTHY.
            if not repo_path.exists():
                return HealthCheck(
                    name="repository",
                    status=HealthStatus.DEGRADED,
                    message="Repository not cloned",
                )

            git_dir = repo_path / ".git"
            if not git_dir.exists():
                return HealthCheck(
                    name="repository",
                    status=HealthStatus.UNHEALTHY,
                    message="Repository directory exists but is not a git repo",
                )

            return HealthCheck(
                name="repository",
                status=HealthStatus.HEALTHY,
                message="Repository is valid",
                metadata={"path": str(repo_path)},
            )
        except (OSError, ValueError) as e:
            return HealthCheck(
                name="repository",
                status=HealthStatus.UNHEALTHY,
                message=f"Error checking repository: {e}",
            )

    def check_disk_space() -> HealthCheck:
        """Check free space on the filesystem holding the vector store's parent."""
        try:
            stats = shutil.disk_usage(vector_store_path.parent)
        except (OSError, ValueError) as e:
            return HealthCheck(
                name="disk_space",
                status=HealthStatus.UNKNOWN,
                message=f"Error checking disk space: {e}",
            )

        # A reported total of zero would make the percentage below divide by
        # zero; report UNKNOWN rather than raising ZeroDivisionError.
        if stats.total <= 0:
            return HealthCheck(
                name="disk_space",
                status=HealthStatus.UNKNOWN,
                message="Error checking disk space: filesystem reports zero total size",
            )

        free_gb = stats.free / (1024**3)
        total_gb = stats.total / (1024**3)
        percent_free = (stats.free / stats.total) * 100

        if percent_free < critical_percent_free:
            status = HealthStatus.UNHEALTHY
            message = f"Critical: Only {free_gb:.1f}GB free"
        elif percent_free < warning_percent_free:
            status = HealthStatus.DEGRADED
            message = f"Warning: Only {free_gb:.1f}GB free"
        else:
            status = HealthStatus.HEALTHY
            message = f"Sufficient space: {free_gb:.1f}GB free"

        return HealthCheck(
            name="disk_space",
            status=status,
            message=message,
            metadata={
                "free_gb": round(free_gb, 2),
                "total_gb": round(total_gb, 2),
                "percent_free": round(percent_free, 2),
            },
        )

    return {
        "vector_store": check_vector_store,
        "repository": check_repository,
        "disk_space": check_disk_space,
    }