# Apple-catching reinforcement learning code (Keras)
# 패키지 로딩
import json
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense
from keras.optimizers import sgd
import json
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense
from keras.optimizers import sgd
# 사과 받기 드로잉 클래스. ------------------------------------------------------
class Catch(object):
    """Falling-fruit "catch" game played on a square grid.

    The environment state is a ``(1, 3)`` integer array
    ``[[fruit_row, fruit_col, basket_col]]``.  The fruit drops one row per
    step; the basket is three cells wide, centred on ``basket_col`` and
    drawn on the bottom row.
    """

    def __init__(self, grid_size=10):
        """Create a game on a ``grid_size`` x ``grid_size`` board.

        Raises:
            ValueError: if ``grid_size`` is smaller than 4, because
                ``reset`` could not draw a basket start column.
        """
        if grid_size < 4:
            raise ValueError("grid_size must be at least 4")
        self.grid_size = grid_size
        self.reset()

    def _update_state(self, action):
        """Move the basket and drop the fruit by one row.

        Args:
            action: 0 moves the basket left, 1 keeps it in place, any other
                value moves it right.

        Updates ``self.state`` in place; nothing is returned.
        """
        if action == 0:  # left
            shift = -1
        elif action == 1:  # stay
            shift = 0
        else:  # right
            shift = 1
        fruit_row, fruit_col, basket = self.state[0]
        # NOTE(review): the right-hand clamp lets the basket centre reach the
        # last column, where only two of its three cells are drawn. Kept
        # as-is so the game dynamics are unchanged -- confirm it is intended.
        new_basket = min(max(1, basket + shift), self.grid_size - 1)
        self.state = np.asarray([[fruit_row + 1, fruit_col, new_basket]])

    def _draw_state(self):
        """Render the state as a 2-D array: 1 for fruit/basket, 0 elsewhere."""
        fruit_row, fruit_col, basket = self.state[0]
        canvas = np.zeros((self.grid_size, self.grid_size))
        canvas[fruit_row, fruit_col] = 1  # fruit
        canvas[-1, basket - 1:basket + 2] = 1  # three-cell basket on bottom row
        return canvas

    def _get_reward(self):
        """Return 1 for a catch, -1 for a miss, 0 while the fruit is falling."""
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row != self.grid_size - 1:
            return 0  # fruit has not reached the bottom row yet
        if abs(fruit_col - basket) <= 1:
            return 1
        return -1

    def _is_over(self):
        """Return True once the fruit has reached the bottom row."""
        return bool(self.state[0, 0] == self.grid_size - 1)

    def observe(self):
        """Return the rendered board flattened to shape ``(1, grid_size**2)``."""
        return self._draw_state().reshape((1, -1))

    def act(self, action):
        """Apply ``action`` and return ``(observation, reward, game_over)``."""
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        """Start a new episode with the fruit on the top row.

        The fruit column is drawn from ``[0, grid_size - 2]`` and the basket
        centre from ``[1, grid_size - 3]``.
        """
        # Draw scalars rather than size-1 arrays: mixing a scalar with
        # shape-(1,) arrays inside np.asarray builds a ragged array, which
        # newer NumPy releases reject.
        fruit_col = np.random.randint(0, self.grid_size - 1)
        basket = np.random.randint(1, self.grid_size - 2)
        self.state = np.asarray([[0, fruit_col, basket]])
