Scheduler

thoth.shared.scheduler

Scheduler for automated synchronization tasks.

This module provides scheduling functionality for periodic sync operations using APScheduler. It supports configurable intervals, manual triggers, and job status monitoring.

__all__ = ['SyncScheduler'] module-attribute

logger = setup_logger(__name__) module-attribute

SyncScheduler

Manages scheduled synchronization jobs.

This class wraps APScheduler to provide convenient scheduling of sync operations with monitoring hooks and manual trigger support.

Attributes:

Name       Type                 Description
pipeline   IngestionPipeline    The ingestion pipeline to run on schedule
scheduler  BackgroundScheduler  APScheduler BackgroundScheduler instance
job_id     str                  Unique identifier for the scheduled job
logger     logging.Logger       Logger instance for recording events

pipeline = pipeline instance-attribute

scheduler = BackgroundScheduler() instance-attribute

job_id = job_id instance-attribute

logger = logger_instance or setup_logger(__name__) instance-attribute

__init__(pipeline: IngestionPipeline, logger_instance: logging.Logger | None = None, job_id: str = 'sync_job')

Initialize the sync scheduler.

Parameters:

Name             Type               Description                                                   Default
pipeline         IngestionPipeline  Configured IngestionPipeline instance                         required
logger_instance  Logger | None      Logger to use; if None, one is created with setup_logger      None
job_id           str                Unique identifier for the scheduled job                       'sync_job'

add_interval_job(interval_minutes: int = 60, start_immediately: bool = False) -> None

Schedule a sync job to run at regular intervals.

Parameters:

Name               Type  Description                                                                       Default
interval_minutes   int   Minutes between sync runs                                                         60
start_immediately  bool  Whether to run the first sync as soon as the job is scheduled instead of          False
                         waiting for the first interval to elapse
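To make the interval semantics concrete, here is a small stdlib-only sketch that lists the first few run times an interval schedule would produce. It does not call SyncScheduler or APScheduler; the helper name `interval_run_times` is hypothetical, and it assumes that `start_immediately=True` places the first run at the moment of scheduling while the default places it one full interval later.

```python
from datetime import datetime, timedelta


def interval_run_times(
    now: datetime,
    interval_minutes: int = 60,
    start_immediately: bool = False,
    count: int = 3,
) -> list[datetime]:
    """List the first `count` run times of an interval schedule created at `now`."""
    if interval_minutes <= 0:
        raise ValueError("interval_minutes must be positive")
    step = timedelta(minutes=interval_minutes)
    # Assumption: start_immediately moves the first run from now + step to now
    first = now if start_immediately else now + step
    return [first + i * step for i in range(count)]


now = datetime(2024, 1, 1, 12, 0)
print([t.strftime("%H:%M") for t in interval_run_times(now, interval_minutes=30)])
# ['12:30', '13:00', '13:30']
print([t.strftime("%H:%M") for t in interval_run_times(now, 30, start_immediately=True)])
# ['12:00', '12:30', '13:00']
```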

add_cron_job(hour: int = 0, minute: int = 0, day_of_week: str = '*') -> None

Schedule a sync job using a cron-like schedule.

Parameters:

Name         Type  Description                                                        Default
hour         int   Hour of the day to run (0-23)                                      0
minute       int   Minute of the hour to run (0-59)                                   0
day_of_week  str   Day(s) of the week to run: 0-6 for Monday-Sunday, or * for every   '*'
                   day
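The cron parameters can be illustrated with a simplified, stdlib-only calculation of the next matching run time. `next_cron_run` and `parse_days` are hypothetical helpers, not part of this module; they accept only `*` or a comma-separated list of the digits 0-6 (Python's `datetime.weekday()` uses the same 0 = Monday convention) and ignore time zones.

```python
from datetime import datetime, timedelta


def parse_days(day_of_week: str) -> set[int]:
    """Parse '*' or a comma-separated list of digits 0-6 (0 = Monday)."""
    if day_of_week.strip() == "*":
        return set(range(7))
    days = {int(part) for part in day_of_week.split(",")}
    if not days <= set(range(7)):
        raise ValueError(f"day_of_week out of range: {day_of_week!r}")
    return days


def next_cron_run(
    now: datetime, hour: int = 0, minute: int = 0, day_of_week: str = "*"
) -> datetime:
    """Return the first time strictly after `now` matching hour/minute/day_of_week."""
    days = parse_days(day_of_week)
    # Same clock time on today's date; replace() rejects out-of-range hour/minute
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # Today plus the next 7 days always covers every weekday at least once
    for offset in range(8):
        run = candidate + timedelta(days=offset)
        if run > now and run.weekday() in days:
            return run
    raise RuntimeError("unreachable: a matching weekday exists within 8 days")


# 2024-01-01 was a Monday
now = datetime(2024, 1, 1, 12, 0)
print(next_cron_run(now, hour=18))                             # 2024-01-01 18:00:00
print(next_cron_run(now, hour=2, minute=30, day_of_week="0"))  # 2024-01-08 02:30:00
```

APScheduler's own CronTrigger accepts a richer syntax for these fields; the sketch deliberately covers only the values described in the table.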

start() -> None

Start the scheduler.

Begins running any scheduled jobs. The scheduler runs in a background thread, so this call returns immediately instead of blocking.

stop(wait: bool = True) -> None

Stop the scheduler.

Parameters:

Name  Type  Description                                    Default
wait  bool  Whether to wait for running jobs to complete   True
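The non-blocking `start()` and the `wait` flag of `stop()` can be pictured with a hypothetical thread-based stand-in. `MiniScheduler` is an illustration only; SyncScheduler delegates this work to APScheduler's BackgroundScheduler, which manages its own thread.

```python
import threading
import time
from typing import Callable, Optional


class MiniScheduler:
    """Hypothetical stand-in illustrating a non-blocking start() and stop(wait=...)."""

    def __init__(self, job: Callable[[], None], interval_seconds: float) -> None:
        self._job = job
        self._interval = interval_seconds
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        # The loop runs on a daemon thread, so start() returns right away
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self) -> None:
        # Event.wait() returns True as soon as stop() sets the event,
        # otherwise False after one interval, which triggers a run
        while not self._stop_event.wait(self._interval):
            self._job()

    def stop(self, wait: bool = True) -> None:
        self._stop_event.set()
        if wait and self._thread is not None:
            # Block until an in-progress job finishes and the loop exits
            self._thread.join()

    @property
    def running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()


calls: list[float] = []
sched = MiniScheduler(lambda: calls.append(time.monotonic()), interval_seconds=0.02)
sched.start()          # returns immediately; jobs run on the worker thread
time.sleep(0.2)        # let a few intervals elapse
sched.stop(wait=True)  # sets the stop event and joins the worker thread
```

Because `stop(wait=True)` joins the worker thread, `running` is False once it returns; with `wait=False` the loop still exits on its own after any in-progress job finishes.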

trigger_manual_sync() -> dict[str, Any]

Manually trigger a sync operation immediately.

Returns:

Type            Description
dict[str, Any]  Dictionary containing sync statistics and status
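One plausible shape for a manual trigger is sketched below: run the pipeline once, time it, and merge a status field into the returned statistics. The `run_manual_sync` function, the `run_pipeline` callable, the `files_processed`, `status`, `error`, and `duration_seconds` keys, and the choice to report failures in the dict rather than raise are all assumptions for illustration; the actual keys returned by `trigger_manual_sync()` are not documented here.

```python
import time
from typing import Any, Callable


def run_manual_sync(run_pipeline: Callable[[], dict[str, Any]]) -> dict[str, Any]:
    """Run one sync now and return its stats plus a status field (keys are hypothetical)."""
    started = time.monotonic()
    try:
        stats = dict(run_pipeline())
        stats["status"] = "success"
    except Exception as exc:
        # Assumption: failures are reported in the result instead of re-raised
        stats = {"status": "failed", "error": str(exc)}
    stats["duration_seconds"] = round(time.monotonic() - started, 3)
    return stats


result = run_manual_sync(lambda: {"files_processed": 12})
print(result["status"], result["files_processed"])  # success 12
```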

get_job_status() -> dict[str, Any]

Get the current status of the scheduled job.

Returns:

Type            Description
dict[str, Any]  Dictionary with job status information:
                - scheduled: whether the job is scheduled
                - next_run_time: next scheduled run (if scheduled)
                - running: whether the scheduler is running
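Callers that poll `get_job_status()` might turn the documented keys into a one-line health summary, as in this sketch. `describe_status` is a hypothetical helper; it relies only on the `scheduled`, `next_run_time`, and `running` keys listed above, and it assumes that a scheduled job with no next run time is paused (APScheduler itself clears a paused job's next run time).

```python
from datetime import datetime
from typing import Any


def describe_status(status: dict[str, Any]) -> str:
    """Summarize a get_job_status()-style dict in one line."""
    if not status.get("running"):
        return "scheduler stopped"
    if not status.get("scheduled"):
        return "scheduler running, no job scheduled"
    next_run = status.get("next_run_time")
    if next_run is None:
        # Assumption: a scheduled job without a next run time is paused
        return "job scheduled, no upcoming run"
    return f"next sync at {next_run}"


print(describe_status({
    "scheduled": True,
    "next_run_time": datetime(2024, 1, 1, 13, 0),
    "running": True,
}))  # next sync at 2024-01-01 13:00:00
```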

add_success_callback(callback: Callable[[dict[str, Any]], None]) -> None

Add a callback to be called when sync succeeds.

Parameters:

Name      Type                              Description                                       Default
callback  Callable[[dict[str, Any]], None]  Function that receives the sync statistics dict   required

add_failure_callback(callback: Callable[[Exception], None]) -> None

Add a callback to be called when sync fails.

Parameters:

Name      Type                         Description                                               Default
callback  Callable[[Exception], None]  Function that receives the exception raised by the sync   required
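The callback hooks follow a common observer pattern. The sketch below is a hypothetical registry, not SyncScheduler's code, showing one way success and failure callbacks could be dispatched around a sync run; the `CallbackRegistry` class, its `run` method, and the decision to log (rather than propagate) errors raised inside callbacks are assumptions.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)

SuccessCallback = Callable[[dict[str, Any]], None]
FailureCallback = Callable[[Exception], None]


class CallbackRegistry:
    """Hypothetical helper mirroring add_success_callback / add_failure_callback."""

    def __init__(self) -> None:
        self._on_success: list[SuccessCallback] = []
        self._on_failure: list[FailureCallback] = []

    def add_success_callback(self, callback: SuccessCallback) -> None:
        self._on_success.append(callback)

    def add_failure_callback(self, callback: FailureCallback) -> None:
        self._on_failure.append(callback)

    def run(self, job: Callable[[], dict[str, Any]]) -> None:
        """Run `job` once and notify the matching callbacks."""
        try:
            stats = job()
        except Exception as exc:
            self._notify(self._on_failure, exc)
            return
        self._notify(self._on_success, stats)

    @staticmethod
    def _notify(callbacks: list, payload: Any) -> None:
        for callback in callbacks:
            try:
                callback(payload)
            except Exception:
                # A faulty callback should not stop the others from running
                logger.exception("sync callback %r failed", callback)


events: list[tuple[str, Any]] = []
registry = CallbackRegistry()
registry.add_success_callback(lambda stats: events.append(("ok", stats)))
registry.add_failure_callback(lambda exc: events.append(("error", str(exc))))


def failing_job() -> dict[str, Any]:
    raise RuntimeError("network down")


registry.run(lambda: {"files_processed": 3})
registry.run(failing_job)
print(events)  # [('ok', {'files_processed': 3}), ('error', 'network down')]
```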

pause_job() -> None

Pause the scheduled job without stopping the scheduler.

resume_job() -> None

Resume a paused job.

remove_job() -> None

Remove the scheduled job.
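The difference between pausing and removing can be captured as a small state machine. `JobHandle` and `JobState` are hypothetical; the sketch assumes a removed job cannot be paused or resumed and raises `KeyError`, loosely mirroring APScheduler, whose `JobLookupError` subclasses `KeyError`. Whether SyncScheduler surfaces that error is not stated here.

```python
from enum import Enum


class JobState(Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    REMOVED = "removed"


class JobHandle:
    """Hypothetical model of pause_job() / resume_job() / remove_job()."""

    def __init__(self, job_id: str = "sync_job") -> None:
        self.job_id = job_id
        self.state = JobState.ACTIVE

    def _require_existing(self) -> None:
        if self.state is JobState.REMOVED:
            raise KeyError(f"job {self.job_id!r} has been removed")

    def pause(self) -> None:
        # The scheduler keeps running; only this job stops firing
        self._require_existing()
        self.state = JobState.PAUSED

    def resume(self) -> None:
        self._require_existing()
        self.state = JobState.ACTIVE

    def remove(self) -> None:
        # Unlike pause, removal is permanent for this handle
        self.state = JobState.REMOVED

    def should_fire(self) -> bool:
        return self.state is JobState.ACTIVE


job = JobHandle()
job.pause()
print(job.state, job.should_fire())  # JobState.PAUSED False
```

In this model pausing is reversible with `resume()`, while a removed job has to be scheduled again (for example with `add_interval_job()` or `add_cron_job()`) before it can run.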