import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional
import requests
from PIL import Image, ImageFile
from io import BytesIO
from huggingface_hub import hf_api, HfApi, hf_hub_url, snapshot_download, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError
from vision_unlearning.utils.logger import get_logger
# Module-level logger (registered under the name 'integrations') used by the helpers below.
logger = get_logger('integrations')
[docs]
def huggingface_model_upload(
    folder_models: str,
    model_repository: str,
    model_config: Optional[str] = None,
    token: Optional[str] = None,
) -> None:
    '''
    Push a local model folder to the Hugging Face Hub as one commit.

    Args:
        folder_models: Local parent directory holding the model files.
        model_repository: Hugging Face model repository ID.
        model_config: Sub-folder of `folder_models` to upload; it is also used
            as the destination path inside the repository. When None, the
            whole content of `folder_models` is uploaded.
        token: Hugging Face authentication token.

    The local folder must already exist and contain the model files.
    '''
    if model_config is None:
        local_source = folder_models
    else:
        local_source = os.path.join(folder_models, model_config)

    # TODO: merge this func with the upload dataset
    # TODO: each config/version should be immutable... should this be ensured here?
    assert os.path.exists(local_source)

    # TODO: upload_large_folder is better, but doesn't allow setting the path_in_repo
    # This can be solved by creating the folder locally (with the path_in_repo inside),
    # and then uploading the contents
    HfApi().upload_folder(
        repo_id=model_repository,
        repo_type='model',
        folder_path=local_source,
        path_in_repo=model_config,
        token=token,
    )
[docs]
def huggingface_model_download(
    folder_models: str,
    model_repository: str,
    model_config: Optional[str] = None,
    token: Optional[str] = None,
    clean: bool = False,
    folder_cache: str = '/tmp/huggingface_cache',
) -> None:
    '''
    Download a model or specific model config from Hugging Face Hub.

    Args:
        folder_models: Local directory to save the model
        model_repository: Hugging Face repository ID
        model_config: Specific model config to download (None for entire repository)
        token: Hugging Face authentication token
        clean: If True, the folder will be deleted before downloading
        folder_cache: Directory passed to `snapshot_download` as `cache_dir`

    The files end up in `folder_models/model_config` (or directly in
    `folder_models` when `model_config` is None). If that folder already exists
    and is not empty, nothing is downloaded.
    '''
    folder_model = os.path.join(folder_models, model_config) if model_config else folder_models
    if clean and os.path.exists(folder_model):
        shutil.rmtree(folder_model)
    if os.path.exists(folder_model):
        # An empty directory is not a finished download (e.g. leftover of an
        # interrupted run), so only a non-empty target short-circuits.
        is_empty_dir = os.path.isdir(folder_model) and not os.listdir(folder_model)
        if not is_empty_dir:
            logger.info('Model already exists locally, skipping download')
            return

    # Download to cache BEFORE creating the target folder: if the download
    # raises, no empty target is left behind to be mistaken for a model.
    repo_path = snapshot_download(
        repo_id=model_repository,
        repo_type="model",
        token=token,
        allow_patterns=f"{model_config}/*" if model_config else None,
        cache_dir=folder_cache,
    )

    # Copy from cache to final folder
    source_path = os.path.join(repo_path, model_config) if model_config else repo_path
    os.makedirs(folder_model, exist_ok=True)
    copied = 0
    for root, _, files in os.walk(source_path):
        target_dir = os.path.normpath(
            os.path.join(folder_model, os.path.relpath(root, start=source_path))
        )
        for file in files:
            os.makedirs(target_dir, exist_ok=True)
            # shutil.copy2 follows symlinks by default, so symlinked cache
            # entries are copied as regular files with their content.
            shutil.copy2(os.path.join(root, file), os.path.join(target_dir, file))
            copied += 1
    if copied == 0:
        logger.warning('No files found to copy from %s', source_path)
