Source code for thoth.shared.scheduler

"""Scheduler for automated synchronization tasks.

This module provides scheduling functionality for periodic sync operations
using APScheduler. It supports configurable intervals, manual triggers, and
job status monitoring.
"""

from collections.abc import Callable
from datetime import UTC, datetime
from typing import Any

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

from thoth.ingestion.pipeline import IngestionPipeline
from thoth.shared.utils.logger import logging, setup_logger

__all__ = ["SyncScheduler"]
logger = setup_logger(__name__)


[docs] class SyncScheduler: """Manages scheduled synchronization jobs. This class wraps APScheduler to provide convenient scheduling of sync operations with monitoring hooks and manual trigger support. Attributes: pipeline: The ingestion pipeline to run on schedule scheduler: APScheduler BackgroundScheduler instance job_id: Unique identifier for the scheduled job logger: Logger instance for recording events """
[docs] def __init__( self, pipeline: IngestionPipeline, logger_instance: logging.Logger | None = None, job_id: str = "sync_job", ): """Initialize the sync scheduler. Args: pipeline: Configured IngestionPipeline instance logger_instance: Optional logger instance job_id: Unique identifier for the scheduled job """ self.pipeline = pipeline self.scheduler = BackgroundScheduler() self.job_id = job_id self.logger = logger_instance or setup_logger(__name__) self._on_success_callbacks: list[Callable[[dict[str, Any]], None]] = [] self._on_failure_callbacks: list[Callable[[Exception], None]] = []
[docs] def add_interval_job( self, interval_minutes: int = 60, start_immediately: bool = False, ) -> None: """Schedule a sync job to run at regular intervals. Args: interval_minutes: Minutes between sync runs (default: 60) start_immediately: Whether to run the job immediately on schedule (default: False) """ if interval_minutes <= 0: self.logger.error( "Invalid interval_minutes value: %s. The interval must be greater than 0.", interval_minutes, ) msg = f"interval_minutes must be greater than 0, got {interval_minutes}" raise ValueError(msg) trigger = IntervalTrigger(minutes=interval_minutes) self.scheduler.add_job( func=self._run_sync, trigger=trigger, id=self.job_id, name="Periodic Handbook Sync", replace_existing=True, ) self.logger.info(f"Scheduled sync job to run every {interval_minutes} minutes") if start_immediately: self.trigger_manual_sync()
[docs] def add_cron_job( self, hour: int = 0, minute: int = 0, day_of_week: str = "*", ) -> None: """Schedule a sync job using a cron-like schedule. Args: hour: Hour of day to run (0-23) minute: Minute of hour to run (0-59) day_of_week: Days to run (0-6 for Mon-Sun, or * for all) """ trigger = CronTrigger( hour=hour, minute=minute, day_of_week=day_of_week, ) self.scheduler.add_job( func=self._run_sync, trigger=trigger, id=self.job_id, name="Cron Handbook Sync", replace_existing=True, ) self.logger.info(f"Scheduled sync job (cron): hour={hour}, minute={minute}, day_of_week={day_of_week}")
[docs] def start(self) -> None: """Start the scheduler. This will begin running scheduled jobs. The scheduler runs in the background and does not block. """ if not self.scheduler.running: self.scheduler.start() self.logger.info("Scheduler started") else: self.logger.warning("Scheduler is already running")
[docs] def stop(self, wait: bool = True) -> None: """Stop the scheduler. Args: wait: Whether to wait for running jobs to complete """ if self.scheduler.running: self.scheduler.shutdown(wait=wait) self.logger.info("Scheduler stopped") else: self.logger.warning("Scheduler is not running")
[docs] def trigger_manual_sync(self) -> dict[str, Any]: """Manually trigger a sync operation immediately. Returns: Dictionary containing sync statistics and status """ self.logger.info("Manual sync triggered") return self._run_sync()
[docs] def get_job_status(self) -> dict[str, Any]: """Get the current status of the scheduled job. Returns: Dictionary with job status information: - scheduled: Whether job is scheduled - next_run_time: Next scheduled run (if scheduled) - running: Whether scheduler is running """ job = self.scheduler.get_job(self.job_id) status = { "scheduled": job is not None, "next_run_time": None, "running": self.scheduler.running, } if job: status["next_run_time"] = job.next_run_time.isoformat() if job.next_run_time else None return status
[docs] def add_success_callback( self, callback: Callable[[dict[str, Any]], None], ) -> None: """Add a callback to be called when sync succeeds. Args: callback: Function that takes sync stats dict as argument """ self._on_success_callbacks.append(callback)
[docs] def add_failure_callback( self, callback: Callable[[Exception], None], ) -> None: """Add a callback to be called when sync fails. Args: callback: Function that takes exception as argument """ self._on_failure_callbacks.append(callback)
def _run_sync(self) -> dict[str, Any]: """Internal method to run the sync pipeline. Executes the pipeline and calls registered callbacks on success/failure. Returns: Dictionary containing sync statistics """ self.logger.info("Starting scheduled sync operation") start_time = datetime.now(UTC) try: # Run the pipeline in incremental mode stats = self.pipeline.run( force_reclone=False, incremental=True, ) # Convert stats to dict stats_dict = { "success": True, "start_time": start_time.isoformat(), "end_time": datetime.now(UTC).isoformat(), "processed_files": stats.processed_files, "failed_files": stats.failed_files, "total_chunks": stats.total_chunks, "total_documents": stats.total_documents, "duration_seconds": stats.duration_seconds, } self.logger.info(f"Sync completed successfully: {stats.processed_files} files processed") # Call success callbacks for success_callback in self._on_success_callbacks: try: success_callback(stats_dict) except Exception: self.logger.exception("Error in success callback") return stats_dict except Exception as e: self.logger.exception("Sync operation failed") # Call failure callbacks for failure_callback in self._on_failure_callbacks: try: failure_callback(e) except Exception: self.logger.exception("Error in failure callback") # Re-raise to allow APScheduler to handle it raise
[docs] def pause_job(self) -> None: """Pause the scheduled job without stopping the scheduler.""" job = self.scheduler.get_job(self.job_id) if job: job.pause() self.logger.info("Job paused") else: self.logger.warning("No job to pause")
[docs] def resume_job(self) -> None: """Resume a paused job.""" job = self.scheduler.get_job(self.job_id) if job: job.resume() self.logger.info("Job resumed") else: self.logger.warning("No job to resume")
[docs] def remove_job(self) -> None: """Remove the scheduled job.""" if self.scheduler.get_job(self.job_id): self.scheduler.remove_job(self.job_id) self.logger.info("Job removed") else: self.logger.warning("No job to remove")