Source code for hypergraphx.motifs.utils

import itertools
import math
from collections import deque
from itertools import combinations, permutations


def _motifs_ho_not_full(edges, N, visited):
    mapping, labeling = generate_motifs(N)
    T = set()

    graph = {}
    for e in edges:
        if len(e) >= N:
            continue

        T.add(tuple(sorted(e)))

        for e_i in e:
            if e_i in graph:
                graph[e_i].append(e)
            else:
                graph[e_i] = [e]

    def count_motif(nodes):
        nodes = tuple(sorted(tuple(nodes)))
        p_nodes = power_set(nodes)

        motif = [
            edge_key
            for edge in p_nodes
            if len(edge) >= 2
            if (edge_key := tuple(sorted(edge))) in T
        ]

        m = {i: idx + 1 for idx, i in enumerate(nodes)}

        labeled_motif = []
        for e in motif:
            new_e = [m[node] for node in e]
            new_e = tuple(sorted(new_e))
            labeled_motif.append(new_e)
        labeled_motif = tuple(sorted(labeled_motif))

        if labeled_motif in labeling:
            labeling[labeled_motif] += 1

    for e in edges:
        if len(e) == N - 1:
            nodes = list(e)

            for n in nodes:
                for e_i in graph[n]:
                    tmp = list(set(nodes) | set(e_i))
                    if len(tmp) == N and not (tuple(sorted(tmp)) in visited):
                        visited[tuple(sorted(tmp))] = 1
                        count_motif(tmp)

    out = []

    for motif in mapping.keys():
        count = 0
        for label in mapping[motif]:
            count += labeling[label]

        out.append((motif, count))

    out = sorted(out)

    # D = {}
    # for i in range(len(out)):
    #     D[i] = out[i][0]
    #
    # with open('motifs_{}.pickle'.format(N), 'wb') as handle:
    # pickle.dump(D, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return out, visited


def _motifs_standard(edges, N, visited):
    mapping, labeling = generate_motifs(N)
    T = set()

    graph = {}
    for e in edges:
        if len(e) == 2:
            T.add(tuple(sorted(e)))
            a, b = e
            if a in graph:
                graph[a].append(b)
            else:
                graph[a] = [b]

            if b in graph:
                graph[b].append(a)
            else:
                graph[b] = [a]

    def count_motif(nodes):
        nodes = tuple(sorted(tuple(nodes)))

        if nodes in visited:
            return

        p_nodes = power_set(nodes)

        motif = [edge_key for edge in p_nodes if (edge_key := tuple(sorted(edge))) in T]

        m = {i: idx + 1 for idx, i in enumerate(nodes)}

        labeled_motif = []
        for e in motif:
            new_e = [m[node] for node in e]
            new_e = tuple(sorted(new_e))
            labeled_motif.append(new_e)
        labeled_motif = tuple(sorted(labeled_motif))

        if labeled_motif in labeling:
            labeling[labeled_motif] += 1

    def graph_extend(sub, ext, v, n_sub):
        if len(sub) == N:
            count_motif(sub)
            return

        while len(ext) > 0:
            w = ext.pop()
            tmp = set(ext)

            for u in graph[w]:
                if u not in sub and u not in n_sub and u > v:
                    tmp.add(u)

            new_sub = set(sub)
            new_sub.add(w)
            new_n_sub = set(n_sub).union(set(graph[w]))
            graph_extend(new_sub, tmp, v, new_n_sub)

    for v in graph.keys():
        v_ext = set()
        for u in graph[v]:
            if u > v:
                v_ext.add(u)
        graph_extend(set([v]), v_ext, v, set(graph[v]))

    out = []

    for motif in mapping.keys():
        count = 0
        for label in mapping[motif]:
            count += labeling[label]

        out.append((motif, count))

    out = sorted(out)

    # D = {}
    # for i in range(len(out)):
    #     D[i] = out[i][0]
    #
    # with open('motifs_{}.pickle'.format(N), 'wb') as handle:
    # pickle.dump(D, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return out


