"""Light-weight graph representation for agent-based network simulations.
This module provides the ``LightGraph`` class, which stores a multi-layer
undirected graph as a collection of NumPy arrays and a compressed sparse
row (CSR) adjacency matrix. It is designed to be considerably faster than
NetworkX for the high-throughput edge queries required by epidemic and
information-spread models.
Typical workflow:
1. Build a ``LightGraph`` from CSV files with :meth:`LightGraph.read_csv`.
2. Pickle the result for repeated simulation runs.
3. Clone per-run state cheaply with :meth:`LightGraph.copy`.
"""
import logging
from operator import iconcat
from functools import reduce
from itertools import chain
import numexpr as ne
import numpy as np
import pandas as pd
from copy import copy
from scipy.sparse import csr_matrix, lil_matrix
import json
# import os
# os.environ["NUMEXPR_MAX_THREADS"] = "272" # to fix numexpr complains
[docs]
def concat_lists(l):
    """Flatten an iterable of iterables into a single list.

    Args:
        l (iterable): Iterable whose elements are themselves iterable
            (typically lists of edge indices).

    Returns:
        list: All elements of the inner iterables, concatenated in order.
    """
    flattened = chain.from_iterable(l)
    return list(flattened)
[docs]
class LightGraph:
    """Graph for agent-based network models stored as NumPy arrays.

    NetworkX proved too slow for the high-throughput edge queries needed by
    large-scale simulations. ``LightGraph`` therefore stores the graph as a
    collection of plain NumPy arrays together with a CSR adjacency matrix
    whose non-zero values are indices into those arrays.

    Typical usage is to load the graph from CSV files once with
    :meth:`read_csv` and then pickle the result for future simulation runs,
    because building from CSV is comparatively slow.

    Nodes carry integer IDs that are user-visible, but internally (and in
    the model) nodes are addressed by their positional index in the
    original nodes file.

    Attributes:
        random_seed (int or None): The seed passed to ``numpy.random.seed``
            at construction time, stored for reproducibility bookkeeping.
        edge_repo: Internal edge repository (populated by :meth:`read_csv`).
        A (scipy.sparse.csr_matrix or None): Adjacency matrix whose non-zero
            entries hold edge-repository keys. ``None`` until
            :meth:`read_csv` has been called.
        is_quarantined (numpy.ndarray or None): Per-node quarantine counter.
            ``None`` until the first quarantine operation.

    Args:
        random_seed (int, optional): Seed for NumPy's random number
            generator. ``None`` leaves the global state unchanged.
    """

    def __init__(self, random_seed=None):
        # BUG FIX: the previous truthiness test (`if random_seed:`) silently
        # skipped seeding when the caller passed the perfectly valid seed 0.
        if random_seed is not None:
            np.random.seed(random_seed)
        self.random_seed = random_seed
        self.edge_repo = None
        self.A = None
        self.is_quarantined = None
