Reinforcement Learning

Banilla Policy Gradient를 Pytorch로 구현해 보자

NGC3132 2023. 4. 9. 23:58

지난 글에서 강화학습의 기본 알고리즘 중 하나인 Multi Armed Bandit을 Pytorch로 구현해 보았다.

이번 시간에는 Policy Gradient를 Pytorch로 구현해볼 예정이다.

밴딧 알고리즘에서 갑자기 Policy + Gradient 라는 새로운 개념이 어려울 수 있지만, 코드를 보면서 천천히 따라가보자.

Banilla Policy Gradient 코드

# vanilla policy gradient는 앞의 두 bandit 문제에서 고려한 것을 포함해 총 3가지를 고려해야 함
# 1. 액션 의존성 :MAB에서 사용된 것처럼 각각의 액션이 보상을 가져다 줄 확률은 다름
# 2. 상태 의존성 :MAB와 달리, CB에서 각 액션을 취할 때의 보상은 그 액션을 취할 당시의 상태와 관계가 있음
# 3. 시간 의존성 :에이전트는 보상에 대해 시간 지연된 시점에 학습함, 따라서 데이터에 샘플을 저장 후 뭉쳐서 학습

import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Hyperparameters
learning_rate = 0.002
gamma = 0.99

# 여기서 Policy는 상태가치함수를 나타냄


class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)
        return x

    def put_data(self, item):
        self.data.append(item)

    def train_net(self):
        R = 0
        self.optimizer.zero_grad()
        for r, prob in self.data[::-1]:
            R = r + gamma * R
            loss = -torch.log(prob) * R
            loss.backward()
        self.optimizer.step()
        self.data = []


env = gym.make('CartPole-v1')
pi = Policy()
score = 0.0
print_interval = 100

for n_epi in range(1000):
    # s는 state로, Cart Position / Cart Velocity / Pole Angle / Pole Velocity 정보를 담고 있음
    s, _ = env.reset()
    done = False

    while not done:
        # policy는 state(4)를 받아 action(2) 별 확률을 반환
        prob = pi(torch.from_numpy(s).float())
        m = Categorical(prob)
        a = m.sample()
        s_prime, r, done, info, _ = env.step(a.item())
        pi.put_data((r, prob[a]))
        s = s_prime
        score += r

    pi.train_net()

    if n_epi % print_interval == 0 and n_epi != 0:
        print("# of episode :{}, avg score : {}".format(
            n_epi, score/print_interval))

        # 성공 조건 설정
        if score/print_interval > 500:
            print("Congratulations! solved :)")
            break

        score = 0.0

env.close()

import

import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

import에서는 기본적인 pytorch 라이브러리들을 import함과 동시에 'gym'이라는 라이브러리를 import 한다.

gym은 OpenAI에서 만든 강화학습 환경을 구성해주는 라이브러리 이다.

env = gym.make('CartPole-v1')

다음과 같은 코드를 통해 gym 라이브러리에 구현된 환경을 만들고,

s_prime, r, done, info, _ = env.step(a.item())

다음과 같은 코드를 통해, 현재 state를 진행 했을 때의 reward, 다음 state 및 종료 여부를 지속적으로 받아올 수 있다.

이 환경의 state 에서는 Cart Position / Cart Velocity / Pole Angle / Pole Velocity 라는 4가지 정보를 담고 있으므로,

첫 Linear의 입력 차원이 4차원이 되는 것이다.

그 후 128차원의 Fully Connected Layer를 거친 후 오른쪽 / 왼쪽을 선택하는 Linear(128,2) layer를 넣어 주었다.

즉, Banilla Policy Gradient에서 학습의 대상이 되는 것은 이러한 Layer를 가지고 있는 'Policy'이다.

Policy 클래스

class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.data = []

        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=0)
        return x

    def put_data(self, item):
        self.data.append(item)

    def train_net(self):
        R = 0
        self.optimizer.zero_grad()
        for r, prob in self.data[::-1]:
            R = r + gamma * R
            loss = -torch.log(prob) * R
            loss.backward()
        self.optimizer.step()
        self.data = []

즉, nn.Module을 상속받은 Policy class의 Linear layer들에 있는 가중치들이 모두 학습가능한 파라미터가 되는 것이다.

Policy 클래스 내에 정의된 forward 함수를 통해 forward propagation이 이뤄지고,

train_net 함수를 통해 zero_grad(), loss.backward, optimizer.step()으로 구성된 전형적인 backpropagation이 일어난다.

Episode loop

for n_epi in range(1000):
    # s는 state로, Cart Position / Cart Velocity / Pole Angle / Pole Velocity 정보를 담고 있음
    s, _ = env.reset()
    done = False

    while not done:
        # policy는 state(4)를 받아 action(2) 별 확률을 반환
        prob = pi(torch.from_numpy(s).float())
        m = Categorical(prob)
        a = m.sample()
        s_prime, r, done, info, _ = env.step(a.item())
        pi.put_data((r, prob[a]))
        s = s_prime
        score += r

    pi.train_net()

    if n_epi % print_interval == 0 and n_epi != 0:
        print("# of episode :{}, avg score : {}".format(
            n_epi, score/print_interval))

        # 성공 조건 설정
        if score/print_interval > 500:
            print("Congratulations! solved :)")
            break

        score = 0.0

env.close()

총 1000번의 에피소드를 실행하도록 설정 했으며, 한 에피소드는 evn.step에서 done 값이 True를 반환할 때까지 반복한다.

prob = pi(torch.from_numpy(s).float())

먼저 Policy를 이용해 오른쪽 / 왼쪽으로 움직일 확률을 받은 뒤 (예, [0.7, 0.3])

m = Categorical(prob)
a = m.sample()

확률에 따라 샘플을 뽑는다. (action에 해당)

그리고 이 액션을 다시 env에 넣어 주어서 reward, 다음 state, done 여부 정보를 받아오는 것이다.

여기서, 선택한 action에 해당하는 확률과 reward 값을 Policy에 put_data 메소드를 통해 전달해 준다.

그리고 done이 True가 되었을 경우, 쌓여진 정보를 한꺼번에 pi.train_net()을 통해 업데이트 한다.

실행한 결과는 다음과 같다.

```