"""
desispec.coaddition
===================
Coadd spectra.
"""
from __future__ import absolute_import, division, print_function
import os, sys, time
import numpy as np
import scipy.sparse
import scipy.linalg
import scipy.sparse.linalg
import scipy.stats
from astropy.table import Table, Column
import multiprocessing
from desiutil.log import get_logger
from desispec.interpolation import resample_flux
from desispec.spectra import Spectra
from desispec.resolution import Resolution
from desispec.fiberbitmasking import get_all_fiberbitmask_with_amp, get_all_nonamp_fiberbitmask_val, get_justamps_fiberbitmask
from desispec.maskbits import fibermask as fmsk
from desispec.specscore import compute_coadd_scores
from desispec.util import ordered_unique
#- Fibermap columns that come from targeting or MTL files
fibermap_target_cols = (
'TARGETID',
'TARGET_RA',
'TARGET_DEC',
'PMRA',
'PMDEC',
'REF_EPOCH',
'LAMBDA_REF',
'FA_TARGET',
'FA_TYPE',
'OBJTYPE',
'NUMTARGET',
'OBSCONDITIONS',
'MORPHTYPE',
'FLUX_G',
'FLUX_R',
'FLUX_Z',
'FLUX_IVAR_G',
'FLUX_IVAR_R',
'FLUX_IVAR_Z',
'REF_ID',
'REF_CAT',
'GAIA_PHOT_G_MEAN_MAG',
'GAIA_PHOT_BP_MEAN_MAG',
'GAIA_PHOT_RP_MEAN_MAG',
'PARALLAX',
'EBV',
'FLUX_W1',
'FLUX_W2',
'FIBERFLUX_G',
'FIBERFLUX_R',
'FIBERFLUX_Z',
'FIBERTOTFLUX_G',
'FIBERTOTFLUX_R',
'FIBERTOTFLUX_Z',
'MASKBITS',
'SERSIC',
'SHAPE_R', 'SHAPE_E1', 'SHAPE_E2',
'PHOTSYS',
'PRIORITY_INIT',
'NUMOBS_INIT',
'RELEASE',
'BRICKID',
'BRICKNAME', 'BRICK_OBJID',
'BLOBDIST',
'FIBERFLUX_IVAR_G',
'FIBERFLUX_IVAR_R',
'FIBERFLUX_IVAR_Z',
'CMX_TARGET',
'SV0_TARGET',
'SV1_TARGET',
'SV2_TARGET',
'SV3_TARGET',
'DESI_TARGET',
'BGS_TARGET',
'MWS_TARGET',
'SCND_TARGET',
'HPXPIXEL',
)
#- PRIORITY could change between tiles; SUBPRIORITY won't but keep 'em together
fibermap_mtl_cols = (
'PRIORITY',
'SUBPRIORITY',
)
#- Fibermap columns that were added by fiberassign
#- ... same per target, regardless of assignment
fibermap_fiberassign_target_cols = (
'FA_TARGET', 'FA_TYPE', 'OBJTYPE',
)
#- ... varies per assignment
fibermap_fiberassign_cols = (
'PETAL_LOC', 'DEVICE_LOC', 'LOCATION', 'FIBER', 'FIBERSTATUS',
'FIBERASSIGN_X', 'FIBERASSIGN_Y',
'LAMBDA_REF', 'PLATE_RA', 'PLATE_DEC',
)
#- Fibermap columns added frome the platemaker coordinates file
fibermap_coords_cols = (
'NUM_ITER', 'FIBER_X', 'FIBER_Y', 'DELTA_X', 'DELTA_Y',
'FIBER_RA', 'FIBER_DEC',
)
#- Fibermap columns with exposure metadata
fibermap_exp_cols = (
'NIGHT', 'EXPID', 'MJD', 'TILEID', 'EXPTIME'
)
#- Fibermap columns added by flux calibration
fibermap_cframe_cols = (
'PSF_TO_FIBER_SPECFLUX',
'FLAT_TO_PSF_FLUX',
'HELIOCOR_OFFSET',
)
#- Columns to include in the per-exposure EXP_FIBERMAP
fibermap_perexp_cols = \
('TARGETID',) + \
fibermap_mtl_cols + \
fibermap_exp_cols + \
fibermap_fiberassign_cols + \
fibermap_coords_cols + fibermap_cframe_cols + \
('IN_COADD_B', 'IN_COADD_R', 'IN_COADD_Z', 'COADD_NORM', 'SIGMA_COADD_NORM')
[docs]
def calc_mean_std_ra_dec(ras, decs):
"""
Calculate mean/std of ras, decs accounting for RA wraparound and cos(dec)
Args:
ras (array): input RA values in degrees
decs (array): input declination values in degrees
Returns: mean_ra, std_ra, mean_dec, std_dec
where the means are in degrees and the standard deviations are in arcsec,
including cos(dec) correction.
For efficiency, this does not try to handle dec= +/-90 poles correctly,
nor arbitrarily large spreads of angles. i.e. this is ok for a mean
of fiber positions scattered about a single target, but not for e.g.
a science analysis of the central location of a cluster of galaxies.
"""
ras = np.asarray(ras)
decs = np.asarray(decs)
if np.max(ras) - np.min(ras) > 180:
offset = 180.0
ras = (ras + offset) % 360
else:
offset = 0.0
mean_dec = np.mean(decs)
std_dec = np.std(decs) * 3600
mean_ra = (np.mean(ras) - offset + 360) % 360
std_ra = np.std(ras) * np.cos(np.radians(mean_dec)) * 3600
return mean_ra, std_ra, mean_dec, std_dec
[docs]
def use_for_coadd(fiberstatus, band):
"""
Determine which exposures should be used for a per-camera coadd
Args:
fiberstatus (array): FIBERSTATUS bitmasks, one per exposure
band (str): camera band, 'b', 'r', or 'z'
Returns: boolean array of whether exposure should be used in coadd or not
This is factored into a separate function because it is used in
`coadd`, `coadd_cameras`, and `coadd_fibermap`
"""
if band not in ('b', 'r', 'z'):
raise ValueError(f'band={band} should be b, r, or z')
fiberstatus_bits = get_all_fiberbitmask_with_amp(band)
good_fiberstatus = ( (fiberstatus & fiberstatus_bits) == 0 )
return good_fiberstatus
[docs]
def coadd_fibermap(fibermap, onetile=False):
"""
Coadds fibermap
Args:
fibermap (Table or ndarray): fibermap of individual exposures
Options:
onetile (bool): this is a coadd of a single tile, not across tiles
Returns: (coadded_fibermap, exp_fibermap) Tables
coadded_fibermap contains the coadded_fibermap for the columns that can
be coadded, while exp_fibermap is the subset of columns of the original
fibermap that can't be meaningfully coadded because they are per-exposure
quantities like FIBER_X.
If onetile is True, the coadded_fibermap includes additional columns like
MEAN_FIBER_X that are meaningful if coadding a single tile, but not if
coadding across tiles.
"""
log = get_logger()
log.debug("'coadding' fibermap")
if onetile:
ntile = len(np.unique(fibermap['TILEID']))
if ntile != 1:
msg = f'input has {ntile} tiles, but onetile=True option'
log.error(msg)
raise ValueError(msg)
#- make a copy of input fibermap that we can modify with new columns.
#- This will become the per-exposure fibermap for the EXP_FIBERMAP HDU
exp_fibermap = Table(fibermap, copy=True)
#- Remove the "fibermap" input from the current namespace so that we don't accidentally use it
#- NOTE: does not actually delete/modify the original input
del fibermap
#- Get TARGETIDs, preserving order in which they first appear
#- tfmap = "Target Fiber Map", i.e. one row per target instead of one row per exposure
targets, ii = ordered_unique(exp_fibermap["TARGETID"], return_index=True)
tfmap = exp_fibermap[ii]
assert np.all(targets == tfmap['TARGETID'])
ntarget = targets.size
#- New columns to fill in for whether exposure was used in coadd
exp_fibermap['IN_COADD_B'] = np.zeros(len(exp_fibermap), dtype=bool)
exp_fibermap['IN_COADD_R'] = np.zeros(len(exp_fibermap), dtype=bool)
exp_fibermap['IN_COADD_Z'] = np.zeros(len(exp_fibermap), dtype=bool)
#- initialize NUMEXP=-1 to check that they all got filled later
tfmap['COADD_NUMEXP'] = np.zeros(len(tfmap), dtype=np.int16) - 1
tfmap['COADD_EXPTIME'] = np.zeros(len(tfmap), dtype=np.float32) - 1
tfmap['COADD_NUMNIGHT'] = np.zeros(len(tfmap), dtype=np.int16) - 1
tfmap['COADD_NUMTILE'] = np.zeros(len(tfmap), dtype=np.int16) - 1
# some cols get combined into mean or rms;
# MJD handled separately to get min/max/mean
mean_cols = [
'DELTA_X', 'DELTA_Y',
'PSF_TO_FIBER_SPECFLUX',
]
# Note: treat the fiber coordinates separately because of missing coordinate problem
# that require an additional "good_coords" condition relative to other mean cols
if onetile:
mean_cols.extend(['FIBER_X', 'FIBER_Y'])
#- rms_cols and std_cols must also be in mean_cols
rms_cols = ['DELTA_X', 'DELTA_Y']
std_cols = [] # currently none; RA/dec handled separately
#- Add other MEAN/RMS/STD columns
for k in mean_cols:
if k in exp_fibermap.colnames :
dtype = np.float32
if k in mean_cols:
xx = Column(np.zeros(ntarget, dtype=dtype))
tfmap.add_column(xx,name='MEAN_'+k)
if k in rms_cols:
xx = Column(np.zeros(ntarget, dtype=np.float32))
tfmap.add_column(xx,name='RMS_'+k)
if k in std_cols:
xx = Column(np.zeros(ntarget, dtype=np.float32))
tfmap.add_column(xx,name='STD_'+k)
tfmap.remove_column(k)
#- FIBER_RA/DEC handled differently due to RA wraparound and cos(dec)
if 'FIBER_RA' in exp_fibermap.colnames and 'FIBER_DEC' in exp_fibermap.colnames:
tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float64)), name='MEAN_FIBER_RA')
tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float32)), name='STD_FIBER_RA')
tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float64)), name='MEAN_FIBER_DEC')
tfmap.add_column(Column(np.zeros(ntarget, dtype=np.float32)), name='STD_FIBER_DEC')
tfmap.remove_column('FIBER_RA')
tfmap.remove_column('FIBER_DEC')
#- MIN_, MAX_, MEAN_MJD over exposures used in coadd
if 'MJD' in exp_fibermap.colnames :
dtype = np.float64
if not 'MIN_MJD' in tfmap.dtype.names :
xx = Column(np.zeros(ntarget, dtype=dtype))
tfmap.add_column(xx,name='MIN_MJD')
if not 'MAX_MJD' in tfmap.dtype.names :
xx = Column(np.zeros(ntarget, dtype=dtype))
tfmap.add_column(xx,name='MAX_MJD')
if not 'MEAN_MJD' in tfmap.dtype.names :
xx = Column(np.zeros(ntarget, dtype=dtype))
tfmap.add_column(xx,name='MEAN_MJD')
if 'FIBERSTATUS' in tfmap.dtype.names :
tfmap.rename_column('FIBERSTATUS', 'COADD_FIBERSTATUS')
if not 'COADD_FIBERSTATUS' in tfmap.dtype.names :
raise KeyError("no COADD_FIBERSTATUS column in tfmap")
if 'FIBERSTATUS' in exp_fibermap.dtype.names :
fiberstatus_key='FIBERSTATUS'
elif 'COADD_FIBERSTATUS' in exp_fibermap.dtype.names :
fiberstatus_key='COADD_FIBERSTATUS'
else :
raise KeyError("no FIBERSTATUS nor COADD_FIBERSTATUS column in fibermap")
for i,tid in enumerate(targets) :
jj = exp_fibermap["TARGETID"]==tid
#- Only a subset of "good" FIBERSTATUS flags are included in the coadd
targ_fibstatuses = exp_fibermap[fiberstatus_key][jj]
in_coadd_b = use_for_coadd(targ_fibstatuses, 'b')
in_coadd_r = use_for_coadd(targ_fibstatuses, 'r')
in_coadd_z = use_for_coadd(targ_fibstatuses, 'z')
good_coadds = (in_coadd_b | in_coadd_r | in_coadd_z)
coadd_numexp = np.count_nonzero(good_coadds)
tfmap['COADD_NUMEXP'][i] = coadd_numexp
exp_fibermap['IN_COADD_B'][jj] = in_coadd_b
exp_fibermap['IN_COADD_R'][jj] = in_coadd_r
exp_fibermap['IN_COADD_Z'][jj] = in_coadd_z
# Check if there are some good coadds to compute aggregate quantities;
# Otherwise just use all the (bad) exposures; will still count NUM on good_coadds
if coadd_numexp>0:
compute_coadds = good_coadds
# coadded FIBERSTATUS = bitwise AND of input FIBERSTATUS
tfmap['COADD_FIBERSTATUS'][i] = np.bitwise_and.reduce(exp_fibermap[fiberstatus_key][jj][good_coadds])
else:
compute_coadds = ~good_coadds
# if all inputs were bad, COADD_FIBERSTATUS is OR of inputs instead of AND
tfmap['COADD_FIBERSTATUS'][i] = np.bitwise_or.reduce(exp_fibermap[fiberstatus_key][jj])
#- For FIBER_RA/DEC quantities, only average over good coordinates.
# There is a bug that some "missing" coordinates were set to FIBER_RA=FIBER_DEC=0
# (we are assuming there are not valid targets at exactly 0,0; only missing coords)
if 'FIBER_RA' in exp_fibermap.colnames and 'FIBER_DEC' in exp_fibermap.colnames:
good_coords = (exp_fibermap['FIBER_RA'][jj]!=0)|(exp_fibermap['FIBER_DEC'][jj]!=0)
#- Check whether entries with good coordinates exist (if not use all coordinates)
if np.count_nonzero(good_coords)>0:
compute_coords = good_coords
else:
compute_coords = ~good_coords
#- Check for edge case where good_coadds and good_coords do not overlap:
# if they overlap, use both conditions; otherwise compute coordinates over good_coords
if np.count_nonzero(compute_coadds&compute_coords)>0:
compute_coadds_coords = compute_coadds&compute_coords
else:
#TODO - decide if it's worth adding the following Warning message to the log
#print(f"Warning: TARGETID lacks overlap between good_coadds and good_coords: {tid}")
compute_coadds_coords = compute_coords
# Note: NIGHT and TILEID may not be present when coadding previously
# coadded spectra.
if 'NIGHT' in exp_fibermap.colnames:
tfmap['COADD_NUMNIGHT'][i] = len(np.unique(exp_fibermap['NIGHT'][jj][good_coadds]))
if 'TILEID' in exp_fibermap.colnames:
tfmap['COADD_NUMTILE'][i] = len(np.unique(exp_fibermap['TILEID'][jj][good_coadds]))
if 'EXPTIME' in exp_fibermap.colnames :
tfmap['COADD_EXPTIME'][i] = np.sum(exp_fibermap['EXPTIME'][jj][good_coadds])
# Calc MEAN_* and RMS_* columns using the same exposures as the coadd
# Note RA/DEC/MJD are handled separately
for k in mean_cols:
if k in exp_fibermap.colnames :
vals=exp_fibermap[k][jj][compute_coadds]
tfmap['MEAN_'+k][i] = np.mean(vals)
for k in rms_cols:
if k in exp_fibermap.colnames :
vals=exp_fibermap[k][jj][compute_coadds]
# RMS includes mean offset, not same as STD
tfmap['RMS_'+k][i] = np.sqrt(np.mean(vals**2)).astype(np.float32)
#- STD of FIBER_RA, FIBER_DEC in arcsec, handling cos(dec) and RA wrap
if 'FIBER_RA' in exp_fibermap.colnames and 'FIBER_DEC' in exp_fibermap.colnames:
decs = exp_fibermap['FIBER_DEC'][jj][compute_coadds_coords]
ras = exp_fibermap['FIBER_RA'][jj][compute_coadds_coords]
mean_ra, std_ra, mean_dec, std_dec = calc_mean_std_ra_dec(ras, decs)
tfmap['MEAN_FIBER_RA'][i] = mean_ra
tfmap['STD_FIBER_RA'][i] = np.float32(std_ra)
tfmap['MEAN_FIBER_DEC'][i] = mean_dec
tfmap['STD_FIBER_DEC'][i] = np.float32(std_dec)
#- future proofing possibility of other STD cols
for k in std_cols:
if k in exp_fibermap.colnames:
vals=exp_fibermap[k][jj][compute_coadds]
# STD removes mean offset, not same as RMS
tfmap['STD_'+k][i] = np.std(vals).astype(np.float32)
# MIN_, MAX_MJD over exposures used in the coadd
if 'MJD' in exp_fibermap.colnames :
vals=exp_fibermap['MJD'][jj][compute_coadds]
tfmap['MIN_MJD'][i] = np.min(vals)
tfmap['MAX_MJD'][i] = np.max(vals)
tfmap['MEAN_MJD'][i] = np.mean(vals)
# Error propagation of IVAR values when taking an unweighted MEAN
#- (Note 1: IVAR will be 0.0 if any of ivar[compute_coadds]=0)
#- (Note 2: these columns are place-holder for possible future use)
for k in ['FIBER_RA_IVAR', 'FIBER_DEC_IVAR',
'DELTA_X_IVAR', 'DELTA_Y_IVAR'] :
if k in exp_fibermap.colnames :
tfmap[k][i]=1./np.mean(1./exp_fibermap[k][jj][compute_coadds])
#- Targeting bits can evolve, so use bitwise OR of any input bits set
#- See Sec 5.1 of https://ui.adsabs.harvard.edu/abs/2023AJ....165...50M/abstract
for targetcol in ('CMX_TARGET',
'SV1_DESI_TARGET', 'SV1_BGS_TARGET', 'SV1_MWS_TARGET', 'SV1_SCND_TARGET',
'SV2_DESI_TARGET', 'SV2_BGS_TARGET', 'SV2_MWS_TARGET', 'SV2_SCND_TARGET',
'SV3_DESI_TARGET', 'SV3_BGS_TARGET', 'SV3_MWS_TARGET', 'SV3_SCND_TARGET',
'DESI_TARGET', 'BGS_TARGET', 'MWS_TARGET', 'SCND_TARGET'):
if targetcol in tfmap.colnames:
tfmap[targetcol][i] = np.bitwise_or.reduce(exp_fibermap[targetcol][jj],axis=0)
#- Remove some columns that apply to individual exp but not coadds
#- (even coadds of the same tile)
for k in ['NIGHT', 'EXPID', 'MJD', 'EXPTIME', 'NUM_ITER',
'PSF_TO_FIBER_SPECFLUX', 'FLAT_TO_PSF_FLUX', 'HELIOCOR_OFFSET',
'COADD_NORM','SIGMA_COADD_NORM']:
if k in tfmap.colnames:
tfmap.remove_column(k)
#- Remove columns that don't apply to coadds across tiles
if not onetile:
for k in ['TILEID', 'FIBER', 'FIBER_X', 'FIBER_Y',
'PRIORITY', 'PETAL_RA', 'PETAL_DEC', 'LAMBDA_REF',
'FIBERASSIGN_X', 'FIBERASSIGN_Y',
'PETAL_LOC', 'DEVICE_LOC', 'LOCATION'
]:
if k in tfmap.colnames:
tfmap.remove_column(k)
#- keep exposure-specific columns that are present in the input fibermap
ii = np.isin(fibermap_perexp_cols, exp_fibermap.dtype.names)
keepcols = tuple(np.array(fibermap_perexp_cols)[ii])
exp_fibermap = exp_fibermap[keepcols]
return tfmap, exp_fibermap
[docs]
def _chi2_threshold(nspec, nsig):
"""
Return the threshold for the the
chi^2 = Sum_i 1/sigma_i^2 (x_i - Mean(x))^2
corresponding to the tail probability of nsig sigma
Args:
nspec(int): number of pixels in the sample
nsig(float): how many sigma of tail probability to cut-off
Returns:
threshold(float): the chi^2 (not reduced one) value
"""
threshold = scipy.stats.chi2(nspec - 1).isf(scipy.stats.norm.cdf(-nsig))
return threshold
[docs]
def _iterative_masker(vec,
ivar,
cosmics_nsig,
min_for_cosmics,
threshold=None):
"""
Given a vector and inverse variances vector
perform the iterative cosmic masking based on
chi^2 value. I.e. if chi2^2 for the ensemble is larger then threshold we
pick up the most deviating value.
A special mode was designed for cases when variability is much larger
than expected from noise. In the case we just use static threshold value for
chi2
Args:
vec(ndarray): input vector
ivar(ndarray): inverse variances
cosmics_nsig(float): threshold in units of sigma
min_for_cosmics(int): what's the threshold in number of spectra when we stop trying to find more cosmics
threshold(float): optional threshold, if specified we ignore cosmic_nsig and chi2 statistics and just use static chi2 threshold
Returns:
badmask(ndarray): boolean mask of bad/cosmic pixels
"""
good = np.ones(len(vec), dtype=bool)
while True:
mean = (ivar[good] * vec[good]).sum() / ivar[good].sum()
metric = (vec - mean)**2 * ivar
chi2 = metric[good].sum()
nspec = good.sum()
if threshold is None:
cur_threshold = _chi2_threshold(nspec, cosmics_nsig)
else:
cur_threshold = threshold
if chi2 > cur_threshold:
good[np.argmax(metric * good)] = False
else:
break
if good.sum() < min_for_cosmics:
# there no point in proceeding with two pixels
break
return ~good
[docs]
def _mask_cosmics(wave, flux, ivar, tid=None, cosmics_nsig=None, camera=''):
"""
Mask cosmics in multiple spectra
Args:
wave (numpy.ndarray): 1d array of wavelengths
flux (numpy.ndarray): 2d array of fluxes (from Spectra object)
ivar (numpy.ndarray): 2d array of ivars (from Spectra) with ivar=0 for previously masked pixels
tid (int): targetid (used for logging)
cosmics_nsig (float): threshold for cosmic ray rejection
camera (string): camera corresponding to the spectrum b/r/z (used for logging)
Returns:
cosmic_mask (numpy.ndarray): 2d mask with trues where we think we have a cosmic
"""
log = get_logger()
grad = []
gradvar = []
spec_pos = []
min_for_cosmics = 3
# we do not attempt to mask if the number of spectra
# is strictly less than this
nspec0 = ivar.shape[0]
cosmic_mask = np.zeros(ivar.shape, dtype=bool)
if nspec0 < min_for_cosmics:
return cosmic_mask
for j in range(nspec0):
ttivar0 = ivar[j]
good = (ttivar0 > 0)
bad = ~good
if np.sum(good) == 0:
continue
spec_pos.append(j)
# we must keep the position of 'good' spectra for later
nbad = np.sum(bad)
ttflux = flux[j].copy()
# interpolate over bad measurements to be able to compute gradient next
# to a bad pixel and identify outlier many cosmics residuals are on edge
# of cosmic ray trace, and so can be next to a masked flux bin
if nbad > 0:
ttflux[bad] = np.interp(wave[bad], wave[good], ttflux[good])
ttivar = ivar[j].copy()
if nbad > 0:
ttivar[bad] = np.interp(wave[bad], wave[good], ttivar[good])
# ttivar should not be equal to zero anywhere but just in case
# we still protect against it
ttvar = 1. / (ttivar + (ttivar == 0))
# these have one pixel less
cur_grad = ttflux[1:] - ttflux[:-1]
cur_grad_var = ttvar[1:] + ttvar[:-1]
grad.append(cur_grad)
gradvar.append(cur_grad_var)
# we have to be careful here
# because grad can have smaller number of spectra than
# original data because we throw away fully masked spectra in
# the loop before
spec_pos, grad, gradvar = [np.array(_) for _ in [spec_pos, grad, gradvar]]
gradivar = (gradvar > 0) / np.array(gradvar + (gradvar == 0))
nspec = grad.shape[0]
if nspec < min_for_cosmics:
# if after throwing out masked spectra we have not enough spectra
return cosmic_mask
sgradivar = np.sum(gradivar, axis=0)
bad = sgradivar == 0
# this should not happen really as we already
# interpolated over all zeros in ivars
if (~bad).any():
meangrad = np.sum(gradivar * grad,
axis=0) / (sgradivar + bad.astype(int))
deltagrad = grad - meangrad
chi2 = np.sum(gradivar * deltagrad**2, axis=0)
# this is chi^2 array for each pixel in the stack
med_chi2 = np.median(chi2)
if med_chi2 > 2 * scipy.stats.chi2(nspec - 1).ppf(0.5):
# the median(chi^2) across the whole spectrum
# at least a factor of two larger
# then expected so we are dominated by not-noise related reasons
# i.e. the object is variable
# we switch to handwaving and estimate the median chi2
# and stddev of chi2 distribution from 84-th percentile
threshold = med_chi2 + (scipy.stats.scoreatpercentile(chi2, 84) -
med_chi2) * cosmics_nsig
fix_threshold = threshold
# this is threshold for total chi^2 with nspec pixels
# when we run the iterative_masker we still use this fixed
# threshold even if when we mask more and more pixels
else:
threshold = _chi2_threshold(nspec, cosmics_nsig)
fix_threshold = None
cosmic_bad = (chi2 > threshold) & (~bad)
n_cosmic = np.sum(cosmic_bad)
if n_cosmic > 0:
badindex = np.where(cosmic_bad)[0]
# these are the problematic pixels with potentially more than
# one cosmic
n_dups = 0
# count how many wavelengths with more than 1 masked value
for bi in badindex:
cur_bad_mask = _iterative_masker(deltagrad[:, bi],
gradivar[:, bi],
cosmics_nsig,
min_for_cosmics,
threshold=fix_threshold)
if cur_bad_mask.sum() > 1:
n_dups += 1
cur_mask_pos = spec_pos[cur_bad_mask]
cosmic_mask[cur_mask_pos, bi] = True
cosmic_mask[cur_mask_pos, bi + 1] = True
# since we are using the maximum value of grad^2
# we really cannot say which pixel is responsible for
# large gradient hence we must mask two pixels
log.debug("masking specs {} wave={}".format(
cur_mask_pos, wave[bi]))
log.info(("masking {} wavelengths in {} spectra in cam {}"
" for targetid={}").format(n_cosmic, nspec, camera, tid))
if n_dups > 0:
log.info(("masking {} wavelengths with more than 1 mask per "
"pixel for targetid={}").format(n_dups, tid))
return cosmic_mask
[docs]
def _resolution_coadd(resolution, pix_weights):
"""
Given the resolution matrices for set of spectra, and
inverse variances (or generally weights) for fluxes return the
accumulated resolution matrix, and the combined weights
See #2372.
Args:
resolution (ndarray): (nspec, nres, npix) array of resolution matrices
pix_weights (ndarray): (nspec, npix) array of ivars or weights
Returns resolution matrix (nres, npix),
and the weight (nres, npix)
"""
ww = resolution.shape[1] // 2
# resolution kernel width
npix = resolution.shape[2]
# indices of the corresponding variance point
# that needs to be used for ivar weights
res_indices = (np.arange(npix)[None, :] +
np.arange(-ww, ww + 1)[:, None]) % npix
res_whts = np.array([_[res_indices] for _ in pix_weights])
res = np.sum(res_whts * resolution, axis=0)
res_norm = np.sum(res_whts, axis=0)
return res, res_norm
[docs]
def coadd_exposures(spectra, cosmics_nsig=None, onetile=False):
"""
Coadd spectra across exposures, returning new Spectra object without changing original.
Args:
spectra: desispec.spectra.Spectra object
Options:
cosmics_nsig: float, nsigma clipping threshold for cosmic rays (default 4)
onetile: bool, if True, inputs are from a single tile
Returns:
coadded_spectra: desispec.spectra.Spectra object
See ``coadd`` for a version of this function that does an in-place coaddition,
modifying its inputs instead of returning a new object.
This function coadds across exposures but not across cameras; see ``coadd_cameras``
for coadding across cameras but not across exposures. Combine the two to get a
coadd across both.
"""
log = get_logger()
log.debug("coadding spectra to new Spectra object")
#- Make a deep copy of input spectra to become the coadded output
coadded_spectra = spectra.copy()
#- Perform coaddition in place on the copy
coadd(coadded_spectra,
cosmics_nsig=cosmics_nsig,
onetile=onetile)
return coadded_spectra
[docs]
def _build_crude_coadd_arrays(spectra, all_idx, good_idx, bands_to_use, sbands):
"""
Build per-exposure flux and weight arrays on a merged from wavelength bands
in non-overlapping regions, and compute the corresponding inverse-variance
weighted crude coadd.
Args:
spectra : desispec.spectra.Spectra object
all_idx : integer index array into spectra.fibermap for the
all exposures of the target being processed
good_idx : integer index array into spectra.fibermap for the
good exposures of the target being processed
bands_to_use : ordered list of band names to include; must follow
the same wavelength ordering as spectra.bands so
that adjacent entries share wavelength overlaps
Returns:
f_i ndarray (n_exp, n_wave) per-exposure flux
w_i ndarray (n_exp, n_wave) per-exposure ivar weights
coadd_wave ndarray (n_wave,) merged wavelength grid
crude_coadd ndarray (n_wave,) ivar-weighted coadd using only good_idx
w_tot ndarray (n_wave,) total ivar weight per pixel
"""
bands_list = list(bands_to_use)
# check for overlap between usable bands,
# exclude these regions to avoid coadding cameras
overlap = {}
for j, b in enumerate(bands_list):
wave_b = spectra.wave[b]
overlap_flag = np.zeros(wave_b.size, dtype=int)
if j > 0:
wave_prev = spectra.wave[bands_list[j - 1]]
for k, w in enumerate(wave_b):
if np.any(np.abs(w - wave_prev) <= 1e-4):
overlap_flag[k] = 1
if j < len(bands_list) - 1:
wave_next = spectra.wave[bands_list[j + 1]]
for k, w in enumerate(wave_b):
if np.any(np.abs(w - wave_next) <= 1e-4):
overlap_flag[k] = 1
overlap[b] = overlap_flag
# construct coadd wave grid, skipping overlap regions
coadd_wave = None
for b in sbands[np.isin(sbands, bands_list)]:
wave_b = spectra.wave[b][overlap[b] == 0]
coadd_wave = wave_b if coadd_wave is None else np.append(coadd_wave, wave_b)
# fill per-exposure flux and weight arrays
f_i = np.zeros((all_idx.size, coadd_wave.size))
w_i = np.zeros((all_idx.size, coadd_wave.size))
for b in bands_list:
pix_mask = (overlap[b] == 0)
if spectra.mask is not None:
spectra_mask = spectra.mask[b][all_idx][:, pix_mask]
else:
# create a zero mask to use
spectra_mask = np.zeros_like(spectra.flux[b][all_idx][:, pix_mask])
# where to insert
wband = spectra.wave[b][pix_mask]
start = np.searchsorted(coadd_wave, wband[0])
end = start + len(wband)
iband = slice(start, end)
# directly copy because non-overlapping
f_i[:, iband] = spectra.flux[b][all_idx][:, pix_mask]
w_i[:, iband] = spectra.ivar[b][all_idx][:, pix_mask] * (spectra_mask == 0)
# compute the crude ivar-weighted coadd using only good_idx
w_tot = np.sum(w_i[np.isin(all_idx,good_idx)], axis=0)
crude_coadd = np.sum(f_i[np.isin(all_idx,good_idx)] * w_i[np.isin(all_idx,good_idx)], axis=0) / (w_tot + (w_tot == 0))
return f_i, w_i, coadd_wave, crude_coadd, w_tot
[docs]
def per_exposure_normalization(spectra, norm_chi2_threshold=0.1):
"""
Compute per‑exposure multiplicative normalization factors for each target.
The resulting normalization terms and their uncertainties are stored in the
``COADD_NORM`` and ``SIGMA_COADD_NORM`` columns of the ``spectra.fibermap``.
If the rescaling changes the coadded spectrum by a significance larger than
``norm_chi2_threshold``, the ``VARIABLE`` bit in ``FIBERSTATUS`` is set. Objects
that are not normalized (e.g. sky spectra or single‑exposure objects) receive
a default ``COADD_NORM=1.0`` and ``SIGMA_COADD_NORM=0.0``.
If any exposure yields an unphysical scaling factor while the
``VARIABLE`` bit is set, the exposure is flagged with the ``BADFIBER`` bit
in ``FIBERSTATUS``.
Parameters
----------
spectra : desispec.spectra.Spectra
Spectra object that must provide ``flux``, ``ivar``, optional ``mask``,
``wave`` dictionaries, and a fibermap table containing at least the
columns ``TARGETID``, ``FIBERSTATUS`` and ``OBJTYPE``.
norm_chi2_threshold : float
Minimum reduced chi‑squared value between the standard error‑weighted
coadd and the coadd produced with per‑exposure normalization for which
the per‑exposure normalization is preferred for a target. When this
threshold is met, the ``VARIABLE`` (``coadd_``) fiberstatus bit is set.
Returns
-------
None
The function updates ``spectra`` in place; it does not return a value.
Examples
--------
>>> from desispec.io import read_spectra
>>> spec = read_spectra('path/to/spectra-file.fits')
>>> compute_per_exposure_norm(spec)
# ``spec.fibermap`` now contains COADD_NORM, SIGMA_COADD_NORM,
# and updated FIBERSTATUS bits.
"""
log = get_logger()
bands = spectra.bands
mwave = [np.mean(spectra.wave[b]) for b in bands]
sbands = np.array(bands)[np.argsort(mwave)] #wavelength-sorted bands
targets = ordered_unique(spectra.fibermap["TARGETID"])
# bitmask of fatal fiber‑status flags – exposures with any of these bits set are ignored
fatal_fiberstatus_bits = get_all_nonamp_fiberbitmask_val()
good_fiberstatus = ( (spectra.fibermap['FIBERSTATUS'] & fatal_fiberstatus_bits) == 0 )
# since the default is not to apply rescaling;
# initialize default COADD_NORM = 1., SIGMA_COADD_NORM = 0 (no corrections necessary)
spectra.fibermap['COADD_NORM'] = 1.
spectra.fibermap['SIGMA_COADD_NORM'] = 0.
# compute normalization terms per exposure
for i,tgt in enumerate(targets):
idx = np.where(spectra.fibermap['TARGETID'] == tgt)[0]
# flag to check if solution is physically reasonable
is_converged = False
iteration = 0 # avoiding infinite loop
# track if BADFIBER fiberstatus was set function
idx_good_init = np.where((spectra.fibermap['TARGETID'] == tgt) & good_fiberstatus)[0]
while not(is_converged) and (iteration < 5):
iteration += 1
idx_good = np.where((spectra.fibermap['TARGETID'] == tgt) & good_fiberstatus)[0]
# do not apply to sky spectra or single exposures or all bad exposures
if np.all(spectra.fibermap['OBJTYPE'][idx_good] == 'TGT') & (idx_good.size > 1):
# check which cameras are in ALL exposures
usable_bands = []
for j, b in enumerate(bands):
# check for fatal camera fiberstatus is good for all exposures
cam_fiberstatus = use_for_coadd(spectra.fibermap['FIBERSTATUS'][idx_good], b)
if np.sum(cam_fiberstatus) != idx_good.size:
continue
if spectra.mask is not None:
spectra_mask = spectra.mask[b][idx_good]
else:
# create a zero mask to use
spectra_mask = np.zeros_like(spectra.flux[b][idx_good])
wave_b = spectra.wave[b]
nwave = wave_b.size
num_masked_pixels = np.sum((spectra_mask != 0)|(spectra.ivar[b][idx_good] == 0), 1)
if np.all(num_masked_pixels < int(0.4*nwave)):
# Require that less than 40 % of the pixels are masked
usable_bands.append(b)
if len(usable_bands) == 0:
log.error(f'spectra for targetid {tgt} could not be rescaled before coaddition, no usable bands')
is_converged = True
idx_good = []
continue
# construct flux, weights, and crude coadd from the usable_bands and good indices
f_i, w_i, coadd_wave, crude_coadd, w_tot = _build_crude_coadd_arrays(
spectra, idx_good_init, idx_good, usable_bands, sbands
)
# compute unbiased normalization constant by minimizing chi2
# chi2 = sum( w*(f_i - b*f_c )^2 )
# optimal scalar for f_i is a = 1/b = ( w*f_i*f_c / w*f_c^2 )^-1
a = np.ones(idx.size)
var_a = np.full(idx.size, np.inf)
# loop over all original good indices so new BADFIBER exposures
# COADD_NORM is relative to updated crude_coadd
for j,k in enumerate(idx_good_init):
mask = w_i[j]*w_tot != 0
w = (1./w_i[j][mask] + 1./w_tot[mask])**-1
numerator = w*crude_coadd[mask]**2
denominator = w*crude_coadd[mask]*f_i[j][mask]
if np.isfinite(np.sum(denominator)):
a[np.isin(idx,k)] = np.sum(numerator)/np.sum(denominator)
scalar = a[np.isin(idx,k)]
# compute error on scalar
var_a[np.isin(idx,k)] = scalar**2 / np.sum(w * crude_coadd[mask]**2)
else:
log.warning(f'coadd*exposure product is not finite for an exposure of {tgt}')
good_fiberstatus[k] = False
# need to retry without this exposure
a[np.isin(idx,k)] = 0
# enforce physically plausible scaling factors (positive and not extreme).
is_converged = np.all( (a[np.isin(idx,idx_good)]>0.1) & (a[np.isin(idx,idx_good)]<10.) )
if is_converged:
spectra.fibermap['COADD_NORM'][idx_good_init] = a[np.isin(idx,idx_good_init)]
spectra.fibermap['SIGMA_COADD_NORM'][idx_good_init] = np.sqrt(var_a[np.isin(idx,idx_good_init)])
else:
# saving "bad a" in case VARIABLE is not set on future iteration
bad_a = np.where((a<0.1) | (a>10.))[0]
bad_indices = idx[bad_a]
# set bad fiberstatus for these exposures
good_fiberstatus[bad_indices] = False
# also need to update fiberstatus for these exposures
spectra.fibermap['FIBERSTATUS'][bad_indices] |= fmsk.BADFIBER
# save the bad values for further analyses
spectra.fibermap['COADD_NORM'][bad_indices] = a[bad_a]
spectra.fibermap['SIGMA_COADD_NORM'][bad_indices] = np.sqrt(var_a[bad_a])
else:
# skip normalization for non‑target objects, single‑exposure targets
is_converged = True
if not(is_converged):
# this should never occur and something is very wrong
log.error(f"normalization for targetid {tgt} could not converge!")
spectra.fibermap['COADD_NORM'][idx] = 1.
spectra.fibermap['SIGMA_COADD_NORM'][idx] = 0.
# compute crude_coadd and rescaled_crude_coadd over all bands and compare
# do not apply to sky spectra or single exposures or all bad exposures
elif np.all(spectra.fibermap['OBJTYPE'][idx_good] == 'TGT') and (len(idx_good) > 1):
# if all bands were used, no need to recompute crude coadd
# and can use existing f_i, w_i arrays
if not np.all(np.isin(bands, usable_bands)):
# usable_bands is a strict subset: recompute over all bands for comparison
f_i, w_i, coadd_wave, crude_coadd, w_tot = _build_crude_coadd_arrays(
spectra, idx_good_init, idx_good, bands, sbands
)
# else: f_i, w_i, crude_coadd, w_tot from the normalization loop are still valid
# compute rescaled coadd (was duplicated in both branches before)
used_in_coadd = good_fiberstatus[idx]
not_flagged = np.isin(idx_good_init, idx_good)
scaling = a[used_in_coadd].reshape(np.sum(used_in_coadd), 1)
new_w_tot = np.sum(w_i[not_flagged] * scaling**-2, axis=0)
new_crude_coadd = np.sum(f_i[not_flagged] * w_i[not_flagged] * scaling**-1, axis=0) / (new_w_tot + (new_w_tot == 0))
# compute chi2dof; dof accounts for missing regions
chi2 = np.sum( (crude_coadd - new_crude_coadd)**2 * w_tot)
chi2 /= np.sum(w_tot != 0)
if chi2 < norm_chi2_threshold:
# if rescaling makes a minimal difference, we coadd as normal
spectra.fibermap['COADD_NORM'][idx_good] = 1.
spectra.fibermap['SIGMA_COADD_NORM'][idx_good] = 0.
else:
# suspect for variable calibration owing to positioning offsets,
# calib errors, intrinsic variability, etc.
spectra.fibermap['FIBERSTATUS'][idx] |= fmsk.VARIABLE
# How many are we rescaling?
ii = (spectra.fibermap['FIBERSTATUS'] & fmsk.VARIABLE) != 0
n_rescaled = len(np.unique(spectra.fibermap['TARGETID'][ii]))
log.info(f'Rescaling {n_rescaled} variable target(s)')
# downgrade to float 32
spectra.fibermap['COADD_NORM'] = spectra.fibermap['COADD_NORM'].astype(np.float32)
spectra.fibermap['SIGMA_COADD_NORM'] = spectra.fibermap['SIGMA_COADD_NORM'].astype(np.float32)
[docs]
def coadd(spectra, cosmics_nsig=None, onetile=False, no_normalize=False, norm_chi2_threshold=0.1):
"""
Coadd spectra for each target and each camera, modifying input spectra obj.
Args:
spectra: desispec.spectra.Spectra object
Options:
cosmics_nsig: float, nsigma clipping threshold for cosmic rays (default 4)
onetile: bool, if True, inputs are from a single tile
no_normalize: bool, if True, calculation of per-exposure re-normalization
constants is skipped
norm_chi2_threshold: float, minimum reduced chi‑squared value to trigger
per‑exposure normalization; only used if no_normalize is False
Notes: if `onetile` is True, additional tile-specific columns
like LOCATION and FIBER are included the FIBERMAP; otherwise
these are only in the EXP_FIBERMAP since for the same target they could
be different on different tiles.
See ``coadd_exposures`` for a version of this function that returns a new object
without modifying the input, instead of doing an in-place coaddition,
"""
log = get_logger()
targets = ordered_unique(spectra.fibermap["TARGETID"])
ntarget = targets.size
log.debug("number of targets= {}".format(ntarget))
#- Use "None" as default -> 4.0 so that scripts can use args.nsig
#- with default None, which lets this function be the sole "owner" of
#- the true numeric default
if cosmics_nsig is None:
cosmics_nsig = 4.0 # Note: if you change this, also change docstring
if cosmics_nsig > 0:
log.info(f'Clipping cosmics with {cosmics_nsig=}')
else:
log.info(f'Not performing cosmics sigma clipping ({cosmics_nsig=})')
normalize = not(no_normalize)
if normalize:
# compute normalization constants for handling flux calibration offset
per_exposure_normalization(spectra, norm_chi2_threshold=norm_chi2_threshold)
for b in spectra.bands:
log.debug("coadding band '{}'".format(b))
nwave = spectra.wave[b].size
tflux = np.zeros((ntarget, nwave), dtype=spectra.flux[b].dtype)
tivar = np.zeros((ntarget, nwave), dtype=spectra.ivar[b].dtype)
# these are the output arrays from stacking for all objects
if spectra.mask is not None:
spectra_mask = spectra.mask[b]
else:
# I am creating a zero mask if there is no mask
# to not have to deal with two code-paths throughout
# the function
spectra_mask = np.zeros(spectra.flux[b].shape, dtype=int)
tmask = np.zeros((ntarget, nwave), dtype=spectra_mask.dtype)
if spectra.resolution_data is not None :
trdata = np.zeros(
(ntarget, spectra.resolution_data[b].shape[1], nwave),
dtype=spectra.resolution_data[b].dtype)
else :
trdata = None
if 'FIBERSTATUS' in spectra.fibermap.dtype.names:
fiberstatus = spectra.fibermap['FIBERSTATUS']
else:
fiberstatus = spectra.fibermap['COADD_FIBERSTATUS']
good_fiberstatus = use_for_coadd(fiberstatus, b)
for i, tid in enumerate(targets):
jj = np.where((spectra.fibermap["TARGETID"] == tid)
& good_fiberstatus)[0]
# if all spectra were flagged as bad (FIBERSTATUS != 0), continue
# to next target, leaving tflux and tivar=0 for this target
if len(jj) == 0:
continue
if normalize:
# reshape normalization term
norm_term = spectra.fibermap['COADD_NORM'][jj].reshape(jj.size,1)
else:
# set normalization coefficient array to ones
norm_term = np.ones((jj.size,1), dtype=np.float32)
# here we keep original variance array that will not be modified
# and ivarjj_masked which will be modified by
# cosmic rays check and mask>0 check
ivarjj_orig = spectra.ivar[b][jj] * norm_term**-2
ivarjj_masked = spectra.ivar[b][jj] * (spectra_mask[jj] == 0) * norm_term**-2
flux_scaled = spectra.flux[b][jj] * norm_term
if cosmics_nsig is not None and cosmics_nsig > 0:
cosmic_mask = _mask_cosmics(spectra.wave[b],
flux_scaled,
ivarjj_masked,
cosmics_nsig=cosmics_nsig,
tid=tid,
camera=b)
ivarjj_masked[cosmic_mask] = 0
# We might think to log some info about cosmic mask
# inverse variance weights
weights = ivarjj_masked * 1
tivar[i] = np.sum(ivarjj_masked, axis=0)
bad = (tivar[i] == 0)
weights[:, bad] = ivarjj_orig[:, bad]
# in the case of all masked pixels
# we still use the variances ignoring masking
tivar[i][bad] = np.sum(weights[:, bad], axis=0)
# we now recalculate the tivar, because we just replaced updated the weights
weights = weights / (tivar[i] + (tivar[i] == 0))
tflux[i] = np.sum(weights * flux_scaled, axis=0)
if spectra.resolution_data is not None :
trdata[i, :, :] = _resolution_coadd(spectra.resolution_data[b][jj],
weights)[0]
# note we ignore the resolution matrix norm (sum of weights)
# because weights already were normalized
# for pixels where we first found ivar=0, since we decided
# to combine data anyway we need to OR the masks to indicate issues
# if ivar wave foudn to be >0 it means assume there were some good pixels
# hence the mask should stay zero
tmask[i, bad] = np.bitwise_or.reduce(spectra_mask[jj][:, bad],
axis=0)
spectra.flux[b] = tflux
spectra.ivar[b] = tivar
if spectra.mask is not None:
spectra.mask[b] = tmask
if spectra.resolution_data is not None :
spectra.resolution_data[b] = trdata
if spectra.scores is not None:
orig_scores = Table(spectra.scores.copy())
orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
else:
orig_scores = None
spectra.fibermap, exp_fibermap = coadd_fibermap(spectra.fibermap,
onetile=onetile)
spectra.exp_fibermap = exp_fibermap
spectra.scores = None
compute_coadd_scores(spectra, orig_scores, update_coadd=True)
[docs]
def coadd_cameras(spectra):
"""
Coadd flux, ivar, and resolution (and model, if available)
across all bands (cameras) for a given Spectra object.
Args:
spectra: desispec.spectra.Spectra (Input spectra object to coadd)
Returns:
desispec.spectra.Spectra
Coadded Spectra object (new object).
Note: unlike ``coadd``, this does not modify the input spectra object, rather
it returns a new Spectra object like ``coadd_exposures``.
This function coadds across cameras but not across exposures; see ``coadd_exposures``
for coadding across exposures but not across cameras. Combine the two to get a
coadd across both.
"""
log = get_logger()
t0 = time.time()
bands = spectra.bands
if len(bands)==1:
log.info('single band is provided, no coadding done, returning the same spectra object')
return spectra
ntarget = spectra.flux[bands[0]].shape[0]
log.debug("number of targets = %d", ntarget)
# Build combined wavelength grid
wave = None
tolerance = 1e-4
mwave = [np.mean(spectra.wave[b]) for b in bands]
sbands = np.array(bands)[np.argsort(mwave)]
log.debug("wavelength sorted cameras= {}".format(sbands))
windict = {}
for b in sbands:
if wave is None:
wave = spectra.wave[b]
else:
wave = np.append(wave, spectra.wave[b][spectra.wave[b] > wave[-1] + tolerance])
# check alignment, caching band wavelength grid indices as we go
for b in spectra.bands:
imin = np.argmin(np.abs(spectra.wave[b][0] - wave))
windices = np.arange(imin, imin + len(spectra.wave[b]), dtype=int)
dwave = spectra.wave[b] - wave[windices]
if np.any(np.abs(dwave) > tolerance):
msg = "Input wavelength grids (band '{}') are not aligned. Use --lin-step or --log10-step to resample to a common grid.".format(
b)
log.error(msg)
raise ValueError(msg)
nwave = wave.size
# creating a dictionary for each band to assign
# which pixels are overlapping with other bands
# it masked life easier tracking everything when normalizing the pixels
overlap_flag = {}
for i, b in enumerate(bands):
wave_b = spectra.wave[b]
flag = np.zeros_like(wave_b, dtype=int)
# Check overlap with previous band
if i > 0:
wave_prev = spectra.wave[bands[i - 1]]
# Mark overlapping pixels in current band
for j, w in enumerate(wave_b):
if np.any(np.abs(w - wave_prev) <= tolerance):
flag[j] = 1
# Check overlap with next band
if i < len(bands) - 1:
wave_next = spectra.wave[bands[i + 1]]
for j, w in enumerate(wave_b):
if np.any(np.abs(w - wave_next) <= tolerance):
flag[j] = 1
overlap_flag[b] = flag
# defining arrays for coadded data
flux = np.zeros((ntarget, nwave))
ivar = np.zeros((ntarget, nwave))
mask = np.zeros((ntarget, nwave), dtype=np.int32)
has_mask = spectra.mask is not None
has_res = spectra.resolution_data is not None
has_model = spectra.model is not None
if has_model:
log.info('MODELS found, so coadding them as well..')
model = np.zeros((ntarget, nwave))
model_counts = np.zeros((ntarget, nwave))
if has_res:
max_ndiag = max([spectra.resolution_data[b].shape[1] for b in bands])
rdata = np.zeros((ntarget, max_ndiag, nwave))
rnorm = np.zeros_like(rdata)
for b in sbands:
log.debug("coadding band '{}'".format(b))
wband = spectra.wave[b]
start = np.searchsorted(wave, wband[0])
end = start + len(wband)
iband = slice(start, end)
windict[b] = iband
no_overlap = (overlap_flag[b]==0)
f = spectra.flux[b]
iv = spectra.ivar[b]
m = spectra.mask[b] if has_mask else np.zeros_like(f, dtype=np.int32)
# True for pixels in b that are non-overlapping
no_overlap = (overlap_flag[b] == 0)
for i in range(ntarget):
# Non-overlapping: directly copy
flux[i, iband][no_overlap] = f[i][no_overlap]
ivar[i, iband][no_overlap] = iv[i][no_overlap]
# Overlapping: accumulate (inverse variance weighted sum)
overlap = ~no_overlap
# coadding flux and ivar
flux[i, iband][overlap] += iv[i][overlap] * f[i][overlap]
ivar[i, iband][overlap] += iv[i][overlap]
# for masks, models and resolution matrix
# (in no overlapping regions, simple copying)
# in overlapping regions, inverse variance weighted mean
if has_mask:
# coadding mask
mask[i, iband][no_overlap] = m[i][no_overlap] # non-overlapping, simple copy
mask[i, iband][overlap] |= m[i][overlap] # overlapping, OR logic
if has_model:
model[i, iband][no_overlap] = spectra.model[f"{b}"][i][no_overlap]
model_counts[i, iband][no_overlap] = 1
# coadding model (basically same as non overlap region, as there is no error associated with model)
model[i, iband][overlap] += spectra.model[f"{b}"][i][overlap]
model_counts[i, iband][overlap] += 1
if has_res:
res = spectra.resolution_data[b][i][np.newaxis, :, :]
iv_i = iv[i:i+1]
raccum, rnorm_i = _resolution_coadd(res, iv_i)
ndiag = raccum.shape[0]
offset = (max_ndiag - ndiag) // 2
# non-overlapping regions, simple copying
rdata[i, offset:offset+ndiag, iband.start:iband.stop][:, no_overlap] = res[0][:, no_overlap]
rnorm[i, offset:offset+ndiag, iband.start:iband.stop][:, no_overlap] = 1.0
# non-overlapping regions, weighted mean
rdata[i, offset:offset+ndiag, iband.start:iband.stop][:, overlap] += raccum[:, overlap]
rnorm[i, offset:offset+ndiag, iband.start:iband.stop][:, overlap] += rnorm_i[:, overlap]
# in the combined unique wave pixels
# which pixels have two measurements due to overlapping
overlap_pixel_mask = np.zeros_like(flux, dtype=bool)
for b in bands:
band_indices = np.arange(windict[b].start, windict[b].stop)
overlap_pixel_mask[:, band_indices] = overlap_flag[b][None, :]
# Only normalize on overlapping pixels (basically inverse variance weighted mean)
# For non-overlapping (already direct copied), skip normalization
normalize_mask = (overlap_pixel_mask == 1)
flux[normalize_mask] /= (ivar[normalize_mask] + (ivar[normalize_mask] == 0))
mask[ivar > 0] = 0 # mask =0 means good pixels
wavebands = "".join(sbands)
# combined dictionaries
wave_combined, flux_combined, ivar_combined = {wavebands: wave}, {wavebands: flux}, {wavebands: ivar}
# just sanity chack that wavelength is an increasing array
assert np.all(np.diff(wave) > 0)
if has_res:
# we need to the same procedure for the resolution matrices
# as we did for fluxes
rdata_norm_pixels = normalize_mask[0] # all rows of normalize mask are basically same
rdata[:, :, rdata_norm_pixels] /= rnorm[:, :, rdata_norm_pixels] + (rnorm[:, :, rdata_norm_pixels] == 0)
rdict = {wavebands: rdata}
else:
rdict = None
if has_model:
model[model_counts > 0] /= model_counts[model_counts > 0] # normalization
model_dict = {wavebands: model}
else:
model_dict = None
if has_mask:
mask_combined = {wavebands: mask}
else:
mask_combined = None
#- Syntax note: "spectra.blat.copy() if spectra.blat is not None else None"
#- will make a copy of the input tables/arrays while also handling the case
#- where they might be None without crashing on None.copy().
# extras does not have a clear definition for being coadded across cameras
# drop with warning
if spectra.extra is not None:
log.warning("extras dictionary cannot be coadded across cameras, ignoring")
res = Spectra(
bands= [wavebands],
wave=wave_combined,
flux=flux_combined,
ivar=ivar_combined,
mask=mask_combined,
resolution_data=rdict,
model=model_dict,
fibermap=spectra.fibermap.copy() if spectra.fibermap is not None else None,
exp_fibermap=spectra.exp_fibermap.copy() if spectra.exp_fibermap is not None else None,
meta=spectra.meta.copy() if spectra.meta is not None else None,
extra=None,
scores=spectra.scores.copy() if spectra.scores is not None else None,
redshifts=spectra.redshifts.copy() if spectra.redshifts is not None else None,
)
duration = time.time() - t0
log.info(f"coadding spectra across cameras took: {duration:.3f} [sec]")
return res
[docs]
def get_resampling_matrix(global_grid,local_grid,sparse=False):
"""Build the rectangular matrix that linearly resamples from the global grid to a local grid.
The local grid range must be contained within the global grid range.
Args:
global_grid(numpy.ndarray): Sorted array of n global grid wavelengths.
local_grid(numpy.ndarray): Sorted array of m local grid wavelengths.
Returns:
numpy.ndarray: Array of (m,n) matrix elements that perform the linear resampling.
"""
assert np.all(np.diff(global_grid) > 0),'Global grid is not strictly increasing.'
assert np.all(np.diff(local_grid) > 0),'Local grid is not strictly increasing.'
# Locate each local wavelength in the global grid.
global_index = np.searchsorted(global_grid,local_grid)
assert local_grid[0] >= global_grid[0],'Local grid extends below global grid.'
assert local_grid[-1] <= global_grid[-1],'Local grid extends above global grid.'
# Lookup the global-grid bracketing interval (xlo,xhi) for each local grid point.
# Note that this gives xlo = global_grid[-1] if local_grid[0] == global_grid[0]
# but this is fine since the coefficient of xlo will be zero.
global_xhi = global_grid[global_index]
global_xlo = global_grid[global_index-1]
# Create the rectangular interpolation matrix to return.
alpha = (local_grid - global_xlo)/(global_xhi - global_xlo)
local_index = np.arange(len(local_grid),dtype=int)
matrix = np.zeros((len(local_grid),len(global_grid)))
matrix[local_index,global_index] = alpha
matrix[local_index,global_index-1] = 1-alpha
# turn into a sparse matrix
return scipy.sparse.csc_matrix(matrix)
[docs]
def decorrelate_divide_and_conquer(Cinv,Cinvf,wavebin,flux,ivar,rdata) :
"""Decorrelate an inverse covariance using the matrix square root.
Implements the decorrelation part of the spectroperfectionism algorithm described in
Bolton & Schlegel 2009 (BS) http://arxiv.org/abs/0911.2689.
with the divide and conquer approach, i.e. per diagonal block of the matrix, with an
overlapping 'skin' from one block to another.
Args:
Cinv: Square 2D array: input inverse covariance matrix
Cinvf: 1D array: input
wavebin: minimal size of wavelength bin in A, used to define the core and skin size
flux: 1D array: output flux (has to be allocated)
ivar: 1D array: output flux inverse variance (has to be allocated)
rdata: 2D array: output resolution matrix per diagonal (has to be allocated)
"""
chw=max(10,int(50/wavebin)) #core is 2*50+1 A
skin=max(2,int(10/wavebin)) #skin is 10A
nn=Cinv.shape[0]
nstep=nn//(2*chw+1)+1
Lmin=1e-15/np.mean(np.diag(Cinv)) # Lmin is scaled with Cinv values
ndiag=rdata.shape[0]
dd=np.arange(ndiag,dtype=int)-ndiag//2
for c in range(chw,nn+(2*chw+1),(2*chw+1)) :
b=max(0,c-chw-skin)
e=min(nn,c+chw+skin+1)
b1=max(0,c-chw)
e1=min(nn,c+chw+1)
bb=max(0,b1-b)
ee=min(e-b,e1-b)
if e<=b : continue
L,X = scipy.linalg.eigh(Cinv[b:e,b:e],np.eye(e-b),overwrite_a=False,driver='gvd')
nbad = np.count_nonzero(L < Lmin)
if nbad > 0:
#log.warning('zeroing {0:d} negative eigenvalue(s).'.format(nbad))
L[L < Lmin] = Lmin
Q = X.dot(np.diag(np.sqrt(L)).dot(X.T))
s = np.sum(Q,axis=1)
b1x=max(0,c-chw-3)
e1x=min(nn,c+chw+1+3)
tR = (Q/s[:,np.newaxis])
tR_it = scipy.linalg.inv(tR.T)
tivar = s**2
flux[b1:e1] = (tR_it.dot(Cinvf[b:e])/tivar)[bb:ee]
ivar[b1:e1] = (s[bb:ee])**2
for j in range(b1,e1) :
k=(dd>=-j)&(dd<nn-j)
# k is the diagonal index
# j is the wavelength index
# it could be the transposed, I am following what it is specter.ex2d, L209
rdata[k,j] = tR[j-b+dd[k],j-b]
def spectroperf_resample_spectrum_singleproc(spectra,target_index,wave,wavebin,resampling_matrix,ndiag,flux,ivar,rdata) :
cinv = None
for b in spectra.bands :
twave=spectra.wave[b]
jj=(twave>=wave[0])&(twave<=wave[-1])
twave=twave[jj]
tivar=spectra.ivar[b][target_index][jj]
diag_ivar = scipy.sparse.dia_matrix((tivar,[0]),(twave.size,twave.size))
RR = Resolution(spectra.resolution_data[b][target_index][:,jj]).dot(resampling_matrix[b])
tcinv = RR.T.dot(diag_ivar.dot(RR))
tcinvf = RR.T.dot(tivar*spectra.flux[b][target_index][jj])
if cinv is None :
cinv = tcinv
cinvf = tcinvf
else :
cinv += tcinv
cinvf += tcinvf
cinv = cinv.todense()
decorrelate_divide_and_conquer(cinv,cinvf,wavebin,flux[target_index],ivar[target_index],rdata[target_index])
# for multiprocessing, with shared memory buffers
def spectroperf_resample_spectrum_multiproc(shm_in_wave,shm_in_flux,shm_in_ivar,shm_in_rdata,in_nwave,in_ndiag,in_bands,target_indices,wave,wavebin,resampling_matrix,ndiag,ntarget,shm_flux,shm_ivar,shm_rdata) :
nwave = wave.size
# manipulate shared memory as np arrays
# input shared memory
in_wave = list()
in_flux = list()
in_ivar = list()
in_rdata = list()
nbands = len(shm_in_wave)
for b in range(nbands) :
in_wave.append( np.array(shm_in_wave[b],copy=False).reshape(in_nwave[b]) )
in_flux.append( np.array(shm_in_flux[b],copy=False).reshape((ntarget,in_nwave[b])) )
in_ivar.append( np.array(shm_in_ivar[b],copy=False).reshape((ntarget,in_nwave[b])) )
in_rdata.append( np.array(shm_in_rdata[b],copy=False).reshape((ntarget,in_ndiag[b],in_nwave[b])) )
# output shared memory
flux = np.array(shm_flux,copy=False).reshape(ntarget,nwave)
ivar = np.array(shm_ivar,copy=False).reshape(ntarget,nwave)
rdata = np.array(shm_rdata,copy=False).reshape(ntarget,ndiag,nwave)
for target_index in target_indices :
cinv = None
for b in range(nbands) :
twave=in_wave[b]
jj=(twave>=wave[0])&(twave<=wave[-1])
twave=twave[jj]
tivar=in_ivar[b][target_index][jj]
diag_ivar = scipy.sparse.dia_matrix((tivar,[0]),(twave.size,twave.size))
RR = Resolution(in_rdata[b][target_index][:,jj]).dot(resampling_matrix[in_bands[b]])
tcinv = RR.T.dot(diag_ivar.dot(RR))
tcinvf = RR.T.dot(tivar*in_flux[b][target_index][jj])
if cinv is None :
cinv = tcinv
cinvf = tcinvf
else :
cinv += tcinv
cinvf += tcinvf
cinv = cinv.todense()
decorrelate_divide_and_conquer(cinv,cinvf,wavebin,flux[target_index],ivar[target_index],rdata[target_index])
[docs]
def spectroperf_resample_spectra(spectra, wave, nproc=1) :
"""
Resampling of spectra file using the spectrophotometic approach
Args:
spectra: desispec.spectra.Spectra object
wave: 1D numy array with new wavelenght grid
Returns:
desispec.spectra.Spectra object
"""
log = get_logger()
log.debug("resampling to wave grid of size {}: {}".format(wave.size,wave))
b=spectra.bands[0]
ntarget=spectra.flux[b].shape[0]
nwave=wave.size
if spectra.mask is not None :
mask = np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
else :
mask = None
# number of diagonals is the max of the number of diagonals in the
# input spectra cameras
ndiag = 0
for b in spectra.bands :
ndiag = max(ndiag,spectra.resolution_data[b].shape[1])
dw=np.gradient(wave)
wavebin=np.min(dw[dw>0.]) # min wavelength bin size
log.debug("min wavelength bin= {:2.1f} A; ndiag= {:d}".format(wavebin,ndiag))
log.debug("compute resampling matrices")
resampling_matrix=dict()
for b in spectra.bands :
twave=spectra.wave[b]
jj=np.where((twave>=wave[0])&(twave<=wave[-1]))[0]
twave=spectra.wave[b][jj]
resampling_matrix[b] = get_resampling_matrix(wave,twave)
if nproc==1 :
# allocate array
flux = np.zeros((ntarget,nwave),dtype=float)
ivar = np.zeros((ntarget,nwave),dtype=float)
rdata = np.zeros((ntarget,ndiag,nwave),dtype=float)
# simply loop on targets
for target_index in range(ntarget) :
log.debug("resampling {}/{}".format(target_index+1,ntarget))
t0=time.time()
spectroperf_resample_spectrum_singleproc(spectra,target_index,wave,wavebin,resampling_matrix,ndiag,flux,ivar,rdata)
t1=time.time()
log.debug("done one spectrum in {} sec".format(t1-t0))
else :
log.debug("allocate shared memory")
# input
shm_in_wave = list()
shm_in_flux = list()
shm_in_ivar = list()
shm_in_rdata = list()
in_nwave = list()
in_ndiag = list()
for b in spectra.bands :
shm_in_wave.append( multiprocessing.Array('d',spectra.wave[b],lock=False) )
shm_in_flux.append( multiprocessing.Array('d',spectra.flux[b].ravel(),lock=False) )
shm_in_ivar.append( multiprocessing.Array('d',spectra.ivar[b].ravel(),lock=False) )
shm_in_rdata.append( multiprocessing.Array('d',spectra.resolution_data[b].ravel(),lock=False) )
in_nwave.append(spectra.wave[b].size)
in_ndiag.append(spectra.resolution_data[b].shape[1])
# output
shm_flux=multiprocessing.Array('d',ntarget*nwave,lock=False)
shm_ivar=multiprocessing.Array('d',ntarget*nwave,lock=False)
shm_rdata=multiprocessing.Array('d',ntarget*ndiag*nwave,lock=False)
# manipulate shared memory as np arrays
flux = np.array(shm_flux,copy=False).reshape(ntarget,nwave)
ivar = np.array(shm_ivar,copy=False).reshape(ntarget,nwave)
rdata = np.array(shm_rdata,copy=False).reshape(ntarget,ndiag,nwave)
# split targets per process
target_indices = np.array_split(np.arange(ntarget),nproc)
# loop on processes
procs=list()
for proc_index in range(nproc) :
log.debug("starting process #{}".format(proc_index+1))
proc = multiprocessing.Process(target=spectroperf_resample_spectrum_multiproc,
args=(shm_in_wave,shm_in_flux,shm_in_ivar,shm_in_rdata,
in_nwave,in_ndiag,spectra.bands,
target_indices[proc_index],wave,wavebin,
resampling_matrix,ndiag,ntarget,
shm_flux,shm_ivar,shm_rdata))
proc.start()
procs.append(proc)
# wait for the processes to finish
log.info("waiting for the {} processes to finish ...".format(nproc))
for proc in procs :
proc.join()
log.info("all done!")
bands=""
for b in spectra.bands : bands += b
if spectra.mask is not None :
dmask={bands:mask,}
else :
dmask=None
res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
fibermap=spectra.fibermap,meta=spectra.meta,extra=spectra.extra,scores=spectra.scores)
return res
[docs]
def fast_resample_spectra(spectra, wave) :
"""
Fast resampling of spectra file.
The output resolution = Id. The neighboring
flux bins are correlated.
Args:
spectra: desispec.spectra.Spectra object
wave: 1D numy array with new wavelenght grid
Returns:
desispec.spectra.Spectra object, resolution data=Id
"""
log = get_logger()
log.debug("Resampling to wave grid: {}".format(wave))
nwave=wave.size
b=spectra.bands[0]
ntarget=spectra.flux[b].shape[0]
nres=spectra.resolution_data[b].shape[1]
ivar=np.zeros((ntarget,nwave),dtype=spectra.flux[b].dtype)
flux=np.zeros((ntarget,nwave),dtype=spectra.ivar[b].dtype)
if spectra.mask is not None :
mask = np.zeros((ntarget,nwave),dtype=spectra.mask[b].dtype)
else :
mask = None
rdata=np.ones((ntarget,1,nwave),dtype=spectra.resolution_data[b].dtype) # pointless for this resampling
bands=""
for b in spectra.bands :
if spectra.mask is not None :
tivar=spectra.ivar[b]*(spectra.mask[b]==0)
else :
tivar=spectra.ivar[b]
for i in range(ntarget) :
ivar[i] += resample_flux(wave,spectra.wave[b],tivar[i])
flux[i] += resample_flux(wave,spectra.wave[b],tivar[i]*spectra.flux[b][i])
bands += b
for i in range(ntarget) :
ok=(ivar[i]>0)
flux[i,ok]/=ivar[i,ok]
if spectra.mask is not None :
dmask={bands:mask,}
else :
dmask=None
res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
fibermap=spectra.fibermap,meta=spectra.meta,extra=spectra.extra,scores=spectra.scores)
return res
[docs]
def resample_spectra_lin_or_log(spectra, linear_step=0, log10_step=0, fast=False, wave_min=None, wave_max=None, nproc=1) :
"""
Resampling of spectra file.
Args:
spectra: desispec.spectra.Spectra object
linear_step: if not null the ouput wavelenght grid will be linear with this step
log10_step: if not null the ouput wavelenght grid will be logarthmic with this step
Options:
fast: simple resampling. fast but at the price of correlated output flux bins and no information on resolution
wave_min: if set, use this min wavelength
wave_max: if set, use this max wavelength
Returns:
desispec.spectra.Spectra object
"""
wmin=None
wmax=None
for b in spectra.bands :
if wmin is None :
wmin=spectra.wave[b][0]
wmax=spectra.wave[b][-1]
else :
wmin=min(wmin,spectra.wave[b][0])
wmax=max(wmax,spectra.wave[b][-1])
if wave_min is not None :
wmin = wave_min
if wave_max is not None :
wmax = wave_max
if linear_step>0 :
nsteps=int((wmax-wmin)/linear_step) + 1
wave=wmin+np.arange(nsteps)*linear_step
elif log10_step>0 :
lwmin=np.log10(wmin)
lwmax=np.log10(wmax)
nsteps=int((lwmax-lwmin)/log10_step) + 1
wave=10**(lwmin+np.arange(nsteps)*log10_step)
if fast :
return fast_resample_spectra(spectra=spectra,wave=wave)
else :
return spectroperf_resample_spectra(spectra=spectra,wave=wave,nproc=nproc)