thoth.ingestion.pipeline¶
Ingestion pipeline orchestrator for Thoth.
This module provides the main pipeline coordinator that integrates all ingestion components (repo manager, chunker, embedder, vector store) into a complete end-to-end ingestion workflow with progress tracking, error handling, and resume logic.
Functions¶
- Create and configure a logger with structured JSON output.
Classes
- Represents a chunk of markdown content with metadata.
- Generalized document chunker for multi-format support.
- Generate embeddings from text using sentence-transformers.
- Manages repository synchronization between GCS and local storage.
- Manages the GitLab handbook repository.
- Orchestrates the complete ingestion pipeline.
- Intelligent markdown-aware chunking.
- Factory for creating and using document parsers.
- Mutable state for a single pipeline run (resume and progress tracking).
- Read-only statistics from a completed pipeline run.
- Configuration for a single data source (handbook, D&D, personal, etc.).
- Vector store for document embeddings using LanceDB.
- class thoth.ingestion.pipeline.PipelineState(last_commit: str | None = None, processed_files: list[str] = <factory>, failed_files: dict[str, str] = <factory>, total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False)[source]¶
Bases: object
Mutable state for a single pipeline run (resume and progress tracking).
Persisted to pipeline_state.json so runs can resume after interruption. Tracks last processed commit, file lists, chunk counts, and completion flag.
- to_dict() → dict[str, Any][source]¶
Serialize state to a dict for JSON persistence.
- Returns:
Dict with last_commit, processed_files, failed_files, total_chunks, total_documents, start_time, last_update_time, completed.
- class thoth.ingestion.pipeline.PipelineStats(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float)[source]¶
Bases: object
Read-only statistics from a completed pipeline run.
Returned by run() and used for logging and monitoring. All counts and rates are computed at the end of the run.
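The relationship between the count and rate fields can be illustrated as below. The assumption that the rates are simple count-over-duration ratios is ours, not stated by the source.

```python
# Hypothetical end-of-run computation for the documented PipelineStats
# fields; the rate formulas are an assumption.
total_chunks = 1200
processed_files = 300
duration_seconds = 60.0

chunks_per_second = total_chunks / duration_seconds
files_per_second = processed_files / duration_seconds
```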
- class thoth.ingestion.pipeline.IngestionPipeline(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = 50, logger_instance: Logger | LoggerAdapter | None = None, collection_name: str = 'thoth_documents', source_config: SourceConfig | None = None)[source]¶
Bases: object
Orchestrates the complete ingestion pipeline.
This class coordinates:
1. Repository cloning/updating
2. Markdown file discovery
3. Document chunking
4. Embedding generation
5. Vector store insertion
With features:
- Progress tracking and reporting
- Resume capability from interruptions
- Error handling and logging
- Batch processing for efficiency
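The coordinated steps can be sketched with simple stand-ins. The function names and behavior below are illustrative assumptions, not the real `HandbookRepoManager`, `MarkdownChunker`, `Embedder`, or `VectorStore` implementations.

```python
# Illustrative stand-ins for the pipeline's components; names and
# behavior here are assumptions, not the thoth implementations.
def discover_files():                        # step 2: file discovery
    return ["handbook/values.md", "handbook/ops.md"]

def chunk(path):                             # step 3: chunking
    return [f"{path}#chunk{i}" for i in range(2)]

def embed(chunks):                           # step 4: embedding
    return [[0.0] * 3 for _ in chunks]       # fixed-size dummy vectors

store = []                                   # step 5: vector store insertion

for path in discover_files():                # steps 1-2: repo already synced
    chunks = chunk(path)
    store.extend(zip(chunks, embed(chunks)))
```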
- __init__(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = 50, logger_instance: Logger | LoggerAdapter | None = None, collection_name: str = 'thoth_documents', source_config: SourceConfig | None = None)[source]¶
Initialize the ingestion pipeline.
- Parameters:
repo_manager – Repository manager instance (creates default if None)
chunker – Markdown chunker instance (creates default if None)
embedder – Embedder instance (creates default if None)
vector_store – Vector store instance (creates default if None)
state_file – Path to state file for resume capability
batch_size – Number of files to process in each batch
logger_instance – Logger instance for logging
collection_name – Name of the vector store table (collection) to use
source_config – Source configuration for multi-source support
- property effective_repo_path: Path¶
Get the effective repository path (GCS local path or repo_manager clone path).
- get_file_list() → list[str][source]¶
Get list of all markdown files for batch processing.
- Returns:
List of file paths relative to repo root
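Returning paths relative to the repo root can be sketched with `pathlib`. This is an assumption about the discovery mechanics; the real `get_file_list()` may apply additional filtering (e.g. from `SourceConfig`).

```python
import tempfile
from pathlib import Path

# Sketch of markdown discovery relative to a repo root; the real
# get_file_list() may filter paths further.
repo_root = Path(tempfile.mkdtemp())
(repo_root / "docs").mkdir()
(repo_root / "docs" / "intro.md").write_text("# Intro")
(repo_root / "README.md").write_text("# Readme")

file_list = sorted(
    str(p.relative_to(repo_root)) for p in repo_root.rglob("*.md")
)
```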
- process_file_batch(start_index: int, end_index: int, file_list: list[str] | None = None) → dict[str, Any][source]¶
Process a specific batch of files by index range.
- Parameters:
start_index – Starting index in original full file list (for tracking/logging)
end_index – Ending index in original full file list (for tracking/logging)
file_list – Pre-sliced file list for this batch. If None, discovers and slices.
- Returns:
Statistics dictionary with processed/failed counts
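How a caller might split the full file list into index ranges for `process_file_batch` can be sketched as below; treating `end_index` as exclusive is an assumption.

```python
# Hypothetical batching over an index range, mirroring the documented
# (start_index, end_index) parameters; end is assumed exclusive.
def batch_ranges(n_files, batch_size):
    return [(i, min(i + batch_size, n_files))
            for i in range(0, n_files, batch_size)]

files = [f"doc_{i}.md" for i in range(120)]
ranges = batch_ranges(len(files), 50)        # matches the default batch_size
batches = [files[s:e] for s, e in ranges]    # pre-sliced lists for each call
```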
- run(force_reclone: bool = False, incremental: bool = True, progress_callback: Callable[[int, int, str], None] | None = None) → PipelineStats[source]¶
Run the complete ingestion pipeline.
- Parameters:
force_reclone – If True, force re-sync from GCS (or re-clone if no GCS)
incremental – If True, only process files not already in state
progress_callback – Optional callback(current, total, status_msg) for progress
- Returns:
PipelineStats with execution statistics
- Raises:
RuntimeError – If pipeline fails
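A caller-supplied function matching the documented `(current, total, status_msg)` callback signature might look like this; the percentage formatting and the sample invocations are illustrative.

```python
# A callback matching the documented (current, total, status_msg)
# signature; here it just records formatted progress lines.
progress_log = []

def progress_callback(current, total, status_msg):
    pct = 100 * current // total if total else 0
    progress_log.append(f"[{pct:3d}%] {current}/{total} {status_msg}")

# The pipeline would invoke it as processing advances, e.g.:
for i, name in enumerate(["a.md", "b.md"], start=1):
    progress_callback(i, 2, f"processed {name}")
```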