機械学習とかコンピュータビジョンとか

CVやMLに関する勉強のメモ書き。

OpenAIのSpinning Upで強化学習を勉強してみた その3

はじめに

その3ということで一応Introduction to RLの最終回.今回勉強したページはこちら

Part 3: Intro to Policy Optimization

今回はpolicy optimizationの基礎理論とその実装について.

Deriving the Simplest Policy Gradient

まずは\thetaでparameterizeされた確率的なpolicy \pi_\thetaを考える.目標はexpected return J(\pi_\theta)=\underset{\tau\sim\pi_\theta}{E}[R(\tau)]の最大化で,今回R(\tau)はfinite-horizon undercounted return(減衰しない固定長区間の報酬の和)とする(ただしinfinite-horizon discounted returnでも結果は同じ).

基本的には次のような勾配上昇法によってpolicyを最適化していく.

\displaystyle
\theta_{k+1}=\theta_k+\alpha\nabla_\theta J(\pi_\theta)|_{\theta_k}

\nabla_\theta J(\pi_\theta)はpolicy gradientと呼ばれ,policy gradientを使ってpolicyの最適化を行うアルゴリズムをpolicy gradient algorithmsというらしい.

ということでpolicy gradientを求めてみる.まずはpolicy gradientを計算するのに必要なtrajectoryの確率の微分を求める.Policy \pi_\thetaにおけるtrajectory \tau=(s_1,a_0,\dots,s_{T+1})の確率は次のように与えられる.

\displaystyle
P(\tau|\theta)=\rho_0(s_0)\prod_{t=0}^TP(s_{t+1}|s_t,a_t)\pi_\theta(a_t|s_t)

ここで連鎖律と対数の導関数\nabla_\theta\log P(\tau|\theta)=\frac{1}{P(\tau|\theta)}\nabla_\theta P(\tau|\theta)を利用して次のようにP(\tau|\theta)導関数を表現.

\displaystyle
\nabla_\theta P(\tau|\theta)=P(\tau|\theta)\nabla_\theta\log P(\tau|\theta)

Trajectoryの確率の対数は

\displaystyle
\log P(\tau|\theta)=\log\rho_0(s_0)+\sum_{t=0}^T\left(\log P(s_{t+1}|s_t,a_t)+\log\pi_\theta(a_t|s_t)\right)

となることから\nabla_\theta P(\tau|\theta)は次のようになる.

\displaystyle
\nabla_\theta\log P(\tau|\theta)=\nabla_\theta\log\rho_0(s_0)+\sum_{t=0}^T\left(\nabla_\theta\log P(s_{t+1}|s_t,a_t)+\nabla_\theta\log\pi_\theta(a_t|s_t)\right)=\sum_{t=0}^T\nabla_\theta\log\pi_\theta(a_t|s_t)

最後の等式は\rho_0(s_0),P(s_{t+1}|s_t,a_t)\thetaと無関係なことから導かれる.以上から最終的に求めたいpolicy gradientは次のようになる.

\displaystyle
\nabla_\theta J(\pi_\theta)=\nabla_\theta\underset{\tau\sim\pi_\theta}{E}[R(\tau)]=\nabla_\theta\int_\tau P(\tau|\theta)R(\tau)=\int_\tau\nabla_\theta P(\tau|\theta)R(\tau) \\ \displaystyle
=\int_\tau P(\tau|\theta)\nabla_\theta\log P(\tau|\theta)R(\tau)=\underset{\tau\sim\pi_\theta}{E}[\nabla_\theta\log P(\tau|\theta)R(\tau)]\\ \displaystyle
=\underset{\tau\sim\pi_\theta}{E}\left[\sum_{t=0}^T\nabla_\theta\log\pi_\theta(a_t|s_t)R(\tau)\right]

最後の期待値の計算はサンプル平均として計算することで勾配を実際に計算することが可能.すなわち,trajectoryの集合\mathcal{D}=\{\tau_i\}_{i=1,\dots,N}がエージェントの行動の結果得られたとした時,policy gradientは次のように計算される.

\displaystyle
\hat{g}=\frac{1}{|\mathcal{D}|}\sum_{\tau\in\mathcal{D}}\sum_{t=0}^T\nabla_\theta\log\pi_theta(a_t|s_t)R(\tau)

