Robotenv docs #10

Merged · 5 commits · Mar 1, 2024
153 changes: 135 additions & 18 deletions robot_sf/robot_env.py
@@ -33,6 +33,7 @@
import numpy as np
from gym.vector import VectorEnv
from gym import Env, spaces
from gym.utils import seeding
from robot_sf.nav.map_config import MapDefinition
from robot_sf.nav.navigation import RouteNavigator

@@ -342,75 +343,191 @@ def init_spaces(env_config: EnvSettings, map_def: MapDefinition):


class RobotEnv(Env):
"""Representing an OpenAI Gym environment for training
a self-driving robot with reinforcement learning"""
"""
Representing an OpenAI Gym environment for training a self-driving robot
with reinforcement learning.
"""

def __init__(
self, env_config: EnvSettings = EnvSettings(),
self,
env_config: EnvSettings = EnvSettings(),
reward_func: Callable[[dict], float] = simple_reward,
debug: bool = False):
debug: bool = False
):
"""
Initialize the Robot Environment.

Parameters:
- env_config (EnvSettings): Configuration for environment settings.
- reward_func (Callable[[dict], float]): Reward function that takes
a dictionary as input and returns a float as reward.
- debug (bool): If True, enables debugging information such as
visualizations.
"""

# Environment configuration details
self.env_config = env_config
map_def = env_config.map_pool.map_defs[0] # info: only use first map
self.action_space, self.observation_space, orig_obs_space = init_spaces(env_config, map_def)
# Extract first map definition; currently only supports using the first map
map_def = env_config.map_pool.map_defs[0]

# Initialize spaces based on the environment configuration and map
self.action_space, self.observation_space, orig_obs_space = \
init_spaces(env_config, map_def)

# Assign the reward function and debug flag
self.reward_func, self.debug = reward_func, debug
self.simulator = init_simulators(env_config, map_def, random_start_pos=True)[0]

# Initialize simulator with a random start position
self.simulator = init_simulators(
env_config,
map_def,
random_start_pos=True
)[0]

# Delta time per simulation step and maximum episode time
d_t = env_config.sim_config.time_per_step_in_secs
max_ep_time = env_config.sim_config.sim_time_in_secs

occupancies, sensors = init_collision_and_sensors(self.simulator, env_config, orig_obs_space)
self.state = RobotState(self.simulator.robot_navs[0], occupancies[0], sensors[0], d_t, max_ep_time)

# Initialize collision detectors and sensor data processors
occupancies, sensors = init_collision_and_sensors(
self.simulator,
env_config,
orig_obs_space
)

# Setup initial state of the robot
self.state = RobotState(
self.simulator.robot_navs[0],
occupancies[0],
sensors[0],
d_t,
max_ep_time)

# Store last action executed by the robot
self.last_action = None

# If in debug mode, create a simulation view to visualize the state
if debug:
self.sim_ui = SimulationView(
scaling=10,
obstacles=map_def.obstacles,
robot_radius=env_config.robot_config.radius,
ped_radius=env_config.sim_config.ped_radius,
goal_radius=env_config.sim_config.goal_radius)

# Display the simulation UI
self.sim_ui.show()

def step(self, action):
"""
Execute one time step within the environment.

Parameters:
- action: Action to be executed.

Returns:
- obs: Observation after taking the action.
- reward: Calculated reward for the taken action.
- term: Boolean indicating if the episode has terminated.
- info: Additional information as dictionary.
"""
# Process the action through the simulator
action = self.simulator.robots[0].parse_action(action)
self.last_action = action
# Perform simulation step
self.simulator.step_once([action])
# Get updated observation
obs = self.state.step()
# Fetch metadata about the current state
meta = self.state.meta_dict()
# Determine if the episode has reached terminal state
term = self.state.is_terminal
# Compute the reward using the provided reward function
reward = self.reward_func(meta)
return obs, reward, term, { "step": meta["step"], "meta": meta }
return obs, reward, term, {"step": meta["step"], "meta": meta}

def reset(self):
"""
Reset the environment state to start a new episode.

Returns:
- obs: The initial observation after resetting the environment.
"""
# Reset internal simulator state
self.simulator.reset_state()
# Reset the environment's state and return the initial observation
obs = self.state.reset()
return obs

def render(self):
"""
Render the environment visually if in debug mode.

Raises RuntimeError if debug mode is not enabled.
"""
if not self.sim_ui:
raise RuntimeError('Debug mode is not activated! Consider setting debug=True!')
raise RuntimeError(
'Debug mode is not activated! Consider setting '
'debug=True!')

# Prepare action visualization, if any action was executed
action = None if not self.last_action else VisualizableAction(
self.simulator.robot_poses[0], self.last_action, self.simulator.goal_pos[0])
self.simulator.robot_poses[0], self.last_action,
self.simulator.goal_pos[0])

# Robot position and LIDAR scanning visualization preparation
robot_pos = self.simulator.robot_poses[0][0]
distances, directions = lidar_ray_scan(
self.simulator.robot_poses[0], self.state.occupancy, self.env_config.lidar_config)
self.simulator.robot_poses[0], self.state.occupancy,
self.env_config.lidar_config)

# Construct ray vectors for visualization
ray_vecs = zip(np.cos(directions) * distances, np.sin(directions) * distances)
ray_vecs_np = np.array(
[[[robot_pos[0], robot_pos[1]], [robot_pos[0] + x, robot_pos[1] + y]] for x, y in ray_vecs])
ped_actions = zip(self.simulator.pysf_sim.peds.pos(),
self.simulator.pysf_sim.peds.pos() + self.simulator.pysf_sim.peds.vel() * 2)
ray_vecs_np = np.array([[
[robot_pos[0], robot_pos[1]],
[robot_pos[0] + x, robot_pos[1] + y]
] for x, y in ray_vecs])

# Prepare pedestrian action visualization
ped_actions = zip(
self.simulator.pysf_sim.peds.pos(),
self.simulator.pysf_sim.peds.pos() +
self.simulator.pysf_sim.peds.vel() * 2)
ped_actions_np = np.array([[pos, vel] for pos, vel in ped_actions])

# Package the state for visualization
state = VisualizableSimState(
self.state.timestep, action, self.simulator.robot_poses[0],
deepcopy(self.simulator.ped_pos), ray_vecs_np, ped_actions_np)

# Execute rendering of the state through the simulation UI
self.sim_ui.render(state)

def seed(self, seed=None):
"""
Set the seed for this env's random number generator(s).

Note:
Some environments use multiple pseudorandom number generators.
We want to capture all such seeds used in order to ensure that
there aren't accidental correlations between multiple generators.

Returns:
list<bigint>: Returns the list of seeds used in this env's random
number generators. The first value in the list should be the
"main" seed, or the value which a reproducer should pass to
'seed'. Often, the main seed equals the provided 'seed', but
this won't be true if seed=None, for example.

TODO: validate this method
"""
self.np_random, seed = seeding.np_random(seed)
return [seed]

def exit(self):
"""
Clean up and exit the simulation UI, if it exists.
"""
if self.sim_ui:
self.sim_ui.exit()

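The diff above documents the public surface of RobotEnv (`__init__`, `step`, `reset`, `render`, `seed`, `exit`). A minimal usage sketch follows, not part of this PR: it assumes the import path shown in the diff, uses a random policy purely for illustration (as in scripts/debug_random_policy.py), and the loop length is arbitrary.

from robot_sf.robot_env import RobotEnv

env = RobotEnv(debug=True)   # debug=True creates the SimulationView so render() works
env.seed(42)                 # seed the env's RNG for reproducibility (returns [42])
obs = env.reset()

for _ in range(100):
    # Sample a random action from the action space (illustrative policy only)
    action = env.action_space.sample()
    # step() returns the old-gym 4-tuple documented in the diff
    obs, reward, term, info = env.step(action)
    env.render()             # only valid because debug=True
    if term:
        obs = env.reset()

env.exit()                   # close the simulation UI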
3 changes: 3 additions & 0 deletions scripts/debug_random_policy.py
@@ -1,3 +1,6 @@
"""
Generate a random policy to test the environment
"""
from robot_sf.robot_env import RobotEnv


19 changes: 15 additions & 4 deletions scripts/training_ppo.py
@@ -10,10 +10,11 @@


def training():
n_envs = 64
n_envs = 32
ped_densities = [0.01, 0.02, 0.04, 0.08]
difficulty = 2


def make_env():
config = EnvSettings()
config.sim_config.ped_density_by_difficulty = ped_densities
@@ -29,11 +30,21 @@ def make_env():
tensorboard_log="./logs/ppo_logs/",
policy_kwargs=policy_kwargs
)
save_model_callback = CheckpointCallback(500_000 // n_envs, "./model/backup", "ppo_model")
save_model_callback = CheckpointCallback(
500_000 // n_envs,
"./model/backup",
"ppo_model"
)
collect_metrics_callback = DrivingMetricsCallback(n_envs)
combined_callback = CallbackList([save_model_callback, collect_metrics_callback])
combined_callback = CallbackList(
[save_model_callback, collect_metrics_callback]
)

model.learn(total_timesteps=50_000_000, progress_bar=True, callback=combined_callback)
model.learn(
total_timesteps=1_000_000,
progress_bar=True,
callback=combined_callback
)
model.save("./model/ppo_model")

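For completeness, a sketch of loading and evaluating the checkpoint written by model.save() above. This is not part of the PR; it assumes stable-baselines3's standard PPO.load/predict API, that the save path is unchanged, and that a RobotEnv built from the default EnvSettings exposes the same observation/action spaces the policy was trained on.

from stable_baselines3 import PPO
from robot_sf.robot_env import RobotEnv

# Load the checkpoint written by model.save("./model/ppo_model") above
model = PPO.load("./model/ppo_model")

env = RobotEnv(debug=True)
obs = env.reset()

for _ in range(1_000):
    # Query the trained policy for a deterministic action
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, term, _ = env.step(action)
    env.render()
    if term:
        obs = env.reset()

env.exit()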
