Source code for desispec.scripts.zcatalog

#!/usr/bin/env python

"""
Combine individual redrock files into a single zcatalog

Stephen Bailey
Lawrence Berkeley National Lab
Fall 2015

substantially updated Fall 2023
updated again Summer 2025
"""

from __future__ import absolute_import, division, print_function

import sys, os, glob, time
import itertools
import argparse
import importlib.resources
import multiprocessing as mp

import numpy as np
from numpy.lib.recfunctions import append_fields, drop_fields

import fitsio
from astropy.table import Table, hstack, vstack
import astropy.constants

from desiutil.log import get_logger, DEBUG
from desiutil.annotate import load_csv_units
from desiutil.names import radec_to_desiname
import desiutil.depend
import desiutil.healpix

from desitarget.targetmask import desi_mask

from desispec import io
from desispec.zcatalog import find_primary_spectra
from desispec.io.util import get_tempfilename, checkgzip, replace_prefix, write_bintable
from desispec.io.meta import get_readonly_filepath
from desispec.io.table import read_table
from desispec.coaddition import coadd_fibermap
from desispec.specscore import compute_coadd_tsnr_scores
from desispec.util import parse_keyval
from desispec import validredshifts
from desispec.tsnr import tsnr2_to_efftime, program_to_tsnr2_colname

