Source code for hypergraphx.measures.reducibility

import itertools
import math
from collections import Counter

import numpy as np
from scipy.special import loggamma

from hypergraphx import Hypergraph
from hypergraphx.exceptions import InvalidParameterError


def reducibility(
    hg: Hypergraph, partition=None, optimization="exact", entropy_method="count"
):
    """
    Compute hypergraph reducibility and the representative set of layers.

    Parameters
    ----------
    hg : Hypergraph
        The hypergraph of interest.
    partition : list or dict, optional
        Node partition for multiscale reducibility.
        If list, it must align with hg.get_nodes() order.
        If dict, it must map every node to a group.
    optimization : {"exact", "greedy"}, optional
        Optimization method for selecting representatives.
    entropy_method : {"count", "project"}, optional
        Entropy computation method.

    Returns
    -------
    tuple
        (reducibility, representative_layers)

    Notes
    -----
    If you use this function in published work, please cite:
    Kirkley, Alec, Helcio Felippe, and Federico Battiston.
    "Structural Reducibility of Hypergraphs." Physical Review Letters 135.24 (2025): 247401.

    Examples
    --------
    >>> from hypergraphx import Hypergraph
    >>> from hypergraphx.measures.reducibility import reducibility
    >>> hg = Hypergraph()
    >>> hg.add_nodes([0, 1, 2, 3])
    >>> hg.add_edges([(0, 1), (1, 2), (2, 3), (0, 1, 2)])
    >>> eta, reps = reducibility(hg, entropy_method="count", optimization="exact")
    """
    if entropy_method not in {"count", "project"}:
        raise InvalidParameterError(
            'entropy_method must be either "count" or "project".'
        )
    if optimization not in {"exact", "greedy"}:
        raise InvalidParameterError('optimization must be either "exact" or "greedy".')

    if entropy_method == "project":
        M, Q = _get_entropies_project(hg, partition)
    else:
        M, Q = _get_entropies_count(hg, partition)

    max_layer = max(list(Q.keys()))

    if optimization == "exact":
        DLs = {}
        for reps in _powerset(list(Q.keys())):
            reps = set(reps)
            if max_layer not in reps:
                continue
            H = sum(Q[r] for r in reps)
            non_reps = set(Q.keys()) - reps
            for layer in non_reps:
                higher = [r for r in reps if r > layer]
                ces = [M[r][layer] for r in higher]
                best = higher[int(np.argmin(ces))]
                H += M[best][layer]
            DLs[tuple(sorted(reps))] = H
        Rstar = min(DLs, key=DLs.get)
        Hstar = DLs[Rstar]
    else:
        Rs, Hs = [], []
        reps = set([max_layer])
        remaining = set(Q.keys()) - reps
        Rs.append(list(reps))
        Hs.append(Q[max_layer] + sum(M[max_layer][l] for l in remaining))
        while len(remaining) > 0:
            cand_layers, cand_scores = [], []
            for candidate in remaining:
                reps_tmp = reps.union(set([candidate]))
                non_reps = remaining - set([candidate])
                H_tmp = sum(Q[r] for r in reps_tmp)
                for layer in non_reps:
                    higher = [r for r in reps_tmp if r > layer]
                    ces = [M[r][layer] for r in higher]
                    best = higher[int(np.argmin(ces))]
                    H_tmp += M[best][layer]
                cand_layers.append(candidate)
                cand_scores.append(H_tmp)
            best_index = int(np.argmin(cand_scores))
            reps.add(cand_layers[best_index])
            remaining.remove(cand_layers[best_index])
            Rs.append(list(reps))
            Hs.append(cand_scores[best_index])
        best_num_reps = int(np.argmin(Hs))
        Rstar = tuple(sorted(Rs[best_num_reps]))
        Hstar = Hs[best_num_reps]

    H0 = sum(Q[l] for l in Q)
    if H0 == Q[max_layer]:
        return 1.0, Rstar
    return (H0 - Hstar) / (H0 - Q[max_layer]), Rstar


