Source code for desispec.scripts.tile_redshifts

"""
desispec.scripts.tile_redshifts
===============================

"""
import sys, os, glob
import re
import subprocess
import argparse
import numpy as np
from astropy.table import Table, vstack

from desispec.io.util import parse_cameras
from desispec.workflow.redshifts import read_minimal_exptables_columns, \
                                        create_desi_zproc_batch_script
from desiutil.log import get_logger

from desispec.workflow import batch

def parse(options=None):

    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--nights", type=int, nargs='+', help="YEARMMDD nights")
    parser.add_argument("-t", "--tileid", type=int, help="Tile ID")
    parser.add_argument("-e", "--expids", type=int, nargs='+', help="exposure IDs")
    #parser.add_argument("-s", "--spectrographs", type=str,
    #        help="spectrographs to include, e.g. 0-4,9; includes final number in range")
    parser.add_argument("-g", "--group", type=str, required=True,
                   help="cumulative, pernight, perexp, or a custom name")
    parser.add_argument("--run_zmtl", action="store_true",
                   help="also run make_zmtl_files")
    parser.add_argument("--explist", type=str,
                   help="file with columns TILE NIGHT EXPID to use")
    parser.add_argument("--no-gpu", action="store_true",
                        help="Don't use gpus")
    parser.add_argument("--max-gpuprocs", type=int, default=4,
                        help="Number of GPU prcocesses per node")
    parser.add_argument("--nosubmit", action="store_true",
                   help="generate scripts but don't submit batch jobs")
    parser.add_argument("--no-afterburners", action="store_true",
                   help="Do not run afterburners (like QSO fits)")
    parser.add_argument("--batch-queue", type=str, default='regular',
                   help="batch queue name")
    parser.add_argument("--batch-reservation", type=str,
                   help="batch reservation name")
    parser.add_argument("--batch-dependency", type=str,
                   help="job dependencies passed to sbatch --dependency")
    parser.add_argument("--system-name", type=str, default=batch.default_system(),
                   help="batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu")

    args = parser.parse_args(options)

    return args
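
#- Example (sketch): parse() accepts an explicit options list, which is handy
#- for testing without touching sys.argv. The tile/night values here are
#- hypothetical; the flags are the ones defined above.
#
#   args = parse(["--group", "cumulative", "--tileid", "1234",
#                 "--nights", "20210101", "--nosubmit"])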

def main(args=None):
    if not isinstance(args, argparse.Namespace):
        args = parse(args)

    batch_scripts, failed_jobs = generate_tile_redshift_scripts(**args.__dict__)
    num_error = len(failed_jobs)
    sys.exit(num_error)

# _allexp is a cache of all exposure tables stacked so that we don't have to
# re-read all of them every time we call generate_tile_redshift_scripts()
_allexp = None

def reset_allexp_cache():
    """
    Utility function to reset the _allexp cache to ensure it is re-read from disk
    """
    global _allexp
    _allexp = None
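
#- Example (sketch, hypothetical night): the module-level _allexp cache persists
#- between calls within a session, so force a re-read from disk if new exposure
#- tables may have been written since the last call:
#
#   reset_allexp_cache()
#   scripts, failed = generate_tile_redshift_scripts(group='cumulative',
#                                                    nights=[20210310])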

def generate_tile_redshift_scripts(group, nights=None, tileid=None, expids=None, explist=None,
                                   camword=None, max_gpuprocs=None, no_gpu=False,
                                   run_zmtl=False, no_afterburners=False,
                                   batch_queue='regular', batch_reservation=None,
                                   batch_dependency=None, system_name=None, nosubmit=False):
    """
    Creates a Slurm script per tile to run redshifts and, by default, submits it
    to the queue. If nosubmit is True, the scripts are created but not submitted.

    Args:
        group (str): Type of coadd redshifts to run. Options are cumulative,
            pernight, perexp, or a custom name.
        nights (int, or list or np.array of ints): YEARMMDD nights.
        tileid (int): Tile ID.
        expids (int, or list or np.array of ints): Exposure IDs.
        explist (str): File with columns TILE NIGHT EXPID to use.
        camword (str): Camword of cameras to include.
        max_gpuprocs (int): Number of GPU processes.
        no_gpu (bool): Default False. If True, do not use GPUs even if available.
        run_zmtl (bool): If True, also run make_zmtl_files.
        no_afterburners (bool): If True, do not run QSO afterburners.
        batch_queue (str): Batch queue name. Default is 'regular'.
        batch_reservation (str): Batch reservation name.
        batch_dependency (str): Job dependencies passed to sbatch --dependency.
        system_name (str): Batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu.
        nosubmit (bool): Generate scripts but don't submit batch jobs. Default is False.

    Returns:
        batch_scripts (list of str): Paths of the scripts generated by this call
            whose submission (if any) returned a zero error code.
        failed_jobs (list of str): Paths of the scripts generated by this call
            whose submission returned a non-zero error code.

    Note: specify ``spectrographs`` or ``camword`` but not both
    """
    log = get_logger()

    #- If --tileid, --nights, and --expids are all given, create exptable
    if ((tileid is not None) and (nights is not None) and (len(nights) == 1)
            and (expids is not None)):
        log.info('Creating exposure table from --tileid --nights --expids options')
        exptable = Table()
        exptable['EXPID'] = expids
        exptable['NIGHT'] = nights[0]
        exptable['TILEID'] = tileid
        if explist is not None:
            log.warning('Ignoring --explist, using --tileid --nights --expids')

    #- otherwise load exposure tables for those nights
    elif explist is None:
        if nights is not None:
            log.info(f'Loading production exposure tables for nights {nights}')
        else:
            log.info(f'Loading production exposure tables for all nights')
        exptable = read_minimal_exptables_columns(nights)
    else:
        log.info(f'Loading exposure list from {explist}')
        if explist.endswith(('.fits', '.fits.gz')):
            exptable = Table.read(explist, format='fits')
        elif explist.endswith('.csv'):
            exptable = Table.read(explist, format='ascii.csv')
        elif explist.endswith('.ecsv'):
            exptable = Table.read(explist, format='ascii.ecsv')
        else:
            exptable = Table.read(explist, format='ascii')

        if nights is not None:
            keep = np.in1d(exptable['NIGHT'], nights)
            exptable = exptable[keep]

    #- Filter exposure tables by exposure IDs or by tileid
    #- Note: If exptable was created from --expids --nights --tileid these should
    #- have no effect, but are left in for code flow simplicity
    if expids is not None:
        keep = np.in1d(exptable['EXPID'], expids)
        exptable = exptable[keep]
        #expids = np.array(exptable['EXPID'])
        tileids = np.unique(np.array(exptable['TILEID']))
        #- if provided, tileid should be redundant with the tiles in those exps
        if tileid is not None:
            if not np.all(exptable['TILEID'] == tileid):
                log.critical(f'Exposure TILEIDs={tileids} != --tileid={tileid}')
                sys.exit(1)
    elif tileid is not None:
        keep = (exptable['TILEID'] == tileid)
        exptable = exptable[keep]
        #expids = np.array(exptable['EXPID'])
        tileids = np.array([tileid, ])
    else:
        tileids = np.unique(np.array(exptable['TILEID']))

    #- anything left?
    if len(exptable) == 0:
        log.critical(f'No exposures left after filtering by tileid/nights/expids')
        sys.exit(1)

    #if (spectrographs is not None) and (camword is not None):
    #    msg = f'Give {spectrographs=} OR {camword=} but not both'
    #    log.error(msg)
    #    raise ValueError(msg)

    #if spectrographs is not None:
    #    camword = spectro_to_camword(spectrographs)

    if camword is not None:
        if isinstance(camword, str):
            camword = parse_cameras(camword)
    else:
        camword = 'a0123456789'

    #- If cumulative, find all prior exposures that also observed these tiles
    #- NOTE: depending upon options, this might re-read all the exptables again
    #- NOTE: this may not scale well several years into the survey
    if group == 'cumulative':
        log.info(f'{len(tileids)} tiles; searching for exposures on prior nights')
        global _allexp
        if _allexp is None:
            log.info(f'Reading all exposure_tables from all nights')
            _allexp = read_minimal_exptables_columns()

        keep = np.in1d(_allexp['TILEID'], tileids)
        newexptable = _allexp[keep]

        if exptable is not None:
            expids = exptable['EXPID']
            missing_exps = np.in1d(expids, newexptable['EXPID'], invert=True)
            if np.any(missing_exps):
                latest_exptable = read_minimal_exptables_columns(
                        nights=np.unique(exptable['NIGHT'][missing_exps]))
                keep = np.in1d(latest_exptable['EXPID'], expids[missing_exps])
                latest_exptable = latest_exptable[keep]
                newexptable = vstack([newexptable, latest_exptable])

        newexptable.sort(['EXPID'])
        exptable = newexptable

        ## Ensure we only include data for nights up to and including specified nights
        if nights is not None:
            lastnight = int(np.max(nights))
            exptable = exptable[exptable['NIGHT'] <= lastnight]

        #expids = np.array(exptable['EXPID'])
        tileids = np.unique(np.array(exptable['TILEID']))

    #- Generate the scripts and optionally submit them
    failed_jobs, batch_scripts = list(), list()
    for tileid in tileids:
        tilerows = (exptable['TILEID'] == tileid)
        inights = np.unique(np.array(exptable['NIGHT'][tilerows]))
        iexpids = np.unique(np.array(exptable['EXPID'][tilerows]))
        log.info(f'Tile {tileid} nights={inights} expids={iexpids}')
        submit = (not nosubmit)
        opts = dict(
            camword=camword, submit=submit,
            max_gpuprocs=max_gpuprocs, no_gpu=no_gpu,
            run_zmtl=run_zmtl, no_afterburners=no_afterburners,
            queue=batch_queue, reservation=batch_reservation,
            dependency=batch_dependency, system_name=system_name,
        )
        if group == 'perexp':
            for i in range(len(exptable[tilerows])):
                batchscript, batcherr = batch_tile_redshifts(
                        tileid, exptable[tilerows][i:i + 1], group, **opts)
        elif group in ['pernight', 'pernight-v0']:
            for night in inights:
                thisnight = exptable['NIGHT'] == night
                batchscript, batcherr = batch_tile_redshifts(
                        tileid, exptable[tilerows & thisnight], group, **opts)
        else:
            batchscript, batcherr = batch_tile_redshifts(
                    tileid, exptable[tilerows], group, **opts)

        if batcherr != 0:
            failed_jobs.append(batchscript)
        else:
            batch_scripts.append(batchscript)

    #- Report num_error but don't sys.exit for pipeline workflow needs, do that at script level
    num_error = len(failed_jobs)
    if num_error > 0:
        tmp = [os.path.basename(filename) for filename in failed_jobs]
        log.error(f'problem submitting {num_error} scripts: {tmp}')

    #- Return batch_scripts for use in pipeline and failed_jobs for explicit exit code in script
    return batch_scripts, failed_jobs
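
#- Example (sketch, with hypothetical tile/night/expid values): generate the
#- cumulative-redshift batch script for a single tile without submitting it:
#
#   scripts, failed = generate_tile_redshift_scripts(
#       group='cumulative', tileid=80605, nights=[20210205],
#       expids=[77902, 77903], nosubmit=True)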

def batch_tile_redshifts(tileid, exptable, group, camword=None,
                         submit=False, queue='regular', reservation=None,
                         max_gpuprocs=None, no_gpu=False,
                         dependency=None, system_name=None,
                         run_zmtl=False, no_afterburners=False):
    """
    Generate batch script for spectra+coadd+redshifts for a tile

    Args:
        tileid (int): Tile ID
        exptable (Table): has columns NIGHT EXPID to use; ignores other columns.
            Doesn't need to be the full pipeline exposures table (but could be)
        group (str): cumulative, pernight, perexp, or a custom name

    Options:
        camword (str): camword of cameras to include
        submit (bool): also submit batch script to the queue
        queue (str): batch queue name
        reservation (str): batch reservation name
        max_gpuprocs (int): number of GPU processes
        no_gpu (bool): Default False. If True, do not use GPUs even if available.
        dependency (str): passed to sbatch --dependency upon submit
        system_name (str): batch system name, e.g. cori-haswell, perlmutter-gpu
        run_zmtl (bool): if True, also run make_zmtl_files
        no_afterburners (bool): if True, do not run QSO afterburners

    Returns tuple (scriptfile, err):
        scriptfile (str): full path to generated script
        err (int): return code from submitting job (0 if submit=False)

    By default this generates the script but doesn't submit it
    """
    log = get_logger()
    if camword is None:
        camword = 'a0123456789'

    if (group == 'perexp') and len(exptable) > 1:
        msg = f'group=perexp requires 1 exptable row, not {len(exptable)}'
        log.error(msg)
        raise ValueError(msg)

    nights = np.unique(np.asarray(exptable['NIGHT']))
    if (group in ['pernight', 'pernight-v0']) and len(nights) > 1:
        msg = f'group=pernight requires all exptable rows to be same night, not {nights}'
        log.error(msg)
        raise ValueError(msg)

    tileids = np.unique(np.asarray(exptable['TILEID']))
    if len(tileids) > 1:
        msg = f'batch_tile_redshifts requires all exptable rows to be same tileid, not {tileids}'
        log.error(msg)
        raise ValueError(msg)
    elif len(tileids) == 1 and tileids[0] != tileid:
        msg = f"Specified tileid={tileid} didn't match tileid given in exptable, {tileids}"
        log.error(msg)
        raise ValueError(msg)

    #- Be explicit about naming. Night should be the most recent night.
    #- Expid only used for labeling perexp, for which there is only one row here anyway
    expids = np.unique(np.asarray(exptable['EXPID']))

    cmdline = ['desi_zproc',
               '-t', str(tileid),
               '-g', group,
               '-n', ' '.join(nights.astype(str)),
               '-e', ' '.join(expids.astype(str)),
               '-c', camword,
               '--mpi']
    if run_zmtl:
        cmdline.append('--run-zmtl')
    if no_afterburners:
        cmdline.append('--no-afterburners')

    scriptfile = create_desi_zproc_batch_script(group=group, tileid=tileid,
                                                cameras=camword, nights=nights,
                                                expids=expids, queue=queue,
                                                cmdline=cmdline,
                                                system_name=system_name,
                                                max_gpuprocs=max_gpuprocs,
                                                no_gpu=no_gpu)

    err = 0
    if submit:
        cmd = ['sbatch', ]
        if reservation:
            cmd.extend(['--reservation', reservation])
        if dependency:
            cmd.extend(['--dependency', dependency])

        #- sbatch requires the script to be last, after all options
        cmd.append(scriptfile)

        err = subprocess.call(cmd)
        basename = os.path.basename(scriptfile)
        if err == 0:
            log.info(f'Submitted {basename}')
        else:
            log.error(f'Error {err} submitting {basename}')

    return scriptfile, err
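
#- Example (sketch, with hypothetical values): build (but don't submit) the
#- batch script for one tile's cumulative redshifts from a minimal exposure
#- table containing only the columns this function uses:
#
#   from astropy.table import Table
#   exptable = Table(dict(TILEID=[1234, 1234], NIGHT=[20210101, 20210102],
#                         EXPID=[101, 102]))
#   scriptfile, err = batch_tile_redshifts(1234, exptable, 'cumulative',
#                                          submit=False)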