When testing a controller on the CloudPendulum, you usually want to do more than just check whether it reaches a setpoint — you also want to evaluate how it behaves under disturbances. The disturbance API lets you define disturbances upfront, and the server will trigger them automatically during the experiment.
This means you don't have to physically push the pendulum or implement disturbances manually in your control loop. You describe what you want to happen and when, pass the list to start_experiment, and the hardware takes care of it.
There are two main flavours:
- Named disturbances: a predefined set identified by a string, useful for reproducible benchmarks
- Custom disturbances: a list of disturbance objects you build yourself
This tutorial covers both. Any additions, such as new disturbance types, will be documented in the API reference.
Setup
Your user token is read from ~/.env — a plain text file with just the token on one line, without quotes or whitespace. If you haven't set it up yet:
# In your terminal: echo "your_token_here" > ~/.env
You can also pass the token directly as a string in your code, but please make sure not to commit that to public repositories.
import numpy as np import time import matplotlib.pyplot as plt import sys import os from cloudpendulumclient.client import Client import cloudpendulumclient.disturbance as Disturbances
with open(os.path.expanduser("~/.env"), 'r') as f: user_token = f.read().strip() client = Client() info = client.get_user_info(user_token) print(f"Attempts left: {info.number_of_attempts}") print(f"Max gym length: {info.max_gym_length}") print(f"Gym bucket size: {info.gym_bucket_size} / {info.gym_max_bucket_size}")
Named (Predefined) Disturbances
The simplest option is to pass a string to start_experiment. Named disturbance sets are presets used to run against a standard benchmark or reproduce someone else's results. Available presets are listed in the API documentation.
# Example — not meant to be run standalone, just shows the API # session_token, url = client.start_experiment( # user_token, "SimplePendulum", experiment_time, preparation_time, # initial_position=[np.pi], # disturbances="default_simple_pendulum" # )
Custom Disturbances
If the presets don't cover what you need, you can build your own list of disturbance objects. There are three types available:
TimedTorqueDisturbance: adds a constant torque for a specified time windowTimedPositionDisturbance: jumps the pendulum to a position at a fixed timeStateDependentPositionDisturbance: triggers a position jump after the pendulum has settled at the setpoint
You can create a list of multiple disturbance objects and pass it to the disturbances= keyword in start_experiment.
get_position() and get_velocity() return zero. It's a good idea to detect this case (e.g. if pos == 0 and vel == 0) and hold the last known state rather than feeding zeros into your control law.TimedTorqueDisturbance
Applies a predefined torque between start_time and start_time + duration. You need to specify one torque value per actuator. This is the least disruptive type — roughly equivalent to a brief push of the pendulum.
torque_kick = Disturbances.TimedTorqueDisturbance( start_time=3.0, # seconds after experiment start duration=0.075, # seconds torque=[0.1] # Nm, one value per actuator ) # Multiple torque disturbances are fine torque_kicks = [ Disturbances.TimedTorqueDisturbance(start_time=3.0, duration=0.075, torque=[0.1]), Disturbances.TimedTorqueDisturbance(start_time=10.0, duration=0.075, torque=[-0.1]), ]
TimedPositionDisturbance
At trigger_time seconds after the experiment starts, the hardware drives the pendulum to disturbance_position. This is a hard position command and your controller signal is overridden while the pendulum is moved. If you leave disturbance_position=None, the server picks a random position within the safety limits.
# Jump to a specific position at t=5s pos_jump = Disturbances.TimedPositionDisturbance( trigger_time=5.0, disturbance_position=[0.5 * np.pi] ) # Or let the server randomize the target pos_jump_random = Disturbances.TimedPositionDisturbance( trigger_time=5.0 # disturbance_position=None is the default ) # Multiple timed position disturbances are fine timed_pos_dist = [ Disturbances.TimedPositionDisturbance(trigger_time=5.0, disturbance_position=[0.5 * np.pi]), Disturbances.TimedPositionDisturbance(trigger_time=12.0, disturbance_position=[-0.5 * np.pi]), ]
StateDependentPositionDisturbance
This is usually the most useful type for controller evaluation. Instead of triggering at a fixed time, it waits until the pendulum has been close to goal_position for wait_at_target_seconds — i.e. it waits until your controller has actually done its job — and then kicks the system.
"Close to" is defined by target_pos_threshold (position error in rad) and target_vel_threshold (velocity in rad/s). The pendulum needs to be within both bounds continuously for the full wait duration. goal_position defaults to initial_position from start_experiment if set to None.
state_dep_dist = Disturbances.StateDependentPositionDisturbance( wait_at_target_seconds=2.0, # wait this long at the target before kicking goal_position=[np.pi], # the setpoint your controller is trying to reach disturbance_position=[0.5 * np.pi], # where to kick it to (None = random) target_pos_threshold=0.1, # rad — how close counts as "at target" target_vel_threshold=1.0, # rad/s )
Constraints and Practical Notes
- Mixing position disturbance types: state-dependent and time-dependent disturbances cannot currently be used at the same time, to avoid timing conflicts.
- Zero readings during position disturbances: when the server is driving the pendulum to the disturbance position,
get_position()andget_velocity()return zero. Detect this (e.g.if pos == 0 and vel == 0) and skip or freeze the control update during that window. - DIY disturbances: all API disturbances run server-side. If you need something the API doesn't support (e.g. additive sensor noise, model uncertainty) you can implement it client-side by modifying measured values or sent commands directly in your control loop. Server-side and client-side disturbances don't interfere with each other.
Full Example: Simple Pendulum with State-Dependent Disturbance
The example below runs a simple pendulum with an energy-shaping + LQR controller:
- The energy-shaping part swings the pendulum up from the hanging position
- Once it enters the region of attraction of the LQR (computed via a sampling method), the LQR takes over and stabilizes the upright position
A StateDependentPositionDisturbance is configured to trigger once the pendulum has been balanced at the top for 1 second, kicking it to 0.5π rad. We then observe whether the controller recovers.
from energy_shaping import EnergyShapingAndLQRController from najafi import najafi_based_sampling from pendulum import PendulumPlant def wait_for_control_loop_end(time_to_pass): """Sleep until the next control loop tick.""" if time_to_pass <= 0.0: return start = time.time() time.sleep(time_to_pass * 0.7) while time.time() - start < time_to_pass: pass
# Experiment parameters experiment_time = 20 preparation_time = 4 dt = 0.005 n = int(experiment_time / dt) # Pendulum model parameters (used for controller design only) mass = 0.06 # kg length = 0.1 # m damping = 0.0004 # kg m/s gravity = 9.81 # m/s² inertia = mass * length * length # kg m² torque_limit = 0.03 # Nm pendulum = PendulumPlant( mass=mass, length=length, damping=damping, gravity=gravity, inertia=inertia, torque_limit=torque_limit ) # Energy-shaping + LQR controller Q = np.diag([0.001, 0.0001]) R = np.array([[1]]) K = 0.1 controller = EnergyShapingAndLQRController( mass=mass, length=length, damping=damping, gravity=gravity, torque_limit=torque_limit, K=K, Q=Q, R=R ) # Compute region of attraction via Najafi sampling rho, M, _ = najafi_based_sampling(pendulum, controller.lqr_controller) controller.set_RoA(M, rho)
# Define disturbances dist = [ Disturbances.StateDependentPositionDisturbance( wait_at_target_seconds=1.0, goal_position=[np.pi], disturbance_position=[0.5 * np.pi], target_pos_threshold=0.1, target_vel_threshold=5.0, ) ] x0 = [0] # start hanging meas_time_vec = np.zeros(n) meas_x = np.zeros((n, 2)) meas_u = np.zeros((n, 1)) des_u = np.zeros((n, 1)) i = 0
session_token, url = client.start_experiment( user_token, "SimplePendulum", experiment_time, preparation_time, record=False, initial_position=x0, disturbances=dist ) print(client.get_actuator_info(session_token)) # Torque control mode (Kp=Kd=0 disables the onboard impedance controller) client.set_impedance_controller_params([0.0], [0.0], session_token) t_start = time.time() while i < n: meas_time_vec[i] = time.time() - t_start pos = client.get_position(session_token) vel = client.get_velocity(session_token) meas_x[i] = [pos, vel] meas_u[i] = client.get_torque(session_token) # During a position disturbance, readings are zero — skip the control update if pos == 0 and vel == 0 and i > 0: des_u[i] = des_u[i - 1] else: des_u[i] = controller.get_control_output(meas_x[i]) client.set_torque(des_u[i], session_token) i += 1 wait_for_control_loop_end(t_start + i * dt - time.time()) # Retrieve a log of disturbance events from the server logs = client.get_logs(session_token) if logs: print(logs) try: client.stop_experiment(session_token) except: pass
fig, axes = plt.subplots(1, 3, figsize=(14, 4)) axes[0].plot(meas_time_vec, meas_x[:, 0]) axes[0].set_title("Position q (rad)") axes[0].set_xlabel("time (s)") axes[0].axhline(np.pi, color='k', linestyle='--', alpha=0.4, label='target') axes[0].legend() axes[0].grid() axes[1].plot(meas_time_vec, meas_x[:, 1]) axes[1].set_title("Velocity qd (rad/s)") axes[1].set_xlabel("time (s)") axes[1].grid() axes[2].plot(meas_time_vec, des_u[:, 0], label='desired') axes[2].plot(meas_time_vec, meas_u[:, 0], label='measured', alpha=0.7) axes[2].set_title("Torque (Nm)") axes[2].set_xlabel("time (s)") axes[2].legend() axes[2].grid() plt.tight_layout() plt.show()
In the position plot you should see the pendulum swing up and stabilize at π, then drop to 0.5π when the disturbance fires, and (hopefully) swing back up to recover. The torque plot shows the gap during the disturbance window where readings go to zero.