Job manager
thoth.ingestion.job_manager
Job manager for tracking ingestion jobs using Firestore.
This module provides job tracking and status management for ingestion operations using Google Cloud Firestore as the backend.
logger = setup_logger(__name__) (module attribute)
JobStatus
JobStats (dataclass)
Statistics for a job.

Attributes:

| Name | Type | Description |
|---|---|---|
| total_files | int | Total number of files to process |
| processed_files | int | Number of files successfully processed |
| failed_files | int | Number of files that failed processing |
| total_chunks | int | Total number of chunks created |
| total_documents | int | Total number of documents stored |
total_files: int = 0 (class attribute, instance attribute)

processed_files: int = 0 (class attribute, instance attribute)

failed_files: int = 0 (class attribute, instance attribute)

total_chunks: int = 0 (class attribute, instance attribute)

total_documents: int = 0 (class attribute, instance attribute)
__init__(total_files: int = 0, processed_files: int = 0, failed_files: int = 0, total_chunks: int = 0, total_documents: int = 0) -> None

to_dict() -> dict[str, int]

Convert stats to dictionary.

from_dict(data: dict[str, Any]) -> JobStats (classmethod)

Create JobStats from dictionary.
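A minimal sketch of how a dataclass with this shape can round-trip through a plain dictionary. It assumes `to_dict` returns every field and that `from_dict` treats missing keys as 0; neither behavior is confirmed by the reference above, so treat this as an illustration rather than the module's source.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class JobStats:
    # Sketch mirroring the documented fields and defaults.
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0

    def to_dict(self) -> dict[str, int]:
        # asdict() copies every dataclass field into a plain dict.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> JobStats:
        # Assumption: missing keys fall back to 0 and unknown keys are ignored.
        return cls(
            total_files=data.get("total_files", 0),
            processed_files=data.get("processed_files", 0),
            failed_files=data.get("failed_files", 0),
            total_chunks=data.get("total_chunks", 0),
            total_documents=data.get("total_documents", 0),
        )


stats = JobStats(total_files=10, processed_files=9, failed_files=1)
restored = JobStats.from_dict(stats.to_dict())
print(restored == stats)  # True
```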
Job (dataclass)
Represents an ingestion job.

Attributes:

| Name | Type | Description |
|---|---|---|
| job_id | str | Unique job identifier (UUID or parent_id_NNNN for sub-jobs) |
| status | JobStatus | Current job status |
| source | str | Source identifier (e.g., 'handbook', 'dnd') |
| collection_name | str | Target LanceDB collection |
| started_at | datetime | Job start timestamp |
| completed_at | datetime \| None | Job completion timestamp (if finished) |
| stats | JobStats | Job statistics |
| error | str \| None | Error message (if failed) |
| parent_job_id | str \| None | Parent job ID (for sub-jobs/batches) |
| batch_index | int \| None | Batch number within parent job (for sub-jobs) |
| total_batches | int \| None | Total number of batches (for parent jobs) |
| completed_batches | int | Number of completed batches (for parent jobs) |
job_id: str (instance attribute)

status: JobStatus (instance attribute)

source: str (instance attribute)

collection_name: str (instance attribute)

started_at: datetime (instance attribute)

completed_at: datetime | None = None (class attribute, instance attribute)

stats: JobStats = field(default_factory=JobStats) (class attribute, instance attribute)

error: str | None = None (class attribute, instance attribute)

parent_job_id: str | None = None (class attribute, instance attribute)

batch_index: int | None = None (class attribute, instance attribute)

total_batches: int | None = None (class attribute, instance attribute)

completed_batches: int = 0 (class attribute, instance attribute)
is_finished: bool (property)

Check if job has finished (completed or failed).

is_sub_job: bool (property)

Check if this is a sub-job (batch) of a parent job.

duration_seconds: float | None (property)

Get job duration in seconds if finished.
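The three properties can be sketched as follows. This is an illustration, not the module's source: the `JobStatus` members are hypothetical (the reference does not list them), the `Job` class is trimmed to the fields the properties read, and `is_sub_job` is assumed to test `parent_job_id`.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class JobStatus(Enum):
    # Hypothetical members: the reference above does not list JobStatus values.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Job:
    # Trimmed to the fields these three properties read.
    job_id: str
    status: JobStatus
    started_at: datetime
    completed_at: datetime | None = None
    parent_job_id: str | None = None

    @property
    def is_finished(self) -> bool:
        # Finished means completed or failed, per the property description.
        return self.status in (JobStatus.COMPLETED, JobStatus.FAILED)

    @property
    def is_sub_job(self) -> bool:
        # Assumption: a sub-job is any job that records a parent_job_id.
        return self.parent_job_id is not None

    @property
    def duration_seconds(self) -> float | None:
        # No duration until a completion timestamp exists.
        if self.completed_at is None:
            return None
        return (self.completed_at - self.started_at).total_seconds()


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
job = Job("abc", JobStatus.COMPLETED, start, completed_at=start + timedelta(seconds=90))
print(job.is_finished, job.is_sub_job, job.duration_seconds)  # True False 90.0
```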
__init__(job_id: str, status: JobStatus, source: str, collection_name: str, started_at: datetime, completed_at: datetime | None = None, stats: JobStats = JobStats(), error: str | None = None, parent_job_id: str | None = None, batch_index: int | None = None, total_batches: int | None = None, completed_batches: int = 0) -> None
to_dict() -> dict[str, Any]

Convert job to Firestore-compatible dictionary.

from_dict(data: dict[str, Any]) -> Job (classmethod)

Create Job from Firestore document dictionary.
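A sketch of a Firestore-compatible round trip, assuming the status enum is stored as its string value and datetimes are passed through unchanged (the Firestore Python client stores `datetime` objects as timestamps). The trimmed `Job` and the `JobStatus` members are hypothetical stand-ins; the module's actual field mapping may differ.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class JobStatus(Enum):
    # Hypothetical members; the actual JobStatus values are not listed above.
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class Job:
    # Trimmed to a few fields to keep the serialization pattern visible.
    job_id: str
    status: JobStatus
    source: str
    started_at: datetime
    completed_at: datetime | None = None

    def to_dict(self) -> dict[str, Any]:
        # Assumption: the enum is stored as its value; datetimes stay as-is.
        return {
            "job_id": self.job_id,
            "status": self.status.value,
            "source": self.source,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Job:
        # Rebuild the enum from its stored value; optional fields may be absent.
        return cls(
            job_id=data["job_id"],
            status=JobStatus(data["status"]),
            source=data["source"],
            started_at=data["started_at"],
            completed_at=data.get("completed_at"),
        )


job = Job("abc", JobStatus.RUNNING, "handbook", datetime.now(timezone.utc))
doc = job.to_dict()
print(doc["status"])  # running
print(Job.from_dict(doc) == job)  # True
```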
JobManager
Manages job state in Firestore.
This class provides CRUD operations for ingestion jobs stored in Google Cloud Firestore. Jobs are stored in the 'thoth_jobs' collection.
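Example:

```python
from thoth.ingestion.job_manager import JobManager, JobStats

manager = JobManager()
job = manager.create_job("handbook", "handbook_documents")
print(job.job_id)
manager.mark_running(job)
# ... do work ...
stats = JobStats(total_files=10, processed_files=10)  # illustrative numbers
manager.mark_completed(job, stats)
```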
COLLECTION_NAME = 'thoth_jobs' (class attribute, instance attribute)
collection: Any (property)

Get the Firestore collection reference.
__init__(project_id: str | None = None) -> None

Initialize job manager with Firestore.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| project_id | str \| None | GCP project ID (uses the default or environment-configured project if not specified) | None |
create_job(source: str, collection_name: str, total_batches: int | None = None) -> Job

Create a new job and persist it to Firestore.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Source identifier (e.g., 'handbook', 'dnd') | required |
| collection_name | str | Target LanceDB collection | required |
| total_batches | int \| None | Number of batches (for parent jobs with sub-jobs) | None |

Returns:

| Type | Description |
|---|---|
| Job | Created Job instance |
create_sub_job(parent_job: Job, batch_index: int, total_files: int = 0) -> Job

Create a sub-job (batch) for a parent job.

Sub-job IDs are formatted as {parent_job_id}_{batch_index:04d} for easy identification and sorting.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| parent_job | Job | Parent job instance | required |
| batch_index | int | Batch number (0-based) | required |
| total_files | int | Number of files in this batch | 0 |

Returns:

| Type | Description |
|---|---|
| Job | Created sub-job instance |
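The documented sub-job ID pattern can be illustrated with plain string formatting. `sub_job_id` is a hypothetical helper, not part of the module. Zero-padding to four digits is what keeps lexicographic order aligned with batch order when IDs are sorted as strings:

```python
def sub_job_id(parent_job_id: str, batch_index: int) -> str:
    # Same pattern as documented: {parent_job_id}_{batch_index:04d}
    return f"{parent_job_id}_{batch_index:04d}"


ids = [sub_job_id("parent", i) for i in (10, 2, 0)]
print(sorted(ids))  # ['parent_0000', 'parent_0002', 'parent_0010']

# Without padding, string order no longer matches batch order.
print(sorted(f"p_{i}" for i in (10, 2, 0)))  # ['p_0', 'p_10', 'p_2']
```

The padding holds for batch indices 0 through 9999; a five-digit index would sort out of order again.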
get_job(job_id: str) -> Job | None

Retrieve a job by ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Job identifier | required |

Returns:

| Type | Description |
|---|---|
| Job \| None | Job instance or None if not found |
update_job(job: Job) -> None

Update job state in Firestore.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job | Job | Job instance with updated state | required |
mark_running(job: Job) -> None

mark_completed(job: Job, stats: JobStats | None = None) -> None

mark_failed(job: Job, error: str) -> None

Mark job as failed with error message.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job | Job | Job to update | required |
| error | str | Error message describing the failure | required |
update_stats(job: Job, stats: JobStats) -> None

list_jobs(source: str | None = None, status: JobStatus | None = None, limit: int = 50) -> list[Job]

List jobs with optional filtering.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| None | Filter by source | None |
| status | JobStatus \| None | Filter by status | None |
| limit | int | Maximum number of jobs to return | 50 |

Returns:

| Type | Description |
|---|---|
| list[Job] | List of Job instances, ordered by start time descending |
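The filtering semantics of `list_jobs` can be approximated in plain Python: optional equality filters on `source` and `status`, newest `started_at` first, then truncation to `limit`. This in-memory version is a sketch for reasoning about (or faking) the query in tests, not the Firestore implementation; `JobRecord` is hypothetical and uses a string status for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class JobRecord:
    # Hypothetical minimal stand-in for Job.
    job_id: str
    source: str
    status: str
    started_at: datetime


def list_jobs(
    jobs: list[JobRecord],
    source: str | None = None,
    status: str | None = None,
    limit: int = 50,
) -> list[JobRecord]:
    # Each filter applies only when its argument is given.
    matches = [
        j for j in jobs
        if (source is None or j.source == source)
        and (status is None or j.status == status)
    ]
    # Newest first, then cap the result size.
    matches.sort(key=lambda j: j.started_at, reverse=True)
    return matches[:limit]


jobs = [
    JobRecord("a", "handbook", "completed", datetime(2024, 1, 1)),
    JobRecord("b", "dnd", "failed", datetime(2024, 1, 2)),
    JobRecord("c", "handbook", "failed", datetime(2024, 1, 3)),
]
print([j.job_id for j in list_jobs(jobs, source="handbook")])  # ['c', 'a']
print([j.job_id for j in list_jobs(jobs, status="failed", limit=1)])  # ['c']
```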
delete_job(job_id: str) -> bool

Delete a job from Firestore.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Job identifier to delete | required |

Returns:

| Type | Description |
|---|---|
| bool | True if job was deleted, False if not found |
cleanup_old_jobs(days: int = 30) -> int

Delete jobs older than the specified number of days.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| days | int | Age threshold in days | 30 |

Returns:

| Type | Description |
|---|---|
| int | Number of jobs deleted |
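A sketch of the age cutoff behind `cleanup_old_jobs`, assuming age is measured from `started_at` (the reference does not say which timestamp is used). A dictionary of job IDs to start times stands in for the Firestore collection, and `now` is a hypothetical parameter added so the example is deterministic.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def cleanup_old_jobs(
    jobs: dict[str, datetime], days: int = 30, now: datetime | None = None
) -> int:
    if now is None:
        now = datetime.now(timezone.utc)
    # Assumption: anything that started before the cutoff is "old".
    cutoff = now - timedelta(days=days)
    # Collect first, then delete, so the dict is not mutated while iterating.
    stale = [job_id for job_id, started_at in jobs.items() if started_at < cutoff]
    for job_id in stale:
        del jobs[job_id]  # stands in for deleting the Firestore document
    return len(stale)


now = datetime(2024, 3, 1, tzinfo=timezone.utc)
jobs = {
    "old": now - timedelta(days=45),
    "recent": now - timedelta(days=5),
}
print(cleanup_old_jobs(jobs, days=30, now=now))  # 1
print(list(jobs))  # ['recent']
```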
get_sub_jobs(parent_job_id: str) -> list[Job]

Get all sub-jobs for a parent job.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| parent_job_id | str | Parent job identifier | required |

Returns:

| Type | Description |
|---|---|
| list[Job] | List of sub-jobs, ordered by batch_index |
get_job_with_sub_jobs(job_id: str) -> dict[str, Any] | None

Get a job with its sub-jobs and aggregated statistics.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Job identifier | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] \| None | Dictionary with job details and sub-jobs, or None if not found |
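The reference does not define how statistics are aggregated or which keys the returned dictionary contains. Assuming aggregation is a field-by-field sum of each sub-job's `JobStats`, it could look like this (`aggregate_stats` is a hypothetical helper, and `JobStats` is redefined here only to keep the block self-contained):

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass
class JobStats:
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0


def aggregate_stats(sub_job_stats: list[JobStats]) -> JobStats:
    # Assumption: the parent's totals are the sum of every sub-job's counters.
    total = JobStats()
    for stats in sub_job_stats:
        for f in fields(JobStats):
            setattr(total, f.name, getattr(total, f.name) + getattr(stats, f.name))
    return total


batches = [
    JobStats(total_files=100, processed_files=98, failed_files=2, total_chunks=640),
    JobStats(total_files=50, processed_files=50, total_chunks=300),
]
print(aggregate_stats(batches))
# JobStats(total_files=150, processed_files=148, failed_files=2, total_chunks=940, total_documents=0)
```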
mark_sub_job_completed(sub_job: Job, stats: JobStats | None = None) -> Job | None

mark_sub_job_failed(sub_job: Job, error: str) -> Job | None
get_job_logs_url(job_id: str, project_id: str | None = None, service_name: str = 'thoth-ingestion-worker') -> str

Generate a Cloud Logging URL for a specific job.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Job identifier to filter logs | required |
| project_id | str \| None | GCP project ID (defaults to environment variable) | None |
| service_name | str | Cloud Run service name | 'thoth-ingestion-worker' |

Returns:

| Type | Description |
|---|---|
| str | URL to view logs in GCP Console |
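A sketch of how such a URL can be assembled for the Cloud Logging console, assuming the common `https://console.cloud.google.com/logs/query;query=...?project=...` form. `resource.type` and `resource.labels.service_name` are standard fields on Cloud Run logs; the `jsonPayload.job_id` field and the `GOOGLE_CLOUD_PROJECT` fallback are assumptions about how the worker logs and is configured, not details confirmed by this reference.

```python
from __future__ import annotations

import os
from urllib.parse import quote


def get_job_logs_url(
    job_id: str,
    project_id: str | None = None,
    service_name: str = "thoth-ingestion-worker",
) -> str:
    # Assumption: the project falls back to the GOOGLE_CLOUD_PROJECT variable.
    project = project_id or os.environ.get("GOOGLE_CLOUD_PROJECT", "")
    # Assumption: the worker emits job_id as a structured (JSON) log field.
    log_filter = (
        'resource.type="cloud_run_revision" AND '
        f'resource.labels.service_name="{service_name}" AND '
        f'jsonPayload.job_id="{job_id}"'
    )
    # Percent-encode the whole filter so spaces, quotes, and '=' survive the URL.
    return (
        "https://console.cloud.google.com/logs/query;query="
        f"{quote(log_filter, safe='')}?project={project}"
    )


print(get_job_logs_url("abc-123", project_id="my-project"))
```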