def layer_reducibility(hg: Hypergraph, partition=None, entropy_method="count"):
    """
    Compute layer-wise reducibility.

    Parameters
    ----------
    hg : Hypergraph
        The hypergraph of interest.
    partition : list or dict, optional
        Node partition for multiscale reducibility.
    entropy_method : {"count", "project"}, optional
        Entropy computation method.

    Returns
    -------
    dict
        Mapping layer size -> reducibility value.

    Notes
    -----
    If you use this function in published work, please cite:
    Kirkley, Alec, Helcio Felippe, and Federico Battiston.
    "Structural Reducibility of Hypergraphs." Physical Review Letters 135.24 (2025): 247401.

    Examples
    --------
    >>> from hypergraphx import Hypergraph
    >>> from hypergraphx.measures.reducibility import layer_reducibility
    >>> hg = Hypergraph()
    >>> hg.add_nodes([0, 1, 2, 3])
    >>> hg.add_edges([(0, 1), (1, 2), (2, 3), (0, 1, 2)])
    >>> etas = layer_reducibility(hg, entropy_method="count")
    """
    if entropy_method not in {"count", "project"}:
        raise InvalidParameterError(
            'entropy_method must be either "count" or "project".'
        )
    if entropy_method == "project":
        M, Q = _get_entropies_project(hg, partition)
    else:
        M, Q = _get_entropies_count(hg, partition)

    lmax = max(l for l in Q)
    etas = {}
    for l in Q:
        Hl = Q[l]
        best_rep = None
        best_ce = np.inf
        for k in Q:
            if k > l:
                ce = M[k][l]
                if ce <= best_ce:
                    best_ce = ce
                    best_rep = k
        if l == lmax:
            etas[l] = 0.0
        elif Hl == 0:
            etas[l] = 1.0 if best_rep != l else 0.0
        else:
            etas[l] = 1.0 - best_ce / Hl
    return etas


def _logchoose(n, k):
    """Log binomial coefficient."""
    if len(str(n)) > 300:  # stable approximation for n >> k
        if (k == 0) or (k == n):
            return 0.0
        return k * math.log(n) - k * math.log(k) + k
    n, k = float(n), float(k)
    return loggamma(n + 1) - loggamma(k + 1) - loggamma(n - k + 1)


def _logmultiset(n, k):
    """Log multiset coefficient."""
    return _logchoose(n + k - 1, k)


def _powerset(iterable):
    """Powerset of iterable, returned as a list without the empty set."""
    items = list(iterable)
    pset = list(
        itertools.chain.from_iterable(
            itertools.combinations(items, r) for r in range(len(items) + 1)
        )
    )
    return pset[1:]


def _normalize_partition(partition, hg: Hypergraph):
    if partition is None:
        return None
    nodes = hg.get_nodes()
    if isinstance(partition, dict):
        try:
            return {node: partition[node] for node in nodes}
        except KeyError as exc:
            raise InvalidParameterError(
                "Partition dict must include every node in the hypergraph."
            ) from exc
    if len(partition) != len(nodes):
        raise InvalidParameterError(
            "Partition list must match the number of nodes in the hypergraph."
        )
    return {node: group for node, group in zip(nodes, partition)}


def _projection(edges, sizes):
    """
    Project a collection of hyperedges onto layers with sizes in sizes.
    Returns a dict {size: set(edges)}.
    """
    sizes = sorted(sizes)[::-1]
    projections = {}
    for size in sizes:
        projections[size] = set()
        for edge in edges:
            for combo in itertools.combinations(edge, size):
                projections[size].add(combo)
    return projections


def _get_layers(hg: Hypergraph, sizes="all"):
    """
    Return layers of hypergraph as a dict {size: set(edges)}.
    """
    edges_by_size = hg.edges_by_size(index_by="edge_key")
    if sizes == "all":
        sizes = list(edges_by_size.keys())
    layers = {}
    for size in sizes:
        layers[size] = set(edges_by_size.get(size, []))
    return layers


def _coarse_grain(edges, partition_map):
    """
    Return coarse-grained hypergraph according to node partition.
    Returns a dict {tuple: count} to represent a multiset.
    """
    coarse = {}
    for edge in edges:
        mapped = [partition_map[node] for node in edge]
        new_edge = tuple(sorted(mapped))
        coarse[new_edge] = coarse.get(new_edge, 0) + 1
    return coarse


def _all_projections(layers):
    """
    Return dict P such that P[k][l] is projection of layer k onto tuples of size l.
    """
    projections = {}
    indices = sorted(list(layers.keys()))[::-1]
    for index, k in enumerate(indices):
        projections[k] = {}
        remaining = indices[index + 1 :]
        for subindex, l in enumerate(remaining):
            if subindex == 0:
                projections[k][l] = _projection(layers[k], [l])[l]
            else:
                prev = remaining[subindex - 1]
                projections[k][l] = _projection(projections[k][prev], [l])[l]
    return projections


