# Sysmon unavailable → None (substrate leaves GPU snapshot fields at defaults).
stats = {'pid': 1111, 'subtree_pids': [2222]}
assert attribute_gpu_to_worker_subtree(stats, None) is None
class _NoListProcesses:
    """Stub monitor that offers `get_system_status()` but no `list_processes()`.

    Used to exercise the case where a monitor object is configured but does
    not expose the process-enumeration method.
    """

    def get_system_status(self):
        """Return an empty status payload."""
        return {}
assert attribute_gpu_to_worker_subtree(stats, _NoListProcesses()) is None
Substrate Telemetry Helpers
JobQueue._sample_resource_snapshot (CR-6 Stage 3) and PluginManager._record_sample_safe (CR-7).
attribute_gpu_to_worker_subtree
Sums GPU memory across the worker’s subtree by intersecting the worker-reported subtree PID set with the system-monitor’s GPU-process enumeration. Returns None when no system-monitor is available; returns {'gpu_memory_mb': 0.0, 'gpu_index': None} when sysmon is available but no subtree PID holds GPU memory. The gpu_index returned is taken from the highest-VRAM PID in the subtree (a stable tiebreak; multi-GPU subtree-spanning workloads are exceptional and can be refined later).
attribute_gpu_to_worker_subtree
def attribute_gpu_to_worker_subtree(
stats:Dict, # Worker `/stats` payload (must include 'pid'; uses 'subtree_pids' if present)
sysmon:Any, # The configured MonitorPlugin (or None)
)->Optional:
Attribute GPU memory across the worker’s process subtree.
Returns {'gpu_memory_mb': float, 'gpu_index': Optional[int]} when sysmon is reachable, or None when sysmon isn’t configured / doesn’t expose list_processes() / errors out. Callers treat None as “sysmon unavailable” and leave GPU snapshot fields as their defaults; a 0.0 sum means sysmon worked but no subtree PID holds GPU memory (CPU-only plugin on a GPU box).
Tests
# Worker pid 1111 spawned vLLM grandchild 9999; sysmon enumerates the grandchild.
# The pre-fix substrate matched only the worker pid and reported gpu_memory_mb=0.
class _Sysmon:
    """Stub monitor reporting two GPU processes on GPU 0: pid 9999 and pid 7777."""

    def list_processes(self):
        """Return the fixed GPU-process enumeration as a list of dicts."""
        return [
            {'pid': 9999, 'gpu_index': 0, 'gpu_memory_mb': 4096.0, 'command': 'vllm server'},
            {'pid': 7777, 'gpu_index': 0, 'gpu_memory_mb': 512.0, 'command': 'other process'},
        ]


stats = {'pid': 1111, 'subtree_pids': [1111, 9999]}
result = attribute_gpu_to_worker_subtree(stats, _Sysmon())
# pid 7777 is not in subtree_pids, so only the grandchild's 4096 MB is attributed.
assert result == {'gpu_memory_mb': 4096.0, 'gpu_index': 0}, result

# Worker subtree spans multiple GPU-holding PIDs; total sums + best gpu_index is the highest-VRAM PID's.
class _Sysmon2:
    """Stub monitor reporting three GPU processes spread over GPU 0 and GPU 1."""

    def list_processes(self):
        """Return the fixed GPU-process enumeration as a list of dicts."""
        return [
            {'pid': 100, 'gpu_index': 0, 'gpu_memory_mb': 256.0},
            {'pid': 200, 'gpu_index': 1, 'gpu_memory_mb': 1024.0},
            {'pid': 300, 'gpu_index': 1, 'gpu_memory_mb': 512.0},  # in tree but smaller
        ]


stats = {'pid': 100, 'subtree_pids': [100, 200, 300]}
result = attribute_gpu_to_worker_subtree(stats, _Sysmon2())
# All three PIDs are in the subtree: memory is summed, and gpu_index follows pid 200 (largest holder).
assert result == {'gpu_memory_mb': 256.0 + 1024.0 + 512.0, 'gpu_index': 1}, result

# CPU-only plugin on a GPU box: sysmon works but the worker's subtree holds no GPU memory.
# Returns 0.0 (not None) so empirical store records an honest "no GPU usage" sample.
class _Sysmon3:
    """Stub monitor reporting a single GPU process (pid 9999) outside the worker subtree."""

    def list_processes(self):
        """Return the fixed GPU-process enumeration as a list of dicts."""
        return [{'pid': 9999, 'gpu_index': 0, 'gpu_memory_mb': 4096.0}]


stats = {'pid': 1111, 'subtree_pids': [1111, 2222]}
result = attribute_gpu_to_worker_subtree(stats, _Sysmon3())
# No subtree PID appears in the enumeration, so the sum is zero and no GPU index is chosen.
assert result == {'gpu_memory_mb': 0.0, 'gpu_index': None}, result

# Accept dataclass-shaped ProcessStats (CR-3 worker-direct calls) in addition to dicts (proxy round-trip).
from dataclasses import dataclass


@dataclass
class _PS:
    """Attribute-style stand-in for a process record (same fields as the dict form)."""

    pid: int
    gpu_index: int
    gpu_memory_mb: float
    command: str = ''


class _SysmonDc:
    """Stub monitor whose enumeration yields dataclass instances instead of dicts."""

    def list_processes(self):
        """Return the fixed GPU-process enumeration as a list of `_PS` objects."""
        return [_PS(pid=9999, gpu_index=0, gpu_memory_mb=4096.0)]


stats = {'pid': 1111, 'subtree_pids': [9999]}
result = attribute_gpu_to_worker_subtree(stats, _SysmonDc())
assert result == {'gpu_memory_mb': 4096.0, 'gpu_index': 0}, result

# Backward compat: pre-fix worker /stats without `subtree_pids` falls back to {worker_pid} only.
# (Substrate publishing precedes the cascade; in-between, hosts may have an old worker but a new substrate.)
class _Sysmon4:
    """Stub monitor reporting the worker pid (1111) and a grandchild pid (9999) on GPU 0."""

    def list_processes(self):
        """Return the fixed GPU-process enumeration as a list of dicts."""
        return [
            {'pid': 1111, 'gpu_index': 0, 'gpu_memory_mb': 256.0},
            {'pid': 9999, 'gpu_index': 0, 'gpu_memory_mb': 4096.0},  # grandchild — invisible without subtree_pids
        ]


stats = {'pid': 1111}  # no subtree_pids
result = attribute_gpu_to_worker_subtree(stats, _Sysmon4())
assert result == {'gpu_memory_mb': 256.0, 'gpu_index': 0}, result  # worker-only, the pre-fix behavior

# Sysmon error → None (failures shouldn't break snapshot/sample paths).
class _SysmonBroken:
    """Stub monitor whose process enumeration always fails."""

    def list_processes(self):
        """Raise RuntimeError to simulate a failing system monitor."""
        raise RuntimeError('sysmon explosion')


stats = {'pid': 1111, 'subtree_pids': [1111]}
# The helper is expected to absorb the monitor failure and report "unavailable".
assert attribute_gpu_to_worker_subtree(stats, _SysmonBroken()) is None