This post walks through how to build the foundation blocks for implementing a Neural Agent. We start with basic Agent that can take action from the space of valid actions but does not learn from the rewards returned by the environemnt.

For this task we will use a simple environment GridWorld (see below picture).

image.png

Setup

First, we need to install OpenAI Gym and the dependencies needed to run Gym on a notebook.

%%capture
%%bash

apt install xvfb
pip install gym
pip install gym-notebook-wrapper

Import the dependencies and Gym's GridWorld environment

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import gnwrapper
import gym

#collapse-hide
%%writefile gridworld.py

#!/usr/bin/env python
# GridWorld RL environment with image observations
# Chapter 1, TensorFlow 2 Reinforcement Learning Cookbook | Praveen Palanisamy
from collections import defaultdict
import copy
import sys

import gym
import numpy as np

# Grid cell state and color mapping
EMPTY = BLACK = 0
WALL = GRAY = 1
AGENT = BLUE = 2
BOMB = RED = 3
GOAL = GREEN = 4

# RGB color value table
COLOR_MAP = {
    BLACK: [0.0, 0.0, 0.0],
    GRAY: [0.5, 0.5, 0.5],
    BLUE: [0.0, 0.0, 1.0],
    RED: [1.0, 0.0, 0.0],
    GREEN: [0.0, 1.0, 0.0],
}

# Action mapping
NOOP = 0
DOWN = 1
UP = 2
LEFT = 3
RIGHT = 4


class GridworldEnv(gym.Env):
    def __init__(self, max_steps=100):
        """Initialize Gridworld
        Args:
            max_steps (int, optional): Max steps per episode. Defaults to 100.
        """
        # Observations
        self.grid_layout = """
        1 1 1 1 1 1 1 1
        1 2 0 0 0 0 0 1
        1 0 1 1 1 0 0 1
        1 0 1 0 1 0 0 1
        1 0 1 4 1 0 0 1
        1 0 3 0 0 0 0 1
        1 0 0 0 0 0 0 1
        1 1 1 1 1 1 1 1
        """
        self.initial_grid_state = np.fromstring(self.grid_layout, dtype=int, sep=" ")
        self.initial_grid_state = self.initial_grid_state.reshape(8, 8)
        self.grid_state = copy.deepcopy(self.initial_grid_state)
        self.observation_space = gym.spaces.Box(
            low=0, high=6, shape=self.grid_state.shape
        )
        self.img_shape = [256, 256, 3]
        self.metadata = {"render.modes": ["human"]}
        # Actions
        self.action_space = gym.spaces.Discrete(5)
        self.actions = [NOOP, UP, DOWN, LEFT, RIGHT]
        self.action_pos_dict = defaultdict(
            lambda: [0, 0],
            {
                NOOP: [0, 0],
                UP: [-1, 0],
                DOWN: [1, 0],
                LEFT: [0, -1],
                RIGHT: [0, 1],
            },
        )
        (self.agent_state, self.goal_state) = self.get_state()
        self.step_num = 0  # To keep track of number of steps
        self.max_steps = max_steps
        self.done = False
        self.info = {"status": "Live"}
        self.viewer = None

    def step(self, action):
        """Return next observation, reward, done , info"""
        action = int(action)
        reward = 0.0

        next_state = (
            self.agent_state[0] + self.action_pos_dict[action][0],
            self.agent_state[1] + self.action_pos_dict[action][1],
        )

        next_state_invalid = (
            next_state[0] < 0 or next_state[0] >= self.grid_state.shape[0]
        ) or (next_state[1] < 0 or next_state[1] >= self.grid_state.shape[1])
        if next_state_invalid:
            # Leave the agent state unchanged
            next_state = self.agent_state
            self.info["status"] = "Next state is invalid"

        next_agent_state = self.grid_state[next_state[0], next_state[1]]

        # Calculate reward
        if next_agent_state == EMPTY:
            # Move agent from previous state to the next state on the grid
            self.info["status"] = "Agent moved to a new cell"
            self.grid_state[next_state[0], next_state[1]] = AGENT
            self.grid_state[self.agent_state[0], self.agent_state[1]] = EMPTY
            self.agent_state = copy.deepcopy(next_state)

        elif next_agent_state == WALL:
            self.info["status"] = "Agent bumped into a wall"
            reward = -0.1
        # Terminal states
        elif next_agent_state == GOAL:
            self.info["status"] = "Agent reached the GOAL "
            self.done = True
            reward = 1
        elif next_agent_state == BOMB:
            self.info["status"] = "Agent stepped on a BOMB"
            self.done = True
            reward = -1
        # elif next_agent_state == AGENT:
        else:
            # NOOP or next state is invalid
            self.done = False

        self.step_num += 1

        # Check if max steps per episode has been reached
        if self.step_num >= self.max_steps:
            self.done = True
            self.info["status"] = "Max steps reached"

        if self.done:
            done = True
            terminal_state = copy.deepcopy(self.grid_state)
            terminal_info = copy.deepcopy(self.info)
            _ = self.reset()
            return (terminal_state, reward, done, terminal_info)

        return self.grid_state, reward, self.done, self.info

    def reset(self):
        self.grid_state = copy.deepcopy(self.initial_grid_state)
        (
            self.agent_state,
            self.agent_goal_state,
        ) = self.get_state()
        self.step_num = 0
        self.done = False
        self.info["status"] = "Live"
        return self.grid_state

    def get_state(self):
        start_state = np.where(self.grid_state == AGENT)
        goal_state = np.where(self.grid_state == GOAL)

        start_or_goal_not_found = not (start_state[0] and goal_state[0])
        if start_or_goal_not_found:
            sys.exit(
                "Start and/or Goal state not present in the Gridworld. "
                "Check the Grid layout"
            )
        start_state = (start_state[0][0], start_state[1][0])
        goal_state = (goal_state[0][0], goal_state[1][0])

        return start_state, goal_state

    def gridarray_to_image(self, img_shape=None):
        if img_shape is None:
            img_shape = self.img_shape
        observation = np.random.randn(*img_shape) * 0.0
        scale_x = int(observation.shape[0] / self.grid_state.shape[0])
        scale_y = int(observation.shape[1] / self.grid_state.shape[1])
        for i in range(self.grid_state.shape[0]):
            for j in range(self.grid_state.shape[1]):
                for k in range(3):  # 3-channel RGB image
                    pixel_value = COLOR_MAP[self.grid_state[i, j]][k]
                    observation[
                        i * scale_x : (i + 1) * scale_x,
                        j * scale_y : (j + 1) * scale_y,
                        k,
                    ] = pixel_value
        return (255 * observation).astype(np.uint8)

    def render(self, mode="human", close=False):
        if close:
            if self.viewer is not None:
                self.viewer.close()
                self.viewer = None
            return

        img = self.gridarray_to_image()
        if mode == "rgb_array":
            return img
        elif mode == "human":
            from gym.envs.classic_control import rendering

            if self.viewer is None:
                self.viewer = rendering.SimpleImageViewer()
            self.viewer.imshow(img)

    def close(self):
        self.render(close=True)

    @staticmethod
    def get_action_meanings():
        return ["NOOP", "DOWN", "UP", "LEFT", "RIGHT"]


