Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance load_states function and add video recording capability #96

Merged
merged 8 commits into from
Dec 11, 2024
33 changes: 32 additions & 1 deletion robot_sf/render/playback_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

def load_states(filename: str) -> List[VisualizableSimState]:
"""
load a list of states from a file with pickle
load a list of states from a file with pickle `*.pkl` format
"""
# Check if the file is empty
if os.path.getsize(filename) == 0:
Expand All @@ -25,6 +25,18 @@ def load_states(filename: str) -> List[VisualizableSimState]:
with open(filename, "rb") as f: # rb = read binary
states, map_def = pickle.load(f)
logger.info(f"Loaded {len(states)} states")

# Verify `states` is a list of VisualizableSimState
if not all(isinstance(state, VisualizableSimState) for state in states):
logger.error(f"Invalid states loaded from {filename}")
raise TypeError(f"Invalid states loaded from {filename}")

# Verify `map_def` is a MapDefinition
if not isinstance(map_def, MapDefinition):
logger.error(f"Invalid map definition loaded from {filename}")
logger.error(f"map_def: {type(map_def)}")
raise TypeError(f"Invalid map definition loaded from {filename}")

return states, map_def


Expand All @@ -46,3 +58,22 @@ def load_states_and_visualize(filename: str):
"""
states, map_def = load_states(filename)
visualize_states(states, map_def)


def load_states_and_record_video(state_file: str, video_save_path: str, video_fps: float = 10):
"""
load a list of states from a file and record a video
"""
logger.info(f"Loading states from {state_file}")
states, map_def = load_states(state_file)
sim_view = SimulationView(
map_def=map_def,
caption="RobotSF Recording",
record_video=True,
video_path=vide_save_path,
video_fps=video_fps,
)
for state in states:
sim_view.render(state)

sim_view.exit_simulation() # to write the video file
2 changes: 0 additions & 2 deletions robot_sf/render/sim_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,9 @@ def render(self, state: VisualizableSimState, sleep_time: float = 0.01):

if self.record_video:
# Capture frame
logger.debug("trying to record a frame")
frame_data = pygame.surfarray.array3d(self.screen)
frame_data = frame_data.swapaxes(0, 1)
self.frames.append(frame_data)
logger.debug(f"Recorded frames {len(self.frames)}")
if len(self.frames) > 2000:
logger.warning("Too many frames recorded. Stopping video recording.")
else:
Expand Down
43 changes: 43 additions & 0 deletions tests/test_load_states_and_record_video.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""pytest for load_states_and_record_video.py"""

import pytest
import datetime
from robot_sf.render.playback_recording import load_states_and_record_video
from robot_sf.render.sim_view import MOVIEPY_AVAILABLE
from pathlib import Path


@pytest.mark.skipif(
not MOVIEPY_AVAILABLE, reason="MoviePy/ffmpeg not available for video recording"
)
def test_load_states_and_record_video(delete_video: bool = True):
"""Test loading simulation states and recording them as video.
Args:
delete_video: Whether to delete the video file after test. Default True.
"""
# Create recordings directory if it doesn't exist
recordings_dir = Path("recordings")
recordings_dir.mkdir(exist_ok=True)

# create a unique video name
video_name = "playback_test_" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"

output_path = recordings_dir / video_name

try:
load_states_and_record_video(
"test_pygame/recordings/2024-06-04_08-39-59.pkl", str(output_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a fixture for test data.

The hardcoded test file path "test_pygame/recordings/2024-06-04_08-39-59.pkl" could make the test brittle. Consider:

  1. Using a pytest fixture to provide test data
  2. Creating a small test recording file specifically for this test
  3. Adding the test file to version control to ensure test stability

)

assert output_path.exists(), "Video file was not created"
assert output_path.stat().st_size > 0, "Video file is empty"
finally:
# Clean up
if output_path.exists() and delete_video:
output_path.unlink()



if __name__ == "__main__":
test_load_states_and_record_video(delete_video=False)
Loading