Source code for hypergraphx.readwrite.load

from __future__ import annotations

import gzip
import json
import os
import re
import ssl
import tempfile
from pathlib import Path

from typing import Any, Iterable, List, Tuple
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError

from hypergraphx.core.undirected import Hypergraph
from hypergraphx.core.directed import DirectedHypergraph
from hypergraphx.core.multiplex import MultiplexHypergraph
from hypergraphx.core.temporal import TemporalHypergraph
from hypergraphx.exceptions import InvalidFileTypeError, InvalidFormatError
from hypergraphx.readwrite.io_json import (
    _parse_json_bytes_to_hypergraph,
    load_json_file,
)
from hypergraphx.readwrite.io_pickle import load_pickle

# Root URL of the dataset server. _server_urls() derives the per-dataset
# ("<name>/<name>.<ext>") and flat ("<name>.<ext>") download URLs from it.
_BASE = "https://cricca.disi.unitn.it/datasets/hypergraphx-data"
# Built-in catalog location, used by _catalog_url_candidates() when neither an
# explicit URL nor the HYPERGRAPHX_DATA_CATALOG_URL environment variable is set.
_CATALOG_URL = "https://hgx-team.github.io/hypergraphx-data/static/js/related-data.js"

# Names exported as the public API of this module.
__all__ = [
    "download_remote_dataset",
    "download_remote_datasets",
    "iter_remote_hypergraphs",
    "get_remote_dataset_info",
    "list_remote_datasets",
    "load",
    "load_hypergraph",
    "load_hypergraph_from_server",
    "search_remote_datasets",
]


def _decompress_gzip_if_needed(raw: bytes) -> bytes:
    """Return ``raw`` gunzipped, or ``raw`` itself when gunzipping raises ``OSError``."""
    try:
        inflated = gzip.decompress(raw)
    except OSError:
        # Payload was not gzip data: hand it back untouched.
        inflated = raw
    return inflated


def _ensure_hypergraph_obj(obj: Any):
    """Raise ``TypeError`` unless ``obj`` is one of the hypergraph classes or a ``dict``."""
    accepted_types = (
        Hypergraph,
        DirectedHypergraph,
        MultiplexHypergraph,
        TemporalHypergraph,
        dict,
    )
    if isinstance(obj, accepted_types):
        return
    raise TypeError(
        f"Object has type {type(obj)!r}, expected one of {accepted_types}."
    )


def _download(url: str, *, timeout: int = 30, verify_ssl: bool = True) -> bytes:
    """
    Fetch ``url`` and return the raw response body.

    Parameters
    ----------
    url : str
        Address to request.
    timeout : int, default=30
        Timeout in seconds passed to ``urlopen``.
    verify_ssl : bool, default=True
        If True, use a verifying TLS context (backed by ``certifi`` when that
        package can be used). If False, certificate verification is disabled.

    Raises
    ------
    FileNotFoundError
        If the server answers with an HTTP error status.
    ConnectionError
        If the request fails with a ``URLError``.
    """
    try:
        if not verify_ssl:
            tls_context = ssl._create_unverified_context()  # noqa: SLF001
        else:
            tls_context = ssl.create_default_context()
            try:
                import certifi  # type: ignore

                tls_context = ssl.create_default_context(cafile=certifi.where())
            except Exception:
                # Best effort: keep the default context when certifi is
                # missing or its bundle cannot be loaded.
                pass
        request = Request(url, headers={"User-Agent": "hypergraphx-loader/1.0"})
        with urlopen(request, timeout=timeout, context=tls_context) as response:
            return response.read()
    except HTTPError as exc:
        # HTTPError is a URLError subclass, so it has to be handled first.
        raise FileNotFoundError(f"Not found at {url} (HTTP {exc.code}).") from exc
    except URLError as exc:
        raise ConnectionError(
            f"Network error reaching {url}: {exc.reason}. "
            "Are you offline? For offline use, download the dataset and use load_hypergraph(...) on a local file."
        ) from exc


def _server_urls(dataset_name: str, fmt: str | None):
    """
    Build the hard-coded candidate URLs for ``dataset_name`` under ``_BASE``.

    Per-dataset-directory URLs come before flat ones, and gzipped variants
    before plain ones. ``fmt=None`` returns the JSON candidates followed by the
    binary ones; ``"json"`` returns only JSON candidates; ``"binary"``,
    ``"pickle"``, ``"pkl"`` and ``"hgx"`` return only binary candidates.

    Raises
    ------
    InvalidFormatError
        If ``fmt`` is none of the accepted values.
    """
    nested_stem = f"{_BASE}/{dataset_name}/{dataset_name}"
    flat_stem = f"{_BASE}/{dataset_name}"
    stems = (nested_stem, flat_stem)
    compressions = (".gz", "")

    json_urls = tuple(
        f"{stem}.json{gz}" for stem in stems for gz in compressions
    )
    binary_urls = tuple(
        f"{stem}.hgx{gz}" for stem in stems for gz in compressions
    ) + (f"{flat_stem}.pkl",)

    if fmt is None:
        return json_urls + binary_urls
    if fmt == "json":
        return json_urls
    if fmt in {"binary", "pickle", "pkl", "hgx"}:
        return binary_urls
    raise InvalidFormatError("fmt must be one of {'json', 'binary', 'hgx'}")


def _deduplicate_urls(urls):
    """Return ``urls`` as a list without falsy entries or repeats, in first-seen order."""
    # dict keys are unique and keep insertion order.
    return list(dict.fromkeys(url for url in urls if url))


