# Source code for desispec.coaddition

"""
Coadd spectra
"""

from __future__ import absolute_import, division, print_function
import os, sys, time

import numpy as np

import scipy.sparse
import scipy.linalg
import scipy.sparse.linalg

from astropy.table import Table, Column

import multiprocessing

from desiutil.log import get_logger

from desispec.interpolation import resample_flux
from desispec.spectra import Spectra
from desispec.resolution import Resolution
from desispec.fiberbitmasking import get_all_fiberbitmask_with_amp, get_all_nonamp_fiberbitmask_val, get_justamps_fiberbitmask
from desispec.specscore import compute_coadd_scores
from desispec.util import ordered_unique

#- Fibermap columns that come from targeting or MTL files
#- (per-target quantities: identical for a target regardless of exposure)
fibermap_target_cols = (
    'TARGETID',
    'TARGET_RA',
    'TARGET_DEC',
    'PMRA',
    'PMDEC',
    'REF_EPOCH',
    'LAMBDA_REF',
    'FA_TARGET',
    'FA_TYPE',
    'OBJTYPE',
    'NUMTARGET',
    'OBSCONDITIONS',
    'MORPHTYPE',
    'FLUX_G',
    'FLUX_R',
    'FLUX_Z',
    'FLUX_IVAR_G',
    'FLUX_IVAR_R',
    'FLUX_IVAR_Z',
    'REF_ID',
    'REF_CAT',
    'GAIA_PHOT_G_MEAN_MAG',
    'GAIA_PHOT_BP_MEAN_MAG',
    'GAIA_PHOT_RP_MEAN_MAG',
    'PARALLAX',
    'EBV',
    'FLUX_W1',
    'FLUX_W2',
    'FIBERFLUX_G',
    'FIBERFLUX_R',
    'FIBERFLUX_Z',
    'FIBERTOTFLUX_G',
    'FIBERTOTFLUX_R',
    'FIBERTOTFLUX_Z',
    'MASKBITS',
    'SERSIC',
    'SHAPE_R', 'SHAPE_E1', 'SHAPE_E2',
    'PHOTSYS',
    'PRIORITY_INIT',
    'NUMOBS_INIT',
    'RELEASE',
    'BRICKID',
    'BRICKNAME', 'BRICK_OBJID',
    'BLOBDIST',
    'FIBERFLUX_IVAR_G',
    'FIBERFLUX_IVAR_R',
    'FIBERFLUX_IVAR_Z',
    'CMX_TARGET',
    'SV0_TARGET',
    'SV1_TARGET',
    'SV2_TARGET',
    'SV3_TARGET',
    'DESI_TARGET',
    'BGS_TARGET',
    'MWS_TARGET',
    'SCND_TARGET',
    'HPXPIXEL',
)

#- PRIORITY could change between tiles; SUBPRIORITY won't but keep 'em together
fibermap_mtl_cols = (
    'PRIORITY',
    'SUBPRIORITY',
)

#- Fibermap columns that were added by fiberassign
#- ... same per target, regardless of assignment
fibermap_fiberassign_target_cols = (
    'FA_TARGET', 'FA_TYPE', 'OBJTYPE',
    )
#- ... varies per assignment
fibermap_fiberassign_cols = (
    'PETAL_LOC', 'DEVICE_LOC', 'LOCATION', 'FIBER', 'FIBERSTATUS',
    'FIBERASSIGN_X', 'FIBERASSIGN_Y',
    'LAMBDA_REF', 'PLATE_RA', 'PLATE_DEC',
    )

#- Fibermap columns added from the platemaker coordinates file
fibermap_coords_cols = (
    'NUM_ITER', 'FIBER_X', 'FIBER_Y', 'DELTA_X', 'DELTA_Y',
    'FIBER_RA', 'FIBER_DEC',
    )

#- Fibermap columns with exposure metadata
fibermap_exp_cols = (
    'NIGHT', 'EXPID', 'MJD', 'TILEID', 'EXPTIME'
    )

#- Fibermap columns added by flux calibration
fibermap_cframe_cols = (
    'PSF_TO_FIBER_SPECFLUX',
    )

#- Columns to include in the per-exposure EXP_FIBERMAP
#- (union of per-target ID plus everything that varies exposure-to-exposure)
fibermap_perexp_cols = \
    ('TARGETID',) + \
    fibermap_mtl_cols + \
    fibermap_exp_cols + \
    fibermap_fiberassign_cols + \
    fibermap_coords_cols + fibermap_cframe_cols

