Source code for thoth.ingestion.pipeline

"""Ingestion pipeline orchestrator for Thoth.

This module provides the main pipeline coordinator that integrates all ingestion
components (repo manager, chunker, embedder, vector store) into a complete
end-to-end ingestion workflow with progress tracking, error handling, and resume logic.
"""

from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
import json
import logging
from pathlib import Path
import shutil
from typing import Any

from thoth.ingestion.chunker import Chunk, MarkdownChunker
from thoth.ingestion.embedder import Embedder
from thoth.ingestion.repo_manager import HandbookRepoManager
from thoth.ingestion.vector_store import VectorStore

logger = logging.getLogger(__name__)

# Constants
DEFAULT_STATE_FILE = "pipeline_state.json"
DEFAULT_BATCH_SIZE = 50  # Process files in batches


@dataclass
class PipelineState:
    """Tracks the state of the ingestion pipeline."""

    last_commit: str | None = None
    processed_files: list[str] = field(default_factory=list)
    failed_files: dict[str, str] = field(default_factory=dict)  # file_path -> error_message
    total_chunks: int = 0
    total_documents: int = 0
    start_time: str | None = None
    last_update_time: str | None = None
    completed: bool = False
    def to_dict(self) -> dict[str, Any]:
        """Convert state to dictionary."""
        return {
            "last_commit": self.last_commit,
            "processed_files": self.processed_files,
            "failed_files": self.failed_files,
            "total_chunks": self.total_chunks,
            "total_documents": self.total_documents,
            "start_time": self.start_time,
            "last_update_time": self.last_update_time,
            "completed": self.completed,
        }
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "PipelineState":
        """Create state from dictionary."""
        return cls(
            last_commit=data.get("last_commit"),
            processed_files=data.get("processed_files", []),
            failed_files=data.get("failed_files", {}),
            total_chunks=data.get("total_chunks", 0),
            total_documents=data.get("total_documents", 0),
            start_time=data.get("start_time"),
            last_update_time=data.get("last_update_time"),
            completed=data.get("completed", False),
        )
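

# A minimal round-trip sketch of the resume state (the commit hash and chunk
# count below are made-up values): PipelineState serializes to plain JSON via
# to_dict()/from_dict(), which is what _save_state()/_load_state() further down
# rely on.
#
#     state = PipelineState(last_commit="abc123", total_chunks=42)
#     payload = json.dumps(state.to_dict(), indent=2)
#     restored = PipelineState.from_dict(json.loads(payload))
#     assert restored.last_commit == "abc123" and restored.total_chunks == 42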


@dataclass
class PipelineStats:
    """Statistics from pipeline execution."""

    total_files: int
    processed_files: int
    failed_files: int
    total_chunks: int
    total_documents: int
    duration_seconds: float
    chunks_per_second: float
    files_per_second: float


class IngestionPipeline:
    """Orchestrates the complete ingestion pipeline.

    This class coordinates:
    1. Repository cloning/updating
    2. Markdown file discovery
    3. Document chunking
    4. Embedding generation
    5. Vector store insertion

    With features:
    - Progress tracking and reporting
    - Resume capability from interruptions
    - Error handling and logging
    - Batch processing for efficiency
    """

    def __init__(
        self,
        repo_manager: HandbookRepoManager | None = None,
        chunker: MarkdownChunker | None = None,
        embedder: Embedder | None = None,
        vector_store: VectorStore | None = None,
        state_file: Path | None = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        logger_instance: logging.Logger | None = None,
    ):
        """Initialize the ingestion pipeline.

        Args:
            repo_manager: Repository manager instance (creates default if None)
            chunker: Markdown chunker instance (creates default if None)
            embedder: Embedder instance (creates default if None)
            vector_store: Vector store instance (creates default if None)
            state_file: Path to state file for resume capability
            batch_size: Number of files to process in each batch
            logger_instance: Logger instance for logging
        """
        self.repo_manager = repo_manager or HandbookRepoManager()
        self.chunker = chunker or MarkdownChunker()
        self.embedder = embedder or Embedder()
        self.vector_store = vector_store or VectorStore()
        self.state_file = state_file or (self.repo_manager.clone_path.parent / DEFAULT_STATE_FILE)
        self.batch_size = batch_size
        self.logger = logger_instance or logger
        self.state = self._load_state()

    def _load_state(self) -> PipelineState:
        """Load pipeline state from disk.

        Returns:
            PipelineState instance (empty if no saved state)
        """
        if not self.state_file.exists():
            self.logger.info("No previous state found, starting fresh")
            return PipelineState()

        try:
            with self.state_file.open(encoding="utf-8") as f:
                data = json.load(f)
            state = PipelineState.from_dict(data)
            self.logger.info(
                "Loaded previous state: %d processed files, %d failed files",
                len(state.processed_files),
                len(state.failed_files),
            )
            return state
        except (OSError, json.JSONDecodeError) as e:
            self.logger.warning("Failed to load state file: %s. Starting fresh.", e)
            return PipelineState()

    def _save_state(self) -> None:
        """Save current pipeline state to disk."""
        self.state.last_update_time = datetime.now(timezone.utc).isoformat()
        try:
            self.state_file.parent.mkdir(parents=True, exist_ok=True)
            with self.state_file.open("w", encoding="utf-8") as f:
                json.dump(self.state.to_dict(), f, indent=2)
            self.logger.debug("Saved pipeline state to %s", self.state_file)
        except OSError:
            self.logger.exception("Failed to save state file")

    def _discover_markdown_files(self, repo_path: Path) -> list[Path]:
        """Discover all markdown files in the repository.

        Args:
            repo_path: Path to the repository

        Returns:
            List of markdown file paths
        """
        self.logger.info("Discovering markdown files in %s", repo_path)
        markdown_files = list(repo_path.rglob("*.md"))
        self.logger.info("Found %d markdown files", len(markdown_files))
        return markdown_files

    def _process_file(self, file_path: Path) -> list[Chunk]:
        """Process a single markdown file into chunks.

        Args:
            file_path: Path to the markdown file

        Returns:
            List of chunks from the file

        Raises:
            Exception: If processing fails
        """
        self.logger.debug("Processing file: %s", file_path)
        try:
            chunks = self.chunker.chunk_file(file_path)
            self.logger.debug("Generated %d chunks from %s", len(chunks), file_path)
            return chunks
        except Exception:
            self.logger.exception("Failed to process file %s", file_path)
            raise

    def _process_batch(
        self,
        files: list[Path],
        progress_callback: Callable[[int, int, str], None] | None = None,
    ) -> tuple[int, int]:
        """Process a batch of markdown files.

        Args:
            files: List of file paths to process
            progress_callback: Optional callback(current, total, status_msg) for progress updates

        Returns:
            Tuple of (successful_count, failed_count)
        """
        successful = 0
        failed = 0
        total_batch_chunks = 0

        for i, file_path in enumerate(files):
            # Skip if already processed
            file_str = str(file_path.relative_to(self.repo_manager.clone_path))
            if file_str in self.state.processed_files:
                self.logger.debug("Skipping already processed file: %s", file_str)
                successful += 1
                continue

            try:
                # Process file into chunks
                chunks = self._process_file(file_path)
                if not chunks:
                    self.logger.warning("No chunks generated from %s", file_str)
                    self.state.processed_files.append(file_str)
                    successful += 1
                    continue

                # Extract content and metadata for vector store
                documents = [chunk.content for chunk in chunks]
                metadatas = [chunk.metadata.to_dict() for chunk in chunks]
                ids = [chunk.metadata.chunk_id for chunk in chunks]

                # Generate embeddings and store
                embeddings = self.embedder.embed(documents, show_progress=False)
                self.vector_store.add_documents(
                    documents=documents,
                    metadatas=metadatas,
                    ids=ids,
                    embeddings=embeddings,
                )

                # Update state
                self.state.processed_files.append(file_str)
                self.state.total_chunks += len(chunks)
                self.state.total_documents += len(chunks)
                total_batch_chunks += len(chunks)
                successful += 1

                if progress_callback:
                    progress_callback(
                        i + 1,
                        len(files),
                        f"Processed {file_str} ({len(chunks)} chunks)",
                    )

            except Exception as e:
                self.logger.exception("Failed to process file %s", file_str)
                self.state.failed_files[file_str] = str(e)
                failed += 1

                if progress_callback:
                    progress_callback(
                        i + 1,
                        len(files),
                        f"Failed to process {file_str}",
                    )

        self.logger.info(
            "Batch complete: %d successful, %d failed, %d chunks added",
            successful,
            failed,
            total_batch_chunks,
        )
        return successful, failed

    def run(  # noqa: PLR0912, PLR0915
        self,
        force_reclone: bool = False,
        incremental: bool = True,
        progress_callback: Callable[[int, int, str], None] | None = None,
    ) -> PipelineStats:
        """Run the complete ingestion pipeline.

        Args:
            force_reclone: If True, remove and re-clone the repository
            incremental: If True, only process changed files (requires previous state)
            progress_callback: Optional callback(current, total, status_msg) for progress

        Returns:
            PipelineStats with execution statistics

        Raises:
            RuntimeError: If pipeline fails
        """
        start_time = datetime.now(timezone.utc)
        self.state.start_time = start_time.isoformat()
        self.logger.info("Starting ingestion pipeline")

        try:
            # Step 1: Clone or update repository
            if progress_callback:
                progress_callback(0, 100, "Cloning/updating repository...")

            if not self.repo_manager.clone_path.exists() or force_reclone:
                self.logger.info("Cloning repository...")
                self.repo_manager.clone_handbook(force=force_reclone)
            else:
                self.logger.info("Updating repository...")
                self.repo_manager.update_repository()

            current_commit = self.repo_manager.get_current_commit()
            if not current_commit:
                msg = "Failed to get current commit"
                raise RuntimeError(msg)

            # Step 2: Discover files to process
            if progress_callback:
                progress_callback(10, 100, "Discovering markdown files...")

            all_files = self._discover_markdown_files(self.repo_manager.clone_path)

            # Filter files for incremental processing
            if incremental and self.state.last_commit:
                changed_files = self.repo_manager.get_changed_files(self.state.last_commit)
                if changed_files is not None:
                    files_to_process = [
                        f
                        for f in all_files
                        if str(f.relative_to(self.repo_manager.clone_path)) in changed_files
                    ]
                    self.logger.info(
                        "Incremental mode: processing %d changed files out of %d total",
                        len(files_to_process),
                        len(all_files),
                    )
                else:
                    self.logger.warning("Failed to get changed files, processing all files")
                    files_to_process = all_files
            else:
                files_to_process = all_files
                self.logger.info("Full mode: processing all %d files", len(all_files))

            # Step 3: Process files in batches
            if progress_callback:
                progress_callback(20, 100, f"Processing {len(files_to_process)} files...")

            total_successful = 0
            total_failed = 0

            for batch_start in range(0, len(files_to_process), self.batch_size):
                batch_end = min(batch_start + self.batch_size, len(files_to_process))
                batch = files_to_process[batch_start:batch_end]

                self.logger.info(
                    "Processing batch %d-%d of %d files",
                    batch_start + 1,
                    batch_end,
                    len(files_to_process),
                )

                def make_callback(start: int) -> Callable[[int, int, str], None] | None:
                    if progress_callback is None:
                        return None
                    return lambda c, _t, m: progress_callback(
                        20 + int((start + c) / len(files_to_process) * 70),
                        100,
                        m,
                    )

                successful, failed = self._process_batch(
                    batch,
                    progress_callback=make_callback(batch_start),
                )
                total_successful += successful
                total_failed += failed

                # Save state after each batch
                self._save_state()

            # Step 4: Finalize
            self.state.last_commit = current_commit
            self.state.completed = True
            self._save_state()
            self.repo_manager.save_metadata(current_commit)

            if progress_callback:
                progress_callback(100, 100, "Pipeline complete!")

            # Calculate statistics
            end_time = datetime.now(timezone.utc)
            duration = (end_time - start_time).total_seconds()

            stats = PipelineStats(
                total_files=len(files_to_process),
                processed_files=total_successful,
                failed_files=total_failed,
                total_chunks=self.state.total_chunks,
                total_documents=self.state.total_documents,
                duration_seconds=duration,
                chunks_per_second=(self.state.total_chunks / duration if duration > 0 else 0),
                files_per_second=total_successful / duration if duration > 0 else 0,
            )

            self.logger.info("Pipeline completed successfully")
            self.logger.info(" Files processed: %d", stats.processed_files)
            self.logger.info(" Files failed: %d", stats.failed_files)
            self.logger.info(" Total chunks: %d", stats.total_chunks)
            self.logger.info(" Duration: %.2f seconds", stats.duration_seconds)
            self.logger.info(
                " Throughput: %.2f files/sec, %.2f chunks/sec",
                stats.files_per_second,
                stats.chunks_per_second,
            )

            return stats

        except Exception as e:
            self.logger.exception("Pipeline failed")
            self._save_state()
            msg = f"Pipeline execution failed: {e}"
            raise RuntimeError(msg) from e

    def reset(self, keep_repo: bool = True) -> None:
        """Reset pipeline state and optionally vector store.

        Args:
            keep_repo: If True, keep the cloned repository, else remove it
        """
        self.logger.info("Resetting pipeline state")

        # Reset vector store
        self.vector_store.reset()
        self.logger.info("Vector store reset")

        # Remove state file
        if self.state_file.exists():
            self.state_file.unlink()
            self.logger.info("Removed state file")

        # Remove repository if requested
        if not keep_repo and self.repo_manager.clone_path.exists():
            shutil.rmtree(self.repo_manager.clone_path)
            self.logger.info("Removed repository")

        # Reset internal state
        self.state = PipelineState()
        self.logger.info("Pipeline reset complete")

    def get_status(self) -> dict[str, Any]:
        """Get current pipeline status.

        Returns:
            Dictionary with current status information
        """
        return {
            "state": self.state.to_dict(),
            "repo_path": str(self.repo_manager.clone_path),
            "repo_exists": self.repo_manager.clone_path.exists(),
            "vector_store_count": self.vector_store.get_document_count(),
            "vector_store_collection": self.vector_store.collection_name,
        }
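

# Minimal usage sketch (illustrative only; assumes the default component
# configuration is usable as-is, and the callback name and log level are
# arbitrary). It drives a full, non-incremental ingestion and prints the
# resulting statistics.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def _report(current: int, total: int, message: str) -> None:
        # Matches run()'s progress_callback(current, total, status_msg) contract.
        print(f"[{current}/{total}] {message}")

    pipeline = IngestionPipeline()
    stats = pipeline.run(incremental=False, progress_callback=_report)
    print(
        f"Processed {stats.processed_files} files "
        f"({stats.total_chunks} chunks) in {stats.duration_seconds:.1f}s"
    )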