ここにはpolicyの対数が微分可能であるという仮定と,trajectoryのデータを集めるために環境においてpolicyを実行可能という仮定が入っていることには注意.ただ,実際は自動微分のおかげで勝手に計算してくれるので実装に関してはめちゃくちゃ簡単にできる.

Implementing the Simplest Policy Gradient

Spinning upのgithubのrepositoryにpolicy gradient algorithmの実装例があって,わずか122行で実装可能とのこと.ただ自分はpytorch派なので,pytorchで再現実装してみました(元のコードをなるべく崩さないように書いたので不自然な記述方法があったりするので注意).以下コード.

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym
from gym.spaces import Discrete, Box


def mlp(sizes, activation=nn.Tanh, output_activation=None):
    layers = []
    for i in range(1,len(sizes)-1):
        layers.append(nn.Sequential(
            nn.Linear(sizes[i-1], sizes[i]), activation()
        ))
    layers.append(nn.Linear(sizes[-2], sizes[-1]))
    if output_activation is not None:
        layers.append(output_activation())
    return nn.Sequential(*layers)

def multinomial(logits, num_samples=1):
    return torch.distributions.Multinomial(num_samples, logits=logits).sample()

def train(env_name="CartPole-v0", hidden_sizes=[32], lr=1e-2,
        epochs=50, batch_size=5000, render=False):

        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"

        # make environment, check spaces, get obs / act dims
        env = gym.make(env_name)
        assert isinstance(env.observation_space, Box), \
            "This example only works for envs with continuous state spaces."
        assert isinstance(env.action_space, Discrete), \
            "This example only works for envs with discrete action spaces."

        obs_dim = env.observation_space.shape[0]
        n_acts  = env.action_space.n

        # make core of policy network
        policy = mlp([obs_dim] + hidden_sizes+[n_acts]).to(device)

        optimizer = optim.Adam(policy.parameters(), lr=lr)

        def train_one_epoch():
            # make some empty lists for logging
            batch_obs     = []
            batch_acts    = []
            batch_weights = []
            batch_rets    = []
            batch_lens    = []

            # make
            obs     = env.reset()
            done    = False
            ep_rews = []

            finished_rendering_this_epoch = False

            while True:
                with torch.no_grad():
                    # rendering
                    if not finished_rendering_this_epoch:
                        env.render()

                    # save obs
                    batch_obs.append(obs.copy())

                    logits = policy(torch.from_numpy(obs[None]).float().to(device))
                    act    = multinomial(logits).squeeze().to("cpu").detach().numpy()
                    obs, rew, done, _ = env.step(act.argmax(0))

                    # save obs
                    batch_acts.append(act)
                    ep_rews.append(rew)

                    if done:
                        # if episode is over, record info about episode
                        ep_ret, ep_len = sum(ep_rews), len(ep_rews)
                        batch_rets.append(ep_ret)
                        batch_lens.append(ep_len)

                        # the weight for each logprob(a|s) is R(tau)
                        batch_weights += [ep_ret] * ep_len

                        # reset episode-specific variables
                        obs, done, ep_rews = env.reset(), False, []

                        # won't render again this epoch
                        finished_rendering_this_epoch = True

                        # end experience loop if we have enough of it
                        if len(batch_obs) > batch_size:
                            break

            # take a single policy gradient update step
            batch_obs     = torch.from_numpy(np.array(batch_obs)).float().to(device)
            batch_acts    = torch.from_numpy(np.array(batch_acts)).float().to(device)
            batch_weights = torch.from_numpy(np.array(batch_weights)).float().to(device)
            logits = policy(batch_obs)
            loss = -( batch_weights * (batch_acts * nn.functional.log_softmax(logits, 1)).sum(1)).mean()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            return loss, batch_rets, batch_lens
        
        for i in range(epochs):
            batch_loss, batch_rets, batch_lens = train_one_epoch()
            print('epoch: %3d \t loss: %.3f \t return: %.3f \t ep_len: %.3f'%
                (i, batch_loss, np.mean(batch_rets), np.mean(batch_lens)))
        env.env.close()

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--env_name', '--env', type=str, default='CartPole-v0')
    parser.add_argument('--render', action='store_true')
    parser.add_argument('--lr', type=float, default=1e-2)
    args = parser.parse_args()
    print('\nUsing simplest formulation of policy gradient.\n')
    train(env_name=args.env_name, render=args.render, lr=args.lr)

