"""Highly configurable custom policy for the MAIS epidemic simulation.
This module provides :class:`CustomPolicy`, the primary policy used in
production simulations. It orchestrates:
* Layer-weight changes read from a scenario calendar file.
* Model parameter changes (beta, theta, test rate).
* Face-mask effectiveness updates.
* Superspreader event toggling.
* Forced infections on a specified day.
* Daily background import of exposed individuals.
* Dynamic start/stop of sub-policies loaded by name.
"""
import pandas as pd
import numpy as np
import json
from functools import partial, reduce
import logging
from policies.policy import Policy
from utils.policy_utils import load_scenario_dict
def _load_dictionary(filename: str):
"""Load a time-indexed dictionary from a JSON or CSV file.
JSON files are expected to be objects with string keys (simulation
day numbers) and arbitrary values. CSV files must have a ``T``
column and a second column whose values are used.
Args:
filename (str or None): Path to the file, or ``None``.
Returns:
dict or None: Mapping from ``int`` day to the corresponding
value, or ``None`` if ``filename`` is ``None``.
"""
if filename is None:
return None
if filename.endswith(".json"):
with open(filename, "r") as f:
return {
int(key): value
for key, value in json.load(f).items()
}
else:
df = pd.read_csv(filename)
return dict(zip(df["T"], df.iloc[:, 1]))
[docs]
class CustomPolicy(Policy):
"""Highly configurable orchestration policy for MAIS simulations.
Controls layer weights, model parameters (beta, theta, test rate),
face-mask effectiveness, superspreader events, forced infections,
background import of exposed individuals, and the lifecycle of
dynamically loaded sub-policies.
All time-indexed inputs are read from external files (JSON or CSV)
keyed on integer simulation-day numbers.
Args:
graph: The contact network graph object.
model: The epidemic model instance.
layer_changes_filename (str, optional): Path to a scenario
calendar file for layer-weight updates.
param_changes_filename (str, optional): Temporarily disabled.
Path to a JSON file of model-parameter changes.
policy_calendar_filename (str, optional): Path to a JSON file
mapping simulation days to ``[action, policy_string]``
pairs for starting/stopping sub-policies.
beta_factor_filename (str, optional): Path to a
JSON/CSV file with per-day beta-factor multipliers.
face_masks_filename (str, optional): Path to a JSON/CSV file
with per-day face-mask compliance values.
theta_filename (str, optional): Path to a JSON/CSV file with
per-day ``theta_Is`` multipliers.
test_rate_filename (str, optional): Path to a JSON/CSV file
with per-day test-rate multipliers.
superspreader_date (int or str, optional): Simulation day on
which the superspreader layer is activated (one day only).
superspreader_layer (int or str, optional): Index of the
superspreader layer (default 31).
force_infect (int or str, optional): Simulation day on which
one node on ``force_infect_layer`` is forcibly infected.
force_infect_layer (int or str, optional): Layer index used to
select the node for forced infection.
init_filename (str, optional): Path to a JSON file mapping days
to the number of nodes moved to state E on that day.
reduction_coef1 (float or str): Required. Primary beta
reduction coefficient (face-mask effect on non-family
contacts).
reduction_coef2 (float or str): Required. Secondary beta
reduction coefficient (face-mask spillover into family).
new_beta (str, optional): ``"Yes"`` to use the updated beta
calculation combining face-mask compliance and beta factors.
daily_import (float or str, optional): Daily probability of
importing one exposed individual (value in [0, 1]).
**kwargs: Sub-policy keyword arguments. Pass
``sub_policies`` (list of names) together with
``<name>_filename``, ``<name>_name``, and optionally
``<name>_config`` for each sub-policy.
Raises:
ValueError: If ``param_changes_filename`` is provided (temporarily
disabled), if an unknown action is found in the policy
calendar, if ``new_beta=Yes`` but the required calendars are
absent or mismatched, or if ``reduction_coef1``/
``reduction_coef2`` are missing.
"""
[docs]
def __init__(self,
graph,
model,
layer_changes_filename=None,
param_changes_filename=None,
policy_calendar_filename=None,
beta_factor_filename=None,
face_masks_filename=None,
theta_filename=None,
test_rate_filename=None,
superspreader_date=None,
superspreader_layer=None,
force_infect=None,
force_infect_layer=None,
init_filename=None,
reduction_coef1=None,
reduction_coef2=None,
new_beta=None,
daily_import=None,
**kwargs):
"""Initialise the custom policy from file-based calendars and keyword arguments.
Args:
graph: The contact network graph object.
model: The epidemic model instance.
layer_changes_filename (str, optional): Layer-weight
scenario calendar file.
param_changes_filename (str, optional): Temporarily
disabled model-parameter changes file.
policy_calendar_filename (str, optional): Sub-policy
lifecycle calendar file.
beta_factor_filename (str, optional): Beta-factor calendar
file.
face_masks_filename (str, optional): Face-mask compliance
calendar file.
theta_filename (str, optional): ``theta_Is`` multiplier
calendar file.
test_rate_filename (str, optional): Test-rate multiplier
calendar file.
superspreader_date (int or str, optional): Day to activate
superspreader layer.
superspreader_layer (int or str, optional): Superspreader
layer index.
force_infect (int or str, optional): Day for forced
infection.
force_infect_layer (int or str, optional): Layer for
selecting the force-infected node.
init_filename (str, optional): Initial exposure calendar
file.
reduction_coef1 (float or str): Primary beta reduction
coefficient (required).
reduction_coef2 (float or str): Secondary beta reduction
coefficient (required).
new_beta (str, optional): ``"Yes"`` to use new beta
calculation.
daily_import (float or str, optional): Daily import
probability.
**kwargs: Sub-policy configuration keyword arguments.
"""
super().__init__(graph, model)
if layer_changes_filename is not None:
self.layer_changes_calendar = load_scenario_dict(
layer_changes_filename)
else:
self.layer_changes_calendar = None
if policy_calendar_filename is not None:
with open(policy_calendar_filename, "r") as f:
self.policy_calendar = {
int(k): v
for k, v in json.load(f).items()
}
else:
self.policy_calendar = None
if param_changes_filename is not None:
raise ValueError("Temporarily disabled. Sry.")
with open(param_changes_filename, "r") as f:
self.param_changes_calendar = json.load(f)
else:
self.param_changes_calendar = None
self.face_masks_calendar = _load_dictionary(face_masks_filename)
if self.face_masks_calendar is None:
logging.warning("DBG: NO MASKS ")
self.beta_factor_calendar = _load_dictionary(beta_factor_filename)
self.theta_calendar = _load_dictionary(theta_filename)
if self.theta_calendar is None:
logging.warning("DBG: Theta calendar")
self.test_rate_calendar = _load_dictionary(test_rate_filename)
self.policies = {}
if superspreader_date is not None:
self.superspreader_date = int(superspreader_date)
else:
self.superspreader_date = None
if superspreader_layer is not None:
self.superspreader_layer = int(superspreader_layer)
else:
self.superspreader_layer = 31
if force_infect is not None:
self.force_infect = int(force_infect)
assert self.force_infect
else:
self.force_infect = None
if force_infect_layer is not None:
self.force_infect_layer = int(force_infect_layer)
else:
self.force_infect_layer = None
if init_filename is not None:
with open(init_filename, "r") as f:
self.init_calendar = {
int(k): v
for k, v in json.load(f).items()
}
else:
self.init_calendar = None
if reduction_coef1 is not None:
self.reduction_coef1 = float(reduction_coef1)
else:
self.reduction_coef1 = 0.9
raise ValueError("Missing coef1")
if reduction_coef2 is not None:
self.reduction_coef2 = float(reduction_coef2)
else:
self.reduction_coef2 = 0.2
raise ValueError("Missing coef2")
self.mutation_coef = 1.0
self.new_beta = False if new_beta is None else new_beta == "Yes"
if self.new_beta:
logging.debug("Using new beta")
assert self.beta_factor_calendar is not None
assert self.face_masks_calendar is not None
assert self.beta_factor_calendar.keys() == self.face_masks_calendar.keys()
self.nodes_infected = None
if daily_import is not None:
self.daily_import = float(daily_import)
else:
self.daily_import = daily_import
if "sub_policies" in kwargs:
if type(kwargs["sub_policies"]) is list:
subpolicies = kwargs["sub_policies"]
else:
subpolicies = [kwargs["sub_policies"]]
for sub_policy_name in subpolicies:
filename = kwargs[f"{sub_policy_name}_filename"]
class_name = kwargs[f"{sub_policy_name}_name"]
config = kwargs.get(f"{sub_policy_name}_config", None)
policy = f"{filename}:{class_name}" if config is None else f"{filename}:{class_name}:{config}"
self.policies[policy] = self.create_policy(
filename, class_name, config)
[docs]
def create_policy(self, filename, object_name, config_file=None):
"""Dynamically import and instantiate a policy class by module and name.
Args:
filename (str): Module name within the ``policies`` package
(e.g. ``"contact_tracing"``).
object_name (str): Class name within that module.
config_file (str, optional): Path to a configuration file
to pass to the policy constructor.
Returns:
Policy: An instantiated policy object.
"""
PolicyClass = getattr(
__import__(
"policies."+filename,
globals(), locals(),
[object_name],
0
),
object_name
)
if config_file:
return PolicyClass(self.graph, self.model, config_file=config_file)
else:
return PolicyClass(self.graph, self.model)
[docs]
def update_layers(self, coefs):
"""Update graph layer weights to the provided coefficient values.
Args:
coefs: Iterable of new layer-weight values passed directly
to ``graph.set_layer_weights``.
"""
self.graph.set_layer_weights(coefs)
[docs]
def switch_on_superspread(self):
"""Activate the superspreader layer by setting its weight to 1.0."""
logging.info("DBG Superspreader ON")
self.graph.layer_weights[self.superspreader_layer] = 1.0
[docs]
def switch_off_superspread(self):
"""Deactivate the superspreader layer by setting its weight to 0.0."""
logging.info("DBG Superspreader OFF")
self.graph.layer_weights[self.superspreader_layer] = 0.0
[docs]
def update_beta(self, masks):
"""Update model beta values based on face-mask compliance (legacy method).
Scales non-family beta by ``(1 - reduction_coef1 * masks)`` and
family beta by a secondary factor derived from the non-family
reduction.
Args:
masks (float): Current face-mask compliance level in [0, 1].
"""
orig_beta = self.model.init_kwargs["beta"]
orig_beta_A = self.model.init_kwargs["beta_A"]
reduction = (1 - self.reduction_coef1 * masks)
new_value = orig_beta * reduction
new_value_A = orig_beta_A * reduction
logging.debug(f"{self.model.T} DBG BETA {new_value}")
self.model.beta.fill(new_value)
self.model.beta_A.fill(new_value_A)
orig_beta_in_family = self.model.init_kwargs["beta_in_family"]
orig_beta_A_in_family = self.model.init_kwargs["beta_A_in_family"]
reduction = 1 - self.reduction_coef2 * (1-reduction)
new_value = orig_beta_in_family * reduction
new_value_A = orig_beta_A_in_family * reduction
self.model.beta_in_family.fill(new_value)
self.model.beta_A_in_family.fill(new_value_A)
logging.debug(f"DBG beta: {self.model.beta[0][0]} {self.model.beta_in_family[0][0]}")
[docs]
def update_beta2(self, masks, beta_factors=None):
"""Update model beta values using both face-mask compliance and beta factors.
Family beta is reduced by ``(1 - reduction_coef1 * beta_factors)``.
Non-family beta is then derived as
``(1 - reduction_coef2 * masks) * family_beta``.
Args:
masks (float): Current face-mask compliance in [0, 1].
beta_factors (float): Additional beta scaling factor.
Raises:
AssertionError: If ``beta_factors`` is ``None``.
"""
assert beta_factors is not None
orig_beta_in_family = self.model.init_kwargs["beta_in_family"]
orig_beta_A_in_family = self.model.init_kwargs["beta_A_in_family"]
reduction = (1 - self.reduction_coef1*beta_factors)
new_value = self.mutation_coef * orig_beta_in_family * reduction
new_value_A = self.mutation_coef * orig_beta_A_in_family * reduction
self.model.beta_in_family.fill(new_value)
self.model.beta_A_in_family.fill(new_value_A)
#orig_beta= self.model.init_kwargs["beta"]
#orig_beta_A = self.model.init_kwargs["beta_A"]
# assumes betas are uniform
new_beta = (1-self.reduction_coef2*masks) * \
self.model.beta_in_family[0][0]
new_beta_A = (1-self.reduction_coef2*masks) * \
self.model.beta_A_in_family[0][0]
logging.debug(f"{self.model.T} DBG BETA {new_value}")
self.model.beta.fill(new_beta)
self.model.beta_A.fill(new_beta_A)
logging.debug(f"DBG beta: {self.model.beta[0][0]} {self.model.beta_A[0][0]}")
[docs]
def beta_increase(self):
"""Increase all beta values by a factor of 1.5 (mutation simulation).
Sets ``mutation_coef`` to 1.5 and scales ``beta``, ``beta_A``,
``beta_A_in_family``, and ``beta_in_family`` accordingly.
"""
self.mutation_coef = 1.5
orig_value = self.model.beta[0][0]
self.model.beta.fill(self.mutation_coef*orig_value)
orig_value = self.model.beta_A[0][0]
self.model.beta_A.fill(self.mutation_coef*orig_value)
orig_value = self.model.beta_A_in_family[0][0]
self.model.beta_A_in_family.fill(self.mutation_coef*orig_value)
orig_value = self.model.beta_in_family[0][0]
self.model.beta_in_family.fill(self.mutation_coef*orig_value)
[docs]
def update_test_rate(self, coef):
"""Scale the testing rate (theta_Is) by a given coefficient.
Args:
coef (float): Multiplier applied to the original test rate
from ``model.init_kwargs``.
"""
orig_test_rate = self.model.init_kwargs["test_rate"]
new_value = coef * orig_test_rate
#self.model.test_rate = new_value
self.model.theta_Is.fill(new_value)
[docs]
def update_theta(self, coef):
"""Scale the symptomatic testing probability (theta_Is) by a coefficient.
Args:
coef (float): Multiplier applied to the original
``theta_Is`` value from ``model.init_kwargs``.
"""
orig_theta = self.model.init_kwargs["theta_Is"]
new_value = orig_theta * coef
self.model.theta_Is.fill(new_value)
# if isinstance(new_value, (list)):
# np_new_value = np.array(new_value).reshape(
# (self.model.num_nodes, 1))
# else:
# np_new_value = np.full(
# fill_value=new_value, shape=(self.model.num_nodes, 1))
# setattr(self.model, "theta_Is", np_new_value)
logging.debug(f"DBG theta: {self.model.theta_Is[0][0]}")
[docs]
def run(self):
"""Execute one time-step of the custom policy.
Processes all registered calendars in order:
initial exposures, daily import, policy-calendar events,
parameter changes, layer updates, forced infections,
superspreader toggling, face-mask updates, theta updates,
test-rate updates, and finally runs all active sub-policies.
"""
if False and self.graph.is_quarantined is not None:
# dbg check
all_deposited = np.zeros(self.graph.number_of_nodes)
for p in self.policies.values():
all_deposited = all_deposited + p.depo.depo
if isinstance(p, EvaQuarantinePolicy) or isinstance(p, ContactTracingPolicy):
all_deposited += p.waiting_room_second_test.depo
assert np.sum(all_deposited > 0) == np.sum(self.graph.is_quarantined > 0), f"{all_deposited.nonzero()[0]} \n {self.graph.is_quarantined.nonzero()[0]}"
# print(all_deposited.nonzero()[0],
# self.graph.is_quarantined.nonzero()[0])
assert np.all(
all_deposited.nonzero()[0] == self.graph.is_quarantined.nonzero()[0]), f"{all_deposited.nonzero()[0]} \n {self.graph.is_quarantined.nonzero()[0]}"
logging.info(f"CustomPolicy {int(self.model.T)}")
today = int(self.model.T)
if self.init_calendar is not None and today in self.init_calendar:
num = self.init_calendar[today]
self.model.move_to_E(num)
if self.daily_import is not None:
assert self.daily_import <= 1.0, "not implemented yet"
if np.random.rand() < self.daily_import:
self.model.move_to_E(1)
if self.policy_calendar is not None and today in self.policy_calendar:
logging.info("changing quarantine policy")
# change the quaratine policy function
for action, policy in self.policy_calendar[today]:
if action == "start":
vals = policy.strip().split(":")
filename, object_name = vals[0], vals[1]
config_file = None if len(vals) == 2 else vals[2]
PolicyClass = getattr(__import__(filename), object_name)
if config_file is None:
self.policies[policy] = PolicyClass(
self.graph, self.model)
else:
self.policies[policy] = PolicyClass(
self.graph, self.model, config_file=config_file)
#params = [ float(param) for param in vals[2:] ]
#self.policies[policy] = PolicyClass(self.graph, self.model, *params)
elif action == "stop":
self.policies[policy].stop()
else:
raise ValueError(f"Unknown action {action}")
if self.param_changes_calendar is not None and today in self.param_changes_calendar:
for action, param, new_value in self.param_changes_calendar[today]:
if action == "set":
if isinstance(new_value, (list)):
np_new_value = np.array(new_value).reshape(
(self.model.num_nodes, 1))
else:
np_new_value = np.full(
fill_value=new_value, shape=(self.model.num_nodes, 1))
setattr(self.model, param, np_new_value)
elif action == "*":
attr = getattr(self.model, param)
if type(new_value) == str:
new_value = getattr(self.model, new_value)
setattr(self.model, param, attr * new_value)
else:
raise ValueError("Unknown value")
if self.layer_changes_calendar is not None and today in self.layer_changes_calendar:
logging.debug(f"{today} updating layers")
self.update_layers(self.layer_changes_calendar[today])
if self.force_infect is not None and self.model.T == self.force_infect:
# number_to_infect = 5 if self.force_infect_layer in (33,34,35) else 10
number_to_infect = 1
nodes_on_layer = self.graph.get_nodes(self.force_infect_layer)
nodes_to_infect = np.random.choice(
nodes_on_layer, number_to_infect, replace=False)
self.model.force_infect(nodes_to_infect)
self.nodes_infected = nodes_to_infect
# self.model.test_rate[self.nodes_infected] = 1.0
# self.model.theta_Is[self.nodes_infected] = 0.0
# self.model.testable[self.nodes_infected] = True
# if self.nodes_infected is not None:
# if self.model.t == self.force_infect + 6:
# self.model.theta_Is[self.nodes_infected] = 1.0
# self.nodes_infected = None
if self.superspreader_date is not None:
if self.model.T == self.superspreader_date:
self.switch_on_superspread()
elif self.model.T - 1 == self.superspreader_date:
self.switch_off_superspread()
if self.face_masks_calendar is not None and today in self.face_masks_calendar:
logging.debug(f"DBG face masks update", self.face_masks_calendar)
if self.new_beta:
self.update_beta2(
self.face_masks_calendar[today], self.beta_factor_calendar[today])
else:
raise ValueError("Temporarily disabled.")
# if today == 335:
# self.beta_increase()
if self.theta_calendar is not None and today in self.theta_calendar:
logging.debug(f"DBG theta update")
self.update_theta(self.theta_calendar[today])
if self.test_rate_calendar is not None and today in self.test_rate_calendar:
logging.debug(f"DBG test rate update")
self.update_test_rate(self.test_rate_calendar[today])
# perform registred policies
for name, policy in self.policies.items():
logging.info(f"run policy { name}")
policy.run()
[docs]
def to_df(self):
"""Merge and return DataFrames from all active sub-policies.
Returns:
pandas.DataFrame or None: Outer-merged DataFrame indexed by
``T`` combining statistics from all sub-policies, or
``None`` if there are no sub-policies or none produce data.
"""
if not self.policies:
return None
dfs = [
p.to_df()
for p in self.policies.values()
]
dfs = [d for d in dfs if d is not None]
if not dfs:
return None
my_merge = partial(pd.merge, on="T", how="outer")
return reduce(my_merge, dfs)