thoth.shared.scheduler

Scheduler for automated synchronization tasks.

This module provides scheduling functionality for periodic sync operations using APScheduler. It supports configurable intervals, manual triggers, and job status monitoring.

Functions

setup_logger(name[, level, simple, json_output])

Create and configure a logger with structured JSON output.

Classes

Any(*args, **kwargs)

Special type indicating an unconstrained type.

BackgroundScheduler([gconfig])

A scheduler that runs in the background using a separate thread (start() will return immediately).

Callable()

CronTrigger([year, month, day, week, ...])

Triggers when current time matches all specified time constraints, similarly to how the UNIX cron scheduler works.

IngestionPipeline([repo_manager, chunker, ...])

Orchestrates the complete ingestion pipeline.

IntervalTrigger([weeks, days, hours, ...])

Triggers on specified intervals, starting on start_date if specified, datetime.now() + interval otherwise.

SyncScheduler(pipeline[, logger_instance, ...])

Manages scheduled synchronization jobs.

datetime(year, month, day[, hour[, minute[, ...)

The year, month and day arguments are required.

class thoth.shared.scheduler.SyncScheduler(pipeline: IngestionPipeline, logger_instance: Logger | None = None, job_id: str = 'sync_job')

Bases: object

Manages scheduled synchronization jobs.

This class wraps APScheduler to provide convenient scheduling of sync operations with monitoring hooks and manual trigger support.

pipeline

The ingestion pipeline to run on schedule

scheduler

APScheduler BackgroundScheduler instance

job_id

Unique identifier for the scheduled job

logger

Logger instance for recording events

__init__(pipeline: IngestionPipeline, logger_instance: Logger | None = None, job_id: str = 'sync_job')

Initialize the sync scheduler.

Parameters:
  • pipeline – Configured IngestionPipeline instance

  • logger_instance – Optional logger instance

  • job_id – Unique identifier for the scheduled job

add_interval_job(interval_minutes: int = 60, start_immediately: bool = False) → None

Schedule a sync job to run at regular intervals.

Parameters:
  • interval_minutes – Minutes between sync runs (default: 60)

  • start_immediately – Whether to run the first sync as soon as the job is scheduled instead of waiting for one full interval (default: False)
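The effect of start_immediately can be pictured as a choice of first run time. The sketch below is an assumption about the behaviour, not this module's implementation: first_run_time is a hypothetical helper, and the delayed case mirrors the IntervalTrigger default of datetime.now() + interval described in the class summary.

```python
from datetime import datetime, timedelta


def first_run_time(now: datetime, interval_minutes: int = 60,
                   start_immediately: bool = False) -> datetime:
    """Hypothetical helper: when would the first sync be due?"""
    if interval_minutes <= 0:
        raise ValueError("interval_minutes must be positive")
    if start_immediately:
        # Assumed behaviour: the first run is due right away.
        return now
    # Otherwise the first run waits for one full interval.
    return now + timedelta(minutes=interval_minutes)


now = datetime(2024, 1, 1, 12, 0)
print(first_run_time(now))                          # 2024-01-01 13:00:00
print(first_run_time(now, start_immediately=True))  # 2024-01-01 12:00:00
```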

add_cron_job(hour: int = 0, minute: int = 0, day_of_week: str = '*') → None

Schedule a sync job using a cron-like schedule.

Parameters:
  • hour – Hour of day to run (0-23)

  • minute – Minute of hour to run (0-59)

  • day_of_week – Days to run (0-6 for Mon-Sun, or * for all)
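The numeric day_of_week convention (0 is Monday, 6 is Sunday) is easy to get wrong for readers used to cron, where 0 is Sunday. The sketch below is a hypothetical helper, not part of this module, that expands a numeric expression into day names. It assumes only the numeric forms listed above plus comma-separated values and ranges; APScheduler itself also accepts names such as mon-fri, which this sketch does not handle.

```python
# Index 0 is Monday, matching the 0-6 for Mon-Sun convention above.
DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


def expand_day_of_week(expr: str) -> list[str]:
    """Hypothetical helper: expand '*', '0', '0-4' or '0,2,4' into day names."""
    if expr.strip() == "*":
        return list(DAY_NAMES)
    days: set[int] = set()
    for part in expr.split(","):
        part = part.strip()
        if "-" in part:
            start, end = (int(p) for p in part.split("-", 1))
        else:
            start = end = int(part)
        if not (0 <= start <= end <= 6):
            raise ValueError(f"day_of_week out of range: {part!r}")
        days.update(range(start, end + 1))
    return [DAY_NAMES[d] for d in sorted(days)]


print(expand_day_of_week("0-4"))  # ['Mon', 'Tue', 'Wed', 'Thu', 'Fri']
print(expand_day_of_week("5,6"))  # ['Sat', 'Sun']
```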

start() → None

Start the scheduler.

This will begin running scheduled jobs. The scheduler runs in the background and does not block.

stop(wait: bool = True) → None

Stop the scheduler.

Parameters:

wait – Whether to wait for running jobs to complete
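Because start() returns immediately, the caller is responsible for stopping the scheduler again. A minimal sketch of that lifecycle follows, using only the method names documented on this page. SchedulerLike, run_with_scheduler and RecordingScheduler are hypothetical names introduced for illustration; RecordingScheduler is a stand-in that only records calls, so the example runs without a configured IngestionPipeline.

```python
from typing import Any, Callable, Protocol


class SchedulerLike(Protocol):
    """The subset of the documented SyncScheduler interface used here."""

    def add_interval_job(self, interval_minutes: int = 60,
                         start_immediately: bool = False) -> None: ...
    def start(self) -> None: ...
    def stop(self, wait: bool = True) -> None: ...
    def get_job_status(self) -> dict[str, Any]: ...


def run_with_scheduler(scheduler: SchedulerLike,
                       work: Callable[[SchedulerLike], Any]) -> Any:
    """Schedule a 30-minute sync, run `work`, and always stop the scheduler."""
    scheduler.add_interval_job(interval_minutes=30, start_immediately=True)
    scheduler.start()  # does not block; jobs run in the background
    try:
        return work(scheduler)
    finally:
        # wait=True asks the scheduler to let running jobs complete first.
        scheduler.stop(wait=True)


class RecordingScheduler:
    """Stand-in for demonstration only; it records the calls it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def add_interval_job(self, interval_minutes: int = 60,
                         start_immediately: bool = False) -> None:
        self.calls.append(f"add_interval_job({interval_minutes}, {start_immediately})")

    def start(self) -> None:
        self.calls.append("start")

    def stop(self, wait: bool = True) -> None:
        self.calls.append(f"stop(wait={wait})")

    def get_job_status(self) -> dict[str, Any]:
        return {"scheduled": True, "next_run_time": None, "running": True}


demo = RecordingScheduler()
result = run_with_scheduler(demo, lambda s: s.get_job_status()["running"])
print(result)      # True
print(demo.calls)  # ['add_interval_job(30, True)', 'start', 'stop(wait=True)']
```

With a real SyncScheduler the same function should apply unchanged, since it relies only on the signatures listed here.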

trigger_manual_sync() → dict[str, Any]

Manually trigger a sync operation immediately.

Returns:

Dictionary containing sync statistics and status

get_job_status() → dict[str, Any]

Get the current status of the scheduled job.

Returns:

Dictionary with job status information:

  • scheduled: Whether job is scheduled

  • next_run_time: Next scheduled run (if scheduled)

  • running: Whether scheduler is running

Return type:

dict[str, Any]
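As an illustration of consuming this dictionary, the sketch below turns it into a one-line summary. describe_status is a hypothetical helper, and the sample values are made up for the example. The type of next_run_time is not specified on this page, so the sketch treats it as opaque and formats it with str().

```python
from typing import Any


def describe_status(status: dict[str, Any]) -> str:
    """Turn a get_job_status()-style dict into a one-line summary."""
    state = "scheduler running" if status.get("running") else "scheduler stopped"
    if not status.get("scheduled"):
        return f"{state}, no job scheduled"
    # next_run_time is only expected when a job is scheduled.
    next_run = status.get("next_run_time")
    if next_run is None:
        return f"{state}, job scheduled (next run unknown)"
    return f"{state}, job scheduled, next run at {next_run}"


sample = {"scheduled": True, "next_run_time": "2024-01-01 02:30:00", "running": True}
print(describe_status(sample))
# scheduler running, job scheduled, next run at 2024-01-01 02:30:00
print(describe_status({"scheduled": False, "running": False}))
# scheduler stopped, no job scheduled
```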

add_success_callback(callback: Callable[[dict[str, Any]], None]) → None

Add a callback to be called when sync succeeds.

Parameters:

callback – Function that takes sync stats dict as argument

add_failure_callback(callback: Callable[[Exception], None]) → None

Add a callback to be called when sync fails.

Parameters:

callback – Function that takes exception as argument
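The two callback signatures suggest a simple dispatch pattern: success callbacks receive the stats dictionary, failure callbacks receive the exception. The sketch below shows one way such dispatch could work. CallbackRegistry and its run method are hypothetical and are not this module's implementation, and the "status" key in the sample stats is illustrative only.

```python
from typing import Any, Callable


class CallbackRegistry:
    """Hypothetical stand-in showing one way to dispatch sync callbacks."""

    def __init__(self) -> None:
        self._on_success: list[Callable[[dict[str, Any]], None]] = []
        self._on_failure: list[Callable[[Exception], None]] = []

    def add_success_callback(self, callback: Callable[[dict[str, Any]], None]) -> None:
        self._on_success.append(callback)

    def add_failure_callback(self, callback: Callable[[Exception], None]) -> None:
        self._on_failure.append(callback)

    def run(self, job: Callable[[], dict[str, Any]]) -> None:
        """Run `job` once and notify the matching set of callbacks."""
        try:
            stats = job()
        except Exception as exc:
            for callback in self._on_failure:
                callback(exc)
        else:
            for callback in self._on_success:
                callback(stats)


registry = CallbackRegistry()
successes: list[dict[str, Any]] = []
failures: list[Exception] = []
registry.add_success_callback(successes.append)
registry.add_failure_callback(failures.append)

registry.run(lambda: {"status": "ok"})  # "status" is an illustrative key


def failing_job() -> dict[str, Any]:
    raise RuntimeError("repository unreachable")


registry.run(failing_job)
print(successes)  # [{'status': 'ok'}]
print(failures)   # [RuntimeError('repository unreachable')]
```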

pause_job() → None

Pause the scheduled job without stopping the scheduler.

resume_job() → None

Resume a paused job.

remove_job() → None

Remove the scheduled job.
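One possible use of pause_job() and resume_job() is to suspend syncing around maintenance work. The sketch below wraps the pair in a context manager so the job is resumed even if the work raises. job_paused and RecordingScheduler are hypothetical names; the latter is a stand-in that only records calls.

```python
from contextlib import contextmanager


@contextmanager
def job_paused(scheduler):
    """Pause the scheduled job for the duration of a with-block."""
    scheduler.pause_job()
    try:
        yield scheduler
    finally:
        # Resume even if the body raised, so the job is not left paused.
        scheduler.resume_job()


class RecordingScheduler:
    """Stand-in for demonstration only; it records the calls it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def pause_job(self) -> None:
        self.calls.append("pause_job")

    def resume_job(self) -> None:
        self.calls.append("resume_job")


demo = RecordingScheduler()
with job_paused(demo):
    demo.calls.append("maintenance")
print(demo.calls)  # ['pause_job', 'maintenance', 'resume_job']
```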