注意将代码和下面公式推导结合起来。还要注意一下q_target和q_predict之间的关系。其实算法的更新是需要使用q_predict来逼近q_target,当两者相等时,算法将停止更新,当传统的qlearning转化为deep Qlearning,也是这样操作的,只是深度qlearning使用一个神经网络来表示q表。

这篇文章将要介绍传统的qlearning算法,使用的是迭代的方法更新q表,更新q表的方法类似于向前推进,而不是使用梯度下降方法,因为这里介绍的不是Deep QLearning方法。

一、算法介绍以及推导

:这里更新的不是agent,而是一个q表,q表里面记录的是agent在某个状态采取某个的动作的好坏,q表可以起到间接决定agent采取什么决策。q表就类似一个critic,一个评论家,来指导agent。

1.1、 Q ( s , a ) Q(s,a) Q(s,a)是什么?

Q ( s , a ) Q(s,a) Q(s,a)是状态动作价值函数,是在状态 s s s时采取动作 a a a之后,可以获得的奖励的期望值。 Q ( s , a ) Q(s,a) Q(s,a)越大表示在agent在看到状态 s s s是采取动作 a a a比较好。

1.2、q表

q表a1a2a3
s1q(s1,a1)q(s1,a2)q(s1,a3)
s2q(s2,a1)q(s2,a2)q(s2,a3)
s3q(s3,a1)q(s3,a2)q(s3,a3)
s4q(s4,a1)q(s4,a2)q(s4,a3)

q表里面记录的都是状态动作价值函数,前面说到q表可以间接决定agent采取什么样的决策,就是因为q表记录了所有的状态和动作的组合情况,比如agent看到状态 s 2 s_{2} s2时,就会在状态 s 2 s_{2} s2所在的行选取最大的q值所对应的动作。

1.3、如何根据q表进行决策?

假设下表是我们已经更新完成了的q表

q表a1a2a3
s1- 113
s2201
s3157
s4563

Q表指导agent决策的过程:t=1时,agent观测到环境的状态s2,于是查找状态s2所在的行,发现Q(s2,a1)>Q(s2,a3)>Q(s2,a2),因此选择动a1,此时环境发生变化,agent观测到环境的状态s4,接着查找状态s4所在的行,agent发现q(s4,a2)>q(s4,a1)>q(s4,a3),于是agent采取决策选择动作a2,一直进行下去,直到结束。

1.4、 ε − g r e e d y \varepsilon -greedy εgreedy选择动作

有上面的决策过程我们可以看到,在给定一个状态s时,会选择q值最大的动作,这样会导致一个问题:因为是随机初始化的,由于选择最大值,可能会使得一些动作无法被选择到,也就是无法更新q值,这样q值就一直是随机初始化的那个值。

ε − g r e e d y \varepsilon -greedy εgreedy选择动作的流程如下:agent观测到状态s时,采取动作时会以 1 − ε 1-\varepsilon 1ε的概率在Q表里面选择q值最大所对应的动作,以 ε \varepsilon ε的概率随机选择动作。
不再使用完全贪婪的算法,而是有一定的动作选择的完全随机性,这样就可以保证在迭代次数足够多的情况下Q表中的所有动作都会被更新到。

1.5、如何更新Q表?

