# Source code for desispec.coaddition

"""
desispec.coaddition
===================

Coadd spectra.
"""

from __future__ import absolute_import, division, print_function
import os, sys, time

import numpy as np

import scipy.sparse
import scipy.linalg
import scipy.sparse.linalg

from astropy.table import Table, Column

import multiprocessing

from desiutil.log import get_logger

from desispec.interpolation import resample_flux
from desispec.spectra import Spectra
from desispec.resolution import Resolution
from desispec.fiberbitmasking import get_all_fiberbitmask_with_amp, get_all_nonamp_fiberbitmask_val, get_justamps_fiberbitmask
from desispec.specscore import compute_coadd_scores
from desispec.util import ordered_unique

#- Fibermap columns that come from targeting or MTL files
#- (per-target quantities that do not change between exposures)
fibermap_target_cols = (
    'TARGETID',
    'TARGET_RA',
    'TARGET_DEC',
    'PMRA',
    'PMDEC',
    'REF_EPOCH',
    'LAMBDA_REF',
    'FA_TARGET',
    'FA_TYPE',
    'OBJTYPE',
    'NUMTARGET',
    'OBSCONDITIONS',
    'MORPHTYPE',
    'FLUX_G',
    'FLUX_R',
    'FLUX_Z',
    'FLUX_IVAR_G',
    'FLUX_IVAR_R',
    'FLUX_IVAR_Z',
    'REF_ID',
    'REF_CAT',
    'GAIA_PHOT_G_MEAN_MAG',
    'GAIA_PHOT_BP_MEAN_MAG',
    'GAIA_PHOT_RP_MEAN_MAG',
    'PARALLAX',
    'EBV',
    'FLUX_W1',
    'FLUX_W2',
    'FIBERFLUX_G',
    'FIBERFLUX_R',
    'FIBERFLUX_Z',
    'FIBERTOTFLUX_G',
    'FIBERTOTFLUX_R',
    'FIBERTOTFLUX_Z',
    'MASKBITS',
    'SERSIC',
    'SHAPE_R', 'SHAPE_E1', 'SHAPE_E2',
    'PHOTSYS',
    'PRIORITY_INIT',
    'NUMOBS_INIT',
    'RELEASE',
    'BRICKID',
    'BRICKNAME', 'BRICK_OBJID',
    'BLOBDIST',
    'FIBERFLUX_IVAR_G',
    'FIBERFLUX_IVAR_R',
    'FIBERFLUX_IVAR_Z',
    'CMX_TARGET',
    'SV0_TARGET',
    'SV1_TARGET',
    'SV2_TARGET',
    'SV3_TARGET',
    'DESI_TARGET',
    'BGS_TARGET',
    'MWS_TARGET',
    'SCND_TARGET',
    'HPXPIXEL',
)

#- PRIORITY could change between tiles; SUBPRIORITY won't but keep 'em together
fibermap_mtl_cols = (
    'PRIORITY',
    'SUBPRIORITY',
)

#- Fibermap columns that were added by fiberassign
#- ... same per target, regardless of assignment
fibermap_fiberassign_target_cols = (
    'FA_TARGET', 'FA_TYPE', 'OBJTYPE',
    )
#- ... varies per assignment
fibermap_fiberassign_cols = (
    'PETAL_LOC', 'DEVICE_LOC', 'LOCATION', 'FIBER', 'FIBERSTATUS',
    'FIBERASSIGN_X', 'FIBERASSIGN_Y',
    'LAMBDA_REF', 'PLATE_RA', 'PLATE_DEC',
    )

#- Fibermap columns added from the platemaker coordinates file
fibermap_coords_cols = (
    'NUM_ITER', 'FIBER_X', 'FIBER_Y', 'DELTA_X', 'DELTA_Y',
    'FIBER_RA', 'FIBER_DEC',
    )

#- Fibermap columns with exposure metadata
fibermap_exp_cols = (
    'NIGHT', 'EXPID', 'MJD', 'TILEID', 'EXPTIME'
    )

#- Fibermap columns added by flux calibration
fibermap_cframe_cols = (
    'PSF_TO_FIBER_SPECFLUX',
    )

#- Columns to include in the per-exposure EXP_FIBERMAP
#- (IN_COADD_B/R/Z are added by coadd_fibermap itself)
fibermap_perexp_cols = \
    ('TARGETID',) + \
    fibermap_mtl_cols + \
    fibermap_exp_cols + \
    fibermap_fiberassign_cols + \
    fibermap_coords_cols + fibermap_cframe_cols + \
    ('IN_COADD_B', 'IN_COADD_R', 'IN_COADD_Z')

def calc_mean_std_ra_dec(ras, decs):
    """
    Calculate mean/std of ras, decs accounting for RA wraparound and cos(dec)

    Args:
        ras (array): input RA values in degrees
        decs (array): input declination values in degrees

    Returns: mean_ra, std_ra, mean_dec, std_dec
        where the means are in degrees and the standard deviations are in
        arcsec, including cos(dec) correction.

    For efficiency, this does not try to handle dec= +/-90 poles correctly,
    nor arbitrarily large spreads of angles.  i.e. this is ok for a mean
    of fiber positions scattered about a single target, but not for e.g.
    a science analysis of the central location of a cluster of galaxies.
    """
    ras = np.asarray(ras)
    decs = np.asarray(decs)

    #- If the RA span exceeds 180 deg, assume the points straddle RA=0/360
    #- and rotate them by 180 deg so the mean is well defined
    wraps = (np.max(ras) - np.min(ras)) > 180
    offset = 180.0 if wraps else 0.0
    if wraps:
        ras = (ras + offset) % 360

    mean_dec = np.mean(decs)
    std_dec = np.std(decs) * 3600  # degrees -> arcsec

    #- Undo the wraparound rotation before reporting the mean
    mean_ra = (np.mean(ras) - offset + 360) % 360
    #- cos(dec) converts the RA scatter to a true angular scatter on the sky
    std_ra = np.std(ras) * np.cos(np.radians(mean_dec)) * 3600

    return mean_ra, std_ra, mean_dec, std_dec
def use_for_coadd(fiberstatus, band):
    """
    Determine which exposures should be used for a per-camera coadd

    Args:
        fiberstatus (array): FIBERSTATUS bitmasks, one per exposure
        band (str): camera band, 'b', 'r', or 'z'

    Returns: boolean array of whether exposure should be used in coadd or not

    This is factored into a separate function because it is used in
    `coadd`, `coadd_cameras`, and `coadd_fibermap`
    """
    if band not in ('b', 'r', 'z'):
        raise ValueError(f'band={band} should be b, r, or z')

    #- An exposure is usable when none of the "bad" bits for this band are set
    bad_bits = get_all_fiberbitmask_with_amp(band)
    return (fiberstatus & bad_bits) == 0
