GCS sync
thoth.shared.gcs_sync
Google Cloud Storage sync for vector database persistence.
This module uploads local vector database directories (e.g., LanceDB data) to a GCS bucket and downloads them back, for backup and restore. The ingestion worker and the CLI use it when LanceDB is not pointed directly at a gs:// URI.
logger = setup_logger(__name__)
module-attribute
GCS_AVAILABLE = True
module-attribute
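
The reference above only shows `GCS_AVAILABLE = True`; how the flag is set is not documented here. A common way to produce such a flag is an import guard around the optional dependency. The following is a minimal sketch under that assumption; `require_gcs` is a hypothetical helper, and the `GCSSyncError` class is a local stand-in for the module's exception.

```python
# Sketch only: assumes the flag comes from an optional-import guard.
try:
    from google.cloud import storage  # provided by google-cloud-storage
    GCS_AVAILABLE = True
except ImportError:
    storage = None
    GCS_AVAILABLE = False


class GCSSyncError(Exception):
    """Local stand-in for the module's exception type."""


def require_gcs():
    """Hypothetical helper: fail early when the dependency is missing."""
    if not GCS_AVAILABLE:
        raise GCSSyncError(
            "google-cloud-storage is not installed; "
            "install it to enable GCS sync"
        )
```

Checking the flag once at construction time keeps the rest of the code free of repeated import checks.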
GCSSyncError
Raised when GCS sync operations fail.
GCSSync
Manages sync of local vector DB directories to/from Google Cloud Storage.
Handles uploading a local directory (e.g., LanceDB persistence path) to a GCS prefix and downloading it back for restore. Verifies bucket existence on init and uses Application Default Credentials or a provided credentials path.
logger = logger_instance or logger
instance-attribute
bucket_name = bucket_name
instance-attribute
project_id = project_id
instance-attribute
client = storage.Client(project=project_id)
instance-attribute
bucket = self.client.bucket(bucket_name)
instance-attribute
__init__(bucket_name: str, project_id: str | None = None, credentials_path: str | None = None, logger_instance: logging.Logger | logging.LoggerAdapter | None = None)
Initialize GCS sync manager.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `bucket_name` | `str` | Name of the GCS bucket for storage | required |
| `project_id` | `str \| None` | Optional GCP project ID (defaults to environment) | `None` |
| `credentials_path` | `str \| None` | Optional path to service account JSON key file. If not provided, uses Application Default Credentials | `None` |
| `logger_instance` | `Logger \| LoggerAdapter \| None` | Optional logger instance to use. | `None` |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If google-cloud-storage is not installed |
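
To illustrate how the constructor arguments fit together, here is a small sketch that builds them from environment variables. The helper `gcs_sync_kwargs` and the variable names `THOTH_GCS_BUCKET`, `THOTH_GCP_PROJECT`, and `THOTH_GCS_CREDENTIALS` are hypothetical and chosen for this example only; they are not defined by the module.

```python
import os


def gcs_sync_kwargs(env=os.environ):
    """Build keyword arguments for GCSSync from environment variables.

    All variable names here are hypothetical, not part of the module.
    """
    bucket = env.get("THOTH_GCS_BUCKET")
    if not bucket:
        raise ValueError("THOTH_GCS_BUCKET must be set")
    return {
        "bucket_name": bucket,
        # None leaves the project to be taken from the environment.
        "project_id": env.get("THOTH_GCP_PROJECT") or None,
        # None falls back to Application Default Credentials.
        "credentials_path": env.get("THOTH_GCS_CREDENTIALS") or None,
    }
```

With the package installed, the result could be passed as `GCSSync(**gcs_sync_kwargs())`, wrapped in a `try`/`except GCSSyncError` block to handle the documented failure case.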
upload_directory(local_path: str | Path, gcs_prefix: str = 'lancedb', exclude_patterns: list[str] | None = None) -> int
Upload a local directory to GCS.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `local_path` | `str \| Path` | Path to local directory to upload | required |
| `gcs_prefix` | `str` | Prefix (folder path) in GCS bucket | `'lancedb'` |
| `exclude_patterns` | `list[str] \| None` | Optional list of filename patterns to exclude | `None` |

Returns:

| Type | Description |
|---|---|
| `int` | Number of files uploaded |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If upload fails |
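
The pattern syntax accepted by `exclude_patterns` is not specified in this reference. The sketch below shows one plausible interpretation, assuming shell-style patterns matched against each file's name with `fnmatch`, and object names formed as `<gcs_prefix>/<relative path>`. The function `plan_upload` is hypothetical and performs no network calls; it only lists what would be uploaded.

```python
from fnmatch import fnmatch
from pathlib import Path


def plan_upload(local_path, gcs_prefix="lancedb", exclude_patterns=None):
    """Return (local file, object name) pairs for a directory upload.

    Assumption: patterns are shell-style and apply to the file name only.
    """
    root = Path(local_path)
    patterns = exclude_patterns or []
    prefix = gcs_prefix.strip("/")
    plan = []
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue  # directories have no object of their own
        if any(fnmatch(path.name, pattern) for pattern in patterns):
            continue  # excluded by file name
        relative = path.relative_to(root).as_posix()
        plan.append((path, f"{prefix}/{relative}"))
    return plan
```

A dry run like this can be useful for checking which files an exclusion list would skip before calling `upload_directory`.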
download_directory(gcs_prefix: str, local_path: str | Path, clean_local: bool = False) -> int
Download a directory from GCS to local storage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `gcs_prefix` | `str` | Prefix (folder path) in GCS bucket | required |
| `local_path` | `str \| Path` | Path to local directory for download | required |
| `clean_local` | `bool` | If True, remove local directory before download | `False` |

Returns:

| Type | Description |
|---|---|
| `int` | Number of files downloaded |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If download fails |
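
The effect of `clean_local` and the mapping from object names back to local paths can be sketched as follows. This is an illustration under assumptions, not the module's implementation: `prepare_local_dir` and `local_target` are hypothetical helpers, and the sketch assumes object names have the form `<gcs_prefix>/<relative path>`.

```python
import shutil
from pathlib import Path


def prepare_local_dir(local_path, clean_local=False):
    """Create the target directory, optionally wiping it first."""
    root = Path(local_path)
    if clean_local and root.exists():
        shutil.rmtree(root)  # discard any stale local files
    root.mkdir(parents=True, exist_ok=True)
    return root


def local_target(root, gcs_prefix, blob_name):
    """Map an object name under gcs_prefix to a path under root."""
    prefix = gcs_prefix.rstrip("/") + "/"
    if not blob_name.startswith(prefix):
        raise ValueError(f"{blob_name!r} is not under {prefix!r}")
    return Path(root) / blob_name[len(prefix):]
```

With `clean_local=False`, files already present locally are kept, so a download merges into the existing directory rather than replacing it.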
sync_to_gcs(local_path: str | Path, gcs_prefix: str = 'lancedb') -> dict[str, int | str]
Sync local LanceDB directory to GCS (upload).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `local_path` | `str \| Path` | Path to local LanceDB directory | required |
| `gcs_prefix` | `str` | Prefix in GCS bucket | `'lancedb'` |

Returns:

| Type | Description |
|---|---|
| `dict[str, int \| str]` | Dictionary with sync statistics |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If sync fails |
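
The keys of the returned statistics dictionary are not listed in this reference, so calling code is safest when it does not depend on specific key names. A minimal sketch, where `sync` is expected to be a `GCSSync` instance and both helper functions are hypothetical:

```python
def format_sync_stats(stats):
    """Render a stats dict as 'key=value' pairs without assuming key names."""
    return ", ".join(f"{key}={stats[key]}" for key in sorted(stats))


def push_to_gcs(sync, local_path, gcs_prefix="lancedb"):
    """Upload via sync_to_gcs and return a one-line summary for logging."""
    stats = sync.sync_to_gcs(local_path, gcs_prefix=gcs_prefix)
    return format_sync_stats(stats)
```

Sorting the keys keeps the log line stable between runs regardless of how the dictionary was built.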
sync_from_gcs(gcs_prefix: str, local_path: str | Path, clean_local: bool = False) -> dict[str, int | str]
Sync LanceDB directory from GCS to local (download).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `gcs_prefix` | `str` | Prefix in GCS bucket | required |
| `local_path` | `str \| Path` | Path to local LanceDB directory | required |
| `clean_local` | `bool` | If True, remove local directory before sync | `False` |

Returns:

| Type | Description |
|---|---|
| `dict[str, int \| str]` | Dictionary with sync statistics |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If sync fails |
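
One possible use of `sync_from_gcs` is to populate a local directory at startup only when it is missing or empty. This usage pattern is an assumption for illustration and does not come from the module; `has_files` and `ensure_local_copy` are hypothetical helpers, and `sync` is expected to be a `GCSSync` instance.

```python
from pathlib import Path


def has_files(local_path):
    """True if the directory exists and contains at least one file."""
    root = Path(local_path)
    return root.is_dir() and any(p.is_file() for p in root.rglob("*"))


def ensure_local_copy(sync, gcs_prefix, local_path):
    """Download from GCS only when there is no usable local data.

    Returns the sync statistics, or None when the download was skipped.
    """
    if has_files(local_path):
        return None
    return sync.sync_from_gcs(gcs_prefix, local_path, clean_local=False)
```

Passing `clean_local=False` here is deliberate: the directory is already known to hold no files, so there is nothing to remove.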
backup_to_gcs(local_path: str | Path, backup_name: str | None = None) -> str
Create a timestamped backup in GCS.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `local_path` | `str \| Path` | Path to local LanceDB directory | required |
| `backup_name` | `str \| None` | Optional backup name (defaults to timestamp) | `None` |

Returns:

| Type | Description |
|---|---|
| `str` | GCS prefix of the backup |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If backup fails |
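
When an explicit `backup_name` is preferred over the default, the caller can build one. The sketch below combines a label with a UTC timestamp; the name format is this example's own choice and says nothing about the format the module uses for its default. `make_backup_name` and `backup_with_label` are hypothetical helpers, and `sync` is expected to be a `GCSSync` instance.

```python
from datetime import datetime, timezone


def make_backup_name(label=None, now=None):
    """Build a sortable backup name such as 'pre-migration-20240102T030405Z'."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    return f"{label}-{stamp}" if label else stamp


def backup_with_label(sync, local_path, label):
    """Create a labelled backup and return the GCS prefix reported by sync."""
    return sync.backup_to_gcs(local_path, backup_name=make_backup_name(label))
```

Fixed-width timestamps sort chronologically as plain strings, which makes names with the same label easy to order later.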
restore_from_backup(backup_name: str, local_path: str | Path, clean_local: bool = True) -> int
Restore LanceDB from a GCS backup.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `backup_name` | `str` | Name of the backup to restore | required |
| `local_path` | `str \| Path` | Path to local LanceDB directory | required |
| `clean_local` | `bool` | If True, remove local directory before restore | `True` |

Returns:

| Type | Description |
|---|---|
| `int` | Number of files restored |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If restore fails |
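
Because `clean_local` defaults to `True` here, a restore removes the local directory first. A cautious caller may want to confirm that the backup exists before starting. The following is a sketch using only the documented method signatures; `restore_if_available` is a hypothetical helper, and `sync` is expected to be a `GCSSync` instance.

```python
def restore_if_available(sync, backup_name, local_path):
    """Restore a backup only after confirming it is listed in GCS."""
    available = sync.list_backups()
    if backup_name not in available:
        # Stop before anything local is removed.
        raise LookupError(
            f"backup {backup_name!r} not found; available: {available}"
        )
    return sync.restore_from_backup(backup_name, local_path, clean_local=True)
```

Whether `restore_from_backup` performs a similar check internally is not stated in this reference, so the explicit lookup is a precaution rather than a requirement.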
list_backups() -> list[str]
List available backups in GCS.
Returns:

| Type | Description |
|---|---|
| `list[str]` | List of backup names |

Raises:

| Type | Description |
|---|---|
| `GCSSyncError` | If listing fails |
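
The order of the returned names is not documented. If backup names are timestamps of fixed width (an assumption, since the default name format is not shown here), the most recent one can be picked with a plain string comparison. `latest_backup` is a hypothetical helper.

```python
def latest_backup(names):
    """Return the lexicographically greatest name, or None if there are none.

    Assumption: names sort chronologically as strings, for example
    fixed-width timestamps.
    """
    return max(names, default=None)
```

A caller might combine this as `latest_backup(sync.list_backups())` and pass the result to `restore_from_backup` when it is not `None`.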