Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
codekansas committed Jan 3, 2025
1 parent 3d4c8c6 commit 2a7004c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 24 deletions.
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def run(self) -> None:
],
setup_requires=["setuptools-rust"],
install_requires=requirements,
tests_require=requirements_dev,
extras_require={"dev": requirements_dev},
zip_safe=False,
long_description=long_description,
Expand Down
60 changes: 37 additions & 23 deletions tests/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,46 @@
import time
import uuid
from pathlib import Path
from typing import Mapping

import pytest

import krec


@pytest.fixture
def synthetic_krec_data() -> tuple[krec.KRec, dict[str, list[float]]]:
def synthetic_krec_data() -> tuple[krec.KRec, dict[str, Mapping[int | str, list[float]]]]:
"""Fixture that provides a synthetic KRec object and its original data."""
krec_obj, original_data = create_sine_wave_krec(num_frames=50, fps=30)
return krec_obj, original_data


def create_sine_wave_krec(num_frames: int = 50, fps: int = 30) -> tuple[krec.KRec, dict[str, list[float]]]:
def create_sine_wave_krec(
num_frames: int = 50,
fps: int = 30,
) -> tuple[krec.KRec, dict[str, Mapping[int | str, list[float]]]]:
"""Create a synthetic KRec object with sine wave data for testing."""
# Create timestamps
timestamps = [i / fps for i in range(num_frames)]

# Create wave data for each joint
position_waves = {i: [math.sin(2 * math.pi * 0.5 * t) for t in timestamps] for i in range(3)}
velocity_waves = {i: [math.sin(2 * math.pi * 0.5 * t) for t in timestamps] for i in range(3)}
torque_waves = {i: [0.5 * math.sin(2 * math.pi * 0.5 * t) for t in timestamps] for i in range(3)}
position_waves: dict[int | str, list[float]] = {
i: [math.sin(2 * math.pi * 0.5 * t) for t in timestamps] for i in range(3)
}
velocity_waves: dict[int | str, list[float]] = {
i: [math.sin(2 * math.pi * 0.5 * t) for t in timestamps] for i in range(3)
}
torque_waves: dict[int | str, list[float]] = {
i: [0.5 * math.sin(2 * math.pi * 0.5 * t) for t in timestamps] for i in range(3)
}

# Add IMU data
accel_waves = {
accel_waves: dict[int | str, list[float]] = {
"x": [0.1 * math.sin(2 * math.pi * 0.5 * t) for t in timestamps],
"y": [0.1 * math.sin(2 * math.pi * 0.5 * t) for t in timestamps],
"z": [9.81 + 0.1 * math.sin(2 * math.pi * 0.5 * t) for t in timestamps],
}
gyro_waves = {
gyro_waves: dict[int | str, list[float]] = {
"x": [math.sin(2 * math.pi * 0.5 * t) for t in timestamps],
"y": [math.sin(2 * math.pi * 0.5 * t) for t in timestamps],
"z": [math.sin(2 * math.pi * 0.5 * t) for t in timestamps],
Expand Down Expand Up @@ -98,24 +108,25 @@ def create_sine_wave_krec(num_frames: int = 50, fps: int = 30) -> tuple[krec.KRe

krec_obj.add_frame(frame)

# Update original data for verification
original_data = {
original_data: dict[str, Mapping[int | str, list[float]]] = {
"position_waves": position_waves,
"velocity_waves": velocity_waves,
"torque_waves": torque_waves,
"accel_waves": accel_waves,
"gyro_waves": gyro_waves,
"timestamps": timestamps,
"timestamps": {"0": timestamps},
}

return krec_obj, original_data


def verify_krec_data(original_data: dict[str, list[float]], loaded_krec: krec.KRec) -> bool:
def verify_krec_data(original_data: dict[str, Mapping[int | str, list[float]]], loaded_krec: krec.KRec) -> bool:
"""Verify that loaded KRec data matches the original data."""
frames = loaded_krec.get_frames()

def is_close(a: float, b: float, rtol: float = 1e-5) -> bool:
def is_close(a: float | None, b: float, rtol: float = 1e-5) -> bool:
if a is None:
return False
return abs(a - b) <= rtol * max(abs(a), abs(b))

for i, frame in enumerate(frames):
Expand All @@ -137,7 +148,7 @@ def is_close(a: float, b: float, rtol: float = 1e-5) -> bool:

# Verify IMU data
imu = frame.get_imu_values()
if imu:
if imu and imu.accel and imu.gyro:
expected_accel = [
original_data["accel_waves"]["x"][i],
original_data["accel_waves"]["y"][i],
Expand All @@ -164,22 +175,22 @@ def is_close(a: float, b: float, rtol: float = 1e-5) -> bool:
return True


def test_direct_save_load(synthetic_krec_data: tuple[krec.KRec, dict[str, list[float]]], tmpdir: Path) -> None:
def test_direct_save_load(
synthetic_krec_data: tuple[krec.KRec, dict[str, Mapping[int | str, list[float]]]],
tmpdir: Path,
) -> None:
"""Test saving and loading KRec data directly to/from a file."""
krec_obj, original_data = synthetic_krec_data

temp_file = tmpdir / "test.krec"
# Save KRec to temporary file
krec_obj.save(str(temp_file))

# Load KRec from file
loaded_krec = krec.KRec.load(str(temp_file))

# Verify the loaded data matches the original
assert verify_krec_data(original_data, loaded_krec)


def test_video_combination(synthetic_krec_data: tuple[krec.KRec, dict[str, list[float]]], tmpdir: Path) -> None:
def test_video_combination(
synthetic_krec_data: tuple[krec.KRec, dict[str, Mapping[int | str, list[float]]]],
tmpdir: Path,
) -> None:
"""Test combining KRec with video and extracting it back."""
krec_obj, original_data = synthetic_krec_data

Expand All @@ -198,11 +209,14 @@ def test_video_combination(synthetic_krec_data: tuple[krec.KRec, dict[str, list[
krec.combine_with_video(str(temp_video), str(temp_krec), str(output_video))

# Extract KRec from video
extracted_krec = krec.extract_from_video(str(output_video))
extracted_krec = krec.extract_from_video(str(output_video), verbose=False)
assert verify_krec_data(original_data, extracted_krec)


def test_header_preservation(synthetic_krec_data: tuple[krec.KRec, dict[str, list[float]]], tmpdir: Path) -> None:
def test_header_preservation(
synthetic_krec_data: tuple[krec.KRec, dict[str, dict[int | str, list[float]]]],
tmpdir: Path,
) -> None:
"""Test that KRec header information is preserved during save/load."""
krec_obj, _ = synthetic_krec_data

Expand Down

0 comments on commit 2a7004c

Please sign in to comment.