"""
desispec.workflow.desi_proc_funcs
=================================
"""
import json
import time
import sys, os, argparse, re
import numpy as np
import fitsio
import desispec.io
import desiutil
from desispec.io import findfile
from desispec.io.util import decode_camword, \
parse_cameras, get_tempfilename
from desiutil.log import get_logger
from . import batch
def get_desi_proc_parser():
    """
    Create an argparser object for use with desi_proc based on arguments from sys.argv
    """
    return add_desi_proc_singular_terms(get_shared_desi_proc_parser())
def get_desi_proc_joint_fit_parser():
    """
    Create an argparser object for use with desi_proc_joint_fit based on arguments from sys.argv
    """
    return add_desi_proc_joint_fit_terms(get_shared_desi_proc_parser())
def get_desi_proc_tilenight_parser():
    """
    Create an argparser object for use with desi_proc_tilenight based on arguments from sys.argv
    """
    return add_desi_proc_tilenight_terms(get_shared_desi_proc_parser())
def get_shared_desi_proc_parser():
    """
    Create an argparser object for use with desi_proc AND desi_proc_joint_fit based on arguments from sys.argv

    Returns:
        argparse.ArgumentParser: parser populated with the options shared by
        desi_proc, desi_proc_joint_fit, and desi_proc_tilenight.
    """
    #- Bug fix: argparse substitutes %(prog)s, not {prog}; the old usage string
    #- showed the literal text "{prog}" in --help output
    parser = argparse.ArgumentParser(usage="%(prog)s [options]")

    parser.add_argument("-n", "--night", type=int, help="YEARMMDD night")
    parser.add_argument("--obstype", type=str, help="science, arc, flat, dark, zero, ...")
    parser.add_argument("--cameras", type=str, help="Explicitly define the spectrographs for which you want" +
                        " to reduce the data. Should be a comma separated list." +
                        " Numbers only assumes you want to reduce R, B, and Z " +
                        "for that spectrograph. Otherwise specify separately [BRZ|brz][0-9].")
    parser.add_argument("--mpi", action="store_true", help="Use MPI parallelism")
    parser.add_argument("--traceshift", action="store_true", help="(deprecated)")
    parser.add_argument("--no-traceshift", action="store_true", help="Do not shift traces")
    parser.add_argument('--maxstdstars', type=int, default=None,
                        help='Maximum number of stdstars to include')
    parser.add_argument("--psf", type=str, required=False, default=None,
                        help="use this input psf (trace shifts will still be computed)")
    parser.add_argument("--fiberflat", type=str, required=False, default=None, help="use this fiberflat")
    parser.add_argument("--calibnight", type=int, required=False, default=None,
                        help="use this night to find nightly PSF and fiberflats")
    parser.add_argument("--scattered-light", action='store_true', help="fit and remove scattered light")
    parser.add_argument("--no-bkgsub", action='store_true', help="disable CCD bkg fit between fiber blocks")
    parser.add_argument("--no-extra-variance", action='store_true',
                        help='disable increase sky model variance based on chi2 on sky lines')
    parser.add_argument("--batch", action="store_true", help="Submit a batch job to process this exposure")
    parser.add_argument("--nosubmit", action="store_true", help="Create batch script but don't submit")
    parser.add_argument("-q", "--queue", type=str, default="realtime", help="batch queue to use")
    parser.add_argument("--batch-opts", type=str, default=None, help="additional batch commands")
    parser.add_argument("--runtime", type=int, default=None, help="batch runtime in minutes")
    parser.add_argument("--most-recent-calib", action="store_true", help="If no calibrations exist for the night," +
                        " use the most recent calibrations from *past* nights. If not set, uses default calibs instead.")
    parser.add_argument("--no-model-pixel-variance", action="store_true",
                        help="Do not use a model of the variance in preprocessing")
    parser.add_argument("--no-sky-wavelength-adjustment", action="store_true", help="Do not adjust wavelength of sky lines")
    parser.add_argument("--no-sky-lsf-adjustment", action="store_true", help="Do not adjust width of sky lines")
    parser.add_argument("--adjust-sky-with-more-fibers", action="store_true", help="Use more fibers than just the sky fibers for the adjustements")
    parser.add_argument("--save-sky-adjustments", action="store_true", help="Save sky adjustment terms (wavelength and LSF)")
    #- note: %%s renders as %s when argparse applies %-formatting to help text
    parser.add_argument("--starttime", type=str, help='start time; use "--starttime `date +%%s`"')
    parser.add_argument("--timingfile", type=str, help='save runtime info to this json file; augment if pre-existing')
    parser.add_argument("--no-xtalk", action="store_true", help='disable fiber crosstalk correction')
    parser.add_argument("--system-name", type=str, help='Batch system name (cori-haswell, perlmutter-gpu, ...)')
    parser.add_argument("--extract-subcomm-size", type=int, default=None, help="Size to use for GPU extract subcomm")
    parser.add_argument("--no-gpu", action="store_true", help="Do not use GPU for extractions even if available")
    parser.add_argument("--use-specter", action="store_true", help="Use classic specter instead of gpu_specter")
    parser.add_argument("--mpistdstars", action="store_true", help="Use MPI parallelism in stdstar fitting instead of multiprocessing")
    parser.add_argument("--no-skygradpca", action="store_true", help="Do not fit sky gradient")
    parser.add_argument("--no-tpcorrparam", action="store_true", help="Do not apply tpcorrparam spatial model or fit tpcorrparam pca terms")
    parser.add_argument("--no-barycentric-correction", action="store_true", help="Do not apply barycentric correction to wavelength")
    parser.add_argument("--apply-sky-throughput-correction", action="store_true", help="Apply throughput correction to sky fibers (default: do not apply!)")

    return parser
