Source code for engine_m

"""Multi-layer-graph sequential engine (EngineM).

This module defines :class:`EngineM`, which extends
:class:`~models.engine_sequential.SequentialEngine` to work directly with a
multi-layer graph object instead of a sparse adjacency matrix.  Edge-level
transmission probabilities are computed using the graph's own edge-probability
and edge-intensity queries, taking into account family vs. non-family contact
layers and asymptomatic infectiousness reduction.
"""

import pandas as pd
import numpy as np
import numpy_indexed as npi
import scipy as scipy
import scipy.integrate
import networkx as nx
import time
from operator import itemgetter

from utils.history_utils import TimeSeries, TransitionHistory
from models.engine_sequential import SequentialEngine
# from extended_network_model import STATES as s
from utils.sparse_utils import prop_of_row


[docs] class STATES(): """State codes for :class:`EngineM` (mirrors :class:`~models.engine_sequential.STATES`). Attributes: S (int): Susceptible. S_s (int): Susceptible with false symptoms. E (int): Exposed. I_n (int): Infectious asymptomatic (non-symptomatic track). I_a (int): Infectious pre-symptomatic. I_s (int): Infectious symptomatic. I_ds (int): Infectious symptomatic, detected. J_s (int): Post-infectious symptomatic. J_n (int): Post-infectious asymptomatic. E_d (int): Exposed, detected. I_da (int): Infectious pre-symptomatic, detected. I_dn (int): Infectious asymptomatic, detected. J_ds (int): Post-infectious symptomatic, detected. J_dn (int): Post-infectious asymptomatic, detected. R_d (int): Recovered, detected. R_u (int): Recovered, undetected. D_d (int): Dead, detected. D_u (int): Dead, undetected. """ S = 0 S_s = 1 E = 2 I_n = 3 I_a = 4 I_s = 5 I_ds = 6 J_s = 11 J_n = 12 E_d = 13 I_da = 14 I_dn = 15 J_ds = 16 J_dn = 17 R_d = 7 R_u = 8 D_d = 9 D_u = 10 pass
def _searchsorted2d(a, b): """Vectorised row-wise ``numpy.searchsorted`` (see engine_sequential for details). Args: a (numpy.ndarray): 2-D sorted array of shape ``(m, n)``. b (numpy.ndarray): Query values, shape ``(m,)`` or ``(m, 1)``. Returns: numpy.ndarray: 1-D insertion-index array of shape ``(m,)``. """ m, n = a.shape max_num = np.maximum(a.max() - a.min(), b.max() - b.min()) + 1 r = max_num * np.arange(a.shape[0])[:, None] p = np.searchsorted((a+r).ravel(), (b+r).ravel(), side="right") return p - n*np.arange(m)
[docs] class EngineM(SequentialEngine): """Multi-layer-graph daily-step engine (the production engine for Model-M). Extends :class:`~models.engine_sequential.SequentialEngine` with: * Multi-layer graph support via ``self.graph`` (no sparse adjacency matrix). * Detailed per-edge transmission computation in :meth:`prob_of_contact`, accounting for edge type (family / class / pub / super-spreader), edge intensity, and asymptomatic infectiousness reduction. This engine is used by the TGM (Task Group Model) model variant. """
[docs] def update_graph(self, new_G): """Store the multi-layer graph object and update the node count. Unlike the parent implementation, this method does not build a sparse adjacency matrix; the engine queries the graph object directly. Args: new_G: A multi-layer graph object exposing ``num_nodes`` and graph-query methods (e.g. ``get_edges``, ``get_edges_probs``). """ self.G = new_G # just for backward compability self.graph = new_G self.num_nodes = self.graph.num_nodes
# print(f"DBD: graph udpate {self.graph}")
[docs] def node_degrees(self, Amat): """Not implemented: EngineM uses the graph object directly. Raises: NotImplementedError: Always. """ raise NotImplementedError("We use the graph directly, not matrix.")
[docs] def prob_of_contact(self, source_states, source_candidate_states, dest_states, dest_candidate_states, beta, beta_in_family): """Compute per-node exposure probability using the multi-layer graph. Selects active edges by flipping per-edge coins (edge probability from the graph), then determines which active edges connect nodes in the relevant source/destination states, and finally computes the compound probability that each source node is *not* infected (product over independent contacts) and returns ``1 - P(no infection)``. Distinguishes between family-type edges (using *beta_in_family* / ``self.beta_A_in_family``) and other edges (using *beta* / ``self.beta_A``), and reduces transmission rate for asymptomatic infectious nodes. Args: source_states (list of int): States that can become exposed. source_candidate_states (list of int): Candidate source states for edge selection (superset of *source_states*). dest_states (list of int): Infectious states. dest_candidate_states (list of int): Candidate destination states for edge selection (superset of *dest_states*). beta (numpy.ndarray): Per-node baseline transmission rate for non-family contacts, shape ``(num_nodes, 1)``. beta_in_family (numpy.ndarray): Per-node baseline transmission rate for family contacts, shape ``(num_nodes, 1)``. Returns: numpy.ndarray: Per-node exposure probability, shape ``(num_nodes, 1)``. """ assert type(dest_states) == list and type(source_states) == list # 1. select active edges # candidates for active edges are edges between source_candidate_states and dest_candidate_states # source (the one who is going to be infected) # dest (the one who can offer infection) s = time.time() source_candidate_flags = self.memberships[source_candidate_states, :, :].reshape( len(source_candidate_states), self.num_nodes).sum(axis=0) # source_candidate_indices = source_candidate_flags.nonzero()[0] dest_candidate_flags = self.memberships[dest_candidate_states, :, :].reshape( len(dest_candidate_states), self.num_nodes).sum(axis=0) # dest_candidate_indices = dest_candidate_flags.nonzero()[0] e = time.time() print("Create flags", e-s) s = time.time() possibly_active_edges, possibly_active_edges_dirs = self.graph.get_edges( source_candidate_flags, dest_candidate_flags ) num_possibly_active_edges = len(possibly_active_edges) e = time.time() print("Select possibly active", e-s) if self.t == 6: print("DBG SUPERSPREAD possibly active edges ", num_possibly_active_edges) s = time.time() if num_possibly_active_edges == 0: return np.zeros((self.num_nodes, 1)) # for each possibly active edge flip coin r = np.random.rand(num_possibly_active_edges) e = time.time() print("Random numbers: ", e-s) s = time.time() # edges probs p = self.graph.get_edges_probs(possibly_active_edges) e = time.time() print("Get intensities:", e-s) s = time.time() active_indices = (r < p).nonzero()[0] num_active_edges = len(active_indices) if num_active_edges == 0: return np.zeros((self.num_nodes, 1)) if self.t == 6: print("DBG SUPERSPREAD really active edges ", num_active_edges) # select active edges # if num_active_edges == 1: # active_edges = [possibly_active_edges[active_indices[0]]] # active_edges_dirs = [possibly_active_edges_dirs[active_indices[0]]] # else: # active_edges = list(itemgetter(*active_indices) # (possibly_active_edges)) # active_edges_dirs = list(itemgetter( # *active_indices)(possibly_active_edges_dirs)) active_edges = possibly_active_edges[active_indices] active_edges_dirs = possibly_active_edges_dirs[active_indices] e = time.time() print("Select active edges:", e-s) s = time.time() # get source and dest nodes for active edges # source and dest met today, dest is possibly infectious, source was possibly infected source_nodes, dest_nodes = self.graph.get_edges_nodes( active_edges, active_edges_dirs) # add to contact_history (infectious node goes first!!!) contact_indices = list(zip(dest_nodes, source_nodes, active_edges)) self.contact_history.append(contact_indices) if self.t == 6: contact_numbers = {} for e in active_edges: layer_number = self.graph.get_layer_for_edge(e) contact_numbers[layer_number] = contact_numbers.get( layer_number, 0) + 1 print("DBG contact numbers ", ", ".join([f"{layer_type}:{number}" for layer_type, number in contact_numbers.items()]) ) # print("Potkali se u kolina:", contact_indices) print("Todays contact num:", len(contact_indices)) e = time.time() print("Archive active edges:", e-s) # restrict the selection to only relevant states # (ie. candidates can be both E and I, relevant are only I) # candidates are those who will be possibly relevant in future s = time.time() dest_flags = self.memberships[dest_states, :, :].reshape( len(dest_states), self.num_nodes).sum(axis=0) source_flags = self.memberships[source_states, :, :].reshape( len(source_states), self.num_nodes).sum(axis=0) relevant_edges, relevant_edges_dirs = self.graph.get_edges( source_flags, dest_flags) if len(relevant_edges) == 0: return np.zeros((self.num_nodes, 1)) # get intersection active_relevant_edges = np.intersect1d(active_edges, relevant_edges) if len(active_relevant_edges) == 0: return np.zeros((self.num_nodes, 1)) # print(active_relevant_edges.shape) # print(relevant_edges.shape) # print((active_relevant_edges[:, np.newaxis] == relevant_edges).shape) try: # this causes exceptin, but only sometimes ??? # where_indices = ( # active_relevant_edges[:, np.newaxis] == relevant_edges).nonzero()[1] # lets try npi instead where_indices = npi.indices(relevant_edges, active_relevant_edges) except AttributeError as e: print(e) print("Lucky we are ...") print(active_relevant_edges.shape) np.save("active_relevant_edges", active_relevant_edges) print(relevant_edges.shape) np.save("relevant_edges", relevant_edges) print(active_relevant_edges[:, np.newaxis] == relevant_edges) exit() # print(where_indices, len(where_indices)) # always one index! (sources and dest must be disjunct) active_relevant_edges_dirs = relevant_edges_dirs[where_indices] e = time.time() print("Get relevant active edges:", e-s) s = time.time() intensities = self.graph.get_edges_intensities( active_relevant_edges).reshape(-1, 1) relevant_sources, relevant_dests = self.graph.get_edges_nodes( active_relevant_edges, active_relevant_edges_dirs) is_family_edge = self.graph.is_family_edge( active_relevant_edges).reshape(-1, 1) """ pro experiment s hospodama jenom: """ # list of no-masks layers into config if True: is_class_edge = self.graph.is_class_edge( active_relevant_edges).reshape(-1, 1) is_pub_edge = self.graph.is_pub_edge( active_relevant_edges).reshape(-1, 1) is_super_edge = self.graph.is_super_edge( active_relevant_edges).reshape(-1, 1) is_family_edge = np.logical_or.reduce((is_family_edge, is_class_edge, is_super_edge, is_pub_edge)) """ else: is_class_edge = self.graph.is_class_edge( active_relevant_edges).reshape(-1, 1) is_family_edge = np.logical_or(is_family_edge, is_class_edge) """ # assert len(relevant_sources) == len(set(relevant_sources)) # TOD beta - b_ij # new beta depands on the one who is going to be infected # b_intensities = beta[relevant_sources] # b_f_intensities = beta_in_family[relevant_sources] # reduce asymptomatic is_A = ( self.memberships[STATES.I_a][relevant_dests] + self.memberships[STATES.I_n][relevant_dests] + self.memberships[STATES.I_dn][relevant_dests] + self.memberships[STATES.I_da][relevant_dests] ) b_original_intensities = ( beta_in_family[relevant_sources] * (1 - is_A) + self.beta_A_in_family[relevant_sources] * is_A ) b_reduced_intensities = ( beta[relevant_sources] * (1 - is_A) + self.beta_A[relevant_sources] * is_A ) b_intensities = ( b_original_intensities * is_family_edge + b_reduced_intensities * (1 - is_family_edge) ) # print(beta[relevant_sources].shape) # print(is_family_edge) # print(b_intensities.shape) # exit() assert b_intensities.shape == intensities.shape # print(b_intensitites.shape, intensities.shape, # prob_of_no_infection.shape) relevant_sources_unique, unique_indices = np.unique( relevant_sources, return_inverse=True) no_infection = (1 - b_intensities * intensities).ravel() res = np.ones(len(relevant_sources_unique), dtype='float32') for i in range(len(unique_indices)): res[unique_indices[i]] = res[unique_indices[i]]*no_infection[i] prob_of_no_infection = res # prob_of_no_infection2 = np.fromiter((np.prod(no_infection, where=(relevant_sources==v).T) # for v in relevant_sources_unique), dtype='float32') result = np.zeros(self.num_nodes) result[relevant_sources_unique] = 1 - prob_of_no_infection e = time.time() print("Comp probability", e-s) return result.reshape(self.num_nodes, 1)