Task queue

thoth.ingestion.task_queue

Cloud Tasks client for batch ingestion processing.

This module provides a client for enqueueing ingestion batches to Cloud Tasks, enabling parallel processing of large document collections.

logger = setup_logger(__name__) module-attribute

BatchTask dataclass

Represents a batch processing task.

job_id: str instance-attribute

batch_id: str instance-attribute

start_index: int instance-attribute

end_index: int instance-attribute

collection_name: str instance-attribute

source: str instance-attribute

file_list: list[str] | None = None class-attribute instance-attribute

__init__(job_id: str, batch_id: str, start_index: int, end_index: int, collection_name: str, source: str, file_list: list[str] | None = None) -> None
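The fields listed above can be read back as a dataclass definition. The following is a minimal sketch reconstructed from those fields only; the example values (job ID, batch ID, collection name) are made up for illustration and the real class may carry extra behavior not shown here.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class BatchTask:
    """Represents a batch processing task (reconstructed from the documented fields)."""

    job_id: str
    batch_id: str
    start_index: int
    end_index: int
    collection_name: str
    source: str
    # Equivalent to the documented `list[str] | None = None`.
    file_list: Optional[list[str]] = None


# Illustrative values only; none of these names come from the module.
task = BatchTask(
    job_id="job-123",
    batch_id="job-123-batch-0",
    start_index=0,
    end_index=100,
    collection_name="handbook_docs",
    source="handbook",
)

# A plain dict is convenient when the batch must be serialized into a task body.
payload = asdict(task)
```

Because `file_list` has a default, it is the only field that can be omitted when constructing a batch.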

TaskQueueClient

Client for enqueueing tasks to Cloud Tasks.

project_id = project_id or os.getenv('GCP_PROJECT_ID') instance-attribute

location = location or os.getenv('CLOUD_TASKS_LOCATION', 'us-central1') instance-attribute

queue_name = queue_name or os.getenv('CLOUD_TASKS_QUEUE') instance-attribute

service_url = service_url or os.getenv('CLOUD_RUN_SERVICE_URL') instance-attribute

service_account_email = service_account_email or os.getenv('SERVICE_ACCOUNT_EMAIL') instance-attribute

client: tasks_v2.CloudTasksClient property

Lazy-initialize the Cloud Tasks client.
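Lazy initialization means the underlying client is built on first access rather than in `__init__`, so constructing `TaskQueueClient` stays cheap and does not require credentials up front. The sketch below shows the general pattern with a hypothetical `LazyClientHolder` class and a caller-supplied factory; it is not the module's actual code, which presumably constructs `tasks_v2.CloudTasksClient()` instead.

```python
from typing import Any, Callable, Optional


class LazyClientHolder:
    """Hypothetical holder illustrating a lazily initialized `client` property."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._client: Optional[Any] = None  # nothing is built until first access

    @property
    def client(self) -> Any:
        # Build the client once, then reuse the cached instance.
        if self._client is None:
            self._client = self._factory()
        return self._client


# Record each factory call to show that construction happens only once.
calls: list[int] = []
holder = LazyClientHolder(lambda: calls.append(1) or "fake-client")
first = holder.client
second = holder.client
```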

queue_path: str property

Get the full queue path.
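Cloud Tasks identifies a queue by a resource name of the form `projects/{project}/locations/{location}/queues/{queue}`. The sketch below builds that string by hand from the three configured values; `build_queue_path` is a hypothetical helper, and the module may instead delegate to the `queue_path` helper on `tasks_v2.CloudTasksClient`.

```python
def build_queue_path(project_id: str, location: str, queue_name: str) -> str:
    """Return the fully qualified Cloud Tasks queue resource name."""
    return f"projects/{project_id}/locations/{location}/queues/{queue_name}"


# Illustrative values only.
example_path = build_queue_path("my-project", "us-central1", "ingestion-queue")
```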

__init__(project_id: str | None = None, location: str | None = None, queue_name: str | None = None, service_url: str | None = None, service_account_email: str | None = None)

Initialize the Cloud Tasks client.

Parameters:

project_id (str | None, default: None): GCP project ID. Falls back to the GCP_PROJECT_ID environment variable.

location (str | None, default: None): Cloud Tasks location/region. Falls back to the CLOUD_TASKS_LOCATION environment variable, then to 'us-central1'.

queue_name (str | None, default: None): Cloud Tasks queue name. Falls back to the CLOUD_TASKS_QUEUE environment variable.

service_url (str | None, default: None): Cloud Run service URL for callbacks. Falls back to the CLOUD_RUN_SERVICE_URL environment variable.

service_account_email (str | None, default: None): Service account used for OIDC auth. Falls back to the SERVICE_ACCOUNT_EMAIL environment variable.
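Each constructor argument follows the same resolution rule shown in the attribute list: an explicit value wins, otherwise the environment variable is read, and only `location` has a hard-coded final fallback. The sketch below captures that rule in a hypothetical `resolve_setting` helper; the helper name is an assumption, while the `value or os.getenv(...)` expression mirrors the attribute definitions.

```python
import os
from typing import Optional


def resolve_setting(
    explicit: Optional[str], env_var: str, default: Optional[str] = None
) -> Optional[str]:
    """Prefer the explicit argument, then the environment, then the default."""
    # `or` means an empty string is treated like None and also falls back.
    return explicit or os.getenv(env_var, default)


# Usage mirroring two of the documented attributes:
location = resolve_setting(None, "CLOUD_TASKS_LOCATION", "us-central1")
queue_name = resolve_setting(None, "CLOUD_TASKS_QUEUE")
```

Note that settings without a default resolve to `None` when neither the argument nor the environment variable is provided.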

is_configured() -> bool

Check if Cloud Tasks is properly configured.
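The reference does not say which settings `is_configured()` inspects. As a rough sketch, and purely as an assumption, a check of this kind usually verifies that every value needed to build a task is present. The function name `looks_configured` and the choice of required settings below are hypothetical.

```python
from typing import Optional


def looks_configured(
    project_id: Optional[str],
    queue_name: Optional[str],
    service_url: Optional[str],
) -> bool:
    """Return True only when every setting is a non-empty string.

    Assumption: these three values are the required ones; the real
    method may check a different set.
    """
    return all((project_id, queue_name, service_url))
```

A caller can use such a check to fall back to in-process ingestion when Cloud Tasks is unavailable, rather than failing at enqueue time.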

enqueue_batch(batch: BatchTask, delay_seconds: int = 0) -> str | None

Enqueue a batch processing task.

Parameters:

batch (BatchTask, required): BatchTask with batch details.

delay_seconds (int, default: 0): Optional delay, in seconds, before task execution.

Returns:

str | None: Task name if successful, None if failed.
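To make the enqueue step more concrete, the sketch below assembles a task description of the kind Cloud Tasks expects for an HTTP target: a JSON body, an OIDC token tied to the service account, and an optional schedule time derived from `delay_seconds`. The helper `build_task` and its parameters are hypothetical, the target URL is left to the caller because the callback endpoint is not documented here, and the real client may need the schedule time converted to a protobuf timestamp before calling `create_task`.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def build_task(
    payload: dict[str, Any],
    url: str,
    service_account_email: str,
    delay_seconds: int = 0,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Assemble a dict shaped like a Cloud Tasks HTTP task (illustrative only)."""
    task: dict[str, Any] = {
        "http_request": {
            # Cloud Tasks defaults to POST when no method is specified.
            "url": url,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(payload).encode("utf-8"),
            "oidc_token": {"service_account_email": service_account_email},
        }
    }
    if delay_seconds > 0:
        # Defer execution by scheduling the task in the future (UTC).
        base = now or datetime.now(timezone.utc)
        task["schedule_time"] = base + timedelta(seconds=delay_seconds)
    return task
```

Returning `None` on failure, as documented, lets the caller count failed batches without wrapping every call in its own exception handling.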

enqueue_batches(job_id: str, file_list: list[str], collection_name: str, source: str, batch_size: int = 100) -> dict[str, Any]

Split file list into batches and enqueue all.

Parameters:

job_id (str, required): Job ID for tracking.

file_list (list[str], required): List of file paths to process.

collection_name (str, required): Target LanceDB collection.

source (str, required): Source name (handbook, dnd, personal).

batch_size (int, default: 100): Number of files per batch.

Returns:

dict[str, Any]: Dictionary with enqueueing results.
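The splitting step can be sketched as a loop over fixed-size slices of the file list. The helper `split_into_batches` below is hypothetical, and two details are assumptions rather than documented behavior: `end_index` is treated as exclusive, and the `batch_id` format is invented for illustration. The keys of the real result dictionary are not documented, so the sketch returns only the batch descriptions.

```python
from typing import Any


def split_into_batches(
    job_id: str, file_list: list[str], batch_size: int = 100
) -> list[dict[str, Any]]:
    """Split a file list into consecutive batches of at most `batch_size` files."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    batches: list[dict[str, Any]] = []
    for start in range(0, len(file_list), batch_size):
        end = min(start + batch_size, len(file_list))  # exclusive upper bound (assumed)
        batches.append(
            {
                "job_id": job_id,
                "batch_id": f"{job_id}-batch-{len(batches)}",  # hypothetical format
                "start_index": start,
                "end_index": end,
                "file_list": file_list[start:end],
            }
        )
    return batches


# 250 files with the default batch size yields batches of 100, 100 and 50.
example_batches = split_into_batches("job-123", [f"doc_{i}.md" for i in range(250)])
```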