"""GitLab API client with rate limiting, caching, and error handling."""
from datetime import UTC, datetime, timedelta
import logging
import os
import time
from typing import Any
from urllib.parse import quote
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from thoth.shared.utils.logger import setup_logger
from thoth.shared.utils.secrets import get_secret_manager
# ---------------------------------------------------------------------------
# Client defaults
# ---------------------------------------------------------------------------
DEFAULT_BASE_URL = "https://gitlab.com/api/v4"  # root of the public GitLab REST API v4
DEFAULT_TIMEOUT = 30  # per-request timeout, in seconds
DEFAULT_MAX_RETRIES = 3  # retry count handed to urllib3's Retry policy
DEFAULT_BACKOFF_FACTOR = 2  # exponential backoff factor handed to urllib3's Retry policy
CACHE_DEFAULT_TTL = 5 * 60  # cached GET responses stay valid for five minutes
RATE_LIMIT_MARGIN = 10  # pause once fewer than this many requests remain

# ---------------------------------------------------------------------------
# Message templates (filled in with str.format)
# ---------------------------------------------------------------------------
MSG_AUTH_REQUIRED = "Authentication token required for this operation"
MSG_RATE_LIMIT_EXCEEDED = "Rate limit exceeded. Waiting {wait_time}s"  # placeholder: wait_time
MSG_REQUEST_FAILED = "Request failed after {attempts} attempts: {error}"  # placeholders: attempts, error
MSG_INVALID_RESPONSE = "Invalid response from GitLab API: {error}"  # placeholder: error
class RateLimitError(Exception):
    """Error type signalling that the GitLab API rate limit was exceeded.

    Derives directly from ``Exception`` rather than from ``GitLabAPIError``,
    so an ``except GitLabAPIError`` handler does not catch it.
    """
class GitLabAPIError(Exception):
    """Error type for GitLab API calls that cannot be completed successfully."""
class CacheEntry:
    """One cached API payload together with the UTC instant it stops being valid."""

    def __init__(self, data: Any, ttl: int = CACHE_DEFAULT_TTL):
        """Create an entry holding *data* for *ttl* seconds.

        Args:
            data: Payload to cache; stored as-is, without copying.
            ttl: Lifetime in seconds, counted from construction time.
        """
        self.data = data
        created_at = datetime.now(tz=UTC)
        self.expires_at = created_at + timedelta(seconds=ttl)

    def is_expired(self) -> bool:
        """Return True once the current UTC time is strictly past ``expires_at``."""
        now = datetime.now(tz=UTC)
        return self.expires_at < now
