thoth.ingestion.task_queue¶
Cloud Tasks client for batch ingestion processing.
This module provides a client for enqueueing ingestion batches to Cloud Tasks, enabling parallel processing of large document collections.
Functions
- Create and configure a logger with structured JSON output.
Classes
- BatchTask – Represents a batch processing task.
- TaskQueueClient – Client for enqueueing tasks to Cloud Tasks.
- class thoth.ingestion.task_queue.BatchTask(job_id: str, batch_id: str, start_index: int, end_index: int, collection_name: str, source: str, file_list: list[str] | None = None)[source]¶
Bases: object
Represents a batch processing task.
- class thoth.ingestion.task_queue.TaskQueueClient(project_id: str | None = None, location: str | None = None, queue_name: str | None = None, service_url: str | None = None, service_account_email: str | None = None)[source]¶
Bases: object
Client for enqueueing tasks to Cloud Tasks.
- __init__(project_id: str | None = None, location: str | None = None, queue_name: str | None = None, service_url: str | None = None, service_account_email: str | None = None)[source]¶
Initialize the Cloud Tasks client.
- Parameters:
project_id – GCP project ID (default: from GCP_PROJECT_ID env)
location – Cloud Tasks location/region (default: from CLOUD_TASKS_LOCATION env)
queue_name – Cloud Tasks queue name (default: from CLOUD_TASKS_QUEUE env)
service_url – Cloud Run service URL for callbacks (default: from CLOUD_RUN_SERVICE_URL env)
service_account_email – Service account for OIDC auth (default: from SERVICE_ACCOUNT_EMAIL env)
- property client: CloudTasksClient¶
Lazily initialize the Cloud Tasks client on first access.
- enqueue_batch(batch: BatchTask, delay_seconds: int = 0) → str | None[source]¶
Enqueue a batch processing task.
- Parameters:
batch – BatchTask with batch details
delay_seconds – Optional delay before task execution
- Returns:
Task name if successful, None if failed
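Internally, enqueueing presumably builds a Cloud Tasks HTTP task that calls back into the Cloud Run service with an OIDC token. A plain-dict sketch of that payload shape (field names follow the Cloud Tasks v2 REST API; the `/process-batch` callback path and `build_task_payload` helper are hypothetical):

```python
import json


def build_task_payload(batch_dict: dict, service_url: str, sa_email: str) -> dict:
    """Shape of a Cloud Tasks HTTP task targeting a Cloud Run callback."""
    return {
        "http_request": {
            "http_method": "POST",
            "url": f"{service_url}/process-batch",  # hypothetical callback route
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(batch_dict).encode(),  # Cloud Tasks expects bytes
            "oidc_token": {"service_account_email": sa_email},
        }
    }


payload = build_task_payload(
    {"job_id": "job-123", "batch_id": "job-123-batch-0"},
    "https://ingest-service.example.run.app",
    "ingest-sa@demo-project.iam.gserviceaccount.com",
)
print(payload["http_request"]["http_method"])  # POST
```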
- enqueue_batches(job_id: str, file_list: list[str], collection_name: str, source: str, batch_size: int = 100) → dict[str, Any][source]¶
Split file list into batches and enqueue all.
- Parameters:
job_id – Job ID for tracking
file_list – List of file paths to process
collection_name – Target LanceDB collection
source – Source name (handbook, dnd, personal)
batch_size – Number of files per batch
- Returns:
Dictionary with enqueueing results
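The splitting step can be sketched as follows (a simplified stand-in: real BatchTask objects and Cloud Tasks calls are replaced with plain dicts, and the `-batch-N` naming scheme is an assumption):

```python
def split_into_batches(job_id: str, file_list: list[str], batch_size: int = 100) -> list[dict]:
    """Split a file list into contiguous batches of at most batch_size files."""
    batches = []
    for i, start in enumerate(range(0, len(file_list), batch_size)):
        end = min(start + batch_size, len(file_list))
        batches.append({
            "job_id": job_id,
            "batch_id": f"{job_id}-batch-{i}",  # hypothetical naming scheme
            "start_index": start,
            "end_index": end,
            "file_list": file_list[start:end],
        })
    return batches


files = [f"doc_{n}.md" for n in range(250)]
batches = split_into_batches("job-123", files, batch_size=100)
print(len(batches))  # 3 batches: 100 + 100 + 50 files
```

Each batch then maps onto one Cloud Tasks task, which is what makes the parallel processing of large collections described at the top of this page possible.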