def _version_urls_from_catalog(dataset_info: dict, fmt: str | None):
    """
    Collect download URLs from the ``versions`` list of a catalog entry.

    Each dict in ``versions`` contributes its ``json_download`` and
    ``binary_download`` values; non-dict items are skipped. The result is
    de-duplicated, with JSON URLs first when ``fmt`` is None.

    Raises
    ------
    InvalidFormatError
        If ``fmt`` is none of the accepted values.
    """
    version_entries = [
        entry
        for entry in (dataset_info.get("versions") or [])
        if isinstance(entry, dict)
    ]
    json_urls = [entry.get("json_download") for entry in version_entries]
    binary_urls = [entry.get("binary_download") for entry in version_entries]

    if fmt is None:
        chosen = json_urls + binary_urls
    elif fmt == "json":
        chosen = json_urls
    elif fmt in {"binary", "pickle", "pkl", "hgx"}:
        chosen = binary_urls
    else:
        raise InvalidFormatError("fmt must be one of {'json', 'binary', 'hgx'}")
    return _deduplicate_urls(chosen)


def _dataset_identifiers(dataset: dict):
    """Return the set of strings a catalog entry can be looked up by.

    The ``name``, ``filename`` and ``directory`` values are stringified; a
    missing field contributes the empty string.
    """
    identifying_fields = ("name", "filename", "directory")
    return {str(dataset.get(field, "")) for field in identifying_fields}


def _find_remote_dataset_info(dataset_name: str, datasets: Iterable[dict]):
    """
    Return the first catalog entry whose identifiers include ``dataset_name``.

    Raises
    ------
    FileNotFoundError
        If no entry in ``datasets`` matches.
    """
    not_found = object()
    hit = next(
        (
            entry
            for entry in datasets
            if dataset_name in _dataset_identifiers(entry)
        ),
        not_found,
    )
    if hit is not_found:
        raise FileNotFoundError(
            f"Dataset not found in remote catalog: {dataset_name}"
        )
    return hit


def _attributes_match(dataset: dict, attributes, match_all: bool = True):
    """
    Tell whether a catalog entry's ``tags`` satisfy the requested attributes.

    Comparison is case-insensitive (``str.casefold``). ``attributes`` may be a
    single string, an iterable, or None; an empty request always matches.
    With ``match_all`` every requested attribute must be a tag, otherwise a
    single common one is enough.
    """
    if attributes is None:
        wanted = set()
    elif isinstance(attributes, str):
        wanted = {attributes.casefold()}
    else:
        wanted = {str(attribute).casefold() for attribute in attributes}

    if not wanted:
        return True

    available = {str(tag).casefold() for tag in dataset.get("tags", [])}
    if match_all:
        return wanted <= available
    return not wanted.isdisjoint(available)


def _has_selection_values(values):
    """Return True when ``values`` is a non-empty string or a non-empty iterable."""
    if values is None:
        return False
    # Strings are judged as a whole; any other iterable is materialised so
    # that generators can be tested for emptiness too.
    materialised = values if isinstance(values, str) else list(values)
    return bool(materialised)


def _remote_payload_format(url: str):
    """
    Infer ``"json"`` or ``"binary"`` from the path extension of ``url``.

    A trailing ``.gz`` is ignored. ``.json`` maps to ``"json"``; ``.hgx``,
    ``.pkl`` and ``.pickle`` map to ``"binary"``.

    Raises
    ------
    InvalidFormatError
        If the extension is not recognised.
    """
    url_path = urlparse(url).path
    if url_path.endswith(".gz"):
        url_path = url_path[: -len(".gz")]

    suffix_table = (
        ((".json",), "json"),
        ((".hgx", ".pkl", ".pickle"), "binary"),
    )
    for suffixes, payload_format in suffix_table:
        if url_path.endswith(suffixes):
            return payload_format
    raise InvalidFormatError(f"Cannot infer remote payload format from URL: {url}")


def _default_dataset_cache_dir():
    """Return the dataset cache root as a ``Path`` with ``~`` expanded.

    The ``HYPERGRAPHX_DATA_CACHE`` environment variable wins when set;
    otherwise ``~/.cache/hypergraphx/datasets`` is used.
    """
    fallback = os.path.join("~", ".cache", "hypergraphx", "datasets")
    configured = os.environ.get("HYPERGRAPHX_DATA_CACHE", fallback)
    return Path(configured).expanduser()


def _remote_cache_path(dataset_name: str, url: str, cache_dir=None):
    """
    Return the local cache path for ``url``: ``<root>/<dataset_name>/<file>``.

    ``<root>`` is ``cache_dir`` (with ``~`` expanded) or the default cache
    directory; ``<file>`` is the last path component of ``url`` with a trailing
    ``.gz`` removed.
    """
    if cache_dir is None:
        cache_root = _default_dataset_cache_dir()
    else:
        cache_root = Path(cache_dir).expanduser()

    file_part = os.path.basename(urlparse(url).path)
    if file_part.endswith(".gz"):
        file_part = file_part[: -len(".gz")]
    return cache_root / dataset_name / file_part


def _load_remote_payload_from_path(path: Path, payload_format: str):
    """Load a cached dataset file: JSON loader for ``"json"``, pickle loader otherwise."""
    # NOTE(review): the non-JSON branch goes through load_pickle — presumably
    # unpickling, so cached payloads should come from a trusted server; confirm.
    loader = load_json_file if payload_format == "json" else load_pickle
    return loader(str(path))


