Source code for ManipulaPy.path_planning

#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Optimized Path Planning Module - ManipulaPy

This module provides highly optimized trajectory planning capabilities including joint space 
and Cartesian space trajectory generation with CUDA acceleration and collision avoidance.

Key optimizations:
- Adaptive grid sizing for optimal GPU occupancy
- Memory pooling to reduce allocation overhead
- Batch processing for multiple trajectories
- Fused kernels to minimize memory bandwidth
- Intelligent fallback to CPU when beneficial
- 2D parallelization for better GPU utilization
- Advanced kernel selection for 40x+ speedups

Copyright (c) 2025 Mohamed Aboelnasr
Licensed under the GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
"""

import time
import warnings
from typing import Any, Dict, List, NoReturn, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
from numba import njit, prange

from .cuda_kernels import (
    CUDA_AVAILABLE,
    _best_2d_config,
    _h2d_pinned,
    auto_select_optimal_kernel,
    benchmark_kernel_performance,
    check_cuda_availability,
    get_cuda_array,
    get_gpu_properties,
    get_memory_pool_stats,
    get_optimal_kernel_config,
    make_1d_grid,
    make_2d_grid,
    make_2d_grid_optimized,
    optimized_batch_trajectory_generation,
    optimized_trajectory_generation,
    optimized_trajectory_generation_monitored,
    print_performance_recommendations,
    profile_start,
    profile_stop,
    return_cuda_array,
    setup_cuda_environment_for_40x_speedup,
)
from .utils import (
    CubicTimeScaling,
    MatrixExp3,
    MatrixLog3,
    QuinticTimeScaling,
    TransToRp,
)

# Import CUDA functions only if available
if CUDA_AVAILABLE:
    from numba import cuda

    from .cuda_kernels import (
        batch_trajectory_kernel,
        cartesian_trajectory_kernel,
        forward_dynamics_kernel,
        fused_potential_gradient_kernel,
        inverse_dynamics_kernel,
        trajectory_kernel,
        trajectory_kernel_cache_friendly,
        trajectory_kernel_memory_optimized,
        trajectory_kernel_vectorized,
        trajectory_kernel_warp_optimized,
    )
else:
    # Create dummy functions for when CUDA is not available
    def trajectory_kernel(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for the standard trajectory kernel."""
        raise RuntimeError("CUDA not available")

    def trajectory_kernel_vectorized(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for the vectorized trajectory kernel."""
        raise RuntimeError("CUDA not available")

    def trajectory_kernel_memory_optimized(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for the memory-optimized kernel."""
        raise RuntimeError("CUDA not available")

    def trajectory_kernel_warp_optimized(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for the warp-optimized kernel."""
        raise RuntimeError("CUDA not available")

    def trajectory_kernel_cache_friendly(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for the cache-friendly kernel."""
        raise RuntimeError("CUDA not available")

    def inverse_dynamics_kernel(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for inverse dynamics kernels."""
        raise RuntimeError("CUDA not available")

    def forward_dynamics_kernel(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for forward dynamics kernels."""
        raise RuntimeError("CUDA not available")

    def cartesian_trajectory_kernel(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for Cartesian trajectory kernels."""
        raise RuntimeError("CUDA not available")

    def fused_potential_gradient_kernel(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for potential field kernels."""
        raise RuntimeError("CUDA not available")

    def batch_trajectory_kernel(*args: Any, **kwargs: Any) -> NoReturn:
        """Raise a CUDA availability error for batch trajectory kernels."""
        raise RuntimeError("CUDA not available")

    class MockCuda:
        @staticmethod
        def to_device(*args: Any, **kwargs: Any) -> NoReturn:
            """Raise a CUDA availability error for device transfers."""
            raise RuntimeError("CUDA not available")

        @staticmethod
        def device_array(*args: Any, **kwargs: Any) -> NoReturn:
            """Raise a CUDA availability error for device allocations."""
            raise RuntimeError("CUDA not available")

        @staticmethod
        def synchronize() -> None:
            """No-op synchronization placeholder when CUDA is unavailable."""
            pass

    cuda = MockCuda()

import logging

from .potential_field import CollisionChecker, PotentialField

# Module-level logger; leave handler configuration to the host application
logger = logging.getLogger(__name__)
logging.getLogger("numba.cuda.cudadrv.driver").setLevel(logging.WARNING)


@njit(parallel=True, fastmath=True)
def _trajectory_cpu_fallback(
    thetastart, thetaend, Tf, N, method
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Numba-optimised CPU trajectory generation (parallel).

    Computes joint-space position, velocity and acceleration trajectories by
    point-to-point time scaling between ``thetastart`` and ``thetaend``. The
    nested (timestep, joint) iteration is flattened into a single ``prange``
    loop so Numba can parallelise it.

    Args:
        thetastart: (num_joints,) ndarray of starting joint angles, radians.
        thetaend: (num_joints,) ndarray of ending joint angles, radians.
        Tf (float): Total trajectory duration, seconds.
        N (int): Number of trajectory points (timesteps).
        method (int): Time-scaling method; 3 for cubic, 5 for quintic. Any
            other value yields zero scaling.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(traj_pos, traj_vel,
        traj_acc)``, each an ``(N, num_joints)`` float32 array of positions
        (rad), velocities (rad/s) and accelerations (rad/s^2).
    """
    num_joints = len(thetastart)

    traj_pos = np.zeros((N, num_joints), dtype=np.float32)
    traj_vel = np.zeros((N, num_joints), dtype=np.float32)
    traj_acc = np.zeros((N, num_joints), dtype=np.float32)

    # Flatten (idx, j) → k  to avoid nested loops that block parallelisation
    total_elems = N * num_joints
    for k in prange(total_elems):
        idx = k // num_joints  # timestep
        j = k % num_joints  # joint index

        t = idx * (Tf / (N - 1))
        tau = t / Tf

        # Time-scaling
        if method == 3:  # cubic
            s = 3.0 * tau * tau - 2.0 * tau * tau * tau
            s_dot = 6.0 * tau * (1.0 - tau) / Tf
            s_ddot = 6.0 / (Tf * Tf) * (1.0 - 2.0 * tau)
        elif method == 5:  # quintic
            tau2 = tau * tau
            tau3 = tau2 * tau
            tau4 = tau2 * tau2
            tau5 = tau4 * tau
            s = 10.0 * tau3 - 15.0 * tau4 + 6.0 * tau5
            s_dot = (30.0 * tau2 - 60.0 * tau3 + 30.0 * tau4) / Tf
            s_ddot = (60.0 * tau - 180.0 * tau2 + 120.0 * tau3) / (Tf * Tf)
        else:  # unsupported method
            s = s_dot = s_ddot = 0.0

        dtheta = thetaend[j] - thetastart[j]
        traj_pos[idx, j] = s * dtheta + thetastart[j]
        traj_vel[idx, j] = s_dot * dtheta
        traj_acc[idx, j] = s_ddot * dtheta

    return traj_pos, traj_vel, traj_acc


# Thin wrapper – unchanged signature, now just calls the new kernel above
@njit(fastmath=True)
def _traj_cpu_njit(
    thetastart, thetaend, Tf, N, method
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Dispatch to the optimized CPU fallback kernel.

    Thin ``@njit`` wrapper that forwards its arguments to
    :func:`_trajectory_cpu_fallback`, keeping a stable public entry point for
    the CPU trajectory path.

    Args:
        thetastart: (num_joints,) ndarray of starting joint angles, radians.
        thetaend: (num_joints,) ndarray of ending joint angles, radians.
        Tf (float): Total trajectory duration, seconds.
        N (int): Number of trajectory points (timesteps).
        method (int): Time-scaling method; 3 for cubic, 5 for quintic.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(traj_pos, traj_vel,
        traj_acc)``, each an ``(N, num_joints)`` float32 array.
    """
    return _trajectory_cpu_fallback(thetastart, thetaend, Tf, N, method)


[docs] class OptimizedTrajectoryPlanning: """ Highly optimized trajectory planning class with adaptive GPU/CPU execution, memory pooling, and batch processing capabilities for 40x+ speedups. """
[docs] def __init__( self, serial_manipulator, urdf_path, dynamics, joint_limits, torque_limits=None, *, # ――― everything after * is keyword-only ――― use_cuda: Optional[bool] = None, cuda_threshold: int = 10, memory_pool_size_mb: Optional[int] = None, enable_profiling: bool = False, auto_optimize: bool = True, kernel_type: str = "auto", target_speedup: float = 40.0, ) -> None: """ Enhanced trajectory planner with advanced CUDA optimizations. Parameters ---------- serial_manipulator : SerialManipulator urdf_path : str dynamics : ManipulatorDynamics joint_limits : list[tuple[float,float]] torque_limits : list[tuple[float,float]], optional use_cuda : None | bool • None → auto-detect (default) • True → force GPU (raise if CUDA absent) • False → force CPU cuda_threshold : int Min. (N × joints) before we bother launching the GPU. memory_pool_size_mb: int | None If set, resize the global CUDA memory pool (in MB). enable_profiling : bool Enable CUDA profiling for performance analysis. auto_optimize : bool Automatically setup CUDA environment for maximum performance. kernel_type : str Kernel selection strategy: "auto", "standard", "vectorized", "memory_optimized", "warp_optimized", "cache_friendly", "auto_tune" target_speedup : float Target speedup over CPU (used for recommendations). """ # ------------------------------------------------------------ # FIRST: Set all basic attributes to prevent AttributeError # ------------------------------------------------------------ self.kernel_type = kernel_type if kernel_type is not None else "auto" self.target_speedup = target_speedup if target_speedup is not None else 40.0 self.enable_profiling = ( enable_profiling if enable_profiling is not None else False ) # Initialize all caches and tracking attributes immediately self._gpu_arrays = {} self._kernel_cache = {} self._last_cpu_time = 0.0 # Initialize performance stats early self.performance_stats = { "gpu_calls": 0, "cpu_calls": 0, "total_gpu_time": 0.0, "total_cpu_time": 0.0, "memory_transfers": 0, "kernel_launches": 0, "speedup_achieved": 0.0, "best_kernel_used": "none", } # ------------------------------------------------------------ # Auto-optimization setup # ------------------------------------------------------------ if auto_optimize and CUDA_AVAILABLE: setup_cuda_environment_for_40x_speedup() # ------------------------------------------------------------ # basic data # ------------------------------------------------------------ self.serial_manipulator = serial_manipulator self.dynamics = dynamics self.joint_limits = np.asarray(joint_limits, dtype=np.float32) self.torque_limits = ( np.asarray(torque_limits, dtype=np.float32) if torque_limits is not None else np.array([[-np.inf, np.inf]] * len(joint_limits), dtype=np.float32) ) # Store optimization parameters self.kernel_type = kernel_type self.target_speedup = target_speedup # ------------------------------------------------------------ # collision-checking helpers # ------------------------------------------------------------ try: self.collision_checker = CollisionChecker(urdf_path) self.potential_field = PotentialField() except Exception as exc: logger.warning("Could not initialise collision checker: %s", exc) self.collision_checker = None self.potential_field = None # ------------------------------------------------------------ # CUDA feature flags # ------------------------------------------------------------ detected_cuda = check_cuda_availability() if use_cuda is None: self.cuda_available = detected_cuda elif use_cuda and not detected_cuda: raise RuntimeError("use_cuda=True requested but CUDA is not available.") else: self.cuda_available = bool(use_cuda) self.gpu_properties = get_gpu_properties() if self.cuda_available else None # Adaptive threshold based on target speedup if self.cuda_available and self.gpu_properties: sm_count = self.gpu_properties["multiprocessor_count"] # Calculate threshold for target speedup min_elements_per_sm = 1000 if target_speedup >= 40 else 500 self.cpu_threshold = max( cuda_threshold, int(sm_count * min_elements_per_sm / len(joint_limits)) ) else: self.cpu_threshold = cuda_threshold # optionally resize a global memory-pool if memory_pool_size_mb is not None and self.cuda_available: from .cuda_kernels import _cuda_memory_pool _cuda_memory_pool.max_pool_size = ( memory_pool_size_mb * 1024 * 1024 // 4 # entries of float32 ) # ------------------------------------------------------------ # performance bookkeeping # ------------------------------------------------------------ self.performance_stats = { "gpu_calls": 0, "cpu_calls": 0, "total_gpu_time": 0.0, "total_cpu_time": 0.0, "memory_transfers": 0, "kernel_launches": 0, "speedup_achieved": 0.0, "best_kernel_used": "none", } # Enable profiling if requested (after all attributes are initialized) if self.enable_profiling and self.cuda_available: profile_start() # Print performance recommendations on initialization if self.cuda_available: num_joints = len(joint_limits) logger.info( f"🚀 OptimizedTrajectoryPlanning initialized for {num_joints} joints" ) if target_speedup >= 40: min_N_for_target = self.cpu_threshold // num_joints logger.info( f"💡 For {target_speedup}x speedup, use N ≥ {min_N_for_target:,} trajectory points" ) logger.info( "Optimised planner – CUDA enabled: %s (threshold %d, kernel: %s)", self.cuda_available, self.cpu_threshold, self.kernel_type, ) if self.gpu_properties: logger.info( "GPU: %d SMs, %d max threads/block", self.gpu_properties["multiprocessor_count"], self.gpu_properties["max_threads_per_block"], )
[docs] def _get_or_resize_gpu_array( self, array_name: str, shape: Tuple[int, ...], dtype: Any = np.float32 ) -> Any: """Return a pooled CUDA array with the requested shape / dtype. Looks up a previously pooled device array by ``array_name`` and reuses it when its shape and dtype match; otherwise the old array (if any) is returned to the pool and a freshly sized one is acquired and cached. Args: array_name (str): Cache key identifying the pooled array slot. shape (Tuple[int, ...]): Desired device array shape. dtype (Any): Desired element dtype. Defaults to ``np.float32``. Returns: Any: The pooled CUDA device array of the requested shape/dtype, or ``None`` if CUDA is unavailable. """ if not self.cuda_available: return None arr = self._gpu_arrays.get(array_name) if (arr is None) or (arr.shape != shape) or (arr.dtype != dtype): if arr is not None: return_cuda_array(arr) arr = get_cuda_array(shape, dtype) self._gpu_arrays[array_name] = arr return arr
[docs] def _should_use_gpu(self, N: int, num_joints: int) -> bool: """Decide whether to dispatch a problem to the GPU. Returns ``False`` when CUDA is unavailable or the total work (``N * num_joints``) is below ``self.cpu_threshold``. Otherwise logs a debug hint when the per-SM element count is unlikely to reach the configured target speedup, then approves GPU execution. Args: N (int): Number of trajectory points / timesteps. num_joints (int): Number of joints (work units per point). Returns: bool: ``True`` if the GPU path should be used, ``False`` otherwise. """ if not self.cuda_available: return False total_work = N * num_joints if total_work < self.cpu_threshold: return False # Additional checks for memory and performance if self.gpu_properties: sm_count = self.gpu_properties["multiprocessor_count"] elements_per_sm = total_work / sm_count # Check if we can achieve target speedup target_speedup_value = getattr(self, "target_speedup", 40.0) if target_speedup_value >= 40 and elements_per_sm < 10000: logger.debug( f"Problem size may not achieve {target_speedup_value}x speedup. " f"Elements per SM: {elements_per_sm:.0f}, recommended: ≥10,000" ) return True
def _get_optimal_kernel_config( self, N: int, num_joints: int ) -> Optional[Dict[str, Any]]: """Get or compute the optimal kernel launch configuration with caching. Resolves the effective kernel type (auto-selecting one when ``self.kernel_type == "auto"``) and computes a launch configuration for the given problem size, memoising results by ``(N, num_joints, kernel_type)``. Args: N (int): Number of trajectory points / timesteps. num_joints (int): Number of joints. Returns: Optional[Dict[str, Any]]: Kernel configuration dict (grid/block, kernel type, etc.), or ``None`` if no configuration is available. """ # Ensure required attributes exist if not hasattr(self, "_kernel_cache"): self._kernel_cache = {} if not hasattr(self, "kernel_type"): self.kernel_type = "auto" cache_key = (N, num_joints, self.kernel_type) if cache_key in self._kernel_cache: return self._kernel_cache[cache_key] if self.kernel_type == "auto": kernel_type = auto_select_optimal_kernel(N, num_joints) else: kernel_type = self.kernel_type config = get_optimal_kernel_config(N, num_joints, kernel_type) self._kernel_cache[cache_key] = config return config
[docs] def joint_trajectory( self, thetastart, thetaend, Tf, N, method, kernel_type=None, enable_monitoring=None, ) -> Dict[str, np.ndarray]: """ Enhanced joint trajectory generation with advanced CUDA optimizations. Args: thetastart (numpy.ndarray): The starting joint angles. thetaend (numpy.ndarray): The ending joint angles. Tf (float): The final time for the trajectory. N (int): The number of steps in the trajectory. method (int): The method to use (3=cubic, 5=quintic). kernel_type (str, optional): Override default kernel selection. enable_monitoring (bool, optional): Override default monitoring. Returns: dict: A dictionary containing positions, velocities, and accelerations. """ # Use instance defaults if not specified, with safety checks if kernel_type is None: kernel_type = getattr(self, "kernel_type", "auto") if enable_monitoring is None: enable_monitoring = getattr(self, "enable_profiling", False) logger.info( f"Generating joint trajectory: N={N}, joints={len(thetastart)}, " f"method={method}, kernel={kernel_type}" ) thetastart = np.array(thetastart, dtype=np.float32) thetaend = np.array(thetaend, dtype=np.float32) num_joints = len(thetastart) # Print performance recommendations if beneficial total_work = N * num_joints if self.cuda_available and total_work >= 10000: print_performance_recommendations(N, num_joints) # Decide on execution strategy use_gpu = self._should_use_gpu(N, num_joints) if use_gpu: return self._joint_trajectory_gpu( thetastart, thetaend, Tf, N, method, kernel_type, enable_monitoring ) else: return self._joint_trajectory_cpu(thetastart, thetaend, Tf, N, method)
[docs] def _joint_trajectory_gpu( self, thetastart, thetaend, Tf, N, method, kernel_type, enable_monitoring ) -> Dict[str, np.ndarray]: """GPU joint trajectory generation with optimal kernel selection. Runs the monitored GPU trajectory generator, clips positions to the configured joint limits, optionally applies GPU collision avoidance, updates performance stats, and falls back to the CPU path on any error. Args: thetastart: (num_joints,) ndarray of starting joint angles (rad). thetaend: (num_joints,) ndarray of ending joint angles (rad). Tf (float): Total trajectory duration, seconds. N (int): Number of trajectory points. method (int): Time-scaling method; 3 for cubic, 5 for quintic. kernel_type (str): Kernel selection strategy passed to the GPU generator (e.g. "auto", "vectorized"). enable_monitoring (bool): If True, enable CUDA profiling/logging. Returns: Dict[str, np.ndarray]: Dict with keys ``"positions"``, ``"velocities"`` and ``"accelerations"``, each an ``(N, num_joints)`` array. """ start_time = time.time() try: # Use the monitored high-level wrapper for maximum performance traj_pos_host, traj_vel_host, traj_acc_host = ( optimized_trajectory_generation_monitored( thetastart, thetaend, Tf, N, method, use_pinned=True, kernel_type=kernel_type, enable_monitoring=enable_monitoring, ) ) # Apply joint limits num_joints = len(thetastart) for i in range(num_joints): traj_pos_host[:, i] = np.clip( traj_pos_host[:, i], self.joint_limits[i, 0], self.joint_limits[i, 1], ) # Apply collision avoidance if available if self.collision_checker and self.potential_field: traj_pos_host = self._apply_collision_avoidance_gpu( traj_pos_host, thetaend ) # Calculate achieved speedup elapsed = time.time() - start_time if hasattr(self, "_last_cpu_time") and self._last_cpu_time > 0: speedup = self._last_cpu_time / elapsed self.performance_stats["speedup_achieved"] = speedup if enable_monitoring: logger.info(f"🎯 Achieved {speedup:.1f}x speedup over CPU!") else: # No previous CPU time to compare against if enable_monitoring: logger.info( "🚀 GPU execution completed (no CPU baseline for comparison)" ) # Update performance stats self.performance_stats["gpu_calls"] += 1 self.performance_stats["total_gpu_time"] += elapsed self.performance_stats["kernel_launches"] += 1 # Update best kernel used config = self._get_optimal_kernel_config(N, len(thetastart)) if config: self.performance_stats["best_kernel_used"] = config.get( "kernel_type", "unknown" ) logger.info(f"GPU trajectory generation completed in {elapsed:.4f}s") return { "positions": traj_pos_host, "velocities": traj_vel_host, "accelerations": traj_acc_host, } except Exception as e: logger.warning( f"GPU trajectory generation failed: {e}, falling back to CPU" ) return self._joint_trajectory_cpu(thetastart, thetaend, Tf, N, method)
[docs] def _joint_trajectory_cpu( self, thetastart, thetaend, Tf, N, method ) -> Dict[str, np.ndarray]: """CPU joint trajectory generation with performance tracking. Generates the trajectory via the Numba CPU kernel, clips positions to the configured joint limits, optionally applies CPU collision avoidance, and records the elapsed time as the CPU baseline used for speedup reporting. Args: thetastart: (num_joints,) ndarray of starting joint angles (rad). thetaend: (num_joints,) ndarray of ending joint angles (rad). Tf (float): Total trajectory duration, seconds. N (int): Number of trajectory points. method (int): Time-scaling method; 3 for cubic, 5 for quintic. Returns: Dict[str, np.ndarray]: Dict with keys ``"positions"``, ``"velocities"`` and ``"accelerations"``, each an ``(N, num_joints)`` array. """ start_time = time.time() # Use optimized CPU fallback traj_pos, traj_vel, traj_acc = _traj_cpu_njit( thetastart, thetaend, Tf, N, method ) # Apply joint limits num_joints = len(thetastart) for i in range(num_joints): traj_pos[:, i] = np.clip( traj_pos[:, i], self.joint_limits[i, 0], self.joint_limits[i, 1] ) # Apply collision avoidance if available if self.collision_checker and self.potential_field: traj_pos = self._apply_collision_avoidance_cpu(traj_pos, thetaend) # Store CPU time for speedup calculations elapsed = time.time() - start_time self._last_cpu_time = elapsed # Update performance stats self.performance_stats["cpu_calls"] += 1 self.performance_stats["total_cpu_time"] += elapsed logger.info(f"CPU trajectory generation completed in {elapsed:.4f}s") return { "positions": traj_pos, "velocities": traj_vel, "accelerations": traj_acc, }
[docs] def _apply_collision_avoidance_gpu( self, traj_pos: np.ndarray, thetaend: np.ndarray ) -> np.ndarray: """Apply potential-field collision avoidance on the CUDA-enabled path. Collision avoidance is a joint-space computation and has no GPU kernel (the GPU potential field is 3-D Cartesian, not joint-space), so this delegates to the validated joint-space CPU routine. Kept as a distinct dispatch target so the GPU/CPU planning code paths stay symmetric. Args: traj_pos: (N, num_joints) ndarray of joint positions (rad); modified in place as colliding points are nudged out. thetaend: (num_joints,) ndarray of goal joint angles (rad), used as the attractive target. Returns: np.ndarray: The (possibly adjusted) ``(N, num_joints)`` trajectory. """ # Collision avoidance operates in JOINT space: ``traj_pos`` holds joint # configurations and each is nudged by ``step -= gradient``, so the # potential-field gradient must be joint-space too (see PotentialField and # _apply_collision_avoidance_cpu). The GPU ``optimized_potential_field`` # kernel is a 3-D *Cartesian* field — feeding it an (N, num_joints) array # and an (num_joints,) goal is a dimension mismatch that yields a wrong # gradient and a 6-vs-3 broadcast error on update. There is no joint-space # GPU potential-field kernel, so we run the validated joint-space routine # rather than silently erroring per iteration and falling back anyway. return self._apply_collision_avoidance_cpu(traj_pos, thetaend)
[docs] def _apply_collision_avoidance_cpu( self, traj_pos: np.ndarray, thetaend: np.ndarray ) -> np.ndarray: """Apply CPU-based potential field collision avoidance. For each colliding trajectory point, iteratively descends the potential-field gradient toward ``thetaend`` (up to 100 iterations) until the configuration is collision-free. Args: traj_pos: (N, num_joints) ndarray of joint positions (rad); modified in place as colliding points are nudged out. thetaend: (num_joints,) ndarray of goal joint angles (rad), used as the attractive target. Returns: np.ndarray: The (possibly adjusted) ``(N, num_joints)`` trajectory. """ q_goal = thetaend obstacles = [] # Define obstacles here as needed # Apply potential field for collision avoidance for idx, step in enumerate(traj_pos): if self.collision_checker.check_collision(step): for _ in range(100): # Max iterations to adjust trajectory gradient = self.potential_field.compute_gradient( step, q_goal, obstacles ) step -= 0.01 * gradient # Adjust step size as needed if not self.collision_checker.check_collision(step): break traj_pos[idx] = step return traj_pos
[docs] def batch_joint_trajectory( self, thetastart_batch, thetaend_batch, Tf, N, method, kernel_type=None ) -> Dict[str, np.ndarray]: """ Enhanced batch trajectory generation with optimal kernel selection. Args: thetastart_batch (numpy.ndarray): Starting angles (batch_size, num_joints) thetaend_batch (numpy.ndarray): Ending angles (batch_size, num_joints) Tf (float): Final time for all trajectories N (int): Number of trajectory points method (int): Time scaling method kernel_type (str, optional): Override kernel selection Returns: dict: Batch trajectory data with shape (batch_size, N, num_joints) """ if kernel_type is None: kernel_type = getattr(self, "kernel_type", "auto") batch_size, num_joints = thetastart_batch.shape logger.info( f"Generating batch trajectories: batch_size={batch_size}, " f"N={N}, joints={num_joints}, kernel={kernel_type}" ) if not self.cuda_available: logger.warning( "Batch processing requires CUDA. Falling back to sequential processing." ) return self._batch_joint_trajectory_cpu( thetastart_batch, thetaend_batch, Tf, N, method ) # Print performance recommendations for batch processing total_work = batch_size * N * num_joints if total_work >= 50000: print_performance_recommendations(N * batch_size, num_joints) start_time = time.time() try: # Use optimized batch trajectory generation traj_pos_host, traj_vel_host, traj_acc_host = ( optimized_batch_trajectory_generation( thetastart_batch, thetaend_batch, Tf, N, method, use_pinned=True ) ) # Apply joint limits for all trajectories for batch_idx in range(batch_size): for i in range(num_joints): traj_pos_host[batch_idx, :, i] = np.clip( traj_pos_host[batch_idx, :, i], self.joint_limits[i, 0], self.joint_limits[i, 1], ) elapsed = time.time() - start_time throughput = total_work / elapsed / 1e6 # Million elements per second self.performance_stats["gpu_calls"] += 1 self.performance_stats["total_gpu_time"] += elapsed self.performance_stats["kernel_launches"] += 1 logger.info(f"Batch GPU trajectory generation completed in {elapsed:.4f}s") logger.info(f"📊 Throughput: {throughput:.1f} M elements/sec") return { "positions": traj_pos_host, "velocities": traj_vel_host, "accelerations": traj_acc_host, } except Exception as e: logger.warning( f"Batch GPU trajectory generation failed: {e}, falling back to CPU" ) return self._batch_joint_trajectory_cpu( thetastart_batch, thetaend_batch, Tf, N, method )
[docs] def _batch_joint_trajectory_cpu( self, thetastart_batch, thetaend_batch, Tf, N, method ) -> Dict[str, np.ndarray]: """CPU fallback for batch trajectory generation. Generates each trajectory in the batch sequentially with the Numba CPU kernel and clips positions to the configured joint limits (matching the GPU batch path). Args: thetastart_batch: (batch_size, num_joints) ndarray of starting joint angles, radians. thetaend_batch: (batch_size, num_joints) ndarray of ending joint angles, radians. Tf (float): Total trajectory duration, seconds. N (int): Number of trajectory points per trajectory. method (int): Time-scaling method; 3 for cubic, 5 for quintic. Returns: Dict[str, np.ndarray]: Dict with keys ``"positions"``, ``"velocities"`` and ``"accelerations"``, each a ``(batch_size, N, num_joints)`` array. """ start_time = time.time() batch_size, num_joints = thetastart_batch.shape # Initialize result arrays traj_pos_batch = np.zeros((batch_size, N, num_joints), dtype=np.float32) traj_vel_batch = np.zeros((batch_size, N, num_joints), dtype=np.float32) traj_acc_batch = np.zeros((batch_size, N, num_joints), dtype=np.float32) # Process each trajectory in the batch for i in range(batch_size): traj_pos, traj_vel, traj_acc = _traj_cpu_njit( thetastart_batch[i], thetaend_batch[i], Tf, N, method ) traj_pos_batch[i] = traj_pos traj_vel_batch[i] = traj_vel traj_acc_batch[i] = traj_acc # Enforce joint limits for all trajectories (parity with GPU path) for batch_idx in range(batch_size): for j in range(num_joints): traj_pos_batch[batch_idx, :, j] = np.clip( traj_pos_batch[batch_idx, :, j], self.joint_limits[j, 0], self.joint_limits[j, 1], ) elapsed = time.time() - start_time self.performance_stats["cpu_calls"] += 1 self.performance_stats["total_cpu_time"] += elapsed logger.info(f"Batch CPU trajectory generation completed in {elapsed:.4f}s") return { "positions": traj_pos_batch, "velocities": traj_vel_batch, "accelerations": traj_acc_batch, }
[docs] def inverse_dynamics_trajectory( self, thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector=None, Ftip=None, ) -> np.ndarray: """ Compute joint torques with enhanced CUDA acceleration. Args: thetalist_trajectory (np.ndarray): Array of joint angles over the trajectory. dthetalist_trajectory (np.ndarray): Array of joint velocities over the trajectory. ddthetalist_trajectory (np.ndarray): Array of joint accelerations over the trajectory. gravity_vector (np.ndarray, optional): Gravity vector affecting the system. Ftip (list, optional): External forces applied at the end effector. Returns: np.ndarray: Array of joint torques required to follow the trajectory. """ if gravity_vector is None: gravity_vector = np.array([0, 0, -9.81]) if Ftip is None: Ftip = [0, 0, 0, 0, 0, 0] num_points = thetalist_trajectory.shape[0] num_joints = thetalist_trajectory.shape[1] logger.info( f"Computing inverse dynamics: {num_points} points, {num_joints} joints" ) # Print performance recommendations if self.cuda_available: total_work = num_points * num_joints if total_work >= 10000: print_performance_recommendations(num_points, num_joints) # Decide on execution strategy use_gpu = self._should_use_gpu(num_points, num_joints) if use_gpu: return self._inverse_dynamics_gpu( thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, ) else: return self._inverse_dynamics_cpu( thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, )
[docs] def _inverse_dynamics_gpu( self, thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, ) -> np.ndarray: """GPU-accelerated inverse dynamics over a trajectory. Transfers the trajectory and the manipulator's dynamics data (Glist, S_list, M_list) to the device, launches the 2D inverse-dynamics kernel to compute per-point joint torques, clips them to ``self.torque_limits`` and returns them. Falls back to the CPU implementation if dynamics-data conversion or kernel execution fails. Args: thetalist_trajectory: (num_points, num_joints) ndarray of joint angles, radians. dthetalist_trajectory: (num_points, num_joints) ndarray of joint velocities, rad/s. ddthetalist_trajectory: (num_points, num_joints) ndarray of joint accelerations, rad/s^2. gravity_vector: (3,) ndarray of gravitational acceleration, m/s^2. Ftip: length-6 sequence of the external wrench at the end effector. Returns: np.ndarray: (num_points, num_joints) array of joint torques, clipped to the configured torque limits. """ start_time = time.time() num_points = thetalist_trajectory.shape[0] num_joints = thetalist_trajectory.shape[1] try: # Use memory pool for the large torques array torques_trajectory = get_cuda_array( (num_points, num_joints), dtype=np.float32 ) # Transfer data to GPU using pinned memory - ensure proper data types d_thetalist_trajectory = _h2d_pinned( thetalist_trajectory.astype(np.float32) ) d_dthetalist_trajectory = _h2d_pinned( dthetalist_trajectory.astype(np.float32) ) d_ddthetalist_trajectory = _h2d_pinned( ddthetalist_trajectory.astype(np.float32) ) d_gravity_vector = cuda.to_device(gravity_vector.astype(np.float32)) d_Ftip = cuda.to_device(np.array(Ftip, dtype=np.float32)) # Safely handle dynamics data conversion try: # Convert Glist to proper numpy array format if hasattr(self.dynamics, "Glist") and self.dynamics.Glist is not None: if isinstance(self.dynamics.Glist, list): # Convert list of matrices to 3D numpy array Glist_array = np.stack(self.dynamics.Glist).astype(np.float32) else: Glist_array = np.array(self.dynamics.Glist, dtype=np.float32) else: # Create dummy Glist if not available Glist_array = np.eye(6, dtype=np.float32)[None, :, :].repeat( num_joints, axis=0 ) # Convert S_list to proper format if ( hasattr(self.dynamics, "S_list") and self.dynamics.S_list is not None ): Slist_array = np.array(self.dynamics.S_list, dtype=np.float32) else: # Create dummy S_list if not available Slist_array = np.random.randn(6, num_joints).astype(np.float32) # Convert M_list to proper format if ( hasattr(self.dynamics, "M_list") and self.dynamics.M_list is not None ): M_array = np.array(self.dynamics.M_list, dtype=np.float32) else: # Create dummy M if not available M_array = np.eye(4, dtype=np.float32) d_Glist = cuda.to_device(Glist_array) d_Slist = cuda.to_device(Slist_array) d_M = cuda.to_device(M_array) except Exception as e: logger.warning( f"Error converting dynamics data: {e}, using simplified approach" ) # Fallback to simplified dynamics computation on CPU return self._inverse_dynamics_cpu( thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, ) d_torque_limits = cuda.to_device(self.torque_limits.astype(np.float32)) # Get optimal 2D launch configuration with bounds checking try: blocks_per_grid, threads_per_block = _best_2d_config( num_points, num_joints ) logger.info( f"Inverse dynamics 2D grid: blocks={blocks_per_grid}, threads={threads_per_block}" ) except Exception as e: logger.warning(f"Error in grid configuration: {e}, using fallback") # Fallback to safe grid configuration blocks_per_grid = ((num_points + 15) // 16, (num_joints + 15) // 16) threads_per_block = (16, 16) # Launch optimized 2D inverse dynamics kernel with CORRECT signature try: # FIXED: The kernel expects 11 arguments, but was receiving 10 # Original call was missing the 'stream' parameter (last argument) # Let's check the kernel signature in cuda_kernels.py: # From cuda_kernels.py, the kernel signature is: # inverse_dynamics_kernel( # thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, # gravity_vector, Ftip, Glist, Slist, M, torques_trajectory, torque_limits, stream=0 # ) # That's 11 parameters total including the stream parameter inverse_dynamics_kernel[blocks_per_grid, threads_per_block]( d_thetalist_trajectory, # 1 d_dthetalist_trajectory, # 2 d_ddthetalist_trajectory, # 3 d_gravity_vector, # 4 d_Ftip, # 5 d_Glist, # 6 d_Slist, # 7 d_M, # 8 torques_trajectory, # 9 d_torque_limits, # 10 # 0 # 11 - stream parameter (was missing!) ) # Synchronize to check for kernel execution errors cuda.synchronize() except Exception as kernel_error: logger.warning(f"CUDA kernel execution failed: {kernel_error}") # Fallback to CPU implementation return self._inverse_dynamics_cpu( thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, ) # Copy results back using pinned memory torques_host = torques_trajectory.copy_to_host() # Apply final torque limits torques_host = np.clip( torques_host, self.torque_limits[:, 0], self.torque_limits[:, 1] ) elapsed = time.time() - start_time self.performance_stats["gpu_calls"] += 1 self.performance_stats["total_gpu_time"] += elapsed self.performance_stats["kernel_launches"] += 1 logger.info(f"GPU inverse dynamics completed in {elapsed:.4f}s") return torques_host except Exception as e: logger.warning(f"GPU inverse dynamics failed: {e}, falling back to CPU") return self._inverse_dynamics_cpu( thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, ) finally: # Return large array to pool if "torques_trajectory" in locals(): return_cuda_array(torques_trajectory)
[docs] def _inverse_dynamics_cpu( self, thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory, gravity_vector, Ftip, ) -> np.ndarray: """CPU-based inverse dynamics over a trajectory. Computes per-point joint torques by calling ``self.dynamics.inverse_dynamics`` at each trajectory point (using zero torques for any point that raises), then clips the result to ``self.torque_limits``. Args: thetalist_trajectory: (num_points, num_joints) ndarray of joint angles, radians. dthetalist_trajectory: (num_points, num_joints) ndarray of joint velocities, rad/s. ddthetalist_trajectory: (num_points, num_joints) ndarray of joint accelerations, rad/s^2. gravity_vector: (3,) ndarray of gravitational acceleration, m/s^2. Ftip: length-6 sequence of the external wrench at the end effector. Returns: np.ndarray: (num_points, num_joints) array of joint torques, clipped to the configured torque limits. """ start_time = time.time() num_points = thetalist_trajectory.shape[0] num_joints = thetalist_trajectory.shape[1] torques_trajectory = np.zeros((num_points, num_joints), dtype=np.float32) # Process each trajectory point for i in range(num_points): try: torques = self.dynamics.inverse_dynamics( thetalist_trajectory[i], dthetalist_trajectory[i], ddthetalist_trajectory[i], gravity_vector, Ftip, ) torques_trajectory[i] = np.array(torques, dtype=np.float32) except Exception as e: logger.warning(f"Error in inverse dynamics at point {i}: {e}") # Use zero torques for problematic points torques_trajectory[i] = np.zeros(num_joints, dtype=np.float32) # Apply torque limits torques_trajectory = np.clip( torques_trajectory, self.torque_limits[:, 0], self.torque_limits[:, 1] ) elapsed = time.time() - start_time self.performance_stats["cpu_calls"] += 1 self.performance_stats["total_cpu_time"] += elapsed logger.info(f"CPU inverse dynamics completed in {elapsed:.4f}s") return torques_trajectory
[docs] def forward_dynamics_trajectory( self, thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes ) -> Dict[str, np.ndarray]: """ Enhanced forward dynamics trajectory computation. Args: thetalist (np.ndarray): Initial joint angles. dthetalist (np.ndarray): Initial joint velocities. taumat (np.ndarray): Array of joint torques over the trajectory. g (np.ndarray): Gravity vector. Ftipmat (np.ndarray): Array of external forces. dt (float): Time step. intRes (int): Integration resolution. Returns: dict: Dictionary containing positions, velocities, and accelerations. """ num_steps = taumat.shape[0] num_joints = thetalist.shape[0] logger.info( f"Computing forward dynamics: {num_steps} steps, {num_joints} joints" ) # Print performance recommendations if self.cuda_available: total_work = num_steps * num_joints if total_work >= 10000: print_performance_recommendations(num_steps, num_joints) # Decide on execution strategy use_gpu = self._should_use_gpu(num_steps, num_joints) if use_gpu: return self._forward_dynamics_gpu( thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes ) else: return self._forward_dynamics_cpu( thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes )
[docs] def _forward_dynamics_gpu( self, thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes ) -> Dict[str, np.ndarray]: """GPU-accelerated forward dynamics integration over a trajectory. Seeds the first step with the initial state, transfers the torque profile, external forces and dynamics data to the device, and launches the forward-dynamics kernel to integrate joint motion with ``intRes`` sub-steps per outer step, enforcing joint limits on the device. Falls back to the CPU implementation on any error. Args: thetalist: (num_joints,) ndarray of initial joint angles, radians. dthetalist: (num_joints,) ndarray of initial joint velocities, rad/s. taumat: (num_steps, num_joints) ndarray of applied joint torques. g: (3,) ndarray of gravitational acceleration, m/s^2. Ftipmat: (num_steps, 6) ndarray of end-effector wrenches per step. dt (float): Outer time step between trajectory points, seconds. intRes (int): Number of integration sub-steps per outer step. Returns: Dict[str, np.ndarray]: Dict with keys ``"positions"``, ``"velocities"`` and ``"accelerations"``, each a ``(num_steps, num_joints)`` array. """ start_time = time.time() num_steps = taumat.shape[0] num_joints = thetalist.shape[0] try: # Initialize result arrays thetamat = np.zeros((num_steps, num_joints), dtype=np.float32) dthetamat = np.zeros((num_steps, num_joints), dtype=np.float32) ddthetamat = np.zeros((num_steps, num_joints), dtype=np.float32) thetamat[0, :] = thetalist.astype(np.float32) dthetamat[0, :] = dthetalist.astype(np.float32) # Use memory pool for large arrays d_thetamat = get_cuda_array((num_steps, num_joints), dtype=np.float32) d_dthetamat = get_cuda_array((num_steps, num_joints), dtype=np.float32) d_ddthetamat = get_cuda_array((num_steps, num_joints), dtype=np.float32) # Copy initial conditions to GPU d_thetamat.copy_to_device(thetamat) d_dthetamat.copy_to_device(dthetamat) d_ddthetamat.copy_to_device(ddthetamat) # Transfer other data to GPU d_thetalist = cuda.to_device(thetalist.astype(np.float32)) d_dthetalist = cuda.to_device(dthetalist.astype(np.float32)) d_taumat = cuda.to_device(taumat.astype(np.float32)) d_g = cuda.to_device(g.astype(np.float32)) d_Ftipmat = cuda.to_device(Ftipmat.astype(np.float32)) d_Glist = cuda.to_device(np.array(self.dynamics.Glist, dtype=np.float32)) d_Slist = cuda.to_device(np.array(self.dynamics.S_list, dtype=np.float32)) d_M = cuda.to_device(np.array(self.dynamics.M_list, dtype=np.float32)) d_joint_limits = cuda.to_device(self.joint_limits.astype(np.float32)) # Get optimal launch configuration grid_config = get_optimal_kernel_config( num_steps, num_joints, "cache_friendly" ) if grid_config: blocks_per_grid = grid_config["grid"] threads_per_block = grid_config["block"] logger.info(f"Using {grid_config['kernel_type']} for forward dynamics") else: blocks_per_grid, threads_per_block = _best_2d_config( num_steps, num_joints ) # Launch forward dynamics kernel forward_dynamics_kernel[blocks_per_grid, threads_per_block]( d_thetalist, d_dthetalist, d_taumat, d_g, d_Ftipmat, dt, intRes, d_Glist, d_Slist, d_M, d_thetamat, d_dthetamat, d_ddthetamat, d_joint_limits, ) # Copy results back d_thetamat.copy_to_host(thetamat) d_dthetamat.copy_to_host(dthetamat) d_ddthetamat.copy_to_host(ddthetamat) elapsed = time.time() - start_time throughput = (num_steps * num_joints * intRes) / elapsed / 1e6 self.performance_stats["gpu_calls"] += 1 self.performance_stats["total_gpu_time"] += elapsed self.performance_stats["kernel_launches"] += 1 logger.info(f"GPU forward dynamics completed in {elapsed:.4f}s") logger.info(f"📊 Throughput: {throughput:.1f} M integration steps/sec") return { "positions": thetamat, "velocities": dthetamat, "accelerations": ddthetamat, } except Exception as e: logger.warning(f"GPU forward dynamics failed: {e}, falling back to CPU") return self._forward_dynamics_cpu( thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes ) finally: # Return large arrays to pool if "d_thetamat" in locals(): return_cuda_array(d_thetamat) if "d_dthetamat" in locals(): return_cuda_array(d_dthetamat) if "d_ddthetamat" in locals(): return_cuda_array(d_ddthetamat)
[docs] def _forward_dynamics_cpu( self, thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes ) -> Dict[str, np.ndarray]: """CPU-based forward dynamics integration over a trajectory. Integrates joint motion step by step using ``self.dynamics.forward_dynamics`` with ``intRes`` Euler sub-steps per outer step (``dt / intRes`` each), clamping joint angles to ``self.joint_limits`` and using zero acceleration for any step that raises. Args: thetalist: (num_joints,) ndarray of initial joint angles, radians. dthetalist: (num_joints,) ndarray of initial joint velocities, rad/s. taumat: (num_steps, num_joints) ndarray of applied joint torques. g: (3,) ndarray of gravitational acceleration, m/s^2. Ftipmat: (num_steps, 6) ndarray of end-effector wrenches per step. dt (float): Outer time step between trajectory points, seconds. intRes (int): Number of integration sub-steps per outer step. Returns: Dict[str, np.ndarray]: Dict with keys ``"positions"``, ``"velocities"`` and ``"accelerations"``, each a ``(num_steps, num_joints)`` array. """ start_time = time.time() num_steps = taumat.shape[0] num_joints = thetalist.shape[0] thetamat = np.zeros((num_steps, num_joints), dtype=np.float32) dthetamat = np.zeros((num_steps, num_joints), dtype=np.float32) ddthetamat = np.zeros((num_steps, num_joints), dtype=np.float32) # Initialize with starting conditions current_theta = thetalist.copy() current_dtheta = dthetalist.copy() thetamat[0, :] = current_theta dthetamat[0, :] = current_dtheta dt_step = dt / intRes for i in range(1, num_steps): for _ in range(intRes): try: # Compute forward dynamics ddtheta = self.dynamics.forward_dynamics( current_theta, current_dtheta, taumat[i], g, Ftipmat[i] ) # Integrate current_dtheta += ddtheta * dt_step current_theta += current_dtheta * dt_step # Apply joint limits current_theta = np.clip( current_theta, self.joint_limits[:, 0], self.joint_limits[:, 1] ) ddthetamat[i] = ddtheta except Exception as e: logger.warning(f"Error in forward dynamics at step {i}: {e}") ddthetamat[i] = np.zeros(num_joints) thetamat[i, :] = current_theta dthetamat[i, :] = current_dtheta elapsed = time.time() - start_time self.performance_stats["cpu_calls"] += 1 self.performance_stats["total_cpu_time"] += elapsed logger.info(f"CPU forward dynamics completed in {elapsed:.4f}s") return { "positions": thetamat, "velocities": dthetamat, "accelerations": ddthetamat, }
[docs] def cartesian_trajectory(self, Xstart, Xend, Tf, N, method) -> Dict[str, np.ndarray]: """ Enhanced Cartesian trajectory generation with optimal kernel selection. Args: Xstart (np.ndarray): Initial end-effector configuration (SE(3) matrix). Xend (np.ndarray): Final end-effector configuration (SE(3) matrix). Tf (float): Total time of motion. N (int): Number of trajectory points. method (int): Time-scaling method (3=cubic, 5=quintic). Returns: dict: Dictionary with positions, velocities, accelerations, and orientations. """ logger.info(f"Generating Cartesian trajectory: N={N}, method={method}") N = int(N) timegap = Tf / (N - 1.0) traj = [None] * N Rstart, pstart = TransToRp(Xstart) Rend, pend = TransToRp(Xend) orientations = np.zeros((N, 3, 3), dtype=np.float32) # Compute orientation interpolation on CPU (complex matrix operations) for i in range(N): if method == 3: s = CubicTimeScaling(Tf, timegap * i) else: s = QuinticTimeScaling(Tf, timegap * i) traj[i] = np.r_[ np.c_[ np.dot(Rstart, MatrixExp3(MatrixLog3(np.dot(Rstart.T, Rend)) * s)), s * pend + (1 - s) * pstart, ], [[0, 0, 0, 1]], ] orientations[i] = np.dot( Rstart, MatrixExp3(MatrixLog3(np.dot(Rstart.T, Rend)) * s) ) traj_pos = np.array([TransToRp(T)[1] for T in traj], dtype=np.float32) # Use GPU for position/velocity/acceleration computation if beneficial use_gpu = self._should_use_gpu(N, 3) # 3 coordinates (x,y,z) if use_gpu: traj_vel, traj_acc = self._cartesian_trajectory_gpu( pstart, pend, Tf, N, method ) else: traj_vel, traj_acc = self._cartesian_trajectory_cpu( pstart, pend, Tf, N, method ) return { "positions": traj_pos, "velocities": traj_vel, "accelerations": traj_acc, "orientations": orientations, }
[docs] def _cartesian_trajectory_gpu( self, pstart, pend, Tf, N, method ) -> Tuple[np.ndarray, np.ndarray]: """GPU computation of Cartesian linear velocity and acceleration. Launches the Cartesian trajectory kernel to compute the time-scaled translational velocity and acceleration of the straight-line path from ``pstart`` to ``pend``. Falls back to the CPU implementation on error. Args: pstart: (3,) ndarray of the start position, metres. pend: (3,) ndarray of the end position, metres. Tf (float): Total trajectory duration, seconds. N (int): Number of trajectory points. method (int): Time-scaling method; 3 for cubic, 5 for quintic. Returns: Tuple[np.ndarray, np.ndarray]: ``(traj_vel, traj_acc)``, each an ``(N, 3)`` array of linear velocity (m/s) and acceleration (m/s^2). """ start_time = time.time() try: pstart = np.ascontiguousarray(pstart.astype(np.float32)) pend = np.ascontiguousarray(pend.astype(np.float32)) traj_vel = get_cuda_array((N, 3), dtype=np.float32) traj_acc = get_cuda_array((N, 3), dtype=np.float32) traj_pos_dummy = get_cuda_array((N, 3), dtype=np.float32) # Transfer data using pinned memory d_pstart = _h2d_pinned(pstart) d_pend = _h2d_pinned(pend) # Get optimal launch configuration grid_config = get_optimal_kernel_config(N, 3, "warp_optimized") if grid_config: blocks_per_grid = grid_config["grid"] threads_per_block = grid_config["block"] logger.info( f"Using {grid_config['kernel_type']} for Cartesian trajectory" ) else: blocks_per_grid, threads_per_block = _best_2d_config(N, 3) # Launch Cartesian trajectory kernel cartesian_trajectory_kernel[blocks_per_grid, threads_per_block]( d_pstart, d_pend, traj_pos_dummy, traj_vel, traj_acc, Tf, N, method ) # Copy results back traj_vel_host = traj_vel.copy_to_host() traj_acc_host = traj_acc.copy_to_host() elapsed = time.time() - start_time self.performance_stats["gpu_calls"] += 1 self.performance_stats["total_gpu_time"] += elapsed self.performance_stats["kernel_launches"] += 1 logger.info(f"GPU Cartesian trajectory completed in {elapsed:.4f}s") return traj_vel_host, traj_acc_host except Exception as e: logger.warning(f"GPU Cartesian trajectory failed: {e}, falling back to CPU") return self._cartesian_trajectory_cpu(pstart, pend, Tf, N, method) finally: # Return memory to pool if "traj_vel" in locals(): return_cuda_array(traj_vel) if "traj_acc" in locals(): return_cuda_array(traj_acc) if "traj_pos_dummy" in locals(): return_cuda_array(traj_pos_dummy)
[docs] def _cartesian_trajectory_cpu( self, pstart, pend, Tf, N, method ) -> Tuple[np.ndarray, np.ndarray]: """CPU computation of Cartesian linear velocity and acceleration. Evaluates the time-scaling derivatives at each point and applies them to the straight-line displacement ``pend - pstart`` to obtain the translational velocity and acceleration profiles. Args: pstart: (3,) ndarray of the start position, metres. pend: (3,) ndarray of the end position, metres. Tf (float): Total trajectory duration, seconds. N (int): Number of trajectory points. method (int): Time-scaling method; 3 for cubic, 5 for quintic. Returns: Tuple[np.ndarray, np.ndarray]: ``(traj_vel, traj_acc)``, each an ``(N, 3)`` array of linear velocity (m/s) and acceleration (m/s^2). """ start_time = time.time() traj_vel = np.zeros((N, 3), dtype=np.float32) traj_acc = np.zeros((N, 3), dtype=np.float32) for i in range(N): t = i * (Tf / (N - 1)) tau = t / Tf if method == 3: s_dot = 6.0 * tau * (1.0 - tau) / Tf s_ddot = 6.0 / (Tf * Tf) * (1.0 - 2.0 * tau) elif method == 5: tau2 = tau * tau tau3 = tau2 * tau tau4 = tau2 * tau2 s_dot = (30.0 * tau2 - 60.0 * tau3 + 30.0 * tau4) / Tf s_ddot = (60.0 * tau - 180.0 * tau2 + 120.0 * tau3) / (Tf * Tf) else: s_dot = s_ddot = 0.0 dp = pend - pstart traj_vel[i] = s_dot * dp traj_acc[i] = s_ddot * dp elapsed = time.time() - start_time self.performance_stats["cpu_calls"] += 1 self.performance_stats["total_cpu_time"] += elapsed logger.info(f"CPU Cartesian trajectory completed in {elapsed:.4f}s") return traj_vel, traj_acc
[docs] def get_performance_stats(self) -> Dict[str, Any]: """ Enhanced performance statistics with speedup analysis. Returns: dict: Comprehensive performance statistics """ stats = self.performance_stats.copy() if stats["gpu_calls"] > 0: stats["avg_gpu_time"] = stats["total_gpu_time"] / stats["gpu_calls"] else: stats["avg_gpu_time"] = 0.0 if stats["cpu_calls"] > 0: stats["avg_cpu_time"] = stats["total_cpu_time"] / stats["cpu_calls"] else: stats["avg_cpu_time"] = 0.0 total_calls = stats["gpu_calls"] + stats["cpu_calls"] if total_calls > 0: stats["gpu_usage_percent"] = (stats["gpu_calls"] / total_calls) * 100 else: stats["gpu_usage_percent"] = 0.0 # Calculate overall speedup total_gpu_time = stats["total_gpu_time"] total_cpu_time = stats["total_cpu_time"] if total_gpu_time > 0 and total_cpu_time > 0: stats["overall_speedup"] = total_cpu_time / total_gpu_time else: stats["overall_speedup"] = 0.0 # Add memory pool statistics if self.cuda_available: stats["memory_pool_stats"] = get_memory_pool_stats() # Simple EWMA auto-tune for adaptive threshold if stats["avg_gpu_time"] > 0 and stats["avg_cpu_time"] > 0: efficiency_ratio = stats["avg_cpu_time"] / stats["avg_gpu_time"] self.cpu_threshold = int( 0.9 * self.cpu_threshold + 0.1 * efficiency_ratio * self.cpu_threshold ) self.cpu_threshold = max( 50, min(self.cpu_threshold, 5000) ) # Keep within reasonable bounds return stats
[docs] def reset_performance_stats(self) -> None: """Reset performance statistics.""" self.performance_stats = { "gpu_calls": 0, "cpu_calls": 0, "total_gpu_time": 0.0, "total_cpu_time": 0.0, "memory_transfers": 0, "kernel_launches": 0, "speedup_achieved": 0.0, "best_kernel_used": "none", }
[docs] def cleanup_gpu_memory(self) -> None: """Enhanced GPU memory cleanup.""" if self.cuda_available: # Clean up per-instance cache if hasattr(self, "_gpu_arrays"): for array in self._gpu_arrays.values(): if array is not None: return_cuda_array(array) self._gpu_arrays.clear() # Clear kernel cache if hasattr(self, "_kernel_cache"): self._kernel_cache.clear() # Clear global memory pool from .cuda_kernels import _cuda_memory_pool _cuda_memory_pool.clear() # Synchronize and clean up CUDA context cuda.synchronize() logger.info("GPU memory cleaned up")
def benchmark_all_kernels( self, N: int = 5000, num_joints: int = 6, num_runs: int = 5 ) -> Dict[str, Dict[str, Any]]: """ Comprehensive benchmarking of all available kernels. Args: N (int): Number of trajectory points for benchmarking num_joints (int): Number of joints num_runs (int): Number of benchmark runs per kernel Returns: dict: Benchmark results for all kernels """ if not self.cuda_available: logger.warning("CUDA not available for benchmarking") return {} logger.info( f"🔬 Benchmarking all kernels: N={N}, joints={num_joints}, runs={num_runs}" ) # Generate test data thetastart = np.random.uniform(-1, 1, num_joints).astype(np.float32) thetaend = np.random.uniform(-1, 1, num_joints).astype(np.float32) kernel_types = [ "standard", "vectorized", "memory_optimized", "warp_optimized", "cache_friendly", ] results = {} for kernel_type in kernel_types: logger.info(f"📊 Testing {kernel_type} kernel...") # Reset stats for clean measurement self.reset_performance_stats() times = [] for run in range(num_runs): start_time = time.time() try: trajectory = self.joint_trajectory( thetastart, thetaend, 2.0, N, 5, kernel_type=kernel_type, enable_monitoring=False, ) elapsed = time.time() - start_time times.append(elapsed) except Exception as e: logger.warning(f"Kernel {kernel_type} failed: {e}") times.append(float("inf")) if times and min(times) < float("inf"): results[kernel_type] = { "mean_time": np.mean(times), "std_time": np.std(times), "min_time": np.min(times), "max_time": np.max(times), "all_times": times, "success_rate": sum(1 for t in times if t < float("inf")) / len(times), } else: results[kernel_type] = { "mean_time": float("inf"), "std_time": 0, "min_time": float("inf"), "max_time": float("inf"), "all_times": times, "success_rate": 0, } # Find best kernel best_kernel = min(results.keys(), key=lambda k: results[k]["mean_time"]) best_time = results[best_kernel]["mean_time"] logger.info(f"🏆 Best kernel: {best_kernel} ({best_time*1000:.2f}ms)") # Print comparison table print("\n📋 Kernel Performance Comparison:") print("=" * 70) print(f"{'Kernel':<20} {'Mean (ms)':<12} {'Min (ms)':<12} {'Success':<10}") print("-" * 70) for kernel_type, stats in results.items(): mean_ms = ( stats["mean_time"] * 1000 if stats["mean_time"] < float("inf") else float("inf") ) min_ms = ( stats["min_time"] * 1000 if stats["min_time"] < float("inf") else float("inf") ) success = f"{stats['success_rate']*100:.0f}%" marker = "🏆" if kernel_type == best_kernel else " " print( f"{marker}{kernel_type:<18} {mean_ms:<12.2f} {min_ms:<12.2f} {success:<10}" ) return results def __del__(self) -> None: """Enhanced destructor with better error handling.""" try: if ( hasattr(self, "enable_profiling") and self.enable_profiling and hasattr(self, "cuda_available") and self.cuda_available ): profile_stop() if hasattr(self, "cleanup_gpu_memory"): self.cleanup_gpu_memory() except Exception: pass # Ignore errors during cleanup # Enhanced plotting methods with performance annotations
[docs] @staticmethod def plot_trajectory( trajectory_data, Tf, title="Joint Trajectory", labels=None, performance_stats=None, ) -> None: """Plot joint position, velocity and acceleration trajectories. Draws a 3-by-num_joints grid of subplots (position, velocity, acceleration per joint) over a time axis spanning ``[0, Tf]`` and appends GPU speedup / kernel info to the title when ``performance_stats`` is provided. Displays the figure via ``plt.show()``. Args: trajectory_data (dict): Trajectory dict with keys ``"positions"``, ``"velocities"`` and ``"accelerations"``, each an ``(num_steps, num_joints)`` array. Tf (float): Total trajectory duration, seconds, defining the time axis. title (str): Base figure title. Defaults to ``"Joint Trajectory"``. labels (list, optional): Per-joint labels; used only when its length equals ``num_joints``, otherwise generic ``"Joint i"`` labels are generated. performance_stats (dict, optional): Optional stats with keys such as ``"speedup_achieved"`` and ``"best_kernel_used"`` used to annotate the title. """ positions = trajectory_data["positions"] velocities = trajectory_data["velocities"] accelerations = trajectory_data["accelerations"] num_steps = positions.shape[0] num_joints = positions.shape[1] time_steps = np.linspace(0, Tf, num_steps) fig, axs = plt.subplots(3, num_joints, figsize=(15, 10), sharex="col") # Add performance info to title if performance_stats: speedup = performance_stats.get("speedup_achieved", 0) kernel = performance_stats.get("best_kernel_used", "unknown") if speedup > 1: title += f" (GPU: {speedup:.1f}x speedup, {kernel} kernel)" else: title += " (CPU execution)" fig.suptitle(title) for i in range(num_joints): if labels and len(labels) == num_joints: label = labels[i] else: label = f"Joint {i+1}" axs[0, i].plot(time_steps, positions[:, i], label=f"{label} Position") axs[0, i].set_ylabel("Position") axs[0, i].legend() axs[1, i].plot(time_steps, velocities[:, i], label=f"{label} Velocity") axs[1, i].set_ylabel("Velocity") axs[1, i].legend() axs[2, i].plot( time_steps, accelerations[:, i], label=f"{label} Acceleration" ) axs[2, i].set_ylabel("Acceleration") axs[2, i].legend() for ax in axs[-1]: ax.set_xlabel("Time (s)") plt.tight_layout() plt.show()
[docs] def plot_tcp_trajectory(self, trajectory, dt) -> None: """ Enhanced TCP trajectory plotting with performance monitoring. Args: trajectory (list): A list of joint angle configurations representing the trajectory. dt (float): The time step between consecutive points in the trajectory. Returns: None """ start_time = time.time() tcp_trajectory = [ self.serial_manipulator.forward_kinematics(joint_angles) for joint_angles in trajectory ] tcp_positions = [pose[:3, 3] for pose in tcp_trajectory] velocity, acceleration, jerk = self.calculate_derivatives(tcp_positions, dt) time_array = np.arange(0, len(tcp_positions) * dt, dt) elapsed = time.time() - start_time plt.figure(figsize=(12, 8)) title = f"TCP Trajectory (FK computed in {elapsed:.3f}s)" plt.suptitle(title) for i, label in enumerate(["X", "Y", "Z"]): plt.subplot(4, 1, 1) plt.plot( time_array, np.array(tcp_positions)[:, i], label=f"TCP {label} Position" ) plt.ylabel("Position") plt.legend() plt.subplot(4, 1, 2) plt.plot(time_array[:-1], velocity[:, i], label=f"TCP {label} Velocity") plt.ylabel("Velocity") plt.legend() plt.subplot(4, 1, 3) plt.plot( time_array[:-2], acceleration[:, i], label=f"TCP {label} Acceleration" ) plt.ylabel("Acceleration") plt.legend() plt.subplot(4, 1, 4) plt.plot(time_array[:-3], jerk[:, i], label=f"TCP {label} Jerk") plt.xlabel("Time") plt.ylabel("Jerk") plt.legend() plt.tight_layout() plt.show()
[docs] def plot_cartesian_trajectory( self, trajectory_data, Tf, title="Cartesian Trajectory", performance_stats=None ) -> None: """ Enhanced Cartesian trajectory plotting with performance information. Args: trajectory_data (dict): A dictionary containing trajectory data. Tf (float): The final time of the trajectory. title (str, optional): The title of the plot. performance_stats (dict, optional): Performance statistics to display. Returns: None """ positions = trajectory_data["positions"] velocities = trajectory_data["velocities"] accelerations = trajectory_data["accelerations"] num_steps = positions.shape[0] time_steps = np.linspace(0, Tf, num_steps) # Add performance info to title if performance_stats: speedup = performance_stats.get("speedup_achieved", 0) if speedup > 1: title += f" (GPU: {speedup:.1f}x speedup)" else: title += " (CPU execution)" fig, axs = plt.subplots(3, 1, figsize=(10, 15), sharex="col") fig.suptitle(title) axs[0].plot(time_steps, positions[:, 0], label="X Position") axs[0].plot(time_steps, positions[:, 1], label="Y Position") axs[0].plot(time_steps, positions[:, 2], label="Z Position") axs[0].set_ylabel("Position") axs[0].legend() axs[1].plot(time_steps, velocities[:, 0], label="X Velocity") axs[1].plot(time_steps, velocities[:, 1], label="Y Velocity") axs[1].plot(time_steps, velocities[:, 2], label="Z Velocity") axs[1].set_ylabel("Velocity") axs[1].legend() axs[2].plot(time_steps, accelerations[:, 0], label="X Acceleration") axs[2].plot(time_steps, accelerations[:, 1], label="Y Acceleration") axs[2].plot(time_steps, accelerations[:, 2], label="Z Acceleration") axs[2].set_ylabel("Acceleration") axs[2].legend() axs[2].set_xlabel("Time (s)") plt.tight_layout() plt.show()
[docs] def calculate_derivatives( self, positions, dt ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Calculate the velocity, acceleration, and jerk of a trajectory. Parameters: positions (list or numpy.ndarray): A list or array of positions. dt (float): The time step between each position. Returns: velocity (numpy.ndarray): An array of velocities. acceleration (numpy.ndarray): An array of accelerations. jerk (numpy.ndarray): An array of jerks. """ positions = np.array(positions) velocity = np.diff(positions, axis=0) / dt acceleration = np.diff(velocity, axis=0) / dt jerk = np.diff(acceleration, axis=0) / dt return velocity, acceleration, jerk
[docs] def plot_ee_trajectory( self, trajectory_data, Tf, title="End-Effector Trajectory" ) -> None: """ Enhanced end-effector trajectory plotting. Args: trajectory_data (dict): A dictionary containing trajectory data. Tf (float): The final time of the trajectory. title (str, optional): The title of the plot. Returns: None """ positions = trajectory_data["positions"] num_steps = positions.shape[0] time_steps = np.linspace(0, Tf, num_steps) if "orientations" in trajectory_data: orientations = trajectory_data["orientations"] else: # Compute orientations using forward kinematics start_time = time.time() orientations = np.array( [ self.serial_manipulator.forward_kinematics(pos)[:3, :3] for pos in positions ] ) elapsed = time.time() - start_time title += f" (FK for orientations: {elapsed:.3f}s)" fig = plt.figure(figsize=(12, 8)) ax = fig.add_subplot(111, projection="3d") fig.suptitle(title) ax.plot( positions[:, 0], positions[:, 1], positions[:, 2], label="EE Position", color="b", linewidth=2, ) # Draw orientation frames at selected points frame_step = max(1, num_steps // 20) for i in range(0, num_steps, frame_step): R = orientations[i] pos = positions[i] scale = 0.01 # X-axis (red) ax.quiver( pos[0], pos[1], pos[2], R[0, 0], R[1, 0], R[2, 0], length=scale, color="r", alpha=0.8, ) # Y-axis (green) ax.quiver( pos[0], pos[1], pos[2], R[0, 1], R[1, 1], R[2, 1], length=scale, color="g", alpha=0.8, ) # Z-axis (blue) ax.quiver( pos[0], pos[1], pos[2], R[0, 2], R[1, 2], R[2, 2], length=scale, color="b", alpha=0.8, ) ax.set_xlabel("X Position") ax.set_ylabel("Y Position") ax.set_zlabel("Z Position") ax.legend() plt.show()
[docs] def plan_trajectory( self, start_position, target_position, obstacle_points ) -> List[List[float]]: """ Enhanced trajectory planning with collision avoidance. Args: start_position (list): Initial joint configuration. target_position (list): Desired joint configuration. obstacle_points (list): List of obstacle points in the environment. Returns: list: Joint trajectory as a list of joint configurations. """ logger.info( f"Planning trajectory from {len(start_position)} to {len(target_position)} DOF" ) # Enhanced trajectory planning with multiple waypoints # This is a simple interpolation - can be extended with RRT*, PRM, etc. num_waypoints = 5 joint_trajectory = [] start_pos = np.array(start_position) target_pos = np.array(target_position) for i in range(num_waypoints + 1): alpha = i / num_waypoints waypoint = (1 - alpha) * start_pos + alpha * target_pos # Simple collision avoidance - move away from obstacles if obstacle_points and self.potential_field: for _ in range(10): # Max adjustment iterations gradient = self.potential_field.compute_gradient( waypoint, target_pos, obstacle_points ) waypoint -= 0.01 * gradient # Check if waypoint is collision-free if self.collision_checker: if not self.collision_checker.check_collision(waypoint): break joint_trajectory.append(waypoint.tolist()) logger.info(f"Planned trajectory with {len(joint_trajectory)} waypoints") return joint_trajectory
[docs] def benchmark_performance( self, test_cases=None, include_cpu_comparison: bool = True ) -> Dict[str, Dict[str, Any]]: """ Enhanced performance benchmarking with detailed analysis. Args: test_cases (list, optional): List of test cases to benchmark. include_cpu_comparison (bool): Whether to include CPU vs GPU comparison. Returns: dict: Comprehensive benchmark results """ if test_cases is None: test_cases = [ {"N": 100, "joints": 6, "name": "Small"}, {"N": 1000, "joints": 6, "name": "Medium"}, {"N": 5000, "joints": 6, "name": "Large"}, {"N": 10000, "joints": 6, "name": "Very Large"}, {"N": 1000, "joints": 12, "name": "Many joints"}, {"N": 5000, "joints": 12, "name": "Large + Many joints"}, ] results = {} print("\n🚀 Enhanced Performance Benchmarking") print("=" * 60) for test_case in test_cases: N = test_case["N"] joints = test_case["joints"] name = test_case["name"] logger.info(f"Benchmarking {name} case: N={N}, joints={joints}") # Generate test data thetastart = np.random.uniform(-1, 1, joints).astype(np.float32) thetaend = np.random.uniform(-1, 1, joints).astype(np.float32) # Reset stats self.reset_performance_stats() # Test trajectory generation with multiple runs for accuracy times = [] for run in range(3): # Multiple runs for statistical accuracy start_time = time.time() trajectory = self.joint_trajectory(thetastart, thetaend, 2.0, N, 5) end_time = time.time() times.append(end_time - start_time) mean_time = np.mean(times) std_time = np.std(times) # Get performance stats stats = self.get_performance_stats() results[name] = { "mean_time": mean_time, "std_time": std_time, "min_time": min(times), "max_time": max(times), "N": N, "joints": joints, "stats": stats, "used_gpu": stats["gpu_calls"] > 0, "trajectory_shape": trajectory["positions"].shape, "speedup_achieved": stats.get("speedup_achieved", 0), "kernel_used": stats.get("best_kernel_used", "unknown"), "elements_per_second": (N * joints) / mean_time, } # CPU comparison if requested if include_cpu_comparison and self.cuda_available: # Force CPU execution old_threshold = self.cpu_threshold self.cpu_threshold = float("inf") # Force CPU cpu_start = time.time() cpu_trajectory = self.joint_trajectory(thetastart, thetaend, 2.0, N, 5) cpu_time = time.time() - cpu_start self.cpu_threshold = old_threshold # Restore threshold if mean_time > 0: actual_speedup = cpu_time / mean_time results[name]["cpu_time"] = cpu_time results[name]["actual_speedup"] = actual_speedup else: results[name]["actual_speedup"] = 0 # Print summary gpu_indicator = "🚀 GPU" if results[name]["used_gpu"] else "🖥️ CPU" speedup_str = "" if ( "actual_speedup" in results[name] and results[name]["actual_speedup"] > 1 ): speedup_str = f" ({results[name]['actual_speedup']:.1f}x speedup)" print( f"{gpu_indicator} {name}: {mean_time*1000:.2f}±{std_time*1000:.2f}ms{speedup_str}" ) logger.info( f"{name} benchmark: {mean_time:.4f}s, GPU: {results[name]['used_gpu']}" ) # Print summary table print(f"\n📊 Benchmark Summary:") print("-" * 80) print( f"{'Test Case':<20} {'Time (ms)':<12} {'GPU':<6} {'Speedup':<10} {'Throughput':<15}" ) print("-" * 80) for name, result in results.items(): time_ms = result["mean_time"] * 1000 gpu_used = "✓" if result["used_gpu"] else "✗" speedup = ( f"{result.get('actual_speedup', 0):.1f}x" if result.get("actual_speedup", 0) > 1 else "-" ) throughput = f"{result['elements_per_second']/1e6:.2f} M/s" print( f"{name:<20} {time_ms:<12.2f} {gpu_used:<6} {speedup:<10} {throughput:<15}" ) return results
# Maintain backward compatibility with original class name
[docs] class TrajectoryPlanning(OptimizedTrajectoryPlanning): """ Backward compatibility alias for OptimizedTrajectoryPlanning. This ensures existing code continues to work while providing access to all optimizations. """ def __init__(self, *args: Any, **kwargs: Any) -> None: """Initialize the compatibility wrapper with the optimized planner.""" super().__init__(*args, **kwargs) logger.info("Using OptimizedTrajectoryPlanning (backward compatibility mode)")
# Enhanced utility functions for advanced users
[docs] def create_optimized_planner( serial_manipulator, urdf_path, dynamics, joint_limits, torque_limits=None, target_speedup=40.0, gpu_memory_mb=None, enable_profiling=False, kernel_type="auto", ) -> OptimizedTrajectoryPlanning: """ Enhanced factory function to create an optimized trajectory planner. Args: serial_manipulator: SerialManipulator instance urdf_path: Path to URDF file dynamics: ManipulatorDynamics instance joint_limits: Joint limits torque_limits: Torque limits (optional) target_speedup: Target speedup over CPU (default: 40x) gpu_memory_mb: GPU memory pool size in MB (optional) enable_profiling: Enable CUDA profiling (optional) kernel_type: Kernel selection strategy (optional) Returns: OptimizedTrajectoryPlanning: Configured planner instance """ # Auto-detect optimal settings cuda_available = check_cuda_availability() # Adaptive threshold based on target speedup and problem size num_joints = len(joint_limits) if cuda_available: gpu_props = get_gpu_properties() if gpu_props: sm_count = gpu_props["multiprocessor_count"] if target_speedup >= 40: threshold = max(50, int(sm_count * 10000 / num_joints)) elif target_speedup >= 20: threshold = max(50, int(sm_count * 5000 / num_joints)) else: threshold = max(50, int(sm_count * 1000 / num_joints)) else: threshold = 1000 else: threshold = float("inf") # Never use GPU if not available # Create planner with optimized settings planner = OptimizedTrajectoryPlanning( serial_manipulator=serial_manipulator, urdf_path=urdf_path, dynamics=dynamics, joint_limits=joint_limits, torque_limits=torque_limits, use_cuda=None, # Auto-detect cuda_threshold=threshold, memory_pool_size_mb=gpu_memory_mb, enable_profiling=enable_profiling, auto_optimize=True, kernel_type=kernel_type, target_speedup=target_speedup, ) logger.info( f"Created optimized planner for {num_joints} joints, " f"target: {target_speedup}x speedup, CUDA: {cuda_available}" ) return planner
[docs] def compare_implementations( serial_manipulator, urdf_path, dynamics, joint_limits, test_params=None, detailed_analysis=True, ) -> Dict[str, Any]: """ Enhanced implementation comparison with detailed kernel analysis. Args: serial_manipulator: SerialManipulator instance urdf_path: Path to URDF file dynamics: ManipulatorDynamics instance joint_limits: Joint limits test_params: Test parameters (optional) detailed_analysis: Whether to perform detailed kernel comparison Returns: dict: Comprehensive comparison results """ if test_params is None: test_params = {"N": 5000, "Tf": 2.0, "method": 5, "num_runs": 5} # Create CPU-only planner cpu_planner = OptimizedTrajectoryPlanning( serial_manipulator=serial_manipulator, urdf_path=urdf_path, dynamics=dynamics, joint_limits=joint_limits, use_cuda=False, ) # Create GPU planner (if available) gpu_planner = None if check_cuda_availability(): gpu_planner = OptimizedTrajectoryPlanning( serial_manipulator=serial_manipulator, urdf_path=urdf_path, dynamics=dynamics, joint_limits=joint_limits, use_cuda=True, cuda_threshold=0, # Force GPU usage kernel_type="auto_tune", ) # Generate test data num_joints = len(joint_limits) thetastart = np.random.uniform(-1, 1, num_joints).astype(np.float32) thetaend = np.random.uniform(-1, 1, num_joints).astype(np.float32) results = {"cpu": {}, "gpu": {}} # Test CPU implementation logger.info("Testing CPU implementation...") cpu_times = [] for run in range(test_params.get("num_runs", 3)): start_time = time.time() cpu_result = cpu_planner.joint_trajectory( thetastart, thetaend, test_params["Tf"], test_params["N"], test_params["method"], ) cpu_times.append(time.time() - start_time) cpu_mean_time = np.mean(cpu_times) results["cpu"] = { "mean_time": cpu_mean_time, "std_time": np.std(cpu_times), "min_time": np.min(cpu_times), "max_time": np.max(cpu_times), "result_shape": cpu_result["positions"].shape, "stats": cpu_planner.get_performance_stats(), } # Test GPU implementation (if available) if gpu_planner is not None: logger.info("Testing GPU implementation...") # Test different kernels if detailed analysis requested if detailed_analysis: kernel_results = gpu_planner.benchmark_all_kernels( N=test_params["N"], num_joints=num_joints, num_runs=test_params.get("num_runs", 3), ) results["kernel_comparison"] = kernel_results # Test best configuration gpu_times = [] for run in range(test_params.get("num_runs", 3)): start_time = time.time() gpu_result = gpu_planner.joint_trajectory( thetastart, thetaend, test_params["Tf"], test_params["N"], test_params["method"], ) gpu_times.append(time.time() - start_time) gpu_mean_time = np.mean(gpu_times) speedup = cpu_mean_time / gpu_mean_time if gpu_mean_time > 0 else 0 results["gpu"] = { "mean_time": gpu_mean_time, "std_time": np.std(gpu_times), "min_time": np.min(gpu_times), "max_time": np.max(gpu_times), "result_shape": gpu_result["positions"].shape, "stats": gpu_planner.get_performance_stats(), "speedup": speedup, } # Compare accuracy pos_diff = np.abs(cpu_result["positions"] - gpu_result["positions"]) vel_diff = np.abs(cpu_result["velocities"] - gpu_result["velocities"]) acc_diff = np.abs(cpu_result["accelerations"] - gpu_result["accelerations"]) results["accuracy"] = { "max_pos_diff": np.max(pos_diff), "max_vel_diff": np.max(vel_diff), "max_acc_diff": np.max(acc_diff), "mean_pos_diff": np.mean(pos_diff), "mean_vel_diff": np.mean(vel_diff), "mean_acc_diff": np.mean(acc_diff), } # Print comprehensive results print(f"\n🚀 Implementation Comparison Results:") print("=" * 50) print( f"CPU Time: {cpu_mean_time*1000:.2f} ± {results['cpu']['std_time']*1000:.2f} ms" ) print( f"GPU Time: {gpu_mean_time*1000:.2f} ± {results['gpu']['std_time']*1000:.2f} ms" ) print(f"Speedup: {speedup:.1f}x") print(f"Max Position Error: {results['accuracy']['max_pos_diff']:.2e}") print(f"Mean Position Error: {results['accuracy']['mean_pos_diff']:.2e}") if speedup >= 40: print("🎯 Achieved 40x+ speedup target!") elif speedup >= 20: print("⚡ Good speedup achieved!") elif speedup >= 5: print("✅ Moderate speedup achieved") else: print("⚠️ Limited speedup - consider larger problem sizes") logger.info(f"GPU speedup: {speedup:.2f}x") else: results["gpu"] = {"available": False} logger.info("GPU not available for comparison") return results
def benchmark_kernel_performance_comprehensive( serial_manipulator, urdf_path, dynamics, joint_limits, test_sizes=None, num_runs=5 ) -> Dict[str, Dict[str, Any]]: """ Comprehensive kernel performance benchmarking across multiple problem sizes. Args: serial_manipulator: SerialManipulator instance urdf_path: Path to URDF file dynamics: ManipulatorDynamics instance joint_limits: Joint limits test_sizes: List of (N, joints) tuples to test num_runs: Number of runs per test Returns: dict: Comprehensive benchmark results """ if not check_cuda_availability(): logger.warning("CUDA not available for comprehensive benchmarking") return {} if test_sizes is None: test_sizes = [ (1000, 6), (5000, 6), (10000, 6), (20000, 6), (1000, 12), (5000, 12), (10000, 12), ] print("\n🔬 Comprehensive Kernel Performance Benchmarking") print("=" * 60) all_results = {} for N, joints in test_sizes: logger.info(f"Testing N={N}, joints={joints}") # Create optimized planner planner = OptimizedTrajectoryPlanning( serial_manipulator=serial_manipulator, urdf_path=urdf_path, dynamics=dynamics, joint_limits=joint_limits[:joints], # Use subset of joints use_cuda=True, cuda_threshold=0, kernel_type="auto_tune", ) # Benchmark all kernels for this problem size kernel_results = planner.benchmark_all_kernels( N=N, num_joints=joints, num_runs=num_runs ) all_results[f"N{N}_J{joints}"] = { "N": N, "joints": joints, "total_elements": N * joints, "kernel_results": kernel_results, } # Find best kernel for this size if kernel_results: best_kernel = min( kernel_results.keys(), key=lambda k: kernel_results[k]["mean_time"] ) best_time = kernel_results[best_kernel]["mean_time"] throughput = (N * joints) / best_time / 1e6 print( f"N={N:5d}, J={joints:2d}: Best={best_kernel:<15} " f"Time={best_time*1000:6.2f}ms Throughput={throughput:6.1f}M/s" ) return all_results # Export important classes and functions __all__ = [ "OptimizedTrajectoryPlanning", "TrajectoryPlanning", # Backward compatibility "create_optimized_planner", "compare_implementations", "benchmark_kernel_performance_comprehensive", ]