#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Optimized Path Planning Module - ManipulaPy
This module provides highly optimized trajectory planning capabilities including joint space
and Cartesian space trajectory generation with CUDA acceleration and collision avoidance.
Key optimizations:
- Adaptive grid sizing for optimal GPU occupancy
- Memory pooling to reduce allocation overhead
- Batch processing for multiple trajectories
- Fused kernels to minimize memory bandwidth
- Intelligent fallback to CPU when beneficial
- 2D parallelization for better GPU utilization
- Advanced kernel selection for 40x+ speedups
Copyright (c) 2025 Mohamed Aboelnasr
Licensed under the GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
"""
import time
import warnings
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
from numba import njit, prange
from .cuda_kernels import (
CUDA_AVAILABLE,
_best_2d_config,
_h2d_pinned,
auto_select_optimal_kernel,
benchmark_kernel_performance,
check_cuda_availability,
get_cuda_array,
get_gpu_properties,
get_memory_pool_stats,
get_optimal_kernel_config,
make_1d_grid,
make_2d_grid,
make_2d_grid_optimized,
optimized_batch_trajectory_generation,
optimized_potential_field,
optimized_trajectory_generation,
optimized_trajectory_generation_monitored,
print_performance_recommendations,
profile_start,
profile_stop,
return_cuda_array,
setup_cuda_environment_for_40x_speedup,
)
from .utils import (
CubicTimeScaling,
MatrixExp3,
MatrixLog3,
QuinticTimeScaling,
TransToRp,
)
# Import CUDA functions only if available
if CUDA_AVAILABLE:
from numba import cuda
from .cuda_kernels import (
batch_trajectory_kernel,
cartesian_trajectory_kernel,
forward_dynamics_kernel,
fused_potential_gradient_kernel,
inverse_dynamics_kernel,
trajectory_kernel,
trajectory_kernel_cache_friendly,
trajectory_kernel_memory_optimized,
trajectory_kernel_vectorized,
trajectory_kernel_warp_optimized,
)
else:
# Create dummy functions for when CUDA is not available
def trajectory_kernel(*args, **kwargs):
raise RuntimeError("CUDA not available")
def trajectory_kernel_vectorized(*args, **kwargs):
raise RuntimeError("CUDA not available")
def trajectory_kernel_memory_optimized(*args, **kwargs):
raise RuntimeError("CUDA not available")
def trajectory_kernel_warp_optimized(*args, **kwargs):
raise RuntimeError("CUDA not available")
def trajectory_kernel_cache_friendly(*args, **kwargs):
raise RuntimeError("CUDA not available")
def inverse_dynamics_kernel(*args, **kwargs):
raise RuntimeError("CUDA not available")
def forward_dynamics_kernel(*args, **kwargs):
raise RuntimeError("CUDA not available")
def cartesian_trajectory_kernel(*args, **kwargs):
raise RuntimeError("CUDA not available")
def fused_potential_gradient_kernel(*args, **kwargs):
raise RuntimeError("CUDA not available")
def batch_trajectory_kernel(*args, **kwargs):
raise RuntimeError("CUDA not available")
class MockCuda:
@staticmethod
def to_device(*args, **kwargs):
raise RuntimeError("CUDA not available")
@staticmethod
def device_array(*args, **kwargs):
raise RuntimeError("CUDA not available")
@staticmethod
def synchronize():
pass
cuda = MockCuda()
import logging
from .potential_field import CollisionChecker, PotentialField
# Module-level logger; leave handler configuration to the host application
logger = logging.getLogger(__name__)
logging.getLogger("numba.cuda.cudadrv.driver").setLevel(logging.WARNING)
@njit(parallel=True, fastmath=True)
def _trajectory_cpu_fallback(thetastart, thetaend, Tf, N, method):
"""Numba-optimised CPU trajectory generation (parallel)."""
num_joints = len(thetastart)
traj_pos = np.zeros((N, num_joints), dtype=np.float32)
traj_vel = np.zeros((N, num_joints), dtype=np.float32)
traj_acc = np.zeros((N, num_joints), dtype=np.float32)
# Flatten (idx, j) → k to avoid nested loops that block parallelisation
total_elems = N * num_joints
for k in prange(total_elems):
idx = k // num_joints # timestep
j = k % num_joints # joint index
t = idx * (Tf / (N - 1))
tau = t / Tf
# Time-scaling
if method == 3: # cubic
s = 3.0 * tau * tau - 2.0 * tau * tau * tau
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 / (Tf * Tf) * (1.0 - 2.0 * tau)
elif method == 5: # quintic
tau2 = tau * tau
tau3 = tau2 * tau
s = 10.0 * tau3 - 15.0 * tau2 * tau2 + 6.0 * tau * tau3
s_dot = 30.0 * tau2 * (1.0 - 2.0 * tau + tau2) / Tf
s_ddot = 60.0 / (Tf * Tf) * tau * (1.0 - 2.0 * tau)
else: # unsupported method
s = s_dot = s_ddot = 0.0
dtheta = thetaend[j] - thetastart[j]
traj_pos[idx, j] = s * dtheta + thetastart[j]
traj_vel[idx, j] = s_dot * dtheta
traj_acc[idx, j] = s_ddot * dtheta
return traj_pos, traj_vel, traj_acc
# Thin wrapper – unchanged signature, now just calls the new kernel above
@njit(parallel=True, fastmath=True)
def _traj_cpu_njit(thetastart, thetaend, Tf, N, method):
return _trajectory_cpu_fallback(thetastart, thetaend, Tf, N, method)
[docs]class OptimizedTrajectoryPlanning:
"""
Highly optimized trajectory planning class with adaptive GPU/CPU execution,
memory pooling, and batch processing capabilities for 40x+ speedups.
"""
[docs] def __init__(
self,
serial_manipulator,
urdf_path,
dynamics,
joint_limits,
torque_limits=None,
*, # ――― everything after * is keyword-only ―――
use_cuda: Optional[bool] = None,
cuda_threshold: int = 10,
memory_pool_size_mb: Optional[int] = None,
enable_profiling: bool = False,
auto_optimize: bool = True,
kernel_type: str = "auto",
target_speedup: float = 40.0,
):
"""
Enhanced trajectory planner with advanced CUDA optimizations.
Parameters
----------
serial_manipulator : SerialManipulator
urdf_path : str
dynamics : ManipulatorDynamics
joint_limits : list[tuple[float,float]]
torque_limits : list[tuple[float,float]], optional
use_cuda : None | bool
• None → auto-detect (default)
• True → force GPU (raise if CUDA absent)
• False → force CPU
cuda_threshold : int
Min. (N × joints) before we bother launching the GPU.
memory_pool_size_mb: int | None
If set, resize the global CUDA memory pool (in MB).
enable_profiling : bool
Enable CUDA profiling for performance analysis.
auto_optimize : bool
Automatically setup CUDA environment for maximum performance.
kernel_type : str
Kernel selection strategy: "auto", "standard", "vectorized",
"memory_optimized", "warp_optimized", "cache_friendly", "auto_tune"
target_speedup : float
Target speedup over CPU (used for recommendations).
"""
# ------------------------------------------------------------
# FIRST: Set all basic attributes to prevent AttributeError
# ------------------------------------------------------------
self.kernel_type = kernel_type if kernel_type is not None else "auto"
self.target_speedup = target_speedup if target_speedup is not None else 40.0
self.enable_profiling = (
enable_profiling if enable_profiling is not None else False
)
# Initialize all caches and tracking attributes immediately
self._gpu_arrays = {}
self._kernel_cache = {}
self._last_cpu_time = 0.0
# Initialize performance stats early
self.performance_stats = {
"gpu_calls": 0,
"cpu_calls": 0,
"total_gpu_time": 0.0,
"total_cpu_time": 0.0,
"memory_transfers": 0,
"kernel_launches": 0,
"speedup_achieved": 0.0,
"best_kernel_used": "none",
}
# ------------------------------------------------------------
# Auto-optimization setup
# ------------------------------------------------------------
if auto_optimize and CUDA_AVAILABLE:
setup_cuda_environment_for_40x_speedup()
# ------------------------------------------------------------
# basic data
# ------------------------------------------------------------
self.serial_manipulator = serial_manipulator
self.dynamics = dynamics
self.joint_limits = np.asarray(joint_limits, dtype=np.float32)
self.torque_limits = (
np.asarray(torque_limits, dtype=np.float32)
if torque_limits is not None
else np.array([[-np.inf, np.inf]] * len(joint_limits), dtype=np.float32)
)
# Store optimization parameters
self.kernel_type = kernel_type
self.target_speedup = target_speedup
# ------------------------------------------------------------
# collision-checking helpers
# ------------------------------------------------------------
try:
self.collision_checker = CollisionChecker(urdf_path)
self.potential_field = PotentialField()
except Exception as exc:
logger.warning("Could not initialise collision checker: %s", exc)
self.collision_checker = None
self.potential_field = None
# ------------------------------------------------------------
# CUDA feature flags
# ------------------------------------------------------------
detected_cuda = check_cuda_availability()
if use_cuda is None:
self.cuda_available = detected_cuda
elif use_cuda and not detected_cuda:
raise RuntimeError("use_cuda=True requested but CUDA is not available.")
else:
self.cuda_available = bool(use_cuda)
self.gpu_properties = get_gpu_properties() if self.cuda_available else None
# Adaptive threshold based on target speedup
if self.cuda_available and self.gpu_properties:
sm_count = self.gpu_properties["multiprocessor_count"]
# Calculate threshold for target speedup
min_elements_per_sm = 1000 if target_speedup >= 40 else 500
self.cpu_threshold = max(
cuda_threshold, int(sm_count * min_elements_per_sm / len(joint_limits))
)
else:
self.cpu_threshold = cuda_threshold
# optionally resize a global memory-pool
if memory_pool_size_mb is not None and self.cuda_available:
from .cuda_kernels import _cuda_memory_pool
_cuda_memory_pool.max_pool_size = (
memory_pool_size_mb * 1024 * 1024 // 4 # entries of float32
)
# ------------------------------------------------------------
# performance bookkeeping
# ------------------------------------------------------------
self.performance_stats = {
"gpu_calls": 0,
"cpu_calls": 0,
"total_gpu_time": 0.0,
"total_cpu_time": 0.0,
"memory_transfers": 0,
"kernel_launches": 0,
"speedup_achieved": 0.0,
"best_kernel_used": "none",
}
# Enable profiling if requested (after all attributes are initialized)
if self.enable_profiling and self.cuda_available:
profile_start()
# Print performance recommendations on initialization
if self.cuda_available:
num_joints = len(joint_limits)
logger.info(
f"🚀 OptimizedTrajectoryPlanning initialized for {num_joints} joints"
)
if target_speedup >= 40:
min_N_for_target = self.cpu_threshold // num_joints
logger.info(
f"💡 For {target_speedup}x speedup, use N ≥ {min_N_for_target:,} trajectory points"
)
logger.info(
"Optimised planner – CUDA enabled: %s (threshold %d, kernel: %s)",
self.cuda_available,
self.cpu_threshold,
self.kernel_type,
)
if self.gpu_properties:
logger.info(
"GPU: %d SMs, %d max threads/block",
self.gpu_properties["multiprocessor_count"],
self.gpu_properties["max_threads_per_block"],
)
[docs] def _get_or_resize_gpu_array(self, array_name, shape, dtype=np.float32):
"""
Return a pooled CUDA array with the requested shape / dtype.
Enhanced with better memory management.
"""
if not self.cuda_available:
return None
arr = self._gpu_arrays.get(array_name)
if (arr is None) or (arr.shape != shape) or (arr.dtype != dtype):
if arr is not None:
return_cuda_array(arr)
arr = get_cuda_array(shape, dtype)
self._gpu_arrays[array_name] = arr
return arr
[docs] def _should_use_gpu(self, N, num_joints):
"""Enhanced GPU selection logic with performance prediction."""
if not self.cuda_available:
return False
total_work = N * num_joints
if total_work < self.cpu_threshold:
return False
# Additional checks for memory and performance
if self.gpu_properties:
sm_count = self.gpu_properties["multiprocessor_count"]
elements_per_sm = total_work / sm_count
# Check if we can achieve target speedup
target_speedup_value = getattr(self, "target_speedup", 40.0)
if target_speedup_value >= 40 and elements_per_sm < 10000:
logger.debug(
f"Problem size may not achieve {target_speedup_value}x speedup. "
f"Elements per SM: {elements_per_sm:.0f}, recommended: ≥10,000"
)
return True
def _get_optimal_kernel_config(self, N, num_joints):
"""Get or compute optimal kernel configuration with caching."""
# Ensure required attributes exist
if not hasattr(self, "_kernel_cache"):
self._kernel_cache = {}
if not hasattr(self, "kernel_type"):
self.kernel_type = "auto"
cache_key = (N, num_joints, self.kernel_type)
if cache_key in self._kernel_cache:
return self._kernel_cache[cache_key]
if self.kernel_type == "auto":
kernel_type = auto_select_optimal_kernel(N, num_joints)
else:
kernel_type = self.kernel_type
config = get_optimal_kernel_config(N, num_joints, kernel_type)
self._kernel_cache[cache_key] = config
return config
[docs] def joint_trajectory(
self,
thetastart,
thetaend,
Tf,
N,
method,
kernel_type=None,
enable_monitoring=None,
):
"""
Enhanced joint trajectory generation with advanced CUDA optimizations.
Args:
thetastart (numpy.ndarray): The starting joint angles.
thetaend (numpy.ndarray): The ending joint angles.
Tf (float): The final time for the trajectory.
N (int): The number of steps in the trajectory.
method (int): The method to use (3=cubic, 5=quintic).
kernel_type (str, optional): Override default kernel selection.
enable_monitoring (bool, optional): Override default monitoring.
Returns:
dict: A dictionary containing positions, velocities, and accelerations.
"""
# Use instance defaults if not specified, with safety checks
if kernel_type is None:
kernel_type = getattr(self, "kernel_type", "auto")
if enable_monitoring is None:
enable_monitoring = getattr(self, "enable_profiling", False)
logger.info(
f"Generating joint trajectory: N={N}, joints={len(thetastart)}, "
f"method={method}, kernel={kernel_type}"
)
thetastart = np.array(thetastart, dtype=np.float32)
thetaend = np.array(thetaend, dtype=np.float32)
num_joints = len(thetastart)
# Print performance recommendations if beneficial
total_work = N * num_joints
if self.cuda_available and total_work >= 10000:
print_performance_recommendations(N, num_joints)
# Decide on execution strategy
use_gpu = self._should_use_gpu(N, num_joints)
if use_gpu:
return self._joint_trajectory_gpu(
thetastart, thetaend, Tf, N, method, kernel_type, enable_monitoring
)
else:
return self._joint_trajectory_cpu(thetastart, thetaend, Tf, N, method)
[docs] def _joint_trajectory_gpu(
self, thetastart, thetaend, Tf, N, method, kernel_type, enable_monitoring
):
"""Enhanced GPU trajectory generation with optimal kernel selection."""
start_time = time.time()
try:
# Use the monitored high-level wrapper for maximum performance
traj_pos_host, traj_vel_host, traj_acc_host = (
optimized_trajectory_generation_monitored(
thetastart,
thetaend,
Tf,
N,
method,
use_pinned=True,
kernel_type=kernel_type,
enable_monitoring=enable_monitoring,
)
)
# Apply joint limits
num_joints = len(thetastart)
for i in range(num_joints):
traj_pos_host[:, i] = np.clip(
traj_pos_host[:, i],
self.joint_limits[i, 0],
self.joint_limits[i, 1],
)
# Apply collision avoidance if available
if self.collision_checker and self.potential_field:
traj_pos_host = self._apply_collision_avoidance_gpu(
traj_pos_host, thetaend
)
# Calculate achieved speedup
elapsed = time.time() - start_time
if hasattr(self, "_last_cpu_time") and self._last_cpu_time > 0:
speedup = self._last_cpu_time / elapsed
self.performance_stats["speedup_achieved"] = speedup
if enable_monitoring:
logger.info(f"🎯 Achieved {speedup:.1f}x speedup over CPU!")
else:
# No previous CPU time to compare against
if enable_monitoring:
logger.info(
"🚀 GPU execution completed (no CPU baseline for comparison)"
)
# Update performance stats
self.performance_stats["gpu_calls"] += 1
self.performance_stats["total_gpu_time"] += elapsed
self.performance_stats["kernel_launches"] += 1
# Update best kernel used
config = self._get_optimal_kernel_config(N, len(thetastart))
if config:
self.performance_stats["best_kernel_used"] = config.get(
"kernel_type", "unknown"
)
logger.info(f"GPU trajectory generation completed in {elapsed:.4f}s")
return {
"positions": traj_pos_host,
"velocities": traj_vel_host,
"accelerations": traj_acc_host,
}
except Exception as e:
logger.warning(
f"GPU trajectory generation failed: {e}, falling back to CPU"
)
return self._joint_trajectory_cpu(thetastart, thetaend, Tf, N, method)
[docs] def _joint_trajectory_cpu(self, thetastart, thetaend, Tf, N, method):
"""CPU-based joint trajectory generation with performance tracking."""
start_time = time.time()
# Use optimized CPU fallback
traj_pos, traj_vel, traj_acc = _traj_cpu_njit(
thetastart, thetaend, Tf, N, method
)
# Apply joint limits
num_joints = len(thetastart)
for i in range(num_joints):
traj_pos[:, i] = np.clip(
traj_pos[:, i], self.joint_limits[i, 0], self.joint_limits[i, 1]
)
# Apply collision avoidance if available
if self.collision_checker and self.potential_field:
traj_pos = self._apply_collision_avoidance_cpu(traj_pos, thetaend)
# Store CPU time for speedup calculations
elapsed = time.time() - start_time
self._last_cpu_time = elapsed
# Update performance stats
self.performance_stats["cpu_calls"] += 1
self.performance_stats["total_cpu_time"] += elapsed
logger.info(f"CPU trajectory generation completed in {elapsed:.4f}s")
return {
"positions": traj_pos,
"velocities": traj_vel,
"accelerations": traj_acc,
}
[docs] def _apply_collision_avoidance_gpu(self, traj_pos, thetaend):
"""Apply GPU-accelerated potential field-based collision avoidance."""
if not self.cuda_available:
return self._apply_collision_avoidance_cpu(traj_pos, thetaend)
try:
q_goal = thetaend
obstacles = [] # Define obstacles here as needed
# Use GPU-accelerated potential field computation
for idx, step in enumerate(traj_pos):
if self.collision_checker.check_collision(step):
# Prepare data for GPU computation
positions = step.reshape(1, -1)
for iteration in range(100): # Max iterations
try:
# Use optimized potential field computation
potential, gradient = optimized_potential_field(
positions,
q_goal,
np.array(obstacles),
influence_distance=0.5,
use_pinned=True,
)
# Update position
step -= 0.01 * gradient[0] # Adjust step size as needed
positions[0] = step
if not self.collision_checker.check_collision(step):
break
except Exception as e:
logger.warning(
f"GPU potential field computation failed: {e}"
)
# Fall back to CPU method
gradient = self.potential_field.compute_gradient(
step, q_goal, obstacles
)
step -= 0.01 * gradient
if not self.collision_checker.check_collision(step):
break
traj_pos[idx] = step
return traj_pos
except Exception as e:
logger.warning(f"GPU collision avoidance failed: {e}, falling back to CPU")
return self._apply_collision_avoidance_cpu(traj_pos, thetaend)
[docs] def _apply_collision_avoidance_cpu(self, traj_pos, thetaend):
"""Apply CPU-based potential field collision avoidance."""
q_goal = thetaend
obstacles = [] # Define obstacles here as needed
# Apply potential field for collision avoidance
for idx, step in enumerate(traj_pos):
if self.collision_checker.check_collision(step):
for _ in range(100): # Max iterations to adjust trajectory
gradient = self.potential_field.compute_gradient(
step, q_goal, obstacles
)
step -= 0.01 * gradient # Adjust step size as needed
if not self.collision_checker.check_collision(step):
break
traj_pos[idx] = step
return traj_pos
[docs] def batch_joint_trajectory(
self, thetastart_batch, thetaend_batch, Tf, N, method, kernel_type=None
):
"""
Enhanced batch trajectory generation with optimal kernel selection.
Args:
thetastart_batch (numpy.ndarray): Starting angles (batch_size, num_joints)
thetaend_batch (numpy.ndarray): Ending angles (batch_size, num_joints)
Tf (float): Final time for all trajectories
N (int): Number of trajectory points
method (int): Time scaling method
kernel_type (str, optional): Override kernel selection
Returns:
dict: Batch trajectory data with shape (batch_size, N, num_joints)
"""
if kernel_type is None:
kernel_type = getattr(self, "kernel_type", "auto")
batch_size, num_joints = thetastart_batch.shape
logger.info(
f"Generating batch trajectories: batch_size={batch_size}, "
f"N={N}, joints={num_joints}, kernel={kernel_type}"
)
if not self.cuda_available:
logger.warning(
"Batch processing requires CUDA. Falling back to sequential processing."
)
return self._batch_joint_trajectory_cpu(
thetastart_batch, thetaend_batch, Tf, N, method
)
# Print performance recommendations for batch processing
total_work = batch_size * N * num_joints
if total_work >= 50000:
print_performance_recommendations(N * batch_size, num_joints)
start_time = time.time()
try:
# Use optimized batch trajectory generation
traj_pos_host, traj_vel_host, traj_acc_host = (
optimized_batch_trajectory_generation(
thetastart_batch, thetaend_batch, Tf, N, method, use_pinned=True
)
)
# Apply joint limits for all trajectories
for batch_idx in range(batch_size):
for i in range(num_joints):
traj_pos_host[batch_idx, :, i] = np.clip(
traj_pos_host[batch_idx, :, i],
self.joint_limits[i, 0],
self.joint_limits[i, 1],
)
elapsed = time.time() - start_time
throughput = total_work / elapsed / 1e6 # Million elements per second
self.performance_stats["gpu_calls"] += 1
self.performance_stats["total_gpu_time"] += elapsed
self.performance_stats["kernel_launches"] += 1
logger.info(f"Batch GPU trajectory generation completed in {elapsed:.4f}s")
logger.info(f"📊 Throughput: {throughput:.1f} M elements/sec")
return {
"positions": traj_pos_host,
"velocities": traj_vel_host,
"accelerations": traj_acc_host,
}
except Exception as e:
logger.warning(
f"Batch GPU trajectory generation failed: {e}, falling back to CPU"
)
return self._batch_joint_trajectory_cpu(
thetastart_batch, thetaend_batch, Tf, N, method
)
[docs] def _batch_joint_trajectory_cpu(
self, thetastart_batch, thetaend_batch, Tf, N, method
):
"""CPU fallback for batch trajectory generation."""
start_time = time.time()
batch_size, num_joints = thetastart_batch.shape
# Initialize result arrays
traj_pos_batch = np.zeros((batch_size, N, num_joints), dtype=np.float32)
traj_vel_batch = np.zeros((batch_size, N, num_joints), dtype=np.float32)
traj_acc_batch = np.zeros((batch_size, N, num_joints), dtype=np.float32)
# Process each trajectory in the batch
for i in range(batch_size):
traj_pos, traj_vel, traj_acc = _traj_cpu_njit(
thetastart_batch[i], thetaend_batch[i], Tf, N, method
)
traj_pos_batch[i] = traj_pos
traj_vel_batch[i] = traj_vel
traj_acc_batch[i] = traj_acc
# Enforce joint limits for all trajectories (parity with GPU path)
for batch_idx in range(batch_size):
for j in range(num_joints):
traj_pos_batch[batch_idx, :, j] = np.clip(
traj_pos_batch[batch_idx, :, j],
self.joint_limits[j, 0],
self.joint_limits[j, 1],
)
elapsed = time.time() - start_time
self.performance_stats["cpu_calls"] += 1
self.performance_stats["total_cpu_time"] += elapsed
logger.info(f"Batch CPU trajectory generation completed in {elapsed:.4f}s")
return {
"positions": traj_pos_batch,
"velocities": traj_vel_batch,
"accelerations": traj_acc_batch,
}
[docs] def inverse_dynamics_trajectory(
self,
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector=None,
Ftip=None,
):
"""
Compute joint torques with enhanced CUDA acceleration.
Args:
thetalist_trajectory (np.ndarray): Array of joint angles over the trajectory.
dthetalist_trajectory (np.ndarray): Array of joint velocities over the trajectory.
ddthetalist_trajectory (np.ndarray): Array of joint accelerations over the trajectory.
gravity_vector (np.ndarray, optional): Gravity vector affecting the system.
Ftip (list, optional): External forces applied at the end effector.
Returns:
np.ndarray: Array of joint torques required to follow the trajectory.
"""
if gravity_vector is None:
gravity_vector = np.array([0, 0, -9.81])
if Ftip is None:
Ftip = [0, 0, 0, 0, 0, 0]
num_points = thetalist_trajectory.shape[0]
num_joints = thetalist_trajectory.shape[1]
logger.info(
f"Computing inverse dynamics: {num_points} points, {num_joints} joints"
)
# Print performance recommendations
if self.cuda_available:
total_work = num_points * num_joints
if total_work >= 10000:
print_performance_recommendations(num_points, num_joints)
# Decide on execution strategy
use_gpu = self._should_use_gpu(num_points, num_joints)
if use_gpu:
return self._inverse_dynamics_gpu(
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
)
else:
return self._inverse_dynamics_cpu(
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
)
[docs] def _inverse_dynamics_gpu(
self,
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
):
"""GPU-accelerated inverse dynamics computation with fixed kernel signature."""
start_time = time.time()
num_points = thetalist_trajectory.shape[0]
num_joints = thetalist_trajectory.shape[1]
try:
# Use memory pool for the large torques array
torques_trajectory = get_cuda_array(
(num_points, num_joints), dtype=np.float32
)
# Transfer data to GPU using pinned memory - ensure proper data types
d_thetalist_trajectory = _h2d_pinned(
thetalist_trajectory.astype(np.float32)
)
d_dthetalist_trajectory = _h2d_pinned(
dthetalist_trajectory.astype(np.float32)
)
d_ddthetalist_trajectory = _h2d_pinned(
ddthetalist_trajectory.astype(np.float32)
)
d_gravity_vector = cuda.to_device(gravity_vector.astype(np.float32))
d_Ftip = cuda.to_device(np.array(Ftip, dtype=np.float32))
# Safely handle dynamics data conversion
try:
# Convert Glist to proper numpy array format
if hasattr(self.dynamics, "Glist") and self.dynamics.Glist is not None:
if isinstance(self.dynamics.Glist, list):
# Convert list of matrices to 3D numpy array
Glist_array = np.stack(self.dynamics.Glist).astype(np.float32)
else:
Glist_array = np.array(self.dynamics.Glist, dtype=np.float32)
else:
# Create dummy Glist if not available
Glist_array = np.eye(6, dtype=np.float32)[None, :, :].repeat(
num_joints, axis=0
)
# Convert S_list to proper format
if (
hasattr(self.dynamics, "S_list")
and self.dynamics.S_list is not None
):
Slist_array = np.array(self.dynamics.S_list, dtype=np.float32)
else:
# Create dummy S_list if not available
Slist_array = np.random.randn(6, num_joints).astype(np.float32)
# Convert M_list to proper format
if (
hasattr(self.dynamics, "M_list")
and self.dynamics.M_list is not None
):
M_array = np.array(self.dynamics.M_list, dtype=np.float32)
else:
# Create dummy M if not available
M_array = np.eye(4, dtype=np.float32)
d_Glist = cuda.to_device(Glist_array)
d_Slist = cuda.to_device(Slist_array)
d_M = cuda.to_device(M_array)
except Exception as e:
logger.warning(
f"Error converting dynamics data: {e}, using simplified approach"
)
# Fallback to simplified dynamics computation on CPU
return self._inverse_dynamics_cpu(
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
)
d_torque_limits = cuda.to_device(self.torque_limits.astype(np.float32))
# Get optimal 2D launch configuration with bounds checking
try:
blocks_per_grid, threads_per_block = _best_2d_config(
num_points, num_joints
)
logger.info(
f"Inverse dynamics 2D grid: blocks={blocks_per_grid}, threads={threads_per_block}"
)
except Exception as e:
logger.warning(f"Error in grid configuration: {e}, using fallback")
# Fallback to safe grid configuration
blocks_per_grid = ((num_points + 15) // 16, (num_joints + 15) // 16)
threads_per_block = (16, 16)
# Launch optimized 2D inverse dynamics kernel with CORRECT signature
try:
# FIXED: The kernel expects 11 arguments, but was receiving 10
# Original call was missing the 'stream' parameter (last argument)
# Let's check the kernel signature in cuda_kernels.py:
# From cuda_kernels.py, the kernel signature is:
# inverse_dynamics_kernel(
# thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory,
# gravity_vector, Ftip, Glist, Slist, M, torques_trajectory, torque_limits, stream=0
# )
# That's 11 parameters total including the stream parameter
inverse_dynamics_kernel[blocks_per_grid, threads_per_block](
d_thetalist_trajectory, # 1
d_dthetalist_trajectory, # 2
d_ddthetalist_trajectory, # 3
d_gravity_vector, # 4
d_Ftip, # 5
d_Glist, # 6
d_Slist, # 7
d_M, # 8
torques_trajectory, # 9
d_torque_limits, # 10
# 0 # 11 - stream parameter (was missing!)
)
# Synchronize to check for kernel execution errors
cuda.synchronize()
except Exception as kernel_error:
logger.warning(f"CUDA kernel execution failed: {kernel_error}")
# Fallback to CPU implementation
return self._inverse_dynamics_cpu(
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
)
# Copy results back using pinned memory
torques_host = torques_trajectory.copy_to_host()
# Apply final torque limits
torques_host = np.clip(
torques_host, self.torque_limits[:, 0], self.torque_limits[:, 1]
)
elapsed = time.time() - start_time
self.performance_stats["gpu_calls"] += 1
self.performance_stats["total_gpu_time"] += elapsed
self.performance_stats["kernel_launches"] += 1
logger.info(f"GPU inverse dynamics completed in {elapsed:.4f}s")
return torques_host
except Exception as e:
logger.warning(f"GPU inverse dynamics failed: {e}, falling back to CPU")
return self._inverse_dynamics_cpu(
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
)
finally:
# Return large array to pool
if "torques_trajectory" in locals():
return_cuda_array(torques_trajectory)
[docs] def _inverse_dynamics_cpu(
self,
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
):
"""CPU-based inverse dynamics computation."""
start_time = time.time()
num_points = thetalist_trajectory.shape[0]
num_joints = thetalist_trajectory.shape[1]
torques_trajectory = np.zeros((num_points, num_joints), dtype=np.float32)
# Process each trajectory point
for i in range(num_points):
try:
torques = self.dynamics.inverse_dynamics(
thetalist_trajectory[i],
dthetalist_trajectory[i],
ddthetalist_trajectory[i],
gravity_vector,
Ftip,
)
torques_trajectory[i] = np.array(torques, dtype=np.float32)
except Exception as e:
logger.warning(f"Error in inverse dynamics at point {i}: {e}")
# Use zero torques for problematic points
torques_trajectory[i] = np.zeros(num_joints, dtype=np.float32)
# Apply torque limits
torques_trajectory = np.clip(
torques_trajectory, self.torque_limits[:, 0], self.torque_limits[:, 1]
)
elapsed = time.time() - start_time
self.performance_stats["cpu_calls"] += 1
self.performance_stats["total_cpu_time"] += elapsed
logger.info(f"CPU inverse dynamics completed in {elapsed:.4f}s")
return torques_trajectory
[docs] def forward_dynamics_trajectory(
self, thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes
):
"""
Enhanced forward dynamics trajectory computation.
Args:
thetalist (np.ndarray): Initial joint angles.
dthetalist (np.ndarray): Initial joint velocities.
taumat (np.ndarray): Array of joint torques over the trajectory.
g (np.ndarray): Gravity vector.
Ftipmat (np.ndarray): Array of external forces.
dt (float): Time step.
intRes (int): Integration resolution.
Returns:
dict: Dictionary containing positions, velocities, and accelerations.
"""
num_steps = taumat.shape[0]
num_joints = thetalist.shape[0]
logger.info(
f"Computing forward dynamics: {num_steps} steps, {num_joints} joints"
)
# Print performance recommendations
if self.cuda_available:
total_work = num_steps * num_joints
if total_work >= 10000:
print_performance_recommendations(num_steps, num_joints)
# Decide on execution strategy
use_gpu = self._should_use_gpu(num_steps, num_joints)
if use_gpu:
return self._forward_dynamics_gpu(
thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes
)
else:
return self._forward_dynamics_cpu(
thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes
)
[docs] def _forward_dynamics_gpu(
self, thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes
):
"""Enhanced GPU forward dynamics with optimal configuration."""
start_time = time.time()
num_steps = taumat.shape[0]
num_joints = thetalist.shape[0]
try:
# Initialize result arrays
thetamat = np.zeros((num_steps, num_joints), dtype=np.float32)
dthetamat = np.zeros((num_steps, num_joints), dtype=np.float32)
ddthetamat = np.zeros((num_steps, num_joints), dtype=np.float32)
thetamat[0, :] = thetalist.astype(np.float32)
dthetamat[0, :] = dthetalist.astype(np.float32)
# Use memory pool for large arrays
d_thetamat = get_cuda_array((num_steps, num_joints), dtype=np.float32)
d_dthetamat = get_cuda_array((num_steps, num_joints), dtype=np.float32)
d_ddthetamat = get_cuda_array((num_steps, num_joints), dtype=np.float32)
# Copy initial conditions to GPU
d_thetamat.copy_to_device(thetamat)
d_dthetamat.copy_to_device(dthetamat)
d_ddthetamat.copy_to_device(ddthetamat)
# Transfer other data to GPU
d_thetalist = cuda.to_device(thetalist.astype(np.float32))
d_dthetalist = cuda.to_device(dthetalist.astype(np.float32))
d_taumat = cuda.to_device(taumat.astype(np.float32))
d_g = cuda.to_device(g.astype(np.float32))
d_Ftipmat = cuda.to_device(Ftipmat.astype(np.float32))
d_Glist = cuda.to_device(np.array(self.dynamics.Glist, dtype=np.float32))
d_Slist = cuda.to_device(np.array(self.dynamics.S_list, dtype=np.float32))
d_M = cuda.to_device(np.array(self.dynamics.M_list, dtype=np.float32))
d_joint_limits = cuda.to_device(self.joint_limits.astype(np.float32))
# Get optimal launch configuration
grid_config = get_optimal_kernel_config(
num_steps, num_joints, "cache_friendly"
)
if grid_config:
blocks_per_grid = grid_config["grid"]
threads_per_block = grid_config["block"]
logger.info(f"Using {grid_config['kernel_type']} for forward dynamics")
else:
blocks_per_grid, threads_per_block = _best_2d_config(
num_steps, num_joints
)
# Launch forward dynamics kernel
forward_dynamics_kernel[blocks_per_grid, threads_per_block](
d_thetalist,
d_dthetalist,
d_taumat,
d_g,
d_Ftipmat,
dt,
intRes,
d_Glist,
d_Slist,
d_M,
d_thetamat,
d_dthetamat,
d_ddthetamat,
d_joint_limits,
)
# Copy results back
d_thetamat.copy_to_host(thetamat)
d_dthetamat.copy_to_host(dthetamat)
d_ddthetamat.copy_to_host(ddthetamat)
elapsed = time.time() - start_time
throughput = (num_steps * num_joints * intRes) / elapsed / 1e6
self.performance_stats["gpu_calls"] += 1
self.performance_stats["total_gpu_time"] += elapsed
self.performance_stats["kernel_launches"] += 1
logger.info(f"GPU forward dynamics completed in {elapsed:.4f}s")
logger.info(f"📊 Throughput: {throughput:.1f} M integration steps/sec")
return {
"positions": thetamat,
"velocities": dthetamat,
"accelerations": ddthetamat,
}
except Exception as e:
logger.warning(f"GPU forward dynamics failed: {e}, falling back to CPU")
return self._forward_dynamics_cpu(
thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes
)
finally:
# Return large arrays to pool
if "d_thetamat" in locals():
return_cuda_array(d_thetamat)
if "d_dthetamat" in locals():
return_cuda_array(d_dthetamat)
if "d_ddthetamat" in locals():
return_cuda_array(d_ddthetamat)
[docs] def _forward_dynamics_cpu(
self, thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes
):
"""CPU-based forward dynamics computation."""
start_time = time.time()
num_steps = taumat.shape[0]
num_joints = thetalist.shape[0]
thetamat = np.zeros((num_steps, num_joints), dtype=np.float32)
dthetamat = np.zeros((num_steps, num_joints), dtype=np.float32)
ddthetamat = np.zeros((num_steps, num_joints), dtype=np.float32)
# Initialize with starting conditions
current_theta = thetalist.copy()
current_dtheta = dthetalist.copy()
thetamat[0, :] = current_theta
dthetamat[0, :] = current_dtheta
dt_step = dt / intRes
for i in range(1, num_steps):
for _ in range(intRes):
try:
# Compute forward dynamics
ddtheta = self.dynamics.forward_dynamics(
current_theta, current_dtheta, taumat[i], g, Ftipmat[i]
)
# Integrate
current_dtheta += ddtheta * dt_step
current_theta += current_dtheta * dt_step
# Apply joint limits
current_theta = np.clip(
current_theta, self.joint_limits[:, 0], self.joint_limits[:, 1]
)
ddthetamat[i] = ddtheta
except Exception as e:
logger.warning(f"Error in forward dynamics at step {i}: {e}")
ddthetamat[i] = np.zeros(num_joints)
thetamat[i, :] = current_theta
dthetamat[i, :] = current_dtheta
elapsed = time.time() - start_time
self.performance_stats["cpu_calls"] += 1
self.performance_stats["total_cpu_time"] += elapsed
logger.info(f"CPU forward dynamics completed in {elapsed:.4f}s")
return {
"positions": thetamat,
"velocities": dthetamat,
"accelerations": ddthetamat,
}
[docs] def cartesian_trajectory(self, Xstart, Xend, Tf, N, method):
"""
Enhanced Cartesian trajectory generation with optimal kernel selection.
Args:
Xstart (np.ndarray): Initial end-effector configuration (SE(3) matrix).
Xend (np.ndarray): Final end-effector configuration (SE(3) matrix).
Tf (float): Total time of motion.
N (int): Number of trajectory points.
method (int): Time-scaling method (3=cubic, 5=quintic).
Returns:
dict: Dictionary with positions, velocities, accelerations, and orientations.
"""
logger.info(f"Generating Cartesian trajectory: N={N}, method={method}")
N = int(N)
timegap = Tf / (N - 1.0)
traj = [None] * N
Rstart, pstart = TransToRp(Xstart)
Rend, pend = TransToRp(Xend)
orientations = np.zeros((N, 3, 3), dtype=np.float32)
# Compute orientation interpolation on CPU (complex matrix operations)
for i in range(N):
if method == 3:
s = CubicTimeScaling(Tf, timegap * i)
else:
s = QuinticTimeScaling(Tf, timegap * i)
traj[i] = np.r_[
np.c_[
np.dot(Rstart, MatrixExp3(MatrixLog3(np.dot(Rstart.T, Rend)) * s)),
s * pend + (1 - s) * pstart,
],
[[0, 0, 0, 1]],
]
orientations[i] = np.dot(
Rstart, MatrixExp3(MatrixLog3(np.dot(Rstart.T, Rend)) * s)
)
traj_pos = np.array([TransToRp(T)[1] for T in traj], dtype=np.float32)
# Use GPU for position/velocity/acceleration computation if beneficial
use_gpu = self._should_use_gpu(N, 3) # 3 coordinates (x,y,z)
if use_gpu:
traj_vel, traj_acc = self._cartesian_trajectory_gpu(
pstart, pend, Tf, N, method
)
else:
traj_vel, traj_acc = self._cartesian_trajectory_cpu(
pstart, pend, Tf, N, method
)
return {
"positions": traj_pos,
"velocities": traj_vel,
"accelerations": traj_acc,
"orientations": orientations,
}
[docs] def _cartesian_trajectory_gpu(self, pstart, pend, Tf, N, method):
"""Enhanced GPU Cartesian trajectory computation."""
start_time = time.time()
try:
pstart = np.ascontiguousarray(pstart.astype(np.float32))
pend = np.ascontiguousarray(pend.astype(np.float32))
traj_vel = get_cuda_array((N, 3), dtype=np.float32)
traj_acc = get_cuda_array((N, 3), dtype=np.float32)
traj_pos_dummy = get_cuda_array((N, 3), dtype=np.float32)
# Transfer data using pinned memory
d_pstart = _h2d_pinned(pstart)
d_pend = _h2d_pinned(pend)
# Get optimal launch configuration
grid_config = get_optimal_kernel_config(N, 3, "warp_optimized")
if grid_config:
blocks_per_grid = grid_config["grid"]
threads_per_block = grid_config["block"]
logger.info(
f"Using {grid_config['kernel_type']} for Cartesian trajectory"
)
else:
blocks_per_grid, threads_per_block = _best_2d_config(N, 3)
# Launch Cartesian trajectory kernel
cartesian_trajectory_kernel[blocks_per_grid, threads_per_block](
d_pstart, d_pend, traj_pos_dummy, traj_vel, traj_acc, Tf, N, method
)
# Copy results back
traj_vel_host = traj_vel.copy_to_host()
traj_acc_host = traj_acc.copy_to_host()
elapsed = time.time() - start_time
self.performance_stats["gpu_calls"] += 1
self.performance_stats["total_gpu_time"] += elapsed
self.performance_stats["kernel_launches"] += 1
logger.info(f"GPU Cartesian trajectory completed in {elapsed:.4f}s")
return traj_vel_host, traj_acc_host
except Exception as e:
logger.warning(f"GPU Cartesian trajectory failed: {e}, falling back to CPU")
return self._cartesian_trajectory_cpu(pstart, pend, Tf, N, method)
finally:
# Return memory to pool
if "traj_vel" in locals():
return_cuda_array(traj_vel)
if "traj_acc" in locals():
return_cuda_array(traj_acc)
if "traj_pos_dummy" in locals():
return_cuda_array(traj_pos_dummy)
[docs] def _cartesian_trajectory_cpu(self, pstart, pend, Tf, N, method):
"""CPU-based Cartesian trajectory computation."""
start_time = time.time()
traj_vel = np.zeros((N, 3), dtype=np.float32)
traj_acc = np.zeros((N, 3), dtype=np.float32)
for i in range(N):
t = i * (Tf / (N - 1))
tau = t / Tf
if method == 3:
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 / (Tf * Tf) * (1.0 - 2.0 * tau)
elif method == 5:
tau2 = tau * tau
s_dot = 30.0 * tau2 * (1.0 - 2.0 * tau + tau2) / Tf
s_ddot = 60.0 / (Tf * Tf) * tau * (1.0 - 2.0 * tau)
else:
s_dot = s_ddot = 0.0
dp = pend - pstart
traj_vel[i] = s_dot * dp
traj_acc[i] = s_ddot * dp
elapsed = time.time() - start_time
self.performance_stats["cpu_calls"] += 1
self.performance_stats["total_cpu_time"] += elapsed
logger.info(f"CPU Cartesian trajectory completed in {elapsed:.4f}s")
return traj_vel, traj_acc
[docs] def cleanup_gpu_memory(self):
"""Enhanced GPU memory cleanup."""
if self.cuda_available:
# Clean up per-instance cache
if hasattr(self, "_gpu_arrays"):
for array in self._gpu_arrays.values():
if array is not None:
return_cuda_array(array)
self._gpu_arrays.clear()
# Clear kernel cache
if hasattr(self, "_kernel_cache"):
self._kernel_cache.clear()
# Clear global memory pool
from .cuda_kernels import _cuda_memory_pool
_cuda_memory_pool.clear()
# Synchronize and clean up CUDA context
cuda.synchronize()
logger.info("GPU memory cleaned up")
[docs] def benchmark_all_kernels(self, N=5000, num_joints=6, num_runs=5):
"""
Comprehensive benchmarking of all available kernels.
Args:
N (int): Number of trajectory points for benchmarking
num_joints (int): Number of joints
num_runs (int): Number of benchmark runs per kernel
Returns:
dict: Benchmark results for all kernels
"""
if not self.cuda_available:
logger.warning("CUDA not available for benchmarking")
return {}
logger.info(
f"🔬 Benchmarking all kernels: N={N}, joints={num_joints}, runs={num_runs}"
)
# Generate test data
thetastart = np.random.uniform(-1, 1, num_joints).astype(np.float32)
thetaend = np.random.uniform(-1, 1, num_joints).astype(np.float32)
kernel_types = [
"standard",
"vectorized",
"memory_optimized",
"warp_optimized",
"cache_friendly",
]
results = {}
for kernel_type in kernel_types:
logger.info(f"📊 Testing {kernel_type} kernel...")
# Reset stats for clean measurement
self.reset_performance_stats()
times = []
for run in range(num_runs):
start_time = time.time()
try:
trajectory = self.joint_trajectory(
thetastart,
thetaend,
2.0,
N,
5,
kernel_type=kernel_type,
enable_monitoring=False,
)
elapsed = time.time() - start_time
times.append(elapsed)
except Exception as e:
logger.warning(f"Kernel {kernel_type} failed: {e}")
times.append(float("inf"))
if times and min(times) < float("inf"):
results[kernel_type] = {
"mean_time": np.mean(times),
"std_time": np.std(times),
"min_time": np.min(times),
"max_time": np.max(times),
"all_times": times,
"success_rate": sum(1 for t in times if t < float("inf"))
/ len(times),
}
else:
results[kernel_type] = {
"mean_time": float("inf"),
"std_time": 0,
"min_time": float("inf"),
"max_time": float("inf"),
"all_times": times,
"success_rate": 0,
}
# Find best kernel
best_kernel = min(results.keys(), key=lambda k: results[k]["mean_time"])
best_time = results[best_kernel]["mean_time"]
logger.info(f"🏆 Best kernel: {best_kernel} ({best_time*1000:.2f}ms)")
# Print comparison table
print("\n📋 Kernel Performance Comparison:")
print("=" * 70)
print(f"{'Kernel':<20} {'Mean (ms)':<12} {'Min (ms)':<12} {'Success':<10}")
print("-" * 70)
for kernel_type, stats in results.items():
mean_ms = (
stats["mean_time"] * 1000
if stats["mean_time"] < float("inf")
else float("inf")
)
min_ms = (
stats["min_time"] * 1000
if stats["min_time"] < float("inf")
else float("inf")
)
success = f"{stats['success_rate']*100:.0f}%"
marker = "🏆" if kernel_type == best_kernel else " "
print(
f"{marker}{kernel_type:<18} {mean_ms:<12.2f} {min_ms:<12.2f} {success:<10}"
)
return results
[docs] def __del__(self):
"""Enhanced destructor with better error handling."""
try:
if (
hasattr(self, "enable_profiling")
and self.enable_profiling
and hasattr(self, "cuda_available")
and self.cuda_available
):
profile_stop()
if hasattr(self, "cleanup_gpu_memory"):
self.cleanup_gpu_memory()
except Exception:
pass # Ignore errors during cleanup
# Enhanced plotting methods with performance annotations
[docs] @staticmethod
def plot_trajectory(
trajectory_data,
Tf,
title="Joint Trajectory",
labels=None,
performance_stats=None,
):
"""Enhanced trajectory plotting with performance information."""
positions = trajectory_data["positions"]
velocities = trajectory_data["velocities"]
accelerations = trajectory_data["accelerations"]
num_steps = positions.shape[0]
num_joints = positions.shape[1]
time_steps = np.linspace(0, Tf, num_steps)
fig, axs = plt.subplots(3, num_joints, figsize=(15, 10), sharex="col")
# Add performance info to title
if performance_stats:
speedup = performance_stats.get("speedup_achieved", 0)
kernel = performance_stats.get("best_kernel_used", "unknown")
if speedup > 1:
title += f" (GPU: {speedup:.1f}x speedup, {kernel} kernel)"
else:
title += " (CPU execution)"
fig.suptitle(title)
for i in range(num_joints):
if labels and len(labels) == num_joints:
label = labels[i]
else:
label = f"Joint {i+1}"
axs[0, i].plot(time_steps, positions[:, i], label=f"{label} Position")
axs[0, i].set_ylabel("Position")
axs[0, i].legend()
axs[1, i].plot(time_steps, velocities[:, i], label=f"{label} Velocity")
axs[1, i].set_ylabel("Velocity")
axs[1, i].legend()
axs[2, i].plot(
time_steps, accelerations[:, i], label=f"{label} Acceleration"
)
axs[2, i].set_ylabel("Acceleration")
axs[2, i].legend()
for ax in axs[-1]:
ax.set_xlabel("Time (s)")
plt.tight_layout()
plt.show()
[docs] def plot_tcp_trajectory(self, trajectory, dt):
"""
Enhanced TCP trajectory plotting with performance monitoring.
Args:
trajectory (list): A list of joint angle configurations representing the trajectory.
dt (float): The time step between consecutive points in the trajectory.
Returns:
None
"""
start_time = time.time()
tcp_trajectory = [
self.serial_manipulator.forward_kinematics(joint_angles)
for joint_angles in trajectory
]
tcp_positions = [pose[:3, 3] for pose in tcp_trajectory]
velocity, acceleration, jerk = self.calculate_derivatives(tcp_positions, dt)
time = np.arange(0, len(tcp_positions) * dt, dt)
elapsed = time.time() - start_time
plt.figure(figsize=(12, 8))
title = f"TCP Trajectory (FK computed in {elapsed:.3f}s)"
plt.suptitle(title)
for i, label in enumerate(["X", "Y", "Z"]):
plt.subplot(4, 1, 1)
plt.plot(time, np.array(tcp_positions)[:, i], label=f"TCP {label} Position")
plt.ylabel("Position")
plt.legend()
plt.subplot(4, 1, 2)
plt.plot(time[:-1], velocity[:, i], label=f"TCP {label} Velocity")
plt.ylabel("Velocity")
plt.legend()
plt.subplot(4, 1, 3)
plt.plot(time[:-2], acceleration[:, i], label=f"TCP {label} Acceleration")
plt.ylabel("Acceleration")
plt.legend()
plt.subplot(4, 1, 4)
plt.plot(time[:-3], jerk[:, i], label=f"TCP {label} Jerk")
plt.xlabel("Time")
plt.ylabel("Jerk")
plt.legend()
plt.tight_layout()
plt.show()
[docs] def plot_cartesian_trajectory(
self, trajectory_data, Tf, title="Cartesian Trajectory", performance_stats=None
):
"""
Enhanced Cartesian trajectory plotting with performance information.
Args:
trajectory_data (dict): A dictionary containing trajectory data.
Tf (float): The final time of the trajectory.
title (str, optional): The title of the plot.
performance_stats (dict, optional): Performance statistics to display.
Returns:
None
"""
positions = trajectory_data["positions"]
velocities = trajectory_data["velocities"]
accelerations = trajectory_data["accelerations"]
num_steps = positions.shape[0]
time_steps = np.linspace(0, Tf, num_steps)
# Add performance info to title
if performance_stats:
speedup = performance_stats.get("speedup_achieved", 0)
if speedup > 1:
title += f" (GPU: {speedup:.1f}x speedup)"
else:
title += " (CPU execution)"
fig, axs = plt.subplots(3, 1, figsize=(10, 15), sharex="col")
fig.suptitle(title)
axs[0].plot(time_steps, positions[:, 0], label="X Position")
axs[0].plot(time_steps, positions[:, 1], label="Y Position")
axs[0].plot(time_steps, positions[:, 2], label="Z Position")
axs[0].set_ylabel("Position")
axs[0].legend()
axs[1].plot(time_steps, velocities[:, 0], label="X Velocity")
axs[1].plot(time_steps, velocities[:, 1], label="Y Velocity")
axs[1].plot(time_steps, velocities[:, 2], label="Z Velocity")
axs[1].set_ylabel("Velocity")
axs[1].legend()
axs[2].plot(time_steps, accelerations[:, 0], label="X Acceleration")
axs[2].plot(time_steps, accelerations[:, 1], label="Y Acceleration")
axs[2].plot(time_steps, accelerations[:, 2], label="Z Acceleration")
axs[2].set_ylabel("Acceleration")
axs[2].legend()
axs[2].set_xlabel("Time (s)")
plt.tight_layout()
plt.show()
[docs] def calculate_derivatives(self, positions, dt):
"""
Calculate the velocity, acceleration, and jerk of a trajectory.
Parameters:
positions (list or numpy.ndarray): A list or array of positions.
dt (float): The time step between each position.
Returns:
velocity (numpy.ndarray): An array of velocities.
acceleration (numpy.ndarray): An array of accelerations.
jerk (numpy.ndarray): An array of jerks.
"""
positions = np.array(positions)
velocity = np.diff(positions, axis=0) / dt
acceleration = np.diff(velocity, axis=0) / dt
jerk = np.diff(acceleration, axis=0) / dt
return velocity, acceleration, jerk
[docs] def plot_ee_trajectory(self, trajectory_data, Tf, title="End-Effector Trajectory"):
"""
Enhanced end-effector trajectory plotting.
Args:
trajectory_data (dict): A dictionary containing trajectory data.
Tf (float): The final time of the trajectory.
title (str, optional): The title of the plot.
Returns:
None
"""
positions = trajectory_data["positions"]
num_steps = positions.shape[0]
time_steps = np.linspace(0, Tf, num_steps)
if "orientations" in trajectory_data:
orientations = trajectory_data["orientations"]
else:
# Compute orientations using forward kinematics
start_time = time.time()
orientations = np.array(
[
self.serial_manipulator.forward_kinematics(pos)[:3, :3]
for pos in positions
]
)
elapsed = time.time() - start_time
title += f" (FK for orientations: {elapsed:.3f}s)"
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection="3d")
fig.suptitle(title)
ax.plot(
positions[:, 0],
positions[:, 1],
positions[:, 2],
label="EE Position",
color="b",
linewidth=2,
)
# Draw orientation frames at selected points
frame_step = max(1, num_steps // 20)
for i in range(0, num_steps, frame_step):
R = orientations[i]
pos = positions[i]
scale = 0.01
# X-axis (red)
ax.quiver(
pos[0],
pos[1],
pos[2],
R[0, 0],
R[1, 0],
R[2, 0],
length=scale,
color="r",
alpha=0.8,
)
# Y-axis (green)
ax.quiver(
pos[0],
pos[1],
pos[2],
R[0, 1],
R[1, 1],
R[2, 1],
length=scale,
color="g",
alpha=0.8,
)
# Z-axis (blue)
ax.quiver(
pos[0],
pos[1],
pos[2],
R[0, 2],
R[1, 2],
R[2, 2],
length=scale,
color="b",
alpha=0.8,
)
ax.set_xlabel("X Position")
ax.set_ylabel("Y Position")
ax.set_zlabel("Z Position")
ax.legend()
plt.show()
[docs] def plan_trajectory(self, start_position, target_position, obstacle_points):
"""
Enhanced trajectory planning with collision avoidance.
Args:
start_position (list): Initial joint configuration.
target_position (list): Desired joint configuration.
obstacle_points (list): List of obstacle points in the environment.
Returns:
list: Joint trajectory as a list of joint configurations.
"""
logger.info(
f"Planning trajectory from {len(start_position)} to {len(target_position)} DOF"
)
# Enhanced trajectory planning with multiple waypoints
# This is a simple interpolation - can be extended with RRT*, PRM, etc.
num_waypoints = 5
joint_trajectory = []
start_pos = np.array(start_position)
target_pos = np.array(target_position)
for i in range(num_waypoints + 1):
alpha = i / num_waypoints
waypoint = (1 - alpha) * start_pos + alpha * target_pos
# Simple collision avoidance - move away from obstacles
if obstacle_points and self.potential_field:
for _ in range(10): # Max adjustment iterations
gradient = self.potential_field.compute_gradient(
waypoint, target_pos, obstacle_points
)
waypoint -= 0.01 * gradient
# Check if waypoint is collision-free
if self.collision_checker:
if not self.collision_checker.check_collision(waypoint):
break
joint_trajectory.append(waypoint.tolist())
logger.info(f"Planned trajectory with {len(joint_trajectory)} waypoints")
return joint_trajectory
# Maintain backward compatibility with original class name
[docs]class TrajectoryPlanning(OptimizedTrajectoryPlanning):
"""
Backward compatibility alias for OptimizedTrajectoryPlanning.
This ensures existing code continues to work while providing
access to all optimizations.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
logger.info("Using OptimizedTrajectoryPlanning (backward compatibility mode)")
# Enhanced utility functions for advanced users
[docs]def create_optimized_planner(
serial_manipulator,
urdf_path,
dynamics,
joint_limits,
torque_limits=None,
target_speedup=40.0,
gpu_memory_mb=None,
enable_profiling=False,
kernel_type="auto",
):
"""
Enhanced factory function to create an optimized trajectory planner.
Args:
serial_manipulator: SerialManipulator instance
urdf_path: Path to URDF file
dynamics: ManipulatorDynamics instance
joint_limits: Joint limits
torque_limits: Torque limits (optional)
target_speedup: Target speedup over CPU (default: 40x)
gpu_memory_mb: GPU memory pool size in MB (optional)
enable_profiling: Enable CUDA profiling (optional)
kernel_type: Kernel selection strategy (optional)
Returns:
OptimizedTrajectoryPlanning: Configured planner instance
"""
# Auto-detect optimal settings
cuda_available = check_cuda_availability()
# Adaptive threshold based on target speedup and problem size
num_joints = len(joint_limits)
if cuda_available:
gpu_props = get_gpu_properties()
if gpu_props:
sm_count = gpu_props["multiprocessor_count"]
if target_speedup >= 40:
threshold = max(50, int(sm_count * 10000 / num_joints))
elif target_speedup >= 20:
threshold = max(50, int(sm_count * 5000 / num_joints))
else:
threshold = max(50, int(sm_count * 1000 / num_joints))
else:
threshold = 1000
else:
threshold = float("inf") # Never use GPU if not available
# Create planner with optimized settings
planner = OptimizedTrajectoryPlanning(
serial_manipulator=serial_manipulator,
urdf_path=urdf_path,
dynamics=dynamics,
joint_limits=joint_limits,
torque_limits=torque_limits,
use_cuda=None, # Auto-detect
cuda_threshold=threshold,
memory_pool_size_mb=gpu_memory_mb,
enable_profiling=enable_profiling,
auto_optimize=True,
kernel_type=kernel_type,
target_speedup=target_speedup,
)
logger.info(
f"Created optimized planner for {num_joints} joints, "
f"target: {target_speedup}x speedup, CUDA: {cuda_available}"
)
return planner
[docs]def compare_implementations(
serial_manipulator,
urdf_path,
dynamics,
joint_limits,
test_params=None,
detailed_analysis=True,
):
"""
Enhanced implementation comparison with detailed kernel analysis.
Args:
serial_manipulator: SerialManipulator instance
urdf_path: Path to URDF file
dynamics: ManipulatorDynamics instance
joint_limits: Joint limits
test_params: Test parameters (optional)
detailed_analysis: Whether to perform detailed kernel comparison
Returns:
dict: Comprehensive comparison results
"""
if test_params is None:
test_params = {"N": 5000, "Tf": 2.0, "method": 5, "num_runs": 5}
# Create CPU-only planner
cpu_planner = OptimizedTrajectoryPlanning(
serial_manipulator=serial_manipulator,
urdf_path=urdf_path,
dynamics=dynamics,
joint_limits=joint_limits,
use_cuda=False,
)
# Create GPU planner (if available)
gpu_planner = None
if check_cuda_availability():
gpu_planner = OptimizedTrajectoryPlanning(
serial_manipulator=serial_manipulator,
urdf_path=urdf_path,
dynamics=dynamics,
joint_limits=joint_limits,
use_cuda=True,
cuda_threshold=0, # Force GPU usage
kernel_type="auto_tune",
)
# Generate test data
num_joints = len(joint_limits)
thetastart = np.random.uniform(-1, 1, num_joints).astype(np.float32)
thetaend = np.random.uniform(-1, 1, num_joints).astype(np.float32)
results = {"cpu": {}, "gpu": {}}
# Test CPU implementation
logger.info("Testing CPU implementation...")
cpu_times = []
for run in range(test_params.get("num_runs", 3)):
start_time = time.time()
cpu_result = cpu_planner.joint_trajectory(
thetastart,
thetaend,
test_params["Tf"],
test_params["N"],
test_params["method"],
)
cpu_times.append(time.time() - start_time)
cpu_mean_time = np.mean(cpu_times)
results["cpu"] = {
"mean_time": cpu_mean_time,
"std_time": np.std(cpu_times),
"min_time": np.min(cpu_times),
"max_time": np.max(cpu_times),
"result_shape": cpu_result["positions"].shape,
"stats": cpu_planner.get_performance_stats(),
}
# Test GPU implementation (if available)
if gpu_planner is not None:
logger.info("Testing GPU implementation...")
# Test different kernels if detailed analysis requested
if detailed_analysis:
kernel_results = gpu_planner.benchmark_all_kernels(
N=test_params["N"],
num_joints=num_joints,
num_runs=test_params.get("num_runs", 3),
)
results["kernel_comparison"] = kernel_results
# Test best configuration
gpu_times = []
for run in range(test_params.get("num_runs", 3)):
start_time = time.time()
gpu_result = gpu_planner.joint_trajectory(
thetastart,
thetaend,
test_params["Tf"],
test_params["N"],
test_params["method"],
)
gpu_times.append(time.time() - start_time)
gpu_mean_time = np.mean(gpu_times)
speedup = cpu_mean_time / gpu_mean_time if gpu_mean_time > 0 else 0
results["gpu"] = {
"mean_time": gpu_mean_time,
"std_time": np.std(gpu_times),
"min_time": np.min(gpu_times),
"max_time": np.max(gpu_times),
"result_shape": gpu_result["positions"].shape,
"stats": gpu_planner.get_performance_stats(),
"speedup": speedup,
}
# Compare accuracy
pos_diff = np.abs(cpu_result["positions"] - gpu_result["positions"])
vel_diff = np.abs(cpu_result["velocities"] - gpu_result["velocities"])
acc_diff = np.abs(cpu_result["accelerations"] - gpu_result["accelerations"])
results["accuracy"] = {
"max_pos_diff": np.max(pos_diff),
"max_vel_diff": np.max(vel_diff),
"max_acc_diff": np.max(acc_diff),
"mean_pos_diff": np.mean(pos_diff),
"mean_vel_diff": np.mean(vel_diff),
"mean_acc_diff": np.mean(acc_diff),
}
# Print comprehensive results
print(f"\n🚀 Implementation Comparison Results:")
print("=" * 50)
print(
f"CPU Time: {cpu_mean_time*1000:.2f} ± {results['cpu']['std_time']*1000:.2f} ms"
)
print(
f"GPU Time: {gpu_mean_time*1000:.2f} ± {results['gpu']['std_time']*1000:.2f} ms"
)
print(f"Speedup: {speedup:.1f}x")
print(f"Max Position Error: {results['accuracy']['max_pos_diff']:.2e}")
print(f"Mean Position Error: {results['accuracy']['mean_pos_diff']:.2e}")
if speedup >= 40:
print("🎯 Achieved 40x+ speedup target!")
elif speedup >= 20:
print("⚡ Good speedup achieved!")
elif speedup >= 5:
print("✅ Moderate speedup achieved")
else:
print("⚠️ Limited speedup - consider larger problem sizes")
logger.info(f"GPU speedup: {speedup:.2f}x")
else:
results["gpu"] = {"available": False}
logger.info("GPU not available for comparison")
return results
def benchmark_kernel_performance_comprehensive(
serial_manipulator, urdf_path, dynamics, joint_limits, test_sizes=None, num_runs=5
):
"""
Comprehensive kernel performance benchmarking across multiple problem sizes.
Args:
serial_manipulator: SerialManipulator instance
urdf_path: Path to URDF file
dynamics: ManipulatorDynamics instance
joint_limits: Joint limits
test_sizes: List of (N, joints) tuples to test
num_runs: Number of runs per test
Returns:
dict: Comprehensive benchmark results
"""
if not check_cuda_availability():
logger.warning("CUDA not available for comprehensive benchmarking")
return {}
if test_sizes is None:
test_sizes = [
(1000, 6),
(5000, 6),
(10000, 6),
(20000, 6),
(1000, 12),
(5000, 12),
(10000, 12),
]
print("\n🔬 Comprehensive Kernel Performance Benchmarking")
print("=" * 60)
all_results = {}
for N, joints in test_sizes:
logger.info(f"Testing N={N}, joints={joints}")
# Create optimized planner
planner = OptimizedTrajectoryPlanning(
serial_manipulator=serial_manipulator,
urdf_path=urdf_path,
dynamics=dynamics,
joint_limits=joint_limits[:joints], # Use subset of joints
use_cuda=True,
cuda_threshold=0,
kernel_type="auto_tune",
)
# Benchmark all kernels for this problem size
kernel_results = planner.benchmark_all_kernels(
N=N, num_joints=joints, num_runs=num_runs
)
all_results[f"N{N}_J{joints}"] = {
"N": N,
"joints": joints,
"total_elements": N * joints,
"kernel_results": kernel_results,
}
# Find best kernel for this size
if kernel_results:
best_kernel = min(
kernel_results.keys(), key=lambda k: kernel_results[k]["mean_time"]
)
best_time = kernel_results[best_kernel]["mean_time"]
throughput = (N * joints) / best_time / 1e6
print(
f"N={N:5d}, J={joints:2d}: Best={best_kernel:<15} "
f"Time={best_time*1000:6.2f}ms Throughput={throughput:6.1f}M/s"
)
return all_results
# Export important classes and functions
__all__ = [
"OptimizedTrajectoryPlanning",
"TrajectoryPlanning", # Backward compatibility
"create_optimized_planner",
"compare_implementations",
"benchmark_kernel_performance_comprehensive",
]