import itertools
import math
from collections import deque
from itertools import combinations, permutations
def _motifs_ho_not_full(edges, N, visited):
mapping, labeling = generate_motifs(N)
T = set()
graph = {}
for e in edges:
if len(e) >= N:
continue
T.add(tuple(sorted(e)))
for e_i in e:
if e_i in graph:
graph[e_i].append(e)
else:
graph[e_i] = [e]
def count_motif(nodes):
nodes = tuple(sorted(tuple(nodes)))
p_nodes = power_set(nodes)
motif = [
edge_key
for edge in p_nodes
if len(edge) >= 2
if (edge_key := tuple(sorted(edge))) in T
]
m = {i: idx + 1 for idx, i in enumerate(nodes)}
labeled_motif = []
for e in motif:
new_e = [m[node] for node in e]
new_e = tuple(sorted(new_e))
labeled_motif.append(new_e)
labeled_motif = tuple(sorted(labeled_motif))
if labeled_motif in labeling:
labeling[labeled_motif] += 1
for e in edges:
if len(e) == N - 1:
nodes = list(e)
for n in nodes:
for e_i in graph[n]:
tmp = list(set(nodes) | set(e_i))
if len(tmp) == N and not (tuple(sorted(tmp)) in visited):
visited[tuple(sorted(tmp))] = 1
count_motif(tmp)
out = []
for motif in mapping.keys():
count = 0
for label in mapping[motif]:
count += labeling[label]
out.append((motif, count))
out = sorted(out)
# D = {}
# for i in range(len(out)):
# D[i] = out[i][0]
#
# with open('motifs_{}.pickle'.format(N), 'wb') as handle:
# pickle.dump(D, handle, protocol=pickle.HIGHEST_PROTOCOL)
return out, visited
def _motifs_standard(edges, N, visited):
mapping, labeling = generate_motifs(N)
T = set()
graph = {}
for e in edges:
if len(e) == 2:
T.add(tuple(sorted(e)))
a, b = e
if a in graph:
graph[a].append(b)
else:
graph[a] = [b]
if b in graph:
graph[b].append(a)
else:
graph[b] = [a]
def count_motif(nodes):
nodes = tuple(sorted(tuple(nodes)))
if nodes in visited:
return
p_nodes = power_set(nodes)
motif = [edge_key for edge in p_nodes if (edge_key := tuple(sorted(edge))) in T]
m = {i: idx + 1 for idx, i in enumerate(nodes)}
labeled_motif = []
for e in motif:
new_e = [m[node] for node in e]
new_e = tuple(sorted(new_e))
labeled_motif.append(new_e)
labeled_motif = tuple(sorted(labeled_motif))
if labeled_motif in labeling:
labeling[labeled_motif] += 1
def graph_extend(sub, ext, v, n_sub):
if len(sub) == N:
count_motif(sub)
return
while len(ext) > 0:
w = ext.pop()
tmp = set(ext)
for u in graph[w]:
if u not in sub and u not in n_sub and u > v:
tmp.add(u)
new_sub = set(sub)
new_sub.add(w)
new_n_sub = set(n_sub).union(set(graph[w]))
graph_extend(new_sub, tmp, v, new_n_sub)
for v in graph.keys():
v_ext = set()
for u in graph[v]:
if u > v:
v_ext.add(u)
graph_extend(set([v]), v_ext, v, set(graph[v]))
out = []
for motif in mapping.keys():
count = 0
for label in mapping[motif]:
count += labeling[label]
out.append((motif, count))
out = sorted(out)
# D = {}
# for i in range(len(out)):
# D[i] = out[i][0]
#
# with open('motifs_{}.pickle'.format(N), 'wb') as handle:
# pickle.dump(D, handle, protocol=pickle.HIGHEST_PROTOCOL)
return out
def _motifs_ho_full(edges, N):
mapping, labeling = generate_motifs(N)
T = {tuple(sorted(e)) for e in edges}
visited = {}
def count_motif(nodes):
nodes = tuple(sorted(tuple(nodes)))
p_nodes = power_set(nodes)
motif = [
edge_key
for edge in p_nodes
if len(edge) >= 2
if (edge_key := tuple(sorted(edge))) in T
]
m = {i: idx + 1 for idx, i in enumerate(nodes)}
labeled_motif = []
for e in motif:
new_e = [m[node] for node in e]
new_e = tuple(sorted(new_e))
labeled_motif.append(new_e)
labeled_motif = tuple(sorted(labeled_motif))
if labeled_motif in labeling:
labeling[labeled_motif] += 1
for e in edges:
if len(e) == N:
visited[e] = 1
nodes = list(e)
count_motif(nodes)
out = []
for motif in mapping.keys():
count = 0
for label in mapping[motif]:
count += labeling[label]
out.append((motif, count))
out = sorted(out)
# D = {}
# for i in range(len(out)):
# D[i] = out[i][0]
#
# with open('motifs_{}.pickle'.format(N), 'wb') as handle:
# pickle.dump(D, handle, protocol=pickle.HIGHEST_PROTOCOL)
return out, visited
[docs]
def diff_sum(observed: list, null_models: list):
"""
Compute the relative abundance between the observed frequencies and the null models
Parameters
----------
observed : list
Observed frequencies
null_models : list
Null models
Returns
-------
list
Relative abundance between the observed frequencies and the null models
Notes
-----
The relative abundance is computed as: (observed - null) / (observed + null + 4)
"""
u_null = avg(null_models)
res = []
for i in range(len(observed)):
res.append((observed[i][1] - u_null[i]) / (observed[i][1] + u_null[i] + 4))
return res
[docs]
def norm_vector(a):
"""
Normalize a vector
Parameters
----------
a : list
Vector to be normalized
Returns
-------
list
Normalized vector (unit vector) or the original vector if the norm is zero
"""
M = 0
for i in a:
M += i**2
M = math.sqrt(M)
if M == 0:
# vector is unchanged
return a
else:
return [i / M for i in a]
[docs]
def avg(motifs):
"""
Compute the average motif frequencies across multiple models.
Parameters
----------
motifs : list of list of tuples
Motif data from multiple models. Each model is a list of tuples where
each tuple is (motif_id, frequency).
Returns
-------
list of float
Average frequency for each motif position across all models.
"""
result = []
for i in range(len(motifs[0])):
s = 0
for j in range(len(motifs)):
s += motifs[j][i][1]
result.append(s / len(motifs))
return result
[docs]
def sigma(motifs):
"""
Compute the standard deviation of motif frequencies across multiple models.
Parameters
----------
motifs : list of list of tuples
Motif data from multiple models. Each model is a list of tuples where
each tuple is (motif_id, frequency).
Returns
-------
list of float
Standard deviation of frequency for each motif position across all models.
"""
u = avg(motifs)
result = []
for i in range(len(motifs[0])):
s = 0
for j in range(len(motifs)):
s += (motifs[j][i][1] - u[i]) ** 2
s /= len(motifs)
s = s**0.5
result.append(s)
return result
[docs]
def z_score(observed, null_models):
"""
Compute the z-score between the observed frequencies and the null models
Parameters
----------
observed : list
Observed frequencies
null_models : list
Null models
Returns
-------
list
Z-score between the observed frequencies and the null models
"""
u_null = avg(null_models)
sigma_null = sigma(null_models)
z_scores = []
for i in range(len(observed)):
z_scores.append((observed[i][1] - u_null[i]) / (sigma_null[i] + 0.01))
return z_scores
[docs]
def power_set(A):
"""
Compute the power set of a set
Parameters
----------
A : list
Set
Yields
------
list
Subsets of the set
"""
N = len(A)
for mask in range(1 << N):
subset = []
for n in range(N):
if ((mask >> n) & 1) == 1:
subset.append(A[n])
yield subset
def _is_connected(edges, N):
"""
Check if a graph represented by a list of edges is connected.
Parameters
----------
edges : list
A list of edges, where each edge is represented as a list of nodes.
N : int
The total number of nodes in the graph.
Returns
-------
bool
True if the graph is connected, False otherwise.
"""
nodes = set(itertools.chain(*edges))
if not edges:
return False
if len(nodes) != N:
return False
# Initialize graph as a dictionary of sets for efficient edge handling
graph = {i: set() for i in nodes}
for edge in edges:
for i in range(len(edge)):
for j in range(i + 1, len(edge)):
graph[edge[i]].add(edge[j])
graph[edge[j]].add(edge[i])
# Early exit if any node is isolated
if any(len(neighbors) == 0 for neighbors in graph.values()):
return False
visited = set()
queue = deque([next(iter(graph))]) # Start from any node
# BFS to check connectivity
while queue:
node = queue.pop()
if node not in visited:
visited.add(node)
queue.extend(graph[node] - visited)
# Check if all nodes were visited
return len(visited) == N
[docs]
def relabel(edges: list, relabeling: dict):
"""
Relabel the vertices of a hypergraph according to a given relabeling
Parameters
----------
edges : list
Edges of the hypergraph
relabeling : dict
Relabeling
Returns
-------
list
Edges of the hypergraph with the vertices relabeled
Notes
-----
The relabeling is a dictionary that maps the old labels to the new labels
"""
res = []
for edge in edges:
new_edge = []
for v in edge:
new_edge.append(relabeling[v - 1])
res.append(tuple(sorted(new_edge)))
return sorted(res)
[docs]
def generate_motifs(N):
"""
Generates all possible patterns of non-isomorphic subhypergraphs of size N
Parameters
----------
N : int
Size of the subhypergraphs
Returns
-------
list
List of all possible patterns of non-isomorphic subhypergraphs of size N
"""
n = N
assert n >= 2
isom_classes = set()
relabeling_list = list(itertools.permutations(range(1, n + 1)))
h = range(1, n + 1)
A = [comb for r in range(n, 1, -1) for comb in itertools.combinations(h, r)]
B = power_set(A)
for edges in B:
if _is_connected(edges, N):
edges = sorted(edges)
found = False
for relabeling in relabeling_list:
relabeling_i = relabel(edges, relabeling)
if tuple(relabeling_i) in isom_classes:
found = True
break
if not found:
isom_classes.add(tuple(edges))
mapping = {}
labeling = {}
for k in isom_classes:
mapping[k] = set()
for relabeling in relabeling_list:
relabeling_i = relabel(k, relabeling)
relabeling_i = tuple(sorted(relabeling_i))
labeling[relabeling_i] = 0
mapping[k].add(relabeling_i)
return mapping, labeling
def _directed_motifs_ho_full(edges, N):
mapping = {}
T = {(tuple(sorted(e[0])), tuple(sorted(e[1]))) for e in edges}
visited = {}
def count_motif(nodes):
nodes = tuple(sorted(tuple(nodes)))
p_nodes = _all_directed_hyperedges(nodes)
motif = [edge for edge in p_nodes if edge in T]
m = {i: idx + 1 for idx, i in enumerate(nodes)}
labeled_motif = []
for e in motif:
new_e0 = [m[node] for node in e[0]]
new_e1 = [m[node] for node in e[1]]
new_e = tuple((tuple(sorted(new_e0)), tuple(sorted(new_e1))))
labeled_motif.append(new_e)
labeled_motif = tuple(sorted(labeled_motif))
l_perm = []
vector = list(range(1, N + 1))
vector_permutations = permutations(vector)
m = {}
for permutation in vector_permutations:
i = 1
for x in permutation:
m[i] = x
i += 1
new_comb = []
for x in labeled_motif:
edge = []
for y in x:
tmp_edge = []
for j in y:
tmp_edge.append(m[j])
edge.append(tuple(sorted(tmp_edge)))
edge = tuple(edge)
new_comb.append(edge)
new_comb = tuple(sorted(new_comb))
l_perm.append(new_comb)
l_perm = sorted(l_perm)
rappr = l_perm[0]
if rappr in mapping:
mapping[rappr] += 1
else:
mapping[rappr] = 1
for e in edges:
if (
len(set(list(e[0]) + list(e[1]))) == N
and not tuple(sorted(set(list(e[0]) + list(e[1])))) in visited
):
nodes = set(list(e[0]) + list(e[1]))
visited[tuple(sorted(nodes))] = 1
count_motif(nodes)
out = []
for motif, count in mapping.items():
out.append((motif, count))
out = sorted(out)
# D = {}
# for i in range(len(out)):
# D[i] = out[i][0]
return out, visited
def _directed_motifs_ho_not_full(edges, N, visited):
mapping = {}
T = set()
graph = {}
for e in edges:
T.add(tuple((tuple(sorted(e[0])), tuple(sorted(e[1])))))
for e_i in e[0]:
if e_i in graph:
graph[e_i].append(e)
else:
graph[e_i] = [e]
for e_i in e[1]:
if e_i in graph:
graph[e_i].append(e)
else:
graph[e_i] = [e]
def count_motif(nodes):
nodes = tuple(sorted(tuple(nodes)))
p_nodes = _all_directed_hyperedges(nodes)
motif = [edge for edge in p_nodes if edge in T]
m = {i: idx + 1 for idx, i in enumerate(nodes)}
labeled_motif = []
for e in motif:
new_e0 = [m[node] for node in e[0]]
new_e1 = [m[node] for node in e[1]]
new_e = tuple((tuple(sorted(new_e0)), tuple(sorted(new_e1))))
labeled_motif.append(new_e)
labeled_motif = tuple(sorted(labeled_motif))
l_perm = []
vector = list(range(1, N + 1))
vector_permutations = permutations(vector)
m = {}
for permutation in vector_permutations:
i = 1
for x in permutation:
m[i] = x
i += 1
new_comb = []
for x in labeled_motif:
edge = []
for y in x:
tmp_edge = []
for j in y:
tmp_edge.append(m[j])
edge.append(tuple(sorted(tmp_edge)))
edge = tuple(edge)
new_comb.append(edge)
new_comb = tuple(sorted(new_comb))
l_perm.append(new_comb)
l_perm = sorted(l_perm)
rappr = l_perm[0]
if rappr in mapping:
mapping[rappr] += 1
else:
mapping[rappr] = 1
for e in edges:
if (
len(set(list(e[0]) + list(e[1]))) == N - 1
and len(list(e[0]) + list(e[1])) == N - 1
):
nodes = list(set(list(e[0]) + list(e[1])))
for n in nodes:
for e_i in graph[n]:
tmp = set(nodes) | set(e_i[0]) | set(e_i[1])
if (
len(set(list(e_i[0]) + list(e_i[1])))
== len(list(e_i[0]) + list(e_i[1]))
and len(tmp) == N
and not (tuple(sorted(tmp)) in visited)
):
visited[tuple(sorted(tmp))] = 1
count_motif(tmp)
out = []
for motif, count in mapping.items():
out.append((motif, count))
out = sorted(out)
# D = {}
# for i in range(len(out)):
# D[i] = out[i][0]
return out, visited
def _all_directed_hyperedges(nodes):
"""
Compute all directed hyperedges
Parameters
----------
nodes : list
Set of nodes
Returns
-------
set
All directed hyperedges (ordered pairs of non-empty disjoint node subsets)
"""
hyperedges = set()
# Generate directed hyperedges with source nodes and target nodes
for source_size in range(1, len(nodes)):
for source_nodes in combinations(nodes, source_size):
for target_size in range(1, len(nodes) - source_size + 1):
for target_nodes in combinations(
set(nodes) - set(source_nodes), target_size
):
edge = (
tuple(sorted(source_nodes)),
tuple(sorted(target_nodes)),
)
hyperedges.add(edge)
return hyperedges
[docs]
def directed_diff_sum(observed: list, null_models: list):
"""
Compute the relative abundance between the observed frequencies and the null models
for directed hypergraphs.
Parameters
----------
observed : list
Observed frequencies
null_models : list
Null models
Returns
-------
list
Relative abundance between the observed frequencies and the null models
Notes
-----
The relative abundance is computed as: (observed - null) / (observed + null + 4)
"""
u_null = directed_avg(null_models)
res = []
for i in range(len(observed)):
if observed[i][0] in u_null:
res.append(
(observed[i][1] - u_null[observed[i][0]])
/ (observed[i][1] + u_null[observed[i][0]] + 4)
)
else:
res.append((observed[i][1]) / (observed[i][1] + 4))
return res
[docs]
def directed_avg(motifs):
m = {}
for i in range(len(motifs)):
for j in range(len(motifs[i])):
if motifs[i][j][0] in m:
m[motifs[i][j][0]] += motifs[i][j][1]
else:
m[motifs[i][j][0]] = motifs[i][j][1]
result = {}
for key in m.keys():
result[key] = m[key] / len(motifs)
return result