ObservationBuffer
- class netrl.ObservationBuffer(maxlen, shape, dtype)[source]
Bases: object
Time-slot based circular buffer for storing RL observations.
Each slot in the buffer represents a specific time step in the window [current_step - maxlen + 1, …, current_step]. The recv_mask array indicates which time steps have received their observations.
This allows delays to be visible: if delay_steps=2, the last 2 positions in recv_mask will be False (those observations are still in flight).
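The effect of a fixed delay on the mask can be sketched in a few lines. The helper below is hypothetical and not part of netrl; it assumes steps start at 0, no packets are lost, and every observation arrives exactly delay_steps steps after it was produced.

```python
def expected_recv_mask(current_step, maxlen, delay_steps):
    """Mask over [current_step - maxlen + 1, ..., current_step].

    Hypothetical helper: assumes steps start at 0, no loss, and a
    constant delay of delay_steps steps for every observation.
    """
    first = current_step - maxlen + 1
    newest_arrived = current_step - delay_steps
    # A slot is True only if its step exists (>= 0) and has had time to arrive.
    return [0 <= step <= newest_arrived
            for step in range(first, current_step + 1)]


print(expected_recv_mask(current_step=9, maxlen=4, delay_steps=2))
# [True, True, False, False]
print(expected_recv_mask(current_step=2, maxlen=4, delay_steps=0))
# [False, True, True, True]
```

The second call shows the other source of False entries: slots that fall before step 0 because the window is not yet full.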
- __init__(maxlen, shape, dtype)[source]
- Parameters:
maxlen (int) – Maximum number of observations to retain (window size).
shape (tuple) – Shape of a single observation (e.g. (4,)).
dtype – NumPy dtype (e.g. np.float32).
- Return type:
None
- add(obs, step)[source]
Add an observation that arrived at a specific time step.
Each call advances time; the buffer automatically tracks which time slots map to which buffer indices.
- Parameters:
obs (np.ndarray or None) – If None, step has no observation (packet lost/delayed). If an ndarray, it is stored and recv[step_slot] = True.
step (int) – The time step this observation belongs to.
- Return type:
None
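One common way to map time steps to buffer indices in a circular buffer is modulo arithmetic. The sketch below assumes that layout; the source does not state how netrl computes the index, and slot_index is a hypothetical name.

```python
def slot_index(step, maxlen):
    """Map a time step to a buffer index (assumed modulo layout)."""
    return step % maxlen


maxlen = 4
for step in range(6):
    print(step, "->", slot_index(step, maxlen))
# Steps 0 and 4 both map to index 0, so step 4 reuses the slot of step 0.
```

Under this assumption, two steps share a slot exactly when they are a multiple of maxlen apart, which is why only the most recent maxlen steps can be held at once.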
- get()[source]
Return observations in the current time window in chronological order.
Returns only observations that fit in the time window [current_step - maxlen + 1, …, current_step]. Early observations before buffer initialization are excluded.
- Returns:
observations (np.ndarray, shape (num_steps, *shape))
recv_mask (np.ndarray, shape (num_steps,), dtype bool)
- Raises:
ValueError – If the buffer is empty (no steps added yet).
- Return type:
tuple of (np.ndarray, np.ndarray)
- get_padded()[source]
Return exactly maxlen observations in time-slot order.
Returns observations for time window [current_step - maxlen + 1, …, current_step]. recv_mask[i] = True if that time slot’s observation has been received. The most recent observation is at index [-1].
Before the buffer is initialized, or while fewer than maxlen steps have elapsed, the earlier slots are zero-padded with recv_mask=False.
- Returns:
observations (np.ndarray, shape (maxlen, *shape)) – Observations ordered by time step.
recv_mask (np.ndarray, shape (maxlen,), dtype bool) – True if that time slot’s observation has arrived.
- Return type:
tuple of (np.ndarray, np.ndarray)
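A consumer of get_padded() typically needs to decide what to do with slots whose mask entry is False. One possible policy, shown here as a sketch, is to hold the last received observation. The forward_fill helper is hypothetical, and plain Python lists stand in for the NumPy arrays so the example has no dependencies.

```python
def forward_fill(observations, recv_mask):
    """Replace missing slots with the last received observation.

    Slots before the first received observation are left unchanged
    (zero-padded, in the case of get_padded() output).
    """
    filled = []
    last_seen = None
    for obs, received in zip(observations, recv_mask):
        if received:
            last_seen = obs
        filled.append(list(last_seen) if last_seen is not None else list(obs))
    return filled


obs = [[0.0, 0.0], [1.0, 2.0], [0.0, 0.0], [3.0, 4.0]]
mask = [False, True, False, True]
print(forward_fill(obs, mask))
# [[0.0, 0.0], [1.0, 2.0], [1.0, 2.0], [3.0, 4.0]]
```

Whether to fill, or to pass the mask to the policy as an extra input, depends on the agent; the mask is kept separate so that either choice stays available.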
Semantics
The buffer is a fixed-size circular window. After maxlen consecutive
add() calls, the oldest entry is silently overwritten.
import numpy as np
from netrl import ObservationBuffer
buf = ObservationBuffer(maxlen=4, shape=(3,), dtype=np.float32)
# add() takes the observation and the time step it belongs to
buf.add(np.array([1., 2., 3.], dtype=np.float32), step=0)
buf.add(None, step=1) # packet loss → zero-padded slot
buf.add(np.array([4., 5., 6.], dtype=np.float32), step=2)
obs, mask = buf.get_padded()
# obs.shape == (4, 3)
# mask == [False, True, False, True] (oldest → newest)
# obs[-1] == [4., 5., 6.] ← most recent real observation
# obs[-2] == [0., 0., 0.] ← lost packet (zero fill)
# obs[-3] == [1., 2., 3.]
# obs[-4] == [0., 0., 0.] ← unwritten slot (buffer not yet full)
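To make the time-slot semantics concrete without installing netrl, the class below is a minimal pure-Python stand-in. It is an illustration under stated assumptions, not the library's implementation: the name TimeSlotBufferSketch, the modulo slot layout, the per-slot step tags, and the use of lists instead of NumPy arrays are all choices made for this sketch.

```python
class TimeSlotBufferSketch:
    """Illustrative stand-in for a time-slot circular buffer (not netrl code)."""

    def __init__(self, maxlen, width):
        self.maxlen = maxlen
        self.width = width
        self.data = [[0.0] * width for _ in range(maxlen)]
        self.slot_step = [None] * maxlen  # step currently stored in each slot
        self.current_step = None

    def add(self, obs, step):
        # Every call can move the window forward, even when obs is None.
        if self.current_step is None or step > self.current_step:
            self.current_step = step
        # Skip lost packets and arrivals too old to fit in the window.
        if obs is None or step <= self.current_step - self.maxlen:
            return
        idx = step % self.maxlen
        self.data[idx] = list(obs)
        self.slot_step[idx] = step

    def get_padded(self):
        if self.current_step is None:
            zeros = [[0.0] * self.width for _ in range(self.maxlen)]
            return zeros, [False] * self.maxlen
        obs, mask = [], []
        first = self.current_step - self.maxlen + 1
        for step in range(first, self.current_step + 1):
            idx = step % self.maxlen
            # A slot counts as received only if it holds this exact step.
            received = self.slot_step[idx] == step
            obs.append(list(self.data[idx]) if received else [0.0] * self.width)
            mask.append(received)
        return obs, mask


buf = TimeSlotBufferSketch(maxlen=4, width=3)
buf.add([1.0, 2.0, 3.0], step=0)
buf.add(None, step=1)             # lost or still in flight
buf.add([4.0, 5.0, 6.0], step=2)
print(buf.get_padded()[1])        # [False, True, False, True]

buf.add([7.0, 8.0, 9.0], step=1)  # delayed packet for step 1 arrives late
print(buf.get_padded()[1])        # [False, True, True, True]
```

Tagging each slot with the step it holds lets the sketch tell a fresh observation from a stale one left over from maxlen steps earlier, and lets a late arrival fill its own slot without disturbing newer entries.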