+from enum import Enum
+from MAMEToolkit.emulator import Action
+class Actions(Enum):
+ # coins
+ insert_coin = Action(':COIN', 'Coin 1')
+ p1_start = Action(':IN1', '1 Player Start')
+ p2_start = Action(':IN1', '2 Players Start')
+ coinage = Action(':IN2', 'Display Coinage')
+ # lives
+ bonus_life = Action(':IN2', 'Bonus Life')
+ lives = Action(':IN2', 'Lives')
+ # p1 movement
+ p1_right = Action(':CONTP1', 'P1 Right')
+ p1_left = Action(':CONTP1', 'P1 Left')
+ p1_shoot = Action(':CONTP1', 'P1 Button 1')
+ #p2 movement
+ p2_right = Action(':CONTP2', 'P2 Right')
+ p2_left = Action(':CONTP2', 'P2 Left')
+ p2_shoot = Action(':CONTP2', 'P2 Button 1')
+import math
+from MAMEToolkit.emulator import Emulator
+from MAMEToolkit.emulator import Address
+from Actions import Actions
+from Steps import *
+def add_rewards(old_data, new_data):
+ for k in old_data.keys():
+ if "rewards" in k:
+ for player in old_data[k]:
+ new_data[k][player] += old_data[k][player]
+ return new_data
+def memory_addresses():
+ return {
+ "score": Address('0x20F8', 's16'),
+ "player_x": Address('0x201B', 's16'),
+ "player_y": Address('0x201A', 's16'),
+ "shot_status": Address('0x2025', 's16'),
+ "num_aliens": Address('0x2082', 's16'),
+ "alien_shot_x": Address('0x207C', 's16'),
+ "alien_shot_y": Address('0x207B', 's16'),
+ "shot_collision": Address('0x2061', 's16'),
+ "player_alive": Address('0x2068', 's16'),
+ "has_lives": Address('0x20E7', 's16'),
+ "ships_remain": Address('0x21FF', 's16')
+ }
+def index_to_action(action):
+ return {
+ 0: [Actions.p1_shoot],
+ 1: [Actions.p1_left],
+ 2: [Actions.p1_right],
+ 3: []
+ }[action]
+class Space_Invaders(object):
+ def __init__(self, env_id, roms_path, frame_ratio=3, frames_per_step=3):
+ self.frame_ratio = frame_ratio
+ self.frames_per_step = frames_per_step
+ self.emu = Emulator(env_id, roms_path, "invaders", memory_addresses(), frame_ratio=frame_ratio)
+ self.expected_score = {"score": 1000, "alive": 1}
+ self.game_over = False
+ def run_steps(self, steps):
+ for step in steps:
+ for i in range(step["wait"]):
+ self.emu.step([])
+ self.emu.step([action.value for action in step["actions"]])
+ def start(self):
+ self.run_steps(start_game(self.frame_ratio))
+ self.started = True
+ def check_game_over(self, data):
+ if data["has_lives"] < 257:
+ self.game_over = True
+ return data
+ def new_game(self):
+ self.run_steps(new_game(self.frame_ratio))
+ self.expected_score = {"scoreL": 1000, "alive": 1}
+ self.game_over = False
+ def gather_frames(self, actions):
+ data = self.sub_step(actions)
+ frames = [data["frame"]]
+ for i in range(self.frames_per_step - 1):
+ data = add_rewards(data, self.sub_step(actions))
+ frames.append(data["frame"])
+ data["frame"] = frames[0] if self.frames_per_step == 1 else frames
+ return data
+ def sub_step(self, actions):
+ data = self.emu.step([action.value for action in actions])
+ score = data["score"]
+ aliens = data["num_aliens"]
+ alive = data["player_alive"]
+ x_diff = (data["player_x"] - data["alien_shot_x"])
+ y_diff = (data["player_y"] - data["alien_shot_y"])
+ alien_shot_x = data["alien_shot_x"]
+ alien_shot_y = data["alien_shot_y"]
+ shot_status = (data["shot_status"])
+ has_lives = (data["has_lives"])
+ distance = math.sqrt( ((data["player_x"] - data["alien_shot_x"])**2) + ((data["player_y"] - data["alien_shot_y"])**2))
+ collide = data["shot_collision"]
+ rewards = {
+ "score": score,
+ "aliens": aliens,
+ "x_diff": x_diff,
+ "y_diff": y_diff,
+ "alien_shot_x": alien_shot_x,
+ "alien_shot_y": alien_shot_y,
+ "distance": distance,
+ "alive": alive,
+ "shot_status": shot_status,
+ "collide": collide,
+ "has_lives": has_lives
+ }
+ data["rewards"] = rewards
+ return data
+ def step(self, action):
+ if self.started:
+ if not self.game_over:
+ actions = []
+ actions += index_to_action(action)
+ data = self.gather_frames(actions)
+ data = self.check_game_over(data)
+ return data["frame"], data["rewards"], self.game_over
+ else:
+ raise EnvironmentError("Attempted to step while game not playing")
+ else:
+ raise EnvironmentError("Start must be called before stepping")
+ def close(self):
+ self.emu.close()
+from Actions import Actions
+def start_game(frame_ratio):
+ return [
+ {"wait": int(300/frame_ratio), "actions": [Actions.insert_coin]},
+ {"wait": int(60/frame_ratio), "actions": [Actions.p1_start]},
+ {"wait": int(60/frame_ratio), "actions": [Actions.p1_start]}]
+def new_game(frame_ratio):
+ return [
+ {"wait": int(600/frame_ratio), "actions": [Actions.insert_coin]},
+ {"wait": int(60/frame_ratio), "actions": [Actions.p1_start]},
+ {"wait": int(60/frame_ratio), "actions": [Actions.p1_start]}]
+from Actions import Actions
+import random
+import numpy as np
+import matplotlib.pyplot as plt
+import pickle
+import time
+from matplotlib import style
+from Space_Invaders import Space_Invaders
+from Actions import Actions
+from MAMEToolkit.emulator import Emulator
+from MAMEToolkit.emulator import Address
+from MAMEToolkit.emulator import Action
+from MAMEToolkit.emulator import list_actions
+roms_path = "/home/john/media/downloads/Transmission/MAME 0.220 ROMs (split)/"
+game_id = "invaders"
+#print(list_actions(roms_path, game_id))
+# env = Space_Invaders("env1", roms_path)
+# env.start()
+def add_action_to_observation(observation, action):
+ return np.append([action], observation)
+def initial_training():
+ env = Space_Invaders("env1", roms_path)
+ env.start()
+ episode_rewards = []
+ epsilon = 0.6
+ EPS_DECAY = 0.9998
+ SHOW_EVERY = 1000 # how often to play through env visually.
+ q_table = np.random.rand(5,4)
+ # start_q_table = "qtable-1608050271.pickle"
+ # with open(start_q_table, "rb") as f:
+ # q_table = pickle.load(f)
+ avg_reward = []
+ DISCOUNT = 0.95
+ games = 15
+ score = 0
+ reward = 0
+ iterations = 0
+ action = random.randint(0, 3)
+ score_list = []
+ while games > 0:
+ iterations = iterations + 1
+ frames, rewards, game_over = env.step(action)
+ episode_reward = 0
+ old_score = score
+ state_index = 2
+ score = rewards["score"]
+ aliens = rewards["aliens"]
+ alive = rewards["alive"]
+ shot_status = rewards["shot_status"]
+ if alive == 0:
+ state_index = 1
+ elif shot_status != 12303 and shot_status != 12298 and shot_status != 12294:
+ state_index = 4
+ elif shot_status == 12303 or shot_status == 12298:
+ state_index = 0
+ elif shot_status == 12294:
+ state_index = 3
+ else:
+ state_index = 2
+ obs = (state_index, action)
+ if game_over:
+ games = games - 1
+ score_list.append(score)
+ env.new_game()
+ else:
+ if np.random.random() > epsilon:
+ action = np.argmax(q_table[obs])
+ else:
+ action = np.random.randint(0, 3)
+ if state_index == 1:
+ reward = -DEATH_PENALTY
+ elif state_index == 4:
+ reward = MISS_PENALTY
+ elif state_index == 0:
+ reward = KILL_REWARD
+ elif state_index == 3:
+ reward = 0
+ else:
+ reward = -1
+ new_obs = (state_index, action)
+ max_future_q = np.max(q_table[new_obs])
+ current_q = q_table[obs]
+ if reward == KILL_REWARD:
+ new_q = KILL_REWARD
+ else:
+ new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
+ episode_reward += reward
+ episode_rewards.append(episode_reward)
+ epsilon *= EPS_DECAY
+ avg_reward.append(sum(episode_rewards) / len(episode_rewards) )
+ moving_avg = np.convolve(episode_rewards, np.ones((iterations,))/iterations, mode='valid')
+ env.close()
+ print(q_table)
+ with open(f"qtable-{int(time.time())}.pickle", "wb") as f:
+ pickle.dump(q_table, f)
+ plt.plot([i for i in range(len(episode_rewards))], avg_reward)
+ plt.ylabel(f"Reward {SHOW_EVERY}ma")
+ plt.xlabel("episode #")
+if __name__ == "__main__":
+# # training_data = initial_training()
+ initial_training()
+# #'training', training_data)
+# train_net()
+# # test_net()