def _motifs_ho_full(edges, N):
    mapping, labeling = generate_motifs(N)
    T = {tuple(sorted(e)) for e in edges}

    visited = {}

    def count_motif(nodes):
        nodes = tuple(sorted(tuple(nodes)))
        p_nodes = power_set(nodes)

        motif = [
            edge_key
            for edge in p_nodes
            if len(edge) >= 2
            if (edge_key := tuple(sorted(edge))) in T
        ]

        m = {i: idx + 1 for idx, i in enumerate(nodes)}

        labeled_motif = []
        for e in motif:
            new_e = [m[node] for node in e]
            new_e = tuple(sorted(new_e))
            labeled_motif.append(new_e)
        labeled_motif = tuple(sorted(labeled_motif))

        if labeled_motif in labeling:
            labeling[labeled_motif] += 1

    for e in edges:
        if len(e) == N:
            visited[e] = 1
            nodes = list(e)
            count_motif(nodes)

    out = []

    for motif in mapping.keys():
        count = 0
        for label in mapping[motif]:
            count += labeling[label]

        out.append((motif, count))

    out = sorted(out)

    # D = {}
    # for i in range(len(out)):
    #     D[i] = out[i][0]
    #
    # with open('motifs_{}.pickle'.format(N), 'wb') as handle:
    # pickle.dump(D, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return out, visited