雑に移植しただけなのでコードは元のやつが参考になるかと.

サンプルコードの問題設定はCartPoleで棒が倒れないようにするあれ.強化学習の環境についてはopenai gymを使って実装.今回のCartPoleについてはここ参照.観測としてはcartの位置と速度,poleの角度と速度の4次元が得られる.Policyは2層のニューラルネットなので観測の4次元を\mathbf{x}とすれば\pi_\theta(\mathbf{x})=\mathbf{SoftMax}(\mathbf{W}\cdot\mathrm{Tanh}(\mathbf{W}\cdot\mathbf{x}+\mathbf{b})+\mathbf{b})のように表現される.行動はcartを左か右に動かすかの2択なのでpolicyの出力は2次元.出力された確率からサンプリングによって行動を選択し,poleが倒れるなどの終了条件を満たすまで繰り返しサンプルを収集.rewardは毎行動ごとに1与えられる.よって最大化すべき関数は選択された行動を示すone-hotベクトルをaとすれば\frac{1}{T}\sum_{t=0}^Ta_t\log\left(\pi_\theta(s_t)\right)R(\tau)として計算できる.

コード内でlossを計算しているいわゆるloss functionの役割を果たしている式があるが,これは厳密にはloss functionではない.基本的にloss functionはデータ分布が固定の元で計算されるが,この場合では直近のpolicyにおいてサンプルされたデータで学習を行うためデータ分布がパラメータ(policy)に依存する.また,基本的にはloss functionは予測結果の良し悪しをはかる指標となるが,ここでのloss functionは現在のパラメータの良し悪しをはかる.要は何が言いたいかというと,強化学習においてデータはエージェントの行動の結果得られるため,行動を決めるpolicyのパラメータ次第でデータの分布が変わるのでいわゆるloss functionとしての役割とは少し違うということ.

Expected Grad-Log-Prob Lemma

ここではExpected Grad-Log-Prod (EGLP)と呼ばれる命題を解説.

・EGLP Lemma

ここではpolicy gradientsを通して使われる中間的な結果を導く. ある確率変数xが従う\thetaでparameterizeされた確率分布はP_\theta次を満たす.

\displaystyle
\underset{x\sim P_\theta}{E}[\nabla_\theta\log P_\theta(x)]=0

・証明

P_\theta(x)は確率分布なので\int_xP_\theta(x)=1\thetaについて微分すれば\nabla_\theta\int_xP_\theta(x)=\nabla_\theta 1=0.この関係を用いれば

\displaystyle
0=\nabla_\theta\int_xP_\theta(x)=\int_x\nabla_\theta P_\theta(x)=\int_xP_\theta(x)\nabla_\theta\log P_\theta(x)\\ \displaystyle
\therefore 0=\underset{x\sim P_\theta}{E}[\nabla_\theta\log P_\theta(x)]

Don't Let the Past Distract You

ここまでpolicy gradientを以下のように表現した.

\displaystyle
\nabla_\theta J(\pi_\theta)=\underset{\tau\sim\pi_\theta}{E}\left[\sum_{t=0}^T\nabla_\theta\log\pi_\theta(a_t|s_t)R(\tau)\right]

この勾配を使えば報酬の和であるR(\tau)に比例した行動の対数確率を増加させることができるが,実はこれはあまりよくないとのこと.

エージェントは結果に基づいて行動を決めるべきだが,行動の前に得られる報酬は行動がどれくらいよかったかには全く関係がない.この問題を解決するために次のようにpolicy gradientを表現する.

\displaystyle
\nabla_\theta J(\pi_\theta)=\underset{\tau\sim\pi_\theta}{E}\left[\sum_{t=0}^T\nabla_\theta\log\pi_\theta(a_t|s_t)\sum_{t'=t}^TR(s_{t'},a_{t'},s_{t'+1})\right]

何が変わったかというと,rewardの計算がもともとR(\tau)=\sum_{t=0}^TR(s_t,a_t,s_{t+1})と行動を起こした時刻に関係なく全ての時刻に対する総和の計算だったのに対し,行動を選択した時刻t以降の足し合わせ\sum_{t'=t}^TR(s_{t'},a_{t'},s_{t'+1})に変わっている.こうすることによって,行動を起こす以前の報酬を考慮しない形になるので良いだろうということ.ちなみにこれをreward-to-go policy gradientと呼び\hat{R}_t\overset{\cdot}{=}\sum_{t'=t}^TR(s_{t'},a_{t'},s_{t'+1})と表す.

