thoth.ingestion.flows.batch

Batch processing workflow.

Handles the /ingest-batch endpoint, which processes a specific batch of files in parallel. Each batch writes to its own isolated LanceDB table.

Functions

extract_trace_id_from_header(header_value)

Extract trace ID from X-Cloud-Trace-Context header.

get_job_logger(base_logger, job_id[, ...])

Create a job-scoped logger adapter.

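A job-scoped logger adapter is typically a thin wrapper that stamps every record with the job's ID so log lines from parallel jobs can be told apart. A sketch using the standard library's logging.LoggerAdapter (the keyword-extras parameter is an assumption about the elided arguments):

```python
import logging


def get_job_logger(base_logger: logging.Logger, job_id: str, **extra) -> logging.LoggerAdapter:
    """Wrap base_logger so every record carries job_id (plus any extra fields)."""
    return logging.LoggerAdapter(base_logger, {"job_id": job_id, **extra})


log = get_job_logger(logging.getLogger("thoth.ingestion"), job_id="job-42")
log.info("processing batch")  # the adapter injects job_id into the record's extras
```
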
get_job_manager()

Return the global JobManager singleton (creates on first call).

get_source_registry()

Return the global SourceRegistry singleton (creates on first call).

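Both getters follow the lazy module-level singleton pattern: the object is created on first call and the same instance is returned thereafter. A sketch of the pattern, with a placeholder SourceRegistry (the real class's contents are not shown in this page):

```python
from typing import Optional


class SourceRegistry:
    """Placeholder for the real registry (assumed to track ingestion sources)."""

    def __init__(self) -> None:
        self.sources: dict[str, object] = {}


_source_registry: Optional[SourceRegistry] = None


def get_source_registry() -> SourceRegistry:
    """Return the global SourceRegistry singleton, creating it on first call."""
    global _source_registry
    if _source_registry is None:
        _source_registry = SourceRegistry()
    return _source_registry


# Repeated calls hand back the same instance:
# get_source_registry() is get_source_registry()  -> True
```

get_job_manager() would follow the same shape with a JobManager instead.
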
process_batch(request)

Process a specific batch of files (called by Cloud Tasks).

set_trace_context(trace_id[, project_id])

Set the trace context for the current request/task.

setup_logger(name[, level, simple, json_output])

Create and configure a logger with structured JSON output.

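Structured JSON output usually means a custom logging.Formatter that emits one JSON object per line, which Cloud Logging parses into structured fields. A sketch of the idea (the field names and the simple/json_output switches are assumptions about the elided parameters):

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "severity": record.levelname,   # Cloud Logging reads "severity"
            "logger": record.name,
            "message": record.getMessage(),
        })


def setup_logger(name: str, level: int = logging.INFO,
                 simple: bool = False, json_output: bool = True) -> logging.Logger:
    """Create a logger writing JSON (or plain text) lines to stdout."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler(sys.stdout)
    if json_output and not simple:
        handler.setFormatter(JsonFormatter())
    else:
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.handlers = [handler]  # replace, don't stack, on repeated setup calls
    return logger
```
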
Classes

Any(*args, **kwargs)

Special type indicating an unconstrained type.

IngestionPipeline([repo_manager, chunker, ...])

Orchestrates the complete ingestion pipeline.

JSONResponse(content[, status_code, ...])

Response class that serializes its content to JSON.

JobStats([total_files, processed_files, ...])

Statistics for a job.

Request(scope[, receive, send])

Incoming HTTP request.

VectorStore([persist_directory, ...])

Vector store for document embeddings using LanceDB.

async thoth.ingestion.flows.batch.process_batch(request: Request) → JSONResponse

Process a specific batch of files (called by Cloud Tasks).

Each batch is stored in a unique GCS prefix to avoid conflicts during parallel processing. Use /merge-batches to consolidate.
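The isolation scheme described above needs each batch to derive a prefix no other parallel task will use. A hypothetical sketch of such a naming helper (the job-ID/index format is an illustration, not the service's actual scheme):

```python
def batch_prefix(job_id: str, batch_index: int) -> str:
    """Derive a unique GCS prefix / LanceDB table name for one batch.

    Keying on (job_id, batch_index) means parallel Cloud Tasks never write
    to the same location; a later merge step can list and consolidate
    everything under the job_id.
    """
    return f"{job_id}/batch_{batch_index:04d}"


# batch_prefix("job-42", 3) -> "job-42/batch_0003"
```
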