Task queue
thoth.ingestion.task_queue
Cloud Tasks client for batch ingestion processing.
This module provides a client for enqueueing ingestion batches to Cloud Tasks, enabling parallel processing of large document collections.
logger = setup_logger(__name__)
module-attribute
BatchTask
dataclass
Represents a batch processing task.
job_id: str
instance-attribute
batch_id: str
instance-attribute
start_index: int
instance-attribute
end_index: int
instance-attribute
collection_name: str
instance-attribute
source: str
instance-attribute
file_list: list[str] | None = None
class-attribute
instance-attribute
__init__(job_id: str, batch_id: str, start_index: int, end_index: int, collection_name: str, source: str, file_list: list[str] | None = None) -> None
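As a rough illustration of the fields listed above, the following sketch defines a stand-in dataclass with the same signature and builds one instance. The stand-in is not the `thoth` class itself, and every value shown (IDs, collection name, file names) is hypothetical.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class BatchTask:
    """Stand-in mirroring the documented fields; not the thoth class itself."""

    job_id: str
    batch_id: str
    start_index: int
    end_index: int
    collection_name: str
    source: str
    file_list: list[str] | None = None  # optional, defaults to None


# Hypothetical values chosen only for illustration.
batch = BatchTask(
    job_id="job-123",
    batch_id="job-123-batch-0",
    start_index=0,
    end_index=2,
    collection_name="handbook_docs",
    source="handbook",
    file_list=["a.md", "b.md"],
)

# asdict() gives a plain dict, which is convenient for JSON serialization.
payload = asdict(batch)
```

Converting to a plain dict is one plausible way to serialize a batch into a task body; the module may serialize differently.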
TaskQueueClient
Client for enqueueing tasks to Cloud Tasks.
project_id = project_id or os.getenv('GCP_PROJECT_ID')
instance-attribute
location = location or os.getenv('CLOUD_TASKS_LOCATION', 'us-central1')
instance-attribute
queue_name = queue_name or os.getenv('CLOUD_TASKS_QUEUE')
instance-attribute
service_url = service_url or os.getenv('CLOUD_RUN_SERVICE_URL')
instance-attribute
service_account_email = service_account_email or os.getenv('SERVICE_ACCOUNT_EMAIL')
instance-attribute
client: tasks_v2.CloudTasksClient
property
Lazy-initialize the Cloud Tasks client.
queue_path: str
property
Get the full queue path.
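The two properties above can be pictured with a minimal sketch of the lazy-initialization pattern. `LazyQueueClient`, `client_factory`, and `fake_factory` are hypothetical names used only here; in the real module the factory would presumably be `tasks_v2.CloudTasksClient`. The path string follows the standard Cloud Tasks queue resource format; whether the module builds it by hand or through a client helper is not stated in this reference.

```python
class LazyQueueClient:
    """Hypothetical stand-in illustrating the pattern; not the thoth implementation."""

    def __init__(self, project_id, location, queue_name, client_factory):
        self.project_id = project_id
        self.location = location
        self.queue_name = queue_name
        self._client_factory = client_factory  # e.g. tasks_v2.CloudTasksClient
        self._client = None  # nothing is created until first access

    @property
    def client(self):
        # Build the underlying client on first access, then reuse it.
        if self._client is None:
            self._client = self._client_factory()
        return self._client

    @property
    def queue_path(self):
        # Cloud Tasks resource name format for a queue.
        return (
            f"projects/{self.project_id}"
            f"/locations/{self.location}"
            f"/queues/{self.queue_name}"
        )


created = []


def fake_factory():
    # Records each construction so the laziness is observable.
    created.append("client")
    return object()


queue = LazyQueueClient("my-project", "us-central1", "ingest-queue", fake_factory)
```

Deferring construction this way means that importing the module or instantiating the wrapper does not require credentials until a task is actually enqueued.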
__init__(project_id: str | None = None, location: str | None = None, queue_name: str | None = None, service_url: str | None = None, service_account_email: str | None = None)
Initialize the Cloud Tasks client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| project_id | str \| None | GCP project ID (default: from GCP_PROJECT_ID env) | None |
| location | str \| None | Cloud Tasks location/region (default: from CLOUD_TASKS_LOCATION env, falling back to 'us-central1') | None |
| queue_name | str \| None | Cloud Tasks queue name (default: from CLOUD_TASKS_QUEUE env) | None |
| service_url | str \| None | Cloud Run service URL for callbacks (default: from CLOUD_RUN_SERVICE_URL env) | None |
| service_account_email | str \| None | Service account for OIDC auth (default: from SERVICE_ACCOUNT_EMAIL env) | None |
is_configured() -> bool
Check if Cloud Tasks is properly configured.
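The environment fallbacks listed for the constructor, together with a configuration check of this kind, might look roughly like the sketch below. `resolve_settings` and `looks_configured` are hypothetical helpers, and the set of settings treated as required (project, queue, service URL) is an assumption; the actual `is_configured()` may check a different set.

```python
import os


def resolve_settings(project_id=None, location=None, queue_name=None,
                     service_url=None, service_account_email=None, env=None):
    """Explicit arguments win; otherwise fall back to environment variables."""
    env = os.environ if env is None else env
    return {
        "project_id": project_id or env.get("GCP_PROJECT_ID"),
        "location": location or env.get("CLOUD_TASKS_LOCATION", "us-central1"),
        "queue_name": queue_name or env.get("CLOUD_TASKS_QUEUE"),
        "service_url": service_url or env.get("CLOUD_RUN_SERVICE_URL"),
        "service_account_email": service_account_email
        or env.get("SERVICE_ACCOUNT_EMAIL"),
    }


def looks_configured(settings):
    # Assumption: these three settings are the minimum needed to enqueue.
    required = ("project_id", "queue_name", "service_url")
    return all(settings.get(key) for key in required)


# With an empty environment only the location default is populated.
empty = resolve_settings(env={})

# Mixing an explicit argument with environment values.
mixed = resolve_settings(
    project_id="my-project",
    env={
        "CLOUD_TASKS_QUEUE": "ingest-queue",
        "CLOUD_RUN_SERVICE_URL": "https://svc.example.com",
    },
)
```

A check like this lets callers fall back to a non-queued code path when Cloud Tasks settings are missing, for example in local development.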
enqueue_batch(batch: BatchTask, delay_seconds: int = 0) -> str | None
Enqueue a batch processing task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch | BatchTask | BatchTask with batch details | required |
| delay_seconds | int | Optional delay before task execution | 0 |
Returns:
| Type | Description |
|---|---|
| str \| None | Task name if successful, None if failed |
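To make the moving parts of an enqueue call concrete, here is a sketch that assembles a task in the shape of the Cloud Tasks REST `Task` resource (an `httpRequest` with a base64-encoded body, an optional `oidcToken`, and an optional `scheduleTime`). `build_task` is a hypothetical helper, the `/ingest/batch` endpoint path is invented for illustration, and using the service URL as the OIDC audience is an assumption. It does not call Cloud Tasks and is not the module's actual implementation.

```python
from __future__ import annotations

import base64
import json
from datetime import datetime, timedelta, timezone


def build_task(batch: dict, service_url: str, service_account_email: str | None,
               delay_seconds: int = 0, now: datetime | None = None) -> dict:
    """Assemble a REST-style Cloud Tasks task dict without sending it."""
    url = service_url.rstrip("/") + "/ingest/batch"  # hypothetical endpoint path
    body = json.dumps(batch).encode("utf-8")
    http_request = {
        "httpMethod": "POST",
        "url": url,
        "headers": {"Content-Type": "application/json"},
        # The REST representation carries the body as a base64 string.
        "body": base64.b64encode(body).decode("ascii"),
    }
    if service_account_email:
        # Assumption: the service URL doubles as the OIDC audience.
        http_request["oidcToken"] = {
            "serviceAccountEmail": service_account_email,
            "audience": service_url,
        }
    task = {"httpRequest": http_request}
    if delay_seconds > 0:
        # Schedule the task for a future UTC time instead of immediately.
        now = now or datetime.now(timezone.utc)
        run_at = now + timedelta(seconds=delay_seconds)
        task["scheduleTime"] = run_at.strftime("%Y-%m-%dT%H:%M:%SZ")
    return task


fixed_now = datetime(2024, 1, 1, tzinfo=timezone.utc)
task = build_task(
    {"job_id": "job-123", "batch_id": "job-123-batch-0"},
    "https://svc.example.com/",
    "ingest@example.iam.gserviceaccount.com",
    delay_seconds=30,
    now=fixed_now,
)
```

Returning `None` on failure, as documented above, suggests the real method catches client errors rather than raising; callers should check the return value before recording a batch as queued.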
enqueue_batches(job_id: str, file_list: list[str], collection_name: str, source: str, batch_size: int = 100) -> dict[str, Any]
Split file list into batches and enqueue all.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | Job ID for tracking | required |
| file_list | list[str] | List of file paths to process | required |
| collection_name | str | Target LanceDB collection | required |
| source | str | Source name (handbook, dnd, personal) | required |
| batch_size | int | Number of files per batch | 100 |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary with enqueueing results |
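The splitting step can be sketched as follows. `split_into_batches` is a hypothetical helper, the `batch_id` naming scheme is invented, and treating `end_index` as exclusive is an assumption; the reference does not say how the module defines either.

```python
def split_into_batches(job_id, file_list, batch_size=100):
    """Slice file_list into consecutive chunks of at most batch_size files."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batches = []
    for number, start in enumerate(range(0, len(file_list), batch_size)):
        end = min(start + batch_size, len(file_list))
        batches.append({
            "job_id": job_id,
            "batch_id": f"{job_id}-batch-{number}",  # hypothetical naming scheme
            "start_index": start,
            "end_index": end,  # assumption: exclusive upper bound
            "file_list": file_list[start:end],
        })
    return batches


files = [f"doc_{i}.md" for i in range(250)]
batches = split_into_batches("job-1", files)
ranges = [(b["start_index"], b["end_index"]) for b in batches]
```

With 250 files and the default batch size of 100, this produces three batches, the last one holding the remaining 50 files.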