-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Gymnasium interface changes #228
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv | ||
from gymnasium_interface.task_executor import PyCRAMTaskExecutor |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import logging | ||
from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make these relative imports |
||
from pycram.datastructures.enums import Arms, Grasp | ||
from pycram.datastructures.pose import Pose | ||
|
||
# Configure logging | ||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the logging from pycram.ros.logging instead of the Python logging |
||
|
||
def custom_reward(state): | ||
""" | ||
Custom reward function. | ||
|
||
:param state: The current state of the environment. | ||
:type state: dict | ||
:return: Reward value based on the state. | ||
:rtype: float | ||
""" | ||
return 10.0 if state else -1.0 | ||
|
||
# Define actions as a list of strings | ||
actions = ["navigate", "pick_up"] | ||
|
||
# Define default parameters for each action | ||
default_params = { | ||
"navigate": {"target_pose": Pose(position=[1.0, 2.0, 0.0], orientation=[0.0, 0.0, 0.0, 1.0])}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these should be function points in my opinion and not key-value stores |
||
"pick_up": {"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]}, | ||
} | ||
|
||
# Define objects to initialize in the environment | ||
objects = [ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this needs some more engineering. Perhaps objects that do not have worlds yet? Please prepare a short discussion of this kind of problem for the next pycram meeting |
||
{ | ||
"name": "milk", | ||
"type": "object", | ||
"urdf": "milk.stl", | ||
"pose": Pose(position=[2.5, 2.10, 1.02]), | ||
} | ||
] | ||
|
||
# Initialize the Gymnasium environment | ||
env = PyCRAMGymEnv(actions=actions, default_params=default_params, objects=objects, reward_function=custom_reward) | ||
|
||
# Reset the environment and retrieve the initial state | ||
state, info = env.reset() | ||
logging.info(f"State after reset: {state}") | ||
|
||
# Perform a step in the environment | ||
try: | ||
state, reward, done, truncated, info = env.step( | ||
action=1, # Index of the action to execute | ||
params={"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]}, | ||
) | ||
logging.info(f"State after step: {state}, Reward: {reward}, Done: {done}, Truncated: {truncated}") | ||
except ValueError as e: | ||
logging.error(f"Action failed: {e}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import gymnasium as gym | ||
from gymnasium.spaces import Discrete | ||
from gymnasium_interface.task_executor import PyCRAMTaskExecutor # Use absolute import | ||
from pycram.process_module import simulated_robot | ||
|
||
|
||
class PyCRAMGymEnv(gym.Env): | ||
""" | ||
A Gymnasium-compatible environment for integrating PyCRAM task execution. | ||
|
||
This environment allows users to execute PyCRAM tasks within a Gymnasium-compatible | ||
framework. It supports dynamic task initialization, state tracking, and custom reward | ||
calculations. | ||
|
||
:param actions: List of valid action classes or functions (e.g., [NavigateAction, PickUpAction]). | ||
:type actions: list | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. annotate types in the constructor, not in the docstring. |
||
:param default_params: Default parameters for each action, keyed by action class/function (optional). | ||
:type default_params: dict | ||
:param objects: List of objects to initialize in the environment (optional). | ||
:type objects: list | ||
:param reward_function: Custom user-defined function to compute rewards (optional). | ||
:type reward_function: callable | ||
""" | ||
|
||
def __init__(self, actions, default_params=None, objects=None, reward_function=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use function points instead, talk with @Leusmann how to do so |
||
self.actions = actions | ||
self.default_params = default_params or {} | ||
self.objects = objects or [] | ||
self.reward_function = reward_function | ||
|
||
# Dynamically define the action space | ||
self.action_space = Discrete(len(actions)) | ||
|
||
# Initialize the task executor | ||
self.executor = PyCRAMTaskExecutor() | ||
|
||
# Initialize the state | ||
self.state = None | ||
self.reset() | ||
|
||
def reset(self): | ||
""" | ||
Resets the environment. | ||
|
||
:return: The initial state of the environment. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this docstring should be inherited from the super class |
||
:rtype: tuple | ||
""" | ||
with simulated_robot: | ||
self.executor.reset_task(self.objects) | ||
self.state = self.executor.get_current_state() | ||
return self.state, {} | ||
|
||
def step(self, action, params=None): | ||
""" | ||
Executes a step in the environment. | ||
|
||
:param action: The action index to execute. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this docstring should be inherited from the super class |
||
:type action: int | ||
:param params: Additional parameters for the action. | ||
:type params: dict, optional | ||
:return: A tuple containing the next state, reward, done flag, truncated flag, and additional info. | ||
:rtype: tuple | ||
""" | ||
with simulated_robot: | ||
action_name = self.actions[action] | ||
action_params = self.default_params.get(action_name, {}).copy() | ||
if params: | ||
action_params.update(params) | ||
|
||
# Execute the action | ||
self.executor.execute_action(action_name, action_params) | ||
|
||
# Update the state | ||
self.state = self._get_observation() | ||
|
||
# Calculate reward | ||
reward = self._calculate_reward() | ||
|
||
# Placeholder: done logic can be updated later | ||
done = self._is_done() | ||
|
||
return self.state, reward, done, False, {} | ||
|
||
def _get_observation(self): | ||
""" | ||
Fetches the current state of the environment. | ||
|
||
:return: The current state of the environment. | ||
:rtype: dict | ||
""" | ||
return self.state | ||
|
||
def _calculate_reward(self): | ||
""" | ||
Calculates the reward using the user-defined reward function. | ||
|
||
:return: The calculated reward. | ||
:rtype: float | ||
""" | ||
if self.reward_function: | ||
return self.reward_function(self.state) | ||
return 1.0 | ||
|
||
def _is_done(self): | ||
""" | ||
Checks if the task is complete. | ||
|
||
:return: True if the task is done, otherwise False. | ||
:rtype: bool | ||
""" | ||
return False |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally the structure here is very repetitive with pycram designators, this should not be |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
from pycram.worlds.bullet_world import BulletWorld | ||
from pycram.world_concepts.world_object import Object | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. relative imports |
||
from pycram.datastructures.enums import ObjectType, WorldMode, Grasp | ||
from pycram.datastructures.pose import Pose | ||
from pycram.designators.action_designator import ( | ||
NavigateAction, | ||
PickUpAction, | ||
PlaceAction, | ||
OpenAction, | ||
CloseAction, | ||
) | ||
from pycram.designators.object_designator import BelieveObject | ||
from pycram.process_module import simulated_robot | ||
import logging | ||
from typing import Dict, List, Union | ||
|
||
# Configure logging | ||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use logging from pycram.ros.logging |
||
|
||
|
||
class PyCRAMTaskExecutor: | ||
""" | ||
Handles task execution in a PyCRAM environment. This class integrates with BulletWorld | ||
for managing objects and robot tasks in the simulation. | ||
|
||
Attributes: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use rst syntax for the doc string |
||
world: The BulletWorld instance managing the environment. | ||
robot: The robot object in the environment. | ||
apartment: The apartment or environment object in the simulation. | ||
""" | ||
|
||
world: BulletWorld | ||
robot: Object | ||
apartment: Object | ||
|
||
def __init__(self): | ||
""" | ||
Initializes the task executor for PyCRAM actions. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. useless docstring |
||
""" | ||
self.world = BulletWorld(WorldMode.GUI) | ||
self.robot = None | ||
self.apartment = None | ||
|
||
def clear_world(self) -> None: | ||
""" | ||
Removes all objects from the BulletWorld. | ||
""" | ||
logging.info("Clearing all objects from BulletWorld...") | ||
for obj in list(self.world.objects): | ||
obj.remove() | ||
logging.info("All objects removed from BulletWorld.") | ||
|
||
def reset_task(self, objects: List[Dict[str, Union[str, Pose]]]) -> None: | ||
""" | ||
Resets the simulation environment dynamically by clearing the world and adding new objects. | ||
|
||
:param objects: List of objects to be added to the environment. | ||
""" | ||
self.clear_world() | ||
|
||
# Reload the apartment URDF | ||
self.apartment = Object("apartment", "environment", "apartment.urdf") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use ObjectType.ENVIRONMENT instead of the string |
||
|
||
# Reinitialize the robot | ||
self.robot = Object("pr2", ObjectType.ROBOT, "pr2.urdf", pose=Pose([1.2, 1, 0])) | ||
self.world.robot = self.robot | ||
|
||
# Add dynamic objects | ||
for obj in objects: | ||
name = obj["name"] | ||
obj_type = obj["type"] | ||
urdf = obj["urdf"] | ||
pose = obj["pose"] | ||
|
||
logging.info(f"Adding object: {name}, URDF path: {urdf}, Pose: {pose}") | ||
|
||
existing_object = self.world.get_object_by_name(name) | ||
if existing_object: | ||
logging.info(f"Reusing existing object: {name}") | ||
else: | ||
Object(name, obj_type, urdf, pose=pose) | ||
|
||
logging.info("Environment reset: Apartment, robot, and dynamic objects added.") | ||
|
||
def execute_action(self, action: str, params: Dict[str, Union[str, Pose, List[Grasp]]]) -> None: | ||
""" | ||
Executes a PyCRAM action based on the provided parameters. | ||
|
||
:param action: The action to be executed (e.g., "navigate", "pick_up"). | ||
:param params: Parameters required for the action. | ||
""" | ||
action_map = { | ||
"navigate": self._navigate, | ||
"pick_up": self._pick_up, | ||
"place": self._place, | ||
"open": self._open, | ||
"close": self._close, | ||
} | ||
|
||
if action in action_map: | ||
action_map[action](params) | ||
else: | ||
raise ValueError(f"Unknown action: {action}") | ||
|
||
def _navigate(self, params: Dict[str, Pose]) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you execute the action directly? |
||
""" | ||
Navigates the robot to a target location. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. types 👏 in 👏 the 👏 signature 👏 |
||
|
||
:param params: Parameters for the navigate action, including "target_pose". | ||
""" | ||
target_pose = params.get("target_pose") | ||
if not target_pose: | ||
raise ValueError("Missing parameter: target_pose") | ||
NavigateAction(target_locations=[target_pose]).resolve().perform() | ||
|
||
def _pick_up(self, params: Dict[str, Union[str, List[Grasp]]]) -> None: | ||
""" | ||
Picks up an object. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. types 👏 in 👏 the 👏 signature 👏 |
||
|
||
:param params: Parameters for the pick-up action, including "object_desig" and "arm". | ||
""" | ||
object_name = params.get("object_desig") | ||
arm = params.get("arm") | ||
grasps = params.get("grasps", [Grasp.RIGHT]) | ||
if not object_name or not arm: | ||
raise ValueError("Missing parameters: object_desig and arm are required") | ||
object_desig = BelieveObject(names=[object_name]) | ||
action = PickUpAction( | ||
object_designator_description=object_desig, arms=[arm], grasps=grasps | ||
).resolve() | ||
action.perform() | ||
|
||
def _place(self, params: Dict[str, Union[str, Pose]]) -> None: | ||
""" | ||
Places an object at a target location. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. types 👏 in 👏 the 👏 signature 👏 |
||
:param params: Parameters for the place action, including "object_desig", "target_pose", and "arm". | ||
""" | ||
object_desig = params.get("object_desig") | ||
target_pose = params.get("target_pose") | ||
arm = params.get("arm") | ||
if not object_desig or not target_pose or not arm: | ||
raise ValueError("Missing parameters: object_desig, target_pose, and arm are required") | ||
PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform() | ||
|
||
def _open(self, params: Dict[str, str]) -> None: | ||
""" | ||
Opens an object (e.g., a drawer or door). | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. types 👏 in 👏 the 👏 signature 👏 |
||
:param params: Parameters for the open action, including "handle_desig" and "arm". | ||
""" | ||
handle_desig = params.get("handle_desig") | ||
arm = params.get("arm") | ||
if not handle_desig or not arm: | ||
raise ValueError("Missing parameters: handle_desig and arm are required") | ||
OpenAction(handle_desig, [arm]).resolve().perform() | ||
|
||
def _close(self, params: Dict[str, str]) -> None: | ||
""" | ||
Closes an object (e.g., a drawer or door). | ||
|
||
:param params: Parameters for the close action, including "handle_desig" and "arm". | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. types 👏 in 👏 the 👏 signature 👏 |
||
""" | ||
handle_desig = params.get("handle_desig") | ||
arm = params.get("arm") | ||
if not handle_desig or not arm: | ||
raise ValueError("Missing parameters: handle_desig and arm are required") | ||
CloseAction(handle_desig, [arm]).resolve().perform() | ||
|
||
def get_current_state(self) -> Dict[str, Union[Pose, List[Dict[str, Pose]]]]: | ||
""" | ||
Fetches the current state of the environment, including the robot pose and objects. | ||
|
||
:return: Dictionary containing the robot pose and a list of objects with their poses. | ||
""" | ||
robot_pose = self.robot.get_pose() if self.robot else None | ||
objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects] | ||
return {"robot_pose": robot_pose, "objects": objects} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this relative imports