#!/usr/bin/env python
"""
Combine individual redrock files into individual zcatalogs for all of a --group type
Anthony Kremin
LBNL
updated Fall 2024
"""
import sys, os, glob, time
import argparse
from astropy.table import Table
import numpy as np
from desispec.parallel import stdouterr_redirected
from desispec.util import runcmd
from desiutil.log import get_logger, DEBUG
from desispec.io.meta import findfile
from desispec.io import specprod_root
from desispec.scripts import zcatalog as zcatalog_script
from desispec.zcatalog import create_summary_catalog
def parse(options=None):
    """Build the command-line parser and parse *options*.

    Parameters
    ----------
    options : :class:`list`, optional
        Argument strings to parse. When ``None``, argparse falls back to
        ``sys.argv[1:]``.

    Returns
    -------
    :class:`argparse.Namespace`
        The parsed command-line options.
    """
    argparser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    ## Required options
    argparser.add_argument(
        "-g", "--group", type=str, required=True,
        help="Add columns specific to this spectral grouping "
             "e.g. pernight, perexp, cumulative, uniqpix, healpix")
    argparser.add_argument(
        "-V", "--cat-version", type=str, required=True,
        help="The version number of the output catalogs")

    ## Optional string-valued options, all defaulting to None
    for short_flag, long_flag, help_text in (
            ("-i", "--indir", "Input directory"),
            ("-o", "--outdir",
             "Output directory without version number included."),
    ):
        argparser.add_argument(short_flag, long_flag, type=str, default=None,
                               help=help_text)
    argparser.add_argument(
        "--header", type=str, default=None,
        help="KEYWORD=VALUE entries to add to the output header")

    ## NOTE: a "--survey" option (nargs="*", e.g. sv1, sv3, main) used to be
    ## defined here and is currently disabled.

    argparser.add_argument(
        "--nproc", type=int, default=1,
        help="Number of multiprocessing processes to use")

    ## Boolean switches, registered in the order they appear in --help
    switches = (
        (("--minimal",),
         "only include minimal output columns"),
        (("--patch-missing-ivar-w12",),
         "Use target files to patch missing FLUX_IVAR_W1/W2 values"),
        (("--recoadd-fibermap",),
         "Re-coadd FIBERMAP from spectra files"),
        (("--do-not-add-units",),
         "Set if you do not want to add units to output catalog from desidatamodel "
         "column descriptions"),
        (("-v", "--verbose"),
         "Set log level to DEBUG."),
    )
    for flag_names, help_text in switches:
        argparser.add_argument(*flag_names, action="store_true", help=help_text)

    return argparser.parse_args(options)
[docs]
def _create_summary_catalog_wrapper(kwargs):
    """
    Adapter that lets a single dict argument drive create_summary_catalog.

    The entries of ``kwargs`` are expanded into keyword arguments of
    create_summary_catalog, and whatever that call returns is handed back
    unchanged.
    """
    summary = create_summary_catalog(**kwargs)
    return summary
[docs]
def _read_tiles_table(log):
    """Read the tiles table, preferring the FITS file over the CSV one.

    Returns the table, or ``None`` when neither tiles file exists.
    """
    tilesfile = findfile('tiles')
    if os.path.exists(tilesfile):
        return Table.read(tilesfile, format='fits')

    log.warning(f'Tiles file {tilesfile} does not exist. Trying CSV instead.')
    tilesfile = findfile('tiles_csv')
    if os.path.exists(tilesfile):
        return Table.read(tilesfile, format='ascii.csv')

    return None


def _base_zcatalog_args(args):
    """Return the desi_zcatalog arguments shared by every survey/program run."""
    ## Build the argument list directly instead of formatting one string and
    ## splitting it on whitespace: no shell is involved, so shell-style quotes
    ## would be passed through literally and values containing spaces would
    ## be broken into separate tokens.
    base_args = ['-g', args.group, f'--nproc={args.nproc}']

    ## Each whitespace-separated KEYWORD=VALUE entry becomes its own token.
    ## NOTE(review): assumes desi_zcatalog accepts one or more values after
    ## --header -- confirm against its argument parser.
    header_entries = args.header.split() if args.header else []
    if header_entries:
        base_args.append('--header')
        base_args.extend(header_entries)

    switches = (('-v', args.verbose),
                ('--minimal', args.minimal),
                ('--patch-missing-ivar-w12', args.patch_missing_ivar_w12),
                ('--recoadd-fibermap', args.recoadd_fibermap),
                ('--do-not-add-units', args.do_not_add_units))
    base_args.extend(flag for flag, enabled in switches if enabled)
    return base_args


