# Source code for rlgridworld.gridenv

import gym
import numpy as np
import cv2


class Actions:
    """Named action vectors understood by the grid-world environment.

    Continuous form:
        An action is a length-2 vector. ``action[0]`` drives the vertical
        move (UP/DOWN) and ``action[1]`` drives the horizontal move
        (RIGHT/LEFT). The environment's ``step`` interprets it as:

        * ``action[0] > 0.5``  -> try to go UP
        * ``action[0] < -0.5`` -> try to go DOWN
        * ``action[1] > 0.5``  -> try to go RIGHT
        * ``action[1] < -0.5`` -> try to go LEFT

    Discrete form:
        One of ``'UP'``, ``'DOWN'``, ``'RIGHT'``, ``'LEFT'``, ``'UPRIGHT'``,
        ``'UPLEFT'``, ``'DOWNRIGHT'``, ``'DOWNLEFT'``. Each name is both a key
        of ``self.actions`` and a method returning a fresh copy of the vector.
    """

    # (name, (vertical, horizontal)) in the order the names are registered.
    _VECTORS = (
        ('UP', (1.0, 0.0)),
        ('DOWN', (-1.0, 0.0)),
        ('RIGHT', (0.0, 1.0)),
        ('LEFT', (0.0, -1.0)),
        ('UPRIGHT', (1.0, 1.0)),
        ('UPLEFT', (1.0, -1.0)),
        ('DOWNRIGHT', (-1.0, 1.0)),
        ('DOWNLEFT', (-1.0, -1.0)),
    )

    def __init__(self):
        # Each instance owns its own arrays, so mutating one instance's
        # table never leaks into another.
        self.actions = {}
        for name, components in self._VECTORS:
            self.actions[name] = np.array(list(components))

    def _copy_of(self, name):
        """Return a copy of the stored vector so callers cannot mutate it."""
        stored = self.actions[name]
        return stored.copy()

    def UP(self):
        """Return a copy of the UP vector ``[1, 0]``."""
        return self._copy_of('UP')

    def DOWN(self):
        """Return a copy of the DOWN vector ``[-1, 0]``."""
        return self._copy_of('DOWN')

    def RIGHT(self):
        """Return a copy of the RIGHT vector ``[0, 1]``."""
        return self._copy_of('RIGHT')

    def LEFT(self):
        """Return a copy of the LEFT vector ``[0, -1]``."""
        return self._copy_of('LEFT')

    def UPRIGHT(self):
        """Return a copy of the UPRIGHT vector ``[1, 1]``."""
        return self._copy_of('UPRIGHT')

    def UPLEFT(self):
        """Return a copy of the UPLEFT vector ``[1, -1]``."""
        return self._copy_of('UPLEFT')

    def DOWNRIGHT(self):
        """Return a copy of the DOWNRIGHT vector ``[-1, 1]``."""
        return self._copy_of('DOWNRIGHT')

    def DOWNLEFT(self):
        """Return a copy of the DOWNLEFT vector ``[-1, -1]``."""
        return self._copy_of('DOWNLEFT')
