Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gymnasium interface changes #228

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/gymnasium_interface/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this relative imports

from gymnasium_interface.task_executor import PyCRAMTaskExecutor
54 changes: 54 additions & 0 deletions src/gymnasium_interface/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make these relative imports

from pycram.datastructures.enums import Arms, Grasp
from pycram.datastructures.pose import Pose

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the logging from pycram.ros.logging instead of the Python logging


def custom_reward(state):
"""
Custom reward function.

:param state: The current state of the environment.
:type state: dict
:return: Reward value based on the state.
:rtype: float
"""
return 10.0 if state else -1.0

# Define actions as a list of strings
actions = ["navigate", "pick_up"]

# Define default parameters for each action
default_params = {
"navigate": {"target_pose": Pose(position=[1.0, 2.0, 0.0], orientation=[0.0, 0.0, 0.0, 1.0])},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be function points in my opinion and not key-value stores

"pick_up": {"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]},
}

# Define objects to initialize in the environment
objects = [
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs some more engineering. Perhaps objects that do not have worlds yet? Please prepare a short discussion of this kind of problem for the next pycram meeting

{
"name": "milk",
"type": "object",
"urdf": "milk.stl",
"pose": Pose(position=[2.5, 2.10, 1.02]),
}
]

# Initialize the Gymnasium environment
env = PyCRAMGymEnv(actions=actions, default_params=default_params, objects=objects, reward_function=custom_reward)

# Reset the environment and retrieve the initial state
state, info = env.reset()
logging.info(f"State after reset: {state}")

# Perform a step in the environment
try:
state, reward, done, truncated, info = env.step(
action=1, # Index of the action to execute
params={"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]},
)
logging.info(f"State after step: {state}, Reward: {reward}, Done: {done}, Truncated: {truncated}")
except ValueError as e:
logging.error(f"Action failed: {e}")
111 changes: 111 additions & 0 deletions src/gymnasium_interface/pycram_gym_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import gymnasium as gym
from gymnasium.spaces import Discrete
from gymnasium_interface.task_executor import PyCRAMTaskExecutor # Use absolute import
from pycram.process_module import simulated_robot


class PyCRAMGymEnv(gym.Env):
"""
A Gymnasium-compatible environment for integrating PyCRAM task execution.

This environment allows users to execute PyCRAM tasks within a Gymnasium-compatible
framework. It supports dynamic task initialization, state tracking, and custom reward
calculations.

:param actions: List of valid action classes or functions (e.g., [NavigateAction, PickUpAction]).
:type actions: list
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

annotate types in the constructor, not in the docstring.

:param default_params: Default parameters for each action, keyed by action class/function (optional).
:type default_params: dict
:param objects: List of objects to initialize in the environment (optional).
:type objects: list
:param reward_function: Custom user-defined function to compute rewards (optional).
:type reward_function: callable
"""

def __init__(self, actions, default_params=None, objects=None, reward_function=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use function points instead, talk with @Leusmann how to do so

self.actions = actions
self.default_params = default_params or {}
self.objects = objects or []
self.reward_function = reward_function

# Dynamically define the action space
self.action_space = Discrete(len(actions))

# Initialize the task executor
self.executor = PyCRAMTaskExecutor()

# Initialize the state
self.state = None
self.reset()

def reset(self):
"""
Resets the environment.

:return: The initial state of the environment.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this docstring should be inherited from the super class

:rtype: tuple
"""
with simulated_robot:
self.executor.reset_task(self.objects)
self.state = self.executor.get_current_state()
return self.state, {}

def step(self, action, params=None):
"""
Executes a step in the environment.

:param action: The action index to execute.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this docstring should be inherited from the super class

:type action: int
:param params: Additional parameters for the action.
:type params: dict, optional
:return: A tuple containing the next state, reward, done flag, truncated flag, and additional info.
:rtype: tuple
"""
with simulated_robot:
action_name = self.actions[action]
action_params = self.default_params.get(action_name, {}).copy()
if params:
action_params.update(params)

# Execute the action
self.executor.execute_action(action_name, action_params)

# Update the state
self.state = self._get_observation()

# Calculate reward
reward = self._calculate_reward()

# Placeholder: done logic can be updated later
done = self._is_done()

return self.state, reward, done, False, {}

def _get_observation(self):
"""
Fetches the current state of the environment.

:return: The current state of the environment.
:rtype: dict
"""
return self.state

def _calculate_reward(self):
"""
Calculates the reward using the user-defined reward function.

:return: The calculated reward.
:rtype: float
"""
if self.reward_function:
return self.reward_function(self.state)
return 1.0

def _is_done(self):
"""
Checks if the task is complete.

:return: True if the task is done, otherwise False.
:rtype: bool
"""
return False
178 changes: 178 additions & 0 deletions src/gymnasium_interface/task_executor.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally the structure here is very repetitive with pycram designators, this should not be

Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from pycram.worlds.bullet_world import BulletWorld
from pycram.world_concepts.world_object import Object
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relative imports

from pycram.datastructures.enums import ObjectType, WorldMode, Grasp
from pycram.datastructures.pose import Pose
from pycram.designators.action_designator import (
NavigateAction,
PickUpAction,
PlaceAction,
OpenAction,
CloseAction,
)
from pycram.designators.object_designator import BelieveObject
from pycram.process_module import simulated_robot
import logging
from typing import Dict, List, Union

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use logging from pycram.ros.logging



class PyCRAMTaskExecutor:
"""
Handles task execution in a PyCRAM environment. This class integrates with BulletWorld
for managing objects and robot tasks in the simulation.

Attributes:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use rst syntax for the doc string

world: The BulletWorld instance managing the environment.
robot: The robot object in the environment.
apartment: The apartment or environment object in the simulation.
"""

world: BulletWorld
robot: Object
apartment: Object

def __init__(self):
"""
Initializes the task executor for PyCRAM actions.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless docstring

"""
self.world = BulletWorld(WorldMode.GUI)
self.robot = None
self.apartment = None

def clear_world(self) -> None:
"""
Removes all objects from the BulletWorld.
"""
logging.info("Clearing all objects from BulletWorld...")
for obj in list(self.world.objects):
obj.remove()
logging.info("All objects removed from BulletWorld.")

def reset_task(self, objects: List[Dict[str, Union[str, Pose]]]) -> None:
"""
Resets the simulation environment dynamically by clearing the world and adding new objects.

:param objects: List of objects to be added to the environment.
"""
self.clear_world()

# Reload the apartment URDF
self.apartment = Object("apartment", "environment", "apartment.urdf")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ObjectType.ENVIRONMENT instead of the string


# Reinitialize the robot
self.robot = Object("pr2", ObjectType.ROBOT, "pr2.urdf", pose=Pose([1.2, 1, 0]))
self.world.robot = self.robot

# Add dynamic objects
for obj in objects:
name = obj["name"]
obj_type = obj["type"]
urdf = obj["urdf"]
pose = obj["pose"]

logging.info(f"Adding object: {name}, URDF path: {urdf}, Pose: {pose}")

existing_object = self.world.get_object_by_name(name)
if existing_object:
logging.info(f"Reusing existing object: {name}")
else:
Object(name, obj_type, urdf, pose=pose)

logging.info("Environment reset: Apartment, robot, and dynamic objects added.")

def execute_action(self, action: str, params: Dict[str, Union[str, Pose, List[Grasp]]]) -> None:
"""
Executes a PyCRAM action based on the provided parameters.

:param action: The action to be executed (e.g., "navigate", "pick_up").
:param params: Parameters required for the action.
"""
action_map = {
"navigate": self._navigate,
"pick_up": self._pick_up,
"place": self._place,
"open": self._open,
"close": self._close,
}

if action in action_map:
action_map[action](params)
else:
raise ValueError(f"Unknown action: {action}")

def _navigate(self, params: Dict[str, Pose]) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you execute the action directly?

"""
Navigates the robot to a target location.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types 👏 in 👏 the 👏 signature 👏


:param params: Parameters for the navigate action, including "target_pose".
"""
target_pose = params.get("target_pose")
if not target_pose:
raise ValueError("Missing parameter: target_pose")
NavigateAction(target_locations=[target_pose]).resolve().perform()

def _pick_up(self, params: Dict[str, Union[str, List[Grasp]]]) -> None:
"""
Picks up an object.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types 👏 in 👏 the 👏 signature 👏


:param params: Parameters for the pick-up action, including "object_desig" and "arm".
"""
object_name = params.get("object_desig")
arm = params.get("arm")
grasps = params.get("grasps", [Grasp.RIGHT])
if not object_name or not arm:
raise ValueError("Missing parameters: object_desig and arm are required")
object_desig = BelieveObject(names=[object_name])
action = PickUpAction(
object_designator_description=object_desig, arms=[arm], grasps=grasps
).resolve()
action.perform()

def _place(self, params: Dict[str, Union[str, Pose]]) -> None:
"""
Places an object at a target location.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types 👏 in 👏 the 👏 signature 👏

:param params: Parameters for the place action, including "object_desig", "target_pose", and "arm".
"""
object_desig = params.get("object_desig")
target_pose = params.get("target_pose")
arm = params.get("arm")
if not object_desig or not target_pose or not arm:
raise ValueError("Missing parameters: object_desig, target_pose, and arm are required")
PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform()

def _open(self, params: Dict[str, str]) -> None:
"""
Opens an object (e.g., a drawer or door).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types 👏 in 👏 the 👏 signature 👏

:param params: Parameters for the open action, including "handle_desig" and "arm".
"""
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
OpenAction(handle_desig, [arm]).resolve().perform()

def _close(self, params: Dict[str, str]) -> None:
"""
Closes an object (e.g., a drawer or door).

:param params: Parameters for the close action, including "handle_desig" and "arm".
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

types 👏 in 👏 the 👏 signature 👏

"""
handle_desig = params.get("handle_desig")
arm = params.get("arm")
if not handle_desig or not arm:
raise ValueError("Missing parameters: handle_desig and arm are required")
CloseAction(handle_desig, [arm]).resolve().perform()

def get_current_state(self) -> Dict[str, Union[Pose, List[Dict[str, Pose]]]]:
"""
Fetches the current state of the environment, including the robot pose and objects.

:return: Dictionary containing the robot pose and a list of objects with their poses.
"""
robot_pose = self.robot.get_pose() if self.robot else None
objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects]
return {"robot_pose": robot_pose, "objects": objects}
Loading