diff --git a/deep_q_network.py b/deep_q_network.py index 1294f96..24567e1 100755 --- a/deep_q_network.py +++ b/deep_q_network.py @@ -1,39 +1,50 @@ #!/usr/bin/env python from __future__ import print_function +from collections import deque +import numpy as np +import random +import game.wrapped_flappy_bird as game import tensorflow as tf +import tensorflow.compat.v1 as tf import cv2 import sys + +tf.compat.v1.disable_eager_execution() + + sys.path.append("game/") -import wrapped_flappy_bird as game -import random -import numpy as np -from collections import deque -GAME = 'bird' # the name of the game being played for log files -ACTIONS = 2 # number of valid actions -GAMMA = 0.99 # decay rate of past observations -OBSERVE = 100000. # timesteps to observe before training -EXPLORE = 2000000. # frames over which to anneal epsilon -FINAL_EPSILON = 0.0001 # final value of epsilon -INITIAL_EPSILON = 0.0001 # starting value of epsilon -REPLAY_MEMORY = 50000 # number of previous transitions to remember -BATCH = 32 # size of minibatch +GAME = "bird" # the name of the game being played for log files +ACTIONS = 2 # number of valid actions +GAMMA = 0.99 # decay rate of past observations +OBSERVE = 100000.0 # timesteps to observe before training +EXPLORE = 2000000.0 # frames over which to anneal epsilon +FINAL_EPSILON = 0.0001 # final value of epsilon +INITIAL_EPSILON = 0.0001 # starting value of epsilon +REPLAY_MEMORY = 50000 # number of previous transitions to remember +BATCH = 32 # size of minibatch FRAME_PER_ACTION = 1 + def weight_variable(shape): - initial = tf.truncated_normal(shape, stddev = 0.01) + # initial = tf.truncated_normal(shape, stddev=0.01) + initial = tf.random.truncated_normal(shape, stddev=0.01) return tf.Variable(initial) + def bias_variable(shape): - initial = tf.constant(0.01, shape = shape) + initial = tf.constant(0.01, shape=shape) return tf.Variable(initial) + def conv2d(x, W, stride): - return tf.nn.conv2d(x, W, strides = [1, stride, stride, 1], padding = "SAME") + return tf.nn.conv2d(x, W, strides=[1, stride, stride, 1], padding="SAME") + def max_pool_2x2(x): - return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = "SAME") + return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME") + def createNetwork(): # network weights @@ -53,19 +64,19 @@ def createNetwork(): b_fc2 = bias_variable([ACTIONS]) # input layer - s = tf.placeholder("float", [None, 80, 80, 4]) + s = tf.compat.v1.placeholder("float", [None, 80, 80, 4]) # hidden layers h_conv1 = tf.nn.relu(conv2d(s, W_conv1, 4) + b_conv1) h_pool1 = max_pool_2x2(h_conv1) h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2, 2) + b_conv2) - #h_pool2 = max_pool_2x2(h_conv2) + # h_pool2 = max_pool_2x2(h_conv2) h_conv3 = tf.nn.relu(conv2d(h_conv2, W_conv3, 1) + b_conv3) - #h_pool3 = max_pool_2x2(h_conv3) + # h_pool3 = max_pool_2x2(h_conv3) - #h_pool3_flat = tf.reshape(h_pool3, [-1, 256]) + # h_pool3_flat = tf.reshape(h_pool3, [-1, 256]) h_conv3_flat = tf.reshape(h_conv3, [-1, 1600]) h_fc1 = tf.nn.relu(tf.matmul(h_conv3_flat, W_fc1) + b_fc1) @@ -75,11 +86,13 @@ def createNetwork(): return s, readout, h_fc1 + def trainNetwork(s, readout, h_fc1, sess): # define the cost function - a = tf.placeholder("float", [None, ACTIONS]) - y = tf.placeholder("float", [None]) - readout_action = tf.reduce_sum(tf.multiply(readout, a), reduction_indices=1) + a = tf.compat.v1.placeholder("float", [None, ACTIONS]) + y = tf.compat.v1.placeholder("float", [None]) + readout_action = tf.reduce_sum( + tf.multiply(readout, a), reduction_indices=1) cost = tf.reduce_mean(tf.square(y - readout_action)) train_step = tf.train.AdamOptimizer(1e-6).minimize(cost) @@ -90,15 +103,15 @@ def trainNetwork(s, readout, h_fc1, sess): D = deque() # printing - a_file = open("logs_" + GAME + "/readout.txt", 'w') - h_file = open("logs_" + GAME + "/hidden.txt", 'w') + a_file = open("logs_" + GAME + "/readout.txt", "w") + h_file = open("logs_" + GAME + "/hidden.txt", "w") # get the first state by doing nothing and preprocess the image to 80x80x4 do_nothing = np.zeros(ACTIONS) do_nothing[0] = 1 x_t, r_0, terminal = game_state.frame_step(do_nothing) x_t = cv2.cvtColor(cv2.resize(x_t, (80, 80)), cv2.COLOR_BGR2GRAY) - ret, x_t = cv2.threshold(x_t,1,255,cv2.THRESH_BINARY) + ret, x_t = cv2.threshold(x_t, 1, 255, cv2.THRESH_BINARY) s_t = np.stack((x_t, x_t, x_t, x_t), axis=2) # saving and loading networks @@ -116,7 +129,7 @@ def trainNetwork(s, readout, h_fc1, sess): t = 0 while "flappy bird" != "angry bird": # choose an action epsilon greedily - readout_t = readout.eval(feed_dict={s : [s_t]})[0] + readout_t = readout.eval(feed_dict={s: [s_t]})[0] a_t = np.zeros([ACTIONS]) action_index = 0 if t % FRAME_PER_ACTION == 0: @@ -128,7 +141,7 @@ def trainNetwork(s, readout, h_fc1, sess): action_index = np.argmax(readout_t) a_t[action_index] = 1 else: - a_t[0] = 1 # do nothing + a_t[0] = 1 # do nothing # scale down epsilon if epsilon > FINAL_EPSILON and t > OBSERVE: @@ -136,10 +149,11 @@ def trainNetwork(s, readout, h_fc1, sess): # run the selected action and observe next state and reward x_t1_colored, r_t, terminal = game_state.frame_step(a_t) - x_t1 = cv2.cvtColor(cv2.resize(x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY) + x_t1 = cv2.cvtColor(cv2.resize( + x_t1_colored, (80, 80)), cv2.COLOR_BGR2GRAY) ret, x_t1 = cv2.threshold(x_t1, 1, 255, cv2.THRESH_BINARY) x_t1 = np.reshape(x_t1, (80, 80, 1)) - #s_t1 = np.append(x_t1, s_t[:,:,1:], axis = 2) + # s_t1 = np.append(x_t1, s_t[:,:,1:], axis = 2) s_t1 = np.append(x_t1, s_t[:, :, :3], axis=2) # store the transition in D @@ -159,21 +173,18 @@ def trainNetwork(s, readout, h_fc1, sess): s_j1_batch = [d[3] for d in minibatch] y_batch = [] - readout_j1_batch = readout.eval(feed_dict = {s : s_j1_batch}) + readout_j1_batch = readout.eval(feed_dict={s: s_j1_batch}) for i in range(0, len(minibatch)): terminal = minibatch[i][4] # if terminal, only equals reward if terminal: y_batch.append(r_batch[i]) else: - y_batch.append(r_batch[i] + GAMMA * np.max(readout_j1_batch[i])) + y_batch.append(r_batch[i] + GAMMA * + np.max(readout_j1_batch[i])) # perform gradient step - train_step.run(feed_dict = { - y : y_batch, - a : a_batch, - s : s_j_batch} - ) + train_step.run(feed_dict={y: y_batch, a: a_batch, s: s_j_batch}) # update the old values s_t = s_t1 @@ -181,7 +192,7 @@ def trainNetwork(s, readout, h_fc1, sess): # save progress every 10000 iterations if t % 10000 == 0: - saver.save(sess, 'saved_networks/' + GAME + '-dqn', global_step = t) + saver.save(sess, "saved_networks/" + GAME + "-dqn", global_step=t) # print info state = "" @@ -192,24 +203,38 @@ def trainNetwork(s, readout, h_fc1, sess): else: state = "train" - print("TIMESTEP", t, "/ STATE", state, \ - "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, \ - "/ Q_MAX %e" % np.max(readout_t)) + print( + "TIMESTEP", + t, + "/ STATE", + state, + "/ EPSILON", + epsilon, + "/ ACTION", + action_index, + "/ REWARD", + r_t, + "/ Q_MAX %e" % np.max(readout_t), + ) # write info to files - ''' + """ if t % 10000 <= 100: a_file.write(",".join([str(x) for x in readout_t]) + '\n') h_file.write(",".join([str(x) for x in h_fc1.eval(feed_dict={s:[s_t]})[0]]) + '\n') cv2.imwrite("logs_tetris/frame" + str(t) + ".png", x_t1) - ''' + """ + def playGame(): - sess = tf.InteractiveSession() + sess = tf.compat.v1.InteractiveSession() + # sess = tf.InteractiveSession() s, readout, h_fc1 = createNetwork() trainNetwork(s, readout, h_fc1, sess) + def main(): playGame() + if __name__ == "__main__": main() diff --git a/game/__init__.py b/game/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/game/wrapped_flappy_bird.py b/game/wrapped_flappy_bird.py index 24a102e..51e5e53 100644 --- a/game/wrapped_flappy_bird.py +++ b/game/wrapped_flappy_bird.py @@ -2,29 +2,29 @@ import sys import random import pygame -import flappy_bird_utils +import game.flappy_bird_utils as flappy_bird_utils import pygame.surfarray as surfarray from pygame.locals import * from itertools import cycle FPS = 30 -SCREENWIDTH = 288 +SCREENWIDTH = 288 SCREENHEIGHT = 512 pygame.init() FPSCLOCK = pygame.time.Clock() SCREEN = pygame.display.set_mode((SCREENWIDTH, SCREENHEIGHT)) -pygame.display.set_caption('Flappy Bird') +pygame.display.set_caption("Flappy Bird") IMAGES, SOUNDS, HITMASKS = flappy_bird_utils.load() -PIPEGAPSIZE = 100 # gap between upper and lower part of pipe +PIPEGAPSIZE = 100 # gap between upper and lower part of pipe BASEY = SCREENHEIGHT * 0.79 -PLAYER_WIDTH = IMAGES['player'][0].get_width() -PLAYER_HEIGHT = IMAGES['player'][0].get_height() -PIPE_WIDTH = IMAGES['pipe'][0].get_width() -PIPE_HEIGHT = IMAGES['pipe'][0].get_height() -BACKGROUND_WIDTH = IMAGES['background'].get_width() +PLAYER_WIDTH = IMAGES["player"][0].get_width() +PLAYER_HEIGHT = IMAGES["player"][0].get_height() +PIPE_WIDTH = IMAGES["pipe"][0].get_width() +PIPE_HEIGHT = IMAGES["pipe"][0].get_height() +BACKGROUND_WIDTH = IMAGES["background"].get_width() PLAYER_INDEX_GEN = cycle([0, 1, 2, 1]) @@ -35,27 +35,27 @@ def __init__(self): self.playerx = int(SCREENWIDTH * 0.2) self.playery = int((SCREENHEIGHT - PLAYER_HEIGHT) / 2) self.basex = 0 - self.baseShift = IMAGES['base'].get_width() - BACKGROUND_WIDTH + self.baseShift = IMAGES["base"].get_width() - BACKGROUND_WIDTH newPipe1 = getRandomPipe() newPipe2 = getRandomPipe() self.upperPipes = [ - {'x': SCREENWIDTH, 'y': newPipe1[0]['y']}, - {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[0]['y']}, + {"x": SCREENWIDTH, "y": newPipe1[0]["y"]}, + {"x": SCREENWIDTH + (SCREENWIDTH / 2), "y": newPipe2[0]["y"]}, ] self.lowerPipes = [ - {'x': SCREENWIDTH, 'y': newPipe1[1]['y']}, - {'x': SCREENWIDTH + (SCREENWIDTH / 2), 'y': newPipe2[1]['y']}, + {"x": SCREENWIDTH, "y": newPipe1[1]["y"]}, + {"x": SCREENWIDTH + (SCREENWIDTH / 2), "y": newPipe2[1]["y"]}, ] # player velocity, max velocity, downward accleration, accleration on flap self.pipeVelX = -4 - self.playerVelY = 0 # player's velocity along Y, default same as playerFlapped - self.playerMaxVelY = 10 # max vel along Y, max descend speed - self.playerMinVelY = -8 # min vel along Y, max ascend speed - self.playerAccY = 1 # players downward accleration - self.playerFlapAcc = -9 # players speed on flapping - self.playerFlapped = False # True when player flaps + self.playerVelY = 0 # player's velocity along Y, default same as playerFlapped + self.playerMaxVelY = 10 # max vel along Y, max descend speed + self.playerMinVelY = -8 # min vel along Y, max ascend speed + self.playerAccY = 1 # players downward accleration + self.playerFlapAcc = -9 # players speed on flapping + self.playerFlapped = False # True when player flaps def frame_step(self, input_actions): pygame.event.pump() @@ -64,7 +64,7 @@ def frame_step(self, input_actions): terminal = False if sum(input_actions) != 1: - raise ValueError('Multiple input actions!') + raise ValueError("Multiple input actions!") # input_actions[0] == 1: do nothing # input_actions[1] == 1: flap the bird @@ -72,15 +72,15 @@ def frame_step(self, input_actions): if self.playery > -2 * PLAYER_HEIGHT: self.playerVelY = self.playerFlapAcc self.playerFlapped = True - #SOUNDS['wing'].play() + # SOUNDS['wing'].play() # check for score playerMidPos = self.playerx + PLAYER_WIDTH / 2 for pipe in self.upperPipes: - pipeMidPos = pipe['x'] + PIPE_WIDTH / 2 + pipeMidPos = pipe["x"] + PIPE_WIDTH / 2 if pipeMidPos <= playerMidPos < pipeMidPos + 4: self.score += 1 - #SOUNDS['point'].play() + # SOUNDS['point'].play() reward = 1 # playerIndex basex change @@ -94,120 +94,129 @@ def frame_step(self, input_actions): self.playerVelY += self.playerAccY if self.playerFlapped: self.playerFlapped = False - self.playery += min(self.playerVelY, BASEY - self.playery - PLAYER_HEIGHT) + self.playery += min(self.playerVelY, BASEY - + self.playery - PLAYER_HEIGHT) if self.playery < 0: self.playery = 0 # move pipes to left for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes): - uPipe['x'] += self.pipeVelX - lPipe['x'] += self.pipeVelX + uPipe["x"] += self.pipeVelX + lPipe["x"] += self.pipeVelX # add new pipe when first pipe is about to touch left of screen - if 0 < self.upperPipes[0]['x'] < 5: + if 0 < self.upperPipes[0]["x"] < 5: newPipe = getRandomPipe() self.upperPipes.append(newPipe[0]) self.lowerPipes.append(newPipe[1]) # remove first pipe if its out of the screen - if self.upperPipes[0]['x'] < -PIPE_WIDTH: + if self.upperPipes[0]["x"] < -PIPE_WIDTH: self.upperPipes.pop(0) self.lowerPipes.pop(0) # check if crash here - isCrash= checkCrash({'x': self.playerx, 'y': self.playery, - 'index': self.playerIndex}, - self.upperPipes, self.lowerPipes) + isCrash = checkCrash( + {"x": self.playerx, "y": self.playery, "index": self.playerIndex}, + self.upperPipes, + self.lowerPipes, + ) if isCrash: - #SOUNDS['hit'].play() - #SOUNDS['die'].play() + # SOUNDS['hit'].play() + # SOUNDS['die'].play() terminal = True self.__init__() reward = -1 # draw sprites - SCREEN.blit(IMAGES['background'], (0,0)) + SCREEN.blit(IMAGES["background"], (0, 0)) for uPipe, lPipe in zip(self.upperPipes, self.lowerPipes): - SCREEN.blit(IMAGES['pipe'][0], (uPipe['x'], uPipe['y'])) - SCREEN.blit(IMAGES['pipe'][1], (lPipe['x'], lPipe['y'])) + SCREEN.blit(IMAGES["pipe"][0], (uPipe["x"], uPipe["y"])) + SCREEN.blit(IMAGES["pipe"][1], (lPipe["x"], lPipe["y"])) - SCREEN.blit(IMAGES['base'], (self.basex, BASEY)) + SCREEN.blit(IMAGES["base"], (self.basex, BASEY)) # print score so player overlaps the score # showScore(self.score) - SCREEN.blit(IMAGES['player'][self.playerIndex], + SCREEN.blit(IMAGES["player"][self.playerIndex], (self.playerx, self.playery)) image_data = pygame.surfarray.array3d(pygame.display.get_surface()) pygame.display.update() FPSCLOCK.tick(FPS) - #print self.upperPipes[0]['y'] + PIPE_HEIGHT - int(BASEY * 0.2) + # print self.upperPipes[0]['y'] + PIPE_HEIGHT - int(BASEY * 0.2) return image_data, reward, terminal + def getRandomPipe(): """returns a randomly generated pipe""" # y of gap between upper and lower pipe gapYs = [20, 30, 40, 50, 60, 70, 80, 90] - index = random.randint(0, len(gapYs)-1) + index = random.randint(0, len(gapYs) - 1) gapY = gapYs[index] gapY += int(BASEY * 0.2) pipeX = SCREENWIDTH + 10 return [ - {'x': pipeX, 'y': gapY - PIPE_HEIGHT}, # upper pipe - {'x': pipeX, 'y': gapY + PIPEGAPSIZE}, # lower pipe + {"x": pipeX, "y": gapY - PIPE_HEIGHT}, # upper pipe + {"x": pipeX, "y": gapY + PIPEGAPSIZE}, # lower pipe ] def showScore(score): """displays score in center of screen""" scoreDigits = [int(x) for x in list(str(score))] - totalWidth = 0 # total width of all numbers to be printed + totalWidth = 0 # total width of all numbers to be printed for digit in scoreDigits: - totalWidth += IMAGES['numbers'][digit].get_width() + totalWidth += IMAGES["numbers"][digit].get_width() Xoffset = (SCREENWIDTH - totalWidth) / 2 for digit in scoreDigits: - SCREEN.blit(IMAGES['numbers'][digit], (Xoffset, SCREENHEIGHT * 0.1)) - Xoffset += IMAGES['numbers'][digit].get_width() + SCREEN.blit(IMAGES["numbers"][digit], (Xoffset, SCREENHEIGHT * 0.1)) + Xoffset += IMAGES["numbers"][digit].get_width() def checkCrash(player, upperPipes, lowerPipes): """returns True if player collders with base or pipes.""" - pi = player['index'] - player['w'] = IMAGES['player'][0].get_width() - player['h'] = IMAGES['player'][0].get_height() + pi = player["index"] + player["w"] = IMAGES["player"][0].get_width() + player["h"] = IMAGES["player"][0].get_height() # if player crashes into ground - if player['y'] + player['h'] >= BASEY - 1: + if player["y"] + player["h"] >= BASEY - 1: return True else: - playerRect = pygame.Rect(player['x'], player['y'], - player['w'], player['h']) + playerRect = pygame.Rect( + player["x"], player["y"], player["w"], player["h"]) for uPipe, lPipe in zip(upperPipes, lowerPipes): # upper and lower pipe rects - uPipeRect = pygame.Rect(uPipe['x'], uPipe['y'], PIPE_WIDTH, PIPE_HEIGHT) - lPipeRect = pygame.Rect(lPipe['x'], lPipe['y'], PIPE_WIDTH, PIPE_HEIGHT) + uPipeRect = pygame.Rect( + uPipe["x"], uPipe["y"], PIPE_WIDTH, PIPE_HEIGHT) + lPipeRect = pygame.Rect( + lPipe["x"], lPipe["y"], PIPE_WIDTH, PIPE_HEIGHT) # player and upper/lower pipe hitmasks - pHitMask = HITMASKS['player'][pi] - uHitmask = HITMASKS['pipe'][0] - lHitmask = HITMASKS['pipe'][1] + pHitMask = HITMASKS["player"][pi] + uHitmask = HITMASKS["pipe"][0] + lHitmask = HITMASKS["pipe"][1] # if bird collided with upipe or lpipe - uCollide = pixelCollision(playerRect, uPipeRect, pHitMask, uHitmask) - lCollide = pixelCollision(playerRect, lPipeRect, pHitMask, lHitmask) + uCollide = pixelCollision( + playerRect, uPipeRect, pHitMask, uHitmask) + lCollide = pixelCollision( + playerRect, lPipeRect, pHitMask, lHitmask) if uCollide or lCollide: return True return False + def pixelCollision(rect1, rect2, hitmask1, hitmask2): """Checks if two objects collide and not just their rects""" rect = rect1.clip(rect2) @@ -220,6 +229,6 @@ def pixelCollision(rect1, rect2, hitmask1, hitmask2): for x in range(rect.width): for y in range(rect.height): - if hitmask1[x1+x][y1+y] and hitmask2[x2+x][y2+y]: + if hitmask1[x1 + x][y1 + y] and hitmask2[x2 + x][y2 + y]: return True return False diff --git a/saved_networks/checkpoint b/saved_networks/checkpoint index 9f6d6fb..6f8d697 100644 --- a/saved_networks/checkpoint +++ b/saved_networks/checkpoint @@ -1,6 +1,3 @@ -model_checkpoint_path: "bird-dqn-2920000" -all_model_checkpoint_paths: "bird-dqn-2880000" -all_model_checkpoint_paths: "bird-dqn-2890000" -all_model_checkpoint_paths: "bird-dqn-2900000" -all_model_checkpoint_paths: "bird-dqn-2910000" -all_model_checkpoint_paths: "bird-dqn-2920000" +model_checkpoint_path: "bird-dqn-20000" +all_model_checkpoint_paths: "bird-dqn-10000" +all_model_checkpoint_paths: "bird-dqn-20000"