CloudPendulum — Disturbance API

When testing a controller on the CloudPendulum, you usually want to do more than just check whether it reaches a setpoint — you also want to evaluate how it behaves under disturbances. The disturbance API lets you define disturbances upfront, and the server will trigger them automatically during the experiment.

This means you don't have to physically push the pendulum or implement disturbances manually in your control loop. You describe what you want to happen and when, pass the list to start_experiment, and the hardware takes care of it.

There are two main flavours:

This tutorial covers both. Any additions, such as new disturbance types, will be documented in the API reference.

Setup

Your user token is read from ~/.env — a plain text file with just the token on one line, without quotes or whitespace. If you haven't set it up yet:

# In your terminal:
echo "your_token_here" > ~/.env

You can also pass the token directly as a string in your code, but please make sure not to commit that to public repositories.

In [ ]:
import numpy as np
import time
import matplotlib.pyplot as plt
import sys
import os

from cloudpendulumclient.client import Client
import cloudpendulumclient.disturbance as Disturbances
In [ ]:
with open(os.path.expanduser("~/.env"), 'r') as f:
    user_token = f.read().strip()

client = Client()

info = client.get_user_info(user_token)
print(f"Attempts left:    {info.number_of_attempts}")
print(f"Max gym length:   {info.max_gym_length}")
print(f"Gym bucket size:  {info.gym_bucket_size} / {info.gym_max_bucket_size}")

Named (Predefined) Disturbances

The simplest option is to pass a string to start_experiment. Named disturbance sets are presets used to run against a standard benchmark or reproduce someone else's results. Available presets are listed in the API documentation.

In [ ]:
# Example — not meant to be run standalone, just shows the API
# session_token, url = client.start_experiment(
#     user_token, "SimplePendulum", experiment_time, preparation_time,
#     initial_position=[np.pi],
#     disturbances="default_simple_pendulum"
# )

Custom Disturbances

If the presets don't cover what you need, you can build your own list of disturbance objects. There are three types available:

You can create a list of multiple disturbance objects and pass it to the disturbances= keyword in start_experiment.

Important: while a position disturbance is active, get_position() and get_velocity() return zero. It's a good idea to detect this case (e.g. if pos == 0 and vel == 0) and hold the last known state rather than feeding zeros into your control law.

TimedTorqueDisturbance

Applies a predefined torque between start_time and start_time + duration. You need to specify one torque value per actuator. This is the least disruptive type — roughly equivalent to a brief push of the pendulum.

In [ ]:
torque_kick = Disturbances.TimedTorqueDisturbance(
    start_time=3.0,    # seconds after experiment start
    duration=0.075,    # seconds
    torque=[0.1]        # Nm, one value per actuator
)

# Multiple torque disturbances are fine
torque_kicks = [
    Disturbances.TimedTorqueDisturbance(start_time=3.0,  duration=0.075, torque=[0.1]),
    Disturbances.TimedTorqueDisturbance(start_time=10.0, duration=0.075, torque=[-0.1]),
]

TimedPositionDisturbance

At trigger_time seconds after the experiment starts, the hardware drives the pendulum to disturbance_position. This is a hard position command and your controller signal is overridden while the pendulum is moved. If you leave disturbance_position=None, the server picks a random position within the safety limits.

In [ ]:
# Jump to a specific position at t=5s
pos_jump = Disturbances.TimedPositionDisturbance(
    trigger_time=5.0,
    disturbance_position=[0.5 * np.pi]
)

# Or let the server randomize the target
pos_jump_random = Disturbances.TimedPositionDisturbance(
    trigger_time=5.0
    # disturbance_position=None is the default
)

# Multiple timed position disturbances are fine
timed_pos_dist = [
    Disturbances.TimedPositionDisturbance(trigger_time=5.0,  disturbance_position=[0.5 * np.pi]),
    Disturbances.TimedPositionDisturbance(trigger_time=12.0, disturbance_position=[-0.5 * np.pi]),
]

StateDependentPositionDisturbance

This is usually the most useful type for controller evaluation. Instead of triggering at a fixed time, it waits until the pendulum has been close to goal_position for wait_at_target_seconds — i.e. it waits until your controller has actually done its job — and then kicks the system.

"Close to" is defined by target_pos_threshold (position error in rad) and target_vel_threshold (velocity in rad/s). The pendulum needs to be within both bounds continuously for the full wait duration. goal_position defaults to initial_position from start_experiment if set to None.

In [ ]:
state_dep_dist = Disturbances.StateDependentPositionDisturbance(
    wait_at_target_seconds=2.0,          # wait this long at the target before kicking
    goal_position=[np.pi],               # the setpoint your controller is trying to reach
    disturbance_position=[0.5 * np.pi],  # where to kick it to (None = random)
    target_pos_threshold=0.1,            # rad — how close counts as "at target"
    target_vel_threshold=1.0,            # rad/s
)

