Job manager

thoth.ingestion.job_manager

Job manager for tracking ingestion jobs using Firestore.

This module provides job tracking and status management for ingestion operations using Google Cloud Firestore as the backend.

logger = setup_logger(__name__) module-attribute

JobStatus

Job status enumeration.

PENDING = 'pending' class-attribute instance-attribute

RUNNING = 'running' class-attribute instance-attribute

COMPLETED = 'completed' class-attribute instance-attribute

FAILED = 'failed' class-attribute instance-attribute
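A minimal sketch of how such an enumeration could be declared and used. The plain `Enum` base class is an assumption, since the reference lists only the member values.

```python
from enum import Enum


class JobStatus(Enum):
    """Sketch of the status enumeration; the base class is an assumption."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Look up a member from a stored string value, e.g. one read back from storage.
status = JobStatus("running")

# The two terminal states, as described for Job.is_finished.
terminal_states = {JobStatus.COMPLETED, JobStatus.FAILED}
is_terminal = status in terminal_states
```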

JobStats dataclass

Statistics for a job.

Attributes:

total_files (int): Total number of files to process
processed_files (int): Number of files successfully processed
failed_files (int): Number of files that failed processing
total_chunks (int): Total number of chunks created
total_documents (int): Total number of documents stored

total_files: int = 0 class-attribute instance-attribute

processed_files: int = 0 class-attribute instance-attribute

failed_files: int = 0 class-attribute instance-attribute

total_chunks: int = 0 class-attribute instance-attribute

total_documents: int = 0 class-attribute instance-attribute

__init__(total_files: int = 0, processed_files: int = 0, failed_files: int = 0, total_chunks: int = 0, total_documents: int = 0) -> None

to_dict() -> dict[str, int]

Convert stats to dictionary.

from_dict(data: dict[str, Any]) -> JobStats classmethod

Create JobStats from dictionary.
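The round trip between `to_dict()` and `from_dict()` might look like the sketch below. The field names and defaults come from the reference above; falling back to 0 for missing keys is an assumption about how `from_dict` treats incomplete documents.

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict


@dataclass
class JobStats:
    """Sketch of the statistics dataclass, mirroring the documented fields."""

    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0

    def to_dict(self) -> Dict[str, int]:
        # asdict() turns the dataclass into a plain dict of its fields.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "JobStats":
        # Assumption: keys missing from the stored document default to 0.
        return cls(**{f.name: int(data.get(f.name, 0)) for f in fields(cls)})


stats = JobStats(total_files=10, processed_files=8, failed_files=2)
restored = JobStats.from_dict(stats.to_dict())
```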

Job dataclass

Represents an ingestion job.

Attributes:

job_id (str): Unique job identifier (UUID or parent_id_NNNN for sub-jobs)
status (JobStatus): Current job status
source (str): Source identifier (e.g., 'handbook', 'dnd')
collection_name (str): Target LanceDB collection
started_at (datetime): Job start timestamp
completed_at (datetime | None): Job completion timestamp (if finished)
stats (JobStats): Job statistics
error (str | None): Error message (if failed)
parent_job_id (str | None): Parent job ID (for sub-jobs/batches)
batch_index (int | None): Batch number within parent job (for sub-jobs)
total_batches (int | None): Total number of batches (for parent jobs)
completed_batches (int): Number of completed batches (for parent jobs)

job_id: str instance-attribute

status: JobStatus instance-attribute

source: str instance-attribute

collection_name: str instance-attribute

started_at: datetime instance-attribute

completed_at: datetime | None = None class-attribute instance-attribute

stats: JobStats = field(default_factory=JobStats) class-attribute instance-attribute

error: str | None = None class-attribute instance-attribute

parent_job_id: str | None = None class-attribute instance-attribute

batch_index: int | None = None class-attribute instance-attribute

total_batches: int | None = None class-attribute instance-attribute

completed_batches: int = 0 class-attribute instance-attribute

is_finished: bool property

Check if job has finished (completed or failed).

is_sub_job: bool property

Check if this is a sub-job (batch) of a parent job.

duration_seconds: float | None property

Get job duration in seconds if finished.
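The three properties can be illustrated with a trimmed-down stand-in. `JobSketch` is a hypothetical class that keeps only the fields the properties need. How `duration_seconds` behaves for unfinished jobs is an assumption (here it returns None when no completion timestamp is set).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class JobSketch:
    """Hypothetical, trimmed-down stand-in for Job."""

    status: str
    started_at: datetime
    completed_at: Optional[datetime] = None
    parent_job_id: Optional[str] = None

    @property
    def is_finished(self) -> bool:
        # Finished means a terminal state: completed or failed.
        return self.status in ("completed", "failed")

    @property
    def is_sub_job(self) -> bool:
        # Assumption: a sub-job is any job that records a parent ID.
        return self.parent_job_id is not None

    @property
    def duration_seconds(self) -> Optional[float]:
        # Assumption: without a completion timestamp there is no duration yet.
        if self.completed_at is None:
            return None
        return (self.completed_at - self.started_at).total_seconds()


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
batch = JobSketch("completed", start, start + timedelta(seconds=90), parent_job_id="abc")
pending = JobSketch("pending", start)
```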

__init__(job_id: str, status: JobStatus, source: str, collection_name: str, started_at: datetime, completed_at: datetime | None = None, stats: JobStats = JobStats(), error: str | None = None, parent_job_id: str | None = None, batch_index: int | None = None, total_batches: int | None = None, completed_batches: int = 0) -> None

to_dict() -> dict[str, Any]

Convert job to Firestore-compatible dictionary.

from_dict(data: dict[str, Any]) -> Job classmethod

Create Job from Firestore document dictionary.

JobManager

Manages job state in Firestore.

This class provides CRUD operations for ingestion jobs stored in Google Cloud Firestore. Jobs are stored in the 'thoth_jobs' collection.

Example

    manager = JobManager()
    job = manager.create_job("handbook", "handbook_documents")
    print(job.job_id)
    manager.mark_running(job)
    # ... do work ...
    manager.mark_completed(job, stats)

COLLECTION_NAME = 'thoth_jobs' class-attribute instance-attribute

collection: Any property

Get the Firestore collection reference.

__init__(project_id: str | None = None) -> None

Initialize job manager with Firestore.

Parameters:

project_id (str | None): GCP project ID (uses default/env if not specified). Default: None.

create_job(source: str, collection_name: str, total_batches: int | None = None) -> Job

Create a new job and persist it to Firestore.

Parameters:

source (str): Source identifier (e.g., 'handbook', 'dnd'). Required.
collection_name (str): Target LanceDB collection. Required.
total_batches (int | None): Number of batches (for parent jobs with sub-jobs). Default: None.

Returns:

Job: Created Job instance

create_sub_job(parent_job: Job, batch_index: int, total_files: int = 0) -> Job

Create a sub-job (batch) for a parent job.

Sub-job IDs are formatted as {parent_job_id}_{batch_index:04d} for easy identification and sorting.

Parameters:

parent_job (Job): Parent job instance. Required.
batch_index (int): Batch number (0-based). Required.
total_files (int): Number of files in this batch. Default: 0.

Returns:

Job: Created sub-job instance
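The sub-job ID format described above can be reproduced with an f-string. `make_sub_job_id` is a hypothetical helper name used only for illustration.

```python
import uuid


def make_sub_job_id(parent_job_id: str, batch_index: int) -> str:
    """Build an ID of the form {parent_job_id}_{batch_index:04d}."""
    return f"{parent_job_id}_{batch_index:04d}"


parent_id = str(uuid.uuid4())

# Create IDs out of order, then sort them as plain strings.
ids = [make_sub_job_id(parent_id, i) for i in (10, 2, 0)]
sorted_ids = sorted(ids)
```

Because the index is zero-padded to four digits, sorting the IDs as strings matches numeric batch order for batch indexes 0 through 9999.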

get_job(job_id: str) -> Job | None

Retrieve a job by ID.

Parameters:

job_id (str): Job identifier. Required.

Returns:

Job | None: Job instance or None if not found

update_job(job: Job) -> None

Update job state in Firestore.

Parameters:

job (Job): Job instance with updated state. Required.

mark_running(job: Job) -> None

Mark job as running.

Parameters:

job (Job): Job to update. Required.

mark_completed(job: Job, stats: JobStats | None = None) -> None

Mark job as completed with statistics.

Parameters:

job (Job): Job to update. Required.
stats (JobStats | None): Optional final statistics. Default: None.

mark_failed(job: Job, error: str) -> None

Mark job as failed with error message.

Parameters:

job (Job): Job to update. Required.
error (str): Error message describing the failure. Required.
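One way to combine `mark_running`, `mark_completed`, and `mark_failed` is to wrap the work in a try/except, sketched below. `RecordingManager` and `run_tracked` are hypothetical names. The stand-in only records calls so the example runs without Firestore, and it mirrors only the method signatures documented here.

```python
from types import SimpleNamespace


class RecordingManager:
    """Hypothetical stand-in for JobManager: records calls instead of writing to Firestore."""

    def __init__(self):
        self.calls = []

    def mark_running(self, job):
        self.calls.append(("running", None))

    def mark_completed(self, job, stats=None):
        self.calls.append(("completed", None))

    def mark_failed(self, job, error):
        self.calls.append(("failed", error))


def run_tracked(manager, job, work):
    """Run work() and record the outcome on the job; return True on success."""
    manager.mark_running(job)
    try:
        stats = work()
    except Exception as exc:
        # Store the exception text as the job's error message.
        manager.mark_failed(job, str(exc))
        return False
    manager.mark_completed(job, stats)
    return True


def failing_work():
    raise ValueError("unreadable file")


manager = RecordingManager()
job = SimpleNamespace(job_id="demo")
ok = run_tracked(manager, job, lambda: None)
failed = run_tracked(manager, job, failing_work)
```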

update_stats(job: Job, stats: JobStats) -> None

Update job statistics (for progress tracking).

Parameters:

job (Job): Job to update. Required.
stats (JobStats): Updated statistics. Required.

list_jobs(source: str | None = None, status: JobStatus | None = None, limit: int = 50) -> list[Job]

List jobs with optional filtering.

Parameters:

source (str | None): Filter by source. Default: None.
status (JobStatus | None): Filter by status. Default: None.
limit (int): Maximum number of jobs to return. Default: 50.

Returns:

list[Job]: List of Job instances, ordered by start time descending
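The filtering and ordering semantics can be sketched over an in-memory list. This illustrates only the documented behaviour (optional filters, newest first, capped at `limit`). The real method presumably issues a Firestore query, and `filter_jobs` and the sample records are hypothetical.

```python
from datetime import datetime, timezone


def filter_jobs(jobs, source=None, status=None, limit=50):
    """Apply optional source/status filters, newest first, capped at limit."""
    matches = [
        job
        for job in jobs
        if (source is None or job["source"] == source)
        and (status is None or job["status"] == status)
    ]
    # Order by start time, most recent first.
    matches.sort(key=lambda job: job["started_at"], reverse=True)
    return matches[:limit]


def _day(n):
    return datetime(2024, 1, n, tzinfo=timezone.utc)


jobs = [
    {"job_id": "a", "source": "handbook", "status": "completed", "started_at": _day(1)},
    {"job_id": "b", "source": "dnd", "status": "running", "started_at": _day(2)},
    {"job_id": "c", "source": "handbook", "status": "failed", "started_at": _day(3)},
    {"job_id": "d", "source": "handbook", "status": "running", "started_at": _day(4)},
]

recent_handbook = filter_jobs(jobs, source="handbook", limit=2)
running = filter_jobs(jobs, status="running")
```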

delete_job(job_id: str) -> bool

Delete a job from Firestore.

Parameters:

job_id (str): Job identifier to delete. Required.

Returns:

bool: True if job was deleted, False if not found

cleanup_old_jobs(days: int = 30) -> int

Delete jobs older than the specified number of days.

Parameters:

days (int): Age threshold in days. Default: 30.

Returns:

int: Number of jobs deleted
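The age threshold can be expressed as a cutoff timestamp, as in this sketch. Measuring age from `started_at` and using UTC are assumptions, because the reference does not say which timestamp the cleanup compares. `is_expired` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def is_expired(started_at, days=30, now=None):
    """Return True if started_at is more than `days` days before `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    return started_at < cutoff


now = datetime(2024, 3, 1, tzinfo=timezone.utc)
old_job_start = datetime(2024, 1, 15, tzinfo=timezone.utc)
recent_job_start = datetime(2024, 2, 20, tzinfo=timezone.utc)

old_expired = is_expired(old_job_start, days=30, now=now)
recent_expired = is_expired(recent_job_start, days=30, now=now)
```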

get_sub_jobs(parent_job_id: str) -> list[Job]

Get all sub-jobs for a parent job.

Parameters:

parent_job_id (str): Parent job identifier. Required.

Returns:

list[Job]: List of sub-jobs, ordered by batch_index

get_job_with_sub_jobs(job_id: str) -> dict[str, Any] | None

Get a job with its sub-jobs and aggregated statistics.

Parameters:

job_id (str): Job identifier. Required.

Returns:

dict[str, Any] | None: Dictionary with job details and sub-jobs, or None if not found
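The reference does not spell out how the statistics are aggregated. One plausible reading, sketched here as an assumption, is a field-by-field sum over the sub-jobs' statistics. `aggregate_stats` is a hypothetical helper operating on the dictionaries produced by `JobStats.to_dict()`.

```python
STAT_FIELDS = (
    "total_files",
    "processed_files",
    "failed_files",
    "total_chunks",
    "total_documents",
)


def aggregate_stats(sub_job_stats):
    """Sum each statistic across a list of per-sub-job stat dictionaries."""
    totals = {name: 0 for name in STAT_FIELDS}
    for stats in sub_job_stats:
        for name in STAT_FIELDS:
            # Treat a missing field as 0 rather than failing.
            totals[name] += stats.get(name, 0)
    return totals


batch_stats = [
    {"total_files": 5, "processed_files": 5, "failed_files": 0, "total_chunks": 40, "total_documents": 40},
    {"total_files": 5, "processed_files": 3, "failed_files": 2, "total_chunks": 21, "total_documents": 21},
]
combined = aggregate_stats(batch_stats)
```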

mark_sub_job_completed(sub_job: Job, stats: JobStats | None = None) -> Job | None

Mark a sub-job as completed and update parent job progress.

Parameters:

sub_job (Job): Sub-job to mark as completed. Required.
stats (JobStats | None): Optional final statistics for the sub-job. Default: None.

Returns:

Job | None: Updated parent job, or None if no parent
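How the parent's progress is updated is not described in detail. The sketch below assumes that each completed sub-job increments `completed_batches`, and that the parent counts as completed once the counter reaches `total_batches`. Both the rule and the `record_batch_completion` helper are assumptions for illustration.

```python
def record_batch_completion(parent):
    """Return a copy of the parent record with one more completed batch."""
    updated = dict(parent)
    updated["completed_batches"] = parent["completed_batches"] + 1
    total = parent.get("total_batches")
    # Assumption: the parent is done once every batch has reported completion.
    if total is not None and updated["completed_batches"] >= total:
        updated["status"] = "completed"
    return updated


parent = {"job_id": "abc", "status": "running", "total_batches": 2, "completed_batches": 0}
after_first = record_batch_completion(parent)
after_second = record_batch_completion(after_first)
```

If several workers report completion at the same time, a real implementation would likely need an atomic increment or a transaction rather than this read-modify-write.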

mark_sub_job_failed(sub_job: Job, error: str) -> Job | None

Mark a sub-job as failed and update parent job.

Parameters:

sub_job (Job): Sub-job to mark as failed. Required.
error (str): Error message. Required.

Returns:

Job | None: Updated parent job, or None if no parent

get_job_logs_url(job_id: str, project_id: str | None = None, service_name: str = 'thoth-ingestion-worker') -> str

Generate Cloud Logging URL for a specific job.

Parameters:

job_id (str): Job identifier to filter logs. Required.
project_id (str | None): GCP project ID (defaults to environment variable). Default: None.
service_name (str): Cloud Run service name. Default: 'thoth-ingestion-worker'.

Returns:

str: URL to view logs in GCP Console
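A URL of this kind could be built as in the sketch below. The exact log filter the function uses is not documented, so the filter shown (Cloud Run resource type, service name, and a text match on the job ID) is an assumption, as is the `build_logs_url` helper name.

```python
from urllib.parse import quote


def build_logs_url(job_id, project_id, service_name="thoth-ingestion-worker"):
    """Build a Logs Explorer URL with a URL-encoded query filter."""
    # Assumed filter: Cloud Run logs for the service that mention the job ID.
    log_filter = "\n".join(
        [
            'resource.type="cloud_run_revision"',
            f'resource.labels.service_name="{service_name}"',
            f'"{job_id}"',
        ]
    )
    # safe="" forces characters such as "=" and "/" to be percent-encoded too.
    encoded = quote(log_filter, safe="")
    return f"https://console.cloud.google.com/logs/query;query={encoded}?project={project_id}"


url = build_logs_url("job-123", "my-project")
```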