Source code for hypergraphx.linalg.linalg

from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional, Tuple
import logging
import numpy as np
from scipy import sparse
from scipy.sparse import csc_array
from scipy.special import factorial
from hypergraphx import Hypergraph, TemporalHypergraph
from hypergraphx.utils.labeling import get_inverse_mapping

SparseFormat = Literal["csr", "csc"]


def _as_sparse_format(matrix: sparse.sparray, fmt: SparseFormat) -> sparse.sparray:
    if fmt == "csr":
        return matrix.tocsr()
    if fmt == "csc":
        return matrix.tocsc()
    raise ValueError(f"Unsupported sparse format: {fmt!r}. Expected 'csr' or 'csc'.")


[docs] def hye_list_to_binary_incidence( hye_list: List[Tuple[int]], shape: Optional[Tuple[int]] = None ) -> sparse.coo_array: """Convert a list of hyperedges into a scipy sparse COO array. The hyperedges need to be list of integers, representing nodes, starting from 0. If no shape is provided, this is inferred from the hyperedge list as (N, E). N is the number of nodes, given by the maximum integer observed in the hyperedge list plus one (since the node index starts from 0). E is the number of hyperedges in the list. If not None, the shape can only specify a tuple (N', E') where N' is greater or equal than the N inferred from the hyperedge list, and E' is greater or equal than the number of hyperedges in the list. Parameters ---------- hye_list: the list of hyperedges. Every hyperedge is represented as a tuple of integer nodes. shape: the shape of the adjacency matrix, passed to the array constructor. If None, it is inferred. Returns ------- The binary adjacency matrix representing the hyperedges. """ rows = [] columns = [] for j, hye in enumerate(hye_list): # If there are repeated nodes in the hyperedge, count them once set_hye = set(hye) rows.extend(list(set_hye)) columns.extend([j] * len(set_hye)) if len(rows) > 0: inferred_N = max(rows) + 1 else: inferred_N = 0 inferred_E = len(hye_list) # inferred_N = shape[0] # inferred_E = shape[1] if shape is not None: if shape[0] < inferred_N or shape[1] < inferred_E: raise ValueError("Provided shape incompatible with input hyperedge list.") else: shape = (inferred_N, inferred_E) data = np.ones_like(rows) return sparse.coo_array((data, (rows, columns)), shape=shape, dtype=np.uint8)
[docs] def binary_incidence_matrix( hypergraph: Hypergraph, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Produce the binary incidence matrix representing a hypergraph. For any node i and hyperedge e, the entry (i, e) of the binary incidence matrix is 1 if the node belongs to the hyperedge, 0 otherwise. Parameters ---------- hypergraph: instance of the class Hypergraph. Every hyperedge is represented as either a tuple or list of nodes. return_mapping: return the dictionary mapping the new node indices to the hypergraph nodes. The node indices in the incidence matrix vary from 0 to N-1, where N is the total number of distinct nodes. Returns ------- The binary adjacency matrix representing the hyperedges. If return_mapping is True, return the dictionary of node mappings. """ encoder = hypergraph.get_mapping() hye_list = [tuple(encoder.transform(hye)) for hye in hypergraph.get_edges()] shape = (hypergraph.num_nodes(), hypergraph.num_edges()) incidence = _as_sparse_format(hye_list_to_binary_incidence(hye_list, shape), format) if return_mapping: mapping = get_inverse_mapping(encoder) return incidence, mapping return incidence
[docs] def incidence_matrix( hypergraph: Hypergraph, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Produce the incidence matrix representing a hypergraph. For any node i and hyperedge e, the entry (i, e) of the binary incidence matrix is the weight of the hyperedge if the node belongs to it, 0 otherwise. Parameters ---------- hypergraph: instance of the class Hypergraph. Every hyperedge is represented as either a tuple or list of nodes. return_mapping: return the dictionary mapping the new node indices to the hypergraph nodes. The node indices in the incidence matrix vary from 0 to N-1, where N is the total number of distinct nodes. Returns ------- The binary adjacency matrix representing the hyperedges. If return_mapping is True, return the dictionary of node mappings. """ binary_incidence, mapping = binary_incidence_matrix( hypergraph, return_mapping=True, format=format ) incidence = _as_sparse_format( binary_incidence.multiply(hypergraph.get_weights()), format ) if return_mapping: return incidence, mapping return incidence
[docs] def incidence_matrix_by_order( hypergraph: Hypergraph, order: int, shape: Optional[Tuple[int]] = None, keep_isolated_nodes: bool = False, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.spmatrix: """Produce the incidence matrix of a hypergraph at a given order. For any node i and hyperedge e, the entry (i, e) of the incidence matrix is the weight of the hyperedge if the node belongs to it, 0 otherwise. Parameters ---------- hypergraph The hypergraph. order The order of the hyperedges to consider. shape The shape of the incidence matrix. keep_isolated_nodes If True, keep the isolated nodes in the incidence matrix. return_mapping If True, return the dictionary mapping the node indices in the matrix to the hypergraph nodes. Returns ------- The incidence matrix. If return_mapping is True, return the dictionary of node mappings. """ binary_incidence, mapping = binary_incidence_matrix( hypergraph.get_edges( order=order, subhypergraph=True, keep_isolated_nodes=keep_isolated_nodes ), return_mapping=True, format=format, ) incidence = _as_sparse_format( binary_incidence.multiply(hypergraph.get_weights(order=order)), format ) if return_mapping: return incidence, mapping return incidence
[docs] def incidence_matrices_all_orders( hypergraph: Hypergraph, shape: Optional[Tuple[int]] = None, keep_isolated_nodes: bool = False, return_mapping: bool = False, format: SparseFormat = "csr", ) -> ( Dict[int, sparse.spmatrix] | Tuple[Dict[int, sparse.spmatrix], Dict[int, Dict[int, Any]]] ): """Produce the incidence matrices of a hypergraph at all orders. For any node i and hyperedge e, the entry (i, e) of the incidence matrix is the weight of the hyperedge if the node belongs to it, 0 otherwise. Parameters ---------- hypergraph The hypergraph. shape The shape of the incidence matrix. keep_isolated_nodes If True, keep the isolated nodes in the incidence matrix. return_mapping If True, return the dictionary mapping the node indices in the matrix to the hypergraph nodes. Returns ------- Dictionary mapping each order to its incidence matrix. """ incidence_matrices = {} mappings: Dict[int, Dict[int, Any]] = {} for order in range(1, hypergraph.max_order() + 1): if return_mapping: incidence_matrices[order], mappings[order] = incidence_matrix_by_order( hypergraph, order, shape, keep_isolated_nodes, return_mapping, format=format, ) else: incidence_matrices[order] = incidence_matrix_by_order( hypergraph, order, shape, keep_isolated_nodes, return_mapping, format=format, ) if return_mapping: return incidence_matrices, mappings return incidence_matrices
[docs] def adjacency_matrix( hypergraph: Hypergraph, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Compute the adjacency matrix of the hypergraph. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix counts the number of hyperedges where both i and j are contained. Parameters ---------- hypergraph: the hypergraph. return_mapping: return the dictionary mapping the new node indices to the hypergraph nodes. The node indices in the adjacency matrix vary from 0 to N-1, where N is the total number of distinct nodes. Returns ------- The adjacency matrix of the hypergraph. If return_mapping is True, return the dictionary of node mappings. """ incidence, mapping = binary_incidence_matrix( hypergraph, return_mapping=True, format=format ) adj = _as_sparse_format(incidence @ incidence.transpose(), format) adj.setdiag(0) if return_mapping: return adj, mapping return adj
[docs] def adjacency_matrix_by_order( hypergraph: Hypergraph, order: int, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Compute the adjacency matrix of the hypergraph by order. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix counts the number of hyperedges of a given order where both i and j are contained. Parameters ---------- hypergraph: the hypergraph. order: the order. Returns ------- The adjacency matrix of the hypergraph for a given order and the dictionary of node mappings. """ incidence, mapping = incidence_matrix_by_order( hypergraph, order, keep_isolated_nodes=True, return_mapping=True, format=format, ) adj = _as_sparse_format(incidence @ incidence.transpose(), format) adj.setdiag(0) if return_mapping: return adj, mapping else: return adj
[docs] def dual_random_walk_adjacency( hypergraph: Hypergraph, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Compute the adjacency matrix matrix associated to the dual hypergraph random walk. For any two hyperedges e, f in the hypergraph, the entry (e, f) of the random walk adjacency has value 1 if their intersection is non-null, else 0. This is the matrix of adjacency between hyperedges in the dual hypergraph. Parameters ---------- hypergraph: the hypergraph. return_mapping: If True, return a mapping from edge indices in the matrix (0..E-1) back to the corresponding hyperedge keys in the input hypergraph. format: Sparse format of the returned matrix ("csr" by default). Returns ------- The random walk adjacency matrix of the hypergraph. If return_mapping is True, return the dictionary of edge mappings. """ incidence = binary_incidence_matrix(hypergraph, return_mapping=False, format=format) adj = incidence.transpose() @ incidence adj.data = np.ones_like(adj.data) adj = _as_sparse_format(adj, format) if return_mapping: edge_mapping = {i: edge for i, edge in enumerate(hypergraph.get_edges())} return adj, edge_mapping return adj
[docs] def degree_matrix( hypergraph: Hypergraph, order: int, mapping: Optional[Dict[int, Any]] = None, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Compute the degree matrix of the hypergraph for a given order. The returned matrix is node-indexed: entry (i, i) equals the degree of node `mapping[i]` considering only hyperedges of the given `order`. """ if mapping is None: mapping = get_inverse_mapping(hypergraph.get_mapping()) degree_dct = hypergraph.degree_sequence(order) n = len(mapping) degrees = [degree_dct.get(mapping[i], 0) for i in range(n)] deg = sparse.diags(degrees, format="csr") deg = sparse.csr_array(deg) deg = _as_sparse_format(deg, format) if return_mapping: return deg, mapping return deg
[docs] def laplacian_matrix_by_order( hypergraph: Hypergraph, order: int, weighted=False, shape: Optional[Tuple[int]] = None, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: incidence, mapping = incidence_matrix_by_order( hypergraph, order, shape, keep_isolated_nodes=True, return_mapping=True, format=format, ) degree_mtx = degree_matrix(hypergraph, order, mapping=mapping, format=format) laplacian = degree_mtx.multiply(order + 1) - (incidence @ incidence.transpose()) laplacian = _as_sparse_format(laplacian, format) if weighted: scale_factor = factorial(order - 1) laplacian = laplacian.multiply(scale_factor) if return_mapping: return laplacian, mapping return laplacian
[docs] def laplacian_matrices_all_orders( hypergraph: Hypergraph, weighted: bool = False, shape: Optional[Tuple[int]] = None, return_mapping: bool = False, format: SparseFormat = "csr", ) -> ( Dict[int, sparse.spmatrix] | Tuple[Dict[int, sparse.spmatrix], Dict[int, Dict[int, Any]]] ): laplacian_matrices = {} mappings: Dict[int, Dict[int, Any]] = {} for order in range(1, hypergraph.max_order() + 1): if return_mapping: laplacian_matrices[order], mappings[order] = laplacian_matrix_by_order( hypergraph, order, weighted, shape, return_mapping=True, format=format, ) else: laplacian_matrices[order] = laplacian_matrix_by_order( hypergraph, order, weighted, shape, return_mapping=False, format=format, ) if return_mapping: return laplacian_matrices, mappings return laplacian_matrices
[docs] def compute_multiorder_laplacian( hypergraph: Hypergraph, sigmas, order_weighted=False, degree_weighted=True ) -> sparse.spmatrix: if not type(sigmas) == np.ndarray: sigmas = np.array(sigmas) laplacians = laplacian_matrices_all_orders(hypergraph, order_weighted) laplacians = [laplacians[key] for key in sorted(laplacians.keys(), reverse=False)] weighted_laplacians = [ laplacian.multiply(sigma) for laplacian, sigma in zip(laplacians, sigmas) ] if degree_weighted: average_degrees = [ np.average(list(hypergraph.degree_sequence(order).values())) for order in range(1, hypergraph.max_order() + 1) ] weighted_laplacians = [ laplacian.multiply(1.0 / degree) for laplacian, degree in zip(weighted_laplacians, average_degrees) ] multiorder_laplacian = sum(weighted_laplacians) return multiorder_laplacian
[docs] def are_commuting(laplacian_matrices: List[sparse.spmatrix], verbose=True) -> bool: logger = logging.getLogger(__name__) orders = len(laplacian_matrices) for d1 in range(orders - 1): laplacian_d1 = laplacian_matrices[d1] for d2 in range(d1 + 1, orders): laplacian_d2 = laplacian_matrices[d2] d1d2_product = laplacian_d1.dot(laplacian_d2) d2d1_product = laplacian_d2.dot(laplacian_d1) commutator = d1d2_product - d2d1_product if not commutator.any(): if verbose: logger.info("The Laplacian matrices do not commute") return False if verbose: logger.info("The Laplacian matrices commute") return True
[docs] def adjacency_tensor(hypergraph: Hypergraph) -> np.ndarray: """ Compute the tensor of a uniform hypergraph. For a hypergraph of order m, create a tensor of order m+1 where i,j,k... is 1 if the hyperedge (i,j,k...) is in the hypergraph and 0 otherwise Parameters ---------- hypergraph : Hypergraph The uniform hypergraph on which the tensor is computed. Returns ------- T : np.ndarray The tensor of the hypergraph. """ if not hypergraph.is_uniform(): raise Exception("The hypergraph is not uniform.") order = len(hypergraph.get_edges()[0]) - 1 T = np.zeros((hypergraph.num_nodes(),) * (order + 1)) for edge in hypergraph.get_edges(): from itertools import permutations for perm in permutations(edge): T[tuple(perm)] = 1 return T
[docs] def adjacency_factor(hypergraph: Hypergraph | TemporalHypergraph, t: int = 0): if isinstance(hypergraph, Hypergraph): matrix, mapping = hypergraph.adjacency_matrix(return_mapping=True) elif isinstance(hypergraph, TemporalHypergraph): matrix, mapping = hypergraph.annealed_adjacency_matrix(return_mapping=True) else: raise ValueError("An Hypergraph or Temporal Hypergraph must be provided.") new_mapping = dict() for k, v in mapping.items(): new_mapping[v] = k mapping = new_mapping res = dict() for node1 in hypergraph.get_nodes(): res[node1] = 0 for node2 in hypergraph.get_nodes(): if node1 != node2: try: val = round(float(matrix[mapping[node1], mapping[node2]]), 3) except IndexError: val = round(float(matrix[mapping[node2], mapping[node1]]), 3) if val != 0: res[node1] += val**t return res
# Temporal Hypergraph Adjacency Matrix
[docs] def temporal_adjacency_matrix( temporal_hypergraph: TemporalHypergraph, return_mapping: bool = False, format: SparseFormat = "csr", ) -> ( Dict[int, sparse.sparray] | Tuple[Dict[int, sparse.sparray], dict[int, dict[int, int]]] ): """ Compute the temporal adjacency matrix of the temporal hypergraph by order. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix at time t counts the number of hyperedges of a given order, existing at time t, where both i and j are contained. Parameters ---------- temporal_hypergraph: TemporalHypergraph. return_mapping: bool,optional Return the dictionary mapping the new node indices to the Temporal Hypergraph Returns ------- temporal_adjacency_matrixes: Dict[int, sparse.csc_array] A dictionary encoding the temporal adjacency matrixes, i.e., {time : adjacency matrix} mapping: Dict[int, Dict[int,int]] The dictionary of node mappings for each adjacency matrix. """ temporal_adjacency_matrixes = {} subhypergraphs = temporal_hypergraph.subhypergraph() mapping = dict() for t in subhypergraphs.keys(): hypergraph_t = subhypergraphs[t] adj_t, matrix_map = adjacency_matrix( hypergraph_t, return_mapping=True, format=format ) if return_mapping: mapping[t] = matrix_map temporal_adjacency_matrixes[t] = adj_t if return_mapping: return temporal_adjacency_matrixes, mapping else: return temporal_adjacency_matrixes
[docs] def temporal_adjacency_matrix_by_order( temporal_hypergraph: TemporalHypergraph, order: int, return_mapping: bool = False, format: SparseFormat = "csr", ) -> ( Dict[int, sparse.sparray] | Tuple[Dict[int, sparse.sparray], dict[int, dict[int, int]]] ): """Compute the temporal adjacency matrix of the temporal hypergraph by order. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix at time t counts the number of hyperedges of a given order, existing at time t, where both i and j are contained. Parameters ---------- temporal_hypergraph: TemporalHypergraph. order: int The specific order to evaluate return_mapping: bool,optional Return the dictionary mapping the new node indices to the Temporal Hypergraph Returns ------- temporal_adjacency_matrixes: Dict[int, sparse.csc_array] A dictionary encoding the temporal adjacency matrixes, i.e., {time : adjacency matrix} mapping: Dict[int, Dict[int,int]] The dictionary of node mappings for each adjacency matrix. """ temporal_adjacency = {} subhypergraphs = temporal_hypergraph.subhypergraph() mapping = dict() for t in subhypergraphs.keys(): hypergraph_t = subhypergraphs[t] adj_t, matrix_map = adjacency_matrix_by_order( hypergraph_t, order, return_mapping=return_mapping, format=format ) temporal_adjacency[t] = adj_t if return_mapping: mapping[t] = matrix_map if return_mapping: return temporal_adjacency, mapping else: return temporal_adjacency
[docs] def temporal_adjacency_matrices_all_orders( temporal_hypergraph: TemporalHypergraph, max_order: int = None, return_mapping: bool = False, format: SparseFormat = "csr", ) -> ( dict[int, Tuple[sparse.sparray]] | Tuple[Dict[int, Tuple[sparse.sparray]], dict[int, dict[int, int]]] ): """Compute the temporal adjacency matrices of the temporal hypergraph for all orders. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix of order d at time t counts the number of hyperedges of order d, existing at time t, where both i and j are contained. Parameters ---------- temporal_hypergraph: TemporalHypergraph max_order: int | None, optional The maximum order of the hypergraph. If not specified will be automatically selected return_mapping: bool,optional Return the dictionary mapping the new node indices to the Temporal Hypergraph Returns ------- temporal_adjacencies: Dict[int, sparse.csc_array] A dictionary encoding the temporal adjacency matrixes, i.e., {order : {time : adjacency matrix}} mapping: Dict[int, Dict[int,int]] The dictionary of node mappings for each adjacency matrix. """ temporal_adjacencies = {} if max_order is None: max_order = temporal_hypergraph.max_order() mapping_dict = dict() for order in range(1, max_order + 1): adj_order_d, mapping = temporal_adjacency_matrix_by_order( temporal_hypergraph, order, return_mapping=True, format=format ) temporal_adjacencies[order] = adj_order_d mapping_dict[order] = mapping if return_mapping: return temporal_adjacencies, mapping_dict else: return temporal_adjacencies
[docs] def annealed_adjacency_matrix( temporal_hypergraph: TemporalHypergraph, return_mapping: bool = False, format: SparseFormat = "csr", ) -> sparse.sparray | Tuple[sparse.sparray, Dict[int, Any]]: """Compute the annealed adjacency matrix of the temporal hypergraph by order. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix counts the average number of hyperedges of a given order where both i and j are contained over time. Parameters ---------- temporal_hypergraph: TemporalHypergraph return_mapping: bool,optional Return the dictionary mapping the new node indices to the Temporal Hypergraph Returns ------- matrix: sparse.csc_array The annealed adjacency matrix for a given order. return_mapping: bool,optional Return the dictionary mapping the new node indices to the Temporal Hypergraph """ encoder = temporal_hypergraph.get_mapping() temporal_adjacency_matrix, mapping = temporal_hypergraph.temporal_adjacency_matrix( return_mapping=True ) T = len(temporal_adjacency_matrix.keys()) temporal_adjacency_matrix_lst = temporal_adjacency_matrix.values() res = dict() t = min(mapping.keys()) for matrix in temporal_adjacency_matrix_lst: for j in range(matrix.shape[1]): for i in range(matrix.indptr[j], matrix.indptr[j + 1]): row = mapping[t][matrix.indices[i]] column = mapping[t][j] if row != column: cell = (row, column) cell = tuple(sorted(cell)) if cell not in res.keys(): res[cell] = 0 res[cell] += matrix.data[i] t += 1 res = {k: v / T for k, v in res.items()} matrix_row = [] matrix_col = [] matrix_val = [] for k, v in res.items(): matrix_row.append(encoder.transform([k[0]])[0]) matrix_col.append(encoder.transform([k[1]])[0]) matrix_val.append(v) matrix = csc_array((matrix_val, (matrix_row, matrix_col))) matrix = _as_sparse_format(matrix, format) if return_mapping: return matrix, get_inverse_mapping(encoder) else: return matrix
[docs] def annealed_adjacency_matrices_all_orders( temporal_hypergraph: TemporalHypergraph, ) -> dict[int, sparse.sparray]: """Compute the annealed adjacency matrices of the temporal hypergraph for all orders. For any two nodes i, j in the hypergraph, the entry (i, j) of the adjacency matrix of order d counts the average number of hyperedges of order d where both i and j are contained over time. Parameters ---------- temporal_hypergraph: TemporalHypergraph Returns ------- annealed_adjacency_matrices_order: dict[int, sparse.csc_array] The annealed adjacency matrix for all orders, i.e., {order : annealed adjacency matrix}. """ temporal_adjacency_matrices = temporal_adjacency_matrices_all_orders( temporal_hypergraph ) annealed_adjacency_matrices_order = dict() for order, temporal_adjacency_matrix in temporal_adjacency_matrices.items(): T = len(temporal_adjacency_matrix.keys()) temporal_adjacency_matrix_lst = temporal_adjacency_matrix.values() annealed_adjacency_matrix = sum(temporal_adjacency_matrix_lst) / T annealed_adjacency_matrices_order[order] = annealed_adjacency_matrix return annealed_adjacency_matrices_order