# Source code for thoth.ingestion.pipeline

"""Ingestion pipeline orchestrator for Thoth.

This module provides the main pipeline coordinator that integrates all ingestion
components (repo manager, chunker, embedder, vector store) into a complete
end-to-end ingestion workflow with progress tracking, error handling, and resume logic.
"""

from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
import json
import logging
import os
from pathlib import Path
import shutil
from typing import Any

from thoth.ingestion.chunker import Chunk, DocumentChunker, MarkdownChunker
from thoth.ingestion.gcs_repo_sync import GCSRepoSync
from thoth.ingestion.parsers import ParserFactory
from thoth.ingestion.repo_manager import HandbookRepoManager
from thoth.shared.embedder import Embedder
from thoth.shared.sources.config import SourceConfig
from thoth.shared.utils.logger import setup_logger
from thoth.shared.vector_store import VectorStore

logger = setup_logger(__name__)

# Constants
DEFAULT_STATE_FILE = "pipeline_state.json"
DEFAULT_BATCH_SIZE = 50  # Process files in batches


[docs] @dataclass class PipelineState: """Mutable state for a single pipeline run (resume and progress tracking). Persisted to pipeline_state.json so runs can resume after interruption. Tracks last processed commit, file lists, chunk counts, and completion flag. Attributes: last_commit: Last Git commit processed (for incremental sync). processed_files: List of file paths successfully processed. failed_files: Dict of file_path -> error_message for failed files. total_chunks: Total chunks created this run. total_documents: Total documents (chunks) added to the vector store. start_time: ISO timestamp when run started. last_update_time: ISO timestamp of last state save. completed: True when the run finished without error. """ last_commit: str | None = None processed_files: list[str] = field(default_factory=list) failed_files: dict[str, str] = field(default_factory=dict) # file_path -> error_message total_chunks: int = 0 total_documents: int = 0 start_time: str | None = None last_update_time: str | None = None completed: bool = False
[docs] def to_dict(self) -> dict[str, Any]: """Serialize state to a dict for JSON persistence. Returns: Dict with last_commit, processed_files, failed_files, total_chunks, total_documents, start_time, last_update_time, completed. """ return { "last_commit": self.last_commit, "processed_files": self.processed_files, "failed_files": self.failed_files, "total_chunks": self.total_chunks, "total_documents": self.total_documents, "start_time": self.start_time, "last_update_time": self.last_update_time, "completed": self.completed, }
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> "PipelineState": """Deserialize state from a dict (e.g., from pipeline_state.json). Args: data: Dict with keys matching PipelineState attributes. Returns: PipelineState instance with restored values. """ return cls( last_commit=data.get("last_commit"), processed_files=data.get("processed_files", []), failed_files=data.get("failed_files", {}), total_chunks=data.get("total_chunks", 0), total_documents=data.get("total_documents", 0), start_time=data.get("start_time"), last_update_time=data.get("last_update_time"), completed=data.get("completed", False), )
@dataclass
class PipelineStats:
    """Immutable-by-convention summary of a finished pipeline run.

    Produced by :meth:`IngestionPipeline.run` once the run ends; all counts
    and throughput figures are computed at that point and used for logging
    and monitoring.

    Attributes:
        total_files: Files discovered for processing.
        processed_files: Files handled successfully.
        failed_files: Files that raised errors.
        total_chunks: Chunks created and stored.
        total_documents: Documents (chunks) in the vector store after the run.
        duration_seconds: Wall-clock elapsed time.
        chunks_per_second: Throughput as chunks / duration.
        files_per_second: Throughput as files / duration.
    """

    total_files: int
    processed_files: int
    failed_files: int
    total_chunks: int
    total_documents: int
    duration_seconds: float
    chunks_per_second: float
    files_per_second: float
