thoth.ingestion.task_queue

Cloud Tasks client for batch ingestion processing.

This module provides a client for enqueueing ingestion batches to Cloud Tasks, enabling parallel processing of large document collections.

Functions

setup_logger(name[, level, simple, json_output])

Create and configure a logger with structured JSON output.

Classes

BatchTask(job_id, batch_id, start_index, ...)

Represents a batch processing task.

TaskQueueClient([project_id, location, ...])

Client for enqueueing tasks to Cloud Tasks.

class thoth.ingestion.task_queue.BatchTask(job_id: str, batch_id: str, start_index: int, end_index: int, collection_name: str, source: str, file_list: list[str] | None = None)

Bases: object

Represents a batch processing task.

job_id: str
batch_id: str
start_index: int
end_index: int
collection_name: str
source: str
file_list: list[str] | None = None
__init__(job_id: str, batch_id: str, start_index: int, end_index: int, collection_name: str, source: str, file_list: list[str] | None = None) -> None
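
As a rough illustration of how a batch might be described and serialized for a task body, the sketch below redefines a local mirror of BatchTask using only the fields listed above. The field values are invented, and JSON via `dataclasses.asdict` is an assumed serialization; the module does not document the wire format.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass


@dataclass
class BatchTask:
    """Local mirror of the documented fields, for illustration only."""

    job_id: str
    batch_id: str
    start_index: int
    end_index: int
    collection_name: str
    source: str
    file_list: list[str] | None = None


# Hypothetical values; only the field names come from the reference above.
batch = BatchTask(
    job_id="job-123",
    batch_id="job-123-batch-0",
    start_index=0,
    end_index=2,
    collection_name="handbook_documents",
    source="handbook",
    file_list=["docs/a.md", "docs/b.md"],
)

# asdict() turns the dataclass into a plain dict that json can encode.
payload = json.dumps(asdict(batch))
```

In real code, import BatchTask from thoth.ingestion.task_queue instead of redefining it.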

class thoth.ingestion.task_queue.TaskQueueClient(project_id: str | None = None, location: str | None = None, queue_name: str | None = None, service_url: str | None = None, service_account_email: str | None = None)

Bases: object

Client for enqueueing tasks to Cloud Tasks.

__init__(project_id: str | None = None, location: str | None = None, queue_name: str | None = None, service_url: str | None = None, service_account_email: str | None = None)

Initialize the Cloud Tasks client.

Parameters:
  • project_id – GCP project ID (default: from GCP_PROJECT_ID env)

  • location – Cloud Tasks location/region (default: from CLOUD_TASKS_LOCATION env)

  • queue_name – Cloud Tasks queue name (default: from CLOUD_TASKS_QUEUE env)

  • service_url – Cloud Run service URL for callbacks (default: from CLOUD_RUN_SERVICE_URL env)

  • service_account_email – Service account for OIDC auth (default: from SERVICE_ACCOUNT_EMAIL env)
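
The "explicit argument, else environment variable" defaults described above can be sketched as follows. `resolve_setting` is a hypothetical helper, not part of the module, and the example values are invented; how the real constructor treats empty strings is not documented.

```python
from __future__ import annotations

import os


def resolve_setting(explicit: str | None, env_var: str) -> str | None:
    """Return the explicit argument if given, else the environment value."""
    if explicit is not None:
        return explicit
    # os.environ.get returns None when the variable is unset.
    return os.environ.get(env_var)


# Set here only so the example is self-contained.
os.environ["CLOUD_TASKS_QUEUE"] = "ingestion-queue"

queue_name = resolve_setting(None, "CLOUD_TASKS_QUEUE")         # falls back to env
location = resolve_setting("us-central1", "CLOUD_TASKS_LOCATION")  # explicit wins
```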

property client: CloudTasksClient

Lazy-initialize the Cloud Tasks client.

property queue_path: str

Get the full queue path.
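
Cloud Tasks identifies a queue by a resource path of the form projects/PROJECT/locations/LOCATION/queues/QUEUE, which is also what `CloudTasksClient.queue_path` in the google-cloud-tasks library produces. A minimal sketch of that format, assuming the property combines the three configured values in this way (`build_queue_path` and the example values are hypothetical):

```python
def build_queue_path(project_id: str, location: str, queue_name: str) -> str:
    """Format a Cloud Tasks queue resource path from its three components."""
    return f"projects/{project_id}/locations/{location}/queues/{queue_name}"


path = build_queue_path("my-project", "us-central1", "ingestion-queue")
```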

is_configured() -> bool

Check if Cloud Tasks is properly configured.

enqueue_batch(batch: BatchTask, delay_seconds: int = 0) -> str | None

Enqueue a batch processing task.

Parameters:
  • batch – BatchTask with batch details

  • delay_seconds – Delay in seconds before the task executes (default: 0, no delay)

Returns:

The created task's name if enqueueing succeeds, or None if it fails
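
To make the roles of service_url, service_account_email, and delay_seconds more concrete, here is a sketch of the kind of HTTP task description such a method might assemble. The keys mirror field names of the Cloud Tasks v2 Task message (http_request, oidc_token, schedule_time), but `build_task_payload`, the /ingest-batch endpoint, and the JSON body are assumptions; the actual implementation presumably passes an equivalent structure to `CloudTasksClient.create_task`.

```python
from __future__ import annotations

import json
from datetime import datetime, timedelta, timezone


def build_task_payload(
    batch_fields: dict,
    service_url: str,
    service_account_email: str,
    delay_seconds: int = 0,
    now: datetime | None = None,
) -> dict:
    """Assemble a plain-dict description of an HTTP task (illustrative only)."""
    task: dict = {
        "http_request": {
            "http_method": "POST",
            # Hypothetical callback endpoint on the Cloud Run service.
            "url": f"{service_url.rstrip('/')}/ingest-batch",
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(batch_fields).encode("utf-8"),
            # OIDC token lets the receiving service authenticate the caller.
            "oidc_token": {"service_account_email": service_account_email},
        }
    }
    if delay_seconds > 0:
        # A schedule time in the future defers execution of the task.
        current = now or datetime.now(timezone.utc)
        task["schedule_time"] = current + timedelta(seconds=delay_seconds)
    return task


fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
delayed = build_task_payload(
    {"job_id": "job-123", "batch_id": "job-123-batch-0"},
    service_url="https://example-service.run.app/",
    service_account_email="tasks@example.iam.gserviceaccount.com",
    delay_seconds=30,
    now=fixed_now,
)
```

With delay_seconds left at 0, the sketch omits schedule_time entirely, so the task would be eligible to run immediately.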

enqueue_batches(job_id: str, file_list: list[str], collection_name: str, source: str, batch_size: int = 100) -> dict[str, Any]

Split file list into batches and enqueue all.

Parameters:
  • job_id – Job ID for tracking

  • file_list – List of file paths to process

  • collection_name – Target LanceDB collection

  • source – Source name (handbook, dnd, personal)

  • batch_size – Number of files per batch

Returns:

Dictionary with enqueueing results
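
The splitting step can be sketched as below. `split_into_batches` is a hypothetical helper, and treating end_index as exclusive is an assumption; the reference does not say whether the real BatchTask indices are inclusive or exclusive, nor what keys the result dictionary contains.

```python
def split_into_batches(
    file_list: list[str], batch_size: int = 100
) -> list[tuple[int, int, list[str]]]:
    """Split file_list into (start_index, end_index, files) chunks.

    end_index is exclusive here, matching Python slice semantics.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batches = []
    for start in range(0, len(file_list), batch_size):
        end = min(start + batch_size, len(file_list))
        batches.append((start, end, file_list[start:end]))
    return batches


files = [f"docs/file_{i}.md" for i in range(250)]
batches = split_into_batches(files, batch_size=100)
ranges = [(start, end) for start, end, _ in batches]
# ranges == [(0, 100), (100, 200), (200, 250)]
```

Each chunk would then become one BatchTask and one enqueue_batch call, so 250 files at the default batch size yield three tasks.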