from numpy import array,dot,fft
import random
from math import tanh,exp
class td_agent:
    """
    Linear weighted SARSA agent implementation with adjustable learning
    parameters.

    One weight vector is kept per action; an action's value is computed from
    the dot product of its weights with the current input vector (the input
    is prefixed with a constant 1.0). Every learning metaparameter is given
    as the four coefficients of a cubic polynomial that is evaluated at the
    running average reward each step (see ``approx``).
    """

    def __init__(self, data, meta_a=None, meta_b=None, meta_g=None,
                 meta_r=None, meta_l=None):
        """
        Data is an int pair, (#inputs,#outputs).

        This agent uses descriptions for a cubic polynomial function for each
        metaparameter: every ``meta_*`` argument is a sequence of four
        coefficients ``[c3, c2, c1, c0]``. Omitted arguments default to:

        * ``meta_a`` (alpha, learning rate):        ``[0., 0., 0., 0.1]``
        * ``meta_b`` (beta, action-selection gain): ``[0., 0., 0., 6.]``
        * ``meta_g`` (gamma, discount):             ``[0., 0., 0., .9]``
        * ``meta_r`` (average-reward term):         ``[0., 0., -1., 0.]``
        * ``meta_l`` (lambda, trace decay):         ``[0., 0., 0., .3]``

        Lists passed in by the caller are stored by reference, not copied.
        """
        # store metaparameters; a fresh list is built for every omitted
        # argument so agents never share one default list object
        self.meta_alpha = [0., 0., 0., 0.1] if meta_a is None else meta_a
        self.meta_beta = [0., 0., 0., 6.] if meta_b is None else meta_b
        self.meta_gamma = [0., 0., 0., .9] if meta_g is None else meta_g
        self.meta_rav = [0., 0., -1., 0.] if meta_r is None else meta_r
        self.meta_lambda = [0., 0., 0., .3] if meta_l is None else meta_l
        # store average reward
        self._rav = 0
        # create agent
        inputs, outputs = data
        self._inputs = inputs
        self._outputs = outputs
        # every per-action vector has one extra slot for the constant 1.0
        # that run() prepends to the input
        width = inputs + 1
        self._current = array([0] * width)
        self._current_old = self._current
        # small random initial weights, one vector per action
        self._states = [array([random.random() * 0.1 for _ in range(width)])
                        for _ in range(outputs)]
        self._action = 0
        self._action_old = 0
        self._out = array([0.] * outputs)
        # defined up front so update() never hits a missing attribute
        self._out_old = self._out
        self._elig_trace = [array([0. for _ in range(width)])
                            for _ in range(outputs)]

    def approx(self, x, params):
        """
        Approximates a cubic function.

        Returns ``params[0]*x**3 + params[1]*x**2 + params[2]*x + params[3]``.
        """
        return x*x*x*params[0]+x*x*params[1]+x*params[2]+params[3]

    def update(self, r, rav):
        """
        Learning loop to update the agent's value function.

        ``r`` is the reward for the latest step and ``rav`` the running
        average reward at which the metaparameter polynomials are evaluated.
        The resulting TD error is stored in ``self._error``.

        NOTE(review): the bootstrap term uses ``max(self._out)`` rather than
        the value of the action actually selected -- confirm this is the
        intended target for an agent described as SARSA.
        """
        # learning metaparameters
        gamma = self.approx(rav, self.meta_gamma)
        alpha = self.approx(rav, self.meta_alpha)
        rav_err = self.approx(rav, self.meta_rav)
        lamda = self.approx(rav, self.meta_lambda)
        previous_value = self._out_old[self._action_old]
        # error (delta) estimate
        self._error = (r + rav_err + gamma*max(self._out) - previous_value)
        # update trace of the previously taken action; the v*(1-v) factor is
        # the nonlinear weight credit assignment
        self._elig_trace[self._action_old] += (
            alpha*self._current_old*previous_value*(1.-previous_value))
        # adjust weights; the in-place operators mutate the arrays stored in
        # self._states / self._elig_trace
        decay = lamda*gamma
        for weights, trace in zip(self._states, self._elig_trace):
            weights += self._error*trace
            # lambda attenuation
            trace *= decay

    def run(self, input, r):
        """
        Run one iteration.

        ``input`` is an iterable of input values and ``r`` the reward for
        this step. Selects an action stochastically, records the per-action
        outputs, updates the running average reward, performs one learning
        update and returns the index of the selected action.
        """
        # after reset() the average reward is re-seeded from the first reward
        if self._rav is None:
            self._rav = r
        beta = self.approx(self._rav, self.meta_beta)
        self._current_old = self._current
        # constant 1.0 is prepended to the input vector
        self._current = array([1.] + list(input))
        self._action_old = self._action
        activations = [dot(weights, self._current) for weights in self._states]
        # selection weights exp(-|w.x * beta|), normalised to probabilities;
        # the tiny constant guards against division by zero
        s = [exp(-abs(q*beta)) for q in activations]
        ssum = sum(s)+.0000000000001
        s = [p/ssum for p in s]
        # per-action outputs exp(-|w.x|) (no beta scaling), stored as values
        so = [exp(-abs(q)) for q in activations]
        # calculate the action to take: walk the cumulative distribution
        psum = random.random()
        c = 0
        while c < len(s)-1 and psum > s[c]:
            psum -= s[c]
            c += 1
        # store action and values
        self._action = c
        self._out_old = self._out
        self._out = array(so)
        # handle update: exponential moving average of the reward
        self._rav = self._rav*.99+r*.01
        self.update(r, self._rav)
        return c

    def reset(self):
        """
        Reset the agent to starting conditions.

        Clears the current/previous input vectors, actions, outputs,
        eligibility traces and the running average reward. The learned
        weights in ``self._states`` are left untouched.
        """
        width = self._inputs + 1
        # must be inputs+1 long to match the weight and trace vectors
        self._current = array([0]*width)
        self._current_old = self._current
        self._action = 0
        self._action_old = 0
        self._rav = None
        self._out = array([0.]*self._outputs)
        self._out_old = self._out
        self._elig_trace = [array([0. for _ in range(width)])
                            for _ in range(self._outputs)]
|