def _resolve_remote_dataset_urls(
    dataset_name: str,
    fmt: str | None,
    *,
    timeout: int = 30,
    verify_ssl: bool = False,
    catalog_url: str | None = None,
    use_catalog: bool = True,
    dataset_info: dict | None = None,
):
    """
    Assemble the ordered list of URLs to try for ``dataset_name``.

    URLs advertised by the catalog entry come first, followed by the
    hard-coded server URL patterns; duplicates are removed. The entry is taken
    from ``dataset_info`` when given, otherwise looked up in the remote catalog
    if ``use_catalog`` is True.

    Returns
    -------
    tuple[list[str], Exception | None]
        The candidate URLs and the exception raised by the catalog lookup, if
        any (a failed lookup is not fatal: the fallback URLs are still used).
    """
    catalog_error = None
    catalog_urls = []

    if dataset_info is not None:
        catalog_urls = _version_urls_from_catalog(dataset_info, fmt)
    elif use_catalog:
        try:
            dataset_info = get_remote_dataset_info(
                dataset_name,
                timeout=timeout,
                verify_ssl=verify_ssl,
                catalog_url=catalog_url,
            )
            catalog_urls = _version_urls_from_catalog(dataset_info, fmt)
        except Exception as exc:
            # Remember why the catalog was unusable; the caller reports it
            # only if every fallback URL fails as well.
            catalog_error = exc

    fallback_urls = list(_server_urls(dataset_name, fmt))
    candidates = _deduplicate_urls(list(catalog_urls) + fallback_urls)
    return candidates, catalog_error


def _download_remote_dataset_file(
    dataset_name: str,
    url: str,
    *,
    timeout: int = 30,
    verify_ssl: bool = False,
    cache_dir=None,
    overwrite: bool = False,
):
    """
    Download ``url`` into the dataset cache and return the cached path.

    An existing cache file is reused unless ``overwrite`` is True. Gzipped
    downloads are stored decompressed.

    Parameters
    ----------
    dataset_name : str
        Name of the cache sub-directory.
    url : str
        Remote file to fetch.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates.
    cache_dir : path-like, optional
        Cache root; the default cache directory is used when omitted.
    overwrite : bool, default=False
        If True, download even when the cache file already exists.

    Returns
    -------
    pathlib.Path
        Location of the cached, decompressed file.
    """
    cache_path = _remote_cache_path(dataset_name, url, cache_dir)
    if cache_path.exists() and not overwrite:
        return cache_path

    payload = _decompress_gzip_if_needed(
        _download(url, timeout=timeout, verify_ssl=verify_ssl)
    )
    cache_path.parent.mkdir(parents=True, exist_ok=True)

    # Write next to the target and rename afterwards: an interrupted write must
    # never leave a truncated file at cache_path, because later calls would
    # accept it as a valid cache hit.
    partial_path = cache_path.with_name(f"{cache_path.name}.{os.getpid()}.part")
    try:
        partial_path.write_bytes(payload)
        os.replace(partial_path, cache_path)
    finally:
        # After a successful rename there is nothing left to remove.
        try:
            partial_path.unlink()
        except OSError:
            pass
    return cache_path


def _infer_local_payload_format(path: str):
    """
    Infer ``"pickle"``, ``"json"`` or ``"hgr"`` from a local file name.

    Matching is case-insensitive and ignores a trailing ``.gz``.

    Raises
    ------
    InvalidFileTypeError
        If the extension is not recognised.
    """
    name = path.lower()
    if name.endswith(".gz"):
        name = name[: -len(".gz")]

    suffix_table = (
        ((".pkl", ".pickle", ".hgx"), "pickle"),
        ((".json",), "json"),
        ((".hgr",), "hgr"),
    )
    for suffixes, payload_format in suffix_table:
        if name.endswith(suffixes):
            return payload_format
    raise InvalidFileTypeError("Invalid file type")


def _load_hypergraph_from_decompressed_bytes(payload: bytes, payload_format: str):
    """
    Build a hypergraph from already-decompressed file contents.

    ``"json"`` payloads are parsed in memory. ``"hgr"`` and any other format
    (handled by the pickle loader) are written to a temporary file first,
    because their loaders take a file name.
    """
    if payload_format == "json":
        return _parse_json_bytes_to_hypergraph(payload)

    # delete=False plus an explicit unlink: the file has to be closed before a
    # loader reopens it by name, which an open auto-deleting temporary file
    # does not allow on Windows.
    tmp = tempfile.NamedTemporaryFile(suffix=f".{payload_format}", delete=False)
    try:
        with tmp:
            tmp.write(payload)
        if payload_format == "hgr":
            return _load_hgr_file(tmp.name)
        return load_pickle(tmp.name)
    finally:
        try:
            os.unlink(tmp.name)
        except OSError:
            pass


def _load_gzipped_hypergraph(file_name: str, payload_format: str):
    """Decompress the gzip file ``file_name`` and load its contents as ``payload_format``."""
    with gzip.open(file_name, "rb") as compressed:
        payload = compressed.read()
    return _load_hypergraph_from_decompressed_bytes(payload, payload_format)


