#!/usr/bin/env python

Combine individual redrock files into a single zcatalog

Stephen Bailey
Lawrence Berkeley National Lab
Fall 2015

substantially updated Fall 2023

from __future__ import absolute_import, division, print_function

import sys, os, glob
import argparse
import importlib.resources
import multiprocessing as mp

import numpy as np
from numpy.lib.recfunctions import append_fields

import fitsio
from astropy.table import Table, hstack, vstack

from desiutil.log import get_logger
from desispec import io
from desispec.zcatalog import find_primary_spectra
from import get_tempfilename, checkgzip, replace_prefix, write_bintable
from import read_table
from desispec.coaddition import coadd_fibermap
from desispec.util import parse_keyval
from desiutil.annotate import load_csv_units
import desiutil.depend

[docs]def load_sv1_ivar_w12(hpix, targetids): """ Load FLUX_IVAR_W1/W2 from sv1 target files for requested targetids Args: hpix (int): nside=8 nested healpix targetids (array): TARGETIDs to include Returns table of TARGETID, FLUX_IVAR_W1, FLUX_IVAR_W2 Note: this is only for the special case of sv1 dark/bright and the FLUX_IVAR_W1/W2 columns which were not included in fiberassign for tiles designed before 20201212. Note: nside=8 nested healpix is hardcodes for simplicity because that is what was used for sv1 target selection and this is not trying to be a more generic targetid lookup function. """ log = get_logger() #- the targets could come from any version of desitarget, so search all, #- but once a TARGETID is found it will be the same answer (for FLUX_IVAR*) #- as any other version because it is propagated from the same dr9 input #- Tractor files. targetdir = os.path.join(os.environ['DESI_TARGET'], 'catalogs', 'dr9') fileglob = f'{targetdir}/*/targets/sv1/resolve/*/sv1targets-*-hp-{hpix}.fits' sv1targetfiles = sorted(glob.glob(fileglob)) nfiles = len(sv1targetfiles) ntarg = len(np.unique(targetids))'Searching {nfiles} sv1 target files for {ntarg} targets in nside=8 healpix={hpix}') columns = ['TARGETID', 'FLUX_IVAR_W1', 'FLUX_IVAR_W2'] targets = list() found_targetids = list() for filename in sv1targetfiles: tx =, 1, columns=columns) keep = np.isin(tx['TARGETID'], targetids) keep &= ~np.isin(tx['TARGETID'], found_targetids) targets.append(tx[keep]) found_targetids.extend(tx['TARGETID'][keep]) if np.all(np.isin(targetids, found_targetids)): break targets = np.hstack(targets) missing = np.isin(targetids, targets['TARGETID'], invert=True) if np.any(missing): nmissing = np.sum(missing) log.error(f'{nmissing} TARGETIDs not found in sv1 healpix={hpix}') return targets
[docs]def _wrap_read_redrock(optdict): """read_redrock wrapper to expand dictionary of named args for multiprocessing""" return read_redrock(**optdict)
[docs]def read_redrock(rrfile, group=None, recoadd_fibermap=False, minimal=False, pertile=False, counter=None): """ Read a Redrock file, combining REDSHIFTS, FIBERMAP, and TSNR2 HDUs Args: rrfile (str): full path to redrock filename Options: group (str): add group-specific columns for cumulative, pernight, healpix readcoadd_fibermap (bool): recoadd fibermap from spectra file in same dir minimal (bool): only propagate minimal subet of columns pertile (bool): input Redrock file is single tile (not healpix) counter (tuple): (i,n) log loading ith file out of n Returns (zcat, expfibermap) where zcat is a join of the redrock REDSHIFTS catalog and the coadded FIBERMAP """ log = get_logger() if counter is not None: i, n = counter'Reading {i}/{n} {rrfile}') else:'Reading {rrfile}') with fitsio.FITS(rrfile) as fx: hdr = fx[0].read_header() if group is not None and 'SPGRP' in hdr and \ hdr['SPGRP'] != group: log.warning("Skipping {} with SPGRP {} != group {}".format( rrfile, hdr['SPGRP'], group)) return None redshifts = fx['REDSHIFTS'].read() if recoadd_fibermap: spectra_filename = checkgzip(replace_prefix(rrfile, 'redrock', 'spectra'))'Recoadding fibermap from %s', os.path.basename(spectra_filename)) fibermap_orig = read_table(spectra_filename) fibermap, expfibermap = coadd_fibermap(fibermap_orig, onetile=pertile) else: fibermap = Table(fx['FIBERMAP'].read()) expfibermap = fx['EXP_FIBERMAP'].read() tsnr2 = fx['TSNR2'].read() assert np.all(redshifts['TARGETID'] == fibermap['TARGETID']) assert np.all(redshifts['TARGETID'] == tsnr2['TARGETID']) if minimal: # basic set of target information fmcols = ['TARGET_RA', 'TARGET_DEC', 'FLUX_G', 'FLUX_R', 'FLUX_Z'] # add targeting columns for colname in fibermap.dtype.names: if colname.endswith('_TARGET') and colname != 'FA_TARGET': fmcols.append(colname) # add columns needed for uniqueness that differ for healpix vs. tiles extracols = ['TILEID', 'LASTNIGHT', 'HEALPIX', 'SURVEY', 'PROGRAM'] for colname in extracols: if colname in fibermap.dtype.names: fmcols.append(colname) # NIGHT header -> fibermap LASTNIGHT if ('LASTNIGHT' not in fmcols) and ('NIGHT' in hdr): fibermap['LASTNIGHT'] = np.int32(hdr['NIGHT']) fmcols.append('LASTNIGHT') data = hstack( [Table(redshifts), Table(fibermap[fmcols])] ) else: fmcols = list(fibermap.dtype.names) fmcols.remove('TARGETID') if tsnr2 is not None: tsnr2cols = list(tsnr2.dtype.names) tsnr2cols.remove('TARGETID') data = hstack([ Table(redshifts), Table(fibermap[fmcols]), Table(tsnr2[tsnr2cols]), ]) else: data = hstack( [Table(redshifts), Table(fibermap[fmcols])] ) #- Add group specific columns, recognizing some some of them may #- have already been inherited from the fibermap. #- Put these columns right after TARGETID nrows = len(data) icol = 1 if group in ('perexp', 'pernight', 'cumulative'): if 'TILEID' not in data.colnames: data.add_column(np.full(nrows, hdr['TILEID'], dtype=np.int32), index=icol, name='TILEID') icol += 1 if 'PETAL_LOC' not in data.colnames: data.add_column(np.full(nrows, hdr['PETAL'], dtype=np.int16), index=icol, name='PETAL_LOC') icol += 1 if group == 'perexp': data.add_column(np.full(nrows, hdr['NIGHT'], dtype=np.int32), index=icol, name='NIGHT') icol += 1 data.add_column(np.full(nrows, hdr['EXPID'], dtype=np.int32), index=icol, name='EXPID') elif group == 'pernight': data.add_column(np.full(nrows, hdr['NIGHT'], dtype=np.int32), index=icol, name='NIGHT') elif group == 'cumulative': if 'LASTNIGHT' not in data.colnames: data.add_column(np.full(nrows, hdr['NIGHT'], dtype=np.int32), index=icol, name='LASTNIGHT') elif group == 'healpix': data.add_column(np.full(nrows, hdr['HPXPIXEL'], dtype=np.int32), index=icol, name='HEALPIX') icol += 1 # SPGRPVAL = night for pernight, expid for perexp, subset for custom coadds if 'SPGRPVAL' in hdr.keys(): val = hdr['SPGRPVAL'] # if int, try to make int32, otherwise let numpy pick dtype if isinstance(val, int): if np.int32(val) == val: dtype = np.int32 else: dtype = np.int64 else: dtype = None data.add_column(np.full(nrows, hdr['SPGRPVAL'], dtype=dtype), index=icol, name='SPGRPVAL') else: log.warning(f'SPGRPVAL keyword missing from {rrfile}') return data, expfibermap
#-------------------------------------------------------------------------- def parse(options=None): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("-i", "--indir", type=str, help="input directory") parser.add_argument("-o", "--outfile",type=str, help="output file") parser.add_argument("--minimal", action='store_true', help="only include minimal output columns") parser.add_argument("-t", "--tiles", type=str, help="ascii file with tileids to include (one per line)") parser.add_argument("--survey", type=str, help="DESI survey, e.g. sv1, sv3, main") parser.add_argument("--program", type=str, help="DESI program, e.g bright, dark") parser.add_argument("-g", "--group", type=str, help="Add columns specific to this spectral grouping " "e.g. pernight adds NIGHT column from input header keyword") parser.add_argument("--header", type=str, nargs="*", help="KEYWORD=VALUE entries to add to the output header") parser.add_argument('--patch-missing-ivar-w12', action='store_true', help="Use target files to patch missing FLUX_IVAR_W1/W2 values") parser.add_argument('--recoadd-fibermap', action='store_true', help="Re-coadd FIBERMAP from spectra files") parser.add_argument('--add-units', action='store_true', help="Add units to output catalog from desidatamodel " "column descriptions") parser.add_argument('--nproc', type=int, default=1, help="Number of multiprocessing processes to use") args = parser.parse_args(options) return args def main(args=None): if not isinstance(args, argparse.Namespace): args = parse(options=args) log=get_logger() if args.outfile is None: args.outfile = io.findfile('zcatalog') #- If adding units, check dependencies before doing a lot of work if args.add_units: try: import desidatamodel except ImportError: log.critical('Unable to import desidatamodel, required to add units (try "module load desidatamodel" first)') sys.exit(1) if args.indir: indir = args.indir redrockfiles = sorted(io.iterfiles(f'{indir}', prefix='redrock', suffix='.fits')) pertile = ( != 'healpix') # assume tile-based input unless explicitely healpix elif == 'healpix': pertile = False survey = args.survey if args.survey is not None else "*" program = args.program if args.program is not None else "*" indir = os.path.join(io.specprod_root(), 'healpix') #- specprod/healpix/SURVEY/PROGRAM/HPIXGROUP/HPIX/redrock*.fits globstr = os.path.join(indir, survey, program, '*', '*', 'redrock*.fits')'Looking for healpix redrock files in {globstr}') redrockfiles = sorted(glob.glob(globstr)) else: pertile = True tilefile = args.tiles if args.tiles is not None else io.findfile('tiles') indir = os.path.join(io.specprod_root(), 'tiles','Loading tiles from {tilefile}') tiles = if args.survey is not None: keep = tiles['SURVEY'] == args.survey tiles = tiles[keep] if len(tiles) == 0: log.critical(f'No tiles kept after filtering by SURVEY={args.survey}') sys.exit(1) if args.program is not None: keep = tiles['PROGRAM'] == args.program tiles = tiles[keep] if len(tiles) == 0: log.critical(f'No tiles kept after filtering by PROGRAM={args.program}') sys.exit(1) tileids = tiles['TILEID'] redrockfiles = list() for tileid in tileids: tmp = sorted(io.iterfiles(f'{indir}/{tileid}', prefix='redrock', suffix='.fits')) if len(tmp) > 0: redrockfiles.extend(tmp) else: log.error(f'no redrock files found in {indir}/{tileid}') nfiles = len(redrockfiles) if nfiles == 0: msg = f'No redrock files found in {indir}' log.critical(msg) raise ValueError(msg)'Reading {nfiles} redrock files') #- build list of args to support multiprocessing parallelism read_args = list() for ifile, rrfile in enumerate(redrockfiles): read_args.append(dict(rrfile=rrfile,, pertile=pertile, recoadd_fibermap=args.recoadd_fibermap, minimal=args.minimal, counter=(ifile+1, nfiles))) #- Read individual Redrock files if args.nproc>1: from multiprocessing import Pool with Pool(args.nproc) as pool: results =, read_args) else: results = [_wrap_read_redrock(a) for a in read_args] #- Stack catalogs zcatdata = list() exp_fibermaps = list() dependencies = dict() for data, expfibermap in results: if data is not None: desiutil.depend.mergedep(data.meta, dependencies) desiutil.depend.remove_dependencies(data.meta) zcatdata.append(data) if expfibermap is not None: exp_fibermaps.append(expfibermap)'Stacking zcat') zcat = vstack(zcatdata) desiutil.depend.mergedep(dependencies, zcat.meta) if exp_fibermaps:'Stacking exposure fibermaps') expfm = np.hstack(exp_fibermaps) else: expfm = None #- Add FIRSTNIGHT for tile-based cumulative catalogs #- (LASTNIGHT was added while reading from NIGHT header keyword) if == 'cumulative' and expfm is not None and 'FIRSTNIGHT' not in zcat.colnames:'Adding FIRSTNIGHT per tile') icol = zcat.colnames.index('LASTNIGHT') zcat.add_column(np.zeros(len(zcat), dtype=np.int32), index=icol, name='FIRSTNIGHT') for tilefm in Table(expfm[['TILEID', 'NIGHT']]).group_by('TILEID').groups: tileid = tilefm['TILEID'][0] iitile = zcat['TILEID'] == tileid zcat['FIRSTNIGHT'][iitile] = np.min(tilefm['NIGHT']) #- all FIRSTNIGHT entries should be filled (no more zeros) bad = zcat['FIRSTNIGHT'] == 0 if np.any(bad): badtiles = np.unique(zcat['TILEID'][bad]) raise ValueError(f'FIRSTNIGHT not set for tiles {badtiles}') #- if TARGETIDs appear more than once, which one is best within this catalog? if 'TSNR2_LRG' in zcat.colnames and 'ZWARN' in zcat.colnames:'Finding best spectrum for each target') nspec, primary = find_primary_spectra(zcat) zcat['ZCAT_NSPEC'] = nspec.astype(np.int16) zcat['ZCAT_PRIMARY'] = primary else:'Missing TSNR2_LRG or ZWARN; not adding ZCAT_PRIMARY/_NSPEC') #- Used for fuji, should not be needed for later prods if args.patch_missing_ivar_w12: from desimodel.footprint import radec2pix missing = (zcat['FLUX_IVAR_W1'] < 0) | (zcat['FLUX_IVAR_W2'] < 0) missing &= zcat['OBJTYPE'] == 'TGT' missing &= zcat['TARGETID'] > 0 if not np.any(missing):'No targets missing FLUX_IVAR_W1/W2 to patch') else: #- Load targets from sv1 targeting files ra = zcat['TARGET_RA'] dec = zcat['TARGET_DEC'] nside = 8 #- use for sv1 targeting hpix8 = radec2pix(nside, ra, dec) for hpix in np.unique(hpix8[missing]): hpixmiss = (hpix == hpix8) & missing targets = load_sv1_ivar_w12(hpix, zcat['TARGETID'][hpixmiss]) #- create dict[TARGETID] -> row number targetid2idx = dict(zip(targets['TARGETID'], np.arange(len(targets)))) #- patch missing values, if they are in the targets file for i in np.where(hpixmiss)[0]: tid = zcat['TARGETID'][i] try: j = targetid2idx[ tid ] zcat['FLUX_IVAR_W1'][i] = targets['FLUX_IVAR_W1'][j] zcat['FLUX_IVAR_W2'][i] = targets['FLUX_IVAR_W2'][j] except KeyError: log.warning(f'TARGETID {tid} (row {i}) not found in sv1 targets') #- we're done adding columns, convert to numpy array for fitsio zcat = np.array(zcat) #- Inherit header from first input, but remove keywords that don't apply #- across multiple files header = fitsio.read_header(redrockfiles[0], 0) for key in ['SPGRPVAL', 'TILEID', 'SPECTRO', 'PETAL', 'NIGHT', 'EXPID', 'HPXPIXEL', 'NAXIS', 'BITPIX', 'SIMPLE', 'EXTEND']: if key in header: header.delete(key) #- Intercept previous incorrect boolean special cases if 'HPXNEST' in header: if header['HPXNEST'] == 'True':"Correcting header HPXNEST='True' string to boolean True") header['HPXNEST'] = True elif header['HPXNEST'] == 'False': # False is not expected for DESI, but cover it for completeness"Correcting header HPXNEST='False' string to boolean False") header['HPXNEST'] = False #- Add extra keywords if requested if args.header is not None: for keyval in args.header: key, value = parse_keyval(keyval) header[key] = value if args.survey is not None: header['SURVEY'] = args.survey if args.program is not None: header['PROGRAM'] = args.program #- Add units if requested if args.add_units: datamodeldir = str(importlib.resources.files('desidatamodel')) unitsfile = os.path.join(datamodeldir, 'data', 'column_descriptions.csv')'Adding units from {unitsfile}') units, comments = load_csv_units(unitsfile) else: units = dict() comments = dict()'Writing {args.outfile}') tmpfile = get_tempfilename(args.outfile) write_bintable(tmpfile, zcat, header=header, extname='ZCATALOG', units=units, clobber=True) if not args.minimal and expfm is not None: write_bintable(tmpfile, expfm, extname='EXP_FIBERMAP', units=units) os.rename(tmpfile, args.outfile)"Successfully wrote {}".format(args.outfile))