thoth.ingestion.job_manager

Job manager for tracking ingestion jobs using Firestore.

This module provides job tracking and status management for ingestion operations using Google Cloud Firestore as the backend.

Functions

get_job_logs_url(job_id[, project_id, ...])

Generate Cloud Logging URL for a specific job.

setup_logger(name[, level, simple, json_output])

Create and configure a logger with structured JSON output.

Classes

Job(job_id, status, source, collection_name, ...)

Represents an ingestion job.

JobManager([project_id])

Manages job state in Firestore.

JobStats([total_files, processed_files, ...])

Statistics for a job.

JobStatus(*values)

Job status enumeration.

thoth.ingestion.job_manager.get_job_logs_url(job_id: str, project_id: str | None = None, service_name: str = 'thoth-ingestion-worker') → str[source]

Generate Cloud Logging URL for a specific job.

Parameters:
  • job_id – Job identifier to filter logs

  • project_id – GCP project ID (defaults to environment variable)

  • service_name – Cloud Run service name

Returns: URL to view logs in GCP Console
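The documented signature suggests a Cloud Logging console query URL. A minimal sketch, assuming the logs carry a `jsonPayload.job_id` field and a `resource.labels.service_name` label (both illustrative assumptions, not confirmed by the source):

```python
from urllib.parse import quote

def get_job_logs_url(job_id: str, project_id: str = "my-project",
                     service_name: str = "thoth-ingestion-worker") -> str:
    # NOTE: the filter field names below are assumptions; the real module
    # may filter on different log fields or labels.
    query = (
        f'resource.labels.service_name="{service_name}"\n'
        f'jsonPayload.job_id="{job_id}"'
    )
    return (
        "https://console.cloud.google.com/logs/query;"
        f"query={quote(query, safe='')}"
        f"?project={project_id}"
    )
```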

class thoth.ingestion.job_manager.JobStatus(*values)[source]

Bases: Enum

Job status enumeration.

PENDING = 'pending'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
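The four documented values map directly onto a Python `Enum`; a minimal sketch that round-trips between members and the stored string values:

```python
from enum import Enum

class JobStatus(Enum):
    """Mirror of the documented status values."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

# Firestore stores the string value; Enum lookup recovers the member.
status = JobStatus("running")
```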
class thoth.ingestion.job_manager.JobStats(total_files: int = 0, processed_files: int = 0, failed_files: int = 0, total_chunks: int = 0, total_documents: int = 0)[source]

Bases: object

Statistics for a job.

total_files

Total number of files to process

Type: int

processed_files

Number of files successfully processed

Type: int

failed_files

Number of files that failed processing

Type: int

total_chunks

Total number of chunks created

Type: int

total_documents

Total number of documents stored

Type: int

total_files: int = 0
processed_files: int = 0
failed_files: int = 0
total_chunks: int = 0
total_documents: int = 0

to_dict() → dict[str, int][source]

Convert stats to dictionary.

classmethod from_dict(data: dict[str, Any]) → JobStats[source]

Create JobStats from dictionary.

__init__(total_files: int = 0, processed_files: int = 0, failed_files: int = 0, total_chunks: int = 0, total_documents: int = 0) → None
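Given the documented fields and defaults, `JobStats` can be sketched as a dataclass. Here `from_dict` tolerates unknown keys, which is an assumption about the real implementation rather than documented behaviour:

```python
from dataclasses import dataclass, asdict, fields
from typing import Any

@dataclass
class JobStats:
    """Sketch of the documented counters."""
    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0

    def to_dict(self) -> dict[str, int]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "JobStats":
        # Drop unknown keys so documents written by other versions still load.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})
```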
class thoth.ingestion.job_manager.Job(job_id: str, status: JobStatus, source: str, collection_name: str, started_at: datetime, completed_at: datetime | None = None, stats: JobStats = <factory>, error: str | None = None, parent_job_id: str | None = None, batch_index: int | None = None, total_batches: int | None = None, completed_batches: int = 0)[source]

Bases: object

Represents an ingestion job.

job_id

Unique job identifier (UUID or parent_id_NNNN for sub-jobs)

Type: str

status

Current job status

Type: thoth.ingestion.job_manager.JobStatus

source

Source identifier (e.g., ‘handbook’, ‘dnd’)

Type: str

collection_name

Target LanceDB collection

Type: str

started_at

Job start timestamp

Type: datetime.datetime

completed_at

Job completion timestamp (if finished)

Type: datetime.datetime | None

stats

Job statistics

Type: thoth.ingestion.job_manager.JobStats

error

Error message (if failed)

Type: str | None

parent_job_id

Parent job ID (for sub-jobs/batches)

Type: str | None

batch_index

Batch number within parent job (for sub-jobs)

Type: int | None

total_batches

Total number of batches (for parent jobs)

Type: int | None

completed_batches

Number of completed batches (for parent jobs)

Type: int

job_id: str
status: JobStatus
source: str
collection_name: str
started_at: datetime
completed_at: datetime | None = None
stats: JobStats
error: str | None = None
parent_job_id: str | None = None
batch_index: int | None = None
total_batches: int | None = None
completed_batches: int = 0

to_dict() → dict[str, Any][source]

Convert job to Firestore-compatible dictionary.

classmethod from_dict(data: dict[str, Any]) → Job[source]

Create Job from Firestore document dictionary.

Create Job from Firestore document dictionary.

property is_finished: bool

Check if job has finished (completed or failed).

property is_sub_job: bool

Check if this is a sub-job (batch) of a parent job.

property duration_seconds: float | None

Get job duration in seconds if finished.

__init__(job_id: str, status: JobStatus, source: str, collection_name: str, started_at: datetime, completed_at: datetime | None = None, stats: JobStats = <factory>, error: str | None = None, parent_job_id: str | None = None, batch_index: int | None = None, total_batches: int | None = None, completed_batches: int = 0) → None
class thoth.ingestion.job_manager.JobManager(project_id: str | None = None)[source]

Bases: object

Manages job state in Firestore.

This class provides CRUD operations for ingestion jobs stored in Google Cloud Firestore. Jobs are stored in the ‘thoth_jobs’ collection.

Example

>>> manager = JobManager()
>>> job = manager.create_job("handbook", "handbook_documents")
>>> print(job.job_id)
>>> manager.mark_running(job)
>>> # ... do work ...
>>> manager.mark_completed(job, stats)

COLLECTION_NAME = 'thoth_jobs'

__init__(project_id: str | None = None) → None[source]

Initialize job manager with Firestore.

Parameters: project_id – GCP project ID (uses default/env if not specified)

property collection: Any

Get the Firestore collection reference.

create_job(source: str, collection_name: str, total_batches: int | None = None) → Job[source]

Create a new job and persist to Firestore.

Parameters:
  • source – Source identifier (e.g., ‘handbook’, ‘dnd’)

  • collection_name – Target LanceDB collection

  • total_batches – Number of batches (for parent jobs with sub-jobs)

Returns: Created Job instance

create_sub_job(parent_job: Job, batch_index: int, total_files: int = 0) → Job[source]

Create a sub-job (batch) for a parent job.

Sub-job IDs are formatted as {parent_job_id}_{batch_index:04d} for easy identification and sorting.

Parameters:
  • parent_job – Parent job instance

  • batch_index – Batch number (0-based)

  • total_files – Number of files in this batch

Returns: Created sub-job instance
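The documented ID scheme can be shown directly (the helper name is hypothetical; the real module builds the ID inside `create_sub_job`):

```python
def sub_job_id(parent_job_id: str, batch_index: int) -> str:
    # Zero-padding to four digits keeps lexicographic order equal to batch order.
    return f"{parent_job_id}_{batch_index:04d}"
```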

get_job(job_id: str) → Job | None[source]

Retrieve a job by ID.

Parameters: job_id – Job identifier

Returns: Job instance or None if not found

update_job(job: Job) → None[source]

Update job state in Firestore.

Parameters: job – Job instance with updated state

mark_running(job: Job) → None[source]

Mark job as running.

Parameters: job – Job to update

mark_completed(job: Job, stats: JobStats | None = None) → None[source]

Mark job as completed with statistics.

Parameters:
  • job – Job to update

  • stats – Optional final statistics

mark_failed(job: Job, error: str) → None[source]

Mark job as failed with error message.

Parameters:
  • job – Job to update

  • error – Error message describing the failure

update_stats(job: Job, stats: JobStats) → None[source]

Update job statistics (for progress tracking).

Parameters:
  • job – Job to update

  • stats – Updated statistics

list_jobs(source: str | None = None, status: JobStatus | None = None, limit: int = 50) → list[Job][source]

List jobs with optional filtering.

Parameters:
  • source – Filter by source

  • status – Filter by status

  • limit – Maximum number of jobs to return

Returns: List of Job instances, ordered by start time descending

delete_job(job_id: str) → bool[source]

Delete a job from Firestore.

Parameters: job_id – Job identifier to delete

Returns: True if job was deleted, False if not found

cleanup_old_jobs(days: int = 30) → int[source]

Delete jobs older than specified days.

Parameters: days – Age threshold in days

Returns: Number of jobs deleted
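The age check behind this method presumably compares each job's `started_at` against a cutoff. A pure-Python sketch of that check only (the real method queries and deletes Firestore documents):

```python
from datetime import datetime, timedelta, timezone

def select_old_jobs(jobs: list[tuple[str, datetime]], days: int = 30) -> list[str]:
    # Jobs that started before the cutoff are eligible for deletion.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    return [job_id for job_id, started_at in jobs if started_at < cutoff]
```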

get_sub_jobs(parent_job_id: str) → list[Job][source]

Get all sub-jobs for a parent job.

Parameters: parent_job_id – Parent job identifier

Returns: List of sub-jobs, ordered by batch_index

get_job_with_sub_jobs(job_id: str) → dict[str, Any] | None[source]

Get a job with its sub-jobs and aggregated statistics.

Parameters: job_id – Job identifier

Returns: Dictionary with job details and sub-jobs, or None if not found

mark_sub_job_completed(sub_job: Job, stats: JobStats | None = None) → Job | None[source]

Mark a sub-job as completed and update parent job progress.

Parameters:
  • sub_job – Sub-job to mark as completed

  • stats – Optional final statistics for the sub-job

Returns: Updated parent job, or None if no parent
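The parent-progress update presumably increments `completed_batches` and finishes the parent once every batch is done. A dict-based sketch of that assumed logic (the real method loads and persists the parent `Job` via Firestore):

```python
def on_sub_job_completed(parent: dict) -> dict:
    # Assumed logic: bump the counter, finish the parent when all batches done.
    parent["completed_batches"] += 1
    if (parent["total_batches"] is not None
            and parent["completed_batches"] >= parent["total_batches"]):
        parent["status"] = "completed"
    return parent
```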

mark_sub_job_failed(sub_job: Job, error: str) → Job | None[source]

Mark a sub-job as failed and update parent job.

Parameters:
  • sub_job – Sub-job to mark as failed

  • error – Error message

Returns: Updated parent job, or None if no parent