def _parse_remote_dataset_catalog(payload: bytes):
    """
    Decode a catalog payload into a list of normalised dataset entries.

    Accepted inputs are UTF-8 encoded JSON — either a list or an object with a
    ``datasets`` list — optionally wrapped in a
    ``window.RELATED_DATASETS = ...;`` JavaScript assignment.

    Each returned entry is a copy of the catalog item in which ``tags`` and
    ``categories`` hold the same list, ``filename`` and ``directory`` are
    filled in from each other (or from ``name``), and ``vertices`` / ``edges``
    are present (None when the catalog omits them).

    Raises
    ------
    InvalidFormatError
        If the payload is not valid JSON, has the wrong top-level shape, or
        contains an entry without a ``name``.
    """
    source = payload.decode("utf-8").strip()

    if source.startswith("window.RELATED_DATASETS"):
        # Strip the JavaScript assignment and keep only its right-hand side.
        assignment = re.match(
            r"window\.RELATED_DATASETS\s*=\s*(.*?);?\s*$", source, re.S
        )
        if assignment is None:
            raise InvalidFormatError("Could not parse remote dataset catalog.")
        source = assignment.group(1)

    try:
        parsed = json.loads(source)
    except Exception as exc:
        raise InvalidFormatError("Remote dataset catalog is not valid JSON.") from exc

    entries = parsed.get("datasets") if isinstance(parsed, dict) else parsed
    if not isinstance(entries, list):
        raise InvalidFormatError(
            "Remote dataset catalog must be a list or contain a 'datasets' list."
        )

    normalised = []
    for entry in entries:
        if not (isinstance(entry, dict) and "name" in entry):
            raise InvalidFormatError(
                "Remote dataset catalog entries must contain names."
            )
        tags = list(entry.get("tags") or entry.get("categories") or [])
        record = {**entry, "tags": tags, "categories": tags}
        record.setdefault("filename", entry.get("directory") or entry["name"])
        record.setdefault("directory", record["filename"])
        for size_key in ("vertices", "edges"):
            record.setdefault(size_key, entry.get(size_key))
        normalised.append(record)
    return normalised


def _catalog_url_candidates(catalog_url: str | None = None):
    """Return a one-element list with the catalog URL to use.

    Precedence: the ``catalog_url`` argument, then the
    ``HYPERGRAPHX_DATA_CATALOG_URL`` environment variable, then the built-in
    default.
    """
    chosen = (
        catalog_url
        or os.environ.get("HYPERGRAPHX_DATA_CATALOG_URL")
        or _CATALOG_URL
    )
    return [chosen]


def _load_remote_dataset_catalog(
    *,
    timeout: int = 30,
    verify_ssl: bool = False,
    catalog_url: str | None = None,
):
    """
    Download and parse the remote dataset catalog.

    Each candidate catalog URL is tried in turn and the first one that
    downloads and parses successfully is returned.

    Raises
    ------
    InvalidFormatError
        If no candidate could be loaded; the last failure is chained as the
        cause.
    """
    failure = None
    for candidate in _catalog_url_candidates(catalog_url):
        try:
            raw = _download(candidate, timeout=timeout, verify_ssl=verify_ssl)
            catalog = _parse_remote_dataset_catalog(raw)
        except Exception as exc:
            failure = exc
        else:
            return catalog
    raise InvalidFormatError(
        f"Could not load remote dataset catalog: {failure}"
    ) from failure


def list_remote_datasets(
    *,
    timeout: int = 30,
    verify_ssl: bool = False,
    catalog_url: str | None = None,
):
    """
    Fetch the remote Hypergraphx-data catalog and return its entries.

    Parameters
    ----------
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates when downloading the catalog.
    catalog_url : str, optional
        Catalog metadata URL. When omitted, the
        ``HYPERGRAPHX_DATA_CATALOG_URL`` environment variable is used if set,
        otherwise the module's built-in catalog URL.

    Returns
    -------
    list[dict]
        One dictionary per dataset, each with at least ``name``, ``tags`` /
        ``categories``, ``filename``, ``directory``, ``vertices`` and
        ``edges`` (the last two may be None).

    Raises
    ------
    InvalidFormatError
        If the catalog cannot be downloaded or parsed.

    Notes
    -----
    The catalog may be a JSON list, a JSON object with a ``datasets`` list, or
    the ``related-data.js`` form (``window.RELATED_DATASETS = ...``).
    """
    catalog = _load_remote_dataset_catalog(
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
    )
    return catalog
def get_remote_dataset_info(
    dataset_name: str,
    *,
    timeout: int = 30,
    verify_ssl: bool = False,
    catalog_url: str | None = None,
):
    """
    Look up one dataset in the remote catalog and return its full entry.

    Parameters
    ----------
    dataset_name : str
        Value compared against each entry's ``name``, ``filename`` and
        ``directory`` fields.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates when downloading the catalog.
    catalog_url : str, optional
        Catalog metadata URL.

    Returns
    -------
    dict
        The first matching catalog entry.

    Raises
    ------
    FileNotFoundError
        If no catalog entry matches ``dataset_name``.
    """
    catalog = list_remote_datasets(
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
    )
    return _find_remote_dataset_info(dataset_name, catalog)
