Source code for netrl.channels.ns3_multi_ue_channel

"""
ns3_multi_ue_channel.py
=======================
Multi-UE WiFi channel backed by a single ns-3 infrastructure BSS simulation.

Architecture
------------
A **single** ns3_wifi_multi_ue_sim subprocess simulates N UEs (STAs) all
associated with one AP.  All UEs share the same 802.11a wireless medium and
contend via CSMA/CA, producing correct multi-node uplink behaviour.

Python-side class hierarchy
---------------------------
                  ┌────────────────────────────┐
                  │  NS3WifiMultiUEBackend      │
                  │  (one instance, shared)     │
                  │  - owns the subprocess      │
                  │  - flush cache per step     │
                  │  - reset coordination       │
                  └────────────┬───────────────┘
                               │  referenced by N instances
          ┌────────────────────┼────────────────────┐
          ▼                    ▼                     ▼
  NS3WifiUEChannel(0)  NS3WifiUEChannel(1)  NS3WifiUEChannel(N-1)
  (CommChannel)        (CommChannel)        (CommChannel)
  - owns pending obs   - owns pending obs   - owns pending obs
  - ue_id = 0          - ue_id = 1          - ue_id = N-1

Each NS3WifiUEChannel instance is registered as a separate node in
CentralNode, so the existing multi-node aggregation logic is reused
without modification.

Usage
-----
    from netrl import NetworkConfig, CentralNode
    from netrl.channels.ns3_wifi_multi_ue_config import NS3WifiMultiUEConfig
    from netrl.channels.ns3_multi_ue_channel import make_multi_ue_wifi_factory
    import numpy as np

    ns3_cfg = NS3WifiMultiUEConfig(
        n_ues=3,
        distances_m=[10.0, 30.0, 60.0],
        step_duration_ms=2.0,
    )
    factory = make_multi_ue_wifi_factory(ns3_cfg)

    central = CentralNode(
        node_ids=["ue_0", "ue_1", "ue_2"],
        obs_shape=(4,),
        obs_dtype=np.float32,
        config=NetworkConfig(buffer_size=10),
        channel_factory=factory,
    )

Build the binary first:
    bash src/build_ns3_multi_ue_sim.sh

Protocol (subprocess stdin/stdout)
-----------------------------------
Python → subprocess:
    TRANSMIT <ue_id> <step_id> <pkt_size>
    FLUSH    <step_id>
    RESET
    QUIT

Subprocess → Python:
    READY
    OK
    RECV <ue_id>:<step_id> ...   (space-separated, may be empty: "RECV")
    ERROR <msg>
"""

from __future__ import annotations

import os
import subprocess
import threading
from collections import deque
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np

from netrl.channels.comm_channel import CommChannel
from netrl.channels.network_config import NetworkConfig
from netrl.channels.ns3_wifi_multi_ue_config import NS3WifiMultiUEConfig
from netrl import netrl_multi_ue_ext

_HAS_MULTI_UE_PYBIND = True

_DEFAULT_SIM_BINARY = os.path.normpath(
    os.path.join(
        os.path.dirname(__file__), "../..", "src", "ns3_wifi_multi_ue_sim"
    )
)


# ---------------------------------------------------------------------------
# Backend: owns the subprocess and is shared across all UE channel instances
# ---------------------------------------------------------------------------