[docs] def diff_sum(observed: list, null_models: list): """ Compute the relative abundance between the observed frequencies and the null models Parameters ---------- observed : list Observed frequencies null_models : list Null models Returns ------- list Relative abundance between the observed frequencies and the null models Notes ----- The relative abundance is computed as: (observed - null) / (observed + null + 4) """ u_null = avg(null_models) res = [] for i in range(len(observed)): res.append((observed[i][1] - u_null[i]) / (observed[i][1] + u_null[i] + 4)) return res
[docs] def norm_vector(a): """ Normalize a vector Parameters ---------- a : list Vector to be normalized Returns ------- list Normalized vector (unit vector) or the original vector if the norm is zero """ M = 0 for i in a: M += i**2 M = math.sqrt(M) if M == 0: # vector is unchanged return a else: return [i / M for i in a]
[docs] def avg(motifs): """ Compute the average motif frequencies across multiple models. Parameters ---------- motifs : list of list of tuples Motif data from multiple models. Each model is a list of tuples where each tuple is (motif_id, frequency). Returns ------- list of float Average frequency for each motif position across all models. """ result = [] for i in range(len(motifs[0])): s = 0 for j in range(len(motifs)): s += motifs[j][i][1] result.append(s / len(motifs)) return result
[docs] def sigma(motifs): """ Compute the standard deviation of motif frequencies across multiple models. Parameters ---------- motifs : list of list of tuples Motif data from multiple models. Each model is a list of tuples where each tuple is (motif_id, frequency). Returns ------- list of float Standard deviation of frequency for each motif position across all models. """ u = avg(motifs) result = [] for i in range(len(motifs[0])): s = 0 for j in range(len(motifs)): s += (motifs[j][i][1] - u[i]) ** 2 s /= len(motifs) s = s**0.5 result.append(s) return result
[docs] def z_score(observed, null_models): """ Compute the z-score between the observed frequencies and the null models Parameters ---------- observed : list Observed frequencies null_models : list Null models Returns ------- list Z-score between the observed frequencies and the null models """ u_null = avg(null_models) sigma_null = sigma(null_models) z_scores = [] for i in range(len(observed)): z_scores.append((observed[i][1] - u_null[i]) / (sigma_null[i] + 0.01)) return z_scores
[docs] def power_set(A): """ Compute the power set of a set Parameters ---------- A : list Set Yields ------ list Subsets of the set """ N = len(A) for mask in range(1 << N): subset = [] for n in range(N): if ((mask >> n) & 1) == 1: subset.append(A[n]) yield subset
def _is_connected(edges, N): """ Check if a graph represented by a list of edges is connected. Parameters ---------- edges : list A list of edges, where each edge is represented as a list of nodes. N : int The total number of nodes in the graph. Returns ------- bool True if the graph is connected, False otherwise. """ nodes = set(itertools.chain(*edges)) if not edges: return False if len(nodes) != N: return False # Initialize graph as a dictionary of sets for efficient edge handling graph = {i: set() for i in nodes} for edge in edges: for i in range(len(edge)): for j in range(i + 1, len(edge)): graph[edge[i]].add(edge[j]) graph[edge[j]].add(edge[i]) # Early exit if any node is isolated if any(len(neighbors) == 0 for neighbors in graph.values()): return False visited = set() queue = deque([next(iter(graph))]) # Start from any node # BFS to check connectivity while queue: node = queue.pop() if node not in visited: visited.add(node) queue.extend(graph[node] - visited) # Check if all nodes were visited return len(visited) == N
[docs] def relabel(edges: list, relabeling: dict): """ Relabel the vertices of a hypergraph according to a given relabeling Parameters ---------- edges : list Edges of the hypergraph relabeling : dict Relabeling Returns ------- list Edges of the hypergraph with the vertices relabeled Notes ----- The relabeling is a dictionary that maps the old labels to the new labels """ res = [] for edge in edges: new_edge = [] for v in edge: new_edge.append(relabeling[v - 1]) res.append(tuple(sorted(new_edge))) return sorted(res)
[docs] def generate_motifs(N): """ Generates all possible patterns of non-isomorphic subhypergraphs of size N Parameters ---------- N : int Size of the subhypergraphs Returns ------- list List of all possible patterns of non-isomorphic subhypergraphs of size N """ n = N assert n >= 2 isom_classes = set() relabeling_list = list(itertools.permutations(range(1, n + 1))) h = range(1, n + 1) A = [comb for r in range(n, 1, -1) for comb in itertools.combinations(h, r)] B = power_set(A) for edges in B: if _is_connected(edges, N): edges = sorted(edges) found = False for relabeling in relabeling_list: relabeling_i = relabel(edges, relabeling) if tuple(relabeling_i) in isom_classes: found = True break if not found: isom_classes.add(tuple(edges)) mapping = {} labeling = {} for k in isom_classes: mapping[k] = set() for relabeling in relabeling_list: relabeling_i = relabel(k, relabeling) relabeling_i = tuple(sorted(relabeling_i)) labeling[relabeling_i] = 0 mapping[k].add(relabeling_i) return mapping, labeling
def _directed_motifs_ho_full(edges, N): mapping = {} T = {(tuple(sorted(e[0])), tuple(sorted(e[1]))) for e in edges} visited = {} def count_motif(nodes): nodes = tuple(sorted(tuple(nodes))) p_nodes = _all_directed_hyperedges(nodes) motif = [edge for edge in p_nodes if edge in T] m = {i: idx + 1 for idx, i in enumerate(nodes)} labeled_motif = [] for e in motif: new_e0 = [m[node] for node in e[0]] new_e1 = [m[node] for node in e[1]] new_e = tuple((tuple(sorted(new_e0)), tuple(sorted(new_e1)))) labeled_motif.append(new_e) labeled_motif = tuple(sorted(labeled_motif)) l_perm = [] vector = list(range(1, N + 1)) vector_permutations = permutations(vector) m = {} for permutation in vector_permutations: i = 1 for x in permutation: m[i] = x i += 1 new_comb = [] for x in labeled_motif: edge = [] for y in x: tmp_edge = [] for j in y: tmp_edge.append(m[j]) edge.append(tuple(sorted(tmp_edge))) edge = tuple(edge) new_comb.append(edge) new_comb = tuple(sorted(new_comb)) l_perm.append(new_comb) l_perm = sorted(l_perm) rappr = l_perm[0] if rappr in mapping: mapping[rappr] += 1 else: mapping[rappr] = 1 for e in edges: if ( len(set(list(e[0]) + list(e[1]))) == N and not tuple(sorted(set(list(e[0]) + list(e[1])))) in visited ): nodes = set(list(e[0]) + list(e[1])) visited[tuple(sorted(nodes))] = 1 count_motif(nodes) out = [] for motif, count in mapping.items(): out.append((motif, count)) out = sorted(out) # D = {} # for i in range(len(out)): # D[i] = out[i][0] return out, visited def _directed_motifs_ho_not_full(edges, N, visited): mapping = {} T = set() graph = {} for e in edges: T.add(tuple((tuple(sorted(e[0])), tuple(sorted(e[1]))))) for e_i in e[0]: if e_i in graph: graph[e_i].append(e) else: graph[e_i] = [e] for e_i in e[1]: if e_i in graph: graph[e_i].append(e) else: graph[e_i] = [e] def count_motif(nodes): nodes = tuple(sorted(tuple(nodes))) p_nodes = _all_directed_hyperedges(nodes) motif = [edge for edge in p_nodes if edge in T] m = {i: idx + 1 for idx, i in enumerate(nodes)} labeled_motif = [] for e in motif: new_e0 = [m[node] for node in e[0]] new_e1 = [m[node] for node in e[1]] new_e = tuple((tuple(sorted(new_e0)), tuple(sorted(new_e1)))) labeled_motif.append(new_e) labeled_motif = tuple(sorted(labeled_motif)) l_perm = [] vector = list(range(1, N + 1)) vector_permutations = permutations(vector) m = {} for permutation in vector_permutations: i = 1 for x in permutation: m[i] = x i += 1 new_comb = [] for x in labeled_motif: edge = [] for y in x: tmp_edge = [] for j in y: tmp_edge.append(m[j]) edge.append(tuple(sorted(tmp_edge))) edge = tuple(edge) new_comb.append(edge) new_comb = tuple(sorted(new_comb)) l_perm.append(new_comb) l_perm = sorted(l_perm) rappr = l_perm[0] if rappr in mapping: mapping[rappr] += 1 else: mapping[rappr] = 1 for e in edges: if ( len(set(list(e[0]) + list(e[1]))) == N - 1 and len(list(e[0]) + list(e[1])) == N - 1 ): nodes = list(set(list(e[0]) + list(e[1]))) for n in nodes: for e_i in graph[n]: tmp = set(nodes) | set(e_i[0]) | set(e_i[1]) if ( len(set(list(e_i[0]) + list(e_i[1]))) == len(list(e_i[0]) + list(e_i[1])) and len(tmp) == N and not (tuple(sorted(tmp)) in visited) ): visited[tuple(sorted(tmp))] = 1 count_motif(tmp) out = [] for motif, count in mapping.items(): out.append((motif, count)) out = sorted(out) # D = {} # for i in range(len(out)): # D[i] = out[i][0] return out, visited def _all_directed_hyperedges(nodes): """ Compute all directed hyperedges Parameters ---------- nodes : list Set of nodes Returns ------- set All directed hyperedges (ordered pairs of non-empty disjoint node subsets) """ hyperedges = set() # Generate directed hyperedges with source nodes and target nodes for source_size in range(1, len(nodes)): for source_nodes in combinations(nodes, source_size): for target_size in range(1, len(nodes) - source_size + 1): for target_nodes in combinations( set(nodes) - set(source_nodes), target_size ): edge = ( tuple(sorted(source_nodes)), tuple(sorted(target_nodes)), ) hyperedges.add(edge) return hyperedges
[docs] def directed_diff_sum(observed: list, null_models: list): """ Compute the relative abundance between the observed frequencies and the null models for directed hypergraphs. Parameters ---------- observed : list Observed frequencies null_models : list Null models Returns ------- list Relative abundance between the observed frequencies and the null models Notes ----- The relative abundance is computed as: (observed - null) / (observed + null + 4) """ u_null = directed_avg(null_models) res = [] for i in range(len(observed)): if observed[i][0] in u_null: res.append( (observed[i][1] - u_null[observed[i][0]]) / (observed[i][1] + u_null[observed[i][0]] + 4) ) else: res.append((observed[i][1]) / (observed[i][1] + 4)) return res
[docs] def directed_avg(motifs): m = {} for i in range(len(motifs)): for j in range(len(motifs[i])): if motifs[i][j][0] in m: m[motifs[i][j][0]] += motifs[i][j][1] else: m[motifs[i][j][0]] = motifs[i][j][1] result = {} for key in m.keys(): result[key] = m[key] / len(motifs) return result