"""
ns3_channel.py
==============
NS3WifiChannel — a CommChannel backed by an ns-3 WiFi simulation running
as a persistent subprocess.
Architecture
------------
The channel spawns a single long-lived child process (src/ns3_wifi_sim)
that owns the full ns-3 event loop. Python and the subprocess communicate
over stdin/stdout using a line-oriented text protocol:

    TRANSMIT <step_id> <size>   → schedule a probe packet for env step step_id
    FLUSH <step_id>             → advance ns-3 to end of step_id, report arrivals
    RESET                       → restart the ns-3 simulation (sim time → 0)
    QUIT                        → clean shutdown

The subprocess responds with:

    READY         → emitted once at startup
    OK            → acknowledges TRANSMIT / RESET
    RECV <id>…    → space-separated step_ids that arrived (may be empty)
    ERROR <msg>   → unexpected condition

Timing model
------------
• Each env step t occupies ns-3 time [t·step_ms, (t+1)·step_ms).
• transmit(obs, t) schedules a packet send 1 % into step t's window.
• flush(t) advances the sim to (t+1)·step_ms and collects arrivals.
The ns-3 simulation is PERSISTENT across env steps. The full MAC / PHY
state (backoff counters, retry timers, etc.) persists between consecutive
steps, giving temporally-correlated realistic channel behaviour. The
simulation is only torn down and rebuilt by reset() (i.e. env.reset()).
Observation mapping
-------------------
The step_id embedded in each probe packet (4-byte big-endian uint32)
allows the Python side to map a received packet back to the correct
observation, even when packets arrive out of their transmit order.
A pending observation is expired (considered lost) after max_pending_steps
env steps without receiving an acknowledgement. This bounds memory usage
and handles the case where ns-3 silently drops a packet (MAC retry
exhaustion never fires a Python callback).

Usage
-----
    from netrl import NetworkedEnv, NetworkConfig, NS3WifiChannel, NS3WifiConfig
    import gymnasium as gym

    ns3_cfg = NS3WifiConfig(distance_m=15.0, step_duration_ms=2.0)
    config = NetworkConfig(buffer_size=20)
    env = NetworkedEnv(
        gym.make("CartPole-v1"),
        config,
        channel_factory=lambda cfg: NS3WifiChannel(cfg, ns3_cfg),
    )

Build the binary first:

    bash src/build_ns3_sim.sh
"""
from __future__ import annotations

import os
import select
import subprocess
import threading
import time
from collections import deque
from typing import Dict, List, Optional, Tuple

import numpy as np