[docs]
def huggingface_dataset_exists(
    dataset_repository: str,
    dataset_config: str,
    token: Optional[str] = None,
) -> bool:
    """
    Checks whether a folder exists in a Hugging Face dataset repository.

    Example:
        dataset_repository="username/my_dataset"
        dataset_config="configs/en"

    Works without listing the whole repository: a single request is sent to
    the tree endpoint of the `main` revision for that folder.

    Args:
        dataset_repository: Hugging Face dataset repository ID.
        dataset_config: Folder path inside the repository; OS path separators
            are converted to '/'.
        token: HF token, sent as a Bearer header when provided.

    Returns:
        False on HTTP 404; otherwise True when the response body is a JSON list.

    Raises:
        requests.HTTPError: For any non-404 error status.
    """
    url = (
        f"https://huggingface.co/api/datasets/"
        f"{dataset_repository}/tree/main/{dataset_config.replace(os.sep, '/')}"
    )
    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # Explicit timeout: without one, requests may block indefinitely.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 404:
        return False
    response.raise_for_status()
    # Existing folders return a JSON array of entries.
    return isinstance(response.json(), list)
[docs]
def huggingface_dataset_file_exists(
    dataset_repository: str,
    dataset_path: str,
    token: Optional[str] = None,
) -> bool:
    """
    Checks if a specific file exists in a Hugging Face dataset repository.

    A single HEAD request is sent to the file's download URL, so the
    repository content is never listed.

    :param dataset_repository: e.g. "username/dataset_name"
    :param dataset_path: full path in repo (e.g. "config/file.jsonl")
    :param token: HF token (can be None for public repos)
    :return: True if the status code is 200, 302, 303 or 307, False otherwise

    Could be done more efficiently if we use a new version of the lib, see https://chatgpt.com/share/69edd525-d008-832d-8a0c-ec4560a4fe3b
    """
    url = hf_hub_url(
        repo_id=dataset_repository,
        filename=dataset_path,
        repo_type="dataset",
    )
    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # Explicit timeout: without one, requests may block indefinitely.
    response = requests.head(url, headers=headers, timeout=30)
    # Redirect codes count as "exists": requests.head does not follow redirects.
    return response.status_code in (200, 302, 303, 307)
[docs]
def huggingface_dataset_file_upload(
    file_path: str,
    dataset_repository: str,
    dataset_path: str,
    token: str,
):
    '''
    Push one local file into a dataset repository on the Hugging Face Hub.

    @param file_path: local file to upload; it must exist
    @param dataset_repository: Hugging Face dataset repository ID
    @param dataset_path: full name of the file in the repository, including the config folder (e.g., "my_config/my_file.jsonl")
    @param token: Hugging Face authentication token
    '''
    assert os.path.exists(file_path)
    upload_kwargs = dict(
        repo_id=dataset_repository,
        repo_type='dataset',
        path_or_fileobj=file_path,
        path_in_repo=dataset_path,
        token=token,
    )
    HfApi().upload_file(**upload_kwargs)
[docs]
def huggingface_dataset_upload(
    folder_datasets: str,
    dataset_repository: str,
    dataset_config: str,
    token: str,
):
    '''
    Push the local folder `folder_datasets/dataset_config` to a dataset repository.

    The folder must already exist and hold the dataset files; it is uploaded
    under the path `dataset_config` inside the repository.
    '''
    local_source = os.path.join(folder_datasets, dataset_config)
    assert os.path.exists(local_source)

    # TODO: each config/version should be immutable... should this be ensured here?
    # TODO: upload_large_folder is better, but doesn't allow setting the path_in_repo
    # This can be solved by creating the folder locally (with the path_in_repo inside),
    # and then uploading the contents
    HfApi().upload_folder(
        repo_id=dataset_repository,
        repo_type='dataset',
        folder_path=local_source,
        path_in_repo=dataset_config,
        token=token,
    )