def add_desi_proc_singular_terms(parser):
    """
    Add parameters to the argument parser that are only used by desi_proc

    Args:
        parser (argparse.ArgumentParser): parser to extend in place

    Returns:
        argparse.ArgumentParser: the same parser, with desi_proc-only options added
    """
    add = parser.add_argument
    # NOTE: -n/--night is defined in the shared parser
    add("-e", "--expid", type=int, default=None, help="Exposure ID")
    add("-i", "--input", type=str, default=None, help="input raw data file")
    add("--badamps", type=str, default=None,
        help="comma separated list of {camera}{petal}{amp}, i.e. [brz][0-9][ABCD]. Example: 'b7D,z8A'")
    add("--fframe", action="store_true", help="Also write non-sky subtracted fframe file")
    add("--nofiberflat", action="store_true", help="Do not apply fiberflat")
    add("--noskysub", action="store_true",
        help="Do not subtract the sky. Also skips stdstar fit and flux calib")
    add("--noprestdstarfit", action="store_true",
        help="Do not do any science reductions prior to stdstar fitting")
    add("--nostdstarfit", action="store_true", help="Do not fit standard stars")
    add("--nofluxcalib", action="store_true", help="Do not flux calibrate")
    add("--nightlybias", action="store_true", help="Create nightly bias model from ZEROs")
    add("--nightlycte", action="store_true", help="Fit CTE model from LED exposures")
    add("--cte-expids", type=str, default=None,
        help="Explicitly name expids of a cte flat and flat to use for cte model")
    return parser
def add_desi_proc_joint_fit_terms(parser):
    """
    Add parameters to the argument parser that are only used by desi_proc_joint_fit

    Args:
        parser (argparse.ArgumentParser): parser to extend in place

    Returns:
        argparse.ArgumentParser: the same parser, with joint-fit-only options added
    """
    add = parser.add_argument
    # NOTE: -n/--nights is defined in the shared parser
    add("-e", "--expids", type=str, help="Exposure IDs")
    add("-i", "--inputs", type=str, help="input raw data files")
    add("--autocal-ff-solve-grad", action="store_true",
        help="Perform a spatial gradient correction to the fiber flat"
             + " by running desi_autocalib_fiberflat with --solve-gradient")
    return parser
def add_desi_proc_tilenight_terms(parser):
    """
    Add parameters to the argument parser that are only used by desi_proc_tilenight

    Args:
        parser (argparse.ArgumentParser): parser to extend in place

    Returns:
        argparse.ArgumentParser: the same parser, with tilenight-only options added
    """
    add = parser.add_argument
    add("-t", "--tileid", type=str, help="Tile ID")
    add("-d", "--dryrun", action="store_true", help="show commands only, do not run")
    add("--laststeps", type=str, default=None,
        help="Comma separated list of LASTSTEP values "
             + "(e.g. all, skysub, fluxcalib, ignore); "
             + "by default, exposures with LASTSTEP "
             + "'all' and 'fluxcalib' will be processed "
             + "to the poststdstar step, and all others "
             + "will not be processed at all.")
    return parser
def assign_mpi(do_mpi, do_batch, log):
    """
    Based on whether the mpi flag is set and whether the batch flag is set, assign the appropriate
    MPI values for the communicator, rank, and number of ranks. Also verify that environment
    variables are set and provide useful information via the log

    Args:
        do_mpi: bool, whether to use mpi or not
        do_batch: bool, whether the script is meant to write a batch script and exit or not.
        log: desi log object for reporting

    Returns:
        tuple: A tuple containing:

        * comm: MPI communicator object
        * rank: int, the numeric number assigned to the currenct MPI rank
        * size: int, the total number of MPI ranks
    """
    #- Only bring up MPI when requested and not merely writing a batch script
    if do_mpi and not do_batch:
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        rank, size = comm.rank, comm.size
    else:
        comm, rank, size = None, 0, 1

    #- Prevent MPI from killing off spawned processes
    os.environ.setdefault('PMI_MMAP_SYNC_WAIT_TIME', '3600')

    #- Double check env for MPI+multiprocessing at NERSC
    forkmode = os.environ.get('MPICH_GNI_FORK_MODE')
    if forkmode is None:
        os.environ['MPICH_GNI_FORK_MODE'] = 'FULLCOPY'
        if rank == 0:
            log.info('Setting MPICH_GNI_FORK_MODE=FULLCOPY for MPI+multiprocessing')
    elif forkmode != 'FULLCOPY':
        #- respect the user's setting, but warn that it may misbehave
        if rank == 0:
            log.error(f'MPICH_GNI_FORK_MODE={forkmode} is not "FULLCOPY"; this might not work with MPI+multiprocessing, but not overriding')
    elif rank == 0:
        log.debug('MPICH_GNI_FORK_MODE={}'.format(forkmode))

    return comm, rank, size
def cameras_from_raw_data(rawdata):
    """
    Takes a filepath or fitsio FITS object corresponding to a DESI raw data file
    and returns a list of cameras for which data exists in the file.

    Args:
        rawdata, str or fitsio.FITS object. The input raw desi data file. str must be a full file path. Otherwise
            it must be a fitsio.FITS object.

    Returns:
        list: The list of lowercase camera names (e.g. 'b0', 'r7', 'z9') that
        have data in the given file.

    Raises:
        IOError: if rawdata is a str and no file exists at that path.
    """
    ## Be flexible on whether input is filepath or a filehandle
    if isinstance(rawdata, str):
        if not os.path.isfile(rawdata):
            raise IOError(f"File {rawdata} doesn't exist.")
        fx = fitsio.FITS(rawdata)
    else:
        fx = rawdata

    #- Camera HDUs have EXTNAMEs like b0, R7, z9 (one letter, one digit);
    #- fix: use a raw string so \d is a regex token, not an invalid escape
    recam = re.compile(r'^[brzBRZ]\d$')
    cameras = []
    for hdu in fx.hdu_list:
        extname = hdu.get_extname()
        if recam.match(extname):
            cameras.append(extname.lower())
    return cameras