[docs]
def read_csv(self,
path_to_nodes='p.csv',
path_to_external=None,
path_to_layers=None,
path_to_edges='edges.csv',
path_to_quarantine=None,
path_to_layer_groups=None):
"""Load graph data from CSV files and build internal data structures.
Reads node attributes, edge lists, layer definitions, optional
external nodes, optional quarantine coefficients and optional layer
groups. After this call the graph is ready to use.
Self-loops are silently dropped with a warning.
Args:
path_to_nodes (str): Path to the primary nodes CSV file.
Defaults to ``'p.csv'``.
path_to_external (str, optional): Path to an optional secondary
nodes CSV file (e.g. external / virtual nodes). When
``None`` no external nodes are added.
path_to_layers (str, optional): Path to a CSV file defining
layer IDs, names and weights. When ``None`` a minimal
two-layer default is used (layers 0 and 1).
path_to_edges (str): Path to the edges CSV file.
Defaults to ``'edges.csv'``. Required columns are
``vertex1``, ``vertex2``; optional columns are ``layer``,
``sublayer``, ``probability`` and ``intensity`` (all
back-filled with sensible defaults when absent).
path_to_quarantine (str, optional): Path to a two-column
headerless CSV file mapping layer ID to quarantine
coefficient. When ``None`` quarantine coefs are disabled.
path_to_layer_groups (str, optional): Path to a JSON file
defining named groups of layers. When ``None`` layer groups
are disabled.
"""
csv_hacking = {'na_values': 'undef', 'skipinitialspace': True}
base_nodes = pd.read_csv(
path_to_nodes, **csv_hacking).drop_duplicates().reset_index()
edges = pd.read_csv(path_to_edges, **csv_hacking)
# ensure columns - for backward compatibility with model-M
if "layer" not in edges.columns:
edges["layer"] = 1
if "sublayer" not in edges.columns:
edges["sublayer"] = 0
if "probability" not in edges.columns:
edges["probability"] = 1.0
if "intensity" not in edges.columns:
edges["intensity"] = 1.0
if path_to_layers is None:
layers = pd.DataFrame([{"id": 0, "name": "no_layer", "weight": 1.0},
{"id": 1, "name": "default", "weight": 1.0}])
else:
layers = pd.read_csv(path_to_layers, **csv_hacking)
if path_to_external is not None:
external_nodes = pd.read_csv(path_to_external, **csv_hacking)
else:
external_nodes = pd.DataFrame()
nodes = pd.concat([base_nodes, external_nodes],
ignore_index=True).drop(columns=["index"])
if path_to_quarantine is None:
self.QUARANTINE_COEFS = None
else:
df = pd.read_csv(path_to_quarantine, header=None, index_col=0)
self.QUARANTINE_COEFS = {
i: df.loc[i][1]
for i in df.index
}
if path_to_layer_groups is not None:
with open(path_to_layer_groups, "r") as f:
self.LAYER_GROUPS = json.load(f)
else:
self.LAYER_GROUPS = None
# layer names, ids and weights go to graph
layers_to_add = layers.to_dict('list')
self.layer_ids = layers_to_add['id']
self.layer_name = layers_to_add['name']
self.layer_weights = np.array(layers_to_add['weight'], dtype=float)
# nodes
# select categorical columns
cat_columns = nodes.select_dtypes(['object']).columns
nodes[cat_columns] = nodes[cat_columns].apply(
lambda x: x.astype('category'))
# save codes for backward conversion
self.cat_table = {
col: list(nodes[col].cat.categories)
for col in cat_columns
}
# covert categorical to numbers
nodes[cat_columns] = nodes[cat_columns].apply(
lambda x: x.cat.codes)
# pprint(nodes)
# just test of conversion back
# print(cat_columns)
# for col in list(cat_columns):
# nodes[[col]] = nodes[[col]].apply(
# lambda x: pd.Categorical.from_codes(
# x, categories=cat_table[col])
# )
# pprint(nodes)
for col in nodes.columns:
setattr(self, "nodes_"+col, np.array(nodes[col]))
self.nodes = np.array(nodes.index)
self.num_nodes = len(self.nodes)
self.num_base_nodes = len(base_nodes)
""" if self.num_nodes > 65535:
raise ValueError(
"Number of nodes too high (we are using unit16, change it to unit32 for higher numbers of nodes.")
"""
# self.ignored = set(external_nodes["id"])
# edges
# drop self edges
indexNames = edges[edges['vertex1'] == edges['vertex2']].index
if len(indexNames):
logging.warning("Warning: dropping self edges!!!!")
edges.drop(indexNames, inplace=True)
# fill edges to a graph
n_edges = len(edges)
# edges data"
self.e_types = np.empty(n_edges, dtype="uint32")
self.e_subtypes = np.empty(n_edges, dtype="uint32")
self.e_probs = np.empty(n_edges, dtype="float32")
self.e_intensities = np.empty(n_edges, dtype="float32")
self.e_source = np.empty(n_edges, dtype="uint32")
self.e_dest = np.empty(n_edges, dtype="uint32")
self.e_active = np.ones(n_edges, dtype="bool")
# if value == 2 than is valid, other numbers prob in quarantine
self.e_valid = 2 * np.ones(n_edges, dtype="float32")
# edges repo which will eventually be list of sets and not a dict
self.edges_repo = {
0: []
}
self.edges_directions = {
0: []
}
key = 1
# working matrix
tmpA = lil_matrix((self.num_nodes, self.num_nodes), dtype="uint32")
forward_edge = True
backward_edge = False
id_dict = {self.nodes_id[i]: i for i in range(self.num_nodes)}
# fill data and get indicies
for i, row in enumerate(edges.itertuples()):
self.e_types[i] = row.layer
self.e_subtypes[i] = row.sublayer
self.e_probs[i] = row.probability
self.e_intensities[i] = row.intensity
# if row.vertex1 in self.ignored or row.vertex2 in self.ignored:
# continue
try:
i_row = id_dict[row.vertex1]
i_col = id_dict[row.vertex2]
except IndexError:
print("Node does not exist")
print(row.vertex1, row.vertex2)
print(np.where(self.nodes_id == row.vertex1),
np.where(self.nodes_id == row.vertex2))
exit()
i_row, i_col = min(i_row, i_col), max(i_row, i_col)
self.e_source[i] = i_row
self.e_dest[i] = i_col
if tmpA[i_row, i_col] == 0:
# first edge between (row, col)
self.edges_repo[key], self.edges_directions[key] = [
i], forward_edge
self.edges_repo[key + 1], self.edges_directions[key +
1] = [i], backward_edge
tmpA[i_row, i_col] = key
tmpA[i_col, i_row] = key + 1
key += 2
else:
# add to existing edge list
print("+", end="")
key_forward = tmpA[i_row, i_col]
key_backward = tmpA[i_col, i_row]
self.edges_repo[key_forward].append(i)
assert self.edges_directions[key_forward] == forward_edge
# self.edges_directions[key_forward].append(forward_edge)
self.edges_repo[key_backward].append(i)
# self.edges_directions[key_backward].append(backward_edge)
assert self.edges_directions[key_backward] == backward_edge
if i % 10000 == 0:
print("\nEdges loaded", i)
# create matrix (A[i,j] is an index of edge (i,j) in array of edges)
print("\nConverting lil_matrix A to csr ...", end="")
self.A = csr_matrix(tmpA)
print("level done")
del tmpA
print("Converting edges_repo to list ...", end="")
# data = [None]
# subedges_counts = [0]
# for i_key in range(1, key):
# value_set = self.edges_repo[i_key]
# # if len(value_list) > 1:
# # print(i_key)
# data.append(value_set)
# subedges_counts.append(len(value_set))
# self.edges_repo = data
# the above can be replaced by
self.edges_repo = np.array(
list(self.edges_repo.values()), dtype=object)
subedges_counts = [len(s) for s in self.edges_repo]
# subedges_counts = [len(s) for s in np.nditer(self.edges_repo, flags=['refs_ok'], op_flags=['readonly'])]
print("level done")
print("Converting edges_directions to list ... ", end="")
data = [None]
for i_key in range(1, key):
dir_list = [self.edges_directions[i_key]] * subedges_counts[i_key]
data.append(dir_list)
self.edges_directions = np.array(data, dtype=object)
print("level done")
print("Control check ... ", end="")
for i_key in range(1, key):
assert len(self.edges_repo[i_key]) == len(
self.edges_directions[i_key])
print("ok")
print("Precalculate array of layer weights ... ", end="")
self.e_layer_weight = self.layer_weights[self.e_types]
print("ok")
print("LightGraph is ready to use.")
logging.info(f"Max intensity {self.e_intensities.max()}")
@property
def number_of_nodes(self):
"""Total number of nodes in the graph (base + external).
Returns:
int: Number of nodes.
"""
return self.num_nodes
[docs]
def get_nodes(self, layer):
"""Return all node indices that have at least one edge on the given layer.
Args:
layer (int): Layer ID to filter by.
Returns:
numpy.ndarray: Sorted array of unique node indices that participate
in at least one edge whose layer equals ``layer``.
"""
sources = self.e_source[self.e_types == layer]
dests = self.e_dest[self.e_types == layer]
return np.union1d(sources, dests)
[docs]
def get_edges_nodes(self, edges, edges_dirs):
"""Return the source and destination node indices for a set of edges.
Edges are stored with an arbitrary orientation (smaller index as
source). ``edges_dirs`` corrects for this: when ``True`` the stored
source is the logical source; when ``False`` the stored destination is
the logical source.
Args:
edges (numpy.ndarray): 1-D array of edge indices.
edges_dirs (numpy.ndarray): Boolean array of the same length as
``edges``. ``True`` means *forward* direction (source →
dest); ``False`` means *backward* direction.
Returns:
tuple[numpy.ndarray, numpy.ndarray]: A pair
``(source_nodes, dest_nodes)`` where each element is a 1-D
integer array of node indices aligned with ``edges``.
"""
sources = self.e_source[edges]
dests = self.e_dest[edges]
# sources, dests numpy vectors on nodes
# edges_dirs - bool vector
# if True take source if False take dest
flags = edges_dirs
# print(edges_dirs)
source_nodes = sources * flags + dests * (1 - flags)
dest_nodes = sources * (1 - flags) + dests * flags
return source_nodes, dest_nodes
# def get_edges_subset(self, source_flags, dest_flags):
# active_subset = self.A[source_flags == 1, :][:, dest_flags == 1]
# edge_lists = [self.edges_repo[key] for key in active_subset.data]
# return subset, sum(edge_lists, [])
[docs]
def get_edges(self, source_flags, dest_flags, dirs=True):
"""Return all edges between two sets of nodes.
Args:
source_flags (numpy.ndarray): Binary vector of length
``num_nodes``. A value of ``1`` marks nodes in the first
(source) set.
dest_flags (numpy.ndarray): Binary vector of length
``num_nodes``. A value of ``1`` marks nodes in the second
(destination) set.
dirs (bool): When ``True`` (default) also return direction flags
alongside the edge indices.
Returns:
tuple[numpy.ndarray, numpy.ndarray] or numpy.ndarray:
If ``dirs`` is ``True``, returns ``(edges, directions)`` where
``edges`` is a 1-D integer array of edge indices and
``directions`` is a corresponding boolean array (``True`` =
forward). If ``dirs`` is ``False``, only ``edges`` is returned.
Both arrays are empty when no edges exist between the two sets.
"""
active_subset = self.A[source_flags == 1, :][:, dest_flags == 1]
active_edges_indices = active_subset.data
if len(active_edges_indices) == 0:
return np.array([]), np.array([])
edge_lists = self.edges_repo[active_edges_indices]
result = np.array(concat_lists(edge_lists))
if dirs:
dirs_lists = self.edges_directions[active_edges_indices]
result_dirs = np.array(concat_lists(dirs_lists), dtype=bool)
return result, result_dirs
return result
[docs]
def get_nodes_edges(self, nodes):
"""Return all edge indices adjacent to a set of nodes.
Args:
nodes (array-like): Node indices whose incident edges are
requested.
Returns:
list: Flat list of edge indices (possibly with duplicates when
multiple nodes share an edge). Returns an empty list when
``nodes`` is empty or when none of the nodes have any edges.
"""
if len(nodes) == 0:
return []
active_subset = self.A[nodes]
active_edges_indices = active_subset.data
if len(active_edges_indices) == 0:
logging.warning(f"Warning: no edges for nodes {nodes}")
return []
edge_lists = self.edges_repo[active_edges_indices]
result = concat_lists(edge_lists)
return result
[docs]
def get_nodes_edges_on_layers(self, nodes, layers):
"""Return incident edge indices for a set of nodes filtered by layer.
Identical to :meth:`get_nodes_edges` but restricts the result to
edges whose layer ID appears in ``layers``.
Args:
nodes (array-like): Node indices whose incident edges are
requested.
layers (array-like): Allowed layer IDs. Only edges on one of
these layers are included in the result.
Returns:
list: Flat list of edge indices that belong to one of the
specified layers. Returns an empty list when no matching edges
are found.
"""
edges = self.get_nodes_edges(nodes)
if len(edges) == 0:
return edges
edges_layers = self.e_types[edges]
selected_edges = np.isin(edges_layers, layers)
# todo: list or convert to numpy array?
return [x
for i, x in enumerate(edges)
if selected_edges[i]
]
[docs]
def switch_off_edges(self, edges):
"""Permanently deactivate a list of edges.
Sets ``e_active`` to ``False`` for each edge in ``edges``. This
operation is independent of quarantine state and sets the effective
transmission probability to zero regardless of other factors.
Args:
edges (list): List of edge indices to deactivate.
"""
assert type(edges) == list
self.e_active[edges] = False
[docs]
def switch_on_edges(self, edges):
"""Reactivate a list of previously deactivated edges.
Reverses :meth:`switch_off_edges` by setting ``e_active`` back to
``True``. Does not interact with quarantine state.
Args:
edges (list): List of edge indices to reactivate.
"""
assert type(edges) == list
self.e_active[edges] = True
[docs]
def get_all_edges_probs(self):
    """Effective transmission probability of every edge.

    Combines, per edge, the raw probability (``e_probs``), the quarantine
    override (``e_valid``), the active flag (``e_active``) and the cached
    layer weight. Evaluated with ``numexpr`` for speed.

    ``e_valid == 2`` means "no override": the result is
    ``e_probs * layer_weight``. Any other ``e_valid`` value replaces
    ``e_probs`` entirely (quarantine override). Inactive edges yield 0.

    Returns:
        numpy.ndarray: Float array of length ``n_edges`` with values
        in ``[0, 1]``.
    """
    local_vars = {
        'active': self.e_active,
        'e_probs': self.e_probs,
        'e_valid': self.e_valid,
        'weights': self.e_layer_weight,
    }
    return ne.evaluate("active * (e_probs * (e_valid == 2) + e_valid * (e_valid != 2)) * weights",
                       local_dict=local_vars)