[docs] def load_sv1_ivar_w12(hpix, targetids): """ Load FLUX_IVAR_W1/W2 from sv1 target files for requested targetids Args: hpix (int): nside=8 nested healpix targetids (array): TARGETIDs to include Returns table of TARGETID, FLUX_IVAR_W1, FLUX_IVAR_W2 Note: this is only for the special case of sv1 dark/bright and the FLUX_IVAR_W1/W2 columns which were not included in fiberassign for tiles designed before 20201212. Note: nside=8 nested healpix is hardcodes for simplicity because that is what was used for sv1 target selection and this is not trying to be a more generic targetid lookup function. """ log = get_logger() #- the targets could come from any version of desitarget, so search all, #- but once a TARGETID is found it will be the same answer (for FLUX_IVAR*) #- as any other version because it is propagated from the same dr9 input #- Tractor files. targetdir = os.path.join(os.environ['DESI_TARGET'], 'catalogs', 'dr9') fileglob = f'{targetdir}/*/targets/sv1/resolve/*/sv1targets-*-hp-{hpix}.fits' sv1targetfiles = sorted(glob.glob(fileglob)) nfiles = len(sv1targetfiles) ntarg = len(np.unique(targetids)) log.info(f'Searching {nfiles} sv1 target files for {ntarg} targets in nside=8 healpix={hpix}') columns = ['TARGETID', 'FLUX_IVAR_W1', 'FLUX_IVAR_W2'] targets = list() found_targetids = list() for filename in sv1targetfiles: tx = fitsio.read(filename, 1, columns=columns) keep = np.isin(tx['TARGETID'], targetids) keep &= ~np.isin(tx['TARGETID'], found_targetids) targets.append(tx[keep]) found_targetids.extend(tx['TARGETID'][keep]) if np.all(np.isin(targetids, found_targetids)): break targets = np.hstack(targets) missing = np.isin(targetids, targets['TARGETID'], invert=True) if np.any(missing): nmissing = np.sum(missing) log.error(f'{nmissing} TARGETIDs not found in sv1 healpix={hpix}') return targets
[docs] def _wrap_read_redrock(optdict): """read_redrock wrapper to expand dictionary of named args for multiprocessing""" return read_redrock(**optdict)
[docs] def read_redrock(rrfile, group=None, pertile=False, counter=None): """ Read Redrock, emline, mgii, and qso_qn files, combining HDUs into single table Args: rrfile (str): full path to Redrock filename; others will be derived from this Options: group (str): add group-specific columns for cumulative, pernight, healpix pertile (bool): input Redrock file is single tile (not healpix) counter (tuple): (i,n) log loading ith file out of n Returns (zcat, expfibermap) where zcat is a join of the redrock REDSHIFTS catalog and the coadded FIBERMAP """ log = get_logger() if counter is not None: i, n = counter log.info(f'Reading {i}/{n} {rrfile}') else: log.info(f'Reading {rrfile}') with fitsio.FITS(rrfile) as fx: hdr = fx[0].read_header() if group is not None and 'SPGRP' in hdr and \ hdr['SPGRP'] != group: log.warning("Skipping {} with SPGRP {} != group {}".format( rrfile, hdr['SPGRP'], group)) return None if 'REDSHIFTS' in fx: redshifts = Table(fx['REDSHIFTS'].read()) zbest_file = False elif 'ZBEST' in fx: redshifts = Table(fx['ZBEST'].read()) zbest_file = True else: raise IOError(f'Unable to find REDSHIFTS or ZBEST HDU in {rrfile}') if zbest_file: spectra_filename = checkgzip(replace_prefix(rrfile, 'zbest', 'spectra')) log.info('Recoadding fibermap from %s.', os.path.basename(spectra_filename)) fibermap_orig = read_table(spectra_filename) fibermap, expfibermap = coadd_fibermap(fibermap_orig, onetile=pertile) if zbest_file: fibermap.sort(['TARGETID']) else: fibermap = Table(fx['FIBERMAP'].read()) expfibermap = Table(fx['EXP_FIBERMAP'].read()) assert np.all(redshifts['TARGETID'] == fibermap['TARGETID']) try: tsnr2 = Table(fx['TSNR2'].read()) except IOError: if zbest_file: # # zbest files do not have a TSNR2 HDU. # log.info('Computing TSNR2 from SCORES HDU in %s.', os.path.basename(spectra_filename)) scores = read_table(spectra_filename, ext='SCORES') if 'TARGETID' not in scores.colnames: assert len(scores) == len(fibermap_orig) scores['TARGETID'] = fibermap_orig['TARGETID'] tsnr2 = Table(compute_coadd_tsnr_scores(scores)[0]) tsnr2.sort(['TARGETID']) else: log.warning("Unable to obtain TSNR2 information for %s!", rrfile) tsnr2 = None if tsnr2 is not None: assert np.all(redshifts['TARGETID'] == tsnr2['TARGETID']) # # Fill missing columns with dummy values. # for band in ('B', 'R', 'Z', ''): for targ in ('ELG', 'GPBBACKUP', 'GPBBRIGHT', 'GPBDARK', 'LYA', 'BGS', 'QSO', 'LRG'): if band: colname = f'TSNR2_{targ}_{band}' else: colname = f'TSNR2_{targ}' if colname not in tsnr2.dtype.names: # This should work for both numpy arrays and Tables. log.warning("TSNR2 table is missing %s, filling with dummy values.", colname) if isinstance(tsnr2, Table): tsnr2[colname] = np.zeros((len(tsnr2), ), dtype=np.float32) else: tsnr2 = append_fields(tsnr2, colname, np.zeros(tsnr2.shape, dtype=np.float32), dtypes=np.float32) emline_file = replace_prefix(rrfile, 'redrock', 'emline') qso_mgii_file = replace_prefix(rrfile, 'redrock', 'qso_mgii') qso_qn_file = replace_prefix(rrfile, 'redrock', 'qso_qn') with fitsio.FITS(emline_file) as fx: emline = Table(fx['EMLINEFIT'].read()) with fitsio.FITS(qso_mgii_file) as fx: qso_mgii = Table(fx['MGII'].read()) with fitsio.FITS(qso_qn_file) as fx: qso_qn = Table(fx['QN_RR'].read()) assert np.all(redshifts['TARGETID'] == fibermap['TARGETID']) assert np.all(redshifts['TARGETID'] == tsnr2['TARGETID']) assert np.all(redshifts['TARGETID'] == emline['TARGETID']) assert np.all(redshifts['TARGETID'] == qso_mgii['TARGETID']) assert np.all(redshifts['TARGETID'] == qso_qn['TARGETID']) fmcols = list(fibermap.dtype.names) fmcols.remove('TARGETID') emline_cols = ['OII_FLUX', 'OII_FLUX_IVAR'] qso_mgii_cols = ['IS_QSO_MGII'] qso_qn_cols = ['IS_QSO_QN_NEW_RR', 'C_LYA', 'C_CIV', 'C_CIII', 'C_MgII', 'C_Hbeta', 'C_Halpha', 'Z_NEW', 'ZERR_NEW', 'ZWARN_NEW', 'SPECTYPE_NEW', 'SUBTYPE_NEW', 'CHI2_NEW', 'DELTACHI2_NEW', 'COEFF_NEW'] tsnr2cols = list(tsnr2.dtype.names) tsnr2cols.remove('TARGETID') data = hstack([ redshifts, fibermap[fmcols], tsnr2[tsnr2cols], emline[emline_cols], qso_mgii[qso_mgii_cols], qso_qn[qso_qn_cols] ], join_type='exact') # # These old columns show up in zbest files. They have been replaced with # COADD_NUMEXP, COADD_NUMTILE, which are obtained from coadd_fibermapp() # for zbest files. # for drop_col in ('NUMEXP', 'NUMTILE', 'NUMTARGET', 'HPXPIXEL', 'BLOBDIST', 'FIBERFLUX_IVAR_G', 'FIBERFLUX_IVAR_R', 'FIBERFLUX_IVAR_Z'): if drop_col in data.colnames: log.info("Removing column '%s' from %s ('ZCATALOG').", drop_col, os.path.basename(rrfile)) data.remove_column(drop_col) if data['RELEASE'].dtype == np.dtype('>i4'): log.info("Casting column 'RELEASE' in %s ('ZCATALOG') to 'int16'.", os.path.basename(rrfile)) data.replace_column('RELEASE', data['RELEASE'].astype(np.int16)) # # Older files, including, but not limited to zbest files, may not have DESINAME and other columns. # for add_col in ('DESINAME', 'MEAN_PSF_TO_FIBER_SPECFLUX', 'PLATE_RA', 'PLATE_DEC', 'FITMETHOD'): if add_col not in data.colnames: log.info("Adding missing column '%s' to %s ('ZCATALOG').", add_col, os.path.basename(rrfile)) if add_col == 'DESINAME': i = data.colnames.index('TARGET_DEC') if (np.isfinite(data['TARGET_RA']) & np.isfinite(data['TARGET_DEC'])).all(): desiname = radec_to_desiname(data['TARGET_RA'], data['TARGET_DEC']) else: log.warning("NaN detected in TARGET_RA, TARGET_DEC for %s ('ZCATALOG'), dummy values will be used!", os.path.basename(rrfile)) desiname = np.array(['-'*22]*len(data)) good_radec = np.where(np.isfinite(data['TARGET_RA']) & np.isfinite(data['TARGET_DEC']))[0] desiname[good_radec] = radec_to_desiname(data['TARGET_RA'][good_radec], data['TARGET_DEC'][good_radec]) data.add_column(desiname, index=i, name=add_col) if add_col == 'MEAN_PSF_TO_FIBER_SPECFLUX': log.warning("Adding missing column '%s' to %s ('ZCATALOG') with dummy values!", add_col, os.path.basename(rrfile)) i = data.colnames.index('MEAN_MJD') data.add_column(np.zeros((len(data), ), dtype=np.float32), index=i, name=add_col) if add_col == 'PLATE_RA': try: i = data.colnames.index('SCND_TARGET') except ValueError: i = data.colnames.index('MWS_TARGET') data.add_column(data['TARGET_RA'], index=i, name=add_col) if add_col == 'PLATE_DEC': i = data.colnames.index('PLATE_RA') data.add_column(data['TARGET_DEC'], index=i, name=add_col) if add_col == 'FITMETHOD': i = data.colnames.index('DESINAME') data.add_column(np.array(['PCA'] * len(data)).astype(np.dtype('S4')), index=i, name=add_col) for add_col in ('PSF_TO_FIBER_SPECFLUX', 'PLATE_RA', 'PLATE_DEC'): if add_col not in expfibermap.colnames: log.info("Adding missing column '%s' to %s ('EXP_FIBERMAP').", add_col, os.path.basename(rrfile)) if add_col == 'PSF_TO_FIBER_SPECFLUX': log.warning("Adding missing column '%s' to %s ('EXP_FIBERMAP') with dummy values!", add_col, os.path.basename(rrfile)) i = expfibermap.colnames.index('FIBER_DEC') expfibermap.add_column(np.zeros((len(expfibermap), ), dtype=np.float64), index=i, name=add_col) if add_col == 'PLATE_RA': i = expfibermap.colnames.index('LAMBDA_REF') expfibermap.add_column(expfibermap['FIBER_RA'], index=i, name=add_col) if add_col == 'PLATE_DEC': i = expfibermap.colnames.index('PLATE_RA') expfibermap.add_column(expfibermap['FIBER_DEC'], index=i, name=add_col) #- Add group specific columns, recognizing some some of them may #- have already been inherited from the fibermap. #- Put these columns right after TARGETID nrows = len(data) icol = 1 if group in ('perexp', 'pernight', 'cumulative'): if 'TILEID' not in data.colnames: data.add_column(np.full(nrows, hdr['TILEID'], dtype=np.int32), index=icol, name='TILEID') icol += 1 if 'PETAL_LOC' not in data.colnames: data.add_column(np.full(nrows, hdr['PETAL'], dtype=np.int16), index=icol, name='PETAL_LOC') icol += 1 if group == 'perexp': data.add_column(np.full(nrows, hdr['NIGHT'], dtype=np.int32), index=icol, name='NIGHT') icol += 1 data.add_column(np.full(nrows, hdr['EXPID'], dtype=np.int32), index=icol, name='EXPID') elif group == 'pernight': data.add_column(np.full(nrows, hdr['NIGHT'], dtype=np.int32), index=icol, name='NIGHT') elif group == 'cumulative': if 'LASTNIGHT' not in data.colnames: try: lastnight = int(hdr['NIGHT']) except KeyError: # Some daily reductions do not have this set, use the filename. log.warning(f'NIGHT keyword missing from {rrfile}!') lastnight = int(rrfile.split('-')[-1].split('.')[0].replace('thru', '')) data.add_column(np.full(nrows, lastnight, dtype=np.int32), index=icol, name='LASTNIGHT') elif group == 'healpix': data.add_column(np.full(nrows, hdr['HPXPIXEL'], dtype=np.int32), index=icol, name='HEALPIX') elif group == 'uniqpix': uniqpix = desiutil.healpix.hpix2upix(hdr['HPXNSIDE'], hdr['HPXPIXEL']) data.add_column(np.full(nrows, uniqpix, dtype=np.int32), index=icol, name='UNIQPIX') icol += 1 data.add_column(np.full(nrows, hdr['HPXPIXEL'], dtype=np.int32), index=icol, name='HEALPIX') icol += 1 data.add_column(np.full(nrows, hdr['HPXNSIDE'], dtype=np.int32), index=icol, name='NSIDE') icol += 1 # SPGRPVAL = night for pernight, expid for perexp, subset for custom coadds if 'SPGRPVAL' in hdr.keys(): val = hdr['SPGRPVAL'] # if int, try to make int32, otherwise let numpy pick dtype if isinstance(val, int): if np.int32(val) == val: dtype = np.int32 else: dtype = np.int64 else: dtype = None else: log.warning(f'SPGRPVAL keyword missing from {rrfile}') if group == 'cumulative': val = lastnight dtype = np.int32 else: # This is temporary. The whole section above could do with some refactoring. raise NotImplementedError(f'No method to reconstruct SPGRPVAL!') data.add_column(np.full(nrows, val, dtype=dtype), index=icol, name='SPGRPVAL') return data, expfibermap
#-------------------------------------------------------------------------- def parse(options=None): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("-i", "--indir", type=str, default=None, help="input directory") parser.add_argument("-o", "--outfile",type=str, default=None, help="output file") parser.add_argument("--minimal", action='store_true', help="only include minimal output columns") parser.add_argument("-t", "--tiles", type=str, help="FITS or CSV file with TILEIDs to include") parser.add_argument("--survey", type=str, required=True, help="DESI survey, e.g. sv1, sv3, main") parser.add_argument("--program", type=str, required=True, help="DESI program, e.g bright, dark") parser.add_argument("-g", "--group", type=str, help="Add columns specific to this spectral grouping " "e.g. pernight adds NIGHT column from input header keyword") parser.add_argument("--header", type=str, nargs="*", help="KEYWORD=VALUE entries to add to the output header") parser.add_argument('--patch-missing-ivar-w12', action='store_true', help="Use target files to patch missing FLUX_IVAR_W1/W2 values") parser.add_argument('--recoadd-fibermap', action='store_true', help="Re-coadd FIBERMAP from spectra files") parser.add_argument('--do-not-add-units', action='store_true', help="Don't add units to output catalog from desidatamodel " "column descriptions") parser.add_argument('--nproc', type=int, default=1, help="Number of multiprocessing processes to use") parser.add_argument('-v', '--verbose', action='store_true', help="Set log level to DEBUG.") parser.add_argument('--force', action='store_true', help="Rerun and overwrite even if all outputs already exist.") args = parser.parse_args(options) return args def main(args=None): if not isinstance(args, argparse.Namespace): args = parse(options=args) if args.verbose: log=get_logger(DEBUG) else: log=get_logger() log.info(f'desi_zcatalog starting at {time.asctime()}') if args.outfile is None: args.outfile = io.findfile('zcatalog') #- Check if this has already been done #- NOTE: this repeats filepath logic in final writing if not args.force: done = True for ext in ['.fits', '-imaging.fits', '-extra.fits']: if not os.path.exists(args.outfile+ext): done = False break # exp_fibermap in a separate directory outdir, basename = os.path.split(args.outfile) expfibermap_file = os.path.join(outdir, 'exp_fibermap', basename+'-expfibermap.fits') if not os.path.exists(expfibermap_file): done = False if done: log.info('All outfiles already exist; use --force to rerun if needed') return #- If adding units, check dependencies before doing a lot of work add_units = not args.do_not_add_units if add_units: try: import desidatamodel except ImportError: log.critical('Unable to import desidatamodel, required to add units.' + ' Try "module load desidatamodel" first or use ' + '--do-not-add-units') return 1 survey = args.survey program = args.program if program not in ['backup', 'bright', 'dark', 'other']: raise ValueError('Invalid program \"{}\"; it must be one of the following: backup, bright, dark, other'.format(program)) if args.indir is not None: indir = args.indir redrockfiles = sorted(io.iterfiles(f'{indir}', prefix='redrock', suffix='.fits')) pertile = args.group not in ('healpix', 'uniqpix') elif args.group == 'healpix': pertile = False indir = os.path.join(io.specprod_root(), 'healpix') # special case for NERSC; use read-only mount regardless of $DESI_SPECTRO_REDUX indir = get_readonly_filepath(indir) # specprod/healpix/SURVEY/PROGRAM/HPIXGROUP/HPIX/redrock*.fits globstr = os.path.join(indir, survey, program, '*', '*', 'redrock*.fits') log.info(f'Looking for healpix redrock files in {globstr}') redrockfiles = sorted(glob.glob(globstr)) elif args.group == 'uniqpix': pertile = False indir = os.path.join(io.specprod_root(), 'spectra') # special case for NERSC; use read-only mount regardless of $DESI_SPECTRO_REDUX indir = get_readonly_filepath(indir) # specprod/spectra/SURVEY/PROGRAM/UPIXGROUP/UPIX/redrock*.fits globstr = os.path.join(indir, survey, program, '*', '*', 'redrock*.fits') log.info(f'Looking for uniqpix redrock files in {globstr}') redrockfiles = sorted(glob.glob(globstr)) else: pertile = True tilefile = args.tiles if args.tiles is not None else io.findfile('tiles') tilefile_format = 'fits' indir = os.path.join(io.specprod_root(), 'tiles', args.group) # special case for NERSC; use read-only mount regardless of $DESI_SPECTRO_REDUX indir = get_readonly_filepath(indir) if not os.path.exists(tilefile): log.warning(f'Tiles file {tilefile} does not exist. Trying CSV instead.') tilefile = io.findfile('tiles_csv') tilefile_format = 'ascii.csv' if not os.path.exists(tilefile): log.critical(f"Could not find a valid tiles file!") return 1 log.info(f'Loading tiles from {tilefile}') tiles = Table.read(tilefile, format=tilefile_format) if args.survey is not None: keep = tiles['SURVEY'] == args.survey tiles = tiles[keep] if len(tiles) == 0: log.critical(f'No tiles kept after filtering by SURVEY={args.survey}') return 1 # use startswith so that bright1b/dark1b are included in bright/dark # need to convert bytes (from Table.read) to string keep = np.char.startswith(np.array(tiles['PROGRAM'], dtype=str), program) tiles = tiles[keep] if len(tiles) == 0: log.critical(f'No tiles kept after filtering by PROGRAM={program}') return 1 tileids = tiles['TILEID'] log.info(f'Searching disk for redrock*.fits files from {len(tileids)} tiles') redrockfiles = list() t0 = t1 = time.time() for i, tileid in enumerate(tileids): if i%500 == 0: tnow = time.time() dt0 = tnow - t0 dt1 = tnow - t1 t1 = tnow log.info(f'{i}/{len(tileids)} +{dt1:.1f} sec {dt0:.1f} total elapsed') ### globbing is faster than iterfiles if the directory pattern level is known # tmp = sorted(io.iterfiles(f'{indir}/{tileid}', prefix='redrock', suffix='.fits')) tmp = sorted(glob.glob(f'{indir}/{tileid}/*/redrock-*.fits')) if len(tmp) > 0: redrockfiles.append(tmp) else: log.warning(f'No redrock files found in {indir}/{tileid}. Checking for zbest files.') tmp = sorted(glob.glob(f'{indir}/{tileid}/*/zbest-*.fits')) if len(tmp) > 0: redrockfiles.append(tmp) else: log.error(f'No redrock OR zbest files found in {indir}/{tileid}!') #- convert list of lists -> list redrockfiles = list(itertools.chain.from_iterable(redrockfiles)) nfiles = len(redrockfiles) if nfiles == 0: msg = f'No redrock files found in {indir}' log.critical(msg) raise ValueError(msg) log.info(f'Reading {nfiles} redrock files') #- build list of args to support multiprocessing parallelism read_args = list() for ifile, rrfile in enumerate(redrockfiles): read_args.append(dict(rrfile=rrfile, group=args.group, pertile=pertile, counter=(ifile+1, nfiles))) #- Read individual Redrock files t0 = time.time() if args.nproc>1: from multiprocessing import Pool with Pool(args.nproc) as pool: results = pool.map(_wrap_read_redrock, read_args, chunksize=1) else: results = [_wrap_read_redrock(a) for a in read_args] dt = time.time() - t0 log.info(f"Successfully read {nfiles} redrock files in {dt:.1f} sec") #- Stack catalogs zcatdata = list() exp_fibermaps = list() dependencies = dict() for data, expfibermap in results: if data is not None: desiutil.depend.mergedep(data.meta, dependencies) desiutil.depend.remove_dependencies(data.meta) zcatdata.append(data) if expfibermap is not None: exp_fibermaps.append(expfibermap) log.info('Stacking zcat') zcat = vstack(zcatdata, join_type='exact') desiutil.depend.mergedep(dependencies, zcat.meta) if exp_fibermaps: log.info('Stacking exposure fibermaps') assert all([isinstance(e, Table) for e in exp_fibermaps]) try: expfm = vstack(exp_fibermaps) except Exception as e: log.error(f'Unexpected exception when stacking exposure fibermaps!') log.error(type(e)) log.error(e.args[0]) expfm = None else: expfm = None #- Add FIRSTNIGHT for tile-based cumulative catalogs #- (LASTNIGHT was added while reading from NIGHT header keyword) if args.group == 'cumulative' and 'FIRSTNIGHT' not in zcat.colnames: log.info('Adding FIRSTNIGHT per tile') icol = zcat.colnames.index('LASTNIGHT') zcat.add_column(np.zeros(len(zcat), dtype=np.int32), index=icol, name='FIRSTNIGHT') for tilefm in Table(expfm[['TILEID', 'NIGHT']]).group_by('TILEID').groups: tileid = tilefm['TILEID'][0] iitile = zcat['TILEID'] == tileid zcat['FIRSTNIGHT'][iitile] = np.min(tilefm['NIGHT']) #- all FIRSTNIGHT entries should be filled (no more zeros) bad = zcat['FIRSTNIGHT'] == 0 if np.any(bad): badtiles = np.unique(zcat['TILEID'][bad]) raise ValueError(f'FIRSTNIGHT not set for tiles {badtiles}') # Add EFFTIME_SPEC tsnr2_col = program_to_tsnr2_colname(program) zcat['EFFTIME_SPEC'] = tsnr2_to_efftime(zcat[tsnr2_col], tsnr2_col[6:]) log.info('Finding best spectrum for each target') nspec, primary = find_primary_spectra(zcat, sort_column='EFFTIME_SPEC') zcat['ZCAT_NSPEC'] = nspec.astype(np.int16) zcat['ZCAT_PRIMARY'] = primary #- Used for fuji, should not be needed for later prods if args.patch_missing_ivar_w12: from desimodel.footprint import radec2pix missing = (zcat['FLUX_IVAR_W1'] < 0) | (zcat['FLUX_IVAR_W2'] < 0) missing &= zcat['OBJTYPE'] == 'TGT' missing &= zcat['TARGETID'] > 0 if not np.any(missing): log.info('No targets missing FLUX_IVAR_W1/W2 to patch') else: #- Load targets from sv1 targeting files ra = zcat['TARGET_RA'] dec = zcat['TARGET_DEC'] nside = 8 #- use for sv1 targeting hpix8 = radec2pix(nside, ra, dec) for hpix in np.unique(hpix8[missing]): hpixmiss = (hpix == hpix8) & missing targets = load_sv1_ivar_w12(hpix, zcat['TARGETID'][hpixmiss]) #- create dict[TARGETID] -> row number targetid2idx = dict(zip(targets['TARGETID'], np.arange(len(targets)))) #- patch missing values, if they are in the targets file for i in np.where(hpixmiss)[0]: tid = zcat['TARGETID'][i] try: j = targetid2idx[ tid ] zcat['FLUX_IVAR_W1'][i] = targets['FLUX_IVAR_W1'][j] zcat['FLUX_IVAR_W2'][i] = targets['FLUX_IVAR_W2'][j] except KeyError: log.warning(f'TARGETID {tid} (row {i}) not found in sv1 targets') ###### # Add GOOD_Z_{BGS,LRG,ELG,QSO} redshift quality flags # - passes LSS quality cuts from validredshifts.actually_validate # - science target with good hardware # - core DESI tracer target selection (and not e.g. secondary QSOs) # - SURVEY is main/sv1/sv2/sv3 (not special) # LSS cuts zqual = validredshifts.actually_validate(zcat) # GOOD_SPEC: true if it is a science spectrum with good hardware status good_spec = validredshifts.get_good_fiberstatus(zcat) good_spec &= zcat['OBJTYPE']=='TGT' # not included in LSS BGS,LRG,ELG cuts zqual['GOOD_SPEC'] = good_spec.copy() # GOOD_SPEC: true if it is a science spectrum with good hardware status for col in ['GOOD_Z_BGS', 'GOOD_Z_LRG', 'GOOD_Z_ELG', 'GOOD_Z_QSO']: zqual[col] &= zqual['GOOD_SPEC'] # require good hardware quality for GOOD_Z_TRACER # Require primary tracer targeting if survey in ['main', 'sv1', 'sv2', 'sv3']: if survey=='main': desi_target_col = 'DESI_TARGET' else: desi_target_col = survey.upper()+'_DESI_TARGET' # The BGS_ANY, LRG, ELG and QSO target bits are the same in SV1 to main is_bgs = (zcat[desi_target_col] & desi_mask.BGS_ANY) != 0 is_lrg = (zcat[desi_target_col] & desi_mask.LRG) != 0 is_elg = (zcat[desi_target_col] & desi_mask.ELG) != 0 is_qso = (zcat[desi_target_col] & desi_mask.QSO) != 0 # GOOD_Z_TRACER: # True if it is a TRACER target and passes TRACER redshift quality cut # False if it is not a Tracer target or if it is a TRACER target but fails TRACER redshift quality cut; # They apply to the Z column zqual['GOOD_Z_BGS'] &= is_bgs zqual['GOOD_Z_LRG'] &= is_lrg zqual['GOOD_Z_ELG'] &= is_elg # GOOD_Z_QSO: like GOOD_Z_{BGS,LRG,ELG}, but applies to Z_QSO column, not Z column # True if it is a QSO target AND passes the QSO redshift quality cut zqual['GOOD_Z_QSO'] &= is_qso # Note that the GOOD_Z_{BGS,LRG,ELG,QSO} definitions are more restrictive than in desispec.validredshifts # as the per-target class and GOOD_SPEC requirements are added here else: for col in ['GOOD_Z_BGS', 'GOOD_Z_LRG', 'GOOD_Z_ELG', 'GOOD_Z_QSO']: zqual[col] = False ###### # evaluate Z_CONF; proceed from low-confidence to high-confidence # default Z_CONF=0 is no confidence zqual['Z_CONF'] = np.uint8(0) # Note: unsigned int because FITS converts signed int8 to bool (!) # Z_CONF=1: less confident redshift but maybe ok # criteria: the Z_CONF==3 or 2 criteria are not met, but GOOD_SPEC==True & ZWARN==0 mask = zqual['GOOD_SPEC'] & (zcat['ZWARN']==0) zqual['Z_CONF'][mask] = 1 # Z_CONF=2: placeholder for non-LSS WG supplied quality criteria # Z_CONF=3: highly confident redshift # criteria: the object must belong to one of the DESI primary extragalactic target classes (BGS, LRG, ELG, QSO) # and pass the LSS redshift quality cuts mask = zqual['GOOD_Z_BGS'] | zqual['GOOD_Z_LRG'] | zqual['GOOD_Z_ELG'] | zqual['GOOD_Z_QSO'] zqual['Z_CONF'][mask] = 3 zcat = hstack([zcat, zqual], join_type='exact') # Create "best redshift" columns, choosing between Z and Z_QSO z_cols = ['Z', 'ZERR', 'ZWARN', 'SPECTYPE', 'SUBTYPE', 'CHI2', 'DELTACHI2', 'COEFF'] for col in z_cols: zcat[col+'_BEST'] = zcat[col].copy() # Use Z_QSO if GOOD_Z_QSO==True and Z_QSO differs by more than 1000 km/s from Z c = astropy.constants.c.to('km/s').value dv = c*(zcat['Z']-zcat['Z_QSO'])/(1+zcat['Z_QSO']) mask = zcat['GOOD_Z_QSO'] & (np.abs(dv) > 1000) zcat['Z_BEST'][mask] = zcat['Z_QSO'][mask].copy() for col in z_cols: if col!='Z': zcat[col+'_BEST'][mask] = zcat[col+'_NEW'][mask].copy() # Downgrade some of the columns to lower precision if 'LOCATION' in zcat.colnames: zcat['LOCATION'] = zcat['LOCATION'].astype('int16') if 'DEVICE_LOC' in zcat.colnames: zcat['DEVICE_LOC'] = zcat['DEVICE_LOC'].astype('int16') if 'OBSCONDITIONS' in zcat.colnames: if np.max(zcat['OBSCONDITIONS']) <= 32767 and np.min(zcat['OBSCONDITIONS']) >= -32768: zcat['OBSCONDITIONS'] = zcat['OBSCONDITIONS'].astype('int16') columns_basic = ['TARGETID', 'TILEID', 'HEALPIX', 'LASTNIGHT', 'Z_BEST', 'Z_CONF', 'ZERR_BEST', 'ZWARN_BEST', 'SPECTYPE_BEST', 'SUBTYPE_BEST', 'CHI2_BEST', 'DELTACHI2_BEST', 'PETAL_LOC', 'FIBER', 'COADD_FIBERSTATUS', 'TARGET_RA', 'TARGET_DEC', 'DESINAME', 'OBJTYPE', 'FIBERASSIGN_X', 'FIBERASSIGN_Y', 'PRIORITY', 'DESI_TARGET', 'BGS_TARGET', 'MWS_TARGET', 'SCND_TARGET', 'CMX_TARGET', 'SV1_DESI_TARGET', 'SV1_BGS_TARGET', 'SV1_MWS_TARGET', 'SV1_SCND_TARGET', 'SV2_DESI_TARGET', 'SV2_BGS_TARGET', 'SV2_MWS_TARGET', 'SV2_SCND_TARGET', 'SV3_DESI_TARGET', 'SV3_BGS_TARGET', 'SV3_MWS_TARGET', 'SV3_SCND_TARGET', 'COADD_NUMEXP', 'COADD_EXPTIME', 'COADD_NUMNIGHT', 'COADD_NUMTILE', 'MIN_MJD', 'MAX_MJD', 'MEAN_MJD', 'GOOD_SPEC', 'EFFTIME_SPEC', 'ZCAT_NSPEC', 'ZCAT_PRIMARY'] if args.group == 'uniqpix': columns_basic = ['UNIQPIX' if col == 'HEALPIX' else col for col in columns_basic] columns_imaging = ['PMRA', 'PMDEC', 'REF_EPOCH', 'RELEASE', 'BRICKNAME', 'BRICKID', 'BRICK_OBJID', 'MORPHTYPE', 'EBV', 'FLUX_G', 'FLUX_R', 'FLUX_Z', 'FLUX_W1', 'FLUX_W2', 'FLUX_IVAR_G', 'FLUX_IVAR_R', 'FLUX_IVAR_Z', 'FLUX_IVAR_W1', 'FLUX_IVAR_W2', 'FIBERFLUX_G', 'FIBERFLUX_R', 'FIBERFLUX_Z', 'FIBERTOTFLUX_G', 'FIBERTOTFLUX_R', 'FIBERTOTFLUX_Z', 'MASKBITS', 'SERSIC', 'SHAPE_R', 'SHAPE_E1', 'SHAPE_E2', 'REF_ID', 'REF_CAT', 'GAIA_PHOT_G_MEAN_MAG', 'GAIA_PHOT_BP_MEAN_MAG', 'GAIA_PHOT_RP_MEAN_MAG', 'PARALLAX', 'PHOTSYS'] assert len(np.intersect1d(columns_basic, columns_imaging))==0 # Remove main-survey target bits for non-main surveys (they are not the actual main-survey target bits) if survey!='main': for col in ['DESI_TARGET', 'BGS_TARGET', 'MWS_TARGET', 'SCND_TARGET']: if col in zcat.colnames: zcat.remove_column(col) # Remove the columns that do not exist columns_basic = [col for col in columns_basic if col in zcat.colnames] columns_imaging = ['TARGETID', 'TILEID'] + columns_imaging columns_imaging = [col for col in columns_imaging if col in zcat.colnames] # remove columns that do not exist columns_extra = ['TARGETID', 'TILEID'] + [col for col in zcat.colnames if (col not in columns_basic and col not in columns_imaging)] columns_extra = [col for col in columns_extra if col in zcat.colnames] # remove columns that do not exist #- we're done adding columns, split and convert to numpy array for fitsio zcat_basic = np.array(zcat[columns_basic]) zcat_imaging = np.array(zcat[columns_imaging]) zcat_extra = zcat[columns_extra].copy() #- we don't need the original anymore; del to save memory del zcat #- Inherit header from first input, but remove keywords that don't apply #- across multiple files header = fitsio.read_header(redrockfiles[0], 0) for key in ['SPGRPVAL', 'TILEID', 'SPECTRO', 'PETAL', 'NIGHT', 'EXPID', 'HPXPIXEL', 'UNIQPIX', 'HPXNSIDE', 'NAXIS', 'BITPIX', 'SIMPLE', 'EXTEND']: if key in header: header.delete(key) #- Intercept previous incorrect boolean special cases if 'HPXNEST' in header: if header['HPXNEST'] == 'True': log.info("Correcting header HPXNEST='True' string to boolean True") header['HPXNEST'] = True elif header['HPXNEST'] == 'False': # False is not expected for DESI, but cover it for completeness log.info("Correcting header HPXNEST='False' string to boolean False") header['HPXNEST'] = False #- Add extra keywords if requested if args.header is not None: for keyval in args.header: key, value = parse_keyval(keyval) header[key] = value header['SURVEY'] = survey header['PROGRAM'] = program #- Add units if requested if add_units: datamodeldir = str(importlib.resources.files('desidatamodel')) unitsfile = os.path.join(datamodeldir, 'data', 'column_descriptions.csv') log.info(f'Adding units from {unitsfile}') units, comments = load_csv_units(unitsfile) else: units = dict() comments = dict() if not os.path.isdir(os.path.dirname(args.outfile)): os.makedirs(os.path.dirname(args.outfile)) outfile_basic = args.outfile+'.fits' log.info(f'Writing {outfile_basic}') tmpfile = get_tempfilename(outfile_basic) write_bintable(tmpfile, zcat_basic, header=header, extname='ZCATALOG', units=units, clobber=True) os.rename(tmpfile, outfile_basic) log.info("Successfully wrote {}".format(outfile_basic)) outfile_imaging = args.outfile+'-imaging.fits' log.info(f'Writing {outfile_imaging}') tmpfile = get_tempfilename(outfile_imaging) write_bintable(tmpfile, zcat_imaging, header=header, extname='ZCATALOG_IMAGING', units=units, clobber=True) os.rename(tmpfile, outfile_imaging) log.info("Successfully wrote {}".format(outfile_imaging)) outfile_extra = args.outfile+'-extra.fits' log.info(f'Writing {outfile_extra}') tmpfile = get_tempfilename(outfile_extra) write_bintable(tmpfile, zcat_extra, header=header, extname='ZCATALOG_EXTRA', units=units, clobber=True) os.rename(tmpfile, outfile_extra) log.info("Successfully wrote {}".format(outfile_extra)) outfile_expfm = os.path.join(os.path.dirname(args.outfile), 'exp_fibermap', os.path.basename(args.outfile)+'-expfibermap.fits') if not os.path.isdir(os.path.dirname(outfile_expfm)): os.makedirs(os.path.dirname(outfile_expfm)) log.info(f'Writing {outfile_expfm}') tmpfile = get_tempfilename(outfile_expfm) write_bintable(tmpfile, expfm, header=header, extname='EXP_FIBERMAP', units=units, clobber=True) os.rename(tmpfile, outfile_expfm) log.info("Successfully wrote {}".format(outfile_expfm)) log.info(f'desi_zcatalog all done at {time.asctime()}')