Source code for ivy.plugin.parallel_plugin_collection
# Copyright (C) 2013 ETH Zurich, Institute for Astronomy
'''
Created on Mar 18, 2014
author: jakeret
'''
from __future__ import print_function, division, absolute_import
from ivy.plugin.base_plugin import BasePlugin
from ivy.loop import Loop
from ivy import context
from ivy.plugin.plugin_factory import PluginFactory
from ivy.exceptions.exceptions import InvalidAttributeException
from ivy import backend
try:
unicode
except NameError: # python 3
unicode = str
basestring = str
[docs]class ParallelPluginCollection(BasePlugin):
"""
Collection that allows for executing plugins in parallel by using
a MapReduce aprach. The implementation therefore requires a
list of plugins to execute, a map plugin creating the workload and
(optionally) a reduce plugin reducing the data from the parallel task exection
:param pluginList: List of plugins (or a Loop) which should be executed in parallel
:param mapPlugin:
:param reducePlugin: (optional)
:param ctx: (optional)
"""
def __init__(self, pluginList, mapPlugin, reducePlugin=None, ctx=None, parallel=True):
'''
Constructor
'''
if ctx is None:
ctx = context.ctx()
self.ctx = ctx
super(ParallelPluginCollection, self).__init__(self.ctx)
if not isinstance(pluginList, Loop):
pluginList = Loop(pluginList)
self.pluginList = pluginList
if mapPlugin is None:
raise InvalidAttributeException("No map plugin provided")
self.mapPlugin = mapPlugin
self.reducePlugin = reducePlugin
self.parallel = parallel
def __str__(self):
return "ParallelPluginCollection"
def __call__(self):
force = None
if not self.parallel:
force = "sequential"
backendImpl = backend.create(self.ctx, force)
mapPlugin = self.mapPlugin
if isinstance(self.mapPlugin, basestring):
mapPlugin = PluginFactory.createInstance(mapPlugin, self.ctx)
ctxList = backendImpl.run(self.pluginList, mapPlugin)
if self.reducePlugin is not None:
reducePlugin = self.reducePlugin
if isinstance(self.reducePlugin, basestring):
reducePlugin = PluginFactory.createInstance(reducePlugin, self.ctx)
reducePlugin.reduce(ctxList)