
# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desispec.io.spectra
===================

I/O routines for working with spectral grouping files.

"""
from __future__ import absolute_import, division, print_function

import os
import re
import warnings
import time
import glob
import multiprocessing

import numpy as np
import astropy.units as u
import astropy.io.fits as fits
import astropy.table
from astropy.table import Table
from astropy.table import vstack as stack_tables
import fitsio

from desiutil.depend import add_dependencies, hasdep, getdep
from desiutil.io import encode_table
from desiutil.log import get_logger

from .util import fitsheader, native_endian, add_columns, checkgzip
from .util import get_tempfilename, addkeys
from . import iotime

from .frame import read_frame
from .fibermap import annotate_fibermap

from ..spectra import Spectra
from ..spectra import stack as stack_spectra
from .meta import specprod_root, findfile
from ..util import argmatch


def write_spectra(outfile, spec, units=None):
    """
    Write Spectra object to FITS file.

    This places the metadata into the header of the (empty) primary HDU.
    The first extension contains the fibermap, and then HDUs are created
    for the different data arrays for each band.

    Floating point data is converted to 32 bits before writing.

    Args:
        outfile (str): path to write
        spec (Spectra): the object containing the data
        units (str): optional string to use for the BUNIT key of the flux
            HDUs for each band.

    Returns:
        The absolute path to the file that was written.
    """
    log = get_logger()
    outfile = os.path.abspath(outfile)

    # Create the parent directory, if necessary.
    dir, base = os.path.split(outfile)
    if not os.path.exists(dir):
        os.makedirs(dir)

    # Create HDUs from the data
    all_hdus = fits.HDUList()

    # metadata goes in empty primary HDU
    hdr = fitsheader(spec.meta)
    hdr['LONGSTRN'] = 'OGIP 1.0'
    add_dependencies(hdr)
    all_hdus.append(fits.PrimaryHDU(header=hdr))

    # Next is the fibermap
    fmap = encode_table(spec.fibermap.copy())
    fmap.meta['EXTNAME'] = 'FIBERMAP'
    fmap.meta['LONGSTRN'] = 'OGIP 1.0'
    add_dependencies(fmap.meta)

    with warnings.catch_warnings():
        #- nanomaggies aren't an official IAU unit but don't complain
        warnings.filterwarnings('ignore', '.*nanomaggies.*')
        hdu = fits.convenience.table_to_hdu(fmap)

    # Add comments for fibermap columns.
    hdu = annotate_fibermap(hdu)
    all_hdus.append(hdu)

    # Optional: exposure-fibermap, used in coadds
    if spec.exp_fibermap is not None:
        expfmap = encode_table(spec.exp_fibermap.copy())
        expfmap.meta["EXTNAME"] = "EXP_FIBERMAP"
        with warnings.catch_warnings():
            #- nanomaggies aren't an official IAU unit but don't complain
            warnings.filterwarnings('ignore', '.*nanomaggies.*')
            hdu = fits.convenience.table_to_hdu(expfmap)

        # Add comments for exp_fibermap columns.
        hdu = annotate_fibermap(hdu)
        all_hdus.append(hdu)

    # Now append the data for all bands
    for band in spec.bands:
        hdu = fits.ImageHDU(name="{}_WAVELENGTH".format(band.upper()))
        hdu.header["BUNIT"] = "Angstrom"
        hdu.data = spec.wave[band].astype("f8")
        all_hdus.append(hdu)

        hdu = fits.ImageHDU(name="{}_FLUX".format(band.upper()))
        if units is None:
            hdu.header["BUNIT"] = "10**-17 erg/(s cm2 Angstrom)"
        else:
            hdu.header["BUNIT"] = units
        hdu.data = spec.flux[band].astype("f4")
        all_hdus.append(hdu)

        hdu = fits.ImageHDU(name="{}_IVAR".format(band.upper()))
        if units is None:
            hdu.header["BUNIT"] = '10**+34 (s2 cm4 Angstrom2) / erg2'
        else:
            hdu.header["BUNIT"] = ((u.Unit(units, format='fits'))**-2).to_string('fits')
        hdu.data = spec.ivar[band].astype("f4")
        all_hdus.append(hdu)

        if spec.mask is not None:
            # hdu = fits.CompImageHDU(name="{}_MASK".format(band.upper()))
            hdu = fits.ImageHDU(name="{}_MASK".format(band.upper()))
            hdu.data = spec.mask[band].astype(np.int32)
            all_hdus.append(hdu)

        if spec.resolution_data is not None:
            hdu = fits.ImageHDU(name="{}_RESOLUTION".format(band.upper()))
            hdu.data = spec.resolution_data[band].astype("f4")
            all_hdus.append(hdu)

        if spec.extra is not None:
            for ex in spec.extra[band].items():
                hdu = fits.ImageHDU(name="{}_{}".format(band.upper(), ex[0]))
                hdu.data = ex[1].astype("f4")
                all_hdus.append(hdu)

    if spec.scores is not None:
        scores_tbl = encode_table(spec.scores)  #- unicode -> bytes
        scores_tbl.meta['EXTNAME'] = 'SCORES'
        all_hdus.append(fits.convenience.table_to_hdu(scores_tbl))

        # add comments in header
        if hasattr(spec, 'scores_comments') and spec.scores_comments is not None:
            hdu = all_hdus['SCORES']
            for i in range(1, 999):
                key = 'TTYPE' + str(i)
                if key in hdu.header:
                    value = hdu.header[key]
                    if value in spec.scores_comments.keys():
                        hdu.header[key] = (value, spec.scores_comments[value])

    if spec.extra_catalog is not None:
        extra_catalog = encode_table(spec.extra_catalog)
        extra_catalog.meta['EXTNAME'] = 'EXTRA_CATALOG'
        all_hdus.append(fits.convenience.table_to_hdu(extra_catalog))

    t0 = time.time()
    tmpfile = get_tempfilename(outfile)
    all_hdus.writeto(tmpfile, overwrite=True, checksum=True)
    os.rename(tmpfile, outfile)
    duration = time.time() - t0
    log.info(iotime.format('write', outfile, duration))

    return outfile
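
# --- Usage sketch (illustrative only; not called by this module) -----------
# A minimal sketch of building a one-band Spectra object and writing it with
# write_spectra().  The band name "b", array sizes, and output path are
# hypothetical; real DESI files carry b/r/z bands and a much richer fibermap.
# It assumes annotate_fibermap() recognizes the standard TARGETID/FIBER
# fibermap columns.
def _example_write_spectra():
    nspec, nwave = 3, 100
    wave = {"b": np.linspace(3600.0, 5800.0, nwave)}
    flux = {"b": np.ones((nspec, nwave), dtype="f4")}
    ivar = {"b": np.ones((nspec, nwave), dtype="f4")}
    fibermap = Table()
    fibermap["TARGETID"] = np.arange(nspec, dtype=np.int64)
    fibermap["FIBER"] = np.arange(nspec, dtype=np.int32)
    sp = Spectra(bands=["b"], wave=wave, flux=flux, ivar=ivar,
                 fibermap=fibermap)
    # returns the absolute path of the file written
    return write_spectra("spectra-example.fits", sp)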

def _read_image(hdus, extname, dtype, rows=None):
    """
    Helper function to read extname from fitsio.FITS hdus, filter by rows,
    convert to native endian, and cast to dtype.

    Returns image.
    """
    data = hdus[extname].read()
    if rows is not None:
        data = data[rows]

    return native_endian(data).astype(dtype)
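
# --- Usage sketch (illustrative only) ---------------------------------------
# _read_image() operates on an already-open fitsio.FITS object, so several
# extensions can be row-filtered from one file handle.  The path
# "spectra-example.fits" is hypothetical.
def _example_read_image():
    with fitsio.FITS("spectra-example.fits", mode="r") as hdus:
        # read rows 0 and 2 of the B_FLUX image, cast to float32
        return _read_image(hdus, "B_FLUX", np.float32, rows=[0, 2])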

def read_spectra(
    infile,
    single=False,
    targetids=None,
    rows=None,
    skip_hdus=None,
    select_columns={
        "FIBERMAP": None,
        "EXP_FIBERMAP": None,
        "SCORES": None,
        "EXTRA_CATALOG": None,
    },
):
    """
    Read Spectra object from FITS file.

    This reads data written by the write_spectra function.  A new Spectra
    object is instantiated and returned.

    Args:
        infile (str): path to read
        single (bool): if True, keep spectra as single precision in memory.
        targetids (list): Optional, list of targetids to read from file, if present.
        rows (list): Optional, list of rows to read from file
        skip_hdus (list): Optional, list/set/tuple of HDUs to skip
        select_columns (dict): Optional, dictionary to select column names to be
            read.  Default, all columns are read.

    Returns (Spectra):
        The object containing the data read from disk.

    `skip_hdus` options are FIBERMAP, EXP_FIBERMAP, SCORES, EXTRA_CATALOG,
    MASK, RESOLUTION; where MASK and RESOLUTION mean to skip those for all
    cameras.  Note that WAVE, FLUX, and IVAR are always required.

    If a table HDU is not listed in `select_columns`, all of its columns
    will be read.

    User can optionally specify targetids OR rows, but not both.
    """
    log = get_logger()
    infile = checkgzip(infile)
    ftype = np.float64
    if single:
        ftype = np.float32

    infile = os.path.abspath(infile)
    if not os.path.isfile(infile):
        raise IOError("{} is not a file".format(infile))

    t0 = time.time()
    hdus = fitsio.FITS(infile, mode="r")
    nhdu = len(hdus)

    if targetids is not None and rows is not None:
        raise ValueError('Set rows or targetids but not both')

    #- default skip_hdus empty set -> include everything, without
    #- having to check for None before checking if X is in skip_hdus
    if skip_hdus is None:
        skip_hdus = set()

    #- Map targets -> rows and exp_rows.
    #- Note: coadds can have uncoadded EXP_FIBERMAP HDU with more rows than
    #- the coadded FIBERMAP HDU, so track rows vs. exp_rows separately
    exp_rows = None
    if targetids is not None:
        targetids = np.atleast_1d(targetids)
        file_targetids = hdus["FIBERMAP"].read(columns="TARGETID")
        rows = np.where(np.isin(file_targetids, targetids))[0]
        if 'EXP_FIBERMAP' in hdus and 'EXP_FIBERMAP' not in skip_hdus:
            exp_targetids = hdus["EXP_FIBERMAP"].read(columns="TARGETID")
            exp_rows = np.where(np.isin(exp_targetids, targetids))[0]
        if len(rows) == 0:
            return Spectra()
    elif rows is not None:
        rows = np.asarray(rows)
        # figure out exp_rows
        file_targetids = hdus["FIBERMAP"].read(rows=rows, columns="TARGETID")
        if 'EXP_FIBERMAP' in hdus and 'EXP_FIBERMAP' not in skip_hdus:
            exp_targetids = hdus["EXP_FIBERMAP"].read(columns="TARGETID")
            exp_rows = np.where(np.isin(exp_targetids, file_targetids))[0]

    if select_columns is None:
        select_columns = dict()

    for extname in ("FIBERMAP", "EXP_FIBERMAP", "SCORES", "EXTRA_CATALOG"):
        if extname not in select_columns:
            select_columns[extname] = None

    # load the metadata.
    meta = dict(hdus[0].read_header())

    # initialize data objects
    bands = []
    fmap = None
    expfmap = None
    wave = None
    flux = None
    ivar = None
    mask = None
    res = None
    extra = None
    extra_catalog = None
    scores = None

    # For efficiency, go through the HDUs in disk-order.  Use the
    # extension name to determine where to put the data.  We don't
    # explicitly copy the data, since that will be done when constructing
    # the Spectra object.
    for h in range(1, nhdu):
        name = hdus[h].read_header()["EXTNAME"]
        log.debug('Reading %s', name)
        if name == "FIBERMAP":
            if name not in skip_hdus:
                fmap = encode_table(
                    Table(
                        hdus[h].read(rows=rows, columns=select_columns["FIBERMAP"]),
                        copy=True,
                    ).as_array()
                )
                addkeys(fmap.meta, hdus[h].read_header())
        elif name == "EXP_FIBERMAP":
            if name not in skip_hdus:
                expfmap = encode_table(
                    Table(
                        hdus[h].read(rows=exp_rows, columns=select_columns["EXP_FIBERMAP"]),
                        copy=True,
                    ).as_array()
                )
                addkeys(expfmap.meta, hdus[h].read_header())
        elif name == "SCORES":
            if name not in skip_hdus:
                scores = encode_table(
                    Table(
                        hdus[h].read(rows=rows, columns=select_columns["SCORES"]),
                        copy=True,
                    ).as_array()
                )
                addkeys(scores.meta, hdus[h].read_header())
        elif name == "EXTRA_CATALOG":
            if name not in skip_hdus:
                extra_catalog = encode_table(
                    Table(
                        hdus[h].read(
                            rows=rows, columns=select_columns["EXTRA_CATALOG"]
                        ),
                        copy=True,
                    ).as_array()
                )
                addkeys(extra_catalog.meta, hdus[h].read_header())
        else:
            # Find the band based on the name
            mat = re.match(r"(.*)_(.*)", name)
            if mat is None:
                raise RuntimeError(
                    "FITS extension name {} does not contain the band".format(name)
                )
            band = mat.group(1).lower()
            type = mat.group(2)
            if band not in bands:
                bands.append(band)
            if type == "WAVELENGTH":
                if wave is None:
                    wave = {}
                #- Note: keep original float64 resolution for wavelength
                wave[band] = native_endian(hdus[h].read())
            elif type == "FLUX":
                if flux is None:
                    flux = {}
                flux[band] = _read_image(hdus, h, ftype, rows=rows)
            elif type == "IVAR":
                if ivar is None:
                    ivar = {}
                ivar[band] = _read_image(hdus, h, ftype, rows=rows)
            elif type == "MASK" and type not in skip_hdus:
                if mask is None:
                    mask = {}
                mask[band] = _read_image(hdus, h, np.uint32, rows=rows)
            elif type == "RESOLUTION" and type not in skip_hdus:
                if res is None:
                    res = {}
                res[band] = _read_image(hdus, h, ftype, rows=rows)
            elif type != "MASK" and type != "RESOLUTION" and type not in skip_hdus:
                # this must be an "extra" HDU
                log.debug('Reading extra HDU %s', name)
                if extra is None:
                    extra = {}
                if band not in extra:
                    extra[band] = {}
                extra[band][type] = _read_image(hdus, h, ftype, rows=rows)

    hdus.close()
    duration = time.time() - t0
    log.info(iotime.format("read", infile, duration))

    # Construct the Spectra object from the data.  If there are any
    # inconsistencies in the sizes of the arrays read from the file,
    # they will be caught by the constructor.
    spec = Spectra(
        bands,
        wave,
        flux,
        ivar,
        mask=mask,
        resolution_data=res,
        fibermap=fmap,
        exp_fibermap=expfmap,
        meta=meta,
        extra=extra,
        extra_catalog=extra_catalog,
        single=single,
        scores=scores,
    )

    # if needed, sort spectra to match order of targetids, which could be
    # different than the order they appear in the file
    if targetids is not None:
        from desispec.util import ordered_unique

        #- Input targetids that we found in the file, in the order they appear in targetids
        ii = np.isin(targetids, spec.fibermap['TARGETID'])
        found_targetids = ordered_unique(targetids[ii])
        log.debug('found_targetids=%s', found_targetids)

        #- Unique targetids of input file in the order they first appear
        input_targetids = ordered_unique(spec.fibermap['TARGETID'])
        log.debug('input_targetids=%s', np.asarray(input_targetids))

        #- Only reorder if needed
        if not np.all(input_targetids == found_targetids):
            rows = np.concatenate([np.where(spec.fibermap['TARGETID'] == tid)[0] for tid in targetids])
            log.debug("spec.fibermap['TARGETID'] = %s", np.asarray(spec.fibermap['TARGETID']))
            log.debug("rows for subselection=%s", rows)
            spec = spec[rows]

    return spec
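
# --- Usage sketch (illustrative only) ---------------------------------------
# A minimal sketch of the targeted-read options of read_spectra(): pull two
# targets by TARGETID, skip the per-camera resolution matrices and the
# exposure fibermap, and read only a few fibermap columns.  The path and
# TARGETID values are hypothetical.
def _example_read_spectra_subset():
    sp = read_spectra(
        "coadd-example.fits",
        targetids=[39627652591714158, 39627652591714159],  # hypothetical IDs
        skip_hdus=("RESOLUTION", "EXP_FIBERMAP"),
        select_columns={"FIBERMAP": ["TARGETID", "TARGET_RA", "TARGET_DEC"]},
    )
    return sp.num_targets(), sp.bands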

def read_frame_as_spectra(filename, night=None, expid=None, band=None, single=False):
    """
    Read a FITS file containing a Frame and return a Spectra.

    A Frame file is very close to a Spectra object (by design), and only
    differs by missing the NIGHT and EXPID in the fibermap, as well as
    containing only one band of data.

    Args:
        filename (str): path to read

    Options:
        night (int): the night value to use for all rows of the fibermap.
        expid (int): the expid value to use for all rows of the fibermap.
        band (str): the name of this band.
        single (bool): if True, keep spectra as single precision in memory.

    Returns (Spectra):
        The object containing the data read from disk.
    """
    filename = checkgzip(filename)
    fr = read_frame(filename)
    if fr.fibermap is None:
        raise RuntimeError("reading Frame files into Spectra only supported if a fibermap exists")

    nspec = len(fr.fibermap)

    if band is None:
        band = fr.meta['camera'][0]

    if night is None:
        night = fr.meta['night']

    if expid is None:
        expid = fr.meta['expid']

    fmap = np.asarray(fr.fibermap.copy())
    fmap = add_columns(fmap,
                       ['NIGHT', 'EXPID', 'TILEID'],
                       [np.int32(night), np.int32(expid), np.int32(fr.meta['TILEID'])],
                       )

    fmap = encode_table(fmap)

    bands = [band]

    mask = None
    if fr.mask is not None:
        mask = {band: fr.mask}

    res = None
    if fr.resolution_data is not None:
        res = {band: fr.resolution_data}

    extra = None
    if fr.chi2pix is not None:
        extra = {band: {"CHI2PIX": fr.chi2pix}}

    spec = Spectra(bands, {band: fr.wave}, {band: fr.flux}, {band: fr.ivar},
                   mask=mask, resolution_data=res, fibermap=fmap, meta=fr.meta,
                   extra=extra, single=single, scores=fr.scores)

    return spec
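
# --- Usage sketch (illustrative only) ---------------------------------------
# Wrapping a single frame file as a one-band Spectra object.  The filename is
# hypothetical; band, night, and expid are normally taken from the frame
# header, but can be overridden as shown.
def _example_read_frame_as_spectra():
    sp = read_frame_as_spectra("cframe-b0-00012345.fits",
                               night=20201220, expid=12345, band="b")
    return sp.bands  # expected: ['b']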

def read_tile_spectra(tileid, night=None, specprod=None, reduxdir=None, coadd=False,
                      single=False, targets=None, fibers=None, redrock=True,
                      group='cumulative'):
    """
    Read and return combined spectra for a tile/night

    Args:
        tileid (int) : Tile ID

    Options:
        night (int or str) : YEARMMDD night
        specprod (str) : overrides $SPECPROD
        reduxdir (str) : overrides $DESI_SPECTRO_REDUX/$SPECPROD
        coadd (bool) : if True, read coadds instead of per-exp spectra
        single (bool) : if True, use float32 instead of double precision
        targets (array-like) : filter by TARGETID
        fibers (array-like) : filter by FIBER
        redrock (bool) : if True, also return row-matched redrock redshift catalog
        group (str) : reads spectra in group (pernight, cumulative, ...)

    Returns: spectra, or (spectra, redrock) if redrock=True
        combined Spectra object for all matching targets/fibers, plus a
        row-matched redrock catalog (if redrock=True)

    Raises:
        ValueError if no files or matching spectra are found

    Note: the returned spectra are not necessarily in the same order as
    the `targets` or `fibers` input filters
    """
    log = get_logger()
    if reduxdir is None:
        #- will automatically use $SPECPROD if specprod=None
        reduxdir = specprod_root(specprod)

    tiledir = os.path.join(reduxdir, 'tiles', group)

    if night is None:
        nightdirglob = os.path.join(tiledir, str(tileid), '*')
        tilenightdirs = sorted(glob.glob(nightdirglob))
        night = os.path.basename(tilenightdirs[-1])

    nightstr = str(night)
    if group == 'cumulative':
        nightstr = 'thru' + nightstr

    tiledir = os.path.join(tiledir, str(tileid), str(night))

    if coadd:
        log.debug(f'Reading coadds from {tiledir}')
        prefix = 'coadd'
    else:
        log.debug(f'Reading spectra from {tiledir}')
        prefix = 'spectra'

    specglob = f'{tiledir}/{prefix}-?-{tileid}-{nightstr}.fits*'
    specfiles = glob.glob(specglob)

    if len(specfiles) == 0:
        raise ValueError(f'No spectra found in {specglob}')

    specfiles = sorted(specfiles)

    spectra = list()
    redshifts = list()
    for filename in specfiles:
        log.debug(f'reading {os.path.basename(filename)}')

        #- if filtering by fibers, check if we need to read this file
        if fibers is not None:
            # filenames are like prefix-PETAL-tileid-night.*
            thispetal = int(os.path.basename(filename).split('-')[1])
            petals = np.asarray(fibers) // 500
            if not np.any(np.isin(thispetal, petals)):
                log.debug('Skipping petal %d, not needed by fibers %s',
                          thispetal, fibers)
                continue

        sp = read_spectra(filename, single=single)
        if targets is not None:
            keep = np.isin(sp.fibermap['TARGETID'], targets)
            sp = sp[keep]
        if fibers is not None:
            keep = np.isin(sp.fibermap['FIBER'], fibers)
            sp = sp[keep]

        if sp.num_spectra() > 0:
            spectra.append(sp)
            if redrock:
                #- Read matching redrock file for this spectra/coadd file
                rrfile = os.path.basename(filename).replace(prefix, 'redrock', 1)
                rrfile = checkgzip(os.path.join(tiledir, rrfile))
                log.debug(f'Reading {os.path.basename(rrfile)}')
                rr = Table.read(rrfile, 'REDSHIFTS')

                #- Trim rr to only have TARGETIDs in filtered spectra sp
                keep = np.isin(rr['TARGETID'], sp.fibermap['TARGETID'])
                rr = rr[keep]

                #- match the Redrock entries to the spectra fibermap entries
                ii = argmatch(rr['TARGETID'], sp.fibermap['TARGETID'])
                rrx = rr[ii]

                #- Confirm that we got all that expanding and sorting correct
                assert np.all(sp.fibermap['TARGETID'] == rrx['TARGETID'])

                redshifts.append(rrx)

    if len(spectra) == 0:
        raise ValueError('No spectra found matching filters')

    spectra = stack_spectra(spectra)

    if redrock:
        redshifts = astropy.table.vstack(redshifts)
        assert np.all(spectra.fibermap['TARGETID'] == redshifts['TARGETID'])
        return (spectra, redshifts)
    else:
        return spectra
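
# --- Usage sketch (illustrative only) ---------------------------------------
# Reading coadded spectra plus the row-matched redrock catalog for a few
# fibers on one tile.  The tile and night values are hypothetical; the
# production is taken from $SPECPROD unless specprod= is given.
def _example_read_tile_spectra():
    sp, zcat = read_tile_spectra(1234, night=20210101, coadd=True,
                                 fibers=[0, 1, 2], redrock=True)
    # the redrock catalog is row-matched to the spectra fibermap
    assert np.all(sp.fibermap['TARGETID'] == zcat['TARGETID'])
    return sp, zcat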

def determine_specgroup(colnames):
    """Determine specgroup 'healpix' or 'cumulative' based upon column names

    Args:
        colnames: list of str column names

    Returns (specgroup, keycols) where specgroup is 'cumulative' or
        'healpix', and keycols is the list of columns to use for a unique
        primary key.
    """
    colnames = set(colnames)  # for faster lookup
    if ('TILEID' in colnames and 'LASTNIGHT' in colnames
            and 'PETAL_LOC' in colnames):
        return 'cumulative', ('TARGETID', 'TILEID', 'LASTNIGHT')
    elif 'HEALPIX' in colnames:
        return 'healpix', ('TARGETID', 'HEALPIX', 'SURVEY', 'PROGRAM')
    else:
        raise ValueError(f'Unable to determine healpix or tiles(cumulative) from columns {colnames}')
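
# --- Usage sketch (runnable as-is) -------------------------------------------
# determine_specgroup() inspects only the column names, so it can be used to
# pre-check how a targets table will be routed before any I/O happens.  Note
# that tile columns take precedence when both tile and healpix columns exist.
def _example_determine_specgroup():
    group, keycols = determine_specgroup(
        ['TARGETID', 'TILEID', 'LASTNIGHT', 'PETAL_LOC'])
    assert group == 'cumulative'
    group, keycols = determine_specgroup(
        ['TARGETID', 'HEALPIX', 'SURVEY', 'PROGRAM'])
    assert group == 'healpix'
    return keycols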

def split_targets_by_file(targets, n, specgroup='healpix'):
    """
    Split targets table into n tables, keeping all TARGETIDs in a file together

    Args:
        targets (table): table of targets; see notes for columns
        n (int): number of groups to split into
        specgroup (str): 'healpix', 'tiles', or 'cumulative'

    Returns: list of n subtables, grouped by file

    Notes: if specgroup=='healpix', group by HEALPIX, SURVEY, and PROGRAM;
        if specgroup=='tiles' or 'cumulative', group by TILEID, LASTNIGHT,
        and PETAL_LOC.  Additional columns are allowed and propagated,
        but not used.
    """
    #- What columns should we group by to split by file?
    specgroup, keycols = determine_specgroup(targets.dtype.names)
    if specgroup == 'healpix':
        filecolumns = ('HEALPIX', 'SURVEY', 'PROGRAM')
    elif specgroup in ('tiles', 'cumulative'):
        filecolumns = ('TILEID', 'LASTNIGHT', 'PETAL_LOC')

    #- Split into n groups, keeping TARGETIDs from the same file together.
    #- If there are fewer than n files, pad with blank tables of right format
    target_filerows = targets.group_by(filecolumns).groups
    group_indices = np.array_split(np.arange(len(target_filerows)), n)
    target_tables = [target_filerows[ii] for ii in group_indices]

    return target_tables
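
# --- Usage sketch (runnable as-is) -------------------------------------------
# Splitting a small healpix-style targets table into two groups.  Rows that
# share (HEALPIX, SURVEY, PROGRAM) always land in the same subtable, so each
# underlying file is read by only one worker.
def _example_split_targets_by_file():
    targets = Table()
    targets['TARGETID'] = [1, 2, 3, 4]
    targets['HEALPIX'] = [100, 100, 200, 300]
    targets['SURVEY'] = ['main'] * 4
    targets['PROGRAM'] = ['dark'] * 4
    groups = split_targets_by_file(targets, 2)
    # both HEALPIX=100 rows stay together in the first subtable
    return [len(g) for g in groups]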

def _readspec_healpix(targets, prefix, rdspec_kwargs, specprod=None):
    """
    Read healpix-based spectra for targets table

    Args:
        targets (table): table of targets with TARGETID,HEALPIX,SURVEY,PROGRAM
        prefix (str): 'coadd' or 'spectra'
        rdspec_kwargs (dict): additional key/value args to pass to read_spectra

    Options:
        specprod (str): production name or full path to production

    Returns: Spectra for targets table

    If len(targets)==0, returns None rather than guessing Spectra.fibermap cols
    """
    if len(targets) == 0:
        return None

    log = get_logger()
    spectra = list()
    for zz in targets.group_by(['HEALPIX', 'SURVEY', 'PROGRAM']).groups:
        hpix = zz['HEALPIX'][0]
        survey = zz['SURVEY'][0]
        program = zz['PROGRAM'][0]
        targetids = np.array(zz['TARGETID'])
        specfile = findfile(prefix, healpix=hpix, survey=survey,
                            faprogram=program, readonly=True,
                            specprod=specprod)
        log.debug('Reading spectra from %s', specfile)
        sp = read_spectra(specfile, targetids=targetids, **rdspec_kwargs)
        if sp.num_targets() == 0:
            log.warning(f'No matching targets found in {specfile}')
            continue

        if 'HEALPIX' not in sp.fibermap.colnames:
            sp.fibermap['HEALPIX'] = hpix

        #- String columns force dtype to match number of chars to input targets
        if 'SURVEY' not in sp.fibermap.colnames:
            sp.fibermap['SURVEY'] = survey
            sp.fibermap['SURVEY'] = sp.fibermap['SURVEY'].astype(targets['SURVEY'].dtype)

        if 'PROGRAM' not in sp.fibermap.colnames:
            sp.fibermap['PROGRAM'] = program
            sp.fibermap['PROGRAM'] = sp.fibermap['PROGRAM'].astype(targets['PROGRAM'].dtype)

        spectra.append(sp)

    if len(spectra) == 0:
        msg = 'No matching targets found in input files'
        log.critical(msg)
        raise RuntimeError(msg)

    spectra = stack_spectra(spectra)

    assert np.all(spectra.fibermap['TARGETID'] == targets['TARGETID'])
    assert np.all(spectra.fibermap['HEALPIX'] == targets['HEALPIX'])
    assert np.all(spectra.fibermap['SURVEY'] == targets['SURVEY'])
    assert np.all(spectra.fibermap['PROGRAM'] == targets['PROGRAM'])

    return spectra

def _readspec_tiles(targets, prefix, rdspec_kwargs, specprod=None):
    """
    Read tile-based spectra for targets table

    Args:
        targets (table): target table with TARGETID,TILEID,LASTNIGHT,PETAL_LOC
        prefix (str): 'coadd' or 'spectra'
        rdspec_kwargs (dict): additional key/value args to pass to read_spectra

    Options:
        specprod (str): production name or full path to production

    Returns: Spectra for targets table

    If len(targets)==0, returns None rather than guessing Spectra.fibermap cols
    """
    if len(targets) == 0:
        return None

    log = get_logger()
    spectra = list()
    for zz in targets.group_by(('TILEID', 'LASTNIGHT', 'PETAL_LOC')).groups:
        tileid = zz['TILEID'][0]
        night = zz['LASTNIGHT'][0]
        spectro = zz['PETAL_LOC'][0]
        targetids = np.array(zz['TARGETID'])
        specfile = findfile(prefix, night=night, tile=tileid,
                            spectrograph=spectro, readonly=True,
                            specprod=specprod)
        log.debug('Reading spectra from %s', specfile)
        sp = read_spectra(specfile, targetids=targetids, **rdspec_kwargs)
        if sp.num_targets() == 0:
            log.warning(f'No matching targets found in {specfile}')
            continue

        if 'LASTNIGHT' not in sp.fibermap.colnames:
            sp.fibermap['LASTNIGHT'] = night

        if 'TILEID' not in sp.fibermap.colnames:
            sp.fibermap['TILEID'] = tileid

        if 'PETAL_LOC' not in sp.fibermap.colnames:
            sp.fibermap['PETAL_LOC'] = spectro

        spectra.append(sp)

    if len(spectra) == 0:
        msg = 'No matching targets found in input files'
        log.critical(msg)
        raise RuntimeError(msg)

    spectra = stack_spectra(spectra)

    assert np.all(spectra.fibermap['TARGETID'] == targets['TARGETID'])
    assert np.all(spectra.fibermap['LASTNIGHT'] == targets['LASTNIGHT'])
    assert np.all(spectra.fibermap['PETAL_LOC'] == targets['PETAL_LOC'])
    assert np.all(spectra.fibermap['TILEID'] == targets['TILEID'])

    return spectra
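
# --- Usage sketch (illustrative only) ---------------------------------------
# _readspec_healpix() and _readspec_tiles() share the same calling
# convention; read_spectra_parallel() below picks one based on the target
# table columns.  Direct use looks like this, with hypothetical table values:
def _example_readspec_tiles():
    targets = Table()
    targets['TARGETID'] = [39627652591714158]  # hypothetical
    targets['TILEID'] = [1234]
    targets['LASTNIGHT'] = [20210101]
    targets['PETAL_LOC'] = [0]
    return _readspec_tiles(targets, 'coadd', rdspec_kwargs=dict())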

def read_spectra_parallel(targets, nproc=None, prefix='coadd',
                          rdspec_kwargs=dict(), specprod=None,
                          match_order=True, comm=None):
    """
    Read spectra for targets table in parallel

    Args:
        targets (table): targets table; see notes for required columns

    Options:
        nproc (int): number of processes to use if comm is None
        prefix (str): 'coadd' or 'spectra'
        rdspec_kwargs (dict): additional key/value args to pass to read_spectra
        specprod (str): production name or full path to production
        match_order (bool): if True (default), spectra will match order of input targets
        comm: MPI communicator

    Returns: Spectra for targets in targets table

    If comm is not None, use MPI, elif nproc>1 use multiprocessing, else
    read serially but still read each file only once to get the necessary
    targets.

    If the targets table has columns TARGETID,TILEID,LASTNIGHT,PETAL_LOC,
    then spectra will be read from tiles/cumulative files.  Otherwise, if
    targets has columns TARGETID,SURVEY,PROGRAM,HEALPIX (or HPXPIXEL),
    spectra will be read from healpix-based spectra.  Additional columns
    are allowed and ignored.

    determine_specgroup(colnames) is used to determine tiles/cumulative
    vs. healpix and can be used to pre-check how a targets table will be
    interpreted.
    """
    log = get_logger()

    #- recreate Table wrapper, but don't copy underlying data;
    #- allows adding/renaming columns if needed and supporting non-Table input
    targets = Table(targets, copy=False)

    #- support HPXPIXEL or HEALPIX in input targets table
    if 'HPXPIXEL' in targets.colnames:
        targets.rename_column('HPXPIXEL', 'HEALPIX')

    #- if not specified, get specprod from header or $SPECPROD
    if specprod is None:
        if 'SPECPROD' in targets.meta:
            specprod = targets.meta['SPECPROD']
        elif hasdep(targets.meta, 'SPECPROD'):
            specprod = getdep(targets.meta, 'SPECPROD')
        else:
            specprod = os.environ['SPECPROD']

    #- Determine which reader to use
    specgroup, keycols = determine_specgroup(targets.dtype.names)
    if specgroup == 'healpix':
        readspec = _readspec_healpix
    elif specgroup in ('tiles', 'cumulative'):
        readspec = _readspec_tiles

    #- promote columns from targets.meta if needed
    #- e.g. target.meta['SURVEY'] -> targets['SURVEY']
    ok = True
    for col in keycols:
        if ((col not in targets.colnames) and (col in targets.meta)):
            targets[col] = targets.meta[col]

        if col not in targets.colnames:
            log.error(f'{specgroup} targets need {col} in either header or columns')
            ok = False

    if not ok:
        msg = f'Unable to find required columns {keycols} for {specgroup} targets'
        log.critical(msg)
        raise ValueError(msg)

    if comm is not None:
        rank, size = comm.rank, comm.size
        nproc = size
    else:
        rank, size = 0, 1

    if nproc is None:
        nproc = max(1, multiprocessing.cpu_count() // 2)

    #- split targets into list of nproc subtables, such that targets in
    #- a file are only in a single subtable (i.e. it will be read only once)
    target_groups = split_targets_by_file(targets, nproc)

    #- strip out empty target_groups, which happens if nfiles < nproc
    target_groups = [t for t in target_groups if len(t) > 0]
    nproc = min(nproc, len(target_groups))

    #- list of (targets, prefix, rdspec_kwargs, specprod)
    arglist = [(t, prefix, rdspec_kwargs, specprod) for t in target_groups]

    if comm is not None:
        #- MPI parallelism
        from mpi4py.futures import MPICommExecutor
        with MPICommExecutor(comm, root=0) as pool:
            if pool is not None:
                spectra = list(pool.starmap(readspec, arglist))
            else:
                spectra = None
    elif nproc > 1:
        #- Multiprocessing parallelism
        with multiprocessing.Pool(nproc) as pool:
            spectra = list(pool.starmap(readspec, arglist))
    else:
        #- Serial
        spectra = [readspec(*args) for args in arglist]

    if spectra is not None:
        #- belt and suspenders check: remove any None values before stack
        spectra = [sp for sp in spectra if sp is not None]
        spectra = stack_spectra(spectra)

        #- reorder spectra to match input target table if needed
        if match_order:
            need_to_reorder = False
            for col in keycols:
                if np.any(spectra.fibermap[col] != targets[col]):
                    need_to_reorder = True
                    break

            if need_to_reorder:
                ii = argmatch(spectra.fibermap[keycols], targets[keycols])
                spectra = spectra[ii]

    return spectra
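
# --- Usage sketch (illustrative only) ---------------------------------------
# Parallel reads with read_spectra_parallel().  The targets table values and
# the production name 'iron' are hypothetical.  With comm=None and nproc=4
# the reads use a multiprocessing.Pool; passing an MPI communicator instead
# distributes the same per-file work units over ranks.
def _example_read_spectra_parallel():
    targets = Table()
    targets['TARGETID'] = [39627652591714158, 39627652591714159]  # hypothetical
    targets['HEALPIX'] = [10016, 10017]
    targets['SURVEY'] = ['main', 'main']
    targets['PROGRAM'] = ['dark', 'dark']
    sp = read_spectra_parallel(targets, nproc=4, prefix='coadd',
                               specprod='iron')
    # with match_order=True (default), rows come back in input target order
    return sp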