from netrl.channels.comm_channel import CommChannel
from netrl.channels.network_config import NetworkConfig
from netrl.channels.ns3_wifi_config import NS3WifiConfig
# Fallback path to the compiled ns-3 simulation binary, used when
# NS3WifiConfig.sim_binary is empty. Resolved relative to this file,
# two directories up: <project_root>/src/ns3_wifi_sim
_DEFAULT_SIM_BINARY = os.path.normpath(
    os.path.join(
        os.path.dirname(__file__),
        os.pardir,
        os.pardir,
        "src",
        "ns3_wifi_sim",
    )
)
class NS3WifiChannel(CommChannel):
    """
    CommChannel implementation backed by an ns-3 802.11a WiFi simulation.

    Each instance manages exactly one subprocess running ns3_wifi_sim.
    The subprocess is started on construction and stays alive -- including
    across reset(), which only sends a RESET command to it -- until the
    Python object is garbage-collected.

    Parameters
    ----------
    config : NetworkConfig      NetRL shared config (buffer_size, seed etc.).
                                Note: GE-specific fields (p_gb, p_bg, …) are
                                not used here — WiFi channel physics determine
                                loss and delay.
    ns3_config : NS3WifiConfig  ns-3-specific physical-layer parameters.
                                If None, defaults are used.
    """

    def __init__(
        self,
        config: NetworkConfig,
        ns3_config: Optional[NS3WifiConfig] = None,
    ) -> None:
        """
        Validate the configuration and launch the ns3_wifi_sim subprocess.

        Raises
        ------
        FileNotFoundError / PermissionError
            If the simulator binary is missing or not executable.
        RuntimeError
            If the subprocess does not report READY.
        """
        # Process-related attributes are assigned first so that __del__ and
        # _kill_subprocess stay safe even if validation below raises.
        self._proc: Optional[subprocess.Popen] = None
        # Raw bytes read from the subprocess stdout but not yet consumed as
        # a complete line by _read_line().
        self._rx_buf = bytearray()
        # Rolling buffer of recent stderr lines (populated by background thread)
        self._stderr_buf: deque = deque(maxlen=200)

        self._config = config
        self._ns3_cfg: NS3WifiConfig = (
            ns3_config if ns3_config is not None else NS3WifiConfig()
        )
        self._ns3_cfg.validate()

        # step_id → (obs, sent_at_step): obs waiting for the ns-3 ack
        self._pending: Dict[int, Tuple[np.ndarray, int]] = {}
        # Packets confirmed received by ns-3, waiting to be returned by flush()
        # Each item: (arrival_step, obs)
        self._arrived: deque = deque()

        self._start_subprocess()

    # -----------------------------------------------------------------------
    # CommChannel interface
    # -----------------------------------------------------------------------

    def transmit(self, obs: np.ndarray, step: int,
                 packet_size: Optional[int] = None) -> None:
        """
        Instruct ns-3 to simulate sending the observation at env step `step`.

        The probe packet carrying the step_id is scheduled inside the ns-3
        subprocess at simulation time step * step_ms + 0.01 * step_ms.
        The observation is stored locally; ns-3 only carries the 4-byte
        step_id, not the full observation data.

        Parameters
        ----------
        obs         : np.ndarray  Raw observation from the wrapped env.
        step        : int         Current integer step counter (0-indexed).
        packet_size : int | None  Payload size in bytes. None uses the
                                  default from NS3WifiConfig.packet_size_bytes.

        Raises
        ------
        RuntimeError  If the subprocess answers with anything other than OK.
        TimeoutError  If the subprocess does not answer in time.
        """
        if packet_size is None:
            packet_size = self._ns3_cfg.packet_size_bytes

        # Keep a private copy: the caller may reuse/mutate its array before
        # the packet "arrives" several steps later.
        self._pending[step] = (obs.copy(), step)

        self._send_command(f"TRANSMIT {step} {packet_size}")
        resp = self._read_line()
        if resp != "OK":
            raise RuntimeError(
                f"NS3WifiChannel transmit: unexpected response '{resp}'"
            )

    def flush(self, step: int) -> List[Tuple[int, np.ndarray]]:
        """
        Advance the ns-3 simulation to the end of env step `step` and
        return all observations whose packets arrived within this window.

        Observations from earlier steps that had multi-step delays (due to
        WiFi MAC retransmissions) may appear here.
        Additionally, pending observations older than max_pending_steps are
        expired (considered permanently lost).

        Parameters
        ----------
        step : int  Current integer step counter.

        Returns
        -------
        List of (arrival_step, obs) tuples. arrival_step is set to `step`
        (the flush step) regardless of the exact ns-3 arrival time within
        the window, because the CommChannel contract only requires that
        return values have arrival_step <= step.

        Raises
        ------
        RuntimeError  If the subprocess answers with anything other than RECV.
        TimeoutError  If the subprocess does not answer in time.
        """
        # Ask the ns-3 subprocess to run until end of this step
        self._send_command(f"FLUSH {step}")
        response = self._read_line()
        if not response.startswith("RECV"):
            raise RuntimeError(
                f"NS3WifiChannel flush: unexpected response '{response}'"
            )

        # Map each reported step_id back to its stored observation. Ids that
        # are no longer pending (already expired or delivered) are ignored.
        for token in response.split()[1:]:
            entry = self._pending.pop(int(token), None)
            if entry is not None:
                self._arrived.append((step, entry[0]))

        # Expire old pending observations (packet permanently lost / dropped).
        # Collect first, then delete: the dict must not change while iterated.
        max_age = self._ns3_cfg.max_pending_steps
        stale = [
            sid
            for sid, (_, sent_step) in self._pending.items()
            if step - sent_step > max_age
        ]
        for sid in stale:
            del self._pending[sid]

        # Drain all arrivals whose arrival_step <= step into result
        result: List[Tuple[int, np.ndarray]] = []
        while self._arrived and self._arrived[0][0] <= step:
            result.append(self._arrived.popleft())
        return result

    def reset(self) -> None:
        """
        Restart the ns-3 simulation (simulation time → 0) and clear all
        pending / arrived state. Called on env.reset().

        The subprocess stays alive; only the ns-3 internal simulator state
        is destroyed and rebuilt. This is equivalent to starting a new
        wireless scenario.

        Raises
        ------
        RuntimeError  If the subprocess answers with anything other than OK.
        TimeoutError  If the subprocess does not answer in time.
        """
        self._pending.clear()
        self._arrived.clear()

        self._send_command("RESET")
        resp = self._read_line()
        if resp != "OK":
            raise RuntimeError(
                f"NS3WifiChannel reset: unexpected response '{resp}'"
            )

    def get_channel_info(self) -> dict:
        """
        Return diagnostic information about the channel state.

        Does not query the subprocess (would require an extra round-trip);
        derives info from Python-side bookkeeping instead.
        """
        cfg = self._ns3_cfg
        return {
            "state": "NS3_WIFI",
            "pending_count": len(self._pending),
            "arrived_buffered": len(self._arrived),
            "distance_m": cfg.distance_m,
            "step_duration_ms": cfg.step_duration_ms,
            "tx_power_dbm": cfg.tx_power_dbm,
            "loss_exponent": cfg.loss_exponent,
            "max_retries": cfg.max_retries,
        }

    # -----------------------------------------------------------------------
    # Subprocess management
    # -----------------------------------------------------------------------

    def _resolve_binary(self) -> str:
        """Return the path to the ns3_wifi_sim binary, raising if not found.

        Raises
        ------
        FileNotFoundError  If no regular file exists at the resolved path.
        PermissionError    If the file exists but is not executable.
        """
        path = self._ns3_cfg.sim_binary or _DEFAULT_SIM_BINARY
        if not os.path.isfile(path):
            raise FileNotFoundError(
                f"ns3_wifi_sim binary not found at '{path}'.\n"
                "Build it first:\n"
                "  bash src/build_ns3_sim.sh\n"
                "Or set NS3WifiConfig.sim_binary to the correct path."
            )
        if not os.access(path, os.X_OK):
            raise PermissionError(
                f"ns3_wifi_sim binary at '{path}' is not executable.\n"
                "Run: chmod +x " + path
            )
        return path

    def _start_subprocess(self) -> None:
        """Launch the ns3_wifi_sim subprocess and wait for READY.

        Raises
        ------
        RuntimeError  If READY is not received within 30 s, the process
                      exits during startup, or a different first line is
                      emitted. The subprocess is killed in all these cases.
        """
        binary = self._resolve_binary()
        cfg = self._ns3_cfg
        cmd = [
            binary,
            f"--step-ms={cfg.step_duration_ms}",
            f"--distance={cfg.distance_m}",
            f"--tx-power={cfg.tx_power_dbm}",
            f"--loss-exp={cfg.loss_exponent}",
            f"--retries={cfg.max_retries}",
            f"--pkt-size={cfg.packet_size_bytes}",
        ]
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,  # ns-3 log/warnings → captured stderr
            bufsize=0,               # unbuffered byte-stream
            text=True,
            encoding="utf-8",
            # A stray non-UTF-8 byte in ns-3 log output must not kill the
            # stderr drain thread (which would let the pipe fill up).
            errors="replace",
        )
        self._proc = proc
        # Discard any bytes left over from a previous subprocess.
        self._rx_buf = bytearray()

        # Background thread: continuously drain stderr so the pipe never fills
        # up and blocks the subprocess. Lines are stored in _stderr_buf for
        # error reporting.
        #
        # The thread deliberately captures only the stream and the deque,
        # NOT `self`: a reference to `self` from a running thread would keep
        # the channel alive for as long as the subprocess runs, so __del__
        # (which is what stops the subprocess) could never be triggered.
        stderr_stream = proc.stderr
        stderr_lines = self._stderr_buf

        def _drain_stderr_loop() -> None:
            try:
                # `with` closes our end of the pipe once EOF is reached.
                with stderr_stream:  # type: ignore[union-attr]
                    for line in stderr_stream:  # type: ignore[union-attr]
                        stderr_lines.append(line.rstrip("\n\r"))
            except (OSError, ValueError):
                # Stream closed underneath us: nothing left to drain.
                pass

        threading.Thread(
            target=_drain_stderr_loop,
            name="ns3-stderr-drain",
            daemon=True,
        ).start()

        # Wait for the READY signal (with timeout)
        try:
            ready_line = self._read_line(timeout=30.0)
        except TimeoutError as exc:
            self._kill_subprocess()
            raise RuntimeError(
                "ns3_wifi_sim did not emit READY within 30 s. "
                "Check stderr for ns-3 error messages."
            ) from exc
        except RuntimeError:
            # Process died / stdout unreadable during startup: reap it so no
            # dead Popen object is left behind, then propagate the details.
            self._kill_subprocess()
            raise

        if ready_line != "READY":
            stderr_preview = self._drain_stderr()
            self._kill_subprocess()
            raise RuntimeError(
                f"ns3_wifi_sim emitted unexpected startup line: '{ready_line}'\n"
                f"stderr: {stderr_preview}"
            )

    def _kill_subprocess(self) -> None:
        """Terminate the subprocess forcefully and release its pipes.

        Safe to call repeatedly and on partially-initialised instances.
        """
        proc = getattr(self, "_proc", None)
        self._proc = None
        if proc is None:
            return

        try:
            proc.kill()
            proc.wait(timeout=5)
        except (OSError, subprocess.TimeoutExpired):
            # Best effort: nothing more we can do if the OS refuses.
            pass

        # Close our ends of stdin/stdout explicitly instead of waiting for
        # garbage collection. stderr is closed by the drain thread at EOF.
        for stream in (proc.stdin, proc.stdout):
            if stream is None:
                continue
            try:
                stream.close()
            except (OSError, ValueError):
                pass

    def _send_command(self, line: str) -> None:
        """Write a command line to the subprocess stdin.

        Raises
        ------
        RuntimeError  If the subprocess is not running or its stdin pipe
                      is broken.
        """
        proc = self._proc
        if proc is None or proc.stdin is None:
            raise RuntimeError("NS3WifiChannel: subprocess is not running.")
        try:
            proc.stdin.write(line + "\n")
            proc.stdin.flush()
        except BrokenPipeError as exc:
            stderr_preview = self._drain_stderr()
            raise RuntimeError(
                f"NS3WifiChannel: subprocess stdin pipe broken.\n"
                f"stderr: {stderr_preview}"
            ) from exc

    def _read_line(self, timeout: float = 10.0) -> str:
        """Read one response line from the subprocess stdout.

        The trailing line terminator is stripped. Bytes are read straight
        from the pipe's file descriptor into a private buffer (_rx_buf)
        rather than through the text wrapper's readline(): select() only
        sees data still in the OS pipe, so any line already pulled into a
        Python-side buffer would otherwise be invisible to it and cause a
        spurious timeout. Reading in chunks also lets `timeout` bound the
        whole call even when the subprocess emits a partial line.

        NOTE: select() on pipe descriptors is a POSIX facility.

        Parameters
        ----------
        timeout : float  Maximum number of seconds to wait for a full line.

        Raises
        ------
        TimeoutError  If no complete line arrives within `timeout`.
        RuntimeError  If the subprocess is not running, reading fails, or
                      stdout reaches EOF with no data left.
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            raise RuntimeError("NS3WifiChannel: subprocess is not running.")

        fd = proc.stdout.fileno()
        buf = self._rx_buf
        deadline = time.monotonic() + timeout

        while True:
            # Serve a complete line from already-received bytes first.
            newline_at = buf.find(b"\n")
            if newline_at >= 0:
                raw = bytes(buf[:newline_at])
                del buf[: newline_at + 1]
                return raw.decode("utf-8", errors="replace").rstrip("\n\r")

            # Wait for more data, but never beyond the overall deadline.
            remaining = max(deadline - time.monotonic(), 0.0)
            ready, _, _ = select.select([fd], [], [], remaining)
            if not ready:
                stderr_preview = self._drain_stderr()
                raise TimeoutError(
                    f"NS3WifiChannel: subprocess did not respond within {timeout}s.\n"
                    f"stderr: {stderr_preview}"
                )

            try:
                chunk = os.read(fd, 4096)
            except OSError as e:
                stderr_preview = self._drain_stderr()
                raise RuntimeError(
                    f"NS3WifiChannel: error reading from subprocess stdout: {e}\n"
                    f"stderr: {stderr_preview}"
                ) from e

            if not chunk:
                # EOF: subprocess closed stdout (process exited)
                if buf:
                    # Hand back a final, unterminated line before reporting
                    # EOF on the next call.
                    raw = bytes(buf)
                    buf.clear()
                    return raw.decode("utf-8", errors="replace").rstrip("\n\r")
                stderr_preview = self._drain_stderr()
                raise RuntimeError(
                    "NS3WifiChannel: subprocess stdout closed (process exited?).\n"
                    f"Return code: {proc.poll()}\n"
                    f"stderr: {stderr_preview}"
                )

            buf.extend(chunk)

    def _drain_stderr(self, max_lines: int = 50) -> str:
        """Return recent stderr lines from the background drain buffer.

        Returns the last `max_lines` captured lines joined by newlines, or
        the placeholder "<no stderr>" when nothing has been captured.
        """
        if not self._stderr_buf:
            return "<no stderr>"
        recent = list(self._stderr_buf)[-max_lines:]
        return "\n".join(recent)

    # -----------------------------------------------------------------------
    # Lifecycle
    # -----------------------------------------------------------------------

    def __del__(self) -> None:
        """Send QUIT before the object is garbage-collected.

        Falls back to killing the subprocess if it does not exit within
        2 s. Must never raise, including on instances whose __init__ failed
        before the subprocess attributes were assigned.
        """
        try:
            proc = getattr(self, "_proc", None)
            if proc is not None and proc.poll() is None:
                self._send_command("QUIT")
                proc.wait(timeout=2)
        except Exception:
            # Destructors must not propagate; the finally clause still
            # force-kills whatever is left.
            pass
        finally:
            self._kill_subprocess()