def coadd_fibermap(fibermap, onetile=False):
    """
    Coadds fibermap

    Args:
        fibermap (Table or ndarray): fibermap of individual exposures

    Options:
        onetile (bool): this is a coadd of a single tile, not across tiles

    Returns: (coadded_fibermap, exp_fibermap) Tables

    coadded_fibermap contains the coadded_fibermap for the columns that can
    be coadded, while exp_fibermap is the subset of columns of the original
    fibermap that can't be meaningfully coadded because they are
    per-exposure quantities like FIBER_X.

    If onetile is True, the coadded_fibermap includes additional columns like
    MEAN_FIBER_X that are meaningful if coadding a single tile, but not if
    coadding across tiles.
    """
    log = get_logger()
    log.debug("'coadding' fibermap")

    if onetile:
        ntile = len(np.unique(fibermap['TILEID']))
        if ntile != 1:
            msg = f'input has {ntile} tiles, but onetile=True option'
            log.error(msg)
            raise ValueError(msg)

    #- make a copy of input fibermap that we can modify with new columns.
    #- This will become the per-exposure fibermap for the EXP_FIBERMAP HDU
    exp_fibermap = Table(fibermap, copy=True)

    #- Remove the "fibermap" input from the current namespace so that we
    #- don't accidentally use it.
    #- NOTE: does not actually delete/modify the original input
    del fibermap

    #- Get TARGETIDs, preserving order in which they first appear
    #- tfmap = "Target Fiber Map", i.e. one row per target instead of one row per exposure
    targets, ii = ordered_unique(exp_fibermap["TARGETID"], return_index=True)
    tfmap = exp_fibermap[ii]
    assert np.all(targets == tfmap['TARGETID'])
    ntarget = targets.size

    #- New columns to fill in for whether exposure was used in coadd
    exp_fibermap['IN_COADD_B'] = np.zeros(len(exp_fibermap), dtype=bool)
    exp_fibermap['IN_COADD_R'] = np.zeros(len(exp_fibermap), dtype=bool)
    exp_fibermap['IN_COADD_Z'] = np.zeros(len(exp_fibermap), dtype=bool)

    #- initialize NUMEXP=-1 to check that they all got filled later
    tfmap['COADD_NUMEXP'] = np.zeros(len(tfmap), dtype=np.int16) - 1
    tfmap['COADD_EXPTIME'] = np.zeros(len(tfmap), dtype=np.float32) - 1
    tfmap['COADD_NUMNIGHT'] = np.zeros(len(tfmap), dtype=np.int16) - 1
    tfmap['COADD_NUMTILE'] = np.zeros(len(tfmap), dtype=np.int16) - 1

    # some cols get combined into mean or rms;
    # MJD handled separately to get min/max/mean
    mean_cols = [
        'DELTA_X', 'DELTA_Y',
        'PSF_TO_FIBER_SPECFLUX',
        ]
    # Note: treat the fiber coordinates separately because of missing
    # coordinate problem that requires an additional "good_coords"
    # condition relative to other mean cols
    if onetile:
        mean_cols.extend(['FIBER_X', 'FIBER_Y'])

    #- rms_cols and std_cols must also be in mean_cols
    rms_cols = ['DELTA_X', 'DELTA_Y']
    std_cols = []  # currently none; RA/dec handled separately

    #- Add other MEAN/RMS/STD columns
    for k in mean_cols:
        if k in exp_fibermap.colnames:
            dtype = np.float32
            if k in mean_cols:
                xx = Column(np.zeros(ntarget, dtype=dtype))
                tfmap.add_column(xx, name='MEAN_'+k)
            if k in rms_cols:
                xx = Column(np.zeros(ntarget, dtype=np.float32))
                tfmap.add_column(xx, name='RMS_'+k)
            if k in std_cols:
                xx = Column(np.zeros(ntarget, dtype=np.float32))
                tfmap.add_column(xx, name='STD_'+k)

            tfmap.remove_column(k)

    #- FIBER_RA/DEC handled differently due to RA wraparound and cos(dec)
    if 'FIBER_RA' in exp_fibermap.colnames and 'FIBER_DEC' in exp_fibermap.colnames:
        tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float64)), name='MEAN_FIBER_RA')
        tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float32)), name='STD_FIBER_RA')
        tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float64)), name='MEAN_FIBER_DEC')
        tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float32)), name='STD_FIBER_DEC')
        tfmap.remove_column('FIBER_RA')
        tfmap.remove_column('FIBER_DEC')

    #- MIN_, MAX_, MEAN_MJD over exposures used in coadd
    if 'MJD' in exp_fibermap.colnames:
        dtype = np.float64
        if not 'MIN_MJD' in tfmap.dtype.names:
            xx = Column(np.zeros(ntarget, dtype=dtype))
            tfmap.add_column(xx, name='MIN_MJD')
        if not 'MAX_MJD' in tfmap.dtype.names:
            xx = Column(np.zeros(ntarget, dtype=dtype))
            tfmap.add_column(xx, name='MAX_MJD')
        if not 'MEAN_MJD' in tfmap.dtype.names:
            xx = Column(np.zeros(ntarget, dtype=dtype))
            tfmap.add_column(xx, name='MEAN_MJD')

    #- The coadded table carries COADD_FIBERSTATUS instead of FIBERSTATUS
    if 'FIBERSTATUS' in tfmap.dtype.names:
        tfmap.rename_column('FIBERSTATUS', 'COADD_FIBERSTATUS')
    if not 'COADD_FIBERSTATUS' in tfmap.dtype.names:
        raise KeyError("no COADD_FIBERSTATUS column in tfmap")

    #- Inputs may be per-exposure frames (FIBERSTATUS) or previous coadds
    if 'FIBERSTATUS' in exp_fibermap.dtype.names:
        fiberstatus_key = 'FIBERSTATUS'
    elif 'COADD_FIBERSTATUS' in exp_fibermap.dtype.names:
        fiberstatus_key = 'COADD_FIBERSTATUS'
    else:
        raise KeyError("no FIBERSTATUS nor COADD_FIBERSTATUS column in fibermap")

    for i, tid in enumerate(targets):
        jj = exp_fibermap["TARGETID"] == tid

        #- Only a subset of "good" FIBERSTATUS flags are included in the coadd
        targ_fibstatuses = exp_fibermap[fiberstatus_key][jj]
        in_coadd_b = use_for_coadd(targ_fibstatuses, 'b')
        in_coadd_r = use_for_coadd(targ_fibstatuses, 'r')
        in_coadd_z = use_for_coadd(targ_fibstatuses, 'z')
        good_coadds = (in_coadd_b | in_coadd_r | in_coadd_z)
        coadd_numexp = np.count_nonzero(good_coadds)
        tfmap['COADD_NUMEXP'][i] = coadd_numexp
        exp_fibermap['IN_COADD_B'][jj] = in_coadd_b
        exp_fibermap['IN_COADD_R'][jj] = in_coadd_r
        exp_fibermap['IN_COADD_Z'][jj] = in_coadd_z

        # Check if there are some good coadds to compute aggregate quantities;
        # Otherwise just use all the (bad) exposures; will still count NUM on good_coadds
        if coadd_numexp > 0:
            compute_coadds = good_coadds
            # coadded FIBERSTATUS = bitwise AND of input FIBERSTATUS
            tfmap['COADD_FIBERSTATUS'][i] = np.bitwise_and.reduce(exp_fibermap[fiberstatus_key][jj][good_coadds])
        else:
            compute_coadds = ~good_coadds
            # if all inputs were bad, COADD_FIBERSTATUS is OR of inputs instead of AND
            tfmap['COADD_FIBERSTATUS'][i] = np.bitwise_or.reduce(exp_fibermap[fiberstatus_key][jj])

        #- For FIBER_RA/DEC quantities, only average over good coordinates.
        #- There is a bug that some "missing" coordinates were set to
        #- FIBER_RA=FIBER_DEC=0 (we are assuming there are not valid targets
        #- at exactly 0,0; only missing coords)
        if 'FIBER_RA' in exp_fibermap.colnames and 'FIBER_DEC' in exp_fibermap.colnames:
            good_coords = (exp_fibermap['FIBER_RA'][jj] != 0) | (exp_fibermap['FIBER_DEC'][jj] != 0)

            #- Check whether entries with good coordinates exist
            #- (if not use all coordinates)
            if np.count_nonzero(good_coords) > 0:
                compute_coords = good_coords
            else:
                compute_coords = ~good_coords

            #- Check for edge case where good_coadds and good_coords do not
            #- overlap: if they overlap, use both conditions; otherwise
            #- compute coordinates over good_coords
            if np.count_nonzero(compute_coadds & compute_coords) > 0:
                compute_coadds_coords = compute_coadds & compute_coords
            else:
                #TODO - decide if it's worth adding the following Warning message to the log
                #print(f"Warning: TARGETID lacks overlap between good_coadds and good_coords: {tid}")
                compute_coadds_coords = compute_coords

        # Note: NIGHT and TILEID may not be present when coadding previously
        # coadded spectra.
        if 'NIGHT' in exp_fibermap.colnames:
            tfmap['COADD_NUMNIGHT'][i] = len(np.unique(exp_fibermap['NIGHT'][jj][good_coadds]))
        if 'TILEID' in exp_fibermap.colnames:
            tfmap['COADD_NUMTILE'][i] = len(np.unique(exp_fibermap['TILEID'][jj][good_coadds]))
        if 'EXPTIME' in exp_fibermap.colnames:
            tfmap['COADD_EXPTIME'][i] = np.sum(exp_fibermap['EXPTIME'][jj][good_coadds])

        # Calc MEAN_* and RMS_* columns using the same exposures as the coadd
        # Note RA/DEC/MJD are handled separately
        for k in mean_cols:
            if k in exp_fibermap.colnames:
                vals = exp_fibermap[k][jj][compute_coadds]
                tfmap['MEAN_'+k][i] = np.mean(vals)

        for k in rms_cols:
            if k in exp_fibermap.colnames:
                vals = exp_fibermap[k][jj][compute_coadds]
                # RMS includes mean offset, not same as STD
                tfmap['RMS_'+k][i] = np.sqrt(np.mean(vals**2)).astype(np.float32)

        #- STD of FIBER_RA, FIBER_DEC in arcsec, handling cos(dec) and RA wrap
        if 'FIBER_RA' in exp_fibermap.colnames and 'FIBER_DEC' in exp_fibermap.colnames:
            decs = exp_fibermap['FIBER_DEC'][jj][compute_coadds_coords]
            ras = exp_fibermap['FIBER_RA'][jj][compute_coadds_coords]
            mean_ra, std_ra, mean_dec, std_dec = calc_mean_std_ra_dec(ras, decs)
            tfmap['MEAN_FIBER_RA'][i] = mean_ra
            tfmap['STD_FIBER_RA'][i] = np.float32(std_ra)
            tfmap['MEAN_FIBER_DEC'][i] = mean_dec
            tfmap['STD_FIBER_DEC'][i] = np.float32(std_dec)

        #- future proofing possibility of other STD cols
        for k in std_cols:
            if k in exp_fibermap.colnames:
                vals = exp_fibermap[k][jj][compute_coadds]
                # STD removes mean offset, not same as RMS
                tfmap['STD_'+k][i] = np.std(vals).astype(np.float32)

        # MIN_, MAX_MJD over exposures used in the coadd
        if 'MJD' in exp_fibermap.colnames:
            vals = exp_fibermap['MJD'][jj][compute_coadds]
            tfmap['MIN_MJD'][i] = np.min(vals)
            tfmap['MAX_MJD'][i] = np.max(vals)
            tfmap['MEAN_MJD'][i] = np.mean(vals)

        # Error propagation of IVAR values when taking an unweighted MEAN
        #- (Note 1: IVAR will be 0.0 if any of ivar[compute_coadds]=0)
        #- (Note 2: these columns are place-holder for possible future use)
        for k in ['FIBER_RA_IVAR', 'FIBER_DEC_IVAR',
                  'DELTA_X_IVAR', 'DELTA_Y_IVAR']:
            if k in exp_fibermap.colnames:
                tfmap[k][i] = 1./np.mean(1./exp_fibermap[k][jj][compute_coadds])

        #- Targeting bits can evolve, so use bitwise OR of any input bits set
        #- See Sec 5.1 of https://ui.adsabs.harvard.edu/abs/2023AJ....165...50M/abstract
        for targetcol in ('CMX_TARGET',
                          'SV1_DESI_TARGET', 'SV1_BGS_TARGET', 'SV1_MWS_TARGET', 'SV1_SCND_TARGET',
                          'SV2_DESI_TARGET', 'SV2_BGS_TARGET', 'SV2_MWS_TARGET', 'SV2_SCND_TARGET',
                          'SV3_DESI_TARGET', 'SV3_BGS_TARGET', 'SV3_MWS_TARGET', 'SV3_SCND_TARGET',
                          'DESI_TARGET', 'BGS_TARGET', 'MWS_TARGET', 'SCND_TARGET'):
            if targetcol in tfmap.colnames:
                tfmap[targetcol][i] = np.bitwise_or.reduce(exp_fibermap[targetcol][jj], axis=0)

    #- Remove some columns that apply to individual exp but not coadds
    #- (even coadds of the same tile)
    for k in ['NIGHT', 'EXPID', 'MJD', 'EXPTIME', 'NUM_ITER',
              'PSF_TO_FIBER_SPECFLUX']:
        if k in tfmap.colnames:
            tfmap.remove_column(k)

    #- Remove columns that don't apply to coadds across tiles
    if not onetile:
        for k in ['TILEID', 'FIBER', 'FIBER_X', 'FIBER_Y',
                  'PRIORITY', 'PETAL_RA', 'PETAL_DEC', 'LAMBDA_REF',
                  'FIBERASSIGN_X', 'FIBERASSIGN_Y',
                  'PETAL_LOC', 'DEVICE_LOC', 'LOCATION']:
            if k in tfmap.colnames:
                tfmap.remove_column(k)

    #- keep exposure-specific columns that are present in the input fibermap
    ii = np.isin(fibermap_perexp_cols, exp_fibermap.dtype.names)
    keepcols = tuple(np.array(fibermap_perexp_cols)[ii])
    exp_fibermap = exp_fibermap[keepcols]

    return tfmap, exp_fibermap