class IngestionPipeline:
    """Orchestrates the complete ingestion pipeline.

    This class coordinates:
    1. Repository cloning/updating
    2. Markdown file discovery
    3. Document chunking
    4. Embedding generation
    5. Vector store insertion

    With features:
    - Progress tracking and reporting
    - Resume capability from interruptions
    - Error handling and logging
    - Batch processing for efficiency
    """

    def __init__(
        self,
        repo_manager: HandbookRepoManager | None = None,
        chunker: MarkdownChunker | None = None,
        embedder: Embedder | None = None,
        vector_store: VectorStore | None = None,
        state_file: Path | None = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        logger_instance: logging.Logger | logging.LoggerAdapter | None = None,
        collection_name: str = "thoth_documents",
        source_config: SourceConfig | None = None,
    ):
        """Initialize the ingestion pipeline.

        Args:
            repo_manager: Repository manager instance (creates default if None)
            chunker: Markdown chunker instance (creates default if None)
            embedder: Embedder instance (creates default if None)
            vector_store: Vector store instance (creates default if None)
            state_file: Path to state file for resume capability
            batch_size: Number of files to process in each batch
            logger_instance: Logger instance for logging
            collection_name: Name of the vector store table (collection) to use;
                overridden by ``source_config.collection_name`` when a source
                config is provided
            source_config: Source configuration for multi-source support
        """
        self.logger = logger_instance or logger
        self.source_config = source_config
        self.repo_manager = repo_manager or HandbookRepoManager(logger=self.logger)
        self.chunker = chunker or MarkdownChunker(logger=self.logger)
        self.document_chunker = DocumentChunker(logger=self.logger)  # Generalized chunker for all formats
        self.embedder = embedder or Embedder(logger_instance=self.logger)

        # Use collection name from source config if provided
        self.collection_name = source_config.collection_name if source_config else collection_name
        self.source_name = source_config.name if source_config else ""

        # Initialize GCS repo sync if in Cloud Run environment (both env vars
        # must be present; either alone keeps the local-clone behavior).
        self.gcs_repo_sync = None
        gcs_bucket = os.getenv("GCS_BUCKET_NAME")
        gcs_project = os.getenv("GCP_PROJECT_ID")
        if gcs_bucket and gcs_project:
            # Cloud Run: use GCS for repository storage
            self.logger.info("Cloud Run detected - using GCS for repository sync")
            repo_url = os.getenv("GITLAB_BASE_URL", "https://gitlab.com") + "/gitlab-com/content-sites/handbook.git"
            # Use source-specific GCS prefix and local path
            gcs_prefix = source_config.gcs_prefix if source_config else "handbook"
            local_path_name = source_config.name if source_config else "handbook"
            self.gcs_repo_sync = GCSRepoSync(
                bucket_name=gcs_bucket,
                repo_url=repo_url,
                gcs_prefix=gcs_prefix,
                local_path=Path(f"/tmp/{local_path_name}"),  # nosec B108 - Cloud Run requires /tmp
                logger_instance=self.logger,
            )

        # Auto-configure vector store for Cloud Run environment
        if vector_store is None:
            if gcs_bucket and gcs_project:
                # Cloud Run: use /tmp for local cache and sync with GCS
                self.logger.info(f"Cloud Run detected - using GCS bucket: {gcs_bucket}")
                self.vector_store = VectorStore(
                    persist_directory="/tmp/lancedb",  # nosec B108 - Cloud Run uses GCS URI
                    # BUGFIX: use self.collection_name (honors source_config)
                    # instead of the raw collection_name parameter, which
                    # silently ignored source_config.collection_name.
                    collection_name=self.collection_name,
                    gcs_bucket_name=gcs_bucket,
                    gcs_project_id=gcs_project,
                    logger_instance=self.logger,
                )
            else:
                # Local: use default lancedb directory (same collection fix applies)
                self.vector_store = VectorStore(collection_name=self.collection_name, logger_instance=self.logger)
        else:
            self.vector_store = vector_store

        self.state_file = state_file or (self.repo_manager.clone_path.parent / DEFAULT_STATE_FILE)
        self.batch_size = batch_size
        self.state = self._load_state()
    @property
    def effective_repo_path(self) -> Path:
        """Get the effective repository path (GCS local path or repo_manager clone path)."""
        # GCS sync takes precedence when configured (Cloud Run environment).
        if self.gcs_repo_sync:
            return self.gcs_repo_sync.local_path
        return self.repo_manager.clone_path

    def _load_state(self) -> PipelineState:
        """Load pipeline state from disk.

        Returns:
            PipelineState instance (empty if no saved state)
        """
        if not self.state_file.exists():
            self.logger.info("No previous state found, starting fresh")
            return PipelineState()
        try:
            with self.state_file.open(encoding="utf-8") as f:
                data = json.load(f)
            state = PipelineState.from_dict(data)
            self.logger.info(
                "Loaded previous state: %d processed files, %d failed files",
                len(state.processed_files),
                len(state.failed_files),
            )
            return state
        except (OSError, json.JSONDecodeError) as e:
            # A corrupt or unreadable state file is not fatal: warn and start
            # from a blank state rather than aborting the run.
            self.logger.warning("Failed to load state file: %s. Starting fresh.", e)
            return PipelineState()

    def _save_state(self) -> None:
        """Save current pipeline state to disk."""
        # Stamp the save time before serializing so the file records it.
        self.state.last_update_time = datetime.now(UTC).isoformat()
        try:
            self.state_file.parent.mkdir(parents=True, exist_ok=True)
            with self.state_file.open("w", encoding="utf-8") as f:
                json.dump(self.state.to_dict(), f, indent=2)
            self.logger.debug("Saved pipeline state to %s", self.state_file)
        except OSError:
            # Best-effort persistence: log and continue, the in-memory state
            # is still valid for the rest of the run.
            self.logger.exception("Failed to save state file")

    def _discover_markdown_files(self, repo_path: Path) -> list[Path]:
        """Discover all markdown files in the repository.

        Args:
            repo_path: Path to the repository

        Returns:
            List of markdown file paths
        """
        self.logger.info("Discovering markdown files in %s", repo_path)
        # Recursive glob over the whole tree; only *.md is matched here.
        markdown_files = list(repo_path.rglob("*.md"))
        self.logger.info("Found %d markdown files", len(markdown_files))
        return markdown_files

    def _discover_source_files(self, source_path: Path) -> list[Path]:
        """Discover all supported files for the configured source.

        This method discovers files based on the source_config's supported
        formats. If no source_config is set, falls back to markdown-only
        discovery.

        Args:
            source_path: Path to search for files

        Returns:
            List of file paths matching supported formats
        """
        if not self.source_config:
            # Fallback to markdown-only for backward compatibility
            return self._discover_markdown_files(source_path)
        self.logger.info(
            "Discovering files in %s for source '%s' (formats: %s)",
            source_path,
            self.source_config.name,
            self.source_config.supported_formats,
        )
        all_files: list[Path] = []
        for ext in self.source_config.supported_formats:
            # Accept extensions with or without a leading dot ("md" or ".md").
            pattern = f"*{ext}" if ext.startswith(".") else f"*.{ext}"
            files = list(source_path.rglob(pattern))
            all_files.extend(files)
            self.logger.debug("Found %d %s files", len(files), ext)
        self.logger.info(
            "Found %d total files for source '%s'",
            len(all_files),
            self.source_config.name,
        )
        return all_files
