"""Information-diffusion and rumour-spreading model classes.
This module defines several simulation models that reuse the
:class:`~models.simulation_engine.SimulationEngine` infrastructure for
information / opinion spreading rather than epidemic disease:
* :class:`STATES` / :class:`Tipping` – state-code enumerations.
* :class:`RumourModel` – basic SIR rumour-spreading model.
* :class:`RumourModelInfo` – extended rumour model with time-varying
transmission and event-driven boost.
* :class:`InfoSIRModel` – SIR model driven by graph edge probabilities.
* :class:`InfoTippingModel` – threshold-based (tipping-point) adoption model.
"""
import json
import numpy as np
import pandas as pd
import time
import logging
from models.simulation_engine import SimulationEngine
from utils.random_utils import RandomDuration
from utils.random_utils import gen_tuple
from utils.history_utils import TimeSeries, TransitionHistory, ShortListSeries
from utils.global_configs import monitor
import utils.global_configs as global_configs
[docs]
class STATES():
"""Simple SIR state codes for information-diffusion models.
Attributes:
S (int): Susceptible (uninformed).
I (int): Infected / informed (spreading the rumour).
R (int): Recovered (stopped spreading).
EXT (int): External node.
"""
S = 0
I = 1
R = 2
EXT = 10
[docs]
class Tipping():
"""State codes for the threshold-based tipping-point model.
Attributes:
S (int): Susceptible (not yet adopted).
ACTIVE (int): Active (adopted / tipped).
EXT (int): External node.
"""
S = 0
ACTIVE = 1
EXT = 10
[docs]
class RumourModel(SimulationEngine):
"""Basic SIR rumour-spreading model on a contact network.
Agents transition S → I when a neighbour is I (with rate ``lambda0``),
and I → R after a fixed duration ``I_duration``.
Inherits the plan-based stepping logic from
:class:`~models.simulation_engine.SimulationEngine`.
"""
states = [
STATES.S,
STATES.I,
STATES.R,
STATES.EXT
]
num_states = len(states)
state_str_dict = {
STATES.S: "S",
STATES.I: "I",
STATES.R: "R",
STATES.EXT: "EXT"
}
ext_code = STATES.EXT
transitions = [
(STATES.S, STATES.I),
(STATES.I, STATES.R)
]
num_transitions = len(transitions)
final_states = [
STATES.R
]
invisible_states = [
STATES.EXT
]
unstable_states = [
STATES.I
]
fixed_model_parameters = {
"I_duration": (1, "time in the I state"),
"beta": (0, "rate of transmission (exposure)")
}
[docs]
def inicialization(self):
"""Delegate to parent initialiser (no extra setup needed)."""
super().inicialization()
[docs]
def setup_series_and_time_keeping(self):
"""Delegate to parent time-series setup."""
super().setup_series_and_time_keeping()
[docs]
def states_and_counts_init(self, ext_nodes=None, ext_code=None):
"""Initialise counts and flag S nodes for daily checks.
Args:
ext_nodes (int, optional): Number of external nodes.
ext_code (int, optional): State code for external nodes.
"""
super().states_and_counts_init(ext_nodes, ext_code)
# need_check - state that needs regular checkup
self.need_check = self.memberships[STATES.S]
self.update_plan(np.ones(self.num_nodes, dtype=bool))
[docs]
def daily_update(self, nodes):
"""Perform daily rumour-spreading check for susceptible nodes.
Calls :meth:`prob_of_contact` and schedules exposed S nodes to move
to I on the next day.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes needing a check.
"""
# S
target_nodes = self._get_target_nodes(nodes, STATES.S)
# try infection
exposed_nodes = self.prob_of_contact(
STATES.S,
STATES.I,
self.lambda0,
).flatten()
self.time_to_go[exposed_nodes] = 1
self.state_to_go[exposed_nodes] = STATES.I
[docs]
def update_plan(self, nodes):
"""Set transition plans for nodes that just changed state.
* S: no scheduled transition; flagged for daily checks.
* I: scheduled to move to R after ``I_duration`` days.
* R: no further transition.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes to update.
"""
# STATES.S: "S",
target_nodes = self._get_target_nodes(nodes, STATES.S)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = STATES.S
self.need_check[target_nodes] = True
# STATES.I: "I"
target_nodes = self._get_target_nodes(nodes, STATES.I)
self.time_to_go[target_nodes] = self.I_duration
self.state_to_go[target_nodes] = STATES.R
self.need_check[target_nodes] = False
# STATES.R: "R",
target_nodes = self._get_target_nodes(nodes, STATES.R)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = -1
self.need_check[target_nodes] = False
[docs]
def run_iteration(self):
"""Delegate to parent run_iteration (no extra logic needed)."""
super().run_iteration()
[docs]
class RumourModelInfo(RumourModel):
"""Extended rumour model with time-decaying transmission and event boost.
Extends :class:`RumourModel` with:
* A time-decaying per-node transmission rate:
``lambda = lambda0 * exp(-scale * time_in_I)``.
* An optional event at time ``t_event`` that adds a temporary boost to
lambda: ``lambda += event_boost * exp(-decay * (t - t_event))``.
* A stochastic I → R transition each day (probability ``beta_duration``).
The I state is maintained indefinitely until a daily Bernoulli trial
(probability ``beta_duration``) triggers recovery.
"""
fixed_model_parameters = {
"lambda0": (0.001, "base rate of transmission"),
"scale": (1.0, "scaling factor for the transmission probability"),
"beta_duration": (0.02,"probability of ending the I state at each step"),
#I_duration": (48, "time in the I state"),
"t_event": (95, "time of the event that increases the spread"),
"event_boost": (0.02, "boost in transmission rate after event"),
"decay": (0.06, "decay rate of the increased spread after event"),
"init_I": (12, "initial number of infected nodes")
}
[docs]
def daily_update(self, nodes):
"""Perform daily spreading check and stochastic I → R transition.
Calls the parent :meth:`RumourModel.daily_update`, then for each I
node draws a Bernoulli coin (probability ``beta_duration``) to decide
if it recovers today.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes needing a check.
"""
super().daily_update(nodes)
nodes_in_I = self._get_target_nodes(nodes, STATES.I)
r = np.random.rand(len(nodes_in_I))
end_I = r < self.beta_duration
self.time_to_go[nodes_in_I] = -1
self.time_to_go[end_I] = 1
self.state_to_go[end_I] = STATES.R
[docs]
def update_plan(self, nodes):
"""Set transition plans for RumourModelInfo nodes.
* S: no scheduled transition; flagged for daily checks.
* I: no fixed deadline; flagged for daily checks (recovery is
decided stochastically each day).
* R: no further transition.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes to update.
"""
# STATES.S: "S",
target_nodes = self._get_target_nodes(nodes, STATES.S)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = STATES.S
self.need_check[target_nodes] = True
# STATES.I: "I"
target_nodes = self._get_target_nodes(nodes, STATES.I)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = STATES.I
self.need_check[target_nodes] = True
# STATES.R: "R",
target_nodes = self._get_target_nodes(nodes, STATES.R)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = -1
self.need_check[target_nodes] = False
[docs]
class InfoSIRModel(SimulationEngine):
"""SIR information-spreading model driven by graph edge probabilities.
Uses the full edge-probability machinery from the graph object (rather than
the simple per-edge coin flip of :class:`RumourModel`) to activate contacts
each day. The effective transmission rate per active edge is given by
the per-node ``beta`` parameter multiplied by the edge intensity.
Compartments: S (uninformed), I (spreading), R (stopped), EXT (external).
"""
states = [
STATES.S,
STATES.I,
STATES.R,
STATES.EXT
]
num_states = len(states)
state_str_dict = {
STATES.S: "S",
STATES.I: "I",
STATES.R: "R",
STATES.EXT: "EXT"
}
ext_code = STATES.EXT
transitions = [
(STATES.S, STATES.I),
(STATES.I, STATES.R)
]
num_transitions = len(transitions)
final_states = [
STATES.R
]
invisible_states = [
STATES.EXT
]
unstable_states = [
STATES.I
]
fixed_model_parameters = {
"I_duration": (1, "time in the I state"),
}
model_parameters = {
"beta": (0, "rate of transmission (exposure)"),
}
[docs]
def inicialization(self):
"""Delegate to parent initialiser (no extra setup needed)."""
super().inicialization()
[docs]
def setup_series_and_time_keeping(self):
"""Delegate to parent time-series setup."""
super().setup_series_and_time_keeping()
[docs]
def states_and_counts_init(self, ext_nodes=None, ext_code=None):
"""Initialise counts and flag S nodes for daily infection checks.
Args:
ext_nodes (int, optional): Number of external nodes.
ext_code (int, optional): State code for external nodes.
"""
super().states_and_counts_init(ext_nodes, ext_code)
# need_check - state that needs regular checkup
self.need_check = self.memberships[STATES.S]
self.update_plan(np.ones(self.num_nodes, dtype=bool))
[docs]
def daily_update(self, nodes):
"""Perform daily infection check for susceptible nodes.
Calls :meth:`prob_of_contact` and schedules exposed S nodes to move
to I on the next day. External nodes are not yet supported.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes needing a check.
Raises:
NotImplementedError: If external nodes are present.
"""
# S
target_nodes = self._get_target_nodes(nodes, STATES.S)
# if we have external nodes
if self.num_ext_nodes > 0:
raise NotImplementedError("External nodes not implemented yet.")
# try infection
P_infection = self.prob_of_contact(
STATES.S,
STATES.I,
self.beta
).flatten()
exposed = P_infection[target_nodes]
exposed_mask = np.zeros(self.num_nodes, dtype=bool)
exposed_mask[target_nodes] = (exposed == 1)
print("Number of infected", exposed_mask.sum())
if exposed_mask.sum() > 0:
print(exposed_mask)
self.time_to_go[exposed_mask] = 1
self.state_to_go[exposed_mask] = STATES.I
[docs]
def update_plan(self, nodes):
"""Set transition plans for InfoSIRModel nodes.
* S: no scheduled transition; flagged for daily checks.
* I: scheduled to move to R after ``I_duration`` days.
* R: no further transition.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes to update.
"""
# STATES.S: "S",
target_nodes = self._get_target_nodes(nodes, STATES.S)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = STATES.S
self.need_check[target_nodes] = True
# STATES.I: "I"
target_nodes = self._get_target_nodes(nodes, STATES.I)
self.time_to_go[target_nodes] = self.I_duration
self.state_to_go[target_nodes] = STATES.R
self.need_check[target_nodes] = False
# STATES.R: "R",
target_nodes = self._get_target_nodes(nodes, STATES.R)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = -1
self.need_check[target_nodes] = False
[docs]
def run_iteration(self):
"""Delegate to parent run_iteration (no extra logic needed)."""
super().run_iteration()
[docs]
class InfoTippingModel(SimulationEngine):
"""Threshold-based (tipping-point) adoption model on a contact network.
An agent adopts (S → ACTIVE) when the weighted fraction of its active
neighbours exceeds its personal threshold ``theta``. Once active, the
agent remains active indefinitely.
Edge weights are the graph's ``e_intensities`` values, and daily contact
is stochastic (activated by edge probabilities).
"""
states = [
Tipping.S,
Tipping.ACTIVE,
Tipping.EXT
]
num_states = len(states)
state_str_dict = {
Tipping.S : "S",
Tipping.ACTIVE: "Active",
Tipping.EXT: "Ext"
}
ext_code = STATES.EXT
transitions = [
(Tipping.S, Tipping.ACTIVE)
]
num_transitions = len(transitions)
model_parameters = {
"theta": (0, "threshold"),
}
[docs]
def states_and_counts_init(self, ext_nodes=None, ext_code=None):
"""Initialise counts and flag S nodes for daily tipping checks.
Args:
ext_nodes (int, optional): Number of external nodes.
ext_code (int, optional): State code for external nodes.
"""
super().states_and_counts_init(ext_nodes, ext_code)
# need_check - state that needs regular checkup
self.need_check = self.memberships[Tipping.S]
self.update_plan(np.ones(self.num_nodes, dtype=bool))
def _transmission(self):
"""Return a boolean bitmap of nodes that tip (S → ACTIVE) today.
For each S node, activates edges stochastically, counts the weighted
fraction of active neighbours, and flips the node to ACTIVE if the
fraction exceeds ``self.theta[node]``.
Returns:
numpy.ndarray: Boolean array of shape ``(num_nodes,)``; ``True``
where a node becomes active.
"""
ret = np.zeros(self.num_nodes, dtype=bool)
active_nodes = self.memberships[Tipping.S]
edges_probs = self.graph.get_all_edges_probs()
num_edges = len(edges_probs)
print("num edges", num_edges)
r = np.random.rand(num_edges)
active_edges = (r < edges_probs) #.nonzero()[0] #bitmap
for i, node in enumerate(self.graph.nodes):
# bitmap of node's edges
my_edges = self.graph.e_source == node
print("my_edges", my_edges.shape)
# keep only those that are active
my_edges = np.logical_and(
my_edges,
active_edges
).nonzero()[0]
print("my_edges (after logical and)", my_edges.shape, my_edges.dtype)
if len(my_edges) == 0:
continue
# take destination nodes
my_neighbours = self.graph.e_dest[my_edges]
print("my neighbours", my_neighbours.shape)
active_neighbours = my_neighbours[(self.memberships[Tipping.ACTIVE] == 1)[my_neighbours].ravel()]
print("active neighbours", active_neighbours.shape)
my_edges_to_active = np.isin(self.graph.e_dest[my_edges], active_neighbours)
print("my_edges_to_active", my_edges_to_active.shape)
sum_all = self.graph.e_intensities[my_edges].sum()
sum_active = self.graph.e_intensities[my_edges][my_edges_to_active].sum()
if sum_active / sum_all > self.theta[node]:
ret[i] = 1.0
return ret
[docs]
def daily_update(self, nodes):
"""Perform daily tipping check: activate nodes whose threshold is met.
Calls :meth:`_transmission` and schedules newly tipping nodes to
move to ACTIVE on the next day. External nodes are not supported.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes needing a check.
Raises:
NotImplementedError: If external nodes are present.
"""
# S
target_nodes = self._get_target_nodes(nodes, Tipping.S)
# if we have external nodes
if self.num_ext_nodes > 0:
raise NotImplementedError("External nodes not present in Tipping Model.")
# try infection
transmission = self._transmission().flatten()
self.time_to_go[transmission] = 1
self.state_to_go[transmission] = Tipping.ACTIVE
[docs]
def update_plan(self, nodes):
"""Set transition plans for InfoTippingModel nodes.
* S: no scheduled transition; flagged for daily tipping checks.
* ACTIVE: stays ACTIVE indefinitely; no further scheduled transition.
Args:
nodes (numpy.ndarray): Boolean bitmap of nodes to update.
"""
# STATES.S: "S",
target_nodes = self._get_target_nodes(nodes, Tipping.S)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = Tipping.S
self.need_check[target_nodes] = True
# STATES.Active: "Active"
target_nodes = self._get_target_nodes(nodes, Tipping.ACTIVE)
self.time_to_go[target_nodes] = -1
self.state_to_go[target_nodes] = Tipping.ACTIVE
self.need_check[target_nodes] = False