# Source code for thoth.shared.gcs_sync

"""Google Cloud Storage sync for vector database persistence.

This module provides upload/download of local vector database directories
(e.g., LanceDB data) to/from a GCS bucket for backup
and restore. Used by the ingestion worker and CLI when not using a direct
gs:// LanceDB URI.
"""

from datetime import UTC, datetime
import logging
import os
from pathlib import Path
import shutil

from thoth.shared.utils.logger import setup_logger

# Module-wide logger; GCSSync falls back to it when no logger is injected.
logger = setup_logger(__name__)

# google-cloud-storage is an optional dependency. Record whether it imported
# so GCSSync can fail with a clear GCSSyncError instead of a NameError.
try:
    from google.cloud import storage  # type: ignore[attr-defined]
    from google.cloud.exceptions import GoogleCloudError
except ImportError:
    GCS_AVAILABLE = False
    logger.warning("google-cloud-storage not installed. GCS sync will not be available.")
else:
    GCS_AVAILABLE = True


class GCSSyncError(Exception):
    """Raised when GCS sync operations fail."""
[docs] class GCSSync: """Manages sync of local vector DB directories to/from Google Cloud Storage. Handles uploading a local directory (e.g., LanceDB persistence path) to a GCS prefix and downloading it back for restore. Verifies bucket existence on init and uses Application Default Credentials or a provided credentials path. """
[docs] def __init__( self, bucket_name: str, project_id: str | None = None, credentials_path: str | None = None, logger_instance: logging.Logger | logging.LoggerAdapter | None = None, ): """Initialize GCS sync manager. Args: bucket_name: Name of the GCS bucket for storage project_id: Optional GCP project ID (defaults to environment) credentials_path: Optional path to service account JSON key file If not provided, uses Application Default Credentials logger_instance: Optional logger instance to use. Raises: GCSSyncError: If google-cloud-storage is not installed """ self.logger = logger_instance or logger if not GCS_AVAILABLE: msg = "google-cloud-storage package is not installed. Install with: pip install google-cloud-storage" raise GCSSyncError(msg) self.bucket_name = bucket_name self.project_id = project_id # Set credentials if provided if credentials_path: os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path try: # Initialize storage client self.client = storage.Client(project=project_id) self.bucket = self.client.bucket(bucket_name) # Verify bucket exists if not self.bucket.exists(): msg = f"Bucket '{bucket_name}' does not exist" raise GCSSyncError(msg) self.logger.info(f"Initialized GCS sync with bucket: {bucket_name}") except GoogleCloudError as e: msg = f"Failed to initialize GCS client: {e}" raise GCSSyncError(msg) from e
[docs] def upload_directory( self, local_path: str | Path, gcs_prefix: str = "lancedb", exclude_patterns: list[str] | None = None, ) -> int: """Upload a local directory to GCS. Args: local_path: Path to local directory to upload gcs_prefix: Prefix (folder path) in GCS bucket exclude_patterns: Optional list of filename patterns to exclude Returns: Number of files uploaded Raises: GCSSyncError: If upload fails """ local_path = Path(local_path) if not local_path.exists(): msg = f"Local path does not exist: {local_path}" raise GCSSyncError(msg) if not local_path.is_dir(): msg = f"Local path is not a directory: {local_path}" raise GCSSyncError(msg) uploaded_count = 0 try: self.logger.info(f"Starting upload from {local_path} to gs://{self.bucket_name}/{gcs_prefix}") # Walk through directory and upload each file for file_path in local_path.rglob("*"): if file_path.is_file(): # Check if file should be excluded patterns = exclude_patterns if exclude_patterns is not None else [] should_exclude = any(pattern in str(file_path) for pattern in patterns) if should_exclude: self.logger.debug(f"Excluding file: {file_path}") continue # Calculate relative path for GCS relative_path = file_path.relative_to(local_path) blob_name = f"{gcs_prefix}/{relative_path}".replace("\\", "/") # Upload file blob = self.bucket.blob(blob_name) blob.upload_from_filename(str(file_path)) uploaded_count += 1 self.logger.debug(f"Uploaded: {blob_name}") self.logger.info(f"Successfully uploaded {uploaded_count} files to GCS") return uploaded_count except GoogleCloudError as e: msg = f"Failed to upload directory: {e}" raise GCSSyncError(msg) from e
[docs] def download_directory( self, gcs_prefix: str, local_path: str | Path, clean_local: bool = False, ) -> int: """Download a directory from GCS to local storage. Args: gcs_prefix: Prefix (folder path) in GCS bucket local_path: Path to local directory for download clean_local: If True, remove local directory before download Returns: Number of files downloaded Raises: GCSSyncError: If download fails """ local_path = Path(local_path) # Clean local directory if requested if clean_local and local_path.exists(): self.logger.info(f"Cleaning local directory: {local_path}") shutil.rmtree(local_path) # Create local directory local_path.mkdir(parents=True, exist_ok=True) downloaded_count = 0 try: self.logger.info(f"Starting download from gs://{self.bucket_name}/{gcs_prefix} to {local_path}") # List all blobs with the prefix blobs = list(self.bucket.list_blobs(prefix=gcs_prefix)) if not blobs: self.logger.warning(f"No files found with prefix: {gcs_prefix}") return 0 # Download each blob for blob in blobs: # Skip directory markers (blobs ending with /) if blob.name.endswith("/"): continue # Calculate local file path relative_path = blob.name[len(gcs_prefix) :].lstrip("/") file_path = local_path / relative_path # Create parent directories file_path.parent.mkdir(parents=True, exist_ok=True) # Download file blob.download_to_filename(str(file_path)) downloaded_count += 1 self.logger.debug(f"Downloaded: {blob.name}") self.logger.info(f"Successfully downloaded {downloaded_count} files from GCS") return downloaded_count except GoogleCloudError as e: msg = f"Failed to download directory: {e}" raise GCSSyncError(msg) from e
[docs] def sync_to_gcs( self, local_path: str | Path, gcs_prefix: str = "lancedb", ) -> dict[str, int | str]: """Sync local LanceDB directory to GCS (upload). Args: local_path: Path to local LanceDB directory gcs_prefix: Prefix in GCS bucket Returns: Dictionary with sync statistics Raises: GCSSyncError: If sync fails """ self.logger.info(f"Syncing to GCS: {local_path} -> gs://{self.bucket_name}/{gcs_prefix}") uploaded = self.upload_directory(local_path, gcs_prefix) return { "uploaded_files": uploaded, "direction": "to_gcs", "bucket": self.bucket_name, "prefix": gcs_prefix, }
[docs] def sync_from_gcs( self, gcs_prefix: str, local_path: str | Path, clean_local: bool = False, ) -> dict[str, int | str]: """Sync LanceDB directory from GCS to local (download). Args: gcs_prefix: Prefix in GCS bucket local_path: Path to local LanceDB directory clean_local: If True, remove local directory before sync Returns: Dictionary with sync statistics Raises: GCSSyncError: If sync fails """ self.logger.info(f"Syncing from GCS: gs://{self.bucket_name}/{gcs_prefix} -> {local_path}") downloaded = self.download_directory(gcs_prefix, local_path, clean_local) return { "downloaded_files": downloaded, "direction": "from_gcs", "bucket": self.bucket_name, "prefix": gcs_prefix, }
[docs] def backup_to_gcs( self, local_path: str | Path, backup_name: str | None = None, ) -> str: """Create a timestamped backup in GCS. Args: local_path: Path to local LanceDB directory backup_name: Optional backup name (defaults to timestamp) Returns: GCS prefix of the backup Raises: GCSSyncError: If backup fails """ if backup_name is None: timestamp = datetime.now(tz=UTC).strftime("%Y%m%d_%H%M%S") backup_name = f"backup_{timestamp}" gcs_prefix = f"backups/{backup_name}" self.logger.info(f"Creating backup: {backup_name}") self.upload_directory(local_path, gcs_prefix) self.logger.info(f"Backup created at: gs://{self.bucket_name}/{gcs_prefix}") return gcs_prefix
[docs] def restore_from_backup( self, backup_name: str, local_path: str | Path, clean_local: bool = True, ) -> int: """Restore LanceDB from a GCS backup. Args: backup_name: Name of the backup to restore local_path: Path to local LanceDB directory clean_local: If True, remove local directory before restore Returns: Number of files restored Raises: GCSSyncError: If restore fails """ gcs_prefix = f"backups/{backup_name}" self.logger.info(f"Restoring backup: {backup_name}") result = self.sync_from_gcs(gcs_prefix, local_path, clean_local) self.logger.info(f"Restored {result['downloaded_files']} files from backup") downloaded = result["downloaded_files"] if isinstance(downloaded, int): return downloaded return 0
[docs] def list_backups(self) -> list[str]: """List available backups in GCS. Returns: List of backup names Raises: GCSSyncError: If listing fails """ try: blobs = self.bucket.list_blobs(prefix="backups/") # Extract unique backup names backup_names = set() for blob in blobs: # Extract backup name from path like "backups/backup_20240112_120000/..." parts = blob.name.split("/") if len(parts) >= 2 and parts[0] == "backups": backup_names.add(parts[1]) return sorted(backup_names) except GoogleCloudError as e: msg = f"Failed to list backups: {e}" raise GCSSyncError(msg) from e