Single-Observer Environments
NetworkedEnv wraps any Gymnasium environment whose observation
space is a Box. A single observation path — one channel, one buffer — is
simulated between the environment and the agent.
Observation space
The original Box(obs_shape) is replaced by a Dict:
gymnasium.spaces.Dict({
"observations": Box(shape=(buffer_size, *obs_shape), dtype=obs_dtype),
"recv_mask": MultiBinary(buffer_size),
})
The agent sees the last buffer_size delivery slots. Slot [-1] is the
most recent; slot [0] is the oldest. Slots where no packet arrived
(loss or delay) are zero-padded; recv_mask[i] == True means a real
observation occupies slot i.
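A common first step for an agent or a debugging script is to pull the newest observation that actually arrived out of this structure. The helper below is a minimal sketch of that lookup, assuming only the layout described above (slot [-1] newest, recv_mask marking real data). The name latest_received is hypothetical and not part of netrl. It works on plain lists as well as NumPy arrays.

```python
from typing import Mapping, Optional, Sequence


def latest_received(obs: Mapping[str, Sequence]) -> Optional[Sequence]:
    """Return the newest observation that actually arrived, or None.

    Hypothetical helper, not part of netrl. Slots are scanned from
    index -1 (most recent) back to index 0 (oldest).
    """
    frames = obs["observations"]
    mask = obs["recv_mask"]
    for i in range(len(mask) - 1, -1, -1):
        if mask[i]:  # a real observation occupies slot i
            return frames[i]
    return None  # every slot is zero padding


# Toy observation with buffer_size=4 and a 2-dimensional base observation.
# The newest slot is empty (lost or still in flight), so the helper
# falls back to the slot before it.
example = {
    "observations": [[0.0, 0.0], [0.1, 0.2], [0.3, 0.4], [0.0, 0.0]],
    "recv_mask": [0, 1, 1, 0],
}
print(latest_received(example))  # [0.3, 0.4]
```

Checking the mask rather than testing for all-zero frames matters because a genuine observation can itself be all zeros.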
Per-step sequence
For each call to env.step(action):
1. The wrapped environment is stepped: raw_obs, reward, term, trunc, info.
2. raw_obs is serialised into a UDP-payload packet and handed to the channel.
3. The channel is flushed: packets due at this step are delivered (or dropped).
4. The observation buffer is updated with the arrived packet (or None).
5. The Dict observation (buffer + mask) is returned.
Note
The channel simulates the forward path only (sensor → central node). The return path (controller → actuator) is instantaneous.
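The per-step sequence can be illustrated with a toy, pure-Python model of the forward path. This is a sketch under stated assumptions, not the netrl implementation: ToyObservationPath and its push method are hypothetical names, the delay is a fixed number of steps, loss is a single independent probability applied at delivery time, and serialisation is reduced to copying a list.

```python
import random
from collections import deque


class ToyObservationPath:
    """Hypothetical stand-in for the channel plus buffer (not netrl code)."""

    def __init__(self, obs_dim, buffer_size=4, delay_steps=1, loss_prob=0.0, seed=0):
        self.obs_dim = obs_dim
        self.delay_steps = delay_steps
        self.loss_prob = loss_prob
        self.rng = random.Random(seed)
        self.step_count = 0
        self.in_flight = deque()  # entries are (due_step, payload)
        # Buffer starts zero-padded with an all-zero mask.
        self.frames = deque(([0.0] * obs_dim for _ in range(buffer_size)), maxlen=buffer_size)
        self.mask = deque([0] * buffer_size, maxlen=buffer_size)

    def push(self, raw_obs):
        # Hand the new observation to the channel.
        self.in_flight.append((self.step_count + self.delay_steps, list(raw_obs)))
        # Flush: deliver (or drop) the packet due at this step, if any.
        arrived = None
        if self.in_flight and self.in_flight[0][0] <= self.step_count:
            _, payload = self.in_flight.popleft()
            if self.rng.random() >= self.loss_prob:
                arrived = payload
        # Update the buffer; the oldest slot falls off the left end.
        self.frames.append(arrived if arrived is not None else [0.0] * self.obs_dim)
        self.mask.append(1 if arrived is not None else 0)
        self.step_count += 1
        # Return the Dict-style observation (buffer + mask).
        return {"observations": [list(f) for f in self.frames], "recv_mask": list(self.mask)}


path = ToyObservationPath(obs_dim=2, buffer_size=4, delay_steps=2)
for t in range(4):
    out = path.push([t + 1.0, t + 1.5])
print(out["recv_mask"])         # [0, 0, 1, 1]
print(out["observations"][-1])  # [2.0, 2.5]
```

With a two-step delay and no loss, the first two steps deliver nothing, so the mask only starts filling from the third step and the newest slot always lags the environment by two steps.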
Construction
from netrl import NetworkedEnv, NetworkConfig

env = NetworkedEnv(
    base_env,
    config=NetworkConfig(
        p_gb=0.10,  # Good → Bad
        p_bg=0.30,  # Bad → Good
        loss_good=0.01,
        loss_bad=0.20,
        delay_steps=2,
        buffer_size=10,
        seed=42,
    ),
    channel_config=None,  # None → Gilbert–Elliott (default)
)
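To get a feel for what these Gilbert-Elliott parameters imply, the long-run packet loss rate can be worked out from the two-state Markov chain. The sketch below assumes that p_gb and p_bg are per-step transition probabilities and that loss_good and loss_bad are per-packet loss probabilities in each state. That reading matches the comments above but is not confirmed by this page, and both function names are hypothetical.

```python
import random


def stationary_loss_rate(p_gb, p_bg, loss_good, loss_bad):
    """Long-run loss probability of a two-state Gilbert-Elliott chain."""
    pi_bad = p_gb / (p_gb + p_bg)  # stationary probability of the Bad state
    return (1.0 - pi_bad) * loss_good + pi_bad * loss_bad


def simulate_loss_rate(p_gb, p_bg, loss_good, loss_bad, n_steps, seed=0):
    """Empirical loss rate from simulating the chain (assumed semantics)."""
    rng = random.Random(seed)
    bad = False  # start in the Good state
    lost = 0
    for _ in range(n_steps):
        # State transition first, then a loss draw in the new state.
        if bad:
            if rng.random() < p_bg:
                bad = False
        elif rng.random() < p_gb:
            bad = True
        if rng.random() < (loss_bad if bad else loss_good):
            lost += 1
    return lost / n_steps


expected = stationary_loss_rate(p_gb=0.10, p_bg=0.30, loss_good=0.01, loss_bad=0.20)
empirical = simulate_loss_rate(0.10, 0.30, 0.01, 0.20, n_steps=200_000, seed=1)
print(round(expected, 4))  # 0.0575
```

For the values used in the construction example, the chain spends a quarter of its time in the Bad state (0.10 / 0.40), giving an average loss of 0.75 × 0.01 + 0.25 × 0.20 = 5.75%. Losses are bursty rather than uniform, since a Bad period lasts on average 1 / p_bg ≈ 3.3 steps.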
Selecting a different channel backend is done via channel_config:
from netrl import NS3WifiConfig

env = NetworkedEnv(
    base_env,
    config,
    channel_config=NS3WifiConfig(distance_m=30.0),
)
See Choosing a Channel Backend for details on all available backends.
Using step()
obs, reward, term, trunc, info = env.step(action)
# Optional: override the packet payload size for this step only
obs, reward, term, trunc, info = env.step(action, packet_size=256)
The info dictionary is augmented with:
| Key | Value |
|---|---|
| | |
| | |
Resetting
obs, info = env.reset()
This resets the wrapped environment and calls central_node.reset(),
which clears the channel queues, resets the GE Markov state, and zeroes the
observation buffer. For ns-3 backends the subprocess is fully reinitialised.
Training with Stable-Baselines3
NetworkedEnv is a standard gymnasium.Wrapper and works with
any SB3 algorithm that supports MultiInputPolicy, the policy class SB3 requires for Dict observation spaces:
import gymnasium as gym
from stable_baselines3 import PPO

from netrl import NetworkedEnv, NetworkConfig

env = NetworkedEnv(gym.make("CartPole-v1"), NetworkConfig(buffer_size=10))
model = PPO("MultiInputPolicy", env, verbose=1)
model.learn(total_timesteps=100_000)
Parallel environments
Use gymnasium.vector.AsyncVectorEnv to run multiple independent
environments (each with its own channel subprocess) in parallel:
import gymnasium as gym
from gymnasium.vector import AsyncVectorEnv

from netrl import NetworkedEnv, NetworkConfig, NS3WifiConfig


def make_env(seed):
    # Return a zero-argument factory, as AsyncVectorEnv expects.
    def _fn():
        return NetworkedEnv(
            gym.make("CartPole-v1"),
            NetworkConfig(buffer_size=10, seed=seed),
            channel_config=NS3WifiConfig(distance_m=30.0, step_duration_ms=2.0),
        )
    return _fn


vec_env = AsyncVectorEnv([make_env(i) for i in range(4)])
obs, info = vec_env.reset()
# obs["observations"].shape == (4, 10, 4)