Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
alik-git committed Dec 27, 2024
1 parent a4f7821 commit 1c0aabc
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 108 deletions.
106 changes: 74 additions & 32 deletions krecviz/urdf_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from PIL import Image
from urdf_parser_py import urdf as urdf_parser # type: ignore[import-untyped]


def rotation_from_euler_xyz(rpy):
"""
Given a 3-element list/tuple [rx, ry, rz] of Euler angles in radians,
Expand Down Expand Up @@ -56,25 +57,8 @@ def rotation_from_euler_xyz(rpy):
[ 0, 0, 1],
], dtype=np.float64)

# Optional debug: print each partial rotation
print(" rotation_from_euler_xyz() debug:")
print(f" rx={rx} ry={ry} rz={rz}")
print(f" R_x =\n{R_x}")
print(f" R_y =\n{R_y}")
print(f" R_z =\n{R_z}")

# Final rotation = Rz @ Ry @ Rx
R_final = R_z @ R_y @ R_x

# Print the final matrix as a row-major flat list, to compare with Rust
row0 = R_final[0, :].tolist()
row1 = R_final[1, :].tolist()
row2 = R_final[2, :].tolist()
# Flatten in row-major
row_major_flat = row0 + row1 + row2
print(" => final rotation = Rz @ Ry @ Rx =\n", R_final)
print(" => final rotation (row-major) =", row_major_flat)

return R_final


Expand Down Expand Up @@ -134,6 +118,9 @@ def log(self) -> None:
entity_path = self.link_entity_path(link)
self.log_link(entity_path, link)

# -- AFTER we log everything, let's print a final transform for each link:
self.print_final_link_transforms()

def log_link(self, entity_path: str, link: urdf_parser.Link) -> None:
"""Log a URDF link to Rerun."""
for i, visual in enumerate(link.visuals):
Expand All @@ -150,7 +137,6 @@ def log_joint(self, entity_path: str, joint: urdf_parser.Joint) -> None:
if joint.origin is not None and joint.origin.rpy is not None:
# We call our custom rotation_from_euler_xyz
rotation_matrix = rotation_from_euler_xyz(joint.origin.rpy)

# Convert to a Python list-of-lists
rotation = [[float(x) for x in row] for row in rotation_matrix]

Expand All @@ -162,18 +148,21 @@ def log_joint(self, entity_path: str, joint: urdf_parser.Joint) -> None:
print("======================")
print("rerun_log")
print(f"entity_path = entity_path_w_prefix with value '{entity_path_w_prefix}'")
print(f" => translation = {translation}")
print(f" => rotation (2D list) = {rotation}")

# Show the row-major flattening for direct comparison to Rust
if rotation is not None:
flat_rot = rotation[0] + rotation[1] + rotation[2]
print("Original joint RPY values:")
if joint.origin is not None and joint.origin.rpy is not None:
print(f" => rpy = {[round(float(x), 3) for x in joint.origin.rpy]}")
else:
flat_rot = []
print(f" => rotation (row-major flatten) = {flat_rot}")
print(" => rpy = None")

transform_3d = rr.Transform3D(translation=translation , mat3x3=rotation)
print(f"entity = rr.Transform3D(translation={translation}, mat3x3={rotation}) with value {transform_3d}")
print("entity = rr.Transform3D with:")
print(" translation:", [f"{x:>8.3f}" for x in translation] if translation else None)
print(" mat3x3:")
if rotation:
for row in rotation:
print(" [" + ", ".join(f"{x:>8.3f}" for x in row) + "]")
else:
print(" None")

rr.log(
entity_path=entity_path_w_prefix,
Expand All @@ -193,9 +182,8 @@ def log_visual(self, entity_path: str, visual: urdf_parser.Visual) -> None:
if visual.origin is not None and visual.origin.xyz is not None:
transform[:3, 3] = visual.origin.xyz
if visual.origin is not None and visual.origin.rpy is not None:
transform[:3, :3] = st.Rotation.from_euler("xyz", visual.origin.rpy).as_matrix()
transform[:3, :3] = rotation_from_euler_xyz(visual.origin.rpy)

# Determine geometry
if isinstance(visual.geometry, urdf_parser.Mesh):
resolved_path = self.resolve_ros_path(visual.geometry.filename)
mesh_scale = visual.geometry.scale
Expand Down Expand Up @@ -261,7 +249,7 @@ def resolve_ros_path(self, path: str) -> Path:
if path.startswith("package://"):
raise ValueError(f"Could not resolve '{path}'. Provide a direct or relative path.")
elif path.startswith("file://"):
resolved = path[len("file://") :]
resolved = path[len("file://"):]
return Path(resolved).resolve()
else:
direct_path = (self.urdf_dir / path).resolve()
Expand All @@ -275,6 +263,58 @@ def resolve_ros_path(self, path: str) -> Path:
"Please check that the file exists and is accessible."
)

def print_final_link_transforms(self) -> None:
"""
Debug function: for each link, accumulate the joint transforms from root -> link,
then print the resulting final 4x4.
"""
root_link = self.urdf.get_root()
print("\n========== FINAL ACCUMULATED TRANSFORMS PER LINK ==========")
for link in self.urdf.links:
if link.name == root_link:
# The root link is presumably identity in the URDF
print(f"Link '{link.name}': Root link => final transform is identity.\n")
continue

chain = self.urdf.get_chain(root_link, link.name)
# e.g. [root_link, joint0_name, link0_name, joint1_name, link1_name, ...]

# Accumulate transform from 'root_link' to 'link.name'
# We'll build an np.eye(4), multiply each joint's origin along the way
final_tf = np.eye(4)
# We skip the 0th element (which is root_link),
# and step in pairs: (joint_i, link_i). If the chain is [a, j0, b, j1, c],
# then chain[1] is j0, chain[2] is b, chain[3] is j1, chain[4] is c, etc.
for i in range(1, len(chain), 2):
joint_name = chain[i]
# find that joint in self.urdf.joints
j = None
for jt in self.urdf.joints:
if jt.name == joint_name:
j = jt
break
if j is None:
print(f" (!) Could not find joint named '{joint_name}' in URDF?")
continue

# Build local 4x4 from j.origin
xyz = j.origin.xyz if j.origin and j.origin.xyz else [0,0,0]
rpy = j.origin.rpy if j.origin and j.origin.rpy else [0,0,0]
local_rot = rotation_from_euler_xyz(rpy)
local_tf = np.eye(4)
local_tf[:3, :3] = local_rot
local_tf[:3, 3] = xyz

final_tf = final_tf @ local_tf

# Now 'final_tf' is the transform from root_link to link.name
# Print it
print(f"Link '{link.name}': BFS chain = {chain}")
print(" => final_tf (4x4) =")
for row in final_tf:
print(" [{: 8.3f} {: 8.3f} {: 8.3f} {: 8.3f}]".format(*row))
print()


def scene_to_trimeshes(scene: trimesh.Scene) -> list[trimesh.Trimesh]:
"""Convert a trimesh.Scene to a list of trimesh.Trimesh.
Expand Down Expand Up @@ -331,7 +371,7 @@ def log_trimesh(entity_path: str, mesh: trimesh.Trimesh) -> None:

# Prepare debug prints
print("======================")
print("rerun_log")
print("rerun_log log_trimesh")
print(f"entity_path = entity_path with value '{entity_path}'")

# Build the rr.Mesh3D
Expand All @@ -348,7 +388,9 @@ def log_trimesh(entity_path: str, mesh: trimesh.Trimesh) -> None:
# Print only the first three vertex positions
first_three_vertices = mesh.vertices[:3].tolist()
print("entity = rr.Mesh3D(...) with these numeric values:")
print(f" => vertex_positions (first 3) = {first_three_vertices}")
print(" => vertex_positions (first 3):")
for vertex in first_three_vertices:
print(f" [{', '.join(f'{x:>7.3f}' for x in vertex)}]")

timeless_val = True
print(f"timeless = {timeless_val}")
Expand Down
152 changes: 76 additions & 76 deletions krecviz/visualize.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,58 +67,58 @@ def load_krec(file_path: Path, verbose: bool = False) -> krec.KRec:
raise RuntimeError(f"Invalid file extension. Expected '.krec' or '.krec.mkv', got: {file_path}")


# def update_robot_pose(
# entity_to_transform: dict[str, tuple[list[float], list[list[float]]]],
# actuator_states: list[krec.ActuatorState],
# joint_name_to_entity_path: dict[str, str],
# ) -> None:
# """Updates robot joint positions based on actuator states."""
# joint_angles = {}
# for state in actuator_states:
# if state.actuator_id in actuator_to_urdf_joint and state.position is not None:
# joint_name = actuator_to_urdf_joint[state.actuator_id]
# # Convert degrees to radians
# angle_rad = np.deg2rad(float(state.position))
# joint_angles[joint_name] = angle_rad

# # Update each joint's transform
# for joint_name, angle in joint_angles.items():
# if joint_name not in joint_name_to_entity_path:
# logging.warning("No entity path found for joint %s", joint_name)
# continue

# full_path = joint_name_to_entity_path[joint_name]
# if full_path not in entity_to_transform:
# logging.warning("Transform not found for path %s", full_path)
# continue

