thoth.ingestion

Ingestion module for managing the handbook repository.

Classes

Chunk(content, metadata)

Represents a chunk of markdown content with metadata.

ChunkMetadata(chunk_id, file_path, ...)

Metadata for a document chunk.

GitLabAPIClient([token, base_url, timeout, ...])

GitLab API client with rate limiting, caching, and error handling.

HandbookRepoManager([repo_url, clone_path, ...])

Manages the GitLab handbook repository.

MarkdownChunker([min_chunk_size, ...])

Intelligent markdown-aware chunking.

VectorStore([persist_directory, ...])

Vector store for document embeddings using LanceDB.

Exceptions

GitLabAPIError

Raised for GitLab API errors.

RateLimitError

Raised when rate limit is exceeded.

class thoth.ingestion.Chunk(content: str, metadata: ChunkMetadata)[source]

Bases: object

Represents a chunk of markdown content with metadata.

content: str
metadata: ChunkMetadata
to_dict() → dict[str, Any][source]

Convert chunk to dictionary.

__init__(content: str, metadata: ChunkMetadata) → None
class thoth.ingestion.ChunkMetadata(chunk_id: str, file_path: str, chunk_index: int, total_chunks: int, headers: list[str] = <factory>, start_line: int = 0, end_line: int = 0, token_count: int = 0, char_count: int = 0, timestamp: str = <factory>, overlap_with_previous: bool = False, overlap_with_next: bool = False, source: str = '', format: str = '')[source]

Bases: object

Metadata for a document chunk.

chunk_id: str
file_path: str
chunk_index: int
total_chunks: int
headers: list[str]
start_line: int = 0
end_line: int = 0
token_count: int = 0
char_count: int = 0
timestamp: str
overlap_with_previous: bool = False
overlap_with_next: bool = False
source: str = ''
format: str = ''
to_dict() → dict[str, Any][source]

Convert metadata to a dict suitable for vector store metadata columns.

Ensures all values are store-compatible types (str, int, float, bool). Lists (e.g., headers) are converted to comma-separated strings.

Returns:

Dict with chunk_id, file_path, chunk_index, total_chunks, headers (str), start_line, end_line, token_count, char_count, timestamp, overlap flags, source, format.

__init__(chunk_id: str, file_path: str, chunk_index: int, total_chunks: int, headers: list[str] = <factory>, start_line: int = 0, end_line: int = 0, token_count: int = 0, char_count: int = 0, timestamp: str = <factory>, overlap_with_previous: bool = False, overlap_with_next: bool = False, source: str = '', format: str = '') → None
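The flattening that to_dict performs can be sketched with a standalone dataclass. This is a hypothetical mimic for illustration, not thoth's actual code; the exact separator used when joining headers is an assumption:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class MetaSketch:
    """Illustrative stand-in for ChunkMetadata (subset of fields)."""
    chunk_id: str
    file_path: str
    chunk_index: int
    total_chunks: int
    headers: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        # Every value must be a store-compatible scalar (str, int, float,
        # bool), so the headers list is joined into a single string.
        return {
            "chunk_id": self.chunk_id,
            "file_path": self.file_path,
            "chunk_index": self.chunk_index,
            "total_chunks": self.total_chunks,
            "headers": ", ".join(self.headers),  # separator is an assumption
        }

meta = MetaSketch("c1", "docs/intro.md", 0, 3, headers=["# Intro", "## Setup"])
print(meta.to_dict()["headers"])  # "# Intro, ## Setup"
```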
class thoth.ingestion.GitLabAPIClient(token: str | None = None, base_url: str = 'https://gitlab.com/api/v4', timeout: int = 30, max_retries: int = 3, backoff_factor: float = 2, logger: Logger | None = None)[source]

Bases: object

GitLab API client with rate limiting, caching, and error handling.

__init__(token: str | None = None, base_url: str = 'https://gitlab.com/api/v4', timeout: int = 30, max_retries: int = 3, backoff_factor: float = 2, logger: Logger | None = None)[source]

Initialize GitLab API client.

Parameters:
  • token – GitLab personal access token. If not provided, will try to get from Secret Manager or GITLAB_TOKEN environment variable.

  • base_url – Base URL for GitLab API. If not provided, will try to get from Secret Manager or GITLAB_BASE_URL environment variable.

  • timeout – Request timeout in seconds

  • max_retries – Maximum number of retries for failed requests

  • backoff_factor – Backoff factor for exponential backoff

  • logger – Logger instance
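The interaction of max_retries and backoff_factor can be sketched as a retry loop with exponential backoff. The delay formula (backoff_factor ** attempt) and the retried exception type are assumptions, not the client's documented internals:

```python
import time

def request_with_retries(send, max_retries: int = 3,
                         backoff_factor: float = 2, sleep=time.sleep):
    """Hypothetical sketch of retry with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return send()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise  # retries exhausted: surface the error
            # wait backoff_factor ** attempt seconds: 1, 2, 4, ... for factor 2
            sleep(backoff_factor ** attempt)

# A fake sender that fails twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

delays = []  # capture sleeps instead of actually waiting
print(request_with_retries(flaky, sleep=delays.append))  # ok, delays [1, 2]
```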

clear_cache() → None[source]

Clear all cached data.

get(endpoint: str, params: dict | None = None, use_cache: bool = True, cache_ttl: int = 300) → Any[source]

Make GET request.

Parameters:
  • endpoint – API endpoint

  • params – Query parameters

  • use_cache – Whether to use caching

  • cache_ttl – Cache time to live in seconds

Returns:

Response data
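The use_cache/cache_ttl behavior can be sketched as a minimal time-to-live cache; the client's real cache internals are not documented here, so this is only an illustrative model:

```python
import time

class TTLCache:
    """Minimal sketch of time-to-live caching."""
    def __init__(self, clock=time.monotonic):
        self._store = {}
        self._clock = clock  # injectable clock for testing

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._store[key]  # expired: evict and miss
            return None
        return value

    def set(self, key, value, ttl: int = 300):
        self._store[key] = (value, self._clock() + ttl)

now = [0.0]
cache = TTLCache(clock=lambda: now[0])
cache.set("/projects/1", {"id": 1}, ttl=300)
print(cache.get("/projects/1"))  # {'id': 1}
now[0] = 301.0                   # advance past the TTL
print(cache.get("/projects/1"))  # None (expired)
```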

post(endpoint: str, data: dict | None = None) → Any[source]

Make POST request.

Parameters:
  • endpoint – API endpoint

  • data – Request body data

Returns:

Response data

put(endpoint: str, data: dict | None = None) → Any[source]

Make PUT request.

Parameters:
  • endpoint – API endpoint

  • data – Request body data

Returns:

Response data

delete(endpoint: str) → Any[source]

Make DELETE request.

Parameters:

endpoint – API endpoint

Returns:

Response data

get_project(project_id: str, use_cache: bool = True) → dict[str, Any][source]

Get project details.

Parameters:
  • project_id – Project ID or URL-encoded path

  • use_cache – Whether to use caching

Returns:

Project data

list_projects(params: dict | None = None, use_cache: bool = True) → list[dict[str, Any]][source]

List projects.

Parameters:
  • params – Query parameters (e.g., {'per_page': 100, 'page': 1})

  • use_cache – Whether to use caching

Returns:

List of projects

get_repository_tree(project_id: str, path: str = '', ref: str = 'main', recursive: bool = False, use_cache: bool = True) → list[dict[str, Any]][source]

Get repository tree.

Parameters:
  • project_id – Project ID or URL-encoded path

  • path – Path inside repository

  • ref – Branch/tag name

  • recursive – Get tree recursively

  • use_cache – Whether to use caching

Returns:

List of repository tree items

get_file(project_id: str, file_path: str, ref: str = 'main', use_cache: bool = True) → dict[str, Any][source]

Get file content from repository.

Parameters:
  • project_id – Project ID or URL-encoded path

  • file_path – Path to file in repository

  • ref – Branch/tag name

  • use_cache – Whether to use caching

Returns:

File data including content
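GitLab's repository files endpoint returns the file body base64-encoded, with an encoding field indicating this, so callers typically decode before use. A minimal sketch (decode_file_content is an illustrative helper, not part of this client):

```python
import base64

def decode_file_content(file_data: dict) -> str:
    """Decode the content field of a GitLab file response."""
    if file_data.get("encoding") == "base64":
        return base64.b64decode(file_data["content"]).decode("utf-8")
    return file_data["content"]  # already plain text

# Shape of a typical response (values are illustrative).
sample = {
    "file_path": "README.md",
    "encoding": "base64",
    "content": base64.b64encode("# Handbook\n".encode()).decode(),
}
print(decode_file_content(sample))  # prints the decoded markdown
```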

get_commits(project_id: str, ref: str = 'main', since: str | None = None, until: str | None = None, path: str | None = None, use_cache: bool = True) → list[dict[str, Any]][source]

Get commits for a project.

Parameters:
  • project_id – Project ID or URL-encoded path

  • ref – Branch/tag name

  • since – Only commits after this date (ISO 8601 format)

  • until – Only commits before this date (ISO 8601 format)

  • path – Only commits that include this file path

  • use_cache – Whether to use caching

Returns:

List of commits

get_commit(project_id: str, commit_sha: str, use_cache: bool = True) → dict[str, Any][source]

Get a single commit.

Parameters:
  • project_id – Project ID or URL-encoded path

  • commit_sha – Commit SHA

  • use_cache – Whether to use caching

Returns:

Commit data

get_commit_diff(project_id: str, commit_sha: str, use_cache: bool = True) → list[dict[str, Any]][source]

Get diff of a commit.

Parameters:
  • project_id – Project ID or URL-encoded path

  • commit_sha – Commit SHA

  • use_cache – Whether to use caching

Returns:

List of diffs

list_branches(project_id: str, use_cache: bool = True) → list[dict[str, Any]][source]

List branches.

Parameters:
  • project_id – Project ID or URL-encoded path

  • use_cache – Whether to use caching

Returns:

List of branches

get_branch(project_id: str, branch: str, use_cache: bool = True) → dict[str, Any][source]

Get branch details.

Parameters:
  • project_id – Project ID or URL-encoded path

  • branch – Branch name

  • use_cache – Whether to use caching

Returns:

Branch data

list_merge_requests(project_id: str, state: str = 'opened', params: dict | None = None, use_cache: bool = True) → list[dict[str, Any]][source]

List merge requests.

Parameters:
  • project_id – Project ID or URL-encoded path

  • state – State filter ('opened', 'closed', 'merged', 'all')

  • params – Additional query parameters

  • use_cache – Whether to use caching

Returns:

List of merge requests

get_merge_request(project_id: str, mr_iid: int, use_cache: bool = True) → dict[str, Any][source]

Get merge request details.

Parameters:
  • project_id – Project ID or URL-encoded path

  • mr_iid – Merge request IID

  • use_cache – Whether to use caching

Returns:

Merge request data

get_current_user(use_cache: bool = True) → dict[str, Any][source]

Get current authenticated user.

Parameters:

use_cache – Whether to use caching

Returns:

User data

Raises:

GitLabAPIError – If not authenticated

get_user(user_id: int, use_cache: bool = True) → dict[str, Any][source]

Get user details.

Parameters:
  • user_id – User ID

  • use_cache – Whether to use caching

Returns:

User data

get_rate_limit_info() → dict[str, Any][source]

Get current rate limit information.

Returns:

Dictionary with rate limit info

exception thoth.ingestion.GitLabAPIError[source]

Bases: Exception

Raised for GitLab API errors.

class thoth.ingestion.HandbookRepoManager(repo_url: str = 'https://gitlab.com/gitlab-com/content-sites/handbook.git', clone_path: Path | None = None, logger: Logger | LoggerAdapter | None = None)[source]

Bases: object

Manages the GitLab handbook repository.

__init__(repo_url: str = 'https://gitlab.com/gitlab-com/content-sites/handbook.git', clone_path: Path | None = None, logger: Logger | LoggerAdapter | None = None)[source]

Initialize the repository manager.

Parameters:
  • repo_url – URL of the GitLab handbook repository

  • clone_path – Local path to clone/store the repository

  • logger – Logger instance for logging messages

is_valid_repo() → bool[source]

Check if clone_path contains a valid git repository.

Returns:

True if valid repo exists, False otherwise

clone_handbook(force: bool = False, max_retries: int = 3, retry_delay: int = 5, shallow: bool = True) → Path[source]

Clone the GitLab handbook repository.

Parameters:
  • force – If True, remove existing repository and re-clone

  • max_retries – Maximum number of clone attempts

  • retry_delay – Delay in seconds between retries

  • shallow – If True, perform shallow clone (depth=1) for faster cloning. Shallow clones only fetch the latest commit, significantly reducing clone time for large repositories.

Returns:

Path to the cloned repository

Raises:
  • RuntimeError – If repository exists and force=False

  • GitCommandError – If cloning fails after all retries

update_repository() → bool[source]

Update the repository by pulling latest changes.

For shallow clones, this fetches only the latest changes while maintaining the shallow history.

Returns:

True if update successful, False otherwise

Raises:

RuntimeError – If repository doesn’t exist

get_current_commit() → str | None[source]

Get the current commit SHA of the repository.

Returns:

Commit SHA as string, or None if error occurs

Raises:

RuntimeError – If repository doesn’t exist

save_metadata(commit_sha: str) → bool[source]

Save repository metadata to a JSON file.

Parameters:

commit_sha – Current commit SHA to save

Returns:

True if save successful, False otherwise

load_metadata() → dict[str, Any] | None[source]

Load repository metadata from JSON file.

Returns:

Metadata dictionary with commit_sha, clone_path, repo_url, or None if error

get_changed_files(since_commit: str) → list[str] | None[source]

Get list of files changed since a specific commit.

Note: For shallow clones, this may fail if the comparison commit is not in the shallow history. In this case, None is returned and callers should fall back to full processing.

Parameters:

since_commit – Commit SHA to compare against

Returns:

List of changed file paths, or None if error occurs

Raises:

RuntimeError – If repository doesn’t exist

get_file_changes(since_commit: str) → dict[str, list[str]] | None[source]

Get categorized file changes since a specific commit.

Note: For shallow clones, this may fail if the comparison commit is not in the shallow history. In this case, None is returned and callers should fall back to full processing.

Parameters:

since_commit – Commit SHA to compare against

Returns:

Dictionary with keys 'added', 'modified', 'deleted' containing lists of file paths, or None if error occurs

Raises:

RuntimeError – If repository doesn’t exist

class thoth.ingestion.MarkdownChunker(min_chunk_size: int = 500, max_chunk_size: int = 1000, overlap_size: int = 150, logger: Logger | LoggerAdapter | None = None)[source]

Bases: object

Intelligent markdown-aware chunking.

This chunker respects markdown structure and maintains context through overlapping chunks. It extracts metadata for each chunk to enable efficient retrieval and context-aware processing.

__init__(min_chunk_size: int = 500, max_chunk_size: int = 1000, overlap_size: int = 150, logger: Logger | LoggerAdapter | None = None)[source]

Initialize the markdown chunker.

Parameters:
  • min_chunk_size – Minimum chunk size in tokens

  • max_chunk_size – Maximum chunk size in tokens

  • overlap_size – Number of tokens to overlap between chunks

  • logger – Logger instance

chunk_file(file_path: Path) → list[Chunk][source]

Chunk a markdown file.

Parameters:

file_path – Path to the markdown file

Returns:

List of chunks with metadata

chunk_text(text: str, source_path: str = '') → list[Chunk][source]

Chunk markdown text content.

Parameters:
  • text – Markdown text to chunk

  • source_path – Source file path for metadata

Returns:

List of chunks with metadata
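The header-aware splitting that this chunker performs can be illustrated with a toy sketch: start a new chunk at each heading and record the heading trail as metadata. The real MarkdownChunker additionally enforces token sizes and overlap, which this sketch omits:

```python
def split_on_headers(text: str) -> list[dict]:
    """Toy markdown-aware splitter (illustrative, not thoth's algorithm)."""
    chunks, current, headers = [], [], []
    for line in text.splitlines():
        if line.startswith("#"):
            if current:
                # flush the chunk accumulated under the previous heading
                chunks.append({"headers": list(headers),
                               "content": "\n".join(current)})
                current = []
            # heading level = number of leading '#' characters
            level = len(line) - len(line.lstrip("#"))
            # truncate the trail to the parent level, then push this heading
            headers = headers[: level - 1] + [line.strip()]
        current.append(line)
    if current:
        chunks.append({"headers": list(headers), "content": "\n".join(current)})
    return chunks

doc = "# Title\nintro\n## Part\nbody\n"
for c in split_on_headers(doc):
    print(c["headers"])
```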

exception thoth.ingestion.RateLimitError[source]

Bases: Exception

Raised when rate limit is exceeded.

class thoth.ingestion.VectorStore(persist_directory: str = './lancedb', collection_name: str = 'thoth_documents', embedder: Embedder | None = None, gcs_bucket_name: str | None = None, gcs_project_id: str | None = None, gcs_prefix_override: str | None = None, logger_instance: Logger | LoggerAdapter | None = None)[source]

Bases: object

Vector store for document embeddings using LanceDB.

Provides add/search/delete/get operations for document chunks with metadata (file_path, section, chunk_index, source, format). Supports local paths or GCS via gs:// URIs. Uses an Embedder for query and document embeddings; defaults to sentence-transformers all-MiniLM-L6-v2.

__init__(persist_directory: str = './lancedb', collection_name: str = 'thoth_documents', embedder: Embedder | None = None, gcs_bucket_name: str | None = None, gcs_project_id: str | None = None, gcs_prefix_override: str | None = None, logger_instance: Logger | LoggerAdapter | None = None)[source]

Initialize the LanceDB vector store.

Parameters:
  • persist_directory – Local path or base path for LanceDB. Ignored when gcs_bucket_name is set (then URI is gs://bucket/lancedb or override).

  • collection_name – Name of the table (collection).

  • embedder – Optional Embedder instance. If not provided, a default Embedder with all-MiniLM-L6-v2 will be created.

  • gcs_bucket_name – Optional GCS bucket; when set, store uses gs://bucket/…

  • gcs_project_id – Optional GCP project ID (unused; kept for API compatibility).

  • gcs_prefix_override – Optional GCS path under bucket (e.g. lancedb_batch_xyz). When set with gcs_bucket_name, URI is gs://bucket/gcs_prefix_override.

  • logger_instance – Optional logger instance.

add_documents(documents: list[str], metadatas: list[dict[str, Any]] | None = None, ids: list[str] | None = None, embeddings: list[list[float]] | None = None) → None[source]

Add or update documents in the table.

Parameters:
  • documents – List of document texts.

  • metadatas – Optional list of metadata dicts per document.

  • ids – Optional list of IDs; auto-generated if not provided.

  • embeddings – Optional pre-computed embeddings.

Raises:

ValueError – If list lengths do not match.
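The length validation and id auto-generation described above can be sketched like this (prepare_batch is a hypothetical helper; the use of uuid4 for generated ids is an assumption):

```python
import uuid

def prepare_batch(documents, metadatas=None, ids=None):
    """Validate parallel lists and fill in defaults before insertion."""
    if metadatas is not None and len(metadatas) != len(documents):
        raise ValueError("metadatas length must match documents")
    if ids is not None and len(ids) != len(documents):
        raise ValueError("ids length must match documents")
    if ids is None:
        # auto-generate ids when none are supplied (uuid4 is an assumption)
        ids = [str(uuid.uuid4()) for _ in documents]
    if metadatas is None:
        metadatas = [{} for _ in documents]
    return list(zip(ids, documents, metadatas))

rows = prepare_batch(["doc one", "doc two"])
print(len(rows))  # 2
```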

search_similar(query: str, n_results: int = 5, where: dict[str, Any] | None = None, where_document: dict[str, Any] | None = None, query_embedding: list[float] | None = None) → dict[str, Any][source]

Search for similar documents by embedding.

Parameters:
  • query – Query text.

  • n_results – Maximum number of results.

  • where – Optional metadata filter (Chroma-style dict).

  • where_document – Unused; kept for API compatibility.

  • query_embedding – Optional pre-computed query embedding.

Returns:

Dict with ids, documents, metadatas, distances.
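The shape of the return value can be illustrated with a toy cosine-distance search. VectorStore delegates the actual search to LanceDB; this sketch only models the documented result structure, and all names here are hypothetical:

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity: 0 for identical directions, up to 2."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (na * nb)

def search_similar_sketch(query_vec, index, n_results=5):
    """Return the documented result shape: ids, documents, metadatas, distances."""
    ranked = sorted(index,
                    key=lambda row: cosine_distance(query_vec, row["embedding"]))
    top = ranked[:n_results]
    return {
        "ids": [r["id"] for r in top],
        "documents": [r["document"] for r in top],
        "metadatas": [r["metadata"] for r in top],
        "distances": [cosine_distance(query_vec, r["embedding"]) for r in top],
    }

index = [
    {"id": "a", "document": "cats", "metadata": {}, "embedding": [1.0, 0.0]},
    {"id": "b", "document": "finance", "metadata": {}, "embedding": [0.0, 1.0]},
]
result = search_similar_sketch([0.9, 0.1], index, n_results=1)
print(result["ids"])  # ['a']
```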

delete_documents(ids: list[str] | None = None, where: dict[str, Any] | None = None) → None[source]

Delete documents by ids or where filter.

Parameters:
  • ids – Optional list of document IDs.

  • where – Optional metadata filter.

Raises:

ValueError – If neither ids nor where is provided.

delete_by_file_path(file_path: str) → int[source]

Delete all documents with the given file_path metadata.

Parameters:

file_path – File path to match.

Returns:

Number of documents deleted.

get_document_count() → int[source]

Return the number of documents (rows) in the table.

Returns:

Non-negative integer count of rows.

get_documents(ids: list[str] | None = None, where: dict[str, Any] | None = None, limit: int | None = None) → dict[str, Any][source]

Retrieve documents by ids, where filter, or full scan with limit.

Parameters:
  • ids – Optional list of IDs.

  • where – Optional metadata filter.

  • limit – Optional maximum number of documents.

Returns:

Dict with ids, documents, metadatas.

reset() → None[source]

Drop and recreate the table (all data removed).

backup_to_gcs(backup_name: str | None = None) → str | None[source]

No-op when using GCS URI; data is already in GCS. Returns URI or None.

restore_from_gcs(backup_name: str | None = None, gcs_prefix: str | None = None) → int[source]

Reconnect to store; when URI is GCS, data is already current. Returns doc count.

sync_to_gcs(gcs_prefix: str = 'lancedb') → dict | None[source]

When using GCS URI, sync is implicit. Returns status dict or None.

list_gcs_backups() → list[str][source]

No discrete backups when using LanceDB on GCS; return empty list.

Modules

chunker

Document chunking for multi-format ingestion.

flows

Ingestion workflow modules.

gcs_repo_sync

GCS-based repository synchronization for Cloud Run.

gitlab_api

GitLab API client with rate limiting, caching, and error handling.

job_manager

Job manager for tracking ingestion jobs using Firestore.

parsers

Document parsers for multi-format ingestion.

pipeline

Ingestion pipeline orchestrator for Thoth.

repo_manager

Repository manager for cloning and tracking the GitLab handbook.

singletons

Singleton management for ingestion worker services.

task_queue

Cloud Tasks client for batch ingestion processing.

worker

Ingestion worker HTTP server for Cloud Run.