thoth.ingestion.job_manager
Job manager for tracking ingestion jobs using Firestore.
This module provides job tracking and status management for ingestion operations using Google Cloud Firestore as the backend.
Functions
- dataclass – Add dunder methods based on the fields defined in the class.
- field – Return an object to identify dataclass fields.
- get_job_logs_url – Generate Cloud Logging URL for a specific job.
- Create and configure a logger with structured JSON output.
Classes
- Any – Special type indicating an unconstrained type.
- Enum – Create a collection of name/value pairs.
- Job – Represents an ingestion job.
- JobManager – Manages job state in Firestore.
- JobStats – Statistics for a job.
- JobStatus – Job status enumeration.
- datetime – The year, month and day arguments are required.
- thoth.ingestion.job_manager.get_job_logs_url(job_id: str, project_id: str | None = None, service_name: str = 'thoth-ingestion-worker') → str
Generate Cloud Logging URL for a specific job.
- Parameters:
job_id – Job identifier to filter logs
project_id – GCP project ID (defaults to environment variable)
service_name – Cloud Run service name
- Returns:
URL to view logs in GCP Console
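The source does not show the URL this function builds. A minimal sketch of one plausible approach, assuming a Logs Explorer query URL, a `GOOGLE_CLOUD_PROJECT` fallback variable, and a `jsonPayload.job_id` log field (all three are assumptions, and `build_logs_url` is a hypothetical name):

```python
from __future__ import annotations

import os
from urllib.parse import quote


def build_logs_url(job_id: str, project_id: str | None = None,
                   service_name: str = "thoth-ingestion-worker") -> str:
    # Assumption: the fallback variable is GOOGLE_CLOUD_PROJECT; the source only
    # says "environment variable".
    project = project_id or os.environ.get("GOOGLE_CLOUD_PROJECT", "")
    # Assumption: the worker writes job_id as a structured jsonPayload field.
    log_filter = " AND ".join([
        'resource.type="cloud_run_revision"',
        f'resource.labels.service_name="{service_name}"',
        f'jsonPayload.job_id="{job_id}"',
    ])
    # Logs Explorer reads the filter from the ;query= parameter of the path,
    # so the filter is fully percent-encoded.
    return ("https://console.cloud.google.com/logs/query;query="
            f"{quote(log_filter, safe='')}?project={project}")
```

Filtering on the structured field keeps the link scoped to one job even when many jobs share a Cloud Run service.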
- class thoth.ingestion.job_manager.JobStatus(*values)
Bases: Enum
Job status enumeration.
- PENDING = 'pending'
- RUNNING = 'running'
- COMPLETED = 'completed'
- FAILED = 'failed'
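Since a Firestore document can only hold the plain string, the status has to be converted on the way in and out. A sketch assuming the values are stored as shown above; `parse_status` is a hypothetical helper:

```python
from __future__ import annotations

from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Persist the plain string, then rebuild the member by value when reading back.
stored = JobStatus.RUNNING.value
restored = JobStatus(stored)


def parse_status(raw: str) -> JobStatus | None:
    # Hypothetical helper: JobStatus(raw) raises ValueError for unknown strings.
    try:
        return JobStatus(raw)
    except ValueError:
        return None
```

If the base is only `Enum`, as listed, `JobStatus.RUNNING == "running"` is `False`, so comparisons against raw document values need `.value` or a lookup like the one above.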
- class thoth.ingestion.job_manager.JobStats(total_files: int = 0, processed_files: int = 0, failed_files: int = 0, total_chunks: int = 0, total_documents: int = 0)
Bases: object
Statistics for a job.
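Assuming JobStats is a dataclass (the module imports `dataclass` and `field`, and the signature has all-default counters), it converts to and from a plain dictionary for storage as a Firestore map field. A minimal sketch:

```python
from dataclasses import asdict, dataclass


@dataclass
class JobStats:
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0


stats = JobStats(total_files=12, processed_files=10, failed_files=2)
# asdict produces a plain dict of the five counters.
doc = asdict(stats)
# Keyword unpacking rebuilds an equal instance from the stored map.
restored = JobStats(**doc)
```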
- class thoth.ingestion.job_manager.Job(job_id: str, status: JobStatus, source: str, collection_name: str, started_at: datetime, completed_at: datetime | None = None, stats: JobStats = <factory>, error: str | None = None, parent_job_id: str | None = None, batch_index: int | None = None, total_batches: int | None = None, completed_batches: int = 0)
Bases: object
Represents an ingestion job.
- status
Current job status
- started_at
Job start timestamp
- Type:
datetime.datetime
- completed_at
Job completion timestamp (if finished)
- Type:
datetime.datetime | None
- stats
Job statistics
- classmethod from_dict(data: dict[str, Any]) → Job
Create Job from Firestore document dictionary.
- __init__(job_id: str, status: JobStatus, source: str, collection_name: str, started_at: datetime, completed_at: datetime | None = None, stats: JobStats = <factory>, error: str | None = None, parent_job_id: str | None = None, batch_index: int | None = None, total_batches: int | None = None, completed_batches: int = 0) → None
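The source does not show how `from_dict` decodes a document. A minimal sketch, assuming Job and JobStats are dataclasses matching the signatures above, `status` is stored as its string value, and `stats` is stored as a nested map:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class JobStats:
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0


@dataclass
class Job:
    job_id: str
    status: JobStatus
    source: str
    collection_name: str
    started_at: datetime
    completed_at: datetime | None = None
    stats: JobStats = field(default_factory=JobStats)  # shown as <factory> above
    error: str | None = None
    parent_job_id: str | None = None
    batch_index: int | None = None
    total_batches: int | None = None
    completed_batches: int = 0

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Job:
        # Assumption: optional keys may be absent, so they fall back to defaults.
        # Firestore returns timestamps as datetime subclasses, so they pass through.
        return cls(
            job_id=data["job_id"],
            status=JobStatus(data["status"]),
            source=data["source"],
            collection_name=data["collection_name"],
            started_at=data["started_at"],
            completed_at=data.get("completed_at"),
            stats=JobStats(**data.get("stats", {})),
            error=data.get("error"),
            parent_job_id=data.get("parent_job_id"),
            batch_index=data.get("batch_index"),
            total_batches=data.get("total_batches"),
            completed_batches=data.get("completed_batches", 0),
        )
```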
- class thoth.ingestion.job_manager.JobManager(project_id: str | None = None)
Bases: object
Manages job state in Firestore.
This class provides CRUD operations for ingestion jobs stored in Google Cloud Firestore. Jobs are stored in the ‘thoth_jobs’ collection.
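Example
>>> from thoth.ingestion.job_manager import JobManager, JobStats
>>> manager = JobManager()
>>> job = manager.create_job("handbook", "handbook_documents")
>>> print(job.job_id)
>>> manager.mark_running(job)
>>> # ... do work ...
>>> stats = JobStats(total_files=10, processed_files=10)
>>> manager.mark_completed(job, stats)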
- COLLECTION_NAME = 'thoth_jobs'
- __init__(project_id: str | None = None) → None
Initialize job manager with Firestore.
- Parameters:
project_id – GCP project ID (uses default/env if not specified)
- create_job(source: str, collection_name: str, total_batches: int | None = None) → Job
Create a new job and persist to Firestore.
- Parameters:
source – Source identifier (e.g., ‘handbook’, ‘dnd’)
collection_name – Target LanceDB collection
total_batches – Number of batches (for parent jobs with sub-jobs)
- Returns:
Created Job instance
- create_sub_job(parent_job: Job, batch_index: int, total_files: int = 0) → Job
Create a sub-job (batch) for a parent job.
Sub-job IDs are formatted as {parent_job_id}_{batch_index:04d} for easy identification and sorting.
- Parameters:
parent_job – Parent job instance
batch_index – Batch number (0-based)
total_files – Number of files in this batch
- Returns:
Created sub-job instance
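The documented ID format can be illustrated directly. `make_sub_job_id` follows the stated `{parent_job_id}_{batch_index:04d}` pattern; `parse_sub_job_id` is a hypothetical inverse added for illustration:

```python
from __future__ import annotations


def make_sub_job_id(parent_job_id: str, batch_index: int) -> str:
    # Mirrors the documented format {parent_job_id}_{batch_index:04d}.
    return f"{parent_job_id}_{batch_index:04d}"


def parse_sub_job_id(sub_job_id: str) -> tuple[str, int]:
    # Hypothetical inverse: split on the last underscore, so parent IDs
    # that themselves contain "_" still parse correctly.
    parent, _, index = sub_job_id.rpartition("_")
    return parent, int(index)


ids = [make_sub_job_id("job_abc", i) for i in (10, 2, 0)]
# Zero padding makes string order match numeric order, up to 9999 batches.
print(sorted(ids))  # ['job_abc_0000', 'job_abc_0002', 'job_abc_0010']
```

Without the padding, `job_abc_10` would sort before `job_abc_2`; four digits keep plain string sorting correct until the batch count reaches 10000.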
- get_job(job_id: str) → Job | None
Retrieve a job by ID.
- Parameters:
job_id – Job identifier
- Returns:
Job instance or None if not found
- update_job(job: Job) → None
Update job state in Firestore.
- Parameters:
job – Job instance with updated state
- mark_completed(job: Job, stats: JobStats | None = None) → None
Mark job as completed with statistics.
- Parameters:
job – Job to update
stats – Optional final statistics
- mark_failed(job: Job, error: str) → None
Mark job as failed with error message.
- Parameters:
job – Job to update
error – Error message describing the failure
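The two terminal transitions can be sketched in memory as follows. `MiniJob` is a hypothetical cut-down Job, and stamping `completed_at` on failure is an assumption; persistence is left out:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class JobStats:
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0


@dataclass
class MiniJob:  # hypothetical trimmed-down Job with only the fields touched here
    job_id: str
    status: JobStatus = JobStatus.PENDING
    completed_at: datetime | None = None
    stats: JobStats = field(default_factory=JobStats)
    error: str | None = None


def mark_completed(job: MiniJob, stats: JobStats | None = None) -> None:
    job.status = JobStatus.COMPLETED
    job.completed_at = datetime.now(timezone.utc)
    if stats is not None:  # stats is optional, as in the documented signature
        job.stats = stats
    # A Firestore-backed version would then persist the change, e.g. via update_job.


def mark_failed(job: MiniJob, error: str) -> None:
    job.status = JobStatus.FAILED
    job.completed_at = datetime.now(timezone.utc)  # assumption: failures get an end time too
    job.error = error
```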
- update_stats(job: Job, stats: JobStats) → None
Update job statistics (for progress tracking).
- Parameters:
job – Job to update
stats – Updated statistics
- list_jobs(source: str | None = None, status: JobStatus | None = None, limit: int = 50) → list[Job]
List jobs with optional filtering.
- Parameters:
source – Filter by source
status – Filter by status
limit – Maximum number of jobs to return
- Returns:
List of Job instances, ordered by start time descending
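The filtering semantics above can be expressed as an in-memory sketch. `JobRow` is a hypothetical stand-in for a stored document, and status is a plain string here for brevity:

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class JobRow:  # hypothetical stand-in for a stored job document
    job_id: str
    source: str
    status: str
    started_at: datetime


def list_jobs(rows: list[JobRow], source: str | None = None,
              status: str | None = None, limit: int = 50) -> list[JobRow]:
    # Each filter applies only when given, like the optional parameters above.
    matches = [
        r for r in rows
        if (source is None or r.source == source)
        and (status is None or r.status == status)
    ]
    # Newest first, then truncate to the limit.
    matches.sort(key=lambda r: r.started_at, reverse=True)
    return matches[:limit]
```

Against Firestore itself, the equivalent query chains equality filters, a descending `order_by` on `started_at`, and a `limit`; combining an equality filter on one field with ordering on another typically requires a composite index.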
- delete_job(job_id: str) → bool
Delete a job from Firestore.
- Parameters:
job_id – Job identifier to delete
- Returns:
True if job was deleted, False if not found
- cleanup_old_jobs(days: int = 30) → int
Delete jobs older than specified days.
- Parameters:
days – Age threshold in days
- Returns:
Number of jobs deleted
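The age threshold reduces to a cutoff timestamp. A sketch assuming age is measured from `started_at` (the source does not say which timestamp is used); `select_expired` is a hypothetical helper that returns the IDs a cleanup pass would delete:

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def select_expired(started: dict[str, datetime], days: int = 30,
                   now: datetime | None = None) -> list[str]:
    # Assumption: age is measured from started_at.
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    # Anything strictly older than the cutoff is eligible for deletion.
    return sorted(job_id for job_id, ts in started.items() if ts < cutoff)
```

The method's return value would then correspond to the length of this list once each document is deleted.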
- get_sub_jobs(parent_job_id: str) → list[Job]
Get all sub-jobs for a parent job.
- Parameters:
parent_job_id – Parent job identifier
- Returns:
List of sub-jobs, ordered by batch_index
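Selecting and ordering sub-jobs can be sketched in memory, assuming each sub-job records `parent_job_id` and a 0-based `batch_index` as in the Job signature. `SubJob` is a hypothetical stand-in:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SubJob:  # hypothetical stand-in with just the fields this lookup needs
    job_id: str
    parent_job_id: str
    batch_index: int


def get_sub_jobs(jobs: list[SubJob], parent_job_id: str) -> list[SubJob]:
    children = [j for j in jobs if j.parent_job_id == parent_job_id]
    # batch_index is 0-based, so sorting on it yields the batches in order.
    return sorted(children, key=lambda j: j.batch_index)
```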
- get_job_with_sub_jobs(job_id: str) → dict[str, Any] | None
Get a job with its sub-jobs and aggregated statistics.
- Parameters:
job_id – Job identifier
- Returns:
Dictionary with job details and sub-jobs, or None if not found
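The aggregation step can be sketched as a field-wise sum, assuming every JobStats counter is additive across batches. `aggregate_stats` and the dictionary keys below are hypothetical; the real response shape is not documented here:

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class JobStats:
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0


def aggregate_stats(batch_stats: list[JobStats]) -> JobStats:
    # Assumption: each counter is additive, so totals are field-wise sums.
    return JobStats(**{
        f.name: sum(getattr(s, f.name) for s in batch_stats)
        for f in fields(JobStats)
    })


batches = [
    JobStats(total_files=5, processed_files=5, total_chunks=40),
    JobStats(total_files=5, processed_files=4, failed_files=1, total_chunks=31),
]
# Hypothetical response shape for illustration only.
summary = {
    "sub_job_count": len(batches),
    "aggregated_stats": asdict(aggregate_stats(batches)),
}
```

Iterating over `fields(JobStats)` keeps the sum in step with the dataclass if counters are added later.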