Python SDK
@reflex.connect
Decorator-based control loop: bind observation and action callbacks to a streaming session.
@reflex.connect lets you describe a control loop as a class with two
methods. The decorator handles the streaming session, sequencing, and
shutdown.
It is built on top of reflex.ActionStream; reach for
it when you want a clean class shape rather than an explicit with-block.
Pattern
import reflex
@reflex.connect(
prompt="wipe the table",
model="pi0.5",
)
class TableWiper:
def __init__(self):
self.step = 0
@reflex.observation
def get_obs(self):
if self.step >= 50:
return None # ends the loop
return {
"state": read_joints(),
"images": {"wrist": grab_jpeg()},
}
@reflex.action
def apply(self, frame):
for target in frame["actions"]:
apply_to_hardware(target)
self.step += 1
TableWiper().run(max_steps=50)Decorators
| Decorator | Purpose |
|---|---|
@reflex.connect(...) | Class decorator that injects a .run() method on instances. |
@reflex.observation | Method that returns an observation dict (or None to stop). |
@reflex.action | Method that receives an action-chunk dict to apply. |
@reflex.connect(...) arguments
The decorator forwards every kwarg to ActionStream. See
ActionStream constructor for the full list.
The most common are prompt, model, lora, cameras, hz, and timeout.
.run() arguments
| Parameter | Default | Description |
|---|---|---|
max_steps | None | Max control steps. None (the default) runs until your observation method returns None. A positive integer caps the number of steps. |
Context-manager form
If you'd rather skip the class boilerplate, call reflex.connect(...) without
decorating anything — it returns an open ActionStream-compatible session:
with reflex.connect(prompt="move slowly", model="pi0.5") as stream:
stream.send_observation(state=read_joints())
print(stream.recv_action())