def coadd_fibermap(fibermap, onetile=False):
    """
    Coadds fibermap

    Args:
       fibermap (Table or ndarray): fibermap of individual exposures

    Options:
       onetile (bool): this is a coadd of a single tile, not across tiles

    Returns: (coadded_fibermap, exp_fibermap) Tables

    coadded_fibermap contains the coadded_fibermap for the columns
    that can be coadded, while exp_fibermap is the subset of columns
    of the original fibermap that can't be meaningfully coadded because
    they are per-exposure quantities like FIBER_X.

    If onetile is True, the coadded_fibermap includes additional columns
    like MEAN_FIBER_X that are meaningful if coadding a single tile,
    but not if coadding across tiles.

    Raises:
       ValueError if onetile is True but the input has more than one TILEID
    """
    log = get_logger()
    log.debug("'coadding' fibermap")

    if onetile:
        ntile = len(np.unique(fibermap['TILEID']))
        if ntile != 1:
            msg = f'input has {ntile} tiles, but onetile=True option'
            log.error(msg)
            raise ValueError(msg)

    #- Get TARGETIDs, preserving order in which they first appear
    targets, ii = ordered_unique(fibermap["TARGETID"], return_index=True)
    tfmap = fibermap[ii]
    assert np.all(targets == tfmap['TARGETID'])
    ntarget = targets.size

    #- initialize NUMEXP=-1 to check that they all got filled later
    tfmap['COADD_NUMEXP'] = np.zeros(len(tfmap), dtype=np.int16) - 1
    tfmap['COADD_EXPTIME'] = np.zeros(len(tfmap), dtype=np.float32) - 1
    tfmap['COADD_NUMNIGHT'] = np.zeros(len(tfmap), dtype=np.int16) - 1
    tfmap['COADD_NUMTILE'] = np.zeros(len(tfmap), dtype=np.int16) - 1

    #- some cols get combined into mean or rms
    mean_cols = [
        'DELTA_X', 'DELTA_Y',
        'FIBER_RA', 'FIBER_DEC',
        'PSF_TO_FIBER_SPECFLUX',
        ]
    #- FIBER_X/Y are only comparable between exposures of the same tile
    if onetile:
        mean_cols.extend(['FIBER_X', 'FIBER_Y'])

    #- rms_cols and std_cols must also be in mean_cols
    rms_cols = ['DELTA_X', 'DELTA_Y']
    std_cols = ['FIBER_RA', 'FIBER_DEC']

    #- Create the MEAN_/RMS_/STD_ output columns and drop the per-exp originals
    for k in mean_cols:
        if k in fibermap.colnames :
            #- sky coordinates need double precision; other stats fit in float32
            if k.endswith('_RA') or k.endswith('_DEC'):
                dtype = np.float64
            else:
                dtype = np.float32
            tfmap.add_column(Column(np.zeros(ntarget, dtype=dtype)), name='MEAN_'+k)
            if k in rms_cols:
                tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float32)), name='RMS_'+k)
            if k in std_cols:
                tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float32)), name='STD_'+k)
            tfmap.remove_column(k)

    #- TODO: decide whether FIRST_/LAST_/NUM_ summary columns for
    #- NIGHT, EXPID, TILEID, SPECTROID, FIBER, MJD should be retained

    tfmap.rename_column('FIBERSTATUS', 'COADD_FIBERSTATUS')

    for i,tid in enumerate(targets) :
        #- boolean mask of all input rows for this target
        jj = fibermap["TARGETID"]==tid

        #- coadded FIBERSTATUS = bitwise AND of input FIBERSTATUS
        tfmap['COADD_FIBERSTATUS'][i] = np.bitwise_and.reduce(fibermap['FIBERSTATUS'][jj])

        #- Only FIBERSTATUS=0 were included in the coadd
        fiberstatus_nonamp_bits = get_all_nonamp_fiberbitmask_val()
        fiberstatus_amp_bits = get_justamps_fiberbitmask()
        targ_fibstatuses = fibermap['FIBERSTATUS'][jj]
        nonamp_fiberstatus_flagged = ( (targ_fibstatuses & fiberstatus_nonamp_bits) > 0 )
        allamps_flagged = ( (targ_fibstatuses & fiberstatus_amp_bits) == fiberstatus_amp_bits )
        good_coadds = np.bitwise_not( nonamp_fiberstatus_flagged | allamps_flagged )
        tfmap['COADD_NUMEXP'][i] = np.count_nonzero(good_coadds)

        # Note: NIGHT and TILEID may not be present when coadding previously
        # coadded spectra.
        if 'NIGHT' in fibermap.colnames:
            tfmap['COADD_NUMNIGHT'][i] = len(np.unique(fibermap['NIGHT'][jj][good_coadds]))
        if 'TILEID' in fibermap.colnames:
            tfmap['COADD_NUMTILE'][i] = len(np.unique(fibermap['TILEID'][jj][good_coadds]))
        if 'EXPTIME' in fibermap.colnames :
            tfmap['COADD_EXPTIME'][i] = np.sum(fibermap['EXPTIME'][jj][good_coadds])
        for k in mean_cols:
            if k in fibermap.colnames :
                vals=fibermap[k][jj]
                tfmap['MEAN_'+k][i] = np.mean(vals)
        for k in rms_cols:
            if k in fibermap.colnames :
                vals=fibermap[k][jj]
                # RMS includes mean offset, not same as STD
                tfmap['RMS_'+k][i] = np.sqrt(np.mean(vals**2)).astype(np.float32)

        #- STD of FIBER_RA, FIBER_DEC in arcsec, handling cos(dec) and RA wrap
        if 'FIBER_RA' in fibermap.colnames:
            dec = fibermap['TARGET_DEC'][jj][0]
            vals = fibermap['FIBER_RA'][jj]
            #- bugfix: np.std(vals+360.0) was a no-op for the RA wrap (std is
            #- shift-invariant) and the result was divided by cos(dec) instead
            #- of multiplied, inflating the scatter near the poles.  Center the
            #- RAs within +/-180 deg of the first value so a target straddling
            #- RA=0/360 is handled, then project the RA spread onto the sky.
            dra = (vals - vals[0] + 180.0) % 360.0 - 180.0
            std = np.std(dra) * 3600 * np.cos(np.radians(dec))
            tfmap['STD_FIBER_RA'][i] = std
        if 'FIBER_DEC' in fibermap.colnames:
            vals = fibermap['FIBER_DEC'][jj]
            tfmap['STD_FIBER_DEC'][i] = np.std(vals) * 3600

        #- future proofing possibility of other STD cols
        for k in std_cols:
            if k in fibermap.colnames and k not in ('FIBER_RA', 'FIBER_DEC') :
                vals=fibermap[k][jj]
                # STD removes mean offset, not same as RMS
                tfmap['STD_'+k][i] = np.std(vals).astype(np.float32)

        #- inverse variances add over exposures
        for k in ['FIBER_RA_IVAR', 'FIBER_DEC_IVAR', 'DELTA_X_IVAR', 'DELTA_Y_IVAR'] :
            if k in fibermap.colnames :
                tfmap[k][i]=np.sum(fibermap[k][jj])

    #- Remove some columns that apply to individual exp but not coadds
    #- (even coadds of the same tile)
    for k in ['NIGHT', 'EXPID', 'MJD', 'EXPTIME', 'NUM_ITER', 'PSF_TO_FIBER_SPECFLUX']:
        if k in tfmap.colnames:
            tfmap.remove_column(k)

    #- Remove columns that don't apply to coadds across tiles
    if not onetile:
        for k in ['TILEID', 'FIBER', 'FIBER_X', 'FIBER_Y', 'PRIORITY',
                  'PETAL_RA', 'PETAL_DEC', 'LAMBDA_REF',
                  'FIBERASSIGN_X', 'FIBERASSIGN_Y',
                  'PETAL_LOC', 'DEVICE_LOC', 'LOCATION' ]:
            if k in tfmap.colnames:
                tfmap.remove_column(k)

    #- keep exposure-specific columns that are present in the input fibermap
    ii = np.isin(fibermap_perexp_cols, fibermap.dtype.names)
    keepcols = tuple(np.array(fibermap_perexp_cols)[ii])
    if isinstance(fibermap, Table):
        exp_fibermap = fibermap[keepcols].copy()
    else:
        exp_fibermap = Table(fibermap, copy=False)[keepcols].copy()

    return tfmap, exp_fibermap