def main(args=None):
    """Entry-point for command-line scripts.

    Calls the zcatalog script's ``main`` once for every (survey, program)
    pair found in the tiles file, with stdout/stderr of each run redirected
    to its own log file under ``<outdir>/logs``.  If all of those runs
    succeed and the group is ``cumulative``, ``uniqpix`` or ``healpix``, the
    combined "zall" summary catalog is then generated from their outputs.

    Parameters
    ----------
    args : :class:`list` or :class:`argparse.Namespace`, optional
        A list of arguments to be parsed, or an already parsed namespace.
        The namespace is not modified.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`: ``1`` if a
        setup check fails (missing desidatamodel, input directory or tiles
        file), otherwise the number of jobs that failed (``0`` on success).
    """
    if not isinstance(args, argparse.Namespace):
        args = parse(options=args)

    log = get_logger(DEBUG) if args.verbose else get_logger()

    ## If adding units, check dependencies before doing a lot of work
    if not args.do_not_add_units:
        try:
            import desidatamodel  # noqa: F401 -- imported only as an availability check
        except ImportError:
            log.critical('Unable to import desidatamodel, required to add units.'
                         + ' Try "module load desidatamodel" first or use '
                         + '--do-not-add-units')
            return 1

    ## Define filetype based on uniqpix/healpix vs otherwise (tiles or custom)
    pix_group = args.group in ('uniqpix', 'healpix')
    ftype = 'zcat_pix' if pix_group else 'zcat_tile'

    ## Ensure input directory exists
    if args.indir is not None and not os.path.exists(args.indir):
        log.critical(f"Input directory {args.indir} does not exist.")
        return 1

    ## Resolve the output directory into a local variable so the caller's
    ## namespace is left untouched (re-using it for a second call would
    ## otherwise append the version twice).
    if args.outdir is None:
        ## since we only care about the directory path, we can
        ## use dummy values for survey and faprogram
        outdir = os.path.dirname(findfile(ftype,
                                          version=args.cat_version,
                                          groupname=args.group,
                                          survey='dummy', faprogram='dummy'))
    else:
        outdir = os.path.join(args.outdir, args.cat_version)
    log.info(f"Writing outputs to the following directory: {outdir}")

    logdir = os.path.join(outdir, 'logs')
    if not os.path.exists(logdir):
        log.info(f"Output log directory {logdir} does not exist, creating now.")
    ## exist_ok avoids a crash if another process creates the directory
    ## between the check above and this call
    os.makedirs(logdir, exist_ok=True)

    ## Load the tiles file to know what to run
    tiles_tab = _read_tiles_table(log)
    if tiles_tab is None:
        log.critical("Could not find a valid tiles file!")
        return 1

    ## Define the generic arguments to be used for each run
    base_args = _base_zcatalog_args(args)

    error_count = 0
    survey_program_outfiles = []
    for survey in np.unique(tiles_tab['SURVEY']):
        in_survey = tiles_tab['SURVEY'] == survey
        for program in np.unique(tiles_tab['PROGRAM'][in_survey]):
            ## only the basename of the findfile output is used; the
            ## directory comes from outdir
            out_fname = os.path.basename(findfile(ftype, groupname=args.group,
                                                  survey=survey, faprogram=program,
                                                  version=args.cat_version))
            outfile = os.path.join(outdir, out_fname)

            ## extend the base arguments with the program and survey information
            cmdargs = base_args + [f"--survey={survey}",
                                   f"--program={program}",
                                   f"--outfile={outfile}"]
            if args.indir is not None:
                ## the uniqpix/healpix path includes survey and program and the
                ## zcatalog assumes indir has them included
                if pix_group:
                    cmdargs.append(f"--indir={args.indir}/{survey}/{program}")
                else:
                    cmdargs.append(f"--indir={args.indir}")

            log.info(f"Running {survey=}, {program=}, to produce {outfile}")

            ## create a log file with the same name as the output except *.log
            log_fname = os.path.splitext(out_fname)[0] + '.log'
            outlog = os.path.join(logdir, log_fname)

            ## redirect stdout and stderr to the log file; runcmd is told the
            ## expected output so that (per the original note) the job only
            ## runs if outfile doesn't exist
            with stdouterr_redirected(outlog):
                _, success = runcmd(zcatalog_script.main, args=cmdargs,
                                    inputs=[], outputs=[outfile])

            ## Save the outfile so we know what infiles to expect for zall
            survey_program_outfiles.append(outfile)

            ## Track the number of failures and report result
            if success:
                log.info(f"Success in producing output: {outfile}")
            else:
                error_count += 1
                log.warning(
                    f"Failed to produce output: {outfile}, see {outlog}")

    ## If all runs above were successful and running cumulative, uniqpix, or healpix,
    ## then run zall as well
    run_zall = (error_count == 0
                and survey_program_outfiles
                and args.group in ('uniqpix', 'healpix', 'cumulative'))
    if run_zall:
        ## For reference, the wrapped function's signature is:
        ##   create_summary_catalog(specgroup, indir=None, specprod=None,
        ##                          all_columns=True, columns_list=None,
        ##                          output_filename=None)

        ## only the basename of the findfile output is used; the directory
        ## comes from outdir
        out_fname = os.path.basename(findfile(ftype.replace('zcat', 'zall'),
                                              groupname=args.group,
                                              version=args.cat_version))
        outfile = os.path.join(outdir, out_fname)

        ## summary catalog code calls these zpix and ztile instead
        specgroup = 'zpix' if pix_group else 'ztile'

        ## input here is the output directory of the previous loop
        kwargs = {'specgroup': specgroup, 'indir': outdir,
                  'output_filename': outfile}

        log.info(f"Running zall generation to produce {outfile}")
        log_fname = os.path.splitext(out_fname)[0] + '.log'
        outlog = os.path.join(logdir, log_fname)

        ## Redirect logging to a separate file; the files written by the
        ## previous step are declared as the required inputs
        with stdouterr_redirected(outlog):
            _, success = runcmd(_create_summary_catalog_wrapper, args=kwargs,
                                inputs=survey_program_outfiles, outputs=[outfile])

        if success:
            log.info(f"Success for job producing output: {outfile}")
        else:
            error_count += 1
            log.warning(f"Failed to produce output: {outfile}, see {outlog}")

    if error_count == 0:
        log.info(f"SUCCESS: All done at {time.asctime()}")
    else:
        log.info(f"{error_count} FAILURES: All done at {time.asctime()}")

    return error_count