Source code for graph_gen

# these graphs are not used and the file is not maintained at the moment
# random graphs should be used in the future

# -*- coding: utf-8 -*-
"""NetworkX-based multi-layer graph generators (legacy, not actively maintained).

This module provides :class:`GraphGenerator` and several concrete subclasses
that build multi-layer contact graphs backed by a NetworkX ``MultiGraph``.
These classes pre-date the :class:`~graphs.light.LightGraph` implementation
and are no longer used in production simulations; random-graph variants are
planned for future use.

Classes:
    GraphGenerator: Base class wrapping a NetworkX MultiGraph with
    layer-aware helper methods.
    RandomSingleGraphGenerator: Generates a single-layer Barabasi-Albert
    graph with exponentially trimmed degree distribution.
    RandomGraphGenerator: Generates one Barabasi-Albert graph per layer
    with random truncated-normal edge weights.
    PickleGraphGenerator: Restores a graph from a NetworkX pickle file.
    CSVGraphGenerator: Builds a graph from nodes, edges and layer CSV files.
"""

import networkx as nx
import numpy as np
import scipy.stats as stats
from scipy.sparse import csr_matrix
import pandas as pd

from utils.sparse_utils import multiply_zeros_as_ones


[docs] class GraphGenerator: """Base class for multi-layer contact-graph generators. Wraps a NetworkX ``MultiGraph`` (``self.G``) and provides layer-aware helpers for building, querying and modifying multi-layer contact networks. Subclasses are responsible for populating ``self.G`` with nodes and edges. Class Attributes: layer_names (list[str]): Ordered list of layer name strings. Default covers 14 generic layers (``'F'`` through ``'Z'``). layer_probs (list[float]): Per-layer transmission probabilities, initialised to ``1.0`` for every layer. layer_types (list[int]): Integer indices corresponding to each layer. Attributes: G (networkx.MultiGraph): The underlying multi-layer graph. Graphs (dict): Optional per-layer single-layer graph cache populated by :meth:`as_dict_of_graphs`. A (scipy.sparse.csr_matrix or None): Cached final adjacency matrix. ``None`` until first computed. A_valid (bool): ``True`` when the cached adjacency matrix is up to date with the current edge weights. A_invalids (set): Set of ``(u, v)`` node pairs whose adjacency matrix entries need recomputing. quarantined_edges (dict): Mapping from edge keys to their original weights, used to restore edges after a quarantine is lifted. Args: random_seed (int, optional): Seed for NumPy's random number generator. ``None`` leaves the global RNG state unchanged. """ layer_names = ['F', 'D', 'P', 'E', 'H', 'K', 'C', 'S', 'O', 'L', 'R', 'T', 'X', 'Z'] layer_probs = [1.0] * len(layer_names) layer_types = list(range(len(layer_names))) def __init__(self, random_seed=None): self.G = nx.MultiGraph() self.Graphs = {} self.G.graph['layer_names'] = self.layer_names self.G.graph['layer_probs'] = self.layer_probs if random_seed: np.random.seed(random_seed) self.A = None self.A_valid = False self.A_invalids = set() self.quarantined_edges = {} @property def nodes(self): """NodeView of all nodes in the underlying MultiGraph. Returns: networkx.classes.reportviews.NodeView: All nodes of ``self.G``. """ return self.G.nodes
[docs] def number_of_nodes(self): """Return the total number of nodes in the graph. Returns: int: Number of nodes in ``self.G``. """ return self.G.number_of_nodes()
[docs] def as_multigraph(self): """Return the raw underlying NetworkX MultiGraph. Returns: networkx.MultiGraph: The underlying graph object ``self.G``. """ return self.G
[docs] def as_one_graph(self): """Collapse the MultiGraph into a simple undirected Graph. Parallel edges between any pair of nodes are merged. Returns: networkx.Graph: A simple graph derived from ``self.G``. """ return nx.Graph(self.G)
[docs] def as_aggregated_graph(self): """Build an aggregated single-layer graph (incomplete, legacy stub). This method is a historical stub and does not return a fully populated graph. It exists for backward-compatibility only. Returns: networkx.Graph: An empty graph with the same node set as ``self.G``. """ ag = nx.Graph() ag.add_nodes_from(self.G) lp = [] selected_edges = [] for i, l in enumerate(self.layer_names): lp[i] = self.G.graph['layer_probs'][i] selected_edges[i] = [(u, v, e) for u, v, e in self.G.edges( data=True) if e['type'] == l] return ag
[docs] def final_adjacency_matrix(self): """Compute (or return cached) the final contact-probability adjacency matrix. The matrix entry ``A[i, j]`` holds the probability that nodes ``i`` and ``j`` make contact across *any* layer, calculated as ``1 - product_over_layers(1 - p_layer * w_edge)``. A two-level caching strategy is used: * If ``A_valid`` is ``True`` the cached matrix is returned immediately. * If the matrix has never been computed (``A is None``) or ``A_invalids`` is empty, the entire matrix is recalculated from scratch. * Otherwise only the entries listed in ``A_invalids`` are recomputed. Returns: scipy.sparse.csr_matrix: Symmetric probability matrix of shape ``(N, N)`` where ``N`` is the number of nodes. """ if self.A_valid: return self.A if self.A is None or not self.A_invalids: N = self.number_of_nodes() prob_no_contact = csr_matrix((N, N)) # empty values = 1.0 for name, prob in self.get_layers_info().items(): a = nx.adj_matrix(self.get_graph_for_layer(name)) if len(a.data) == 0: # no edges, prob of no-contact = 1 continue a = a.multiply(prob) # contact on layer not_a = a # empty values = 1.0 not_a.data = 1.0 - not_a.data # prob of no-contace prob_no_contact = multiply_zeros_as_ones( prob_no_contact, not_a) del a # prob_of_contact = 1.0 - prob_no_contact prob_of_contact = prob_no_contact prob_of_contact.data = 1.0 - prob_no_contact.data self.A = prob_of_contact self.A_valid = True self.A_invalids = set() return self.A else: # recalculate only invalids for u, v in list(self.A_invalids): edges_to_aggregate = self.G[u][v] sublayers = dict() for e in edges_to_aggregate.values(): sublayers[e["type"]] = sublayers.get( e["type"], 0) + e["weight"] no_contact_prob = 1.0 for layer_type, weight in sublayers.items(): w = 1.0 - weight * self.G.graph["layer_probs"][layer_type] no_contact_prob *= w self.A[u, v] = 1.0 - no_contact_prob self.A[v, u] = 1.0 - no_contact_prob self.A_valid = True self.A_invalids = set() return self.A
[docs] def get_graph_for_layer(self, layer_name): """Extract a single-layer subgraph for the named layer. Args: layer_name (str): Name of the layer to extract, must appear in ``self.layer_names``. Returns: networkx.Graph: A new simple graph containing all nodes of ``self.G`` and only the edges whose ``'type'`` attribute matches the index of ``layer_name`` in ``self.layer_names``. The graph's metadata includes ``'layer_name'`` and ``'layer_prob'``. """ ret_g = nx.Graph() ret_g.graph['layer_name'] = layer_name layer_index = self.layer_names.index(layer_name) ret_g.graph['layer_prob'] = self.G.graph['layer_probs'][layer_index] ret_g.add_nodes_from(self.G) selected_edges = [(u, v, e) for u, v, e in self.G.edges(data=True) if e['type'] == self.layer_names.index(layer_name)] ret_g.add_edges_from(selected_edges) return ret_g
[docs] def as_dict_of_graphs(self): """Build a per-layer dictionary of single-layer graphs. Iterates over all layers, constructs a separate :class:`networkx.Graph` for each one (all nodes, only edges on that layer) and caches the result in ``self.Graphs``. Returns: dict[str, networkx.Graph]: Mapping of layer name to its corresponding single-layer graph. """ self.Graphs = {} for i, l in enumerate(self.layer_names): FG = nx.Graph() FG.graph['layer_name'] = l FG.graph['layer_prob'] = self.G.graph['layer_probs'][i] FG.add_nodes_from(self.G) selected_edges = [(u, v, e) for u, v, e in self.G.edges( data=True) if e['type'] == l] FG.add_edges_from(selected_edges) self.Graphs[l] = FG return self.Graphs
[docs] def get_attr_list(self, attr): """Return a list of node attribute values in node-iteration order. Args: attr (str): Name of the node attribute to collect. Returns: list: Attribute values for every node in ``self.G``, ordered by NetworkX's default node-iteration order. """ attr_list = [] for (p, d) in self.G.nodes(data=True): attr_list.append(d[attr]) return attr_list
[docs] def get_edges_for_node(self, node_id): """Return all edges incident to a given node. Args: node_id: Node identifier as stored in ``self.G``. If the node does not exist a warning is printed and ``None`` is returned implicitly. Returns: networkx.classes.reportviews.OutMultiEdgeDataView or None: An edge-data view for all edges incident to ``node_id``, or ``None`` (implicit) if the node is not in the graph. """ if not self.G.has_node(node_id): print(f"Warning: modify_layer_for_node called for nonexisting node_id {node_id}") return # for key in what_by_what: # edges_to_modify = [ # (u, v, k) for u, v, k, d in self.G.edges(node, data=True, keys=True) # if d['label'] == key # ] # print(edges_to_modify) # for e in edges_to_modify: # self.G.edges[e]['weight'] *= what_by_what[key] # for e in self.G.edges(node, data=True, keys=True): # print(*e) # for e in self.G.edges([node_id], data=True, keys=True): # print(*e) return self.G.edges([node_id], data=True, keys=True)
[docs] def get_layers_for_edge(self, u, v): """Return the list of layer types for all parallel edges between two nodes. Args: u: Source node identifier. v: Destination node identifier. Returns: list[int]: Layer type (``'type'`` attribute) for each parallel edge between ``u`` and ``v`` in the MultiGraph. """ edges = self.G[u][v] return [d["type"] for d in edges.values()]
[docs] def modify_layers_for_nodes(self, node_id_list, what_by_what, is_quarrantined=None): """Apply quarantine by scaling edge weights for given nodes. For each edge incident to a node in ``node_id_list`` whose layer appears in ``what_by_what``, the edge weight is multiplied by the corresponding coefficient (clipped to ``[0, 1]``). Edges that are already quarantined (i.e. at least one endpoint has a non-zero quarantine counter in ``is_quarrantined``) are skipped. The original weight is recorded in ``self.quarantined_edges`` so it can be restored by :meth:`recover_edges_for_nodes`. Affected ``(u, v)`` pairs are added to ``A_invalids`` so the adjacency matrix will be partially recomputed on next access. Args: node_id_list (iterable): Node identifiers whose incident edges should be modified. what_by_what (dict): Mapping of ``{layer_type: coefficient}`` specifying the weight multiplier for each layer. If falsy (empty or ``None``) the method returns immediately. is_quarrantined (numpy.ndarray, optional): Per-node quarantine counters. When provided, edges where either endpoint has a non-zero counter are skipped. """ if not what_by_what: return # if not self.G.has_node(node_id): # print(f"Warning: modify_layer_for_node called for nonexisting node_id {node_id}") # return changed = set() # keep the original list (it is modified in the cycle) for u, v, k, d in self.G.edges(set(node_id_list), data=True, keys=True): if is_quarrantined is not None and (is_quarrantined[u] > 0 or is_quarrantined[v] > 0): # edge is already quarrantined continue layer_label = d["type"] if layer_label in what_by_what: assert (u, v, k, d["type"], d["subtype"]) not in self.quarantined_edges, \ f"edge {(u, v, k, d['type'], d['subtype'])} is already quaratined, {is_quarrantined[u]}{is_quarrantined[v]}" s, e = (u, v) if u < v else (v, u) self.quarantined_edges[( s, e, k, d["type"], d["subtype"])] = d['weight'] self.G.edges[(u, v, k) ]['weight'] = min(self.G.edges[(u, v, k)]['weight'] * what_by_what[layer_label], 1.0) changed.add((u, v)) if changed: self.A_invalids.update(changed) self.A_valid = False
[docs] def recover_edges_for_nodes(self, release, normal_life, is_quarrantined): """Restore original edge weights for nodes being released from quarantine. Iterates over all edges incident to ``release`` nodes. For edges where both endpoints have a zero quarantine counter in ``is_quarrantined``, the weight stored in ``self.quarantined_edges`` is written back. Edges where at least one endpoint is still quarantined are left unchanged. Affected ``(u, v)`` pairs are added to ``A_invalids`` so the adjacency matrix is recomputed on next access. Args: release (iterable): Node identifiers being released from quarantine. normal_life: Unused parameter retained for interface compatibility with the ``LightGraph`` API. is_quarrantined (numpy.ndarray): Per-node quarantine counters indexed by node ID. Nodes with a counter of ``0`` are considered free. """ changed = set() for u, v, k, d in self.G.edges(release, data=True, keys=True): if is_quarrantined[u] or is_quarrantined[v]: # still one of nodes is in quarrantine continue e_type = d["type"] e_subtype = d["subtype"] # new_weight = None # for edata in normal_life.G.get_edge_data(u, v).values(): # if edata["type"] == e_type and edata["subtype"] == e_subtype: # new_weight = edata["weight"] # break # if new_weight is None: # raise ValueError("Edge not found in normal life.") s, e = (u, v) if u < v else (v, u) self.G.edges[(u, v, k)]['weight'] = self.quarantined_edges[( s, e, k, e_type, e_subtype)] del self.quarantined_edges[( s, e, k, e_type, e_subtype)] changed.add((s, e)) if changed: self.A_invalids.update(changed) self.A_valid = False
[docs] def get_layers_info(self): """Return a mapping of layer names to their transmission probabilities. Returns: dict[str, float]: Dictionary mapping each layer name to its current transmission probability as stored in ``self.G.graph['layer_probs']``. """ return dict(zip(self.G.graph["layer_names"], self.G.graph["layer_probs"]))
[docs] def print_multi(self): """Print the MultiGraph in DOT format to stdout.""" dot_G = nx.nx_pydot.to_pydot(self.G) print(dot_G)
[docs] def draw_multi(self, filename='empty_graph.png'): """Render the MultiGraph using Graphviz and save it to a file. Args: filename (str): Output file path for the rendered image. Defaults to ``'empty_graph.png'``. """ A = nx.nx_agraph.to_agraph(self.G) A.layout('dot') A.draw(filename)
[docs] def close_layers(self, list_of_layers, coefs=None): """Set transmission probability to zero (or a custom value) for named layers. Invalidates the cached adjacency matrix so it will be recomputed on next access. Args: list_of_layers (list[str]): Names of layers to close. coefs (list[float], optional): Per-layer replacement probabilities. When provided, ``coefs[idx]`` is used for the layer at position ``idx`` in ``list_of_layers``. When ``None`` all listed layers are set to ``0``. """ for idx, name in enumerate(list_of_layers): i = self.G.graph["layer_names"].index(name) self.G.graph["layer_probs"][i] = 0 if not coefs else coefs[idx] print(f"Closing {name} (new value {self.G.graph['layer_probs'][i]})") self.A_valid = False
[docs] def write_pickle(self, path): """Serialize the underlying NetworkX graph to a pickle file. Args: path (str): Destination file path for the pickle. """ nx.write_gpickle(self.G, path)
[docs] def read_picke(self, path): """Restore the graph from a NetworkX pickle file. Note: The method name contains a deliberate historical typo (``read_picke`` instead of ``read_pickle``); it is kept as-is for backward compatibility. Args: path (str): Path to the pickle file to read. """ self.G = nx.read_gpickle(path) self.layer_names = self.G.graph['layer_names'] self.layer_probs = self.G.graph['layer_probs']
[docs] def custom_exponential_graph(base_graph=None, scale=100, min_num_edges=0, m=9, n=None): """Generate a power-law-esque graph by thinning a Barabasi-Albert base graph. Starts from a Barabasi-Albert preferential-attachment graph (expected to form a single connected component) and then stochastically removes edges from each node so that the resulting degree distribution has an exponential tail rather than a hard minimum of ``m``. For each node the number of edges to *keep* is drawn from an exponential distribution with the given ``scale``, clipped to ``[min_num_edges, current_degree]``. Edges not selected for retention are removed. Args: base_graph (networkx.Graph, optional): Pre-existing graph to thin. When ``None`` a fresh Barabasi-Albert graph of ``n`` nodes is created. scale (float): Scale parameter of the exponential distribution used to sample the number of edges to keep per node. Defaults to ``100``. min_num_edges (int): Minimum number of edges guaranteed to be kept per node. Defaults to ``0``. m (int): Number of edges added per node when constructing the Barabasi-Albert base graph (only relevant when ``base_graph`` is ``None``). Defaults to ``9``. n (int, optional): Number of nodes for the Barabasi-Albert graph. Required when ``base_graph`` is ``None``. Returns: networkx.Graph: The thinned graph. Raises: AssertionError: If ``base_graph`` is ``None`` and ``n`` is also ``None``. """ if(base_graph): graph = base_graph.copy() else: assert(n is not None), "Argument n (number of nodes) must be provided when no base graph is given." graph = nx.barabasi_albert_graph(n=n, m=m) # To get a graph with power-law-esque properties but without the fixed minimum degree, # We modify the graph by probabilistically dropping some edges from each node. for node in graph: neighbors = list(graph[node].keys()) quarantineEdgeNum = int(max(min(np.random.exponential( scale=scale, size=1), len(neighbors)), min_num_edges)) # print(quarantineEdgeNum) quarantineKeepNeighbors = np.random.choice( neighbors, size=quarantineEdgeNum, replace=False) for neighbor in neighbors: if(neighbor not in quarantineKeepNeighbors): graph.remove_edge(node, neighbor) return graph
[docs] class RandomSingleGraphGenerator(GraphGenerator): """Graph generator producing a single-layer random contact graph. Builds one Barabasi-Albert graph (with exponential degree thinning) and assigns each edge a truncated-normal weight in ``(0, 1)``. The resulting graph has no layer structure (it collapses all contacts into a single layer). Args: num_nodes (int): Number of nodes. Defaults to ``10000``. **kwargs: Additional keyword arguments forwarded to :class:`GraphGenerator`. """ def __init__(self, num_nodes=10000, **kwargs): super().__init__(**kwargs) self.num_nodes = num_nodes # generate random connections baseGraph = nx.barabasi_albert_graph(n=num_nodes, m=9) FG = custom_exponential_graph(baseGraph, scale=100) # list_of_zeroes = [ n for n, d in FG.degree() if d == 0 ] # if list_of_zeroes != []: # print('OMG: ', list_of_zeroes) # generate random weights from trunc norm lower, upper = 0, 1 mu, sigma = 0.7, 0.3 iii = 0 for (u, v, d) in FG.edges(data=True): iii += 1 FG[u][v]['weight'] = stats.truncnorm.rvs( (lower - mu) / sigma, (upper - mu) / sigma, loc=mu, scale=sigma) print(iii, 'Edges') self.G = FG
[docs] class RandomGraphGenerator(GraphGenerator): """Graph generator producing independent random contact graphs per layer. For each of the 14 default layers a separate Barabasi-Albert graph is generated (with exponential degree thinning) and each edge is assigned a truncated-normal weight in ``(0, 1)`` with mean ``0.7`` and std-dev ``0.3``. Layer-level transmission probabilities are also drawn from the same truncated-normal distribution. All per-layer graphs are merged into the shared MultiGraph ``self.G``. The resulting graph has a mean degree of approximately 13 per layer. Args: num_nodes (int): Number of nodes in each per-layer graph. Defaults to ``10000``. **kwargs: Additional keyword arguments forwarded to :class:`GraphGenerator`. """ def __init__(self, num_nodes=10000, **kwargs): super().__init__(**kwargs) self.num_nodes = num_nodes i = 0 for l in self.layer_names: # generate random connections baseGraph = nx.barabasi_albert_graph(n=num_nodes, m=9) FG = custom_exponential_graph(baseGraph, scale=100) # generate random weights from trunc norm lower, upper = 0, 1 mu, sigma = 0.7, 0.3 for (u, v, d) in FG.edges(data=True): FG[u][v]['weight'] = stats.truncnorm.rvs( (lower - mu) / sigma, (upper - mu) / sigma, loc=mu, scale=sigma) FG[u][v]['type'] = l # other params of the graph FG.graph['layer_name'] = l self.G.graph['layer_probs'][i] = stats.truncnorm.rvs( (lower - mu) / sigma, (upper - mu) / sigma, loc=mu, scale=sigma) FG.graph['layer_prob'] = self.G.graph['layer_probs'][i] self.Graphs[l] = FG i = i + 1 # dot = nx.nx_pydot.to_pydot(FG) # print(dot) for l in self.layer_names: self.G.add_edges_from(self.Graphs[l].edges(data=True))
[docs] def as_dict_of_graphs(self): """Return the pre-built per-layer graph dictionary. Overrides :meth:`GraphGenerator.as_dict_of_graphs` to return the dictionary built during ``__init__`` rather than rebuilding it. Returns: dict[str, networkx.Graph]: Mapping of layer name to its single-layer graph. """ return self.Graphs
[docs] class PickleGraphGenerator(GraphGenerator): """Graph generator that restores a graph from a NetworkX pickle file. Args: path_to_pickle (str): Path to the pickle file containing a previously serialized NetworkX graph. Defaults to ``'graph.pickle'``. **kwardgs: Additional keyword arguments (currently unused). """ def __init__(self, path_to_pickle='graph.pickle', **kwardgs): self.read_pickle(path_to_pickle)
[docs] class CSVGraphGenerator(GraphGenerator): """Graph generator that builds a multi-layer graph from CSV files. Reads nodes, edges and layer definitions from three separate CSV files and populates the underlying NetworkX MultiGraph. Duplicate edges (same ``type``, ``subtype``, endpoint pair and weight) are removed in debug mode. Class Attributes: layer_names (list): Starts empty; populated from the layers CSV. layer_probs (list): Starts empty; populated from the layers CSV. Args: path_to_nodes (str): Path to the nodes CSV file. Defaults to ``'nodes.csv'``. path_to_edges (str): Path to the edges CSV file. Defaults to ``'edges.csv'``. Required columns: ``vertex1``, ``vertex2``, ``type``, ``subtype``, ``weight``. path_to_layers (str): Path to the layer-definition CSV file. Defaults to ``'etypes.csv'``. Required columns: ``id``, ``name``, ``weight``. **kwargs: Additional keyword arguments forwarded to :class:`GraphGenerator`. """ layer_names = [] layer_probs = [] def __init__(self, path_to_nodes='nodes.csv', path_to_edges='edges.csv', path_to_layers='etypes.csv', **kwargs): super().__init__(**kwargs) csv_hacking = {'na_values': 'undef', 'skipinitialspace': True} nodes = pd.read_csv(path_to_nodes, **csv_hacking) edges = pd.read_csv(path_to_edges, **csv_hacking) layers = pd.read_csv(path_to_layers, **csv_hacking) indexNames = edges[edges['vertex1'] == edges['vertex2']].index if len(indexNames): print(f"Warning: dropping self edges!!!! {indexNames}") # print(edges[edges['vertex1'] == edges['vertex2']]) edges.drop(indexNames, inplace=True) # print(layers) # fill the layers # layer_names = tuple(zip(layers.loc('id'), layers.loc('id2'))) layers_to_add = layers.to_dict('list') self.layer_names = layers_to_add['name'] # print(layers_names) self.layer_probs = layers_to_add['weight'] self.layer_ids = layers_to_add['id'] self.G.graph['layer_names'] = self.layer_names self.G.graph['layer_probs'] = self.layer_probs # fill the nodes nodes_to_add = nodes.to_dict('records') idx_s = list(range(0, len(nodes_to_add))) self.G.add_nodes_from(zip(idx_s, nodes_to_add)) # fill the edges # get rid of duplicate edges if __debug__: # fill the edges edges["e"] = edges.apply( lambda row: ( (int(row.vertex1), int(row.vertex2)) if int(row.vertex1) < int(row.vertex2) else (int(row.vertex2), int(row.vertex1)) ), axis=1 ) edges.drop_duplicates(inplace=True, subset=[ "type", "subtype", "e", "weight"]) edges.drop(columns=["e"], inplace=True) edges_to_add = edges.to_dict('list') froms = edges_to_add['vertex1'] tos = edges_to_add['vertex2'] datas = [{ k: v for k, v in d.items() if k != 'vertex1' and k != 'vertex2' } for d in edges.to_dict('records')] self.G.add_edges_from(zip(froms, tos, datas))
[docs] def __str__(self): """Return a human-readable string listing all edges with their data. Returns: str: Newline-separated representation of every edge in ``self.G`` including edge attribute dictionaries. """ return "\n".join([str(e) for e in self.G.edges(data=True)])