def coadd(spectra, cosmics_nsig=None, onetile=False):
    """
    Coadd spectra for each target and each camera, modifying input spectra obj.

    Args:
       spectra: desispec.spectra.Spectra object

    Options:
       cosmics_nsig: float, nsigma clipping threshold for cosmics rays (default 4)
       onetile: bool, if True, inputs are from a single tile

    Notes: if `onetile` is True, additional tile-specific columns
       like LOCATION and FIBER are included the FIBERMAP; otherwise
       these are only in the EXP_FIBERMAP since for the same target they
       could be different on different tiles.
    """
    log = get_logger()
    targets = ordered_unique(spectra.fibermap["TARGETID"])
    ntarget = targets.size
    log.debug("number of targets= {}".format(ntarget))

    #- Use "None" as default -> 4.0 so that scripts can use args.nsig
    #- with default None, which lets this function be the sole "owner" of
    #- the true numeric default
    if cosmics_nsig is None:
        cosmics_nsig = 4.0  # Note: if you change this, also change docstring

    if cosmics_nsig > 0:
        log.info(f'Clipping cosmics with {cosmics_nsig=}')
    else:
        log.info(f'Not performing cosmics sigma clipping ({cosmics_nsig=})')

    for b in spectra.bands:
        log.debug("coadding band '{}'".format(b))

        nwave = spectra.wave[b].size
        tflux = np.zeros((ntarget, nwave), dtype=spectra.flux[b].dtype)
        tivar = np.zeros((ntarget, nwave), dtype=spectra.ivar[b].dtype)
        if spectra.mask is not None:
            tmask = np.zeros((ntarget, nwave), dtype=spectra.mask[b].dtype)
        else:
            tmask = None
        trdata = np.zeros((ntarget, spectra.resolution_data[b].shape[1], nwave),
                          dtype=spectra.resolution_data[b].dtype)

        if 'FIBERSTATUS' in spectra.fibermap.dtype.names:
            fiberstatus = spectra.fibermap['FIBERSTATUS']
        else:
            fiberstatus = spectra.fibermap['COADD_FIBERSTATUS']

        good_fiberstatus = use_for_coadd(fiberstatus, b)
        for i, tid in enumerate(targets):
            jj = np.where((spectra.fibermap["TARGETID"] == tid) & good_fiberstatus)[0]

            #- if all spectra were flagged as bad (FIBERSTATUS != 0), continue
            #- to next target, leaving tflux and tivar=0 for this target
            if len(jj) == 0:
                continue

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj) > 2:
                # interpolate over bad measurements
                # to be able to compute gradient next
                # to a bad pixel and identify outlier
                # many cosmics residuals are on edge
                # of cosmic ray trace, and so can be
                # next to a masked flux bin
                grad = []
                gradvar = []
                for j in jj:
                    if spectra.mask is not None:
                        ttivar = spectra.ivar[b][j]*(spectra.mask[b][j] == 0)
                    else:
                        ttivar = spectra.ivar[b][j]
                    good = (ttivar > 0)
                    bad = ~good
                    if np.sum(good) == 0:
                        continue
                    nbad = np.sum(bad)
                    ttflux = spectra.flux[b][j].copy()
                    if nbad > 0:
                        ttflux[bad] = np.interp(spectra.wave[b][bad], spectra.wave[b][good], ttflux[good])
                    ttivar = spectra.ivar[b][j].copy()
                    if nbad > 0:
                        ttivar[bad] = np.interp(spectra.wave[b][bad], spectra.wave[b][good], ttivar[good])
                    ttvar = 1./(ttivar + (ttivar == 0))
                    #- finite difference gradient and its variance
                    ttflux[1:] = ttflux[1:] - ttflux[:-1]
                    ttvar[1:] = ttvar[1:] + ttvar[:-1]
                    ttflux[0] = 0
                    grad.append(ttflux)
                    gradvar.append(ttvar)

            tivar_unmasked = np.sum(spectra.ivar[b][jj], axis=0)
            if spectra.mask is not None:
                ivarjj = spectra.ivar[b][jj]*(spectra.mask[b][jj] == 0)
            else:
                ivarjj = spectra.ivar[b][jj]

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj) > 2:
                grad = np.array(grad)
                gradvar = np.array(gradvar)
                gradivar = (gradvar > 0)/np.array(gradvar + (gradvar == 0))
                nspec = grad.shape[0]
                sgradivar = np.sum(gradivar)
                if sgradivar > 0:
                    meangrad = np.sum(gradivar*grad, axis=0)/sgradivar
                    deltagrad = grad - meangrad
                    chi2 = np.sum(gradivar*deltagrad**2, axis=0)/(nspec-1)

                    bad = (chi2 > cosmics_nsig**2)
                    nbad = np.sum(bad)
                    if nbad > 0:
                        log.info("masking {} values for targetid={}".format(nbad, tid))
                        badindex = np.where(bad)[0]
                        for bi in badindex:
                            #- mask only the spectrum most responsible for the outlier
                            k = np.argmax(gradivar[:, bi]*deltagrad[:, bi]**2)
                            ivarjj[k, bi] = 0.
                            log.debug("masking spec {} wave={}".format(k, spectra.wave[b][bi]))

            tivar[i] = np.sum(ivarjj, axis=0)
            tflux[i] = np.sum(ivarjj*spectra.flux[b][jj], axis=0)
            for r in range(spectra.resolution_data[b].shape[1]):
                trdata[i, r] = np.sum((spectra.ivar[b][jj]*spectra.resolution_data[b][jj, r]), axis=0)  # not sure applying mask is wise here
            bad = (tivar[i] == 0)
            if np.sum(bad) > 0:
                tivar[i][bad] = np.sum(spectra.ivar[b][jj][:, bad], axis=0)  # if all masked, keep original ivar
                tflux[i][bad] = np.sum(spectra.ivar[b][jj][:, bad]*spectra.flux[b][jj][:, bad], axis=0)
            ok = (tivar[i] > 0)
            if np.sum(ok) > 0:
                tflux[i][ok] /= tivar[i][ok]
            ok = (tivar_unmasked > 0)
            if np.sum(ok) > 0:
                trdata[i][:, ok] /= tivar_unmasked[ok]
            if spectra.mask is not None:
                tmask[i] = np.bitwise_and.reduce(spectra.mask[b][jj], axis=0)

        spectra.flux[b] = tflux
        spectra.ivar[b] = tivar
        if spectra.mask is not None:
            spectra.mask[b] = tmask
        spectra.resolution_data[b] = trdata

    if spectra.scores is not None:
        orig_scores = Table(spectra.scores.copy())
        orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
    else:
        orig_scores = None

    spectra.fibermap, exp_fibermap = coadd_fibermap(spectra.fibermap, onetile=onetile)
    spectra.exp_fibermap = exp_fibermap
    spectra.scores = None
    compute_coadd_scores(spectra, orig_scores, update_coadd=True)
