thoth.ingestion.flows
Ingestion workflow modules.
This package contains separate modules for each major ingestion workflow, making the codebase easier to understand and maintain.
Functions
clone_handbook | Clone the GitLab handbook repo to GCS for ingestion (one-time setup).
get_job_status | Get job status by ID.
health_check | Health check endpoint.
ingest | Start an ingestion job.
list_jobs | List recent jobs with optional filtering.
merge_batches | Merge all batch LanceDB tables from GCS into the main store.
process_batch | Process a specific batch of files (called by Cloud Tasks).
- async thoth.ingestion.flows.clone_handbook(_request: Request) → JSONResponse
Clone the GitLab handbook repo to GCS for ingestion (one-time setup).
Uses the pipeline’s GCSRepoSync to clone the repo into the configured bucket/prefix. Requires GCS_BUCKET_NAME to be set and the pipeline to be configured for GCS.
- Returns:
JSONResponse with status and message; 200 on success, 4xx/5xx on error.
- async thoth.ingestion.flows.get_job_status(request: Request) → JSONResponse
Get job status by ID.
Returns the job’s current status and statistics, plus error information if the job failed.
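Because status is fetched per job ID, a client would typically poll until the job stops changing. The following is a minimal sketch of such a loop, not the service's own client. It assumes the response JSON carries a "status" key and that "completed" and "failed" (the status values listed for list_jobs) are terminal; `fetch_status` is a hypothetical callable standing in for the actual HTTP request.

```python
import time
from typing import Callable

# Assumed terminal states, based on the status values documented for list_jobs.
TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(
    fetch_status: Callable[[str], dict],
    job_id: str,
    interval_s: float = 2.0,
    max_attempts: int = 30,
) -> dict:
    """Poll until the job reaches a terminal status or attempts run out.

    fetch_status is a hypothetical callable that performs the HTTP request
    and returns the decoded JSON; the "status" key is an assumption.
    """
    for attempt in range(max_attempts):
        payload = fetch_status(job_id)
        if payload.get("status") in TERMINAL_STATUSES:
            return payload
        # Sleep between polls, but not after the final attempt.
        if attempt < max_attempts - 1:
            time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} did not finish after {max_attempts} polls")


# Stand-in for a real HTTP call: reports "running" twice, then "completed".
_responses = iter(
    [{"status": "running"}, {"status": "running"}, {"status": "completed"}]
)
result = wait_for_job(lambda _job_id: next(_responses), "job-123", interval_s=0.0)
```

Passing the fetch function in as a parameter keeps the loop independent of any particular HTTP library and makes it easy to exercise without a running service.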
- async thoth.ingestion.flows.health_check(_request: Request) → JSONResponse
Health check endpoint.
Returns service health status.
- async thoth.ingestion.flows.ingest(request: Request) → JSONResponse
Start an ingestion job.
Creates a job record and starts background processing. Returns immediately with job_id for status tracking.
- Request body:
source: Source identifier (required) - ‘handbook’, ‘dnd’, or ‘personal’
force: Force full re-ingestion (optional, default: false)
- Returns:
202 Accepted with job_id for status polling
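As an illustration of the request body described above, the sketch below serializes `source` and `force` into JSON. The client-side check against the three documented source identifiers is an assumption of this example; the service may validate differently, and the helper name `build_ingest_body` is hypothetical.

```python
import json

# Source identifiers listed in the endpoint description.
ALLOWED_SOURCES = {"handbook", "dnd", "personal"}


def build_ingest_body(source: str, force: bool = False) -> str:
    """Serialize a request body for the ingest endpoint.

    Rejecting unknown sources on the client is an assumption of this
    sketch, not documented service behavior.
    """
    if source not in ALLOWED_SOURCES:
        raise ValueError(f"unknown source: {source!r}")
    return json.dumps({"source": source, "force": force})


# Request a full re-ingestion of the handbook source.
body = build_ingest_body("handbook", force=True)
```

The resulting string can be sent as the POST body with any HTTP client; the job_id in the 202 response is then used for status polling.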
- async thoth.ingestion.flows.list_jobs(request: Request) → JSONResponse
List recent jobs with optional filtering.
- Query parameters:
source: Filter by source name
status: Filter by status (pending, running, completed, failed)
limit: Maximum number of jobs (default: 50)
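The query parameters above can be assembled with the standard library. This is a small sketch under the assumption that omitted filters are simply left out of the query string; the helper name `build_jobs_query` is hypothetical, and the route it would be appended to is not shown in this page.

```python
from typing import Optional
from urllib.parse import urlencode

# Status values listed in the parameter description.
VALID_STATUSES = {"pending", "running", "completed", "failed"}


def build_jobs_query(
    source: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 50,
) -> str:
    """Build a query string for list_jobs, skipping filters that are unset."""
    if status is not None and status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    params = {"source": source, "status": status, "limit": limit}
    # Drop None values so unset filters do not appear in the query string.
    return urlencode({k: v for k, v in params.items() if v is not None})


query = build_jobs_query(source="handbook", status="failed", limit=10)
```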
- async thoth.ingestion.flows.merge_batches(request: Request) → JSONResponse
Merge all batch LanceDB tables from GCS into the main store.
- Expects JSON body:
collection_name: Collection to merge (optional, default: handbook_documents)
cleanup: Delete batches after merge (optional, default: True)
- Returns:
JSONResponse with status, merged_count, batches_merged, batches_cleaned
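A client-side sketch of this exchange might build the body with the documented defaults and check that a response carries the four documented fields. The helper names are hypothetical, and the sample response values below are invented purely for illustration; they are not real output from the service.

```python
import json

# Fields the endpoint is documented to return.
EXPECTED_FIELDS = ("status", "merged_count", "batches_merged", "batches_cleaned")


def build_merge_body(
    collection_name: str = "handbook_documents", cleanup: bool = True
) -> str:
    """Serialize a merge request using the documented defaults."""
    return json.dumps({"collection_name": collection_name, "cleanup": cleanup})


def missing_merge_fields(response_text: str) -> list:
    """Return the documented fields that are absent from a response body."""
    data = json.loads(response_text)
    return [name for name in EXPECTED_FIELDS if name not in data]


merge_body = build_merge_body(cleanup=False)

# Illustrative response only; the values are made up for this example.
sample_response = json.dumps(
    {"status": "success", "merged_count": 0, "batches_merged": 0, "batches_cleaned": 0}
)
missing = missing_merge_fields(sample_response)
```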
- async thoth.ingestion.flows.process_batch(request: Request) → JSONResponse
Process a specific batch of files (called by Cloud Tasks).
Each batch is stored in a unique GCS prefix to avoid conflicts during parallel processing. Use /merge-batches to consolidate.
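To illustrate why unique prefixes avoid conflicts, the sketch below derives one storage prefix per batch from a random identifier, so parallel workers never write under the same path. The `batches/` path layout, the helper names, and the use of UUIDs are all assumptions made for this example; the actual naming scheme used by the package is not shown here.

```python
import uuid


def new_batch_id() -> str:
    """Generate a random identifier for a batch (hypothetical scheme)."""
    return uuid.uuid4().hex


def batch_prefix(base_prefix: str, batch_id: str) -> str:
    """Return a per-batch object prefix under base_prefix.

    The "batches/" segment is an assumed layout, chosen only for illustration.
    """
    return f"{base_prefix.rstrip('/')}/batches/{batch_id}/"


# Two workers processing different batches get disjoint prefixes.
prefix_a = batch_prefix("lancedb/", new_batch_id())
prefix_b = batch_prefix("lancedb/", new_batch_id())
```

Ending each prefix with a slash ensures one batch's prefix can never be a leading substring of another's, which keeps a later listing or merge step from mixing batches.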
Modules
Batch processing workflow.
Clone handbook repository to GCS.
Health check endpoint.
Job status and listing endpoints.
Merge batches workflow.