def log_timer(timer, timingfile=None, comm=None):
    """
    Log timing info, optionally writing to json timingfile

    Args:
        timer: desiutil.timer.Timer object
        timingfile (str): write json output to this file
        comm: MPI communicator

    Notes:
        * If comm is not None, collect timers across ranks.
        * If timingfile already exists, read and append timing then re-write.
    """
    log = get_logger()

    #- Collect per-rank timers so rank 0 can compute cross-rank statistics
    if comm is None:
        timers = [timer, ]
        rank, size = 0, 1
    else:
        timers = comm.gather(timer, root=0)
        rank, size = comm.rank, comm.size

    #- Only rank 0 writes/logs
    if rank != 0:
        return

    stats = desiutil.timer.compute_stats(timers)
    if timingfile:
        if os.path.exists(timingfile):
            with open(timingfile) as fx:
                previous_stats = json.load(fx)

            #- augment previous_stats with new entries, but don't overwrite old
            for name, entry in stats.items():
                previous_stats.setdefault(name, entry)

            stats = previous_stats

        #- write to a temp file, then atomically rename into place
        tmpfile = get_tempfilename(timingfile)
        with open(tmpfile, 'w') as fx:
            json.dump(stats, fx, indent=2)
        os.rename(tmpfile, timingfile)

        log.info(f'Timing stats saved to {timingfile}')

    log.info('Timing max duration per step [seconds]:')
    for stepname, steptiming in stats.items():
        tmax = steptiming['duration.max']
        log.info(f'  {stepname:16s} {tmax:.2f}')