算法开始的时候,我们需要随机初始化q表,那么如何更新q表就是一件非常关键的事情。
在这里插入图片描述
参数解释:
更新公式为 Q ( s , a ) n e w = Q ( s , a ) o l d + α [ r + γ m a x a ′ Q ( s ′ , a ′ ) − Q ( s , a ) o l d ] (1) Q(s,a) ^{new} = Q(s,a) ^{old}+\alpha [r+\gamma max_{{a}'}Q({s}',{a}')-Q(s,a)^{old}]\tag{1} Q(s,a)new=Q(s,a)old+α[r+γmaxaQ(s,a)Q(s,a)old](1)

  1. α \alpha α指学习率,其实也是一个权值,我们将公式1进行改写得到如下的公式 Q ( s , a ) n e w = ( 1 − α ) Q ( s , a ) o l d + α [ r + γ m a x a ′ Q ( s ′ , a ′ ) ] (2) Q(s,a) ^{new} = (1-\alpha )Q(s,a) ^{old}+\alpha [r+\gamma max_{{a}'}Q({s}',{a}')]\tag{2} Q(s,a)new=(1α)Q(s,a)old+α[r+γmaxaQ(s,a)](2)
    我们从公式2可以看到, Q ( s , a ) n e w Q(s,a) ^{new} Q(s,a)new两部分的凸组合

  2. γ \gamma γ 是衰减值

首先我们随机初始化一个Q表,然后任意初始化一个状态 s s s,也可以理解为,agent观测到的环境的状态,根据Q表使用 ε − g r e e d y \varepsilon -greedy εgreedy算法选择状态 s s s对应的动作 a a a,因为agent做出了一个动作,会从环境中获得一个奖励 r r r,环境发生变化,agent又观测到一个新的状态 s ′ {s}' s,根据Q表在状态 s ′ {s}' s 所在的行,查询Q表,求得最大值,然后更新按照公式更新Q表。

Question 1:什么时候才会迭代收敛?
答:由公式(1)可以看到当 r + γ m a x a ′ Q ( s ′ , a ′ ) = = Q ( s , a ) o l d r+\gamma max_{{a}'}Q({s}',{a}')==Q(s,a)^{old} r+γmaxaQ(s,a)==Q(s,a)old时,迭代格式收敛,也就是Q表的更新完成。

1.6、TD方法(时间差分更新法)

假设 . . . , s t , a t , r t , s t + 1 , . . . ...,s_{t},a_{t},r_{t},s_{t+1},... ...,st,at,rt,st+1,...
我们要更新Q值使 r t 和 Q ( s t , a t ) o l d − γ m a x a ′ Q ( s t + 1 , a ′ ) r_{t}和Q(s_{t},a_{t})^{old}-\gamma max_{{a}'}Q(s_{t+1},{a}') rtQ(st,at)oldγmaxaQ(st+1,a)越接近越好,这里用了两个相邻时刻的状态价值函数

二、两个个小疑问

  1. 等eposide结束之后,进行反向更新
    从公式 Q ( s , a ) n e w = Q ( s , a ) o l d + α [ r + γ m a x a ′ Q ( s ′ , a ′ ) − Q ( s , a ) o l d ] Q(s,a) ^{new} = Q(s,a) ^{old}+\alpha [r+\gamma max_{{a}'}Q({s}',{a}')-Q(s,a)^{old}] Q(s,a)new=Q(s,a)old+α[r+γmaxaQ(s,a)Q(s,a)old] 可以看到, Q ( s , a ) Q(s,a) Q(s,a)的更新会使用到状态
    s ′ {s}' s的Q值,从而基于TD方法进行更新Q表。但是每次更新我们都利用了下一个时刻的Q值,比如 Q ( s ′ , a ′ ) Q({s}',{a}') Q(s,a),假设agent完了一个episode的游戏,得到 τ = { s 1 , a 1 , r 1 , s 2 , a 2 , r 2 , . . . , s T , a T , r T , E N D } \tau =\{s_{1},a_{1},r_{1},s_{2},a_{2},r_{2},...,s_{T},a_{T},r_{T},END\} τ={s1,a1,r1,s2,a2,r2,...,sT,aT,rT,END},假如我们先从后面的状态开始更新,这样前面的状态更新的时候就可以使用已经更新了的后面的状态,也就是反向更新
    不知道这样是否可行,这样有个问题就是:原始的方法每一时间步都可以更新,就是在episode进行的时候更新,换成上面我说的这样的更新方法,还要将状态,动作,价值先存储起来,要等到episode结束才能更新。

  2. agent观测一个状态,能不能选择做出多个动作回应这个状态?
    例子:太空入侵者游戏
    在这里插入图片描述
    在这个游戏中,agent可以做出动作有三种:开火(fire),向右移动(right),向左移动(left).
    假设上图为agent观测到的一个状态,但是这是agent同时选择了开火和向右移动的动作。感觉这样也是合理的,只要做出的动作不相互矛盾就可以。比如动作向右移动和向左移动就不能同时发生。

三、代码

import numpy as np
import pandas as pd
import time

np.random.seed(2)  # reproducible


N_STATES = 6   # the length of the 1 dimensional world
ACTIONS = ['left', 'right']     # available actions
EPSILON = 0.9   # greedy police
ALPHA = 0.1     # learning rate
GAMMA = 0.9    # discount factor
MAX_EPISODES = 13   # maximum episodes
FRESH_TIME = 0.3 # fresh time for one move


'''
所构造的q表的大小是:行数是财产的探索者总共所处的位置数,列数就是动作的数量
'''
def build_q_table(n_states, actions):
    table = pd.DataFrame(
        np.zeros((n_states, len(actions))),     # q_table initial values
        columns=actions,    # actions's name
    )
    # print(table)    # show table
    return table


def choose_action(state, q_table):
    # This is how to choose an action
    state_actions = q_table.iloc[state, :] #将现在agent观测到的状态所对应的q值取出来
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):  #当生撑随机数大于EPSILON或者状态state所对应的q值全部为0时,就随机选择状态state所对应的动作
        action_name = np.random.choice(ACTIONS)
    else:   # act greedy
        action_name = state_actions.idxmax()    # 选择状态state所对应的使q值最大的动作
    return action_name


def get_env_feedback(S, A):
    # 选择动作之后,还要根据现在的状态和动作获得下一个状态,并且返回奖励,这个奖励是环境给出的,用来评价当前动作的好坏。
    #这里设置的是,只有在获得宝藏是才给奖励,没有获得奖励时,无论是向左移动还是向右移动,给出的即时奖励都是0.
    if A == 'right':    # move right
        if S == N_STATES - 2:   # terminate
            S_ = 'terminal'
            R = 1
        else:
            S_ = S + 1
            R = 0
    else:   # move left
        R = 0
        if S == 0:
            S_ = S  # reach the wall
        else:
            S_ = S - 1
    return S_, R


def update_env(S, episode, step_counter):
    # 更新环境的函数,比如向右移动之后,o表示的agent就距离宝藏进了一步,将agent随处的位置实时打印出来
    env_list = ['-']*(N_STATES-1) + ['T']   # '---------T' our environment
    if S == 'terminal':
        interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)
        print('\r{}'.format(interaction), end='')
        time.sleep(2)
        print('\r                                ', end='')
    else:
        env_list[S] = 'o'
        interaction = ''.join(env_list)
        print('\r{}'.format(interaction), end='')
        time.sleep(FRESH_TIME)


def rl():
    # 开始更新q表
    q_table = build_q_table(N_STATES, ACTIONS)#随机初始化一下q表
    for episode in range(MAX_EPISODES):
        step_counter = 0#记录走了多少步
        S = 0#每个episode开始时都将agent初始化在最开始的地方
        is_terminated = False
        update_env(S, episode, step_counter)#打印的就是o-----T
        while not is_terminated:#判断episode是否结束

            A = choose_action(S, q_table)#agent根据当前的状态选择动作
            S_, R = get_env_feedback(S, A)  # 上一步已经获得了s对应的动作a,接着我们要获得下一个时间步的状态
            q_predict = q_table.loc[S, A]
            if S_ != 'terminal':#要判断一下,下一个时间步的是不是已经取得宝藏了,如果不是,可以按照公式进行更新
                q_target = R + GAMMA * q_table.iloc[S_, :].max()   # next state is not terminal
            else:#如果已经得到了宝藏,得到的下一个状态不在q表中,q_target的计算也不同。
                q_target = R     # next state is terminal
                is_terminated = True    # terminate this episode

            q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
            S = S_  # move to next state

            update_env(S, episode, step_counter+1)
            step_counter += 1
    return q_table


if __name__ == "__main__":
    q_table = rl()
    print('\r\nQ-table:\n')
    print(q_table)

可以参看下面的链接
莫烦python

Logo

学大模型,用大模型上飞桨星河社区!每天8点V100G算力免费领!免费领取ERNIE 4.0 100w Token >>>

更多推荐