def coadd(spectra, cosmics_nsig=0.0, onetile=False) :
    """
    Coadd spectra for each target and each camera, modifying input spectra obj.

    Args:
       spectra: desispec.spectra.Spectra object

    Options:
       cosmics_nsig: float, nsigma clipping threshold for cosmics rays
       onetile: bool, if True, inputs are from a single tile

    Notes: if `onetile` is True, additional tile-specific columns
       like LOCATION and FIBER are included the FIBERMAP; otherwise
       these are only in the EXP_FIBERMAP since for the same target they could
       be different on different tiles.
    """
    log = get_logger()
    targets = ordered_unique(spectra.fibermap["TARGETID"])
    ntarget=targets.size
    log.debug("number of targets= {}".format(ntarget))
    #- coadd each camera/band independently; bands keep separate wave grids
    for b in spectra.bands :
        log.debug("coadding band '{}'".format(b))
        nwave=spectra.wave[b].size
        #- output arrays: one coadded row per target
        tflux=np.zeros((ntarget,nwave),dtype=spectra.flux[b].dtype)
        tivar=np.zeros((ntarget,nwave),dtype=spectra.ivar[b].dtype)
        if spectra.mask is not None :
            tmask=np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
        else :
            tmask=None
        trdata=np.zeros((ntarget,spectra.resolution_data[b].shape[1],nwave),dtype=spectra.resolution_data[b].dtype)

        #- spectra with any of these FIBERSTATUS bits set are excluded
        fiberstatus_bits = get_all_fiberbitmask_with_amp(b)
        #- COADD_FIBERSTATUS is used when re-coadding previously coadded spectra
        if 'FIBERSTATUS' in spectra.fibermap.dtype.names:
            fiberstatus = spectra.fibermap['FIBERSTATUS']
        else:
            fiberstatus = spectra.fibermap['COADD_FIBERSTATUS']

        good_fiberstatus = ( (fiberstatus & fiberstatus_bits) == 0 )
        for i,tid in enumerate(targets) :
            jj=np.where( (spectra.fibermap["TARGETID"]==tid) & good_fiberstatus )[0]

            #- if all spectra were flagged as bad (FIBERSTATUS != 0), continue
            #- to next target, leaving tflux and tivar=0 for this target
            if len(jj) == 0:
                continue

            #- cosmic rejection needs at least 3 spectra to compare gradients
            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj)>2 :
                # interpolate over bad measurements
                # to be able to compute gradient next
                # to a bad pixel and identify outlier
                # many cosmics residuals are on edge
                # of cosmic ray trace, and so can be
                # next to a masked flux bin
                grad=[]
                gradvar=[]
                for j in jj :
                    if spectra.mask is not None :
                        ttivar = spectra.ivar[b][j]*(spectra.mask[b][j]==0)
                    else :
                        ttivar = spectra.ivar[b][j]
                    good = (ttivar>0)
                    bad = ~good
                    #- skip spectra with no good pixels at all
                    #- NOTE(review): this can make grad rows fewer than len(jj),
                    #- so the argmax index k below may not align with ivarjj rows
                    #- in that corner case -- confirm upstream whether this occurs
                    if np.sum(good)==0 :
                        continue
                    nbad = np.sum(bad)
                    ttflux = spectra.flux[b][j].copy()
                    if nbad>0 :
                        ttflux[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttflux[good])
                    ttivar = spectra.ivar[b][j].copy()
                    if nbad>0 :
                        ttivar[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttivar[good])
                    ttvar = 1./(ttivar+(ttivar==0))
                    #- in-place first difference: gradient of flux and its variance
                    ttflux[1:] = ttflux[1:]-ttflux[:-1]
                    ttvar[1:] = ttvar[1:]+ttvar[:-1]
                    ttflux[0] = 0
                    grad.append(ttflux)
                    gradvar.append(ttvar)

            tivar_unmasked= np.sum(spectra.ivar[b][jj],axis=0)
            if spectra.mask is not None :
                ivarjj=spectra.ivar[b][jj]*(spectra.mask[b][jj]==0)
            else :
                ivarjj=spectra.ivar[b][jj]

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj)>2 :
                grad=np.array(grad)
                gradvar=np.array(gradvar)
                #- guarded inverse: zero ivar where gradvar is zero
                gradivar=(gradvar>0)/np.array(gradvar+(gradvar==0))
                nspec=grad.shape[0]
                sgradivar=np.sum(gradivar)
                if sgradivar>0 :
                    meangrad=np.sum(gradivar*grad,axis=0)/sgradivar
                    deltagrad=grad-meangrad
                    chi2=np.sum(gradivar*deltagrad**2,axis=0)/(nspec-1)

                    bad = (chi2>cosmics_nsig**2)
                    nbad = np.sum(bad)
                    if nbad>0 :
                        log.info("masking {} values for targetid={}".format(nbad,tid))
                        badindex=np.where(bad)[0]
                        for bi in badindex :
                            #- mask only the most discrepant spectrum at this pixel
                            k=np.argmax(gradivar[:,bi]*deltagrad[:,bi]**2)
                            ivarjj[k,bi]=0.
                            log.debug("masking spec {} wave={}".format(k,spectra.wave[b][bi]))

            #- ivar-weighted coadd (division by total ivar happens below)
            tivar[i]=np.sum(ivarjj,axis=0)
            tflux[i]=np.sum(ivarjj*spectra.flux[b][jj],axis=0)
            for r in range(spectra.resolution_data[b].shape[1]) :
                trdata[i,r]=np.sum((spectra.ivar[b][jj]*spectra.resolution_data[b][jj,r]),axis=0)
            # not sure applying mask is wise here
            bad=(tivar[i]==0)
            if np.sum(bad)>0 :
                tivar[i][bad] = np.sum(spectra.ivar[b][jj][:,bad],axis=0) # if all masked, keep original ivar
                tflux[i][bad] = np.sum(spectra.ivar[b][jj][:,bad]*spectra.flux[b][jj][:,bad],axis=0)
            ok=(tivar[i]>0)
            if np.sum(ok)>0 :
                tflux[i][ok] /= tivar[i][ok]
            #- resolution normalized by unmasked ivar, not cosmic-masked ivar
            ok=(tivar_unmasked>0)
            if np.sum(ok)>0 :
                trdata[i][:,ok] /= tivar_unmasked[ok]
            if spectra.mask is not None :
                #- coadded mask bit set only if set in every input exposure
                tmask[i] = np.bitwise_and.reduce(spectra.mask[b][jj],axis=0)
        #- replace per-exposure arrays with the coadds (in-place modification)
        spectra.flux[b] = tflux
        spectra.ivar[b] = tivar
        if spectra.mask is not None :
            spectra.mask[b] = tmask
        spectra.resolution_data[b] = trdata

    if spectra.scores is not None:
        orig_scores = Table(spectra.scores.copy())
        orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
    else:
        orig_scores = None

    spectra.fibermap, exp_fibermap = coadd_fibermap(spectra.fibermap, onetile=onetile)
    spectra.exp_fibermap = exp_fibermap
    spectra.scores=None
    compute_coadd_scores(spectra, orig_scores, update_coadd=True)