def determine_resources(ncameras, jobdesc, nexps=1, forced_runtime=None, queue=None, system_name=None):
    """
    Determine the resources that should be assigned to the batch script given what
    desi_proc needs for the given input information.

    Args:
        ncameras (int): number of cameras to be processed
        jobdesc (str): type of data being processed
        nexps (int, optional): the number of exposures processed in this step
        forced_runtime (int, optional): if given, override the derived runtime
            with this value (minutes), before system timefactor scaling
        queue (str, optional): the Slurm queue to be submitted to. Currently not used.
        system_name (str, optional): batch compute system, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ncores: int, number of cores (actually 2xphysical cores) that should be submitted via "-n {ncores}"
        * nodes: int, number of nodes to be requested in the script. Typically (ncores-1) // cores_per_node + 1
        * runtime: int, the max time requested for the script in minutes for the processing.
    """
    if system_name is None:
        system_name = batch.default_system(jobdesc=jobdesc)

    config = batch.get_config(system_name)
    log = get_logger()

    jobdesc = jobdesc.upper()
    nspectro = (ncameras - 1) // 3 + 1
    nodes = None  # most cases derive nodes from ncores below
    if jobdesc in ('ARC', 'TESTARC'):
        ncores = 20 * (10*(ncameras+1)//20)  # lowest multiple of 20 exceeding 10 per camera
        ncores, runtime = ncores + 1, 45     # + 1 for worflow.schedule scheduler proc
    elif jobdesc in ('FLAT', 'TESTFLAT'):
        runtime = 40
        if system_name.startswith('perlmutter'):
            ncores = config['cores_per_node']
        else:
            ncores = 20 * nspectro
    elif jobdesc == 'TILENIGHT':
        runtime = int(60. / 140. * ncameras * nexps)  # 140 frames per node hour
        runtime += 30  # overhead
        ncores = config['cores_per_node']
        if not system_name.startswith('perlmutter'):
            msg = 'tilenight cannot run on system_name={}'.format(system_name)
            log.critical(msg)
            raise ValueError(msg)
    elif jobdesc in ('SKY', 'TWILIGHT', 'SCIENCE', 'PRESTDSTAR'):
        runtime = 30
        if system_name.startswith('perlmutter'):
            ncores = config['cores_per_node']
        else:
            ncores = 20 * nspectro
    elif jobdesc in ('DARK', 'BADCOL'):
        ncores, runtime = ncameras, 5
    elif jobdesc == 'CCDCALIB':
        ncores, runtime = ncameras, 5
    elif jobdesc == 'ZERO':
        ncores, runtime = 2, 5
    elif jobdesc == 'PSFNIGHT':
        ncores, runtime = ncameras, 5
    elif jobdesc == 'NIGHTLYFLAT':
        ncores, runtime = ncameras, 5
    elif jobdesc == 'STDSTARFIT':
        #- Special case hardcode: stdstar parallelism maxes out at ~30 cores
        #- and on KNL, it OOMs above that anyway.
        #- This might be more related to using a max of 30 standards, not that
        #- there are 30 cameras (coincidence).
        #- Use 32 as power of 2 for core packing
        ncores = 32
        runtime = 8+2*nexps
    elif jobdesc == 'POSTSTDSTAR':
        runtime = 10
        ncores = ncameras
    elif jobdesc == 'NIGHTLYBIAS':
        ncores, runtime = 15, 5
        nodes = 2
    elif jobdesc in ['PEREXP', 'PERNIGHT', 'CUMULATIVE']:
        if system_name.startswith('perlmutter'):
            nodes, runtime = 1, 50  #- timefactor will bring time back down
        else:
            nodes, runtime = 2, 30
        ncores = nodes * config['cores_per_node']
    elif jobdesc == 'HEALPIX':
        nodes = 1
        runtime = 100
        ncores = nodes * config['cores_per_node']
    elif jobdesc == 'LINKCAL':
        nodes, ncores = 1, 1
        runtime = 5.
    else:
        msg = 'unknown jobdesc={}'.format(jobdesc)
        log.critical(msg)
        raise ValueError(msg)

    if forced_runtime is not None:
        runtime = forced_runtime

    if nodes is None:
        nodes = (ncores - 1) // config['cores_per_node'] + 1

    # - Arcs and flats make good use of full nodes, but throttle science
    # - exposures to 5 nodes to enable two to run together in the 10-node
    # - realtime queue, since their wallclock is dominated by less
    # - efficient sky and fluxcalib steps
    if jobdesc in ('ARC', 'TESTARC'):  # , 'FLAT', 'TESTFLAT'):
        max_realtime_nodes = 10
    else:
        max_realtime_nodes = 5

    #- Pending further optimizations, use same number of nodes in all queues
    ### if (queue == 'realtime') and (nodes > max_realtime_nodes):
    if (nodes > max_realtime_nodes):
        nodes = max_realtime_nodes
        ncores = config['cores_per_node'] * nodes
        if jobdesc in ('ARC', 'TESTARC'):
            # adjust for workflow.schedule scheduler proc
            ncores = ((ncores - 1) // 20) * 20 + 1

    #- Allow KNL jobs to be slower than Haswell,
    #- except for ARC so that we don't have ridiculously long times
    #- (Normal arc is still ~15 minutes, albeit with a tail)
    if jobdesc not in ['ARC', 'TESTARC']:
        runtime *= config['timefactor']

    #- Do not allow runtime to be less than 5 min
    if runtime < 5:
        runtime = 5

    #- Add additional overhead factor if needed
    if 'NERSC_RUNTIME_OVERHEAD' in os.environ:
        t = os.environ['NERSC_RUNTIME_OVERHEAD']
        log.info(f'Adding $NERSC_RUNTIME_OVERHEAD={t} minutes to batch runtime request')
        #- Bug fix: previously did `runtime += float(runtime)`, which doubled
        #- the runtime instead of adding the requested overhead minutes
        runtime += float(t)

    return ncores, nodes, runtime
def get_desi_proc_batch_file_path(night, reduxdir=None):
    """
    Returns the default directory location to store a batch script file given a night

    Args:
        night (str or int): defines the night (should be 8 digits)
        reduxdir (str, optional): define the base directory where the /run/scripts directory should or does live.

    Returns:
        str: the default location where a batch script file should be written
    """
    base = desispec.io.specprod_root() if reduxdir is None else reduxdir
    return os.path.join(base, 'run', 'scripts', 'night', str(night))
def get_desi_proc_tilenight_batch_file_name(night, tileid):
    """
    Returns the filename for a tilenight batch script file given a night and tileid

    Args:
        night: str or int, defines the night (should be 8 digits)
        tileid: str or int, defines the tile id relevant to the job

    Returns:
        pathname: str, the default script name for a desi_proc_tilenight batch script file
    """
    #- Normalize tileid to a string; reject non-scalar, non-str inputs
    if type(tileid) is not str:
        if not np.isscalar(tileid):
            raise RuntimeError('tileid should be either int or str')
        tileid = '{}'.format(tileid)
    return 'tilenight-{}-{}'.format(night, tileid)
def get_desi_proc_batch_file_name(night, exp, jobdesc, cameras):
    """
    Returns the default script name for a desi_proc batch script file

    Args:
        night: str or int, defines the night (should be 8 digits)
        exp: str, int, or array of ints, defines the exposure id(s) relevant to the job
        jobdesc: str, type of data being processed
        cameras: str or list of str. If str, must be camword, If list, must be list of cameras to include in the processing.

    Returns:
        pathname: str, the default script name for a desi_proc batch script file
    """
    camword = parse_cameras(cameras)

    #- Derive the exposure portion of the job name
    if jobdesc.lower() == 'linkcal':
        expstr = ""
    elif type(exp) is str:
        expstr = exp
    elif exp is None:
        expstr = 'none'
    elif np.isscalar(exp):
        expstr = '{:08d}'.format(exp)
    else:
        #- multiple exposures: name the job after the first one
        expstr = '{:08d}'.format(exp[0])

    if expstr != "":
        expstr = "-" + expstr

    return f'{jobdesc.lower()}-{night}{expstr}-{camword}'
def get_desi_proc_batch_file_pathname(night, exp, jobdesc, cameras,
                                      reduxdir=None):
    """
    Returns the default location and script name for a desi_proc batch script file

    Args:
        night: str or int, defines the night (should be 8 digits)
        exp: str, int, or array of ints, defines the exposure id(s) relevant to the job
        jobdesc: str, type of data being processed
        cameras: str or list of str. If str, must be camword, If list, must be list of cameras to include in the processing.
        reduxdir: str (optional), define the base directory where the /run/scripts directory should or does live

    Returns:
        pathname: str, the default location and script name for a desi_proc batch script file
    """
    return os.path.join(
        get_desi_proc_batch_file_path(night, reduxdir=reduxdir),
        get_desi_proc_batch_file_name(night, exp, jobdesc, cameras))
def get_desi_proc_tilenight_batch_file_pathname(night, tileid, reduxdir=None):
    """
    Returns the default location and script name for a desi_proc_tilenight batch script file

    Args:
        night: str or int, defines the night (should be 8 digits)
        tileid: str or int, defines the tile id relevant to the job
        reduxdir: str (optional), define the base directory where the /run/scripts directory should or does live

    Returns:
        pathname: str, the default location and script name for a desi_proc_tilenight batch script file
    """
    return os.path.join(
        get_desi_proc_batch_file_path(night, reduxdir=reduxdir),
        get_desi_proc_tilenight_batch_file_name(night, tileid))
def create_linkcal_batch_script(newnight, queue, cameras=None, runtime=None,
                                batch_opts=None, timingfile=None,
                                batchdir=None, jobname=None, cmd=None,
                                system_name=None):
    """
    Generate a batch script to be submitted to the slurm scheduler to run
    desi_link_calibnight.

    Args:
        newnight (str or int): The night in calibnight where the links will
            be created.
        queue (str): Queue to be used.
        cameras (str or list of str): List of cameras to include in the processing.
        runtime (str, optional): Timeout wall clock time.
        batch_opts (str, optional): Other options to give to the slurm batch scheduler (written into the script).
        timingfile (str, optional): Specify the name of the timing file.
        batchdir (str, optional): Specify where the batch file will be written.
        jobname (str, optional): Specify the name of the slurm script written.
        cmd (str, optional): Complete command as would be given in terminal to
            run desi_link_calibnight.
        system_name (str, optional): name of batch system, e.g. cori-haswell, cori-knl

    Returns:
        scriptfile: the full path name for the script written.

    Note:
        batchdir and jobname can be used to define an alternative pathname, but may not work with assumptions in desi_proc.
        These optional arguments should be used with caution and primarily for debugging.
    """
    jobdesc = 'linkcal'

    #- If a camword (or None) was given, expand it into a list of cameras.
    #- NOTE(review): decode_camword(None) looks like it would fail for
    #- cameras=None -- confirm against decode_camword's handling of None.
    if cameras is None or np.isscalar(cameras):
        camword = cameras
        cameras = decode_camword(camword)

    #- Default output directory derived from the night, created if needed
    if batchdir is None:
        batchdir = get_desi_proc_batch_file_path(newnight)

    os.makedirs(batchdir, exist_ok=True)

    if jobname is None:
        jobname = get_desi_proc_batch_file_name(night=newnight, exp="",
                                                jobdesc=jobdesc, cameras=cameras)

    #- $SLURM_JOBID is expanded by the shell at job runtime, not here
    if timingfile is None:
        timingfile = f'{jobname}-timing-$SLURM_JOBID.json'
        timingfile = os.path.join(batchdir, timingfile)

    scriptfile = os.path.join(batchdir, jobname + '.slurm')

    ## If system name isn't specified, pick it based upon jobdesc
    if system_name is None:
        system_name = batch.default_system(jobdesc=jobdesc)

    batch_config = batch.get_config(system_name)
    #- NOTE(review): threads_per_core and gpus_per_node are read here but not
    #- used below in this function
    threads_per_core = batch_config['threads_per_core']
    gpus_per_node = batch_config['gpus_per_node']

    ncameras = len(cameras)
    ncores, nodes, runtime = determine_resources(ncameras, jobdesc.upper(),
                                                 forced_runtime=runtime,
                                                 system_name=system_name)

    #- Convert runtime (minutes) into HH:MM for the sbatch --time directive
    runtime_hh = int(runtime // 60)
    runtime_mm = int(runtime % 60)

    with open(scriptfile, 'w') as fx:
        fx.write('#!/bin/bash -l\n\n')
        fx.write('#SBATCH -N {}\n'.format(nodes))
        fx.write('#SBATCH --qos {}\n'.format(queue))
        for opts in batch_config['batch_opts']:
            fx.write('#SBATCH {}\n'.format(opts))
        if batch_opts is not None:
            fx.write('#SBATCH {}\n'.format(batch_opts))
        if system_name == 'perlmutter-gpu':
            # perlmutter-gpu requires projects name with "_g" appended
            fx.write('#SBATCH --account desi_g\n')
        else:
            fx.write('#SBATCH --account desi\n')
        fx.write('#SBATCH --job-name {}\n'.format(jobname))
        fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname))
        fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm))
        #fx.write('#SBATCH --exclusive\n')

        fx.write('\n')

        fx.write(f'# {jobdesc} with {ncameras} cameras\n')
        fx.write(f'# using {ncores} cores on {nodes} nodes\n\n')

        fx.write('echo Starting job $SLURM_JOB_ID on $(hostname) at $(date)\n')
        # fx.write("export OMP_NUM_THREADS=1\n")

        #- The actual work: run the provided desi_link_calibnight command
        fx.write(f'\n# Link refnight to new night\n')
        fx.write(f'echo Running {cmd}\n')
        fx.write(f'{cmd}\n')

        #- Report success or failure in the job log via the command exit code
        fx.write('\nif [ $? -eq 0 ]; then\n')
        fx.write('  echo SUCCESS: done at $(date)\n')
        fx.write('else\n')
        fx.write('  echo FAILED: done at $(date)\n')
        fx.write('  exit 1\n')
        fx.write('fi\n')

    print('Wrote {}'.format(scriptfile))
    print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname))

    return scriptfile
