Skip to content

Commit

Permalink
fix python mujoco path parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
alberthli committed Jul 24, 2024
1 parent 9bced9e commit de7ccd1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
49 changes: 30 additions & 19 deletions obelisk/python/obelisk_py/core/sim/mujoco.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import numpy as np
import obelisk_sensor_msgs.msg as osm
import rclpy
from ament_index_python.packages import get_package_share_directory
from mujoco import MjData, MjModel, mj_forward, mj_name2id, mj_step # type: ignore
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.lifecycle import LifecycleState, TransitionCallbackReturn
Expand Down Expand Up @@ -39,10 +40,10 @@ def _get_msg_type_from_string(self, msg_type: str) -> Type[ObeliskSensorMsg]:
return osm.ObkJointEncoders # type: ignore
elif msg_type == "ObkImu":
assert is_in_bound(osm.ObkImu, ObeliskSensorMsg)
return osm.ObkImu
return osm.ObkImu # type: ignore
elif msg_type == "ObkFramePose":
assert is_in_bound(osm.ObkFramePose, ObeliskSensorMsg)
return osm.ObkFramePose
return osm.ObkFramePose # type: ignore
else:
raise NotImplementedError(f"Message type {msg_type} not supported! Check your spelling or open a PR.")

Expand Down Expand Up @@ -82,13 +83,13 @@ def _create_timer_callback_from_msg_type(
if msg_type == osm.ObkJointEncoders:
# verify that all joints are hinge or slider joints
for sensor_name in mj_sensor_names:
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name)
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name) # type: ignore

joint_id = self.mj_model.sensor_objid[sensor_id]
joint_name = self.mj_model.joint(joint_id).name
joint_type = self.mj_model.jnt_type[joint_id]

if joint_type not in [mujoco.mjtJoint.mjJNT_HINGE, mujoco.mjtJoint.mjJNT_SLIDE]:
if joint_type not in [mujoco.mjtJoint.mjJNT_HINGE, mujoco.mjtJoint.mjJNT_SLIDE]: # type: ignore
self.get_logger().error(f"Joint {joint_name} should be a hinge or slide joint!")
raise ValueError(f"Joint {joint_name} should be a hinge or slide joint!")

Expand All @@ -101,7 +102,7 @@ def timer_callback() -> None:

for sensor_name, obk_sensor_field in zip(mj_sensor_names, obk_sensor_fields):
# get the sensor id and address
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name)
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name) # type: ignore
sensor_adr = self.mj_model.sensor_adr[sensor_id]
if sensor_id == -1:
self.get_logger().error(f"Sensor {sensor_name} not found!")
Expand Down Expand Up @@ -141,7 +142,7 @@ def timer_callback() -> None:
has_acc, has_gyro, has_framequat = False, False, False
for sensor_name, obk_sensor_field in zip(mj_sensor_names, obk_sensor_fields):
# get the sensor id and address
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name)
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name) # type: ignore
sensor_adr = self.mj_model.sensor_adr[sensor_id]
if sensor_id == -1:
self.get_logger().error(f"Sensor {sensor_name} not found!")
Expand Down Expand Up @@ -204,7 +205,7 @@ def timer_callback() -> None:

for sensor_name, obk_sensor_field in zip(mj_sensor_names, obk_sensor_fields):
# get the sensor id and address
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name)
sensor_id = mj_name2id(self.mj_model, mujoco.mjtObj.mjOBJ_SENSOR, sensor_name) # type: ignore
sensor_adr = self.mj_model.sensor_adr[sensor_id]
if sensor_id == -1:
self.get_logger().error(f"Sensor {sensor_name} not found!")
Expand All @@ -219,14 +220,14 @@ def timer_callback() -> None:
# framepos sensors can be mounted onto different mujoco objects (the frame we measure)
obj_type = self.mj_model.sensor_objtype[sensor_id]
obj_id = self.mj_model.sensor_objid[sensor_id]
if obj_type == mujoco.mjtObj.mjOBJ_SITE:
if obj_type == mujoco.mjtObj.mjOBJ_SITE: # type: ignore
msg.frame_name = self.mj_model.site(obj_id).name
msg.header.frame_id = self.mj_model.site(obj_id).name
elif obj_type == mujoco.mjtObj.mjOBJ_BODY:
elif obj_type == mujoco.mjtObj.mjOBJ_BODY: # type: ignore
msg.frame_name = self.mj_model.body(obj_id).name
elif obj_type == mujoco.mjtObj.mjOBJ_GEOM:
elif obj_type == mujoco.mjtObj.mjOBJ_GEOM: # type: ignore
msg.frame_name = self.mj_model.geom(obj_id).name
elif obj_type == mujoco.mjtObj.mjOBJ_CAMERA:
elif obj_type == mujoco.mjtObj.mjOBJ_CAMERA: # type: ignore
msg.frame_name = self.mj_model.cam(obj_id).name
else:
err_msg = (
Expand All @@ -242,13 +243,13 @@ def timer_callback() -> None:
else:
ref_type = self.mj_model.sensor_reftype[sensor_id]
ref_id = self.mj_model.sensor_refid[sensor_id]
if ref_type == mujoco.mjtObj.mjOBJ_SITE:
if ref_type == mujoco.mjtObj.mjOBJ_SITE: # type: ignore
msg.header.frame_id = self.mj_model.site(ref_id).name
elif ref_type == mujoco.mjtObj.mjOBJ_BODY:
elif ref_type == mujoco.mjtObj.mjOBJ_BODY: # type: ignore
msg.header.frame_id = self.mj_model.body(ref_id).name
elif ref_type == mujoco.mjtObj.mjOBJ_GEOM:
elif ref_type == mujoco.mjtObj.mjOBJ_GEOM: # type: ignore
msg.header.frame_id = self.mj_model.geom(ref_id).name
elif ref_type == mujoco.mjtObj.mjOBJ_CAMERA:
elif ref_type == mujoco.mjtObj.mjOBJ_CAMERA: # type: ignore
msg.header.frame_id = self.mj_model.cam(ref_id).name
else:
err_msg = (
Expand Down Expand Up @@ -302,19 +303,29 @@ def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn: # no
field_names, value_names = ObeliskMujocoRobot._parse_config_str(self.mujoco_setting)

required_field_names = ["model_xml_path", "n_u"]
optional_field_names = ["time_step", "num_steps_per_viz", "sensor_settings"]
optional_field_names = ["robot_pkg", "time_step", "num_steps_per_viz", "sensor_settings"]
ObeliskMujocoRobot._check_fields(field_names, required_field_names, optional_field_names)
config_dict = dict(zip(field_names, value_names))

# load mujoco model
self.model_xml_path = config_dict["model_xml_path"]
self.robot_pkg = config_dict.get("robot_pkg", None)
assert isinstance(self.model_xml_path, str), "Model XML path must be a string!"
assert isinstance(self.robot_pkg, str) or self.robot_pkg is None, "Robot package must be a string or None!"
if not os.path.exists(self.model_xml_path):
if "OBELISK_ROOT" not in os.environ:
raise ValueError("OBELISK_ROOT environment variable not set. Run the dev_setup.sh script!")
model_xml_path = os.path.join(os.environ["OBELISK_ROOT"], "models", self.model_xml_path)
if self.robot_pkg is not None and self.robot_pkg.lower() != "none":
share_directory = get_package_share_directory(self.robot_pkg)
model_xml_path = os.path.join(share_directory, "mujoco", self.model_xml_path)
else:
self.get_logger().error(
"Provided Mujoco XML is NOT an absolute path and robot_pkg is None or not specified. "
"Please provide a valid Mujoco XML path."
)
return TransitionCallbackReturn.ERROR
else:
model_xml_path = self.model_xml_path

# initialize the mujoco model and data
self.mj_model = MjModel.from_xml_path(model_xml_path)
self.mj_data = MjData(self.mj_model)
mj_forward(self.mj_model, self.mj_data)
Expand Down
3 changes: 2 additions & 1 deletion obelisk_ws/src/obelisk_ros/config/dummy_py.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ onboard:
non_obelisk: False
sim:
- ros_parameter: mujoco_setting
model_xml_path: dummy/dummy.xml
n_u: 1
time_step: 0.002
num_steps_per_viz: 5
robot_pkg: dummy_bot
model_xml_path: dummy.xml
sensor_settings:
- topic: /obelisk/dummy/joint_encoders
dt: 0.001
Expand Down

0 comments on commit de7ccd1

Please sign in to comment.