def coadd_cameras(spectra, cosmics_nsig=0., onetile=False) :
    """
    Return coadd across both exposures and cameras

    Args:
       spectra: desispec.spectra.Spectra object

    Options:
       cosmics_nsig: float, nsigma clipping threshold for cosmics rays
       onetile: bool, if True, inputs are from a single tile

    If `onetile` is True, additional tile-specific columns
    like LOCATION and FIBER are included the FIBERMAP; otherwise
    these are only in the EXP_FIBERMAP since for the same target they could
    be different on different tiles.

    Note: unlike `coadd`, this does not modify the input spectra object
    """

    #check_alignement_of_camera_wavelength(spectra)

    log = get_logger()

    # ordering
    mwave=[np.mean(spectra.wave[b]) for b in spectra.bands]
    sbands=np.array(spectra.bands)[np.argsort(mwave)] # bands sorted by inc. wavelength
    log.debug("wavelength sorted cameras= {}".format(sbands))

    # create wavelength array: concatenate the band grids, keeping only the
    # new wavelengths from each redder band beyond the previous band's end
    wave=None
    tolerance=0.0001 #A , tolerance
    for b in sbands :
        if wave is None :
            wave=spectra.wave[b]
        else :
            wave=np.append(wave,spectra.wave[b][spectra.wave[b]>wave[-1]+tolerance])
    nwave=wave.size

    # check alignment, caching band wavelength grid indices as we go
    windict = {}
    number_of_overlapping_cameras=np.zeros(nwave)
    for b in spectra.bands :
        imin = np.argmin(np.abs(spectra.wave[b][0] - wave))
        windices = np.arange(imin, imin+len(spectra.wave[b]), dtype=int)
        dwave = spectra.wave[b] - wave[windices]
        if np.any(np.abs(dwave) > tolerance):
            msg = "Input wavelength grids (band '{}') are not aligned. Use --lin-step or --log10-step to resample to a common grid.".format(b)
            log.error(msg)
            raise ValueError(msg)
        number_of_overlapping_cameras[windices] += 1
        windict[b] = windices

    # targets
    targets = ordered_unique(spectra.fibermap["TARGETID"])
    ntarget=targets.size
    log.debug("number of targets= {}".format(ntarget))

    # ndiag = max of all cameras
    ndiag=0
    for b in sbands :
        ndiag=max(ndiag,spectra.resolution_data[b].shape[1])
    log.debug("ndiag= {}".format(ndiag))

    b = sbands[0]
    flux=np.zeros((ntarget,nwave),dtype=spectra.flux[b].dtype)
    ivar=np.zeros((ntarget,nwave),dtype=spectra.ivar[b].dtype)
    if spectra.mask is not None :
        ivar_unmasked=np.zeros((ntarget,nwave),dtype=spectra.ivar[b].dtype)
        mask=np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
    else :
        # no mask: masked and unmasked ivar are the same array on purpose
        ivar_unmasked=ivar
        mask=None

    rdata=np.zeros((ntarget,ndiag,nwave),dtype=spectra.resolution_data[b].dtype)

    for b in spectra.bands :
        log.debug("coadding band '{}'".format(b))

        # indices of this band's wavelengths in the merged grid
        windices = windict[b]

        band_ndiag = spectra.resolution_data[b].shape[1]

        fiberstatus_bits = get_all_fiberbitmask_with_amp(b)
        if 'FIBERSTATUS' in spectra.fibermap.dtype.names:
            fiberstatus = spectra.fibermap['FIBERSTATUS']
        else:
            fiberstatus = spectra.fibermap['COADD_FIBERSTATUS']
        good_fiberstatus = ( (fiberstatus & fiberstatus_bits) == 0 )
        for i,tid in enumerate(targets) :
            jj=np.where( (spectra.fibermap["TARGETID"]==tid) & good_fiberstatus )[0]

            #- if all spectra were flagged as bad (FIBERSTATUS != 0), continue
            #- to next target, leaving tflux and tivar=0 for this target
            if len(jj) == 0:
                continue

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj)>2 :
                # interpolate over bad measurements
                # to be able to compute gradient next
                # to a bad pixel and identify outlier
                # many cosmics residuals are on edge
                # of cosmic ray trace, and so can be
                # next to a masked flux bin
                grad=[]
                gradvar=[]
                for j in jj :
                    if spectra.mask is not None :
                        ttivar = spectra.ivar[b][j]*(spectra.mask[b][j]==0)
                    else :
                        ttivar = spectra.ivar[b][j]
                    good = (ttivar>0)
                    bad = ~good
                    #- NOTE(review): unlike `coadd`, there is no guard for a
                    #- spectrum with zero good pixels here; np.interp with an
                    #- empty xp would raise -- confirm inputs always have some
                    #- good pixels on this code path
                    ttflux = spectra.flux[b][j].copy()
                    ttflux[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttflux[good])
                    ttivar = spectra.ivar[b][j].copy()
                    ttivar[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttivar[good])
                    ttvar = 1./(ttivar+(ttivar==0))
                    ttflux[1:] = ttflux[1:]-ttflux[:-1]
                    ttvar[1:] = ttvar[1:]+ttvar[:-1]
                    ttflux[0] = 0
                    grad.append(ttflux)
                    gradvar.append(ttvar)

            ivar_unmasked[i,windices] += np.sum(spectra.ivar[b][jj],axis=0)

            if spectra.mask is not None :
                ivarjj=spectra.ivar[b][jj]*(spectra.mask[b][jj]==0)
            else :
                ivarjj=spectra.ivar[b][jj]

            if cosmics_nsig is not None and cosmics_nsig > 0 and len(jj)>2 :
                grad=np.array(grad)
                #- NOTE(review): `coadd` uses a zero-protected inverse
                #- ((gradvar>0)/(gradvar+(gradvar==0))); this plain 1/gradvar
                #- can divide by zero -- confirm whether gradvar can be zero here
                gradivar=1/np.array(gradvar)
                nspec=grad.shape[0]
                meangrad=np.sum(gradivar*grad,axis=0)/np.sum(gradivar)
                deltagrad=grad-meangrad
                chi2=np.sum(gradivar*deltagrad**2,axis=0)/(nspec-1)

                bad = (chi2>cosmics_nsig**2)
                nbad = np.sum(bad)
                if nbad>0 :
                    log.info("masking {} values for targetid={}".format(nbad,tid))
                    badindex=np.where(bad)[0]
                    for bi in badindex :
                        #- mask only the most discrepant spectrum at this pixel
                        k=np.argmax(gradivar[:,bi]*deltagrad[:,bi]**2)
                        ivarjj[k,bi]=0.
                        log.debug("masking spec {} wave={}".format(k,spectra.wave[b][bi]))

            #- accumulate ivar-weighted sums into the merged grid
            ivar[i,windices] += np.sum(ivarjj,axis=0)
            flux[i,windices] += np.sum(ivarjj*spectra.flux[b][jj],axis=0)
            for r in range(band_ndiag) :
                #- center this band's diagonals within the output ndiag
                rdata[i,r+(ndiag-band_ndiag)//2,windices] += np.sum((spectra.ivar[b][jj]*spectra.resolution_data[b][jj,r]),axis=0)
            if spectra.mask is not None :
                # this deserves some attention ...

                tmpmask=np.bitwise_and.reduce(spectra.mask[b][jj],axis=0)

                # directly copy mask where no overlap
                jj=(number_of_overlapping_cameras[windices]==1)
                mask[i,windices[jj]] = tmpmask[jj]

                # 'and' in overlapping regions
                jj=(number_of_overlapping_cameras[windices]>1)
                mask[i,windices[jj]] = mask[i,windices[jj]] & tmpmask[jj]

    #- normalize the weighted sums
    for i,tid in enumerate(targets) :
        ok=(ivar[i]>0)
        if np.sum(ok)>0 :
            flux[i][ok] /= ivar[i][ok]
        ok=(ivar_unmasked[i]>0)
        if np.sum(ok)>0 :
            rdata[i][:,ok] /= ivar_unmasked[i][ok]

    #- fibermap already coadded if COADD_NUMEXP is present
    if 'COADD_NUMEXP' in spectra.fibermap.colnames:
        fibermap = spectra.fibermap
        exp_fibermap = spectra.exp_fibermap
    else:
        fibermap, exp_fibermap = coadd_fibermap(spectra.fibermap, onetile=onetile)

    #- single output "band" named by concatenating the input band names
    bands=""
    for b in sbands :
        bands+=b

    if spectra.mask is not None :
        dmask={bands:mask,}
    else :
        dmask=None
    res = Spectra(bands=[bands,], wave={bands:wave,}, flux={bands:flux,}, ivar={bands:ivar,},
            mask=dmask, resolution_data={bands:rdata,},
            fibermap=fibermap, exp_fibermap=exp_fibermap,
            meta=spectra.meta, extra=spectra.extra, scores=None)

    if spectra.scores is not None:
        orig_scores = spectra.scores.copy()
        orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
    else:
        orig_scores = None

    compute_coadd_scores(res, orig_scores, update_coadd=True)

    return res
def get_resampling_matrix(global_grid,local_grid,sparse=False):
    """Build the rectangular matrix that linearly resamples from the global grid to a local grid.

    The local grid range must be contained within the global grid range.

    Args:
        global_grid(numpy.ndarray): Sorted array of n global grid wavelengths.
        local_grid(numpy.ndarray): Sorted array of m local grid wavelengths.
        sparse(bool): ignored, kept for backward compatibility; the result is
            always returned as a sparse matrix.

    Returns:
        scipy.sparse.csc_matrix: (m,n) matrix whose elements perform the linear resampling.
    """
    assert np.all(np.diff(global_grid) > 0),'Global grid is not strictly increasing.'
    assert np.all(np.diff(local_grid) > 0),'Local grid is not strictly increasing.'
    # Locate each local wavelength in the global grid.
    global_index = np.searchsorted(global_grid,local_grid)
    assert local_grid[0] >= global_grid[0],'Local grid extends below global grid.'
    assert local_grid[-1] <= global_grid[-1],'Local grid extends above global grid.'
    # Lookup the global-grid bracketing interval (xlo,xhi) for each local grid point.
    # Note that this gives xlo = global_grid[-1] if local_grid[0] == global_grid[0]
    # but this is fine since the coefficient of xlo will be zero.
    global_xhi = global_grid[global_index]
    global_xlo = global_grid[global_index-1]
    # Create the rectangular interpolation matrix to return.
    alpha = (local_grid - global_xlo)/(global_xhi - global_xlo)
    local_index = np.arange(len(local_grid),dtype=int)
    matrix = np.zeros((len(local_grid),len(global_grid)))
    matrix[local_index,global_index] = alpha
    matrix[local_index,global_index-1] = 1-alpha
    # turn into a sparse matrix
    return scipy.sparse.csc_matrix(matrix)
def decorrelate_divide_and_conquer(Cinv,Cinvf,wavebin,flux,ivar,rdata) :
    """Decorrelate an inverse covariance using the matrix square root.

    Implements the decorrelation part of the spectroperfectionism algorithm described in
    Bolton & Schlegel 2009 (BS) http://arxiv.org/abs/0911.2689.
    with the divide and conquer approach, i.e. per diagonal block of the matrix,
    with an overlapping 'skin' from one block to another.

    Args:
        Cinv: Square 2D array: input inverse covariance matrix
        Cinvf: 1D array: input (Cinv times flux, in BS notation)
        wavebin: minimal size of wavelength bin in A, used to define the core and skin size
        flux: 1D array: output flux (has to be allocated)
        ivar: 1D array: output flux inverse variance (has to be allocated)
        rdata: 2D array: output resolution matrix per diagonal (has to be allocated)
    """
    chw=max(10,int(50/wavebin)) #core is 2*50+1 A
    skin=max(2,int(10/wavebin)) #skin is 10A
    nn=Cinv.shape[0]
    Lmin=1e-15/np.mean(np.diag(Cinv)) # Lmin is scaled with Cinv values
    ndiag=rdata.shape[0]
    dd=np.arange(ndiag,dtype=int)-ndiag//2
    #- step over block centers; [b,e) is core+skin, [b1,e1) is the core whose
    #- results are kept, [bb,ee) are core offsets within the block
    for c in range(chw,nn+(2*chw+1),(2*chw+1)) :
        b=max(0,c-chw-skin)
        e=min(nn,c+chw+skin+1)
        b1=max(0,c-chw)
        e1=min(nn,c+chw+1)
        bb=max(0,b1-b)
        ee=min(e-b,e1-b)
        if e<=b : continue
        #- note: the deprecated (and later removed) `turbo=True` keyword was
        #- dropped here; the default eigh driver gives the same decomposition
        L,X = scipy.linalg.eigh(Cinv[b:e,b:e],overwrite_a=False)
        #- clip tiny/negative eigenvalues so the square root is well defined
        nbad = np.count_nonzero(L < Lmin)
        if nbad > 0:
            #log.warning('zeroing {0:d} negative eigenvalue(s).'.format(nbad))
            L[L < Lmin] = Lmin
        #- Q = Cinv^{1/2}; normalizing its rows gives the resolution matrix
        Q = X.dot(np.diag(np.sqrt(L)).dot(X.T))
        s = np.sum(Q,axis=1)
        tR = (Q/s[:,np.newaxis])
        tR_it = scipy.linalg.inv(tR.T)
        tivar = s**2
        flux[b1:e1] = (tR_it.dot(Cinvf[b:e])/tivar)[bb:ee]
        ivar[b1:e1] = (s[bb:ee])**2
        for j in range(b1,e1) :
            k=(dd>=-j)&(dd<nn-j)
            # k is the diagonal index
            # j is the wavelength index
            # it could be the transposed, I am following what it is specter.ex2d, L209
            rdata[k,j] = tR[j-b+dd[k],j-b]
def spectroperf_resample_spectrum_singleproc(spectra,target_index,wave,wavebin,resampling_matrix,ndiag,flux,ivar,rdata) :
    """Spectroperfectionism resampling of one target's spectrum onto `wave`.

    Accumulates the inverse covariance and Cinv*flux over all bands, then
    decorrelates with decorrelate_divide_and_conquer.  Results are written
    in place into flux[target_index], ivar[target_index], rdata[target_index].

    Args:
        spectra: desispec.spectra.Spectra object (input)
        target_index: int, row index of the target to resample
        wave: 1D array, output wavelength grid
        wavebin: float, min wavelength bin size (A) for the block decomposition
        resampling_matrix: dict band -> resampling matrix (see get_resampling_matrix)
        ndiag: int, number of diagonals of the output resolution data
        flux, ivar: 2D output arrays (ntarget,nwave), pre-allocated
        rdata: 3D output array (ntarget,ndiag,nwave), pre-allocated
    """
    cinv = None
    for b in spectra.bands :
        #- restrict this band to wavelengths inside the output grid
        twave=spectra.wave[b]
        jj=(twave>=wave[0])&(twave<=wave[-1])
        twave=twave[jj]
        tivar=spectra.ivar[b][target_index][jj]
        diag_ivar = scipy.sparse.dia_matrix((tivar,[0]),(twave.size,twave.size))
        #- RR = (band resolution) x (resampling matrix)
        RR = Resolution(spectra.resolution_data[b][target_index][:,jj]).dot(resampling_matrix[b])
        tcinv = RR.T.dot(diag_ivar.dot(RR))
        tcinvf = RR.T.dot(tivar*spectra.flux[b][target_index][jj])
        #- accumulate inverse covariance and Cinv*f over bands
        if cinv is None :
            cinv = tcinv
            cinvf = tcinvf
        else :
            cinv += tcinv
            cinvf += tcinvf
    cinv = cinv.todense()
    decorrelate_divide_and_conquer(cinv,cinvf,wavebin,flux[target_index],ivar[target_index],rdata[target_index])

# for multiprocessing, with shared memory buffers
def spectroperf_resample_spectrum_multiproc(shm_in_wave,shm_in_flux,shm_in_ivar,shm_in_rdata,in_nwave,in_ndiag,in_bands,target_indices,wave,wavebin,resampling_matrix,ndiag,ntarget,shm_flux,shm_ivar,shm_rdata) :
    """Worker: spectroperfectionism resampling of a subset of targets.

    Same computation as spectroperf_resample_spectrum_singleproc, but the
    inputs and outputs are flat multiprocessing shared-memory buffers that
    are re-viewed here as numpy arrays (no copy).

    Args:
        shm_in_wave/flux/ivar/rdata: lists (one per band) of shared buffers
        in_nwave: list of int, wavelength grid size per band
        in_ndiag: list of int, number of resolution diagonals per band
        in_bands: list of band names, ordered like the shared buffers
        target_indices: iterable of target row indices handled by this worker
        wave, wavebin, resampling_matrix, ndiag, ntarget: see singleproc version
        shm_flux, shm_ivar, shm_rdata: flat shared output buffers
    """
    nwave = wave.size
    # manipulate shared memory as np arrays
    # input shared memory
    in_wave = list()
    in_flux = list()
    in_ivar = list()
    in_rdata = list()
    nbands = len(shm_in_wave)
    for b in range(nbands) :
        in_wave.append( np.array(shm_in_wave[b],copy=False).reshape(in_nwave[b]) )
        in_flux.append( np.array(shm_in_flux[b],copy=False).reshape((ntarget,in_nwave[b])) )
        in_ivar.append( np.array(shm_in_ivar[b],copy=False).reshape((ntarget,in_nwave[b])) )
        in_rdata.append( np.array(shm_in_rdata[b],copy=False).reshape((ntarget,in_ndiag[b],in_nwave[b])) )
    # output shared memory
    flux = np.array(shm_flux,copy=False).reshape(ntarget,nwave)
    ivar = np.array(shm_ivar,copy=False).reshape(ntarget,nwave)
    rdata = np.array(shm_rdata,copy=False).reshape(ntarget,ndiag,nwave)

    for target_index in target_indices :
        cinv = None
        for b in range(nbands) :
            twave=in_wave[b]
            jj=(twave>=wave[0])&(twave<=wave[-1])
            twave=twave[jj]
            tivar=in_ivar[b][target_index][jj]
            diag_ivar = scipy.sparse.dia_matrix((tivar,[0]),(twave.size,twave.size))
            RR = Resolution(in_rdata[b][target_index][:,jj]).dot(resampling_matrix[in_bands[b]])
            tcinv = RR.T.dot(diag_ivar.dot(RR))
            tcinvf = RR.T.dot(tivar*in_flux[b][target_index][jj])
            if cinv is None :
                cinv = tcinv
                cinvf = tcinvf
            else :
                cinv += tcinv
                cinvf += tcinvf
        cinv = cinv.todense()
        decorrelate_divide_and_conquer(cinv,cinvf,wavebin,flux[target_index],ivar[target_index],rdata[target_index])
def spectroperf_resample_spectra(spectra, wave, nproc=1) :
    """
    Resampling of spectra file using the spectroperfectionism approach

    Args:
       spectra: desispec.spectra.Spectra object
       wave: 1D numpy array with new wavelength grid

    Options:
       nproc: int, number of multiprocessing processes (1 = no multiprocessing)

    Returns:
       desispec.spectra.Spectra object
    """
    log = get_logger()
    log.debug("resampling to wave grid of size {}: {}".format(wave.size,wave))

    b=spectra.bands[0]
    ntarget=spectra.flux[b].shape[0]
    nwave=wave.size

    if spectra.mask is not None :
        mask = np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
    else :
        mask = None
    # number of diagonals is the max of the number of diagonals in the
    # input spectra cameras
    ndiag = 0
    for b in spectra.bands :
        ndiag = max(ndiag,spectra.resolution_data[b].shape[1])
    dw=np.gradient(wave)
    wavebin=np.min(dw[dw>0.]) # min wavelength bin size
    log.debug("min wavelength bin= {:2.1f} A; ndiag= {:d}".format(wavebin,ndiag))
    log.debug("compute resampling matrices")
    resampling_matrix=dict()
    for b in spectra.bands :
        #- per-band matrix restricted to wavelengths inside the output grid
        twave=spectra.wave[b]
        jj=np.where((twave>=wave[0])&(twave<=wave[-1]))[0]
        twave=spectra.wave[b][jj]
        resampling_matrix[b] = get_resampling_matrix(wave,twave)

    if nproc==1 :
        # allocate array
        flux = np.zeros((ntarget,nwave),dtype=float)
        ivar = np.zeros((ntarget,nwave),dtype=float)
        rdata = np.zeros((ntarget,ndiag,nwave),dtype=float)
        # simply loop on targets
        for target_index in range(ntarget) :
            log.debug("resampling {}/{}".format(target_index+1,ntarget))
            t0=time.time()
            spectroperf_resample_spectrum_singleproc(spectra,target_index,wave,wavebin,resampling_matrix,ndiag,flux,ivar,rdata)
            t1=time.time()
            log.debug("done one spectrum in {} sec".format(t1-t0))
    else :
        log.debug("allocate shared memory")
        # input: copy each band's arrays into flat shared buffers
        # (lock=False because workers only read the inputs and write
        # disjoint target rows of the outputs)
        shm_in_wave = list()
        shm_in_flux = list()
        shm_in_ivar = list()
        shm_in_rdata = list()
        in_nwave = list()
        in_ndiag = list()
        for b in spectra.bands :
            shm_in_wave.append( multiprocessing.Array('d',spectra.wave[b],lock=False) )
            shm_in_flux.append( multiprocessing.Array('d',spectra.flux[b].ravel(),lock=False) )
            shm_in_ivar.append( multiprocessing.Array('d',spectra.ivar[b].ravel(),lock=False) )
            shm_in_rdata.append( multiprocessing.Array('d',spectra.resolution_data[b].ravel(),lock=False) )
            in_nwave.append(spectra.wave[b].size)
            in_ndiag.append(spectra.resolution_data[b].shape[1])

        # output shared memory
        shm_flux=multiprocessing.Array('d',ntarget*nwave,lock=False)
        shm_ivar=multiprocessing.Array('d',ntarget*nwave,lock=False)
        shm_rdata=multiprocessing.Array('d',ntarget*ndiag*nwave,lock=False)

        # manipulate shared memory as np arrays
        flux = np.array(shm_flux,copy=False).reshape(ntarget,nwave)
        ivar = np.array(shm_ivar,copy=False).reshape(ntarget,nwave)
        rdata = np.array(shm_rdata,copy=False).reshape(ntarget,ndiag,nwave)

        # split targets per process
        target_indices = np.array_split(np.arange(ntarget),nproc)

        # loop on processes
        procs=list()
        for proc_index in range(nproc) :
            log.debug("starting process #{}".format(proc_index+1))
            proc = multiprocessing.Process(target=spectroperf_resample_spectrum_multiproc,
                                           args=(shm_in_wave,shm_in_flux,shm_in_ivar,shm_in_rdata,
                                                 in_nwave,in_ndiag,spectra.bands,
                                                 target_indices[proc_index],wave,wavebin,
                                                 resampling_matrix,ndiag,ntarget,
                                                 shm_flux,shm_ivar,shm_rdata))
            proc.start()
            procs.append(proc)

        # wait for the processes to finish
        log.info("waiting for the {} processes to finish ...".format(nproc))
        for proc in procs :
            proc.join()
        log.info("all done!")

    #- single output "band" named by concatenating the input band names
    bands=""
    for b in spectra.bands :
        bands += b

    if spectra.mask is not None :
        dmask={bands:mask,}
    else :
        dmask=None
    res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
                fibermap=spectra.fibermap,meta=spectra.meta,extra=spectra.extra,scores=spectra.scores)
    return res
