# Source code for thoth.shared.cli

"""Command-line interface for Thoth ingestion pipeline.

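This module provides a Click-based CLI for running the ingestion pipeline,
searching the indexed content, scheduling or manually triggering syncs,
checking pipeline status and system health, and resetting pipeline state
and the vector store.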
"""

import logging
from pathlib import Path
import signal
import sys
import time
from typing import Any

import click
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeRemainingColumn,
)
from rich.table import Table

from thoth.ingestion.chunker import MarkdownChunker
from thoth.ingestion.pipeline import IngestionPipeline
from thoth.ingestion.repo_manager import HandbookRepoManager
from thoth.shared.embedder import Embedder
from thoth.shared.monitoring import Monitor, create_default_health_checks
from thoth.shared.scheduler import SyncScheduler
from thoth.shared.utils.logger import setup_logger
from thoth.shared.vector_store import VectorStore

# Single Rich console shared by every command and callback in this module,
# so all styled output (tables, panels, progress bars) goes through one sink.
console: Console = Console()


def _setup_monitor_and_scheduler(
    pipeline: IngestionPipeline,
    db_path: str | None,
) -> tuple[Monitor, SyncScheduler]:
    """Build a monitor and a sync scheduler wired together for *pipeline*.

    The monitor is registered with the default health checks for the vector
    store directory and the pipeline's repository clone path. The scheduler
    gets success/failure callbacks that record each sync in the monitor and
    print a one-line summary to the console. The scheduler is not started.

    Args:
        pipeline: Configured pipeline instance; its ``repo_manager.clone_path``
            is used for the repository health check
        db_path: Vector store directory, or None for ``./handbook_vectors``

    Returns:
        Tuple of (monitor, scheduler)
    """
    # Resolve the locations the default health checks inspect.
    vector_store_path = Path(db_path or "./handbook_vectors")
    repo_path = pipeline.repo_manager.clone_path

    monitor = Monitor()

    # Register default health checks
    default_checks = create_default_health_checks(
        vector_store_path=vector_store_path,
        repo_path=repo_path,
    )
    for name, check_func in default_checks.items():
        monitor.register_health_check(name, check_func)

    scheduler = SyncScheduler(pipeline=pipeline)

    def on_success(stats: dict[str, Any]) -> None:
        # NOTE(review): relies on the scheduler passing a stats dict with these
        # keys (processed_files, total_chunks, duration_seconds) — confirm in
        # SyncScheduler.
        monitor.record_sync_success(
            files_processed=stats["processed_files"],
            chunks_created=stats["total_chunks"],
            duration=stats["duration_seconds"],
        )
        console.print(f"[green]✓ Sync completed: {stats['processed_files']} files processed[/green]")

    def on_failure(error: Exception) -> None:
        monitor.record_sync_failure(error)
        # Escape the exception text so brackets inside it are printed literally
        # instead of being parsed as Rich markup (an unmatched "[/...]" would
        # otherwise raise from inside this callback).
        console.print(f"[red]✗ Sync failed: {escape(str(error))}[/red]")

    scheduler.add_success_callback(on_success)
    scheduler.add_failure_callback(on_failure)

    return monitor, scheduler


def _display_final_stats(monitor: Monitor) -> None:
    """Print a summary table of the monitor's sync counters and success rate.

    Args:
        monitor: Monitor whose ``get_metrics()`` dict supplies the
            ``sync_count``, ``sync_success_count`` and ``sync_failure_count``
            values shown in the table
    """
    metrics = monitor.get_metrics()
    console.print("\n[bold]Final Statistics:[/bold]")

    summary = Table(show_header=False, box=None, padding=(0, 2))
    summary.add_column("Key", style="dim")
    summary.add_column("Value")

    # (row label, metrics key) pairs rendered in this order.
    counter_rows = (
        ("Total Syncs", "sync_count"),
        ("Successful", "sync_success_count"),
        ("Failed", "sync_failure_count"),
    )
    for label, metric_key in counter_rows:
        summary.add_row(label, str(metrics[metric_key]))
    summary.add_row("Success Rate", f"{monitor.get_success_rate():.1f}%")

    console.print(summary)


def setup_pipeline(
    repo_url: str | None,
    clone_path: str | None,
    db_path: str | None,
    collection: str | None,
) -> IngestionPipeline:
    """Assemble an IngestionPipeline from optional command-line overrides.

    Every argument may be None, in which case a built-in default is used.

    Args:
        repo_url: Repository URL; None selects the GitLab handbook repository
        clone_path: Local clone directory; None is passed through to
            HandbookRepoManager (presumably selecting its own default)
        db_path: Vector store directory; None selects ``./handbook_vectors``
        collection: Collection name; None selects ``thoth_documents``

    Returns:
        A pipeline wired with a repo manager, markdown chunker, embedder and
        vector store, all sharing the ``thoth`` logger
    """
    # One INFO-level logger shared by every component below.
    logger = setup_logger("thoth", level=logging.INFO)

    repo_manager = HandbookRepoManager(
        repo_url=repo_url or "https://gitlab.com/gitlab-com/content-sites/handbook.git",
        clone_path=None if not clone_path else Path(clone_path),
        logger=logger,
    )
    chunker = MarkdownChunker(logger=logger)
    embedder = Embedder(model_name="all-MiniLM-L6-v2", batch_size=32)
    store = VectorStore(
        persist_directory=db_path or "./handbook_vectors",
        collection_name=collection or "thoth_documents",
        embedder=embedder,
    )

    pipeline = IngestionPipeline(
        repo_manager=repo_manager,
        chunker=chunker,
        embedder=embedder,
        vector_store=store,
        logger_instance=logger,
    )
    return pipeline
def _print_error(error: BaseException) -> None:
    """Print an error in the CLI's standard red "✗ Error:" format.

    The exception text is escaped so brackets inside it are shown literally
    rather than interpreted as Rich markup; an unmatched closing tag such as
    ``[/x]`` in a message would otherwise make the print itself fail.

    Args:
        error: Exception whose ``str()`` is displayed
    """
    console.print(f"\n[bold red]✗ Error:[/bold red] {escape(str(error))}")


@click.group()
@click.version_option()
def cli() -> None:
    """Thoth - GitLab Handbook Ingestion Pipeline.

    Ingest, process, and index the GitLab handbook for semantic search.
    """


@cli.command()
@click.option(
    "--repo-url",
    default=None,
    help="Repository URL (default: GitLab handbook)",
)
@click.option(
    "--clone-path",
    default=None,
    help="Local path to clone repository (default: ~/.thoth/handbook)",
)
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
@click.option(
    "--collection",
    default=None,
    help="Collection name (default: thoth_documents)",
)
@click.option(
    "--force",
    is_flag=True,
    help="Force re-clone of repository",
)
@click.option(
    "--full",
    is_flag=True,
    help="Process all files (disable incremental mode)",
)
@click.option(
    "--batch-size",
    default=50,
    # Reject zero/negative batch sizes at parse time with a usage error.
    type=click.IntRange(min=1),
    help="Number of files to process per batch (default: 50)",
)
def ingest(
    repo_url: str | None,
    clone_path: str | None,
    db_path: str | None,
    collection: str | None,
    force: bool,
    full: bool,
    batch_size: int,
) -> None:
    """Run the ingestion pipeline to index the GitLab handbook.

    This command will:
    1. Clone or update the GitLab handbook repository
    2. Discover and process markdown files
    3. Generate chunks and embeddings
    4. Store in the vector database

    By default, runs in incremental mode (only processes changed files).
    Use --full to process all files regardless of changes.
    """
    console.print(Panel.fit("🔮 Thoth Ingestion Pipeline", style="bold magenta"))

    try:
        # Set up pipeline
        pipeline = setup_pipeline(repo_url, clone_path, db_path, collection)
        pipeline.batch_size = batch_size

        # Create progress display
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            TimeRemainingColumn(),
            console=console,
        ) as progress:
            task = progress.add_task("Initializing...", total=100)

            def progress_callback(current: int, total: int, message: str) -> None:
                """Update progress bar."""
                progress.update(task, completed=current, total=total, description=message)

            # Run pipeline
            console.print("\n[bold]Starting ingestion...[/bold]")
            stats = pipeline.run(
                force_reclone=force,
                incremental=not full,
                progress_callback=progress_callback,
            )

        # Display results
        console.print("\n[bold green]✓ Ingestion Complete![/bold green]\n")

        table = Table(show_header=True, header_style="bold cyan")
        table.add_column("Metric", style="dim")
        table.add_column("Value", justify="right")
        table.add_row("Files Processed", f"{stats.processed_files:,}")
        table.add_row("Files Failed", f"{stats.failed_files:,}")
        table.add_row("Total Chunks", f"{stats.total_chunks:,}")
        table.add_row("Total Documents", f"{stats.total_documents:,}")
        table.add_row("Duration", f"{stats.duration_seconds:.2f}s")
        table.add_row("Throughput (files/sec)", f"{stats.files_per_second:.2f}")
        table.add_row("Throughput (chunks/sec)", f"{stats.chunks_per_second:.2f}")
        console.print(table)

        if stats.failed_files > 0:
            console.print(
                f"\n[yellow]⚠ {stats.failed_files} file(s) failed to process. "
                "Check logs for details.[/yellow]"
            )
    except RuntimeError as e:
        _print_error(e)
        raise click.Abort from e


@cli.command()
@click.option(
    "--clone-path",
    default=None,
    help="Local path to repository (default: ~/.thoth/handbook)",
)
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
@click.option(
    "--collection",
    default=None,
    help="Collection name (default: thoth_documents)",
)
def status(
    clone_path: str | None,
    db_path: str | None,
    collection: str | None,
) -> None:
    """Show current pipeline status and statistics."""
    console.print(Panel.fit("📊 Pipeline Status", style="bold cyan"))

    try:
        pipeline = setup_pipeline(None, clone_path, db_path, collection)
        status_info = pipeline.get_status()

        # Repository info. Table cells must be strings, so values of unknown
        # type (e.g. a Path) are passed through str(); paths are escaped so
        # brackets in them are not parsed as Rich markup.
        console.print("\n[bold]Repository:[/bold]")
        repo_table = Table(show_header=False, box=None, padding=(0, 2))
        repo_table.add_column("Key", style="dim")
        repo_table.add_column("Value")
        repo_table.add_row("Path", escape(str(status_info["repo_path"])))
        repo_table.add_row("Exists", "✓ Yes" if status_info["repo_exists"] else "✗ No")
        console.print(repo_table)

        # State info
        state = status_info["state"]
        console.print("\n[bold]Pipeline State:[/bold]")
        state_table = Table(show_header=False, box=None, padding=(0, 2))
        state_table.add_column("Key", style="dim")
        state_table.add_column("Value")
        state_table.add_row("Last Commit", state["last_commit"] or "None")
        state_table.add_row("Processed Files", f"{len(state['processed_files']):,}")
        state_table.add_row("Failed Files", f"{len(state['failed_files']):,}")
        state_table.add_row("Total Chunks", f"{state['total_chunks']:,}")
        state_table.add_row("Total Documents", f"{state['total_documents']:,}")
        state_table.add_row("Completed", "✓ Yes" if state["completed"] else "✗ No")
        if state["start_time"]:
            state_table.add_row("Start Time", str(state["start_time"]))
        if state["last_update_time"]:
            state_table.add_row("Last Update", str(state["last_update_time"]))
        console.print(state_table)

        # Vector store info
        console.print("\n[bold]Vector Store:[/bold]")
        store_table = Table(show_header=False, box=None, padding=(0, 2))
        store_table.add_column("Key", style="dim")
        store_table.add_column("Value")
        store_table.add_row("Collection", escape(str(status_info["vector_store_collection"])))
        store_table.add_row("Document Count", f"{status_info['vector_store_count']:,}")
        console.print(store_table)

        # Failed files detail (first 10 only, errors truncated to 80 chars)
        failed_files = state["failed_files"]
        if failed_files:
            console.print("\n[bold yellow]Failed Files:[/bold yellow]")
            failed_table = Table(show_header=True, header_style="bold yellow")
            failed_table.add_column("File", style="dim")
            failed_table.add_column("Error")
            for file, error in list(failed_files.items())[:10]:
                error_text = str(error)
                shown = escape(error_text[:80]) + "..." if len(error_text) > 80 else escape(error_text)
                failed_table.add_row(escape(str(file)), shown)
            console.print(failed_table)

            if len(failed_files) > 10:
                console.print(f"[dim]... and {len(failed_files) - 10} more[/dim]")
    except RuntimeError as e:
        _print_error(e)
        raise click.Abort from e


@cli.command()
@click.option(
    "--clone-path",
    default=None,
    help="Local path to repository (default: ~/.thoth/handbook)",
)
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
@click.option(
    "--collection",
    default=None,
    help="Collection name (default: thoth_documents)",
)
@click.option(
    "--keep-repo",
    is_flag=True,
    help="Keep the cloned repository (only reset database and state)",
)
@click.confirmation_option(prompt="Are you sure you want to reset the pipeline?")
def reset(
    clone_path: str | None,
    db_path: str | None,
    collection: str | None,
    keep_repo: bool,
) -> None:
    """Reset the pipeline state and vector database.

    This will clear all processed data and start fresh.
    Use --keep-repo to preserve the cloned repository.
    """
    console.print(Panel.fit("🔄 Reset Pipeline", style="bold yellow"))

    try:
        pipeline = setup_pipeline(None, clone_path, db_path, collection)

        with console.status("[bold yellow]Resetting pipeline...", spinner="dots"):
            pipeline.reset(keep_repo=keep_repo)

        console.print("\n[bold green]✓ Pipeline reset successfully![/bold green]")
        if keep_repo:
            console.print("[dim]Repository preserved. Vector store and state cleared.[/dim]")
        else:
            console.print("[dim]Repository, vector store, and state cleared.[/dim]")
    except RuntimeError as e:
        _print_error(e)
        raise click.Abort from e


@cli.command()
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
@click.option(
    "--collection",
    default=None,
    help="Collection name (default: thoth_documents)",
)
@click.option(
    "--query",
    "-q",
    required=True,
    help="Query text to search for",
)
@click.option(
    "--limit",
    "-n",
    default=5,
    # At least one result must be requested.
    type=click.IntRange(min=1),
    help="Number of results to return (default: 5)",
)
def search(
    db_path: str | None,
    collection: str | None,
    query: str,
    limit: int,
) -> None:
    """Search the indexed handbook for relevant content.

    Example:
        thoth search -q "How to contribute to GitLab?" -n 3
    """
    # The query is user input: escape it so brackets are displayed verbatim.
    console.print(Panel.fit(f"🔍 Searching: {escape(query)}", style="bold blue"))

    try:
        # Set up vector store
        embedder = Embedder(model_name="all-MiniLM-L6-v2")
        vector_store = VectorStore(
            persist_directory=db_path or "./handbook_vectors",
            collection_name=collection or "thoth_documents",
            embedder=embedder,
        )

        # Check if there are any documents
        count = vector_store.get_document_count()
        if count == 0:
            console.print(
                "\n[yellow]⚠ No documents found in the vector store. "
                "Run 'thoth ingest' first.[/yellow]"
            )
            return

        # Perform search
        with console.status("[bold blue]Searching...", spinner="dots"):
            results = vector_store.search_similar(query, n_results=limit)

        # Each result field is indexed with [0] below, i.e. one inner list per
        # query text. The outer list can be non-empty while the inner list is
        # empty, so the inner list is what decides "no matches".
        documents = results["documents"][0] if results and results["documents"] else []
        if not documents:
            console.print("\n[yellow]No results found.[/yellow]")
            return

        console.print(f"\n[bold]Found {len(documents)} results:[/bold]\n")

        for i, (doc, metadata, distance) in enumerate(
            zip(
                documents,
                results["metadatas"][0],
                results["distances"][0],
                strict=True,
            ),
            1,
        ):
            # Tolerate a hit with no metadata or no stored text.
            meta = metadata or {}
            text = doc or ""
            # NOTE(review): 1 - distance is a meaningful similarity only for a
            # cosine-style distance; confirm the collection's distance metric.
            similarity = 1 - distance
            # Escape repository content so Markdown such as "[here](url)" is
            # shown verbatim instead of being parsed as Rich markup tags.
            file_path = escape(str(meta.get("file_path", "Unknown")))
            excerpt = escape(text[:500]) + ("..." if len(text) > 500 else "")
            console.print(
                Panel(
                    f"[bold]File:[/bold] {file_path}\n"
                    f"[bold]Chunk:[/bold] {meta.get('chunk_index', '?')}/{meta.get('total_chunks', '?')}\n"
                    f"[bold]Similarity:[/bold] {similarity:.2%}\n\n"
                    f"{excerpt}",
                    title=f"Result {i}",
                    border_style="blue",
                )
            )
    except RuntimeError as e:
        _print_error(e)
        raise click.Abort from e


@cli.command()
@click.option(
    "--repo-url",
    default=None,
    help="Repository URL (default: GitLab handbook)",
)
@click.option(
    "--clone-path",
    default=None,
    help="Local path to clone repository (default: ~/.thoth/handbook)",
)
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
@click.option(
    "--collection",
    default=None,
    help="Collection name (default: thoth_documents)",
)
@click.option(
    "--interval",
    default=60,
    type=click.IntRange(min=1),
    help="Sync interval in minutes (default: 60)",
)
@click.option(
    "--cron-hour",
    # Enforce the documented 0-23 range at parse time.
    type=click.IntRange(0, 23),
    help="Hour to run (0-23) for cron-style scheduling",
)
@click.option(
    "--cron-minute",
    default=0,
    # Enforce the documented 0-59 range at parse time.
    type=click.IntRange(0, 59),
    help="Minute to run (0-59) for cron-style scheduling",
)
@click.option(
    "--start-immediately",
    is_flag=True,
    help="Run sync immediately on start",
)
def schedule(
    repo_url: str | None,
    clone_path: str | None,
    db_path: str | None,
    collection: str | None,
    interval: int,
    cron_hour: int | None,
    cron_minute: int,
    start_immediately: bool,
) -> None:
    """Start the scheduler for automated syncs.

    By default, syncs run every 60 minutes. Use --interval to change frequency,
    or use --cron-hour and --cron-minute for cron-style scheduling.

    Examples:
        # Run every 30 minutes
        thoth schedule --interval 30

        # Run daily at 2:30 AM
        thoth schedule --cron-hour 2 --cron-minute 30

        # Run every hour, starting immediately
        thoth schedule --start-immediately

    Press Ctrl+C to stop the scheduler.
    """
    console.print(Panel.fit("⏰ Thoth Scheduler", style="bold magenta"))

    try:
        # Set up pipeline
        pipeline = setup_pipeline(repo_url, clone_path, db_path, collection)

        # Set up monitoring and scheduler
        monitor, scheduler = _setup_monitor_and_scheduler(pipeline, db_path)

        # Configure schedule: --cron-hour takes precedence over --interval.
        if cron_hour is not None:
            scheduler.add_cron_job(hour=cron_hour, minute=cron_minute)
            console.print(f"[cyan]Scheduled daily at {cron_hour:02d}:{cron_minute:02d}[/cyan]")
        else:
            scheduler.add_interval_job(
                interval_minutes=interval,
                start_immediately=start_immediately,
            )
            console.print(f"[cyan]Scheduled every {interval} minutes[/cyan]")

        # Start scheduler
        scheduler.start()
        console.print("[green]Scheduler started[/green]")

        # Display status
        job_status = scheduler.get_job_status()
        if job_status["next_run_time"]:
            console.print(f"Next run: {job_status['next_run_time']}")

        console.print("\n[dim]Press Ctrl+C to stop...[/dim]\n")

        # Set up signal handler for graceful shutdown
        def signal_handler(_sig: int, _frame: Any) -> None:
            console.print("\n[yellow]Stopping scheduler...[/yellow]")
            scheduler.stop()
            console.print("[green]Scheduler stopped[/green]")
            _display_final_stats(monitor)
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Keep the main thread alive; the signal handler ends the process.
        while True:
            time.sleep(1)
    except RuntimeError as e:
        _print_error(e)
        raise click.Abort from e


@cli.command()
@click.option(
    "--clone-path",
    default=None,
    help="Local path to repository (default: ~/.thoth/handbook)",
)
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
def health(
    clone_path: str | None,
    db_path: str | None,
) -> None:
    """Check system health status.

    Runs health checks on key components and displays the results.
    """
    console.print(Panel.fit("🏥 Health Check", style="bold cyan"))

    status_colors = {
        "healthy": "green",
        "degraded": "yellow",
        "unhealthy": "red",
        "unknown": "dim",
    }
    status_symbols = {
        "healthy": "✓",
        "degraded": "⚠",
        "unhealthy": "✗",
        "unknown": "?",
    }

    try:
        # Set up paths
        vector_store_path = Path(db_path or "./handbook_vectors")
        repo_manager = HandbookRepoManager(
            repo_url="https://gitlab.com/gitlab-com/content-sites/handbook.git",
            clone_path=Path(clone_path) if clone_path else None,
        )
        repo_path = repo_manager.clone_path

        # Set up monitor with the default health checks
        monitor = Monitor()
        default_checks = create_default_health_checks(
            vector_store_path=vector_store_path,
            repo_path=repo_path,
        )
        for name, check_func in default_checks.items():
            monitor.register_health_check(name, check_func)

        # Run health checks
        report = monitor.get_health_report()

        # Display overall status
        overall = report["overall_status"]
        color = status_colors.get(overall, "white")
        symbol = status_symbols.get(overall, "?")
        console.print(f"\n[bold {color}]{symbol} Overall Status: {overall.upper()}[/bold {color}]\n")

        # Display individual checks
        checks_table = Table(show_header=True, header_style="bold")
        checks_table.add_column("Component", style="bold")
        checks_table.add_column("Status", justify="center")
        checks_table.add_column("Message")

        for name, check_data in report["checks"].items():
            check_status = check_data["status"]
            check_color = status_colors.get(check_status, "white")
            check_symbol = status_symbols.get(check_status, "?")
            checks_table.add_row(
                name.replace("_", " ").title(),
                f"[{check_color}]{check_symbol} {check_status}[/{check_color}]",
                escape(str(check_data["message"])),
            )

        console.print(checks_table)
    except Exception as e:
        _print_error(e)
        raise click.Abort from e

    # Exit code: 1 = unhealthy, 2 = degraded, 0 = anything else.
    sys.exit({"unhealthy": 1, "degraded": 2}.get(overall, 0))


@cli.command()
@click.option(
    "--repo-url",
    default=None,
    help="Repository URL (default: GitLab handbook)",
)
@click.option(
    "--clone-path",
    default=None,
    help="Local path to repository (default: ~/.thoth/handbook)",
)
@click.option(
    "--db-path",
    default=None,
    help="Database path (default: ./handbook_vectors)",
)
@click.option(
    "--collection",
    default=None,
    help="Collection name (default: thoth_documents)",
)
def sync(
    repo_url: str | None,
    clone_path: str | None,
    db_path: str | None,
    collection: str | None,
) -> None:
    """Manually trigger a sync operation.

    This is useful for testing the scheduler or running a one-off sync.
    """
    console.print(Panel.fit("🔄 Manual Sync", style="bold blue"))

    try:
        # Set up pipeline
        pipeline = setup_pipeline(repo_url, clone_path, db_path, collection)

        # Set up monitoring
        monitor = Monitor()
        monitor.record_sync_start()

        # Run sync
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            TimeRemainingColumn(),
            console=console,
        ) as progress:
            task = progress.add_task("Syncing...", total=100)

            def progress_callback(current: int, total: int, message: str) -> None:
                progress.update(task, completed=current, total=total, description=message)

            # Only the run itself counts as a sync failure; keeping the try this
            # narrow stops a later display error from also recording a failure
            # after success has already been recorded.
            try:
                stats = pipeline.run(
                    force_reclone=False,
                    incremental=True,
                    progress_callback=progress_callback,
                )
            except Exception as e:
                monitor.record_sync_failure(e)
                raise

        monitor.record_sync_success(
            files_processed=stats.processed_files,
            chunks_created=stats.total_chunks,
            duration=stats.duration_seconds,
        )

        console.print("\n[bold green]✓ Sync completed successfully![/bold green]\n")

        # Display stats
        stats_table = Table(show_header=False, box=None, padding=(0, 2))
        stats_table.add_column("Metric", style="dim")
        stats_table.add_column("Value")
        stats_table.add_row("Files Processed", f"{stats.processed_files:,}")
        stats_table.add_row("Total Chunks", f"{stats.total_chunks:,}")
        stats_table.add_row("Duration", f"{stats.duration_seconds:.2f}s")
        console.print(stats_table)
    except RuntimeError as e:
        _print_error(e)
        raise click.Abort from e


if __name__ == "__main__":
    cli()