"""Job manager for tracking ingestion jobs using Firestore.
This module provides job tracking and status management for ingestion
operations using Google Cloud Firestore as the backend.
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum
import os
from typing import Any
import urllib.parse
import uuid

from thoth.shared.utils.logger import setup_logger
# Module-level logger used by every function and class below; its handlers and
# format are whatever thoth's ``setup_logger`` helper configures.
logger = setup_logger(__name__)
[docs]
def get_job_logs_url(
job_id: str,
project_id: str | None = None,
service_name: str = "thoth-ingestion-worker",
) -> str:
"""Generate Cloud Logging URL for a specific job.
Args:
job_id: Job identifier to filter logs
project_id: GCP project ID (defaults to environment variable)
service_name: Cloud Run service name
Returns:
URL to view logs in GCP Console
"""
project = project_id or os.getenv("GCP_PROJECT_ID", "thoth-dev-485501")
# Build log filter query
filter_parts = [
'resource.type="cloud_run_revision"',
f'resource.labels.service_name="{service_name}"',
f'jsonPayload.job_id="{job_id}"',
]
filter_query = "\n".join(filter_parts)
# URL encode the filter
encoded_filter = urllib.parse.quote(filter_query)
return f"https://console.cloud.google.com/logs/query;query={encoded_filter}?project={project}"
class JobStatus(Enum):
    """Lifecycle state of an ingestion job.

    Each value is the lowercase string stored in a job document's
    ``status`` field.
    """

    # Initial state assigned when a job or sub-job record is created.
    PENDING = "pending"
    # Work has started on the job.
    RUNNING = "running"
    # Terminal state: the job finished successfully.
    COMPLETED = "completed"
    # Terminal state: the job finished with an error.
    FAILED = "failed"
@dataclass
class JobStats:
    """Progress counters for a job.

    Attributes:
        total_files: Total number of files to process.
        processed_files: Number of files successfully processed.
        failed_files: Number of files that failed processing.
        total_chunks: Total number of chunks created.
        total_documents: Total number of documents stored.
    """

    total_files: int = 0
    processed_files: int = 0
    failed_files: int = 0
    total_chunks: int = 0
    total_documents: int = 0

    def to_dict(self) -> dict[str, int]:
        """Return the counters as a plain dict keyed by field name."""
        keys = ("total_files", "processed_files", "failed_files", "total_chunks", "total_documents")
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "JobStats":
        """Build counters from a mapping; any missing key defaults to 0."""
        keys = ("total_files", "processed_files", "failed_files", "total_chunks", "total_documents")
        return cls(**{key: data.get(key, 0) for key in keys})
@dataclass
class Job:
    """An ingestion job, or one batch (sub-job) of a larger job.

    Attributes:
        job_id: Unique identifier; a UUID for top-level jobs and
            ``{parent_job_id}_{batch_index:04d}`` for sub-jobs.
        status: Current lifecycle state.
        source: Source identifier (e.g. ``'handbook'``, ``'dnd'``).
        collection_name: Target LanceDB collection.
        started_at: Job start timestamp.
        completed_at: Timestamp at which the job finished, if it has.
        stats: Progress counters.
        error: Failure message, if any.
        parent_job_id: Owning job's ID (sub-jobs only).
        batch_index: Batch number within the parent job (sub-jobs only).
        total_batches: Number of batches the job is split into (parent jobs only).
        completed_batches: Batches finished so far (parent jobs only).
    """

    job_id: str
    status: JobStatus
    source: str
    collection_name: str
    started_at: datetime
    completed_at: datetime | None = None
    stats: JobStats = field(default_factory=JobStats)
    error: str | None = None
    parent_job_id: str | None = None
    batch_index: int | None = None
    total_batches: int | None = None
    completed_batches: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Serialize the job into a Firestore-compatible dict.

        Timestamps become ISO-8601 strings, ``status`` becomes its string
        value, and a Cloud Logging link is attached under ``logs_url``.
        Batch bookkeeping keys are written only when they carry information.
        """
        finished = self.completed_at
        document: dict[str, Any] = {
            "job_id": self.job_id,
            "status": self.status.value,
            "source": self.source,
            "collection_name": self.collection_name,
            "started_at": self.started_at.isoformat(),
            "completed_at": finished.isoformat() if finished else None,
            "stats": self.stats.to_dict(),
            "error": self.error,
            "logs_url": get_job_logs_url(self.job_id),
        }
        batch_fields = (
            ("parent_job_id", self.parent_job_id),
            ("batch_index", self.batch_index),
            ("total_batches", self.total_batches),
        )
        document.update((key, value) for key, value in batch_fields if value is not None)
        # Parent jobs always expose their progress counter; other jobs only
        # once it has moved off zero.
        if self.total_batches is not None or self.completed_batches > 0:
            document["completed_batches"] = self.completed_batches
        return document

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Job":
        """Rebuild a ``Job`` from a stored document (the inverse of ``to_dict``).

        The derived ``logs_url`` key is ignored.

        Raises:
            KeyError: If ``job_id``, ``status``, ``source``, ``collection_name``
                or ``started_at`` is missing.
            ValueError: If ``status`` is not a known ``JobStatus`` value.
        """
        completed_raw = data.get("completed_at")
        optional = {key: data.get(key) for key in ("error", "parent_job_id", "batch_index", "total_batches")}
        return cls(
            job_id=data["job_id"],
            status=JobStatus(data["status"]),
            source=data["source"],
            collection_name=data["collection_name"],
            started_at=datetime.fromisoformat(data["started_at"]),
            completed_at=None if not completed_raw else datetime.fromisoformat(completed_raw),
            stats=JobStats.from_dict(data.get("stats", {})),
            completed_batches=data.get("completed_batches", 0),
            **optional,
        )

    @property
    def is_finished(self) -> bool:
        """True once the job is in a terminal state (completed or failed)."""
        return self.status is JobStatus.COMPLETED or self.status is JobStatus.FAILED

    @property
    def is_sub_job(self) -> bool:
        """True when the job is a batch belonging to a parent job."""
        return self.parent_job_id is not None

    @property
    def duration_seconds(self) -> float | None:
        """Wall-clock seconds from start to completion, or None if unfinished."""
        if not self.completed_at:
            return None
        elapsed = self.completed_at - self.started_at
        return elapsed.total_seconds()