def iter_remote_hypergraphs(
    attributes=None,
    *,
    names=None,
    match_all: bool = True,
    fmt: str = "hgx",
    timeout: int = 30,
    verify_ssl: bool = False,
    catalog_url: str | None = None,
    include_metadata: bool = False,
    store: bool = True,
    cache_dir=None,
    overwrite: bool = False,
):
    """
    Lazily load remote hypergraphs chosen by name and/or catalog tags.

    Parameters
    ----------
    attributes : str | Iterable[str], optional
        Tag/category names to match, such as ``"Undirected"`` or
        ``["Undirected", "Temporal"]``. Matching is case-insensitive.
    names : str | Iterable[str], optional
        Dataset names, filenames, or directories to load explicitly. When
        given together with ``attributes``, the named datasets are additionally
        filtered by those attributes. When omitted, datasets are selected from
        ``attributes`` alone.
    match_all : bool, default=True
        If True, a dataset must carry every requested attribute. If False, one
        is enough.
    fmt : {"hgx", "binary", "json"}, default="hgx"
        Remote format to load for each selected dataset.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates for remote requests.
    catalog_url : str, optional
        Catalog metadata URL used for selection.
    include_metadata : bool, default=False
        If True, yield ``(hypergraph, dataset_info)`` pairs instead of bare
        hypergraph objects.
    store : bool, default=True
        Store downloaded datasets locally before loading them.
    cache_dir : path-like, optional
        Cache directory. Defaults to ``~/.cache/hypergraphx/datasets`` or the
        ``HYPERGRAPHX_DATA_CACHE`` environment variable.
    overwrite : bool, default=False
        If True, re-download datasets even when cached files exist.

    Raises
    ------
    ValueError
        If neither a dataset name nor an attribute is provided.
    FileNotFoundError
        If a requested name is not in the catalog.

    Notes
    -----
    This is a generator: argument validation, the catalog download and each
    dataset download only happen once iteration starts and advances.
    """
    if names is None:
        selected_attributes = (
            attributes if isinstance(attributes, str) else list(attributes or [])
        )
        if not _has_selection_values(selected_attributes):
            raise ValueError("At least one dataset name or attribute must be provided.")
    else:
        requested_names = [names] if isinstance(names, str) else list(names)
        if not requested_names:
            raise ValueError("At least one dataset name or attribute must be provided.")
        if attributes is None or isinstance(attributes, str):
            selected_attributes = attributes
        else:
            selected_attributes = list(attributes)

    catalog = list_remote_datasets(
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
    )

    if names is None:
        candidates = catalog
    else:
        candidates = [
            _find_remote_dataset_info(requested, catalog)
            for requested in requested_names
        ]

    # Without explicit names the attributes are never None, so this single
    # test covers both selection modes.
    if selected_attributes is not None:
        candidates = [
            entry
            for entry in candidates
            if _attributes_match(entry, selected_attributes, match_all)
        ]

    for entry in candidates:
        hypergraph = load_hypergraph_from_server(
            entry["name"],
            fmt=fmt,
            timeout=timeout,
            verify_ssl=verify_ssl,
            catalog_url=catalog_url,
            store=store,
            cache_dir=cache_dir,
            overwrite=overwrite,
            use_catalog=False,
            dataset_info=entry,
        )
        yield (hypergraph, entry) if include_metadata else hypergraph
def _matches_range(value, minimum, maximum):
    """Return True when ``value`` lies within the inclusive ``[minimum, maximum]`` range.

    A bound of None is unbounded. A ``value`` of None only matches when both
    bounds are None.
    """
    if value is None:
        return minimum is None and maximum is None
    too_small_or_large = (minimum is not None and value < minimum) or (
        maximum is not None and value > maximum
    )
    return not too_small_or_large
def search_remote_datasets(
    query: str | None = None,
    *,
    tags=None,
    match_all_tags: bool = True,
    source: str | None = None,
    license: str | None = None,
    min_nodes: int | None = None,
    max_nodes: int | None = None,
    min_edges: int | None = None,
    max_edges: int | None = None,
    timeout: int = 30,
    verify_ssl: bool = False,
    catalog_url: str | None = None,
):
    """
    Filter the remote Hypergraphx-data catalog.

    Parameters
    ----------
    query : str, optional
        Case-insensitive substring searched in the dataset name, tags,
        description, source and license.
    tags : str | Iterable[str], optional
        Tags/categories to require. Matching is case-insensitive.
    match_all_tags : bool, default=True
        If True, all requested tags must be present. If False, any requested
        tag is enough.
    source : str, optional
        Case-insensitive substring matched against the ``source`` field.
    license : str, optional
        Case-insensitive substring matched against the ``license`` field.
    min_nodes, max_nodes, min_edges, max_edges : int, optional
        Inclusive size filters on the catalog ``vertices`` and ``edges``
        values. An entry without a size only passes when both of the
        corresponding bounds are None.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates when downloading the catalog.
    catalog_url : str, optional
        Catalog metadata URL.

    Returns
    -------
    list[dict]
        Matching catalog entries in catalog order.

    See Also
    --------
    list_remote_datasets : Return the full remote catalog.
    iter_remote_hypergraphs : Lazily load matching remote hypergraphs.
    """
    catalog = list_remote_datasets(
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
    )

    query_needle = query.casefold() if query else None
    source_needle = source.casefold() if source else None
    license_needle = license.casefold() if license else None

    if tags is None:
        wanted_tags = set()
    elif isinstance(tags, str):
        wanted_tags = {tags.casefold()}
    else:
        wanted_tags = {str(tag).casefold() for tag in tags}

    def _keep(entry):
        # Apply the filters in a fixed order; the first failing one rejects.
        entry_tags = {str(tag).casefold() for tag in entry.get("tags", [])}

        if query_needle:
            searchable_parts = [str(entry.get("name", ""))]
            searchable_parts += [str(tag) for tag in entry.get("tags", [])]
            searchable_parts += [
                str(entry.get("description", "")),
                str(entry.get("source", "")),
                str(entry.get("license", "")),
            ]
            if query_needle not in " ".join(searchable_parts).casefold():
                return False

        if source_needle and source_needle not in str(entry.get("source", "")).casefold():
            return False
        if license_needle and license_needle not in str(entry.get("license", "")).casefold():
            return False

        if wanted_tags:
            if match_all_tags:
                if not wanted_tags.issubset(entry_tags):
                    return False
            elif not (wanted_tags & entry_tags):
                return False

        if not _matches_range(entry.get("vertices"), min_nodes, max_nodes):
            return False
        if not _matches_range(entry.get("edges"), min_edges, max_edges):
            return False
        return True

    return [entry for entry in catalog if _keep(entry)]
