사과 받기 강화학습 코드(Keras)

# 패키지 로딩
import json
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense
from keras.optimizers import sgd

댓글

  1. # 사과 받기 드로잉 클래스. ------------------------------------------------------
    class Catch(object):
    # 생성자
    def __init__(self, grid_size=10):
    self.grid_size = grid_size
    self.reset()
    # 액션후 다음상태 업데이트
    def _update_state(self, action):
    """
    Input: action and states
    Ouput: new states and reward
    """
    state = self.state
    if action == 0: # left
    action = -1
    elif action == 1: # stay
    action = 0
    else:
    action = 1 # right
    f0, f1, basket = state[0]
    new_basket = min(max(1, basket + action), self.grid_size-1)
    f0 += 1
    out = np.asarray([f0, f1, new_basket])
    out = out[np.newaxis]

    assert len(out.shape) == 2
    self.state = out
    # 상태 그리기, 사과와 바구니 위치
    def _draw_state(self):
    im_size = (self.grid_size,)*2
    state = self.state[0]
    canvas = np.zeros(im_size)
    canvas[state[0], state[1]] = 1 # draw fruit
    canvas[-1, state[2]-1:state[2] + 2] = 1 # draw basket
    return canvas
    # 보상 계산
    def _get_reward(self):
    fruit_row, fruit_col, basket = self.state[0] # 사과위치, 바구니 수평위치 취득
    if fruit_row == self.grid_size-1: # 사과가 바닥에 있고
    if abs(fruit_col - basket) <= 1: # 사과가 바구니에 닿으면 보상1
    return 1
    else:
    return -1
    else:
    return 0 # 사과가 떨어지고 있을땐 0
    # 끝났는지 여부
    def _is_over(self):
    if self.state[0, 0] == self.grid_size-1:
    return True
    else:
    return False
    # 묘화 상태 확인
    def observe(self):
    canvas = self._draw_state()
    return canvas.reshape((1, -1))
    # 액션 수행하고 보상 리턴
    def act(self, action):
    self._update_state(action)
    reward = self._get_reward()
    game_over = self._is_over()
    return self.observe(), reward, game_over
    # 드로잉 그리드 초기화하기
    def reset(self):
    n = np.random.randint(0, self.grid_size-1, size=1)
    m = np.random.randint(1, self.grid_size-2, size=1)
    self.state = np.asarray([0, n, m])[np.newaxis]

    답글삭제
  2. # 경험 재실험 클래스 ---------------------------------------------------------
    class ExperienceReplay(object):
    # 초기화 , 최대 메모리, 접근 메모리, 디스카운트
    def __init__(self, max_memory=100, discount=.9):
    self.max_memory = max_memory
    self.memory = list()
    self.discount = discount
    # 신규 경험 저장
    def remember(self, states, game_over):
    # memory[i] = [[state_t, action_t, reward_t, state_t+1], game_over?]
    self.memory.append([states, game_over])
    if len(self.memory) > self.max_memory:
    del self.memory[0]
    # 신규 미니배치 생성 및 실행
    def get_batch(self, model, batch_size=10):
    len_memory = len(self.memory)
    num_actions = model.output_shape[-1]
    env_dim = self.memory[0][0][0].shape[1]
    inputs = np.zeros((min(len_memory, batch_size), env_dim))
    targets = np.zeros((inputs.shape[0], num_actions))
    for i, idx in enumerate(np.random.randint(0, len_memory,
    size=inputs.shape[0])):
    state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
    game_over = self.memory[idx][1]

    inputs[i:i+1] = state_t
    # There should be no target values for actions not taken.
    # Thou shalt not correct actions not taken #deep
    targets[i] = model.predict(state_t)[0]
    Q_sa = np.max(model.predict(state_tp1)[0])
    if game_over: # if game_over is True
    targets[i, action_t] = reward_t
    else:
    # reward_t + gamma * max_a' Q(s', a')
    targets[i, action_t] = reward_t + self.discount * Q_sa
    return inputs, targets

    답글삭제
  3. # 메인 메써드
    if __name__ == "__main__":
    # 메인 파라미터
    epsilon = .1 # exploration
    num_actions = 3 # [move_left, stay, move_right]
    epoch = 1000
    max_memory = 500
    hidden_size = 100
    batch_size = 50
    grid_size = 10 # 게임 레졸루션

    # CNN
    model = Sequential()
    # 히든 레이어 2층, 출력층
    model.add(Dense(hidden_size, input_shape=(grid_size**2,), activation='relu'))
    model.add(Dense(hidden_size, activation='relu'))
    model.add(Dense(num_actions))
    # 스토케스틱 그래디언트 디슨트, 민스퀘어 로쓰함수
    model.compile(sgd(lr=.2), "mse")

    # If you want to continue training from a previous model, just uncomment the line bellow
    # model.load_weights("model.h5")

    # Define environment/game
    # 게임 환경
    env = Catch(grid_size)

    # Initialize experience replay object
    # 경험 리플레이, 메모리 한도 내에서
    exp_replay = ExperienceReplay(max_memory=max_memory)

    # Train -------------- 학습
    # 승리 카운트
    win_cnt = 0
    # 모든 에포크에 대해
    for e in range(epoch):
    loss = 0
    env.reset()
    game_over = False
    # 초기상태 관찰
    input_t = env.observe()
    # 게임이 끝날때 까지
    while not game_over:
    # 상태 입력
    input_tm1 = input_t
    # 액션 선택
    if np.random.rand() <= epsilon:
    action = np.random.randint(0, num_actions, size=1)
    else:
    # 정책 예측
    q = model.predict(input_tm1)
    # 액션 판단
    action = np.argmax(q[0])

    # 액션 수행 및 보상 얻기
    input_t, reward, game_over = env.act(action)
    if reward == 1:
    # 승리 카운트
    win_cnt += 1

    # store experience
    # 플레이 경험 저장
    exp_replay.remember([input_tm1, action, reward, input_t], game_over)

    # adapt model
    # 경험 리플레이
    inputs, targets = exp_replay.get_batch(model, batch_size=batch_size)
    # 로쓰 누적
    loss += model.train_on_batch(inputs, targets)[0]
    print("Epoch {:03d}/999 | Loss {:.4f} | Win count {}".format(e, loss, win_cnt))

    # Save trained model weights and architecture, this will be used by the visualization code
    # 가중치 갱신
    model.save_weights("model.h5", overwrite=True)
    # 저장 파일명
    with open("model.json", "w") as outfile:
    # 최종 모델 저장.
    json.dump(model.to_json(), outfile)

    답글삭제

댓글 쓰기

이 블로그의 인기 게시물

파이썬으로 Homomorphic Filtering 하기

파이썬으로 2D FFT/iFFT 하기: numpy 버전