
Pipeline

thoth.ingestion.pipeline

Ingestion pipeline orchestrator for Thoth.

This module provides the main pipeline coordinator that integrates all ingestion components (repo manager, chunker, embedder, vector store) into a complete end-to-end ingestion workflow with progress tracking, error handling, and resume logic.

logger = setup_logger(__name__) module-attribute

DEFAULT_STATE_FILE = 'pipeline_state.json' module-attribute

DEFAULT_BATCH_SIZE = 50 module-attribute

PipelineState dataclass

Mutable state for a single pipeline run (resume and progress tracking).

Persisted to pipeline_state.json so runs can resume after interruption. Tracks last processed commit, file lists, chunk counts, and completion flag.

Attributes:

- last_commit (str | None): Last Git commit processed (for incremental sync).
- processed_files (list[str]): List of file paths successfully processed.
- failed_files (dict[str, str]): Dict of file_path -> error_message for failed files.
- total_chunks (int): Total chunks created this run.
- total_documents (int): Total documents (chunks) added to the vector store.
- start_time (str | None): ISO timestamp when the run started.
- last_update_time (str | None): ISO timestamp of the last state save.
- completed (bool): True when the run finished without error.

last_commit: str | None = None class-attribute instance-attribute

processed_files: list[str] = field(default_factory=list) class-attribute instance-attribute

failed_files: dict[str, str] = field(default_factory=dict) class-attribute instance-attribute

total_chunks: int = 0 class-attribute instance-attribute

total_documents: int = 0 class-attribute instance-attribute

start_time: str | None = None class-attribute instance-attribute

last_update_time: str | None = None class-attribute instance-attribute

completed: bool = False class-attribute instance-attribute

__init__(last_commit: str | None = None, processed_files: list[str] = list(), failed_files: dict[str, str] = dict(), total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False) -> None

to_dict() -> dict[str, Any]

Serialize state to a dict for JSON persistence.

Returns:

- dict[str, Any]: Dict with last_commit, processed_files, failed_files, total_chunks, total_documents, start_time, last_update_time, completed.

from_dict(data: dict[str, Any]) -> PipelineState classmethod

Deserialize state from a dict (e.g., from pipeline_state.json).

Parameters:

- data (dict[str, Any], required): Dict with keys matching PipelineState attributes.

Returns:

- PipelineState: Instance with restored values.

PipelineStats dataclass

Read-only statistics from a completed pipeline run.

Returned by run() and used for logging and monitoring. All counts and rates are computed at the end of the run.

Attributes:

- total_files (int): Total files discovered for processing.
- processed_files (int): Files successfully processed.
- failed_files (int): Files that failed (with errors).
- total_chunks (int): Chunks created and stored.
- total_documents (int): Documents (chunks) in the vector store after the run.
- duration_seconds (float): Elapsed time in seconds.
- chunks_per_second (float): Throughput (chunks / duration).
- files_per_second (float): Throughput (files / duration).

total_files: int instance-attribute

processed_files: int instance-attribute

failed_files: int instance-attribute

total_chunks: int instance-attribute

total_documents: int instance-attribute

duration_seconds: float instance-attribute

chunks_per_second: float instance-attribute

files_per_second: float instance-attribute

__init__(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float) -> None
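The throughput fields are simple end-of-run ratios. A sketch of how they can be derived from the counters (the zero-duration guard is an assumption, not taken from the source):

```python
def compute_rates(total_chunks: int, total_files: int, duration_seconds: float) -> tuple[float, float]:
    """Return (chunks_per_second, files_per_second) for a finished run."""
    if duration_seconds <= 0:
        # Guard against division by zero on degenerate/instant runs.
        return 0.0, 0.0
    return total_chunks / duration_seconds, total_files / duration_seconds


chunks_per_second, files_per_second = compute_rates(
    total_chunks=500, total_files=50, duration_seconds=25.0
)
assert chunks_per_second == 20.0
assert files_per_second == 2.0
```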

IngestionPipeline

Orchestrates the complete ingestion pipeline.

This class coordinates:

1. Repository cloning/updating
2. Markdown file discovery
3. Document chunking
4. Embedding generation
5. Vector store insertion

Features:

- Progress tracking and reporting
- Resume capability after interruptions
- Error handling and logging
- Batch processing for efficiency

logger = logger_instance or logger instance-attribute

source_config = source_config instance-attribute

repo_manager = repo_manager or HandbookRepoManager(logger=(self.logger)) instance-attribute

chunker = chunker or MarkdownChunker(logger=(self.logger)) instance-attribute

document_chunker = DocumentChunker(logger=(self.logger)) instance-attribute

embedder = embedder or Embedder(logger_instance=(self.logger)) instance-attribute

collection_name = source_config.collection_name if source_config else collection_name instance-attribute

source_name = source_config.name if source_config else '' instance-attribute

gcs_repo_sync = None instance-attribute

vector_store = VectorStore(persist_directory='/tmp/lancedb', collection_name=collection_name, gcs_bucket_name=gcs_bucket, gcs_project_id=gcs_project, logger_instance=(self.logger)) instance-attribute

state_file = state_file or self.repo_manager.clone_path.parent / DEFAULT_STATE_FILE instance-attribute

batch_size = batch_size instance-attribute

state = self._load_state() instance-attribute

effective_repo_path: Path property

Get the effective repository path (GCS local path or repo_manager clone path).

__init__(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = DEFAULT_BATCH_SIZE, logger_instance: logging.Logger | logging.LoggerAdapter | None = None, collection_name: str = 'thoth_documents', source_config: SourceConfig | None = None)

Initialize the ingestion pipeline.

Parameters:

- repo_manager (HandbookRepoManager | None, default None): Repository manager instance (creates a default if None).
- chunker (MarkdownChunker | None, default None): Markdown chunker instance (creates a default if None).
- embedder (Embedder | None, default None): Embedder instance (creates a default if None).
- vector_store (VectorStore | None, default None): Vector store instance (creates a default if None).
- state_file (Path | None, default None): Path to the state file for resume capability.
- batch_size (int, default DEFAULT_BATCH_SIZE): Number of files to process in each batch.
- logger_instance (Logger | LoggerAdapter | None, default None): Logger instance for logging.
- collection_name (str, default 'thoth_documents'): Name of the vector store table (collection) to use.
- source_config (SourceConfig | None, default None): Source configuration for multi-source support.

get_file_list() -> list[str]

Get list of all markdown files for batch processing.

Returns:

- list[str]: List of file paths relative to the repo root.
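An illustrative file-discovery helper matching get_file_list()'s contract, sketched with pathlib against a throwaway directory (returning repo-relative paths comes from the docs above; the sorted order and the recursive `*.md` glob are assumptions):

```python
import tempfile
from pathlib import Path


def list_markdown_files(repo_root: Path) -> list[str]:
    """Return all markdown files under repo_root, relative to it."""
    # rglob walks the tree recursively; assumed sorted for stable batching.
    return sorted(str(p.relative_to(repo_root)) for p in repo_root.rglob("*.md"))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "docs").mkdir()
    (root / "docs" / "guide.md").write_text("# Guide")
    (root / "README.md").write_text("# Readme")
    files = list_markdown_files(root)

assert files == ["README.md", "docs/guide.md"]
```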

process_file_batch(start_index: int, end_index: int, file_list: list[str] | None = None) -> dict[str, Any]

Process a specific batch of files by index range.

Parameters:

- start_index (int, required): Starting index in the original full file list (for tracking/logging).
- end_index (int, required): Ending index in the original full file list (for tracking/logging).
- file_list (list[str] | None, default None): Pre-sliced file list for this batch. If None, discovers and slices.

Returns:

- dict[str, Any]: Statistics dictionary with processed/failed counts.
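A caller driving batched ingestion might carve the full file list into (start_index, end_index, slice) triples, mirroring the process_file_batch signature. A sketch (the half-open [start, end) slicing convention is an assumption):

```python
def make_batches(file_list: list[str], batch_size: int) -> list[tuple[int, int, list[str]]]:
    """Split file_list into (start_index, end_index, files) batches."""
    batches = []
    for start in range(0, len(file_list), batch_size):
        end = min(start + batch_size, len(file_list))
        # Each pre-sliced list could be handed to process_file_batch(start, end, files).
        batches.append((start, end, file_list[start:end]))
    return batches


files = [f"docs/page_{i}.md" for i in range(7)]
batches = make_batches(files, batch_size=3)
assert [(s, e) for s, e, _ in batches] == [(0, 3), (3, 6), (6, 7)]
assert batches[-1][2] == ["docs/page_6.md"]
```

Passing the pre-sliced list avoids re-discovering files for every batch, while the indices preserve position in the full list for logging.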

run(force_reclone: bool = False, incremental: bool = True, progress_callback: Callable[[int, int, str], None] | None = None) -> PipelineStats

Run the complete ingestion pipeline.

Parameters:

- force_reclone (bool, default False): If True, force a re-sync from GCS (or a re-clone if no GCS).
- incremental (bool, default True): If True, only process files not already in state.
- progress_callback (Callable[[int, int, str], None] | None, default None): Optional callback(current, total, status_msg) for progress reporting.

Returns:

- PipelineStats: Execution statistics for the run.

Raises:

- RuntimeError: If the pipeline fails.
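The progress_callback hook has the shape callback(current, total, status_msg). A sketch of wiring one up; the loop below simulates the pipeline, and only the callback signature comes from the documented API:

```python
# Collects every progress event the pipeline would emit.
progress_log: list[tuple[int, int, str]] = []


def progress_callback(current: int, total: int, status_msg: str) -> None:
    progress_log.append((current, total, status_msg))


# Stand-in for pipeline.run(progress_callback=progress_callback):
total_files = 3
for i, path in enumerate(["a.md", "b.md", "c.md"], start=1):
    progress_callback(i, total_files, f"processed {path}")

assert progress_log[0] == (1, 3, "processed a.md")
assert progress_log[-1][0] == progress_log[-1][1]  # final event: current == total
```

The same callback can drive a progress bar or periodic log lines without the pipeline knowing which.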

reset(keep_repo: bool = True) -> None

Reset pipeline state and optionally vector store.

Parameters:

- keep_repo (bool, default True): If True, keep the cloned repository; otherwise remove it.

get_status() -> dict[str, Any]

Get current pipeline status.

Returns:

- dict[str, Any]: Dictionary with current status information.
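A hypothetical sketch of how such a status payload could be assembled from the documented PipelineState fields; the exact keys returned by get_status() are an assumption, not taken from the source:

```python
from datetime import datetime, timezone
from typing import Any


def build_status(processed: list[str], failed: dict[str, str], completed: bool) -> dict[str, Any]:
    """Summarize pipeline state as a flat, JSON-friendly dict (hypothetical keys)."""
    return {
        "processed_files": len(processed),
        "failed_files": len(failed),
        "completed": completed,
        "last_update_time": datetime.now(timezone.utc).isoformat(),
    }


status = build_status(["a.md", "b.md"], {"c.md": "parse error"}, completed=False)
assert status["processed_files"] == 2
assert status["failed_files"] == 1
assert status["completed"] is False
```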