Source code for hypergraphx.filters.statistical_filters

from collections import Counter, defaultdict
import logging
from functools import reduce
from itertools import combinations
from multiprocessing import Pool, cpu_count

import numpy as np
import pandas as pd
import scipy.stats as st
from scipy.special import binom


def _approximated_pvalue(t):
    """
    Return approximated p-value.

    Parameters
    ----------

    t   :tuple
        Tuple containing (N12,N,N1,N2,...Nn)

    Returns
    -------
    p-value:    float
                approximated p-value associated to the input tuple
    """
    n12 = t[0]
    n = t[1]
    ns = np.array(t[2:])
    order = len(ns)
    p = st.binom.sf(n12 - 1, p=np.prod(ns / n), n=n)
    if p >= 0:
        return p
    else:
        logging.getLogger(__name__).debug("%s %s %s", n12, n, ns)
        return p


def _get_bipartite_representation(hypergraph):
    def _as_int_multiplicity(w):
        if w is None:
            return 1
        if isinstance(w, bool):
            return int(w)
        if isinstance(w, (int, np.integer)):
            if w < 0:
                raise ValueError("Edge weights must be non-negative.")
            return int(w)
        if isinstance(w, (float, np.floating)):
            if w < 0:
                raise ValueError("Edge weights must be non-negative.")
            if float(w).is_integer():
                return int(w)
        raise ValueError(
            "Statistical filters require integer edge weights (multiplicity). "
            "If your hypergraph is weighted with real-valued weights, provide an unweighted hypergraph "
            "or integer-valued multiplicities."
        )

    edge_index = 0
    bipartite_list = []
    for edge in hypergraph.get_edges():
        w = hypergraph.get_weight(edge) if hypergraph.is_weighted() else 1
        for _ in range(_as_int_multiplicity(w)):
            for node in edge:
                bipartite_list.append((node, edge_index))
            edge_index += 1
    bipartite_df = pd.DataFrame(bipartite_list, columns=["a", "b"])
    return bipartite_df


