reflex.ActionStream
Long-lived streaming session: send observations, receive action chunks.
ActionStream is the low-level streaming primitive. Use it when you want to own the control loop yourself and just need an open channel to the inference server. For a higher-level, decorator-based controller, see @reflex.connect. For single requests, see reflex.infer_actions.
Lifecycle
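```python
import reflex

# Placeholders for your own code: jpeg_bytes holds a JPEG-encoded wrist-camera
# frame, and apply_to_hardware() sends commands to your robot.
with reflex.ActionStream(
    prompt="pick up the cup",
    model="pi0.5",
) as stream:
    for step in range(50):
        # Send the current observation; the return value is its sequence number.
        seq = stream.send_observation(
            state=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
            images={"wrist": jpeg_bytes},
        )
        # Block until the next action chunk arrives.
        action = stream.recv_action()
        apply_to_hardware(action["actions"])
```

The context manager opens the stream on enter and closes it on exit. You can also call .open() and .close() directly.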
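If you call open() and close() yourself, wrap the loop in try/finally so the socket is closed even when an exception escapes, which mirrors what the context-manager form in the example above does. The sketch below uses a hypothetical StubStream with the same method names so it runs without a server; run_episode is also a hypothetical helper. With the real client you would pass reflex.ActionStream(prompt=...) instead of the stub.

```python
class StubStream:
    """Hypothetical stand-in exposing the documented open/close/send/recv names."""

    def __init__(self, prompt: str) -> None:
        self.prompt = prompt
        self.is_open = False
        self._seq = 0

    def open(self) -> "StubStream":
        self.is_open = True
        return self

    def close(self) -> None:
        self.is_open = False

    def send_observation(self, state, images=None) -> int:
        self._seq += 1
        return self._seq

    def recv_action(self) -> dict:
        # Pretend the server replied with a single zero action.
        return {"actions": [[0.0] * 6]}


def run_episode(stream, steps: int, apply) -> int:
    """Explicit open()/close() around a fixed number of control steps."""
    stream.open()
    try:
        for _ in range(steps):
            stream.send_observation(state=[0.0] * 6)
            apply(stream.recv_action()["actions"])
        return steps
    finally:
        # Runs on normal exit and on exceptions, like leaving a `with` block.
        stream.close()
```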
Constructor
```python
reflex.ActionStream(
    *,
    prompt: str,
    url: str | None = None,
    api_key: str | None = None,
    model: str | None = None,
    lora: str | None = None,
    robot: str | None = None,
    action_adapter: str | None = None,
    cameras: list[str] | None = None,
    hz: float | None = None,
    chunk_size: int | None = None,
    max_gpu_seconds: float | None = None,
    session_id: str = "",
    timeout: float = 30.0,
    connect_retry_seconds: float | None = None,
)
```

| Parameter | Description |
|---|---|
| prompt | Required. Task instruction sent to the model. |
| url | WebSocket URL override. |
| api_key | API key override (see Authentication). |
| model | Model identifier, e.g. "pi0.5". |
| lora | LoRA adapter identifier, e.g. "my-adapter@v1". |
| robot | Robot identifier hint. |
| action_adapter | Requested action schema. |
| cameras | Camera names the server should expect, e.g. ["wrist", "overhead"]. |
| hz | Control-loop frequency metadata, in Hz. |
| chunk_size | Number of actions per chunk. |
| max_gpu_seconds | Per-inference GPU time cap, in seconds. |
| session_id | Caller-supplied session label. The server may assign a different ID (see the session_id property). |
| timeout | Socket timeout for connect and receive operations, in seconds. |
| connect_retry_seconds | How long to keep retrying transient connect errors, in seconds. Defaults to 90 s (overridable via an environment variable). |
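Since hz and chunk_size are metadata, your own loop still decides how to play out each chunk. A minimal sketch, assuming action["actions"] is a list of per-step action vectors (the page does not spell out the chunk layout): step through the chunk on a fixed 1/hz schedule. play_chunk and its apply, clock, and sleep parameters are hypothetical; clock and sleep are injectable so the timing can be tested without real waiting.

```python
import time
from typing import Callable, Sequence


def play_chunk(
    chunk: Sequence[Sequence[float]],
    hz: float,
    apply: Callable[[Sequence[float]], None],
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Apply each action in `chunk` on a fixed 1/hz schedule; return the count applied."""
    if hz <= 0:
        raise ValueError("hz must be positive")
    period = 1.0 / hz
    start = clock()
    for i, action in enumerate(chunk):
        # Wait until this step's scheduled time, measured from the chunk start,
        # so per-step overhead does not accumulate as drift.
        delay = start + i * period - clock()
        if delay > 0:
            sleep(delay)
        apply(action)
    return len(chunk)
```

Scheduling against the chunk start time, rather than sleeping a fixed period after each step, keeps slow apply() calls from stretching the whole chunk.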
Methods
| Method | What it does |
|---|---|
| open() -> ActionStream | Open the socket, perform the handshake, and wait for session.ready. |
| close() -> None | Close the socket cleanly. |
| send_observation(state, images=None, prompt=None, seq=None, request_id=None, capture_time_ns=None, max_gpu_seconds=None) -> int | Send an observation and return its sequence number. |
| send_observation_frame(frame: dict) -> int | Send a pre-built observation dict. |
| recv_action() -> dict | Block until the next action chunk arrives; raises TimeoutError if nothing arrives within timeout seconds. |
| send_raw(frame: dict) -> None | Send a raw protocol frame. |
| receive() -> dict | Receive the next raw protocol frame. |
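Because send_observation() returns a sequence number and recv_action() blocks for the next chunk, a synchronous loop can time each round-trip. A minimal sketch, assuming one action chunk comes back per observation, in order (the page does not state this); timed_step and EchoStream are hypothetical names, and EchoStream only stands in for a real ActionStream so the example runs offline.

```python
import time


def timed_step(stream, state, images=None):
    """Send one observation, wait for the next chunk, and time the round-trip."""
    t0 = time.perf_counter()
    seq = stream.send_observation(state=state, images=images)
    action = stream.recv_action()  # blocks; raises TimeoutError after `timeout`
    latency_ms = (time.perf_counter() - t0) * 1000.0
    return seq, action, latency_ms


class EchoStream:
    """Hypothetical stand-in that answers every observation immediately."""

    def __init__(self):
        self._seq = 0
        self._last_state = []

    def send_observation(self, state, images=None):
        self._seq += 1
        self._last_state = list(state)
        return self._seq

    def recv_action(self):
        # Echo the last state back as a one-step chunk.
        return {"actions": [self._last_state]}
```

With a live stream, logging (seq, latency_ms) per step makes it easy to spot slow inferences.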
Properties
| Property | Description |
|---|---|
| ready | The session.ready payload returned by the server. |
| session_id | Server-assigned session ID (may differ from the one you passed). |
| open_timing | Connection timing breakdown: a dict with the keys tcp_ms, tls_ms, websocket_upgrade_ms, and connect_ms (milliseconds). |
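open_timing is useful when diagnosing slow connects. The small helper below is a sketch that assumes only that the four documented keys hold numeric millisecond values; it prints them in a fixed order and makes no assumption about how connect_ms relates to the other three. format_open_timing is a hypothetical name.

```python
TIMING_KEYS = ("tcp_ms", "tls_ms", "websocket_upgrade_ms", "connect_ms")


def format_open_timing(timing: dict) -> str:
    """Render an open_timing dict as one log line with one decimal per value."""
    missing = [k for k in TIMING_KEYS if k not in timing]
    if missing:
        raise KeyError(f"open_timing missing keys: {missing}")
    return ", ".join(f"{k}={float(timing[k]):.1f}" for k in TIMING_KEYS)
```

After opening a stream you might log it with print(format_open_timing(stream.open_timing)).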
Observation shape
send_observation() accepts the common fields directly. For full control, build a dict and pass it to send_observation_frame(). See Observation and action schema for the exact field names.
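If you declared cameras in the constructor, it can help to check each observation's images against that list before sending. A minimal sketch, assuming images is a dict keyed by camera name (as in the lifecycle example), that every declared camera should appear in each observation, and that capture_time_ns takes wall-clock nanoseconds from time.time_ns(); the page does not say which clock the server expects. build_observation_kwargs is a hypothetical helper that only uses keyword names from the send_observation() signature.

```python
import time
from typing import Optional, Sequence


def build_observation_kwargs(
    state: Sequence[float],
    images: Optional[dict] = None,
    cameras: Optional[Sequence[str]] = None,
    prompt: Optional[str] = None,
) -> dict:
    """Collect keyword arguments for ActionStream.send_observation()."""
    names = set(images or {})
    if cameras is not None:
        # Assumption: every declared camera must be present, and no others.
        missing = set(cameras) - names
        unexpected = names - set(cameras)
        if missing or unexpected:
            raise ValueError(
                f"image keys do not match cameras: missing={sorted(missing)}, "
                f"unexpected={sorted(unexpected)}"
            )
    kwargs = {
        "state": [float(x) for x in state],
        "images": images,
        # Assumption: wall-clock nanoseconds; check the schema page for the clock.
        "capture_time_ns": time.time_ns(),
    }
    if prompt is not None:
        kwargs["prompt"] = prompt  # optional prompt argument of send_observation()
    return kwargs
```

You would then call stream.send_observation(**build_observation_kwargs(state, images, cameras=["wrist"])).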
Errors
| Exception | When it fires |
|---|---|
| WebSocketHandshakeError | The server returned an HTTP error during the WebSocket upgrade. Transient codes (403, 404, 408, 425, 429, and 5xx) are retried until connect_retry_seconds elapses. |
| TimeoutError | recv_action() or receive() got nothing within timeout seconds. |
| RuntimeError | The server closed the connection unexpectedly, sent a malformed frame, or sent an error frame. |
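The transient-code rule in the table above can be modeled as follows, for example to decide what your own outer retry loop should do. This is a sketch of the documented policy, not the library's implementation: HandshakeFailed (a stand-in for WebSocketHandshakeError), is_transient_status, and connect_with_retry are hypothetical names, and the fixed one-second backoff is an assumption. clock and sleep are injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")
TRANSIENT_CODES = {403, 404, 408, 425, 429}


class HandshakeFailed(Exception):
    """Hypothetical stand-in for WebSocketHandshakeError carrying the HTTP status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"handshake failed with HTTP {status}")
        self.status = status


def is_transient_status(status: int) -> bool:
    """Codes the page lists as retried: 403, 404, 408, 425, 429, and any 5xx."""
    return status in TRANSIENT_CODES or 500 <= status <= 599


def connect_with_retry(
    connect: Callable[[], T],
    retry_seconds: float = 90.0,
    backoff: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry transient handshake failures until `retry_seconds` have elapsed."""
    deadline = clock() + retry_seconds
    while True:
        try:
            return connect()
        except HandshakeFailed as exc:
            # Non-transient codes (e.g. 401) fail immediately; transient ones
            # are retried only while the next attempt would fit the deadline.
            if not is_transient_status(exc.status) or clock() + backoff > deadline:
                raise
            sleep(backoff)
```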
One-shot: infer_actions
If you only need a single observation→action round-trip, skip the session and call:
```python
import reflex

result = reflex.infer_actions(
    observation={
        "state": [0.1, 0.2, 0.3],
        "prompt": "move left",
        "model": "pi0.5",
    }
)
print(result["actions"])
```

infer_actions accepts the same url, api_key, and timeout kwargs as ActionStream.