def _load_hgr_file(file_name: str):
    """
    Read a plain-text ``.hgr`` file into an undirected ``Hypergraph``.

    Layout expected by the parser:

    - blank lines and lines starting with ``%`` are skipped;
    - the first remaining line is the header ``<edges> <nodes> [<mode>]``;
    - each of the next ``<edges>`` lines lists the integer members of one
      hyperedge, preceded by an integer weight when ``mode % 10 == 1``;
    - up to ``<nodes>`` further lines are counted but their content is ignored.

    Fields may be separated by any run of whitespace.

    Parameters
    ----------
    file_name : str
        Path of the file to read.

    Returns
    -------
    Hypergraph
        Weighted when ``mode % 10 == 1``, unweighted otherwise.

    Raises
    ------
    ValueError
        If a weighted edge line has no members, if the file has more lines
        than the header declares, or if a field is not an integer.
    """
    n_edges = 0
    n_nodes = 0
    mode = 0
    # Explicit flag rather than "nodes == 0": a header declaring zero nodes
    # must not cause the following line to be read as another header.
    header_seen = False
    weights: List[int] = []
    edge_list: List[Tuple[int, ...]] = []
    edges_read = 0
    node_lines_read = 0

    with open(file_name) as file:
        for line in file:
            stripped = line.strip()
            if not stripped or stripped[0] == "%":
                continue

            # split() with no argument tolerates tabs and repeated blanks.
            fields = stripped.split()
            if not header_seen:
                n_edges = int(fields[0])
                n_nodes = int(fields[1])
                if len(fields) == 3:
                    mode = int(fields[2])
                header_seen = True
            elif edges_read < n_edges:
                edges_read += 1
                entries = [int(field) for field in fields]
                if mode % 10 == 1 and len(entries) > 1:
                    weights.append(entries[0])
                    edge_list.append(tuple(entries[1:]))
                elif mode % 10 != 1 and len(entries) > 0:
                    edge_list.append(tuple(entries))
                else:
                    raise ValueError(
                        f"Empty edge in file. {edges_read} edges read."
                    )
            elif node_lines_read < n_nodes:
                node_lines_read += 1
            else:
                raise ValueError("File read to the end unexpectedly.")

    weighted = (mode % 10) == 1
    return Hypergraph(
        edge_list=edge_list,
        weighted=weighted,
        weights=weights if weighted else None,
    )
def load_hypergraph(file_name: str | Path, *, fmt: str | None = None):
    """
    Load a hypergraph from a local file.

    Parameters
    ----------
    file_name : str or path-like
        Input file path. A ``.gz`` suffix (any case) marks a gzipped file,
        e.g. ``.json.gz`` or ``.hgx.gz``.
    fmt : {"json", "pickle", "hgr"} | None
        Optional override for the input format, case-insensitive. ``"pkl"``,
        ``"binary"`` and ``"hgx"`` are accepted as aliases of ``"pickle"``.
        If None (default), the format is inferred from the file extension.

    Raises
    ------
    InvalidFormatError
        If ``fmt`` is given but not one of the accepted values.
    InvalidFileTypeError
        If ``fmt`` is None and the extension is not recognised.
    """
    file_name = str(file_name)
    gzipped = file_name.lower().endswith(".gz")

    if fmt is None:
        payload_format = _infer_local_payload_format(file_name)
    else:
        fmt = fmt.lower()
        if fmt in {"pickle", "pkl", "binary", "hgx"}:
            payload_format = "pickle"
        elif fmt in {"json", "hgr"}:
            payload_format = fmt
        else:
            raise InvalidFormatError("fmt must be one of {'json', 'pickle', 'hgr'}")

    if gzipped:
        return _load_gzipped_hypergraph(file_name, payload_format)

    plain_loaders = {
        "pickle": load_pickle,
        "json": load_json_file,
        "hgr": _load_hgr_file,
    }
    loader = plain_loaders.get(payload_format)
    if loader is None:
        raise InvalidFileTypeError("Invalid file type")
    return loader(file_name)
def download_remote_dataset(
    dataset_name: str,
    *,
    fmt: str | None = "hgx",
    timeout: int = 30,
    verify_ssl: bool = False,
    cache_dir=None,
    overwrite: bool = False,
    catalog_url: str | None = None,
    use_catalog: bool = True,
    dataset_info: dict | None = None,
):
    """
    Fetch a remote dataset into the local cache without loading it.

    Parameters
    ----------
    dataset_name : str
        Dataset identifier, such as ``"zoo"`` or ``"contacts-hospital"``.
    fmt : {"hgx", "binary", "json"} or None, default="hgx"
        Remote format to download. If explicitly set to None, JSON URLs are
        tried first, then binary URLs.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates.
    cache_dir : path-like, optional
        Cache directory. Defaults to ``~/.cache/hypergraphx/datasets`` or the
        ``HYPERGRAPHX_DATA_CACHE`` environment variable.
    overwrite : bool, default=False
        If True, re-download even when a matching cached file exists.
    catalog_url : str, optional
        Catalog metadata URL used to resolve dataset download URLs.
    use_catalog : bool, default=True
        If True, resolve download URLs from the remote catalog before falling
        back to the hard-coded URL patterns.
    dataset_info : dict, optional
        Already loaded catalog entry. Passing it avoids reloading the catalog
        when downloading many datasets.

    Returns
    -------
    pathlib.Path
        Local decompressed cache path, suitable for ``load_hypergraph(...)``.

    Raises
    ------
    ConnectionError
        If every candidate URL failed and the last failure was a network
        error.
    FileNotFoundError
        If every candidate URL failed for any other reason.
    """
    candidate_urls, last_error = _resolve_remote_dataset_urls(
        dataset_name,
        fmt,
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
        use_catalog=use_catalog,
        dataset_info=dataset_info,
    )

    # The first URL that downloads successfully wins.
    for candidate in candidate_urls:
        try:
            return _download_remote_dataset_file(
                dataset_name,
                candidate,
                timeout=timeout,
                verify_ssl=verify_ssl,
                cache_dir=cache_dir,
                overwrite=overwrite,
            )
        except Exception as exc:
            last_error = exc

    tried = ", ".join(candidate_urls)
    network_failure = isinstance(last_error, (ConnectionError, URLError))
    if network_failure:
        raise ConnectionError(
            f"Failed to download '{dataset_name}' from server (network error). "
            f"Tried: {tried}. Last error: {last_error}."
        ) from last_error
    raise FileNotFoundError(
        f"Failed to download '{dataset_name}' from server. Tried: {tried}. Last error: {last_error}"
    ) from last_error