[docs] def get_svh( hypergraph, max_order=10, alpha=0.01, mp: bool = False, n_jobs: int | None = None, max_tests_per_order: int | None = None, ): """ Extract the Statistically Validated Hypergraph. Parameters ------------- hypergraph: Hypergraph Hypergraph object for which the SVH will be extracted. max_order: int Maximum order of the hyperlinks to be tested alpha: float Threshold of statistical significance for FDR validation. mp: Bool (default: False) Specify whether to use multiprocessing or not. n_jobs: int, optional Number of worker processes to use when `mp=True`. Defaults to `cpu_count()`. max_tests_per_order: int, optional Guardrail: maximum number of hypothesis tests per order. If exceeded, raises a ValueError. Returns ------------- svh: dict Dictionary where key is the order and value is a DataFrame with the result of validation. Each DataFrame is a Table with columns ['edge','pvalue','fdr']. 'edge' contains all the hyperlinks (mapped as tuples) present in the hypergraph 'pvalue' reports the pvalue 'fdr' is a bool that is True if the hyperlink belongs to the SVH, False otherwise """ df = _get_bipartite_representation(hypergraph) deg_set_b = df.groupby("b")["a"].count().reset_index() orders = deg_set_b.a.unique() orders = orders[(orders >= 2) & (orders <= max_order)] pvalues = {} for order in np.sort(orders): sub_deg = deg_set_b.query("a==@order").b.tolist() N = len(sub_deg) sub_edges = df.query("b in @sub_deg") tuples = ( sub_edges.groupby("b")["a"] .apply(lambda x: tuple(sorted(x))) .unique() .tolist() ) tuples_order = list(filter(lambda x: len(x) == order, tuples)) neigh_set_a_sub = dict( sub_edges.groupby("a")["b"].apply(list).reset_index().values ) tuples_params = [ ( len( reduce( lambda x, y: set(x).intersection(y), [neigh_set_a_sub[node] for node in edge], ) ), N, ) + tuple([len(neigh_set_a_sub[node]) for node in edge]) for edge in tuples_order ] if max_tests_per_order is not None and len(tuples_params) > max_tests_per_order: raise ValueError( f"Too many hypothesis tests (order={order}): {len(tuples_params)} > {max_tests_per_order}. " "Reduce max_order, pre-filter your hypergraph, or increase max_tests_per_order." ) if mp: p = Pool(processes=cpu_count() if n_jobs is None else n_jobs) pvalues[order] = dict( zip(tuples_order, p.map(_approximated_pvalue, tuples_params)) ) p.close() else: pvalues[order] = dict( zip(tuples_order, map(_approximated_pvalue, tuples_params)) ) svh = {} links = 0 links_order = {} for order in sorted(pvalues): n_a = len(set(np.concatenate(list(pvalues[order].keys())))) n_possible = binom(n_a, order) bonf = alpha / n_possible temp_df = pd.DataFrame(pvalues[order].items()) temp_df.columns = ["edge", "pvalue"] ps = np.sort(temp_df.pvalue) k = np.arange(1, len(ps) + 1) * bonf try: fdr = k[ps < k][-1] except: fdr = 0 temp_df["fdr"] = temp_df["pvalue"] < fdr svh[order] = temp_df return svh
[docs] def get_svc( hypergraph, min_order=2, max_order=None, alpha=0.01, mp: bool = False, n_jobs: int | None = None, max_groups: int | None = None, ): """ Extract the Statistically Validated Cores. Parameters ------------- hypergraph: Hypergraph Hypergraph object for which the SVS will be extracted. min_order: int Minimum size of the hyperlinks to be tested max_order: int Maximum size of the hyperlinks to be tested alpha: float Threshold of statistical significance for FDR validation. mp: bool (default: False) Specify whether to use multiprocessing or not. n_jobs: int, optional Number of worker processes to use when `mp=True`. Defaults to `cpu_count()`. max_groups: int, optional Guardrail: maximum number of groups tested per order. If exceeded, raises a ValueError. Returns ------------- svs: DataFrame The DataFrame is a Table with columns ['edge','pvalue','fdr']. 'group' contains all the cores (mapped as tuples) tested in the hypergraph 'pvalue' reports the pvalue 'fdr' is a bool that is True if the core has been validated, False otherwise """ df = _get_bipartite_representation(hypergraph) observables = df.groupby("b")["a"].apply(lambda x: tuple(sorted(x))).tolist() if max_order: max_order = min(max_order, max(map(len, observables))) else: max_order = max(map(len, observables)) s_groups = [] neigh_set_a_sub = dict(df.groupby("a")["b"].apply(list).reset_index().values) N = df.b.nunique() na = df.a.nunique() svh_dfs = [] for order in list(range(min_order, max_order + 1))[::-1]: drop = [] for l in list(map(lambda x: tuple(combinations(x, order)), s_groups)): drop.extend(l) deg_a = Counter(df.a) groups = defaultdict(int) for l in map( lambda x: tuple(combinations(x, order)), filter(lambda x: len(x) >= order, observables), ): for g in l: if g not in drop: groups[g] += 1 tuples_params = [ tuple([groups[i], N]) + tuple([deg_a[ii] for ii in i]) for i in groups ] if max_groups is not None and len(tuples_params) > max_groups: raise ValueError( f"Too many groups to test (order={order}): {len(tuples_params)} > {max_groups}. " "Increase max_groups, reduce max_order, or pre-filter your hypergraph." ) if mp: p = Pool(processes=cpu_count() if n_jobs is None else n_jobs) pvalues = dict(zip(groups, p.map(_approximated_pvalue, tuples_params))) p.close() else: pvalues = dict(zip(groups, map(_approximated_pvalue, tuples_params))) n_possible = binom(na, order) bonf = alpha / n_possible temp_df = pd.DataFrame(pvalues.items()) try: temp_df.columns = ["group", "pvalue"] except ValueError: temp_df = pd.DataFrame(columns=["group", "pvalue"]) ps = np.sort(temp_df.pvalue) k = np.arange(1, len(ps) + 1) * bonf try: fdr = k[ps < k][-1] except IndexError: fdr = 0 temp_df["w"] = temp_df.group.apply(lambda x: groups[x]) temp_df["fdr"] = temp_df["pvalue"] < fdr svh_dfs.append(temp_df) # .query('fdr')) s_groups_order = temp_df.query("fdr").group.tolist() s_groups.extend(s_groups_order) return pd.concat(svh_dfs)