Python SDK
Embed `reflex connect` in your own scripts.
The CLI is a thin shell over the reflex.connect_runner module. When you
need to drive the runner from custom Python — build a connector for a robot
the SDK doesn't ship, run the loop inside a larger application, override the
heartbeat path — import reflex and call the runner directly.
Run from a YAML
from reflex.connect_runner import run_from_config
result = run_from_config("robot.yaml")
print(result.steps, result.applied_steps, result.safe_stops)RunnerResult is a dataclass with steps, applied_steps, safe_stops,
errors, and last_action_chunk.
Run from objects
Skip YAML entirely — build the transport, connector, and cameras yourself:
from reflex.connect_runner import run_session
from reflex.transports import build_transport
from reflex.connectors import build_connector
from reflex.cameras import build_camera
transport = build_transport("webrtc", {
"url": "https://your-worker.example.com",
"connect_timeout_s": 180,
})
connector = build_connector("yam_bimanual", {
"left": {"channel": "can1", "gripper": "linear_4310"},
"right": {"channel": "can0", "gripper": "linear_4310"},
"hz": 25,
})
cameras = {
name: build_camera("shm", {"name": name})
for name in ("top", "left", "right")
}
result = run_session(
connector=connector,
transport=transport,
cameras=cameras,
mode="apply_actions",
control_period_s=0.2,
pipeline_inference=False,
)Heartbeats
Pass on_heartbeat for periodic callbacks:
def heartbeat(stats):
print(f"step={stats['step']} applied={stats['applied_steps']} stops={stats['safe_stops']}")
run_from_config("robot.yaml", on_heartbeat=heartbeat)Fires every heartbeat_interval_s (default 10s).
Custom connectors
Subclass reflex.connectors.base.RobotConnector to support new robots.
Minimum API:
from reflex.connectors.base import RobotConnector, Observation, ActionChunk
class MyRobot(RobotConnector):
kind = "my_robot"
def __init__(self, *, port: str, hz: int = 30) -> None:
self.port = port
self.hz = hz
def start(self) -> None:
# power on motors, calibrate, etc. Called AFTER the inference transport
# connects so this never runs unless inference is reachable.
...
def get_observation(self) -> Observation:
return Observation(
state=self._read_joint_state(),
task=self.instruction,
cameras={}, # populated by the runner
)
def apply_action(self, chunk: ActionChunk) -> None:
for step in chunk.actions:
self._command(step)
def safe_stop(self, *, reason: str) -> None:
# emergency stop: brake all motors, return to safe state.
...
def stop(self) -> None:
# graceful shutdown: home, release torque.
...Register it with a dotted path in the YAML:
hardware:
kind: my.module:MyRobot
config:
port: /dev/ttyUSB0
hz: 30Custom cameras
Same pattern with reflex.cameras.base.CameraSource:
from reflex.cameras.base import CameraSource
class MyCamera(CameraSource):
kind = "my_camera"
def __init__(self, *, device: str) -> None:
self.device = device
def start(self) -> None: ...
def read(self) -> "np.ndarray": ... # HWC uint8 RGB
def stop(self) -> None: ...YAML:
cameras:
top: { kind: my.module:MyCamera, device: /dev/video0 }Reading the source
The runner is ~600 LOC of focused Python:
reflex/connect_runner.py—run_from_config,run_session.reflex/connectors/base.py— the connector ABC.reflex/cameras/base.py— the camera ABC.reflex/transports/base.py— the transport interface behindbuild_transport.