Reflex Docs
Python SDK

Python SDK

Embed `reflex connect` in your own scripts.

The CLI is a thin shell over the reflex.connect_runner module. When you need to drive the runner from custom Python — build a connector for a robot the SDK doesn't ship, run the loop inside a larger application, override the heartbeat path — import reflex and call the runner directly.

Run from a YAML

from reflex.connect_runner import run_from_config

result = run_from_config("robot.yaml")
print(result.steps, result.applied_steps, result.safe_stops)

RunnerResult is a dataclass with steps, applied_steps, safe_stops, errors, and last_action_chunk.

Run from objects

Skip YAML entirely — build the transport, connector, and cameras yourself:

from reflex.connect_runner import run_session
from reflex.transports import build_transport
from reflex.connectors import build_connector
from reflex.cameras import build_camera

transport = build_transport("webrtc", {
    "url": "https://your-worker.example.com",
    "connect_timeout_s": 180,
})
connector = build_connector("yam_bimanual", {
    "left":  {"channel": "can1", "gripper": "linear_4310"},
    "right": {"channel": "can0", "gripper": "linear_4310"},
    "hz": 25,
})
cameras = {
    name: build_camera("shm", {"name": name})
    for name in ("top", "left", "right")
}

result = run_session(
    connector=connector,
    transport=transport,
    cameras=cameras,
    mode="apply_actions",
    control_period_s=0.2,
    pipeline_inference=False,
)

Heartbeats

Pass on_heartbeat for periodic callbacks:

def heartbeat(stats):
    print(f"step={stats['step']} applied={stats['applied_steps']} stops={stats['safe_stops']}")

run_from_config("robot.yaml", on_heartbeat=heartbeat)

Fires every heartbeat_interval_s (default 10s).

Custom connectors

Subclass reflex.connectors.base.RobotConnector to support new robots. Minimum API:

from reflex.connectors.base import RobotConnector, Observation, ActionChunk

class MyRobot(RobotConnector):
    kind = "my_robot"

    def __init__(self, *, port: str, hz: int = 30) -> None:
        self.port = port
        self.hz = hz

    def start(self) -> None:
        # power on motors, calibrate, etc. Called AFTER the inference transport
        # connects so this never runs unless inference is reachable.
        ...

    def get_observation(self) -> Observation:
        return Observation(
            state=self._read_joint_state(),
            task=self.instruction,
            cameras={},  # populated by the runner
        )

    def apply_action(self, chunk: ActionChunk) -> None:
        for step in chunk.actions:
            self._command(step)

    def safe_stop(self, *, reason: str) -> None:
        # emergency stop: brake all motors, return to safe state.
        ...

    def stop(self) -> None:
        # graceful shutdown: home, release torque.
        ...

Register it with a dotted path in the YAML:

hardware:
  kind: my.module:MyRobot
  config:
    port: /dev/ttyUSB0
    hz: 30

Custom cameras

Same pattern with reflex.cameras.base.CameraSource:

from reflex.cameras.base import CameraSource

class MyCamera(CameraSource):
    kind = "my_camera"

    def __init__(self, *, device: str) -> None:
        self.device = device

    def start(self) -> None: ...
    def read(self) -> "np.ndarray": ...   # HWC uint8 RGB
    def stop(self) -> None: ...

YAML:

cameras:
  top: { kind: my.module:MyCamera, device: /dev/video0 }

Reading the source

The runner is ~600 LOC of focused Python: