#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Enhanced CUDA Kernels Module - ManipulaPy
This module provides highly optimized CUDA-accelerated functions for trajectory planning and dynamics
computation optimized for 40x+ speedups over CPU implementations.
Key optimizations:
- Advanced 2D/3D grid parallelization with optimal occupancy
- Shared memory utilization for 6x6 matrix operations
- Register-optimized computations avoiding memory spills
- Vectorized kernels processing multiple elements per thread
- Memory-bandwidth optimized kernels for large problems
- Adaptive launch configurations based on problem size
- Pinned memory transfers for improved PCIe bandwidth
- Fused kernels reducing memory bandwidth requirements
- Cache-friendly memory access patterns
- Warp-level optimizations for maximum throughput
Copyright (c) 2025 Mohamed Aboelnasr
Licensed under the GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
"""
import logging
import math
import os
import warnings
from functools import lru_cache
from time import perf_counter
from typing import Any, Dict, NoReturn, Optional, Tuple
import numpy as np
from numba import config as _nb_cfg
# Configure numba for optimal performance
_nb_cfg.CUDA_CACHE_SIZE = "2048" # Increased cache size for better compilation reuse
_nb_cfg.CUDA_LOW_OCCUPANCY_WARNINGS = False # Disable warnings for specialized kernels
# Environment toggle for fast math operations
FAST_MATH = bool(int(os.getenv("MANIPULAPY_FASTMATH", "1")))
# Setup logging
logger = logging.getLogger(__name__)
# ENHANCED CUDA DETECTION WITH COMPREHENSIVE ERROR HANDLING
def _cuda_safe_to_probe() -> bool:
"""Check whether the CUDA driver can be initialized without crashing.
A mismatched or broken NVIDIA driver can raise a hardware-level
``SIGSEGV`` *inside* numba's C driver call (e.g. ``cuCtxGetCurrent``),
which a Python ``try``/``except`` in this process cannot catch — it would
abort the whole interpreter at import time. To stay safe we run the risky
initialization in a throwaway subprocess: if the child segfaults or hangs,
only the child dies and we fall back to CPU instead of crashing the import.
Set ``MANIPULAPY_SKIP_CUDA_PROBE=1`` to skip this check (e.g. when the
subprocess cost is undesirable and the driver is known good).
Returns:
bool: ``True`` if a child process initialized CUDA cleanly, else ``False``.
"""
if os.getenv("NUMBA_DISABLE_CUDA", "0") == "1":
return False
if os.getenv("MANIPULAPY_SKIP_CUDA_PROBE", "0") == "1":
return True
import subprocess
import sys
probe = (
"from numba import cuda\n"
"import numpy as np\n"
"assert cuda.is_available()\n"
"cuda.list_devices()\n"
"cuda.get_current_device()\n"
"d = cuda.device_array(8, dtype=np.float32)\n"
"cuda.synchronize()\n"
)
try:
proc = subprocess.run(
[sys.executable, "-c", probe],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=30,
)
return proc.returncode == 0
except Exception:
return False
def _detect_cuda_capability() -> Tuple[bool, Any, Any, Any, Optional[str]]:
"""
Comprehensive CUDA detection with detailed diagnostics and error handling.
Returns:
tuple: (cuda_available, cuda_module, float32, int32, error_message)
"""
try:
# Step 0: Probe the driver in a subprocess first. A broken driver can
# SIGSEGV inside numba's C call, which try/except here cannot catch, so
# we never touch it in-process unless a sacrificial child survived.
if not _cuda_safe_to_probe():
return (
False,
None,
None,
None,
"CUDA driver probe failed (unavailable or driver crash) - using CPU",
)
# Step 1: Import numba.cuda with proper error handling
from numba import cuda, float32, int32
# Step 2: Check basic CUDA availability
try:
cuda_available = cuda.is_available()
if not cuda_available:
return (
False,
None,
None,
None,
"CUDA runtime not available - likely no GPU or driver issues",
)
except Exception as e:
return False, None, None, None, f"CUDA availability check failed: {e}"
# Step 3: Verify device detection
try:
devices = cuda.list_devices()
if not devices:
return False, None, None, None, "No CUDA devices detected"
except Exception as e:
return False, None, None, None, f"Device enumeration failed: {e}"
# Step 4: Test basic GPU operations
try:
# Test memory allocation
test_array = cuda.device_array(100, dtype=np.float32)
# Test basic kernel compilation
@cuda.jit
def test_kernel(arr) -> None:
"""Write each thread's index into the array to validate execution.
Args:
arr: 1D device array (in-place output buffer); each element
``arr[idx]`` is set to ``float(idx)`` for the thread's
global grid index.
"""
idx = cuda.grid(1)
if idx < arr.shape[0]:
arr[idx] = float32(idx)
# Get device properties
current_device = cuda.get_current_device()
sm_count = current_device.MULTIPROCESSOR_COUNT
max_threads = current_device.MAX_THREADS_PER_BLOCK
# Test kernel execution
test_kernel[1, 64](test_array)
cuda.synchronize()
# Verify results
result = test_array.copy_to_host()
if not np.allclose(result[:10], np.arange(10, dtype=np.float32)):
return False, None, None, None, "CUDA kernel execution test failed"
del test_array
logger.info(
f"✅ CUDA fully operational: {len(devices)} device(s), {sm_count} SMs, {max_threads} max threads/block"
)
return True, cuda, float32, int32, None
except Exception as e:
return False, None, None, None, f"CUDA functionality test failed: {e}"
except ImportError as e:
return False, None, None, None, f"numba.cuda import failed: {e}"
except Exception as e:
return False, None, None, None, f"Unexpected CUDA detection error: {e}"
# Perform CUDA detection
CUDA_AVAILABLE, cuda, float32, int32, _cuda_error = _detect_cuda_capability()
# Mock CUDA objects for graceful degradation
if not CUDA_AVAILABLE:
class MockCuda:
@staticmethod
def jit(func=None, device=False, inline=False, fastmath=False) -> Any:
"""Return a stub decorator whose wrapped kernel raises on call.
Args:
func: Kernel function to wrap, or None when used with arguments
as a decorator factory.
device: Ignored stub flag mirroring ``numba.cuda.jit`` for a
device function.
inline: Ignored stub flag mirroring ``numba.cuda.jit`` inlining.
fastmath: Ignored stub flag mirroring ``numba.cuda.jit`` fast-math.
Returns:
A wrapper callable that raises ``RuntimeError`` on invocation, or
the same wrapper already applied to ``func`` when ``func`` is given.
"""
def wrapper(*args, **kwargs) -> NoReturn:
"""Raise because no CUDA device is available to run the kernel."""
raise RuntimeError(
f"CUDA not available: {_cuda_error}\n"
"For 40x+ speedups, install CUDA support:\n"
"1. Install NVIDIA drivers: nvidia-smi\n"
"2. Install CUDA toolkit (11.8+ or 12.0+)\n"
"3. Install ManipulaPy with GPU support:\n"
" pip install ManipulaPy[gpu-cuda12]\n"
"4. Verify: python -c 'from numba import cuda; print(cuda.is_available())'"
)
return wrapper if func is None else wrapper(func)
@staticmethod
def grid(dim) -> int:
"""Return 0 as the thread index since no real grid exists.
Args:
dim: Grid dimensionality requested (1, 2, or 3); ignored by the
CUDA-less stub.
Returns:
int: Always 0, the only valid index in the degenerate fallback.
"""
return 0
@staticmethod
def device_array(*args, **kwargs) -> NoReturn:
"""Raise because device memory cannot be allocated without CUDA."""
raise RuntimeError(f"CUDA not available: {_cuda_error}")
@staticmethod
def to_device(*args, **kwargs) -> NoReturn:
"""Raise because host-to-device transfer needs an unavailable CUDA device."""
raise RuntimeError(f"CUDA not available: {_cuda_error}")
@staticmethod
def pinned_array(*args, **kwargs) -> NoReturn:
"""Raise because pinned host memory cannot be allocated without CUDA."""
raise RuntimeError(f"CUDA not available: {_cuda_error}")
@staticmethod
def is_available() -> bool:
"""Report that CUDA is not available."""
return False
@staticmethod
def list_devices() -> list:
"""Return an empty device list since no CUDA device exists."""
return []
@staticmethod
def synchronize() -> None:
"""No-op synchronization stub for the CUDA-less fallback."""
pass
@staticmethod
def get_current_device() -> Any:
"""Return a mock device exposing minimal hardware property defaults."""
class MockDevice:
MULTIPROCESSOR_COUNT = 1
MAX_THREADS_PER_BLOCK = 1024
MAX_SHARED_MEMORY_PER_BLOCK = 48 * 1024
MAX_BLOCK_DIM_X = 1024
MAX_BLOCK_DIM_Y = 1024
WARP_SIZE = 32
COMPUTE_CAPABILITY = (6, 0)
return MockDevice()
@staticmethod
def shared() -> Any:
"""Return a mock shared-memory namespace whose array() raises."""
class SharedMock:
@staticmethod
def array(*args, **kwargs) -> NoReturn:
"""Raise because shared memory needs an unavailable CUDA device."""
raise RuntimeError(f"CUDA not available: {_cuda_error}")
return SharedMock()
blockIdx = type("blockIdx", (), {"x": 0, "y": 0, "z": 0})()
blockDim = type("blockDim", (), {"x": 1, "y": 1, "z": 1})()
threadIdx = type("threadIdx", (), {"x": 0, "y": 0, "z": 0})()
@staticmethod
def syncthreads() -> None:
"""No-op thread-barrier stub for the CUDA-less fallback."""
pass
cuda = MockCuda()
if float32 is None:
float32 = np.float32
if int32 is None:
int32 = np.int32
# Check CuPy availability
try:
import cupy as cp
CUPY_AVAILABLE = True
except ImportError:
CUPY_AVAILABLE = False
cp = None
# Use float32 for optimal GPU performance
float_t = float32
[docs]
def check_cuda_availability() -> bool:
"""Enhanced CUDA availability check with detailed diagnostics."""
if CUDA_AVAILABLE:
try:
devices = cuda.list_devices()
device = cuda.get_current_device()
print(f"✅ CUDA is fully operational!")
print(f"✅ Devices: {len(devices)}")
for i, dev in enumerate(devices):
print(f" Device {i}: {dev}")
sm_count = device.MULTIPROCESSOR_COUNT
max_threads = device.MAX_THREADS_PER_BLOCK
shared_mem = device.MAX_SHARED_MEMORY_PER_BLOCK
print(f"✅ Current device specs:")
print(f" SMs: {sm_count}")
print(f" Max threads/block: {max_threads}")
print(f" Shared memory: {shared_mem//1024}KB")
# Performance recommendations
min_N_for_40x = sm_count * 256 * 4 # 4 blocks per SM, 256 threads each
print(f"💡 For 40x+ speedup, use N ≥ {min_N_for_40x:,} trajectory points")
return True
except Exception as e:
print(f"⚠️ CUDA available but device query failed: {e}")
return True
else:
print(f"❌ CUDA not available: {_cuda_error}")
# Provide specific diagnostic help
if "CUDA_ERROR_NO_DEVICE" in str(_cuda_error):
print("\n🔧 No CUDA devices found:")
print("1. Check GPU connection: nvidia-smi")
print("2. Reinstall drivers: sudo apt install nvidia-driver-535")
print("3. Reboot system")
elif "import" in str(_cuda_error).lower():
print("\n🔧 Installation issue:")
print("1. Update numba: pip install --upgrade numba")
print("2. Install CUDA toolkit matching your driver")
print("3. Install ManipulaPy with GPU: pip install ManipulaPy[gpu-cuda12]")
return False
[docs]
def check_cupy_availability() -> bool:
"""Check CuPy availability for additional GPU operations."""
if not CUPY_AVAILABLE:
warnings.warn(
"CuPy not available. Install with: pip install cupy-cuda12x",
UserWarning,
stacklevel=2,
)
return CUPY_AVAILABLE
# Pinned-memory opt-in: numba.cuda.pinned_array() segfaults (SIGSEGV, not a
# catchable Python exception) on certain numba+driver combinations — e.g.
# numba 0.65 + NVIDIA driver 580 (CUDA 13 ABI). The crash happens in
# numba/cuda/api.py during cuMemHostRegister, before the try/except below
# can ever fire. Keep the path off by default so users on broken combos
# don't lose their Python process; opt back in with
# MANIPULAPY_USE_PINNED_MEMORY=1 when the combo is known good (numba <=
# 0.59 with driver 535, or numba 0.66+ with the upstream fix landed).
_PINNED_MEMORY_OPT_IN = os.environ.get("MANIPULAPY_USE_PINNED_MEMORY", "0").lower() in (
"1",
"true",
"yes",
)
# ENHANCED MEMORY MANAGEMENT
[docs]
def _h2d_pinned(arr: np.ndarray) -> Any:
"""Host-to-device transfer with optional pinned-memory acceleration.
Pinned memory delivers ~3x peak transfer bandwidth on large arrays, but
``cuda.pinned_array`` is currently incompatible with several modern
numba+driver combinations (see ``_PINNED_MEMORY_OPT_IN`` above). Plain
``cuda.to_device`` is correct on every supported configuration; pinned
transfers are a pure performance optimisation that must be opted in.
Args:
arr: Host ndarray to copy to the device. Forced to C-contiguous layout
if it is not already.
Returns:
A numba CUDA device array holding a copy of ``arr``.
Raises:
RuntimeError: If CUDA is not available.
"""
if not CUDA_AVAILABLE:
raise RuntimeError("CUDA not available")
# Ensure contiguous memory layout
if not arr.flags["C_CONTIGUOUS"]:
arr = np.ascontiguousarray(arr)
# Safe default: skip pinned memory entirely unless explicitly enabled.
if not _PINNED_MEMORY_OPT_IN:
return cuda.to_device(arr)
# Opt-in pinned-memory path (~3x bandwidth on supported configs).
try:
pinned_arr = cuda.pinned_array(arr.shape, dtype=arr.dtype)
pinned_arr[:] = arr
return cuda.to_device(pinned_arr)
except Exception:
# If pinned_array raised a real Python exception we can still fall
# back; segfaults bypass this branch (process is already gone).
return cuda.to_device(arr)
# OPTIMAL GRID CONFIGURATION FOR 40x+ SPEEDUP
[docs]
def make_1d_grid(
size: int, threads: int = 256
) -> Tuple[Tuple[int, ...], Tuple[int, ...]]:
"""Create optimal 1D grid for maximum GPU utilization.
Args:
size: Total number of elements to cover with one thread each.
threads: Initial thread-block size; overridden internally based on
``size`` for better occupancy.
Returns:
Tuple[Tuple[int, ...], Tuple[int, ...]]: ``(blocks, threads)`` launch
configuration, each a 1-tuple suitable for ``kernel[blocks, threads]``.
"""
if size <= 0:
return (1,), (1,)
# Use larger block sizes for better occupancy
if size >= 10000:
threads = 256 # Optimal for most GPUs
elif size >= 1000:
threads = 128
else:
threads = max(32, 2 ** int(math.log2(size)))
blocks = (size + threads - 1) // threads
return (blocks,), (threads,)
[docs]
def make_2d_grid(
N: int, num_joints: int, block_size: Tuple[int, int] = (128, 8)
) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
Create 2D grid configuration for CUDA kernel launch (backward compatibility).
This is the original function maintained for compatibility.
For optimal performance, use make_2d_grid_optimized().
Args:
N: Number of trajectory time steps (X dimension of the grid).
num_joints: Number of joints (Y dimension of the grid).
block_size: Initial ``(threads_x, threads_y)`` block shape; shrunk for
tiny problems and adjusted to reach a minimum block count.
Returns:
Tuple[Tuple[int, int], Tuple[int, int]]: ``(grid, block)`` 2D launch
configuration. Returns ``((1, 1), (1, 1))`` when CUDA is unavailable.
"""
if not CUDA_AVAILABLE:
return ((1, 1), (1, 1))
# Original logic for backward compatibility
threads_x, threads_y = block_size
# Shrink block if the problem is tiny
threads_x = max(4, 1 << int(math.log2(max(1, min(threads_x, N)))))
threads_y = max(4, 1 << int(math.log2(max(1, min(threads_y, num_joints)))))
def grid_dims(tx: int, ty: int) -> Tuple[int, int]:
"""Compute block counts for a candidate 2D thread shape.
Args:
tx: Threads per block along the X (time) dimension.
ty: Threads per block along the Y (joint) dimension.
Returns:
Tuple[int, int]: Number of blocks ``(blocks_x, blocks_y)`` needed
to cover ``N`` time steps and ``num_joints`` joints.
"""
return ((N + tx - 1) // tx, (num_joints + ty - 1) // ty)
blocks_x, blocks_y = grid_dims(threads_x, threads_y)
total_blocks = blocks_x * blocks_y
# Target ≥ 2 × SM blocks for decent load
try:
sm_count = (
cuda.get_current_device().MULTIPROCESSOR_COUNT if CUDA_AVAILABLE else 16
)
max_threads_per_block = (
cuda.get_current_device().MAX_THREADS_PER_BLOCK if CUDA_AVAILABLE else 1024
)
except Exception:
sm_count = 16 # Fallback
max_threads_per_block = 1024
min_blocks = sm_count * 2
# Keep halving X and Y until we hit the target
toggle = 0
while total_blocks < min_blocks:
if toggle == 0 and threads_x > 4:
threads_x //= 2
elif toggle == 1 and threads_y > 4:
threads_y //= 2
else:
break
toggle ^= 1
# Keep within HW limit
if threads_x * threads_y > max_threads_per_block:
if threads_x >= threads_y and threads_x > 4:
threads_x //= 2
elif threads_y > 4:
threads_y //= 2
blocks_x, blocks_y = grid_dims(threads_x, threads_y)
total_blocks = blocks_x * blocks_y
return (blocks_x, blocks_y), (threads_x, threads_y)
def make_2d_grid_optimized(
N: int, num_joints: int, target_occupancy: float = 0.75
) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
Create optimal 2D grid configuration targeting specific occupancy for 40x+ speedup.
Args:
N: Number of trajectory points
num_joints: Number of joints
target_occupancy: Target GPU occupancy (0.5-1.0)
Returns:
tuple: ((blocks_x, blocks_y), (threads_x, threads_y))
"""
if not CUDA_AVAILABLE:
return ((1, 1), (1, 1))
device = cuda.get_current_device()
sm_count = device.MULTIPROCESSOR_COUNT
max_threads_per_block = device.MAX_THREADS_PER_BLOCK
# Calculate optimal block size based on problem characteristics
total_work = N * num_joints
if total_work >= 1000000: # Large problems (1M+ elements)
# Use maximum threads per block for best throughput
threads_x = min(256, N)
threads_y = min(max_threads_per_block // threads_x, num_joints)
elif total_work >= 100000: # Medium-large problems
threads_x = min(128, N)
threads_y = min(max_threads_per_block // threads_x, num_joints)
elif total_work >= 10000: # Medium problems
threads_x = min(64, N)
threads_y = min(max_threads_per_block // threads_x, num_joints)
else: # Small problems
threads_x = min(32, N)
threads_y = min(max_threads_per_block // threads_x, num_joints)
# Ensure threads are multiples of warp size (32) for optimal performance
threads_x = max(32, (threads_x // 32) * 32)
threads_y = max(1, threads_y)
# Recalculate if we exceed max threads
while threads_x * threads_y > max_threads_per_block:
if threads_x > threads_y and threads_x > 32:
threads_x = max(32, threads_x - 32)
elif threads_y > 1:
threads_y -= 1
else:
break
# Calculate grid dimensions
blocks_x = (N + threads_x - 1) // threads_x
blocks_y = (num_joints + threads_y - 1) // threads_y
# Ensure sufficient blocks for target occupancy
total_blocks = blocks_x * blocks_y
min_blocks_needed = int(sm_count * target_occupancy * 4) # 4 blocks per SM target
if total_blocks < min_blocks_needed:
# Adjust block size to increase block count
scale_factor = math.sqrt(min_blocks_needed / total_blocks)
threads_x = max(32, int(threads_x / scale_factor))
threads_y = max(1, int(threads_y / scale_factor))
# Recalculate
blocks_x = (N + threads_x - 1) // threads_x
blocks_y = (num_joints + threads_y - 1) // threads_y
return ((blocks_x, blocks_y), (threads_x, threads_y))
[docs]
def get_gpu_properties() -> Optional[Dict[str, Any]]:
"""Get comprehensive GPU properties for optimization."""
if not CUDA_AVAILABLE:
return None
try:
device = cuda.get_current_device()
return {
"multiprocessor_count": device.MULTIPROCESSOR_COUNT,
"max_threads_per_block": device.MAX_THREADS_PER_BLOCK,
"max_shared_memory_per_block": device.MAX_SHARED_MEMORY_PER_BLOCK,
"max_block_dim_x": device.MAX_BLOCK_DIM_X,
"max_block_dim_y": device.MAX_BLOCK_DIM_Y,
"warp_size": getattr(device, "WARP_SIZE", 32),
"compute_capability": getattr(device, "COMPUTE_CAPABILITY", (6, 0)),
"memory_bandwidth_peak_gb_s": 500, # Approximate, varies by GPU
}
except Exception:
return None
# CPU FALLBACK IMPLEMENTATION
[docs]
def trajectory_cpu_fallback(
thetastart: np.ndarray,
thetaend: np.ndarray,
Tf: float,
N: int,
method: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Optimized CPU fallback using NumPy vectorization.
Args:
thetastart: (num_joints,) ndarray of starting joint angles, radians.
thetaend: (num_joints,) ndarray of ending joint angles, radians.
Tf: Total trajectory duration, seconds. Values <= 0 collapse to the
start configuration with zero velocity and acceleration.
N: Number of trajectory time steps. Values <= 1 collapse to the start
configuration.
method: Time-scaling polynomial order: 3 for cubic, 5 for quintic, any
other value (e.g. 1) for linear.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(traj_pos, traj_vel,
traj_acc)``, each an ``(N, num_joints)`` float32 ndarray of joint
positions (radians), velocities (radians/s), and accelerations
(radians/s^2).
"""
num_joints = len(thetastart)
# Degenerate inputs collapse to "sit at start": s = s_dot = s_ddot = 0.
# Matches the GPU kernels' N<=1 guard and avoids the divide-by-zero
# RuntimeWarning when callers pass Tf=0 (regression coverage in
# test_zero_time_trajectory accepts NaN but the warning is noisy).
if N <= 1 or Tf <= 0.0:
s = np.zeros(N, dtype=np.float32)
s_dot = np.zeros(N, dtype=np.float32)
s_ddot = np.zeros(N, dtype=np.float32)
else:
# Vectorized time computation
t = np.linspace(0, Tf, N, dtype=np.float32)
tau = t / Tf
# Vectorized time scaling
if method == 3: # Cubic
s = 3.0 * tau**2 - 2.0 * tau**3
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
elif method == 5: # Quintic
tau2 = tau**2
tau3 = tau**3
tau4 = tau**4
tau5 = tau**5
s = 10.0 * tau3 - 15.0 * tau4 + 6.0 * tau5
s_dot = (30.0 * tau2 - 60.0 * tau3 + 30.0 * tau4) / Tf
s_ddot = (60.0 * tau - 180.0 * tau2 + 120.0 * tau3) / (Tf * Tf)
else: # Linear (method == 1) and any other value
s = tau
s_dot = np.ones_like(tau) / Tf
s_ddot = np.zeros_like(tau)
# Vectorized trajectory computation
delta = thetaend - thetastart
traj_pos = thetastart[np.newaxis, :] + s[:, np.newaxis] * delta[np.newaxis, :]
traj_vel = s_dot[:, np.newaxis] * delta[np.newaxis, :]
traj_acc = s_ddot[:, np.newaxis] * delta[np.newaxis, :]
return (
traj_pos.astype(np.float32),
traj_vel.astype(np.float32),
traj_acc.astype(np.float32),
)
# CUDA KERNELS FOR 40x+ SPEEDUP
if CUDA_AVAILABLE:
jit_kwargs = {"fastmath": FAST_MATH}
@cuda.jit(device=True, inline=True, **jit_kwargs)
def matrix_vector_multiply_6x6(M, v, result) -> None:
"""Optimized 6x6 matrix-vector multiplication using registers.
Args:
M: (6, 6) device array, the matrix operand.
v: (6,) device array, the vector operand.
result: (6,) device array, in-place output buffer set to ``M @ v``.
"""
# Unrolled for maximum performance
result[0] = (
M[0, 0] * v[0]
+ M[0, 1] * v[1]
+ M[0, 2] * v[2]
+ M[0, 3] * v[3]
+ M[0, 4] * v[4]
+ M[0, 5] * v[5]
)
result[1] = (
M[1, 0] * v[0]
+ M[1, 1] * v[1]
+ M[1, 2] * v[2]
+ M[1, 3] * v[3]
+ M[1, 4] * v[4]
+ M[1, 5] * v[5]
)
result[2] = (
M[2, 0] * v[0]
+ M[2, 1] * v[1]
+ M[2, 2] * v[2]
+ M[2, 3] * v[3]
+ M[2, 4] * v[4]
+ M[2, 5] * v[5]
)
result[3] = (
M[3, 0] * v[0]
+ M[3, 1] * v[1]
+ M[3, 2] * v[2]
+ M[3, 3] * v[3]
+ M[3, 4] * v[4]
+ M[3, 5] * v[5]
)
result[4] = (
M[4, 0] * v[0]
+ M[4, 1] * v[1]
+ M[4, 2] * v[2]
+ M[4, 3] * v[3]
+ M[4, 4] * v[4]
+ M[4, 5] * v[5]
)
result[5] = (
M[5, 0] * v[0]
+ M[5, 1] * v[1]
+ M[5, 2] * v[2]
+ M[5, 3] * v[3]
+ M[5, 4] * v[4]
+ M[5, 5] * v[5]
)
@cuda.jit(**jit_kwargs)
def trajectory_kernel(
thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method
) -> None:
"""Each thread computes its own time scaling — no shared memory race.
Args:
thetastart: (num_joints,) device array of starting joint angles, radians.
thetaend: (num_joints,) device array of ending joint angles, radians.
traj_pos: (N, num_joints) device array, in-place output buffer for
joint positions, radians.
traj_vel: (N, num_joints) device array, in-place output buffer for
joint velocities, radians/s.
traj_acc: (N, num_joints) device array, in-place output buffer for
joint accelerations, radians/s^2.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
"""
t_idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
j_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
if t_idx >= N or j_idx >= thetastart.shape[0]:
return
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 3: # Cubic
tau2 = tau * tau
tau3 = tau2 * tau
s = 3.0 * tau2 - 2.0 * tau3
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
elif method == 5: # Quintic
tau2 = tau * tau
tau3 = tau2 * tau
tau4 = tau2 * tau2
tau5 = tau4 * tau
s = 10.0 * tau3 - 15.0 * tau4 + 6.0 * tau5
s_dot = (30.0 * tau2 - 60.0 * tau3 + 30.0 * tau4) / Tf
s_ddot = (60.0 * tau - 180.0 * tau2 + 120.0 * tau3) / (Tf * Tf)
else: # Linear
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
start_angle = thetastart[j_idx]
delta_angle = thetaend[j_idx] - start_angle
traj_pos[t_idx, j_idx] = start_angle + s * delta_angle
traj_vel[t_idx, j_idx] = s_dot * delta_angle
traj_acc[t_idx, j_idx] = s_ddot * delta_angle
@cuda.jit(**jit_kwargs)
def trajectory_kernel_vectorized(
thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method
) -> None:
"""
FIXED: Vectorized trajectory kernel with correct 8-parameter signature.
Each thread processes multiple time steps for better throughput.
Args:
thetastart: (num_joints,) device array of starting joint angles, radians.
thetaend: (num_joints,) device array of ending joint angles, radians.
traj_pos: (N, num_joints) device array, in-place output buffer for
joint positions, radians.
traj_vel: (N, num_joints) device array, in-place output buffer for
joint velocities, radians/s.
traj_acc: (N, num_joints) device array, in-place output buffer for
joint accelerations, radians/s^2.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
"""
VECTOR_SIZE = 8 # Each thread processes 8 time steps
t_base = (cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x) * VECTOR_SIZE
j_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
if j_idx >= thetastart.shape[0]:
return
# Shared memory for joint data
shared_joints = cuda.shared.array((32, 2), dtype=float32)
# Load joint data to shared memory
if cuda.threadIdx.x == 0 and j_idx < 32:
start_val = thetastart[j_idx]
shared_joints[j_idx, 0] = start_val
shared_joints[j_idx, 1] = thetaend[j_idx] - start_val
cuda.syncthreads()
# Get joint data
if j_idx < 32:
start_angle = shared_joints[j_idx, 0]
delta_angle = shared_joints[j_idx, 1]
else:
start_angle = thetastart[j_idx]
delta_angle = thetaend[j_idx] - start_angle
# Process VECTOR_SIZE time steps
for i in range(VECTOR_SIZE):
t_idx = t_base + i
if t_idx >= N:
break
# Compute time scaling
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 5: # Quintic - optimized computation
tau2 = tau * tau
tau3 = tau2 * tau
s = tau3 * (10.0 - 15.0 * tau + 6.0 * tau2)
s_dot = tau2 * (30.0 - 60.0 * tau + 30.0 * tau2) / Tf
s_ddot = tau * (60.0 - 180.0 * tau + 120.0 * tau2) / (Tf * Tf)
elif method == 3: # Cubic
tau2 = tau * tau
s = tau2 * (3.0 - 2.0 * tau)
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
else: # Linear
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
# Store results
traj_pos[t_idx, j_idx] = start_angle + s * delta_angle
traj_vel[t_idx, j_idx] = s_dot * delta_angle
traj_acc[t_idx, j_idx] = s_ddot * delta_angle
@cuda.jit(**jit_kwargs)
def trajectory_kernel_memory_optimized(
thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method
) -> None:
"""
FIXED: Memory-bandwidth optimized kernel with correct 8-parameter signature.
Uses grid-stride loops for better memory utilization.
Args:
thetastart: (num_joints,) device array of starting joint angles, radians.
thetaend: (num_joints,) device array of ending joint angles, radians.
traj_pos: (N, num_joints) device array, in-place output buffer for
joint positions, radians.
traj_vel: (N, num_joints) device array, in-place output buffer for
joint velocities, radians/s.
traj_acc: (N, num_joints) device array, in-place output buffer for
joint accelerations, radians/s^2.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
"""
# Grid-stride loop for better memory utilization
stride_t = cuda.gridDim.x * cuda.blockDim.x
stride_j = cuda.gridDim.y * cuda.blockDim.y
t_start = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
j_start = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
# Shared memory for coefficients and joint data
shared_data = cuda.shared.array(
(32, 4), dtype=float32
) # [start, delta, s, s_dot]
# Process joints in chunks
for j_idx in range(j_start, thetastart.shape[0], stride_j):
# Load joint data to shared memory
local_j = cuda.threadIdx.y
if local_j < 32 and j_idx < thetastart.shape[0]:
start_val = thetastart[j_idx]
shared_data[local_j, 0] = start_val
shared_data[local_j, 1] = thetaend[j_idx] - start_val
cuda.syncthreads()
# Process time steps with grid-stride
for t_idx in range(t_start, N, stride_t):
# Compute time scaling
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 5: # Quintic
tau_sq = tau * tau
tau_cb = tau_sq * tau
s = tau_cb * (10.0 + tau * (-15.0 + 6.0 * tau))
s_dot = tau_sq * (30.0 + tau * (-60.0 + 30.0 * tau)) / Tf
s_ddot = tau * (60.0 + tau * (-180.0 + 120.0 * tau)) / (Tf * Tf)
elif method == 3: # Cubic
tau_sq = tau * tau
s = tau_sq * (3.0 - 2.0 * tau)
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
else: # Linear
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
# Use shared memory data if available
if local_j < 32 and j_idx < thetastart.shape[0]:
start_angle = shared_data[local_j, 0]
delta_angle = shared_data[local_j, 1]
else:
start_angle = (
thetastart[j_idx] if j_idx < thetastart.shape[0] else 0.0
)
delta_angle = (
(thetaend[j_idx] - start_angle)
if j_idx < thetastart.shape[0]
else 0.0
)
# Store results
if j_idx < thetastart.shape[0]:
traj_pos[t_idx, j_idx] = start_angle + s * delta_angle
traj_vel[t_idx, j_idx] = s_dot * delta_angle
traj_acc[t_idx, j_idx] = s_ddot * delta_angle
cuda.syncthreads()
@cuda.jit(**jit_kwargs)
def trajectory_kernel_warp_optimized(
thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method
) -> None:
"""
FIXED: Warp-level optimized kernel with correct 8-parameter signature.
Uses warp-level primitives for maximum throughput.
Args:
thetastart: (num_joints,) device array of starting joint angles, radians.
thetaend: (num_joints,) device array of ending joint angles, radians.
traj_pos: (N, num_joints) device array, in-place output buffer for
joint positions, radians.
traj_vel: (N, num_joints) device array, in-place output buffer for
joint velocities, radians/s.
traj_acc: (N, num_joints) device array, in-place output buffer for
joint accelerations, radians/s^2.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
"""
# Warp-level indexing
warp_id = (cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x) // 32
lane_id = cuda.threadIdx.x % 32
# Each warp processes 32 consecutive time steps
t_base = warp_id * 32
j_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
if j_idx >= thetastart.shape[0]:
return
# Load joint data (broadcast across warp)
start_angle = thetastart[j_idx]
delta_angle = thetaend[j_idx] - start_angle
# Each thread in warp processes one time step
t_idx = t_base + lane_id
if t_idx < N:
# Optimized time scaling computation
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 5: # Quintic
tau2 = tau * tau
tau3 = tau2 * tau
s = tau3 * (10.0 - 15.0 * tau + 6.0 * tau2)
s_dot = tau2 * (30.0 - 60.0 * tau + 30.0 * tau2) / Tf
s_ddot = tau * (60.0 - 180.0 * tau + 120.0 * tau2) / (Tf * Tf)
elif method == 3: # Cubic
tau2 = tau * tau
s = tau2 * (3.0 - 2.0 * tau)
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
else: # Linear
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
# Coalesced memory writes
traj_pos[t_idx, j_idx] = start_angle + s * delta_angle
traj_vel[t_idx, j_idx] = s_dot * delta_angle
traj_acc[t_idx, j_idx] = s_ddot * delta_angle
@cuda.jit(**jit_kwargs)
def trajectory_kernel_cache_friendly(
thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method
) -> None:
"""
FIXED: Cache-friendly kernel with correct 8-parameter signature.
Uses tiled computation to maximize cache utilization.
Args:
thetastart: (num_joints,) device array of starting joint angles, radians.
thetaend: (num_joints,) device array of ending joint angles, radians.
traj_pos: (N, num_joints) device array, in-place output buffer for
joint positions, radians.
traj_vel: (N, num_joints) device array, in-place output buffer for
joint velocities, radians/s.
traj_acc: (N, num_joints) device array, in-place output buffer for
joint accelerations, radians/s^2.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
"""
TILE_SIZE_T = 64 # Time tile size
TILE_SIZE_J = 8 # Joint tile size
# Block-level tiling
t_tile_start = cuda.blockIdx.x * TILE_SIZE_T
j_tile_start = cuda.blockIdx.y * TILE_SIZE_J
# Thread indices within tile
t_local = cuda.threadIdx.x
j_local = cuda.threadIdx.y
# Global indices
t_idx = t_tile_start + t_local
j_idx = j_tile_start + j_local
# Shared memory for tile data
shared_joints = cuda.shared.array((TILE_SIZE_J, 2), dtype=float32)
shared_time = cuda.shared.array((TILE_SIZE_T, 3), dtype=float32)
# Load joint data to shared memory
if t_local == 0 and j_idx < thetastart.shape[0]:
start_val = thetastart[j_idx]
shared_joints[j_local, 0] = start_val
shared_joints[j_local, 1] = thetaend[j_idx] - start_val
# Load time scaling data to shared memory
if j_local == 0 and t_idx < N:
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 5: # Quintic
tau2 = tau * tau
tau3 = tau2 * tau
s = tau3 * (10.0 - 15.0 * tau + 6.0 * tau2)
s_dot = tau2 * (30.0 - 60.0 * tau + 30.0 * tau2) / Tf
s_ddot = tau * (60.0 - 180.0 * tau + 120.0 * tau2) / (Tf * Tf)
elif method == 3: # Cubic
tau2 = tau * tau
s = tau2 * (3.0 - 2.0 * tau)
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
else: # Linear
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
shared_time[t_local, 0] = s
shared_time[t_local, 1] = s_dot
shared_time[t_local, 2] = s_ddot
cuda.syncthreads()
# Compute trajectory using shared memory data
if t_idx < N and j_idx < thetastart.shape[0]:
start_angle = shared_joints[j_local, 0]
delta_angle = shared_joints[j_local, 1]
s = shared_time[t_local, 0]
s_dot = shared_time[t_local, 1]
s_ddot = shared_time[t_local, 2]
traj_pos[t_idx, j_idx] = start_angle + s * delta_angle
traj_vel[t_idx, j_idx] = s_dot * delta_angle
traj_acc[t_idx, j_idx] = s_ddot * delta_angle
# DYNAMICS KERNELS - FIXED SIGNATURES
@cuda.jit(**jit_kwargs)
def inverse_dynamics_kernel(
thetalist_trajectory,
dthetalist_trajectory,
ddthetalist_trajectory,
gravity_vector,
Ftip,
Glist,
Slist,
M,
torques_trajectory,
torque_limits,
) -> None:
"""
FIXED: Inverse dynamics kernel with correct 10-parameter signature.
Removed the problematic 'stream' parameter that was causing the mismatch.
Uses a simplified per-joint dynamics model (diagonal inertia, linear
Coriolis term, scalar gravity contribution) rather than full recursive
Newton-Euler.
Args:
thetalist_trajectory: (N, num_joints) device array of joint angles,
radians.
dthetalist_trajectory: (N, num_joints) device array of joint
velocities, radians/s.
ddthetalist_trajectory: (N, num_joints) device array of joint
accelerations, radians/s^2.
gravity_vector: (3,) device array, gravitational acceleration; only
the z component is used.
Ftip: External wrench at the tip (unused in this simplified kernel).
Glist: (num_joints, *, *) device array of spatial inertia matrices;
its diagonal supplies the effective inertia term.
Slist: (>=num_joints, >=num_joints) device array of screw axes; its
diagonal supplies the velocity-coupling term.
M: Home configuration matrix (unused in this simplified kernel).
torques_trajectory: (N, num_joints) device array, in-place output
buffer for computed joint torques, clamped to ``torque_limits``.
torque_limits: (num_joints, 2) device array of ``[min, max]`` torque
bounds per joint.
"""
t_idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
j_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
if (
t_idx >= thetalist_trajectory.shape[0]
or j_idx >= thetalist_trajectory.shape[1]
):
return
# Load trajectory data
theta_j = thetalist_trajectory[t_idx, j_idx]
dtheta_j = dthetalist_trajectory[t_idx, j_idx]
ddtheta_j = ddthetalist_trajectory[t_idx, j_idx]
# Simplified dynamics computation with bounds checking
M_contrib = (
Glist[j_idx, j_idx, j_idx]
if (
j_idx < Glist.shape[0]
and j_idx < Glist.shape[1]
and j_idx < Glist.shape[2]
)
else 1.0
)
c_j = (
Slist[j_idx, j_idx] * dtheta_j
if (j_idx < Slist.shape[0] and j_idx < Slist.shape[1])
else 0.0
)
g_j = gravity_vector[2] * 0.1 if gravity_vector.shape[0] > 2 else 0.0
# Compute torque
tau = M_contrib * ddtheta_j + c_j + g_j
# Apply torque limits
if j_idx < torque_limits.shape[0]:
tau = max(torque_limits[j_idx, 0], min(tau, torque_limits[j_idx, 1]))
torques_trajectory[t_idx, j_idx] = tau
@cuda.jit(**jit_kwargs)
def forward_dynamics_kernel(
thetalist,
dthetalist,
taumat,
g,
Ftipmat,
dt,
intRes,
Glist,
Slist,
M,
thetamat,
dthetamat,
ddthetamat,
joint_limits,
) -> None:
"""Forward dynamics kernel.
Each thread integrates from the initial state up to its own ``t_idx``,
avoiding the temporal data race in the previous version (which read
``thetamat[t_idx-1]`` while parallel threads at lower ``t_idx`` may
not have written that row yet). Cost is O(t_idx * intRes) per
thread instead of O(intRes), but correctness no longer depends on
warp scheduling.
Uses a simplified per-joint dynamics model (diagonal inertia, scalar
gravity) rather than full recursive Newton-Euler.
Args:
thetalist: (num_joints,) device array of initial joint angles, radians.
dthetalist: (num_joints,) device array of initial joint velocities,
radians/s.
taumat: (N, num_joints) device array of applied joint torques per
time step; ``taumat[i]`` advances state into row ``i``.
g: (3,) device array, gravitational acceleration; only the z
component is used.
Ftipmat: External tip wrench per step (unused in this simplified
kernel).
dt: Time step between trajectory rows, seconds.
intRes: Number of Euler sub-integration steps per ``dt``.
Glist: (num_joints, *, *) device array of spatial inertia matrices;
its diagonal supplies the inverse-inertia term.
Slist: Screw axes (unused in this simplified kernel).
M: Home configuration matrix (unused in this simplified kernel).
thetamat: (N, num_joints) device array, in-place output buffer for
integrated joint angles, radians.
dthetamat: (N, num_joints) device array, in-place output buffer for
integrated joint velocities, radians/s.
ddthetamat: (N, num_joints) device array, in-place output buffer for
joint accelerations, radians/s^2.
joint_limits: (num_joints, 2) device array of ``[min, max]`` angle
limits used to clamp integrated positions.
"""
t_idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
j_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
if t_idx >= taumat.shape[0] or j_idx >= thetalist.shape[0]:
return
current_theta = thetalist[j_idx]
current_dtheta = dthetalist[j_idx]
dt_step = dt / intRes
ddtheta = 0.0
if t_idx == 0:
thetamat[t_idx, j_idx] = current_theta
dthetamat[t_idx, j_idx] = current_dtheta
ddthetamat[t_idx, j_idx] = 0.0
return
# Walk from t=1 to this thread's t_idx, matching the CPU path's
# convention that row 0 is the initial state and taumat[i] advances
# state row i. This remains independent per thread.
for step in range(1, t_idx + 1):
tau = taumat[step, j_idx]
for _ in range(intRes):
M_inv = (
1.0 / Glist[j_idx, j_idx, j_idx]
if (
j_idx < Glist.shape[0]
and j_idx < Glist.shape[1]
and j_idx < Glist.shape[2]
and Glist[j_idx, j_idx, j_idx] != 0.0
)
else 1.0
)
g_force = g[2] * 0.1 if g.shape[0] > 2 else 0.0
ddtheta = (tau - g_force) * M_inv
current_dtheta += ddtheta * dt_step
current_theta += current_dtheta * dt_step
if j_idx < joint_limits.shape[0]:
current_theta = max(
joint_limits[j_idx, 0],
min(current_theta, joint_limits[j_idx, 1]),
)
thetamat[t_idx, j_idx] = current_theta
dthetamat[t_idx, j_idx] = current_dtheta
ddthetamat[t_idx, j_idx] = ddtheta
@cuda.jit(**jit_kwargs)
def cartesian_trajectory_kernel(
pstart, pend, traj_pos, traj_vel, traj_acc, Tf, N, method
) -> None:
"""Cartesian trajectory kernel.
Each thread computes its own time scaling (no shared memory) so the
scaling matches its own ``t_idx``. Quintic acceleration uses the
full ``60 tau (1 - tau) (1 - 2 tau) / Tf^2`` form, and the linear
method (1) is no longer silently zeroed.
Args:
pstart: (3,) device array, starting Cartesian position.
pend: (3,) device array, ending Cartesian position.
traj_pos: (N, 3) device array, in-place output buffer for Cartesian
positions.
traj_vel: (N, 3) device array, in-place output buffer for Cartesian
velocities.
traj_acc: (N, 3) device array, in-place output buffer for Cartesian
accelerations.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
"""
t_idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
coord_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
if t_idx >= N or coord_idx >= 3:
return
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 3: # Cubic
s = 3.0 * tau * tau - 2.0 * tau * tau * tau
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 / (Tf * Tf) * (1.0 - 2.0 * tau)
elif method == 5: # Quintic
tau2 = tau * tau
tau3 = tau2 * tau
tau4 = tau2 * tau2
tau5 = tau4 * tau
s = 10.0 * tau3 - 15.0 * tau4 + 6.0 * tau5
s_dot = 30.0 * tau2 * (1.0 - 2.0 * tau + tau2) / Tf
s_ddot = 60.0 * tau * (1.0 - tau) * (1.0 - 2.0 * tau) / (Tf * Tf)
else: # Linear (method == 1) and any other value
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
dp = pend[coord_idx] - pstart[coord_idx]
traj_pos[t_idx, coord_idx] = s * dp + pstart[coord_idx]
traj_vel[t_idx, coord_idx] = s_dot * dp
traj_acc[t_idx, coord_idx] = s_ddot * dp
@cuda.jit(**jit_kwargs)
def fused_potential_gradient_kernel(
positions, goal, obstacles, potential, gradient, influence_distance
) -> None:
"""
FIXED: Fused potential gradient kernel with correct 6-parameter signature.
Removed the problematic 'stream' parameter.
Computes a combined attractive (toward goal) and repulsive (away from
obstacles within ``influence_distance``) potential field and its
gradient for each query position.
Args:
positions: (N, 3) device array of query point positions.
goal: (3,) device array, attractive goal position.
obstacles: (num_obstacles, 3) device array of obstacle positions.
potential: (N,) device array, in-place output buffer for the total
potential at each position.
gradient: (N, >=3) device array, in-place output buffer for the
potential gradient (x, y, z) at each position.
influence_distance: Repulsive influence radius; obstacles farther
than this contribute nothing.
"""
idx = cuda.grid(1)
if idx >= positions.shape[0]:
return
influence_distance_inv = (
1.0 / influence_distance if influence_distance > 0.0 else 0.0
)
# Load position
pos_x = positions[idx, 0]
pos_y = positions[idx, 1]
pos_z = positions[idx, 2]
# Attractive potential
diff_x = pos_x - goal[0]
diff_y = pos_y - goal[1]
diff_z = pos_z - goal[2]
attractive_pot = 0.5 * (diff_x * diff_x + diff_y * diff_y + diff_z * diff_z)
grad_x = diff_x
grad_y = diff_y
grad_z = diff_z
# Repulsive potential
repulsive_pot = 0.0
for obs in range(obstacles.shape[0]):
obs_diff_x = pos_x - obstacles[obs, 0]
obs_diff_y = pos_y - obstacles[obs, 1]
obs_diff_z = pos_z - obstacles[obs, 2]
dist_sq = (
obs_diff_x * obs_diff_x
+ obs_diff_y * obs_diff_y
+ obs_diff_z * obs_diff_z
)
if dist_sq > 0.0 and dist_sq < influence_distance * influence_distance:
# math.rsqrt is supported as a CUDA intrinsic in numba <=
# 0.59 but was dropped in 0.65. 1.0 / math.sqrt(...) is
# portable across all numba versions and lowers to the
# same PTX (rsqrt.approx.f32) under -ffast-math.
dist_inv = 1.0 / math.sqrt(dist_sq)
influence_term = dist_inv - influence_distance_inv
repulsive_term = 0.5 * influence_term * influence_term
repulsive_pot += repulsive_term
# ∇U_rep = (1/d - 1/d_0) * (-1/d^3) * (pos - obstacle).
# Force = -∇U_rep then points pos -> away_from_obstacle, which
# is what a repulsive potential field is meant to produce. The
# previous code dropped the leading minus, so the resulting
# gradient pulled the robot toward obstacles.
grad_factor = -influence_term * dist_inv * dist_inv * dist_inv
grad_x += grad_factor * obs_diff_x
grad_y += grad_factor * obs_diff_y
grad_z += grad_factor * obs_diff_z
potential[idx] = attractive_pot + repulsive_pot
if idx < gradient.shape[0] and gradient.shape[1] >= 3:
gradient[idx, 0] = grad_x
gradient[idx, 1] = grad_y
gradient[idx, 2] = grad_z
@cuda.jit(**jit_kwargs)
def batch_trajectory_kernel(
thetastart_batch, # (batch_size, num_joints)
thetaend_batch, # (batch_size, num_joints)
traj_pos_batch, # (batch_size, N, num_joints)
traj_vel_batch, # (batch_size, N, num_joints)
traj_acc_batch, # (batch_size, N, num_joints)
Tf,
N,
method,
batch_size,
) -> None:
"""Generate position, velocity, and acceleration for a batch of trajectories.
Args:
thetastart_batch: (batch_size, num_joints) device array of starting
joint angles, radians.
thetaend_batch: (batch_size, num_joints) device array of ending
joint angles, radians.
traj_pos_batch: (batch_size, N, num_joints) device array, in-place
output buffer for joint positions, radians.
traj_vel_batch: (batch_size, N, num_joints) device array, in-place
output buffer for joint velocities, radians/s.
traj_acc_batch: (batch_size, N, num_joints) device array, in-place
output buffer for joint accelerations, radians/s^2.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
batch_size: Number of trajectories in the batch.
"""
# Compute global indices
batch_idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
t_idx = cuda.blockIdx.y * cuda.blockDim.y + cuda.threadIdx.y
j_idx = cuda.blockIdx.z * cuda.blockDim.z + cuda.threadIdx.z
# Bounds check—use shape[1] for num_joints
if batch_idx >= batch_size or t_idx >= N or j_idx >= thetastart_batch.shape[1]:
return
# Per-thread time-scaling computation. Previous version wrote scaling
# for thread (0,0,0)'s t_idx into shared memory and let every other
# thread read it — so threads at different t_idx got the wrong scaling.
tau = 0.0 if N <= 1 else t_idx / (N - 1.0)
if method == 3:
s = 3.0 * tau * tau - 2.0 * tau * tau * tau
s_dot = 6.0 * tau * (1.0 - tau) / Tf
s_ddot = 6.0 * (1.0 - 2.0 * tau) / (Tf * Tf)
elif method == 5:
tau2 = tau * tau
tau3 = tau2 * tau
tau4 = tau2 * tau2
tau5 = tau4 * tau
s = 10.0 * tau3 - 15.0 * tau4 + 6.0 * tau5
s_dot = 30.0 * tau2 * (1 - 2 * tau + tau2) / Tf
s_ddot = 60.0 * tau * (1 - tau) * (1 - 2 * tau) / (Tf * Tf)
else: # Linear (method == 1) and any other value
s = tau
s_dot = 1.0 / Tf
s_ddot = 0.0
# Compute delta for this trajectory
dtheta = thetaend_batch[batch_idx, j_idx] - thetastart_batch[batch_idx, j_idx]
# Write results
traj_pos_batch[batch_idx, t_idx, j_idx] = (
s * dtheta + thetastart_batch[batch_idx, j_idx]
)
traj_vel_batch[batch_idx, t_idx, j_idx] = s_dot * dtheta
traj_acc_batch[batch_idx, t_idx, j_idx] = s_ddot * dtheta
# MEMORY POOL MANAGEMENT
class _GlobalCudaMemoryPool:
"""Enhanced memory pool with size tracking and performance optimization."""
def __init__(self) -> None:
"""Initialize the memory pool with empty caches and statistics counters."""
self.pool = {}
self.max_pool_size = 200 # Increased for better caching
self.total_allocated = 0
self.cache_hits = 0
self.cache_misses = 0
def get_array(self, shape: Tuple[int, ...], dtype: Any = np.float32) -> Any:
"""Return a pooled device array of the given shape/dtype, allocating on a cache miss.
Args:
shape: Shape of the requested device array.
dtype: Element dtype of the requested array (default float32).
Returns:
A numba CUDA device array of the requested shape and dtype, reused
from the pool on a cache hit or freshly allocated otherwise.
"""
key = (shape, dtype)
if key in self.pool and len(self.pool[key]) > 0:
self.cache_hits += 1
return self.pool[key].pop()
else:
self.cache_misses += 1
self.total_allocated += np.prod(shape) * np.dtype(dtype).itemsize
return cuda.device_array(shape, dtype=dtype)
def return_array(self, array: Any) -> None:
"""Return a device array to the pool for reuse, up to the pool size limit.
Args:
array: Device array to return to the pool, keyed by its shape and
dtype. Dropped if the per-key pool is already at
``max_pool_size``.
"""
key = (array.shape, array.dtype)
if key not in self.pool:
self.pool[key] = []
if len(self.pool[key]) < self.max_pool_size:
self.pool[key].append(array)
def get_stats(self) -> Dict[str, Any]:
"""Return cache hit rate, total allocated memory, and per-shape pool sizes."""
total_requests = self.cache_hits + self.cache_misses
hit_rate = self.cache_hits / total_requests if total_requests > 0 else 0
return {
"cache_hit_rate": hit_rate,
"total_allocated_mb": self.total_allocated / (1024 * 1024),
"pool_sizes": {str(k): len(v) for k, v in self.pool.items()},
}
def clear(self) -> None:
"""Empty the pool and reset allocation and cache statistics."""
self.pool.clear()
self.total_allocated = 0
self.cache_hits = 0
self.cache_misses = 0
_cuda_memory_pool = _GlobalCudaMemoryPool()
def get_cuda_array(shape: Tuple[int, ...], dtype: Any = np.float32) -> Any:
"""Get optimized CUDA array from memory pool.
Args:
shape: Shape of the requested device array.
dtype: Element dtype of the requested array (default float32).
Returns:
A pooled or newly allocated numba CUDA device array.
"""
return _cuda_memory_pool.get_array(shape, dtype)
def return_cuda_array(array: Any) -> None:
"""Return CUDA array to memory pool.
Args:
array: Device array to release back into the shared memory pool for
later reuse.
"""
_cuda_memory_pool.return_array(array)
def get_memory_pool_stats() -> Dict[str, Any]:
"""Get memory pool performance statistics."""
return _cuda_memory_pool.get_stats()
# PERFORMANCE MONITORING
class CUDAPerformanceMonitor:
"""Advanced performance monitoring for CUDA kernels."""
def __init__(self) -> None:
"""Initialize empty kernel and memory statistics dictionaries."""
self.kernel_stats = {}
self.memory_stats = {}
def record_kernel_launch(
self,
kernel_name: str,
grid: Tuple[int, ...],
block: Tuple[int, ...],
shared_mem: int = 0,
) -> None:
"""Accumulate launch counts, block/thread totals, and shared memory for a kernel.
Args:
kernel_name: Identifier under which to aggregate statistics.
grid: Grid dimensions of the launch (1D or 2D tuple of block counts).
block: Block dimensions of the launch (threads per block per axis).
shared_mem: Bytes of dynamic shared memory used by the launch.
"""
if kernel_name not in self.kernel_stats:
self.kernel_stats[kernel_name] = {
"launches": 0,
"total_blocks": 0,
"total_threads": 0,
"total_shared_mem": 0,
}
stats = self.kernel_stats[kernel_name]
stats["launches"] += 1
stats["total_blocks"] += grid[0] * grid[1] if len(grid) > 1 else grid[0]
stats["total_threads"] += (
grid[0] * grid[1] * block[0] * block[1]
if len(grid) > 1
else grid[0] * block[0]
)
stats["total_shared_mem"] += shared_mem
def get_stats(self) -> Dict[str, Any]:
"""Return aggregated kernel launch statistics and memory pool statistics."""
return {
"kernel_stats": self.kernel_stats,
"memory_pool_stats": get_memory_pool_stats(),
}
_perf_monitor = CUDAPerformanceMonitor()
# KERNEL CONFIGURATION OPTIMIZATION
def get_optimal_kernel_config(
N: int, num_joints: int, kernel_type: str = "auto"
) -> Optional[Dict[str, Any]]:
"""
Automatically select optimal kernel and configuration for 40x+ speedup.
Args:
N: Number of trajectory points
num_joints: Number of joints
kernel_type: "auto", "standard", "vectorized", "memory_optimized",
"warp_optimized", or "cache_friendly"
Returns:
Configuration dictionary with kernel function and launch parameters
"""
if not CUDA_AVAILABLE:
return None
device = cuda.get_current_device()
sm_count = device.MULTIPROCESSOR_COUNT
max_threads = device.MAX_THREADS_PER_BLOCK
total_work = N * num_joints
# Auto-select kernel based on problem characteristics
if kernel_type == "auto":
if total_work < 50000:
kernel_type = "standard"
elif total_work < 500000:
kernel_type = "vectorized"
elif total_work < 2000000:
kernel_type = "memory_optimized"
else:
kernel_type = "warp_optimized"
# Configure based on selected kernel type
if kernel_type == "vectorized":
vector_size = 8
effective_N = (N + vector_size - 1) // vector_size
threads_x = min(256, max(32, effective_N))
threads_y = min(max_threads // threads_x, num_joints)
blocks_x = (effective_N + threads_x - 1) // threads_x
blocks_y = (num_joints + threads_y - 1) // threads_y
kernel_func = trajectory_kernel_vectorized
elif kernel_type == "memory_optimized":
threads_x = min(128, max(64, N // (sm_count * 2)))
threads_y = min(max_threads // threads_x, min(16, num_joints))
blocks_x = min(sm_count * 4, (N + threads_x - 1) // threads_x)
blocks_y = min(sm_count * 4, (num_joints + threads_y - 1) // threads_y)
kernel_func = trajectory_kernel_memory_optimized
elif kernel_type == "warp_optimized":
# Optimize for warp-level execution
threads_x = 32 # One warp
threads_y = min(max_threads // 32, num_joints)
blocks_x = (N + 31) // 32 # Each block processes 32 time steps
blocks_y = (num_joints + threads_y - 1) // threads_y
kernel_func = trajectory_kernel_warp_optimized
elif kernel_type == "cache_friendly":
# Use tile-based approach
threads_x = 64
threads_y = 8
blocks_x = (N + 63) // 64
blocks_y = (num_joints + 7) // 8
kernel_func = trajectory_kernel_cache_friendly
else: # standard
if num_joints <= 8:
threads_x, threads_y = 128, min(8, num_joints)
elif num_joints <= 16:
threads_x, threads_y = 64, min(16, num_joints)
else:
threads_x, threads_y = 32, min(32, num_joints)
while threads_x * threads_y > max_threads:
if threads_x > threads_y and threads_x > 32:
threads_x = max(32, threads_x - 32)
elif threads_y > 1:
threads_y -= 1
else:
break
blocks_x = (N + threads_x - 1) // threads_x
blocks_y = (num_joints + threads_y - 1) // threads_y
kernel_func = trajectory_kernel
# Calculate performance metrics
total_blocks = blocks_x * blocks_y
theoretical_occupancy = min(100, (total_blocks / (sm_count * 4)) * 100)
# Estimate performance potential
elements_per_sm = total_work / sm_count
expected_speedup_range = (20, 60) if elements_per_sm > 10000 else (5, 20)
return {
"kernel_func": kernel_func,
"kernel_type": kernel_type,
"grid": (blocks_x, blocks_y),
"block": (threads_x, threads_y),
"total_blocks": total_blocks,
"threads_per_block": threads_x * threads_y,
"theoretical_occupancy": theoretical_occupancy,
"expected_speedup_range": expected_speedup_range,
"elements_per_sm": elements_per_sm,
"recommended_for_40x": elements_per_sm > 10000,
}
# AUTO-TUNING FOR MAXIMUM PERFORMANCE
@lru_cache(maxsize=64)
def _best_2d_config(
N: int, J: int
) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
Auto-tune 2D CUDA kernel launch configuration for optimal performance.
This function is maintained for backward compatibility with path_planning.py.
Args:
N: Number of trajectory time steps (X dimension).
J: Number of joints (Y dimension).
Returns:
Tuple[Tuple[int, int], Tuple[int, int]]: ``(grid, block)`` 2D launch
configuration. Returns ``((1, 1), (1, 1))`` when CUDA is unavailable.
"""
if not CUDA_AVAILABLE:
return ((1, 1), (1, 1))
# Use the optimized configuration function
config = get_optimal_kernel_config(N, J, "auto")
if config:
return config["grid"], config["block"]
# Fallback to basic configuration
return make_2d_grid(N, J)
@lru_cache(maxsize=64)
def _auto_tune_kernel_config(
N: int, num_joints: int
) -> Optional[Dict[str, Any]]:
"""Auto-tune kernel configuration for specific problem size.
Benchmarks each candidate kernel type on small test arrays and returns
the fastest configuration. Results are memoized via ``lru_cache``.
Args:
N: Number of trajectory time steps for the target problem.
num_joints: Number of joints for the target problem.
Returns:
Optional[Dict[str, Any]]: The best-performing kernel configuration
dict (as returned by ``get_optimal_kernel_config``), or None when
CUDA is unavailable.
"""
if not CUDA_AVAILABLE:
return None
configs_to_test = [
("standard", {}),
("vectorized", {}),
("memory_optimized", {}),
("warp_optimized", {}),
("cache_friendly", {}),
]
best_config = None
best_time = float("inf")
# Create small test arrays
test_N = min(N, 1000)
test_joints = min(num_joints, 8)
try:
d_start = cuda.device_array(test_joints, dtype=float32)
d_end = cuda.device_array(test_joints, dtype=float32)
d_pos = cuda.device_array((test_N, test_joints), dtype=float32)
d_vel = cuda.device_array((test_N, test_joints), dtype=float32)
d_acc = cuda.device_array((test_N, test_joints), dtype=float32)
for kernel_type, params in configs_to_test:
try:
config = get_optimal_kernel_config(test_N, test_joints, kernel_type)
if not config:
continue
kernel_func = config["kernel_func"]
grid = config["grid"]
block = config["block"]
# Warm-up
kernel_func[grid, block](
d_start, d_end, d_pos, d_vel, d_acc, 1.0, test_N, 3
)
cuda.synchronize()
# Timed run
start_time = perf_counter()
kernel_func[grid, block](
d_start, d_end, d_pos, d_vel, d_acc, 1.0, test_N, 3
)
cuda.synchronize()
elapsed = perf_counter() - start_time
if elapsed < best_time:
best_time = elapsed
best_config = config
except Exception:
continue
return best_config or get_optimal_kernel_config(N, num_joints, "standard")
except Exception:
return get_optimal_kernel_config(N, num_joints, "standard")
# HIGH-LEVEL OPTIMIZED FUNCTIONS
def optimized_trajectory_generation_monitored(
thetastart: Any,
thetaend: Any,
Tf: float,
N: int,
method: int,
use_pinned: bool = True,
kernel_type: str = "auto",
enable_monitoring: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Generate trajectory with comprehensive performance monitoring.
This is the main function for achieving 40x+ speedups. Falls back to the
optimized CPU implementation if CUDA is unavailable or GPU execution
fails.
Args:
thetastart: (num_joints,) array-like of starting joint angles, radians.
thetaend: (num_joints,) array-like of ending joint angles, radians.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
use_pinned: If True, use pinned host memory for host/device transfers.
kernel_type: Kernel selection strategy: "auto", "auto_tune", or an
explicit kernel name ("standard", "vectorized",
"memory_optimized", "warp_optimized", "cache_friendly").
enable_monitoring: If True, log launch configuration and throughput
and record per-kernel launch statistics.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(traj_pos, traj_vel,
traj_acc)``, each an ``(N, num_joints)`` float32 ndarray of joint
positions (radians), velocities (radians/s), and accelerations
(radians/s^2).
"""
if not CUDA_AVAILABLE:
return trajectory_cpu_fallback(thetastart, thetaend, Tf, N, method)
num_joints = len(thetastart)
total_work = N * num_joints
# Performance recommendations
if total_work < 50000:
logger.warning(
f"Problem size {total_work:,} may not achieve 40x speedup. "
f"Recommend N ≥ {50000 // num_joints:,} for better GPU utilization."
)
try:
# Convert to optimal data types
thetastart_arr = np.ascontiguousarray(thetastart, dtype=np.float32)
thetaend_arr = np.ascontiguousarray(thetaend, dtype=np.float32)
# Get optimal configuration
if kernel_type == "auto_tune":
config = _auto_tune_kernel_config(N, num_joints)
else:
config = get_optimal_kernel_config(N, num_joints, kernel_type)
if not config:
raise RuntimeError("Failed to get kernel configuration")
if enable_monitoring:
logger.info(f"🚀 Using {config['kernel_type']} kernel:")
logger.info(f" Grid: {config['grid']}, Block: {config['block']}")
logger.info(f" Occupancy: {config['theoretical_occupancy']:.1f}%")
logger.info(
f" Expected speedup: {config['expected_speedup_range'][0]}-{config['expected_speedup_range'][1]}x"
)
if config.get("recommended_for_40x"):
logger.info(" ✅ Problem size optimal for 40x+ speedup!")
else:
logger.info(" ⚠️ Consider larger N for maximum speedup")
# Setup GPU memory with optimal transfers
if use_pinned:
d_thetastart = _h2d_pinned(thetastart_arr)
d_thetaend = _h2d_pinned(thetaend_arr)
else:
d_thetastart = cuda.to_device(thetastart_arr)
d_thetaend = cuda.to_device(thetaend_arr)
# Allocate output arrays
d_traj_pos = get_cuda_array((N, num_joints), dtype=np.float32)
d_traj_vel = get_cuda_array((N, num_joints), dtype=np.float32)
d_traj_acc = get_cuda_array((N, num_joints), dtype=np.float32)
try:
# Extract kernel configuration
kernel_func = config["kernel_func"]
grid = config["grid"]
block = config["block"]
# Record performance metrics
if enable_monitoring:
_perf_monitor.record_kernel_launch(
config["kernel_type"], grid, block
)
# Warm-up for large problems to eliminate JIT overhead
if total_work > 100000:
warm_N = min(1000, N)
warm_grid = ((warm_N + block[0] - 1) // block[0], grid[1])
kernel_func[warm_grid, block](
d_thetastart,
d_thetaend,
d_traj_pos,
d_traj_vel,
d_traj_acc,
Tf,
warm_N,
method,
)
cuda.synchronize()
# Main kernel launch - FIXED: Using 8 parameters instead of 9
start_time = perf_counter()
kernel_func[grid, block](
d_thetastart,
d_thetaend,
d_traj_pos,
d_traj_vel,
d_traj_acc,
Tf,
N,
method,
)
cuda.synchronize()
gpu_time = perf_counter() - start_time
# Copy results back with optimal memory transfer
if use_pinned:
# Use pinned host arrays for faster transfer
traj_pos_pinned = cuda.pinned_array(
(N, num_joints), dtype=np.float32
)
traj_vel_pinned = cuda.pinned_array(
(N, num_joints), dtype=np.float32
)
traj_acc_pinned = cuda.pinned_array(
(N, num_joints), dtype=np.float32
)
d_traj_pos.copy_to_host(traj_pos_pinned)
d_traj_vel.copy_to_host(traj_vel_pinned)
d_traj_acc.copy_to_host(traj_acc_pinned)
# Convert to regular numpy arrays
traj_pos = np.array(traj_pos_pinned)
traj_vel = np.array(traj_vel_pinned)
traj_acc = np.array(traj_acc_pinned)
else:
traj_pos = d_traj_pos.copy_to_host()
traj_vel = d_traj_vel.copy_to_host()
traj_acc = d_traj_acc.copy_to_host()
if enable_monitoring:
throughput = (
total_work / gpu_time / 1e6
) # Million elements per second
logger.info(f"⚡ GPU execution: {gpu_time*1000:.2f}ms")
logger.info(f"📊 Throughput: {throughput:.1f} M elements/sec")
return traj_pos, traj_vel, traj_acc
finally:
# Always return arrays to pool
return_cuda_array(d_traj_pos)
return_cuda_array(d_traj_vel)
return_cuda_array(d_traj_acc)
except Exception as e:
logger.warning(f"GPU trajectory generation failed: {e}")
logger.info("Falling back to optimized CPU implementation")
return trajectory_cpu_fallback(thetastart, thetaend, Tf, N, method)
def auto_select_optimal_kernel(N: int, num_joints: int) -> str:
"""
Automatically select the best kernel type for maximum performance.
Returns kernel type string for get_optimal_kernel_config().
Args:
N: Number of trajectory time steps.
num_joints: Number of joints.
Returns:
str: Kernel type name ("standard", "vectorized", "memory_optimized",
"warp_optimized", or "cache_friendly") chosen from the per-SM work
load and GPU multiprocessor count.
"""
total_work = N * num_joints
device_props = get_gpu_properties()
if not device_props:
return "standard"
sm_count = device_props["multiprocessor_count"]
elements_per_sm = total_work / sm_count
# Decision tree based on extensive benchmarking
if elements_per_sm < 1000:
return "standard" # Small problems
elif elements_per_sm < 10000:
return "vectorized" # Medium problems
elif elements_per_sm < 50000:
return "memory_optimized" # Large problems
elif sm_count >= 40: # High-end GPUs
return "warp_optimized"
else:
return "cache_friendly" # Memory-bound scenarios
# PROFILING AND BENCHMARKING UTILITIES
def profile_start() -> None:
"""Start CUDA profiling with enhanced monitoring."""
try:
cuda.profile_start()
_perf_monitor.kernel_stats.clear()
except Exception:
pass
def profile_stop() -> Dict[str, Any]:
"""Stop CUDA profiling and return statistics."""
try:
cuda.profile_stop()
return _perf_monitor.get_stats()
except Exception:
return {}
def benchmark_kernel_performance(
kernel_name: str, *args: Any, num_runs: int = 10, warmup_runs: int = 2
) -> Optional[Dict[str, Any]]:
"""Enhanced kernel benchmarking with detailed statistics.
Args:
kernel_name: Which high-level routine to benchmark: "trajectory",
"potential_field", or "batch_trajectory".
*args: Positional arguments forwarded to the selected routine.
num_runs: Number of timed runs to average over.
warmup_runs: Number of untimed warm-up runs to discard JIT/transfer
overhead.
Returns:
Optional[Dict[str, Any]]: Timing statistics (mean/avg, std, min, max,
median time in seconds, raw timings, and memory pool stats), or None
when CUDA is unavailable.
"""
if not CUDA_AVAILABLE:
print(f"Cannot benchmark {kernel_name} - CUDA not available")
return None
# Warmup runs
for _ in range(warmup_runs):
if kernel_name == "trajectory":
optimized_trajectory_generation_monitored(
*args, enable_monitoring=False
)
elif kernel_name == "potential_field":
optimized_potential_field(*args)
elif kernel_name == "batch_trajectory":
optimized_batch_trajectory_generation(*args)
cuda.synchronize()
# Timed runs
times = []
for _ in range(num_runs):
start = perf_counter()
if kernel_name == "trajectory":
result = optimized_trajectory_generation_monitored(
*args, enable_monitoring=False
)
elif kernel_name == "potential_field":
result = optimized_potential_field(*args)
elif kernel_name == "batch_trajectory":
result = optimized_batch_trajectory_generation(*args)
cuda.synchronize()
times.append(perf_counter() - start)
# Calculate statistics
times = np.array(times)
mean_time = float(np.mean(times))
stats = {
# avg_time is an alias for mean_time kept for compatibility
# with pre-v1.3.2 callers (and tests) that expected the
# "avg_time" key.
"avg_time": mean_time,
"mean_time": mean_time,
"std_time": float(np.std(times)),
"min_time": float(np.min(times)),
"max_time": float(np.max(times)),
"median_time": float(np.median(times)),
"all_times": times.tolist(),
"memory_pool_stats": get_memory_pool_stats(),
}
print(f"📊 {kernel_name} benchmark results ({num_runs} runs):")
print(
f" Mean: {stats['mean_time']*1000:.2f} ± {stats['std_time']*1000:.2f} ms"
)
print(
f" Range: {stats['min_time']*1000:.2f} - {stats['max_time']*1000:.2f} ms"
)
print(
f" Memory pool hit rate: {stats['memory_pool_stats']['cache_hit_rate']*100:.1f}%"
)
return stats
else:
# CPU-only fallback implementations
class _MockMemoryPool:
def get_array(self, *args: Any, **kwargs: Any) -> Any:
"""Stub that raises because the CUDA memory pool is unavailable on CPU."""
raise RuntimeError("CUDA memory pool not available")
def return_array(self, *args: Any, **kwargs: Any) -> None:
"""Stub that raises because the CUDA memory pool is unavailable on CPU."""
raise RuntimeError("CUDA memory pool not available")
def clear(self) -> None:
"""No-op stub for the unavailable CUDA memory pool."""
pass
def get_stats(self) -> Dict[str, Any]:
"""Return empty statistics for the unavailable CUDA memory pool."""
return {}
_cuda_memory_pool = _MockMemoryPool()
class CUDAPerformanceMonitor:
"""CPU-only no-op performance monitor used when CUDA is unavailable."""
def __init__(self) -> None:
"""No-op initializer for the CPU-only performance monitor stub."""
pass
def record_kernel_launch(self, *args: Any) -> None:
"""No-op stub since no CUDA kernels are launched on CPU."""
pass
def get_stats(self) -> Dict[str, Any]:
"""Return empty statistics for the CPU-only performance monitor stub."""
return {}
_perf_monitor = CUDAPerformanceMonitor()
# Mock functions for API compatibility
[docs]
def trajectory_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA trajectory kernel is unavailable."""
raise RuntimeError("CUDA trajectory kernel not available")
def trajectory_kernel_vectorized(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA vectorized trajectory kernel is unavailable."""
raise RuntimeError("CUDA vectorized trajectory kernel not available")
def trajectory_kernel_memory_optimized(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA memory-optimized trajectory kernel is unavailable."""
raise RuntimeError("CUDA memory-optimized trajectory kernel not available")
def trajectory_kernel_warp_optimized(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA warp-optimized trajectory kernel is unavailable."""
raise RuntimeError("CUDA warp-optimized trajectory kernel not available")
def trajectory_kernel_cache_friendly(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA cache-friendly trajectory kernel is unavailable."""
raise RuntimeError("CUDA cache-friendly trajectory kernel not available")
[docs]
def inverse_dynamics_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA inverse dynamics kernel is unavailable."""
raise RuntimeError("CUDA inverse dynamics kernel not available")
[docs]
def forward_dynamics_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA forward dynamics kernel is unavailable."""
raise RuntimeError("CUDA forward dynamics kernel not available")
[docs]
def cartesian_trajectory_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA Cartesian trajectory kernel is unavailable."""
raise RuntimeError("CUDA Cartesian trajectory kernel not available")
[docs]
def fused_potential_gradient_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA potential field kernel is unavailable."""
raise RuntimeError("CUDA potential field kernel not available")
[docs]
def batch_trajectory_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA batch trajectory kernel is unavailable."""
raise RuntimeError("CUDA batch trajectory kernel not available")
[docs]
def get_cuda_array(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA memory pool is unavailable."""
raise RuntimeError("CUDA memory pool not available")
[docs]
def return_cuda_array(*args: Any, **kwargs: Any) -> NoReturn:
"""Raise because the CUDA memory pool is unavailable."""
raise RuntimeError("CUDA memory pool not available")
def get_memory_pool_stats() -> Dict[str, Any]:
"""Return empty memory-pool stats when CUDA is unavailable."""
return {}
def get_optimal_kernel_config(*args: Any, **kwargs: Any) -> Optional[Dict[str, Any]]:
"""Return no kernel configuration when CUDA is unavailable."""
return None
def auto_select_optimal_kernel(*args: Any, **kwargs: Any) -> str:
"""Report that no CUDA kernel can be selected."""
return "none"
def optimized_trajectory_generation_monitored(
*args: Any, **kwargs: Any
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Use the CPU trajectory fallback when CUDA is unavailable."""
return trajectory_cpu_fallback(args[0], args[1], args[2], args[3], args[4])
[docs]
def _best_2d_config(*args: Any, **kwargs: Any) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""Return a minimal launch shape when CUDA is unavailable."""
return ((1, 1), (1, 1))
[docs]
def profile_start() -> None:
"""No-op CUDA profiler start for CPU-only environments."""
pass
[docs]
def profile_stop() -> Dict[str, Any]:
"""Return empty CUDA profiler stats in CPU-only environments."""
return {}
# HIGH-LEVEL WRAPPER FUNCTIONS
[docs]
def optimized_trajectory_generation(
thetastart: Any,
thetaend: Any,
Tf: float,
N: int,
method: int,
use_pinned: bool = True,
kernel_type: str = "auto",
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Main entry point for optimized trajectory generation.
This function automatically selects the best kernel and configuration
for maximum performance and 40x+ speedups.
Args:
thetastart, thetaend: Start and end joint angles
Tf: Final time
N: Number of trajectory points
method: Time scaling method (3=cubic, 5=quintic)
use_pinned: Use pinned memory for faster transfers
kernel_type: Kernel selection ("auto", "standard", "vectorized", etc.)
"""
return optimized_trajectory_generation_monitored(
thetastart,
thetaend,
Tf,
N,
method,
use_pinned,
kernel_type,
enable_monitoring=True,
)
[docs]
def optimized_potential_field(
positions: np.ndarray,
goal: np.ndarray,
obstacles: np.ndarray,
influence_distance: float,
use_pinned: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
"""Optimized potential field computation with CUDA acceleration.
Args:
positions: (N, 3) ndarray of query point positions.
goal: (3,) ndarray, attractive goal position.
obstacles: (num_obstacles, 3) ndarray of obstacle positions.
influence_distance: Repulsive influence radius; obstacles farther than
this contribute nothing.
use_pinned: If True, use pinned host memory for host-to-device transfers.
Returns:
Tuple[np.ndarray, np.ndarray]: ``(potential, gradient)`` where
``potential`` is an ``(N,)`` float32 array of total potential values and
``gradient`` is an ``(N, 3)`` float32 array of potential gradients.
Raises:
RuntimeError: If CUDA is not available.
"""
if not CUDA_AVAILABLE:
raise RuntimeError("CUDA not available for potential field computation")
N = positions.shape[0]
# The fused kernel indexes ``obstacles[obs, 0..2]``, so obstacles must be a
# 2-D ``(M, 3)`` array. Callers (e.g. the planner's no-obstacle path) may
# pass an empty list -> ``np.array([])`` which is 1-D ``(0,)``; a 1-D type
# breaks Numba's nopython type inference and aborts the kernel launch.
# Normalise to ``(M, 3)`` (empty -> ``(0, 3)``).
obstacles = np.ascontiguousarray(obstacles, dtype=np.float32).reshape(-1, 3)
# Use pinned memory for faster transfers
if use_pinned:
d_positions = _h2d_pinned(np.ascontiguousarray(positions, dtype=np.float32))
d_goal = _h2d_pinned(np.ascontiguousarray(goal, dtype=np.float32))
d_obstacles = _h2d_pinned(np.ascontiguousarray(obstacles, dtype=np.float32))
else:
d_positions = cuda.to_device(np.ascontiguousarray(positions, dtype=np.float32))
d_goal = cuda.to_device(np.ascontiguousarray(goal, dtype=np.float32))
d_obstacles = cuda.to_device(np.ascontiguousarray(obstacles, dtype=np.float32))
# Allocate output arrays
d_potential = get_cuda_array((N,), dtype=np.float32)
d_gradient = get_cuda_array((N, 3), dtype=np.float32)
try:
# Launch fused kernel - FIXED: Using 6 parameters instead of 7
grid, block = make_1d_grid(N)
fused_potential_gradient_kernel[grid, block](
d_positions,
d_goal,
d_obstacles,
d_potential,
d_gradient,
influence_distance,
)
# Copy results back
potential = d_potential.copy_to_host()
gradient = d_gradient.copy_to_host()
return potential, gradient
finally:
# Return arrays to pool
return_cuda_array(d_potential)
return_cuda_array(d_gradient)
[docs]
def optimized_batch_trajectory_generation(
thetastart_batch: np.ndarray,
thetaend_batch: np.ndarray,
Tf: float,
N: int,
method: int,
use_pinned: bool = True,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Optimized batch trajectory generation for multiple trajectories.
Args:
thetastart_batch: (batch_size, num_joints) ndarray of starting joint
angles, radians.
thetaend_batch: (batch_size, num_joints) ndarray of ending joint angles,
radians.
Tf: Total trajectory duration, seconds.
N: Number of trajectory time steps.
method: Time-scaling order: 3 cubic, 5 quintic, else linear.
use_pinned: If True, use pinned host memory for host-to-device transfers.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(traj_pos_batch,
traj_vel_batch, traj_acc_batch)``, each a ``(batch_size, N, num_joints)``
float32 ndarray of joint positions (radians), velocities (radians/s), and
accelerations (radians/s^2).
Raises:
RuntimeError: If CUDA is not available.
"""
if not CUDA_AVAILABLE:
raise RuntimeError("CUDA not available for batch trajectory generation")
batch_size, num_joints = thetastart_batch.shape
# Use pinned memory for faster transfers
if use_pinned:
d_thetastart_batch = _h2d_pinned(
np.ascontiguousarray(thetastart_batch, dtype=np.float32)
)
d_thetaend_batch = _h2d_pinned(
np.ascontiguousarray(thetaend_batch, dtype=np.float32)
)
else:
d_thetastart_batch = cuda.to_device(
np.ascontiguousarray(thetastart_batch, dtype=np.float32)
)
d_thetaend_batch = cuda.to_device(
np.ascontiguousarray(thetaend_batch, dtype=np.float32)
)
# Allocate output arrays
d_traj_pos_batch = get_cuda_array((batch_size, N, num_joints), dtype=np.float32)
d_traj_vel_batch = get_cuda_array((batch_size, N, num_joints), dtype=np.float32)
d_traj_acc_batch = get_cuda_array((batch_size, N, num_joints), dtype=np.float32)
try:
# 3D grid for batch processing
grid = ((batch_size + 7) // 8, (N + 15) // 16, (num_joints + 7) // 8)
block = (8, 16, 8)
# Launch batch kernel - FIXED: Using 9 parameters instead of 10
batch_trajectory_kernel[grid, block](
d_thetastart_batch,
d_thetaend_batch,
d_traj_pos_batch,
d_traj_vel_batch,
d_traj_acc_batch,
Tf,
N,
method,
batch_size,
)
# Copy results back
traj_pos_batch = d_traj_pos_batch.copy_to_host()
traj_vel_batch = d_traj_vel_batch.copy_to_host()
traj_acc_batch = d_traj_acc_batch.copy_to_host()
return traj_pos_batch, traj_vel_batch, traj_acc_batch
finally:
# Return arrays to pool
return_cuda_array(d_traj_pos_batch)
return_cuda_array(d_traj_vel_batch)
return_cuda_array(d_traj_acc_batch)
# LEGACY COMPATIBILITY FUNCTIONS
[docs]
def attractive_potential_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Legacy function - use fused_potential_gradient_kernel instead."""
raise RuntimeError(
"Legacy attractive_potential_kernel is deprecated.\n"
"Use fused_potential_gradient_kernel for better performance."
)
[docs]
def repulsive_potential_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Legacy function - use fused_potential_gradient_kernel instead."""
raise RuntimeError(
"Legacy repulsive_potential_kernel is deprecated.\n"
"Use fused_potential_gradient_kernel for better performance."
)
[docs]
def gradient_kernel(*args: Any, **kwargs: Any) -> NoReturn:
"""Legacy function - use fused_potential_gradient_kernel instead."""
raise RuntimeError(
"Legacy gradient_kernel is deprecated.\n"
"Use fused_potential_gradient_kernel for better performance."
)
# PERFORMANCE UTILITIES
def print_performance_recommendations(N: int, num_joints: int) -> None:
"""Print recommendations for achieving 40x+ speedup.
Args:
N: Number of trajectory time steps in the target problem.
num_joints: Number of joints in the target problem.
"""
total_work = N * num_joints
print("🚀 ManipulaPy CUDA Performance Recommendations")
print("=" * 50)
print(f"Current problem size: {total_work:,} elements ({N:,} × {num_joints})")
if not CUDA_AVAILABLE:
print("❌ CUDA not available")
print("📋 To enable 40x+ speedups:")
print(" 1. Install NVIDIA GPU drivers: nvidia-smi")
print(" 2. Install CUDA toolkit (11.8+ or 12.0+)")
print(" 3. Install GPU support: pip install ManipulaPy[gpu-cuda12]")
return
device_props = get_gpu_properties()
if device_props:
sm_count = device_props["multiprocessor_count"]
elements_per_sm = total_work / sm_count
print(f"✅ GPU detected: {sm_count} SMs")
print(f"📊 Elements per SM: {elements_per_sm:,.0f}")
if elements_per_sm > 10000:
print("✅ Problem size OPTIMAL for 40x+ speedup!")
elif elements_per_sm > 1000:
print("⚠️ Good for 10-20x speedup. For 40x+:")
recommended_N = int(10000 * sm_count / num_joints)
print(f" 📈 Use N ≥ {recommended_N:,} trajectory points")
else:
print("⚠️ Problem too small for maximum speedup:")
min_N_for_40x = int(10000 * sm_count / num_joints)
print(f" 📈 For 40x speedup: N ≥ {min_N_for_40x:,}")
print(f" 📈 For 10x speedup: N ≥ {min_N_for_40x//10:,}")
print(f"\n💡 Optimization tips:")
print(f" 🔧 Use quintic trajectories (method=5) for more work per thread")
print(f" 🔧 Enable pinned memory (use_pinned=True)")
print(f" 🔧 Use batch processing for multiple trajectories")
print(f" 🔧 Enable auto-tuning (kernel_type='auto_tune')")
optimal_kernel = auto_select_optimal_kernel(N, num_joints)
print(f" 🎯 Recommended kernel: {optimal_kernel}")
def setup_cuda_environment_for_40x_speedup() -> None:
"""Setup CUDA environment variables for maximum performance."""
import os
print("🔧 Setting up CUDA environment for 40x+ speedup...")
# CUDA environment optimizations — setdefault so we never clobber a
# value the user (or a test harness) explicitly set, only fill in defaults.
os.environ.setdefault("CUDA_CACHE_DISABLE", "0") # Enable kernel caching
os.environ.setdefault("CUDA_LAUNCH_BLOCKING", "0") # Enable async execution
os.environ.setdefault("CUDA_DEVICE_ORDER", "PCI_BUS_ID") # Stable device ordering
# Numba optimizations
os.environ.setdefault("NUMBA_CUDA_CACHE_SIZE", "2048") # Larger cache
os.environ.setdefault("NUMBA_CUDA_LOW_OCCUPANCY_WARNINGS", "0") # Reduce warnings
if CUPY_AVAILABLE and CUDA_AVAILABLE:
try:
import cupy as cp
# Setup CuPy memory pool for optimal allocation.
mempool = cp.get_default_memory_pool()
mempool.set_limit(size=2**30) # 1GB limit
print("✅ CuPy memory pool configured")
except Exception as exc:
print(f"⚠️ CuPy memory pool not configured: {exc}")
print("✅ CUDA environment optimized for maximum performance")
# COMPREHENSIVE EXPORT LIST
__all__ = [
# Core availability checks
"CUDA_AVAILABLE",
"CUPY_AVAILABLE",
"check_cuda_availability",
"check_cupy_availability",
# Standard kernels
"trajectory_kernel",
"inverse_dynamics_kernel",
"forward_dynamics_kernel",
"cartesian_trajectory_kernel",
"fused_potential_gradient_kernel",
"batch_trajectory_kernel",
# Advanced kernels for 40x+ speedup
"trajectory_kernel_vectorized",
"trajectory_kernel_memory_optimized",
"trajectory_kernel_warp_optimized",
"trajectory_kernel_cache_friendly",
# High-level optimized functions
"optimized_trajectory_generation",
"optimized_trajectory_generation_monitored",
"optimized_potential_field",
"optimized_batch_trajectory_generation",
# Kernel configuration and optimization
"get_optimal_kernel_config",
"auto_select_optimal_kernel",
"_best_2d_config",
# Memory management
"get_cuda_array",
"return_cuda_array",
"get_memory_pool_stats",
# Performance monitoring
"CUDAPerformanceMonitor",
"profile_start",
"profile_stop",
"benchmark_kernel_performance",
# Grid configuration utilities (backward compatibility)
"make_1d_grid",
"make_2d_grid",
"make_2d_grid_optimized",
"get_gpu_properties",
"trajectory_cpu_fallback",
# Performance optimization helpers
"print_performance_recommendations",
"setup_cuda_environment_for_40x_speedup",
# Legacy compatibility
"attractive_potential_kernel",
"repulsive_potential_kernel",
"gradient_kernel",
]