Monte Carlo, every-visit, gridworld, exploring starts: Python code gets stuck in an infinite loop during episode generation

artificial-intelligence, python, monte-carlo-methods
2021-11-16 18:16:34

I have been trying to implement policy improvement for the Q(s,a) function based on the Sutton & Barto reinforcement learning book. The original first-visit Monte Carlo algorithm is shown in the image below.

[Image: first-visit Monte Carlo ES (exploring starts) pseudocode from Sutton & Barto]

As far as I remember, the book mentioned earlier that the every-visit variation simply omits the first-visit check ("Unless the pair blah, blah ..."), but that otherwise the algorithm should be the same (???)
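
For concreteness, here is a minimal sketch of where that first-visit check would sit in the return-accumulation loop, assuming an episode stored as a list of ((r, c), action, reward) tuples like the one produced by QEpisode further down:

G = 0
for t in reversed(range(len(sequence))):      # sequence: list of ((r, c), act, reward)
    S_t, A_t, R_t = sequence[t]
    G += R_t                                  # gamma == 1.0, as in the code below
    # first-visit MC: only update (S_t, A_t) if the pair does not appear earlier in the episode
    if all((s, a) != (S_t, A_t) for (s, a, _) in sequence[:t]):
        pass                                  # update returnsDict / QDict for (S_t, A_t) here
    # every-visit MC simply drops the check above and updates unconditionally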

The initial policy for the first iteration (first episode) should be an equiprobable random walk. In general, if an action would take you outside the boundary of the gridworld (4x4), you simply bounce back to the state you started from, but the reward is still given and the action counts as taken.
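
As a sketch, those dynamics are just a clamp of the row/column indices to the 4x4 grid plus a constant reward of -1 (the getState helper in the code below has the same effect):

def step(r, c, act):
    # sketch of the bounce-back dynamics described above (hypothetical helper,
    # equivalent in effect to getState plus the fixed reward used in the code below)
    dr, dc = {"U": (-1, 0), "R": (0, 1), "D": (1, 0), "L": (0, -1)}[act]
    new_r = min(max(r + dr, 0), 3)   # bounce back off the boundary of the 4x4 grid
    new_c = min(max(c + dc, 0), 3)
    return (new_r, new_c), -1        # the reward of -1 is given in every case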

I have verified that, for some reason, my code (sometimes) gets stuck in an infinite loop in the episode-generation part, quite early on (after only a small number of iterations of the outer loop). I think I followed the pseudocode quite closely, but sometimes the code still gets stuck in that infinite loop, which is really annoying.

The cause must be that, for some reason, my code updates the policy from the equiprobable random walk into a deterministic policy, but in the wrong way, so that after the first episode has run (the first episode always completes, since it uses the equiprobable random walk throughout) episode generation can enter an infinite loop. The image below shows that if you enter any of the marked states, you cannot get out of them and get stuck in episode generation... [Image: gridworld with the marked states from which the deterministic policy cannot escape]

If the random generator gets lucky, it can often "get over the hump" and keep going for the required number of outer-loop iterations, printing the optimal policy for the gridworld.
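
A small diagnostic like the sketch below (it assumes the policies array and the helpers defined in the full listing) can confirm this: it follows the current policy from every start state with a step budget and reports the states that never reach a terminal state.

def policyHasLoops(max_steps=100):
    """Diagnostic sketch: list the start states from which the current (partly)
    deterministic policy fails to reach a terminal state within max_steps."""
    stuck = []
    for r0 in range(rows_count):
        for c0 in range(columns_count):
            if isTerminal(r0, c0):
                continue
            r, c = r0, c0
            for _ in range(max_steps):
                act = policies[r, c]
                if act == "A":            # still equiprobable here: assume escapable
                    break
                if act == "U":
                    r -= 1
                elif act == "R":
                    c += 1
                elif act == "D":
                    r += 1
                else:
                    c -= 1
                r, c = getState(r, c)
                if isTerminal(r, c):
                    break
            else:
                stuck.append((r0, c0))    # budget exhausted without terminating
    return stuck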

Here is the Python code (based on my experience running it in the debugger, I had to restart it a few times, but usually it quickly shows that it is stuck in episode generation and cannot iterate in the outer loop).

import numpy as np
import numpy.linalg as LA
import random
from datetime import datetime

random.seed(datetime.now())
rows_count = 4
columns_count = 4

def isTerminal(r, c):  # helper function to check if terminal state or regular state
    global rows_count, columns_count
    if r == 0 and c == 0:  # I'm a bit too lazy to check the iteration boundaries otherwise
        return True  # so that this helper function is a quick way to exclude computations
    if r == rows_count - 1 and c == columns_count - 1:
        return True
    return False


maxiters = 100000
reward = -1
actions = ["U", "R", "D", "L"]
V = np.zeros((rows_count, columns_count))
returnsDict={}
QDict={}
actDict={0:"U",1:"R",2:"D",3:"L"}
policies = np.array([ ['T','A','A','A'],
                     ['A','A','A','A'],
                     ['A','A','A','A'],
                     ['A','A','A','T'] ])





"""returnsDict, for each state-action pair, maintain (mean,visitedCount)"""
for r in range(rows_count):
    for c in range(columns_count):
        if not isTerminal(r, c):
            for act in actions:
                returnsDict[ ((r, c), act) ] = [0, 0] ## Maintain Mean, and VisitedCount for each state-action pair



""" Qfunc, we maintain the action-value for each state-action pair"""
for r in range(rows_count):
    for c in range(columns_count):
        if not isTerminal(r, c):
            for act in actions:
                QDict[ ((r,c), act) ] = -9999  ## Maintain Q function value for each state-action pair






def getValue(row, col):  # helper func, get state value
    global V
    if row == -1:
        row = 0  # if you bump into wall, you bounce back
    elif row == 4:
        row = 3
    if col == -1:
        col = 0
    elif col == 4:
        col = 3
    return V[row, col]

def getRandomStartState():
    illegalState = True

    while illegalState:
        r = random.randint(0, 3)
        c = random.randint(0, 3)
        if (r == 0 and c == 0) or (r == 3 and c == 3):
            illegalState = True
        else:
            illegalState = False
    return r, c

def getState(row, col):
    if row == -1:
        row = 0  # helper func for the exercise:1
    elif row == 4:
        row = 3
    if col == -1:
        col = 0
    elif col == 4:
        col = 3
    return row, col



def getRandomAction():
    global actDict
    return actDict[random.randint(0, 3)]


def getMeanFromReturns(oldMean, n, curVal):
    newMean = 0
    if n == 0:
        raise Exception('Exception, incrementalMeanFunc, n should not be less than 1')
    elif n == 1:
        return curVal
    elif n >= 2:
        newMean = (float) ( oldMean + (1.0 / n) * (curVal - oldMean) )
        return newMean


"""get the best action 
returns string action
parameter is state tuple (r,c)"""
def getArgmaxActQ(S_t):
    global QDict
    qvalList = []
    saList = []

    """for example get together
    s1a1, s1a2, s1a3, s1a4
    find which is the maxValue, and get the action which caused it"""
    sa1 = (S_t, "U")
    sa2 = (S_t, "R")
    sa3 = (S_t, "D")
    sa4 = (S_t, "L")
    saList.append(sa1)
    saList.append(sa2)
    saList.append(sa3)
    saList.append(sa4)

    q1 = QDict[sa1]
    q2 = QDict[sa2]
    q3 = QDict[sa3]
    q4 = QDict[sa4]
    qvalList.append(q1)
    qvalList.append(q2)
    qvalList.append(q3)
    qvalList.append(q4)

    maxQ = max(qvalList)
    ind_maxQ = qvalList.index(maxQ)  # gets the maxQ value and the index which caused it

    """when we have index of maxQval, then we know which sa-pair
    gave that maxQval => we can access that action from the correct sa-pair"""
    argmaxAct = saList[ind_maxQ][1]
    return argmaxAct

"""QEpisode generation func
returns episodeList
parameters are starting state, starting action"""
def QEpisode(r, c, act):
    global reward
    global policies

    """NOTE! r,c will both be local variables inside this func
    they denote the nextState (s') in this func"""
    stateWasTerm = False
    stepsTaken = 0
    curR = r
    curC = c
    episodeList = [ ((r, c), act, reward) ]  # add the starting (s,a) immediately

    if act == "U":  ##up
        r -= 1
    elif act == "R":  ##right
        c += 1
    elif act == "D":  ## down
        r += 1
    else:  ##left
        c -= 1
    stepsTaken += 1
    r, c = getState(r, c)  ## check status of the newState (s')
    stateWasTerm = isTerminal(r, c)  ## if status was terminal stop iteration, else keep going into loop

    if not stateWasTerm:
        curR = r
        curC = c

    while not stateWasTerm:
        if policies[curR, curC] == "A":
            act = getRandomAction()  ## """get the random action from policy"""
        else:
            act = policies[curR, curC]  ## """get the deterministic action from policy"""

        if act == "U":  ## up
            r -= 1
        elif act == "R":  ## right
            c += 1
        elif act == "D":  ## down
            r += 1
        else:  ## left
            c -= 1
        stepsTaken += 1

        r, c = getState(r, c)
        stateWasTerm = isTerminal(r, c)
        episodeList.append( ((curR, curC), act, reward) )
        if not stateWasTerm:
            curR = r
            curC = c

    return episodeList




print("montecarlo program starting...\n")
""" MOnte Carlo Q-function, exploring starts, every-visit, estimating Pi ~~ Pi* """
for iteration in range(1, maxiters+1): ## for all episodes

    print("curIter == ", iteration)
    print("\n")
    if iteration % 20 == 0: ## get random seed periodically to improve randomness performance
        random.seed(datetime.now())



    for r in range(4):
        for c in range(4):
            if not isTerminal(r,c):
                startR = r
                startC = c
                startAct = getRandomAction()


   ## startR, startC = getRandomStartState() ## get random starting-state, and starting action equiprobably
  ##  startAct = getRandomAction()
                sequence = QEpisode(startR, startC, startAct)  ## generate Q-sequence following policy Pi, until terminal-state (excluding terminal)
                G = 0

                for t in reversed(range(len(sequence))): ## iterate through the timesteps in reversed order
                    S_t = sequence[t][0] ## use temp variables as helpers
                    A_t = sequence[t][1]
                    R_t = sequence[t][2]
                    G += R_t ## increment G with reward, NOTE! the gamma discount factor == 1.0
                    visitedCount = returnsDict[S_t, A_t][1] ## use temp visitedcount
                    visitedCount += 1

                    if visitedCount == 1: ## special case in iterative mean algorithm, the first visit to any state-action pair
                        curMean = 9999
                        curMean = getMeanFromReturns(curMean, visitedCount, G)
                        returnsDict[S_t, A_t][0] = curMean ## update mean
                        returnsDict[S_t, A_t][1] = visitedCount ## update visitedcount
                    else:
                        curMean = returnsDict[S_t, A_t][0] ## get temp mean from returnsDict
                        curMean = getMeanFromReturns(curMean, visitedCount, G) ## get the new temp mean iteratively
                        returnsDict[S_t, A_t][1] = visitedCount ## update visitedcount
                        returnsDict[S_t, A_t][0] = curMean ## update mean


                    QDict[S_t, A_t] = returnsDict[S_t, A_t][0] ## update the Qfunction with the new mean value
                    tempR = S_t[0] ## temp variables simply to disassemble the tuple into row,col
                    tempC = S_t[1]
                    policies[tempR, tempC] = getArgmaxActQ(S_t) ## update policy based on argmax_a[Q(S_t)]


print("optimal policy with Monte-Carlo, every visit was \n")
print("\n")
print(policies)

Here is the updated "hacky fix" code, which seems to get the algorithm "over the hump" without falling into an infinite loop under a deterministic policy. My teacher suggested that you do not need to update the policy at every single step of this kind of Monte Carlo, so you can update it periodically, e.g. using Python's modulo operator on the iteration count or something similar. I also had the idea that, since the Sutton & Barto book says all state-action pairs must be visited, very many times, this is needed to satisfy the exploring-starts precondition of the algorithm.
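
A rough sketch of that periodic-update idea (policy_update_interval is a hypothetical name; the listing below ends up using an exploration factor instead):

policy_update_interval = 50   # hypothetical: refresh the greedy policy only every 50 outer iterations

# at the end of each outer iteration, instead of updating the policy at every backup step:
if iteration % policy_update_interval == 0:
    for r in range(rows_count):
        for c in range(columns_count):
            if not isTerminal(r, c):
                policies[r, c] = getArgmaxActQ((r, c))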

So I then decided to force the algorithm to start at least one episode from every state-action pair (in fact, for every outer iteration), so that you deterministically start from each state-action pair. During an early exploration phase this still runs with the old random-walk policy, gathering data into the returns dictionary and the QDict, but without yet improving towards a deterministic policy.

import numpy as np
import numpy.linalg as LA
import random
from datetime import datetime

random.seed(datetime.now())
rows_count = 4
columns_count = 4

def isTerminal(r, c):  # helper function to check if terminal state or regular state
    global rows_count, columns_count
    if r == 0 and c == 0:  # I'm a bit too lazy to check the iteration boundaries otherwise
        return True  # so that this helper function is a quick way to exclude computations
    if r == rows_count - 1 and c == columns_count - 1:
        return True
    return False



"""NOTE about maxiters!!!
the Monte-Carlo every visit algorithm implements total amount of iterations with formula
totalIters = maxiters * nonTerminalStates * possibleActions
totalIters = 5000 * 14 * 4
totalIters = 280000

in other words, there will be 5k iterations per each state-action pair
in other words there will be an early exploration phase where policy willnot be updated,
but the gridworld will be explored with randomwalk policy, gathering Qfunc information, 
and returnDict information.

in early phase there will be about 27 iterations for each state-action pair during,
non-policy-updating exploration 
(maxiters * explorationFactor) / (stateACtionPairs) = 7500 *0.2 /56

after that early exploring with randomwalk,
then we act greedily w.r.t. the Q-function, 
for the rest of the iterations to get the optimal deterministic policy
"""
maxiters = 7500
explorationFactor = 0.2 ## explore that percentage of the first maxiters rounds, try to increase it, if you get stuck in foreverloop, in QEpisode function
reward = -1
actions = ["U", "R", "D", "L"]
V = np.zeros((rows_count, columns_count))
returnsDict={}
QDict={}
actDict={0:"U",1:"R",2:"D",3:"L"}
policies = np.array([ ['T','A','A','A'],
                     ['A','A','A','A'],
                     ['A','A','A','A'],
                     ['A','A','A','T'] ])





"""returnsDict, for each state-action pair, maintain (mean,visitedCount)"""
for r in range(rows_count):
    for c in range(columns_count):
        if not isTerminal(r, c):
            for act in actions:
                returnsDict[ ((r, c), act) ] = [0, 0] ## Maintain Mean, and VisitedCount for each state-action pair



""" Qfunc, we maintain the action-value for each state-action pair"""
for r in range(rows_count):
    for c in range(columns_count):
        if not isTerminal(r, c):
            for act in actions:
                QDict[ ((r,c), act) ] = -9999  ## Maintain Q function value for each state-action pair






def getValue(row, col):  # helper func, get state value
    global V
    if row == -1:
        row = 0  # if you bump into wall, you bounce back
    elif row == 4:
        row = 3
    if col == -1:
        col = 0
    elif col == 4:
        col = 3
    return V[row, col]

def getRandomStartState():
    illegalState = True

    while illegalState:
        r = random.randint(0, 3)
        c = random.randint(0, 3)
        if (r == 0 and c == 0) or (r == 3 and c == 3):
            illegalState = True
        else:
            illegalState = False
    return r, c

def getState(row, col):
    if row == -1:
        row = 0  # helper func for the exercise:1
    elif row == 4:
        row = 3
    if col == -1:
        col = 0
    elif col == 4:
        col = 3
    return row, col



def getRandomAction():
    global actDict
    return actDict[random.randint(0, 3)]


def getMeanFromReturns(oldMean, n, curVal):
    newMean = 0
    if n == 0:
        raise Exception('Exception, incrementalMeanFunc, n should not be less than 1\n')
    elif n == 1:
        return curVal
    elif n >= 2:
        newMean = (float) ( oldMean + (1.0 / n) * (curVal - oldMean) )
        return newMean


"""get the best action 
returns string action
parameter is state tuple (r,c)"""
def getArgmaxActQ(S_t):
    global QDict
    qvalList = []
    saList = []

    """for example get together
    s1a1, s1a2, s1a3, s1a4
    find which is the maxValue, and get the action which caused it"""
    sa1 = (S_t, "U")
    sa2 = (S_t, "R")
    sa3 = (S_t, "D")
    sa4 = (S_t, "L")
    saList.append(sa1)
    saList.append(sa2)
    saList.append(sa3)
    saList.append(sa4)

    q1 = QDict[sa1]
    q2 = QDict[sa2]
    q3 = QDict[sa3]
    q4 = QDict[sa4]
    qvalList.append(q1)
    qvalList.append(q2)
    qvalList.append(q3)
    qvalList.append(q4)

    maxQ = max(qvalList)
    ind_maxQ = qvalList.index(maxQ)  # gets the maxQ value and the index which caused it

    """when we have index of maxQval, then we know which sa-pair
    gave that maxQval => we can access that action from the correct sa-pair"""
    argmaxAct = saList[ind_maxQ][1]
    return argmaxAct




"""QEpisode generation func
returns episodeList
parameters are starting state, starting action"""
def QEpisode(r, c, act):

    """ideally, we should not get stuck in the gridworld...but,
    but sometiems when policy transitions from the first episode's policy == randomwalk,
    then, on second episode sometimes we get stuck in foreverloop in episode generation
    usually the only choice then seems to restart the entire policy into randomwalk ??? """

    global reward
    global policies

    """NOTE! r,c will both be local variables inside this func
    they denote the nextState (s') in this func"""
    stepsTaken = 0
    curR = r
    curC = c
    episodeList = [ ((r, c), act, reward) ]  # add the starting (s,a) immediately

    if act == "U":  ##up
        r -= 1
    elif act == "R":  ##right
        c += 1
    elif act == "D":  ## down
        r += 1
    elif act == "L":  ##left
        c -= 1
    stepsTaken += 1
    r, c = getState(r, c)  ## check status of the newState (s')
    stateWasTerm = isTerminal(r, c)  ## if status was terminal stop iteration, else keep going into loop

    if not stateWasTerm:
        curR = r
        curC = c

    while not stateWasTerm:
        if policies[curR, curC] == "A":
            act = getRandomAction()  ## """get the random action from policy"""
        else:
            act = policies[curR, curC]  ## """get the deterministic action from policy"""

        if act == "U":  ## up
            r -= 1
        elif act == "R":  ## right
            c += 1
        elif act == "D":  ## down
            r += 1
        else:  ## left
            c -= 1
        stepsTaken += 1

        r, c = getState(r, c)
        stateWasTerm = isTerminal(r, c)
        episodeList.append( ((curR, curC), act, reward) )
        if not stateWasTerm:
            curR = r
            curC = c
        if stepsTaken >= 100000:
            raise Exception("Exception raised, because program got stuck in MC Qepisode generation...\n")


    return episodeList




print("montecarlo program starting...\n")
""" MOnte Carlo Q-function, exploring starts, every-visit, estimating Pi ~~ Pi* """

"""It appears that the Qfunction apparently can be unreliable in the early episodes rounds, so we can avoid getting 
stuck in foreverloop because of unreliable early episodes, BUT...

we gotta delay updating the policy, until we have explored enough for a little bit...
so our Qfunction has reliable info inside of it, to base the decision on, later..."""
Q_function_is_reliable = False ## variable shows if we are currently updating the policy, or just improving Q-function and exploring


for iteration in range(1, maxiters+1): ## for all episodes

    print("curIter == ", iteration, ", QfunctionIsReliable == ", Q_function_is_reliable )
    print("\n")
    if iteration % 20 == 0: ## get random seed periodically to improve randomness performance
        random.seed(datetime.now())

    for r in range(4):  ## for every non-terminal-state
        for c in range(4):
            if not isTerminal(r,c):
                startR = r
                startC = c
                for act in actions: ## for every action possible
                    startAct = act
                    sequence = QEpisode(startR, startC, startAct)  ## generate Q-sequence following policy Pi, until terminal-state (excluding terminal)
                    G = 0

                    for t in reversed(range(len(sequence))): ## iterate through the timesteps in reversed order
                        S_t = sequence[t][0] ## use temp variables as helpers
                        A_t = sequence[t][1]
                        R_t = sequence[t][2]
                        G += R_t ## increment G with reward, NOTE! the gamma discount factor == 1.0
                        visitedCount = returnsDict[S_t, A_t][1]
                        visitedCount += 1

                       ## if (S_t, A_t, -1) not in sequence[:t]: ## This is how you COULD have done the first-visit MC, but we do every-visit now...
                        if visitedCount == 1: ## special case in iterative mean algorithm, the first visit to any state-action pair
                            curMean = 9999
                            curMean = getMeanFromReturns(curMean, visitedCount, G)
                            returnsDict[S_t, A_t][0] = curMean ## update mean
                            returnsDict[S_t, A_t][1] = visitedCount ## update visitedcount
                        else:
                            curMean = returnsDict[S_t, A_t][0] ## get temp mean from returnsDict
                            curMean = getMeanFromReturns(curMean, visitedCount, G) ## get the new temp mean iteratively
                            returnsDict[S_t, A_t][1] = visitedCount ## update visitedcount
                            returnsDict[S_t, A_t][0] = curMean ## update mean


                        QDict[S_t, A_t] = returnsDict[S_t, A_t][0] ## update the Qfunction with the new mean value
                        tempR = S_t[0] ## temp variables simply to disassemble the tuple into row,col
                        tempC = S_t[1]

                        if iteration >= round(maxiters * explorationFactor): ## ONLY START UPDATING POLICY once we have reliable estimates for the Qfunction, i.e. when iteration >= maxiters * explorationFactor
                            Q_function_is_reliable = True
                            policies[tempR, tempC] = getArgmaxActQ(S_t) ## update policy based on argmax_a[Q(S_t)]


print("optimal policy with Monte-Carlo, every visit was \n")
print("\n")
print(policies)

1 Answer

Your implementation of the Monte Carlo Exploring Starts algorithm appears to be working as designed. This is a problem that can occur with certain deterministic policies in gridworld environments.

Your policy improvement step can generate such a policy, and nothing built into the algorithm allows it to recover from this. The first-visit and every-visit variants converge to the true action values in different ways, but neither offers any improvement here. Your loops most likely occur through state/action pairs that did not occur in the first episode, so their value estimates default to 0, which looks like the best choice when the deterministic policy is created*.

在 Sutton & Barto 演示 Monte Carlo ES 中,作者选择了一个不可能出现此类循环的环境(简化的二十一点游戏)。然后,他们迅速通过使用ϵ- 贪婪的政策。所以这个问题没有详细讨论,尽管有几个地方断言必须有可能完成剧集。

To fix this while still using Monte Carlo ES, you need to change the environment so that such loops cannot persist. The simplest change is to terminate the episode when it gets too long. This is hacky, because doing it purely on the gridworld side violates the Markov property (the time step should now technically be part of the state if you want to predict values). However, it gets you past your immediate problem, and as long as you set the cutoff high enough - e.g. 100 steps for your small gridworld - the agent should still discover the optimal policy and the associated action values.
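
In your code that would be a small change inside QEpisode's while-loop, roughly where your second listing currently raises an exception; a sketch:

# inside QEpisode's while-loop, after incrementing stepsTaken:
if stepsTaken >= 100:    # example cutoff for the 4x4 gridworld
    break                # truncate the episode; the partial return is still usable for learning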

Many OpenAI Gym environments also use this "timeout" patch, because although most other algorithms can find their way out of such infinite loops, they would still suffer slow learning from overly long episodes.
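
In Gym this is the TimeLimit wrapper, which most registered environments already apply with some default step limit; roughly:

import gym
from gym.wrappers import TimeLimit

# sketch: wrap (or re-wrap) a small gridworld-like environment with an explicit step limit;
# FrozenLake-v1 is just an example environment id
env = TimeLimit(gym.make("FrozenLake-v1"), max_episode_steps=100)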


* This suggests pessimistically initialising your Q values - for example with a starting value of -50. That should help the first deterministic policy reach a terminal state - although even that may not be enough if you make it strictly deterministic and resolve value ties deterministically. You may want to experiment to verify what I am saying here, although I would recommend the timeout hack since it is a more general solution. With other algorithms, pessimistic starting values are bad for exploration.