[docs] def get_file_list(self) -> list[str]: """Get list of all markdown files for batch processing. Returns: List of file paths relative to repo root """ # Use GCS sync if in Cloud Run environment if self.gcs_repo_sync: self.logger.info("Using GCS repository sync") if not self.gcs_repo_sync.is_synced(): self.logger.info("Syncing repository from GCS to local...") result = self.gcs_repo_sync.sync_to_local() self.logger.info("Sync result: %s", result) else: self.logger.info("Repository already synced locally") repo_path = self.gcs_repo_sync.get_local_path() else: # Local environment: use traditional git clone clone_path = self.repo_manager.clone_path self.logger.info("Checking if repository exists at: %s", clone_path) if not clone_path.exists(): self.logger.info("Repository not found, cloning...") try: self.repo_manager.clone_handbook() self.logger.info("Repository cloned successfully to: %s", clone_path) except Exception: self.logger.exception("Failed to clone repository") raise else: self.logger.info("Repository already exists at: %s", clone_path) repo_path = self.repo_manager.clone_path # Discover files markdown_files = self._discover_markdown_files(repo_path) # Convert to relative paths return [str(f.relative_to(repo_path)) for f in markdown_files]
    def process_file_batch(
        self,
        start_index: int,
        end_index: int,
        file_list: list[str] | None = None,
    ) -> dict[str, Any]:
        """Process a specific batch of files by index range.

        Args:
            start_index: Starting index in original full file list (for tracking/logging)
            end_index: Ending index in original full file list (for tracking/logging)
            file_list: Pre-sliced file list for this batch. If None, discovers and slices.

        Returns:
            Statistics dictionary with processed/failed counts

        Raises:
            ValueError: If the index range is invalid for the discovered file list.
        """
        self.logger.info("Processing batch %d-%d of files", start_index, end_index)

        # Get file list and slice if not provided
        if file_list is None:
            # Local/direct processing: discover files and slice by indices
            full_file_list = self.get_file_list()
            if start_index < 0 or end_index > len(full_file_list) or start_index >= end_index:
                msg = f"Invalid batch range: {start_index}-{end_index} for {len(full_file_list)} files"
                raise ValueError(msg)
            batch_files = full_file_list[start_index:end_index]
        else:
            # Cloud Tasks: file_list is already pre-sliced for this batch.
            # NOTE: the indices are then used for logging/stats only and are
            # not validated against the provided list.
            batch_files = file_list

        # Determine repo path and download files if using GCS
        if self.gcs_repo_sync:
            # Download ONLY the files in this batch (parallel downloads)
            self.logger.info("Downloading %d files from GCS for this batch", len(batch_files))
            repo_path = self.gcs_repo_sync.download_batch_files(batch_files)
        else:
            # Use local clone for development/testing
            repo_path = self.repo_manager.clone_path

        # Convert relative paths to absolute Path objects under the repo root
        file_paths = [repo_path / f for f in batch_files]

        # Process the batch, timing the work for throughput stats
        start_time = datetime.now(UTC)
        successful, failed = self._process_batch(file_paths)
        end_time = datetime.now(UTC)
        duration = (end_time - start_time).total_seconds()

        stats = {
            "start_index": start_index,
            "end_index": end_index,
            "total_files": len(batch_files),
            "successful": successful,
            "failed": failed,
            "duration_seconds": duration,
        }
        self.logger.info(
            "Batch %d-%d complete: %d successful, %d failed in %.2fs",
            start_index,
            end_index,
            successful,
            failed,
            duration,
        )
        return stats
    def _process_file(self, file_path: Path) -> list[Chunk]:
        """Process a single file into chunks.

        Supports multiple file formats through the parser system:
        - Markdown (.md) - uses MarkdownChunker for structure-aware chunking
        - PDF (.pdf) - extracts text with page markers
        - Text (.txt) - simple text extraction
        - Word (.docx) - extracts paragraphs and tables

        Args:
            file_path: Path to the file

        Returns:
            List of chunks from the file (empty if no parser is available)

        Raises:
            Exception: If processing fails
        """
        self.logger.debug("Processing file: %s", file_path)
        try:
            # Determine if we should use the new parser system
            extension = file_path.suffix.lower()
            if extension in [".md", ".markdown", ".mdown"]:
                # Use existing markdown chunker for backward compatibility
                chunks = self.chunker.chunk_file(file_path)
                # Add source and format metadata
                for chunk in chunks:
                    chunk.metadata.source = self.source_name
                    chunk.metadata.format = "markdown"
            elif ParserFactory.can_parse(file_path):
                # Use parser system for other formats
                parsed_doc = ParserFactory.parse(file_path)
                chunks = self.document_chunker.chunk_document(
                    content=parsed_doc.content,
                    source_path=str(file_path),
                    source=self.source_name,
                    doc_format=parsed_doc.format,
                )
            else:
                # Unknown format: skip rather than fail the whole batch.
                self.logger.warning("No parser available for %s, skipping", file_path)
                return []
            self.logger.debug("Generated %d chunks from %s", len(chunks), file_path)
            return chunks
        except Exception:
            # Log with traceback, then re-raise so the caller can record the
            # failure per-file.
            self.logger.exception("Failed to process file %s", file_path)
            raise

    def _process_batch(
        self,
        files: list[Path],
        progress_callback: Callable[[int, int, str], None] | None = None,
    ) -> tuple[int, int]:
        """Process a batch of markdown files.

        Args:
            files: List of file paths to process
            progress_callback: Optional callback(current, total, status_msg) for progress updates

        Returns:
            Tuple of (successful_count, failed_count)
        """
        successful = 0
        failed = 0
        total_batch_chunks = 0
        for i, file_path in enumerate(files):
            # Skip if already processed (resume support); already-done files
            # still count as successful.
            file_str = str(file_path.relative_to(self.effective_repo_path))
            if file_str in self.state.processed_files:
                self.logger.debug("Skipping already processed file: %s", file_str)
                successful += 1
                continue
            try:
                # Process file into chunks
                chunks = self._process_file(file_path)
                if not chunks:
                    # Empty/unparseable files are recorded as processed so
                    # they are not retried on resume.
                    self.logger.warning("No chunks generated from %s", file_str)
                    self.state.processed_files.append(file_str)
                    successful += 1
                    continue

                # Extract content and metadata for vector store
                documents = [chunk.content for chunk in chunks]
                metadatas = [chunk.metadata.to_dict() for chunk in chunks]
                ids = [chunk.metadata.chunk_id for chunk in chunks]

                # Sanitize metadatas to ensure LanceDB compatibility
                # Convert any list values to comma-separated strings
                # NOTE(review): this closure is re-defined on every loop
                # iteration; hoisting it to a module-level helper would be a
                # harmless cleanup.
                def sanitize_metadata(meta: dict[str, Any]) -> dict[str, Any]:
                    """Ensure all metadata values are LanceDB-compatible (str, int, float, bool)."""
                    sanitized: dict[str, Any] = {}
                    for key, value in meta.items():
                        if isinstance(value, list):
                            sanitized[key] = ", ".join(str(v) for v in value)
                        elif isinstance(value, (str, int, float, bool)):
                            sanitized[key] = value
                        elif value is None:
                            sanitized[key] = ""
                        else:
                            sanitized[key] = str(value)
                    return sanitized

                metadatas = [sanitize_metadata(m) for m in metadatas]

                # Generate embeddings and store
                embeddings = self.embedder.embed(documents, show_progress=False)
                self.vector_store.add_documents(
                    documents=documents,
                    metadatas=metadatas,
                    ids=ids,
                    embeddings=embeddings,
                )

                # Update state
                self.state.processed_files.append(file_str)
                self.state.total_chunks += len(chunks)
                self.state.total_documents += len(chunks)
                total_batch_chunks += len(chunks)
                successful += 1
                if progress_callback:
                    progress_callback(
                        i + 1,
                        len(files),
                        f"Processed {file_str} ({len(chunks)} chunks)",
                    )
            except Exception as e:
                # Per-file failure: record the error and keep going with the
                # rest of the batch.
                self.logger.exception("Failed to process file %s", file_str)
                self.state.failed_files[file_str] = str(e)
                failed += 1
                if progress_callback:
                    progress_callback(
                        i + 1,
                        len(files),
                        f"Failed to process {file_str}",
                    )
        self.logger.info(
            "Batch complete: %d successful, %d failed, %d chunks added",
            successful,
            failed,
            total_batch_chunks,
        )
        return successful, failed

    def _handle_deleted_files(self, deleted_files: list[str]) -> tuple[int, int]:
        """Handle deleted files by removing their documents from vector store.

        Args:
            deleted_files: List of deleted file paths (relative to repo)

        Returns:
            Tuple of (successful_count, failed_count)
        """
        successful = 0
        failed = 0
        for file_path in deleted_files:
            try:
                # Delete all documents associated with this file
                deleted_count = self.vector_store.delete_by_file_path(file_path)
                # Remove from processed files list
                if file_path in self.state.processed_files:
                    self.state.processed_files.remove(file_path)
                # Update statistics, ensuring counters do not go negative
                self.state.total_chunks = max(0, self.state.total_chunks - deleted_count)
                self.state.total_documents = max(0, self.state.total_documents - deleted_count)
                self.logger.info(
                    "Deleted %d documents for removed file: %s",
                    deleted_count,
                    file_path,
                )
                successful += 1
            except Exception as e:
                self.logger.exception("Failed to handle deleted file %s", file_path)
                self.state.failed_files[file_path] = f"Delete failed: {e}"
                failed += 1
        return successful, failed

    def _handle_modified_files(
        self,
        modified_files: list[Path],
        progress_callback: Callable[[int, int, str], None] | None = None,
    ) -> tuple[int, int]:
        """Handle modified files by updating their documents in vector store.

        Args:
            modified_files: List of modified file paths
            progress_callback: Optional callback for progress updates

        Returns:
            Tuple of (successful_count, failed_count)
        """
        successful = 0
        failed = 0
        total_chunks = 0
        for i, file_path in enumerate(modified_files):
            file_str = str(file_path.relative_to(self.effective_repo_path))
            try:
                # Step 1: Delete old documents for this file
                deleted_count = self.vector_store.delete_by_file_path(file_str)
                self.logger.debug(
                    "Deleted %d old documents for modified file: %s",
                    deleted_count,
                    file_str,
                )
                # Step 2: Process the updated file
                chunks = self._process_file(file_path)
                if not chunks:
                    self.logger.warning("No chunks generated from modified file %s", file_str)
                    # Still mark as successful since we deleted old content
                    if file_str not in self.state.processed_files:
                        self.state.processed_files.append(file_str)
                    successful += 1
                    # Update statistics to account for deleted chunks when no new chunks are added
                    # NOTE(review): unlike _handle_deleted_files these
                    # decrements are not clamped with max(0, ...) — counters
                    # could in principle go negative; confirm intended.
                    self.state.total_chunks -= deleted_count
                    self.state.total_documents -= deleted_count
                    continue
                # Step 3: Add new documents
                # NOTE(review): metadatas are not run through the
                # sanitize_metadata step used in _process_batch — verify
                # list-valued metadata cannot reach this path.
                documents = [chunk.content for chunk in chunks]
                metadatas = [chunk.metadata.to_dict() for chunk in chunks]
                ids = [chunk.metadata.chunk_id for chunk in chunks]
                embeddings = self.embedder.embed(documents, show_progress=False)
                self.vector_store.add_documents(
                    documents=documents,
                    metadatas=metadatas,
                    ids=ids,
                    embeddings=embeddings,
                )
                # Update state
                if file_str not in self.state.processed_files:
                    self.state.processed_files.append(file_str)
                # Update chunk counts (net change)
                self.state.total_chunks += len(chunks) - deleted_count
                self.state.total_documents += len(chunks) - deleted_count
                total_chunks += len(chunks)
                successful += 1
                if progress_callback:
                    progress_callback(
                        i + 1,
                        len(modified_files),
                        f"Updated {file_str} ({len(chunks)} chunks)",
                    )
            except Exception as e:
                self.logger.exception("Failed to handle modified file %s", file_str)
                self.state.failed_files[file_str] = f"Modify failed: {e}"
                failed += 1
                if progress_callback:
                    progress_callback(
                        i + 1,
                        len(modified_files),
                        f"Failed to update {file_str}",
                    )
        self.logger.info(
            "Modified files processed: %d successful, %d failed, %d new chunks",
            successful,
            failed,
            total_chunks,
        )
        return successful, failed
    def run(  # noqa: PLR0912, PLR0915
        self,
        force_reclone: bool = False,
        incremental: bool = True,
        progress_callback: Callable[[int, int, str], None] | None = None,
    ) -> PipelineStats:
        """Run the complete ingestion pipeline.

        Args:
            force_reclone: If True, force re-sync from GCS (or re-clone if no GCS)
            incremental: If True, only process files not already in state
            progress_callback: Optional callback(current, total, status_msg) for progress

        Returns:
            PipelineStats with execution statistics

        Raises:
            RuntimeError: If pipeline fails
        """
        start_time = datetime.now(UTC)
        self.state.start_time = start_time.isoformat()
        self.logger.info("Starting ingestion pipeline")
        try:
            # Step 1: Get repository files (from GCS or local clone)
            if progress_callback:
                progress_callback(0, 100, "Syncing repository...")
            if self.gcs_repo_sync:
                # Cloud Run: sync from GCS (never clone directly)
                self.logger.info("Syncing repository from GCS...")
                sync_result = self.gcs_repo_sync.sync_to_local(force=force_reclone)
                self.logger.info(
                    "GCS sync complete: %s (%s files)",
                    sync_result.get("status"),
                    sync_result.get("files_downloaded", sync_result.get("file_count", "?")),
                )
                repo_path = self.gcs_repo_sync.get_local_path()
                # GCS sync doesn't have git history, use a placeholder commit
                current_commit = "gcs-sync"
            else:
                # Local environment: use traditional git clone
                if not self.repo_manager.is_valid_repo() or force_reclone:
                    self.logger.info("Cloning repository...")
                    self.repo_manager.clone_handbook(force=force_reclone)
                else:
                    self.logger.info("Updating repository...")
                    self.repo_manager.update_repository()
                commit_or_none = self.repo_manager.get_current_commit()
                if not commit_or_none:
                    msg = "Failed to get current commit"
                    raise RuntimeError(msg)
                current_commit = commit_or_none
                repo_path = self.repo_manager.clone_path

            # Step 2: Discover files to process
            if progress_callback:
                progress_callback(10, 100, "Discovering markdown files...")
            all_files = self._discover_markdown_files(repo_path)

            # Initialize file lists
            files_to_process: list[Path] = []
            deleted_files: list[str] = []
            added_files_list: list[Path] = []
            modified_files_list: list[Path] = []

            # Filter files for incremental processing
            # For GCS mode: use file-based incremental (skip already processed files)
            # For git mode: use commit-based incremental (diff against last commit)
            use_git_incremental = (
                incremental
                and self.state.last_commit
                and self.state.last_commit != "gcs-sync"
                and not self.gcs_repo_sync
            )
            if use_git_incremental and self.state.last_commit:
                # Git-based incremental: use commit diff
                file_changes = self.repo_manager.get_file_changes(self.state.last_commit)
                if file_changes is not None:
                    # Filter for markdown files only
                    added_md = [f for f in file_changes["added"] if f.endswith(".md")]
                    modified_md = [f for f in file_changes["modified"] if f.endswith(".md")]
                    deleted_md = [f for f in file_changes["deleted"] if f.endswith(".md")]
                    # Convert to Path objects for added and modified; skip
                    # paths the diff reports but that no longer exist on disk.
                    added_files_list = [repo_path / f for f in added_md if (repo_path / f).exists()]
                    modified_files_list = [repo_path / f for f in modified_md if (repo_path / f).exists()]
                    files_to_process = added_files_list + modified_files_list
                    deleted_files = deleted_md
                    self.logger.info(
                        "Git incremental mode: %d added, %d modified, %d deleted",
                        len(added_files_list),
                        len(modified_files_list),
                        len(deleted_files),
                    )
                else:
                    # Diff failed: fall back to a full pass over all files.
                    self.logger.warning("Failed to get file changes, processing all files")
                    files_to_process = all_files
                    added_files_list = all_files
                    modified_files_list = []
            elif incremental and self.state.processed_files:
                # File-based incremental: skip already processed files
                # This works for both GCS mode and when git diff fails
                processed_set = set(self.state.processed_files)
                added_files_list = [f for f in all_files if str(f.relative_to(repo_path)) not in processed_set]
                files_to_process = added_files_list
                modified_files_list = []
                self.logger.info(
                    "File-based incremental: %d new files to process (%d already done)",
                    len(added_files_list),
                    len(processed_set),
                )
            else:
                files_to_process = all_files
                added_files_list = all_files
                modified_files_list = []
                self.logger.info("Full mode: processing all %d files", len(all_files))

            # Step 3: Handle file changes incrementally
            if progress_callback:
                progress_callback(20, 100, "Processing file changes...")
            total_successful = 0
            total_failed = 0

            # Step 3a: Handle deleted files
            if deleted_files:
                self.logger.info("Processing %d deleted files", len(deleted_files))
                if progress_callback:
                    progress_callback(25, 100, f"Removing {len(deleted_files)} deleted files...")
                deleted_success, deleted_failed = self._handle_deleted_files(deleted_files)
                total_successful += deleted_success
                total_failed += deleted_failed
                self._save_state()

            # Step 3b: Handle modified files (update)
            if incremental and self.state.last_commit and modified_files_list:
                self.logger.info("Processing %d modified files", len(modified_files_list))
                for batch_start in range(0, len(modified_files_list), self.batch_size):
                    batch_end = min(batch_start + self.batch_size, len(modified_files_list))
                    batch = modified_files_list[batch_start:batch_end]
                    self.logger.info(
                        "Processing modified batch %d-%d of %d files",
                        batch_start + 1,
                        batch_end,
                        len(modified_files_list),
                    )

                    # Progress for modified files occupies the 25..50 band of
                    # the 0..100 scale; `start` is bound per batch to avoid
                    # the late-binding closure pitfall.
                    def make_modified_callback(
                        start: int,
                    ) -> Callable[[int, int, str], None] | None:
                        if progress_callback is None:
                            return None
                        total_changes = len(deleted_files) + len(modified_files_list) + len(added_files_list)
                        base_progress = 25 + int(len(deleted_files) / max(total_changes, 1) * 30)
                        return lambda c, _t, m: progress_callback(
                            base_progress + int((start + c) / len(modified_files_list) * 25),
                            100,
                            m,
                        )

                    successful, failed = self._handle_modified_files(
                        batch,
                        progress_callback=make_modified_callback(batch_start),
                    )
                    total_successful += successful
                    total_failed += failed
                    self._save_state()

            # Step 3c: Handle added files (new) - process normally
            if added_files_list:
                self.logger.info("Processing %d added/new files", len(added_files_list))
                for batch_start in range(0, len(added_files_list), self.batch_size):
                    batch_end = min(batch_start + self.batch_size, len(added_files_list))
                    batch = added_files_list[batch_start:batch_end]
                    self.logger.info(
                        "Processing added batch %d-%d of %d files",
                        batch_start + 1,
                        batch_end,
                        len(added_files_list),
                    )

                    # Progress band depends on mode: incremental runs map
                    # added files after the deleted/modified share; full runs
                    # use the 20..90 band directly.
                    def make_added_callback(
                        start: int,
                    ) -> Callable[[int, int, str], None] | None:
                        if progress_callback is None:
                            return None
                        if incremental and self.state.last_commit:
                            total_changes = len(deleted_files) + len(modified_files_list) + len(added_files_list)
                            base_progress = 25 + int(
                                (len(deleted_files) + len(modified_files_list)) / max(total_changes, 1) * 55
                            )
                            return lambda c, _t, m: progress_callback(
                                base_progress + int((start + c) / len(added_files_list) * 20),
                                100,
                                m,
                            )
                        # Full mode
                        return lambda c, _t, m: progress_callback(
                            20 + int((start + c) / len(added_files_list) * 70),
                            100,
                            m,
                        )

                    successful, failed = self._process_batch(
                        batch,
                        progress_callback=make_added_callback(batch_start),
                    )
                    total_successful += successful
                    total_failed += failed
                    self._save_state()

            # Step 4: Finalize
            self.state.last_commit = current_commit
            self.state.completed = True
            self._save_state()
            self.repo_manager.save_metadata(current_commit)
            if progress_callback:
                progress_callback(100, 100, "Pipeline complete!")

            # Calculate statistics
            end_time = datetime.now(UTC)
            duration = (end_time - start_time).total_seconds()
            # NOTE(review): self.state.last_commit was just overwritten above,
            # so this condition reflects the NEW commit, not the one the run
            # started from — confirm this is the intended accounting.
            total_files_processed = (
                len(added_files_list) + len(modified_files_list) + len(deleted_files)
                if incremental and self.state.last_commit
                else len(files_to_process)
            )
            stats = PipelineStats(
                total_files=total_files_processed,
                processed_files=total_successful,
                failed_files=total_failed,
                total_chunks=self.state.total_chunks,
                total_documents=self.state.total_documents,
                duration_seconds=duration,
                chunks_per_second=(self.state.total_chunks / duration if duration > 0 else 0),
                files_per_second=total_successful / duration if duration > 0 else 0,
            )
            self.logger.info("Pipeline completed successfully")
            self.logger.info(" Files processed: %d", stats.processed_files)
            self.logger.info(" Files failed: %d", stats.failed_files)
            self.logger.info(" Total chunks: %d", stats.total_chunks)
            self.logger.info(" Duration: %.2f seconds", stats.duration_seconds)
            self.logger.info(
                " Throughput: %.2f files/sec, %.2f chunks/sec",
                stats.files_per_second,
                stats.chunks_per_second,
            )
            return stats
        except Exception as e:
            # Persist whatever progress was made, then surface one uniform
            # error type to callers.
            self.logger.exception("Pipeline failed")
            self._save_state()
            msg = f"Pipeline execution failed: {e}"
            raise RuntimeError(msg) from e
[docs] def reset(self, keep_repo: bool = True) -> None: """Reset pipeline state and optionally vector store. Args: keep_repo: If True, keep the cloned repository, else remove it """ self.logger.info("Resetting pipeline state") # Reset vector store self.vector_store.reset() self.logger.info("Vector store reset") # Remove state file if self.state_file.exists(): self.state_file.unlink() self.logger.info("Removed state file") # Remove repository if requested if not keep_repo and self.repo_manager.clone_path.exists(): shutil.rmtree(self.repo_manager.clone_path) self.logger.info("Removed repository") # Reset internal state self.state = PipelineState() self.logger.info("Pipeline reset complete")
[docs] def get_status(self) -> dict[str, Any]: """Get current pipeline status. Returns: Dictionary with current status information """ return { "state": self.state.to_dict(), "repo_path": str(self.repo_manager.clone_path), "repo_exists": self.repo_manager.clone_path.exists(), "vector_store_count": self.vector_store.get_document_count(), "vector_store_collection": self.vector_store.collection_name, }