if __name__ == "__main__":
    env = GridworldEnv()
    obs = env.reset()
    done = False
    step_num = 1
    # Run one episode
    while not done:
        # Sample a random action from the action space
        action = env.action_space.sample()
        next_obs, reward, done, info = env.step(action)
        print(f"step#:{step_num} reward:{reward} done:{done} info:{info}")
        step_num += 1
        env.render()
    env.close()

Writing gridworld.py

#collapse-hide
from gym.envs.registration import register

register(
    id="Gridworld-v0", entry_point="gridworld:GridworldEnv",
)

Agent

The agent is supposed to perform actions $a_{t}$ given the current environment state $s_{t-1}$. In return, the environment return a reward to the agent $r_t$. The agent is supposed to use the reward to learn from its previous mistakes, for simplicity the agent we will be building does not learn but still uses a Neural Network to choose it's actions.

image.png

We start with the Neural Network model that the agent will use to selet its actions. In this case, the model is implemented in the following Brain class

class Brain(keras.Model):
    def __init__(self, action_dim=5, input_shape=(1, 8 * 8), hidden_dim=32):
        super(Brain, self).__init__()
        self.proj = layers.Dense(hidden_dim, input_shape=input_shape, activation="relu")
        self.logits = layers.Dense(action_dim)

    def call(self, inputs):
        x = tf.convert_to_tensor(inputs)
        if len(x.shape) >= 2 and x.shape[0] != 1:
            x = tf.reshape(x, (1, -1))
        return self.logits(self.proj(x))

The model is very basic as you can see in the following summary, it consists of one hiddent layer and outputs one unit for every possible action.

env = gym.make('Gridworld-v0')
action_dim, input_shape = env.action_space.n, env.observation_space.shape
model = Brain(action_dim, input_shape)
model.build(input_shape)
model.summary()
Model: "brain_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 dense_4 (Dense)             multiple                  2080      
                                                                 
 dense_5 (Dense)             multiple                  165       
                                                                 
=================================================================
Total params: 2,245
Trainable params: 2,245
Non-trainable params: 0
_________________________________________________________________

The agent is implemented in the following Agent class. It uses the model that we build earlier to take actions. Using tf.random.categorical the agent randomly picks an action from the model outputs.

class Agent(object):
    def __init__(self, brain):
        self.brain = brain

    def policy(self, observations):
        observations = observations.reshape(1, -1)
        action_logits = self.brain.predict_on_batch(observations)
        action = tf.random.categorical(tf.math.log(action_logits), num_samples=1)
        return tf.squeeze(action, axis=1)

    def get_action(self, observations):
        return self.policy(observations)

Environment

The environement used to test our agent is the GridWorld. We will run multiple episode where the agent will use its Neural Network model to take actions and we accumulate the rewards returned by the environment. At the end of each episode we print the total rewards and number of steps/actions taken during this eposide.

def run_episode(agent, env):
    obs, episode_reward, done, step_num = env.reset(), 0.0, False, 0
    while not done:
        action = agent.get_action(obs)
        obs, reward, done, info = env.step(action)
        episode_reward += reward
        step_num += 1
        env.render()
    return step_num, episode_reward, done, info

Here we setup the environemnt and record the actions taken and disply each eposide.

env = gnwrapper.LoopAnimation(gym.make('Gridworld-v0'))

action_dim, input_shape = env.action_space.n, env.observation_space.shape
brain = Brain(action_dim, input_shape)
agent = Agent(brain)

for episode in range(1):
    steps, episode_reward, done, info = run_episode(agent, env)
    print(f"EpReward:{episode_reward:.2f} steps:{steps} done:{done} info:{info}")
    env.reset()

env.display()
EpReward:-4.50 steps:100 done:True info:{'status': 'Max steps reached'}
</input>

That's all folks

I hope you enjoyed this article, feel free to leave a comment or reach out on twitter @bachiirc