"""
desispec.workflow.batch
=======================
Utilities for working with slurm batch queues.
"""
import os
from importlib import resources
import yaml
import numpy as np
from desiutil.log import get_logger
_config_cache = dict()
[docs]
def get_config(name):
"""
Return configuration dictionary for system `name`
Args:
name (str): e.g. cori-haswell, cori-knl, dirac, perlmutter-gpu, ...
Returns dictionary with keys:
* site: location of system, e.g. 'NERSC'
* cores_per_node: number of physical cores per node
* threads_per_core: hyperthreading / SMT per core
* memory: memory per node in GB
* timefactor: scale time estimates by this amount on this system
* gpus_per_node: number of GPUs per node
* batch_opts: list of additional batch options for script header
"""
if name is None:
name = default_system()
global _config_cache
if name in _config_cache:
return _config_cache[name]
configfile = resources.files('desispec').joinpath('data/batch_config.yaml')
with open(configfile) as fx:
config = yaml.safe_load(fx)
#- Add the name for reference, in case it was default selected
config['name'] = name
#- Add to cache so that we don't have to re-read batch_config.yaml every time
_config_cache[name] = config[name]
return config[name]
[docs]
def default_system(jobdesc=None, no_gpu=False):
"""
Guess default system to use based on environment
Args:
jobdesc (str): Description of the job in the processing table (optional).
no_gpu (bool): Don't use GPU's even if available. Default False.
Returns:
name (str): default system name to use
"""
log = get_logger()
name = None
if 'NERSC_HOST' in os.environ:
if os.environ['NERSC_HOST'] == 'perlmutter':
## HARDCODED: for now arcs and biases can't use gpu's, so use cpu's
if jobdesc in ['linkcal', 'arc', 'biasnight', 'biaspdark',
'ccdcalib', 'badcol', 'psfnight', 'pdark' ]:
name = 'perlmutter-cpu'
elif no_gpu:
name = 'perlmutter-cpu'
else:
name = 'perlmutter-gpu'
elif os.path.isdir('/clusterfs/dirac1'):
name = 'dirac'
if name is None:
msg = 'Unable to determine default batch system from environment'
log.error(msg)
raise RuntimeError(msg)
else:
log.info(f'Guessing default batch system {name}')
return name
[docs]
def parse_reservation(reservation, jobdesc):
"""
Parse reservation name into cpu/gpu reservation based upon jobdesc
Args:
reservation (str): resvname or resvname_cpu,resvname_gpu or None
jobdesc (str): job description string e.g. 'arc', 'flat', 'tilenight'
Returns:
cpu_reservation_name, gpu_reservation_name
If a single reservation name is provided, return both cpu/gpu as the same.
If either is 'none' (case-insensitive), return None for that reservation
"""
if reservation is None:
return reservation
tmp = reservation.split(',')
if len(tmp) == 1:
reservation_cpu = reservation_gpu = reservation
elif len(tmp) == 2:
reservation_cpu, reservation_gpu = tmp
else:
raise ValueError(f'Unable to parse {reservation} as rescpu,resgpu')
if reservation_cpu.lower() == 'none':
reservation_cpu = None
if reservation_gpu.lower() == 'none':
reservation_gpu = None
system_name = default_system(jobdesc)
config = get_config(system_name)
if 'gpus_per_node' not in config or config['gpus_per_node'] == 0:
return reservation_cpu
else:
return reservation_gpu
[docs]
def determine_resources(ncameras, jobdesc, nexps=1, forced_runtime=None, queue=None, system_name=None):
"""
Determine the resources that should be assigned to the batch script given what
desi_proc needs for the given input information.
Args:
ncameras (int): number of cameras to be processed
jobdesc (str): type of data being processed
nexps (int, optional): the number of exposures processed in this step
queue (str, optional): the Slurm queue to be submitted to. Currently not used.
system_name (str, optional): batch compute system, e.g. cori-haswell or perlmutter-gpu
Returns:
tuple: A tuple containing:
* ncores: int, number of cores (actually 2xphysical cores) that should be submitted via "-n {ncores}"
* nodes: int, number of nodes to be requested in the script. Typically (ncores-1) // cores_per_node + 1
* runtime: int, the max time requested for the script in minutes for the processing.
"""
if system_name is None:
system_name = default_system(jobdesc=jobdesc)
config = get_config(system_name)
log = get_logger()
jobdesc = jobdesc.upper()
nspectro = (ncameras - 1) // 3 + 1
nodes = None
if jobdesc in ('ARC', 'TESTARC'):
ncores = 20 * (10*(ncameras+1)//20) # lowest multiple of 20 exceeding 10 per camera
ncores, runtime = ncores + 1, 45 # + 1 for worflow.schedule scheduler proc
elif jobdesc in ('FLAT', 'TESTFLAT'):
runtime = 40
if system_name.startswith('perlmutter'):
ncores = config['cores_per_node']
else:
ncores = 20 * nspectro
elif jobdesc == 'TILENIGHT':
runtime = int(60. / 140. * ncameras * nexps) # 140 frames per node hour
runtime += 40 # overhead
ncores = config['cores_per_node']
if not system_name.startswith('perlmutter'):
msg = 'tilenight cannot run on system_name={}'.format(system_name)
log.critical(msg)
raise ValueError(msg)
elif jobdesc in ('SKY', 'TWILIGHT', 'SCIENCE','PRESTDSTAR'):
runtime = 30
if system_name.startswith('perlmutter'):
ncores = config['cores_per_node']
else:
ncores = 20 * nspectro
elif jobdesc in ('DARK', 'BADCOL'):
ncores, runtime = ncameras, 5
elif jobdesc in ('BIASNIGHT', 'BIASPDARK', 'NIGHTLYBIAS'):
## Jobs are memory limited, so use 15 cores max per node
## 8 minutes to run biases plus startup plus overhead
nodes = (ncameras // 16) + 1 # 2 nodes unless ncameras <= 15
ncores = ncameras # redefined for biaspdark below
runtime = 12. #~8 minutes after perlmutter system scaling factor
elif jobdesc in ('PDARK'):
## only one node since not memory or compute intensive
## ncores and runtime are defined below the if-elif-else statement,
## but we need to define nodes here for the scaling
nodes = 1
runtime, ncores = 0., 1 # both redefined later on
elif jobdesc == 'CCDCALIB':
nodes = 1
ncores, runtime = ncameras, 7 # 5 mins after perlmutter system scaling factor
elif jobdesc == 'ZERO':
ncores, runtime = 2, 5
elif jobdesc == 'PSFNIGHT':
ncores, runtime = ncameras, 5
elif jobdesc == 'NIGHTLYFLAT':
ncores, runtime = ncameras, 5
elif jobdesc == 'STDSTARFIT':
#- Special case hardcode: stdstar parallelism maxes out at ~30 cores
#- and on KNL, it OOMs above that anyway.
#- This might be more related to using a max of 30 standards, not that
#- there are 30 cameras (coincidence).
#- Use 32 as power of 2 for core packing
ncores = 32
runtime = 8+2*nexps
elif jobdesc == 'POSTSTDSTAR':
runtime = 10
ncores = ncameras
elif jobdesc in ['PEREXP', 'PERNIGHT', 'CUMULATIVE', 'CUSTOMZTILE']:
if system_name.startswith('perlmutter'):
nodes, runtime = 1, 50 #- timefactor will bring time back down
else:
nodes, runtime = 2, 30
ncores = nodes * config['cores_per_node']
elif jobdesc in ('HEALPIX', 'UNIQPIX'):
nodes = 1
runtime = 100
ncores = nodes * config['cores_per_node']
elif jobdesc == 'LINKCAL':
nodes, ncores = 1, 1
runtime = 5.
else:
msg = 'unknown jobdesc={}'.format(jobdesc)
log.critical(msg)
raise ValueError(msg)
## Above we didn't assign any runtime or n_cores for PDARK jobs.
## We can only run so many cam-exp pairs in parallel, so there is a
## linear scaling of runtime with the number of loops we have to wait for,
## which is ncameras*nexps/cores_available.
if jobdesc.endswith('PDARK'):
## base startup time and contingency
runtime += 6.
## now scale with the number of loops through ncameras*nexps tasks
## can do 1 core per camera per exp, but limit to cores available
ncores = min([ncameras*nexps, nodes*config['cores_per_node']])
runtime += 4.*np.ceil(float(ncameras*nexps)/float(nodes*config['cores_per_node']))
if forced_runtime is not None:
runtime = forced_runtime
if nodes is None:
nodes = (ncores - 1) // config['cores_per_node'] + 1
# - Arcs and flats make good use of full nodes, but throttle science
# - exposures to 5 nodes to enable two to run together in the 10-node
# - realtime queue, since their wallclock is dominated by less
# - efficient sky and fluxcalib steps
if jobdesc in ('ARC', 'TESTARC'):#, 'FLAT', 'TESTFLAT'):
max_realtime_nodes = 10
else:
max_realtime_nodes = 5
#- Pending further optimizations, use same number of nodes in all queues
### if (queue == 'realtime') and (nodes > max_realtime_nodes):
if (nodes > max_realtime_nodes):
nodes = max_realtime_nodes
ncores = config['cores_per_node'] * nodes
if jobdesc in ('ARC', 'TESTARC'):
# adjust for workflow.schedule scheduler proc
ncores = ((ncores - 1) // 20) * 20 + 1
#- Allow KNL jobs to be slower than Haswell,
#- except for ARC so that we don't have ridiculously long times
#- (Normal arc is still ~15 minutes, albeit with a tail)
if jobdesc not in ['ARC', 'TESTARC']:
runtime *= config['timefactor']
#- Do not allow runtime to be less than 8 min except for LINKCAL
if runtime < 8. and jobdesc not in ('LINKCAL'):
runtime = 8.
elif runtime < 5.:
runtime = 5.
#- Add additional overhead factor if needed
if 'NERSC_RUNTIME_OVERHEAD' in os.environ:
t = os.environ['NERSC_RUNTIME_OVERHEAD']
log.info(f'Adding $NERSC_RUNTIME_OVERHEAD={t} minutes to batch runtime request')
runtime += float(runtime)
return ncores, nodes, runtime