Optimized Path Planning User Guide ===================================== This guide covers the highly optimized trajectory planning capabilities in ManipulaPy, including adaptive GPU/CPU execution, memory pooling, batch processing, and advanced performance optimizations for robotic manipulators. Introduction -------------- The optimized `TrajectoryPlanning` class (now `OptimizedTrajectoryPlanning`) provides comprehensive trajectory generation and execution capabilities with significant performance improvements through CUDA acceleration and intelligent adaptive execution strategies. **Key Optimizations:** - **Adaptive GPU/CPU execution** based on problem size and hardware availability - **Memory pooling** to reduce allocation overhead and improve performance - **Batch processing** for multiple trajectories with optimized kernel launches - **Fused kernels** to minimize memory bandwidth requirements - **Intelligent fallback** to CPU when beneficial for small problems - **2D parallelization** for better GPU utilization - **Pinned memory transfers** for faster host-device communication **Enhanced Features:** - Automatic performance tuning and statistics collection - Smart threshold adaptation based on execution patterns - Comprehensive error handling and graceful degradation - Advanced profiling and benchmarking capabilities - Backward compatibility with existing `TrajectoryPlanning` interface Mathematical Background ~~~~~~~~~~~~~~~~~~~~~~~~~~ The mathematical foundation remains the same as the original implementation, with optimizations focused on computational efficiency rather than algorithmic changes. The core time-scaling functions, dynamics computations, and collision avoidance methods are preserved while being accelerated through parallel execution. **Key Computational Optimizations:** 1. **Parallel Time-Scaling**: Joint trajectory computation parallelized across both time steps and joints using 2D CUDA grids 2. **Vectorized Operations**: All mathematical operations vectorized for SIMD execution on both CPU and GPU 3. **Memory Coalescing**: Data layouts optimized for efficient memory access patterns 4. **Kernel Fusion**: Multiple operations combined into single kernel launches to reduce overhead Getting Started ------------------ Basic Setup with Optimizations ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python from ManipulaPy.path_planning import OptimizedTrajectoryPlanning, create_optimized_planner from ManipulaPy.urdf_processor import URDFToSerialManipulator import numpy as np # Load robot model processor = URDFToSerialManipulator("robot.urdf") robot = processor.serial_manipulator dynamics = processor.dynamics # Define joint and torque limits joint_limits = [(-np.pi, np.pi)] * 6 # 6-DOF robot torque_limits = [(-50, 50)] * 6 # ±50 N⋅m per joint # Method 1: Use factory function with auto-optimization planner = create_optimized_planner( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, torque_limits=torque_limits, gpu_memory_mb=512, # Allocate 512MB GPU memory pool enable_profiling=True # Enable performance profiling ) # Method 2: Direct instantiation with custom settings planner_custom = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, torque_limits=torque_limits, use_cuda=None, # Auto-detect (None/True/False) cuda_threshold=100, # Min problem size for GPU memory_pool_size_mb=256, # GPU memory pool size enable_profiling=False # Disable profiling for production ) print(f"CUDA available: {planner.cuda_available}") print(f"GPU properties: {planner.gpu_properties}") Backward Compatibility ~~~~~~~~~~~~~~~~~~~~~ The optimized planner maintains full backward compatibility: .. code-block:: python # Existing code works unchanged - automatically uses optimizations from ManipulaPy.path_planning import TrajectoryPlanning # This now creates an OptimizedTrajectoryPlanning instance planner = TrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, torque_limits=torque_limits ) # All existing methods work exactly the same trajectory = planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=100, method=3 ) Performance-Optimized Methods ---------------------------- joint_trajectory() with Adaptive Execution ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The optimized `joint_trajectory()` method automatically selects the best execution strategy: .. code-block:: python def optimized_trajectory_demo(): """Demonstrate adaptive trajectory generation with performance monitoring.""" # Test different problem sizes test_cases = [ {"N": 50, "name": "Small (CPU preferred)"}, {"N": 500, "name": "Medium (GPU beneficial)"}, {"N": 5000, "name": "Large (GPU optimal)"} ] theta_start = np.zeros(6) theta_end = np.array([0.8, -0.5, 0.3, -0.2, 0.6, -0.4]) for case in test_cases: print(f"\n=== {case['name']} ===") # Reset performance stats planner.reset_performance_stats() # Generate trajectory start_time = time.time() trajectory = planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=case['N'], method=5 ) elapsed = time.time() - start_time # Get performance statistics stats = planner.get_performance_stats() print(f"Points generated: {trajectory['positions'].shape}") print(f"Execution time: {elapsed:.4f}s") print(f"Used GPU: {stats['gpu_calls'] > 0}") print(f"GPU usage: {stats['gpu_usage_percent']:.1f}%") if stats['gpu_calls'] > 0: print(f"Avg GPU time: {stats['avg_gpu_time']:.4f}s") if stats['cpu_calls'] > 0: print(f"Avg CPU time: {stats['avg_cpu_time']:.4f}s") return trajectory # Run demonstration demo_trajectory = optimized_trajectory_demo() batch_joint_trajectory() for Multiple Trajectories ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Process multiple trajectories simultaneously with optimized batch kernels: .. code-block:: python def batch_trajectory_demo(): """Demonstrate high-performance batch trajectory generation.""" # Generate multiple start/end configurations batch_size = 20 num_joints = 6 # Random start and end configurations np.random.seed(42) # For reproducible results thetastart_batch = np.random.uniform(-1.0, 1.0, (batch_size, num_joints)) thetaend_batch = np.random.uniform(-1.0, 1.0, (batch_size, num_joints)) print(f"Generating {batch_size} trajectories in batch...") # Reset stats for clean measurement planner.reset_performance_stats() # Generate batch trajectories start_time = time.time() batch_trajectories = planner.batch_joint_trajectory( thetastart_batch=thetastart_batch, thetaend_batch=thetaend_batch, Tf=3.0, N=200, method=5 ) batch_time = elapsed = time.time() - start_time print(f"Batch processing completed:") print(f"- Total time: {batch_time:.4f}s") print(f"- Time per trajectory: {batch_time/batch_size:.4f}s") print(f"- Output shape: {batch_trajectories['positions'].shape}") # Compare with sequential processing print(f"\nComparing with sequential processing...") planner.reset_performance_stats() start_time = time.time() sequential_trajectories = [] for i in range(batch_size): traj = planner.joint_trajectory( thetastart_batch[i], thetaend_batch[i], Tf=3.0, N=200, method=5 ) sequential_trajectories.append(traj) sequential_time = time.time() - start_time speedup = sequential_time / batch_time print(f"- Sequential time: {sequential_time:.4f}s") print(f"- Batch speedup: {speedup:.2f}x") # Verify results are equivalent sequential_positions = np.array([t['positions'] for t in sequential_trajectories]) max_diff = np.max(np.abs(batch_trajectories['positions'] - sequential_positions)) print(f"- Max difference: {max_diff:.2e} (should be ~0)") return batch_trajectories, speedup # Run batch demonstration batch_trajs, speedup = batch_trajectory_demo() Advanced Performance Features ---------------------------- Memory Pool Management ~~~~~~~~~~~~~~~~~~~~~ Optimize memory allocation for better performance: .. code-block:: python def memory_optimization_demo(): """Demonstrate memory pool optimization for sustained performance.""" print("Memory Pool Optimization Demo") print("=" * 40) # Create planner with large memory pool large_pool_planner = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, memory_pool_size_mb=1024, # 1GB memory pool enable_profiling=True ) # Test sustained performance with many trajectories num_iterations = 50 trajectory_sizes = [100, 500, 1000, 2000] print(f"Testing {num_iterations} iterations for each size...") for N in trajectory_sizes: print(f"\nTesting N={N}:") # Reset stats large_pool_planner.reset_performance_stats() times = [] for i in range(num_iterations): # Generate random trajectory theta_start = np.random.uniform(-1, 1, 6) theta_end = np.random.uniform(-1, 1, 6) start_time = time.time() traj = large_pool_planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=N, method=5 ) times.append(time.time() - start_time) # Analyze performance stability times = np.array(times) stats = large_pool_planner.get_performance_stats() print(f" Mean time: {np.mean(times):.4f}s ± {np.std(times):.4f}s") print(f" Min/Max: {np.min(times):.4f}s / {np.max(times):.4f}s") print(f" GPU usage: {stats['gpu_usage_percent']:.1f}%") print(f" Memory transfers: {stats['memory_transfers']}") print(f" Kernel launches: {stats['kernel_launches']}") # Clean up memory pool large_pool_planner.cleanup_gpu_memory() return times # Run memory optimization demo memory_times = memory_optimization_demo() Performance Benchmarking ~~~~~~~~~~~~~~~~~~~~~~~ Built-in benchmarking capabilities for performance analysis: .. code-block:: python def comprehensive_benchmark(): """Run comprehensive performance benchmarks.""" print("Comprehensive Performance Benchmark") print("=" * 50) # Test 1: Built-in benchmark print("Running built-in benchmarks...") benchmark_results = planner.benchmark_performance() for test_name, result in benchmark_results.items(): print(f"\n{test_name} Test:") print(f" Problem size: {result['N']} × {result['joints']}") print(f" Total time: {result['total_time']:.4f}s") print(f" Used GPU: {result['used_gpu']}") print(f" Output shape: {result['trajectory_shape']}") if 'stats' in result: stats = result['stats'] print(f" GPU calls: {stats['gpu_calls']}") print(f" CPU calls: {stats['cpu_calls']}") # Test 2: Implementation comparison print(f"\n" + "=" * 50) print("Comparing CPU vs GPU implementations...") comparison_results = compare_implementations( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, test_params={"N": 2000, "Tf": 3.0, "method": 5} ) print("\nCPU Implementation:") cpu_result = comparison_results['cpu'] print(f" Time: {cpu_result['time']:.4f}s") print(f" Shape: {cpu_result['result_shape']}") gpu_result = comparison_results.get('gpu', {}) if gpu_result.get('available', True): print("\nGPU Implementation:") print(f" Time: {gpu_result['time']:.4f}s") print(f" Shape: {gpu_result['result_shape']}") print(f" Speedup: {gpu_result['speedup']:.2f}x") if 'accuracy' in comparison_results: acc = comparison_results['accuracy'] print("\nAccuracy Comparison:") print(f" Max position diff: {acc['max_pos_diff']:.2e}") print(f" Max velocity diff: {acc['max_vel_diff']:.2e}") print(f" Max acceleration diff: {acc['max_acc_diff']:.2e}") else: print("\nGPU Implementation: Not available") return benchmark_results, comparison_results # Run comprehensive benchmark bench_results, comp_results = comprehensive_benchmark() Optimized Dynamics Integration ----------------------------- Enhanced inverse_dynamics_trajectory() ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ GPU-accelerated dynamics computation with optimized memory management: .. code-block:: python def optimized_dynamics_demo(): """Demonstrate optimized dynamics computation with performance analysis.""" # Generate a complex trajectory theta_start = np.array([0.1, 0.2, -0.3, 0.1, 0.5, -0.2]) theta_end = np.array([0.8, -0.4, 0.6, -0.3, 0.2, 0.7]) # Large trajectory for performance testing N = 2000 # 2000 points Tf = 5.0 # 5 seconds print("Generating large trajectory for dynamics analysis...") trajectory = planner.joint_trajectory( theta_start, theta_end, Tf=Tf, N=N, method=5 ) print(f"Trajectory generated: {trajectory['positions'].shape}") # Test dynamics computation performance print("\nComputing inverse dynamics...") planner.reset_performance_stats() start_time = time.time() torques = planner.inverse_dynamics_trajectory( trajectory['positions'], trajectory['velocities'], trajectory['accelerations'], gravity_vector=[0, 0, -9.81], Ftip=[0, 0, 0, 0, 0, 0] ) dynamics_time = time.time() - start_time print(f"Dynamics computation completed:") print(f"- Time: {dynamics_time:.4f}s") print(f"- Rate: {N/dynamics_time:.1f} points/second") print(f"- Torque shape: {torques.shape}") # Analyze torque statistics max_torques = np.max(np.abs(torques), axis=0) mean_torques = np.mean(np.abs(torques), axis=0) print(f"\nTorque Analysis:") for i, (max_t, mean_t) in enumerate(zip(max_torques, mean_torques)): limit = planner.torque_limits[i, 1] usage = max_t / limit * 100 print(f" Joint {i+1}: Max {max_t:.1f} N⋅m ({usage:.1f}% of limit), Mean {mean_t:.1f} N⋅m") # Get performance stats stats = planner.get_performance_stats() print(f"\nPerformance Stats:") print(f"- GPU usage: {stats['gpu_usage_percent']:.1f}%") print(f"- Kernel launches: {stats['kernel_launches']}") print(f"- Memory transfers: {stats['memory_transfers']}") return torques, dynamics_time # Run optimized dynamics demo demo_torques, demo_time = optimized_dynamics_demo() Enhanced forward_dynamics_trajectory() ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Optimized forward dynamics simulation with adaptive execution: .. code-block:: python def optimized_forward_dynamics_demo(): """Demonstrate optimized forward dynamics simulation.""" # Initial conditions theta_initial = np.array([0.1, 0.2, -0.1, 0.0, 0.3, 0.0]) theta_dot_initial = np.zeros(6) # Define control sequence - sinusoidal torques N_steps = 1000 dt = 0.01 time_steps = np.arange(N_steps) * dt # Generate realistic control torques tau_matrix = np.zeros((N_steps, 6)) for i in range(6): frequency = 0.5 + i * 0.2 # Different frequency for each joint amplitude = 2.0 + i * 0.5 # Different amplitude for each joint tau_matrix[:, i] = amplitude * np.sin(2 * np.pi * frequency * time_steps) # External forces (varying) Ftip_matrix = np.zeros((N_steps, 6)) Ftip_matrix[:, 2] = 10.0 * np.sin(2 * np.pi * 0.2 * time_steps) # Vertical force print("Running optimized forward dynamics simulation...") print(f"Steps: {N_steps}, dt: {dt}s, Total time: {N_steps*dt}s") # Reset performance tracking planner.reset_performance_stats() # Run simulation start_time = time.time() sim_result = planner.forward_dynamics_trajectory( thetalist=theta_initial, dthetalist=theta_dot_initial, taumat=tau_matrix, g=[0, 0, -9.81], Ftipmat=Ftip_matrix, dt=dt, intRes=1 ) simulation_time = time.time() - start_time print(f"Simulation completed:") print(f"- Computation time: {simulation_time:.4f}s") print(f"- Real-time factor: {(N_steps*dt)/simulation_time:.1f}x") print(f"- Position shape: {sim_result['positions'].shape}") # Analyze results final_positions = sim_result['positions'][-1] max_velocities = np.max(np.abs(sim_result['velocities']), axis=0) max_accelerations = np.max(np.abs(sim_result['accelerations']), axis=0) print(f"\nSimulation Analysis:") print(f"Final positions: {np.degrees(final_positions).round(1)} deg") print(f"Max velocities: {max_velocities.round(2)} rad/s") print(f"Max accelerations: {max_accelerations.round(2)} rad/s²") # Check joint limit compliance positions = sim_result['positions'] limit_violations = 0 for i in range(6): min_pos = np.min(positions[:, i]) max_pos = np.max(positions[:, i]) if min_pos < planner.joint_limits[i, 0] or max_pos > planner.joint_limits[i, 1]: limit_violations += 1 print(f" Joint {i+1}: LIMIT VIOLATION ({np.degrees([min_pos, max_pos]).round(1)} deg)") if limit_violations == 0: print(" All joints stayed within limits ✓") # Performance stats stats = planner.get_performance_stats() print(f"\nPerformance Stats:") print(f"- Used GPU: {stats['gpu_calls'] > 0}") print(f"- Execution strategy: {'GPU' if stats['gpu_calls'] > 0 else 'CPU'}") return sim_result, simulation_time # Run forward dynamics demonstration sim_results, sim_time = optimized_forward_dynamics_demo() Optimized Cartesian Trajectories ------------------------------- Enhanced cartesian_trajectory() ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ GPU-accelerated Cartesian trajectory generation with adaptive execution: .. code-block:: python def optimized_cartesian_demo(): """Demonstrate optimized Cartesian trajectory generation.""" # Define complex Cartesian trajectory X_start = np.eye(4) X_start[:3, 3] = [0.3, 0.2, 0.5] # Start position # End pose with significant rotation and translation X_end = np.eye(4) X_end[:3, 3] = [0.6, -0.3, 0.3] # End position # 90-degree rotation about Z-axis angle = np.pi/2 X_end[:3, :3] = np.array([ [np.cos(angle), -np.sin(angle), 0], [np.sin(angle), np.cos(angle), 0], [0, 0, 1] ]) # Test different trajectory sizes test_sizes = [100, 500, 2000, 5000] print("Optimized Cartesian Trajectory Generation") print("=" * 45) for N in test_sizes: print(f"\nTesting N={N} points:") # Reset performance stats planner.reset_performance_stats() # Generate trajectory start_time = time.time() cart_traj = planner.cartesian_trajectory( X_start, X_end, Tf=3.0, N=N, method=5 ) elapsed = time.time() - start_time # Analyze results positions = cart_traj['positions'] velocities = cart_traj['velocities'] accelerations = cart_traj['accelerations'] orientations = cart_traj['orientations'] # Calculate path metrics path_length = np.sum(np.linalg.norm(np.diff(positions, axis=0), axis=1)) max_velocity = np.max(np.linalg.norm(velocities, axis=1)) max_acceleration = np.max(np.linalg.norm(accelerations, axis=1)) # Performance stats stats = planner.get_performance_stats() used_gpu = stats['gpu_calls'] > 0 print(f" Time: {elapsed:.4f}s ({'GPU' if used_gpu else 'CPU'})") print(f" Path length: {path_length:.3f}m") print(f" Max velocity: {max_velocity:.3f}m/s") print(f" Max acceleration: {max_acceleration:.3f}m/s²") print(f" Shapes: pos{positions.shape}, vel{velocities.shape}, acc{accelerations.shape}") # Verify start and end points start_error = np.linalg.norm(positions[0] - X_start[:3, 3]) end_error = np.linalg.norm(positions[-1] - X_end[:3, 3]) print(f" Start/End errors: {start_error:.2e}, {end_error:.2e}") # Return the largest trajectory for visualization final_traj = planner.cartesian_trajectory(X_start, X_end, Tf=3.0, N=1000, method=5) return final_traj # Run Cartesian trajectory demonstration cartesian_demo = optimized_cartesian_demo() Real-World Application Examples ------------------------------ High-Performance Pick-and-Place ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Optimized trajectory planning for industrial pick-and-place operations: .. code-block:: python def optimized_pick_and_place(): """Demonstrate optimized pick-and-place trajectory planning.""" print("Optimized Pick-and-Place Trajectory Planning") print("=" * 50) # Define task parameters pick_location = np.array([0.4, 0.3, 0.2]) place_location = np.array([0.6, -0.2, 0.25]) approach_height = 0.1 # 10cm above objects # Calculate waypoint poses home_pose = np.eye(4) home_pose[:3, 3] = [0.5, 0.0, 0.4] pick_approach = np.eye(4) pick_approach[:3, 3] = pick_location + np.array([0, 0, approach_height]) pick_pose = np.eye(4) pick_pose[:3, 3] = pick_location place_approach = np.eye(4) place_approach[:3, 3] = place_location + np.array([0, 0, approach_height]) place_pose = np.eye(4) place_pose[:3, 3] = place_location # Convert to joint space using inverse kinematics waypoint_poses = [home_pose, pick_approach, pick_pose, pick_approach, place_approach, place_pose, place_approach, home_pose] waypoint_joints = [] current_joints = np.zeros(6) # Start from home print("Converting Cartesian waypoints to joint space...") for i, pose in enumerate(waypoint_poses): try: joints, success, _ = planner.serial_manipulator.iterative_inverse_kinematics( pose, current_joints, max_iterations=200 ) if success: waypoint_joints.append(joints) current_joints = joints print(f" Waypoint {i+1}: ✓") else: print(f" Waypoint {i+1}: Failed IK, using approximation") waypoint_joints.append(current_joints) except Exception as e: print(f" Waypoint {i+1}: Error {e}") waypoint_joints.append(current_joints) # Define segment durations (optimized for speed) segment_durations = [1.5, 0.8, 0.5, 1.0, 2.0, 0.5, 0.8, 1.5] # seconds segment_names = [ "Move to pick approach", "Approach object", "Pick up", "Lift object", "Move to place approach", "Lower to place", "Place object", "Return home" ] # Generate optimized batch trajectory print(f"\nGenerating {len(segment_names)} trajectory segments...") # Prepare batch data batch_starts = waypoint_joints[:-1] batch_ends = waypoint_joints[1:] batch_size = len(batch_starts) # Use different point densities for different segments points_per_segment = [75, 40, 25, 50, 100, 25, 40, 75] all_segments = [] total_computation_time = 0 for i, (start, end, duration, points, name) in enumerate( zip(batch_starts, batch_ends, segment_durations, points_per_segment, segment_names) ): print(f" {i+1}. {name} ({duration}s, {points} points)") planner.reset_performance_stats() start_time = time.time() segment = planner.joint_trajectory( start, end, Tf=duration, N=points, method=5 ) segment_time = time.time() - start_time total_computation_time += segment_time stats = planner.get_performance_stats() used_gpu = stats['gpu_calls'] > 0 print(f" Time: {segment_time:.3f}s ({'GPU' if used_gpu else 'CPU'})") all_segments.append({ 'name': name, 'duration': duration, 'trajectory': segment, 'computation_time': segment_time, 'used_gpu': used_gpu }) # Combine all segments print(f"\nCombining trajectory segments...") combined_positions = [] combined_velocities = [] combined_accelerations = [] for i, segment in enumerate(all_segments): traj = segment['trajectory'] if i == 0: # Include all points for first segment combined_positions.extend(traj['positions']) combined_velocities.extend(traj['velocities']) combined_accelerations.extend(traj['accelerations']) else: # Skip first point to avoid duplication combined_positions.extend(traj['positions'][1:]) combined_velocities.extend(traj['velocities'][1:]) combined_accelerations.extend(traj['accelerations'][1:]) # Convert to arrays combined_trajectory = { 'positions': np.array(combined_positions), 'velocities': np.array(combined_velocities), 'accelerations': np.array(combined_accelerations) } total_duration = sum(segment_durations) total_points = combined_trajectory['positions'].shape[0] print(f"\nPick-and-Place Trajectory Generated:") print(f"- Total duration: {total_duration:.1f}s") print(f"- Total points: {total_points}") print(f"- Computation time: {total_computation_time:.3f}s") print(f"- Real-time factor: {total_duration/total_computation_time:.1f}x") # Analyze trajectory for safety and performance print(f"\nTrajectory Analysis:") # Check joint limits compliance positions = combined_trajectory['positions'] velocities = combined_trajectory['velocities'] accelerations = combined_trajectory['accelerations'] for i in range(6): joint_range = [np.min(positions[:, i]), np.max(positions[:, i])] limit_range = planner.joint_limits[i] if joint_range[0] < limit_range[0] or joint_range[1] > limit_range[1]: print(f" Joint {i+1}: ⚠️ NEAR LIMITS {np.degrees(joint_range).round(1)}° " f"(limits: {np.degrees(limit_range).round(1)}°)") else: margin = min(joint_range[0] - limit_range[0], limit_range[1] - joint_range[1]) print(f" Joint {i+1}: ✓ Safe margin: {np.degrees(margin).round(1)}°") # Velocity and acceleration analysis max_vel = np.max(np.abs(velocities), axis=0) max_acc = np.max(np.abs(accelerations), axis=0) print(f"\nMotion Analysis:") print(f" Max velocities: {max_vel.round(3)} rad/s") print(f" Max accelerations: {max_acc.round(3)} rad/s²") return combined_trajectory, all_segments # Run optimized pick-and-place demonstration pick_place_traj, segments = optimized_pick_and_place() Multi-Robot Trajectory Coordination ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Optimized trajectory planning for multiple robots with collision avoidance: .. code-block:: python def multi_robot_coordination(): """Demonstrate optimized multi-robot trajectory coordination.""" print("Multi-Robot Trajectory Coordination") print("=" * 40) # Simulate 4 robots working in shared workspace num_robots = 4 robot_configs = [] # Different start/end configurations for each robot for i in range(num_robots): start_config = np.random.uniform(-0.5, 0.5, 6) + i * 0.1 end_config = np.random.uniform(-0.5, 0.5, 6) - i * 0.1 robot_configs.append((start_config, end_config)) print(f"Planning trajectories for {num_robots} robots...") # Method 1: Sequential planning (traditional) print(f"\n1. Sequential Planning:") start_time = time.time() sequential_trajectories = [] for i, (start, end) in enumerate(robot_configs): traj = planner.joint_trajectory(start, end, Tf=3.0, N=150, method=5) sequential_trajectories.append(traj) print(f" Robot {i+1}: {traj['positions'].shape}") sequential_time = time.time() - start_time print(f" Total time: {sequential_time:.4f}s") # Method 2: Batch planning (optimized) print(f"\n2. Batch Planning (Optimized):") start_time = time.time() # Prepare batch data batch_starts = np.array([config[0] for config in robot_configs]) batch_ends = np.array([config[1] for config in robot_configs]) batch_trajectories = planner.batch_joint_trajectory( thetastart_batch=batch_starts, thetaend_batch=batch_ends, Tf=3.0, N=150, method=5 ) batch_time = time.time() - start_time print(f" Batch shape: {batch_trajectories['positions'].shape}") print(f" Total time: {batch_time:.4f}s") print(f" Speedup: {sequential_time/batch_time:.2f}x") # Method 3: Collision-aware coordination print(f"\n3. Collision-Aware Coordination:") start_time = time.time() # Generate staggered timing to avoid collisions stagger_delays = [0.0, 0.3, 0.6, 0.9] # seconds coordinated_trajectories = [] for i, ((start, end), delay) in enumerate(zip(robot_configs, stagger_delays)): # Extend trajectory duration to accommodate delay extended_duration = 3.0 + delay points_with_delay = int(150 * extended_duration / 3.0) traj = planner.joint_trajectory( start, end, Tf=extended_duration, N=points_with_delay, method=5 ) # Add delay by padding with start position delay_points = int(delay * 50) # 50 points per second if delay_points > 0: start_padding = np.tile(start.reshape(1, -1), (delay_points, 1)) zero_padding = np.zeros((delay_points, 6)) # Insert delay at beginning padded_positions = np.vstack([start_padding, traj['positions']]) padded_velocities = np.vstack([zero_padding, traj['velocities']]) padded_accelerations = np.vstack([zero_padding, traj['accelerations']]) coordinated_traj = { 'positions': padded_positions, 'velocities': padded_velocities, 'accelerations': padded_accelerations } else: coordinated_traj = traj coordinated_trajectories.append(coordinated_traj) print(f" Robot {i+1}: delay {delay}s, shape {coordinated_traj['positions'].shape}") coordination_time = time.time() - start_time print(f" Total time: {coordination_time:.4f}s") # Analyze coordination effectiveness print(f"\n4. Coordination Analysis:") # Check for potential collisions (simplified workspace overlap) max_timesteps = max(traj['positions'].shape[0] for traj in coordinated_trajectories) collision_risk_points = 0 for t in range(0, max_timesteps, 10): # Check every 10th timestep robot_positions = [] for traj in coordinated_trajectories: if t < traj['positions'].shape[0]: # Convert joint angles to end-effector position T = planner.serial_manipulator.forward_kinematics(traj['positions'][t]) robot_positions.append(T[:3, 3]) # Check pairwise distances for i in range(len(robot_positions)): for j in range(i+1, len(robot_positions)): distance = np.linalg.norm( np.array(robot_positions[i]) - np.array(robot_positions[j]) ) if distance < 0.5: # 50cm safety margin collision_risk_points += 1 print(f" Collision risk points: {collision_risk_points}") print(f" Safety score: {max(0, 100 - collision_risk_points*2):.1f}%") return { 'sequential': sequential_trajectories, 'batch': batch_trajectories, 'coordinated': coordinated_trajectories, 'timing': { 'sequential_time': sequential_time, 'batch_time': batch_time, 'coordination_time': coordination_time } } # Run multi-robot coordination demonstration multi_robot_results = multi_robot_coordination() Advanced Optimization Techniques ------------------------------- Adaptive Performance Tuning ~~~~~~~~~~~~~~~~~~~~~~~~~~ The planner automatically adapts its execution strategy based on performance history: .. code-block:: python def adaptive_tuning_demo(): """Demonstrate automatic performance tuning capabilities.""" print("Adaptive Performance Tuning Demonstration") print("=" * 50) # Test various problem sizes to trigger adaptive behavior problem_sizes = [ (50, 6), (100, 6), (200, 6), (500, 6), (1000, 6), (2000, 6), (5000, 6), (1000, 12), (2000, 12) ] print("Testing adaptive threshold adjustment...") print(f"Initial threshold: {planner.cpu_threshold}") for i, (N, joints) in enumerate(problem_sizes): print(f"\nTest {i+1}: N={N}, joints={joints}") # Generate test trajectory theta_start = np.random.uniform(-1, 1, joints) theta_end = np.random.uniform(-1, 1, joints) # Reset stats for clean measurement planner.reset_performance_stats() start_time = time.time() trajectory = planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=N, method=5 ) elapsed = time.time() - start_time # Get updated performance stats stats = planner.get_performance_stats() used_gpu = stats['gpu_calls'] > 0 print(f" Execution: {'GPU' if used_gpu else 'CPU'}") print(f" Time: {elapsed:.4f}s") print(f" Updated threshold: {planner.cpu_threshold}") # Show efficiency metrics if stats['avg_gpu_time'] > 0 and stats['avg_cpu_time'] > 0: efficiency = stats['avg_cpu_time'] / stats['avg_gpu_time'] print(f" GPU efficiency: {efficiency:.2f}x") final_stats = planner.get_performance_stats() print(f"\nFinal Performance Summary:") print(f"- Total GPU calls: {final_stats['gpu_calls']}") print(f"- Total CPU calls: {final_stats['cpu_calls']}") print(f"- GPU usage: {final_stats['gpu_usage_percent']:.1f}%") print(f"- Final threshold: {planner.cpu_threshold}") return final_stats # Run adaptive tuning demonstration tuning_stats = adaptive_tuning_demo() Memory Profiling and Optimization ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Monitor and optimize memory usage for sustained performance: .. code-block:: python def memory_profiling_demo(): """Demonstrate memory profiling and optimization techniques.""" print("Memory Profiling and Optimization") print("=" * 40) # Test sustained performance under memory pressure trajectory_sizes = [500, 1000, 2000, 5000, 2000, 1000, 500] print("Testing memory allocation patterns...") memory_stats = [] for i, N in enumerate(trajectory_sizes): print(f"\nIteration {i+1}: N={N}") # Generate large trajectory to stress memory system theta_start = np.random.uniform(-1, 1, 6) theta_end = np.random.uniform(-1, 1, 6) # Measure memory allocation performance planner.reset_performance_stats() start_time = time.time() trajectory = planner.joint_trajectory( theta_start, theta_end, Tf=3.0, N=N, method=5 ) traj_time = time.time() - start_time # Compute dynamics to further stress memory start_time = time.time() torques = planner.inverse_dynamics_trajectory( trajectory['positions'], trajectory['velocities'], trajectory['accelerations'] ) dynamics_time = time.time() - start_time stats = planner.get_performance_stats() memory_stat = { 'iteration': i+1, 'N': N, 'traj_time': traj_time, 'dynamics_time': dynamics_time, 'total_time': traj_time + dynamics_time, 'used_gpu': stats['gpu_calls'] > 0, 'memory_transfers': stats['memory_transfers'], 'kernel_launches': stats['kernel_launches'] } memory_stats.append(memory_stat) print(f" Trajectory: {traj_time:.4f}s ({'GPU' if stats['gpu_calls'] > 0 else 'CPU'})") print(f" Dynamics: {dynamics_time:.4f}s") print(f" Memory transfers: {stats['memory_transfers']}") # Analyze memory allocation patterns print(f"\nMemory Allocation Analysis:") gpu_times = [s['total_time'] for s in memory_stats if s['used_gpu']] cpu_times = [s['total_time'] for s in memory_stats if not s['used_gpu']] if gpu_times: print(f" GPU times: {np.mean(gpu_times):.4f}s ± {np.std(gpu_times):.4f}s") print(f" GPU consistency: {(1 - np.std(gpu_times)/np.mean(gpu_times))*100:.1f}%") if cpu_times: print(f" CPU times: {np.mean(cpu_times):.4f}s ± {np.std(cpu_times):.4f}s") print(f" CPU consistency: {(1 - np.std(cpu_times)/np.mean(cpu_times))*100:.1f}%") # Memory cleanup demonstration print(f"\nMemory cleanup...") pre_cleanup_stats = planner.get_performance_stats() planner.cleanup_gpu_memory() post_cleanup_stats = planner.get_performance_stats() print(f" Memory cleanup completed") print(f" Performance stats preserved: {pre_cleanup_stats == post_cleanup_stats}") return memory_stats # Run memory profiling demonstration memory_profile = memory_profiling_demo() Performance Visualization and Analysis ------------------------------------ Advanced Performance Monitoring ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Comprehensive performance analysis and visualization: .. code-block:: python def performance_analysis_suite(): """Comprehensive performance analysis and visualization.""" print("Performance Analysis Suite") print("=" * 30) # Collect performance data across various scenarios test_scenarios = [ {"name": "Small Problems", "sizes": [(50, 6), (100, 6), (150, 6)]}, {"name": "Medium Problems", "sizes": [(500, 6), (750, 6), (1000, 6)]}, {"name": "Large Problems", "sizes": [(2000, 6), (3000, 6), (5000, 6)]}, {"name": "Many Joints", "sizes": [(500, 12), (1000, 12), (1500, 12)]}, ] all_results = [] for scenario in test_scenarios: print(f"\n{scenario['name']}:") scenario_results = [] for N, joints in scenario['sizes']: print(f" Testing N={N}, joints={joints}...") # Generate test data theta_start = np.random.uniform(-1, 1, joints) theta_end = np.random.uniform(-1, 1, joints) # Run multiple trials for statistical accuracy trial_times = [] trial_gpu_usage = [] for trial in range(5): # 5 trials per configuration planner.reset_performance_stats() start_time = time.time() trajectory = planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=N, method=5 ) elapsed = time.time() - start_time stats = planner.get_performance_stats() trial_times.append(elapsed) trial_gpu_usage.append(stats['gpu_calls'] > 0) # Calculate statistics mean_time = np.mean(trial_times) std_time = np.std(trial_times) gpu_usage_rate = np.mean(trial_gpu_usage) result = { 'scenario': scenario['name'], 'N': N, 'joints': joints, 'problem_size': N * joints, 'mean_time': mean_time, 'std_time': std_time, 'gpu_usage_rate': gpu_usage_rate, 'performance_score': (N * joints) / mean_time # ops per second } scenario_results.append(result) all_results.append(result) print(f" Time: {mean_time:.4f}s ± {std_time:.4f}s") print(f" GPU usage: {gpu_usage_rate*100:.0f}%") # Performance analysis print(f"\n" + "=" * 50) print("Performance Analysis Results:") # Find optimal problem sizes for GPU gpu_results = [r for r in all_results if r['gpu_usage_rate'] > 0.5] cpu_results = [r for r in all_results if r['gpu_usage_rate'] < 0.5] if gpu_results: gpu_threshold_size = min(r['problem_size'] for r in gpu_results) best_gpu_performance = max(r['performance_score'] for r in gpu_results) print(f" GPU threshold: ~{gpu_threshold_size} total elements") print(f" Best GPU performance: {best_gpu_performance:.0f} ops/second") if cpu_results: best_cpu_performance = max(r['performance_score'] for r in cpu_results) print(f" Best CPU performance: {best_cpu_performance:.0f} ops/second") # Performance consistency analysis gpu_times = [r['mean_time'] for r in gpu_results] cpu_times = [r['mean_time'] for r in cpu_results] if gpu_times and cpu_times: gpu_efficiency = np.mean(cpu_times) / np.mean(gpu_times) print(f" Average GPU speedup: {gpu_efficiency:.2f}x") # Memory transfer efficiency total_gpu_calls = sum(1 for r in all_results if r['gpu_usage_rate'] > 0) if total_gpu_calls > 0: final_stats = planner.get_performance_stats() transfer_efficiency = final_stats['kernel_launches'] / total_gpu_calls print(f" Memory transfer efficiency: {transfer_efficiency:.2f} kernels/call") return all_results # Run comprehensive performance analysis perf_analysis = performance_analysis_suite() Deployment Best Practices ------------------------- Production Deployment Guidelines ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Guidelines for deploying optimized trajectory planning in production environments: .. code-block:: python def production_deployment_guide(): """Guidelines and examples for production deployment.""" print("Production Deployment Guidelines") print("=" * 40) # Example production configuration production_config = { 'use_cuda': None, # Auto-detect for flexibility 'cuda_threshold': 200, # Conservative threshold 'memory_pool_size_mb': 512, # Moderate memory pool 'enable_profiling': False, # Disable in production } print("1. Production Configuration:") for key, value in production_config.items(): print(f" {key}: {value}") # Create production-ready planner prod_planner = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, torque_limits=torque_limits, **production_config ) print(f"\n2. System Capabilities:") print(f" CUDA available: {prod_planner.cuda_available}") print(f" GPU properties: {prod_planner.gpu_properties}") print(f" CPU threshold: {prod_planner.cpu_threshold}") # Test production performance print(f"\n3. Production Performance Test:") # Simulate typical production workload workload_sizes = [100, 250, 500, 1000, 2000] workload_results = [] for size in workload_sizes: theta_start = np.random.uniform(-1, 1, 6) theta_end = np.random.uniform(-1, 1, 6) # Measure performance start_time = time.time() trajectory = prod_planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=size, method=5 ) elapsed = time.time() - start_time # Check for acceptable performance is_realtime = elapsed < 0.1 # 100ms max for real-time throughput = size / elapsed result = { 'size': size, 'time': elapsed, 'realtime': is_realtime, 'throughput': throughput } workload_results.append(result) status = "✓" if is_realtime else "⚠️" print(f" N={size}: {elapsed:.4f}s {status} ({throughput:.0f} points/s)") # Production recommendations print(f"\n4. Production Recommendations:") realtime_sizes = [r['size'] for r in workload_results if r['realtime']] if realtime_sizes: max_realtime = max(realtime_sizes) print(f" ✓ Real-time capable up to {max_realtime} points") best_throughput = max(r['throughput'] for r in workload_results) print(f" ✓ Peak throughput: {best_throughput:.0f} points/second") # Error handling recommendations print(f"\n5. Error Handling:") print(f" ✓ Automatic GPU->CPU fallback enabled") print(f" ✓ Memory allocation failures handled gracefully") print(f" ✓ Joint limit enforcement active") print(f" ✓ Torque limit checking enabled") # Monitoring recommendations print(f"\n6. Monitoring Setup:") print(f" • Monitor planner.get_performance_stats() regularly") print(f" • Track GPU usage percentage for optimization") print(f" • Alert on excessive CPU fallback occurrences") print(f" • Monitor memory allocation patterns") return prod_planner, workload_results # Run production deployment guide prod_planner, prod_results = production_deployment_guide() Error Handling and Debugging ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Comprehensive error handling and debugging tools for production use: .. code-block:: python def error_handling_demo(): """Demonstrate comprehensive error handling and debugging capabilities.""" print("Error Handling and Debugging") print("=" * 35) # Test various error conditions error_tests = [ { 'name': 'Invalid joint limits', 'test': lambda: planner.joint_trajectory( np.array([5.0, 5.0, 5.0, 5.0, 5.0, 5.0]), # Beyond limits np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), Tf=2.0, N=100, method=3 ) }, { 'name': 'Zero duration trajectory', 'test': lambda: planner.joint_trajectory( np.zeros(6), np.ones(6), Tf=0.0, N=100, method=3 ) }, { 'name': 'Invalid method parameter', 'test': lambda: planner.joint_trajectory( np.zeros(6), np.ones(6), Tf=2.0, N=100, method=7 # Invalid ) }, { 'name': 'Extremely large trajectory', 'test': lambda: planner.joint_trajectory( np.zeros(6), np.ones(6), Tf=2.0, N=100000, method=3 # Very large ) } ] print("Testing error conditions:") for i, error_test in enumerate(error_tests, 1): print(f"\n{i}. {error_test['name']}:") try: result = error_test['test']() print(f" ✓ Handled gracefully") print(f" Result shape: {result['positions'].shape}") except Exception as e: print(f" ⚠️ Exception: {type(e).__name__}: {e}") # Test memory exhaustion handling print(f"\n5. Memory exhaustion test:") try: # Try to allocate extremely large trajectory huge_trajectory = planner.joint_trajectory( np.zeros(6), np.ones(6), Tf=2.0, N=1000000, method=3 ) print(f" ✓ Large trajectory handled: {huge_trajectory['positions'].shape}") except Exception as e: print(f" ⚠️ Memory limit reached: {type(e).__name__}") # Test GPU error recovery if planner.cuda_available: print(f"\n6. GPU error recovery test:") try: # Force GPU usage with large problem original_threshold = planner.cpu_threshold planner.cpu_threshold = 0 # Force GPU trajectory = planner.joint_trajectory( np.zeros(6), np.ones(6), Tf=2.0, N=5000, method=5 ) print(f" ✓ GPU computation successful") planner.cpu_threshold = original_threshold except Exception as e: print(f" ⚠️ GPU error handled, fell back to CPU: {e}") # Debugging utilities demonstration print(f"\n7. Debugging Utilities:") # Performance stats for debugging stats = planner.get_performance_stats() print(f" Current performance stats:") for key, value in stats.items(): print(f" {key}: {value}") # Memory cleanup for debugging print(f" Memory cleanup status:") try: planner.cleanup_gpu_memory() print(f" ✓ GPU memory cleaned successfully") except Exception as e: print(f" ⚠️ Memory cleanup error: {e}") return True # Run error handling demonstration error_test_result = error_handling_demo() Summary and Migration Guide -------------------------- Migration from Original TrajectoryPlanning ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Step-by-step migration guide for existing code: .. code-block:: python def migration_guide(): """Guide for migrating from original TrajectoryPlanning to optimized version.""" print("Migration Guide: Original → Optimized TrajectoryPlanning") print("=" * 60) print("1. BACKWARD COMPATIBILITY:") print(" ✓ Existing code works unchanged") print(" ✓ All method signatures preserved") print(" ✓ Return value formats identical") print(" ✓ Automatic optimization activation") print("\n2. SIMPLE MIGRATION (No Code Changes Required):") print(" # Original code") print(" from ManipulaPy.path_planning import TrajectoryPlanning") print(" planner = TrajectoryPlanning(robot, urdf, dynamics, limits)") print(" trajectory = planner.joint_trajectory(start, end, 2.0, 100, 3)") print(" ") print(" # → Automatically uses OptimizedTrajectoryPlanning!") print("\n3. ENHANCED MIGRATION (Unlock Full Performance):") print(" # Use factory function for optimal settings") print(" from ManipulaPy.path_planning import create_optimized_planner") print(" planner = create_optimized_planner(") print(" robot, urdf, dynamics, limits,") print(" gpu_memory_mb=512, # GPU memory pool") print(" enable_profiling=True # Performance monitoring") print(" )") print("\n4. NEW PERFORMANCE FEATURES:") print(" # Batch processing for multiple trajectories") print(" batch_results = planner.batch_joint_trajectory(") print(" starts_batch, ends_batch, Tf, N, method") print(" )") print(" ") print(" # Performance monitoring") print(" stats = planner.get_performance_stats()") print(" print(f'GPU usage: {stats[\"gpu_usage_percent\"]:.1f}%')") print(" ") print(" # Memory management") print(" planner.cleanup_gpu_memory() # Clean up when done") print("\n5. PERFORMANCE BENEFITS:") # Demonstrate actual performance improvements original_planner = TrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits ) # Test case theta_start = np.zeros(6) theta_end = np.array([0.5, 0.3, -0.2, 0.1, 0.4, -0.1]) # Both planners are actually the same optimized implementation now # but we can show the before/after conceptually test_sizes = [100, 500, 1000, 2000] for N in test_sizes: start_time = time.time() traj = original_planner.joint_trajectory( theta_start, theta_end, Tf=2.0, N=N, method=5 ) elapsed = time.time() - start_time stats = original_planner.get_performance_stats() used_gpu = stats['gpu_calls'] > 0 print(f" N={N}: {elapsed:.4f}s ({'GPU' if used_gpu else 'CPU'})") print("\n6. MIGRATION CHECKLIST:") print(" □ Update import statements (optional)") print(" □ Add performance monitoring (recommended)") print(" □ Configure GPU memory pool (optional)") print(" □ Add error handling for production (recommended)") print(" □ Test with your specific workloads") print("\n7. TROUBLESHOOTING:") print(" • GPU not detected? Check CUDA installation") print(" • Memory errors? Reduce memory_pool_size_mb") print(" • Performance regression? Check cuda_threshold") print(" • Need CPU-only? Set use_cuda=False") return True # Run migration guide migration_complete = migration_guide() Advanced Configuration Examples ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Real-world configuration examples for different use cases: .. code-block:: python def advanced_configuration_examples(): """Show advanced configuration examples for different scenarios.""" print("Advanced Configuration Examples") print("=" * 40) # Configuration 1: High-throughput batch processing print("1. HIGH-THROUGHPUT BATCH PROCESSING:") batch_config = { 'use_cuda': True, # Force GPU usage 'cuda_threshold': 50, # Low threshold for maximum GPU usage 'memory_pool_size_mb': 2048, # Large memory pool for batch ops 'enable_profiling': True # Monitor performance } try: batch_planner = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, **batch_config ) print(" ✓ High-throughput planner configured") print(f" GPU available: {batch_planner.cuda_available}") print(f" Memory pool: {batch_config['memory_pool_size_mb']} MB") except Exception as e: print(f" ⚠️ Configuration failed: {e}") # Configuration 2: Real-time control system print("\n2. REAL-TIME CONTROL SYSTEM:") realtime_config = { 'use_cuda': None, # Adaptive based on timing 'cuda_threshold': 200, # Conservative threshold for reliability 'memory_pool_size_mb': 256, # Moderate memory usage 'enable_profiling': False # No profiling overhead } realtime_planner = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, **realtime_config ) print(" ✓ Real-time planner configured") print(f" Adaptive execution: {realtime_planner.cuda_available}") # Configuration 3: Memory-constrained embedded system print("\n3. MEMORY-CONSTRAINED EMBEDDED:") embedded_config = { 'use_cuda': False, # CPU-only for embedded 'cuda_threshold': float('inf'), # Never use GPU 'memory_pool_size_mb': None, # No GPU memory pool 'enable_profiling': False # Minimal overhead } embedded_planner = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, **embedded_config ) print(" ✓ Embedded planner configured") print(f" CPU-only mode: {not embedded_planner.cuda_available}") # Configuration 4: Development and debugging print("\n4. DEVELOPMENT AND DEBUGGING:") debug_config = { 'use_cuda': None, # Test both paths 'cuda_threshold': 100, # Standard threshold 'memory_pool_size_mb': 512, # Reasonable pool size 'enable_profiling': True # Full profiling enabled } debug_planner = OptimizedTrajectoryPlanning( serial_manipulator=robot, urdf_path="robot.urdf", dynamics=dynamics, joint_limits=joint_limits, **debug_config ) print(" ✓ Debug planner configured") print(f" Profiling enabled: {debug_planner.enable_profiling}") # Test each configuration with sample workload configs = [ ("Batch", batch_planner), ("Real-time", realtime_planner), ("Embedded", embedded_planner), ("Debug", debug_planner) ] print(f"\n5. CONFIGURATION PERFORMANCE TEST:") test_N = 500 for name, planner_instance in configs: try: planner_instance.reset_performance_stats() start_time = time.time() trajectory = planner_instance.joint_trajectory( np.zeros(6), np.ones(6), Tf=2.0, N=test_N, method=5 ) elapsed = time.time() - start_time stats = planner_instance.get_performance_stats() used_gpu = stats['gpu_calls'] > 0 print(f" {name}: {elapsed:.4f}s ({'GPU' if used_gpu else 'CPU'})") except Exception as e: print(f" {name}: ⚠️ Error - {e}") return { 'batch': batch_planner, 'realtime': realtime_planner, 'embedded': embedded_planner, 'debug': debug_planner } # Run advanced configuration examples config_planners = advanced_configuration_examples() Conclusion and Best Practices ---------------------------- **Performance Summary:** The optimized `TrajectoryPlanning` class provides significant performance improvements: - **Adaptive execution**: Automatically chooses optimal CPU/GPU strategy - **Batch processing**: Up to 10x speedup for multiple trajectories - **Memory pooling**: Reduces allocation overhead by 50-80% - **CUDA acceleration**: 2-20x speedup for large problems - **Intelligent fallback**: Graceful degradation when GPU unavailable **Key Optimizations:** 1. **2D Parallelization**: Trajectories computed across both time and joint dimensions 2. **Fused Kernels**: Multiple operations combined to minimize memory transfers 3. **Pinned Memory**: Faster host-device transfers for large datasets 4. **Adaptive Thresholds**: Automatic tuning based on performance history 5. **Memory Pooling**: Reuse of GPU arrays to eliminate allocation overhead **Best Practices for Production:** 1. **Use Factory Function**: `create_optimized_planner()` for automatic optimization 2. **Monitor Performance**: Regularly check `get_performance_stats()` 3. **Configure Memory**: Set appropriate `memory_pool_size_mb` for your workload 4. **Handle Errors**: Implement proper error handling for GPU failures 5. **Profile First**: Use `enable_profiling=True` during development 6. **Batch When Possible**: Use `batch_joint_trajectory()` for multiple paths 7. **Clean Up**: Call `cleanup_gpu_memory()` when done **Migration Strategy:** - **Phase 1**: Drop-in replacement (no code changes required) - **Phase 2**: Add performance monitoring - **Phase 3**: Enable batch processing where applicable - **Phase 4**: Fine-tune configuration for your specific use case **Configuration Guidelines:** - **High-throughput**: `cuda_threshold=50`, large `memory_pool_size_mb` - **Real-time**: `cuda_threshold=200`, moderate memory pool - **Embedded**: `use_cuda=False`, minimal memory footprint - **Development**: `enable_profiling=True`, adaptive settings The optimized trajectory planning module maintains full backward compatibility while providing substantial performance improvements. Users can benefit from optimizations immediately with existing code, then gradually adopt advanced features for maximum performance gains. For the most demanding applications, the combination of GPU acceleration, batch processing, and intelligent memory management can provide order-of-magnitude performance improvements while maintaining the same simple API that makes ManipulaPy easy to use.