Source code for ManipulaPy.sim

#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Simulation Module - ManipulaPy

This module provides PyBullet-based simulation capabilities for robotic manipulators
including real-time visualization, physics simulation, and interactive control.

UPDATED VERSION with VISIBLE TRAJECTORY SPLINE:
- Replaced addUserDebugLine() with real capsule geometry
- Trajectory splines now appear in getCameraImage() screenshots
- Added proper cleanup for trajectory visualization

Copyright (c) 2025 Mohamed Aboelnasr
Licensed under the GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)

This file is part of ManipulaPy.

ManipulaPy is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

ManipulaPy is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with ManipulaPy. If not, see <https://www.gnu.org/licenses/>.
"""
import logging
import os
import time
from typing import Any, List, Optional, Sequence, Tuple

import matplotlib.pyplot as plt
import numpy as np

try:
    import cupy as cp  # Optional: CUDA acceleration

    _CUPY_AVAILABLE = True
except ImportError:

    class _NumpyProxy:
        """Small subset of CuPy's array interface backed by NumPy."""

        # Internal fallback shim: enough of cupy's surface for sim.py's own
        # call sites (cp.array, cp.asnumpy). Not a drop-in cupy replacement โ€”
        # cp.cuda.*, cp.ndarray identity (isinstance), and asnumpy keyword
        # arguments are unsupported. Do NOT import this as ManipulaPy.sim.cp
        # from external code; depend on cupy directly if you need GPU semantics.
        def __getattr__(self, name: str) -> Any:
            """Delegate attribute access to the NumPy module."""
            return getattr(np, name)

        def asnumpy(self, x: Any) -> np.ndarray:
            """Convert fallback arrays to NumPy arrays.

            Args:
                x: Any array-like object to coerce into a NumPy array.

            Returns:
                np.ndarray: ``x`` as a NumPy array (no copy when already one).
            """
            return np.asarray(x)

    cp = _NumpyProxy()
    _CUPY_AVAILABLE = False

try:
    import pybullet as p  # Required for Simulation; sim cannot run without it
    import pybullet_data

    _PYBULLET_AVAILABLE = True
except ImportError:
    p = None
    pybullet_data = None
    _PYBULLET_AVAILABLE = False


def _check_pybullet_available() -> None:
    """Raise a clear ImportError if pybullet is unavailable.

    __init__ already does this, but every public method that touches p.*
    needs the same check at runtime โ€” users can bypass __init__ via
    ``Simulation.__new__`` (tests do), or hot-swap the pybullet module after
    construction. Without this, those paths surface confusing
    ``AttributeError: 'NoneType' object has no attribute ...`` instead.
    """
    if not _PYBULLET_AVAILABLE or p is None:
        raise ImportError(
            "pybullet is required for this Simulation operation. "
            "Install with: pip install 'ManipulaPy[simulation]'"
        )


from ManipulaPy.control import ManipulatorController
from ManipulaPy.path_planning import TrajectoryPlanning as tp