class GridEnv(gym.Env):
    """Grid world described by a character map, driven by 2-D actions."""

    def __init__(self, load_chars_rep_fromd_dir='',
                 init_chars_representation='O O O\nO A O\nO O T',
                 max_steps=100, r_fall_off=-1, r_reach_target=1,
                 r_timeout=0, r_continue=0, render_mode='human',
                 obs_mode='single_rgb_array', render_width=0,
                 render_height=0):
        r"""Build the environment from a character representation.

        Reward function:
            Falling off the edge (or into a hole) = r_fall_off
            Reached target                        = r_reach_target
            Timeout                               = r_timeout
            Continue one step                     = r_continue

        Char representation:
            A: Agent
            T: Target location
            O: Empty ground spot (where the agent can step on and stay)
            W: Wall
            H: Hole (where the agent will fall if it steps in)

        Args:
            load_chars_rep_fromd_dir (str, optional): path of a text file to
                load the chars representation from. Overrides
                init_chars_representation when non-empty. Defaults to ''.
            init_chars_representation (str, optional): char representation of
                this grid world. Defaults to 'O O O\nO A O\nO O T'.
            max_steps (int, optional): max game length. Defaults to 100.
            r_fall_off (int, optional): reward for falling off. Defaults to -1.
            r_reach_target (int, optional): reward for reaching the target.
                Defaults to 1.
            r_timeout (int, optional): reward for ending the game with a
                timeout. Defaults to 0.
            r_continue (int, optional): reward for continuing the game.
                Defaults to 0.
            render_mode (str, optional): None, 'human', 'chars_world' or
                'single_rgb_array'. Defaults to 'human'.
            obs_mode (str, optional): 'chars_world' or 'single_rgb_array'.
                Defaults to 'single_rgb_array'.
            render_width (int, optional): width of the rendered image. If 0,
                use the original width of the char world. Defaults to 0.
            render_height (int, optional): height of the rendered image. If 0,
                use the original height of the char world. Defaults to 0.
        """
        self.actions = Actions()
        self.colors = {
            'A': [255, 0, 0],      # red
            'T': [0, 255, 0],      # green
            'O': [0, 0, 0],        # black
            'W': [255, 255, 255],  # white
            'H': [0, 0, 255],      # blue
        }

        if load_chars_rep_fromd_dir:
            with open(load_chars_rep_fromd_dir, 'r') as f:
                self.init_chars_representation = f.read()
        else:
            self.init_chars_representation = init_chars_representation

        self.max_steps = max_steps
        self.r_fall_off = r_fall_off
        self.r_reach_target = r_reach_target
        self.r_timeout = r_timeout
        self.r_continue = r_continue

        self.render_mode = render_mode
        self.obs_mode = obs_mode
        self.render_width = render_width
        self.render_height = render_height

        self.chars_world, self.width, self.height = self.chars_to_world(
            self.init_chars_representation)

        # Episode state is only valid after reset(); these placeholders let
        # step() report a clear error instead of an AttributeError.
        self.step_count = 0
        self.a_y = None
        self.a_x = None

        self.action_space = gym.spaces.Box(
            low=np.array([-1.0, -1.0], dtype=np.float32),
            high=np.array([1.0, 1.0], dtype=np.float32),
            dtype=np.float32)

        if self.obs_mode == 'single_rgb_array':
            # Observations are uint8 RGB images, optionally resized, so the
            # space must describe the image rather than the char grid.
            obs_shape = (render_height or self.height,
                         render_width or self.width,
                         3)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=obs_shape, dtype=np.uint8)
        else:
            self.observation_space = gym.spaces.Space(
                shape=self.chars_world.shape, dtype=self.chars_world.dtype)

    def reset(self, seed=None, return_info=False, options=None):
        """Restore the initial world and return the first observation.

        Args:
            seed: accepted for API compatibility; the environment is
                deterministic, so it is not used.
            return_info (bool, optional): when True, return ``(obs, info)``
                instead of just ``obs``. Defaults to False.
            options: accepted for API compatibility; not used.

        Returns:
            The initial observation, or ``(observation, info)`` when
            ``return_info`` is True.

        Raises:
            ValueError: if the char representation contains no agent 'A'.
        """
        self.step_count = 0
        self.chars_world, self.width, self.height = self.chars_to_world(
            self.init_chars_representation)

        agent_rows, agent_cols = np.where(self.chars_world == 'A')
        if agent_rows.size == 0:
            raise ValueError("chars representation contains no agent 'A'")
        self.a_y = int(agent_rows[0])
        self.a_x = int(agent_cols[0])

        obs = self.chars_world_to_obs(self.chars_world)
        if return_info:
            return obs, {'chars_world': self.chars_world.copy()}
        return obs

    def step(self, action: np.ndarray):
        """Apply one action and return ``(obs, reward, done, info)``.

        ``action[0]`` selects the vertical move and ``action[1]`` the
        horizontal move (thresholds at +/-0.5); both may fire in the same
        step, vertical first. The episode ends when the agent falls
        (out of bounds or into a hole), reaches the target, or when
        ``max_steps`` continuing steps have already been taken.

        Args:
            action (np.ndarray): length-2 action vector.

        Returns:
            tuple: ``(obs, reward, done, info)``. ``info`` carries
            'chars_world', 'terminated', 'truncated', 'done' and, except on
            timeout, 'move_result' (the ``[fall, fail, success, target]``
            flags).

        Raises:
            RuntimeError: if called before ``reset()``.
        """
        if self.a_x is None or self.a_y is None:
            raise RuntimeError('reset() must be called before step()')
        # Internal invariant: the agent always stands on a valid cell.
        assert 0 <= self.a_x < self.width and 0 <= self.a_y < self.height

        if self.step_count >= self.max_steps:
            # Timeout: the agent stays where it was last seen.
            return self._finish_step(self.r_timeout,
                                     terminated=False, truncated=True)

        # Flags for 'fall', 'fail', 'success', 'target' (in that order).
        # All zeros means the action did not request any move.
        result = [0, 0, 0, 0]
        # The agent can move on both axes in one step; the horizontal move
        # starts from wherever the vertical move left the agent.
        if action[0] > 0.5:      # going up
            self.move_to(self.a_y - 1, self.a_x, result)
        if action[0] < -0.5:     # going down
            self.move_to(self.a_y + 1, self.a_x, result)
        if action[1] > 0.5:      # going right
            self.move_to(self.a_y, self.a_x + 1, result)
        if action[1] < -0.5:     # going left
            self.move_to(self.a_y, self.a_x - 1, result)

        if result[0] == 1:       # fall takes priority over reaching target
            return self._finish_step(self.r_fall_off,
                                     terminated=True, truncated=False,
                                     move_result=result)
        if result[3] == 1:       # reached target
            return self._finish_step(self.r_reach_target,
                                     terminated=True, truncated=False,
                                     move_result=result)

        self.step_count += 1
        return self._finish_step(self.r_continue,
                                 terminated=False, truncated=False,
                                 move_result=result)

    def _finish_step(self, reward, terminated, truncated, move_result=None):
        """Assemble the step return tuple and trigger rendering."""
        done = terminated or truncated
        info = {
            'chars_world': self.chars_world.copy(),
            'terminated': terminated,
            'truncated': truncated,
            'done': done,
        }
        if move_result is not None:
            info['move_result'] = move_result
        obs = self.chars_world_to_obs(self.chars_world)
        self.render(mode=self.render_mode)
        return obs, reward, done, info

    def move_to(self, y, x, result):
        """Attempt a distance-1 move of the agent to cell ``(y, x)``.

        The outcome is one of 'fall', 'fail', 'success' or 'target'; the
        matching element of ``result`` (first, second, third and last
        respectively) is set to 1 and the outcome name is returned. Only a
        'success' actually relocates the agent.

        Raises:
            Exception: if the destination holds an unrecognised char.
        """
        inside = 0 <= x < self.width and 0 <= y < self.height
        if not inside or self.chars_world[y, x] == 'H':
            result[0] = 1
            return 'fall'

        cell = self.chars_world[y, x]
        if cell == 'O':
            self.chars_world[self.a_y, self.a_x] = 'O'
            self.chars_world[y, x] = 'A'
            self.a_y = y
            self.a_x = x
            result[2] = 1
            return 'success'
        if cell == 'W':
            result[1] = 1
            return 'fail'
        if cell == 'T':
            result[3] = 1
            return 'target'
        raise Exception(
            f'Unknown char: {cell}, y: {y}, x: {x}, '
            f'chars_world: {str(self.chars_world)}')

    def chars_to_world(self, chars_representation):
        """Parse a whitespace-separated char map into a 2-D array.

        Blank lines (such as the trailing newline of a file) and repeated
        separators are ignored.

        Args:
            chars_representation (str): one row per line, cells separated by
                whitespace.

        Returns:
            tuple: ``(chars_world, width, height)`` where ``chars_world`` is a
            ``(height, width)`` array of single characters.

        Raises:
            ValueError: if there are no rows or the rows differ in length.
        """
        rows = [line.split()
                for line in chars_representation.splitlines()
                if line.strip()]
        if not rows:
            raise ValueError('chars representation contains no grid rows')
        if len({len(row) for row in rows}) != 1:
            raise ValueError(
                'chars representation rows must all have the same length')
        chars_world = np.array(rows, dtype='<U1')
        height, width = chars_world.shape
        return chars_world, width, height

    def chars_world_to_rgb_array(self, chars_world):
        """Colour every cell of ``chars_world`` using ``self.colors``.

        Returns:
            np.ndarray: ``(height, width, 3)`` uint8 RGB image.

        Raises:
            Exception: if a cell holds a char with no entry in
                ``self.colors``.
        """
        known = np.isin(chars_world, list(self.colors))
        if not known.all():
            # Report the first offender scanning column by column.
            x, y = (int(v) for v in np.argwhere(~known.T)[0])
            raise Exception(
                f'Unknown char: {chars_world[y, x]}, y: {y}, x: {x}, '
                f'chars_world: {str(chars_world)}')

        rgb_image = np.zeros((*chars_world.shape, 3), dtype='uint8')
        for char, color in self.colors.items():
            rgb_image[chars_world == char] = color
        return rgb_image

    def _resize_frame(self, rgb_img_array):
        """Scale a frame to the configured render size (0 = native size)."""
        if self.render_width == 0 and self.render_height == 0:
            return rgb_img_array
        native_height, native_width = rgb_img_array.shape[:2]
        # cv2.resize takes the target size as (width, height).
        target = (self.render_width or native_width,
                  self.render_height or native_height)
        return cv2.resize(rgb_img_array, target,
                          interpolation=cv2.INTER_NEAREST)

    def chars_world_to_obs(self, chars_world):
        """Convert ``chars_world`` into an observation per ``self.obs_mode``.

        Returns:
            A copy of the char grid for 'chars_world', or an RGB image
            (resized when a render size is configured) for
            'single_rgb_array'.

        Raises:
            Exception: if ``self.obs_mode`` is not a known mode.
        """
        if self.obs_mode == 'chars_world':
            # Copy so later moves do not rewrite observations already
            # handed out.
            return np.copy(chars_world)
        if self.obs_mode == 'single_rgb_array':
            return self._resize_frame(
                self.chars_world_to_rgb_array(chars_world))
        raise Exception(f'Unknown obs mode: {self.obs_mode}')

    def render(self, mode="human"):
        """Render the current world.

        Args:
            mode: None (do nothing), 'human' (show an OpenCV window),
                'single_rgb_array' (return the native-size RGB image) or
                'chars_world' (return a copy of the char grid).

        Returns:
            The rendered array for the array modes, otherwise None.
        """
        if mode is None:
            return None
        if mode == "human":
            frame = self._resize_frame(
                self.chars_world_to_rgb_array(self.chars_world))
            # self.colors is RGB while cv2.imshow expects BGR.
            cv2.imshow("Game", cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
            cv2.waitKey(10)
            return None
        if mode == "single_rgb_array":
            return self.chars_world_to_rgb_array(self.chars_world)
        if mode == "chars_world":
            return self.chars_world.copy()
        return None

    def close(self):
        """Close any OpenCV windows opened by 'human' rendering."""
        cv2.destroyAllWindows()