Source code for desispec.workflow.redshifts

"""
desispec.workflow.redshifts
===========================

"""
import sys, os, glob
import re
import subprocess
import argparse
import numpy as np
from astropy.table import Table, vstack, Column

from desispec.io.util import parse_cameras, decode_camword
from desispec.workflow.desi_proc_funcs import determine_resources
from desiutil.log import get_logger

import desispec.io
from desispec.workflow.exptable import get_exposure_table_path, get_exposure_table_name, \
                                       get_exposure_table_pathname
from desispec.workflow.tableio import load_table
from desispec.workflow import batch
from desispec.util import parse_int_args



[docs]def get_ztile_relpath(tileid,group,night=None,expid=None): """ Determine the relative output directory of the tile redshift batch script for spectra+coadd+redshifts for a tile Args: tileid (int): Tile ID group (str): cumulative, pernight, perexp, or a custom name night (int): Night expid (int): Exposure ID Returns: outdir (str): the relative path of output directory of the batch script from the specprod/run/scripts """ log = get_logger() # - output directory relative to reduxdir if group == 'cumulative': outdir = f'tiles/{group}/{tileid}/{night}' elif group == 'pernight': outdir = f'tiles/{group}/{tileid}/{night}' elif group == 'perexp': outdir = f'tiles/{group}/{tileid}/{expid:08d}' elif group == 'pernight-v0': outdir = f'tiles/{tileid}/{night}' else: outdir = f'tiles/{group}/{tileid}' log.warning(f'Non-standard tile group={group}; writing outputs to {outdir}/*') return outdir
[docs]def get_ztile_script_pathname(tileid,group,night=None,expid=None): """ Generate the pathname of the tile redshift batch script for spectra+coadd+redshifts for a tile Args: tileid (int): Tile ID group (str): cumulative, pernight, perexp, or a custom name night (int): Night expid (int): Exposure ID Returns: (str): the pathname of the tile redshift batch script """ reduxdir = desispec.io.specprod_root() outdir = get_ztile_relpath(tileid,group,night=night,expid=expid) scriptdir = f'{reduxdir}/run/scripts/{outdir}' suffix = get_ztile_script_suffix(tileid,group,night=night,expid=expid) batchscript = f'ztile-{suffix}.slurm' return os.path.join(scriptdir, batchscript)
[docs]def get_ztile_script_suffix(tileid,group,night=None,expid=None): """ Generate the suffix of the tile redshift batch script for spectra+coadd+redshifts for a tile Args: tileid (int): Tile ID group (str): cumulative, pernight, perexp, or a custom name night (int): Night expid (int): Exposure ID Returns: suffix (str): the suffix of the batch script """ log = get_logger() if group == 'cumulative': suffix = f'{tileid}-thru{night}' elif group == 'pernight': suffix = f'{tileid}-{night}' elif group == 'perexp': suffix = f'{tileid}-exp{expid:08d}' elif group == 'pernight-v0': suffix = f'{tileid}-{night}' else: suffix = f'{tileid}-{group}' log.warning(f'Non-standard tile group={group}; writing outputs to {suffix}.*') return suffix
[docs]def get_zpix_redshift_script_pathname(healpix, survey, program): """Return healpix-based coadd+redshift+afterburner script pathname Args: healpix (int or array-like): healpixel(s) survey (str): DESI survey, e.g. main, sv1, sv3 program (str): survey program, e.g. dark, bright, backup Returns: zpix_script_pathname """ if np.isscalar(healpix): healpix = [healpix,] hpixmin = np.min(healpix) hpixmax = np.max(healpix) if len(healpix) == 1: scriptname = f'zpix-{survey}-{program}-{healpix[0]}.slurm' else: scriptname = f'zpix-{survey}-{program}-{hpixmin}-{hpixmax}.slurm' reduxdir = desispec.io.specprod_root() return os.path.join(reduxdir, 'run', 'scripts', 'healpix', survey, program, str(hpixmin//100), scriptname)
[docs]def create_desi_zproc_batch_script(group, tileid=None, cameras=None, thrunight=None, nights=None, expids=None, healpix=None, survey=None, program=None, queue='regular', batch_opts=None, runtime=None, timingfile=None, batchdir=None, jobname=None, cmdline=None, system_name=None, max_gpuprocs=None, no_gpu=False, run_zmtl=False, no_afterburners=False): """ Generate a SLURM batch script to be submitted to the slurm scheduler to run desi_proc. Args: group (str): Description of the job to be performed. zproc options include: 'perexp', 'pernight', 'cumulative'. tileid (int), optional: The tile id for the data. cameras (str or list of str), optional: List of cameras to include in the processing or a camword. thrunight (int), optional: For group=cumulative, include exposures through this night nights (list of int), optional: The nights the data was acquired. expids (list of int), optional: The exposure id(s) for the data. healpix (list of int), optional: healpixels to process (group='healpix') queue (str), optional: Queue to be used. batch_opts (str), optional: Other options to give to the slurm batch scheduler (written into the script). runtime (str), optional: Timeout wall clock time in minutes. timingfile (str), optional: Specify the name of the timing file. batchdir (str), optional: can define an alternative location to write the file. The default is to SPECPROD under run/scripts/tiles/GROUP/TILE/NIGHT jobname (str), optional: name to save this batch script file as and the name of the eventual log file. Script is save within the batchdir directory. cmdline (str or list of str), optional: Complete command as would be given in terminal to run the desi_zproc, or list of args. Can be used instead of reading from argv. system_name (str), optional: name of batch system, e.g. cori-haswell, cori-knl max_gpuprocs (int), optional: Number of gpu processes no_gpu (bool), optional: Default false. If true it doesn't use GPU's even if available. run_zmtl (bool), optional: Default false. If true it runs zmtl. no_afterburners (bool), optional: Default false. If true it doesn't run afterburners. Returns: scriptfile: the full path name for the script written. Note: batchdir and jobname can be used to define an alternative pathname, but may not work with assumptions in the spectro pipeline. """ log = get_logger() if nights is not None: nights = np.asarray(nights, dtype=int) night = np.max(nights) elif thrunight is not None: night = thrunight elif group == 'healpix': if (healpix is None or survey is None or program is None): msg = f"group='healpix' must define healpix,survey,program (got {healpix},{survey},{program})" log.error(msg) raise ValueError(msg) else: msg = f"group {group} must define either nights or thrunight" log.error(msg) raise ValueError(msg) expid = None if group == 'perexp': if expids is None: msg = f"Must define expids for perexp exposure" log.error(msg) raise ValueError(msg) else: expid = expids[0] if group == 'healpix': scriptpath = get_zpix_redshift_script_pathname(healpix, survey, program) else: scriptpath = get_ztile_script_pathname(tileid, group=group, night=night, expid=expid) if cameras is None: cameras = decode_camword('a0123456789') elif np.isscalar(cameras): camword = parse_cameras(cameras) cameras = decode_camword(camword) if batchdir is None: batchdir = os.path.dirname(scriptpath) os.makedirs(batchdir, exist_ok=True) if jobname is None: jobname = os.path.basename(scriptpath).removesuffix('.slurm') if timingfile is None: timingfile = f'{jobname}-timing-$SLURM_JOBID.json' timingfile = os.path.join(batchdir, timingfile) scriptfile = os.path.join(batchdir, jobname + '.slurm') ## If system name isn't specified, guess it if system_name is None: system_name = batch.default_system(jobdesc=group, no_gpu=no_gpu) batch_config = batch.get_config(system_name) threads_per_core = batch_config['threads_per_core'] gpus_per_node = batch_config['gpus_per_node'] if max_gpuprocs is not None and max_gpuprocs < gpus_per_node: gpus_per_node = max_gpuprocs ncameras = len(cameras) nexps = 1 if expids is not None and type(expids) is not str: nexps = len(expids) if cmdline is None: inparams = list(sys.argv).copy() elif np.isscalar(cmdline): inparams = [] for param in cmdline.split(' '): for subparam in param.split("="): inparams.append(subparam) else: # ensure all elements are str (not int or float) inparams = [str(x) for x in cmdline] for parameter in ['--queue', '-q', '--batch-opts']: ## If a parameter is in the list, remove it and its argument ## Elif it is a '--' command, it might be --option=value, which won't be split. ## check for that and remove the whole "--option=value" if parameter in inparams: loc = np.where(np.array(inparams) == parameter)[0][0] # Remove the command inparams.pop(loc) # Remove the argument of the command (now in the command location after pop) inparams.pop(loc) elif '--' in parameter: for ii, inparam in enumerate(inparams.copy()): if parameter in inparam: inparams.pop(ii) break cmd = ' '.join(inparams) cmd = cmd.replace(' --batch', ' ').replace(' --nosubmit', ' ') srun_rr_gpu_opts = '' if not no_gpu: if system_name == 'perlmutter-gpu': if '--max-gpuprocs' not in cmd: cmd += f' --max-gpuprocs {gpus_per_node}' gpumap = ','.join(np.arange(gpus_per_node).astype(str)[::-1]) srun_rr_gpu_opts = f' --gpu-bind=map_gpu:{gpumap}' else: ## no_gpu isn't set, but we want it set since not perlmutter-gpu cmd += ' --no-gpu' if run_zmtl and '--run-zmtl' not in cmd: cmd += ' --run-zmtl' if no_afterburners and '--no-afterburners' not in cmd: cmd += ' --no-afterburners' cmd += ' --starttime $(date +%s)' cmd += f' --timingfile {timingfile}' if '--mpi' not in cmd: cmd += ' --mpi' ncores, nodes, runtime = determine_resources( ncameras, group.upper(), queue=queue, nexps=nexps, forced_runtime=runtime, system_name=system_name) runtime_hh = int(runtime // 60) runtime_mm = int(runtime % 60) with open(scriptfile, 'w') as fx: fx.write('#!/bin/bash -l\n\n') fx.write('#SBATCH -N {}\n'.format(nodes)) fx.write('#SBATCH --qos {}\n'.format(queue)) for opts in batch_config['batch_opts']: fx.write('#SBATCH {}\n'.format(opts)) if batch_opts is not None: fx.write('#SBATCH {}\n'.format(batch_opts)) if system_name == 'perlmutter-gpu' and not no_gpu: # perlmutter-gpu requires projects name with "_g" appended fx.write('#SBATCH --account desi_g\n') else: fx.write('#SBATCH --account desi\n') fx.write('#SBATCH --job-name {}\n'.format(jobname)) fx.write('#SBATCH --output {}/{}-%j.log\n'.format(batchdir, jobname)) fx.write('#SBATCH --time={:02d}:{:02d}:00\n'.format(runtime_hh, runtime_mm)) fx.write('#SBATCH --exclusive\n') fx.write('\n') # batch-friendly matplotlib backend fx.write('export MPLBACKEND=agg\n') # fx.write("export OMP_NUM_THREADS={}\n".format(threads_per_core)) fx.write("export OMP_NUM_THREADS=1\n") #- Special case CFS readonly mount at NERSC #- SB 2023-01-27: disable this since Perlmutter might deprecate /dvs_ro; #- inherit it from the environment but don't hardcode into script itself # if 'DESI_ROOT_READONLY' in os.environ: # readonlydir = os.environ['DESI_ROOT_READONLY'] # elif os.environ['DESI_ROOT'].startswith('/global/cfs/cdirs'): # readonlydir = os.environ['DESI_ROOT'].replace( # '/global/cfs/cdirs', '/dvs_ro/cfs/cdirs', 1) # else: # readonlydir = None # # if readonlydir is not None: # fx.write(f'export DESI_ROOT_READONLY={readonlydir}\n\n') fx.write(f'# using {ncores} cores on {nodes} nodes\n\n') fx.write('echo Starting at $(date)\n') ## Don't currently need for 1 node jobs # mps_wrapper='' # if system_name == 'perlmutter-gpu': # fx.write("export MPICH_GPU_SUPPORT_ENABLED=1\n") # mps_wrapper='desi_mps_wrapper' srun = f"srun -N {nodes} -n {ncores} -c {threads_per_core}" \ + f"{srun_rr_gpu_opts} --cpu-bind=cores {cmd}" fx.write(f"echo RUNNING {srun}\n") fx.write(f'{srun}\n') fx.write('\nif [ $? -eq 0 ]; then\n') fx.write(' echo SUCCESS: done at $(date)\n') fx.write('else\n') fx.write(' echo FAILED: done at $(date)\n') fx.write(' exit 1\n') fx.write('fi\n') print('Wrote {}'.format(scriptfile)) print('logfile will be {}/{}-JOBID.log\n'.format(batchdir, jobname)) return scriptfile
[docs]def read_minimal_exptables_columns(nights=None, tileids=None): """ Read exposure tables while handling evolving formats Args: nights (list of int): nights to include (default all nights found) tileids (list of int): tileids to include (default all tiles found) Returns exptable with just columns TILEID, NIGHT, EXPID, 'CAMWORD', 'BADCAMWORD', filtered by science exposures with LASTSTEP='all' and TILEID>=0 Note: the returned table is the full pipeline exposures table. It is trimmed to science exposures that have LASTSTEP=='all' """ log = get_logger() if nights is None: exptab_path = get_exposure_table_path(night=None) monthglob = '202???' globname = get_exposure_table_name(night='202?????') etab_files = glob.glob(os.path.join(exptab_path, monthglob, globname)) else: etab_files = list() for night in nights: etab_file = get_exposure_table_pathname(night) if os.path.exists(etab_file): etab_files.append(etab_file) elif night >= 20201201: log.error(f"Exposure table missing for night {night}") else: # - these are expected for the daily run, ok log.debug(f"Exposure table missing for night {night}") etab_files = sorted(etab_files) exptables = list() for etab_file in etab_files: ## correct way but slower and we don't need multivalue columns #t = load_table(etab_file, tabletype='etable') t = Table.read(etab_file, format='ascii.csv') ## For backwards compatibility if BADCAMWORD column does not ## exist then add a blank one if 'BADCAMWORD' not in t.colnames: t.add_column(Table.Column(['' for i in range(len(t))], dtype='S36', name='BADCAMWORD')) keep = (t['OBSTYPE'] == 'science') & (t['TILEID'] >= 0) if 'LASTSTEP' in t.colnames: keep &= (t['LASTSTEP'] == 'all') if tileids is not None: # Default false keep &= np.isin(t['TILEID'], tileids) t = t[keep] ## Need to ensure that the string columns are consistent for col in ['CAMWORD', 'BADCAMWORD']: ## Masked arrays need special handling ## else just reassign with consistent dtype if isinstance(t[col], Table.MaskedColumn): ## If compeltely empty it's loaded as type int ## otherwise fill masked with '' if t[col].dtype == int: t[col] = Table.Column(['' for i in range(len(t))], dtype='S36', name=col) else: t[col] = Table.Column(t[col].filled(fill_value=''), dtype='S36', name=col) else: t[col] = Table.Column(t[col], dtype='S36', name=col) exptables.append(t['TILEID', 'NIGHT', 'EXPID', 'CAMWORD', 'BADCAMWORD']) return vstack(exptables)