'''
Created on Sep 30, 2013
@author: J. Akeret
'''
from __future__ import print_function, division, absolute_import, unicode_literals
from copy import copy
from math import floor
import math
import multiprocessing
import numpy
[docs]class ParticleSwarmOptimizer(object):
'''
Optimizer using a swarm of particles
:param func:
A function that takes a vector in the parameter space as input and
returns the natural logarithm of the posterior probability for that
position.
:param low: array of the lower bound of the parameter space
:param high: array of the upper bound of the parameter space
:param particleCount: the number of particles to use.
:param threads: (optional)
The number of threads to use for parallelization. If ``threads == 1``,
then the ``multiprocessing`` module is not used but if
``threads > 1``, then a ``Pool`` object is created and calls to
``lnpostfn`` are run in parallel.
:param pool: (optional)
An alternative method of using the parallelized algorithm. If
provided, the value of ``threads`` is ignored and the
object provided by ``pool`` is used for all parallelization. It
can be any object with a ``map`` method that follows the same
calling sequence as the built-in ``map`` function.
'''
def __init__(self, func, low, high, particleCount=25, threads=1, pool=None):
'''
Constructor
'''
self.func = func
self.low = low
self.high = high
self.particleCount = particleCount
self.threads = threads
self.pool = pool
if self.threads > 1 and self.pool is None:
self.pool = multiprocessing.Pool(self.threads)
self.paramCount = len(self.low)
self.swarm = self._initSwarm()
self.gbest = Particle.create(self.paramCount)
def _initSwarm(self):
swarm = []
for _ in range(self.particleCount):
swarm.append(Particle(numpy.random.uniform(self.low, self.high, size=self.paramCount), numpy.zeros(self.paramCount)))
return swarm
[docs] def sample(self, maxIter=1000, c1=1.193, c2=1.193, p=0.7, m=10**-3, n=10**-2):
"""
Launches the PSO. Yields the complete swarm per iteration
:param maxIter: maximum iterations
:param c1: cognitive weight
:param c2: social weight
:param p: stop criterion, percentage of particles to use
:param m: stop criterion, difference between mean fitness and global best
:param n: stop criterion, difference between norm of the particle vector and norm of the global best
"""
self._get_fitness(self.swarm)
i = 0
while True:
for particle in self.swarm:
if ((self.gbest.fitness)<particle.fitness):
self.gbest = particle.copy()
#if(self.isMaster()):
#print("new global best found %i %s"%(i, self.gbest.__str__()))
if (particle.fitness > particle.pbest.fitness):
particle.updatePBest()
if(i>=maxIter):
print("max iteration reached! stoping")
return
if(self._converged(i, p=p,m=m, n=n)):
if(self.isMaster()):
print("converged after %s iterations!"%i)
print("best fit found: ", self.gbest.fitness, self.gbest.position)
return
for particle in self.swarm:
w = 0.5 + numpy.random.uniform(0,1,size=self.paramCount)/2
#w=0.72
part_vel = w * particle.velocity
cog_vel = c1 * numpy.random.uniform(0,1,size=self.paramCount) * (particle.pbest.position - particle.position)
soc_vel = c2 * numpy.random.uniform(0,1,size=self.paramCount) * (self.gbest.position - particle.position)
particle.velocity = part_vel + cog_vel + soc_vel
particle.position = particle.position + particle.velocity
self._get_fitness(self.swarm)
swarm = []
for particle in self.swarm:
swarm.append(particle.copy())
yield swarm
i+=1
[docs] def optimize(self, maxIter=1000, c1=1.193, c2=1.193, p=0.7, m=10**-3, n=10**-2):
"""
Runs the complete optimiziation.
:param maxIter: maximum iterations
:param c1: cognitive weight
:param c2: social weight
:param p: stop criterion, percentage of particles to use
:param m: stop criterion, difference between mean fitness and global best
:param n: stop criterion, difference between norm of the particle vector and norm of the global best
:return swarms, gBests: the swarms and the global bests of all iterations
"""
swarms = []
gBests = []
for swarm in self.sample(maxIter,c1,c2,p,m,n):
swarms.append(swarm)
gBests.append(self.gbest.copy())
return swarms, gBests
def _get_fitness(self,swarm):
# If the `pool` property of the pso has been set (i.e. we want
# to use `multiprocessing`), use the `pool`'s map method. Otherwise,
# just use the built-in `map` function.
if self.pool is not None:
mapFunction = self.pool.map
else:
mapFunction = map
pos = numpy.array([part.position for part in swarm])
results = mapFunction(self.func, pos)
lnprob = numpy.array([l[0] for l in results])
for i, particle in enumerate(swarm):
particle.fitness = lnprob[i]
def _converged(self, it, p, m, n):
fit = self._convergedFit(it=it, p=p, m=m)
if fit:
space = self._convergedSpace(it=it, p=p, m=n)
return space
else:
return False
def _convergedFit(self, it, p, m):
bestSort = numpy.sort([particle.pbest.fitness for particle in self.swarm])[::-1]
meanFit = numpy.mean(bestSort[1:int(math.floor(self.particleCount*p))])
# print( "best %f, meanFit %f, ration %f"%( self.gbest[0], meanFit, abs((self.gbest[0]-meanFit))))
return abs(self.gbest.fitness-meanFit) < m
def _convergedSpace(self, it, p, m):
sortedSwarm = [particle for particle in self.swarm]
sortedSwarm.sort(key=lambda part: -part.fitness)
bestOfBest = sortedSwarm[0:int(floor(self.particleCount*p))]
diffs = []
for particle in bestOfBest:
diffs.append(self.gbest.position-particle.position)
maxNorm = max(list(map(numpy.linalg.norm, diffs)))
return (abs(maxNorm)<m)
def _convergedSpace2(self, p):
#Andres N. Ruiz et al.
sortedSwarm = [particle for particle in self.swarm]
sortedSwarm.sort(key=lambda part: -part.fitness)
bestOfBest = sortedSwarm[0:int(floor(self.particleCount*p))]
positions = [particle.position for particle in bestOfBest]
means = numpy.mean(positions, axis=0)
delta = numpy.mean((means-self.gbest.position)/self.gbest.position)
return numpy.log10(delta) < -3.0
[docs] def isMaster(self):
return True
[docs]class Particle(object):
"""
Implementation of a single particle
:param position: the position of the particle in the parameter space
:param velocity: the velocity of the particle
:param fitness: the current fitness of the particle
"""
def __init__(self, position, velocity, fitness = 0):
self.position = position
self.velocity = velocity
self.fitness = fitness
self.paramCount = len(self.position)
self.pbest = self
[docs] @classmethod
def create(cls, paramCount):
"""
Creates a new particle without position, velocity and -inf as fitness
"""
return Particle(numpy.array([[]]*paramCount),
numpy.array([[]]*paramCount),
-numpy.Inf)
[docs] def updatePBest(self):
"""
Sets the current particle representation as personal best
"""
self.pbest = self.copy()
[docs] def copy(self):
"""
Creates a copy of itself
"""
return Particle(copy(self.position),
copy(self.velocity),
self.fitness)
def __str__(self):
return "%f, pos: %s velo: %s"%(self.fitness, self.position, self.velocity)
def __unicode__(self):
return self.__str__()