[docs]
def get_edges_probs(self, edges):
"""Compute effective transmission probabilities for a subset of edges.
Applies the same logic as :meth:`get_all_edges_probs` but only for
the requested edge indices.
Args:
edges (numpy.ndarray): 1-D integer array of edge indices.
Returns:
numpy.ndarray: Float array of the same length as ``edges`` with
effective transmission probabilities in ``[0, 1]``.
"""
assert type(edges) == np.ndarray
layer_types = self.e_types[edges]
probs = self.e_probs[edges] * (self.e_valid[edges] == 2)
probs += self.e_valid[edges] * (self.e_valid[edges] != 2)
weights = self.e_layer_weight[edges]
return self.e_active[edges] * probs * weights
[docs]
def get_edges_intensities(self, edges):
"""Return the intensity values for a subset of edges.
Args:
edges (numpy.ndarray): 1-D integer array of edge indices.
Returns:
numpy.ndarray: Float array of the same length as ``edges``
containing the raw intensity value (``e_intensities``) for each
requested edge.
"""
assert type(edges) == np.ndarray
return self.e_intensities[edges]
# these methods work only with hodonin's layers
[docs]
def is_super_edge(self, edges):
"""Identify super-spreader edges (Hodonin layer convention).
Super-spreader edges are defined as those with layer ID >= 33.
Args:
edges (numpy.ndarray): 1-D integer array of edge indices.
Returns:
numpy.ndarray: Boolean array of the same length as ``edges``,
``True`` where the edge is a super-spreader edge.
"""
assert type(edges) == np.ndarray
etypes = self.e_types[edges]
return etypes >= 33
[docs]
def is_family_edge(self, edges):
"""Identify household/family edges (Hodonin layer convention).
Family edges are those with layer ID 1 or 2.
Args:
edges (numpy.ndarray): 1-D integer array of edge indices.
Returns:
numpy.ndarray: Boolean array of the same length as ``edges``,
``True`` where the edge belongs to a family layer.
"""
assert type(edges) == np.ndarray
etypes = self.e_types[edges]
return np.logical_or(etypes == 1, etypes == 2)
[docs]
def is_class_edge(self, edges, all=True):
"""Identify school/class edges (Hodonin layer convention).
School edges have layer IDs 4–11. The ``all`` flag controls whether
all school types are included or only lower school levels (layers
4–7, excluding high school and higher elementary).
Args:
edges (numpy.ndarray): 1-D integer array of edge indices.
all (bool): When ``True`` (default), include all school layers
(4–11). When ``False``, restrict to layers 4–7.
Returns:
numpy.ndarray: Boolean array of the same length as ``edges``,
``True`` where the edge is a class/school edge.
"""
assert type(edges) == np.ndarray
etypes = self.e_types[edges]
if all:
# all schools
return np.logical_and(etypes >= 4, etypes <= 11)
else:
# excepty high and higher elementary
return np.logical_and(etypes >= 4, etypes <= 7)
[docs]
def is_pub_edge(self, edges):
"""Identify pub/bar edges (Hodonin layer convention).
Pub edges have layer IDs 20, 28 or 29.
Args:
edges (numpy.ndarray): 1-D integer array of edge indices.
Returns:
numpy.ndarray: Boolean array of the same length as ``edges``,
``True`` where the edge represents a pub/bar contact.
"""
assert type(edges) == np.ndarray
etypes = self.e_types[edges]
# pubs
return np.logical_or(etypes == 20,
np.logical_or(etypes == 28, etypes == 29))
[docs]
def modify_layers_for_nodes(self, node_id_list, what_by_what):
"""Apply quarantine by reducing edge probabilities for given nodes.
For every edge incident to a node in ``node_id_list`` that is not
already quarantined (``e_valid == 2``), the effective probability is
replaced by ``e_probs * coef`` (clipped to ``[0, 1]``), where
``coef`` is the coefficient for that edge's layer taken from
``what_by_what``.
The per-node quarantine counter ``is_quarantined`` is incremented so
that edges shared with still-quarantined nodes are not accidentally
restored by a partial release.
Args:
node_id_list (array-like): Positional indices of nodes to
quarantine.
what_by_what (dict): Mapping of ``{layer_id: coefficient}``
specifying the multiplicative reduction per layer. Must not
be empty.
Raises:
ValueError: If ``what_by_what`` is falsy (empty or ``None``).
"""
if self.is_quarantined is None:
self.is_quarantined = np.zeros(self.number_of_nodes, dtype=int)
self.is_quarantined[node_id_list] += 1
if not what_by_what:
raise ValueError("what_by_what missing or empty")
relevant_edges = np.unique(self.get_nodes_edges(node_id_list))
if len(relevant_edges) == 0:
return
# select edges to change (those who are not in quarantine)
valid = self.e_valid[relevant_edges]
relevant_edges = relevant_edges[valid == 2]
edges_types = self.e_types[relevant_edges]
logging.info(f"DBG edges that goes to quarantinexs {len(relevant_edges)}")
# print(edges_types)
for layer_type, coef in what_by_what.items():
# print(layer_type)
edges_on_this_layer = relevant_edges[edges_types == layer_type]
# modify their probabilities
self.e_valid[edges_on_this_layer] = self.e_probs[edges_on_this_layer] * coef
# use out in clip to work inplace
np.clip(self.e_valid[edges_on_this_layer], 0.0,
1.0, out=self.e_valid[edges_on_this_layer])
[docs]
def recover_edges_for_nodes(self, release):
"""Restore original edge probabilities when nodes leave quarantine.
Decrements the quarantine counter for each node in ``release``.
For nodes whose counter reaches zero (i.e. no longer quarantined on
any count), edges that connect two fully released nodes have their
``e_valid`` flag reset to ``2`` (fully active). Edges that still
touch a quarantined node are left unchanged.
Args:
release (numpy.ndarray): Positional indices of nodes being
released from quarantine.
"""
self.is_quarantined[release] -= 1
assert np.all(self.is_quarantined >= 0)
no_quara = self.is_quarantined[release] == 0
release = release[no_quara]
if len(release) == 0:
return
relevant_edges = np.unique(self.get_nodes_edges(release))
if len(relevant_edges) == 0:
logging.warning("Warning: recovering nodes with no edges")
return
# from source and dest nodes select those who are not in quarantine
source_nodes = self.e_source[relevant_edges]
dest_nodes = self.e_dest[relevant_edges]
is_quarrantined_source = self.is_quarantined[source_nodes]
is_quarrantined_dest = self.is_quarantined[dest_nodes]
# recover only edges where both nodes are free
relevant_edges = relevant_edges[np.logical_not(
np.logical_or(is_quarrantined_source, is_quarrantined_dest))]
# recover probs
self.e_valid[relevant_edges] = 2
[docs]
def final_adjacency_matrix(self):
"""Return self for backward compatibility with older graph interfaces.
Earlier versions of the model expected a separate adjacency-matrix
object. This shim allows code written against that interface to work
without modification.
Returns:
LightGraph: The graph instance itself.
"""
return self
[docs]
def get_layer_for_edge(self, e):
"""Return the layer ID for a single edge.
Args:
e (int): Edge index.
Returns:
int: Layer ID of edge ``e``.
"""
return self.e_types[e]
[docs]
def set_layer_weights(self, weights):
"""Replace the layer-weight vector and rebuild per-edge weight cache.
The per-edge weight array ``e_layer_weight`` is recomputed in-place
after the update so that subsequent probability calculations use the
new weights immediately.
Args:
weights (array-like): New weight values indexed by layer ID.
Must be the same length as the current ``layer_weights``
array.
"""
logging.info(f"DBG Updating layer weights {weights}")
np.copyto(self.layer_weights, weights)
# print(self.layer_weights[i])
self.e_layer_weight = self.layer_weights[self.e_types]
logging.info(f"DBG new weigths {self.layer_weights}")
[docs]
def close_layers(self, list_of_layers, coefs=None):
"""Set the weight of named layers to zero (or a custom coefficient).
Useful for simulating non-pharmaceutical interventions such as school
or workplace closures.
Args:
list_of_layers (list[str]): Names of layers to close, matched
against ``self.layer_name``.
coefs (list[float], optional): Per-layer replacement weights.
When provided, ``coefs[idx]`` is used instead of ``0`` for
the layer at position ``idx`` in ``list_of_layers``. When
``None`` all listed layers are set to weight ``0``.
"""
print(f"Closing {list_of_layers}")
for idx, name in enumerate(list_of_layers):
i = self.layer_name.index(name)
self.layer_weights[i] = 0 if not coefs else coefs[idx]
print(self.layer_weights)
[docs]
def copy(self):
"""Return an optimised partial copy of the graph for a new simulation run.
The vast majority of graph fields (topology, node attributes, edge
metadata) are immutable between runs and are therefore shared via a
shallow copy. Only the mutable, run-specific arrays—``e_valid``,
``layer_weights``, ``e_active`` and ``e_layer_weight``—are deep-
copied and reset to their default values. The quarantine tracker
``is_quarantined`` is reset to ``None``.
Returns:
LightGraph: A new ``LightGraph`` instance that shares topology
with the original but has independent mutable state.
"""
heavy_fields = ['e_valid', 'layer_weights',
'e_active', 'e_layer_weight']
new = copy(self)
for key in heavy_fields:
field = getattr(self, key)
setattr(new, key, field.copy())
new.is_quarantined = None
new.e_valid.fill(2)
new.e_active.fill(True)
return new