Implementing Reward-to-Go Policy Gradient

ということでreward-to-go policy gradientを実装.rewardの計算が変わるだけなのでさっきのコードを一部修正するだけで動く.具体的には次のreward-to-goを計算する関数(サンプルコードまんま)を用意.

def reward_to_go(rews):
    n = len(rews)
    rtgs = np.zeros_like(rews)
    for i in reversed(range(n)):
        rtgs[i] = rews[i] + (rtgs[i+1] if i+1 < n else 0)
    return rtgs

batch_weightsの部分を

batch_weights += list(reward_to_go(ep_rews))

に変更.これでreward-to-goバージョンのpolicy gradientになる.学習の安定性は増した気がする.

Baselines in Policy Gradients

EGLPの結果は状態のみに依存する任意の関数bに対して適用可能.

\displaystyle
\underset{a_t\sim\pi_\theta}{E}[\nabla_\theta\log\pi_\theta(a_t|s_t)b(s_t)]=0

なので,次のように任意の数を引いたり足したりすることが可能.

\displaystyle
\nabla_\theta J(\pi_\theta)=\underset{\tau\sim\pi_\theta}{E}\left[\sum_{t=0}^T\nabla_\theta\log\pi_\theta(a_t|s_t)\left(\sum_{t'=t}^TR(s_{t'},a_{t'},s_{t'+1})-b(s_t)\right)\right]

ここで使われる関数bはbaselineと呼ばれ,一般的な選択肢としてon-policy value function V^\pi(s_t)がある.

経験的にb(s_t)=V^\pi(s_t)とすることはpolicy gradientにおいてsampleのばらつきを減らす効果があるらしく,結果として学習の速度と安定性が向上するとのこと.ただ,実践的にはV^\pi(s_t)を正確に計算することはできないので近似計算することになる.近似計算にはニューラルネットを使うことが多く,このvalue functionを計算するvalue network V_\phiはpolicyと同時に更新されていく.

V_\phiを学習する最も単純なものは以下の二乗誤差を目的関数として学習する方法.

\displaystyle
\phi_k=\underset{\phi}{\mathrm{argmin}}\underset{s_t,\hat{R}_t\sim\pi_k}{E}\left[\left(V_\phi(s_t)-\hat{R}_t\right)^2\right]

\pi_kkエポック目のpolicyで,過去のパラメータ\phi_{k-1}から\phi_kまでに複数回のパラメータの更新が行われる.

Other Forms of the Policy Gradient

今までの内容を踏まえてpolicy gradientを一般化すると次の形になる.

\displaystyle
\nabla_\theta J(\pi_\theta)=\underset{\tau\sim\pi_\theta}{E}\left[\sum_{t=0}^T\nabla_\theta\log\pi_\theta(a_t|s_t)\Phi_t\right]

\Phi_tはここまでやってきた例で言えば,\Phi_t=R(\tau)だったり,\Phi_t=\sum_{t'=t}^TR(s_{t'},a_{t'},s_{s'+1})だったり,\Phi_t=\sum_{t'=t}^TR(s_{t'},a_{t'},s_{t'+1})-b(s_t)だったりする.その他\Phi_tの選択肢として次の2つも重要.

  1. On-Policy Action-Value Function

\displaystyle
\Phi_t=Q^{\pi_\theta}(s_t,a_t)

Q関数を使っても良いという証明はこちら

  1. The Advantage Function

\displaystyle
\Phi_t=A^{\pi_\theta}(s_t,a_t)=Q^\pi(s_t,a_t)-V^\pi(s_t)

Advantage functionを使っても良いというのはQ関数を使っても良いということから言える.

曰くadvantage functionを使ったpolicy gradientsはとても一般的な方法らしく,advantage functionを推定するたくさんの方法があるそうで,Generalized Advantage Estimation(GAE)の論文を読むことを推奨していた.のでまた今度読む.

まとめ

とりあえず単純なアルゴリズム強化学習を試すというところまで.OpenAI gymがよくできていて,実装にかかるコストがめちゃくちゃ削減されているのが嬉しい.

とりあえずspinning upのintroduction to RLはこれで終わり.後は実際に色々手を動かしたり論文読んだりして各々頑張ってくれ的な感じなのでやってみようと思う.