Source code for thoth.ingestion.worker

"""Ingestion worker HTTP server for Cloud Run.

This module provides the HTTP application with routing to workflow endpoints.
All business logic has been extracted to the flows/ package for modularity:
- health.py: Health check endpoint
- clone.py: Clone handbook to GCS
- ingest.py: Main ingestion workflow (file listing, batching, Cloud Tasks)
- batch.py: Batch processing with idempotency checks
- merge.py: Merge isolated batch LanceDB tables into main store
- job_status.py: Job status and listing endpoints

The worker maintains singleton instances for shared services:
- SourceRegistry: Multi-source configuration (handbook, dnd, personal)
- JobManager: Firestore job tracking with sub-job aggregation
- TaskQueueClient: Cloud Tasks batch distribution
"""

import logging
import os

from starlette.applications import Starlette
from starlette.routing import Route
import uvicorn

from thoth.ingestion import flows
from thoth.shared.utils.logger import configure_root_logger, setup_logger

# Configure root logger for the application
configure_root_logger(level=logging.INFO)
logger = setup_logger(__name__)

# Batch prefix pattern for parallel processing (GCS path under bucket)
BATCH_PREFIX_PATTERN = "lancedb_batch_"


# =============================================================================
# Application Setup
# =============================================================================


[docs] def create_app() -> Starlette: """Create the Starlette application with all routes.""" routes = [ # Health Route("/health", endpoint=flows.health_check), Route("/", endpoint=flows.health_check), # Handbook clone Route("/clone-handbook", endpoint=flows.clone_handbook, methods=["POST"]), # Ingestion Route("/ingest", endpoint=flows.ingest, methods=["POST"]), Route("/ingest-batch", endpoint=flows.process_batch, methods=["POST"]), Route("/merge-batches", endpoint=flows.merge_batches, methods=["POST"]), # Job status Route("/jobs/{job_id}", endpoint=flows.get_job_status), Route("/jobs", endpoint=flows.list_jobs), ] return Starlette(debug=False, routes=routes)
[docs] def main() -> None: """Run the uvicorn server.""" port = int(os.getenv("PORT", "8080")) app = create_app() logger.info("Starting ingestion worker on port %d", port) uvicorn.run(app, host="0.0.0.0", port=port, log_level="info") # nosec B104
if __name__ == "__main__": main()