Multi-View Environments
MultiViewNetworkedEnv extends the single-observer model to
N independent sensors, each with its own channel and observation buffer.
Sensors can have different observation shapes and may transmit on different
schedules or with different payload sizes.
Motivation
Many real systems are monitored by heterogeneous sensors:
A camera (high-bandwidth, bursty)
A lidar (medium bandwidth, periodic)
An IMU (low-bandwidth, frequent)
Each sensor transmits over a shared or independent wireless link. The central node (controller) must fuse these streams, accounting for losses and delays on each path.
The MultiViewModel abstraction
MultiViewModel separates what each sensor observes from how
the transmission network is simulated. You subclass it to define the sensor
observation function:
import numpy as np
from netrl import MultiViewModel
class MySensors(MultiViewModel):
def observe(self, env, state):
"""
Parameters
----------
env : gymnasium.Env The wrapped environment (access render, etc.)
state : np.ndarray The raw observation returned by env.step().
Returns
-------
Dict[observer_id → np.ndarray]
One observation array per observer. Each array must match the
shape declared in obs_shapes.
"""
return {
"lidar": state[:2].astype(np.float32),
"camera": np.random.randn(8).astype(np.float32),
"imu": state[2:].astype(np.float32),
}
model = MySensors(
observer_ids=["lidar", "camera", "imu"],
obs_shapes =[(2,), (8,), (2,)],
obs_dtypes =[np.float32, np.float32, np.float32],
)
Note
observe() is called every step before any transmission decisions are
made. The observations it returns are used only for the observers that are
active (non-masked) that step.
Constructing the environment
import gymnasium as gym
from netrl import NetworkConfig, MultiViewNetworkedEnv
from netrl.channels.comm_channel import GEChannel
env = MultiViewNetworkedEnv(
gym.make("CartPole-v1"),
config=NetworkConfig(buffer_size=8, loss_bad=0.25, seed=42),
observer_ids=["lidar", "camera", "imu"],
multi_view_model=model,
channel_factory=GEChannel, # one independent GE channel per observer
)
Observation space
The returned observation space is a nested Dict:
gymnasium.spaces.Dict({
"lidar": Dict({
"observations": Box(shape=(8, 2), dtype=float32),
"recv_mask": MultiBinary(8),
}),
"camera": Dict({
"observations": Box(shape=(8, 8), dtype=float32),
"recv_mask": MultiBinary(8),
}),
"imu": Dict({
"observations": Box(shape=(8, 2), dtype=float32),
"recv_mask": MultiBinary(8),
}),
})
Using step()
The step() method accepts two keyword-only arguments that give per-step
control over transmissions:
transmit_maskA
Dict[str, bool]controlling which observers are active this step.None(default) means all observers transmit. Absent keys default toTrue(opt-out semantics):# Only lidar transmits; camera and imu are silenced obs, r, term, trunc, info = env.step( action, transmit_mask={"lidar": True, "camera": False, "imu": False}, )
packet_sizesA
Dict[str, int]overriding the payload bytes per active observer.None(default) uses each channel’s configured default:# lidar sends 256 bytes; camera sends 4096 bytes; imu uses default obs, r, term, trunc, info = env.step( action, packet_sizes={"lidar": 256, "camera": 4096}, )
Both arguments can be combined:
obs, r, term, trunc, info = env.step(
action,
transmit_mask={"lidar": True, "camera": True, "imu": False},
packet_sizes={"lidar": 128, "camera": 2048},
)
Important
flush_and_update() is called for all
observers every step, regardless of transmit_mask. This guarantees
that every buffer advances by exactly one slot per step; delayed packets
from previous steps are still collected for observers that were silent
this step.
The extended info dict
Key |
Value |
|---|---|
|
|
|
|
|
|
Implementation tips
- Duty-cycling sensors
Alternate which observers transmit to share channel capacity:
for step in range(N): mask = {oid: (step % len(observer_ids) == i) for i, oid in enumerate(observer_ids)} obs, r, term, trunc, info = env.step(action, transmit_mask=mask)
- Adaptive packet sizes
Transmit full observations when the channel is in the Good state, and reduced observations when in the Bad state:
sizes = { oid: 256 if info["channel_info"][oid].get("state") == "GOOD" else 64 for oid in observer_ids } obs, r, term, trunc, info = env.step(action, packet_sizes=sizes)
- Accessing buffers directly
Use
central_nodeto access raw buffers outside of the step loop:for oid in env.observer_ids: buf, mask = env.central_node.get_buffer(oid) print(f"{oid}: {mask.sum()} packets received in last {len(mask)} steps")