# 경험 재실험 클래스 ---------------------------------------------------------
class ExperienceReplay(object):
    """Bounded FIFO store of transitions used to build Q-learning batches."""

    def __init__(self, max_memory=100, discount=.9):
        """
        Args:
            max_memory: maximum number of transitions kept; the oldest one is
                dropped when the limit is exceeded.
            discount: discount factor applied to the best next-state Q-value.
        """
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, game_over):
        """Store one transition.

        Args:
            states: ``[state_t, action_t, reward_t, state_t+1]``.
            game_over: whether ``state_t+1`` ended the episode.
        """
        # memory[i] = [[state_t, action_t, reward_t, state_t+1], game_over?]
        self.memory.append([states, game_over])
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def get_batch(self, model, batch_size=10):
        """Sample transitions (with replacement) and build training targets.

        Args:
            model: object exposing ``predict(batch)`` that returns one row of
                Q-values per input row.
            batch_size: upper bound on the number of sampled transitions; the
                batch has ``min(len(memory), batch_size)`` rows.

        Returns:
            ``(inputs, targets)``: the stacked ``state_t`` rows and, for each
            row, the model's own Q-values with only the taken action's entry
            replaced by the Q-learning target.

        Raises:
            IndexError: if no transition has been stored yet.
        """
        if not self.memory:
            raise IndexError("cannot sample a batch from an empty memory")

        len_memory = len(self.memory)
        sample_size = min(len_memory, batch_size)
        picks = np.random.randint(0, len_memory, size=sample_size)
        samples = [self.memory[idx] for idx in picks]

        # Each stored state has shape (1, env_dim), so concatenating along
        # axis 0 yields one (sample_size, env_dim) batch per side.
        inputs = np.concatenate([s[0][0] for s in samples], axis=0)
        next_states = np.concatenate([s[0][3] for s in samples], axis=0)

        # Two batched predictions replace two predict() calls per sample.
        # Starting from the model's own output leaves zero error on the
        # actions that were not taken.
        targets = np.array(model.predict(inputs), dtype=float)
        best_next_q = np.max(model.predict(next_states), axis=1)

        for i, ((_, action_t, reward_t, _), game_over) in enumerate(samples):
            if game_over:
                targets[i, action_t] = reward_t
            else:
                # reward_t + gamma * max_a' Q(s', a')
                targets[i, action_t] = reward_t + self.discount * best_next_q[i]
        return inputs, targets
# 메인 메써드
if __name__ == "__main__":
    # Hyper-parameters.
    epsilon = .1  # probability of taking a random (exploratory) action
    num_actions = 3  # [move_left, stay, move_right]
    epoch = 1000
    max_memory = 500
    hidden_size = 100
    batch_size = 50
    grid_size = 10  # board resolution

    # Fully connected Q-network (Dense layers only, not a CNN): two hidden
    # ReLU layers and a linear output with one Q-value per action.
    model = Sequential()
    model.add(Dense(hidden_size, input_shape=(grid_size**2,), activation='relu'))
    model.add(Dense(hidden_size, activation='relu'))
    model.add(Dense(num_actions))
    # Stochastic gradient descent with a mean-squared-error loss.
    model.compile(sgd(lr=.2), "mse")

    # If you want to continue training from a previous model, just uncomment
    # the line below.
    # model.load_weights("model.h5")

    # Game environment and bounded experience-replay memory.
    env = Catch(grid_size)
    exp_replay = ExperienceReplay(max_memory=max_memory)

    # Training loop.
    win_cnt = 0
    for e in range(epoch):
        loss = 0.
        env.reset()
        game_over = False
        # Observation of the initial state.
        input_t = env.observe()

        while not game_over:
            input_tm1 = input_t

            # Epsilon-greedy action selection. Both branches yield a plain
            # int so the replay memory stores actions in one consistent form.
            if np.random.rand() <= epsilon:
                action = int(np.random.randint(0, num_actions))
            else:
                q = model.predict(input_tm1)
                action = int(np.argmax(q[0]))

            # Apply the action and collect the reward.
            input_t, reward, game_over = env.act(action)
            if reward == 1:
                win_cnt += 1

            # Store the transition, then train on a replayed mini-batch.
            exp_replay.remember([input_tm1, action, reward, input_t], game_over)
            inputs, targets = exp_replay.get_batch(model, batch_size=batch_size)

            # train_on_batch may return a bare scalar (no metrics compiled)
            # or a list whose first entry is the loss; np.ravel handles both,
            # whereas indexing a scalar with [0] fails.
            batch_loss = model.train_on_batch(inputs, targets)
            loss += float(np.ravel(batch_loss)[0])

        # The upper bound is derived from `epoch` instead of a literal 999.
        print("Epoch {:03d}/{:03d} | Loss {:.4f} | Win count {}".format(
            e, epoch - 1, loss, win_cnt))

    # Save trained model weights and architecture; per the original comment
    # these files are used by the visualization code.
    model.save_weights("model.h5", overwrite=True)
    with open("model.json", "w") as outfile:
        # NOTE(review): to_json() already returns a JSON string, so this
        # writes a JSON-encoded string. Kept as-is because a reader would
        # need json.load() followed by model_from_json() -- confirm against
        # the visualization code before changing.
        json.dump(model.to_json(), outfile)