thoth.shared.scheduler¶
Scheduler for automated synchronization tasks.
This module provides scheduling functionality for periodic sync operations using APScheduler. It supports configurable intervals, manual triggers, and job status monitoring.
Functions
|
Create and configure a logger with structured JSON output. |
Classes
|
Special type indicating an unconstrained type. |
|
A scheduler that runs in the background using a separate thread ( |
|
|
|
Triggers when current time matches all specified time constraints, similarly to how the UNIX cron scheduler works. |
|
Orchestrates the complete ingestion pipeline. |
|
Triggers on specified intervals, starting on |
|
Manages scheduled synchronization jobs. |
|
The year, month and day arguments are required. |
- class thoth.shared.scheduler.SyncScheduler(pipeline: IngestionPipeline, logger_instance: Logger | None = None, job_id: str = 'sync_job')[source]¶
Bases:
objectManages scheduled synchronization jobs.
This class wraps APScheduler to provide convenient scheduling of sync operations with monitoring hooks and manual trigger support.
- pipeline¶
The ingestion pipeline to run on schedule
- scheduler¶
APScheduler BackgroundScheduler instance
- job_id¶
Unique identifier for the scheduled job
- logger¶
Logger instance for recording events
- __init__(pipeline: IngestionPipeline, logger_instance: Logger | None = None, job_id: str = 'sync_job')[source]¶
Initialize the sync scheduler.
- Parameters:
pipeline – Configured IngestionPipeline instance
logger_instance – Optional logger instance
job_id – Unique identifier for the scheduled job
- add_interval_job(interval_minutes: int = 60, start_immediately: bool = False) → None[source]¶
Schedule a sync job to run at regular intervals.
- Parameters:
interval_minutes – Minutes between sync runs (default: 60)
start_immediately – Whether to run the job immediately on schedule (default: False)
- add_cron_job(hour: int = 0, minute: int = 0, day_of_week: str = '*') → None[source]¶
Schedule a sync job using a cron-like schedule.
- Parameters:
hour – Hour of day to run (0-23)
minute – Minute of hour to run (0-59)
day_of_week – Days to run (0-6 for Mon-Sun, or * for all)
- start() → None[source]¶
Start the scheduler.
This will begin running scheduled jobs. The scheduler runs in the background and does not block.
- stop(wait: bool = True) → None[source]¶
Stop the scheduler.
- Parameters:
wait – Whether to wait for running jobs to complete
- trigger_manual_sync() → dict[str, Any][source]¶
Manually trigger a sync operation immediately.
- Returns:
Dictionary containing sync statistics and status
- get_job_status() → dict[str, Any][source]¶
Get the current status of the scheduled job.
- Returns:
scheduled: Whether job is scheduled
next_run_time: Next scheduled run (if scheduled)
running: Whether scheduler is running
- Return type:
Dictionary with job status information
- add_success_callback(callback: Callable[[dict[str, Any]], None]) → None[source]¶
Add a callback to be called when sync succeeds.
- Parameters:
callback – Function that takes sync stats dict as argument