class JobManager:
    """Manages ingestion job state in Google Cloud Firestore.

    Provides create/read/update/delete operations for jobs stored in the
    ``thoth_jobs`` collection, plus helpers for parent jobs that are split
    into batches (sub-jobs). The Firestore client is created lazily on the
    first access to :attr:`collection`.

    Example::

        manager = JobManager()
        job = manager.create_job("handbook", "handbook_documents")
        print(job.job_id)
        manager.mark_running(job)
        # ... do work ...
        manager.mark_completed(job, stats)
    """

    COLLECTION_NAME = "thoth_jobs"

    def __init__(self, project_id: str | None = None) -> None:
        """Initialize the manager without contacting Firestore.

        Args:
            project_id: GCP project ID. Falls back to the ``GCP_PROJECT_ID``
                environment variable; if that is unset too, ``None`` is passed
                to the Firestore client.
        """
        self._project_id = project_id or os.getenv("GCP_PROJECT_ID")
        self._db = None
        self._collection = None

    def _ensure_initialized(self) -> None:
        """Create the Firestore client and collection reference on first use.

        Raises:
            ImportError: If ``google-cloud-firestore`` is not installed.
        """
        if self._db is None:
            try:
                from google.cloud import firestore  # type: ignore[attr-defined] # noqa: PLC0415

                self._db = firestore.Client(project=self._project_id)
                self._collection = self._db.collection(  # type: ignore[attr-defined]
                    self.COLLECTION_NAME
                )
                logger.info(
                    "Initialized JobManager with Firestore (project: %s)",
                    self._project_id,
                )
            except ImportError as e:
                msg = "google-cloud-firestore is required. Install with: pip install google-cloud-firestore"
                raise ImportError(msg) from e

    @property
    def collection(self) -> Any:
        """Firestore collection reference for jobs (initializes the client lazily)."""
        self._ensure_initialized()
        return self._collection

    def create_job(
        self,
        source: str,
        collection_name: str,
        total_batches: int | None = None,
    ) -> Job:
        """Create a new PENDING job and persist it to Firestore.

        Args:
            source: Source identifier (e.g. 'handbook', 'dnd').
            collection_name: Target LanceDB collection.
            total_batches: Number of batches (for parent jobs with sub-jobs).

        Returns:
            The created Job, keyed by a fresh UUID4.
        """
        job = Job(
            job_id=str(uuid.uuid4()),
            status=JobStatus.PENDING,
            source=source,
            collection_name=collection_name,
            started_at=datetime.now(UTC),
            total_batches=total_batches,
        )
        self.collection.document(job.job_id).set(job.to_dict())
        logger.info("Created job %s for source '%s'", job.job_id, source)
        return job

    def create_sub_job(
        self,
        parent_job: Job,
        batch_index: int,
        total_files: int = 0,
    ) -> Job:
        """Create a PENDING sub-job (batch) for a parent job.

        Sub-job IDs are formatted as ``{parent_job_id}_{batch_index:04d}`` so
        they sort by batch and are easy to identify.

        Args:
            parent_job: Parent job instance.
            batch_index: Batch number (0-based).
            total_files: Number of files in this batch.

        Returns:
            The created sub-job.
        """
        sub_job_id = f"{parent_job.job_id}_{batch_index:04d}"
        sub_job = Job(
            job_id=sub_job_id,
            status=JobStatus.PENDING,
            source=parent_job.source,
            collection_name=parent_job.collection_name,
            started_at=datetime.now(UTC),
            parent_job_id=parent_job.job_id,
            batch_index=batch_index,
            stats=JobStats(total_files=total_files),
        )
        self.collection.document(sub_job.job_id).set(sub_job.to_dict())
        # %s (not %d) because the batch total may be the placeholder "?".
        logger.info(
            "Created sub-job %s (batch %d of %s)",
            sub_job.job_id,
            batch_index + 1,
            parent_job.total_batches or "?",
        )
        return sub_job

    def get_job(self, job_id: str) -> Job | None:
        """Retrieve a job by ID.

        Args:
            job_id: Job identifier.

        Returns:
            The Job, or None if no document with that ID exists.
        """
        doc = self.collection.document(job_id).get()
        if doc.exists:
            return Job.from_dict(doc.to_dict())
        return None

    def update_job(self, job: Job) -> None:
        """Overwrite the stored document for ``job`` with its current state.

        Args:
            job: Job instance with updated state.
        """
        self.collection.document(job.job_id).set(job.to_dict())
        logger.debug("Updated job %s: status=%s", job.job_id, job.status.value)

    def mark_running(self, job: Job) -> None:
        """Mark ``job`` as RUNNING and persist it.

        Args:
            job: Job to update (mutated in place).
        """
        job.status = JobStatus.RUNNING
        self.update_job(job)
        logger.info("Job %s marked as running", job.job_id)

    def mark_completed(self, job: Job, stats: JobStats | None = None) -> None:
        """Mark ``job`` as COMPLETED, stamp ``completed_at`` and persist it.

        Args:
            job: Job to update (mutated in place).
            stats: Final statistics; when omitted the job keeps its current stats.
        """
        job.status = JobStatus.COMPLETED
        job.completed_at = datetime.now(UTC)
        if stats is not None:
            job.stats = stats
        self.update_job(job)
        logger.info(
            "Job %s completed: %d files processed",
            job.job_id,
            job.stats.processed_files,
        )

    def mark_failed(self, job: Job, error: str) -> None:
        """Mark ``job`` as FAILED with an error message and persist it.

        Args:
            job: Job to update (mutated in place).
            error: Error message describing the failure.
        """
        job.status = JobStatus.FAILED
        job.completed_at = datetime.now(UTC)
        job.error = error
        self.update_job(job)
        logger.error("Job %s failed: %s", job.job_id, error)

    def update_stats(self, job: Job, stats: JobStats) -> None:
        """Replace the job's statistics (for progress tracking) and persist it.

        Args:
            job: Job to update (mutated in place).
            stats: Updated statistics.
        """
        job.stats = stats
        self.update_job(job)

    def list_jobs(
        self,
        source: str | None = None,
        status: JobStatus | None = None,
        limit: int = 50,
    ) -> list[Job]:
        """List jobs with optional filtering.

        Args:
            source: Only return jobs for this source (ignored when empty).
            status: Only return jobs in this status.
            limit: Maximum number of jobs to return.

        Returns:
            Jobs ordered by ``started_at`` descending.
        """
        query = self.collection
        if source:
            query = query.where("source", "==", source)
        if status is not None:
            query = query.where("status", "==", status.value)
        query = query.order_by("started_at", direction="DESCENDING").limit(limit)
        return [Job.from_dict(doc.to_dict()) for doc in query.stream()]

    def delete_job(self, job_id: str) -> bool:
        """Delete a job document from Firestore.

        Args:
            job_id: Job identifier to delete.

        Returns:
            True if the job existed and was deleted, False if not found.
        """
        doc_ref = self.collection.document(job_id)
        doc = doc_ref.get()
        if doc.exists:
            doc_ref.delete()
            logger.info("Deleted job %s", job_id)
            return True
        return False

    def cleanup_old_jobs(self, days: int = 30) -> int:
        """Delete jobs that started before a day-aligned cutoff.

        The cutoff is midnight (UTC) of the current day minus ``days`` days,
        so ``days=0`` removes every job started before today.

        Args:
            days: Age threshold in days.

        Returns:
            Number of jobs deleted.
        """
        today_midnight = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
        # timedelta arithmetic can cross month/year boundaries, whereas
        # ``replace(day=day - days)`` raises ValueError once the day drops below 1.
        cutoff = today_midnight - timedelta(days=days)
        # ``started_at`` is stored as an ISO-8601 string, so this is a string
        # comparison. It orders correctly because every stored value comes from
        # ``datetime.now(UTC).isoformat()`` and shares the same "+00:00" format.
        query = self.collection.where("started_at", "<", cutoff.isoformat())
        deleted = 0
        for doc in query.stream():
            doc.reference.delete()
            deleted += 1
        if deleted:
            logger.info("Cleaned up %d old jobs (older than %d days)", deleted, days)
        return deleted

    def get_sub_jobs(self, parent_job_id: str) -> list[Job]:
        """Get all sub-jobs for a parent job.

        Args:
            parent_job_id: Parent job identifier.

        Returns:
            Sub-jobs ordered by ``batch_index``.
        """
        query = self.collection.where("parent_job_id", "==", parent_job_id)
        query = query.order_by("batch_index")
        return [Job.from_dict(doc.to_dict()) for doc in query.stream()]

    def get_job_with_sub_jobs(self, job_id: str) -> dict[str, Any] | None:
        """Get a job's document plus, for batched jobs, sub-job details.

        For a parent job with ``total_batches > 0`` the returned ``stats``
        (processed/failed files, chunks, documents) are recomputed as sums over
        its sub-jobs, and ``batch_summary`` / ``sub_jobs`` keys are added.

        Args:
            job_id: Job identifier.

        Returns:
            Dictionary with job details (and sub-jobs), or None if not found.
        """
        job = self.get_job(job_id)
        if job is None:
            return None
        result = job.to_dict()
        if job.total_batches is None or job.total_batches <= 0:
            return result

        sub_jobs = self.get_sub_jobs(job_id)
        status_counts = dict.fromkeys(JobStatus, 0)
        for sub_job in sub_jobs:
            status_counts[sub_job.status] += 1

        aggregated = result["stats"]
        aggregated["processed_files"] = sum(s.stats.processed_files for s in sub_jobs)
        aggregated["failed_files"] = sum(s.stats.failed_files for s in sub_jobs)
        aggregated["total_chunks"] = sum(s.stats.total_chunks for s in sub_jobs)
        # Documents are summed separately from chunks, matching how
        # mark_sub_job_completed aggregates the two counters independently.
        aggregated["total_documents"] = sum(s.stats.total_documents for s in sub_jobs)

        result["batch_summary"] = {
            "total": job.total_batches,
            "completed": status_counts[JobStatus.COMPLETED],
            "failed": status_counts[JobStatus.FAILED],
            "running": status_counts[JobStatus.RUNNING],
            "pending": status_counts[JobStatus.PENDING],
        }
        result["sub_jobs"] = [
            {
                "job_id": sub_job.job_id,
                "batch_index": sub_job.batch_index,
                "status": sub_job.status.value,
                "stats": sub_job.stats.to_dict(),
                "error": sub_job.error,
                "logs_url": get_job_logs_url(sub_job.job_id),
            }
            for sub_job in sub_jobs
        ]
        return result

    def _record_batch_outcome(self, sub_job: Job, error: str | None = None) -> Job | None:
        """Fold one finished sub-job into its parent job inside a transaction.

        The parent document is read and rewritten in a single Firestore
        transaction so concurrently finishing batches cannot overwrite each
        other's ``completed_batches`` / stats increments. When the last batch
        lands, the parent becomes FAILED if any batch failed (its ``error`` is
        set) and COMPLETED otherwise; an already-finished parent is not
        re-finalized.

        Args:
            sub_job: The sub-job that just finished (already persisted).
            error: The batch's error message, or None if it succeeded.

        Returns:
            The updated parent job, or None if there is no parent / it is missing.
        """
        if not sub_job.parent_job_id:
            return None
        # Accessing ``collection`` initializes the client (and validates the
        # dependency) before the import below.
        parent_ref = self.collection.document(sub_job.parent_job_id)
        from google.cloud import firestore  # type: ignore[attr-defined] # noqa: PLC0415

        @firestore.transactional
        def _apply(transaction: Any) -> tuple[Job | None, bool]:
            # May be re-run by Firestore on contention, so it must only derive
            # state from the snapshot it reads.
            snapshot = parent_ref.get(transaction=transaction)
            if not snapshot.exists:
                return None, False
            parent = Job.from_dict(snapshot.to_dict())
            was_finished = parent.is_finished
            # Failed batches still count as "done" for completion purposes.
            parent.completed_batches += 1
            if error is None:
                parent.stats.processed_files += sub_job.stats.processed_files
                parent.stats.failed_files += sub_job.stats.failed_files
                parent.stats.total_chunks += sub_job.stats.total_chunks
                parent.stats.total_documents += sub_job.stats.total_documents
            else:
                parent.stats.failed_files += 1  # Track the batch as failed
                parent.error = f"One or more batches failed. Last error: {error}"
            just_finished = (
                not was_finished
                and bool(parent.total_batches)
                and parent.completed_batches >= parent.total_batches
            )
            if just_finished:
                parent.status = JobStatus.FAILED if parent.error else JobStatus.COMPLETED
                parent.completed_at = datetime.now(UTC)
            transaction.set(parent_ref, parent.to_dict())
            return parent, just_finished

        parent, just_finished = _apply(self._db.transaction())  # type: ignore[attr-defined]
        if parent is not None and just_finished:
            # Logged after commit so transaction retries don't duplicate it.
            if parent.status is JobStatus.COMPLETED:
                logger.info(
                    "Parent job %s completed: all %d batches done",
                    parent.job_id,
                    parent.total_batches,
                )
            else:
                logger.error("Parent job %s failed: %s", parent.job_id, parent.error)
        return parent

    def mark_sub_job_completed(
        self,
        sub_job: Job,
        stats: JobStats | None = None,
    ) -> Job | None:
        """Mark a sub-job as completed and update parent job progress.

        The sub-job's final stats (``stats`` if given, otherwise whatever it
        already holds) are added to the parent's totals.

        Args:
            sub_job: Sub-job to mark as completed.
            stats: Optional final statistics for the sub-job.

        Returns:
            Updated parent job, or None if there is no parent.
        """
        self.mark_completed(sub_job, stats)
        return self._record_batch_outcome(sub_job)

    def mark_sub_job_failed(self, sub_job: Job, error: str) -> Job | None:
        """Mark a sub-job as failed and update the parent job.

        The parent records the error immediately, so it ends up FAILED once
        all batches are done regardless of which batch finishes last.

        Args:
            sub_job: Sub-job to mark as failed.
            error: Error message.

        Returns:
            Updated parent job, or None if there is no parent.
        """
        self.mark_failed(sub_job, error)
        return self._record_batch_outcome(sub_job, error=error)