[docs] class NS3WifiMultiUEBackend: """ Manages the ns3_wifi_multi_ue_sim subprocess shared by all UE channels. Two coordination mechanisms are needed because CentralNode calls each CommChannel in a loop: Flush coordination flush(step) sends "FLUSH <step>" to the subprocess exactly once per step. Subsequent calls for the same step return a cached result. CentralNode guarantees strictly increasing step values, so the cache is valid for the lifetime of the step. Reset coordination request_reset() is called by each NS3WifiUEChannel on reset(). The RESET command is sent to the subprocess only after all n_ues channels have called request_reset(), ensuring a single clean reset. """
[docs] def __init__(self, ns3_cfg: NS3WifiMultiUEConfig) -> None: self.ns3_cfg = ns3_cfg self._native = None self._use_pybind = _HAS_MULTI_UE_PYBIND self._proc: Optional[subprocess.Popen] = None self._stderr_buf: deque = deque(maxlen=200) # Flush cache: last flushed step → {ue_id: [step_id, ...]} self._last_flushed_step: int = -1 self._flush_cache: Dict[int, List[int]] = {} # Reset coordination self._reset_pending: int = 0 if self._use_pybind: self._start_native_backend() else: self._start_subprocess()
# ----------------------------------------------------------------------- # Public API (called by NS3WifiUEChannel instances) # -----------------------------------------------------------------------
[docs] def transmit(self, ue_id: int, step_id: int, pkt_size: int) -> None: """Send TRANSMIT <ue_id> <step_id> <pkt_size> and wait for OK.""" if self._use_pybind: assert self._native is not None self._native.transmit(ue_id, step_id, pkt_size) return self._send_command(f"TRANSMIT {ue_id} {step_id} {pkt_size}") resp = self._read_line() if resp != "OK": raise RuntimeError( f"NS3WifiMultiUEBackend transmit: unexpected response '{resp}'" )
[docs] def flush(self, step: int) -> Dict[int, List[int]]: """ Advance the simulation to the end of env step `step` and return arrived packet identifiers grouped by UE. The first call for a given step sends FLUSH to the subprocess and caches the result; subsequent calls for the same step return the cache. Returns ------- Dict[ue_id -> List[step_id]] step_ids that arrived at the AP in this flush window, per UE. """ if step == self._last_flushed_step: return self._flush_cache if step < self._last_flushed_step: # Should not happen under normal CentralNode usage; return empty. return {} if self._use_pybind: assert self._native is not None result: Dict[int, List[int]] = {} for uid, sid in self._native.flush(step): result.setdefault(int(uid), []).append(int(sid)) self._last_flushed_step = step self._flush_cache = result return result self._send_command(f"FLUSH {step}") response = self._read_line() if not response.startswith("RECV"): raise RuntimeError( f"NS3WifiMultiUEBackend flush: unexpected response '{response}'" ) # Parse "RECV ue_id:step_id ue_id:step_id ..." result: Dict[int, List[int]] = {} for part in response.split()[1:]: uid_str, sid_str = part.split(":") uid = int(uid_str) sid = int(sid_str) result.setdefault(uid, []).append(sid) self._last_flushed_step = step self._flush_cache = result return result
[docs] def request_reset(self) -> None: """ Called once by each NS3WifiUEChannel on reset(). Sends RESET to the subprocess only after every UE channel has called this method, guaranteeing exactly one RESET per env.reset(). """ self._reset_pending += 1 if self._reset_pending >= self.ns3_cfg.n_ues: self._reset_pending = 0 self._last_flushed_step = -1 self._flush_cache = {} if self._use_pybind: assert self._native is not None self._native.reset() return self._send_command("RESET") resp = self._read_line() if resp != "OK": raise RuntimeError( f"NS3WifiMultiUEBackend reset: unexpected response '{resp}'" )
def _start_native_backend(self) -> None: cfg = self.ns3_cfg self._native = netrl_multi_ue_ext.NS3WiFiMultiUEChannel( n_ues=cfg.n_ues, distances_m=cfg.distances_m, step_duration_ms=cfg.step_duration_ms, tx_power_dbm=cfg.tx_power_dbm, loss_exponent=cfg.loss_exponent, max_retries=cfg.max_retries, packet_size_bytes=cfg.packet_size_bytes, ) # ----------------------------------------------------------------------- # Subprocess management (mirrors NS3WifiChannel implementation) # ----------------------------------------------------------------------- def _resolve_binary(self) -> str: path = self.ns3_cfg.sim_binary or _DEFAULT_SIM_BINARY if not os.path.isfile(path): raise FileNotFoundError( f"ns3_wifi_multi_ue_sim binary not found at '{path}'.\n" "Build it first:\n" " bash src/build_ns3_multi_ue_sim.sh\n" "Or set NS3WifiMultiUEConfig.sim_binary to the correct path." ) if not os.access(path, os.X_OK): raise PermissionError( f"ns3_wifi_multi_ue_sim binary at '{path}' is not executable.\n" "Run: chmod +x " + path ) return path def _start_subprocess(self) -> None: binary = self._resolve_binary() cfg = self.ns3_cfg distances_str = ",".join(str(d) for d in cfg.distances_m) cmd = [ binary, f"--n-ues={cfg.n_ues}", f"--distances={distances_str}", f"--step-ms={cfg.step_duration_ms}", f"--tx-power={cfg.tx_power_dbm}", f"--loss-exp={cfg.loss_exponent}", f"--retries={cfg.max_retries}", f"--pkt-size={cfg.packet_size_bytes}", ] self._proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, text=True, encoding="utf-8", ) def _drain_stderr_loop() -> None: try: for line in self._proc.stderr: # type: ignore[union-attr] self._stderr_buf.append(line.rstrip("\n\r")) except Exception: pass threading.Thread(target=_drain_stderr_loop, daemon=True).start() # Infrastructure BSS association takes up to 500 ms warm-up; allow # 60 s total for the subprocess to become ready (same as mmWave). try: ready_line = self._read_line(timeout=60.0) except TimeoutError: self._kill_subprocess() raise RuntimeError( "ns3_wifi_multi_ue_sim did not emit READY within 60 s.\n" "Check stderr for ns-3 error messages." ) if ready_line != "READY": preview = self._drain_stderr() self._kill_subprocess() raise RuntimeError( f"ns3_wifi_multi_ue_sim unexpected startup line: '{ready_line}'\n" f"stderr: {preview}" ) def _kill_subprocess(self) -> None: if self._proc is not None: try: self._proc.kill() self._proc.wait(timeout=5) except Exception: pass self._proc = None def _send_command(self, line: str) -> None: if self._proc is None or self._proc.stdin is None: raise RuntimeError( "NS3WifiMultiUEBackend: subprocess is not running." ) try: self._proc.stdin.write(line + "\n") self._proc.stdin.flush() except BrokenPipeError as exc: preview = self._drain_stderr() raise RuntimeError( "NS3WifiMultiUEBackend: subprocess stdin pipe broken.\n" f"stderr: {preview}" ) from exc def _read_line(self, timeout: float = 10.0) -> str: if self._proc is None or self._proc.stdout is None: raise RuntimeError( "NS3WifiMultiUEBackend: subprocess is not running." ) result: List[Optional[str]] = [None] exc_holder: List[Optional[Exception]] = [None] def _read() -> None: try: result[0] = self._proc.stdout.readline() # type: ignore[union-attr] except Exception as e: exc_holder[0] = e t = threading.Thread(target=_read, daemon=True) t.start() t.join(timeout) if t.is_alive(): preview = self._drain_stderr() raise TimeoutError( f"NS3WifiMultiUEBackend: no response within {timeout} s.\n" f"stderr: {preview}" ) if exc_holder[0] is not None: raise exc_holder[0] # type: ignore[misc] line = result[0] if line is None or line == "": preview = self._drain_stderr() raise RuntimeError( "NS3WifiMultiUEBackend: subprocess stdout closed (process exited?).\n" f"Return code: {self._proc.poll()}\n" f"stderr: {preview}" ) return line.rstrip("\n\r") def _drain_stderr(self, max_lines: int = 50) -> str: if not self._stderr_buf: return "<no stderr>" return "\n".join(list(self._stderr_buf)[-max_lines:]) def __del__(self) -> None: if self._use_pybind: self._native = None return try: if self._proc is not None and self._proc.poll() is None: self._send_command("QUIT") self._proc.wait(timeout=2) except Exception: pass finally: self._kill_subprocess()
# --------------------------------------------------------------------------- # Per-UE channel: CommChannel proxy delegating to the shared backend # ---------------------------------------------------------------------------
[docs] class NS3WifiUEChannel(CommChannel): """ CommChannel implementation for a single UE in the multi-UE WiFi network. All instances created by make_multi_ue_wifi_factory() share one NS3WifiMultiUEBackend (and therefore one ns-3 subprocess). This ensures that all UEs contend for the same simulated wireless medium. Parameters ---------- ue_id : int Zero-based index of this UE. backend : NS3WifiMultiUEBackend Shared subprocess manager. config : NetworkConfig NetRL shared config (used for metadata). """
[docs] def __init__( self, ue_id: int, backend: NS3WifiMultiUEBackend, config: NetworkConfig, ) -> None: self._ue_id = ue_id self._backend = backend self._config = config # step_id → obs_copy: observations awaiting ns-3 ack self._pending: Dict[int, np.ndarray] = {}
# ----------------------------------------------------------------------- # CommChannel interface # -----------------------------------------------------------------------
[docs] def transmit( self, obs: np.ndarray, step: int, packet_size: Optional[int] = None ) -> None: """ Transmit `obs` from this UE at env step `step`. Stores the observation locally (keyed by step_id) and instructs the shared backend to schedule a packet send in the ns-3 simulation. """ size = ( packet_size if packet_size is not None else self._backend.ns3_cfg.packet_size_bytes ) self._pending[step] = obs.copy() self._backend.transmit(self._ue_id, step, size)
[docs] def flush(self, step: int) -> List[Tuple[int, np.ndarray]]: """ Return observations whose packets arrived at the AP during step `step`. Queries the shared backend (which sends FLUSH to ns-3 at most once per step and caches results for subsequent calls). Any pending observations older than max_pending_steps are expired. Returns ------- List of (arrival_step, obs) tuples. """ arrived_map = self._backend.flush(step) step_ids = arrived_map.get(self._ue_id, []) result: List[Tuple[int, np.ndarray]] = [] for sid in step_ids: if sid in self._pending: result.append((sid, self._pending.pop(sid))) # Expire observations that stayed in-flight too long max_age = self._backend.ns3_cfg.max_pending_steps expired = [sid for sid in self._pending if step - sid > max_age] for sid in expired: del self._pending[sid] return result
[docs] def reset(self) -> None: """ Clear local state and notify the backend that this UE has reset. The backend sends RESET to the subprocess once all n_ues channels have called reset(), ensuring a single coordinated simulation reset. """ self._pending.clear() self._backend.request_reset()
[docs] def get_channel_info(self) -> dict: """Return diagnostic information for this UE channel.""" cfg = self._backend.ns3_cfg distances = cfg.distances_m dist = ( distances[self._ue_id] if self._ue_id < len(distances) else distances[-1] ) return { "state": "NS3_WIFI_MULTI_UE", "ue_id": self._ue_id, "n_ues": cfg.n_ues, "pending_count": len(self._pending), "distance_m": dist, "step_duration_ms": cfg.step_duration_ms, "tx_power_dbm": cfg.tx_power_dbm, "loss_exponent": cfg.loss_exponent, "max_retries": cfg.max_retries, }
# --------------------------------------------------------------------------- # Factory function # ---------------------------------------------------------------------------
[docs] def make_multi_ue_wifi_factory( ns3_cfg: NS3WifiMultiUEConfig, ) -> Callable[[NetworkConfig], CommChannel]: """ Create a channel_factory callable for use with CentralNode. A **single** ns3_wifi_multi_ue_sim subprocess is started immediately. Each call to the returned factory creates one NS3WifiUEChannel proxy (indexed sequentially: 0, 1, 2, …) that delegates to the shared backend. The factory must be called exactly ns3_cfg.n_ues times — once per node_id registered with CentralNode — so that len(node_ids) == n_ues. Parameters ---------- ns3_cfg : NS3WifiMultiUEConfig Physical-layer and network configuration. Returns ------- Callable[[NetworkConfig], CommChannel] A factory suitable for CentralNode's channel_factory parameter. Example ------- :: ns3_cfg = NS3WifiMultiUEConfig( n_ues=3, distances_m=[10.0, 30.0, 60.0], step_duration_ms=2.0, ) factory = make_multi_ue_wifi_factory(ns3_cfg) central = CentralNode( node_ids=["ue_0", "ue_1", "ue_2"], obs_shape=(4,), obs_dtype=np.float32, config=NetworkConfig(buffer_size=10), channel_factory=factory, ) """ ns3_cfg.validate() backend: NS3WifiMultiUEBackend = NS3WifiMultiUEBackend(ns3_cfg) counter: List[int] = [0] def _factory(net_cfg: NetworkConfig) -> CommChannel: ue_id = counter[0] counter[0] += 1 if ue_id >= ns3_cfg.n_ues: raise ValueError( f"make_multi_ue_wifi_factory: factory called for UE index " f"{ue_id} but NS3WifiMultiUEConfig.n_ues={ns3_cfg.n_ues}. " "Ensure len(node_ids) passed to CentralNode equals n_ues." ) return NS3WifiUEChannel(ue_id, backend, net_cfg) return _factory