[docs]
def huggingface_dataset_download(
    folder_datasets: str,
    dataset_repository: str,
    dataset_config: str,
    token: str,
    clean: bool = False,
    folder_cache: str = '/tmp/huggingface_cache',
    clean_cache: bool = False,
) -> None:
    '''
    Download one dataset config folder from Hugging Face Hub into
    `folder_datasets/dataset_config`.

    If that folder already exists and is not empty, nothing is downloaded.

    @param folder_datasets: local parent directory of the dataset folders
    @param dataset_repository: Hugging Face dataset repository ID
    @param dataset_config: folder name, both locally and inside the repository
    @param token: Hugging Face authentication token
    @param clean: If True, the folder will be deleted before downloading
    @param folder_cache: directory passed to `snapshot_download` as `cache_dir`
    @param clean_cache: If True, the snapshot directory returned by
        `snapshot_download` is deleted after the copy
    '''
    folder_dataset = os.path.join(folder_datasets, dataset_config)
    if clean and os.path.exists(folder_dataset):
        shutil.rmtree(folder_dataset)
    if os.path.exists(folder_dataset):
        # An empty directory is not a finished download (e.g. leftover of an
        # interrupted run), so only a non-empty target short-circuits.
        is_empty_dir = os.path.isdir(folder_dataset) and not os.listdir(folder_dataset)
        if not is_empty_dir:
            logger.info('Dataset already exists locally, skipping download')
            return

    # Download to cache BEFORE creating the target folder: if the download
    # raises, no empty target is left behind to be mistaken for a dataset.
    repo_path = snapshot_download(
        repo_id=dataset_repository,
        repo_type="dataset",
        token=token,
        allow_patterns=f"{dataset_config}/*",
        cache_dir=folder_cache,
    )

    # Copy from cache to final folder
    source_root = os.path.join(repo_path, dataset_config)
    os.makedirs(folder_dataset, exist_ok=True)
    for root, _, files in os.walk(source_root):
        target_dir = os.path.normpath(
            os.path.join(folder_dataset, os.path.relpath(root, start=source_root))
        )
        for file in files:
            os.makedirs(target_dir, exist_ok=True)
            # shutil.copy2 follows symlinks by default, so symlinked cache
            # entries are copied as regular files with their content.
            shutil.copy2(os.path.join(root, file), os.path.join(target_dir, file))

    # Remove cache
    # NOTE(review): only the returned snapshot directory is removed; if the
    # cache stores file contents elsewhere (e.g. a blobs folder), that space
    # is presumably not reclaimed - confirm against the cache layout in use.
    if clean_cache:
        shutil.rmtree(repo_path)
[docs]
def huggingface_dataset_file_download(
    folder_datasets: str,
    dataset_repository: str,
    file_path: str,
    token: str,
    folder_cache: str = '/tmp/huggingface_cache',
) -> None:
    '''
    Fetch one file of a Hugging Face Hub dataset and store it locally.

    Args:
        folder_datasets: Local directory where datasets are stored.
        dataset_repository: Hugging Face dataset repository ID
        file_path: Full path of the file within the repository (e.g., "config/data.jsonl")
        token: Hugging Face authentication token
        folder_cache: Cache directory for downloads

    The file is written to os.path.join(folder_datasets, file_path).
    '''
    for directory in (folder_datasets, folder_cache):
        os.makedirs(directory, exist_ok=True)

    # Step 1: fetch into the cache directory
    cached_file = hf_hub_download(
        repo_id=dataset_repository,
        repo_type="dataset",
        filename=file_path,
        cache_dir=folder_cache,
        token=token,
    )

    # Step 2: copy the cached file to its final location
    destination = os.path.join(folder_datasets, file_path)
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    shutil.copy2(cached_file, destination)
[docs]
def huggingface_get_model_metrics(model_id: str) -> Dict[str, float | int | bool]:
    '''
    Return the evaluation results of a model card as a {metric name: value} mapping.

    An empty dict is returned (and a message logged) when the model has no
    card data or no evaluation results.

    Supposes that the credentials are properly configured
    '''
    info = hf_api.HfApi().model_info(model_id)
    card = info.cardData
    if not (card and card.eval_results):
        logger.info(f"No metrics found for {model_id}")
        return {}
    return {str(entry.metric_name): entry.metric_value for entry in card.eval_results}