def _get_entropies_project(hg: Hypergraph, partition=None):
    """
    Returns:
        M: dict such that M[k][l] is conditional entropy of higher layer k to lower layer l.
        Q: dict such that Q[l] is entropy of layer l.
    """
    if not isinstance(hg, Hypergraph):
        raise InvalidParameterError("Reducibility supports undirected Hypergraph only.")

    partition_map = _normalize_partition(partition, hg)
    if partition_map is not None:
        num_blocks = len(set(partition_map.values()))

    num_nodes = hg.num_nodes()

    layers = _get_layers(hg)
    projections = _all_projections(layers)

    def entropy(size):
        edge_count = len(layers[size])
        if partition_map is not None:
            return _logmultiset(math.comb(num_blocks + size - 1, size), edge_count)
        return _logchoose(math.comb(num_nodes, size), edge_count)

    def conditional_entropy(k, l):
        edge_count = len(layers[l])
        proj_size = len(projections[k][l])
        if partition_map is not None:
            proj_coarse = _coarse_grain(projections[k][l], partition_map)
            lower_coarse = _coarse_grain(layers[l], partition_map)
            overlap = 0
            for edge in lower_coarse:
                if edge in proj_coarse:
                    overlap += min(proj_coarse[edge], lower_coarse[edge])
            return _logchoose(proj_size, overlap) + _logmultiset(
                math.comb(num_blocks + l - 1, l), edge_count - overlap
            )
        overlap = len(projections[k][l].intersection(layers[l]))
        return _logchoose(proj_size, overlap) + _logchoose(
            math.comb(num_nodes, l) - proj_size, edge_count - overlap
        )

    M, Q = {}, {}
    for k in layers:
        M[k] = {}
        for l in layers:
            if k > l:
                M[k][l] = conditional_entropy(k, l)
        Q[k] = entropy(k)
    return M, Q


def _get_entropies_count(hg: Hypergraph, partition=None):
    """
    Returns:
        M: dict such that M[k][l] is conditional entropy of higher layer k to lower layer l.
        Q: dict such that Q[l] is entropy of layer l.
    """
    if not isinstance(hg, Hypergraph):
        raise InvalidParameterError("Reducibility supports undirected Hypergraph only.")

    layers = _get_layers(hg)
    num_nodes = hg.num_nodes()
    partition_map = _normalize_partition(partition, hg)

    if partition_map is not None:
        num_blocks = len(set(partition_map.values()))

    M, Q = {}, {}
    indices = sorted(list(layers.keys()))[::-1]
    for ii, k in enumerate(indices):
        Ek = len(layers[k])
        if partition_map is not None:
            Q[k] = _logmultiset(math.comb(num_blocks + k - 1, k), Ek)
        else:
            Q[k] = _logchoose(math.comb(num_nodes, k), Ek)

        lower_indices = indices[ii + 1 :]

        def get_sizes_proj(layer):
            sizes = Counter({l: 0 for l in lower_indices})
            if len(layer) == 0:
                return sizes
            checked = []
            layer = list(layer)
            for edge in layer:
                overlaps = set()
                for prev in checked:
                    inter = set(edge).intersection(set(prev))
                    overlaps.add(tuple(sorted(list(inter))))
                sizes_max = Counter({l: math.comb(len(edge), l) for l in lower_indices})
                sizes += sizes_max - get_sizes_proj(overlaps)
                checked.append(edge)
            return sizes

        Ek2ls = get_sizes_proj(layers[k])

        M[k] = {}
        for l in lower_indices:
            El = len(layers[l])
            Ek2l = Ek2ls[l]
            overlap = 0
            if partition_map is not None:
                lower_coarse = _coarse_grain(layers[l], partition_map)
                for lower_tup in lower_coarse:
                    lower_counts = Counter(list(lower_tup))
                    num_lower_in_higher = 0
                    for higher_tup in layers[k]:
                        higher_counts = Counter(
                            [partition_map[node] for node in higher_tup]
                        )
                        hl_counts = higher_counts & lower_counts
                        if hl_counts == lower_counts:
                            num_lower_in_higher += np.prod(
                                [
                                    math.comb(higher_counts[i], lower_counts[i])
                                    for i in lower_counts
                                ]
                            )
                    overlap += min(num_lower_in_higher, lower_coarse[lower_tup])
                M[k][l] = _logchoose(Ek2l, overlap) + _logmultiset(
                    math.comb(num_blocks + l - 1, l), El - overlap
                )
            else:
                lower_tmp = layers[l].copy()
                for higher in layers[k]:
                    higher_set = set(higher)
                    overlapping = set()
                    for lower in lower_tmp:
                        if len(set(lower).intersection(higher_set)) == l:
                            overlapping.add(lower)
                    for edge in overlapping:
                        lower_tmp.remove(edge)
                        overlap += 1
                M[k][l] = _logchoose(Ek2l, overlap) + _logchoose(
                    math.comb(num_nodes, l) - Ek2l, El - overlap
                )
    return M, Q


