thoth.ingestion¶
Ingestion module for managing handbook repository.
Classes
- Chunk – Represents a chunk of markdown content with metadata.
- ChunkMetadata – Metadata for a document chunk.
- GitLabAPIClient – GitLab API client with rate limiting, caching, and error handling.
- HandbookRepoManager – Manages the GitLab handbook repository.
- MarkdownChunker – Intelligent markdown-aware chunking.
- VectorStore – Vector store for document embeddings using LanceDB.
Exceptions
- GitLabAPIError – Raised for GitLab API errors.
- RateLimitError – Raised when rate limit is exceeded.
- class thoth.ingestion.Chunk(content: str, metadata: ChunkMetadata)[source]¶
Bases: object
Represents a chunk of markdown content with metadata.
- metadata: ChunkMetadata¶
- __init__(content: str, metadata: ChunkMetadata) None¶
- class thoth.ingestion.ChunkMetadata(chunk_id: str, file_path: str, chunk_index: int, total_chunks: int, headers: list[str] = <factory>, start_line: int = 0, end_line: int = 0, token_count: int = 0, char_count: int = 0, timestamp: str = <factory>, overlap_with_previous: bool = False, overlap_with_next: bool = False, source: str = '', format: str = '')[source]¶
Bases: object
Metadata for a document chunk.
- to_dict() dict[str, Any][source]¶
Convert metadata to a dict suitable for vector store metadata columns.
Ensures all values are store-compatible types (str, int, float, bool). Lists (e.g., headers) are converted to comma-separated strings.
- Returns:
Dict with chunk_id, file_path, chunk_index, total_chunks, headers (str), start_line, end_line, token_count, char_count, timestamp, overlap flags, source, format.
- __init__(chunk_id: str, file_path: str, chunk_index: int, total_chunks: int, headers: list[str] = <factory>, start_line: int = 0, end_line: int = 0, token_count: int = 0, char_count: int = 0, timestamp: str = <factory>, overlap_with_previous: bool = False, overlap_with_next: bool = False, source: str = '', format: str = '') None¶
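The list-to-string flattening performed by to_dict() can be sketched with a hypothetical stand-in dataclass (ExampleMetadata is illustrative only; the real ChunkMetadata carries many more fields):

```python
from dataclasses import dataclass, field

# Hypothetical stand-in for ChunkMetadata, showing how to_dict() keeps
# every column value a store-compatible scalar (str, int, float, bool).
@dataclass
class ExampleMetadata:
    chunk_id: str
    file_path: str
    headers: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        # Lists (e.g. headers) are joined into a comma-separated string.
        return {
            "chunk_id": self.chunk_id,
            "file_path": self.file_path,
            "headers": ",".join(self.headers),
        }

meta = ExampleMetadata("c1", "handbook/values.md", ["Values", "CREDIT"])
print(meta.to_dict()["headers"])  # "Values,CREDIT" — a string, not a list
```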
- class thoth.ingestion.GitLabAPIClient(token: str | None = None, base_url: str = 'https://gitlab.com/api/v4', timeout: int = 30, max_retries: int = 3, backoff_factor: float = 2, logger: Logger | None = None)[source]¶
Bases: object
GitLab API client with rate limiting, caching, and error handling.
- __init__(token: str | None = None, base_url: str = 'https://gitlab.com/api/v4', timeout: int = 30, max_retries: int = 3, backoff_factor: float = 2, logger: Logger | None = None)[source]¶
Initialize GitLab API client.
- Parameters:
token – GitLab personal access token. If not provided, will try to get from Secret Manager or GITLAB_TOKEN environment variable.
base_url – Base URL for GitLab API. If not provided, will try to get from Secret Manager or GITLAB_BASE_URL environment variable.
timeout – Request timeout in seconds
max_retries – Maximum number of retries for failed requests
backoff_factor – Backoff factor for exponential backoff
logger – Logger instance
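A rough sketch of how max_retries and backoff_factor might interact; the exact delay formula is an assumption, modeled on the common backoff_factor * 2**attempt exponential pattern:

```python
def retry_delays(max_retries: int = 3, backoff_factor: float = 2.0) -> list[float]:
    """Delays (in seconds) to sleep before each retry, assuming the common
    exponential pattern backoff_factor * 2**attempt."""
    return [backoff_factor * (2 ** attempt) for attempt in range(max_retries)]

print(retry_delays())  # [2.0, 4.0, 8.0] with the defaults
```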
- get(endpoint: str, params: dict | None = None, use_cache: bool = True, cache_ttl: int = 300) Any[source]¶
Make GET request.
- Parameters:
endpoint – API endpoint
params – Query parameters
use_cache – Whether to use caching
cache_ttl – Cache time to live in seconds
- Returns:
Response data
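A minimal sketch of the kind of time-to-live cache that use_cache and cache_ttl imply (names and structure are illustrative, not the actual implementation):

```python
import time

class TTLCache:
    """Tiny time-to-live cache: an entry expires ttl seconds after insertion."""

    def __init__(self):
        self._store = {}  # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None or entry[0] < time.monotonic():
            return None  # missing or expired
        return entry[1]

    def set(self, key, value, ttl: int = 300):
        self._store[key] = (time.monotonic() + ttl, value)

cache = TTLCache()
cache.set("/projects/1", {"id": 1}, ttl=300)
print(cache.get("/projects/1"))
```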
- post(endpoint: str, data: dict | None = None) Any[source]¶
Make POST request.
- Parameters:
endpoint – API endpoint
data – Request body data
- Returns:
Response data
- put(endpoint: str, data: dict | None = None) Any[source]¶
Make PUT request.
- Parameters:
endpoint – API endpoint
data – Request body data
- Returns:
Response data
- delete(endpoint: str) Any[source]¶
Make DELETE request.
- Parameters:
endpoint – API endpoint
- Returns:
Response data
- get_project(project_id: str, use_cache: bool = True) dict[str, Any][source]¶
Get project details.
- Parameters:
project_id – Project ID or URL-encoded path
use_cache – Whether to use caching
- Returns:
Project data
- list_projects(params: dict | None = None, use_cache: bool = True) list[dict[str, Any]][source]¶
List projects.
- Parameters:
params – Query parameters (e.g., {‘per_page’: 100, ‘page’: 1})
use_cache – Whether to use caching
- Returns:
List of projects
- get_repository_tree(project_id: str, path: str = '', ref: str = 'main', recursive: bool = False, use_cache: bool = True) list[dict[str, Any]][source]¶
Get repository tree.
- Parameters:
project_id – Project ID or URL-encoded path
path – Path inside repository
ref – Branch/tag name
recursive – Get tree recursively
use_cache – Whether to use caching
- Returns:
List of repository tree items
- get_file(project_id: str, file_path: str, ref: str = 'main', use_cache: bool = True) dict[str, Any][source]¶
Get file content from repository.
- Parameters:
project_id – Project ID or URL-encoded path
file_path – Path to file in repository
ref – Branch/tag name
use_cache – Whether to use caching
- Returns:
File data including content
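The GitLab repository-files API returns file content base64-encoded (with an "encoding" field), so the result of get_file() typically needs decoding; the response dict below is built locally for illustration:

```python
import base64

# A response shaped like the GitLab repository-files API: content is
# base64-encoded text.
file_data = {
    "file_path": "handbook/values.md",
    "encoding": "base64",
    "content": base64.b64encode("# Values\n".encode("utf-8")).decode("ascii"),
}

text = file_data["content"]
if file_data.get("encoding") == "base64":
    text = base64.b64decode(file_data["content"]).decode("utf-8")
print(text)
```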
- get_commits(project_id: str, ref: str = 'main', since: str | None = None, until: str | None = None, path: str | None = None, use_cache: bool = True) list[dict[str, Any]][source]¶
Get commits for a project.
- Parameters:
project_id – Project ID or URL-encoded path
ref – Branch/tag name
since – Only commits after this date (ISO 8601 format)
until – Only commits before this date (ISO 8601 format)
path – Only commits that include this file path
use_cache – Whether to use caching
- Returns:
List of commits
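The since and until arguments take ISO 8601 timestamps, which the standard library produces directly, e.g. to fetch the last week of commits:

```python
from datetime import datetime, timedelta, timezone

# ISO 8601 strings suitable for since/until.
now = datetime.now(timezone.utc)
since = (now - timedelta(days=7)).isoformat()
until = now.isoformat()
print(since)  # e.g. 2025-01-01T12:00:00+00:00
```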
- get_commit(project_id: str, commit_sha: str, use_cache: bool = True) dict[str, Any][source]¶
Get a single commit.
- Parameters:
project_id – Project ID or URL-encoded path
commit_sha – Commit SHA
use_cache – Whether to use caching
- Returns:
Commit data
- get_commit_diff(project_id: str, commit_sha: str, use_cache: bool = True) list[dict[str, Any]][source]¶
Get diff of a commit.
- Parameters:
project_id – Project ID or URL-encoded path
commit_sha – Commit SHA
use_cache – Whether to use caching
- Returns:
List of diffs
- list_branches(project_id: str, use_cache: bool = True) list[dict[str, Any]][source]¶
List branches.
- Parameters:
project_id – Project ID or URL-encoded path
use_cache – Whether to use caching
- Returns:
List of branches
- get_branch(project_id: str, branch: str, use_cache: bool = True) dict[str, Any][source]¶
Get branch details.
- Parameters:
project_id – Project ID or URL-encoded path
branch – Branch name
use_cache – Whether to use caching
- Returns:
Branch data
- list_merge_requests(project_id: str, state: str = 'opened', params: dict | None = None, use_cache: bool = True) list[dict[str, Any]][source]¶
List merge requests.
- Parameters:
project_id – Project ID or URL-encoded path
state – State filter (‘opened’, ‘closed’, ‘merged’, ‘all’)
params – Additional query parameters
use_cache – Whether to use caching
- Returns:
List of merge requests
- get_merge_request(project_id: str, mr_iid: int, use_cache: bool = True) dict[str, Any][source]¶
Get merge request details.
- Parameters:
project_id – Project ID or URL-encoded path
mr_iid – Merge request IID
use_cache – Whether to use caching
- Returns:
Merge request data
- get_current_user(use_cache: bool = True) dict[str, Any][source]¶
Get current authenticated user.
- Parameters:
use_cache – Whether to use caching
- Returns:
User data
- Raises:
GitLabAPIError – If not authenticated
- class thoth.ingestion.HandbookRepoManager(repo_url: str = 'https://gitlab.com/gitlab-com/content-sites/handbook.git', clone_path: Path | None = None, logger: Logger | LoggerAdapter | None = None)[source]¶
Bases: object
Manages the GitLab handbook repository.
- __init__(repo_url: str = 'https://gitlab.com/gitlab-com/content-sites/handbook.git', clone_path: Path | None = None, logger: Logger | LoggerAdapter | None = None)[source]¶
Initialize the repository manager.
- Parameters:
repo_url – URL of the GitLab handbook repository
clone_path – Local path to clone/store the repository
logger – Logger instance for logging messages
- is_valid_repo() bool[source]¶
Check if clone_path contains a valid git repository.
- Returns:
True if valid repo exists, False otherwise
- clone_handbook(force: bool = False, max_retries: int = 3, retry_delay: int = 5, shallow: bool = True) Path[source]¶
Clone the GitLab handbook repository.
- Parameters:
force – If True, remove existing repository and re-clone
max_retries – Maximum number of clone attempts
retry_delay – Delay in seconds between retries
shallow – If True, perform shallow clone (depth=1) for faster cloning. Shallow clones only fetch the latest commit, significantly reducing clone time for large repositories.
- Returns:
Path to the cloned repository
- Raises:
RuntimeError – If repository exists and force=False
GitCommandError – If cloning fails after all retries
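The retry behavior described above can be sketched generically; do_clone is a hypothetical callable standing in for the actual GitPython clone call:

```python
import time

def clone_with_retries(do_clone, max_retries: int = 3, retry_delay: int = 5):
    """Call do_clone() until it succeeds, re-raising the last error
    after max_retries attempts, sleeping retry_delay seconds in between."""
    for attempt in range(1, max_retries + 1):
        try:
            return do_clone()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(retry_delay)
```

With GitPython, do_clone could be, for example, `lambda: Repo.clone_from(repo_url, clone_path, depth=1)` to get the shallow (depth=1) clone described above.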
- update_repository() bool[source]¶
Update the repository by pulling latest changes.
For shallow clones, this fetches only the latest changes while maintaining the shallow history.
- Returns:
True if update successful, False otherwise
- Raises:
RuntimeError – If repository doesn’t exist
- get_current_commit() str | None[source]¶
Get the current commit SHA of the repository.
- Returns:
Commit SHA as string, or None if error occurs
- Raises:
RuntimeError – If repository doesn’t exist
- save_metadata(commit_sha: str) bool[source]¶
Save repository metadata to a JSON file.
- Parameters:
commit_sha – Current commit SHA to save
- Returns:
True if save successful, False otherwise
- load_metadata() dict[str, Any] | None[source]¶
Load repository metadata from JSON file.
- Returns:
Metadata dictionary with commit_sha, clone_path, repo_url, or None if error
- get_changed_files(since_commit: str) list[str] | None[source]¶
Get list of files changed since a specific commit.
Note: For shallow clones, this may fail if the comparison commit is not in the shallow history. In this case, None is returned and callers should fall back to full processing.
- Parameters:
since_commit – Commit SHA to compare against
- Returns:
List of changed file paths, or None if error occurs
- Raises:
RuntimeError – If repository doesn’t exist
- get_file_changes(since_commit: str) dict[str, list[str]] | None[source]¶
Get categorized file changes since a specific commit.
Note: For shallow clones, this may fail if the comparison commit is not in the shallow history. In this case, None is returned and callers should fall back to full processing.
- Parameters:
since_commit – Commit SHA to compare against
- Returns:
Dictionary with keys ‘added’, ‘modified’, ‘deleted’ containing lists of file paths, or None if error occurs
- Raises:
RuntimeError – If repository doesn’t exist
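One way to build the added/modified/deleted mapping from `git diff --name-status` output; this is a sketch of the idea, not the actual implementation:

```python
def categorize_changes(name_status_lines: list[str]) -> dict[str, list[str]]:
    """Map `git diff --name-status` lines (e.g. "M\tREADME.md")
    into added/modified/deleted buckets."""
    changes = {"added": [], "modified": [], "deleted": []}
    status_to_key = {"A": "added", "M": "modified", "D": "deleted"}
    for line in name_status_lines:
        status, _, path = line.partition("\t")
        key = status_to_key.get(status[:1])
        if key:  # other statuses (renames, copies) are ignored in this sketch
            changes[key].append(path)
    return changes

print(categorize_changes(["A\tdocs/new.md", "M\tREADME.md", "D\told.md"]))
```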
- class thoth.ingestion.MarkdownChunker(min_chunk_size: int = 500, max_chunk_size: int = 1000, overlap_size: int = 150, logger: Logger | LoggerAdapter | None = None)[source]¶
Bases: object
Intelligent markdown-aware chunking.
This chunker respects markdown structure and maintains context through overlapping chunks. It extracts metadata for each chunk to enable efficient retrieval and context-aware processing.
- __init__(min_chunk_size: int = 500, max_chunk_size: int = 1000, overlap_size: int = 150, logger: Logger | LoggerAdapter | None = None)[source]¶
Initialize the markdown chunker.
- Parameters:
min_chunk_size – Minimum chunk size in tokens
max_chunk_size – Maximum chunk size in tokens
overlap_size – Number of tokens to overlap between chunks
logger – Logger instance
- chunk_file(file_path: Path) list[Chunk][source]¶
Chunk a markdown file.
- Parameters:
file_path – Path to the markdown file
- Returns:
List of chunks with metadata
- Raises:
FileNotFoundError – If file doesn’t exist
ValueError – If file is empty or invalid
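The sliding-window overlap can be sketched in token terms; this is a simplification, since the real chunker also respects markdown heading structure:

```python
def chunk_tokens(tokens: list[str], max_chunk_size: int = 1000,
                 overlap_size: int = 150) -> list[list[str]]:
    """Split tokens into windows of at most max_chunk_size tokens,
    each sharing overlap_size tokens with its predecessor."""
    step = max_chunk_size - overlap_size
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_chunk_size])
        if start + max_chunk_size >= len(tokens):
            break  # the final window already reaches the end
    return chunks
```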
- exception thoth.ingestion.RateLimitError[source]¶
Bases: Exception
Raised when rate limit is exceeded.
- class thoth.ingestion.VectorStore(persist_directory: str = './lancedb', collection_name: str = 'thoth_documents', embedder: Embedder | None = None, gcs_bucket_name: str | None = None, gcs_project_id: str | None = None, gcs_prefix_override: str | None = None, logger_instance: Logger | LoggerAdapter | None = None)[source]¶
Bases: object
Vector store for document embeddings using LanceDB.
Provides add/search/delete/get operations for document chunks with metadata (file_path, section, chunk_index, source, format). Supports local paths or GCS via gs:// URIs. Uses an Embedder for query and document embeddings; defaults to sentence-transformers all-MiniLM-L6-v2.
- __init__(persist_directory: str = './lancedb', collection_name: str = 'thoth_documents', embedder: Embedder | None = None, gcs_bucket_name: str | None = None, gcs_project_id: str | None = None, gcs_prefix_override: str | None = None, logger_instance: Logger | LoggerAdapter | None = None)[source]¶
Initialize the LanceDB vector store.
- Parameters:
persist_directory – Local path or base path for LanceDB. Ignored when gcs_bucket_name is set (then URI is gs://bucket/lancedb or override).
collection_name – Name of the table (collection).
embedder – Optional Embedder instance. If not provided, a default Embedder with all-MiniLM-L6-v2 will be created.
gcs_bucket_name – Optional GCS bucket; when set, store uses gs://bucket/…
gcs_project_id – Optional GCP project ID (unused; kept for API compatibility).
gcs_prefix_override – Optional GCS path under bucket (e.g. lancedb_batch_xyz). When set with gcs_bucket_name, URI is gs://bucket/gcs_prefix_override.
logger_instance – Optional logger instance.
- add_documents(documents: list[str], metadatas: list[dict[str, Any]] | None = None, ids: list[str] | None = None, embeddings: list[list[float]] | None = None) None[source]¶
Add or update documents in the table.
- Parameters:
documents – List of document texts.
metadatas – Optional list of metadata dicts per document.
ids – Optional list of IDs; auto-generated if not provided.
embeddings – Optional pre-computed embeddings.
- Raises:
ValueError – If list lengths do not match.
- search_similar(query: str, n_results: int = 5, where: dict[str, Any] | None = None, where_document: dict[str, Any] | None = None, query_embedding: list[float] | None = None) dict[str, Any][source]¶
Search for similar documents by embedding.
- Parameters:
query – Query text.
n_results – Maximum number of results.
where – Optional metadata filter (Chroma-style dict).
where_document – Unused; kept for API compatibility.
query_embedding – Optional pre-computed query embedding.
- Returns:
Dict with ids, documents, metadatas, distances.
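search_similar ranks documents by embedding distance; the core idea can be sketched in pure Python (cosine distance is an assumption here, and LanceDB's actual metric and index may differ):

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

def nearest(query: list[float], rows: dict[str, list[float]], n_results: int = 5):
    """Return (id, distance) pairs for the n_results closest embeddings."""
    ranked = sorted(
        ((doc_id, cosine_distance(query, vec)) for doc_id, vec in rows.items()),
        key=lambda pair: pair[1],
    )
    return ranked[:n_results]

rows = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
print(nearest([1.0, 0.0], rows, n_results=2))
```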
- delete_documents(ids: list[str] | None = None, where: dict[str, Any] | None = None) None[source]¶
Delete documents by ids or where filter.
- Parameters:
ids – Optional list of document IDs.
where – Optional metadata filter.
- Raises:
ValueError – If neither ids nor where is provided.
- delete_by_file_path(file_path: str) int[source]¶
Delete all documents with the given file_path metadata.
- Parameters:
file_path – File path to match.
- Returns:
Number of documents deleted.
- get_document_count() int[source]¶
Return the number of documents (rows) in the table.
- Returns:
Non-negative integer count of rows.
- get_documents(ids: list[str] | None = None, where: dict[str, Any] | None = None, limit: int | None = None) dict[str, Any][source]¶
Retrieve documents by ids, where filter, or full scan with limit.
- Parameters:
ids – Optional list of IDs.
where – Optional metadata filter.
limit – Optional maximum number of documents.
- Returns:
Dict with ids, documents, metadatas.
- backup_to_gcs(backup_name: str | None = None) str | None[source]¶
No-op when using GCS URI; data is already in GCS. Returns URI or None.
- restore_from_gcs(backup_name: str | None = None, gcs_prefix: str | None = None) int[source]¶
Reconnect to store; when URI is GCS, data is already current. Returns doc count.
Modules
- Document chunking for multi-format ingestion.
- Ingestion workflow modules.
- GCS-based repository synchronization for Cloud Run.
- GitLab API client with rate limiting, caching, and error handling.
- Job manager for tracking ingestion jobs using Firestore.
- Document parsers for multi-format ingestion.
- Ingestion pipeline orchestrator for Thoth.
- Repository manager for cloning and tracking the GitLab handbook.
- Singleton management for ingestion worker services.
- Cloud Tasks client for batch ingestion processing.
- Ingestion worker HTTP server for Cloud Run.