CloudPendulum — Gym Interface

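The CloudPendulum gym interface lets you reserve and control multiple hardware cells in parallel under a single session. This is particularly useful for collecting data from several cells at once, as in the system-identification example in this tutorial, and for reinforcement learning.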

There are two ways to connect:

  1. The gym interface discussed here. It gives you access to all low-level controls that CloudPendulum exposes and is suited for general multi-platform usage.
  2. A class that mimics a Gymnasium environment, intended specifically for reinforcement learning tasks. For this option, please refer to this Jupyter Notebook as an introduction and these files to run the Gymnasium environment.

In this tutorial we identify a simple pendulum model from free-fall data. We reserve four cells at once, start each from a different initial angle, let them swing freely, and fit the equation of motion to the collected trajectories. The source files for this tutorial can be found here.

Setup

In [ ]:
import numpy as np
import time
import matplotlib.pyplot as plt
import os
from typing import List, Tuple

from cloudpendulumclient.client import Client
from cloudpendulumclient.data import CellType

Your user token should be in ~/.env (one line, no quotes):

# In your terminal:
echo "your_token_here" > ~/.env

You can also pass the token directly as a string in your code, but please make sure not to commit it to a public repository.
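A slightly more defensive way to load the token is sketched below. The helper name load_token and the environment variable CLOUDPENDULUM_TOKEN are hypothetical and not part of cloudpendulumclient. The idea is only to keep the token out of the notebook and to fail early when it is missing.

```python
import os
from pathlib import Path


def load_token(path: str = "~/.env", env_var: str = "CLOUDPENDULUM_TOKEN") -> str:
    """Return the user token from an environment variable or a one-line file.

    Both the helper and the variable name are illustrative assumptions.
    """
    # Prefer the environment variable if it is set and non-empty.
    token = os.environ.get(env_var, "").strip()
    if not token:
        token_file = Path(path).expanduser()
        if token_file.is_file():
            token = token_file.read_text().strip()
    if not token:
        raise RuntimeError(f"No token found in ${env_var} or {path}")
    return token
```

With this helper, user_token = load_token() would replace the open(...) call used in this tutorial.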

Important: the gym interface requires a token with gym permissions. Standard experiment tokens do not grant access to start_gym(). You can check whether you have a gym token at cloudpendulum.m2.chalmers.se:1443/hub/hardware-status and request a gym-enabled token at cloudpendulum.m2.chalmers.se:1443/hub/token-request.

In [ ]:
with open(os.path.expanduser("~/.env"), 'r') as f:
    user_token = f.read().strip()

client = Client()

info = client.get_user_info(user_token)
print(f"Attempts left:    {info.number_of_attempts}")
print(f"Max gym length:   {info.max_gym_length} s")
print(f"Gym bucket size:  {info.gym_bucket_size} / {info.gym_max_bucket_size}")

In [ ]:
# Gym & episode settings
N_CELLS          = 4    # number of cells to reserve simultaneously
GYM_TIME         = 30   # total gym reservation [s]
EPISODE_TIME     = 5    # duration of each episode [s]
PREPARATION_TIME = 4    # countdown before each episode starts [s]

# Control loop
DT = 0.02   # control period [s]  ->  50 Hz

# Physical constants
GRAVITY = 9.81   # m/s²

n_steps = int(EPISODE_TIME / DT)
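
To see how these settings interact, the short calculation below works out the sample and episode budget. It assumes that every episode occupies EPISODE_TIME + PREPARATION_TIME seconds of the reservation, which is the budget this tutorial uses when it schedules the episodes. The names batches, episodes and total_samples are introduced here for illustration only.

```python
import math

N_CELLS = 4
GYM_TIME = 30          # s
EPISODE_TIME = 5       # s
PREPARATION_TIME = 4   # s
DT = 0.02              # s

# round() guards against floating-point results such as 249.999...
n_steps = round(EPISODE_TIME / DT)                                   # samples per episode
batches = math.floor(GYM_TIME / (EPISODE_TIME + PREPARATION_TIME))   # rounds of N_CELLS episodes
episodes = batches * N_CELLS
total_samples = episodes * n_steps

print(n_steps, batches, episodes, total_samples)   # 250 3 12 3000
```

Three seconds of the 30 s reservation remain unused with these values, so a slightly shorter preparation or episode time would not add a fourth batch unless the sum drops to 7.5 s or below.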

Gym Session and Episodes

start_gym() reserves a block of hardware cells under a single lease. All subsequent start_episode() calls draw from that pool and work exactly like start_experiment() — you can set impedance parameters, read position and velocity, send torques, configure safety limits, etc. The token hierarchy is:

  1. user_token — comes from ~/.env and is used for start_gym()
  2. gym_token — comes from the start_gym() return value and is used for start_episode() and stop_gym()
  3. episode_token — comes from the first return value of start_episode() and is used for all per-cell calls: get_position(), set_torque(), stop_episode(), etc.

This means that for a gym with four episodes running simultaneously, you have one gym token and four episode tokens to keep track of.
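One way to keep the tokens organised is a small bookkeeping object. The sketch below is purely illustrative: GymSession is a hypothetical helper, not part of cloudpendulumclient, and the token strings are placeholders.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class GymSession:
    """Hypothetical bookkeeping helper: one gym token, many episode tokens."""
    gym_token: str
    episode_tokens: Dict[str, str] = field(default_factory=dict)

    def register(self, name: str, episode_token: str) -> None:
        """Remember the episode token returned by start_episode()."""
        self.episode_tokens[name] = episode_token

    def release(self, name: str) -> str:
        """Forget an episode token once its episode has been stopped."""
        return self.episode_tokens.pop(name)


session = GymSession(gym_token="gym-token-placeholder")
for i in range(4):
    session.register(f"cell-{i}", f"episode-token-{i}")
```

If episodes run in separate threads, access to such an object should be guarded with a lock.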

Free-fall Data Collection and Parameter Estimation

Setting Kp = Kd = 0 puts the motor in torque mode with zero torque — the pendulum swings freely under gravity and friction. We send no torque commands; the get_position() and get_velocity() calls inside the loop serve as heartbeat requests that keep the session alive.

The equation of motion of a simple pendulum is:

I · q̈ + b · q̇ + m·g·l · sin(q) = 0

Rearranging with p1 = b/I and p2 = m·g·l/I = g/l (using I = m·l² for a point-mass pendulum):

q̈ = −p1 · q̇ − p2 · sin(q)

This is linear in [p1, p2], so both parameters can be fitted simultaneously via least squares given measured positions, velocities, and numerically differentiated accelerations. The pendulum length then follows directly: l = g / p2.
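Written out for N samples, the fit takes the standard least-squares form below. This is a sketch in LaTeX notation; the symbols y, Φ and the sample index k are introduced here and do not appear elsewhere in the tutorial. The closed-form solution assumes that Φ has full column rank.

```latex
\ddot q_k \approx \frac{\dot q_{k+1} - \dot q_{k-1}}{2\,\Delta t},
\qquad
\underbrace{\begin{bmatrix} \ddot q_1 \\ \vdots \\ \ddot q_N \end{bmatrix}}_{y}
=
\underbrace{\begin{bmatrix}
  -\dot q_1 & -\sin q_1 \\
  \vdots    & \vdots    \\
  -\dot q_N & -\sin q_N
\end{bmatrix}}_{\Phi}
\begin{bmatrix} p_1 \\ p_2 \end{bmatrix},
\qquad
\begin{bmatrix} \hat p_1 \\ \hat p_2 \end{bmatrix}
= \left(\Phi^{\top}\Phi\right)^{-1}\Phi^{\top} y,
\qquad
\hat l = \frac{g}{\hat p_2}.
```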

In [ ]:
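def fit_parameters(pos: np.ndarray, vel: np.ndarray, dt: float) -> Tuple[float, float, float]:
    """Fit p1 = b/I and p2 = g/l via least squares. Returns (p1, p2, l_est).

    pos and vel are 2-D arrays of shape (n_episodes, n_steps), sampled every dt seconds.
    """
    # Central-difference acceleration, skipping the first and last sample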
    acc = (vel[:, 2:] - vel[:, :-2]) / (2 * dt)
    q   = pos[:, 1:-1].ravel()
    qd  = vel[:, 1:-1].ravel()
    qdd = acc.ravel()

    # Regressor:  q̈ = −p1·q̇ − p2·sin(q)
    Phi = np.column_stack([-qd, -np.sin(q)])
    params, _, _, _ = np.linalg.lstsq(Phi, qdd, rcond=None)
    p1, p2 = params

    return float(p1), float(p2), float(GRAVITY / p2)
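
Before using hardware time, the fitting approach can be checked offline on simulated data. The sketch below integrates the same equation of motion with known parameters and verifies that least squares recovers them. The simulate() and fit() helpers and the value p1 = 0.5 are illustrative assumptions, and fit() is a compact copy of the regression so that the block runs on its own.

```python
import numpy as np

GRAVITY = 9.81


def simulate(q0, p1, p2, dt, n_steps, substeps=10):
    """Integrate q'' = -p1*q' - p2*sin(q) with RK4 and sample every dt seconds."""
    def f(state):
        q, qd = state
        return np.array([qd, -p1 * qd - p2 * np.sin(q)])

    h = dt / substeps
    state = np.array([q0, 0.0])          # released at rest from angle q0
    pos, vel = np.zeros(n_steps), np.zeros(n_steps)
    for i in range(n_steps):
        pos[i], vel[i] = state
        for _ in range(substeps):
            k1 = f(state)
            k2 = f(state + 0.5 * h * k1)
            k3 = f(state + 0.5 * h * k2)
            k4 = f(state + h * k3)
            state = state + h / 6.0 * (k1 + 2 * k2 + 2 * k3 + k4)
    return pos, vel


def fit(pos, vel, dt):
    """Least-squares fit of [p1, p2] using central-difference accelerations."""
    acc = (vel[:, 2:] - vel[:, :-2]) / (2 * dt)
    Phi = np.column_stack([-vel[:, 1:-1].ravel(), -np.sin(pos[:, 1:-1]).ravel()])
    params, *_ = np.linalg.lstsq(Phi, acc.ravel(), rcond=None)
    return params


true_p1, true_p2 = 0.5, GRAVITY / 0.10   # assumed damping, 10 cm pendulum
runs = [simulate(q0, true_p1, true_p2, dt=0.02, n_steps=250) for q0 in (0.5, 1.0, 1.5, 2.0)]
pos = np.array([r[0] for r in runs])
vel = np.array([r[1] for r in runs])

p1_hat, p2_hat = fit(pos, vel, 0.02)
l_hat = GRAVITY / p2_hat
print(f"p1_hat={p1_hat:.3f}  p2_hat={p2_hat:.2f}  l_hat={l_hat:.4f} m")
```

The central-difference step introduces a small bias that shrinks as DT decreases, so the recovered length is close to, but not exactly, the simulated 0.10 m.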

Running Gym Experiments

We start by reserving cells using start_gym(), which returns the gym token and prints a livestream URL showing all reserved cells side by side. While the gym session is running, we can start multiple episodes — each in a separate thread so they are fully independent.

Inside each thread, starting an episode works exactly like start_experiment(), except you use the gym token. Every episode gets a unique episode token used for all subsequent interactions with that cell: get_position(), get_velocity(), stop_episode(), etc. See the documentation for a full overview.

As long as the gym session is active, you can start more episodes immediately — the hardware stays reserved between episodes, so a new run begins as soon as the previous one ends. At the end, stop_gym() releases all reserved cells and closes the session.

We first define the callback that runs a single episode:

In [ ]:
import threading

def run_episode(
    gym_token: str,
    angle: float,
    results: List[Tuple[np.ndarray, np.ndarray]],
    lock: threading.Lock,
) -> None:
    """
    Runs one complete episode:
      1. start episode
      2. prepare (set controller params)
      3. collect pos/vel at fixed rate
      4. stop episode
      5. append (pos, vel) to shared results list under the lock
    """
    thread_client = Client()
    pos = np.zeros(n_steps)
    vel = np.zeros(n_steps)

    # 1. Start episode
    token, url = thread_client.start_episode(
        gym_token, "SimplePendulum", EPISODE_TIME, 0.0,
        initial_position=[angle],
    )
    print(f"[{threading.current_thread().name}]  q0={np.degrees(angle):.0f}°  token={token}")

    try:
        # 2. Prepare
        thread_client.set_impedance_controller_params([0.0], [0.0], token)

        # 3. Collect at fixed rate
        t_start = time.time()
        for i in range(n_steps):
            pos[i] = thread_client.get_position(token)
            vel[i] = thread_client.get_velocity(token)
            sleep_time = t_start + (i + 1) * DT - time.time()
            if sleep_time > 0:
                time.sleep(sleep_time)

        # 4. Stop
        thread_client.stop_episode(token)

    except Exception as exc:
        print(f"[{threading.current_thread().name}]  error: {exc}")
        try:
            thread_client.stop_episode(token)
        except Exception:
            pass
        return   # discard incomplete data so it cannot bias the fit

    # 5. Append result (thread-safe)
    with lock:
        results.append((pos.copy(), vel.copy()))
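
The fixed-rate loop in step 3 schedules each sample against an absolute deadline (t_start + (i + 1) * DT) rather than sleeping for DT after every read, so delays in individual requests do not accumulate into drift. The sketch below isolates that pattern without hardware. The function sample_at_fixed_rate and the read_fn callback are hypothetical names, and time.monotonic() is used here instead of time.time() because it cannot jump backwards.

```python
import time


def sample_at_fixed_rate(read_fn, dt, n_steps):
    """Call read_fn() n_steps times, one call per dt, against absolute deadlines."""
    samples = []
    t_start = time.monotonic()
    for i in range(n_steps):
        samples.append(read_fn())
        # Sleep until the (i + 1)-th deadline; skip sleeping if we are already late.
        remaining = t_start + (i + 1) * dt - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
    return samples, time.monotonic() - t_start


samples, elapsed = sample_at_fixed_rate(lambda: 0.0, dt=0.01, n_steps=20)
print(f"{len(samples)} samples in {elapsed:.3f} s")
```

If a single read takes longer than DT, the loop simply runs late for that step and catches up on the following ones, which is the same behaviour as in run_episode().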

Then run the gym, calling run_episode in parallel threads:

In [ ]:
gym_token = client.start_gym(user_token, GYM_TIME, [CellType.SINGLE] * N_CELLS)

results: List[Tuple[np.ndarray, np.ndarray]] = []
lock      = threading.Lock()
semaphore = threading.Semaphore(N_CELLS)   # cap live threads
threads   = []

number_of_batches     = int(np.floor(GYM_TIME / (EPISODE_TIME + PREPARATION_TIME)))
number_of_experiments = number_of_batches * N_CELLS
angles = np.random.uniform(low=-np.pi, high=np.pi, size=(number_of_experiments,))

experiments_started = 0

while experiments_started < len(angles):
    angle = angles[experiments_started]
    semaphore.acquire()   # blocks if N_CELLS threads are already live

    def _target(a=angle):
        try:
            run_episode(gym_token, a, results, lock)
        finally:
            semaphore.release()   # free the slot when the thread finishes

    t = threading.Thread(target=_target, name=f"exp-{experiments_started:03d}")
    t.start()
    threads.append(t)
    experiments_started += 1

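# Wait for all remaining threads to finish
for t in threads:
    t.join()

# Release the reserved cells. Assumption: stop_gym() takes the gym token,
# as described in the token hierarchy.
client.stop_gym(gym_token)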
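
The same capped-concurrency pattern can also be written with ThreadPoolExecutor from the Python standard library. The sketch below is an alternative under stated assumptions, not the tutorial's method: fake_episode is a hypothetical stand-in for run_episode() that sleeps instead of talking to hardware, and the stats dictionary exists only to show that no more than N_CELLS episodes run at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

N_CELLS = 4
stats = {"active": 0, "peak": 0}
stats_lock = threading.Lock()


def fake_episode(angle: float) -> float:
    """Stand-in for run_episode(); sleeps instead of talking to hardware."""
    with stats_lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.05)                     # pretend to collect data
    with stats_lock:
        stats["active"] -= 1
    return angle


angles = [0.1 * k for k in range(12)]

# max_workers plays the role of the semaphore: at most N_CELLS episodes are live.
with ThreadPoolExecutor(max_workers=N_CELLS) as pool:
    results = list(pool.map(fake_episode, angles))

print(f"{len(results)} episodes, peak concurrency {stats['peak']}")
```

One difference in behaviour: pool.map() returns results in the order of the inputs, whereas the semaphore version appends results in the order in which episodes finish.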

After collecting all data, run the parameter identification:

In [ ]:
pos_all = np.array([r[0] for r in results])
vel_all = np.array([r[1] for r in results])

p1, p2, l_est = fit_parameters(pos_all, vel_all, DT)

print(f"p1 = b/I: {p1:.3f} 1/s")
print(f"p2 = g/l: {p2:.3f} 1/s²")
print(f"Pendulum length: {l_est:.3f} m")

The actual length of the pendulum is 10 cm (0.10 m), so the results are quite accurate. Less data would likely have sufficed, but with four cells running in parallel the gym session collected four times as much data in the same amount of time as sequential start_experiment() runs would have.

Episodes Don't Have to Be Synchronised

The experiments above kept up to N_CELLS episodes running at the same time. However, start_episode() can be called at any time while the gym session is active, and each episode is fully independent of the others. This makes it straightforward to drive one episode at a time, for example in a reinforcement learning training loop:

In [ ]:
# N_EPISODES, run_policy() and update_policy() are placeholders for your own training code
for episode in range(N_EPISODES):
    token, url = client.start_episode(gym_token, "SimplePendulum", EPISODE_TIME)

    reward = run_policy(client, token)   # run your policy, collect reward

    client.stop_episode(token)
    update_policy(reward)                # process before starting the next episode

Reinforcement Learning: For RL specifically, CloudPendulum also provides a Gymnasium wrapper that exposes the hardware as a standard Gymnasium class. See this notebook for an introduction and these files to run it.
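To make the run_policy() placeholder more concrete, the sketch below shows one possible shape of such a rollout, tested against an offline stand-in. Everything in it is an assumption: FakeClient is a toy model and not the real client, the set_torque(torque, token) argument order is a guess modelled on the other per-cell calls, and run_policy() here takes an extra policy argument that the loop in this tutorial does not pass.

```python
import math
from typing import Callable


class FakeClient:
    """Offline stand-in for the real client; the method signatures are assumptions."""

    def __init__(self) -> None:
        self.q, self.qd = 0.5, 0.0       # start at 0.5 rad, at rest

    def get_position(self, token: str) -> float:
        return self.q

    def get_velocity(self, token: str) -> float:
        return self.qd

    def set_torque(self, torque: float, token: str) -> None:
        # Toy dynamics so the loop has something to react to (not a hardware model).
        self.qd += 0.02 * (torque - 98.1 * math.sin(self.q))
        self.q += 0.02 * self.qd


def run_policy(client, token: str, policy: Callable[[float, float], float],
               n_steps: int = 50) -> float:
    """Roll out `policy` for n_steps and return the summed reward."""
    total = 0.0
    for _ in range(n_steps):
        q, qd = client.get_position(token), client.get_velocity(token)
        client.set_torque(policy(q, qd), token)
        total += -(q ** 2)               # example reward: stay near q = 0
    return total


reward = run_policy(FakeClient(), "episode-token-placeholder", lambda q, qd: -0.1 * qd)
print(f"episode reward: {reward:.3f}")
```

On real hardware the loop would also need the fixed-rate timing used in run_episode(), and the actual set_torque() signature should be taken from the client documentation.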

What Next?

Feel free to design your own experiments. If you are interested in the Gymnasium wrapper class, you can take a look here. The full notebook for this tutorial is also available on GitHub.