"""Repository manager for cloning and tracking the GitLab handbook."""
import json
import logging
from pathlib import Path
import shutil
import time
from typing import Any, ClassVar
from git import GitCommandError, InvalidGitRepositoryError, Repo
from git.remote import RemoteProgress
from thoth.shared.utils.logger import setup_logger
[docs]
class CloneProgress(RemoteProgress):
    """Progress handler that logs git clone/fetch/pull progress.

    Git emits progress updates very frequently. This handler throttles them so
    that each operation stage is logged at most once per 25% milestone
    (0%, 25%, 50%, 75%, 100%). Stages that report no total are logged once,
    when they begin.
    """

    # Human-readable names for the RemoteProgress stage codes.
    OP_NAMES: ClassVar[dict[int, str]] = {
        RemoteProgress.COUNTING: "Counting objects",
        RemoteProgress.COMPRESSING: "Compressing objects",
        RemoteProgress.WRITING: "Writing objects",
        RemoteProgress.RECEIVING: "Receiving objects",
        RemoteProgress.RESOLVING: "Resolving deltas",
        RemoteProgress.FINDING_SOURCES: "Finding sources",
        RemoteProgress.CHECKING_OUT: "Checking out files",
    }

    # Width of one logging milestone, in percent.
    _LOG_STEP: ClassVar[int] = 25

    def __init__(self, logger: logging.Logger | logging.LoggerAdapter) -> None:
        """Initialize the progress handler.

        Args:
            logger: Logger instance that receives the progress messages.
        """
        super().__init__()
        self.logger = logger
        # Highest milestone already logged for the current stage (-1 = none yet).
        self._last_logged_percent: int = -1
        # Stage code (BEGIN/END flags stripped) seen in the most recent update.
        self._current_op: int = 0

    def update(
        self,
        op_code: int,
        cur_count: str | float,
        max_count: str | float | None = None,
        message: str = "",
    ) -> None:
        """Handle one progress update from git.

        Args:
            op_code: Operation code indicating the current stage, possibly
                combined with BEGIN/END flags.
            cur_count: Current progress count.
            max_count: Maximum count, if known.
            message: Optional message from git.
        """
        # Strip the BEGIN/END flags to get the bare stage code.
        op_type = op_code & self.OP_MASK
        op_name = self.OP_NAMES.get(op_type, f"Operation {op_type}")

        # A new stage (or an explicit BEGIN) restarts milestone tracking so
        # that every stage gets its own 0%..100% sequence. Without this reset
        # a stage following one that reached 100% could never hit a milestone.
        if op_type != self._current_op or op_code & self.BEGIN:
            self._current_op = op_type
            self._last_logged_percent = -1

        if max_count and float(max_count) > 0:
            current = float(cur_count)
            total = float(max_count)
            percent = int((current / total) * 100)
            # Round down to the enclosing milestone. Comparing milestones
            # rather than raw percentages guarantees 100% is always reported,
            # whatever percentage the stage happened to be first seen at.
            milestone = (min(percent, 100) // self._LOG_STEP) * self._LOG_STEP
            if milestone > self._last_logged_percent:
                self._last_logged_percent = milestone
                self.logger.info(
                    "%s: %d%% (%d/%d)%s",
                    op_name,
                    percent,
                    int(current),
                    int(total),
                    f" - {message}" if message else "",
                )
        elif op_code & self.BEGIN:
            # No total available: just announce that the stage has begun.
            self.logger.info("%s started%s", op_name, f": {message}" if message else "")
# Defaults: upstream repository, local checkout location, metadata file name
DEFAULT_REPO_URL = "https://gitlab.com/gitlab-com/content-sites/handbook.git"
DEFAULT_CLONE_PATH = Path.home().joinpath(".thoth", "handbook")
METADATA_FILE = "repo_metadata.json"

# Message templates that take str.format() arguments
MSG_REPO_EXISTS = "Repository already exists at {path}. Use force=True to re-clone."
MSG_NO_REPO = "No repository found at {path}. Clone the repository first."
MSG_CLONE_FAILED = "Failed to clone repository after {attempts} attempts"

# Fixed log/error messages
MSG_UPDATE_FAILED = "Failed to update repository"
MSG_DIFF_FAILED = "Failed to get changed files"
MSG_METADATA_SAVE_FAILED = "Failed to save metadata"
MSG_METADATA_LOAD_FAILED = "Failed to load metadata"
[docs]
class HandbookRepoManager:
"""Manages the GitLab handbook repository."""
[docs]
def __init__(
    self,
    repo_url: str = DEFAULT_REPO_URL,
    clone_path: Path | None = None,
    logger: logging.Logger | logging.LoggerAdapter | None = None,
):
    """Set up the manager's repository URL, local paths, and logger.

    Args:
        repo_url: URL of the repository to clone.
        clone_path: Local directory for the checkout; DEFAULT_CLONE_PATH
            is used when this is not given.
        logger: Logger for status messages; when not given, one is created
            with setup_logger(__name__).
    """
    checkout_dir = clone_path if clone_path else DEFAULT_CLONE_PATH

    self.repo_url = repo_url
    self.clone_path = checkout_dir
    # The metadata file is stored beside the checkout directory, not inside it.
    self.metadata_path = checkout_dir.parent / METADATA_FILE
    self.logger: logging.Logger | logging.LoggerAdapter = (
        logger if logger else setup_logger(__name__)
    )
[docs]
def is_valid_repo(self) -> bool:
"""Check if clone_path contains a valid git repository.
Returns:
True if valid repo exists, False otherwise
"""
if not self.clone_path.exists():
return False
try:
repo = Repo(str(self.clone_path))
# Try to access head to verify it's a valid initialized repo
_ = repo.head
return True
except (InvalidGitRepositoryError, ValueError):
return False
[docs]
def clone_handbook(
    self,
    force: bool = False,
    max_retries: int = 3,
    retry_delay: int = 5,
    shallow: bool = True,
) -> Path:
    """Clone the repository into clone_path.

    Anything already present at clone_path is deleted before cloning. The
    only case that refuses to proceed is a valid repository being present
    while force is False.

    Args:
        force: If True, remove an existing valid repository and re-clone.
        max_retries: Maximum number of clone attempts.
        retry_delay: Delay in seconds between attempts.
        shallow: If True, perform a shallow clone (depth=1).

    Returns:
        Path to the cloned repository.

    Raises:
        RuntimeError: If a valid repository exists and force is False.
        GitCommandError: If cloning fails after all retries.
    """
    target = self.clone_path

    has_valid_repo = self.is_valid_repo()
    if has_valid_repo and not force:
        raise RuntimeError(MSG_REPO_EXISTS.format(path=target))

    # Clear out whatever is there, whether a valid repo or leftover files.
    if target.exists():
        self.logger.info("Removing existing directory at %s", target)
        shutil.rmtree(target)

    target.parent.mkdir(parents=True, exist_ok=True)
    return self._clone_with_retry(max_retries, retry_delay, shallow=shallow)
def _clone_with_retry(
    self,
    max_retries: int,
    retry_delay: int,
    shallow: bool = True,
) -> Path:
    """Clone the repository, retrying on git failures.

    After every failed attempt, including the last one, any partially
    written checkout at clone_path is removed. A failed run therefore never
    leaves a half-cloned directory behind.

    Args:
        max_retries: Maximum number of attempts.
        retry_delay: Delay in seconds between attempts.
        shallow: If True, clone with depth=1 and single_branch=True.

    Returns:
        Path to the cloned repository.

    Raises:
        GitCommandError: If all attempts fail (chained from the last
            underlying error), or if max_retries allows no attempt at all.
    """
    last_error: GitCommandError | None = None
    clone_type = "shallow" if shallow else "full"

    # The clone options are identical for every attempt, so build them once.
    clone_kwargs: dict[str, Any] = {"progress": CloneProgress(self.logger)}
    if shallow:
        # Shallow clone: only fetch the latest commit of a single branch.
        clone_kwargs["depth"] = 1
        clone_kwargs["single_branch"] = True

    for attempt in range(1, max_retries + 1):
        self.logger.info(
            "Cloning repository (attempt %d/%d, %s clone)...",
            attempt,
            max_retries,
            clone_type,
        )
        try:
            Repo.clone_from(self.repo_url, str(self.clone_path), **clone_kwargs)
        except GitCommandError as e:
            last_error = e
            self.logger.warning("Clone attempt %d/%d failed: %s", attempt, max_retries, e)
            # Clean up the failed attempt so the next clone (or the next
            # caller) starts from an empty target.
            if self.clone_path.exists():
                shutil.rmtree(self.clone_path)
            if attempt < max_retries:
                self.logger.info("Retrying in %d seconds...", retry_delay)
                time.sleep(retry_delay)
        else:
            self.logger.info("Successfully cloned repository to %s", self.clone_path)
            return self.clone_path

    msg = MSG_CLONE_FAILED.format(attempts=max_retries)
    # We are outside any except block here, so logger.exception() would have
    # no active exception to report. Pass the saved error explicitly so its
    # traceback is actually logged.
    self.logger.error("All clone attempts failed", exc_info=last_error)
    raise GitCommandError(msg, 1) from last_error
[docs]
def update_repository(self) -> bool:
    """Pull the latest changes from the repository's ``origin`` remote.

    Progress of the pull is reported through a CloneProgress handler.

    Returns:
        True if the pull succeeded; False if it failed with a
        GitCommandError or InvalidGitRepositoryError (the failure is logged).

    Raises:
        RuntimeError: If clone_path does not exist.
    """
    if not self.clone_path.exists():
        raise RuntimeError(MSG_NO_REPO.format(path=self.clone_path))

    try:
        local_repo = Repo(str(self.clone_path))
        self.logger.info("Pulling latest changes from %s", self.repo_url)
        local_repo.remotes.origin.pull(progress=CloneProgress(self.logger))
    except (GitCommandError, InvalidGitRepositoryError):
        self.logger.exception(MSG_UPDATE_FAILED)
        return False
    else:
        self.logger.info("Successfully updated repository")
        return True
[docs]
def get_current_commit(self) -> str | None:
    """Return the SHA of the commit that HEAD points to.

    Returns:
        The commit's hex SHA, or None if reading it failed with a
        GitCommandError or InvalidGitRepositoryError (the failure is logged).

    Raises:
        RuntimeError: If clone_path does not exist.
    """
    if not self.clone_path.exists():
        raise RuntimeError(MSG_NO_REPO.format(path=self.clone_path))

    try:
        head_sha = Repo(str(self.clone_path)).head.commit.hexsha
    except (GitCommandError, InvalidGitRepositoryError):
        self.logger.exception("Failed to get current commit")
        return None

    self.logger.info("Current commit: %s", head_sha)
    return head_sha
[docs]
def get_changed_files(self, since_commit: str) -> list[str] | None:
    """List the files that differ between ``since_commit`` and HEAD.

    If git reports the comparison commit as an unknown revision or bad
    object (for example because a shallow clone does not contain it), a
    warning is logged and None is returned so the caller can fall back to
    full processing.

    Args:
        since_commit: Commit SHA to compare against.

    Returns:
        List of changed file paths (empty if nothing changed), or None if
        the diff could not be computed.

    Raises:
        RuntimeError: If clone_path does not exist.
    """
    if not self.clone_path.exists():
        raise RuntimeError(MSG_NO_REPO.format(path=self.clone_path))

    try:
        names_blob = Repo(str(self.clone_path)).git.diff("--name-only", since_commit, "HEAD")
    except (GitCommandError, InvalidGitRepositoryError) as err:
        # Only a git command failure can mean "commit not available locally".
        commit_unavailable = isinstance(err, GitCommandError) and any(
            marker in str(err).lower() for marker in ("unknown revision", "bad object")
        )
        if commit_unavailable:
            self.logger.warning(
                "Cannot diff against commit %s (likely shallow clone). Falling back to full processing.",
                since_commit,
            )
        else:
            self.logger.exception(MSG_DIFF_FAILED)
        return None

    if not names_blob:
        self.logger.info("No files changed since commit %s", since_commit)
        return []

    paths: list[str] = names_blob.strip().split("\n")
    self.logger.info(
        "Found %d changed files since commit %s",
        len(paths),
        since_commit,
    )
    return paths
[docs]
def get_file_changes(  # noqa: PLR0912
    self, since_commit: str
) -> dict[str, list[str]] | None:
    """Categorize the files that differ between ``since_commit`` and HEAD.

    Each line of ``git diff --name-status`` is sorted into added, modified,
    or deleted. Rename and copy entries are delegated to
    _handle_rename_or_copy; any other unrecognized status is counted as
    modified.

    If git reports the comparison commit as an unknown revision or bad
    object (for example because a shallow clone does not contain it), a
    warning is logged and None is returned so the caller can fall back to
    full processing.

    Args:
        since_commit: Commit SHA to compare against.

    Returns:
        Dictionary with keys 'added', 'modified', 'deleted' containing
        lists of file paths, or None if the diff could not be computed.

    Raises:
        RuntimeError: If clone_path does not exist.
    """
    if not self.clone_path.exists():
        raise RuntimeError(MSG_NO_REPO.format(path=self.clone_path))

    try:
        status_blob = Repo(str(self.clone_path)).git.diff("--name-status", since_commit, "HEAD")
    except (GitCommandError, InvalidGitRepositoryError) as err:
        # Only a git command failure can mean "commit not available locally".
        commit_unavailable = isinstance(err, GitCommandError) and any(
            marker in str(err).lower() for marker in ("unknown revision", "bad object")
        )
        if commit_unavailable:
            self.logger.warning(
                "Cannot diff against commit %s (likely shallow clone). Falling back to full processing.",
                since_commit,
            )
        else:
            self.logger.exception(MSG_DIFF_FAILED)
        return None

    added: list[str] = []
    modified: list[str] = []
    deleted: list[str] = []

    if not status_blob:
        self.logger.info("No files changed since commit %s", since_commit)
        return {"added": added, "modified": modified, "deleted": deleted}

    for entry in status_blob.strip().split("\n"):
        # Each entry is "<status>\t<path...>"; skip blank or tab-less lines.
        status, tab, file_path = entry.partition("\t")
        if not tab:
            continue

        kind = status[:1]
        if kind == "A":
            added.append(file_path)
        elif kind == "D":
            deleted.append(file_path)
        elif kind in ("R", "C"):  # Renamed or Copied
            self._handle_rename_or_copy(status, file_path, deleted, added, modified)
        else:
            # "M" and any unknown status are both recorded as modified.
            modified.append(file_path)

    self.logger.info(
        "Found %d added, %d modified, %d deleted files since commit %s",
        len(added),
        len(modified),
        len(deleted),
        since_commit,
    )
    return {"added": added, "modified": modified, "deleted": deleted}
def _handle_rename_or_copy(
self,
status: str,
file_path: str,
deleted_files: list[str],
added_files: list[str],
modified_files: list[str],
) -> None:
"""Handle renamed or copied files.
Args:
status: Git status code (R or C)
file_path: File path(s) from git diff
deleted_files: List to append deleted file paths
added_files: List to append added file paths
modified_files: List to append modified file paths (fallback)
"""
if status.startswith("R"): # Renamed
# Renamed files have format: R<score>\toldpath\tnewpath
# Treat as delete old + add new
if "\t" in file_path:
old_path, new_path = file_path.split("\t", 1)
deleted_files.append(old_path)
added_files.append(new_path)
else:
# Fallback: treat as modified
modified_files.append(file_path)
elif status.startswith("C"): # Copied
# Copied files have format: C<score>\tsourcepath\tnewpath
# Treat as add new, keep source intact
if "\t" in file_path:
_source_path, new_path = file_path.split("\t", 1)
added_files.append(new_path)
else:
# Fallback: treat as modified to avoid malformed added paths
modified_files.append(file_path)