# Ingestion Pipeline Architecture
This document describes the data ingestion pipeline that processes documents from multiple sources into searchable vector embeddings.
## Overview
The ingestion pipeline is responsible for:

- Managing multiple document sources (handbook, D&D, personal)
- Parsing multiple file formats (Markdown, PDF, text, Word)
- Chunking documents into semantic segments
- Generating embeddings using sentence-transformers
- Storing vectors in LanceDB tables for semantic search
- Tracking job progress via Firestore
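These stages can be sketched end to end as a simple composition (placeholder function bodies; the real implementations live under `thoth/ingestion/`):

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    text: str
    embedding: list[float]

def parse(raw: str) -> str:
    # Stand-in for the format-specific parsers (Markdown, PDF, text, DOCX).
    return raw.strip()

def chunk(text: str, size: int = 5) -> list[str]:
    # Stand-in for the semantic chunker; here we just split on word count.
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]

def embed(segment: str) -> list[float]:
    # Stand-in for the sentence-transformers embedder.
    return [float(len(segment))]

def ingest(raw: str) -> list[Chunk]:
    # parse -> chunk -> embed; the real pipeline then writes to LanceDB.
    return [Chunk(text=s, embedding=embed(s)) for s in chunk(parse(raw))]
```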
## System Architecture
```mermaid
flowchart TB
    subgraph External["External Services"]
        GL[GitLab Repository]
        GCS[(Google Cloud Storage)]
        SM[Secret Manager]
        FS[(Firestore)]
    end

    subgraph CloudRun["Cloud Run - Ingestion Worker"]
        W[Worker HTTP Server]
        JM[Job Manager]
        SR[Source Registry]
        PF[Parser Factory]
        CH[Chunker]
        EM[Embedder]
    end

    subgraph Parsers["Document Parsers"]
        MP[Markdown Parser]
        PP[PDF Parser]
        TP[Text Parser]
        DP[DOCX Parser]
    end

    subgraph Tasks["Cloud Tasks"]
        Q[Ingestion Queue]
    end

    subgraph Storage["Vector Storage"]
        VS[(LanceDB Tables)]
    end

    GL -->|clone| GCS
    SM -->|gitlab-token| GL
    GCS -->|files| PF
    PF --> MP & PP & TP & DP
    MP & PP & TP & DP -->|parsed docs| CH
    CH -->|chunks| EM
    EM -->|embeddings| VS
    VS -->|backup| GCS
    W -->|/ingest| JM
    JM -->|create job| FS
    JM -->|enqueue| Q
    Q -->|/ingest-batch| W
```
## Data Flow
```mermaid
sequenceDiagram
    participant Client
    participant Worker as Ingestion Worker
    participant Jobs as Firestore
    participant Tasks as Cloud Tasks
    participant GCS
    participant Parsers as Parser Factory
    participant LanceDB

    Client->>Worker: POST /ingest {source: "handbook"}
    Worker->>Jobs: Create job (pending)
    Worker-->>Client: 202 Accepted {job_id}
    Worker->>GCS: List files (no download)
    Worker->>Jobs: Create sub-jobs for batches
    Worker->>Tasks: Enqueue batches (100 files each)

    par Parallel Batch Processing
        Tasks->>Worker: POST /ingest-batch (batch 0-99)
        Worker->>GCS: Download batch files (10 parallel)
        Worker->>Parsers: Parse batch files
        Parsers-->>Worker: Parsed documents
        Worker->>Worker: Chunk + Embed
        Worker->>LanceDB: Write to isolated table (lancedb_batch_0)
        Worker->>Jobs: Mark sub-job completed
    and
        Tasks->>Worker: POST /ingest-batch (batch 100-199)
        Worker->>LanceDB: Write to isolated table (lancedb_batch_1)
    end

    Client->>Worker: POST /merge-batches
    Worker->>LanceDB: Merge all batch tables to main table
    Worker->>GCS: Cleanup batch prefixes
    Worker->>Jobs: Update job (completed)

    Client->>Worker: GET /jobs/{job_id}
    Worker->>Jobs: Get job status
    Worker-->>Client: Job status + stats
```
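The fan-out step above (one sub-job per 100 files, each writing to an isolated table) can be sketched as follows; the helper names are illustrative, while `lancedb_batch_X` matches the naming shown in the diagram:

```python
def split_into_batches(files: list[str], batch_size: int = 100) -> list[list[str]]:
    """Split the listed files into fixed-size batches, one Cloud Task each."""
    return [files[i:i + batch_size] for i in range(0, len(files), batch_size)]

def batch_table_name(batch_index: int) -> str:
    """Each batch writes to its own isolated LanceDB table."""
    return f"lancedb_batch_{batch_index}"
```

Because each batch writes to its own table, a failed or retried batch never corrupts another batch's output; the merge step consolidates them afterwards.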
## Sources
The pipeline supports multiple document sources, each with its own configuration:
| Source | Collection | Formats | Description |
|---|---|---|---|
| `handbook` | `handbook_documents` | | GitLab Handbook |
| `dnd` | | | D&D materials |
| `personal` | | | Personal documents |
### Source Configuration (`thoth/ingestion/sources/config.py`)
```python
@dataclass
class SourceConfig:
    name: str                     # "handbook", "dnd", "personal"
    collection_name: str          # "handbook_documents"
    gcs_prefix: str               # Where files are stored in GCS
    supported_formats: list[str]  # [".md", ".pdf", ".txt", ".docx"]
    description: str
```
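For illustration, a handbook entry might be registered like this (the `gcs_prefix` value and the `SOURCES` registry shape are assumptions; the name, collection, and formats come from this document):

```python
from dataclasses import dataclass

@dataclass
class SourceConfig:
    name: str
    collection_name: str
    gcs_prefix: str
    supported_formats: list[str]
    description: str

handbook = SourceConfig(
    name="handbook",
    collection_name="handbook_documents",
    gcs_prefix="handbook/",  # assumption: the real prefix may differ
    supported_formats=[".md", ".pdf", ".txt", ".docx"],
    description="GitLab Handbook",
)

# A registry maps source names to configs for lookup at request time.
SOURCES = {cfg.name: cfg for cfg in [handbook]}
```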
## Components
### Document Parsers (`thoth/ingestion/parsers/`)
The parser factory selects the appropriate parser based on file extension:
```mermaid
flowchart LR
    F[File] --> PF[Parser Factory]
    PF -->|.md| MP[Markdown Parser]
    PF -->|.pdf| PP[PDF Parser]
    PF -->|.txt| TP[Text Parser]
    PF -->|.docx| DP[DOCX Parser]
    MP & PP & TP & DP --> PD[Parsed Document]
```
| Parser | Extensions | Features |
|---|---|---|
| Markdown | `.md` | YAML frontmatter extraction |
| PDF | `.pdf` | PyMuPDF text extraction, page numbers |
| Text | `.txt` | UTF-8/latin-1 encoding support |
| DOCX | `.docx` | Paragraph and table extraction |
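A minimal sketch of the extension-based dispatch (the lambdas stand in for the real parser classes, and `get_parser` is an illustrative name):

```python
from pathlib import Path
from typing import Callable

# Extension-to-parser table; the callables stand in for real parser classes.
PARSERS: dict[str, Callable[[bytes], str]] = {
    ".md": lambda data: data.decode("utf-8"),
    ".txt": lambda data: data.decode("utf-8", errors="replace"),
    # .pdf and .docx would dispatch to PyMuPDF / python-docx equivalents.
}

def get_parser(path: str) -> Callable[[bytes], str]:
    """Select a parser by file extension, or fail for unsupported formats."""
    ext = Path(path).suffix.lower()
    try:
        return PARSERS[ext]
    except KeyError:
        raise ValueError(f"unsupported format: {ext}") from None
```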
### Job Manager (`thoth/ingestion/job_manager.py`)
Tracks ingestion jobs in Firestore:
```mermaid
stateDiagram-v2
    [*] --> pending: Create job
    pending --> running: Start processing
    running --> completed: Success
    running --> failed: Error
    completed --> [*]
    failed --> [*]
```
Firestore collection: `thoth_jobs`

| Field | Type | Description |
|---|---|---|
| `job_id` | string | UUID identifier |
| `status` | enum | `pending`, `running`, `completed`, `failed` |
| `source` | string | Source name (handbook, dnd, personal) |
| `collection_name` | string | Target LanceDB table name |
| `started_at` | timestamp | Job start time |
| | timestamp | Job completion time |
| `stats` | object | Processing statistics |
| | string | Error message if failed |
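The state machine above can be enforced with a small transition table (a sketch; the real JobManager persists these states to Firestore):

```python
# Allowed job state transitions, mirroring the state diagram.
VALID_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}

def transition(current: str, new: str) -> str:
    """Validate a job status change before it is written to the job record."""
    if new not in VALID_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new
```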
### Document Chunker (`thoth/ingestion/chunker.py`)
Splits documents into semantic chunks:
- **Chunk size:** 500-1000 tokens (configurable)
- **Overlap:** 50 tokens between chunks
- **Strategy:** respects markdown structure (headers, code blocks)
- **Metadata:** preserves file path, section hierarchy, line numbers
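The overlap strategy can be sketched as a token-based sliding window (a simplification: the real chunker additionally respects markdown structure):

```python
def chunk_tokens(tokens: list[str], size: int = 800, overlap: int = 50) -> list[list[str]]:
    """Sliding-window chunking: consecutive chunks share `overlap` tokens."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than chunk size")
    step = size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # the last window already covers the tail
    return chunks
```

The overlap means a sentence falling on a chunk boundary still appears intact in at least one chunk, which improves retrieval recall at the edges.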
### Worker HTTP Server (`thoth/ingestion/worker.py`)
Cloud Run service exposing modular flow endpoints (116 lines, refactored down from 935):
| Endpoint | Method | Flow Module | Description |
|---|---|---|---|
| | GET | | Health check |
| | POST | | Clone GitLab handbook to GCS |
| `/ingest` | POST | | List files, create sub-jobs, enqueue batches |
| `/ingest-batch` | POST | | Download batch, parse, chunk, embed, store |
| `/merge-batches` | POST | | Consolidate batch LanceDB tables to main |
| `/jobs` | GET | | List jobs (with filtering) |
| `/jobs/{job_id}` | GET | | Get job status with sub-jobs |
Singleton Services (lazy-initialized in `worker.py`):

- **SourceRegistry**: multi-source config (handbook, dnd, personal)
- **JobManager**: Firestore job tracking with sub-job aggregation
- **TaskQueueClient**: Cloud Tasks batch distribution
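The lazy-initialization pattern can be sketched with `functools.lru_cache` (the class body is a placeholder; the real services wrap Firestore and Cloud Tasks clients):

```python
from functools import lru_cache

class JobManager:
    """Placeholder for the Firestore-backed job manager service."""
    def __init__(self) -> None:
        # The real service would construct a Firestore client here.
        self.jobs: dict[str, dict] = {}

@lru_cache(maxsize=1)
def get_job_manager() -> JobManager:
    # Built on first use, so cold starts don't pay for clients they
    # never touch; every later call returns the same instance.
    return JobManager()
```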
## API Usage
### Starting an Ingestion Job
```bash
# Start ingestion for a source
curl -X POST https://worker-url/ingest \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -H "Content-Type: application/json" \
  -d '{"source": "handbook"}'
```

Response:

```json
{
  "status": "accepted",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "source": "handbook",
  "collection_name": "handbook_documents",
  "message": "Ingestion job created. Use GET /jobs/{job_id} to check status."
}
```
### Checking Job Status
```bash
curl https://worker-url/jobs/550e8400-e29b-41d4-a716-446655440000 \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)"
```

Response:

```json
{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "running",
  "source": "handbook",
  "collection_name": "handbook_documents",
  "started_at": "2024-01-15T10:00:00Z",
  "stats": {
    "total_files": 1500,
    "processed_files": 750,
    "failed_files": 2,
    "total_chunks": 3500,
    "total_documents": 3400
  }
}
```
### Listing Jobs
```bash
# List all jobs
curl "https://worker-url/jobs" \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)"

# Filter by source and status
curl "https://worker-url/jobs?source=handbook&status=completed&limit=10" \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)"
```
## Parallel Processing
For large ingestion jobs, the pipeline uses Cloud Tasks for parallel processing:
```mermaid
flowchart TB
    subgraph Coordinator["Coordinator (POST /ingest)"]
        L[List 1000 files]
        B[Split into 10 batches]
    end

    subgraph Queue["Cloud Tasks Queue"]
        T1[Batch 1: files 0-99]
        T2[Batch 2: files 100-199]
        T3[Batch 3: files 200-299]
        TN[...]
    end

    subgraph Workers["Worker Instances"]
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker 3]
    end

    L --> B
    B --> T1 & T2 & T3 & TN
    T1 --> W1
    T2 --> W2
    T3 --> W3
```
Queue Configuration (`terraform/ingestion/cloud_tasks.tf`):

- **Max concurrent dispatches:** 1 (sequential); can be increased to 10-50 for parallel processing
- **Dispatch rate:** 1-5 tasks/second
- **Retry policy:** 5 attempts with exponential backoff (30-300s)
- **Batch isolation:** each batch writes to `gs://bucket/lancedb_batch_X/`
- **Idempotency:** processing is skipped if the batch already exists in GCS
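The batch-isolation and idempotency rules can be sketched as follows (helper names are illustrative; the prefix layout matches `gs://bucket/lancedb_batch_X/` above):

```python
def batch_prefix(bucket: str, batch_index: int) -> str:
    """GCS prefix holding one batch's isolated LanceDB table."""
    return f"gs://{bucket}/lancedb_batch_{batch_index}/"

def should_process(bucket: str, batch_index: int, existing_prefixes: set[str]) -> bool:
    """Idempotency guard: skip a batch whose output already exists in GCS."""
    return batch_prefix(bucket, batch_index) not in existing_prefixes
```

This is what makes the retry policy safe: a task retried after a partial failure either finds its prefix absent and reruns, or finds it present and exits without duplicating work.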
## Configuration
Environment variables:
| Variable | Default | Description |
|---|---|---|
| | - | GCP project for Firestore/GCS |
| | - | GCS bucket for storage |
| | | GitLab instance URL |
| | - | Repository path |
| | | Target chunk size in tokens |
| | 50 | Overlap between chunks |
| | 100 | Files per batch task |
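Reading these settings can be sketched with a small helper (the variable names and the chunk-size default here are assumptions for illustration; the overlap and batch defaults follow the values used elsewhere in this document):

```python
import os

def int_env(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to a default."""
    raw = os.environ.get(name)
    return int(raw) if raw is not None and raw.strip() else default

# Hypothetical variable names; the deployed service may use different ones.
CHUNK_SIZE = int_env("CHUNK_SIZE", 800)
CHUNK_OVERLAP = int_env("CHUNK_OVERLAP", 50)
BATCH_SIZE = int_env("BATCH_SIZE", 100)
```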