OpenAIのSpinning Upで強化学習を勉強してみた その3
はじめに
その3ということで一応Introduction to RLの最終回.今回勉強したページはこちら
Part 3: Intro to Policy Optimization
今回はpolicy optimizationの基礎理論とその実装について.
Deriving the Simplest Policy Gradient
まずはでparameterizeされた確率的なpolicy を考える.目標はexpected return の最大化で,今回はfinite-horizon undercounted return(減衰しない固定長区間の報酬の和)とする(ただしinfinite-horizon discounted returnでも結果は同じ).
基本的には次のような勾配上昇法によってpolicyを最適化していく.
はpolicy gradientと呼ばれ,policy gradientを使ってpolicyの最適化を行うアルゴリズムをpolicy gradient algorithmsというらしい.
ということでpolicy gradientを求めてみる.まずはpolicy gradientを計算するのに必要なtrajectoryの確率の微分を求める.Policy におけるtrajectory の確率は次のように与えられる.
ここで連鎖律と対数の導関数を利用して次のようにの導関数を表現.
Trajectoryの確率の対数は
となることからは次のようになる.
最後の等式はがと無関係なことから導かれる.以上から最終的に求めたいpolicy gradientは次のようになる.
最後の期待値の計算はサンプル平均として計算することで勾配を実際に計算することが可能.すなわち,trajectoryの集合がエージェントの行動の結果得られたとした時,policy gradientは次のように計算される.
ここにはpolicyの対数が微分可能であるという仮定と,trajectoryのデータを集めるために環境においてpolicyを実行可能という仮定が入っていることには注意.ただ,実際は自動微分のおかげで勝手に計算してくれるので実装に関してはめちゃくちゃ簡単にできる.
Implementing the Simplest Policy Gradient
Spinning upのgithubのrepositoryにpolicy gradient algorithmの実装例があって,わずか122行で実装可能とのこと.ただ自分はpytorch派なので,pytorchで再現実装してみました(元のコードをなるべく崩さないように書いたので不自然な記述方法があったりするので注意).以下コード.
import torch import torch.nn as nn import torch.optim as optim import numpy as np import gym from gym.spaces import Discrete, Box def mlp(sizes, activation=nn.Tanh, output_activation=None): layers = [] for i in range(1,len(sizes)-1): layers.append(nn.Sequential( nn.Linear(sizes[i-1], sizes[i]), activation() )) layers.append(nn.Linear(sizes[-2], sizes[-1])) if output_activation is not None: layers.append(output_activation()) return nn.Sequential(*layers) def multinomial(logits, num_samples=1): return torch.distributions.Multinomial(num_samples, logits=logits).sample() def train(env_name="CartPole-v0", hidden_sizes=[32], lr=1e-2, epochs=50, batch_size=5000, render=False): if torch.cuda.is_available(): device = "cuda" else: device = "cpu" # make environment, check spaces, get obs / act dims env = gym.make(env_name) assert isinstance(env.observation_space, Box), \ "This example only works for envs with continuous state spaces." assert isinstance(env.action_space, Discrete), \ "This example only works for envs with discrete action spaces." obs_dim = env.observation_space.shape[0] n_acts = env.action_space.n # make core of policy network policy = mlp([obs_dim] + hidden_sizes+[n_acts]).to(device) optimizer = optim.Adam(policy.parameters(), lr=lr) def train_one_epoch(): # make some empty lists for logging batch_obs = [] batch_acts = [] batch_weights = [] batch_rets = [] batch_lens = [] # make obs = env.reset() done = False ep_rews = [] finished_rendering_this_epoch = False while True: with torch.no_grad(): # rendering if not finished_rendering_this_epoch: env.render() # save obs batch_obs.append(obs.copy()) logits = policy(torch.from_numpy(obs[None]).float().to(device)) act = multinomial(logits).squeeze().to("cpu").detach().numpy() obs, rew, done, _ = env.step(act.argmax(0)) # save obs batch_acts.append(act) ep_rews.append(rew) if done: # if episode is over, record info about episode ep_ret, ep_len = sum(ep_rews), len(ep_rews) batch_rets.append(ep_ret) batch_lens.append(ep_len) # the weight for each logprob(a|s) is R(tau) batch_weights += [ep_ret] * ep_len # reset episode-specific variables obs, done, ep_rews = env.reset(), False, [] # won't render again this epoch finished_rendering_this_epoch = True # end experience loop if we have enough of it if len(batch_obs) > batch_size: break # take a single policy gradient update step batch_obs = torch.from_numpy(np.array(batch_obs)).float().to(device) batch_acts = torch.from_numpy(np.array(batch_acts)).float().to(device) batch_weights = torch.from_numpy(np.array(batch_weights)).float().to(device) logits = policy(batch_obs) loss = -( batch_weights * (batch_acts * nn.functional.log_softmax(logits, 1)).sum(1)).mean() optimizer.zero_grad() loss.backward() optimizer.step() return loss, batch_rets, batch_lens for i in range(epochs): batch_loss, batch_rets, batch_lens = train_one_epoch() print('epoch: %3d \t loss: %.3f \t return: %.3f \t ep_len: %.3f'% (i, batch_loss, np.mean(batch_rets), np.mean(batch_lens))) env.env.close() if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('--env_name', '--env', type=str, default='CartPole-v0') parser.add_argument('--render', action='store_true') parser.add_argument('--lr', type=float, default=1e-2) args = parser.parse_args() print('\nUsing simplest formulation of policy gradient.\n') train(env_name=args.env_name, render=args.render, lr=args.lr)
雑に移植しただけなのでコードは元のやつが参考になるかと.
サンプルコードの問題設定はCartPoleで棒が倒れないようにするあれ.強化学習の環境についてはopenai gymを使って実装.今回のCartPoleについてはここ参照.観測としてはcartの位置と速度,poleの角度と速度の4次元が得られる.Policyは2層のニューラルネットなので観測の4次元をとすればのように表現される.行動はcartを左か右に動かすかの2択なのでpolicyの出力は2次元.出力された確率からサンプリングによって行動を選択し,poleが倒れるなどの終了条件を満たすまで繰り返しサンプルを収集.rewardは毎行動ごとに1与えられる.よって最大化すべき関数は選択された行動を示すone-hotベクトルをとすればとして計算できる.
コード内でlossを計算しているいわゆるloss functionの役割を果たしている式があるが,これは厳密にはloss functionではない.基本的にloss functionはデータ分布が固定の元で計算されるが,この場合では直近のpolicyにおいてサンプルされたデータで学習を行うためデータ分布がパラメータ(policy)に依存する.また,基本的にはloss functionは予測結果の良し悪しをはかる指標となるが,ここでのloss functionは現在のパラメータの良し悪しをはかる.要は何が言いたいかというと,強化学習においてデータはエージェントの行動の結果得られるため,行動を決めるpolicyのパラメータ次第でデータの分布が変わるのでいわゆるloss functionとしての役割とは少し違うということ.
Expected Grad-Log-Prob Lemma
ここではExpected Grad-Log-Prod (EGLP)と呼ばれる命題を解説.
・EGLP Lemma
ここではpolicy gradientsを通して使われる中間的な結果を導く. ある確率変数が従うでparameterizeされた確率分布は次を満たす.
・証明
は確率分布なので.について微分すれば.この関係を用いれば
Don't Let the Past Distract You
ここまでpolicy gradientを以下のように表現した.
この勾配を使えば報酬の和であるに比例した行動の対数確率を増加させることができるが,実はこれはあまりよくないとのこと.
エージェントは結果に基づいて行動を決めるべきだが,行動の前に得られる報酬は行動がどれくらいよかったかには全く関係がない.この問題を解決するために次のようにpolicy gradientを表現する.
何が変わったかというと,rewardの計算がもともとと行動を起こした時刻に関係なく全ての時刻に対する総和の計算だったのに対し,行動を選択した時刻以降の足し合わせに変わっている.こうすることによって,行動を起こす以前の報酬を考慮しない形になるので良いだろうということ.ちなみにこれをreward-to-go policy gradientと呼びと表す.
Implementing Reward-to-Go Policy Gradient
ということでreward-to-go policy gradientを実装.rewardの計算が変わるだけなのでさっきのコードを一部修正するだけで動く.具体的には次のreward-to-goを計算する関数(サンプルコードまんま)を用意.
def reward_to_go(rews): n = len(rews) rtgs = np.zeros_like(rews) for i in reversed(range(n)): rtgs[i] = rews[i] + (rtgs[i+1] if i+1 < n else 0) return rtgs
batch_weightsの部分を
batch_weights += list(reward_to_go(ep_rews))
に変更.これでreward-to-goバージョンのpolicy gradientになる.学習の安定性は増した気がする.
Baselines in Policy Gradients
EGLPの結果は状態のみに依存する任意の関数に対して適用可能.
なので,次のように任意の数を引いたり足したりすることが可能.
ここで使われる関数はbaselineと呼ばれ,一般的な選択肢としてon-policy value function がある.
経験的にとすることはpolicy gradientにおいてsampleのばらつきを減らす効果があるらしく,結果として学習の速度と安定性が向上するとのこと.ただ,実践的にはを正確に計算することはできないので近似計算することになる.近似計算にはニューラルネットを使うことが多く,このvalue functionを計算するvalue network はpolicyと同時に更新されていく.
を学習する最も単純なものは以下の二乗誤差を目的関数として学習する方法.
はエポック目のpolicyで,過去のパラメータからまでに複数回のパラメータの更新が行われる.
Other Forms of the Policy Gradient
今までの内容を踏まえてpolicy gradientを一般化すると次の形になる.
はここまでやってきた例で言えば,だったり,だったり,だったりする.その他の選択肢として次の2つも重要.
- On-Policy Action-Value Function
Q関数を使っても良いという証明はこちら.
- The Advantage Function
Advantage functionを使っても良いというのはQ関数を使っても良いということから言える.
曰くadvantage functionを使ったpolicy gradientsはとても一般的な方法らしく,advantage functionを推定するたくさんの方法があるそうで,Generalized Advantage Estimation(GAE)の論文を読むことを推奨していた.のでまた今度読む.
まとめ
とりあえず単純なアルゴリズムで強化学習を試すというところまで.OpenAI gymがよくできていて,実装にかかるコストがめちゃくちゃ削減されているのが嬉しい.
とりあえずspinning upのintroduction to RLはこれで終わり.後は実際に色々手を動かしたり論文読んだりして各々頑張ってくれ的な感じなのでやってみようと思う.