[docs]def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue,
runtime=None, batch_opts=None, timingfile=None,
batchdir=None, jobname=None, cmdline=None,
system_name=None, use_specter=False,
no_gpu=False, nightlybias=None,
nightlycte=None, cte_expids=None):
"""
Generate a SLURM batch script to be submitted to the slurm scheduler to run desi_proc.
Args:
night (str or int): The night the data was acquired
exp (str, int, or list of int): The exposure id(s) for the data.
cameras (str or list of str): List of cameras to include in the processing.
jobdesc (str): Description of the job to be performed. Used to determine requested resources
and whether to operate in a more mpi parallelism (all except poststdstar) or less (only poststdstar).
Directly relate to the obstype, with science exposures being split into two (pre, post)-stdstar,
and adding joint fit categories stdstarfit, psfnight, and nightlyflat.
Options include: 'prestdstar', 'poststdstar', 'stdstarfit', 'arc', 'flat', 'psfnight', 'nightlyflat'
queue (str): Queue to be used.
runtime (str, optional): Timeout wall clock time.
batch_opts (str, optional): Other options to give to the slurm batch scheduler (written into the script).
timingfile (str, optional): Specify the name of the timing file.
batchdir (str, optional): Specify where the batch file will be written.
jobname (str, optional): Specify the name of the slurm script written.
cmdline (str, optional): Complete command as would be given in terminal to run the desi_proc. Can be used instead
of reading from argv.
system_name (str, optional): name of batch system, e.g. cori-haswell, cori-knl
use_specter (bool, optional): Use classic specter instead of gpu_specter for extractions
no_gpu (bool, optional): Do not use GPU even if available
nightlybias (bool): Create nightly bias model from ZEROs
nightlycte (bool): Fit CTE model from LED exposures
cte_expids (list): Explicitly name expids of the cte flat and flat to use for cte model
Returns:
scriptfile: the full path name for the script written.
Note:
batchdir and jobname can be used to define an alternative pathname, but may not work with assumptions in desi_proc.
These optional arguments should be used with caution and primarily for debugging.
"""
log = get_logger()
if np.isscalar(cameras):
camword = cameras
cameras = decode_camword(camword)
if batchdir is None:
batchdir = get_desi_proc_batch_file_path(night)
os.makedirs(batchdir, exist_ok=True)
if jobname is None:
jobname = get_desi_proc_batch_file_name(night, exp, jobdesc, cameras)
if timingfile is None:
timingfile = f'{jobname}-timing-$SLURM_JOBID.json'
timingfile = os.path.join(batchdir, timingfile)
scriptfile = os.path.join(batchdir, jobname + '.slurm')
## If system name isn't specified, pick it based upon jobdesc
if system_name is None:
system_name = batch.default_system(jobdesc=jobdesc)
batch_config = batch.get_config(system_name)
threads_per_core = batch_config['threads_per_core']
gpus_per_node = batch_config['gpus_per_node']
ncameras = len(cameras)
nexps = 1
if exp is not None and not np.isscalar(exp) and type(exp) is not str:
nexps = len(exp)
ncores, nodes, runtime = determine_resources(
ncameras, jobdesc.upper(), queue=queue, nexps=nexps,
forced_runtime=runtime, system_name=system_name)
## derive from cmdline or sys.argv whether this is a nightlybias job
## if not explicitly defined
if nightlybias is None:
nightlybias = False
if cmdline is not None:
if '--nightlybias' in cmdline:
nightlybias = True
elif '--nightlybias' in sys.argv:
nightlybias = True
#- nightlybias jobs are memory limited, so throttle number of ranks
if nightlybias:
tot_threads = batch_config['threads_per_core'] * batch_config['cores_per_node']
bias_threads_per_core = tot_threads // 8
bias_cores, bias_nodes, bias_runtime = determine_resources(
ncameras, 'NIGHTLYBIAS', queue=queue, nexps=nexps,
system_name=system_name)
nodes = max(nodes, bias_nodes)
runtime += bias_runtime
## derive from cmdline or sys.argv whether this is a nightlycte job
## if not explicitly defined
if nightlycte is None:
nightlycte = False
if cmdline is not None:
if '--nightlycte' in cmdline:
nightlycte = True
elif '--nightlycte' in sys.argv:
nightlycte = True
## nightlycte jobs add time to the job
## hardcoding a runtime for nightlycte.
## TODO should be moved into determine_resources()
if nightlycte:
cte_runtime = 5
runtime += cte_runtime
#- arc fits require 3.2 GB of memory per bundle, so increase nodes as needed
if jobdesc.lower() == 'arc':
cores_per_node = (ncores-1) // nodes + ((ncores-1) % nodes > 0)
mem_per_node = float(batch_config['memory'])
mem_per_core = mem_per_node / cores_per_node
while mem_per_core < 3.2:
nodes += 1
cores_per_node = (ncores-1) // nodes + ((ncores-1) % nodes > 0)
mem_per_core = mem_per_node / cores_per_node
threads_per_node = batch_config['threads_per_core'] * batch_config['cores_per_node']
threads_per_core = (threads_per_node * nodes) // ncores
runtime_hh = int(runtime // 60)
runtime_mm = int(runtime % 60)
with open(scriptfile, 'w') as fx:
fx.write('#!/bin/bash -l\n\n')
fx.write('#SBATCH -N {}\n'.format(nodes))
fx.write('#SBATCH --qos {}\n'.format(queue))
for opts in batch_config['batch_opts']:
fx.write('#SBATCH {}\n'.format(opts))
if batch_opts is not None:
fx.write('#SBATCH {}\n'.format(batch_opts))
if system_name == 'perlmutter-gpu':
# perlmutter-gpu requires projects name with "_g" appended
fx.write('#SBATCH --account desi_g\n')
else:
fx.write('#SBATCH --account desi\n')
fx.write('#SBATCH --job-name {}\n'.format(jobname))
fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname))
fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm))
fx.write('#SBATCH --exclusive\n')
fx.write('\n')
#- Special case CFS readonly mount at NERSC
#- SB 2023-01-27: disable this since Perlmutter might deprecate /dvs_ro;
#- inherit it from the environment but don't hardcode into script itself
# if 'DESI_ROOT_READONLY' in os.environ:
# readonlydir = os.environ['DESI_ROOT_READONLY']
# elif os.environ['DESI_ROOT'].startswith('/global/cfs/cdirs'):
# readonlydir = os.environ['DESI_ROOT'].replace(
# '/global/cfs/cdirs', '/dvs_ro/cfs/cdirs', 1)
# else:
# readonlydir = None
#
# if readonlydir is not None:
# fx.write(f'export DESI_ROOT_READONLY={readonlydir}\n\n')
if cmdline is None:
inparams = list(sys.argv).copy()
elif np.isscalar(cmdline):
inparams = []
for param in cmdline.split(' '):
for subparam in param.split("="):
inparams.append(subparam)
else:
inparams = list(cmdline)
for parameter in ['--queue', '-q', '--batch-opts', '--cte-expids']:
## If a parameter is in the list, remove it and its argument
## Elif it is a '--' command, it might be --option=value, which won't be split.
## check for that and remove the whole "--option=value"
if parameter in inparams:
loc = np.where(np.array(inparams) == parameter)[0][0]
# Remove the command
inparams.pop(loc)
# Remove the argument of the command (now in the command location after pop)
inparams.pop(loc)
elif '--' in parameter:
for ii,inparam in enumerate(inparams.copy()):
if parameter in inparam:
inparams.pop(ii)
break
cmd = ' '.join(inparams)
cmd = cmd.replace(' --batch', ' ').replace(' --nosubmit', ' ')
cmd = cmd.replace(' --nightlycte', ' ')
if '--mpi' not in cmd:
cmd += ' --mpi'
if jobdesc.lower() == 'stdstarfit':
cmd += ' --mpistdstars'
if no_gpu and '--no-gpu' not in cmd:
cmd += ' --no-gpu'
if (use_specter and ('--use-specter' not in cmd) and
jobdesc.lower() in ['flat', 'science', 'prestdstar', 'tilenight']):
cmd += ' --use-specter'
cmd += ' --starttime $(date +%s)'
cmd += f' --timingfile {timingfile}'
fx.write(f'# {jobdesc} exposure with {ncameras} cameras\n')
fx.write(f'# using {ncores} cores on {nodes} nodes\n\n')
fx.write('echo Starting job $SLURM_JOB_ID on $(hostname) at $(date)\n')
mps_wrapper=''
if jobdesc.lower() == 'arc':
fx.write("export OMP_NUM_THREADS={}\n".format(threads_per_core))
else:
fx.write("export OMP_NUM_THREADS=1\n")
if system_name == 'perlmutter-gpu' and jobdesc.lower() not in ['arc']:
fx.write("export MPICH_GPU_SUPPORT_ENABLED=1\n")
mps_wrapper='desi_mps_wrapper'
if jobdesc.lower() not in ['science', 'prestdstar', 'stdstarfit', 'poststdstar']:
if nightlybias:
tmp = cmd.split()
has_expid = False
if '-e' in tmp:
has_expid = True
i = tmp.index('-e')
tmp.pop(i) # -e
tmp.pop(i) # EXPID
if '--expid' in tmp:
has_expid = True
i = tmp.index('--expid')
tmp.pop(i) # --expid
tmp.pop(i) # EXPID
bias_cmd = ' '.join(tmp)
fx.write('\n# Run nightlybias first\n')
srun=f'srun -N {bias_nodes} -n {bias_cores} -c {bias_threads_per_core} {bias_cmd}'
fx.write('echo Running {}\n'.format(srun))
fx.write('{}\n'.format(srun))
if has_expid:
fx.write('\nif [ $? -eq 0 ]; then\n')
fx.write(' echo nightlybias succeeded at $(date)\n')
fx.write('else\n')
fx.write(' echo FAILED: nightlybias failed; stopping at $(date)\n')
fx.write(' exit 1\n')
fx.write('fi\n')
if ' -e ' in cmd or ' --expid ' in cmd:
fx.write('\n# Process exposure\n')
cmd = cmd.replace(' --nightlybias', '')
cmd = cmd.replace(' --nightlycte', '')
srun=(f'srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores '
+mps_wrapper+f' {cmd}')
fx.write('echo Running {}\n'.format(srun))
fx.write('{}\n'.format(srun))
#- nightlybias implies that this is a ccdcalib job,
#- where we will also run CTE fitting
if nightlybias:
#- first check if previous command failed
fx.write('\nif [ $? -eq 0 ]; then\n')
fx.write(' echo command succeeded at $(date)\n')
fx.write('else\n')
fx.write(' echo FAILED: processing failed; stopping at $(date)\n')
fx.write(' exit 1\n')
fx.write('fi\n')
if nightlycte:
#- then proceed with desi_fit_cte_night command
camword = parse_cameras(cameras)
fx.write('\n# Fit CTE parameters from flats if needed\n')
cmd = f'desi_fit_cte_night -n {night} -c {camword}'
if cte_expids is not None:
cmd += f' -e ' + ','.join(np.atleast_1d(cte_expids).astype(str))
ctecorrfile = findfile('ctecorrnight', night=night)
fname = os.path.basename(ctecorrfile)
fx.write(f'if [ -f {ctecorrfile} ]; then\n')
fx.write(f' echo Already have {fname}\n')
fx.write(f'else\n')
fx.write(f' echo running {cmd}\n')
fx.write(f' {cmd}\n')
fx.write(f'fi\n')
else:
if jobdesc.lower() in ['science', 'prestdstar', 'stdstarfit']:
fx.write('\n# Do steps through stdstarfit at full MPI parallelism\n')
srun = (f' srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores '
+mps_wrapper+f' {cmd}')
if jobdesc.lower() in ['science', 'prestdstar']:
srun += ' --nofluxcalib'
fx.write('echo Running {}\n'.format(srun))
fx.write('{}\n'.format(srun))
if jobdesc.lower() in ['science', 'poststdstar']:
ntasks=ncameras
tot_threads = nodes * batch_config['cores_per_node'] * batch_config['threads_per_core']
threads_per_task = max(int(tot_threads / ntasks), 1)
fx.write('\n# Use less MPI parallelism for fluxcalib MP parallelism\n')
fx.write('# This should quickly skip over the steps already done\n')
#- fluxcalib multiprocessing parallelism needs --cpu-bind=none (or at least not "cores")
srun = f'srun -N {nodes} -n {ntasks} -c {threads_per_task} --cpu-bind=none {cmd} '
fx.write('if [ $? -eq 0 ]; then\n')
fx.write(' echo Running {}\n'.format(srun))
fx.write(' {}\n'.format(srun))
fx.write('else\n')
fx.write(' echo FAILED: done at $(date)\n')
fx.write(' exit 1\n')
fx.write('fi\n')
fx.write('\nif [ $? -eq 0 ]; then\n')
fx.write(' echo SUCCESS: done at $(date)\n')
fx.write('else\n')
fx.write(' echo FAILED: done at $(date)\n')
fx.write(' exit 1\n')
fx.write('fi\n')
print('Wrote {}'.format(scriptfile))
print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname))
return scriptfile
def create_desi_proc_tilenight_batch_script(night, exp, tileid, ncameras, queue, runtime=None, batch_opts=None,
                                            system_name=None, mpistdstars=True, use_specter=False,
                                            no_gpu=False, laststeps=None, cameras=None
                                            ):
    """
    Generate a SLURM batch script to be submitted to the slurm scheduler to run desi_proc_tilenight.

    Args:
        night: str or int. The night the data was acquired.
        exp: int, or list of ints. The exposure id(s) for the data.
        tileid: str or int. The tile id for the data.
        ncameras: int. The number of cameras used for joint fitting.
        queue: str. Queue to be used.

    Options:
        runtime: int. Forced wall clock runtime in minutes; if None it is set by
            determine_resources (with a fallback of 30 minutes).
        batch_opts: str. Other options to give to the slurm batch scheduler (written into the script).
        system_name: name of batch system, e.g. cori-haswell, cori-knl, perlmutter-gpu.
        mpistdstars: bool. Whether to use MPI for stdstar fitting.
        use_specter: bool. Use classic specter instead of gpu_specter for extractions
        no_gpu: bool. Do not use GPU even if available
        laststeps: list of str. A list of laststeps to pass as the laststeps argument to tilenight
        cameras: str, must be camword.

    Returns:
        scriptfile: the full path name for the script written.
    """
    batchdir = get_desi_proc_batch_file_path(night)
    os.makedirs(batchdir, exist_ok=True)

    #- exp may be a scalar or a list; only its length matters (resource scaling)
    nexps = 1
    if exp is not None and not np.isscalar(exp):
        nexps = len(exp)

    jobname = get_desi_proc_tilenight_batch_file_name(night, tileid)
    #- $SLURM_JOBID is expanded by the shell at job runtime, not here
    timingfile = f'{jobname}-timing-$SLURM_JOBID.json'
    timingfile = os.path.join(batchdir, timingfile)
    scriptfile = os.path.join(batchdir, jobname + '.slurm')

    ## If system name isn't specified, pick it based upon jobdesc
    if system_name is None:
        system_name = batch.default_system(jobdesc='tilenight')

    batch_config = batch.get_config(system_name)
    threads_per_core = batch_config['threads_per_core']

    ncores, nodes, runtime = determine_resources(ncameras, 'TILENIGHT',
            queue=queue, nexps=nexps, system_name=system_name, forced_runtime=runtime)

    #- Fallback runtime (minutes) if determine_resources did not set one
    if runtime is None:
        runtime = 30
    runtime_hh = int(runtime // 60)
    runtime_mm = int(runtime % 60)

    with open(scriptfile, 'w') as fx:
        fx.write('#!/bin/bash -l\n\n')
        fx.write('#SBATCH -N {}\n'.format(nodes))
        fx.write('#SBATCH --qos {}\n'.format(queue))
        for opts in batch_config['batch_opts']:
            fx.write('#SBATCH {}\n'.format(opts))
        if batch_opts is not None:
            fx.write('#SBATCH {}\n'.format(batch_opts))
        if system_name == 'perlmutter-gpu':
            # perlmutter-gpu requires projects name with "_g" appended
            fx.write('#SBATCH --account desi_g\n')
        else:
            fx.write('#SBATCH --account desi\n')
        fx.write('#SBATCH --job-name {}\n'.format(jobname))
        fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname))
        fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm))
        fx.write('#SBATCH --exclusive\n')

        fx.write('\n')

        #- Special case CFS readonly mount at NERSC
        #- SB 2023-01-27: disable this since Perlmutter might deprecate /dvs_ro;
        #- inherit it from the environment but don't hardcode into script itself
        # if 'DESI_ROOT_READONLY' in os.environ:
        #     readonlydir = os.environ['DESI_ROOT_READONLY']
        # elif os.environ['DESI_ROOT'].startswith('/global/cfs/cdirs'):
        #     readonlydir = os.environ['DESI_ROOT'].replace(
        #             '/global/cfs/cdirs', '/dvs_ro/cfs/cdirs', 1)
        # else:
        #     readonlydir = None
        #
        # if readonlydir is not None:
        #     fx.write(f'export DESI_ROOT_READONLY={readonlydir}\n\n')
        #
        # fx.write('\n')

        #- Assemble the desi_proc_tilenight command that the script will srun
        cmd = 'desi_proc_tilenight'
        cmd += f' -n {night}'
        cmd += f' -t {tileid}'
        cmd += ' --mpi'
        if cameras is not None:
            cmd += f' --cameras {cameras}'
        else:
            #- default to all 10 spectrographs
            cmd += ' --cameras a0123456789'
        if mpistdstars:
            cmd += ' --mpistdstars'
        if no_gpu:
            cmd += ' --no-gpu'
        elif use_specter:
            cmd += ' --use-specter'
        if laststeps is not None:
            cmd += f' --laststeps="{",".join(laststeps)}"'
        cmd += f' --timingfile {timingfile}'

        fx.write('# running a tile-night\n')
        fx.write(f'# using {ncores} cores on {nodes} nodes\n\n')

        fx.write('echo Starting job $SLURM_JOB_ID on $(hostname) at $(date)\n')

        mps_wrapper=''
        if system_name == 'perlmutter-gpu':
            fx.write("export MPICH_GPU_SUPPORT_ENABLED=1\n")
            mps_wrapper='desi_mps_wrapper'

        fx.write('\n# Do steps through stdstarfit at full MPI parallelism\n')
        srun = (f' srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores '
                +mps_wrapper+f' {cmd}')
        fx.write('echo Running {}\n'.format(srun))
        fx.write('{}\n'.format(srun))

        fx.write('\nif [ $? -eq 0 ]; then\n')
        fx.write(' echo SUCCESS: done at $(date)\n')
        fx.write('else\n')
        fx.write(' echo FAILED: done at $(date)\n')
        fx.write(' exit 1\n')
        fx.write('fi\n')

    print('Wrote {}'.format(scriptfile))
    print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname))

    return scriptfile
def find_most_recent(night, file_type='psfnight', cam='r', n_nights=30):
    '''
    Searches back in time for either psfnight or fiberflatnight (or anything supported by
    desispec.calibfinder.findcalibfile. This only works on nightly-based files, so exposure id
    information is not used.

    Args:
        night : str. YYYYMMDD night to look back from
        file_type : str. psfnight or fiberflatnight
        cam : str. camera (b, r, or z).
        n_nights : int. number of nights to step back before giving up

    Returns:
        str: Full pathname to calibration file of interest.
        If none found, None is returned.
    '''
    #- Anchor at noon on the requested night so that stepping back in whole
    #- days always lands on the intended calendar date
    noon_epoch = time.mktime(time.strptime('{} 12'.format(night), '%Y%m%d %H'))
    seconds_per_day = 60*60*24

    #- Walk backwards one night at a time, up to n_nights nights
    for daysback in range(1, n_nights + 1):
        candidate_struct = time.localtime(noon_epoch - daysback*seconds_per_day)
        candidate_night = time.strftime('%Y%m%d', candidate_struct)
        candidate_file = findfile(file_type, candidate_night, camera=cam)
        if os.path.isfile(candidate_file):
            return candidate_file

    #- Nothing found within the lookback window
    return None