def fast_resample_spectra(spectra, wave) :
    """
    Fast resampling of spectra file.
    The output resolution = Id. The neighboring
    flux bins are correlated.

    Args:
       spectra: desispec.spectra.Spectra object
       wave: 1D numpy array with new wavelength grid

    Returns:
       desispec.spectra.Spectra object, resolution data=Id
    """
    log = get_logger()
    log.debug("Resampling to wave grid: {}".format(wave))

    nwave=wave.size
    b=spectra.bands[0]
    ntarget=spectra.flux[b].shape[0]
    #- bugfix: ivar/flux output dtypes were swapped (ivar used the flux dtype
    #- and vice versa); normally both are float32 so this was mostly benign
    ivar=np.zeros((ntarget,nwave),dtype=spectra.ivar[b].dtype)
    flux=np.zeros((ntarget,nwave),dtype=spectra.flux[b].dtype)
    if spectra.mask is not None :
        mask = np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
    else :
        mask = None
    rdata=np.ones((ntarget,1,nwave),dtype=spectra.resolution_data[b].dtype) # pointless for this resampling
    #- accumulate ivar-weighted flux across bands; division by total ivar below
    bands=""
    for b in spectra.bands :
        if spectra.mask is not None :
            tivar=spectra.ivar[b]*(spectra.mask[b]==0)
        else :
            tivar=spectra.ivar[b]
        for i in range(ntarget) :
            ivar[i] += resample_flux(wave,spectra.wave[b],tivar[i])
            flux[i] += resample_flux(wave,spectra.wave[b],tivar[i]*spectra.flux[b][i])
        bands += b
    for i in range(ntarget) :
        ok=(ivar[i]>0)
        flux[i,ok]/=ivar[i,ok]
    if spectra.mask is not None :
        dmask={bands:mask,}
    else :
        dmask=None
    res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
                fibermap=spectra.fibermap,meta=spectra.meta,extra=spectra.extra,scores=spectra.scores)
    return res
