reflex.ActionStream

Long-lived streaming session: send observations, receive action chunks.

ActionStream is the low-level streaming primitive. Use it when you want to own the control loop yourself and just need an open channel to inference.

For a higher-level decorator-based controller, see @reflex.connect. For single requests, see reflex.infer_actions.

Lifecycle

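import reflex

# jpeg_bytes and apply_to_hardware are placeholders for your own code:
# a JPEG-encoded camera frame (bytes) and your robot's actuation function.
with reflex.ActionStream(
    prompt="pick up the cup",
    model="pi0.5",
) as stream:
    for step in range(50):
        # send_observation() returns the sequence number of this observation.
        seq = stream.send_observation(
            state=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
            images={"wrist": jpeg_bytes},
        )
        # Block until the next action chunk arrives, then execute it.
        action = stream.recv_action()
        apply_to_hardware(action["actions"])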

The context manager opens the stream on enter and closes it on exit. You can also call .open() and .close() directly.
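If the stream has to outlive a single with block, the explicit open() and close() calls can be paired in a try/finally. The following is a minimal sketch, not part of the SDK: run_episode, read_state, and apply_actions are hypothetical names, and stream is assumed to be any unopened object exposing the documented open(), close(), send_observation(), and recv_action() methods.

```python
from typing import Any, Callable, Sequence


def run_episode(
    stream: Any,
    n_steps: int,
    read_state: Callable[[], Sequence[float]],
    apply_actions: Callable[[Any], None],
) -> int:
    """Drive an unopened stream for n_steps and always close it afterwards.

    Returns the number of steps that completed.
    """
    completed = 0
    stream.open()  # opens the socket and waits for the session to be ready
    try:
        for _ in range(n_steps):
            stream.send_observation(state=list(read_state()))
            chunk = stream.recv_action()
            apply_actions(chunk["actions"])
            completed += 1
    finally:
        stream.close()  # runs even if a step raises
    return completed
```

With the real SDK this would be called as run_episode(reflex.ActionStream(prompt="pick up the cup"), 50, read_state, apply_actions), where the two callables are your own hardware glue.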

Constructor

reflex.ActionStream(
    *,
    prompt: str,
    url: str | None = None,
    api_key: str | None = None,
    model: str | None = None,
    lora: str | None = None,
    robot: str | None = None,
    action_adapter: str | None = None,
    cameras: list[str] | None = None,
    hz: float | None = None,
    chunk_size: int | None = None,
    max_gpu_seconds: float | None = None,
    session_id: str = "",
    timeout: float = 30.0,
    connect_retry_seconds: float | None = None,
)

| Parameter | Description |
| --- | --- |
| prompt | Required. Task instruction sent to the model. |
| url | WebSocket URL override. |
| api_key | API key override (see Authentication). |
| model | Model identifier, e.g. "pi0.5". |
| lora | LoRA adapter identifier, e.g. "my-adapter@v1". |
| robot | Robot identifier hint. |
| action_adapter | Requested action schema. |
| cameras | Camera names the server should expect, e.g. ["wrist", "overhead"]. |
| hz | Control-loop frequency metadata, in Hz. |
| chunk_size | Actions per chunk. |
| max_gpu_seconds | Per-inference GPU time cap. |
| session_id | Caller-supplied session label. |
| timeout | Socket timeout for connect and recv, in seconds. |
| connect_retry_seconds | How long to retry transient connect errors. Defaults to 90 s (env-overridable). |
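Because every constructor argument is keyword-only and most default to None, it can be convenient to keep the settings in one place and pass only the ones that are set. The sketch below is an illustration under assumptions: StreamConfig and to_stream_kwargs are hypothetical helpers, not SDK names, and they cover only a subset of the parameters listed above.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class StreamConfig:
    """Hypothetical container for a subset of the ActionStream keyword arguments."""

    prompt: str
    model: Optional[str] = None
    lora: Optional[str] = None
    cameras: Optional[list] = None
    hz: Optional[float] = None
    chunk_size: Optional[int] = None
    timeout: float = 30.0


def to_stream_kwargs(cfg: StreamConfig) -> dict:
    """Drop unset (None) fields so the SDK's own defaults apply."""
    if not cfg.prompt:
        raise ValueError("prompt is required")
    return {key: value for key, value in asdict(cfg).items() if value is not None}
```

The result can then be splatted into the constructor, for example reflex.ActionStream(**to_stream_kwargs(cfg)).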

Methods

| Method | What it does |
| --- | --- |
| open() -> ActionStream | Open the socket, perform the handshake, wait for session.ready. |
| close() -> None | Close the socket cleanly. |
| send_observation(state, images=None, prompt=None, seq=None, request_id=None, capture_time_ns=None, max_gpu_seconds=None) -> int | Send an observation; return its sequence number. |
| send_observation_frame(frame: dict) -> int | Send a pre-built observation dict. |
| recv_action() -> dict | Block until the next action chunk arrives. |
| send_raw(frame: dict) -> None | Send a raw protocol frame. |
| receive() -> dict | Receive the next raw protocol frame. |
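Since send_observation() returns immediately with a sequence number and recv_action() blocks, wrapping the pair in a timer gives a rough per-step round-trip figure. This is a sketch only: timed_step is a hypothetical helper, and it assumes the next chunk returned by recv_action() is the response to the observation just sent, which this page does not guarantee.

```python
import time
from typing import Any, Optional, Sequence


def timed_step(stream: Any, state: Sequence[float], images: Optional[dict] = None):
    """Send one observation, wait for the next chunk, return (seq, chunk, elapsed_ms)."""
    start = time.perf_counter()
    seq = stream.send_observation(state=list(state), images=images)
    chunk = stream.recv_action()  # blocks until a chunk arrives or the timeout fires
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return seq, chunk, elapsed_ms
```

Comparing elapsed_ms against the control period (1000 / hz milliseconds) is one way to check whether the loop is keeping up.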

Properties

| Property | Description |
| --- | --- |
| ready | The session.ready payload returned by the server. |
| session_id | Server-assigned session ID (may differ from the one you passed). |
| open_timing | Dict with tcp_ms, tls_ms, websocket_upgrade_ms, connect_ms. |
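The open_timing dict is handy for logging connection latency after open() returns. A minimal sketch follows; format_open_timing is a hypothetical helper, and it assumes the four documented keys hold numeric millisecond values, printing n/a for any key that is missing or non-numeric.

```python
def format_open_timing(timing: dict) -> str:
    """Render the documented open_timing keys as a single log-friendly line."""
    keys = ("tcp_ms", "tls_ms", "websocket_upgrade_ms", "connect_ms")
    parts = []
    for key in keys:
        value = timing.get(key)
        if isinstance(value, (int, float)):
            parts.append(f"{key}={value:.1f}")
        else:
            parts.append(f"{key}=n/a")  # missing or non-numeric entry
    return " ".join(parts)
```

Typical use would be print(format_open_timing(stream.open_timing)) right after the stream is opened.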

Observation shape

send_observation() accepts the common fields directly. For full control, build a dict and pass it to send_observation_frame(). See Observation and action schema for the exact field names.

Errors

| Exception | When it fires |
| --- | --- |
| WebSocketHandshakeError | The server returned an HTTP error during the upgrade. Transient codes (403, 404, 408, 425, 429, 5xx) are retried until connect_retry_seconds elapses. |
| TimeoutError | recv_action() or receive() did not receive a frame within timeout seconds. |
| RuntimeError | Unexpected close, malformed frame, or error frame from the server. |
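One way to treat these errors differently in a control loop is to retry a slow chunk a few times while letting RuntimeError propagate, since it signals a closed or broken session. The helper below is a sketch under assumptions: recv_action_with_retry is a hypothetical name, and it assumes the stream remains usable after a TimeoutError, which this page does not state.

```python
from typing import Any


def recv_action_with_retry(stream: Any, max_timeouts: int = 3) -> dict:
    """Retry recv_action() on TimeoutError; let RuntimeError propagate."""
    if max_timeouts < 1:
        raise ValueError("max_timeouts must be >= 1")
    last_error = None
    for _ in range(max_timeouts):
        try:
            return stream.recv_action()
        except TimeoutError as exc:
            last_error = exc  # slow chunk: wait again (assumes the stream is still usable)
    raise TimeoutError(f"no action chunk after {max_timeouts} attempts") from last_error
```

On a real robot, the final TimeoutError is the point at which you would normally command a safe stop rather than keep waiting.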

One-shot: infer_actions

If you only need a single observation→action round-trip, skip the session and call:

import reflex

# One observation in, one action chunk out; no session is kept open.
result = reflex.infer_actions(
    observation={
        "state": [0.1, 0.2, 0.3],
        "prompt": "move left",
        "model": "pi0.5",
    }
)
print(result["actions"])

infer_actions accepts the same url, api_key, and timeout kwargs as ActionStream.