def coadd_cameras(spectra, cosmics_nsig=0., onetile=False):
    """
    Return coadd across both exposures and cameras

    Args:
       spectra: desispec.spectra.Spectra object

    Options:
       cosmics_nsig: float, nsigma clipping threshold for cosmics rays
       onetile: bool, if True, inputs are from a single tile

    If `onetile` is True, additional tile-specific columns
    like LOCATION and FIBER are included the FIBERMAP; otherwise
    these are only in the EXP_FIBERMAP since for the same target they
    could be different on different tiles.

    Note: unlike `coadd`, this does not modify the input spectra object
    """

    #check_alignement_of_camera_wavelength(spectra)

    log = get_logger()

    # ordering
    mwave = [np.mean(spectra.wave[b]) for b in spectra.bands]
    sbands = np.array(spectra.bands)[np.argsort(mwave)]  # bands sorted by inc. wavelength
    log.debug("wavelength sorted cameras= {}".format(sbands))

    # create wavelength array
    wave = None
    tolerance = 0.0001  #A , tolerance
    for b in sbands:
        if wave is None:
            wave = spectra.wave[b]
        else:
            wave = np.append(wave, spectra.wave[b][spectra.wave[b] > wave[-1]+tolerance])
    nwave = wave.size

    # check alignment, caching band wavelength grid indices as we go
    windict = {}
    number_of_overlapping_cameras = np.zeros(nwave)
    for b in spectra.bands:
        imin = np.argmin(np.abs(spectra.wave[b][0] - wave))
        windices = np.arange(imin, imin+len(spectra.wave[b]), dtype=int)
        dwave = spectra.wave[b] - wave[windices]
        if np.any(np.abs(dwave) > tolerance):
            msg = "Input wavelength grids (band '{}') are not aligned. Use --lin-step or --log10-step to resample to a common grid.".format(b)
            log.error(msg)
            raise ValueError(msg)
        number_of_overlapping_cameras[windices] += 1
        windict[b] = windices

    # targets
    targets = ordered_unique(spectra.fibermap["TARGETID"])
    ntarget = targets.size
    log.debug("number of targets= {}".format(ntarget))

    # ndiag = max of all cameras
    ndiag = 0
    if spectra.resolution_data is not None:
        for b in sbands:
            ndiag = max(ndiag, spectra.resolution_data[b].shape[1])
    log.debug("ndiag=%d", ndiag)

    b = sbands[0]
    flux = np.zeros((ntarget, nwave), dtype=spectra.flux[b].dtype)
    ivar = np.zeros((ntarget, nwave), dtype=spectra.ivar[b].dtype)
    if spectra.mask is not None:
        ivar_unmasked = np.zeros((ntarget, nwave), dtype=spectra.ivar[b].dtype)
        mask = np.zeros((ntarget, nwave), dtype=spectra.mask[b].dtype)
    else:
        ivar_unmasked = ivar
        mask = None

    if spectra.resolution_data is not None:
        rdata = np.zeros((ntarget, ndiag, nwave), dtype=spectra.resolution_data[b].dtype)
    else:
        rdata = None

    for b in spectra.bands:
        log.debug("coadding band '{}'".format(b))

        # indices
        windices = windict[b]

        if spectra.resolution_data is not None:
            band_ndiag = spectra.resolution_data[b].shape[1]
        else:
            band_ndiag = 0

        if 'FIBERSTATUS' in spectra.fibermap.dtype.names:
            fiberstatus = spectra.fibermap['FIBERSTATUS']
        else:
            fiberstatus = spectra.fibermap['COADD_FIBERSTATUS']

        good_fiberstatus = use_for_coadd(fiberstatus, b)
        for i, tid in enumerate(targets):
            jj = np.where((spectra.fibermap["TARGETID"] == tid) & good_fiberstatus)[0]

            #- if all spectra were flagged as bad (FIBERSTATUS != 0), continue
            #- to next target, leaving tflux and tivar=0 for this target
            if len(jj) == 0:
                continue

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj) > 2:
                # interpolate over bad measurements
                # to be able to compute gradient next
                # to a bad pixel and identify outlier
                # many cosmics residuals are on edge
                # of cosmic ray trace, and so can be
                # next to a masked flux bin
                grad = []
                gradvar = []
                for j in jj:
                    if spectra.mask is not None:
                        ttivar = spectra.ivar[b][j]*(spectra.mask[b][j] == 0)
                    else:
                        ttivar = spectra.ivar[b][j]
                    good = (ttivar > 0)
                    bad = ~good
                    ttflux = spectra.flux[b][j].copy()
                    ttflux[bad] = np.interp(spectra.wave[b][bad], spectra.wave[b][good], ttflux[good])
                    ttivar = spectra.ivar[b][j].copy()
                    ttivar[bad] = np.interp(spectra.wave[b][bad], spectra.wave[b][good], ttivar[good])
                    ttvar = 1./(ttivar + (ttivar == 0))
                    ttflux[1:] = ttflux[1:] - ttflux[:-1]
                    ttvar[1:] = ttvar[1:] + ttvar[:-1]
                    ttflux[0] = 0
                    grad.append(ttflux)
                    gradvar.append(ttvar)

            ivar_unmasked[i, windices] += np.sum(spectra.ivar[b][jj], axis=0)

            if spectra.mask is not None:
                ivarjj = spectra.ivar[b][jj]*(spectra.mask[b][jj] == 0)
            else:
                ivarjj = spectra.ivar[b][jj]

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj) > 2:
                grad = np.array(grad)
                gradivar = 1/np.array(gradvar)
                nspec = grad.shape[0]
                meangrad = np.sum(gradivar*grad, axis=0)/np.sum(gradivar)
                deltagrad = grad - meangrad
                chi2 = np.sum(gradivar*deltagrad**2, axis=0)/(nspec-1)

                bad = (chi2 > cosmics_nsig**2)
                nbad = np.sum(bad)
                if nbad > 0:
                    log.info("masking {} values for targetid={}".format(nbad, tid))
                    badindex = np.where(bad)[0]
                    for bi in badindex:
                        k = np.argmax(gradivar[:, bi]*deltagrad[:, bi]**2)
                        ivarjj[k, bi] = 0.
                        log.debug("masking spec {} wave={}".format(k, spectra.wave[b][bi]))

            ivar[i, windices] += np.sum(ivarjj, axis=0)
            flux[i, windices] += np.sum(ivarjj*spectra.flux[b][jj], axis=0)
            if spectra.resolution_data is not None:
                #- center narrower band resolution within the max ndiag
                for r in range(band_ndiag):
                    rdata[i, r+(ndiag-band_ndiag)//2, windices] += np.sum((spectra.ivar[b][jj]*spectra.resolution_data[b][jj, r]), axis=0)
            if spectra.mask is not None:
                # this deserves some attention ...
                tmpmask = np.bitwise_and.reduce(spectra.mask[b][jj], axis=0)

                # directly copy mask where no overlap
                jj = (number_of_overlapping_cameras[windices] == 1)
                mask[i, windices[jj]] = tmpmask[jj]

                # 'and' in overlapping regions
                jj = (number_of_overlapping_cameras[windices] > 1)
                mask[i, windices[jj]] = mask[i, windices[jj]] & tmpmask[jj]

    for i, tid in enumerate(targets):
        ok = (ivar[i] > 0)
        if np.sum(ok) > 0:
            flux[i][ok] /= ivar[i][ok]
        ok = (ivar_unmasked[i] > 0)
        if np.sum(ok) > 0 and rdata is not None:
            rdata[i][:, ok] /= ivar_unmasked[i][ok]

    #- if inputs were already coadded, reuse their fibermaps
    if 'COADD_NUMEXP' in spectra.fibermap.colnames:
        fibermap = spectra.fibermap
        exp_fibermap = spectra.exp_fibermap
    else:
        fibermap, exp_fibermap = coadd_fibermap(spectra.fibermap, onetile=onetile)

    bands = ""
    for b in sbands:
        bands += b

    if spectra.mask is not None:
        dmask = {bands: mask, }
    else:
        dmask = None

    if rdata is not None:
        rdata = {bands: rdata}

    res = Spectra(bands=[bands, ], wave={bands: wave, }, flux={bands: flux, },
                  ivar={bands: ivar, }, mask=dmask, resolution_data=rdata,
                  fibermap=fibermap, exp_fibermap=exp_fibermap,
                  meta=spectra.meta, extra=spectra.extra, scores=None)

    if spectra.scores is not None:
        orig_scores = spectra.scores.copy()
        orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
    else:
        orig_scores = None

    compute_coadd_scores(res, orig_scores, update_coadd=True)

    return res
def get_resampling_matrix(global_grid, local_grid, sparse=False):
    """Build the rectangular matrix that linearly resamples from the global grid to a local grid.

    The local grid range must be contained within the global grid range.

    Args:
        global_grid(numpy.ndarray): Sorted array of n global grid wavelengths.
        local_grid(numpy.ndarray): Sorted array of m local grid wavelengths.
        sparse(bool): accepted for backward compatibility but not consulted;
            the result is always returned as a scipy.sparse.csc_matrix.

    Returns:
        scipy.sparse.csc_matrix: (m,n) matrix elements that perform the linear resampling.
    """
    assert np.all(np.diff(global_grid) > 0), 'Global grid is not strictly increasing.'
    assert np.all(np.diff(local_grid) > 0), 'Local grid is not strictly increasing.'
    # Locate each local wavelength in the global grid.
    hi_index = np.searchsorted(global_grid, local_grid)
    assert local_grid[0] >= global_grid[0], 'Local grid extends below global grid.'
    assert local_grid[-1] <= global_grid[-1], 'Local grid extends above global grid.'
    # Lookup the global-grid bracketing interval (xlo,xhi) for each local grid point.
    # Note that this gives xlo = global_grid[-1] if local_grid[0] == global_grid[0]
    # but this is fine since the coefficient of xlo will be zero.
    xhi = global_grid[hi_index]
    xlo = global_grid[hi_index - 1]
    # Fractional position of each local point within its bracketing interval.
    alpha = (local_grid - xlo)/(xhi - xlo)
    rows = np.arange(len(local_grid), dtype=int)
    # Create the rectangular interpolation matrix to return.
    matrix = np.zeros((len(local_grid), len(global_grid)))
    matrix[rows, hi_index] = alpha
    matrix[rows, hi_index - 1] = 1 - alpha
    # turn into a sparse matrix
    return scipy.sparse.csc_matrix(matrix)
def decorrelate_divide_and_conquer(Cinv, Cinvf, wavebin, flux, ivar, rdata):
    """Decorrelate an inverse covariance using the matrix square root.

    Implements the decorrelation part of the spectroperfectionism algorithm described in
    Bolton & Schlegel 2009 (BS) http://arxiv.org/abs/0911.2689, with the divide and
    conquer approach, i.e. per diagonal block of the matrix, with an overlapping
    'skin' from one block to another.

    Args:
        Cinv: Square 2D array: input inverse covariance matrix
        Cinvf: 1D array: input
        wavebin: minimal size of wavelength bin in A, used to define the core and skin size
        flux: 1D array: output flux (has to be allocated)
        ivar: 1D array: output flux inverse variance (has to be allocated)
        rdata: 2D array: output resolution matrix per diagonal (has to be allocated)
    """
    chw = max(10, int(50/wavebin))  # core is 2*50+1 A
    skin = max(2, int(10/wavebin))  # skin is 10A
    nn = Cinv.shape[0]
    Lmin = 1e-15/np.mean(np.diag(Cinv))  # Lmin is scaled with Cinv values
    ndiag = rdata.shape[0]
    dd = np.arange(ndiag, dtype=int) - ndiag//2  # diagonal offsets

    #- step through the matrix in core-sized chunks, each solved on an
    #- enlarged [b:e] window that includes the overlapping skin
    for c in range(chw, nn+(2*chw+1), (2*chw+1)):
        b = max(0, c-chw-skin)
        e = min(nn, c+chw+skin+1)
        b1 = max(0, c-chw)
        e1 = min(nn, c+chw+1)
        bb = max(0, b1-b)
        ee = min(e-b, e1-b)
        if e <= b:
            continue
        #- FIX: the `turbo` keyword of scipy.linalg.eigh was deprecated in
        #- SciPy 1.5 and removed in SciPy 1.11, raising TypeError; the default
        #- driver returns the same eigendecomposition.
        L, X = scipy.linalg.eigh(Cinv[b:e, b:e], overwrite_a=False)
        nbad = np.count_nonzero(L < Lmin)
        if nbad > 0:
            #log.warning('zeroing {0:d} negative eigenvalue(s).'.format(nbad))
            #- clip tiny/negative eigenvalues so the square root is well defined
            L[L < Lmin] = Lmin
        #- symmetric square root Q = Cinv^{1/2} (BS eq. 10)
        Q = X.dot(np.diag(np.sqrt(L)).dot(X.T))
        s = np.sum(Q, axis=1)
        #- row-normalized Q is the resolution matrix R (BS eq. 12)
        tR = (Q/s[:, np.newaxis])
        tR_it = scipy.linalg.inv(tR.T)
        tivar = s**2
        #- copy only the core [bb:ee] of the solution into the outputs
        flux[b1:e1] = (tR_it.dot(Cinvf[b:e])/tivar)[bb:ee]
        ivar[b1:e1] = (s[bb:ee])**2
        for j in range(b1, e1):
            k = (dd >= -j) & (dd < nn-j)
            # k is the diagonal index
            # j is the wavelength index
            # it could be the transposed, I am following what it is specter.ex2d, L209
            rdata[k, j] = tR[j-b+dd[k], j-b]
def spectroperf_resample_spectrum_singleproc(spectra,target_index,wave,wavebin,resampling_matrix,ndiag,flux,ivar,rdata) :
    """Resample one target's spectrum onto `wave` with the spectroperfectionism approach.

    Accumulates the inverse covariance and Cinv*f contributions of each band,
    then decorrelates them.  Results are written in place into
    flux[target_index], ivar[target_index] and rdata[target_index].

    Args:
        spectra: desispec.spectra.Spectra object (input)
        target_index: int, row of the target in the spectra arrays
        wave: 1D array, output wavelength grid
        wavebin: float, minimal wavelength bin size in A
        resampling_matrix: dict of per-band resampling matrices (see get_resampling_matrix)
        ndiag: int, number of diagonals of the output resolution matrix
        flux, ivar: 2D arrays (ntarget,nwave), outputs (pre-allocated)
        rdata: 3D array (ntarget,ndiag,nwave), output resolution data (pre-allocated)
    """
    cinv = None
    for b in spectra.bands :
        # Restrict the band to the requested output wavelength range.
        twave=spectra.wave[b]
        jj=(twave>=wave[0])&(twave<=wave[-1])
        twave=twave[jj]
        tivar=spectra.ivar[b][target_index][jj]
        # Diagonal matrix of inverse variances for this band.
        diag_ivar = scipy.sparse.dia_matrix((tivar,[0]),(twave.size,twave.size))
        # Combined (resolution x resampling) operator from output grid to band pixels.
        RR = Resolution(spectra.resolution_data[b][target_index][:,jj]).dot(resampling_matrix[b])
        # Per-band contributions: Cinv = R^T N^-1 R and Cinv*f = R^T N^-1 f.
        tcinv = RR.T.dot(diag_ivar.dot(RR))
        tcinvf = RR.T.dot(tivar*spectra.flux[b][target_index][jj])
        # Sum the contributions over bands (overlap regions add coherently).
        if cinv is None :
            cinv = tcinv
            cinvf = tcinvf
        else :
            cinv += tcinv
            cinvf += tcinvf
    # Dense matrix needed by the eigendecomposition in the decorrelation step.
    cinv = cinv.todense()
    decorrelate_divide_and_conquer(cinv,cinvf,wavebin,flux[target_index],ivar[target_index],rdata[target_index])

# for multiprocessing, with shared memory buffers
def spectroperf_resample_spectrum_multiproc(shm_in_wave,shm_in_flux,shm_in_ivar,shm_in_rdata,in_nwave,in_ndiag,in_bands,target_indices,wave,wavebin,resampling_matrix,ndiag,ntarget,shm_flux,shm_ivar,shm_rdata) :
    """Worker for multiprocessing: same computation as the singleproc version,
    but reading inputs from and writing outputs to shared-memory buffers.

    Args:
        shm_in_wave, shm_in_flux, shm_in_ivar, shm_in_rdata: lists (one per band)
            of flat multiprocessing.Array input buffers
        in_nwave: list of int, number of wavelengths per input band
        in_ndiag: list of int, number of resolution diagonals per input band
        in_bands: list of band names, same order as the shm_in_* lists
        target_indices: iterable of target rows this worker processes
        wave: 1D array, output wavelength grid
        wavebin: float, minimal wavelength bin size in A
        resampling_matrix: dict of per-band resampling matrices, keyed by band name
        ndiag: int, number of diagonals of the output resolution matrix
        ntarget: int, total number of targets (needed to reshape the buffers)
        shm_flux, shm_ivar, shm_rdata: flat multiprocessing.Array output buffers
    """
    nwave = wave.size
    # manipulate shared memory as np arrays
    # input shared memory
    in_wave = list()
    in_flux = list()
    in_ivar = list()
    in_rdata = list()
    nbands = len(shm_in_wave)
    for b in range(nbands) :
        # np.array(...,copy=False) wraps the shared buffer without copying,
        # so reshaping gives a view directly onto the shared memory.
        in_wave.append( np.array(shm_in_wave[b],copy=False).reshape(in_nwave[b]) )
        in_flux.append( np.array(shm_in_flux[b],copy=False).reshape((ntarget,in_nwave[b])) )
        in_ivar.append( np.array(shm_in_ivar[b],copy=False).reshape((ntarget,in_nwave[b])) )
        in_rdata.append( np.array(shm_in_rdata[b],copy=False).reshape((ntarget,in_ndiag[b],in_nwave[b])) )
    # output shared memory
    flux = np.array(shm_flux,copy=False).reshape(ntarget,nwave)
    ivar = np.array(shm_ivar,copy=False).reshape(ntarget,nwave)
    rdata = np.array(shm_rdata,copy=False).reshape(ntarget,ndiag,nwave)
    # Same per-target accumulation + decorrelation as the singleproc version.
    for target_index in target_indices :
        cinv = None
        for b in range(nbands) :
            # Restrict the band to the requested output wavelength range.
            twave=in_wave[b]
            jj=(twave>=wave[0])&(twave<=wave[-1])
            twave=twave[jj]
            tivar=in_ivar[b][target_index][jj]
            diag_ivar = scipy.sparse.dia_matrix((tivar,[0]),(twave.size,twave.size))
            # resampling_matrix is keyed by band name, hence in_bands[b].
            RR = Resolution(in_rdata[b][target_index][:,jj]).dot(resampling_matrix[in_bands[b]])
            tcinv = RR.T.dot(diag_ivar.dot(RR))
            tcinvf = RR.T.dot(tivar*in_flux[b][target_index][jj])
            if cinv is None :
                cinv = tcinv
                cinvf = tcinvf
            else :
                cinv += tcinv
                cinvf += tcinvf
        cinv = cinv.todense()
        decorrelate_divide_and_conquer(cinv,cinvf,wavebin,flux[target_index],ivar[target_index],rdata[target_index])
def spectroperf_resample_spectra(spectra, wave, nproc=1) :
    """Resampling of spectra file using the spectrophotometic approach.

    Args:
        spectra: desispec.spectra.Spectra object
        wave: 1D numpy array with new wavelength grid
        nproc: int, number of processes; if >1 the targets are split over
            multiprocessing workers using shared-memory buffers

    Returns:
        desispec.spectra.Spectra object with a single combined band
    """
    log = get_logger()
    log.debug("resampling to wave grid of size {}: {}".format(wave.size,wave))

    b=spectra.bands[0]
    ntarget=spectra.flux[b].shape[0]
    nwave=wave.size

    # NOTE(review): the output mask is allocated but never filled below,
    # so the resampled spectra carry an all-zero mask — confirm intended.
    if spectra.mask is not None :
        mask = np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
    else :
        mask = None

    # number of diagonals is the max of the number of diagonals in the
    # input spectra cameras
    ndiag = 0
    for b in spectra.bands :
        ndiag = max(ndiag,spectra.resolution_data[b].shape[1])

    dw=np.gradient(wave)
    wavebin=np.min(dw[dw>0.]) # min wavelength bin size
    log.debug("min wavelength bin= {:2.1f} A; ndiag= {:d}".format(wavebin,ndiag))

    # One resampling matrix per band, restricted to the output wavelength range.
    log.debug("compute resampling matrices")
    resampling_matrix=dict()
    for b in spectra.bands :
        twave=spectra.wave[b]
        jj=np.where((twave>=wave[0])&(twave<=wave[-1]))[0]
        twave=spectra.wave[b][jj]
        resampling_matrix[b] = get_resampling_matrix(wave,twave)

    if nproc==1 :
        # allocate array
        flux = np.zeros((ntarget,nwave),dtype=float)
        ivar = np.zeros((ntarget,nwave),dtype=float)
        rdata = np.zeros((ntarget,ndiag,nwave),dtype=float)
        # simply loop on targets
        for target_index in range(ntarget) :
            log.debug("resampling {}/{}".format(target_index+1,ntarget))
            t0=time.time()
            spectroperf_resample_spectrum_singleproc(spectra,target_index,wave,wavebin,resampling_matrix,ndiag,flux,ivar,rdata)
            t1=time.time()
            log.debug("done one spectrum in {} sec".format(t1-t0))
    else :
        log.debug("allocate shared memory")
        # input: one flat shared buffer per band for wave/flux/ivar/rdata,
        # plus the shapes needed to reconstruct them in the workers.
        shm_in_wave = list()
        shm_in_flux = list()
        shm_in_ivar = list()
        shm_in_rdata = list()
        in_nwave = list()
        in_ndiag = list()
        for b in spectra.bands :
            shm_in_wave.append( multiprocessing.Array('d',spectra.wave[b],lock=False) )
            shm_in_flux.append( multiprocessing.Array('d',spectra.flux[b].ravel(),lock=False) )
            shm_in_ivar.append( multiprocessing.Array('d',spectra.ivar[b].ravel(),lock=False) )
            shm_in_rdata.append( multiprocessing.Array('d',spectra.resolution_data[b].ravel(),lock=False) )
            in_nwave.append(spectra.wave[b].size)
            in_ndiag.append(spectra.resolution_data[b].shape[1])

        # output: flat shared buffers, written in place by the workers
        shm_flux=multiprocessing.Array('d',ntarget*nwave,lock=False)
        shm_ivar=multiprocessing.Array('d',ntarget*nwave,lock=False)
        shm_rdata=multiprocessing.Array('d',ntarget*ndiag*nwave,lock=False)

        # manipulate shared memory as np arrays (views, no copy)
        flux = np.array(shm_flux,copy=False).reshape(ntarget,nwave)
        ivar = np.array(shm_ivar,copy=False).reshape(ntarget,nwave)
        rdata = np.array(shm_rdata,copy=False).reshape(ntarget,ndiag,nwave)

        # split targets per process
        target_indices = np.array_split(np.arange(ntarget),nproc)

        # loop on processes
        procs=list()
        for proc_index in range(nproc) :
            log.debug("starting process #{}".format(proc_index+1))
            proc = multiprocessing.Process(target=spectroperf_resample_spectrum_multiproc,
                                           args=(shm_in_wave,shm_in_flux,shm_in_ivar,shm_in_rdata,
                                                 in_nwave,in_ndiag,spectra.bands,
                                                 target_indices[proc_index],wave,wavebin,
                                                 resampling_matrix,ndiag,ntarget,
                                                 shm_flux,shm_ivar,shm_rdata))
            proc.start()
            procs.append(proc)

        # wait for the processes to finish
        log.info("waiting for the {} processes to finish ...".format(nproc))
        for proc in procs :
            proc.join()
        log.info("all done!")

    # The output Spectra has a single band whose name is the concatenation
    # of the input band names (e.g. "brz").
    bands=""
    for b in spectra.bands :
        bands += b
    if spectra.mask is not None :
        dmask={bands:mask,}
    else :
        dmask=None
    res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
                fibermap=spectra.fibermap,meta=spectra.meta,extra=spectra.extra,scores=spectra.scores)
    return res
def fast_resample_spectra(spectra, wave) :
    """Fast resampling of spectra file.

    The output resolution = Id. The neighboring flux bins are correlated.

    The fluxes are combined as an inverse-variance-weighted average of the
    per-band resampled fluxes.

    Args:
        spectra: desispec.spectra.Spectra object
        wave: 1D numpy array with new wavelength grid

    Returns:
        desispec.spectra.Spectra object, resolution data=Id
    """

    log = get_logger()
    log.debug("Resampling to wave grid: {}".format(wave))

    nwave=wave.size
    b=spectra.bands[0]
    ntarget=spectra.flux[b].shape[0]
    nres=spectra.resolution_data[b].shape[1]
    # Output accumulators.  Bug fix: the dtypes were previously swapped
    # (ivar allocated with the flux dtype and vice versa); each output now
    # uses the dtype of the corresponding input array.
    ivar=np.zeros((ntarget,nwave),dtype=spectra.ivar[b].dtype)
    flux=np.zeros((ntarget,nwave),dtype=spectra.flux[b].dtype)
    if spectra.mask is not None :
        # NOTE(review): allocated but never filled — output mask is all zero.
        mask = np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
    else :
        mask = None
    rdata=np.ones((ntarget,1,nwave),dtype=spectra.resolution_data[b].dtype) # pointless for this resampling
    bands=""
    for b in spectra.bands :
        # Masked pixels contribute zero weight to the average.
        if spectra.mask is not None :
            tivar=spectra.ivar[b]*(spectra.mask[b]==0)
        else :
            tivar=spectra.ivar[b]
        # Accumulate sum(ivar) and sum(ivar*flux); the ratio below gives the
        # inverse-variance-weighted mean flux, and sum(ivar) is the combined ivar.
        for i in range(ntarget) :
            ivar[i] += resample_flux(wave,spectra.wave[b],tivar[i])
            flux[i] += resample_flux(wave,spectra.wave[b],tivar[i]*spectra.flux[b][i])
        bands += b
    for i in range(ntarget) :
        ok=(ivar[i]>0)
        flux[i,ok]/=ivar[i,ok]
    # Single combined band named after the concatenated input bands.
    if spectra.mask is not None :
        dmask={bands:mask,}
    else :
        dmask=None
    res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
                fibermap=spectra.fibermap,meta=spectra.meta,extra=spectra.extra,scores=spectra.scores)
    return res