[docs]
def huggingface_get_model_images(model_id, prefix: str = '') -> List[ImageFile.ImageFile]:
    '''
    Download every image file of a model repository whose path starts with `prefix`.

    A file counts as an image when its name ends with .png, .jpg, .jpeg or
    .gif (case-insensitive). Files are fetched from the `main` revision.

    Args:
        model_id: Hugging Face model repository ID.
        prefix: Only files whose repository path starts with this string are
            considered; the default '' matches everything.

    Returns:
        The opened images, in the order the repository lists its files. The
        list is empty when the repository reports no files.

    Raises:
        requests.HTTPError: If a matching file cannot be downloaded.
    '''
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif')
    images: List[ImageFile.ImageFile] = []
    model_info = hf_api.HfApi().model_info(model_id)
    if not model_info.siblings:
        logger.info(f"No files found in the repository {model_id}")
        return images

    for sibling in model_info.siblings:
        name = sibling.rfilename
        if not (name.startswith(prefix) and name.lower().endswith(image_extensions)):
            continue
        logger.info(f"Image: {name}")
        response = requests.get(
            f"https://huggingface.co/{model_id}/resolve/main/{name}",
            timeout=60,
        )
        # Fail on HTTP errors here instead of handing an error page to PIL.
        response.raise_for_status()
        images.append(Image.open(BytesIO(response.content)))
    return images
[docs]
def _huggingface_download_one_file(
    entry: dict,  # type: ignore[type-arg]
    folder_dataset: str,
    dataset_repository: str,
    headers: dict,  # type: ignore[type-arg]
) -> bool:
    """Download a single file from HF via HTTP. Returns True on success.

    Args:
        entry: Tree-listing entry; only its "path" key is read.
        folder_dataset: Local directory receiving the file. The file is stored
            under its base name only (sub-folders of "path" are dropped).
        dataset_repository: HF dataset repo ID used to build the download URL.
        headers: HTTP headers sent with the request (e.g. Authorization).

    Returns:
        True if the file already exists locally or was downloaded, False if
        all 3 attempts failed. Failures are logged, never raised.
    """
    entry_path = entry.get("path", "")
    filename = os.path.basename(entry_path)
    local_path = os.path.join(folder_dataset, filename)
    if os.path.exists(local_path):
        return True  # already present — treated as success
    dl_url = (
        f"https://huggingface.co/datasets/{dataset_repository}"
        f"/resolve/main/{entry_path}"
    )
    # Write to a temporary sibling and rename on success, so an interrupted
    # write never leaves a truncated file that the exists() shortcut above
    # would later count as a completed download.
    partial_path = local_path + ".part"
    for attempt in range(3):
        try:
            resp = requests.get(
                dl_url,
                headers=headers,
                allow_redirects=True,
                timeout=60,
            )
            resp.raise_for_status()
            with open(partial_path, "wb") as fh:
                fh.write(resp.content)
            os.replace(partial_path, local_path)
            return True
        except Exception as exc:  # deliberate best-effort: log and retry
            logger.warning(
                "Attempt %d/3 failed for %s: %s", attempt + 1, filename, exc
            )
            try:
                os.remove(partial_path)
            except OSError:
                pass  # nothing was written, or it is already gone
    return False