def download_remote_datasets(
    dataset_names=None,
    *,
    attributes=None,
    match_all: bool = True,
    fmt: str | None = "hgx",
    timeout: int = 30,
    verify_ssl: bool = False,
    cache_dir=None,
    overwrite: bool = False,
    catalog_url: str | None = None,
    continue_on_error: bool = False,
    progress_callback=None,
):
    """
    Fetch several remote datasets into the local cache.

    Parameters
    ----------
    dataset_names : str | Iterable[str], optional
        Dataset names, filenames, or directories to download explicitly.
    attributes : str | Iterable[str], optional
        Tag/category names used to select datasets from the catalog. If both
        ``dataset_names`` and ``attributes`` are provided, the named datasets
        are filtered by the requested attributes.
    match_all : bool, default=True
        If True, selected datasets must carry every requested attribute. If
        False, one is enough.
    fmt : {"hgx", "binary", "json"} or None, default="hgx"
        Remote format to download.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates.
    cache_dir : path-like, optional
        Cache directory. Defaults to ``~/.cache/hypergraphx/datasets`` or the
        ``HYPERGRAPHX_DATA_CACHE`` environment variable.
    overwrite : bool, default=False
        If True, re-download even when matching cached files exist.
    catalog_url : str, optional
        Catalog metadata URL used to resolve dataset download URLs.
    continue_on_error : bool, default=False
        If True, keep going after a dataset fails and store the exception in
        that dataset's result record. If False, re-raise the first failure.
    progress_callback : callable, optional
        Called with the result record after each dataset, including the
        failing one just before its exception is re-raised.

    Returns
    -------
    dict
        Mapping from catalog ``name`` to a record with ``path``, ``metadata``,
        ``error`` and ``status`` (``"downloaded"`` or ``"error"``) fields.

    Raises
    ------
    ValueError
        If neither a dataset name nor an attribute is provided.
    FileNotFoundError
        If a requested name is not in the catalog.
    """
    if dataset_names is None:
        selected_attributes = (
            attributes if isinstance(attributes, str) else list(attributes or [])
        )
        if not _has_selection_values(selected_attributes):
            raise ValueError("At least one dataset name or attribute must be provided.")
    else:
        if isinstance(dataset_names, str):
            requested_names = [dataset_names]
        else:
            requested_names = list(dataset_names)
        if not requested_names:
            raise ValueError("At least one dataset name or attribute must be provided.")
        if attributes is None or isinstance(attributes, str):
            selected_attributes = attributes
        else:
            selected_attributes = list(attributes)

    catalog = list_remote_datasets(
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
    )

    if dataset_names is None:
        candidates = catalog
    else:
        candidates = [
            _find_remote_dataset_info(requested, catalog)
            for requested in requested_names
        ]

    # Without explicit names the attributes are never None, so this single
    # test covers both selection modes.
    if selected_attributes is not None:
        candidates = [
            entry
            for entry in candidates
            if _attributes_match(entry, selected_attributes, match_all)
        ]

    results = {}
    for entry in candidates:
        name = entry["name"]
        try:
            path = download_remote_dataset(
                name,
                fmt=fmt,
                timeout=timeout,
                verify_ssl=verify_ssl,
                cache_dir=cache_dir,
                overwrite=overwrite,
                catalog_url=catalog_url,
                use_catalog=False,
                dataset_info=entry,
            )
        except Exception as exc:
            record = {
                "path": None,
                "metadata": entry,
                "error": exc,
                "status": "error",
            }
            if not continue_on_error:
                # Report the failure before propagating it.
                if progress_callback is not None:
                    progress_callback(record)
                raise
        else:
            record = {
                "path": path,
                "metadata": entry,
                "error": None,
                "status": "downloaded",
            }
        results[name] = record
        if progress_callback is not None:
            progress_callback(record)
    return results
