ObservationBuffer

class netrl.ObservationBuffer(maxlen, shape, dtype)[source]

Bases: object

Time-slot based circular buffer for storing RL observations.

Each slot in the buffer represents a specific time step in the window [current_step - maxlen + 1, …, current_step]. The recv_mask returned by get() and get_padded() indicates which of those time steps have received their observations.

This allows delays to be visible: if delay_steps=2, the last 2 positions in recv_mask will be False (those observations are still in flight).
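The way a delay shows up in the mask can be sketched without the library. The helper below, window_mask, is hypothetical and not part of netrl; it assumes steps are numbered from 0 and that, under a fixed delay, the observation for step t is only available once current_step >= t + delay_steps.

```python
def window_mask(current_step, maxlen, received_steps):
    """Return (steps, mask) for the window ending at current_step.

    Hypothetical helper, not part of netrl. `received_steps` is the set of
    time steps whose observations have already arrived.
    """
    steps = list(range(current_step - maxlen + 1, current_step + 1))
    mask = [s in received_steps for s in steps]
    return steps, mask


# Assumed scenario: a fixed delay of 2 steps. At current_step=5 only the
# observations for steps 0..3 have arrived; steps 4 and 5 are in flight.
delay_steps = 2
current_step = 5
received = set(range(0, current_step - delay_steps + 1))

steps, mask = window_mask(current_step, maxlen=4, received_steps=received)
print(steps)  # [2, 3, 4, 5]
print(mask)   # [True, True, False, False]
```

The last delay_steps entries of the mask are False, which is the pattern a policy can use to tell "not yet arrived" apart from a genuine zero observation.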

__init__(maxlen, shape, dtype)[source]
Parameters:
  • maxlen (int) – Maximum number of observations to retain (window size).

  • shape (tuple) – Shape of a single observation (e.g. (4,)).

  • dtype – NumPy dtype (e.g. np.float32).

Return type:

None

add(obs, step)[source]

Add an observation that arrived at a specific time step.

Each call advances time; the buffer automatically tracks which time slots map to which buffer indices.

Parameters:
  • obs (np.ndarray or None) – If None, step has no observation (packet lost/delayed). If an ndarray, it is stored and recv[step_slot] = True.

  • step (int) – The time step this observation belongs to.

Return type:

None

get()[source]

Return observations in the current time window in chronological order.

Returns only observations that fit in the time window [current_step - maxlen + 1, …, current_step]. Early observations before buffer initialization are excluded.

Returns:

  • observations (np.ndarray, shape (num_steps, *shape))

  • recv_mask (np.ndarray, shape (num_steps,), dtype bool)

Raises:

ValueError – If the buffer is empty (no steps have been added yet).

Return type:

tuple

get_padded()[source]

Return exactly maxlen observations in time-slot order.

Returns observations for time window [current_step - maxlen + 1, …, current_step]. recv_mask[i] = True if that time slot’s observation has been received. The most recent observation is at index [-1].

Before any step has been added, or while fewer than maxlen steps have elapsed, the slots for the earlier time steps are zero-padded and their recv_mask entries are False.

Returns:

  • observations (np.ndarray, shape (maxlen, *shape)) – Observations ordered by time step.

  • recv_mask (np.ndarray, shape (maxlen,), dtype bool) – True if that time slot’s observation has arrived.

Return type:

tuple
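One possible way to consume the padded output is to forward-fill missing slots from the most recent received observation. The sketch below is an illustration only: forward_fill is a hypothetical helper, not part of netrl, and it works on plain nested lists to stay dependency-free. The sample data is made up and merely has the same layout as the (observations, recv_mask) pair described above.

```python
def forward_fill(observations, recv_mask):
    """Replace missing slots with the most recent received observation.

    Hypothetical helper, not part of netrl. Slots that come before the
    first received observation are left as they are (zero-filled).
    """
    filled = []
    last = None
    for obs, received in zip(observations, recv_mask):
        if received:
            last = obs
        filled.append(list(last) if last is not None else list(obs))
    return filled


# Made-up data in the layout of get_padded(): oldest slot first.
observations = [[0.0, 0.0], [1.0, 2.0], [0.0, 0.0], [3.0, 4.0]]
recv_mask = [False, True, False, True]

filled = forward_fill(observations, recv_mask)
print(filled)  # [[0.0, 0.0], [1.0, 2.0], [1.0, 2.0], [3.0, 4.0]]
```

Whether forward-filling is appropriate depends on the task; passing recv_mask to the policy alongside the raw observations is an equally valid choice.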

clear()[source]

Reset the buffer to the empty state.

Return type:

None

property is_full: bool

Return True if the buffer has been filled with maxlen observations.

Semantics

The buffer is a fixed-size circular window. Once maxlen steps have been added, each further add() call silently overwrites the oldest entry.

from netrl import ObservationBuffer
import numpy as np

buf = ObservationBuffer(maxlen=4, shape=(3,), dtype=np.float32)

buf.add(np.array([1., 2., 3.]), step=0)
buf.add(None, step=1)                     # packet loss → zero-padded slot
buf.add(np.array([4., 5., 6.]), step=2)

obs, mask = buf.get_padded()
# obs.shape  == (4, 3)
# mask       == [False, True, False, True]  (oldest → newest)
# obs[-1]    == [4., 5., 6.]   ← most recent real observation
# obs[-2]    == [0., 0., 0.]   ← lost packet (zero fill)
# obs[-3]    == [1., 2., 3.]
# obs[-4]    == [0., 0., 0.]   ← unwritten slot (buffer not yet full)
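
The overwrite behaviour of a circular window can be illustrated with a small stand-in class. SlotBuffer below is a hypothetical sketch, not the netrl implementation: it assumes steps start at 0, that a step maps to buffer index step % maxlen, and that observations older than the current window are dropped. It uses plain lists instead of NumPy arrays.

```python
class SlotBuffer:
    """Hypothetical stand-in for a time-slot circular buffer (not netrl code)."""

    def __init__(self, maxlen, width):
        self.maxlen = maxlen
        self.width = width
        self.data = [[0.0] * width for _ in range(maxlen)]
        self.slot_step = [None] * maxlen  # which step each slot currently holds
        self.recv = [False] * maxlen
        self.current_step = None

    def add(self, obs, step):
        # Assumption: anything older than the current window is ignored.
        if self.current_step is not None and step <= self.current_step - self.maxlen:
            return
        slot = step % self.maxlen  # assumed step-to-slot mapping
        # Claim the slot for this step, discarding the older step stored there.
        self.slot_step[slot] = step
        self.recv[slot] = obs is not None
        self.data[slot] = list(obs) if obs is not None else [0.0] * self.width
        if self.current_step is None or step > self.current_step:
            self.current_step = step

    def get_padded(self):
        obs, mask = [], []
        if self.current_step is None:
            return [[0.0] * self.width for _ in range(self.maxlen)], [False] * self.maxlen
        for step in range(self.current_step - self.maxlen + 1, self.current_step + 1):
            slot = step % self.maxlen
            ok = self.slot_step[slot] == step and self.recv[slot]
            obs.append(list(self.data[slot]) if ok else [0.0] * self.width)
            mask.append(ok)
        return obs, mask


buf = SlotBuffer(maxlen=4, width=1)
for step in range(6):  # six steps into a window of four
    buf.add([float(step)], step)

obs, mask = buf.get_padded()
print(obs)   # [[2.0], [3.0], [4.0], [5.0]]  (steps 0 and 1 were overwritten)
print(mask)  # [True, True, True, True]
```

Steps 4 and 5 land in the slots previously held by steps 0 and 1, so the two oldest observations disappear without any error or warning.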