Source code for cea.utilities.parallel

"""
Standardizes multiprocessing use. In the CEA, some functions are run using the standard ``multiprocessing`` library.
They are run by mapping the function over a list of arguments (see ``multiprocessing.Pool.map_async``) and waiting
for the processes to finish, while at the same time piping STDOUT, STDERR through
``cea.utilities.workerstream.QueueWorkerStream`` - this ensures that the dashboard interface can read the output from
the sub-processes.

Before CEA v2.23, this required boilerplate code that had to be repeated every time multiprocessing was used. Issue
`#2344 <https://github.com/architecture-building-systems/CityEnergyAnalyst/issues/2344>`_ was the result of not
applying this technique to the demand script.

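This module exports the function ``vectorize``, which is intended to replace both ``map_async`` and the builtin ``map``
function (which was used when ``config.multiprocessing == False``). It returns a wrapped function that runs either on a
pool of processes or in a simple loop, depending on the number of processes requested. This simplifies multiprocessing.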
"""

import multiprocessing
import sys
import logging
from itertools import repeat
from cea.utilities.workerstream import stream_from_queue, QueueWorkerStream

__author__ = "Daren Thomas"
__copyright__ = "Copyright 2019, Architecture and Building Systems - ETH Zurich"
__credits__ = ["Daren Thomas"]
__license__ = "MIT"
__version__ = "0.1"
__maintainer__ = "Daren Thomas"
__email__ = "cea@arch.ethz.ch"
__status__ = "Production"


def vectorize(func, processes=1, on_complete=None):
    """
    Wrap ``func`` so that it is applied position by position to sequences of arguments.

    This works much like ``numpy.vectorize`` or ``map(func, *args)``. Call the returned function with one sequence
    per positional parameter of ``func`` (all sequences of the same length). It returns a list with one result per
    position.

    The point is to give single-processing and multi-processing one common interface:

    - If ``processes > 1``, the calls are distributed over a ``multiprocessing`` pool of that many worker processes.
      STDOUT and STDERR of the workers are routed through ``cea.utilities.workerstream.QueueWorkerStream``, so the
      output shows up in the dashboard job output.
    - Otherwise, the calls are simply made one after the other in the current process.

    ``on_complete``, if given, is called once for every finished call of ``func`` with four arguments:

    - i: the 0-based order in which this call was completed
    - n: the total number of function calls to be made
    - args: the arguments passed to this call to ``func``
    - result: the return value of this call to ``func``

    .. note:: due to the way multiprocessing works, ``func`` and ``on_complete`` need to be module-level functions

    .. note:: each argument sequence is converted to a ``list`` before running, which consumes generators and
        ``itertools.repeat(x, n)`` objects. The length of the first sequence is used as the number of calls.

    :param func: The function to vectorize
    :param int processes: The number of processes to use (use ``config.get_number_of_processes()``)
    :param on_complete: An optional function to call for each completed call to ``func``.
    :return: the wrapped function, which takes the argument sequences and returns the list of results
    """
    use_pool = processes > 1
    return (__multiprocess_wrapper(func, processes, on_complete) if use_pool
            else single_process_wrapper(func, on_complete))
def __multiprocess_wrapper(func, processes, on_complete):
    """
    Create a worker pool to map the function, taking care to set up STDOUT and STDERR.

    The returned ``wrapper`` takes one sequence per positional parameter of ``func`` and runs ``func`` for each
    position in a ``multiprocessing.Pool`` of ``processes`` workers. It returns the list produced by
    ``Pool.map_async(...).get()``. While the pool is working, output the workers put into the shared queue is
    forwarded in this (parent) process by ``stream_from_queue``.

    The pool and the manager are always released. If ``func`` raises in a worker, the exception re-raised by
    ``get()`` propagates to the caller, and the remaining workers are terminated rather than left behind.

    :param func: the (module-level) function to map
    :param int processes: the number of worker processes in the pool
    :param on_complete: optional callable ``on_complete(i, n, args, result)``, called in the worker after each call
    """
    def wrapper(*args):
        print("Using {processes} CPU's".format(processes=processes))
        pool = multiprocessing.Pool(processes)
        manager = multiprocessing.Manager()
        try:
            # a queue for STDOUT and STDERR output of sub-processes (see cea.utilities.workerstream.QueueWorkerStream)
            queue = manager.Queue()

            # make sure the args are lists (not generators) since we need the length of the sequence
            args = [list(a) for a in args]
            n = len(args[0])  # the number of iterations to map

            # set up the list of i-values for on_complete: each finished call takes the next value
            i_queue = manager.Queue()
            for i in range(n):
                i_queue.put(i)

            # Pool.map_async passes a single argument to the mapped function, so bundle the bookkeeping values
            # with the arguments of each call
            args = [repeat(func, n), repeat(queue, n), repeat(on_complete, n), repeat(i_queue, n), repeat(n, n)] + args
            args = zip(*args)
            map_result = pool.map_async(__apply_func_with_worker_stream, args)
            while not map_result.ready():
                stream_from_queue(queue)
            result = map_result.get()  # re-raises an exception raised by func in a worker
            pool.close()
            pool.join()

            # process the rest of the queue
            while not queue.empty():
                stream_from_queue(queue)
            return result
        finally:
            # After a successful close()/join() this has nothing left to do. If anything above raised, it stops
            # workers that are still running, and shuts down the manager's server process in every case.
            pool.terminate()
            manager.shutdown()
    return wrapper
def __apply_func_with_worker_stream(args):
    """
    Call func, using ``queue`` to redirect stdout and stderr.

    ``multiprocessing.Pool.map_async`` passes exactly one argument to the mapped function, so everything arrives
    packed into the tuple ``args``: ``(func, queue, on_complete, i_queue, n, *func_args)``.

    This function is called _inside_ a separate process.

    :param tuple args: ``func``; the queue for STDOUT/STDERR; ``on_complete`` (or ``None``); the queue handing out
        completion indices; the total number of calls ``n``; then the positional arguments for ``func``.
    :return: the return value of ``func(*func_args)``
    """
    # Set up logging. Pool workers are reused for many calls, and ``multiprocessing.log_to_stderr`` adds a new
    # handler every time it is called. Only add it once per worker process to avoid duplicated log lines.
    logger = multiprocessing.get_logger()
    if not logger.handlers:
        logger = multiprocessing.log_to_stderr()
    logger.setLevel(logging.WARNING)
    from cea import suppress_3rd_party_debug_loggers
    suppress_3rd_party_debug_loggers()

    # unpack the arguments
    func, queue, on_complete, i_queue, n = args[:5]
    func_args = args[5:]

    # set up printing to stderr and stdout to go through the queue
    sys.stdout = QueueWorkerStream('stdout', queue)
    sys.stderr = QueueWorkerStream('stderr', queue)

    # CALL
    result = func(*func_args)
    if on_complete:
        # i_queue was filled with 0 .. n-1 by the parent; taking the next value after func returns yields the
        # order in which calls complete
        on_complete(i_queue.get(), n, func_args, result)
    return result
def single_process_wrapper(func, on_complete):
    """
    The simplest form of vectorization: just loop.

    The returned ``wrapper`` takes one sequence per positional parameter of ``func``. It calls ``func`` once per
    position, in order, in the current process, and returns the list of results. If ``on_complete`` is truthy, it is
    called after each call as ``on_complete(i, n, call_args, result)``. Here ``i`` is the 0-based index, ``n`` is the
    length of the first sequence, and ``call_args`` is the tuple of arguments passed to ``func``.
    """
    def wrapper(*args):
        print("Using single process")
        sequences = list(map(list, args))
        total = len(sequences[0])

        def run_one(index, call_args):
            value = func(*call_args)
            if on_complete:
                on_complete(index, total, call_args, value)
            return value

        return [run_one(index, call_args) for index, call_args in enumerate(zip(*sequences))]
    return wrapper
[docs]def test(a, b): print("test {a}+{b}".format(a=a, b=b)) return a + b
if __name__ == '__main__':
    # small demo: add up two ranges position by position using a pool of 4 processes
    vectorized_test = vectorize(test, 4)
    print(vectorized_test(range(10, 20), range(20, 30)))