def load_hypergraph_from_server(
    dataset_name: str,
    *,
    fmt: str | None = "hgx",
    as_dict: bool = False,
    timeout: int = 30,
    verify_ssl: bool = False,
    store: bool = True,
    cache_dir=None,
    overwrite: bool = False,
    catalog_url: str | None = None,
    use_catalog: bool = True,
    dataset_info: dict | None = None,
):
    """
    Load a dataset by name from the remote Hypergraphx-data server.

    Parameters
    ----------
    dataset_name : str
        Dataset identifier, such as ``"zoo"`` or ``"contacts-hospital"``.
    fmt : {"hgx", "binary", "json"} or None, default="hgx"
        Remote format to load. ``"hgx"`` and ``"binary"`` select the binary
        files; ``"json"`` selects the JSON files. If explicitly set to None,
        JSON URLs are tried first, then binary URLs.
    as_dict : bool, default=False
        If True, return the exposed internal data-structure dictionary instead
        of a hypergraph object. A payload that is already a dictionary is
        returned unchanged.
    timeout : int, default=30
        Download timeout in seconds.
    verify_ssl : bool, default=False
        Whether to verify TLS certificates.
    store : bool, default=True
        Store the decompressed remote dataset locally before loading it.
        Cached files are reused on later calls.
    cache_dir : path-like, optional
        Cache directory. Defaults to ``~/.cache/hypergraphx/datasets`` or the
        ``HYPERGRAPHX_DATA_CACHE`` environment variable.
    overwrite : bool, default=False
        If True, re-download even when a matching cached file exists.
    catalog_url : str, optional
        Catalog metadata URL used to resolve dataset download URLs.
    use_catalog : bool, default=True
        If True, resolve download URLs from the remote catalog before falling
        back to the hard-coded URL patterns.
    dataset_info : dict, optional
        Already loaded catalog entry. Passing it avoids reloading the catalog
        when loading many datasets.

    Returns
    -------
    Hypergraph | DirectedHypergraph | TemporalHypergraph | MultiplexHypergraph | dict
        Loaded hypergraph object, or its exposed dictionary if
        ``as_dict=True``.

    Raises
    ------
    ConnectionError
        If every candidate URL failed and the last failure was a network
        error.
    FileNotFoundError
        If every candidate URL failed for any other reason.

    Notes
    -----
    Candidate URLs are tried in order — catalog-advertised URLs first, then
    the per-dataset and flat server URL patterns — and the first one that
    loads successfully is returned. When ``store=True``, compressed downloads
    are decompressed before being written to the cache.
    """
    url_list, last_error = _resolve_remote_dataset_urls(
        dataset_name,
        fmt,
        timeout=timeout,
        verify_ssl=verify_ssl,
        catalog_url=catalog_url,
        use_catalog=use_catalog,
        dataset_info=dataset_info,
    )

    for url in url_list:
        tmp_name = None
        try:
            payload_format = _remote_payload_format(url)
            if store:
                cache_path = _download_remote_dataset_file(
                    dataset_name,
                    url,
                    timeout=timeout,
                    verify_ssl=verify_ssl,
                    cache_dir=cache_dir,
                    overwrite=overwrite,
                )
                obj = _load_remote_payload_from_path(cache_path, payload_format)
            else:
                payload = _decompress_gzip_if_needed(
                    _download(url, timeout=timeout, verify_ssl=verify_ssl)
                )
                if payload_format == "json":
                    obj = _parse_json_bytes_to_hypergraph(payload)
                else:
                    # The pickle loader takes a file name, so spill the bytes
                    # to a temporary file and close it before reading it back.
                    with tempfile.NamedTemporaryFile(delete=False) as tmp:
                        tmp_name = tmp.name
                        tmp.write(payload)
                    obj = load_pickle(tmp_name)
            _ensure_hypergraph_obj(obj)
            # A dict payload has no expose_data_structures(); it already is
            # the requested representation.
            if as_dict and not isinstance(obj, dict):
                return obj.expose_data_structures()
            return obj
        except Exception as exc:
            # Remember the failure and move on to the next candidate URL.
            last_error = exc
        finally:
            if tmp_name is not None:
                try:
                    os.unlink(tmp_name)
                except OSError:
                    pass

    urls = ", ".join(url_list)
    if isinstance(last_error, (ConnectionError, URLError)):
        raise ConnectionError(
            f"Failed to load '{dataset_name}' from server (network error). "
            f"Tried: {urls}. Last error: {last_error}. "
            "Are you offline? For offline use, download the dataset and use load_hypergraph(...) on a local file."
        ) from last_error
    raise FileNotFoundError(
        f"Failed to load '{dataset_name}' from server. Tried: {urls}. Last error: {last_error}"
    ) from last_error
def load(obj_or_path: str | Iterable):
    """
    Normalise a path, a hypergraph object, or a collection of either.

    Parameters
    ----------
    obj_or_path : str | path-like | hypergraph | dict | Iterable
        - A string or ``os.PathLike`` path is loaded with ``load_hypergraph``.
        - A hypergraph object or a dict is returned unchanged.
        - Any other iterable is processed item by item with the same two
          rules.

    Returns
    -------
    object or list
        The loaded or passed-through object, or a list of them when an
        iterable was given.

    Raises
    ------
    TypeError
        If the argument, or an item of the iterable, is neither a path nor an
        accepted hypergraph type.
    """
    hypergraph_types = (
        Hypergraph,
        DirectedHypergraph,
        MultiplexHypergraph,
        TemporalHypergraph,
        dict,
    )
    # load_hypergraph accepts path objects as well as strings, so both are
    # treated as paths here too.
    path_types = (str, os.PathLike)

    if isinstance(obj_or_path, path_types):
        return load_hypergraph(os.fspath(obj_or_path))
    if isinstance(obj_or_path, hypergraph_types):
        return obj_or_path
    if isinstance(obj_or_path, Iterable):
        hgs = []
        for item in obj_or_path:
            if isinstance(item, path_types):
                hgs.append(load_hypergraph(os.fspath(item)))
            else:
                _ensure_hypergraph_obj(item)
                hgs.append(item)
        return hgs
    # Neither a path, a hypergraph, nor an iterable: this raises TypeError.
    _ensure_hypergraph_obj(obj_or_path)
    return obj_or_path