[docs]
def huggingface_dataset_download_parallel(
    folder_datasets: str,
    dataset_repository: str,
    dataset_config: str,
    token: str,
    clean: bool = False,
    folder_cache: str = "C:/tmp/huggingface_cache",
    hf_prefix: str = "datasets",
    max_workers: int = 12,
) -> None:
    """Download a dataset config folder from HF using parallel HTTP requests.

    Faster alternative to huggingface_dataset_download() for large folders.
    Uses ThreadPoolExecutor(max_workers) for concurrent file downloads; reduces
    per-entity download time from ~6 minutes (sequential snapshot_download) to
    ~35 s at max_workers=12 (measured on HF for 801 PNG files, 2026-05-20).

    Only the direct children of the remote folder are downloaded; entries of
    type "directory" are skipped. If the local folder already exists and is
    not empty, nothing is downloaded.

    Args:
        folder_datasets: Local parent directory (e.g. "assets/datasets").
        dataset_repository: HF dataset repo ID.
        dataset_config: Folder name within folder_datasets AND within hf_prefix
            on HF (e.g. "generated_people_George W Bush_uce_000").
        token: HF auth token. When empty/None no Authorization header is sent.
        clean: If True, delete local folder before downloading.
        folder_cache: Unused — kept for signature compatibility with huggingface_dataset_download().
        hf_prefix: Prefix path within the HF repo (default "datasets").
        max_workers: Thread pool size for concurrent HTTP downloads.
            Benchmark (2026-05-20, 801 files): 1=349s, 4=91s, 8=48s, 12=35s.
            12 is the recommended default; do not exceed 16 (HF rate limits).

    Raises:
        requests.HTTPError: If listing the remote folder fails.
        RuntimeError: If more than 1% of the files could not be downloaded.
            The partially filled local folder is kept, so a later call without
            clean=True takes the "already exists" shortcut; pass clean=True to
            retry from scratch.
    """
    folder_dataset = os.path.join(folder_datasets, dataset_config)
    if clean and os.path.exists(folder_dataset):
        shutil.rmtree(folder_dataset)
    if os.path.exists(folder_dataset) and os.listdir(folder_dataset):
        logger.info('Dataset already exists locally, skipping download: %s', folder_dataset)
        return
    os.makedirs(folder_dataset, exist_ok=True)

    hf_path = f"{hf_prefix}/{dataset_config}" if hf_prefix else dataset_config
    # Never send "Bearer None": an invalid token can get even public repos rejected.
    headers = {"Authorization": f"Bearer {token}"} if token else {}

    # List files via HF tree API
    tree_url = (
        f"https://huggingface.co/api/datasets/{dataset_repository}"
        f"/tree/main/{hf_path}"
    )
    logger.info("Fetching file list: %s", tree_url)
    entries: List[dict] = []  # type: ignore[type-arg]
    page_url: Optional[str] = tree_url
    while page_url:
        r = requests.get(page_url, headers=headers, timeout=30)
        r.raise_for_status()
        entries.extend(r.json())
        # Large listings are paginated; follow the rel="next" Link header so
        # the file list is not silently truncated to the first page.
        page_url = r.links.get("next", {}).get("url")
    logger.info("Files in HF folder: %d", len(entries))

    file_entries: List[dict] = [  # type: ignore[type-arg]
        e
        for e in entries
        if e.get("type", "file") != "directory" and os.path.basename(e.get("path", ""))
    ]

    # Parallel download
    failed = 0
    done = 0
    total = len(file_entries)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(_huggingface_download_one_file, entry, folder_dataset, dataset_repository, headers): entry
            for entry in file_entries
        }
        for future in as_completed(futures):
            if future.result():
                done += 1
            else:
                failed += 1
            completed = done + failed
            if completed % 100 == 0 or completed == total:
                logger.info("Download progress: %d/%d (failed: %d)", completed, total, failed)

    logger.info(
        "Download complete: %d downloaded, %d failed -> %s",
        done, failed, folder_dataset,
    )

    # max(total, 1) avoids a division by zero when the folder lists no files.
    fail_rate = failed / max(total, 1)
    if fail_rate > 0.01:
        raise RuntimeError(
            f"huggingface_dataset_download_parallel: {failed}/{total} files failed "
            f"({fail_rate:.1%}) for {dataset_config}"
        )
    if failed > 0:
        logger.warning(
            "Tolerating %d failed file(s) for %s (below 1%% threshold)",
            failed, dataset_config,
        )