Pipeline
thoth.ingestion.pipeline

Ingestion pipeline orchestrator for Thoth.

This module provides the main pipeline coordinator that integrates all ingestion components (repo manager, chunker, embedder, vector store) into a complete end-to-end ingestion workflow with progress tracking, error handling, and resume logic.

logger = setup_logger(__name__) (module-attribute)
DEFAULT_STATE_FILE = 'pipeline_state.json' (module-attribute)
DEFAULT_BATCH_SIZE = 50 (module-attribute)
PipelineState (dataclass)

Mutable state for a single pipeline run (resume and progress tracking).

Persisted to pipeline_state.json so runs can resume after interruption. Tracks the last processed commit, file lists, chunk counts, and a completion flag.

Attributes:

| Name | Type | Description |
|---|---|---|
| last_commit | str \| None | Last Git commit processed (for incremental sync). |
| processed_files | list[str] | List of file paths successfully processed. |
| failed_files | dict[str, str] | Dict of file_path -> error_message for failed files. |
| total_chunks | int | Total chunks created this run. |
| total_documents | int | Total documents (chunks) added to the vector store. |
| start_time | str \| None | ISO timestamp when the run started. |
| last_update_time | str \| None | ISO timestamp of the last state save. |
| completed | bool | True when the run finished without error. |
last_commit: str | None = None (class-attribute, instance-attribute)
processed_files: list[str] = field(default_factory=list) (class-attribute, instance-attribute)
failed_files: dict[str, str] = field(default_factory=dict) (class-attribute, instance-attribute)
total_chunks: int = 0 (class-attribute, instance-attribute)
total_documents: int = 0 (class-attribute, instance-attribute)
start_time: str | None = None (class-attribute, instance-attribute)
last_update_time: str | None = None (class-attribute, instance-attribute)
completed: bool = False (class-attribute, instance-attribute)

__init__(last_commit: str | None = None, processed_files: list[str] = list(), failed_files: dict[str, str] = dict(), total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False) -> None
to_dict() -> dict[str, Any]

Serialize state to a dict for JSON persistence.

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dict with last_commit, processed_files, failed_files, total_chunks, total_documents, start_time, last_update_time, completed. |
from_dict(data: dict[str, Any]) -> PipelineState (classmethod)

Deserialize state from a dict (e.g., from pipeline_state.json).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Dict with keys matching PipelineState attributes. | required |

Returns:

| Type | Description |
|---|---|
| PipelineState | PipelineState instance with restored values. |
PipelineStats (dataclass)

Read-only statistics from a completed pipeline run.

Returned by run() and used for logging and monitoring. All counts and rates are computed at the end of the run.

Attributes:

| Name | Type | Description |
|---|---|---|
| total_files | int | Total files discovered for processing. |
| processed_files | int | Files successfully processed. |
| failed_files | int | Files that failed (with errors). |
| total_chunks | int | Chunks created and stored. |
| total_documents | int | Documents (chunks) in the vector store after the run. |
| duration_seconds | float | Elapsed time in seconds. |
| chunks_per_second | float | Throughput (chunks / duration). |
| files_per_second | float | Throughput (files / duration). |
total_files: int (instance-attribute)
processed_files: int (instance-attribute)
failed_files: int (instance-attribute)
total_chunks: int (instance-attribute)
total_documents: int (instance-attribute)
duration_seconds: float (instance-attribute)
chunks_per_second: float (instance-attribute)
files_per_second: float (instance-attribute)

__init__(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float) -> None
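The throughput fields are derived values (chunks / duration and files / duration). A hedged sketch of how they might be computed at the end of a run, with a guard against a zero duration (the make_stats helper is hypothetical, not part of the API):

```python
from dataclasses import dataclass


@dataclass
class PipelineStats:
    """Mirror of the documented stats fields."""
    total_files: int
    processed_files: int
    failed_files: int
    total_chunks: int
    total_documents: int
    duration_seconds: float
    chunks_per_second: float
    files_per_second: float


def make_stats(total_files: int, processed: int, failed: int,
               chunks: int, documents: int, duration: float) -> PipelineStats:
    """Hypothetical helper: derive throughput from raw counts."""
    safe = duration if duration > 0 else 1.0  # avoid division by zero
    return PipelineStats(
        total_files=total_files,
        processed_files=processed,
        failed_files=failed,
        total_chunks=chunks,
        total_documents=documents,
        duration_seconds=duration,
        chunks_per_second=chunks / safe,
        files_per_second=processed / safe,
    )


# 98 of 100 files processed, 500 chunks, in 25 seconds:
stats = make_stats(100, 98, 2, 500, 500, 25.0)
```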
IngestionPipeline

Orchestrates the complete ingestion pipeline.

This class coordinates:
1. Repository cloning/updating
2. Markdown file discovery
3. Document chunking
4. Embedding generation
5. Vector store insertion

Features:
- Progress tracking and reporting
- Resume capability after interruption
- Error handling and logging
- Batch processing for efficiency

logger = logger_instance or logger (instance-attribute)
source_config = source_config (instance-attribute)
repo_manager = repo_manager or HandbookRepoManager(logger=self.logger) (instance-attribute)
chunker = chunker or MarkdownChunker(logger=self.logger) (instance-attribute)
document_chunker = DocumentChunker(logger=self.logger) (instance-attribute)
embedder = embedder or Embedder(logger_instance=self.logger) (instance-attribute)
collection_name = source_config.collection_name if source_config else collection_name (instance-attribute)
source_name = source_config.name if source_config else '' (instance-attribute)
gcs_repo_sync = None (instance-attribute)
vector_store = VectorStore(persist_directory='/tmp/lancedb', collection_name=collection_name, gcs_bucket_name=gcs_bucket, gcs_project_id=gcs_project, logger_instance=self.logger) (instance-attribute)
state_file = state_file or self.repo_manager.clone_path.parent / DEFAULT_STATE_FILE (instance-attribute)
batch_size = batch_size (instance-attribute)
state = self._load_state() (instance-attribute)

effective_repo_path: Path (property)

Get the effective repository path (the GCS local path, or the repo_manager clone path).
__init__(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = DEFAULT_BATCH_SIZE, logger_instance: logging.Logger | logging.LoggerAdapter | None = None, collection_name: str = 'thoth_documents', source_config: SourceConfig | None = None)

Initialize the ingestion pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| repo_manager | HandbookRepoManager \| None | Repository manager instance (creates a default if None). | None |
| chunker | MarkdownChunker \| None | Markdown chunker instance (creates a default if None). | None |
| embedder | Embedder \| None | Embedder instance (creates a default if None). | None |
| vector_store | VectorStore \| None | Vector store instance (creates a default if None). | None |
| state_file | Path \| None | Path to the state file for resume capability. | None |
| batch_size | int | Number of files to process in each batch. | DEFAULT_BATCH_SIZE |
| logger_instance | Logger \| LoggerAdapter \| None | Logger instance for logging. | None |
| collection_name | str | Name of the vector store table (collection) to use. | 'thoth_documents' |
| source_config | SourceConfig \| None | Source configuration for multi-source support. | None |
get_file_list() -> list[str]

Get the list of all markdown files for batch processing.

Returns:

| Type | Description |
|---|---|
| list[str] | List of file paths relative to the repo root. |
process_file_batch(start_index: int, end_index: int, file_list: list[str] | None = None) -> dict[str, Any]

Process a specific batch of files by index range.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start_index | int | Starting index in the original full file list (for tracking/logging). | required |
| end_index | int | Ending index in the original full file list (for tracking/logging). | required |
| file_list | list[str] \| None | Pre-sliced file list for this batch. If None, discovers and slices. | None |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Statistics dictionary with processed/failed counts. |
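Since process_file_batch takes index ranges into the full file list, a caller driving batched ingestion needs to generate those ranges. A sketch of that (batch_ranges is a hypothetical helper, not part of the API; end indices are treated as exclusive, matching Python slicing):

```python
DEFAULT_BATCH_SIZE = 50


def batch_ranges(n_files: int, batch_size: int = DEFAULT_BATCH_SIZE) -> list[tuple[int, int]]:
    """Return (start_index, end_index) pairs covering n_files, end-exclusive."""
    return [(i, min(i + batch_size, n_files)) for i in range(0, n_files, batch_size)]


# For 120 discovered files with the default batch size of 50:
ranges = batch_ranges(120)
# Each pair would be passed as start_index/end_index to process_file_batch.
```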
run(force_reclone: bool = False, incremental: bool = True, progress_callback: Callable[[int, int, str], None] | None = None) -> PipelineStats

Run the complete ingestion pipeline.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| force_reclone | bool | If True, force a re-sync from GCS (or a re-clone if no GCS). | False |
| incremental | bool | If True, only process files not already in state. | True |
| progress_callback | Callable[[int, int, str], None] \| None | Optional callback(current, total, status_msg) for progress reporting. | None |

Returns:

| Type | Description |
|---|---|
| PipelineStats | PipelineStats with execution statistics. |

Raises:

| Type | Description |
|---|---|
| RuntimeError | If the pipeline fails. |
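The documented progress_callback contract is callback(current, total, status_msg). A minimal example of a callback a caller could pass to run() (the loop below only simulates the invocations the pipeline would make; the message format is illustrative):

```python
from typing import Callable

# Type alias matching the documented callback signature.
ProgressCallback = Callable[[int, int, str], None]

messages: list[str] = []


def log_progress(current: int, total: int, status_msg: str) -> None:
    """Collect a human-readable progress line; a real caller might log it."""
    pct = (100 * current) // total if total else 0
    messages.append(f"[{pct:3d}%] {current}/{total} {status_msg}")


# Simulated: run() would invoke the callback as files are processed.
for i, name in enumerate(["a.md", "b.md"], start=1):
    log_progress(i, 2, f"processed {name}")
```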
reset(keep_repo: bool = True) -> None

Reset the pipeline state and optionally the vector store.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| keep_repo | bool | If True, keep the cloned repository; otherwise remove it. | True |
get_status() -> dict[str, Any]

Get the current pipeline status.

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with current status information. |
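Assuming the status dict is derived from the persisted PipelineState, a sketch of the kind of summary it might contain (both the helper and the key names are illustrative, not the real API):

```python
from typing import Any


def summarize_state(processed: list[str], failed: dict[str, str],
                    total_chunks: int, completed: bool) -> dict[str, Any]:
    """Hypothetical status summary built from state fields."""
    return {
        "processed_files": len(processed),
        "failed_files": len(failed),
        "total_chunks": total_chunks,
        "completed": completed,
    }


status = summarize_state(["a.md", "b.md"], {"c.md": "parse error"}, 37, False)
```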