# # Get initial transform and rotation axis
# translation, base_rotation = entity_to_transform[full_path]
# axis = np.array([0, 0, 1])

# # Compute new rotation (angle is already in radians)
# rot_mat = Rotation.from_rotvec(axis * angle).as_matrix()
# new_rotation = base_rotation @ rot_mat

# # Log updated transform
# rr.log(
# f"/{full_path}",
# rr.Transform3D(translation=translation, mat3x3=new_rotation),
# )


# def log_frame_data(frame: krec.KRecFrame, frame_idx: int) -> None:
# """Log actuator states for each frame."""
# rr.set_time_sequence("frame_idx", frame_idx)

# for state in frame.get_actuator_states():
# prefix = f"actuators/actuator_{state.actuator_id}/state"
# if state.position is not None:
# rr.log(f"{prefix}/position", rr.Scalar(float(state.position)))
# if state.velocity is not None:
# rr.log(f"{prefix}/velocity", rr.Scalar(float(state.velocity)))
# if state.torque is not None:
# rr.log(f"{prefix}/torque", rr.Scalar(float(state.torque)))
def update_robot_pose(
entity_to_transform: dict[str, tuple[list[float], list[list[float]]]],
actuator_states: list[krec.ActuatorState],
joint_name_to_entity_path: dict[str, str],
) -> None:
"""Updates robot joint positions based on actuator states."""
joint_angles = {}
for state in actuator_states:
if state.actuator_id in actuator_to_urdf_joint and state.position is not None:
joint_name = actuator_to_urdf_joint[state.actuator_id]
# Convert degrees to radians
angle_rad = np.deg2rad(float(state.position))
joint_angles[joint_name] = angle_rad

# Update each joint's transform
for joint_name, angle in joint_angles.items():
if joint_name not in joint_name_to_entity_path:
logging.warning("No entity path found for joint %s", joint_name)
continue

full_path = joint_name_to_entity_path[joint_name]
if full_path not in entity_to_transform:
logging.warning("Transform not found for path %s", full_path)
continue

# Get initial transform and rotation axis
translation, base_rotation = entity_to_transform[full_path]
axis = np.array([0, 0, 1])

# Compute new rotation (angle is already in radians)
rot_mat = Rotation.from_rotvec(axis * angle).as_matrix()
new_rotation = base_rotation @ rot_mat

# Log updated transform
rr.log(
f"/{full_path}",
rr.Transform3D(translation=translation, mat3x3=new_rotation),
)


def log_frame_data(frame: krec.KRecFrame, frame_idx: int) -> None:
"""Log actuator states for each frame."""
rr.set_time_sequence("frame_idx", frame_idx)

for state in frame.get_actuator_states():
prefix = f"actuators/actuator_{state.actuator_id}/state"
if state.position is not None:
rr.log(f"{prefix}/position", rr.Scalar(float(state.position)))
if state.velocity is not None:
rr.log(f"{prefix}/velocity", rr.Scalar(float(state.velocity)))
if state.torque is not None:
rr.log(f"{prefix}/torque", rr.Scalar(float(state.torque)))


def visualize_krec(
Expand Down Expand Up @@ -156,30 +156,30 @@ def visualize_krec(
entity_path = urdf_logger.joint_entity_path(joint)
joint_name_to_entity_path[joint.name] = entity_path

# # Load KREC file
# krec_data = load_krec(krec_path)
# logging.info("Processing %d frames...", len(krec_data))

# # Process frames
# try:
# for idx, frame in enumerate(tqdm(krec_data, desc="Processing frames")):
# if frame is not None:
# log_frame_data(frame, idx)
# if urdf_path:
# update_robot_pose(
# urdf_logger.entity_to_transform,
# frame.get_actuator_states(),
# joint_name_to_entity_path,
# )

# if output_path:
# logging.info("Saved animation to: %s", output_path)
# else:
# logging.info("Animation complete. Press Ctrl+C to exit.")

# except Exception as e:
# logging.error("Error during animation: %s", e)
# raise
# Load KREC file
krec_data = load_krec(krec_path)
logging.info("Processing %d frames...", len(krec_data))

# Process frames
try:
for idx, frame in enumerate(tqdm(krec_data, desc="Processing frames")):
if frame is not None:
log_frame_data(frame, idx)
if urdf_path:
update_robot_pose(
urdf_logger.entity_to_transform,
frame.get_actuator_states(),
joint_name_to_entity_path,
)

if output_path:
logging.info("Saved animation to: %s", output_path)
else:
logging.info("Animation complete. Press Ctrl+C to exit.")

except Exception as e:
logging.error("Error during animation: %s", e)
raise


def main() -> None:
Expand Down

0 comments on commit 1c0aabc

Please sign in to comment.