#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Control Module - ManipulaPy
This module provides various control algorithms for robotic manipulators including
PID, computed torque, adaptive, and robust control methods.
Note: All control methods use CPU-based NumPy computation to avoid GPU-CPU transfer
overhead. Since the dynamics module operates on NumPy arrays, keeping everything on
the CPU is significantly more efficient than repeated PCIe transfers between GPU and
CPU memory spaces.
Copyright (c) 2025 Mohamed Aboelnasr
Licensed under the GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
This file is part of ManipulaPy.
ManipulaPy is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
ManipulaPy is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with ManipulaPy. If not, see <https://www.gnu.org/licenses/>.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import NDArray
# Optional CuPy import for defensive array handling
try:
import cupy as cp
CUPY_AVAILABLE = True
except ImportError:
cp = None
CUPY_AVAILABLE = False
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _to_numpy(arr: Any) -> NDArray[Any]:
"""
Safely convert array to NumPy, handling both NumPy and CuPy arrays.
Args:
arr: Input array (can be NumPy array, CuPy array, or list)
Returns:
NumPy array
Note:
This is necessary because np.asarray() does not work with CuPy arrays.
CuPy raises "Implicit conversion to a NumPy array is not allowed"
to prevent accidental performance issues. We must explicitly call .get()
to transfer CuPy arrays from GPU to CPU.
"""
if CUPY_AVAILABLE and cp is not None:
try:
if isinstance(arr, cp.ndarray):
# CuPy array: explicitly transfer from GPU to CPU
return arr.get()
except (TypeError, AttributeError):
# cp.ndarray may not be a real type when CuPy is mocked; treat as non-CuPy
pass
# NumPy array, list, or other: convert to NumPy
return np.asarray(arr)
def _validate_i_clamp(i_clamp: Optional[float]) -> Optional[float]:
"""Return a scalar positive finite integral clamp, or None if disabled."""
if i_clamp is None:
return None
clamp = np.asarray(_to_numpy(i_clamp), dtype=float)
if clamp.size != 1:
raise ValueError(f"i_clamp must be a scalar, got shape {clamp.shape}")
clamp_value = float(clamp.reshape(-1)[0])
if not np.isfinite(clamp_value) or clamp_value <= 0:
raise ValueError(
f"i_clamp must be positive and finite when set, got {i_clamp!r}"
)
return clamp_value
[docs]
class ManipulatorController:
"""CPU-based manipulator controller implementations and tuning utilities."""
[docs]
def __init__(self, dynamics: Any) -> None:
"""
Initialize the ManipulatorController with the dynamics of the manipulator.
Note: Control algorithms now use CPU (NumPy) to avoid GPU-CPU transfer
overhead, since the dynamics module operates on NumPy arrays.
Parameters:
dynamics (ManipulatorDynamics): An instance of ManipulatorDynamics.
"""
self.dynamics = dynamics
self.eint: Optional[NDArray[np.float64]] = None
self.parameter_estimate: Optional[NDArray[np.float64]] = None
self.P: Optional[NDArray[np.float64]] = None
self.x_hat: Optional[NDArray[np.float64]] = None
[docs]
def computed_torque_control(
self,
thetalistd: Union[NDArray[np.float64], List[float]],
dthetalistd: Union[NDArray[np.float64], List[float]],
ddthetalistd: Union[NDArray[np.float64], List[float]],
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
dt: float,
Kp: Union[NDArray[np.float64], List[float]],
Ki: Union[NDArray[np.float64], List[float]],
Kd: Union[NDArray[np.float64], List[float]],
i_clamp: Optional[float] = None,
) -> NDArray[np.float64]:
"""
Computed Torque Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
The dynamics module operates on NumPy arrays, so keeping everything
on CPU is more efficient than repeated GPU↔CPU transfers.
Parameters:
thetalistd: Desired joint angles.
dthetalistd: Desired joint velocities.
ddthetalistd: Desired joint accelerations.
thetalist: Current joint angles.
dthetalist: Current joint velocities.
g: Gravity vector.
dt: Time step.
Kp: Proportional gain.
Ki: Integral gain.
Kd: Derivative gain.
Returns:
NDArray: Torque command (CPU-based NumPy array).
"""
# Convert to NumPy arrays (CPU) - avoid GPU↔CPU transfer bottleneck
# Use _to_numpy() to safely handle both NumPy and CuPy arrays
thetalistd = _to_numpy(thetalistd)
dthetalistd = _to_numpy(dthetalistd)
ddthetalistd = _to_numpy(ddthetalistd)
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
g = _to_numpy(g)
Kp = _to_numpy(Kp)
Ki = _to_numpy(Ki)
Kd = _to_numpy(Kd)
if self.eint is None or self.eint.shape != thetalist.shape:
if self.eint is not None and self.eint.shape != thetalist.shape:
logger.debug(
"Controller state shape mismatch (%s vs %s); resetting integral.",
self.eint.shape,
thetalist.shape,
)
self.eint = np.zeros_like(thetalist, dtype=float)
e = thetalistd - thetalist
self.eint += e * dt
i_clamp = _validate_i_clamp(i_clamp)
if i_clamp is not None:
np.clip(self.eint, -i_clamp, i_clamp, out=self.eint)
# Dynamics computations (no GPU↔CPU transfers)
M = self.dynamics.mass_matrix(thetalist)
tau = M @ (Kp * e + Ki * self.eint + Kd * (dthetalistd - dthetalist))
tau += self.dynamics.inverse_dynamics(
thetalist,
dthetalist,
ddthetalistd,
g,
[0, 0, 0, 0, 0, 0],
)
return tau
[docs]
def pd_control(
self,
desired_position: Union[NDArray[np.float64], List[float]],
desired_velocity: Union[NDArray[np.float64], List[float]],
current_position: Union[NDArray[np.float64], List[float]],
current_velocity: Union[NDArray[np.float64], List[float]],
Kp: Union[NDArray[np.float64], List[float]],
Kd: Union[NDArray[np.float64], List[float]],
) -> NDArray[np.float64]:
"""
PD Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
desired_position: Desired joint positions.
desired_velocity: Desired joint velocities.
current_position: Current joint positions.
current_velocity: Current joint velocities.
Kp: Proportional gain.
Kd: Derivative gain.
Returns:
NDArray: PD control signal (CPU-based NumPy array).
"""
desired_position = _to_numpy(desired_position)
desired_velocity = _to_numpy(desired_velocity)
current_position = _to_numpy(current_position)
current_velocity = _to_numpy(current_velocity)
Kp = _to_numpy(Kp)
Kd = _to_numpy(Kd)
e = desired_position - current_position
edot = desired_velocity - current_velocity
pd_signal = Kp * e + Kd * edot
return pd_signal
[docs]
def pid_control(
self,
thetalistd: Union[NDArray[np.float64], List[float]],
dthetalistd: Union[NDArray[np.float64], List[float]],
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
dt: float,
Kp: Union[NDArray[np.float64], List[float]],
Ki: Union[NDArray[np.float64], List[float]],
Kd: Union[NDArray[np.float64], List[float]],
i_clamp: Optional[float] = None,
) -> NDArray[np.float64]:
"""
PID Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalistd: Desired joint angles.
dthetalistd: Desired joint velocities.
thetalist: Current joint angles.
dthetalist: Current joint velocities.
dt: Time step.
Kp: Proportional gain.
Ki: Integral gain.
Kd: Derivative gain.
Returns:
NDArray: PID control signal (CPU-based NumPy array).
"""
thetalistd = _to_numpy(thetalistd)
dthetalistd = _to_numpy(dthetalistd)
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
Kp = _to_numpy(Kp)
Ki = _to_numpy(Ki)
Kd = _to_numpy(Kd)
if self.eint is None or self.eint.shape != thetalist.shape:
if self.eint is not None and self.eint.shape != thetalist.shape:
logger.debug(
"Controller state shape mismatch (%s vs %s); resetting integral.",
self.eint.shape,
thetalist.shape,
)
self.eint = np.zeros_like(thetalist, dtype=float)
e = thetalistd - thetalist
self.eint += e * dt
i_clamp = _validate_i_clamp(i_clamp)
if i_clamp is not None:
np.clip(self.eint, -i_clamp, i_clamp, out=self.eint)
e_dot = dthetalistd - dthetalist
tau = Kp * e + Ki * self.eint + Kd * e_dot
return tau
[docs]
def robust_control(
self,
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
ddthetalist: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
Ftip: Union[NDArray[np.float64], List[float]],
disturbance_estimate: Union[NDArray[np.float64], List[float]],
adaptation_gain: float,
) -> NDArray[np.float64]:
"""
Robust Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalist: Current joint angles.
dthetalist: Current joint velocities.
ddthetalist: Desired joint accelerations.
g: Gravity vector.
Ftip: External forces applied at the end effector.
disturbance_estimate: Estimate of disturbances.
adaptation_gain: Gain for the adaptation term.
Returns:
NDArray: Robust control torque (CPU-based NumPy array).
"""
# Convert to NumPy arrays (CPU) - avoid GPU↔CPU transfer bottleneck
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
ddthetalist = _to_numpy(ddthetalist)
g = _to_numpy(g)
Ftip = _to_numpy(Ftip)
disturbance_estimate = _to_numpy(disturbance_estimate)
adaptation_gain = _to_numpy(adaptation_gain)
# Dynamics computations — _to_numpy guards keep us on the CPU
# path even when the dynamics backend would otherwise hand back
# CuPy arrays. Mixing the two raises TypeError on a real CuPy
# install (previously masked by the CPU-only test mock).
M = _to_numpy(self.dynamics.mass_matrix(thetalist))
c = _to_numpy(self.dynamics.velocity_quadratic_forces(thetalist, dthetalist))
g_forces = _to_numpy(self.dynamics.gravity_forces(thetalist, g))
J_transpose = _to_numpy(self.dynamics.jacobian(thetalist)).T
tau = (
M @ ddthetalist
+ c
+ g_forces
+ J_transpose @ Ftip
+ adaptation_gain * disturbance_estimate
)
return tau
[docs]
def adaptive_control(
self,
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
ddthetalist: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
Ftip: Union[NDArray[np.float64], List[float]],
measurement_error: Union[NDArray[np.float64], List[float]],
adaptation_gain: float,
) -> NDArray[np.float64]:
"""
Adaptive Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalist: Current joint angles.
dthetalist: Current joint velocities.
ddthetalist: Desired joint accelerations.
g: Gravity vector.
Ftip: External forces applied at the end effector.
measurement_error: Error in measurement.
adaptation_gain: Gain for the adaptation term.
Returns:
NDArray: Adaptive control torque (CPU-based NumPy array).
"""
# Convert to NumPy arrays (CPU) - avoid GPU↔CPU transfer bottleneck
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
ddthetalist = _to_numpy(ddthetalist)
g = _to_numpy(g)
Ftip = _to_numpy(Ftip)
measurement_error = _to_numpy(measurement_error)
# ---- parameter update (make it 1-D, same length as joints) ----
n = thetalist.size
if getattr(self, "parameter_estimate", None) is None:
self.parameter_estimate = np.zeros((n,), dtype=thetalist.dtype)
err = measurement_error.reshape(
-1
) # (n,) - already NumPy from _to_numpy() above
# Handle both scalar and array adaptation_gain
gamma = float(np.atleast_1d(adaptation_gain).ravel()[0])
# simple gradient-like update
self.parameter_estimate = self.parameter_estimate + gamma * err
# ---- standard torque computation (no GPU↔CPU transfers) ----
M = self.dynamics.mass_matrix(thetalist)
c = self.dynamics.velocity_quadratic_forces(thetalist, dthetalist)
g_forces = self.dynamics.gravity_forces(thetalist, g)
J_transpose = self.dynamics.jacobian(thetalist).T
tau = (
M @ ddthetalist
+ c
+ g_forces
+ J_transpose @ Ftip
+ self.parameter_estimate
)
return tau
[docs]
def kalman_filter_predict(
self,
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
taulist: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
Ftip: Union[NDArray[np.float64], List[float]],
dt: float,
Q: NDArray[np.float64],
) -> None:
"""
Kalman Filter Prediction.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalist: Current joint angles.
dthetalist: Current joint velocities.
taulist: Applied torques.
g: Gravity vector.
Ftip: External forces applied at the end effector.
dt: Time step.
Q: Process noise covariance.
Returns:
None
"""
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
taulist = _to_numpy(taulist)
g = _to_numpy(g)
Ftip = _to_numpy(Ftip)
Q = _to_numpy(Q)
n = self.x_hat.shape[0] if self.x_hat is not None else len(thetalist) * 2
if Q.shape != (n, n):
raise ValueError(f"Q must have shape ({n}, {n}), got {Q.shape}")
if self.x_hat is None:
self.x_hat = np.concatenate((thetalist, dthetalist))
thetalist_pred = (
self.x_hat[: len(thetalist)] + self.x_hat[len(thetalist) :] * dt
)
# forward_dynamics may return a CuPy array when the backend is
# CuPy; coerce so x_hat stays NumPy throughout the filter cycle
# (otherwise the update step's H @ x_hat raises TypeError).
dthetalist_pred = (
_to_numpy(
self.dynamics.forward_dynamics(
self.x_hat[: len(thetalist)],
self.x_hat[len(thetalist) :],
taulist,
g,
Ftip,
)
)
* dt
+ self.x_hat[len(thetalist) :]
)
x_hat_pred = np.concatenate((thetalist_pred, dthetalist_pred))
if self.P is None:
self.P = np.eye(len(x_hat_pred))
F = np.eye(len(x_hat_pred))
self.P = F @ self.P @ F.T + Q
self.x_hat = x_hat_pred
[docs]
def kalman_filter_update(
self, z: Union[NDArray[np.float64], List[float]], R: NDArray[np.float64]
) -> None:
"""
Kalman Filter Update.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
z: Measurement vector.
R: Measurement noise covariance.
Returns:
None
"""
z = _to_numpy(z)
R = _to_numpy(R)
if self.x_hat is None:
raise ValueError(
"kalman_filter_update called before kalman_filter_predict; "
"x_hat has not been initialized"
)
self.x_hat = _to_numpy(self.x_hat)
n = self.x_hat.shape[0]
if self.P is None:
raise ValueError(
f"P must be initialized with shape ({n}, {n}) before update; "
"got None"
)
self.P = _to_numpy(self.P)
if getattr(self.P, "shape", None) != (n, n):
raise ValueError(
f"P must be initialized with shape ({n}, {n}) before update; "
f"got {self.P.shape}"
)
if z.shape != (n,):
raise ValueError(f"z must have shape ({n},) to match x_hat, got {z.shape}")
if R.shape != (n, n):
raise ValueError(f"R must have shape ({n}, {n}), got {R.shape}")
H = np.eye(len(self.x_hat))
y = z - H @ self.x_hat
S = H @ self.P @ H.T + R
K = self.P @ H.T @ np.linalg.inv(S)
self.x_hat += K @ y
self.P = (np.eye(len(self.x_hat)) - K @ H) @ self.P
[docs]
def kalman_filter_control(
self,
thetalistd: Union[NDArray[np.float64], List[float]],
dthetalistd: Union[NDArray[np.float64], List[float]],
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
taulist: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
Ftip: Union[NDArray[np.float64], List[float]],
dt: float,
Q: NDArray[np.float64],
R: NDArray[np.float64],
) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
"""
Kalman Filter Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalistd: Desired joint angles.
dthetalistd: Desired joint velocities.
thetalist: Current joint angles.
dthetalist: Current joint velocities.
taulist: Applied torques.
g: Gravity vector.
Ftip: External forces applied at the end effector.
dt: Time step.
Q: Process noise covariance.
R: Measurement noise covariance.
Returns:
tuple: Estimated joint angles and velocities (CPU-based NumPy arrays).
"""
# Convert to NumPy (predictions and updates already handle this, but for consistency)
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
self.kalman_filter_predict(thetalist, dthetalist, taulist, g, Ftip, dt, Q)
self.kalman_filter_update(np.concatenate((thetalist, dthetalist)), R)
return self.x_hat[: len(thetalist)], self.x_hat[len(thetalist) :]
[docs]
def feedforward_control(
self,
desired_position: Union[NDArray[np.float64], List[float]],
desired_velocity: Union[NDArray[np.float64], List[float]],
desired_acceleration: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
Ftip: Union[NDArray[np.float64], List[float]],
) -> NDArray[np.float64]:
"""
Feedforward Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
desired_position: Desired joint positions.
desired_velocity: Desired joint velocities.
desired_acceleration: Desired joint accelerations.
g: Gravity vector.
Ftip: External forces applied at the end effector.
Returns:
NDArray: Feedforward torque (CPU-based NumPy array).
"""
desired_position = _to_numpy(desired_position)
desired_velocity = _to_numpy(desired_velocity)
desired_acceleration = _to_numpy(desired_acceleration)
g = _to_numpy(g)
Ftip = _to_numpy(Ftip)
tau = self.dynamics.inverse_dynamics(
desired_position,
desired_velocity,
desired_acceleration,
g,
Ftip,
)
return tau
[docs]
def pd_feedforward_control(
self,
desired_position: Union[NDArray[np.float64], List[float]],
desired_velocity: Union[NDArray[np.float64], List[float]],
desired_acceleration: Union[NDArray[np.float64], List[float]],
current_position: Union[NDArray[np.float64], List[float]],
current_velocity: Union[NDArray[np.float64], List[float]],
Kp: Union[NDArray[np.float64], List[float]],
Kd: Union[NDArray[np.float64], List[float]],
g: Union[NDArray[np.float64], List[float]],
Ftip: Union[NDArray[np.float64], List[float]],
) -> NDArray[np.float64]:
"""
PD Feedforward Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
desired_position: Desired joint positions.
desired_velocity: Desired joint velocities.
desired_acceleration: Desired joint accelerations.
current_position: Current joint positions.
current_velocity: Current joint velocities.
Kp: Proportional gain.
Kd: Derivative gain.
g: Gravity vector.
Ftip: External forces applied at the end effector.
Returns:
NDArray: Control signal (CPU-based NumPy array).
"""
# pd_control and feedforward_control now handle conversion internally
pd_signal = self.pd_control(
desired_position,
desired_velocity,
current_position,
current_velocity,
Kp,
Kd,
)
ff_signal = self.feedforward_control(
desired_position, desired_velocity, desired_acceleration, g, Ftip
)
control_signal = pd_signal + ff_signal
return control_signal
[docs]
@staticmethod
def enforce_limits(
thetalist: Union[NDArray[np.float64], List[float]],
dthetalist: Union[NDArray[np.float64], List[float]],
tau: Union[NDArray[np.float64], List[float]],
joint_limits: Union[NDArray[np.float64], List[Tuple[float, float]]],
torque_limits: Union[NDArray[np.float64], List[Tuple[float, float]]],
) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""
Enforce joint and torque limits.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalist: Joint angles.
dthetalist: Joint velocities.
tau: Torques.
joint_limits: Joint angle limits.
torque_limits: Torque limits.
Returns:
tuple: Clipped joint angles, velocities, and torques (CPU-based NumPy arrays).
"""
thetalist = _to_numpy(thetalist)
dthetalist = _to_numpy(dthetalist)
tau = _to_numpy(tau)
joint_limits = _to_numpy(joint_limits)
torque_limits = _to_numpy(torque_limits)
thetalist = np.clip(thetalist, joint_limits[:, 0], joint_limits[:, 1])
tau = np.clip(tau, torque_limits[:, 0], torque_limits[:, 1])
return thetalist, dthetalist, tau
[docs]
def plot_steady_state_response(
self,
time: Union[NDArray[np.float64], List[float]],
response: Union[NDArray[np.float64], List[float]],
set_point: float,
title: str = "Steady State Response",
) -> None:
"""
Plot the steady-state response of the controller.
Parameters:
time (np.ndarray): Array of time steps.
response (np.ndarray): Array of response values.
set_point (float): Desired set point value.
title (str, optional): Title of the plot.
Returns:
None
"""
time = _to_numpy(time)
response = _to_numpy(response)
plt.figure(figsize=(10, 5))
plt.plot(time, response, label="Response")
plt.axhline(y=set_point, color="r", linestyle="--", label="Set Point")
# Calculate key metrics
rise_time = self.calculate_rise_time(time, response, set_point)
percent_overshoot = self.calculate_percent_overshoot(response, set_point)
settling_time = self.calculate_settling_time(time, response, set_point)
steady_state_error = self.calculate_steady_state_error(response, set_point)
# Annotate metrics on the plot
plt.axvline(
x=rise_time, color="g", linestyle="--", label=f"Rise Time: {rise_time:.2f}s"
)
plt.axhline(
y=set_point * (1 + percent_overshoot / 100),
color="b",
linestyle="--",
label=f"Overshoot: {percent_overshoot:.2f}%",
)
plt.axvline(
x=settling_time,
color="m",
linestyle="--",
label=f"Settling Time: {settling_time:.2f}s",
)
plt.axhline(
y=set_point + steady_state_error,
color="c",
linestyle="--",
label=f"Steady State Error: {steady_state_error:.2f}",
)
plt.xlabel("Time (s)")
plt.ylabel("Response")
plt.title(title)
plt.legend()
plt.grid(True)
plt.show()
[docs]
def calculate_rise_time(
self,
time: Union[NDArray[np.float64], List[float]],
response: Union[NDArray[np.float64], List[float]],
set_point: float,
) -> float:
"""
Calculate the rise time.
Parameters:
time (np.ndarray): Array of time steps.
response (np.ndarray): Array of response values.
set_point (float): Desired set point value.
Returns:
float: Rise time.
"""
time = _to_numpy(time)
response = _to_numpy(response)
rise_start = 0.1 * set_point
rise_end = 0.9 * set_point
start_idx = np.where(response >= rise_start)[0]
end_idx = np.where(response >= rise_end)[0]
if len(start_idx) == 0 or len(end_idx) == 0:
return float("inf")
return time[end_idx[0]] - time[start_idx[0]]
[docs]
def calculate_percent_overshoot(
self, response: Union[NDArray[np.float64], List[float]], set_point: float
) -> float:
"""
Calculate the percent overshoot.
Parameters:
response (np.ndarray): Array of response values.
set_point (float): Desired set point value.
Returns:
float: Percent overshoot.
"""
response = _to_numpy(response)
if set_point == 0:
return 0.0
max_response = np.max(response)
return ((max_response - set_point) / set_point) * 100
[docs]
def calculate_settling_time(
self,
time: Union[NDArray[np.float64], List[float]],
response: Union[NDArray[np.float64], List[float]],
set_point: float,
tolerance: float = 0.02,
) -> float:
"""
Calculate the settling time.
Returns the first time at which the response enters the tolerance band
and never leaves it again. Returns ``float('inf')`` when the response
never enters the band, or enters but does not remain settled through
the end of the recorded data.
Parameters:
time (np.ndarray): Array of time steps.
response (np.ndarray): Array of response values.
set_point (float): Desired set point value.
tolerance (float): Fractional tolerance band (default 0.02 = 2 %).
Returns:
float: Settling time, or inf if the response never settles.
"""
time = _to_numpy(time)
response = _to_numpy(response)
settling_threshold = abs(set_point) * tolerance
in_band = np.abs(response - set_point) <= settling_threshold
if not np.any(in_band):
return float("inf")
n = len(in_band)
last_excursion = -1
for i in range(n - 1, -1, -1):
if not in_band[i]:
last_excursion = i
break
if last_excursion == n - 1:
return float("inf") # never settles
first_settled_idx = last_excursion + 1
return float(time[first_settled_idx])
[docs]
def calculate_steady_state_error(
self, response: Union[NDArray[np.float64], List[float]], set_point: float
) -> float:
"""
Calculate the steady-state error.
Parameters:
response (np.ndarray): Array of response values.
set_point (float): Desired set point value.
Returns:
float: Steady-state error.
"""
response = _to_numpy(response)
steady_state_error = response[-1] - set_point
return steady_state_error
[docs]
def joint_space_control(
self,
desired_joint_angles: Union[NDArray[np.float64], List[float]],
current_joint_angles: Union[NDArray[np.float64], List[float]],
current_joint_velocities: Union[NDArray[np.float64], List[float]],
Kp: Union[NDArray[np.float64], List[float]],
Kd: Union[NDArray[np.float64], List[float]],
) -> NDArray[np.float64]:
"""
Joint Space Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
desired_joint_angles: Desired joint angles.
current_joint_angles: Current joint angles.
current_joint_velocities: Current joint velocities.
Kp: Proportional gain.
Kd: Derivative gain.
Returns:
NDArray: Control torque (CPU-based NumPy array).
"""
desired_joint_angles = _to_numpy(desired_joint_angles)
current_joint_angles = _to_numpy(current_joint_angles)
current_joint_velocities = _to_numpy(current_joint_velocities)
Kp = _to_numpy(Kp)
Kd = _to_numpy(Kd)
e = desired_joint_angles - current_joint_angles
edot = 0 - current_joint_velocities
tau = Kp * e + Kd * edot
return tau
[docs]
def cartesian_space_control(
self,
desired_position: Union[NDArray[np.float64], List[float]],
current_joint_angles: Union[NDArray[np.float64], List[float]],
current_joint_velocities: Union[NDArray[np.float64], List[float]],
Kp: Union[NDArray[np.float64], List[float]],
Kd: Union[NDArray[np.float64], List[float]],
) -> NDArray[np.float64]:
"""
Cartesian Space Control.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
desired_position: Desired end-effector position.
current_joint_angles: Current joint angles.
current_joint_velocities: Current joint velocities.
Kp: Proportional gain.
Kd: Derivative gain.
Returns:
NDArray: Control torque (CPU-based NumPy array).
"""
desired_position = _to_numpy(desired_position)
current_joint_angles = _to_numpy(current_joint_angles)
current_joint_velocities = _to_numpy(current_joint_velocities)
Kp = _to_numpy(Kp)
Kd = _to_numpy(Kd)
current_position = self.dynamics.forward_kinematics(current_joint_angles)[:3, 3]
e = desired_position - current_position
# Position-only control: use linear (3xN) part of Jacobian
J_v = self.dynamics.jacobian(current_joint_angles)[:3, :]
cartesian_velocity = J_v @ current_joint_velocities
Kp_term = Kp @ e if np.ndim(Kp) == 2 else Kp * e
Kd_term = (
Kd @ cartesian_velocity if np.ndim(Kd) == 2 else Kd * cartesian_velocity
)
tau = J_v.T @ (Kp_term - Kd_term)
return tau
# ------------------------------------------------------------------------
[docs]
def ziegler_nichols_tuning(
self,
Ku: Union[float, NDArray[np.float64], List[float]],
Tu: Union[float, NDArray[np.float64], List[float]],
kind: str = "PID",
) -> Tuple[
Union[float, NDArray[np.float64]],
Union[float, NDArray[np.float64]],
Union[float, NDArray[np.float64]],
]:
"""
Compute Ziegler-Nichols controller gains.
Args:
Ku: Ultimate gain as a scalar or vector.
Tu: Ultimate period as a scalar or vector.
kind: Controller type: ``"P"``, ``"PI"``, or ``"PID"``.
Returns:
Tuple of ``(Kp, Ki, Kd)`` gains.
"""
Ku = _to_numpy(Ku).astype(float)
kind = kind.upper()
if kind == "P":
Kp, Ki, Kd = 0.50 * Ku, 0.0 * Ku, 0.0 * Ku
else:
Tu = _to_numpy(Tu).astype(float)
if not np.all(np.isfinite(Tu)) or np.any(Tu <= 0):
raise ValueError(
f"Tu (ultimate period) must be positive and finite, got Tu={Tu!r}. "
"Tu == 0 typically indicates find_ultimate_gain_and_period found no "
"sustained oscillation; check your gain sweep."
)
if kind == "PI":
Kp, Ki, Kd = 0.45 * Ku, 1.2 * Ku / Tu, 0.0 * Ku
elif kind == "PID":
Kp = 0.60 * Ku
Ki = 2.0 * Kp / Tu
Kd = 0.125 * Kp * Tu
else:
raise ValueError("kind must be 'P', 'PI' or 'PID'")
# Return scalars as plain floats so assertEqual passes exactly
if Ku.size == 1:
return float(Kp), float(Ki), float(Kd)
return Kp, Ki, Kd
# ------------------------------------------------------------------------
[docs]
def tune_controller(
self,
Ku: Union[float, NDArray[np.float64], List[float]],
Tu: Union[float, NDArray[np.float64], List[float]],
kind: str = "PID",
) -> Tuple[
Union[float, NDArray[np.float64]],
Union[float, NDArray[np.float64]],
Union[float, NDArray[np.float64]],
]:
"""
Convenience wrapper that logs and returns NumPy arrays (length = DOF).
"""
Kp, Ki, Kd = self.ziegler_nichols_tuning(Ku, Tu, kind)
logger.info(f"Tuned Z-N ({kind}) gains\n Kp={Kp}\n Ki={Ki}\n Kd={Kd}")
return Kp, Ki, Kd
# ------------------------------------------------------------------------
[docs]
def find_ultimate_gain_and_period(
self,
thetalist: Union[NDArray[np.float64], List[float]],
desired_joint_angles: Union[NDArray[np.float64], List[float]],
dt: float,
max_steps: int = 1000,
) -> Tuple[float, float, List[float], List[NDArray[np.float64]]]:
"""
Find the ultimate gain and period using the Ziegler–Nichols method.
Uses CPU-based computation to avoid GPU-CPU transfer overhead.
Parameters:
thetalist: Initial joint angles (shape [6]).
desired_joint_angles: Step target angles (shape [6]).
dt: Simulation time step.
max_steps: Number of integration steps to try.
Returns:
tuple:
- ultimate_gain (float)
- ultimate_period (float)
- gain_history (list of float)
- error_history (list of np.ndarray)
"""
thetalist = _to_numpy(thetalist)
desired_joint_angles = _to_numpy(desired_joint_angles)
Kp = 0.01
increase = 1.1
oscillation = False
gain_history = []
error_history = []
while not oscillation and Kp < 1000:
theta = thetalist.copy()
omega = np.zeros_like(theta)
self.eint = np.zeros_like(theta)
errors = []
for step in range(max_steps):
# pure-PD poke
tau = self.pd_control(
desired_joint_angles, np.zeros_like(theta), theta, omega, Kp, 0.0
)
# alpha = M⁻¹ (tau – C – G)
M = self.dynamics.mass_matrix(theta)
C = self.dynamics.velocity_quadratic_forces(theta, omega)
Gf = self.dynamics.gravity_forces(theta, np.array([0, 0, -9.81]))
alpha = np.linalg.solve(M, tau - C - Gf)
omega += alpha * dt
theta += omega * dt
err = np.linalg.norm(theta - desired_joint_angles)
errors.append(err)
# blow-up guard
if step > 10 and err > 1e10:
break
gain_history.append(Kp)
error_history.append(np.array(errors))
# look for the first upward slope after initial increase
if len(errors) >= 2 and errors[-2] < errors[-1] < errors[-2] * 1.2:
oscillation = True
else:
Kp *= increase
ultimate_gain = float(Kp)
ultimate_period = (max_steps * dt) / max(
1, np.count_nonzero(np.diff(np.sign(error_history[-1]))) // 2
)
return ultimate_gain, ultimate_period, gain_history, error_history