Multi-View Environments

MultiViewNetworkedEnv extends the single-observer model to N independent sensors, each with its own channel and observation buffer. Sensors can have different observation shapes and may transmit on different schedules or with different payload sizes.

Motivation

Many real systems are monitored by heterogeneous sensors:

  • A camera (high-bandwidth, bursty)

  • A lidar (medium bandwidth, periodic)

  • An IMU (low-bandwidth, frequent)

Each sensor transmits over a shared or independent wireless link. The central node (controller) must fuse these streams, accounting for losses and delays on each path.

The MultiViewModel abstraction

MultiViewModel separates what each sensor observes from how the transmission network is simulated. You subclass it to define the sensor observation function:

import numpy as np
from netrl import MultiViewModel

class MySensors(MultiViewModel):
    def observe(self, env, state):
        """
        Parameters
        ----------
        env   : gymnasium.Env   The wrapped environment (access render, etc.)
        state : np.ndarray      The raw observation returned by env.step().

        Returns
        -------
        Dict[observer_id → np.ndarray]
            One observation array per observer.  Each array must match the
            shape declared in obs_shapes.
        """
        return {
            "lidar":   state[:2].astype(np.float32),
            "camera":  np.random.randn(8).astype(np.float32),
            "imu":     state[2:].astype(np.float32),
        }

model = MySensors(
    observer_ids=["lidar", "camera", "imu"],
    obs_shapes  =[(2,), (8,), (2,)],
    obs_dtypes  =[np.float32, np.float32, np.float32],
)

Note

observe() is called every step before any transmission decisions are made. The observations it returns are used only for the observers that are active (non-masked) that step.

Constructing the environment

import gymnasium as gym
from netrl import NetworkConfig, MultiViewNetworkedEnv
from netrl.channels.comm_channel import GEChannel

env = MultiViewNetworkedEnv(
    gym.make("CartPole-v1"),
    config=NetworkConfig(buffer_size=8, loss_bad=0.25, seed=42),
    observer_ids=["lidar", "camera", "imu"],
    multi_view_model=model,
    channel_factory=GEChannel,   # one independent GE channel per observer
)

Observation space

The returned observation space is a nested Dict:

gymnasium.spaces.Dict({
    "lidar": Dict({
        "observations": Box(shape=(8, 2), dtype=float32),
        "recv_mask":    MultiBinary(8),
    }),
    "camera": Dict({
        "observations": Box(shape=(8, 8), dtype=float32),
        "recv_mask":    MultiBinary(8),
    }),
    "imu": Dict({
        "observations": Box(shape=(8, 2), dtype=float32),
        "recv_mask":    MultiBinary(8),
    }),
})

Using step()

The step() method accepts two keyword-only arguments that give per-step control over transmissions:

transmit_mask

A Dict[str, bool] controlling which observers are active this step. None (default) means all observers transmit. Absent keys default to True (opt-out semantics):

# Only lidar transmits; camera and imu are silenced
obs, r, term, trunc, info = env.step(
    action,
    transmit_mask={"lidar": True, "camera": False, "imu": False},
)
packet_sizes

A Dict[str, int] overriding the payload bytes per active observer. None (default) uses each channel’s configured default:

# lidar sends 256 bytes; camera sends 4096 bytes; imu uses default
obs, r, term, trunc, info = env.step(
    action,
    packet_sizes={"lidar": 256, "camera": 4096},
)

Both arguments can be combined:

obs, r, term, trunc, info = env.step(
    action,
    transmit_mask={"lidar": True, "camera": True, "imu": False},
    packet_sizes={"lidar": 128, "camera": 2048},
)

Important

flush_and_update() is called for all observers every step, regardless of transmit_mask. This guarantees that every buffer advances by exactly one slot per step; delayed packets from previous steps are still collected for observers that were silent this step.

The extended info dict

Key

Value

"channel_info"

Dict[observer_id channel_info_dict]

"arrived_this_step"

Dict[observer_id bool] — packet arrived at central node

"transmitted_this_step"

Dict[observer_id bool] — transmission was attempted

Using a shared 802.11a WiFi channel

Replace independent GE channels with a single ns-3 infrastructure BSS where all observers compete for the wireless medium via CSMA/CA. This requires building the multi-UE binary first:

bash src/build_ns3_multi_ue_sim.sh
from netrl import NS3WifiMultiUEConfig, make_multi_ue_wifi_factory

factory = make_multi_ue_wifi_factory(
    NS3WifiMultiUEConfig(
        n_ues=3,                          # must equal len(observer_ids)
        distances_m=[10.0, 30.0, 60.0],  # distance per observer
        step_duration_ms=2.0,
        packet_size_bytes=128,
    )
)

env = MultiViewNetworkedEnv(
    gym.make("CartPole-v1"),
    NetworkConfig(buffer_size=8),
    observer_ids=["lidar", "camera", "imu"],
    multi_view_model=model,
    channel_factory=factory,
)

When all three observers transmit in the same step they contend for the channel via MAC-layer CSMA/CA backoff — the same way real Wi-Fi devices do.

Implementation tips

Duty-cycling sensors

Alternate which observers transmit to share channel capacity:

for step in range(N):
    mask = {oid: (step % len(observer_ids) == i)
            for i, oid in enumerate(observer_ids)}
    obs, r, term, trunc, info = env.step(action, transmit_mask=mask)
Adaptive packet sizes

Transmit full observations when the channel is in the Good state, and reduced observations when in the Bad state:

sizes = {
    oid: 256 if info["channel_info"][oid].get("state") == "GOOD" else 64
    for oid in observer_ids
}
obs, r, term, trunc, info = env.step(action, packet_sizes=sizes)
Accessing buffers directly

Use central_node to access raw buffers outside of the step loop:

for oid in env.observer_ids:
    buf, mask = env.central_node.get_buffer(oid)
    print(f"{oid}: {mask.sum()} packets received in last {len(mask)} steps")