Constraints and Practical Notes

Full Example: Simple Pendulum with State-Dependent Disturbance

The example below runs a simple pendulum with an energy-shaping + LQR controller:

A StateDependentPositionDisturbance is configured to trigger once the pendulum has been balanced at the top for 1 second, kicking it to 0.5π rad. We then observe whether the controller recovers.

In [ ]:
from energy_shaping import EnergyShapingAndLQRController
from najafi import najafi_based_sampling
from pendulum import PendulumPlant


def wait_for_control_loop_end(time_to_pass):
    """Sleep until the next control loop tick."""
    if time_to_pass <= 0.0:
        return
    start = time.time()
    time.sleep(time_to_pass * 0.7)
    while time.time() - start < time_to_pass:
        pass
In [ ]:
# Experiment parameters
experiment_time  = 20
preparation_time = 4
dt               = 0.005
n                = int(experiment_time / dt)

# Pendulum model parameters (used for controller design only)
mass         = 0.06                      # kg
length       = 0.1                       # m
damping      = 0.0004                    # kg m/s
gravity      = 9.81                      # m/s²
inertia      = mass * length * length    # kg m²
torque_limit = 0.03                      # Nm

pendulum = PendulumPlant(
    mass=mass, length=length, damping=damping,
    gravity=gravity, inertia=inertia, torque_limit=torque_limit
)

# Energy-shaping + LQR controller
Q = np.diag([0.001, 0.0001])
R = np.array([[1]])
K = 0.1
controller = EnergyShapingAndLQRController(
    mass=mass, length=length, damping=damping, gravity=gravity,
    torque_limit=torque_limit, K=K, Q=Q, R=R
)

# Compute region of attraction via Najafi sampling
rho, M, _ = najafi_based_sampling(pendulum, controller.lqr_controller)
controller.set_RoA(M, rho)
In [ ]:
# Define disturbances
dist = [
    Disturbances.StateDependentPositionDisturbance(
        wait_at_target_seconds=1.0,
        goal_position=[np.pi],
        disturbance_position=[0.5 * np.pi],
        target_pos_threshold=0.1,
        target_vel_threshold=5.0,
    )
]

x0 = [0]  # start hanging

meas_time_vec = np.zeros(n)
meas_x        = np.zeros((n, 2))
meas_u        = np.zeros((n, 1))
des_u         = np.zeros((n, 1))

i = 0
In [ ]:
session_token, url = client.start_experiment(
    user_token, "SimplePendulum", experiment_time, preparation_time,
    record=False,
    initial_position=x0,
    disturbances=dist
)
print(client.get_actuator_info(session_token))

# Torque control mode (Kp=Kd=0 disables the onboard impedance controller)
client.set_impedance_controller_params([0.0], [0.0], session_token)

t_start = time.time()

while i < n:
    meas_time_vec[i] = time.time() - t_start

    pos = client.get_position(session_token)
    vel = client.get_velocity(session_token)
    meas_x[i] = [pos, vel]
    meas_u[i] = client.get_torque(session_token)

    # During a position disturbance, readings are zero — skip the control update
    if pos == 0 and vel == 0 and i > 0:
        des_u[i] = des_u[i - 1]
    else:
        des_u[i] = controller.get_control_output(meas_x[i])

    client.set_torque(des_u[i], session_token)

    i += 1
    wait_for_control_loop_end(t_start + i * dt - time.time())

# Retrieve a log of disturbance events from the server
logs = client.get_logs(session_token)
if logs:
    print(logs)

try:
    client.stop_experiment(session_token)
except:
    pass
In [ ]:
fig, axes = plt.subplots(1, 3, figsize=(14, 4))

axes[0].plot(meas_time_vec, meas_x[:, 0])
axes[0].set_title("Position q (rad)")
axes[0].set_xlabel("time (s)")
axes[0].axhline(np.pi, color='k', linestyle='--', alpha=0.4, label='target')
axes[0].legend()
axes[0].grid()

axes[1].plot(meas_time_vec, meas_x[:, 1])
axes[1].set_title("Velocity qd (rad/s)")
axes[1].set_xlabel("time (s)")
axes[1].grid()

axes[2].plot(meas_time_vec, des_u[:, 0], label='desired')
axes[2].plot(meas_time_vec, meas_u[:, 0], label='measured', alpha=0.7)
axes[2].set_title("Torque (Nm)")
axes[2].set_xlabel("time (s)")
axes[2].legend()
axes[2].grid()

plt.tight_layout()
plt.show()

In the position plot you should see the pendulum swing up and stabilize at π, then drop to 0.5π when the disturbance fires, and (hopefully) swing back up to recover. The torque plot shows the gap during the disturbance window where readings go to zero.