thoth.ingestion.pipeline

Ingestion pipeline orchestrator for Thoth.

This module provides the main pipeline coordinator that integrates all ingestion components (repo manager, chunker, embedder, vector store) into a complete end-to-end ingestion workflow with progress tracking, error handling, and resume logic.

Functions

setup_logger(name[, level, simple, json_output])

Create and configure a logger with structured JSON output.

Classes

Chunk(content, metadata)

Represents a chunk of markdown content with metadata.

DocumentChunker([min_chunk_size, ...])

Generalized document chunker for multi-format support.

Embedder([model_name, device, batch_size, ...])

Generate embeddings from text using sentence-transformers.

GCSRepoSync(bucket_name, repo_url[, ...])

Manages repository synchronization between GCS and local storage.

HandbookRepoManager([repo_url, clone_path, ...])

Manages the GitLab handbook repository.

IngestionPipeline([repo_manager, chunker, ...])

Orchestrates the complete ingestion pipeline.

MarkdownChunker([min_chunk_size, ...])

Intelligent markdown-aware chunking.

ParserFactory()

Factory for creating and using document parsers.

PipelineState(last_commit, processed_files, ...)

Mutable state for a single pipeline run (resume and progress tracking).

PipelineStats(total_files, processed_files, ...)

Read-only statistics from a completed pipeline run.

SourceConfig(name, collection_name, ...)

Configuration for a single data source (handbook, D&D, personal, etc.).

VectorStore([persist_directory, ...])

Vector store for document embeddings using LanceDB.

class thoth.ingestion.pipeline.PipelineState(last_commit: str | None = None, processed_files: list[str] = <factory>, failed_files: dict[str, str] = <factory>, total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False)[source]

Bases: object

Mutable state for a single pipeline run (resume and progress tracking).

Persisted to pipeline_state.json so runs can resume after interruption. Tracks last processed commit, file lists, chunk counts, and completion flag.

last_commit

Last Git commit processed (for incremental sync).

Type:

str | None

processed_files

List of file paths successfully processed.

Type:

list[str]

failed_files

Dict of file_path -> error_message for failed files.

Type:

dict[str, str]

total_chunks

Total chunks created this run.

Type:

int

total_documents

Total documents (chunks) added to the vector store.

Type:

int

start_time

ISO timestamp when run started.

Type:

str | None

last_update_time

ISO timestamp of last state save.

Type:

str | None

completed

True when the run finished without error.

Type:

bool

last_commit: str | None = None
processed_files: list[str]
failed_files: dict[str, str]
total_chunks: int = 0
total_documents: int = 0
start_time: str | None = None
last_update_time: str | None = None
completed: bool = False
to_dict() → dict[str, Any][source]

Serialize state to a dict for JSON persistence.

Returns:

Dict with last_commit, processed_files, failed_files, total_chunks, total_documents, start_time, last_update_time, completed.

classmethod from_dict(data: dict[str, Any]) → PipelineState[source]

Deserialize state from a dict (e.g., from pipeline_state.json).

Parameters:

data – Dict with keys matching PipelineState attributes.

Returns:

PipelineState instance with restored values.

__init__(last_commit: str | None = None, processed_files: list[str] = <factory>, failed_files: dict[str, str] = <factory>, total_chunks: int = 0, total_documents: int = 0, start_time: str | None = None, last_update_time: str | None = None, completed: bool = False) → None
class thoth.ingestion.pipeline.PipelineStats(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float)[source]

Bases: object

Read-only statistics from a completed pipeline run.

Returned by run() and used for logging and monitoring. All counts and rates are computed at the end of the run.

total_files

Total files discovered for processing.

Type:

int

processed_files

Files successfully processed.

Type:

int

failed_files

Files that failed (with errors).

Type:

int

total_chunks

Chunks created and stored.

Type:

int

total_documents

Documents (chunks) in the vector store after run.

Type:

int

duration_seconds

Elapsed time in seconds.

Type:

float

chunks_per_second

Throughput (chunks / duration).

Type:

float

files_per_second

Throughput (files / duration).

Type:

float

total_files: int
processed_files: int
failed_files: int
total_chunks: int
total_documents: int
duration_seconds: float
chunks_per_second: float
files_per_second: float
__init__(total_files: int, processed_files: int, failed_files: int, total_chunks: int, total_documents: int, duration_seconds: float, chunks_per_second: float, files_per_second: float) → None
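Since all counts and rates are computed once at the end of the run, chunks_per_second and files_per_second are simple ratios over the elapsed time. A hedged sketch of how such stats might be assembled (the build_stats helper is illustrative, not part of this module):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineStats:
    """Read-only stats mirroring the documented fields."""
    total_files: int
    processed_files: int
    failed_files: int
    total_chunks: int
    total_documents: int
    duration_seconds: float
    chunks_per_second: float
    files_per_second: float


def build_stats(total: int, ok: int, failed: int, chunks: int,
                docs: int, duration: float) -> PipelineStats:
    """Illustrative helper: derive throughput rates, guarding duration == 0."""
    safe = duration if duration > 0 else 1.0
    return PipelineStats(
        total_files=total, processed_files=ok, failed_files=failed,
        total_chunks=chunks, total_documents=docs,
        duration_seconds=duration,
        chunks_per_second=chunks / safe,
        files_per_second=ok / safe,
    )


stats = build_stats(total=100, ok=98, failed=2, chunks=450, docs=450,
                    duration=90.0)
assert stats.chunks_per_second == 5.0
```

Freezing the dataclass matches the "read-only" contract: a completed run's statistics should never be mutated after the fact.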
class thoth.ingestion.pipeline.IngestionPipeline(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = 50, logger_instance: Logger | LoggerAdapter | None = None, collection_name: str = 'thoth_documents', source_config: SourceConfig | None = None)[source]

Bases: object

Orchestrates the complete ingestion pipeline.

This class coordinates:

  1. Repository cloning/updating

  2. Markdown file discovery

  3. Document chunking

  4. Embedding generation

  5. Vector store insertion

With features:
  • Progress tracking and reporting

  • Resume capability from interruptions

  • Error handling and logging

  • Batch processing for efficiency
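The five stages reduce to a loop over discovered files, with per-file error handling feeding the failed_files map. A self-contained sketch of that control flow, with stubs standing in for the real repo manager, MarkdownChunker, Embedder, and VectorStore (all names and return shapes here are illustrative):

```python
def discover_files(repo_path: str) -> list[str]:
    # Stub: the real pipeline walks the cloned repo for markdown files.
    return ["docs/a.md", "docs/b.md"]


def chunk(path: str) -> list[str]:
    # Stub: MarkdownChunker splits a file into heading-aware chunks.
    return [f"{path}#chunk0", f"{path}#chunk1"]


def embed(chunks: list[str]) -> list[list[float]]:
    # Stub: Embedder encodes chunk text with sentence-transformers.
    return [[0.0] * 3 for _ in chunks]


def store(chunks: list[str], vectors: list[list[float]]) -> int:
    # Stub: VectorStore inserts (chunk, vector) rows into LanceDB.
    return len(chunks)


def run(repo_path: str) -> dict:
    """Orchestrate discovery -> chunking -> embedding -> insertion."""
    processed, failed, total_docs = [], {}, 0
    for path in discover_files(repo_path):        # 2. file discovery
        try:
            chunks = chunk(path)                  # 3. chunking
            vectors = embed(chunks)               # 4. embedding
            total_docs += store(chunks, vectors)  # 5. insertion
            processed.append(path)
        except Exception as exc:                  # per-file error handling
            failed[path] = str(exc)
    return {"processed": processed, "failed": failed, "documents": total_docs}
```

Catching exceptions per file (rather than per run) is what lets one malformed document fail without aborting the whole ingestion.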

__init__(repo_manager: HandbookRepoManager | None = None, chunker: MarkdownChunker | None = None, embedder: Embedder | None = None, vector_store: VectorStore | None = None, state_file: Path | None = None, batch_size: int = 50, logger_instance: Logger | LoggerAdapter | None = None, collection_name: str = 'thoth_documents', source_config: SourceConfig | None = None)[source]

Initialize the ingestion pipeline.

Parameters:
  • repo_manager – Repository manager instance (creates default if None)

  • chunker – Markdown chunker instance (creates default if None)

  • embedder – Embedder instance (creates default if None)

  • vector_store – Vector store instance (creates default if None)

  • state_file – Path to state file for resume capability

  • batch_size – Number of files to process in each batch

  • logger_instance – Logger instance for logging

  • collection_name – Name of the vector store table (collection) to use

  • source_config – Source configuration for multi-source support

property effective_repo_path: Path

Get the effective repository path (GCS local path or repo_manager clone path).

get_file_list() → list[str][source]

Get list of all markdown files for batch processing.

Returns:

List of file paths relative to repo root

process_file_batch(start_index: int, end_index: int, file_list: list[str] | None = None) → dict[str, Any][source]

Process a specific batch of files by index range.

Parameters:
  • start_index – Starting index in the original full file list (for tracking/logging)

  • end_index – Ending index in the original full file list (for tracking/logging)

  • file_list – Pre-sliced file list for this batch. If None, the full list is discovered and sliced.

Returns:

Statistics dictionary with processed/failed counts
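Because process_file_batch is keyed on an index range into the full list, callers can fan batches out across workers while keeping log lines traceable to absolute positions. A sketch of how a caller might produce those (start_index, end_index, slice) triples (the iter_batches helper and batch size are illustrative):

```python
def iter_batches(file_list: list[str], batch_size: int):
    """Yield (start_index, end_index, slice) tuples covering the full list."""
    for start in range(0, len(file_list), batch_size):
        end = min(start + batch_size, len(file_list))
        yield start, end, file_list[start:end]


files = [f"docs/{i}.md" for i in range(7)]
batches = list(iter_batches(files, batch_size=3))
# Three batches covering indices (0, 3), (3, 6), (6, 7).
assert [(s, e) for s, e, _ in batches] == [(0, 3), (3, 6), (6, 7)]
```

Each triple could then be handed to process_file_batch(start, end, sliced_files), e.g. one call per worker.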

run(force_reclone: bool = False, incremental: bool = True, progress_callback: Callable[[int, int, str], None] | None = None) → PipelineStats[source]

Run the complete ingestion pipeline.

Parameters:
  • force_reclone – If True, force re-sync from GCS (or re-clone if no GCS)

  • incremental – If True, only process files not already in state

  • progress_callback – Optional callback(current, total, status_msg) for progress

Returns:

PipelineStats with execution statistics

Raises:

RuntimeError – If pipeline fails
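The progress_callback contract is three positional arguments: (current, total, status_msg). A minimal callback and a simulated driver loop (the formatting and call sites below are illustrative, not the pipeline's actual behavior):

```python
from typing import Callable

# Matches the documented signature: callback(current, total, status_msg).
ProgressCallback = Callable[[int, int, str], None]


def make_progress_logger(sink: list[str]) -> ProgressCallback:
    """Build a callback that records formatted progress lines in `sink`."""
    def callback(current: int, total: int, status_msg: str) -> None:
        pct = 100 * current // total if total else 100  # guard total == 0
        sink.append(f"[{pct:3d}%] {current}/{total} {status_msg}")
    return callback


lines: list[str] = []
cb = make_progress_logger(lines)
# Simulate what a run over two files might report.
for i, path in enumerate(["a.md", "b.md"], start=1):
    cb(i, 2, f"processed {path}")
assert lines[-1] == "[100%] 2/2 processed b.md"
```

A callback like this could equally drive a progress bar or a metrics gauge; the pipeline only requires that it accept the three arguments and return None.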

reset(keep_repo: bool = True) → None[source]

Reset pipeline state and optionally vector store.

Parameters:

keep_repo – If True, keep the cloned repository; otherwise remove it

get_status() → dict[str, Any][source]

Get current pipeline status.

Returns:

Dictionary with current status information