Source code for desispec.workflow.batch_writer

"""
desispec.workflow.batch_writer
==============================

Utilities for writing slurm batch scripts.
"""

import os
import sys
from desispec.workflow.batch import determine_resources
import numpy as np
from desispec.io import findfile
from desispec.io.util import decode_camword, parse_cameras
from desispec.workflow import batch
from desiutil.log import get_logger


[docs] def get_desi_proc_batch_file_name(night, exp, jobdesc, cameras): """ Returns the default directory location to store a batch script file given a night Args: night: str or int, defines the night (should be 8 digits) exp: str, int, or array of ints, defines the exposure id(s) relevant to the job jobdesc: str, type of data being processed cameras: str or list of str. If str, must be camword, If list, must be list of cameras to include in the processing. Returns: pathname: str, the default script name for a desi_proc batch script file """ camword = parse_cameras(cameras) if jobdesc.lower() == 'linkcal': expstr = "" elif type(exp) is not str: if exp is None: expstr = 'none' elif np.isscalar(exp): expstr = '{:08d}'.format(exp) else: #expstr = '-'.join(['{:08d}'.format(curexp) for curexp in exp]) expstr = '{:08d}'.format(exp[0]) else: expstr = exp if expstr != "": expstr = "-" + expstr jobname = f'{jobdesc.lower()}-{night}{expstr}-{camword}' return jobname
[docs] def get_desi_proc_batch_file_path(night,reduxdir=None): """ Returns the default directory location to store a batch script file given a night Args: night (str or int): defines the night (should be 8 digits) reduxdir (str, optional): define the base directory where the /run/scripts directory should or does live. Returns: str: the default location where a batch script file should be written """ if reduxdir is None: from desispec.io import specprod_root reduxdir = specprod_root() batchdir = os.path.join(reduxdir, 'run', 'scripts', 'night', str(night)) return batchdir
[docs] def get_desi_proc_batch_file_pathname(night, exp, jobdesc, cameras, reduxdir=None): """ Returns the default directory location to store a batch script file given a night Args: night: str or int, defines the night (should be 8 digits) exp: str, int, or array of ints, defines the exposure id(s) relevant to the job jobdesc: str, type of data being processed cameras: str or list of str. If str, must be camword, If list, must be list of cameras to include in the processing. reduxdir: str (optional), define the base directory where the /run/scripts directory should or does live Returns: pathname: str, the default location and script name for a desi_proc batch script file """ path = get_desi_proc_batch_file_path(night, reduxdir=reduxdir) name = get_desi_proc_batch_file_name(night, exp, jobdesc, cameras) return os.path.join(path, name)
[docs] def get_desi_proc_tilenight_batch_file_name(night, tileid): """ Returns the filename for a tilenight batch script file given a night and tileid Args: night: str or int, defines the night (should be 8 digits) tileid: str or int, defines the tile id relevant to the job Returns: pathname: str, the default script name for a desi_proc_tilenight batch script file """ if type(tileid) is not str: if np.isscalar(tileid): tileid = '{}'.format(tileid) else: raise RuntimeError('tileid should be either int or str') jobname = 'tilenight-{}-{}'.format(night, tileid) return jobname
[docs] def get_desi_proc_tilenight_batch_file_pathname(night, tileid, reduxdir=None): """ Returns the default directory location to store a tilenight batch script file given a night and tileid Args: night: str or int, defines the night (should be 8 digits) tileid: str or int, defines the tile id relevant to the job reduxdir: str (optional), define the base directory where the /run/scripts directory should or does live Returns: pathname: str, the default location and script name for a desi_proc_tilenight batch script file """ path = get_desi_proc_batch_file_path(night,reduxdir=reduxdir) name = get_desi_proc_tilenight_batch_file_name(night,tileid) return os.path.join(path, name)
[docs] def wrap_command_for_script(cmd, nodes, ntasks, threads_per_task, stepname='step'): """ Wraps a command for execution in a bash script using srun. Args: cmd (str): The command to be executed. nodes (int): Number of nodes to use. ntasks (int): Total number of tasks to use. threads_per_task (int): Number of threads per core. stepname (str): Short name of command step for logging purposes only Returns: str: The wrapped command ready for inclusion in a bash script. """ srun = f'srun -N {nodes} -n {ntasks} -c {threads_per_task} --cpu-bind=cores {cmd}' wrapped_cmd = f'\necho Running {srun}\n' wrapped_cmd += f'{srun}\n\n' wrapped_cmd += 'if [ $? -eq 0 ]; then\n' wrapped_cmd += f' echo {stepname} succeeded at $(date)\n' wrapped_cmd += 'else\n' wrapped_cmd += f' echo FAILED: {stepname} failed, stopping at $(date)\n' wrapped_cmd += ' exit 1\n' wrapped_cmd += 'fi\n' return wrapped_cmd
[docs] def wrapup_for_script(): """ Give the boiler plate ending to a DESI slurm script echo'ing that the job succeeded or failed """ wrapped_cmd = "\n\n" wrapped_cmd += 'if [ $? -eq 0 ]; then\n' wrapped_cmd += ' echo All done at $(date)\n' wrapped_cmd += 'else\n' wrapped_cmd += ' echo FAILED: Script failed, stopping at $(date)\n' wrapped_cmd += ' exit 1\n' wrapped_cmd += 'fi\n' return wrapped_cmd
[docs] def create_linkcal_batch_script(newnight, queue, cameras=None, runtime=None, batch_opts=None, timingfile=None, batchdir=None, jobname=None, cmd=None, system_name=None): """ Generate a batch script to be submitted to the slurm scheduler to run desi_link_calibnight. Args: newnight (str or int): The night in calibnight where the links will queue (str): Queue to be used. cameras (str or list of str): List of cameras to include in the processing. runtime (str, optional): Timeout wall clock time. batch_opts (str, optional): Other options to give to the slurm batch scheduler (written into the script). timingfile (str, optional): Specify the name of the timing file. batchdir (str, optional): Specify where the batch file will be written. jobname (str, optional): Specify the name of the slurm script written. cmd (str, optional): Complete command as would be given in terminal to run desi_link_calibnight. system_name (str, optional): name of batch system, e.g. cori-haswell, cori-knl Returns: scriptfile: the full path name for the script written. Note: batchdir and jobname can be used to define an alternative pathname, but may not work with assumptions in desi_proc. These optional arguments should be used with caution and primarily for debugging. """ jobdesc = 'linkcal' if cameras is None or np.isscalar(cameras): camword = cameras cameras = decode_camword(camword) if batchdir is None: batchdir = get_desi_proc_batch_file_path(newnight) os.makedirs(batchdir, exist_ok=True) if jobname is None: jobname = get_desi_proc_batch_file_name(night=newnight, exp="", jobdesc=jobdesc, cameras=cameras) if timingfile is None: timingfile = f'{jobname}-timing-$SLURM_JOBID.json' scriptfile = os.path.join(batchdir, jobname + '.slurm') ## If system name isn't specified, pick it based upon jobdesc if system_name is None: system_name = batch.default_system(jobdesc=jobdesc) batch_config = batch.get_config(system_name) threads_per_core = batch_config['threads_per_core'] gpus_per_node = batch_config['gpus_per_node'] ncameras = len(cameras) ncores, nodes, runtime = determine_resources(ncameras, jobdesc.upper(), forced_runtime=runtime, system_name=system_name) runtime_hh = int(runtime // 60) runtime_mm = int(runtime % 60) with open(scriptfile, 'w') as fx: fx.write('#!/bin/bash -l\n\n') fx.write('#SBATCH -N {}\n'.format(nodes)) fx.write('#SBATCH --qos {}\n'.format(queue)) for opts in batch_config['batch_opts']: fx.write('#SBATCH {}\n'.format(opts)) if batch_opts is not None: fx.write('#SBATCH {}\n'.format(batch_opts)) if system_name == 'perlmutter-gpu': # perlmutter-gpu requires projects name with "_g" appended fx.write('#SBATCH --account desi_g\n') else: fx.write('#SBATCH --account desi\n') fx.write('#SBATCH --job-name {}\n'.format(jobname)) fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname)) fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm)) #fx.write('#SBATCH --exclusive\n') fx.write('\n') fx.write(f'# {jobdesc} with {ncameras} cameras\n') fx.write(f'# using {ncores} cores on {nodes} nodes\n\n') fx.write('echo Starting job $SLURM_JOB_ID on $(hostname) at $(date)\n') # fx.write("export OMP_NUM_THREADS=1\n") fx.write(f'cd {batchdir}\n') fx.write(f'\n# Link refnight to new night\n') fx.write(wrap_command_for_script(cmd, nodes, ntasks=ncores, threads_per_task=threads_per_core)) fx.write(wrapup_for_script()) print('Wrote {}'.format(scriptfile)) print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname)) return scriptfile
[docs] def create_biaspdark_batch_script(night, expids, jobdesc=None, camword='a0123456789', do_biasnight=False, do_pdark=False, queue=None, system_name=None): """ Generate a SLURM batch script to be submitted to the slurm scheduler to run biasnight and then preproc darks script. Args: night (str or int): The night in which the biaspdark script will be run. expids (list of int or np.array): The exposure id(s) for the data. These are the dark expids if pdark or biaspdark is being run. Otherwise it is a zero expid. jobdesc (str, optional): Description of the job to be performed. If None, will default to 'biaspdark' or 'pdark' depending on do_biasnight and do_pdark. camword (str): Camword of cameras to include in the processing. do_biasnight (bool): If True, run the nightly bias script first. do_pdark (bool): If True, run the preproc darks script. queue (str): Queue to be used. system_name (str, optional): name of batch system, e.g. cori-haswell, perlmutter Returns: scriptpathname (str): The full path name for the biaspdark batch script file. """ log = get_logger() if jobdesc is None: if do_biasnight: if do_pdark: jobdesc = 'biaspdark' else: jobdesc = 'biasnight' elif do_pdark: jobdesc = 'pdark' else: log.error('Must specify at least one of do_biasnight or do_pdark') raise ValueError('Must specify at least one of do_biasnight or do_pdark') ## Default to regular queue if queue is None: queue = 'regular' scriptpathname = get_desi_proc_batch_file_pathname(night=night, exp=expids, jobdesc=jobdesc, cameras=camword) scriptpathname += '.slurm' cameras = decode_camword(camword) ncameras = len(cameras) nexps = len(expids) if expids is not None else 1 expids = np.array(expids) if expids is not None else None batchdir = os.path.dirname(scriptpathname) os.makedirs(batchdir, exist_ok=True) jobname = os.path.basename(scriptpathname).removesuffix('.slurm') timingfile = f'{jobname}-timing-$SLURM_JOBID.json' if do_pdark and expids is None: log.error('Must provide exposure ids if requesting pdark') raise ValueError('Must provide exposure ids if requesting pdark') ## If system name isn't specified, guess it if system_name is None: system_name = batch.default_system(jobdesc=jobdesc) batch_config = batch.get_config(system_name) ## Get number of mpi workers nranks, nodes, runtime = determine_resources(ncameras, jobdesc=jobdesc, queue=queue, nexps=nexps, system_name=system_name) threads_on_node = batch_config['cores_per_node'] * batch_config['threads_per_core'] script_body = "" # Run nightlybias first if do_biasnight: ## One rank for each camera bias_nranks = ncameras ## srun won't split a ranks across nodes, so for ranks that aren't evenly split ## across nodes, make sure largest rank count with number of threads ## will still fit in a single node if nodes > 1 and bias_nranks % nodes != 0: largest_nranks_on_node = np.ceil(float(bias_nranks)/float(nodes)) bias_threads_per_rank = int(np.floor(threads_on_node / largest_nranks_on_node)) else: tot_threads = nodes * threads_on_node bias_threads_per_rank = int(np.floor(tot_threads // bias_nranks)) if bias_nranks * bias_threads_per_rank > nodes * threads_on_node: assertstring = f"Requested {bias_nranks} ranks with {bias_threads_per_rank} threads per rank on " \ + f"{nodes} nodes with {threads_on_node} threads per node exceeds available threads ({nodes*threads_on_node})" log.critical(assertstring) raise AssertionError(assertstring) cmd = f'desi_proc --cameras {camword} -n {night} --nightlybias --mpi' cmd += f' --starttime $(date +%s) --timingfile {timingfile}' script_body += wrap_command_for_script(cmd, nodes, ntasks=bias_nranks, threads_per_task=bias_threads_per_rank, stepname='biasnight') # Then pdarks if do_pdark: ## if fewer than one-to-one assign more than one core to each rank (min of batch_config['threads_per_core'] ## since we don't use threads) ## srun won't split a rank across nodes, so for ranks that aren't evenly split ## across nodes, make sure largest rank count with number of threads ## will still fit in a single node if nodes > 1 and nranks % nodes != 0: largest_nranks_on_node = np.ceil(float(nranks)/float(nodes)) dark_threads_per_rank = int(np.floor(threads_on_node / largest_nranks_on_node)) else: tot_threads = nodes * threads_on_node dark_threads_per_rank = int(np.floor(nodes*batch_config['cores_per_node']*batch_config['threads_per_core'] // nranks)) if nranks * dark_threads_per_rank > nodes * threads_on_node: assertstring = f"Requested {nranks} ranks with {dark_threads_per_rank} threads per rank on " \ + f"{nodes} nodes with {threads_on_node} threads per node exceeds available threads ({nodes*threads_on_node})" log.critical(assertstring) raise AssertionError(assertstring) cmd = f'desi_preproc_darks -n {night} --expids={",".join(expids.astype(str))} --camword={camword} --mpi' script_body += wrap_command_for_script(cmd, nodes, ntasks=nranks, threads_per_task=dark_threads_per_rank, stepname='pdark') script_body += wrapup_for_script() runtime_hh = int(runtime // 60) runtime_mm = int(runtime % 60) with open(scriptpathname, 'w') as fx: fx.write('#!/bin/bash -l\n\n') fx.write('#SBATCH -N {}\n'.format(nodes)) fx.write('#SBATCH --qos {}\n'.format(queue)) for opts in batch_config['batch_opts']: fx.write('#SBATCH {}\n'.format(opts)) fx.write('#SBATCH --account desi\n') fx.write('#SBATCH --job-name {}\n'.format(jobname)) fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname)) fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm)) fx.write('#SBATCH --exclusive\n') fx.write('\n') # batch-friendly matplotlib backend fx.write('export MPLBACKEND=agg\n') ## we're using MPI for this job, so set OMP_NUM_THREADS to 1 fx.write("export OMP_NUM_THREADS=1\n") fx.write(f'# using {nodes*batch_config["cores_per_node"]} cores on {nodes} nodes\n\n') fx.write('echo Starting at $(date)\n') fx.write(f'cd {batchdir}\n') fx.write(script_body) print('Wrote {}'.format(scriptpathname)) print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname)) return scriptpathname
[docs] def create_ccdcalib_batch_script(night, expids, camword='a0123456789', do_darknight=False, do_badcolumn=False, do_ctecorr=False, n_nights_before=None, n_nights_after=None, dark_expid=None, cte_expids=None, queue=None, system_name=None): """ Generate a SLURM batch script to be submitted to the slurm scheduler to run the requested CCD calibration tasks Args: night (str or int): The night in which the ccdcalib script will be run. expids (list of int or np.array): The exposure id(s) for the data. camword (str): Camword of cameras to include in the processing. do_darknight (bool): If True, run the darknight script first. do_badcolumn (bool): If True, run the badcolumn script. do_ctecorr (bool): If True, run the ctecorr script. n_nights_before (int, optional): Number of nights before the current night to include in the darknight script. n_nights_after (int, optional): Number of nights after the current night to include in the darknight script. dark_expid (int, optional): The exposure id to use for the darknight script. If None, will use the first expid. cte_expids (list of int, optional): The exposure ids to use for the ctecorr script. If None, will use all expids except the first. queue (str): Queue to be used. system_name (str, optional): name of batch system, e.g. cori-haswell, perlmutter Returns: scriptpathname (str): The full path name for the ccdcalib batch script file. """ log = get_logger() if not (do_darknight or do_badcolumn or do_ctecorr): log.critical('Must specify at least one of do_darknight, do_badcolumn, or do_ctecorr') raise ValueError('Must specify at least one of do_darknight, do_badcolumn, or do_ctecorr') jobdesc = 'ccdcalib' ## Default to regular queue if queue is None: queue = 'regular' scriptpathname = get_desi_proc_batch_file_pathname(night=night, exp=expids, jobdesc=jobdesc, cameras=camword) scriptpathname += '.slurm' cameras = decode_camword(camword) ncameras = len(cameras) nexps = len(expids) if expids is not None else 1 batchdir = os.path.dirname(scriptpathname) os.makedirs(batchdir, exist_ok=True) jobname = os.path.basename(scriptpathname).removesuffix('.slurm') timingfile = f'{jobname}-timing-$SLURM_JOBID.json' ## If system name isn't specified, guess it if system_name is None: system_name = batch.default_system(jobdesc=jobdesc) batch_config = batch.get_config(system_name) ntasks, nodes, runtime = determine_resources(ncameras, jobdesc='ccdcalib', queue=queue, nexps=nexps, system_name=system_name) threads_on_node = batch_config['cores_per_node'] * batch_config['threads_per_core'] threads_per_task = int(np.floor((nodes*threads_on_node) / ntasks)) script_body = "" # Run nightlybias first if do_darknight: cmd = f'desi_compute_dark_night --reference-night={night} --camword={camword}' if n_nights_before is not None: cmd += f' --before={n_nights_before}' if n_nights_after is not None: cmd += f' --after={n_nights_after}' cmd += ' --mpi' ## darknight will hit memory limits if more than 10 are done on a ## single node simultaneously max_ranks_per_node = 10 if float(ntasks)/float(nodes) > max_ranks_per_node: ## will need to run in multiple batches, so reduce the ntasks and add more runtime dn_ntasks = max_ranks_per_node*nodes # concurrent ranks that won't hit memory limit issues dn_threads_per_task = int(np.floor(threads_on_node / max_ranks_per_node)) else: dn_ntasks, dn_threads_per_task = ntasks, threads_per_task runtime += 7.*np.ceil(float(ntasks)/float(dn_ntasks)) ## each loop takes about 3-5 minutes, but add 7 each for contingency script_body += wrap_command_for_script(cmd, nodes, ntasks=dn_ntasks, threads_per_task=dn_threads_per_task, stepname='darknight') # Then pdarks if do_badcolumn: if dark_expid is None: dark_expid = expids[0] cmd = f'desi_proc -n {night} --cameras {camword} -e {dark_expid} --mpi' cmd += f' --starttime $(date +%s) --timingfile {timingfile}' script_body += wrap_command_for_script(cmd, nodes, ntasks=ntasks, threads_per_task=threads_per_task, stepname='badcolumn') if do_ctecorr: if cte_expids is None: if do_darknight or do_badcolumn: cte_expids = expids[1:] else: cte_expids = expids cte_expstr = ','.join(np.array(cte_expids).astype(str)) cmd = f"desi_fit_cte_night -n {night} -c {camword} -e {cte_expstr}" script_body += wrap_command_for_script(cmd, nodes, ntasks=ntasks, threads_per_task=threads_per_task, stepname='ctecorr') script_body += wrapup_for_script() runtime_hh = int(runtime // 60) runtime_mm = int(runtime % 60) with open(scriptpathname, 'w') as fx: fx.write('#!/bin/bash -l\n\n') fx.write('#SBATCH -N {}\n'.format(nodes)) fx.write('#SBATCH --qos {}\n'.format(queue)) for opts in batch_config['batch_opts']: fx.write('#SBATCH {}\n'.format(opts)) fx.write('#SBATCH --account desi\n') fx.write('#SBATCH --job-name {}\n'.format(jobname)) fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname)) fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm)) fx.write('#SBATCH --exclusive\n') fx.write('\n') # batch-friendly matplotlib backend fx.write('export MPLBACKEND=agg\n') ## we're using MPI for this job, so set OMP_NUM_THREADS to 1 fx.write("export OMP_NUM_THREADS=1\n") fx.write(f'# using {nodes*batch_config["cores_per_node"]} cores on {nodes} nodes\n\n') fx.write('echo Starting at $(date)\n') fx.write(f'cd {batchdir}\n') fx.write(script_body) print('Wrote {}'.format(scriptpathname)) print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname)) return scriptpathname
[docs] def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, runtime=None, batch_opts=None, timingfile=None, batchdir=None, jobname=None, cmdline=None, system_name=None, use_specter=False, no_gpu=False, nightlybias=None, nightlycte=None, cte_expids=None): """ Generate a SLURM batch script to be submitted to the slurm scheduler to run desi_proc. Args: night (str or int): The night the data was acquired exp (str, int, or list of int): The exposure id(s) for the data. cameras (str or list of str): List of cameras to include in the processing. jobdesc (str): Description of the job to be performed. Used to determine requested resources and whether to operate in a more mpi parallelism (all except poststdstar) or less (only poststdstar). Directly relate to the obstype, with science exposures being split into two (pre, post)-stdstar, and adding joint fit categories stdstarfit, psfnight, and nightlyflat. Options include: 'prestdstar', 'poststdstar', 'stdstarfit', 'arc', 'flat', 'psfnight', 'nightlyflat' queue (str): Queue to be used. runtime (str, optional): Timeout wall clock time. batch_opts (str, optional): Other options to give to the slurm batch scheduler (written into the script). timingfile (str, optional): Specify the name of the timing file. batchdir (str, optional): Specify where the batch file will be written. jobname (str, optional): Specify the name of the slurm script written. cmdline (str, optional): Complete command as would be given in terminal to run the desi_proc. Can be used instead of reading from argv. system_name (str, optional): name of batch system, e.g. cori-haswell, cori-knl use_specter (bool, optional): Use classic specter instead of gpu_specter for extractions no_gpu (bool, optional): Do not use GPU even if available nightlybias (bool): Create nightly bias model from ZEROs nightlycte (bool): Fit CTE model from LED exposures cte_expids (list): Explicitly name expids of the cte flat and flat to use for cte model Returns: scriptfile: the full path name for the script written. Note: batchdir and jobname can be used to define an alternative pathname, but may not work with assumptions in desi_proc. These optional arguments should be used with caution and primarily for debugging. """ log = get_logger() if np.isscalar(cameras): camword = cameras cameras = decode_camword(camword) if batchdir is None: batchdir = get_desi_proc_batch_file_path(night) os.makedirs(batchdir, exist_ok=True) if jobname is None: jobname = get_desi_proc_batch_file_name(night, exp, jobdesc, cameras) if timingfile is None: timingfile = f'{jobname}-timing-$SLURM_JOBID.json' scriptfile = os.path.join(batchdir, jobname + '.slurm') ## If system name isn't specified, pick it based upon jobdesc if system_name is None: system_name = batch.default_system(jobdesc=jobdesc) batch_config = batch.get_config(system_name) threads_per_core = batch_config['threads_per_core'] gpus_per_node = batch_config['gpus_per_node'] ncameras = len(cameras) nexps = 1 if exp is not None and not np.isscalar(exp) and type(exp) is not str: nexps = len(exp) ncores, nodes, runtime = determine_resources( ncameras, jobdesc.upper(), queue=queue, nexps=nexps, forced_runtime=runtime, system_name=system_name) ## derive from cmdline or sys.argv whether this is a nightlybias job ## if not explicitly defined if nightlybias is None: nightlybias = False if cmdline is not None: if '--nightlybias' in cmdline: nightlybias = True elif '--nightlybias' in sys.argv: nightlybias = True #- nightlybias jobs are memory limited, so throttle number of ranks if nightlybias: tot_threads = batch_config['threads_per_core'] * batch_config['cores_per_node'] bias_threads_per_core = tot_threads // 8 bias_cores, bias_nodes, bias_runtime = determine_resources( ncameras, 'NIGHTLYBIAS', queue=queue, nexps=nexps, system_name=system_name) nodes = max(nodes, bias_nodes) runtime += bias_runtime ## derive from cmdline or sys.argv whether this is a nightlycte job ## if not explicitly defined if nightlycte is None: nightlycte = False if cmdline is not None: if '--nightlycte' in cmdline: nightlycte = True elif '--nightlycte' in sys.argv: nightlycte = True ## nightlycte jobs add time to the job ## hardcoding a runtime for nightlycte. ## TODO should be moved into determine_resources() if nightlycte: cte_runtime = 5 runtime += cte_runtime #- arc fits require 3.2 GB of memory per bundle, so increase nodes as needed if jobdesc.lower() == 'arc': cores_per_node = (ncores-1) // nodes + ((ncores-1) % nodes > 0) mem_per_node = float(batch_config['memory']) mem_per_core = mem_per_node / cores_per_node while mem_per_core < 3.2: nodes += 1 cores_per_node = (ncores-1) // nodes + ((ncores-1) % nodes > 0) mem_per_core = mem_per_node / cores_per_node threads_per_node = batch_config['threads_per_core'] * batch_config['cores_per_node'] threads_per_core = (threads_per_node * nodes) // ncores runtime_hh = int(runtime // 60) runtime_mm = int(runtime % 60) with open(scriptfile, 'w') as fx: fx.write('#!/bin/bash -l\n\n') fx.write('#SBATCH -N {}\n'.format(nodes)) fx.write('#SBATCH --qos {}\n'.format(queue)) for opts in batch_config['batch_opts']: fx.write('#SBATCH {}\n'.format(opts)) if batch_opts is not None: fx.write('#SBATCH {}\n'.format(batch_opts)) if system_name == 'perlmutter-gpu': # perlmutter-gpu requires projects name with "_g" appended fx.write('#SBATCH --account desi_g\n') else: fx.write('#SBATCH --account desi\n') fx.write('#SBATCH --job-name {}\n'.format(jobname)) fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname)) fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm)) fx.write('#SBATCH --exclusive\n') fx.write('\n') #- Special case CFS readonly mount at NERSC #- SB 2023-01-27: disable this since Perlmutter might deprecate /dvs_ro; #- inherit it from the environment but don't hardcode into script itself # if 'DESI_ROOT_READONLY' in os.environ: # readonlydir = os.environ['DESI_ROOT_READONLY'] # elif os.environ['DESI_ROOT'].startswith('/global/cfs/cdirs'): # readonlydir = os.environ['DESI_ROOT'].replace( # '/global/cfs/cdirs', '/dvs_ro/cfs/cdirs', 1) # else: # readonlydir = None # # if readonlydir is not None: # fx.write(f'export DESI_ROOT_READONLY={readonlydir}\n\n') if cmdline is None: inparams = list(sys.argv).copy() elif np.isscalar(cmdline): inparams = [] for param in cmdline.split(' '): for subparam in param.split("="): inparams.append(subparam) else: inparams = list(cmdline) for parameter in ['--queue', '-q', '--batch-opts', '--cte-expids']: ## If a parameter is in the list, remove it and its argument ## Elif it is a '--' command, it might be --option=value, which won't be split. ## check for that and remove the whole "--option=value" if parameter in inparams: loc = np.where(np.array(inparams) == parameter)[0][0] # Remove the command inparams.pop(loc) # Remove the argument of the command (now in the command location after pop) inparams.pop(loc) elif '--' in parameter: for ii,inparam in enumerate(inparams.copy()): if parameter in inparam: inparams.pop(ii) break cmd = ' '.join(inparams) cmd = cmd.replace(' --batch', ' ').replace(' --nosubmit', ' ') cmd = cmd.replace(' --nightlycte', ' ') if '--mpi' not in cmd: cmd += ' --mpi' if jobdesc.lower() == 'stdstarfit': cmd += ' --mpistdstars' if no_gpu and '--no-gpu' not in cmd: cmd += ' --no-gpu' if (use_specter and ('--use-specter' not in cmd) and jobdesc.lower() in ['flat', 'science', 'prestdstar', 'tilenight']): cmd += ' --use-specter' cmd += ' --starttime $(date +%s)' cmd += f' --timingfile {timingfile}' fx.write(f'# {jobdesc} exposure with {ncameras} cameras\n') fx.write(f'# using {ncores} cores on {nodes} nodes\n\n') fx.write('echo Starting job $SLURM_JOB_ID on $(hostname) at $(date)\n') fx.write(f'cd {batchdir}\n') mps_wrapper='' if jobdesc.lower() == 'arc': fx.write("export OMP_NUM_THREADS={}\n".format(threads_per_core)) else: fx.write("export OMP_NUM_THREADS=1\n") if system_name == 'perlmutter-gpu' and jobdesc.lower() not in ['arc']: fx.write("export MPICH_GPU_SUPPORT_ENABLED=1\n") mps_wrapper='desi_mps_wrapper' if jobdesc.lower() not in ['science', 'prestdstar', 'stdstarfit', 'poststdstar']: if nightlybias: tmp = cmd.split() has_expid = False if '-e' in tmp: has_expid = True i = tmp.index('-e') tmp.pop(i) # -e tmp.pop(i) # EXPID if '--expid' in tmp: has_expid = True i = tmp.index('--expid') tmp.pop(i) # --expid tmp.pop(i) # EXPID bias_cmd = ' '.join(tmp) fx.write('\n# Run nightlybias first\n') srun=f'srun -N {bias_nodes} -n {bias_cores} -c {bias_threads_per_core} {bias_cmd}' fx.write('echo Running {}\n'.format(srun)) fx.write('{}\n'.format(srun)) if has_expid: fx.write('\nif [ $? -eq 0 ]; then\n') fx.write(' echo nightlybias succeeded at $(date)\n') fx.write('else\n') fx.write(' echo FAILED: nightlybias failed; stopping at $(date)\n') fx.write(' exit 1\n') fx.write('fi\n') if ' -e ' in cmd or ' --expid ' in cmd: fx.write('\n# Process exposure\n') cmd = cmd.replace(' --nightlybias', '') cmd = cmd.replace(' --nightlycte', '') srun=(f'srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores ' +mps_wrapper+f' {cmd}') fx.write('echo Running {}\n'.format(srun)) fx.write('{}\n'.format(srun)) #- nightlybias implies that this is a ccdcalib job, #- where we will also run CTE fitting if nightlybias: #- first check if previous command failed fx.write('\nif [ $? -eq 0 ]; then\n') fx.write(' echo command succeeded at $(date)\n') fx.write('else\n') fx.write(' echo FAILED: processing failed; stopping at $(date)\n') fx.write(' exit 1\n') fx.write('fi\n') if nightlycte: #- then proceed with desi_fit_cte_night command camword = parse_cameras(cameras) fx.write('\n# Fit CTE parameters from flats if needed\n') cmd = f'desi_fit_cte_night -n {night} -c {camword}' if cte_expids is not None: cmd += f' -e ' + ','.join(np.atleast_1d(cte_expids).astype(str)) ctecorrfile = findfile('ctecorrnight', night=night) fname = os.path.basename(ctecorrfile) fx.write(f'if [ -f {ctecorrfile} ]; then\n') fx.write(f' echo Already have {fname}\n') fx.write(f'else\n') fx.write(f' echo running {cmd}\n') fx.write(f' {cmd}\n') fx.write(f'fi\n') else: if jobdesc.lower() in ['science', 'prestdstar', 'stdstarfit']: fx.write('\n# Do steps through stdstarfit at full MPI parallelism\n') srun = (f'srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores ' +mps_wrapper+f' {cmd}') if jobdesc.lower() in ['science', 'prestdstar']: srun += ' --nofluxcalib' fx.write('echo Running {}\n'.format(srun)) fx.write('{}\n'.format(srun)) if jobdesc.lower() in ['science', 'poststdstar']: ntasks=ncameras tot_threads = nodes * batch_config['cores_per_node'] * batch_config['threads_per_core'] threads_per_task = max(int(tot_threads / ntasks), 1) fx.write('\n# Use less MPI parallelism for fluxcalib MP parallelism\n') fx.write('# This should quickly skip over the steps already done\n') #- fluxcalib multiprocessing parallelism needs --cpu-bind=none (or at least not "cores") srun = f'srun -N {nodes} -n {ntasks} -c {threads_per_task} --cpu-bind=none {cmd} ' fx.write('if [ $? -eq 0 ]; then\n') fx.write(' echo Running {}\n'.format(srun)) fx.write(' {}\n'.format(srun)) fx.write('else\n') fx.write(' echo FAILED: done at $(date)\n') fx.write(' exit 1\n') fx.write('fi\n') fx.write('\nif [ $? -eq 0 ]; then\n') fx.write(' echo SUCCESS: done at $(date)\n') fx.write('else\n') fx.write(' echo FAILED: done at $(date)\n') fx.write(' exit 1\n') fx.write('fi\n') print('Wrote {}'.format(scriptfile)) print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname)) return scriptfile
[docs] def create_desi_proc_tilenight_batch_script(night, exp, tileid, ncameras, queue, runtime=None, batch_opts=None, system_name=None, mpistdstars=True, use_specter=False, no_gpu=False, laststeps=None, cameras=None ): """ Generate a SLURM batch script to be submitted to the slurm scheduler to run desi_proc. Args: night: str or int. The night the data was acquired. exp: int, or list of ints. The exposure id(s) for the data. tileid: str or int. The tile id for the data. ncameras: int. The number of cameras used for joint fitting. queue: str. Queue to be used. Options: runtime: str. Timeout wall clock time. batch_opts: str. Other options to give to the slurm batch scheduler (written into the script). system_name: name of batch system, e.g. cori-haswell, cori-knl. mpistdstars: bool. Whether to use MPI for stdstar fitting. use_specter: bool. Use classic specter instead of gpu_specter for extractions no_gpu: bool. Do not use GPU even if available laststeps: list of str. A list of laststeps to pass as the laststeps argument to tilenight cameras: str, must be camword. Returns: scriptfile: the full path name for the script written. """ batchdir = get_desi_proc_batch_file_path(night) os.makedirs(batchdir, exist_ok=True) nexps = 1 if exp is not None and not np.isscalar(exp): nexps = len(exp) jobname = get_desi_proc_tilenight_batch_file_name(night, tileid) timingfile = f'{jobname}-timing-$SLURM_JOBID.json' scriptfile = os.path.join(batchdir, jobname + '.slurm') ## If system name isn't specified, pick it based upon jobdesc if system_name is None: system_name = batch.default_system(jobdesc='tilenight') batch_config = batch.get_config(system_name) threads_per_core = batch_config['threads_per_core'] gpus_per_node = batch_config['gpus_per_node'] ncores, nodes, runtime = determine_resources(ncameras,'TILENIGHT', queue=queue, nexps=nexps, system_name=system_name,forced_runtime=runtime) if runtime is None: runtime = 30 runtime_hh = int(runtime // 60) runtime_mm = int(runtime % 60) with open(scriptfile, 'w') as fx: fx.write('#!/bin/bash -l\n\n') fx.write('#SBATCH -N {}\n'.format(nodes)) fx.write('#SBATCH --qos {}\n'.format(queue)) for opts in batch_config['batch_opts']: fx.write('#SBATCH {}\n'.format(opts)) if batch_opts is not None: fx.write('#SBATCH {}\n'.format(batch_opts)) if system_name == 'perlmutter-gpu': # perlmutter-gpu requires projects name with "_g" appended fx.write('#SBATCH --account desi_g\n') else: fx.write('#SBATCH --account desi\n') fx.write('#SBATCH --job-name {}\n'.format(jobname)) fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname)) fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm)) fx.write('#SBATCH --exclusive\n') fx.write('\n') #- Special case CFS readonly mount at NERSC #- SB 2023-01-27: disable this since Perlmutter might deprecate /dvs_ro; #- inherit it from the environment but don't hardcode into script itself # if 'DESI_ROOT_READONLY' in os.environ: # readonlydir = os.environ['DESI_ROOT_READONLY'] # elif os.environ['DESI_ROOT'].startswith('/global/cfs/cdirs'): # readonlydir = os.environ['DESI_ROOT'].replace( # '/global/cfs/cdirs', '/dvs_ro/cfs/cdirs', 1) # else: # readonlydir = None # # if readonlydir is not None: # fx.write(f'export DESI_ROOT_READONLY={readonlydir}\n\n') # # fx.write('\n') cmd = 'desi_proc_tilenight' cmd += f' -n {night}' cmd += f' -t {tileid}' cmd += f' --mpi' if cameras is not None: cmd += f' --cameras {cameras}' else: cmd += f' --cameras a0123456789' if mpistdstars: cmd += f' --mpistdstars' if no_gpu: cmd += f' --no-gpu' elif use_specter: cmd += f' --use-specter' if laststeps is not None: cmd += f' --laststeps="{",".join(laststeps)}"' cmd += f' --timingfile {timingfile}' fx.write(f'# running a tile-night\n') fx.write(f'# using {ncores} cores on {nodes} nodes\n\n') fx.write('echo Starting job $SLURM_JOB_ID on $(hostname) at $(date)\n') fx.write(f'cd {batchdir}\n') mps_wrapper='' if system_name == 'perlmutter-gpu': fx.write("export MPICH_GPU_SUPPORT_ENABLED=1\n") mps_wrapper='desi_mps_wrapper' srun = (f' srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores ' +mps_wrapper+f' {cmd}') fx.write('echo Running {}\n'.format(srun)) fx.write('{}\n'.format(srun)) fx.write('\nif [ $? -eq 0 ]; then\n') fx.write(' echo SUCCESS: done at $(date)\n') fx.write('else\n') fx.write(' echo FAILED: done at $(date)\n') fx.write(' exit 1\n') fx.write('fi\n') print('Wrote {}'.format(scriptfile)) print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname)) return scriptfile