thoth.shared.monitoring

Monitoring and health check system for Thoth.

This module provides metrics tracking, health status monitoring, and alerting hooks for the ingestion pipeline and scheduled operations.

Functions

create_default_health_checks(...)

Create default health check functions for common components.

setup_logger(name[, level, simple, json_output])

Create and configure a logger with structured JSON output.

Classes

HealthCheck(name, status, message, ...)

Represents a health check result.

HealthStatus(*values)

Enumeration of possible health statuses.

Metrics(sync_count, sync_success_count, ...)

Tracks operational metrics.

Monitor([logger_instance, max_errors])

Monitoring system for tracking metrics and health status.

class thoth.shared.monitoring.HealthCheck(name: str, status: HealthStatus, message: str, timestamp: datetime = <factory>, metadata: dict[str, Any] = <factory>)

Bases: object

Represents a health check result.

name

Name of the health check

Type: str

status

Health status result

Type: thoth.shared.monitoring.HealthStatus

message

Human-readable status message

Type: str

timestamp

When the check was performed

Type: datetime.datetime

metadata

Additional check-specific data

Type: dict[str, Any]

name: str
status: HealthStatus
message: str
timestamp: datetime
metadata: dict[str, Any]
to_dict() → dict[str, Any]

Convert this health check result to a JSON-serializable dict.

Returns:

Dict with name, status (str), message, timestamp (ISO), metadata.
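
The conversion described above can be sketched as follows. This is a minimal stand-in that mirrors the documented fields and enum values, not the Thoth source; the dictionary keys are assumed to match the field names, and the sample check name and message are hypothetical.

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class HealthStatus(Enum):
    """Stand-in mirroring the documented enum values."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Stand-in with the documented fields (not the Thoth implementation)."""

    name: str
    status: HealthStatus
    message: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # The enum becomes its string value and the datetime an ISO 8601
        # string, so json.dumps() accepts the result.
        # Assumption: key names equal the field names.
        return {
            "name": self.name,
            "status": self.status.value,
            "message": self.message,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }


# Hypothetical check result with a fixed timestamp for reproducibility.
check = HealthCheck(
    name="vector_store",
    status=HealthStatus.HEALTHY,
    message="store reachable",
    timestamp=datetime(2024, 1, 1, 12, 0, 0),
)
payload = check.to_dict()
```

Both `timestamp` and `metadata` use default factories, so each instance gets its own timestamp and its own empty dict when they are omitted.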

__init__(name: str, status: HealthStatus, message: str, timestamp: datetime = <factory>, metadata: dict[str, Any] = <factory>) → None

class thoth.shared.monitoring.HealthStatus(*values)

Bases: Enum

Enumeration of possible health statuses.

HEALTHY = 'healthy'
DEGRADED = 'degraded'
UNHEALTHY = 'unhealthy'
UNKNOWN = 'unknown'

class thoth.shared.monitoring.Metrics(sync_count: int = 0, sync_success_count: int = 0, sync_failure_count: int = 0, last_sync_time: datetime | None = None, last_sync_duration: float = 0.0, total_files_processed: int = 0, total_chunks_created: int = 0, errors: list[dict[str, str]] = <factory>)

Bases: object

Tracks operational metrics.

sync_count

Total number of sync operations

Type: int

sync_success_count

Number of successful syncs

Type: int

sync_failure_count

Number of failed syncs

Type: int

last_sync_time

Timestamp of last sync attempt

Type: datetime.datetime | None

last_sync_duration

Duration of last sync in seconds

Type: float

total_files_processed

Cumulative files processed

Type: int

total_chunks_created

Cumulative chunks created

Type: int

errors

List of recent error messages

Type: list[dict[str, str]]

sync_count: int = 0
sync_success_count: int = 0
sync_failure_count: int = 0
last_sync_time: datetime | None = None
last_sync_duration: float = 0.0
total_files_processed: int = 0
total_chunks_created: int = 0
errors: list[dict[str, str]]
to_dict() → dict[str, Any]

Convert metrics to a JSON-serializable dict for APIs or export.

Returns:

Dict with sync counts, last sync time/duration, totals, error_count, recent_errors.
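
A rough sketch of such a conversion is shown below. `MetricsSketch` is a hypothetical stand-in, not the Thoth class. Only `error_count` and `recent_errors` are named in the description above; the remaining keys are assumed to match the field names, and the cutoff of 10 recent errors is an assumption made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

RECENT_ERRORS_SHOWN = 10  # assumption: the real cutoff is not documented


@dataclass
class MetricsSketch:
    """Hypothetical stand-in with the documented Metrics fields."""

    sync_count: int = 0
    sync_success_count: int = 0
    sync_failure_count: int = 0
    last_sync_time: Optional[datetime] = None
    last_sync_duration: float = 0.0
    total_files_processed: int = 0
    total_chunks_created: int = 0
    errors: list[dict[str, str]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {
            "sync_count": self.sync_count,
            "sync_success_count": self.sync_success_count,
            "sync_failure_count": self.sync_failure_count,
            # A datetime is not JSON-serializable, so emit ISO 8601 or None.
            "last_sync_time": (
                self.last_sync_time.isoformat() if self.last_sync_time else None
            ),
            "last_sync_duration": self.last_sync_duration,
            "total_files_processed": self.total_files_processed,
            "total_chunks_created": self.total_chunks_created,
            # Report the full count but only the newest few entries.
            "error_count": len(self.errors),
            "recent_errors": self.errors[-RECENT_ERRORS_SHOWN:],
        }


metrics = MetricsSketch(
    sync_count=2,
    errors=[{"message": f"error {i}"} for i in range(12)],
)
snapshot = metrics.to_dict()
```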

__init__(sync_count: int = 0, sync_success_count: int = 0, sync_failure_count: int = 0, last_sync_time: datetime | None = None, last_sync_duration: float = 0.0, total_files_processed: int = 0, total_chunks_created: int = 0, errors: list[dict[str, str]] = <factory>) → None

class thoth.shared.monitoring.Monitor(logger_instance: Logger | None = None, max_errors: int = 100)

Bases: object

Monitoring system for tracking metrics and health status.

This class provides centralized monitoring with thread-safe metric collection, health checks, and alerting capabilities.

metrics

Current operational metrics

health_checks

Dictionary of registered health checks

alert_callbacks

List of functions to call on alerts

logger

Logger instance

__init__(logger_instance: Logger | None = None, max_errors: int = 100)

Initialize the monitoring system.

Parameters:
  • logger_instance – Optional logger instance

  • max_errors – Maximum number of errors to retain

health_checks: dict[str, Callable[[], HealthCheck]]
alert_callbacks: list[Callable[[str, dict[str, Any]], None]]
record_sync_start() → None

Record the start of a sync operation (thread-safe).

record_sync_success(files_processed: int, chunks_created: int, duration: float) → None

Record a successful sync operation.

Parameters:
  • files_processed – Number of files processed

  • chunks_created – Number of chunks created

  • duration – Duration in seconds

record_sync_failure(error: Exception) → None

Record a failed sync operation.

Parameters:

error – Exception that caused the failure
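
The three recording methods can be pictured with the sketch below. `SyncRecorderSketch` is a hypothetical stand-in, not the Monitor source. It assumes a single `threading.Lock` guards all counters, that `record_sync_start` is where `sync_count` is incremented, that the oldest errors are dropped once `max_errors` is exceeded, and that each error entry carries `timestamp`, `type`, and `message` keys.

```python
import threading
from datetime import datetime


class SyncRecorderSketch:
    """Hypothetical stand-in for the sync-recording part of Monitor."""

    def __init__(self, max_errors: int = 100) -> None:
        self._lock = threading.Lock()
        self.max_errors = max_errors
        self.sync_count = 0
        self.sync_success_count = 0
        self.sync_failure_count = 0
        self.total_files_processed = 0
        self.total_chunks_created = 0
        self.last_sync_duration = 0.0
        self.errors = []  # list of dict[str, str]

    def record_sync_start(self) -> None:
        # "+=" is a read-modify-write, so it is done under the lock.
        with self._lock:
            self.sync_count += 1

    def record_sync_success(
        self, files_processed: int, chunks_created: int, duration: float
    ) -> None:
        with self._lock:
            self.sync_success_count += 1
            self.total_files_processed += files_processed
            self.total_chunks_created += chunks_created
            self.last_sync_duration = duration

    def record_sync_failure(self, error: Exception) -> None:
        entry = {
            "timestamp": datetime.now().isoformat(),
            "type": type(error).__name__,
            "message": str(error),
        }
        with self._lock:
            self.sync_failure_count += 1
            self.errors.append(entry)
            # Drop the oldest entries so at most max_errors are retained.
            excess = len(self.errors) - self.max_errors
            if excess > 0:
                del self.errors[:excess]


recorder = SyncRecorderSketch(max_errors=3)


def worker() -> None:
    for _ in range(1000):
        recorder.record_sync_start()


threads = [threading.Thread(target=worker) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

for i in range(5):
    recorder.record_sync_failure(RuntimeError(f"boom {i}"))
```

Trimming by an explicit excess count, rather than by a negative slice, also behaves sensibly when `max_errors` is 0.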

register_health_check(name: str, check_function: Callable[[], HealthCheck]) → None

Register a health check function.

Parameters:
  • name – Unique name for the health check

  • check_function – Function that returns a HealthCheck

run_health_checks() → dict[str, HealthCheck]

Run all registered health checks.

Returns:

Dictionary mapping check names to results
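
Registration and execution can be sketched as a name-to-callable registry. The classes below are simplified stand-ins, not the Thoth source, and the check names are hypothetical. How the real Monitor treats a check function that raises is not documented; reporting it as UNHEALTHY is an assumption of this sketch.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Reduced stand-in: timestamp and metadata are omitted for brevity."""

    name: str
    status: HealthStatus
    message: str


class CheckRegistrySketch:
    """Hypothetical stand-in for the health-check part of Monitor."""

    def __init__(self) -> None:
        self.health_checks: dict[str, Callable[[], HealthCheck]] = {}

    def register_health_check(
        self, name: str, check_function: Callable[[], HealthCheck]
    ) -> None:
        # Registering the same name again replaces the earlier function.
        self.health_checks[name] = check_function

    def run_health_checks(self) -> dict[str, HealthCheck]:
        results: dict[str, HealthCheck] = {}
        for name, check in self.health_checks.items():
            try:
                results[name] = check()
            except Exception as exc:
                # Assumption: a crashing check is reported, not propagated.
                results[name] = HealthCheck(
                    name, HealthStatus.UNHEALTHY, f"check raised: {exc}"
                )
        return results


def disk_check() -> HealthCheck:
    return HealthCheck("disk", HealthStatus.HEALTHY, "enough free space")


def broken_check() -> HealthCheck:
    raise ConnectionError("database unreachable")


registry = CheckRegistrySketch()
registry.register_health_check("disk", disk_check)
registry.register_health_check("database", broken_check)
results = registry.run_health_checks()
```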

get_overall_health() → HealthStatus

Determine overall system health based on all checks.

Returns:

Overall HealthStatus
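
The aggregation rule is not spelled out here. One plausible reading, sketched below as an assumption rather than the documented behavior, is "worst status wins": any UNHEALTHY check makes the system UNHEALTHY, otherwise any DEGRADED check makes it DEGRADED, and an empty set of checks yields UNKNOWN. The severity ordering and the `overall_health` helper are hypothetical.

```python
from enum import Enum
from typing import Iterable


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


# Assumed severity ordering; higher numbers are worse.
_SEVERITY = {
    HealthStatus.HEALTHY: 0,
    HealthStatus.UNKNOWN: 1,
    HealthStatus.DEGRADED: 2,
    HealthStatus.UNHEALTHY: 3,
}


def overall_health(statuses: Iterable[HealthStatus]) -> HealthStatus:
    """Return the most severe status, or UNKNOWN if there are no checks."""
    collected = list(statuses)
    if not collected:
        return HealthStatus.UNKNOWN
    return max(collected, key=_SEVERITY.__getitem__)


mixed = overall_health(
    [HealthStatus.HEALTHY, HealthStatus.DEGRADED, HealthStatus.HEALTHY]
)
```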

get_health_report() → dict[str, Any]

Generate a comprehensive health report.

Returns:

Dictionary containing overall health and individual checks

get_metrics() → dict[str, Any]

Get current metrics snapshot.

Returns:

Dictionary containing current metrics

add_alert_callback(callback: Callable[[str, dict[str, Any]], None]) → None

Add a callback function for alerts.

The callback will be called with (alert_type, data) when alerts trigger.

Parameters:

callback – Function to call on alerts
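
A callback only needs to accept `(alert_type, data)`. The sketch below shows two callbacks with that shape and a hypothetical dispatch loop; the alert type `"sync_failure"`, the payload contents, and the choice to skip a callback that raises are assumptions, since none of them is documented here.

```python
from typing import Any, Callable

AlertCallback = Callable[[str, dict[str, Any]], None]

received: list[tuple[str, dict[str, Any]]] = []


def collect_alert(alert_type: str, data: dict[str, Any]) -> None:
    """Example callback: remember every alert it is given."""
    received.append((alert_type, data))


def broken_callback(alert_type: str, data: dict[str, Any]) -> None:
    """Example callback that fails, e.g. an unreachable pager service."""
    raise RuntimeError("pager service unavailable")


def dispatch_alert(
    callbacks: list[AlertCallback], alert_type: str, data: dict[str, Any]
) -> int:
    """Call every callback and return how many completed without raising."""
    delivered = 0
    for callback in callbacks:
        try:
            callback(alert_type, data)
            delivered += 1
        except Exception:
            # Assumption: one failing callback must not block the others.
            continue
    return delivered


delivered = dispatch_alert(
    [broken_callback, collect_alert], "sync_failure", {"error": "timeout"}
)
```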

reset_metrics() → None

Reset all metrics to initial values.

export_metrics(filepath: Path) → None

Export metrics to a JSON file.

Parameters:

filepath – Path to export file
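
An export of this kind amounts to serializing the metrics dictionary and writing it to the given path. The helper below is a hypothetical sketch using only the standard library; creating missing parent directories and the indented UTF-8 output are assumptions, not documented behavior.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def export_metrics_sketch(metrics: dict[str, Any], filepath: Path) -> None:
    """Write a metrics dictionary to filepath as indented JSON."""
    # Assumption: missing parent directories are created on demand.
    filepath.parent.mkdir(parents=True, exist_ok=True)
    filepath.write_text(json.dumps(metrics, indent=2), encoding="utf-8")


# Write into a temporary directory and read the file back.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "reports" / "metrics.json"
    export_metrics_sketch({"sync_count": 3, "sync_success_count": 2}, target)
    round_tripped = json.loads(target.read_text(encoding="utf-8"))
```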

get_success_rate() → float

Calculate sync success rate.

Returns:

Success rate as a percentage (0-100)
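
As a worked example, the percentage can be computed as successful syncs divided by total syncs, times 100. The helper below is a sketch under two assumptions that are not confirmed here: the denominator is `sync_count`, and a monitor with no recorded syncs reports 0.0 instead of raising a division error.

```python
def success_rate(sync_success_count: int, sync_count: int) -> float:
    """Return the share of successful syncs as a percentage (0-100)."""
    if sync_count == 0:
        # Assumption: no syncs yet is reported as 0.0.
        return 0.0
    return 100.0 * sync_success_count / sync_count


rate = success_rate(3, 4)  # three of four syncs succeeded
```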

thoth.shared.monitoring.create_default_health_checks(vector_store_path: Path, repo_path: Path) → dict[str, Callable[[], HealthCheck]]

Create default health check functions for common components.

Parameters:
  • vector_store_path – Path to vector store database

  • repo_path – Path to repository

Returns:

Dictionary of health check functions
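
The returned dictionary maps names to zero-argument callables, which suggests a closure per path. The sketch below illustrates that shape only: the check names `vector_store` and `repository`, the existence test, and the stand-in classes are all assumptions, and the real function may inspect the paths in more depth.

```python
import tempfile
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Callable


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthCheck:
    """Reduced stand-in: timestamp and metadata are omitted for brevity."""

    name: str
    status: HealthStatus
    message: str


def make_path_check(name: str, path: Path) -> Callable[[], HealthCheck]:
    """Build a zero-argument check that captures name and path."""

    def check() -> HealthCheck:
        # Evaluated on every call, so the result tracks the current state.
        if path.exists():
            return HealthCheck(name, HealthStatus.HEALTHY, f"{path} exists")
        return HealthCheck(name, HealthStatus.UNHEALTHY, f"{path} is missing")

    return check


def default_checks_sketch(
    vector_store_path: Path, repo_path: Path
) -> dict[str, Callable[[], HealthCheck]]:
    # Hypothetical check names; the real keys are not documented here.
    return {
        "vector_store": make_path_check("vector_store", vector_store_path),
        "repository": make_path_check("repository", repo_path),
    }


with tempfile.TemporaryDirectory() as tmp:
    checks = default_checks_sketch(Path(tmp), Path(tmp) / "no-such-repo")
    results = {name: check() for name, check in checks.items()}
```

Each entry of such a dictionary could then be passed to register_health_check under its key.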