Source code for ivy.backend
# Copyright (C) 2013 ETH Zurich, Institute for Astronomy
'''
Created on Mar 18, 2014
author: jakeret
'''
from __future__ import print_function, division, absolute_import, unicode_literals
from multiprocessing import Pool
import time
from ivy.context import getContextProvider
from ivy.utils.timing import SimpleTiming
from ivy.utils.timing import TimingCollection
[docs]class SimpleMapPlugin(object):
def __init__(self, ctx):
self.ctx = ctx
[docs] def getWorkload(self):
return [self.ctx]
[docs]class SequentialBackend(object):
"""
Simple implementation of a backend executing the plugins in a sequential order
"""
def __init__(self, ctx):
self.ctx = ctx
[docs] def run(self, loop, mapPlugin=None):
if mapPlugin is None: mapPlugin=SimpleMapPlugin(self.ctx)
return list(map(LoopWrapper(loop), mapPlugin.getWorkload()))
[docs]class MultiprocessingBackend(object):
"""
Backend based on Python's multiprocessing.
Will instantiate a multiprocessing pool with ``ctx.params.cpu_count`` processes.
"""
def __init__(self, ctx):
self.ctx = ctx
[docs] def run(self, loop, mapPlugin):
pool = Pool(self.ctx.params.cpu_count)
try:
ctxList = pool.map(LoopWrapper(loop, True), mapPlugin.getWorkload())
timingCollection = TimingCollection(str(loop))
for ctx in ctxList:
for timing in ctx.timings:
timingCollection.addTiming(timing)
self.ctx.timings.append(timingCollection)
return ctxList
finally:
pool.close()
[docs]class IpClusterBackend(object):
"""
Backend based on IPython cluster.
Will distribute the workload among the available engines.
"""
def __init__(self, ctx):
self.ctx = ctx
[docs] def run(self, loop, mapPlugin):
from IPython import parallel
client = parallel.Client()
view = client.load_balanced_view()
try:
return view.map_sync(LoopWrapper(loop), mapPlugin.getWorkload())
finally:
pass
# view.close()
[docs]class JoblibBackend(object):
"""
Backend based on the joblib package
Will instantiate a multiprocessing pool with ``ctx.params.cpu_count`` processes.
"""
def __init__(self, ctx):
self.ctx = ctx
[docs] def run(self, loop, mapPlugin):
import joblib
with joblib.Parallel(n_jobs=self.ctx.params.cpu_count) as parallel:
ctxList = parallel(joblib.delayed(LoopWrapper(loop, True), False)(ctx) for ctx in mapPlugin.getWorkload())
timingCollection = TimingCollection(str(loop))
for ctx in ctxList:
for timing in ctx.timings:
timingCollection.addTiming(timing)
self.ctx.timings.append(timingCollection)
return ctxList
[docs]class LoopWrapper(object):
"""
Callable wrapper for the loop execution
"""
def __init__(self, loop, parallel=False):
self.loop = loop
self.parallel = parallel
def __call__(self, ctx):
if self.parallel: ctx.timings = []
self.loop.ctx = ctx
for plugin in self.loop:
print(plugin)
start = time.time()
plugin()
ctx.timings.append(SimpleTiming(str(plugin), time.time() - start))
getContextProvider().storeContext()
self.loop.reset()
return ctx
BACKEND_NAME_MAP = {"sequential": SequentialBackend,
"multiprocessing": MultiprocessingBackend,
"ipcluster": IpClusterBackend,
"joblib": JoblibBackend,
}
[docs]def create(ctx, force=None):
'''
Simple factory instantiating backends for the given name in ``ctx.params.backend``
'''
backend_name = ctx.params.backend if force is None else force
return BACKEND_NAME_MAP[backend_name](ctx)