class GitLabAPIClient:
    """GitLab REST API (v4) client with rate limiting, caching, and error handling.

    Requests go through one ``requests.Session`` whose adapters apply a
    urllib3 ``Retry`` policy to 500/502/503/504 responses. GET responses are
    kept in an in-memory TTL cache. The ``RateLimit-Remaining`` /
    ``RateLimit-Reset`` headers of each response are tracked so the client can
    pause before the request budget is exhausted.

    The client owns its session: call :meth:`close`, or use the client as a
    context manager, to release pooled connections.
    """

    def __init__(
        self,
        token: str | None = None,
        base_url: str = DEFAULT_BASE_URL,
        timeout: int = DEFAULT_TIMEOUT,
        max_retries: int = DEFAULT_MAX_RETRIES,
        backoff_factor: float = DEFAULT_BACKOFF_FACTOR,
        logger: logging.Logger | None = None,
    ):
        """Initialize GitLab API client.

        Args:
            token: GitLab personal access token. If ``None``, it is read from
                Secret Manager, falling back to the ``GITLAB_TOKEN``
                environment variable.
            base_url: Base URL for the GitLab API. If left at
                ``DEFAULT_BASE_URL``, a non-default URL from Secret Manager is
                used, falling back to the ``GITLAB_BASE_URL`` environment
                variable. ``/api/v4`` is appended when missing.
            timeout: Request timeout in seconds.
            max_retries: Maximum number of retries for failed requests. It is
                used for the transport-level retry policy and also caps the
                number of HTTP 429 retries per call.
            backoff_factor: Backoff factor for exponential backoff between
                transport-level retries.
            logger: Logger instance; one is created with ``setup_logger`` when
                omitted.
        """
        # Resolve the logger first so the credential lookups below can log.
        self.logger = logger or setup_logger(__name__)

        if token is None:
            # An empty/None token from Secret Manager falls through to the env var.
            token = self._token_from_secret_manager() or os.getenv("GITLAB_TOKEN")

        if base_url == DEFAULT_BASE_URL:
            # Secret Manager wins. The environment variable is consulted whenever
            # Secret Manager yields no custom URL, whether the lookup failed or
            # simply returned nothing or the public default.
            base_url = (
                self._base_url_from_secret_manager()
                or os.getenv("GITLAB_BASE_URL")
                or base_url
            )

        self.token = token

        # Normalise to ".../api/v4" so endpoints can be appended directly.
        base_url = base_url.rstrip("/")
        if not base_url.endswith("/api/v4"):
            base_url = f"{base_url}/api/v4"
        self.base_url = base_url
        self.timeout = timeout
        # Upper bound on consecutive HTTP 429 retries within a single call.
        self._max_rate_limit_retries = max(max_retries, 0)

        # Transport-level retries for transient 5xx responses.
        # NOTE(review): POST is in allowed_methods, so a non-idempotent request
        # may be repeated if the server applied the first attempt — confirm intended.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

        if self.token:
            self.session.headers.update({"PRIVATE-TOKEN": self.token})

        # Last observed rate-limit state; None until a response reports it.
        self._rate_limit_remaining: int | None = None
        self._rate_limit_reset: datetime | None = None

        # GET response cache keyed by endpoint plus sorted query parameters.
        self._cache: dict[str, CacheEntry] = {}

    def _token_from_secret_manager(self) -> str | None:
        """Return the GitLab token from Secret Manager, or None if the lookup fails."""
        try:
            token = get_secret_manager().get_gitlab_token()
        except Exception as e:  # noqa: BLE001 - best effort; the env var is the fallback
            self.logger.debug("Could not retrieve token from Secret Manager: %s", e)
            return None
        if token:
            self.logger.debug("Retrieved GitLab token from Secret Manager")
        return token

    def _base_url_from_secret_manager(self) -> str | None:
        """Return a custom GitLab URL from Secret Manager, or None if there is none.

        A value equal to ``"https://gitlab.com"`` is treated as "not customised".
        """
        try:
            custom_url = get_secret_manager().get_gitlab_url()
        except Exception as e:  # noqa: BLE001 - best effort; the env var is the fallback
            self.logger.debug("Could not retrieve URL from Secret Manager: %s", e)
            return None
        if custom_url and custom_url != "https://gitlab.com":
            self.logger.debug("Using GitLab URL from Secret Manager: %s", custom_url)
            return custom_url
        return None

    # =========================================================================
    # Lifecycle
    # =========================================================================

    def close(self) -> None:
        """Close the underlying HTTP session and release pooled connections."""
        self.session.close()

    def __enter__(self) -> "GitLabAPIClient":
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the session on leaving the ``with`` block; exceptions propagate."""
        self.close()

    # =========================================================================
    # Caching
    # =========================================================================

    def _get_cache_key(self, endpoint: str, params: dict | None = None) -> str:
        """Generate cache key for endpoint and parameters.

        Args:
            endpoint: API endpoint
            params: Query parameters

        Returns:
            ``endpoint`` alone when there are no params, otherwise
            ``endpoint?k1=v1&k2=v2`` with keys in sorted order.
        """
        if not params:
            return endpoint
        param_str = "&".join(f"{k}={v}" for k, v in sorted(params.items()))
        return f"{endpoint}?{param_str}"

    def _get_from_cache(self, cache_key: str) -> Any | None:
        """Get data from cache if available and not expired.

        Expired entries are removed when looked up. The cached object itself is
        returned, not a copy, so mutating it changes later cache hits.

        Args:
            cache_key: Cache key

        Returns:
            Cached data or None
        """
        entry = self._cache.get(cache_key)
        if entry is None:
            return None
        if entry.is_expired():
            del self._cache[cache_key]
            self.logger.debug("Cache expired: %s", cache_key)
            return None
        self.logger.debug("Cache hit: %s", cache_key)
        return entry.data

    def _add_to_cache(self, cache_key: str, data: Any, ttl: int = CACHE_DEFAULT_TTL) -> None:
        """Add data to cache.

        Args:
            cache_key: Cache key
            data: Data to cache
            ttl: Time to live in seconds
        """
        # NOTE: entries are only evicted when looked up after expiry or by
        # clear_cache(), so the cache grows with the number of distinct keys.
        self._cache[cache_key] = CacheEntry(data, ttl)
        self.logger.debug("Cached: %s (TTL: %ss)", cache_key, ttl)

    def clear_cache(self) -> None:
        """Clear all cached data."""
        self._cache.clear()
        self.logger.info("Cache cleared")

    # =========================================================================
    # Rate limiting
    # =========================================================================

    def _update_rate_limit_info(self, headers: Any) -> None:
        """Update rate limit information from response headers.

        Malformed header values are logged and ignored rather than failing the
        request they arrived with.

        Args:
            headers: Response headers (mapping supporting ``in`` and ``[]``)
        """
        if "RateLimit-Remaining" in headers:
            raw_remaining = headers["RateLimit-Remaining"]
            try:
                self._rate_limit_remaining = int(raw_remaining)
            except (TypeError, ValueError):
                self.logger.debug("Ignoring malformed RateLimit-Remaining header: %r", raw_remaining)
            else:
                self.logger.debug("Rate limit remaining: %s", self._rate_limit_remaining)
        if "RateLimit-Reset" in headers:
            raw_reset = headers["RateLimit-Reset"]
            try:
                self._rate_limit_reset = datetime.fromtimestamp(int(raw_reset), tz=UTC)
            except (TypeError, ValueError, OverflowError, OSError):
                self.logger.debug("Ignoring malformed RateLimit-Reset header: %r", raw_reset)
            else:
                self.logger.debug("Rate limit resets at: %s", self._rate_limit_reset)

    def _check_rate_limit(self) -> None:
        """Sleep until the rate-limit window resets if the budget is nearly spent.

        When fewer than ``RATE_LIMIT_MARGIN`` requests remain and a reset time
        is known, this sleeps until one second past the reset time if that is
        still in the future. The tracked state is then cleared. It never raises.
        """
        if (
            self._rate_limit_remaining is not None
            and self._rate_limit_remaining < RATE_LIMIT_MARGIN
            and self._rate_limit_reset
        ):
            wait_time = (self._rate_limit_reset - datetime.now(tz=UTC)).total_seconds()
            if wait_time > 0:
                self.logger.warning(MSG_RATE_LIMIT_EXCEEDED.format(wait_time=wait_time))
                time.sleep(wait_time + 1)  # 1 second buffer past the reset instant
            # Stale once the window has (or should have) reset.
            self._rate_limit_remaining = None
            self._rate_limit_reset = None

    @staticmethod
    def _parse_retry_after(value: str | None, default: int = 60) -> int:
        """Convert a ``Retry-After`` header value into a wait in whole seconds.

        Accepts either delta-seconds (``"30"``) or an HTTP-date. A missing or
        unparseable value yields *default*. Negative values and dates in the
        past yield 0.
        """
        from email.utils import parsedate_to_datetime

        if value is None:
            return default
        try:
            return max(0, int(value))
        except ValueError:
            pass
        try:
            retry_at = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return default
        if retry_at.tzinfo is None:
            retry_at = retry_at.replace(tzinfo=UTC)
        return max(0, int((retry_at - datetime.now(tz=UTC)).total_seconds()))

    # =========================================================================
    # Request plumbing
    # =========================================================================

    def _request_error(self, error: requests.exceptions.RequestException) -> GitLabAPIError:
        """Log *error* and wrap it in a GitLabAPIError for the caller to raise.

        Call this from inside an ``except`` block so the traceback is logged.
        """
        adapter = self.session.adapters["https://"]
        retry = getattr(adapter, "max_retries", None)
        attempts = retry.total + 1 if retry else 1
        error_msg = MSG_REQUEST_FAILED.format(attempts=attempts, error=str(error))
        self.logger.exception(error_msg)
        return GitLabAPIError(error_msg)

    def _send_request(
        self,
        method: str,
        url: str,
        params: dict | None,
        data: dict | None,
    ) -> requests.Response:
        """Send one logical request and return a successful (2xx/3xx) response.

        On HTTP 429 the server-advertised ``Retry-After`` delay is honoured.
        The request is retried at most ``max_retries`` times, in a loop rather
        than by recursion.

        Raises:
            GitLabAPIError: On transport failures or non-success status codes.
            RateLimitError: If the server still answers 429 after the allowed
                number of retries.
        """
        for attempt in range(self._max_rate_limit_retries + 1):
            # Pause proactively if the previous response said the budget is low.
            self._check_rate_limit()
            try:
                response = self.session.request(
                    method=method,
                    url=url,
                    params=params,
                    json=data,
                    timeout=self.timeout,
                )
            except requests.exceptions.RequestException as e:
                raise self._request_error(e) from e

            self._update_rate_limit_info(response.headers)
            if response.status_code != 429:
                break
            if attempt == self._max_rate_limit_retries:
                msg = f"Rate limit still exceeded after {attempt + 1} attempts: {method} {url}"
                self.logger.error(msg)
                raise RateLimitError(msg)
            wait_time = self._parse_retry_after(response.headers.get("Retry-After"))
            self.logger.warning(MSG_RATE_LIMIT_EXCEEDED.format(wait_time=wait_time))
            time.sleep(wait_time)

        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise self._request_error(e) from e
        return response

    def _parse_response(self, response: requests.Response) -> Any:
        """Decode the JSON body of *response*, or return None for an empty body.

        Decoding has its own narrow ``try`` so a decode failure is always
        reported as an invalid response, never as a transport failure.
        NOTE: in requests >= 2.27 the JSON decode error also subclasses
        RequestException, which is why it must not share a handler with the
        request itself.

        Raises:
            GitLabAPIError: If the body is not valid JSON.
        """
        if not response.content:
            return None
        try:
            return response.json()
        except ValueError as e:
            error_msg = MSG_INVALID_RESPONSE.format(error=str(e))
            self.logger.exception(error_msg)
            raise GitLabAPIError(error_msg) from e

    def _make_request(
        self,
        method: str,
        endpoint: str,
        params: dict | None = None,
        data: dict | None = None,
        use_cache: bool = True,
        cache_ttl: int = CACHE_DEFAULT_TTL,
    ) -> Any:
        """Make HTTP request to GitLab API.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint, relative to ``base_url``
            params: Query parameters
            data: Request body data, sent as JSON
            use_cache: Whether to use caching for GET requests
            cache_ttl: Cache time to live in seconds

        Returns:
            Decoded JSON body, or None if the response body is empty.

        Raises:
            GitLabAPIError: If the request fails or the body is not valid JSON.
            RateLimitError: If HTTP 429 persists beyond the retry limit.
        """
        cache_key = self._get_cache_key(endpoint, params) if method == "GET" and use_cache else None
        if cache_key is not None:
            cached_data = self._get_from_cache(cache_key)
            if cached_data is not None:
                return cached_data

        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        response = self._send_request(method, url, params, data)
        result = self._parse_response(response)

        # Empty bodies (None) are never cached.
        if cache_key is not None and result is not None:
            self._add_to_cache(cache_key, result, cache_ttl)
        return result

    def get(
        self,
        endpoint: str,
        params: dict | None = None,
        use_cache: bool = True,
        cache_ttl: int = CACHE_DEFAULT_TTL,
    ) -> Any:
        """Make GET request.

        Args:
            endpoint: API endpoint
            params: Query parameters
            use_cache: Whether to use caching
            cache_ttl: Cache time to live in seconds

        Returns:
            Response data
        """
        return self._make_request("GET", endpoint, params, use_cache=use_cache, cache_ttl=cache_ttl)

    def post(self, endpoint: str, data: dict | None = None) -> Any:
        """Make POST request (never cached; does not invalidate cached GETs).

        Args:
            endpoint: API endpoint
            data: Request body data

        Returns:
            Response data
        """
        return self._make_request("POST", endpoint, data=data, use_cache=False)

    def put(self, endpoint: str, data: dict | None = None) -> Any:
        """Make PUT request (never cached; does not invalidate cached GETs).

        Args:
            endpoint: API endpoint
            data: Request body data

        Returns:
            Response data
        """
        return self._make_request("PUT", endpoint, data=data, use_cache=False)

    def delete(self, endpoint: str) -> Any:
        """Make DELETE request (never cached; does not invalidate cached GETs).

        Args:
            endpoint: API endpoint

        Returns:
            Response data
        """
        return self._make_request("DELETE", endpoint, use_cache=False)

    @staticmethod
    def _encode_project_id(project_id: str | int) -> str:
        """Return *project_id* ready to embed in a URL path segment.

        Numeric IDs and already-encoded paths (containing ``%``) are returned
        unchanged as strings. ``namespace/project`` paths are percent-encoded.
        """
        project_id = str(project_id)
        if "/" in project_id and "%" not in project_id:
            return quote(project_id, safe="")
        return project_id

    # =========================================================================
    # Project API Methods
    # =========================================================================

    def get_project(self, project_id: str | int, use_cache: bool = True) -> dict[str, Any]:
        """Get project details.

        Args:
            project_id: Project ID or path; ``namespace/project`` paths are
                URL-encoded automatically unless they already contain ``%``
            use_cache: Whether to use caching

        Returns:
            Project data
        """
        self.logger.debug("Fetching project: %s", project_id)
        return self.get(f"projects/{self._encode_project_id(project_id)}", use_cache=use_cache)

    def list_projects(
        self,
        params: dict | None = None,
        use_cache: bool = True,
    ) -> list[dict[str, Any]]:
        """List projects.

        Makes a single GET request and does not follow pagination; select pages
        via ``params``.

        Args:
            params: Query parameters (e.g., {'per_page': 100, 'page': 1})
            use_cache: Whether to use caching

        Returns:
            List of projects
        """
        self.logger.debug("Listing projects")
        return self.get("projects", params=params, use_cache=use_cache)

    # =========================================================================
    # Repository API Methods
    # =========================================================================

    def get_repository_tree(
        self,
        project_id: str | int,
        path: str = "",
        ref: str = "main",
        recursive: bool = False,
        use_cache: bool = True,
    ) -> list[dict[str, Any]]:
        """Get repository tree.

        Makes a single GET request and does not follow pagination.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            path: Path inside repository
            ref: Branch/tag name
            recursive: Get tree recursively
            use_cache: Whether to use caching

        Returns:
            List of repository tree items
        """
        self.logger.debug("Fetching repository tree for %s (path: %s, ref: %s)", project_id, path, ref)
        endpoint = f"projects/{self._encode_project_id(project_id)}/repository/tree"
        params = {
            "path": path,
            "ref": ref,
            "recursive": str(recursive).lower(),
        }
        return self.get(endpoint, params=params, use_cache=use_cache)

    def get_file(
        self,
        project_id: str | int,
        file_path: str,
        ref: str = "main",
        use_cache: bool = True,
    ) -> dict[str, Any]:
        """Get file content from repository.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            file_path: Path to file in repository
            ref: Branch/tag name
            use_cache: Whether to use caching

        Returns:
            File data including content
        """
        self.logger.debug("Fetching file %s from %s (ref: %s)", file_path, project_id, ref)
        endpoint = (
            f"projects/{self._encode_project_id(project_id)}"
            f"/repository/files/{quote(file_path, safe='')}"
        )
        return self.get(endpoint, params={"ref": ref}, use_cache=use_cache)

    def get_commits(
        self,
        project_id: str | int,
        ref: str = "main",
        since: str | None = None,
        until: str | None = None,
        path: str | None = None,
        use_cache: bool = True,
    ) -> list[dict[str, Any]]:
        """Get commits for a project.

        Makes a single GET request and does not follow pagination.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            ref: Branch/tag name
            since: Only commits after this date (ISO 8601 format)
            until: Only commits before this date (ISO 8601 format)
            path: Only commits that include this file path
            use_cache: Whether to use caching

        Returns:
            List of commits
        """
        endpoint = f"projects/{self._encode_project_id(project_id)}/repository/commits"
        params = {"ref_name": ref}
        if since:
            params["since"] = since
        if until:
            params["until"] = until
        if path:
            params["path"] = path
        return self.get(endpoint, params=params, use_cache=use_cache)

    def get_commit(
        self,
        project_id: str | int,
        commit_sha: str,
        use_cache: bool = True,
    ) -> dict[str, Any]:
        """Get a single commit.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            commit_sha: Commit SHA
            use_cache: Whether to use caching

        Returns:
            Commit data
        """
        endpoint = f"projects/{self._encode_project_id(project_id)}/repository/commits/{commit_sha}"
        return self.get(endpoint, use_cache=use_cache)

    def get_commit_diff(
        self,
        project_id: str | int,
        commit_sha: str,
        use_cache: bool = True,
    ) -> list[dict[str, Any]]:
        """Get diff of a commit.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            commit_sha: Commit SHA
            use_cache: Whether to use caching

        Returns:
            List of diffs
        """
        endpoint = f"projects/{self._encode_project_id(project_id)}/repository/commits/{commit_sha}/diff"
        return self.get(endpoint, use_cache=use_cache)

    # =========================================================================
    # Branch API Methods
    # =========================================================================

    def list_branches(
        self,
        project_id: str | int,
        use_cache: bool = True,
    ) -> list[dict[str, Any]]:
        """List branches.

        Makes a single GET request and does not follow pagination.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            use_cache: Whether to use caching

        Returns:
            List of branches
        """
        endpoint = f"projects/{self._encode_project_id(project_id)}/repository/branches"
        return self.get(endpoint, use_cache=use_cache)

    def get_branch(
        self,
        project_id: str | int,
        branch: str,
        use_cache: bool = True,
    ) -> dict[str, Any]:
        """Get branch details.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            branch: Branch name (URL-encoded, so names containing ``/`` work)
            use_cache: Whether to use caching

        Returns:
            Branch data
        """
        endpoint = (
            f"projects/{self._encode_project_id(project_id)}"
            f"/repository/branches/{quote(branch, safe='')}"
        )
        return self.get(endpoint, use_cache=use_cache)

    # =========================================================================
    # Merge Request API Methods
    # =========================================================================

    def list_merge_requests(
        self,
        project_id: str | int,
        state: str = "opened",
        params: dict | None = None,
        use_cache: bool = True,
    ) -> list[dict[str, Any]]:
        """List merge requests.

        Makes a single GET request and does not follow pagination.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            state: State filter ('opened', 'closed', 'merged', 'all')
            params: Additional query parameters; a ``state`` key here
                overrides the ``state`` argument
            use_cache: Whether to use caching

        Returns:
            List of merge requests
        """
        endpoint = f"projects/{self._encode_project_id(project_id)}/merge_requests"
        request_params = {"state": state}
        if params:
            request_params.update(params)
        return self.get(endpoint, params=request_params, use_cache=use_cache)

    def get_merge_request(
        self,
        project_id: str | int,
        mr_iid: int,
        use_cache: bool = True,
    ) -> dict[str, Any]:
        """Get merge request details.

        Args:
            project_id: Project ID or path (auto-encoded unless it contains ``%``)
            mr_iid: Merge request IID
            use_cache: Whether to use caching

        Returns:
            Merge request data
        """
        endpoint = f"projects/{self._encode_project_id(project_id)}/merge_requests/{mr_iid}"
        return self.get(endpoint, use_cache=use_cache)

    # =========================================================================
    # User API Methods
    # =========================================================================

    def get_current_user(self, use_cache: bool = True) -> dict[str, Any]:
        """Get current authenticated user.

        Args:
            use_cache: Whether to use caching

        Returns:
            User data

        Raises:
            GitLabAPIError: If no token is configured (checked before any request)
        """
        if not self.token:
            raise GitLabAPIError(MSG_AUTH_REQUIRED)
        return self.get("user", use_cache=use_cache)

    def get_user(self, user_id: int, use_cache: bool = True) -> dict[str, Any]:
        """Get user details.

        Args:
            user_id: User ID
            use_cache: Whether to use caching

        Returns:
            User data
        """
        return self.get(f"users/{user_id}", use_cache=use_cache)

    # =========================================================================
    # Rate Limit Info
    # =========================================================================

    def get_rate_limit_info(self) -> dict[str, Any]:
        """Get the most recently observed rate limit information.

        Returns:
            ``{"remaining": int | None, "reset_at": ISO-8601 str | None}``;
            both are None until a response has reported them.
        """
        return {
            "remaining": self._rate_limit_remaining,
            "reset_at": (self._rate_limit_reset.isoformat() if self._rate_limit_reset else None),
        }