[docs] def reducibility( hg: Hypergraph, partition=None, optimization="exact", entropy_method="count" ): """ Compute hypergraph reducibility and the representative set of layers. Parameters ---------- hg : Hypergraph The hypergraph of interest. partition : list or dict, optional Node partition for multiscale reducibility. If list, it must align with hg.get_nodes() order. If dict, it must map every node to a group. optimization : {"exact", "greedy"}, optional Optimization method for selecting representatives. entropy_method : {"count", "project"}, optional Entropy computation method. Returns ------- tuple (reducibility, representative_layers) Notes ----- If you use this function in published work, please cite: Kirkley, Alec, Helcio Felippe, and Federico Battiston. "Structural Reducibility of Hypergraphs." Physical Review Letters 135.24 (2025): 247401. Examples -------- >>> from hypergraphx import Hypergraph >>> from hypergraphx.measures.reducibility import reducibility >>> hg = Hypergraph() >>> hg.add_nodes([0, 1, 2, 3]) >>> hg.add_edges([(0, 1), (1, 2), (2, 3), (0, 1, 2)]) >>> eta, reps = reducibility(hg, entropy_method="count", optimization="exact") """ if entropy_method not in {"count", "project"}: raise InvalidParameterError( 'entropy_method must be either "count" or "project".' ) if optimization not in {"exact", "greedy"}: raise InvalidParameterError('optimization must be either "exact" or "greedy".') if entropy_method == "project": M, Q = _get_entropies_project(hg, partition) else: M, Q = _get_entropies_count(hg, partition) max_layer = max(list(Q.keys())) if optimization == "exact": DLs = {} for reps in _powerset(list(Q.keys())): reps = set(reps) if max_layer not in reps: continue H = sum(Q[r] for r in reps) non_reps = set(Q.keys()) - reps for layer in non_reps: higher = [r for r in reps if r > layer] ces = [M[r][layer] for r in higher] best = higher[int(np.argmin(ces))] H += M[best][layer] DLs[tuple(sorted(reps))] = H Rstar = min(DLs, key=DLs.get) Hstar = DLs[Rstar] else: Rs, Hs = [], [] reps = set([max_layer]) remaining = set(Q.keys()) - reps Rs.append(list(reps)) Hs.append(Q[max_layer] + sum(M[max_layer][l] for l in remaining)) while len(remaining) > 0: cand_layers, cand_scores = [], [] for candidate in remaining: reps_tmp = reps.union(set([candidate])) non_reps = remaining - set([candidate]) H_tmp = sum(Q[r] for r in reps_tmp) for layer in non_reps: higher = [r for r in reps_tmp if r > layer] ces = [M[r][layer] for r in higher] best = higher[int(np.argmin(ces))] H_tmp += M[best][layer] cand_layers.append(candidate) cand_scores.append(H_tmp) best_index = int(np.argmin(cand_scores)) reps.add(cand_layers[best_index]) remaining.remove(cand_layers[best_index]) Rs.append(list(reps)) Hs.append(cand_scores[best_index]) best_num_reps = int(np.argmin(Hs)) Rstar = tuple(sorted(Rs[best_num_reps])) Hstar = Hs[best_num_reps] H0 = sum(Q[l] for l in Q) if H0 == Q[max_layer]: return 1.0, Rstar return (H0 - Hstar) / (H0 - Q[max_layer]), Rstar
[docs] def layer_reducibility(hg: Hypergraph, partition=None, entropy_method="count"): """ Compute layer-wise reducibility. Parameters ---------- hg : Hypergraph The hypergraph of interest. partition : list or dict, optional Node partition for multiscale reducibility. entropy_method : {"count", "project"}, optional Entropy computation method. Returns ------- dict Mapping layer size -> reducibility value. Notes ----- If you use this function in published work, please cite: Kirkley, Alec, Helcio Felippe, and Federico Battiston. "Structural Reducibility of Hypergraphs." Physical Review Letters 135.24 (2025): 247401. Examples -------- >>> from hypergraphx import Hypergraph >>> from hypergraphx.measures.reducibility import layer_reducibility >>> hg = Hypergraph() >>> hg.add_nodes([0, 1, 2, 3]) >>> hg.add_edges([(0, 1), (1, 2), (2, 3), (0, 1, 2)]) >>> etas = layer_reducibility(hg, entropy_method="count") """ if entropy_method not in {"count", "project"}: raise InvalidParameterError( 'entropy_method must be either "count" or "project".' ) if entropy_method == "project": M, Q = _get_entropies_project(hg, partition) else: M, Q = _get_entropies_count(hg, partition) lmax = max(l for l in Q) etas = {} for l in Q: Hl = Q[l] best_rep = None best_ce = np.inf for k in Q: if k > l: ce = M[k][l] if ce <= best_ce: best_ce = ce best_rep = k if l == lmax: etas[l] = 0.0 elif Hl == 0: etas[l] = 1.0 if best_rep != l else 0.0 else: etas[l] = 1.0 - best_ce / Hl return etas