thoth.ingestion.flows

Ingestion workflow modules.

This package contains a separate module for each major ingestion workflow, which makes the codebase easier to understand and maintain.

Functions

clone_handbook(_request)

Clone the GitLab handbook repo to GCS for ingestion (one-time setup).

get_job_status(request)

Get job status by ID.

health_check(_request)

Health check endpoint.

ingest(request)

Start an ingestion job.

list_jobs(request)

List recent jobs with optional filtering.

merge_batches(request)

Merge all batch LanceDB tables from GCS into the main store.

process_batch(request)

Process a specific batch of files (called by Cloud Tasks).

async thoth.ingestion.flows.clone_handbook(_request: Request) → JSONResponse

Clone the GitLab handbook repo to GCS for ingestion (one-time setup).

Uses the pipeline’s GCSRepoSync to clone the repo into the configured bucket/prefix. Requires GCS_BUCKET_NAME to be set and the pipeline to be configured for GCS.

Returns:

JSONResponse with status and message; 200 on success, 4xx/5xx on error.

async thoth.ingestion.flows.get_job_status(request: Request) → JSONResponse

Get job status by ID.

Returns the job's current status and statistics, plus error information if the job failed.
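
A client typically polls this endpoint until the job finishes. The following is a minimal sketch of such a loop, not the project's own client code. It assumes the response payload carries a "status" field using the values listed for list_jobs (pending, running, completed, failed); the helper name wait_for_job and the injected fetch_status callable are hypothetical, so no URL or HTTP library is assumed.

```python
import time
from typing import Callable

# Statuses after which polling can stop (assumed from the list_jobs filter values).
TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    max_polls: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status() until the job is completed or failed.

    fetch_status is a hypothetical callable that returns the decoded JSON
    payload of one status request. Raises TimeoutError after max_polls calls.
    """
    for attempt in range(max_polls):
        payload = fetch_status()
        if payload.get("status") in TERMINAL_STATUSES:
            return payload
        # Do not sleep after the final attempt.
        if attempt < max_polls - 1:
            sleep(interval)
    raise TimeoutError(f"job still not finished after {max_polls} polls")
```

Passing the fetch function in keeps the loop independent of how the request is made and makes it easy to test with canned responses.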

async thoth.ingestion.flows.health_check(_request: Request) → JSONResponse

Health check endpoint.

Returns service health status.

async thoth.ingestion.flows.ingest(request: Request) → JSONResponse

Start an ingestion job.

Creates a job record and starts background processing. Returns immediately with job_id for status tracking.

Request body:

source: Source identifier (required) - 'handbook', 'dnd', or 'personal'
force: Force full re-ingestion (optional, default: false)

Returns:

202 Accepted with job_id for status polling
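
To illustrate the request body described above, here is a minimal sketch of a client-side helper that validates the source and serializes the body. The helper name build_ingest_body is hypothetical and not part of thoth; only the field names, the allowed source values, and the default for force come from the documentation above.

```python
import json

# Source identifiers listed in the ingest documentation.
ALLOWED_SOURCES = {"handbook", "dnd", "personal"}


def build_ingest_body(source: str, force: bool = False) -> str:
    """Return a JSON request body for an ingestion job (hypothetical helper)."""
    if source not in ALLOWED_SOURCES:
        raise ValueError(f"unknown source: {source!r}")
    return json.dumps({"source": source, "force": force})


# Example: request a forced full re-ingestion of the handbook.
body = build_ingest_body("handbook", force=True)
```

Validating the source before sending avoids a round trip for a request the service would presumably reject.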

async thoth.ingestion.flows.list_jobs(request: Request) → JSONResponse

List recent jobs with optional filtering.

Query parameters:

source: Filter by source name
status: Filter by status (pending, running, completed, failed)
limit: Maximum number of jobs (default: 50)
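
As a sketch of how a caller might assemble these query parameters, the helper below builds the query string with the standard library. The function name build_list_jobs_query is hypothetical, and the endpoint path is deliberately left out because it is not documented here; the parameter names, status values, and default limit are taken from the list above.

```python
from typing import Optional
from urllib.parse import urlencode

# Status values accepted by the status filter, per the documentation.
VALID_STATUSES = {"pending", "running", "completed", "failed"}


def build_list_jobs_query(
    source: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 50,
) -> str:
    """Return a URL-encoded query string for listing jobs (hypothetical helper)."""
    if status is not None and status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    params = {}
    # Only include filters the caller actually set.
    if source is not None:
        params["source"] = source
    if status is not None:
        params["status"] = status
    params["limit"] = limit
    return urlencode(params)
```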

async thoth.ingestion.flows.merge_batches(request: Request) → JSONResponse

Merge all batch LanceDB tables from GCS into the main store.

Expects JSON body:

collection_name: Collection to merge (optional, default: handbook_documents)
cleanup: Delete batches after merge (optional, default: True)

Returns:

JSONResponse with status, merged_count, batches_merged, batches_cleaned
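
The optional body fields and their defaults can be captured in a small value object. This is only an illustrative sketch: the class MergeOptions is hypothetical and not part of thoth, while the field names and defaults mirror the documentation above.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class MergeOptions:
    """Options for a merge request (hypothetical client-side type)."""

    collection_name: str = "handbook_documents"  # documented default
    cleanup: bool = True  # documented default: delete batches after merge

    def to_json(self) -> str:
        """Serialize the options as a JSON request body."""
        return json.dumps(asdict(self))


# Example: merge the default collection but keep the batch tables for inspection.
keep_batches = MergeOptions(cleanup=False).to_json()
```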

async thoth.ingestion.flows.process_batch(request: Request) → JSONResponse

Process a specific batch of files (called by Cloud Tasks).

Each batch is stored in a unique GCS prefix to avoid conflicts during parallel processing. Use /merge-batches to consolidate.
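
The idea of one prefix per batch can be sketched as follows. The actual prefix layout used by thoth is not documented here, so the function name batch_prefix, the base prefix, and the job_id/batch_NNNN scheme are all assumptions chosen purely for illustration.

```python
def batch_prefix(base_prefix: str, job_id: str, batch_index: int) -> str:
    """Return a distinct storage prefix for one batch (hypothetical layout).

    Because the job ID and batch index are part of the path, two batches
    processed in parallel never write under the same prefix.
    """
    if batch_index < 0:
        raise ValueError("batch_index must be non-negative")
    return f"{base_prefix.strip('/')}/{job_id}/batch_{batch_index:04d}/"


# Example: prefixes for the first three batches of one job.
prefixes = [batch_prefix("lancedb_batches", "job123", i) for i in range(3)]
```

Zero-padding the index keeps the prefixes in order when listed lexicographically, which can make a later merge step easier to reason about.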

Modules

batch

Batch processing workflow.

clone

Clone handbook repository to GCS.

health

Health check endpoint.

job_status

Job status and listing endpoints.

merge

Merge batches workflow.