This post walks through how to build the foundation blocks for implementing a neural agent. We start with a basic agent that picks actions from the space of valid actions but does not yet learn from the rewards returned by the environment.
For this task we will use a simple GridWorld environment (see the picture below).
%%capture
%%bash
apt install -y xvfb
pip install gym
pip install gym-notebook-wrapper
Import the dependencies, then define the GridWorld environment for Gym.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import gnwrapper
import gym
#collapse-hide
%%writefile gridworld.py
#!/usr/bin/env python
# GridWorld RL environment with image observations
# Chapter 1, TensorFlow 2 Reinforcement Learning Cookbook | Praveen Palanisamy
from collections import defaultdict
import copy
import sys
import gym
import numpy as np
# Grid cell state and color mapping
EMPTY = BLACK = 0
WALL = GRAY = 1
AGENT = BLUE = 2
BOMB = RED = 3
GOAL = GREEN = 4
# RGB color value table
COLOR_MAP = {
BLACK: [0.0, 0.0, 0.0],
GRAY: [0.5, 0.5, 0.5],
BLUE: [0.0, 0.0, 1.0],
RED: [1.0, 0.0, 0.0],
GREEN: [0.0, 1.0, 0.0],
}
# Action mapping
NOOP = 0
DOWN = 1
UP = 2
LEFT = 3
RIGHT = 4
class GridworldEnv(gym.Env):
def __init__(self, max_steps=100):
"""Initialize Gridworld
Args:
max_steps (int, optional): Max steps per episode. Defaults to 100.
"""
# Observations
self.grid_layout = """
1 1 1 1 1 1 1 1
1 2 0 0 0 0 0 1
1 0 1 1 1 0 0 1
1 0 1 0 1 0 0 1
1 0 1 4 1 0 0 1
1 0 3 0 0 0 0 1
1 0 0 0 0 0 0 1
1 1 1 1 1 1 1 1
"""
self.initial_grid_state = np.fromstring(self.grid_layout, dtype=int, sep=" ")
self.initial_grid_state = self.initial_grid_state.reshape(8, 8)
self.grid_state = copy.deepcopy(self.initial_grid_state)
self.observation_space = gym.spaces.Box(
low=0, high=6, shape=self.grid_state.shape
)
self.img_shape = [256, 256, 3]
self.metadata = {"render.modes": ["human"]}
# Actions
self.action_space = gym.spaces.Discrete(5)
self.actions = [NOOP, UP, DOWN, LEFT, RIGHT]
self.action_pos_dict = defaultdict(
lambda: [0, 0],
{
NOOP: [0, 0],
UP: [-1, 0],
DOWN: [1, 0],
LEFT: [0, -1],
RIGHT: [0, 1],
},
)
(self.agent_state, self.goal_state) = self.get_state()
self.step_num = 0 # To keep track of number of steps
self.max_steps = max_steps
self.done = False
self.info = {"status": "Live"}
self.viewer = None
def step(self, action):
"""Return next observation, reward, done , info"""
action = int(action)
reward = 0.0
next_state = (
self.agent_state[0] + self.action_pos_dict[action][0],
self.agent_state[1] + self.action_pos_dict[action][1],
)
next_state_invalid = (
next_state[0] < 0 or next_state[0] >= self.grid_state.shape[0]
) or (next_state[1] < 0 or next_state[1] >= self.grid_state.shape[1])
if next_state_invalid:
# Leave the agent state unchanged
next_state = self.agent_state
self.info["status"] = "Next state is invalid"
next_agent_state = self.grid_state[next_state[0], next_state[1]]
# Calculate reward
if next_agent_state == EMPTY:
# Move agent from previous state to the next state on the grid
self.info["status"] = "Agent moved to a new cell"
self.grid_state[next_state[0], next_state[1]] = AGENT
self.grid_state[self.agent_state[0], self.agent_state[1]] = EMPTY
self.agent_state = copy.deepcopy(next_state)
elif next_agent_state == WALL:
self.info["status"] = "Agent bumped into a wall"
reward = -0.1
# Terminal states
elif next_agent_state == GOAL:
self.info["status"] = "Agent reached the GOAL "
self.done = True
reward = 1
elif next_agent_state == BOMB:
self.info["status"] = "Agent stepped on a BOMB"
self.done = True
reward = -1
# elif next_agent_state == AGENT:
else:
# NOOP or next state is invalid
self.done = False
self.step_num += 1
# Check if max steps per episode has been reached
if self.step_num >= self.max_steps:
self.done = True
self.info["status"] = "Max steps reached"
if self.done:
done = True
terminal_state = copy.deepcopy(self.grid_state)
terminal_info = copy.deepcopy(self.info)
_ = self.reset()
return (terminal_state, reward, done, terminal_info)
return self.grid_state, reward, self.done, self.info
def reset(self):
self.grid_state = copy.deepcopy(self.initial_grid_state)
(
self.agent_state,
self.agent_goal_state,
) = self.get_state()
self.step_num = 0
self.done = False
self.info["status"] = "Live"
return self.grid_state
def get_state(self):
start_state = np.where(self.grid_state == AGENT)
goal_state = np.where(self.grid_state == GOAL)
start_or_goal_not_found = not (start_state[0] and goal_state[0])
if start_or_goal_not_found:
sys.exit(
"Start and/or Goal state not present in the Gridworld. "
"Check the Grid layout"
)
start_state = (start_state[0][0], start_state[1][0])
goal_state = (goal_state[0][0], goal_state[1][0])
return start_state, goal_state
def gridarray_to_image(self, img_shape=None):
if img_shape is None:
img_shape = self.img_shape
observation = np.random.randn(*img_shape) * 0.0
scale_x = int(observation.shape[0] / self.grid_state.shape[0])
scale_y = int(observation.shape[1] / self.grid_state.shape[1])
for i in range(self.grid_state.shape[0]):
for j in range(self.grid_state.shape[1]):
for k in range(3): # 3-channel RGB image
pixel_value = COLOR_MAP[self.grid_state[i, j]][k]
observation[
i * scale_x : (i + 1) * scale_x,
j * scale_y : (j + 1) * scale_y,
k,
] = pixel_value
return (255 * observation).astype(np.uint8)
def render(self, mode="human", close=False):
if close:
if self.viewer is not None:
self.viewer.close()
self.viewer = None
return
img = self.gridarray_to_image()
if mode == "rgb_array":
return img
elif mode == "human":
from gym.envs.classic_control import rendering
if self.viewer is None:
self.viewer = rendering.SimpleImageViewer()
self.viewer.imshow(img)
def close(self):
self.render(close=True)
@staticmethod
def get_action_meanings():
return ["NOOP", "DOWN", "UP", "LEFT", "RIGHT"]
if __name__ == "__main__":
env = GridworldEnv()
obs = env.reset()
done = False
step_num = 1
# Run one episode
while not done:
# Sample a random action from the action space
action = env.action_space.sample()
next_obs, reward, done, info = env.step(action)
print(f"step#:{step_num} reward:{reward} done:{done} info:{info}")
step_num += 1
env.render()
env.close()
#collapse-hide
from gym.envs.registration import register
register(
id="Gridworld-v0", entry_point="gridworld:GridworldEnv",
)
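To get a quick look at the layout encoded above (walls in gray, the agent in blue, the bomb in red, the goal in green), here is a minimal sketch that renders the initial grid inline. It assumes matplotlib is available in the notebook environment.
import matplotlib.pyplot as plt

env = gym.make('Gridworld-v0')
env.reset()
# Render the initial grid as an RGB array and show it inline
plt.imshow(env.render(mode="rgb_array"))
plt.axis("off")
plt.show()
env.close()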
Agent
The agent performs an action $a_{t}$ given the current environment state $s_{t-1}$. In return, the environment gives the agent a reward $r_t$. The agent is supposed to use this reward to learn from its previous mistakes; for simplicity, the agent we build here does not learn, but it still uses a neural network to choose its actions.
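To make the notation concrete, here is a minimal sketch of this interaction loop on the GridWorld environment, using a random placeholder policy (the neural-network policy is defined next).
env = gym.make('Gridworld-v0')
obs = env.reset()  # initial state
done = False
while not done:
    # a_t: here sampled at random as a placeholder policy
    action = env.action_space.sample()
    # the environment returns the next state s_t and the reward r_t
    obs, reward, done, info = env.step(action)
env.close()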
We start with the neural network model that the agent will use to select its actions. In this case, the model is implemented in the following `Brain` class:
class Brain(keras.Model):
def __init__(self, action_dim=5, input_shape=(1, 8 * 8), hidden_dim=32):
super(Brain, self).__init__()
self.proj = layers.Dense(hidden_dim, input_shape=input_shape, activation="relu")
self.logits = layers.Dense(action_dim)
def call(self, inputs):
x = tf.convert_to_tensor(inputs)
if len(x.shape) >= 2 and x.shape[0] != 1:
x = tf.reshape(x, (1, -1))
return self.logits(self.proj(x))
The model is very basic, as you can see in the following summary: it consists of one hidden layer and outputs one unit (logit) for every possible action.
env = gym.make('Gridworld-v0')
action_dim, input_shape = env.action_space.n, env.observation_space.shape
model = Brain(action_dim, input_shape)
model.build(input_shape)
model.summary()
The agent is implemented in the following `Agent` class. It uses the model that we built earlier to take actions: using `tf.random.categorical`, the agent randomly samples an action from the model's outputs.
class Agent(object):
def __init__(self, brain):
self.brain = brain
def policy(self, observations):
observations = observations.reshape(1, -1)
action_logits = self.brain.predict_on_batch(observations)
        # tf.random.categorical expects unnormalized log-probabilities (logits),
        # so the model's raw outputs can be passed in directly
        action = tf.random.categorical(action_logits, num_samples=1)
return tf.squeeze(action, axis=1)
def get_action(self, observations):
return self.policy(observations)
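As a quick sanity check (a hypothetical snippet reusing the `Brain` and environment created above), we can sample one action for the initial observation:
env = gym.make('Gridworld-v0')
obs = env.reset()
agent = Agent(Brain(env.action_space.n, env.observation_space.shape))
action = agent.get_action(obs)
print(action)  # a length-1 tensor holding an action index in [0, 4]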
Environment
The environment used to test our agent is the GridWorld. We will run multiple episodes in which the agent uses its neural network model to take actions while we accumulate the rewards returned by the environment. At the end of each episode we print the total reward and the number of steps/actions taken during that episode.
def run_episode(agent, env):
obs, episode_reward, done, step_num = env.reset(), 0.0, False, 0
while not done:
action = agent.get_action(obs)
obs, reward, done, info = env.step(action)
episode_reward += reward
step_num += 1
env.render()
return step_num, episode_reward, done, info
Here we set up the environment, record the actions taken, and display each episode.
env = gnwrapper.LoopAnimation(gym.make('Gridworld-v0'))
action_dim, input_shape = env.action_space.n, env.observation_space.shape
brain = Brain(action_dim, input_shape)
agent = Agent(brain)
for episode in range(1):
steps, episode_reward, done, info = run_episode(agent, env)
print(f"EpReward:{episode_reward:.2f} steps:{steps} done:{done} info:{info}")
env.reset()
env.display()
That's all folks
I hope you enjoyed this article. Feel free to leave a comment or reach out on Twitter @bachiirc.