Source code for desispec.scripts.zproc

"""
desispec.scripts.zproc
======================

One-stop shopping for redshifting DESI spectra
"""

import time
start_imports = time.time()

#- python imports
import datetime
import sys, os, argparse, re
import subprocess
from copy import deepcopy
import json
import glob

#- external 3rd party imports
import numpy as np
import fitsio
from astropy.io import fits
from astropy.table import Table,vstack

#- external desi imports
from redrock.external import desi
import desiutil.timer
from desiutil.log import get_logger, DEBUG, INFO
import desiutil.iers

from desispec.io.meta import get_nights_up_to_date
from desispec.workflow.redshifts import read_minimal_exptables_columns, \
    create_desi_zproc_batch_script

#- internal desispec imports
import desispec.io
from desispec.io import findfile, specprod_root, replace_prefix, shorten_filename, get_readonly_filepath
from desispec.io.util import create_camword, decode_camword, parse_cameras, \
    camword_to_spectros, columns_to_goodcamword, difference_camwords
from desispec.io.util import validate_badamps, get_tempfilename, backup_filename
from desispec.util import runcmd
from desispec.scripts import group_spectra
from desispec.parallel import stdouterr_redirected
from desispec.workflow import batch
from desispec.workflow.exptable import get_exposure_table_pathname
from desispec.workflow.desi_proc_funcs import assign_mpi, update_args_with_headers, log_timer
from desispec.workflow.desi_proc_funcs import determine_resources, create_desi_proc_batch_script

stop_imports = time.time()

#########################################
######## Begin Body of the Code #########
#########################################
def parse(options=None):
    """
    Parse command-line options for desi_zproc and return the argparse Namespace
    """
    parser = argparse.ArgumentParser(usage="{prog} [options]")

    parser.add_argument("-g", "--groupname", type=str,
                        help="Redshift grouping type: cumulative, perexp, pernight, healpix")

    #- Options for tile-based redshifts
    tiles_options = parser.add_argument_group("tile-based options (--groupname perexp, pernight, or cumulative)")
    tiles_options.add_argument("-t", "--tileid", type=int, default=None,
                               help="Tile ID")
    tiles_options.add_argument("-n", "--nights", type=int, nargs='*', default=None,
                               help="YEARMMDD nights")
    tiles_options.add_argument("-e", "--expids", type=int, nargs='*', default=None,
                               help="Exposure IDs")
    tiles_options.add_argument("--thrunight", type=int, default=None,
                               help="Last night to include (YEARMMDD night) for "
                                    + "cumulative redshift jobs. Used instead of nights.")
    tiles_options.add_argument("-c", "--cameras", type=str,
                               help="Subset of cameras to process, either as a camword (e.g. a012) "
                                    + "or a comma separated list (e.g. b0,r0,z0).")

    #- Options for healpix-based redshifts
    healpix_options = parser.add_argument_group("healpix-based options (--groupname healpix)")
    healpix_options.add_argument("-p", "--healpix", type=int, nargs='*', default=None,
                                 help="healpix pixels (nested nside=64)")
    healpix_options.add_argument("--survey", help="survey name, e.g. main,sv1,sv3")
    healpix_options.add_argument("--program", help="program name, e.g. dark,bright,backup,other")
    healpix_options.add_argument("--expfiles", nargs='*',
                                 help="csv files with NIGHT,EXPID,SPECTRO,HEALPIX")
    healpix_options.add_argument("--prodexpfile",
                                 help="production summary exposure file (using pre-generated --expfiles is more efficient)")

    #- Processing options
    processing_options = parser.add_argument_group('processing options')
    processing_options.add_argument("--mpi", action="store_true",
                                    help="Use MPI parallelism")
    processing_options.add_argument("--no-gpu", action="store_true",
                                    help="Don't use GPUs")
    processing_options.add_argument("--max-gpuprocs", type=int, default=4,
                                    help="Number of GPU processes per node")
    processing_options.add_argument("--run-zmtl", action="store_true",
                                    help="Whether to run zmtl or not")
    processing_options.add_argument("--no-afterburners", action="store_true",
                                    help="Set if you don't want to run afterburners")
    processing_options.add_argument("--starttime", type=float,
                                    help='start time; use "--starttime $(date +%%s)"')
    processing_options.add_argument("--timingfile", type=str,
                                    help='save runtime info to this json file; augment if pre-existing')
    processing_options.add_argument("-d", "--dryrun", action="store_true",
                                    help="show commands only, do not run")

    #- Batch submission options
    batch_options = parser.add_argument_group("batch queue options")
    batch_options.add_argument("--batch", action="store_true",
                               help="Submit a batch job to process this exposure")
    batch_options.add_argument("--nosubmit", action="store_true",
                               help="Create batch script but don't submit")
    batch_options.add_argument("-q", "--queue", type=str, default="realtime",
                               help="batch queue to use (default %(default)s)")
    batch_options.add_argument("--batch-opts", type=str, default=None,
                               help="additional batch commands")
    batch_options.add_argument("--batch-reservation", type=str,
                               help="batch reservation name")
    batch_options.add_argument("--batch-dependency", type=str,
                               help="job dependencies passed to sbatch --dependency")
    batch_options.add_argument("--runtime", type=int, default=None,
                               help="batch runtime in minutes")
batch_options.add_argument("--system-name", type=str, default=None, help='Batch system name (cori-haswell, perlmutter-gpu, ...)') if options is not None: args = parser.parse_args(options) else: args = parser.parse_args() return args
def main(args=None, comm=None):
    if not isinstance(args, argparse.Namespace):
        args = parse(options=args)

    if args.starttime is not None:
        start_time = args.starttime
    else:
        start_time = time.time()

    log = get_logger()

    start_mpi_connect = time.time()
    if comm is not None:
        ## Use the provided comm to determine rank and size
        rank = comm.rank
        size = comm.size
    else:
        ## Check MPI flags and determine the comm, rank, and size given the arguments
        comm, rank, size = assign_mpi(do_mpi=args.mpi, do_batch=args.batch, log=log)
    stop_mpi_connect = time.time()

    #- set default groupname if needed (cumulative for tiles, otherwise healpix)
    if args.groupname is None:
        if args.tileid is not None:
            args.groupname = 'cumulative'
        elif args.healpix is not None:
            args.groupname = 'healpix'
        else:
            msg = 'Must specify --tileid or --healpix'
            log.critical(msg)
            raise ValueError(msg)

    #- consistency of options
    if args.groupname == 'healpix':
        assert args.healpix is not None, "--groupname healpix requires setting --healpix too"
        assert args.nights is None, f"--groupname healpix doesn't use --nights {args.nights}"
        assert args.expids is None, f"--groupname healpix doesn't use --expids {args.expids}"
        assert args.thrunight is None, f"--groupname healpix doesn't use --thrunight {args.thrunight}"
        assert args.cameras is None, f"--groupname healpix doesn't use --cameras {args.cameras}"
        assert (args.expfiles is None) or (args.prodexpfile is None), \
            "--groupname healpix uses --expfiles OR --prodexpfile but not both"
    else:
        assert args.tileid is not None, f"--groupname {args.groupname} requires setting --tileid too"
        if args.cameras is None:
            args.cameras = 'a0123456789'

    if args.expfiles is not None:
        if args.nights is not None or args.expids is not None:
            msg = "use --expfiles OR --nights and --expids, but not both"
            log.error(msg)
            raise ValueError(msg)

    today = int(time.strftime('%Y%m%d'))
    if args.thrunight is not None:
        if args.groupname not in ['cumulative', ]:
            msg = "--thrunight only valid for cumulative redshifts."
            log.error(msg)
            raise ValueError(msg)
        #- very early data isn't supported, and future dates aren't supported
        #- because that implies data are included that don't yet exist
        elif args.thrunight < 20200214 or args.thrunight > today:
            msg = "--thrunight must be between 20200214 and today"
            log.error(msg)
            raise ValueError(msg)

    if args.expids is not None:
        if args.nights is None:
            msg = "Must specify --nights if specifying --expids."
            log.error(msg)
            raise ValueError(msg)
        else:
            msg = f"Only using exposures specified with --expids {args.expids}"
            log.info(msg)

    if args.groupname in ['perexp', 'pernight'] and args.nights is not None:
        if len(args.nights) > 1:
            msg = f"Only expect one night for groupname {args.groupname}" \
                  + f" but received nights={args.nights}."
            log.error(msg)
            raise ValueError(msg)

    if (args.groupname == 'healpix') and (args.expfiles is None) and (args.prodexpfile is None):
        args.prodexpfile = findfile('exposures')
        if rank == 0:
            log.info(f'Using default --prodexpfile {args.prodexpfile}')
        if not os.path.exists(args.prodexpfile):
            msg = f'Missing {args.prodexpfile}; please create with desi_tsnr_afterburner or specify different --prodexpfile'
            if rank == 0:
                log.critical(msg)
            raise ValueError(msg)

    #- redrock non-MPI mode isn't compatible with GPUs,
    #- so if zproc is running in non-MPI mode, force --no-gpu
    #- https://github.com/desihub/redrock/issues/223
    if (args.mpi == False) and (args.no_gpu == False) and (not args.batch):
        log.warning("Redrock+GPU currently only works with MPI; since this is non-MPI, forcing --no-gpu")
        log.warning("See https://github.com/desihub/redrock/issues/223")
        args.no_gpu = True

    error_count = 0

    if rank == 0:
        thisfile = os.path.dirname(os.path.abspath(__file__))
        thistime = datetime.datetime.fromtimestamp(start_imports).isoformat()
        log.info(f'rank 0 started {thisfile} at {thistime}')

    ## Start timer; only print log messages from rank 0 (others are silent)
    timer = desiutil.timer.Timer(silent=rank > 0)

    ## Fill in timing information for steps before we had the timer created
    if args.starttime is not None:
        timer.start('startup', starttime=args.starttime)
        timer.stop('startup', stoptime=start_imports)

    timer.start('imports', starttime=start_imports)
    timer.stop('imports', stoptime=stop_imports)

    timer.start('mpi_connect', starttime=start_mpi_connect)
    timer.stop('mpi_connect', stoptime=stop_mpi_connect)

    ## Freeze IERS after parsing args so that it doesn't bother if only --help
    timer.start('freeze_iers')
    ## Redirect all of the freeze_iers messages to /dev/null
    with stdouterr_redirected(comm=comm):
        desiutil.iers.freeze_iers()
    if rank == 0:
        log.info("Froze iers for all ranks")
    timer.stop('freeze_iers')

    timer.start('preflight')

    ## Derive the available cameras
    if args.groupname == 'healpix':
        camword = 'a0123456789'
    elif isinstance(args.cameras, str):
        if rank == 0:
            camword = parse_cameras(args.cameras)
        else:
            camword = parse_cameras(args.cameras, loglevel='ERROR')
    else:
        camword = create_camword(args.cameras)

    ## Unpack arguments for shorter names (tileid might be None, ok)
    tileid, groupname = args.tileid, args.groupname

    known_groups = ['cumulative', 'pernight', 'perexp', 'healpix']
    if groupname not in known_groups:
        msg = 'groupname {} not in {}'.format(groupname, known_groups)
        log.error(msg)
        raise ValueError(msg)

    if args.batch:
        err = 0
        #-------------------------------------------------------------------------
        ## Create and submit a batch job if requested
        if rank == 0:
            ## create the batch script
            cmdline = list(sys.argv).copy()
            scriptfile = create_desi_zproc_batch_script(group=groupname,
                                                        tileid=tileid,
                                                        cameras=camword,
                                                        thrunight=args.thrunight,
                                                        nights=args.nights,
                                                        expids=args.expids,
                                                        healpix=args.healpix,
                                                        survey=args.survey,
                                                        program=args.program,
                                                        queue=args.queue,
                                                        runtime=args.runtime,
                                                        batch_opts=args.batch_opts,
                                                        timingfile=args.timingfile,
                                                        system_name=args.system_name,
                                                        no_gpu=args.no_gpu,
                                                        max_gpuprocs=args.max_gpuprocs,
                                                        cmdline=cmdline)
            log.info("Generating batch script and exiting.")

            if not args.nosubmit and not args.dryrun:
                err = subprocess.call(['sbatch', scriptfile])

        ## All ranks need to exit if submitted batch
        if comm is not None:
            err = comm.bcast(err, root=0)
        sys.exit(err)

    exposure_table = None
    hpixexp = None
    if rank == 0:
        if groupname != 'healpix' and args.expfiles is not None:
            tmp = vstack([Table.read(fn) for fn in args.expfiles])
            args.expids = list(tmp['EXPID'])
            args.nights = list(tmp['NIGHT'])

        if groupname == 'healpix':
            if args.expfiles is not None:
                hpixexp = vstack([Table.read(fn) for fn in args.expfiles])
            else:
                from desispec.pixgroup import get_exp2healpix_map
                hpixexp = get_exp2healpix_map(args.prodexpfile,
                                              survey=args.survey, program=args.program)
            keep = np.isin(hpixexp['HEALPIX'], args.healpix)
            hpixexp = hpixexp[keep]
        elif groupname == 'perexp' and args.nights is not None \
                and args.cameras is not None and args.expids is not None:
            assert len(args.expids) == 1, "perexp job should only have one exposure"
            assert len(args.nights) == 1, "perexp job should only have one night"
            exposure_table = Table([Table.Column(name='EXPID', data=args.expids),
                                    Table.Column(name='NIGHT', data=args.nights),
                                    Table.Column(name='CAMWORD', data=[camword]),
                                    Table.Column(name='BADCAMWORD', data=[''])])
        else:
            if args.nights is not None:
                nights = args.nights
            elif args.thrunight is None:
                ## None will glob for all nights
                nights = None
            else:
                ## Get list of only nights up to date of thrunight
                nights = get_nights_up_to_date(args.thrunight)

            exposure_table = read_minimal_exptables_columns(nights=nights, tileids=[tileid])
            if args.expids is not None:
                exposure_table = exposure_table[np.isin(exposure_table['EXPID'], args.expids)]

            ## Should remove; just nice for printouts while performance isn't important
            exposure_table.sort(keys=['EXPID'])

    if comm is not None:
        comm.barrier()
        if groupname == 'healpix':
            hpixexp = comm.bcast(hpixexp, root=0)
        else:
            exposure_table = comm.bcast(exposure_table, root=0)

    if groupname != 'healpix':
        if len(exposure_table) == 0:
            msg = "Didn't find any exposures!"
            log.error(msg)
            raise ValueError(msg)

        ## Get night and expid information
        expids = np.unique(exposure_table['EXPID'].data)
        nights = np.unique(exposure_table['NIGHT'].data)
        thrunight = np.max(nights)
    else:
        expids = None
        nights = None
        thrunight = None

    #------------------------------------------------------------------------#
    #------------------------ Proceed with running --------------------------#
    #------------------------------------------------------------------------#

    ## Print a summary of what we're going to do
    if rank == 0:
        log.info('------------------------------')
        log.info('Groupname {}'.format(groupname))
        if args.healpix is not None:
            log.info(f'Healpixels {args.healpix}')
        else:
            log.info(f'Tileid={tileid} nights={nights} expids={expids}')
        log.info(f'Supplied camword: {camword}')
        log.info('Output root {}'.format(desispec.io.specprod_root()))
        if args.run_zmtl:
            log.info('Will be running zmtl')
        if not args.no_afterburners:
            log.info('Will be running afterburners')
        log.info('------------------------------')

    if comm is not None:
        comm.barrier()

    ## Derive the available spectrographs
    if groupname == 'healpix':
        all_subgroups = args.healpix
    else:
        ## Find nights, exposures, and camwords
        expnight_dict = dict()
        complete_cam_set = set()
        camword_set = set(decode_camword(camword))
        for erow in exposure_table:
            key = (erow['EXPID'], erow['NIGHT'])
            val = set(decode_camword(difference_camwords(erow['CAMWORD'], erow['BADCAMWORD'],
                                                         suppress_logging=True)))
            if camword != 'a0123456789':
                val = camword_set.intersection(val)
            complete_cam_set = complete_cam_set.union(val)
            expnight_dict[key] = val

        all_subgroups = camword_to_spectros(create_camword(list(complete_cam_set)),
                                            full_spectros_only=False)

        if len(all_subgroups) == 0:
            msg = f"Didn't find any spectrographs! complete_cam_set={complete_cam_set}"
            log.error(msg)
            raise ValueError(msg)

    ## options to be used by findfile for all output files
    if groupname == 'healpix':
        findfileopts = dict(groupname=groupname, survey=args.survey, faprogram=args.program)
    else:
        findfileopts = dict(night=thrunight, tile=tileid, groupname=groupname)
        if groupname == 'perexp':
            assert len(expids) == 1
            findfileopts['expid'] = expids[0]

    timer.stop('preflight')

    #-------------------------------------------------------------------------
    ## Do spectral grouping and coadding
    timer.start('groupspec')

    nblocks, block_size, block_rank, block_num = \
        distribute_ranks_to_blocks(len(all_subgroups), rank=rank, size=size, log=log)

    if rank == 0:
        if groupname == 'healpix':
            for hpix in args.healpix:
                findfileopts['healpix'] = hpix
                splog = findfile('spectra', spectrograph=0, logfile=True, **findfileopts)
                os.makedirs(os.path.dirname(splog), exist_ok=True)
        else:
            splog = findfile('spectra', spectrograph=0, logfile=True, **findfileopts)
            os.makedirs(os.path.dirname(splog), exist_ok=True)

    if comm is not None:
        comm.barrier()

    if block_rank == 0:
        for i in range(block_num, len(all_subgroups), nblocks):
            result, success = 0, True
            if groupname == 'healpix':
                healpix = all_subgroups[i]
                log.info(f'Coadding spectra for healpix {healpix}')
                findfileopts['healpix'] = healpix
                cframes = []
                ii = hpixexp['HEALPIX'] == healpix
                for night, expid, spectro in hpixexp['NIGHT', 'EXPID', 'SPECTRO'][ii]:
                    for band in ('b', 'r', 'z'):
                        camera = band + str(spectro)
                        filename = findfile('cframe', night=night, expid=expid, camera=camera)
                        if os.path.exists(filename):
                            cframes.append(filename)
                        else:
                            log.warning(f'Missing {filename}')

                if len(cframes) < 3:
                    log.error(f'healpix {healpix} only has {len(cframes)} cframes; skipping')
                    error_count += 1
                    continue
            else:
                spectro = all_subgroups[i]
                log.info(f'Coadding spectra for spectrograph {spectro}')
                findfileopts['spectrograph'] = spectro
                # generate list of cframes from dict of exposures, nights, and cameras
                cframes = []
                for (expid, night), cameras in expnight_dict.items():
                    for camera in cameras:
                        if int(spectro) == int(camera[1]):
                            cframes.append(findfile('cframe', night=night, expid=expid, camera=camera))

            spectrafile = findfile('spectra', **findfileopts)
            splog = findfile('spectra', logfile=True, **findfileopts)
            coaddfile = findfile('coadd', **findfileopts)

            cmd = f"desi_group_spectra --inframes {' '.join(cframes)} " \
                  + f"--outfile {spectrafile} " \
                  + f"--coaddfile {coaddfile} "
            if groupname == 'healpix':
                cmd += f"--healpix {healpix} "
                cmd += f"--header SURVEY={args.survey} PROGRAM={args.program} "
            else:
                cmd += "--onetile "
                cmd += (f"--header SPGRP={groupname} SPGRPVAL={thrunight} "
                        f"NIGHT={thrunight} TILEID={tileid} SPECTRO={spectro} PETAL={spectro} ")
                if groupname == 'perexp':
                    cmd += f'EXPID={expids[0]} '

            cmdargs = cmd.split()[1:]
            if args.dryrun:
                if rank == 0:
                    log.info(f"dryrun: Would have run {cmd}")
            else:
                with stdouterr_redirected(splog):
                    result, success = runcmd(group_spectra.main, args=cmdargs,
                                             inputs=cframes, outputs=[spectrafile, coaddfile])
                if not success:
                    log.error(f'desi_group_spectra petal {spectro} failed; see {splog}')
                    error_count += 1

    timer.stop('groupspec')

    if comm is not None:
        comm.barrier()
    if rank == 0:
        log.info("Done with spectra")

    #-------------------------------------------------------------------------
    ## Do redshifting
    timer.start('redrock')

    for subgroup in all_subgroups:
        result, success = 0, True
        if groupname == 'healpix':
            findfileopts['healpix'] = subgroup
        else:
            findfileopts['spectrograph'] = subgroup

        coaddfile = findfile('coadd', **findfileopts)
        rrfile = findfile('redrock', **findfileopts)
        rdfile = findfile('rrdetails', **findfileopts)
        rrlog = findfile('redrock', logfile=True, **findfileopts)

        cmd = f"rrdesi_mpi -i {coaddfile} -o {rrfile} -d {rdfile}"
        if not args.no_gpu:
            cmd += f' --gpu --max-gpuprocs {args.max_gpuprocs}'

        cmdargs = cmd.split()[1:]
        if args.dryrun:
            if rank == 0:
                log.info(f"dryrun: Would have run {cmd}")
        else:
            with stdouterr_redirected(rrlog, comm=comm):
                result, success = runcmd(desi.rrdesi, comm=comm, args=cmdargs,
                                         inputs=[coaddfile], outputs=[rrfile, rdfile])
            ## Since all ranks running redrock, only count failure/success once
            if rank == 0 and not success:
                log.error(f'Redrock petal/healpix {subgroup} failed; see {rrlog}')
                error_count += 1

        ## Since all ranks running redrock, ensure we're all moving on to next
        ## iteration together
        if comm is not None:
            comm.barrier()

    if comm is not None:
        comm.barrier()

    timer.stop('redrock')
    if rank == 0:
        log.info("Done with redrock")

    #-------------------------------------------------------------------------
    ## Do tileqa if a tile (i.e. not for healpix)
    timer.start('tileqa')
    if rank == 0 and groupname in ['pernight', 'cumulative']:
        from desispec.scripts import tileqa
        result, success = 0, True
        qafile = findfile('tileqa', **findfileopts)
        qapng = findfile('tileqapng', **findfileopts)
        qalog = findfile('tileqa', logfile=True, **findfileopts)
        infiles = []
        for spectro in all_subgroups:
            findfileopts['spectrograph'] = spectro
            infiles.append(findfile('coadd', **findfileopts))
            infiles.append(findfile('redrock', **findfileopts))

        cmd = f"desi_tile_qa -g {groupname} -n {thrunight} -t {tileid}"
        cmdargs = cmd.split()[1:]
        if args.dryrun:
            log.info(f"dryrun: Would have run {cmd} with "
                     + f"outputs {qafile}, {qapng}")
        else:
            with stdouterr_redirected(qalog):
                result, success = runcmd(tileqa.main, args=cmdargs,
                                         inputs=infiles, outputs=[qafile, qapng])
            ## count failure/success
            if not success:
                log.error(f'tileqa failed; see {qalog}')
                error_count += 1
        log.info("Done with tileqa")

    timer.stop('tileqa')
    if comm is not None:
        comm.barrier()

    #-------------------------------------------------------------------------
    ## Do zmtl if asked to
    if args.run_zmtl:
        from desispec.scripts import makezmtl
        timer.start('zmtl')
        if block_rank == 0:
            for i in range(block_num, len(all_subgroups), nblocks):
                result, success = 0, True
                if groupname == 'healpix':
                    findfileopts['healpix'] = all_subgroups[i]
                else:
                    findfileopts['spectrograph'] = all_subgroups[i]
                rrfile = findfile('redrock', **findfileopts)
                zmtlfile = findfile('zmtl', **findfileopts)
                zmtllog = findfile('zmtl', logfile=True, **findfileopts)

                cmd = f"make_zmtl_files --input_file {rrfile} --output_file {zmtlfile}"
                cmdargs = cmd.split()[1:]
                if args.dryrun:
                    if rank == 0:
                        log.info(f"dryrun: Would have run {cmd}")
                else:
                    with stdouterr_redirected(zmtllog):
                        result, success = runcmd(makezmtl.main, args=cmdargs,
                                                 inputs=[rrfile], outputs=[zmtlfile])
                    if not success:
                        log.error(f'zmtl petal/healpix {all_subgroups[i]} failed; see {zmtllog}')
                        error_count += 1

        if rank == 0:
            log.info("Done with zmtl")
        timer.stop('zmtl')
        if comm is not None:
            comm.barrier()

    #-------------------------------------------------------------------------
    ## Do afterburners if asked to
    if not args.no_afterburners:
        from desispec.scripts import qsoqn, qsomgii, emlinefit
        """
        for SPECTRO in 0 1 2 3 4 5 6 7 8 9; do
            coadd=tiles/cumulative/2288/20220918/coadd-$SPECTRO-2288-thru20220918.fits
            redrock=tiles/cumulative/2288/20220918/redrock-$SPECTRO-2288-thru20220918.fits
            qsomgii=tiles/cumulative/2288/20220918/qso_mgii-$SPECTRO-2288-thru20220918.fits
            qsoqn=tiles/cumulative/2288/20220918/qso_qn-$SPECTRO-2288-thru20220918.fits
            emfit=tiles/cumulative/2288/20220918/emline-$SPECTRO-2288-thru20220918.fits
            qsomgiilog=tiles/cumulative/2288/20220918/logs/qso_mgii-$SPECTRO-2288-thru20220918.log
            qsoqnlog=tiles/cumulative/2288/20220918/logs/qso_qn-$SPECTRO-2288-thru20220918.log
            emfitlog=tiles/cumulative/2288/20220918/logs/emline-$SPECTRO-2288-thru20220918.log
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_qso_mgii_afterburner --coadd $coadd --redrock $redrock --output $qsomgii --target_selection all --save_target all"
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_qso_qn_afterburner --coadd $coadd --redrock $redrock --output $qsoqn --target_selection all --save_target all"
            cmd="srun -N 1 -n 1 -c 64 --cpu-bind=none desi_emlinefit_afterburner --coadd $coadd --redrock $redrock --output $emfit"
        """
        timer.start('afterburners')

        nafterburners = 3
        nsubgroups = len(all_subgroups)
        ntasks = nafterburners * nsubgroups
        # TODO: for 64//3, this creates 4 blocks, with rank 63/64 being block 3 block_rank==0,
        # which ends up running the mgii afterburner twice
        nblocks, block_size, block_rank, block_num = \
            distribute_ranks_to_blocks(ntasks, rank=rank, size=size, log=log)

        #- Create a subcommunicator with just this rank, e.g. for
        #- qsoqn afterburner that needs a communicator to pass to
        #- redrock, but is otherwise only a single rank.
        if comm is not None:
            monocomm = comm.Split(color=comm.rank)
        else:
            monocomm = None

        if block_rank == 0:
            ## If running multiple afterburners at once, stagger the block start
            ## times slightly so the I/O isn't all hit at once
            time.sleep(0.2*block_num)
            for i in range(block_num, ntasks, nblocks):
                result, success = 0, True
                ## alternative per-task stagger, currently disabled
                #time.sleep(0.2*i)
                subgroup = all_subgroups[i % nsubgroups]
                if groupname == 'healpix':
                    findfileopts['healpix'] = subgroup
                else:
                    findfileopts['spectrograph'] = subgroup
                coaddfile = findfile('coadd', **findfileopts)
                rrfile = findfile('redrock', **findfileopts)

                ## First set of nsubgroups ranks go to desi_qso_mgii_afterburner
                if i // nsubgroups == 0:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for qso mgii")
                    mgiifile = findfile('qso_mgii', **findfileopts)
                    mgiilog = findfile('qso_mgii', logfile=True, **findfileopts)
                    cmd = f"desi_qso_mgii_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {mgiifile} " \
                          + f"--target_selection all --save_target all"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(mgiilog):
                            result, success = runcmd(qsomgii.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[mgiifile])
                        if not success:
                            log.error(f'qsomgii afterburner petal/healpix {subgroup} failed; see {mgiilog}')

                ## Second set of nsubgroups ranks go to desi_qso_qn_afterburner
                elif i // nsubgroups == 1:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for qso qn")
                    qnfile = findfile('qso_qn', **findfileopts)
                    qnlog = findfile('qso_qn', logfile=True, **findfileopts)
                    cmd = f"desi_qso_qn_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {qnfile} " \
                          + f"--target_selection all --save_target all"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(qnlog):
                            result, success = runcmd(qsoqn.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[qnfile], comm=monocomm)
                        if not success:
                            log.error(f'qsoqn afterburner petal/healpix {subgroup} failed; see {qnlog}')

                ## Third set of nsubgroups ranks go to desi_emlinefit_afterburner
                elif i // nsubgroups == 2:
                    log.info(f"rank {rank}, block_rank {block_rank}, block_num {block_num}, is running spectro/healpix {subgroup} for emlinefit")
                    emfile = findfile('emline', **findfileopts)
                    emlog = findfile('emline', logfile=True, **findfileopts)
                    cmd = f"desi_emlinefit_afterburner --coadd {coaddfile} " \
                          + f"--redrock {rrfile} --output {emfile}"
                    cmdargs = cmd.split()[1:]
                    if args.dryrun:
                        if rank == 0:
                            log.info(f"dryrun: Would have run {cmd}")
                    else:
                        with stdouterr_redirected(emlog):
                            result, success = runcmd(emlinefit.main, args=cmdargs,
                                                     inputs=[coaddfile, rrfile],
                                                     outputs=[emfile])
                        if not success:
                            log.error(f'emlinefit afterburner petal/healpix {subgroup} failed; see {emlog}')

                ## For now there are only 3 afterburners, so shout if the loop goes higher than that
                else:
                    log.error(f"Index i={i} // nsubgroups={nsubgroups} should "
                              + f"be between 0 and {nafterburners-1}!")

                if not success:
                    error_count += 1

        if rank == 0:
            log.info("Done with afterburners")
        timer.stop('afterburners')
        if comm is not None:
            comm.barrier()

    #-------------------------------------------------------------------------
    ## Collect error count and wrap up
    if comm is not None:
        all_error_counts = comm.gather(error_count, root=0)
        if rank == 0:
            final_error_count = int(np.sum(all_error_counts))
        else:
            final_error_count = 0
        error_count = comm.bcast(final_error_count, root=0)

    #- save / print timing information
    log_timer(timer, args.timingfile, comm=comm)

    if rank == 0:
        duration_seconds = time.time() - start_time
        mm = int(duration_seconds) // 60
        ss = int(duration_seconds - mm*60)

        goodbye = f'All done at {time.asctime()}; duration {mm}m{ss}s'
        if error_count > 0:
            log.error(f'{error_count} processing errors; see logs above')
            log.error(goodbye)
        else:
            log.info(goodbye)

    if error_count > 0:
        sys.exit(int(error_count))
    else:
        return 0
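
#- Summary note on main() above: the processing stages run in order are
#- spectral grouping/coadding (desi_group_spectra), redshift fitting (rrdesi),
#- tile QA (tile-based groups only), optional zmtl, and optional QSO and
#- emission-line afterburners.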
def distribute_ranks_to_blocks(nblocks, rank=None, size=None, comm=None,
                               log=None, split_comm=False):
    """
    Split a set of ranks of size 'size' into nblocks blocks of roughly equal size.

    Args:
        nblocks (int): the number of blocks to split the ranks into
        rank (int): the MPI world rank
        size (int): the number of world MPI ranks
        comm (object): MPI communicator
        log (object): logger
        split_comm (bool): whether to split the world communicator into blocks
            and return the block communicator

    Returns:
        tuple: A tuple containing:

        * nblocks, int: the achievable number of blocks based on size
        * block_size, int: the number of ranks in the assigned block of the current rank
        * block_rank, int: the rank in the assigned block of the current rank
        * block_num, int: the block number (of nblocks blocks) in which the rank was assigned
        * block_comm (optional): if split_comm is True, a communicator of only the
          ranks in the current block, split from the world communicator
    """
    if rank is None or size is None:
        if comm is not None:
            #- Use the provided comm to determine rank and size
            rank = comm.rank
            size = comm.size
        else:
            msg = 'Either rank and size or comm must be defined. '
            msg += f'Received rank={rank}, size={size}, comm={comm}'
            if log is None:
                log = get_logger()
            log.error(msg)
            raise ValueError(msg)

    if log is not None and rank == 0:
        log.info(f"Attempting to split MPI ranks of size {size} into "
                 + f"{nblocks} blocks")

    if size <= nblocks:
        nblocks = size
        block_size = 1
        block_rank = 0
        block_num = rank
        if split_comm:
            block_comm = comm
    else:
        # nblocks = nblocks
        block_num = int(rank / (size/nblocks))
        block_rank = int(rank % (size/nblocks))

        # Calculate assignment for all ranks to be able to calculate
        # how many other ranks are in this same block
        all_ranks = np.arange(size)
        all_block_num = (all_ranks / (size/nblocks)).astype(int)
        assert all_block_num[rank] == block_num
        ii_this_block = all_block_num == block_num
        block_size = np.sum(ii_this_block)

        if split_comm:
            if comm is not None:
                block_comm = comm.Split(block_num, block_rank)
                assert block_rank == block_comm.Get_rank()
            else:
                block_comm = comm

    if log is not None:
        log.info(f"World rank/size: {rank}/{size} mapped to: Block #{block_num}, "
                 + f"block_rank/block_size: {block_rank}/{block_size}")

    if split_comm:
        return nblocks, block_size, block_rank, block_num, block_comm
    else:
        return nblocks, block_size, block_rank, block_num
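
#- Worked example of the mapping above (illustrative numbers): with size=10
#- world ranks and nblocks=4, size/nblocks = 2.5, so block_num = int(rank/2.5)
#- assigns ranks 0-2 to block 0, ranks 3-4 to block 1, ranks 5-7 to block 2,
#- and ranks 8-9 to block 3; blocks 0 and 2 have block_size 3, the others 2.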