thoth.ingestion.pipeline

Ingestion pipeline orchestrator for Thoth.

This module provides the main pipeline coordinator that integrates all ingestion components (repo manager, chunker, embedder, vector store) into a complete end-to-end ingestion workflow with progress tracking, error handling, and resume logic.

Classes

Chunk(content, metadata)

Represents a chunk of markdown content with metadata.

Embedder([model_name, device, batch_size])

Generate embeddings from text using sentence-transformers.

HandbookRepoManager([repo_url, clone_path, ...])

Manages the GitLab handbook repository.

IngestionPipeline([repo_manager, chunker, ...])

Orchestrates the complete ingestion pipeline.

MarkdownChunker([min_chunk_size, ...])

Intelligent markdown-aware chunking.

PipelineState(last_commit, processed_files, ...)

Tracks the state of the ingestion pipeline.

PipelineStats(total_files, processed_files, ...)

Statistics from pipeline execution.

VectorStore([persist_directory, ...])

Vector store for managing document embeddings using ChromaDB.

class thoth.ingestion.pipeline.PipelineState(last_commit: str | None = None, processed_files: list[str] = <factory>, failed_files: dict[str, str] = <factory>, total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False)

Bases: object

Tracks the state of the ingestion pipeline.

last_commit: str | None = None
processed_files: list[str]
failed_files: dict[str, str]
total_chunks: int = 0
total_documents: int = 0
start_time: str | None = None
last_update_time: str | None = None
completed: bool = False
to_dict() → dict[str, Any]

Convert state to dictionary.

classmethod from_dict(data: dict[str, Any]) → PipelineState

Create state from dictionary.

__init__(last_commit: str | None = None, processed_files: list[str] = <factory>, failed_files: dict[str, str] = <factory>, total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False) → None
class thoth.ingestion.pipeline.PipelineStats(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float)

Bases: object

Statistics from pipeline execution.

total_files: int
processed_files: int
failed_files: int
total_chunks: int
total_documents: int
duration_seconds: float
chunks_per_second: float
files_per_second: float
__init__(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float) → None
class thoth.ingestion.pipeline.IngestionPipeline(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = 50, logger_instance: Logger | None = None)

Bases: object

Orchestrates the complete ingestion pipeline.

This class coordinates:

  1. Repository cloning/updating
  2. Markdown file discovery
  3. Document chunking
  4. Embedding generation
  5. Vector store insertion

Features:

  • Progress tracking and reporting
  • Resume capability from interruptions
  • Error handling and logging
  • Batch processing for efficiency
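
The batched middle of the pipeline (steps 2–5) can be sketched as a loop over files in groups of `batch_size`. Everything below is illustrative: the `ingest` helper and the lambda stand-ins are hypothetical, condensing what the real MarkdownChunker, Embedder, and VectorStore components would do.

```python
def ingest(files, chunk, embed, store, batch_size=50):
    """Hypothetical condensation of steps 2-5: for each batch of files,
    chunk them, embed the chunks, and insert into the store."""
    total_chunks = 0
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        chunks = [c for f in batch for c in chunk(f)]    # step 3: chunking
        vectors = embed(chunks)                          # step 4: embeddings
        store(chunks, vectors)                           # step 5: insertion
        total_chunks += len(chunks)
    return total_chunks


# Stand-in components; the real ones are MarkdownChunker, Embedder, VectorStore
docs = {}
n = ingest(
    ["a.md", "b.md", "c.md"],
    chunk=lambda f: [f + ":0", f + ":1"],          # two fake chunks per file
    embed=lambda cs: [[0.0] for _ in cs],          # dummy embedding vectors
    store=lambda cs, vs: docs.update(zip(cs, vs)),
    batch_size=2,
)
```

Batching keeps memory bounded and lets the state file be updated between batches, which is what makes interruption and resume cheap.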

__init__(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = 50, logger_instance: Logger | None = None)

Initialize the ingestion pipeline.

Parameters:
  • repo_manager – Repository manager instance (creates default if None)

  • chunker – Markdown chunker instance (creates default if None)

  • embedder – Embedder instance (creates default if None)

  • vector_store – Vector store instance (creates default if None)

  • state_file – Path to state file for resume capability

  • batch_size – Number of files to process in each batch

  • logger_instance – Logger instance for logging

run(force_reclone: bool = False, incremental: bool = True, progress_callback: Callable[[int, int, str], None] | None = None) → PipelineStats

Run the complete ingestion pipeline.

Parameters:
  • force_reclone – If True, remove and re-clone the repository

  • incremental – If True, only process changed files (requires previous state)

  • progress_callback – Optional callback(current, total, status_msg) for progress

Returns:

PipelineStats with execution statistics

Raises:

RuntimeError – If pipeline fails
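
The callback signature is documented as `callback(current, total, status_msg)`. Below is a sketch of a formatting callback; the commented pipeline call shows how it would presumably be wired in, and requires a configured installation to actually run.

```python
def report_progress(current: int, total: int, status_msg: str) -> str:
    """Format and print a one-line progress report matching the documented
    callback(current, total, status_msg) signature."""
    pct = (current / total * 100) if total else 0.0
    line = f"[{pct:5.1f}%] {current}/{total} {status_msg}"
    print(line)
    return line


# With a working installation (illustrative only, not runnable here):
# pipeline = IngestionPipeline(batch_size=50)
# stats = pipeline.run(incremental=True, progress_callback=report_progress)
# print(f"{stats.processed_files} files, {stats.total_chunks} chunks "
#       f"in {stats.duration_seconds:.1f}s")

report_progress(25, 50, "embedding batch 1")
```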

reset(keep_repo: bool = True) → None

Reset pipeline state and optionally vector store.

Parameters:

keep_repo – If True, keep the cloned repository; otherwise remove it

get_status() → dict[str, Any]

Get current pipeline status.

Returns:

Dictionary with current status information