Source code for pool
"""Multiprocessing worker pool for parallel model evaluation in the MAIS simulation.
This module provides a simple process-pool abstraction built on top of
:mod:`multiprocessing`. Each worker process runs an evaluation function in an
infinite loop, consuming queries from a shared input queue and posting results
to a shared output queue. The main process enqueues tasks and dequeues results
through the :class:`Pool` interface.
"""
from multiprocessing import Process, Queue
[docs]
def worker(name, evalfunc, querries, answers, model):
    """Serve evaluation requests forever inside a pool worker process.

    Repeatedly takes the next query from ``querries``, computes
    ``evalfunc(model, query)`` and puts the result on ``answers``. The
    function never returns on its own: it keeps looping until the hosting
    process is stopped from outside (e.g. by :meth:`Pool.close`) or an
    exception propagates out of the loop.

    Args:
        name (int): Numeric identifier of this worker. The loop itself does
            not use it.
        evalfunc (callable): Evaluation function, called as
            ``evalfunc(model, query)``. Its return value is put on
            ``answers`` unchanged.
        querries (multiprocessing.Queue): Queue the queries are read from
            (blocking ``get()``).
        answers (multiprocessing.Queue): Queue the computed results are
            written to.
        model: Object handed to ``evalfunc`` as its first argument on every
            call.
    """
    while True:
        # Blocks until a query is available, then evaluates it right away.
        result = evalfunc(model, querries.get())
        answers.put(result)
[docs]
class Pool:
    """Process pool that parallelises model evaluation across multiple workers.

    Workers consume tasks from a shared queries queue and post results to a
    shared answers queue. Tasks are submitted via :meth:`putQuerry` and
    results are retrieved (blocking, optionally with a timeout) via
    :meth:`getAnswer`.

    Args:
        processors (int): Number of worker processes to spawn.
        evalfunc (callable): Evaluation function passed to each worker.
            Signature: ``evalfunc(model, query) -> answer``.
        models (list): Model instances, one per worker. Element ``i`` is
            passed to worker ``i``; it must hold at least ``processors``
            elements.

    Raises:
        IndexError: If ``models`` has fewer than ``processors`` elements.
            No worker process has been started when this is raised.
    """

    def __init__(self, processors, evalfunc, models):
        self.querries = Queue()
        self.answers = Queue()
        self.workers = []

        # Build every Process object before starting any of them, so that a
        # too-short ``models`` list fails before a single child is running
        # (otherwise the already-started workers would be leaked).
        pending = [
            Process(
                target=worker,
                args=(i, evalfunc, self.querries, self.answers, models[i]),
            )
            for i in range(processors)
        ]

        try:
            for proc in pending:
                proc.start()  # Launch worker() as a separate python process
                self.workers.append(proc)
        except BaseException:
            # Do not leave half a pool running if a later start() fails.
            self.close()
            raise

    def putQuerry(self, querry):
        """Enqueue a query for processing by one of the worker processes.

        Args:
            querry: The query object to evaluate. Must be picklable.
        """
        self.querries.put(querry)

    def getAnswer(self, timeout=None):
        """Wait for the next available answer and return it.

        Answers are delivered in the order the workers finish them, which
        with more than one worker is not necessarily the order in which the
        queries were submitted.

        Args:
            timeout (float, optional): Maximum number of seconds to wait.
                ``None`` (the default) waits indefinitely.

        Returns:
            The next result a worker placed on the answers queue.

        Raises:
            queue.Empty: If ``timeout`` is given and no answer arrived in
                time.
        """
        return self.answers.get(timeout=timeout)

    def close(self):
        """Terminate all worker processes and shut down the pool.

        Calls ``terminate()`` on each worker (``SIGTERM`` on POSIX) and then
        joins it, so that the children are reaped and are no longer alive
        when this method returns. Queries that are still queued or being
        evaluated are abandoned. After calling this method the pool should
        not be used further; calling it again is harmless.
        """
        # Signal everyone first so the workers shut down concurrently...
        for w in self.workers:
            w.terminate()
        # ...then reap them; without join() terminated children stay behind
        # as zombie processes until the parent exits.
        for w in self.workers:
            w.join()