ats/atsMachines/fluxScheduled.py (25 changes: 22 additions & 3 deletions)
@@ -144,6 +144,20 @@ def get_physical_node(self, rel_index):
         Works for any node prefix (e.g., rzadams, elcap, tuo, syz).
         """
         if FluxScheduled._cached_nodes is None:
+            # Query instance-level -> >0 = in allocation
+            inst_lvl = 0
+            inst_lvl_out = subprocess.run(['flux', 'getattr', 'instance-level'], capture_output=True)
+            if inst_lvl_out.returncode == 0:
+                try:
+                    inst_lvl = int(inst_lvl_out.stdout.strip())
+                except ValueError:
+                    pass
+
+            if inst_lvl == 0:
+                raise RuntimeError(
+                    "flux instance-level is 0 or not set. Use of ATS same_node feature requires running ATS within an allocation."
+                )
+
             out = subprocess.check_output("flux resource list", shell=True).decode()
             nodelist_field = None
             for line in out.splitlines():
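
Note: `flux getattr instance-level` reports the nesting depth of the current Flux instance: 0 for the system-level instance and 1 or more inside an allocation (e.g. one obtained with `flux alloc` or `flux batch`), which is why 0 is rejected here. A minimal standalone sketch of the same check; the helper name is ours, not part of ATS:

```python
import subprocess

def in_flux_allocation():
    """Return True when running inside a Flux allocation.

    Mirrors the check above: instance-level > 0 means a nested instance.
    Hypothetical helper, not part of ATS.
    """
    try:
        result = subprocess.run(["flux", "getattr", "instance-level"],
                                capture_output=True, check=True)
        return int(result.stdout.strip()) > 0
    except (OSError, subprocess.CalledProcessError, ValueError):
        return False
```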
@@ -155,7 +169,6 @@ def get_physical_node(self, rel_index):
             if nodelist_field is None:
                 raise RuntimeError("Could not find NODELIST field in flux resource list output. Use of ATS same_node feature requires running ATS within an allocation.")
             FluxScheduled._cached_nodes = self.expand_nodelist(nodelist_field)
-            log(("Info: Physical Hardware Nodes: %s" % FluxScheduled._cached_nodes), echo=True)

         nodes = FluxScheduled._cached_nodes
         if rel_index < 0 or rel_index >= len(nodes):
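
For reference, `expand_nodelist` turns the compressed NODELIST that `flux resource list` prints (e.g. `rzadams[10-12,15]`) into individual hostnames. A minimal sketch of such an expansion, assuming only the simple `prefix[a-b,c]` form; the real ATS method may support more of the hostlist grammar:

```python
import re

def expand_nodelist(nodelist):
    """Expand a compressed hostlist such as 'rzadams[10-12,15]'."""
    m = re.match(r"^(\w+)\[([\d,\-]+)\]$", nodelist)
    if m is None:
        return [nodelist]  # already a single node name, e.g. 'rzadams10'
    prefix, ranges = m.groups()
    nodes = []
    for part in ranges.split(","):
        if "-" in part:
            lo, hi = part.split("-")
            width = len(lo)  # preserve zero padding, e.g. '01-03'
            nodes.extend(f"{prefix}{i:0{width}d}" for i in range(int(lo), int(hi) + 1))
        else:
            nodes.append(prefix + part)
    return nodes

print(expand_nodelist("rzadams[10-12,15]"))
# ['rzadams10', 'rzadams11', 'rzadams12', 'rzadams15']
```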
@@ -225,6 +238,10 @@ def examineOptions(self, options):
         if self.use_flux_rm:
             log("Info: Will use flux resource manager to verify free resources", echo=True)

+        if options.verbose:
+            if FluxScheduled._cached_nodes:
+                log(f"Info: Physical Hardware Nodes: {FluxScheduled._cached_nodes}", echo=True)
+
         if self.cpx or self.within_cpx_allocation():
             log("NOTICE: Running in CPX mode", echo=True)
             os.environ['FLUX_MPIBIND_USE_TOPOFILE'] = "1"
@@ -350,6 +367,8 @@ def calculateCommandList(self, test):
                 self.node_list.append(same_node)
             rel_node = self.node_list.index(same_node) % self.numNodes
             physical_node = self.get_physical_node(rel_node)
+            if configuration.options.verbose:
+                log(f"{test.name}:{physical_node}", echo=True)
             ret.append(f"--requires=host:{physical_node}")

         """
@@ -381,7 +400,7 @@ def calculateCommandList(self, test):

         if same_node: # Need to limit -N if we want to run on the same node
             if test.num_nodes > 0: # Check that -N was set by user before giving warning
-                log("ATS WARNING: Limiting nodes to 1 be able to run on same node.", echo=True)
+                log(f"ATS WARNING: {test.name} : Limiting nodes to 1 to be able to run on the same node.", echo=True)
             ret.append("-N1")
             ret.append("--exclusive")
         elif test.num_nodes > 0:
@@ -393,7 +412,7 @@ def calculateCommandList(self, test):
             ret.append("--exclusive")

         if test.np > self.coresPerNode and same_node: # Need to limit cores to be the max on the node if we want to run on the same node
-            log(f"ATS WARNING: Limiting cores, because of same_node option, to match max: {self.coresPerNode}", echo=True)
+            log(f"ATS WARNING: {test.name} : Limiting cores, because of same_node option, to match max: {self.coresPerNode}", echo=True)
             ret.append(f"-n{self.coresPerNode}")
         else:
             ret.append(f"-n{test.np}")
ats/atsMachines/slurmProcessorScheduled.py (10 changes: 6 additions & 4 deletions)
@@ -133,10 +133,6 @@ def get_physical_node(self, rel_index):
         #     ).decode().splitlines()
         #     self._cached_nodes = out

-        # Optional logging
-        log(("Info: Physical Hardware Nodes: %s" % self._cached_nodes), echo=True)
-        # log(f"Info: Physical Hardware Nodes: {self._cached_nodes}", echo=True)
-
         nodes = self._cached_nodes
         if rel_index < 0 or rel_index >= len(nodes):
             raise IndexError(f"Relative index {rel_index} out of range (0-{len(nodes)-1})")
@@ -237,6 +233,10 @@ def examineOptions(self, options):
             print("%s options.filter = %s " % (DEBUG_SLURM, options.filter))
             print("%s options.glue = %s " % (DEBUG_SLURM, options.glue))

+        if options.verbose:
+            if self._cached_nodes:
+                log(f"Info: Physical Hardware Nodes: {self._cached_nodes}", echo=True)
+
         if options.npMax > 0:
             self.npMax = options.npMax
         else:
@@ -395,6 +395,8 @@ def calculateCommandList(self, test):
                 self.node_list.append(same_node)
             rel_node = self.node_list.index(same_node) % self.numNodes
             physical_node = self.get_physical_node(rel_node)
+            if configuration.options.verbose:
+                log(f"{test.name}:{physical_node}", echo=True)
             srun_nodelist = '--nodelist=%s' % physical_node
             distribution="no_distribution"