Scheduler
thoth.shared.scheduler
Scheduler for automated synchronization tasks.
This module provides scheduling functionality for periodic sync operations using APScheduler. It supports interval and cron-style schedules, manual triggers, success and failure callbacks, and job status monitoring.
__all__ = ['SyncScheduler'] (module attribute)
logger = setup_logger(__name__) (module attribute)
SyncScheduler
Manages scheduled synchronization jobs.
This class wraps APScheduler to provide convenient scheduling of sync operations with monitoring hooks and manual trigger support.
Attributes:
| Name | Type | Description |
|---|---|---|
| pipeline | IngestionPipeline | The ingestion pipeline to run on schedule |
| scheduler | BackgroundScheduler | APScheduler BackgroundScheduler instance |
| job_id | str | Unique identifier for the scheduled job |
| logger | logging.Logger | Logger instance for recording events |
pipeline = pipeline (instance attribute)
scheduler = BackgroundScheduler() (instance attribute)
job_id = job_id (instance attribute)
logger = logger_instance or setup_logger(__name__) (instance attribute)
__init__(pipeline: IngestionPipeline, logger_instance: logging.Logger | None = None, job_id: str = 'sync_job')
Initialize the sync scheduler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pipeline | IngestionPipeline | Configured IngestionPipeline instance | required |
| logger_instance | Logger \| None | Optional logger instance; if None, setup_logger(__name__) is used | None |
| job_id | str | Unique identifier for the scheduled job | 'sync_job' |
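As a rough illustration of what the constructor stores, here is a stand-alone sketch. It is not the thoth implementation: PipelineLike, StubPipeline, and SchedulerSkeleton are hypothetical names, the run() method is an assumed pipeline interface, and logging.getLogger stands in for thoth's setup_logger. The logger fallback mirrors the documented attribute logger_instance or setup_logger(__name__).

```python
from __future__ import annotations

import logging
from typing import Any, Protocol


class PipelineLike(Protocol):
    """Hypothetical interface; the real IngestionPipeline API is not documented here."""

    def run(self) -> dict[str, Any]: ...


class SchedulerSkeleton:
    """Illustrative constructor only: stores the pipeline, job ID, and logger."""

    def __init__(
        self,
        pipeline: PipelineLike,
        logger_instance: logging.Logger | None = None,
        job_id: str = "sync_job",
    ) -> None:
        self.pipeline = pipeline
        self.job_id = job_id
        # Same fallback as `logger_instance or setup_logger(__name__)`;
        # logging.getLogger stands in for thoth's setup_logger (assumption).
        self.logger = logger_instance or logging.getLogger(__name__)


class StubPipeline:
    """Hypothetical stand-in pipeline used only for this example."""

    def run(self) -> dict[str, Any]:
        return {"processed": 0}


default = SchedulerSkeleton(StubPipeline())
custom_logger = logging.getLogger("sync.nightly")
nightly = SchedulerSkeleton(StubPipeline(), logger_instance=custom_logger, job_id="nightly_sync")
```

Passing a distinct job_id is mainly useful when you want log lines and status queries to name the job explicitly.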
add_interval_job(interval_minutes: int = 60, start_immediately: bool = False) -> None
Schedule a sync job to run at regular intervals.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| interval_minutes | int | Minutes between sync runs | 60 |
| start_immediately | bool | Whether to run the first sync immediately rather than after the first interval elapses | False |
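The sketch below shows how start_immediately plausibly shifts the run times of an interval job. It assumes that start_immediately=True fires the first sync at scheduling time and that, otherwise, the first run waits one full interval (the default behavior of APScheduler's interval trigger). interval_run_times is a hypothetical helper, not part of this module.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def interval_run_times(
    now: datetime, interval_minutes: int = 60, start_immediately: bool = False, count: int = 3
) -> list[datetime]:
    """Return the first `count` run times of an interval schedule (illustrative)."""
    interval = timedelta(minutes=interval_minutes)
    # Assumption: start_immediately=True fires at scheduling time; otherwise
    # the first run waits one full interval.
    first = now if start_immediately else now + interval
    return [first + i * interval for i in range(count)]


now = datetime(2024, 1, 1, 12, 0)
delayed = interval_run_times(now, interval_minutes=30)
immediate = interval_run_times(now, interval_minutes=30, start_immediately=True)
```

Both schedules share the same spacing; start_immediately only moves the first run forward by one interval.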
add_cron_job(hour: int = 0, minute: int = 0, day_of_week: str = '*') -> None
Schedule a sync job using a cron-like schedule.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hour | int | Hour of day to run (0-23) | 0 |
| minute | int | Minute of the hour to run (0-59) | 0 |
| day_of_week | str | Day(s) of the week to run: 0-6 for Monday through Sunday, or '*' for every day | '*' |
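To make the cron parameters concrete, this sketch computes the next run time for a given hour, minute, and day_of_week, using the same 0 = Monday convention as Python's datetime.weekday(). next_cron_run and parse_days are hypothetical helpers; they accept only '*', single digits, comma lists, and ranges such as '0-4', which is a subset of what APScheduler's cron trigger understands.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def parse_days(day_of_week: str) -> set[int]:
    """Parse '*', '4', '0,2,4' or '0-4' into weekday numbers (0 = Monday)."""
    if day_of_week.strip() == "*":
        return set(range(7))
    days: set[int] = set()
    for part in day_of_week.split(","):
        if "-" in part:
            lo, hi = (int(x) for x in part.split("-"))
            days.update(range(lo, hi + 1))
        else:
            days.add(int(part))
    return days


def next_cron_run(now: datetime, hour: int = 0, minute: int = 0, day_of_week: str = "*") -> datetime:
    """Earliest time strictly after `now` matching hour, minute and day_of_week."""
    allowed = parse_days(day_of_week)
    # Today plus the next 7 days always covers every weekday at least once.
    for offset in range(8):
        candidate = (now + timedelta(days=offset)).replace(
            hour=hour, minute=minute, second=0, microsecond=0
        )
        # datetime.weekday() also uses 0 = Monday ... 6 = Sunday.
        if candidate > now and candidate.weekday() in allowed:
            return candidate
    raise ValueError(f"no weekday matches {day_of_week!r}")


now = datetime(2024, 1, 1, 12, 0)  # a Monday
daily = next_cron_run(now)
friday = next_cron_run(now, hour=9, day_of_week="4")
weekdays = next_cron_run(now, hour=9, day_of_week="0-4")
monday_only = next_cron_run(now, hour=9, day_of_week="0")
```

January 1, 2024 was a Monday, so a Friday 09:00 schedule next fires on January 5, and a Monday-only 09:00 schedule whose time has already passed rolls over to January 8.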
start() -> None
Start the scheduler.
Begins executing scheduled jobs. The scheduler runs in a background thread, so this call returns immediately instead of blocking.
stop(wait: bool = True) -> None
Stop the scheduler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| wait | bool | Whether to wait for running jobs to complete | True |
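start() returning immediately and stop(wait=...) follow the usual background-worker pattern. The sketch below reproduces that pattern with only the standard library (threading.Event and a daemon thread) rather than APScheduler; BackgroundLoop is a hypothetical class, not the BackgroundScheduler this module wraps.

```python
from __future__ import annotations

import threading
import time
from typing import Callable


class BackgroundLoop:
    """Hypothetical stand-in for a background scheduler: runs `job` every
    `interval_seconds` on a daemon thread until stop() is called."""

    def __init__(self, job: Callable[[], None], interval_seconds: float) -> None:
        self._job = job
        self._interval = interval_seconds
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None

    def _run(self) -> None:
        # Event.wait() returns True as soon as stop() sets the event, so the
        # loop exits promptly instead of sleeping out the whole interval.
        while not self._stop_event.wait(self._interval):
            self._job()

    def start(self) -> None:
        # Non-blocking: the loop runs on its own daemon thread.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self, wait: bool = True) -> None:
        self._stop_event.set()
        # With wait=True, block until the worker (and any in-flight job) finishes.
        if wait and self._thread is not None:
            self._thread.join()

    @property
    def running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()


runs: list[float] = []
loop = BackgroundLoop(lambda: runs.append(time.monotonic()), interval_seconds=0.01)
loop.start()  # returns immediately; jobs fire on the worker thread
time.sleep(0.1)
loop.stop(wait=True)
```

With wait=False the caller returns right away and a job that is mid-run finishes on its own thread.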
trigger_manual_sync() -> dict[str, Any]
Manually trigger a sync operation immediately.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary containing sync statistics and status |
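The exact keys returned by trigger_manual_sync are not documented here. The following sketch shows one plausible shape: it runs a sync callable synchronously and merges its statistics with a status field and a duration. run_sync_now, the status, error, and duration_seconds keys, and the choice to report failures in the dict rather than raise them are all assumptions.

```python
from __future__ import annotations

import time
from typing import Any, Callable


def run_sync_now(sync: Callable[[], dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical manual trigger: run one sync synchronously and report the outcome."""
    started = time.perf_counter()
    try:
        stats = sync()
    except Exception as exc:
        # Assumption: failures are reported in the returned dict instead of raised.
        return {
            "status": "failed",
            "error": str(exc),
            "duration_seconds": time.perf_counter() - started,
        }
    return {**stats, "status": "success", "duration_seconds": time.perf_counter() - started}


def failing_sync() -> dict[str, Any]:
    # Hypothetical sync that always fails, for the example.
    raise RuntimeError("source unreachable")


ok = run_sync_now(lambda: {"processed": 5})
failed = run_sync_now(failing_sync)
```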
get_job_status() -> dict[str, Any]
Get the current status of the scheduled job.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with job status information (keys listed below) |

- scheduled: whether the job is scheduled
- next_run_time: next scheduled run (if scheduled)
- running: whether the scheduler is running
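A minimal sketch of assembling the documented status keys (scheduled, next_run_time, running). JobRecord and job_status are hypothetical; the sketch assumes next_run_time is included only when a job exists, as the key list suggests, and represents it as a datetime.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional


@dataclass
class JobRecord:
    """Hypothetical job record; next_run_time is None while the job is paused."""

    next_run_time: Optional[datetime]


def job_status(job: Optional[JobRecord], scheduler_running: bool) -> dict[str, Any]:
    """Build a status dict with the keys documented for get_job_status()."""
    status: dict[str, Any] = {"scheduled": job is not None, "running": scheduler_running}
    # next_run_time is only reported when a job exists.
    if job is not None:
        status["next_run_time"] = job.next_run_time
    return status


no_job = job_status(None, scheduler_running=True)
active = job_status(JobRecord(datetime(2024, 1, 1, 13, 0)), scheduler_running=True)
```

Note that scheduled and running are independent: a started scheduler with no job reports running True and scheduled False.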
add_success_callback(callback: Callable[[dict[str, Any]], None]) -> None
Register a callback to be called when a sync succeeds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[dict[str, Any]], None] | Function that receives the sync statistics dict | required |
add_failure_callback(callback: Callable[[Exception], None]) -> None
Register a callback to be called when a sync fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable[[Exception], None] | Function that receives the exception raised by the failed sync | required |
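The two callback hooks split sync outcomes by type: success callbacks receive the statistics dict, failure callbacks receive the exception. The sketch below shows that dispatch in isolation; CallbackHub and its run() method are hypothetical, and the real class may differ in details such as whether exceptions raised inside callbacks are caught.

```python
from __future__ import annotations

from typing import Any, Callable


class CallbackHub:
    """Hypothetical sketch of success/failure callback dispatch around a sync run."""

    def __init__(self) -> None:
        self._on_success: list[Callable[[dict[str, Any]], None]] = []
        self._on_failure: list[Callable[[Exception], None]] = []

    def add_success_callback(self, callback: Callable[[dict[str, Any]], None]) -> None:
        self._on_success.append(callback)

    def add_failure_callback(self, callback: Callable[[Exception], None]) -> None:
        self._on_failure.append(callback)

    def run(self, sync: Callable[[], dict[str, Any]]) -> dict[str, Any] | None:
        try:
            stats = sync()
        except Exception as exc:
            # Each failure callback receives the exception object itself.
            for callback in self._on_failure:
                callback(exc)
            return None
        # Each success callback receives the stats dict.
        for callback in self._on_success:
            callback(stats)
        return stats


successes: list[dict[str, Any]] = []
failures: list[Exception] = []
hub = CallbackHub()
hub.add_success_callback(successes.append)
hub.add_failure_callback(failures.append)

hub.run(lambda: {"processed": 3})


def broken_sync() -> dict[str, Any]:
    # Hypothetical sync that always fails, for the example.
    raise RuntimeError("index unavailable")


hub.run(broken_sync)
```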
pause_job() -> None
Pause the scheduled job without stopping the scheduler.
resume_job() -> None
Resume a paused job.
remove_job() -> None
Remove the scheduled job.
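Pausing differs from removing: a paused job stays registered but has no next run time, while a removed job is gone. The sketch below models that with a hypothetical IntervalJob record and a plain dict of jobs keyed by job ID. Using None while paused matches how APScheduler 3.x represents paused jobs; on resume, this sketch simply schedules one interval after the resume time, whereas APScheduler asks the job's trigger for its next fire time, which may align differently.

```python
from __future__ import annotations

from datetime import datetime, timedelta


class IntervalJob:
    """Hypothetical job record for illustrating pause/resume/remove."""

    def __init__(self, interval: timedelta, now: datetime) -> None:
        self.interval = interval
        self.next_run_time: datetime | None = now + interval

    def pause(self) -> None:
        # A paused job stays registered but has no next run time.
        self.next_run_time = None

    def resume(self, now: datetime) -> None:
        # Simplification: schedule one interval from the resume time
        # instead of restoring the stale pre-pause run time.
        self.next_run_time = now + self.interval


jobs: dict[str, IntervalJob] = {}
jobs["sync_job"] = IntervalJob(timedelta(minutes=60), datetime(2024, 1, 1, 12, 0))

jobs["sync_job"].pause()
paused_next = jobs["sync_job"].next_run_time  # None while paused

jobs["sync_job"].resume(datetime(2024, 1, 1, 15, 0))
resumed_next = jobs["sync_job"].next_run_time  # 2024-01-01 16:00

jobs.pop("sync_job", None)  # removal deletes the entry entirely
```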