def resample_spectra_lin_or_log(spectra, linear_step=0, log10_step=0, fast=False, wave_min=None, wave_max=None, nproc=1) :
    """Resampling of spectra file onto a linear or log10 wavelength grid.

    Args:
        spectra: desispec.spectra.Spectra object
        linear_step: if not null the output wavelength grid will be linear with this step (A)
        log10_step: if not null the output wavelength grid will be logarithmic with this step (dex)

    Options:
        fast: simple resampling. fast but at the price of correlated output
            flux bins and no information on resolution
        wave_min: if set, use this min wavelength
        wave_max: if set, use this max wavelength
        nproc: number of multiprocessing processes (only used when fast=False)

    Returns:
        desispec.spectra.Spectra object

    Raises:
        ValueError: if neither linear_step nor log10_step is > 0
    """
    # Bug fix: previously, calling with neither step set left `wave` unassigned
    # and crashed with an UnboundLocalError; fail early with a clear message.
    if linear_step <= 0 and log10_step <= 0 :
        raise ValueError("must set either linear_step>0 or log10_step>0")

    # Full wavelength coverage of the input bands.
    wmin=None
    wmax=None
    for b in spectra.bands :
        if wmin is None :
            wmin=spectra.wave[b][0]
            wmax=spectra.wave[b][-1]
        else :
            wmin=min(wmin,spectra.wave[b][0])
            wmax=max(wmax,spectra.wave[b][-1])

    # Optional user overrides of the wavelength range.
    if wave_min is not None :
        wmin = wave_min
    if wave_max is not None :
        wmax = wave_max

    if linear_step>0 :
        nsteps=int((wmax-wmin)/linear_step) + 1
        wave=wmin+np.arange(nsteps)*linear_step
    else : # log10_step>0, guaranteed by the check above
        lwmin=np.log10(wmin)
        lwmax=np.log10(wmax)
        nsteps=int((lwmax-lwmin)/log10_step) + 1
        wave=10**(lwmin+np.arange(nsteps)*log10_step)

    if fast :
        return fast_resample_spectra(spectra=spectra,wave=wave)
    else :
        return spectroperf_resample_spectra(spectra=spectra,wave=wave,nproc=nproc)