[docs] class Simulation: """PyBullet-backed simulation and visualization helper for manipulators."""
[docs] def __init__( self, urdf_file_path: str, joint_limits: Sequence[Tuple[float, float]], torque_limits: Optional[Sequence[Any]] = None, time_step: float = 0.01, real_time_factor: float = 1.0, physics_client: Optional[int] = None, enable_self_collision: bool = False, disable_pairs: Optional[Sequence[Tuple[str, str]]] = None, ) -> None: """ Initialize the simulation and set up the PyBullet world. Args: urdf_file_path: Path to the robot's URDF file. joint_limits: Per-joint (lower, upper) position limits. torque_limits: Optional per-joint torque limits. time_step: Simulation time step in seconds. real_time_factor: Real-time playback factor. physics_client: Existing PyBullet client id to reuse, if any. enable_self_collision: Whether to enable self-collision checking. disable_pairs: Link name pairs to exclude from self-collision. """ if not _PYBULLET_AVAILABLE: raise ImportError( "Simulation requires pybullet. Install with: " "pip install 'ManipulaPy[simulation]'" ) self.urdf_file_path = urdf_file_path self.joint_limits = joint_limits self.torque_limits = torque_limits self.time_step = time_step self.real_time_factor = real_time_factor self.enable_self_collision = enable_self_collision self._disable_pairs = disable_pairs or [] self.logger = self.setup_logger() self.physics_client = physics_client self.joint_params = [] self.reset_button = None self.home_position = None # NEW: Track trajectory visualization bodies for cleanup self.trajectory_body_ids = [] self.setup_simulation()
[docs] def setup_logger(self) -> logging.Logger: """ Sets up the logger for the simulation. """ logger = logging.getLogger("SimulationLogger") logger.setLevel(logging.DEBUG) if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers): ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter( logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) ) logger.addHandler(ch) # Own our output; don't also bubble to the root handler (double logging) logger.propagate = False return logger
def _connect_client(self) -> int: """Connect to PyBullet, preferring GUI but falling back to DIRECT. Honors ``MANIPULAPY_PYBULLET_CONNECT`` (``GUI`` or ``DIRECT``) so headless environments (CI, servers) can force a windowless client. Otherwise it tries GUI and transparently falls back to DIRECT when no display is available, instead of leaving an invalid client that later surfaces as out-of-range joint-index errors. Returns: int: The connected PyBullet physics client id. """ mode = os.getenv("MANIPULAPY_PYBULLET_CONNECT", "").strip().upper() if mode == "DIRECT": return p.connect(p.DIRECT) if mode == "GUI": return p.connect(p.GUI) try: client = p.connect(p.GUI) if client < 0: raise RuntimeError("GUI connection returned an invalid client id") return client except Exception: self.logger.warning( "PyBullet GUI unavailable; falling back to DIRECT (headless) mode." ) return p.connect(p.DIRECT)
[docs] def connect_simulation(self) -> None: """ Connects to the PyBullet simulation. """ _check_pybullet_available() self.logger.info("Connecting to PyBullet simulation...") if self.physics_client is None: self.physics_client = self._connect_client() p.resetSimulation() # Clear the simulation environment p.setAdditionalSearchPath(pybullet_data.getDataPath()) p.setGravity(0, 0, -9.81) p.setTimeStep(self.time_step)
[docs] def disconnect_simulation(self) -> None: """ Disconnects from the PyBullet simulation. """ _check_pybullet_available() self.logger.info("Disconnecting from PyBullet simulation...") if self.physics_client is not None: p.disconnect() self.physics_client = None self.logger.info("Disconnected successfully.")
[docs] def setup_simulation(self) -> None: """ Sets up the simulation environment. """ _check_pybullet_available() if self.physics_client is None: self.physics_client = self._connect_client() p.resetSimulation() p.setAdditionalSearchPath(pybullet_data.getDataPath()) p.setGravity(0, 0, -9.81) p.setTimeStep(self.time_step) # Load the ground plane self.plane_id = p.loadURDF("plane.urdf") # Load the robot, enabling self-collision detection if requested load_flags = p.URDF_USE_SELF_COLLISION if self.enable_self_collision else 0 self.robot_id = p.loadURDF( self.urdf_file_path, useFixedBase=True, flags=load_flags ) # Identify non-fixed joints self.non_fixed_joints = [ i for i in range(p.getNumJoints(self.robot_id)) if p.getJointInfo(self.robot_id, i)[2] != p.JOINT_FIXED ] self.home_position = np.zeros(len(self.non_fixed_joints)) # Apply per-pair collision filter exclusions when self-collision is on if self.enable_self_collision and self._disable_pairs: # Build link-name -> pybullet link index map; joint info[12] is child link name link_name_to_idx = { p.getJointInfo(self.robot_id, i)[12].decode(): i for i in range(p.getNumJoints(self.robot_id)) } for name_a, name_b in self._disable_pairs: idx_a = link_name_to_idx.get(name_a) idx_b = link_name_to_idx.get(name_b) if idx_a is not None and idx_b is not None: p.setCollisionFilterPair( self.robot_id, self.robot_id, idx_a, idx_b, 0 ) else: self.logger.warning( "disable_pairs: could not resolve link names %r / %r to indices", name_a, name_b, )
[docs] def initialize_robot(self) -> None: """ Initializes the robot using the URDF processor. """ _check_pybullet_available() # Only skip URDF processing if self.robot is already set. if hasattr(self, "robot") and self.robot is not None: self.logger.warning("Robot already initialized. Skipping URDF processing.") else: # Even if self.robot_id is already set from setup_simulation(), # we need to process the URDF to set self.robot and self.dynamics. if not (hasattr(self, "robot_id") and self.robot_id is not None): load_flags = ( p.URDF_USE_SELF_COLLISION if self.enable_self_collision else 0 ) self.robot_id = p.loadURDF( self.urdf_file_path, [0, 0, 0.1], useFixedBase=True, flags=load_flags, ) # Process the URDF to generate the robot model and dynamics. from ManipulaPy.urdf_processor import URDFToSerialManipulator urdf_processor = URDFToSerialManipulator(self.urdf_file_path) self.robot = urdf_processor.serial_manipulator self.dynamics = urdf_processor.dynamics # Identify non-fixed joints self.non_fixed_joints = [ i for i in range(p.getNumJoints(self.robot_id)) if p.getJointInfo(self.robot_id, i)[2] != p.JOINT_FIXED ] self.home_position = np.zeros(len(self.non_fixed_joints))
def set_robot_models(self, robot: Any, dynamics: Any) -> None: """ Set pre-existing robot models to avoid reprocessing. Args: robot: SerialManipulator instance dynamics: ManipulatorDynamics instance """ self.robot = robot self.dynamics = dynamics self.logger.info("Pre-existing robot models set successfully.")
[docs] def initialize_planner_and_controller(self) -> None: """ Initializes the trajectory planner and the manipulator controller. """ self.trajectory_planner = tp( self.robot, self.urdf_file_path, self.dynamics, self.joint_limits, self.torque_limits, ) self.controller = ManipulatorController(self.dynamics)
[docs] def add_joint_parameters(self) -> None: """ Adds GUI sliders for each joint. """ _check_pybullet_available() if not self.joint_params: for i, joint_index in enumerate(self.non_fixed_joints): param_id = p.addUserDebugParameter( f"Joint {joint_index}", self.joint_limits[i][0], self.joint_limits[i][1], 0, ) self.joint_params.append(param_id)
[docs] def add_reset_button(self) -> None: """ Adds a reset button to the simulation. """ _check_pybullet_available() if self.reset_button is None: try: self.reset_button = p.addUserDebugParameter("Reset", 1, 0, 1) except Exception as e: self.logger.error(f"Failed to add reset button: {e}")
[docs] def set_joint_positions( self, joint_positions: Sequence[float], forces: Optional[Sequence[float]] = None ) -> None: """ Sets the joint positions of the robot. Drives the non-fixed joints toward ``joint_positions`` using PyBullet's ``POSITION_CONTROL`` mode. Args: joint_positions: Target angles for the non-fixed joints, in radians, one entry per non-fixed joint. forces: Optional per-joint maximum motor force. When ``None``, forces are derived from ``self.torque_limits`` (collapsing each (min, max) pair to its largest absolute magnitude) or default to ``1000.0`` for every joint when no torque limits are configured. """ _check_pybullet_available() n = len(self.non_fixed_joints) if forces is None: if getattr(self, "torque_limits", None) is not None: # PyBullet wants one scalar per joint, but torque_limits is # commonly a list of (min, max) pairs. Collapse each pair to # the maximum absolute magnitude so the motor can both push # and pull within the configured limits. torque_limits = np.asarray(self.torque_limits, dtype=float) if torque_limits.ndim == 2 and torque_limits.shape[1] == 2: forces = np.max(np.abs(torque_limits), axis=1).tolist() else: forces = torque_limits.tolist() else: forces = [1000.0] * n p.setJointMotorControlArray( self.robot_id, self.non_fixed_joints, p.POSITION_CONTROL, targetPositions=joint_positions, forces=forces, )
[docs] def get_joint_positions(self) -> np.ndarray: """ Gets the current joint positions of the robot. """ _check_pybullet_available() joint_positions = [ p.getJointState(self.robot_id, i)[0] for i in self.non_fixed_joints ] return np.array(joint_positions)
def _capsule_line( self, a: Sequence[float], b: Sequence[float], radius: float = 0.006, rgba: Sequence[float] = (1, 0.5, 0, 1), ) -> int: """ Create a thin capsule between point a and b; returns body-id. This creates REAL GEOMETRY that appears in getCameraImage() screenshots. Args: a: Start point [x, y, z] b: End point [x, y, z] radius: Capsule radius in world units rgba: Color as [r, g, b, a] where values are 0-1 Returns: int: PyBullet body ID, or -1 if failed """ a, b = np.array(a), np.array(b) v = b - a L = np.linalg.norm(v) if L < 1e-6: return -1 # Calculate orientation to align capsule with the line direction z = v / L # Direction vector # Find perpendicular vectors x = np.cross([0, 0, 1], z) if np.linalg.norm(x) < 1e-6: x = np.cross([0, 1, 0], z) x = x / np.linalg.norm(x) y = np.cross(z, x) # Calculate proper orientation for capsule # PyBullet capsules are aligned with Z-axis by default if abs(z[2]) > 0.99: # Nearly vertical orn = p.getQuaternionFromEuler([0, 0, 0]) else: # Calculate rotation to align Z-axis with direction vector angle = np.arccos(np.clip(z[2], -1, 1)) if angle > 1e-6: axis = np.cross([0, 0, 1], z) axis_norm = np.linalg.norm(axis) if axis_norm > 1e-6: axis = axis / axis_norm # Inline axis-angle to quaternion to avoid relying on # p.getQuaternionFromAxisAngle, which is missing from # several pybullet builds (cross-version portability). half = angle / 2.0 s = np.sin(half) orn = (axis[0] * s, axis[1] * s, axis[2] * s, np.cos(half)) else: orn = p.getQuaternionFromEuler([0, 0, 0]) else: orn = p.getQuaternionFromEuler([0, 0, 0]) # Midpoint of the line segment mid = (a + b) / 2 try: # Create collision and visual shapes col = p.createCollisionShape(p.GEOM_CAPSULE, radius=radius, height=L) vis = p.createVisualShape( p.GEOM_CAPSULE, radius=radius, length=L, rgbaColor=rgba ) # Create static body (mass=0) body_id = p.createMultiBody( baseMass=0, baseCollisionShapeIndex=col, baseVisualShapeIndex=vis, basePosition=mid, baseOrientation=orn, ) return body_id except Exception as e: self.logger.error(f"Failed to create capsule line: {e}") return -1
[docs] def plot_trajectory( self, ee_positions: Sequence[Sequence[float]], line_width: int = 3, color: Optional[List[float]] = None, ) -> List[int]: """ Plots the end-effector trajectory in PyBullet using REAL GEOMETRY. This method now creates actual 3D capsules that will appear in screenshots taken with getCameraImage(), unlike the previous addUserDebugLine() approach. Args: ee_positions: List of end-effector positions [[x,y,z], ...] line_width: Width factor for trajectory visualization color: RGB color as [r, g, b] where values are 0-1 Returns: list: Body IDs of created trajectory geometry (for cleanup) """ _check_pybullet_available() # Clear any existing trajectory bodies self.clear_trajectory_visualization() if len(ee_positions) < 2: self.logger.warning("Not enough positions to plot trajectory") return [] if color is None: color = [1, 0, 0] # Convert color to RGBA if len(color) == 3: rgba_color = color + [1.0] # Add alpha else: rgba_color = color # Calculate radius based on line_width (convert to world scale) base_radius = 0.003 # Base radius in world units radius = base_radius * (line_width / 3.0) # Scale with line_width trajectory_bodies = [] self.logger.info( f"Creating trajectory visualization with {len(ee_positions)} points" ) # Create capsule segments between consecutive points for i in range(1, len(ee_positions)): try: # Get consecutive points start_pos = ee_positions[i - 1] end_pos = ee_positions[i] # Create multiple parallel capsules for thickness effect for j in range(max(1, line_width // 2)): # Slight offset for thickness offset = j * 0.002 # Small offset in world units start_offset = [start_pos[0] + offset, start_pos[1], start_pos[2]] end_offset = [end_pos[0] + offset, end_pos[1], end_pos[2]] # Create capsule segment body_id = self._capsule_line( start_offset, end_offset, radius=radius, rgba=rgba_color ) if body_id != -1: trajectory_bodies.append(body_id) except Exception as e: self.logger.error(f"Failed to create trajectory segment {i}: {e}") # Store body IDs for cleanup self.trajectory_body_ids.extend(trajectory_bodies) # Add trajectory markers marker_bodies = self._add_trajectory_markers(ee_positions, rgba_color) self.trajectory_body_ids.extend(marker_bodies) self.logger.info( f"โœ… Created trajectory visualization: {len(trajectory_bodies)} segments + {len(marker_bodies)} markers" ) self.logger.info("๐ŸŽฏ Trajectory will now appear in screenshots as 3D geometry!") return trajectory_bodies
def _add_trajectory_markers( self, ee_positions: Sequence[Sequence[float]], color: Sequence[float] ) -> List[int]: """ Add START/END markers using real geometry. Args: ee_positions: List of end-effector positions color: RGBA color for markers Returns: list: Body IDs of created markers """ marker_bodies = [] try: # START marker (green sphere) start_visual = p.createVisualShape( shapeType=p.GEOM_SPHERE, radius=0.02, rgbaColor=[0.0, 1.0, 0.0, 1.0], # Green ) start_collision = p.createCollisionShape( shapeType=p.GEOM_SPHERE, radius=0.02 ) start_marker = p.createMultiBody( baseMass=0, baseCollisionShapeIndex=start_collision, baseVisualShapeIndex=start_visual, basePosition=[ ee_positions[0][0], ee_positions[0][1], ee_positions[0][2] + 0.05, ], ) marker_bodies.append(start_marker) # END marker (red sphere) end_visual = p.createVisualShape( shapeType=p.GEOM_SPHERE, radius=0.02, rgbaColor=[1.0, 0.0, 0.0, 1.0], # Red ) end_collision = p.createCollisionShape(shapeType=p.GEOM_SPHERE, radius=0.02) end_marker = p.createMultiBody( baseMass=0, baseCollisionShapeIndex=end_collision, baseVisualShapeIndex=end_visual, basePosition=[ ee_positions[-1][0], ee_positions[-1][1], ee_positions[-1][2] + 0.05, ], ) marker_bodies.append(end_marker) # Add intermediate waypoints if trajectory is long enough if len(ee_positions) > 10: waypoint_indices = [ len(ee_positions) // 4, len(ee_positions) // 2, 3 * len(ee_positions) // 4, ] for idx in waypoint_indices: if 0 <= idx < len(ee_positions): waypoint_visual = p.createVisualShape( shapeType=p.GEOM_SPHERE, radius=0.015, rgbaColor=[0.0, 0.0, 1.0, 1.0], # Blue ) waypoint_collision = p.createCollisionShape( shapeType=p.GEOM_SPHERE, radius=0.015 ) waypoint_marker = p.createMultiBody( baseMass=0, baseCollisionShapeIndex=waypoint_collision, baseVisualShapeIndex=waypoint_visual, basePosition=[ ee_positions[idx][0], ee_positions[idx][1], ee_positions[idx][2] + 0.03, ], ) marker_bodies.append(waypoint_marker) except Exception as e: self.logger.error(f"Failed to create trajectory markers: {e}") return marker_bodies def clear_trajectory_visualization(self) -> None: """ Clear all trajectory visualization bodies from the simulation. """ _check_pybullet_available() if hasattr(self, "trajectory_body_ids"): removed_count = 0 for body_id in self.trajectory_body_ids: try: p.removeBody(body_id) removed_count += 1 except Exception as e: self.logger.warning( f"Could not remove trajectory body {body_id}: {e}" ) if removed_count > 0: self.logger.info( f"๐Ÿงน Removed {removed_count} trajectory visualization bodies" ) self.trajectory_body_ids = []
[docs] def run_trajectory( self, joint_trajectory: Sequence[Sequence[float]] ) -> Tuple[float, float, float]: """ Runs a joint trajectory in the simulation. Iterates over each waypoint, commands position control, steps the simulation, records the end-effector position, and finally renders the traced end-effector path in the scene. Args: joint_trajectory: Sequence of joint-angle configurations (one per simulation step), each a sequence of joint angles in radians. Returns: tuple[float, float, float]: The (x, y, z) world position of the end-effector at the final waypoint. """ _check_pybullet_available() self.logger.info("Running trajectory...") ee_positions = [] for joint_positions in joint_trajectory: self.set_joint_positions(joint_positions) p.stepSimulation() # Get end-effector position ee_pos = p.getLinkState(self.robot_id, p.getNumJoints(self.robot_id) - 1)[4] ee_positions.append(ee_pos) time.sleep(self.time_step / self.real_time_factor) # Plot trajectory with REAL GEOMETRY that appears in screenshots self.plot_trajectory(ee_positions) self.logger.info("Trajectory completed.") return ee_positions[-1] # Return the last end-effector position
[docs] def run_controller( self, desired_positions: Sequence[Sequence[float]] ) -> Tuple[float, float, float]: """ Drive the robot through ``desired_positions`` in open-loop position control, one configuration per simulation step. For real closed-loop torque control, drive PyBullet's ``p.TORQUE_CONTROL`` mode directly in your own loop. The previous signature accepted a controller object plus PID gains; those were removed in v1.3.2 because the loop body never produced honest closed-loop behavior. See CHANGELOG. Args: desired_positions: Waypoints to track, shape ``(N, DOF)`` where DOF must equal the number of non-fixed joints; joint angles in radians. Returns: tuple[float, float, float]: The (x, y, z) world position of the end-effector at the final waypoint. Raises: ValueError: If ``desired_positions`` is empty, is not 2-D, or its joint count does not match the number of non-fixed joints. """ _check_pybullet_available() self.logger.info("Running controller...") ee_positions = [] positions_arr = np.asarray(list(desired_positions), dtype=float) if positions_arr.size == 0: raise ValueError("desired_positions is empty; nothing to track") if positions_arr.ndim != 2: raise ValueError( "desired_positions must have shape (N waypoints x DOF); " f"actual shape is {positions_arr.shape}" ) expected_dof = len(self.non_fixed_joints) actual_dof = positions_arr.shape[1] if actual_dof != expected_dof: raise ValueError( "desired_positions joint count mismatch: " f"expected {expected_dof}, got {actual_dof}" ) for pos in positions_arr: # Open-loop position tracking. Closed-loop torque control via # this method was always broken (treated torque as position delta). # For real closed-loop control, use p.TORQUE_CONTROL mode directly # in your own loop. See v1.3.2 CHANGELOG. self.set_joint_positions(pos) p.stepSimulation() # Get end-effector position ee_pos = p.getLinkState(self.robot_id, p.getNumJoints(self.robot_id) - 1)[4] ee_positions.append(ee_pos) time.sleep(self.time_step / self.real_time_factor) # Plot trajectory with REAL GEOMETRY that appears in screenshots self.plot_trajectory(ee_positions) self.logger.info("Controller run completed.") return ee_positions[-1] # Return the last end-effector position
[docs] def get_joint_parameters(self) -> List[float]: """ Gets the current values of the GUI sliders. """ _check_pybullet_available() return [p.readUserDebugParameter(param_id) for param_id in self.joint_params]
[docs] def simulate_robot_motion( self, desired_angles_trajectory: Sequence[Sequence[float]] ) -> Tuple[float, float, float]: """ Simulates the robot's motion using a given trajectory of desired joint angles. Commands each configuration via position control, steps the simulation, collects the end-effector positions, and plots the traced path. Args: desired_angles_trajectory: Sequence of desired joint-angle configurations (one per simulation step), each a sequence of joint angles in radians. Returns: tuple[float, float, float]: The (x, y, z) world position of the end-effector at the final configuration. """ _check_pybullet_available() self.logger.info("Simulating robot motion...") ee_positions = [] for joint_positions in desired_angles_trajectory: self.set_joint_positions(joint_positions) p.stepSimulation() # Get end-effector position ee_pos = p.getLinkState(self.robot_id, p.getNumJoints(self.robot_id) - 1)[4] ee_positions.append(ee_pos) time.sleep(self.time_step / self.real_time_factor) # Plot trajectory with REAL GEOMETRY that appears in screenshots self.plot_trajectory(ee_positions) self.logger.info("Robot motion simulation completed.") return ee_positions[-1] # Return the last end-effector position
[docs] def simulate_robot_with_desired_angles( self, desired_angles: Sequence[float] ) -> None: """ Simulates the robot using PyBullet with desired joint angles. Args: desired_angles (np.ndarray): Desired joint angles. """ _check_pybullet_available() self.logger.info("Simulating robot with desired joint angles...") p.setJointMotorControlArray( self.robot_id, self.non_fixed_joints, p.POSITION_CONTROL, targetPositions=desired_angles, forces=[1000] * len(desired_angles), ) time_step = 0.00001 p.setTimeStep(time_step) try: while True: p.stepSimulation() time.sleep(time_step / self.real_time_factor) except KeyboardInterrupt: self.logger.info("Simulation stopped by user.") self.logger.info("Robot simulation with desired angles completed.") self.close_simulation() except Exception: self.close_simulation() raise
[docs] def close_simulation(self) -> None: """ Closes the simulation. """ self.logger.info("Closing simulation...") # Clear trajectory visualization self.clear_trajectory_visualization() self.disconnect_simulation() self.logger.info("Simulation closed.")
[docs] def check_collisions(self) -> List[Tuple[int, int, Tuple[float, float, float]]]: """ Checks for self-collisions in the simulation and returns contacts. Returns: list of (linkA, linkB, position) tuples for each contact point. Empty list if no collisions or simulation not started. """ _check_pybullet_available() if self.robot_id is None: self.logger.warning( "Cannot check for collisions before simulation is started." ) return [] contacts = [] # PyBullet's per-joint linkIndexA filter excludes base-link contacts # (base index is -1, never a non-fixed joint). Query without filter to # catch base<->link pairs (the most common self-collision on folded arms). for pt in p.getContactPoints(self.robot_id, self.robot_id) or []: link_a, link_b, position = pt[3], pt[4], pt[5] contacts.append((link_a, link_b, position)) self.logger.warning( "Self-collision: link %s <-> link %s at %s", link_a, link_b, position ) return contacts
[docs] def step_simulation(self) -> None: """ Steps the simulation forward by one time step. """ _check_pybullet_available() self.logger.info("Setting up the simulation environment...") self.connect_simulation() self.add_additional_parameters()
[docs] def add_additional_parameters(self) -> None: """ Adds additional GUI parameters for controlling physics properties like gravity and time step. """ _check_pybullet_available() if not hasattr(self, "gravity_param"): self.gravity_param = p.addUserDebugParameter("Gravity", -20, 20, -9.81) if not hasattr(self, "time_step_param"): self.time_step_param = p.addUserDebugParameter( "Time Step", 0.001, 0.1, self.time_step )
[docs] def update_simulation_parameters(self) -> None: """ Updates simulation parameters from GUI controls. """ _check_pybullet_available() gravity = p.readUserDebugParameter(self.gravity_param) time_step = p.readUserDebugParameter(self.time_step_param) p.setGravity(0, 0, gravity) p.setTimeStep(time_step) self.time_step = time_step
[docs] def manual_control(self) -> None: """ Allows manual control of the robot through the PyBullet UI sliders. """ _check_pybullet_available() self.logger.info("Starting manual control...") if not self.joint_params: self.add_joint_parameters() # Ensure sliders are created self.add_additional_parameters() # Additional controls like gravity and time step # Add reset button if it doesn't exist if self.reset_button is None: self.add_reset_button() try: while True: joint_positions = self.get_joint_parameters() if len(joint_positions) != len(self.non_fixed_joints): raise ValueError( "Number of joint positions " f"({len(joint_positions)}) does not match number of " f"non-fixed joints ({len(self.non_fixed_joints)})." ) self.set_joint_positions(joint_positions) self.check_collisions() # Check for collisions in each step self.update_simulation_parameters() # Update simulation parameters p.stepSimulation() time.sleep(self.time_step / self.real_time_factor) # Check if reset button exists before reading it if ( self.reset_button is not None and p.readUserDebugParameter(self.reset_button) == 1 ): self.logger.info("Resetting simulation state...") self.set_joint_positions(self.home_position) break # Exit manual control to restart trajectory loop except KeyboardInterrupt: self.logger.info("Manual control stopped.") self.close_simulation() except Exception: self.close_simulation() raise
[docs] def save_joint_states(self, filename: str = "joint_states.csv") -> None: """ Saves the joint states to a CSV file. Args: filename (str): The filename for the CSV file. """ _check_pybullet_available() joint_states = [ p.getJointState(self.robot_id, i) for i in self.non_fixed_joints ] positions = [state[0] for state in joint_states] velocities = [state[1] for state in joint_states] data = np.column_stack((positions, velocities)) np.savetxt( filename, data, delimiter=",", header="Position,Velocity", comments="" ) self.logger.info(f"Joint states saved to {filename}.")
[docs] def plot_trajectory_in_scene( self, joint_trajectory: Sequence[Sequence[float]], end_effector_trajectory: Sequence[Sequence[float]], ) -> None: """ Plots the trajectory in the simulation scene. Renders the end-effector path as a 3-D Matplotlib line plot, then replays the joint trajectory in the PyBullet simulation. Args: joint_trajectory: Sequence of joint-angle configurations (one per simulation step) to replay, each a sequence of joint angles in radians. end_effector_trajectory: Sequence of end-effector positions to plot, each an (x, y, z) world-frame coordinate. """ _check_pybullet_available() self.logger.info("Plotting trajectory in simulation scene...") ee_positions = np.array(end_effector_trajectory) fig = plt.figure() ax = fig.add_subplot(111, projection="3d") ax.plot( ee_positions[:, 0], ee_positions[:, 1], ee_positions[:, 2], label="End-Effector Trajectory", ) ax.set_xlabel("X") ax.set_ylabel("Y") ax.set_zlabel("Z") plt.legend() plt.show() self.run_trajectory(joint_trajectory) self.logger.info("Trajectory plotted and simulation completed.")
[docs] def run(self, joint_trajectory: Sequence[Sequence[float]]) -> None: """ Main loop for running the simulation. Runs the given trajectory once, then waits for the GUI reset button and switches between trajectory, wait-for-reset, and manual control modes. Args: joint_trajectory: Sequence of joint-angle configurations (one per simulation step) to execute, each a sequence of joint angles in radians. """ try: reset_pressed = False mode = "trajectory" # Mode can be 'trajectory' or 'manual' while True: if mode == "trajectory": end_pos = self.run_trajectory(joint_trajectory) self.logger.info("Trajectory completed. Waiting for reset...") mode = "wait_reset" while mode == "wait_reset" and not reset_pressed: p.stepSimulation() time.sleep(0.01) if p.readUserDebugParameter(self.reset_button) > 0: self.logger.info( "Reset button pressed. Returning to home position and entering manual control..." ) self.set_joint_positions(self.home_position) mode = "manual" break if mode == "manual": self.manual_control() reset_pressed = False # Reset the flag to restart the trajectory mode = "trajectory" # Go back to trajectory mode except KeyboardInterrupt: self.logger.info("Simulation stopped by user.") self.close_simulation() except Exception: self.close_simulation() raise
def __del__(self) -> None: """ Destructor to clean up trajectory visualization when simulation is destroyed. """ try: if hasattr(self, "trajectory_body_ids"): self.clear_trajectory_visualization() except Exception: logger = getattr(self, "logger", None) if logger is not None: try: logger.debug( "Failed to clear trajectory visualization during cleanup.", exc_info=True, ) except Exception: pass