def resample_spectra_lin_or_log(spectra, linear_step=0, log10_step=0, fast=False, wave_min=None, wave_max=None, nproc=1) :
    """
    Resampling of spectra file.

    Args:
       spectra: desispec.spectra.Spectra object
       linear_step: if not null the output wavelength grid will be linear with this step
       log10_step: if not null the output wavelength grid will be logarithmic with this step

    Options:
       fast: simple resampling. fast but at the price of correlated output flux bins and no information on resolution
       wave_min: if set, use this min wavelength
       wave_max: if set, use this max wavelength
       nproc: number of multiprocessing processes for the spectroperfectionism resampling

    Returns:
       desispec.spectra.Spectra object

    Raises:
       ValueError if neither linear_step nor log10_step is > 0
    """
    #- overall wavelength range covered by all bands
    wmin=None
    wmax=None
    for b in spectra.bands :
        if wmin is None :
            wmin=spectra.wave[b][0]
            wmax=spectra.wave[b][-1]
        else :
            wmin=min(wmin,spectra.wave[b][0])
            wmax=max(wmax,spectra.wave[b][-1])

    if wave_min is not None :
        wmin = wave_min
    if wave_max is not None :
        wmax = wave_max

    if linear_step>0 :
        nsteps=int((wmax-wmin)/linear_step) + 1
        wave=wmin+np.arange(nsteps)*linear_step
    elif log10_step>0 :
        lwmin=np.log10(wmin)
        lwmax=np.log10(wmax)
        nsteps=int((lwmax-lwmin)/log10_step) + 1
        wave=10**(lwmin+np.arange(nsteps)*log10_step)
    else :
        #- bugfix: previously fell through with `wave` undefined -> NameError
        raise ValueError("either linear_step or log10_step must be > 0")
    if fast :
        return fast_resample_spectra(spectra=spectra,wave=wave)
    else :
        return spectroperf_resample_spectra(spectra=spectra,wave=wave,nproc=nproc)