"""GCS-based repository synchronization for Cloud Run.
This module handles syncing the GitLab handbook between GCS and local storage:
1. Clone repository to GCS (once, or on updates)
2. Sync from GCS to local /tmp on Cloud Run startup
"""
from concurrent.futures import ThreadPoolExecutor
import logging
from pathlib import Path
import shutil
import tempfile
from typing import Any
from git import Repo
from google.cloud import storage # type: ignore[attr-defined]
from thoth.shared.utils.logger import setup_logger
logger = setup_logger(__name__)
class GCSRepoSync:
"""Manages repository synchronization between GCS and local storage."""
[docs]
def __init__(
self,
bucket_name: str,
repo_url: str,
gcs_prefix: str = "handbook",
local_path: Path | None = None,
logger_instance: logging.Logger | logging.LoggerAdapter | None = None,
):
"""Initialize GCS repository sync.
Args:
bucket_name: GCS bucket name
repo_url: Git repository URL
gcs_prefix: Prefix/folder in GCS bucket for repository files
local_path: Local path to sync to (defaults to /tmp/handbook)
logger_instance: Optional logger instance to use.
"""
self.bucket_name = bucket_name
self.repo_url = repo_url
self.gcs_prefix = gcs_prefix.strip("/")
self.local_path = local_path or Path("/tmp/handbook") # nosec B108 - Cloud Run requires /tmp
self.storage_client = storage.Client()
self.bucket = self.storage_client.bucket(bucket_name)
self.logger = logger_instance or logger
[docs]
def clone_to_gcs(self, force: bool = False) -> dict[str, Any]:
"""Clone repository and upload to GCS.
This should be run once initially, or when you want to refresh
the repository in GCS.
Args:
force: If True, re-clone even if files exist in GCS
Returns:
Dictionary with stats about the clone operation
"""
self.logger.info("Cloning repository to GCS: %s", self.repo_url)
# Check if already exists in GCS
if not force:
blobs = list(self.bucket.list_blobs(prefix=f"{self.gcs_prefix}/", max_results=1))
if blobs:
self.logger.info("Repository already exists in GCS. Use force=True to re-clone.")
return {
"status": "exists",
"message": "Repository already in GCS",
}
# Clone to temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir) / "repo"
self.logger.info("Cloning to temporary directory: %s", tmp_path)
Repo.clone_from(self.repo_url, str(tmp_path))
# Upload all files to GCS
self.logger.info(
"Uploading repository to GCS bucket: %s/%s",
self.bucket_name,
self.gcs_prefix,
)
uploaded = 0
for file_path in tmp_path.rglob("*"):
if file_path.is_file() and ".git" not in str(file_path):
relative_path = file_path.relative_to(tmp_path)
blob_name = f"{self.gcs_prefix}/{relative_path}"
blob = self.bucket.blob(blob_name)
blob.upload_from_filename(str(file_path))
uploaded += 1
if uploaded % 100 == 0:
self.logger.info("Uploaded %d files...", uploaded)
self.logger.info("Successfully uploaded %d files to GCS", uploaded)
return {
"status": "success",
"files_uploaded": uploaded,
"gcs_path": f"gs://{self.bucket_name}/{self.gcs_prefix}",
}
[docs]
def sync_to_local(self, force: bool = False) -> dict[str, Any]:
"""Sync repository from GCS to local storage.
This is called on Cloud Run startup to get the latest repository files.
Args:
force: If True, delete and re-download even if local files exist
Returns:
Dictionary with stats about the sync operation
"""
# Check if already synced (has markdown files, not just directory exists)
if not force and self.is_synced():
self.logger.info("Local repository already synced at %s", self.local_path)
file_count = sum(1 for _ in self.local_path.rglob("*.md"))
return {
"status": "exists",
"local_path": str(self.local_path),
"file_count": file_count,
}
# Clean up if forcing re-download
if self.local_path.exists() and force:
self.logger.info("Removing existing local repository: %s", self.local_path)
shutil.rmtree(self.local_path)
# Create local directory
self.local_path.mkdir(parents=True, exist_ok=True)
self.logger.info("Syncing repository from GCS to %s", self.local_path)
# Download all blobs
downloaded = 0
blobs = self.bucket.list_blobs(prefix=f"{self.gcs_prefix}/")
for blob in blobs:
# Skip directory markers
if blob.name.endswith("/"):
continue
# Calculate local file path
# Remove prefix + "/"
relative_path = blob.name[len(self.gcs_prefix) + 1 :]
local_file = self.local_path / relative_path
# Create parent directories
local_file.parent.mkdir(parents=True, exist_ok=True)
# Download file
blob.download_to_filename(str(local_file))
downloaded += 1
if downloaded % 100 == 0:
self.logger.info("Downloaded %d files...", downloaded)
self.logger.info("Successfully synced %d files from GCS to local", downloaded)
# Create completion marker to signal sync is done
completion_marker = self.local_path / ".sync_complete"
completion_marker.touch()
self.logger.info("Created sync completion marker at %s", completion_marker)
return {
"status": "success",
"files_downloaded": downloaded,
"local_path": str(self.local_path),
}
[docs]
def list_files_in_gcs(self) -> list[str]:
"""List all files in GCS without downloading.
Returns:
List of relative file paths (e.g., ['docs/setup.md', 'api/README.md'])
"""
self.logger.info("Listing files in GCS bucket: %s/%s", self.bucket_name, self.gcs_prefix)
blobs = self.bucket.list_blobs(prefix=f"{self.gcs_prefix}/")
files = []
for blob in blobs:
# Skip directory markers
if blob.name.endswith("/"):
continue
# Remove prefix to get relative path
relative_path = blob.name[len(self.gcs_prefix) + 1 :]
files.append(relative_path)
self.logger.info("Found %d files in GCS", len(files))
return files
[docs]
def download_batch_files(self, file_list: list[str]) -> Path:
"""Download only specific files for a batch from GCS.
Uses parallel downloads (ThreadPoolExecutor) for faster performance.
Args:
file_list: List of relative file paths to download
Returns:
Path to local directory with downloaded files
"""
self.logger.info("Downloading %d files for batch processing", len(file_list))
# Create local directory if it doesn't exist
self.local_path.mkdir(parents=True, exist_ok=True)
def download_file(relative_path: str) -> None:
"""Download a single file from GCS."""
blob = self.bucket.blob(f"{self.gcs_prefix}/{relative_path}")
local_file = self.local_path / relative_path
# Create parent directories
local_file.parent.mkdir(parents=True, exist_ok=True)
# Download file
blob.download_to_filename(str(local_file))
# Download files in parallel (10 workers for good throughput)
with ThreadPoolExecutor(max_workers=10) as executor:
list(executor.map(download_file, file_list))
self.logger.info("Successfully downloaded %d files for batch", len(file_list))
return self.local_path
[docs]
def get_local_path(self) -> Path:
"""Get the local path where repository is synced.
Returns:
Path to local repository
"""
return self.local_path
[docs]
def is_synced(self) -> bool:
"""Check if repository is synced locally.
Returns:
True if local repository exists and sync is complete
"""
if not self.local_path.exists():
return False
# Check for sync completion marker (created after successful sync)
completion_marker = self.local_path / ".sync_complete"
return completion_marker.exists()