Commit

Merge pull request #10 from ll7/robotenv_docs
Robotenv docs
ll7 authored Mar 1, 2024
2 parents 6cc4622 + 01f3c9b commit 7a44c22
Showing 3 changed files with 153 additions and 22 deletions.
153 changes: 135 additions & 18 deletions robot_sf/robot_env.py
@@ -33,6 +33,7 @@
import numpy as np
from gym.vector import VectorEnv
from gym import Env, spaces
from gym.utils import seeding
from robot_sf.nav.map_config import MapDefinition
from robot_sf.nav.navigation import RouteNavigator

@@ -342,75 +343,191 @@ def init_spaces(env_config: EnvSettings, map_def: MapDefinition):


class RobotEnv(Env):
"""Representing an OpenAI Gym environment for training
a self-driving robot with reinforcement learning"""
"""
Representing an OpenAI Gym environment for training a self-driving robot
with reinforcement learning.
"""

def __init__(
self, env_config: EnvSettings = EnvSettings(),
self,
env_config: EnvSettings = EnvSettings(),
reward_func: Callable[[dict], float] = simple_reward,
debug: bool = False):
debug: bool = False
):
"""
Initialize the Robot Environment.
Parameters:
- env_config (EnvSettings): Environment configuration settings.
- reward_func (Callable[[dict], float]): Reward function that takes
a dictionary as input and returns a float as reward.
- debug (bool): If True, enables debugging information such as
visualizations.
"""

# Environment configuration details
self.env_config = env_config
map_def = env_config.map_pool.map_defs[0] # info: only use first map
self.action_space, self.observation_space, orig_obs_space = init_spaces(env_config, map_def)
# Extract the first map definition; only the first map is currently supported
map_def = env_config.map_pool.map_defs[0]

# Initialize spaces based on the environment configuration and map
self.action_space, self.observation_space, orig_obs_space = \
init_spaces(env_config, map_def)

# Assign the reward function and debug flag
self.reward_func, self.debug = reward_func, debug
self.simulator = init_simulators(env_config, map_def, random_start_pos=True)[0]

# Initialize simulator with a random start position
self.simulator = init_simulators(
env_config,
map_def,
random_start_pos=True
)[0]

# Delta time per simulation step and maximum episode time
d_t = env_config.sim_config.time_per_step_in_secs
max_ep_time = env_config.sim_config.sim_time_in_secs

occupancies, sensors = init_collision_and_sensors(self.simulator, env_config, orig_obs_space)
self.state = RobotState(self.simulator.robot_navs[0], occupancies[0], sensors[0], d_t, max_ep_time)

# Initialize collision detectors and sensor data processors
occupancies, sensors = init_collision_and_sensors(
self.simulator,
env_config,
orig_obs_space
)

# Setup initial state of the robot
self.state = RobotState(
self.simulator.robot_navs[0],
occupancies[0],
sensors[0],
d_t,
max_ep_time)

# Store last action executed by the robot
self.last_action = None

# If in debug mode, create a simulation view to visualize the state
if debug:
self.sim_ui = SimulationView(
scaling=10,
obstacles=map_def.obstacles,
robot_radius=env_config.robot_config.radius,
ped_radius=env_config.sim_config.ped_radius,
goal_radius=env_config.sim_config.goal_radius)

# Display the simulation UI
self.sim_ui.show()
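
For orientation, a minimal usage sketch (not part of this diff) of the documented constructor. Only RobotEnv and the keyword names come from the hunks above; the reward logic below is a hypothetical example, since the only metadata key this diff confirms is "step".

from robot_sf.robot_env import RobotEnv

def step_penalty_reward(meta: dict) -> float:
    # Hypothetical reward: flat penalty per step to encourage short episodes.
    return -0.01

# Default EnvSettings, custom reward function, no debug visualization.
env = RobotEnv(reward_func=step_penalty_reward, debug=False)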

def step(self, action):
"""
Execute one time step within the environment.
Parameters:
- action: Action to be executed.
Returns:
- obs: Observation after taking the action.
- reward: Calculated reward for the taken action.
- term: Boolean indicating if the episode has terminated.
- info: Additional information as dictionary.
"""
# Process the action through the simulator
action = self.simulator.robots[0].parse_action(action)
self.last_action = action
# Perform simulation step
self.simulator.step_once([action])
# Get updated observation
obs = self.state.step()
# Fetch metadata about the current state
meta = self.state.meta_dict()
# Determine if the episode has reached a terminal state
term = self.state.is_terminal
# Compute the reward using the provided reward function
reward = self.reward_func(meta)
return obs, reward, term, { "step": meta["step"], "meta": meta }
return obs, reward, term, {"step": meta["step"], "meta": meta}

def reset(self):
"""
Reset the environment state to start a new episode.
Returns:
- obs: The initial observation after resetting the environment.
"""
# Reset internal simulator state
self.simulator.reset_state()
# Reset the environment's state and return the initial observation
obs = self.state.reset()
return obs
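
Together, reset() and the four-value step() follow the classic (pre-gymnasium) Gym API. A short rollout sketch, assuming only what the hunks above show; the random action is purely for illustration:

from robot_sf.robot_env import RobotEnv

env = RobotEnv()
obs = env.reset()
done = False
while not done:
    action = env.action_space.sample()          # placeholder policy
    obs, reward, done, info = env.step(action)  # (obs, reward, term, info)
print("episode length:", info["step"])          # "step" is returned in info above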

def render(self):
"""
Render the environment visually if in debug mode.
Raises RuntimeError if debug mode is not enabled.
"""
if not self.sim_ui:
raise RuntimeError('Debug mode is not activated! Consider setting debug=True!')
raise RuntimeError(
'Debug mode is not activated! Consider setting '
'debug=True!')

# Prepare action visualization, if any action was executed
action = None if not self.last_action else VisualizableAction(
self.simulator.robot_poses[0], self.last_action, self.simulator.goal_pos[0])
self.simulator.robot_poses[0], self.last_action,
self.simulator.goal_pos[0])

# Robot position and LIDAR scanning visualization preparation
robot_pos = self.simulator.robot_poses[0][0]
distances, directions = lidar_ray_scan(
self.simulator.robot_poses[0], self.state.occupancy, self.env_config.lidar_config)
self.simulator.robot_poses[0], self.state.occupancy,
self.env_config.lidar_config)

# Construct ray vectors for visualization
ray_vecs = zip(np.cos(directions) * distances, np.sin(directions) * distances)
ray_vecs_np = np.array(
[[[robot_pos[0], robot_pos[1]], [robot_pos[0] + x, robot_pos[1] + y]] for x, y in ray_vecs])
ped_actions = zip(self.simulator.pysf_sim.peds.pos(),
self.simulator.pysf_sim.peds.pos() + self.simulator.pysf_sim.peds.vel() * 2)
ray_vecs_np = np.array([[
[robot_pos[0], robot_pos[1]],
[robot_pos[0] + x, robot_pos[1] + y]
] for x, y in ray_vecs])

# Prepare pedestrian action visualization
ped_actions = zip(
self.simulator.pysf_sim.peds.pos(),
self.simulator.pysf_sim.peds.pos() +
self.simulator.pysf_sim.peds.vel() * 2)
ped_actions_np = np.array([[pos, vel] for pos, vel in ped_actions])

# Package the state for visualization
state = VisualizableSimState(
self.state.timestep, action, self.simulator.robot_poses[0],
deepcopy(self.simulator.ped_pos), ray_vecs_np, ped_actions_np)

# Execute rendering of the state through the simulation UI
self.sim_ui.render(state)
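
render() only works when the environment was constructed with debug=True, because that is the branch that creates sim_ui. A hedged sketch of the intended pairing:

from robot_sf.robot_env import RobotEnv

env = RobotEnv(debug=True)   # creates the SimulationView
env.reset()
env.step(env.action_space.sample())
env.render()                 # draws robot pose, LIDAR rays and pedestrians
env.exit()                   # closes the UI (see the exit() method below)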

def seed(self, seed=None):
"""
Set the seed for this env's random number generator(s).
Note:
Some environments use multiple pseudorandom number generators.
We want to capture all such seeds used in order to ensure that
there aren't accidental correlations between multiple generators.
Returns:
list<bigint>: Returns the list of seeds used in this env's random
number generators. The first value in the list should be the
"main" seed, or the value which a reproducer should pass to
'seed'. Often, the main seed equals the provided 'seed', but
this won't be true if seed=None, for example.
TODO: validate this method
"""
self.np_random, seed = seeding.np_random(seed)
return [seed]
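
seed() follows the legacy gym.utils.seeding pattern: it stores a NumPy RNG on the env and returns the seed list, main seed first. A small reproducibility sketch; per the TODO above, whether the simulator actually consumes self.np_random is not confirmed by this diff:

from robot_sf.robot_env import RobotEnv

env = RobotEnv()
[main_seed] = env.seed(42)   # first (and here only) entry is the main seed
obs = env.reset()
# ... run an episode ...
env.seed(main_seed)          # re-seeding with the returned value should
obs = env.reset()            # reproduce the same random stream, if it is used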

def exit(self):
"""
Clean up and exit the simulation UI, if it exists.
"""
if self.sim_ui:
self.sim_ui.exit()

3 changes: 3 additions & 0 deletions scripts/debug_random_policy.py
@@ -1,3 +1,6 @@
"""
Generate a random policy to test the environment
"""
from robot_sf.robot_env import RobotEnv
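
The rest of debug_random_policy.py is collapsed in this diff. Given the new module docstring, a random-policy debug loop plausibly looks like the sketch below; the details are an assumption, not the file's actual contents.

from robot_sf.robot_env import RobotEnv

env = RobotEnv(debug=True)
obs = env.reset()
for _ in range(1_000):
    obs, reward, done, _ = env.step(env.action_space.sample())
    env.render()
    if done:
        obs = env.reset()
env.exit()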


19 changes: 15 additions & 4 deletions scripts/training_ppo.py
@@ -10,10 +10,11 @@


def training():
n_envs = 64
n_envs = 32
ped_densities = [0.01, 0.02, 0.04, 0.08]
difficulty = 2


def make_env():
config = EnvSettings()
config.sim_config.ped_density_by_difficulty = ped_densities
@@ -29,11 +30,21 @@ def make_env():
tensorboard_log="./logs/ppo_logs/",
policy_kwargs=policy_kwargs
)
save_model_callback = CheckpointCallback(500_000 // n_envs, "./model/backup", "ppo_model")
save_model_callback = CheckpointCallback(
500_000 // n_envs,
"./model/backup",
"ppo_model"
)
collect_metrics_callback = DrivingMetricsCallback(n_envs)
combined_callback = CallbackList([save_model_callback, collect_metrics_callback])
combined_callback = CallbackList(
[save_model_callback, collect_metrics_callback]
)

model.learn(total_timesteps=50_000_000, progress_bar=True, callback=combined_callback)
model.learn(
total_timesteps=1_000_000,
progress_bar=True,
callback=combined_callback
)
model.save("./model/ppo_model")

