# Source code for netrl.channels.ns3_lena_channel

"""
ns3_lena_channel.py
===================
NS3LenaChannel — a CommChannel backed by a 5G-LENA ns-3 simulation
running as a persistent subprocess.

Protocol (stdin/stdout, line-oriented)
--------------------------------------
    TRANSMIT <step_id> <size>
    FLUSH    <step_id>
    RESET
    QUIT

Responses:
    READY
    OK
    RECV <id1> <id2> ...
    ERROR <msg>
"""

from __future__ import annotations

import os
import subprocess
import threading
from collections import deque
from typing import Dict, List, Optional, Tuple

import numpy as np

from netrl.channels.comm_channel import CommChannel
from netrl.channels.network_config import NetworkConfig
from netrl.channels.ns3_lena_config import NS3LenaConfig

_DEFAULT_SIM_BINARY = os.path.normpath(
    os.path.join(os.path.dirname(__file__), "../..", "src", "ns3_lena_sim")
)


class NS3LenaChannel(CommChannel):
    """CommChannel implementation backed by an ns-3 5G-LENA NR simulation.

    The simulator runs as a persistent child process and is driven through a
    line-oriented stdin/stdout protocol: the channel writes ``TRANSMIT``,
    ``FLUSH``, ``RESET`` and ``QUIT`` commands and reads back ``READY``,
    ``OK``, ``RECV <id> ...`` or ``ERROR <msg>`` lines.  Every command is
    answered by exactly one response line, so commands and replies must stay
    strictly paired.
    """

    # Class-level default so the cleanup paths (``_kill_subprocess`` and
    # ``__del__``) stay safe even if ``__init__`` raised before the
    # subprocess handle was ever assigned.
    _proc: Optional[subprocess.Popen] = None

    def __init__(
        self,
        config: NetworkConfig,
        ns3_config: Optional[NS3LenaConfig] = None,
    ) -> None:
        """Validate the configuration and launch the simulator subprocess.

        Args:
            config: Generic network configuration; stored but not otherwise
                used by this class.
            ns3_config: Simulator parameters.  A default ``NS3LenaConfig()``
                is created when omitted.

        Raises:
            FileNotFoundError: The simulator binary does not exist.
            PermissionError: The simulator binary is not executable.
            RuntimeError: The simulator failed to report ``READY``.
        """
        self._config = config
        self._ns3_cfg: NS3LenaConfig = ns3_config or NS3LenaConfig()
        self._ns3_cfg.validate()
        # step id -> (observation snapshot, step at which it was sent)
        self._pending: Dict[int, Tuple[np.ndarray, int]] = {}
        # (arrival step, observation) pairs waiting to be handed to the caller
        self._arrived: deque = deque()
        self._proc = None
        # Rolling window of the most recent stderr lines, for diagnostics.
        self._stderr_buf: deque = deque(maxlen=200)
        self._start_subprocess()

    def transmit(
        self,
        obs: np.ndarray,
        step: int,
        packet_size: Optional[int] = None,
    ) -> None:
        """Hand one observation to the simulator for transmission.

        Args:
            obs: Observation to send; a copy is kept until it is delivered
                or expires.
            step: Step id used to identify the packet in the protocol.
            packet_size: Packet size passed to the simulator; defaults to
                ``packet_size_bytes`` from the ns-3 configuration.

        Raises:
            RuntimeError: The simulator answered with anything but ``OK``.
            TimeoutError: The simulator did not answer in time.
        """
        size = (
            packet_size
            if packet_size is not None
            else self._ns3_cfg.packet_size_bytes
        )
        # Snapshot first so a bad ``obs`` fails before any command is sent.
        snapshot = obs.copy()
        self._send_command(f"TRANSMIT {step} {size}")
        resp = self._read_line()
        if resp != "OK":
            raise RuntimeError(
                f"NS3LenaChannel transmit: unexpected response '{resp}'"
            )
        # Register only once the simulator has acknowledged the packet, so a
        # rejected transmit does not leave a stale pending entry behind.
        self._pending[step] = (snapshot, step)

    def flush(self, step: int) -> List[Tuple[int, np.ndarray]]:
        """Advance the simulator to *step* and collect delivered packets.

        Args:
            step: Current step id, sent with the ``FLUSH`` command.

        Returns:
            ``(arrival_step, observation)`` pairs for every packet the
            simulator reported as received, where ``arrival_step`` is the
            *step* of the flush that delivered it.

        Raises:
            RuntimeError: The response is not a well-formed ``RECV`` line.
            TimeoutError: The simulator did not answer in time.
        """
        self._send_command(f"FLUSH {step}")
        response = self._read_line()

        tokens = response.split()
        if not tokens or tokens[0] != "RECV":
            raise RuntimeError(
                f"NS3LenaChannel flush: unexpected response '{response}'"
            )
        try:
            delivered_ids = [int(token) for token in tokens[1:]]
        except ValueError as exc:
            raise RuntimeError(
                f"NS3LenaChannel flush: unexpected response '{response}'"
            ) from exc

        # Ids we no longer track (unknown or already expired) are ignored.
        for sid in delivered_ids:
            entry = self._pending.pop(sid, None)
            if entry is not None:
                self._arrived.append((step, entry[0]))

        # Drop packets that have been in flight for too many steps.
        max_age = self._ns3_cfg.max_pending_steps
        expired = [
            sid
            for sid, (_, sent_step) in self._pending.items()
            if step - sent_step > max_age
        ]
        for sid in expired:
            del self._pending[sid]

        result: List[Tuple[int, np.ndarray]] = []
        while self._arrived and self._arrived[0][0] <= step:
            result.append(self._arrived.popleft())
        return result

    def reset(self) -> None:
        """Discard all in-flight state and send ``RESET`` to the simulator.

        Raises:
            RuntimeError: The simulator answered with anything but ``OK``.
            TimeoutError: The simulator did not answer within 60 seconds.
        """
        self._pending.clear()
        self._arrived.clear()
        self._send_command("RESET")
        resp = self._read_line(timeout=60.0)
        if resp != "OK":
            raise RuntimeError(
                f"NS3LenaChannel reset: unexpected response '{resp}'"
            )

    def get_channel_info(self) -> dict:
        """Return a snapshot of buffer sizes and the ns-3 configuration."""
        cfg = self._ns3_cfg
        return {
            "state": "NS3_LENA",
            "pending_count": len(self._pending),
            "arrived_buffered": len(self._arrived),
            "distance_m": cfg.distance_m,
            "frequency_ghz": cfg.frequency_ghz,
            "bandwidth_ghz": cfg.bandwidth_ghz,
            "ue_tx_power_dbm": cfg.ue_tx_power_dbm,
            "gnb_tx_power_dbm": cfg.gnb_tx_power_dbm,
            "scenario": cfg.scenario,
            "numerology": cfg.numerology,
            "step_duration_ms": cfg.step_duration_ms,
        }

    def _resolve_binary(self) -> str:
        """Return the simulator binary path, checking it exists and can run.

        Raises:
            FileNotFoundError: No file exists at the resolved path.
            PermissionError: The file exists but is not executable.
        """
        path = self._ns3_cfg.sim_binary or _DEFAULT_SIM_BINARY
        if not os.path.isfile(path):
            raise FileNotFoundError(
                f"ns3_lena_sim binary not found at '{path}'.\n"
                "Build it first:\n"
                " bash src/build_ns3_lena_sim.sh\n"
                "Or set NS3LenaConfig.sim_binary to the correct path."
            )
        if not os.access(path, os.X_OK):
            raise PermissionError(
                f"ns3_lena_sim binary at '{path}' is not executable.\n"
                "Run: chmod +x " + path
            )
        return path

    def _start_subprocess(self) -> None:
        """Launch the simulator and wait for its ``READY`` line.

        Raises:
            RuntimeError: The simulator did not emit ``READY`` (timed out,
                exited early, or printed something else first).
        """
        binary = self._resolve_binary()
        cfg = self._ns3_cfg
        cmd = [
            binary,
            f"--step-ms={cfg.step_duration_ms}",
            f"--distance={cfg.distance_m}",
            f"--freq={cfg.frequency_ghz * 1e9}",
            f"--bandwidth={cfg.bandwidth_ghz * 1e9}",
            f"--ue-tx-power={cfg.ue_tx_power_dbm}",
            f"--gnb-tx-power={cfg.gnb_tx_power_dbm}",
            f"--scenario={cfg.scenario}",
            f"--numerology={cfg.numerology}",
            f"--shadowing={int(cfg.shadowing_enabled)}",
            f"--pkt-size={cfg.packet_size_bytes}",
        ]

        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
            text=True,
            encoding="utf-8",
        )
        self._proc = proc

        # The drain loop deliberately closes over the stream and the buffer
        # only, never ``self``: a reference to ``self`` held by this
        # long-lived thread would keep the channel alive for as long as the
        # child runs and stop ``__del__`` from ever shutting it down.
        stderr_stream = proc.stderr
        stderr_buf = self._stderr_buf

        def _drain_stderr_loop() -> None:
            try:
                for line in stderr_stream:  # type: ignore[union-attr]
                    stderr_buf.append(line.rstrip("\n\r"))
            except (OSError, ValueError):
                # Stream closed or undecodable output: stop draining quietly.
                pass

        threading.Thread(target=_drain_stderr_loop, daemon=True).start()

        try:
            ready_line = self._read_line(timeout=60.0)
        except TimeoutError as exc:
            self._kill_subprocess()
            raise RuntimeError(
                "ns3_lena_sim did not emit READY within 60 s. "
                "Check stderr for ns-3 error messages."
            ) from exc
        except RuntimeError:
            # The child closed stdout before READY; reap it rather than
            # leaving a dead process handle behind.
            self._kill_subprocess()
            raise

        if ready_line != "READY":
            stderr_preview = self._drain_stderr()
            self._kill_subprocess()
            raise RuntimeError(
                f"ns3_lena_sim emitted unexpected startup line: '{ready_line}'\n"
                f"stderr: {stderr_preview}"
            )

    def _kill_subprocess(self) -> None:
        """Forcefully stop the simulator, if any, and forget its handle."""
        proc = self._proc
        self._proc = None
        if proc is None:
            return
        try:
            proc.kill()
            proc.wait(timeout=5)
        except Exception:
            # Deliberate best effort: this also runs from ``__del__``, where
            # raising would serve no purpose.
            pass

    def _send_command(self, line: str) -> None:
        """Write one protocol line to the simulator's stdin.

        Raises:
            RuntimeError: The subprocess is not running or its stdin pipe
                is broken.
        """
        proc = self._proc
        if proc is None or proc.stdin is None:
            raise RuntimeError("NS3LenaChannel: subprocess is not running.")
        try:
            proc.stdin.write(line + "\n")
            proc.stdin.flush()
        except BrokenPipeError as exc:
            stderr_preview = self._drain_stderr()
            raise RuntimeError(
                "NS3LenaChannel: subprocess stdin pipe broken.\n"
                f"stderr: {stderr_preview}"
            ) from exc

    def _read_line(self, timeout: float = 10.0) -> str:
        """Read one response line from the simulator's stdout.

        The blocking ``readline`` runs in a helper thread so that it can be
        bounded by *timeout*.

        Args:
            timeout: Seconds to wait for the line.

        Returns:
            The line with its trailing newline characters removed.

        Raises:
            TimeoutError: No line arrived within *timeout* seconds.  The
                subprocess is killed in this case, because a late reply
                could otherwise be mistaken for the answer to a later
                command.
            RuntimeError: The subprocess is not running, or its stdout
                reached end-of-file.
        """
        proc = self._proc
        if proc is None or proc.stdout is None:
            raise RuntimeError("NS3LenaChannel: subprocess is not running.")
        stdout = proc.stdout

        result: List[Optional[str]] = [None]
        exc_holder: List[Optional[Exception]] = [None]

        def _read() -> None:
            try:
                result[0] = stdout.readline()
            except Exception as e:  # re-raised in the calling thread below
                exc_holder[0] = e

        reader = threading.Thread(target=_read, daemon=True)
        reader.start()
        reader.join(timeout)

        if reader.is_alive():
            stderr_preview = self._drain_stderr()
            # The reader is still blocked on stdout and would race with any
            # later read, so the stream can no longer be trusted.  Killing
            # the child unblocks the reader via EOF and makes subsequent
            # calls fail fast instead of silently desynchronising.
            self._kill_subprocess()
            raise TimeoutError(
                f"NS3LenaChannel: subprocess did not respond within {timeout}s.\n"
                f"stderr: {stderr_preview}"
            )

        if exc_holder[0] is not None:
            raise exc_holder[0]

        line = result[0]
        if not line:
            # ``readline`` returns an empty string only at end-of-file.
            stderr_preview = self._drain_stderr()
            raise RuntimeError(
                "NS3LenaChannel: subprocess stdout closed (process exited?).\n"
                f"Return code: {proc.poll()}\n"
                f"stderr: {stderr_preview}"
            )
        return line.rstrip("\n\r")

    def _drain_stderr(self, max_lines: int = 50) -> str:
        """Return up to *max_lines* of the most recent buffered stderr lines.

        Returns ``"<no stderr>"`` when nothing has been captured.
        """
        if not self._stderr_buf:
            return "<no stderr>"
        lines = list(self._stderr_buf)[-max_lines:]
        return "\n".join(lines)

    def __del__(self) -> None:
        """Ask the simulator to quit, then make sure the process is gone."""
        proc = self._proc
        try:
            if proc is not None and proc.poll() is None:
                self._send_command("QUIT")
                proc.wait(timeout=2)
        except Exception:
            # Best effort only: finalizers must not raise.
            pass
        finally:
            self._kill_subprocess()