"""
desispec.trace_shifts
=====================
"""
from __future__ import absolute_import, division
import os
import sys
import argparse
import time
import numpy as np
from numpy.linalg import LinAlgError
import astropy.io.fits as pyfits
from numpy.polynomial.legendre import legval,legfit
from scipy.signal import fftconvolve
from scipy.ndimage import median_filter
import numba
from importlib import resources
from desispec.io import read_image, read_xytraceset
from desispec.io.util import get_tempfilename
from desiutil.log import get_logger
from desispec.linalg import cholesky_solve,cholesky_solve_and_invert
from desispec.interpolation import resample_flux
from desispec.qproc.qextract import qproc_boxcar_extraction
[docs]
def write_traces_in_psf(input_psf_filename,output_psf_filename,xytraceset, internal_offset_info=None,
external_offset_info=None) :
"""
Writes traces in a PSF.
Args:
input_psf_filename : Path to input fits file which has to contain XTRACE and YTRACE HDUs
output_psf_filename : Path to output fits file which has to contain XTRACE and YTRACE HDUs
xytraceset : xytraceset
internal_offset_info: dictionary of internal offsets (i.e. fiber vs 'median fiber') in wavelength
external_offset_info: dictionary of external offsets (i.e. 'median fiber' vs external spectrum) in wavelength
"""
xcoef=xytraceset.x_vs_wave_traceset._coeff
ycoef=xytraceset.y_vs_wave_traceset._coeff
wavemin=xytraceset.wavemin
wavemax=xytraceset.wavemax
log = get_logger()
psf_fits=pyfits.open(input_psf_filename)
if "PSFTYPE" in psf_fits[0].header :
psftype=psf_fits[0].header["PSFTYPE"]
else :
psftype=None
modified_x=False
modified_y=False
if psftype is not None and psftype=="GAUSS-HERMITE" :
if "X" in psf_fits["PSF"].data["PARAM"] :
i=np.where(psf_fits["PSF"].data["PARAM"]=="X")[0][0]
ishape=psf_fits["PSF"].data["COEFF"][i].shape
if ishape != xcoef.shape :
log.warning("xcoef from file and from arg don't have same shape : %s != %s"%(str(ishape),str(xcoef.shape)))
n0=min(ishape[0],xcoef.shape[0])
n1=min(ishape[1],xcoef.shape[1])
psf_fits["PSF"].data["COEFF"][i] *= 0.
psf_fits["PSF"].data["COEFF"][i][:n0,:n1]=xcoef[:n0,:n1]
psf_fits["PSF"].data["WAVEMIN"][i]=wavemin
psf_fits["PSF"].data["WAVEMAX"][i]=wavemax
modified_x=True
if "Y" in psf_fits["PSF"].data["PARAM"] :
i=np.where(psf_fits["PSF"].data["PARAM"]=="Y")[0][0]
ishape=psf_fits["PSF"].data["COEFF"][i].shape
if ishape != ycoef.shape :
log.warning("xcoef from file and from arg don't have same shape : %s != %s"%(str(ishape),str(ycoef.shape)))
n0=min(psf_fits["PSF"].data["COEFF"][i].shape[0],ycoef.shape[0])
n1=min(psf_fits["PSF"].data["COEFF"][i].shape[1],ycoef.shape[1])
psf_fits["PSF"].data["COEFF"][i] *= 0.
psf_fits["PSF"].data["COEFF"][i][:n0,:n1]=ycoef[:n0,:n1]
psf_fits["PSF"].data["WAVEMIN"][i]=wavemin
psf_fits["PSF"].data["WAVEMAX"][i]=wavemax
modified_y=True
if "XTRACE" in psf_fits :
psf_fits["XTRACE"].data = xcoef
psf_fits["XTRACE"].header["WAVEMIN"] = wavemin
psf_fits["XTRACE"].header["WAVEMAX"] = wavemax
modified_x=True
if "YTRACE" in psf_fits :
psf_fits["YTRACE"].data = ycoef
psf_fits["YTRACE"].header["WAVEMIN"] = wavemin
psf_fits["YTRACE"].header["WAVEMAX"] = wavemax
modified_y=True
if not modified_x :
log.error("didn't change the X coefs in the psf: I/O error")
raise IOError("didn't change the X coefs in the psf")
if not modified_y :
log.error("didn't change the Y coefs in the psf: I/O error")
raise IOError("didn't change the Y coefs in the psf")
if (xytraceset.meta is not None) and ("PSF" in psf_fits):
for k in xytraceset.meta.keys() :
psf_fits["PSF"].header[k] = xytraceset.meta[k]
if internal_offset_info is not None:
data = {}
dwave, dwave_err, fiber, wave=[internal_offset_info[_] for _ in ['dwave','dwave_err','fiber','wave']]
data = np.rec.fromarrays((fiber, wave, dwave, dwave_err),
dtype=np.dtype([('FIBER','i4'),
('WAVE','f4'),
('DWAVE','f4'),
('DWAVE_ERR','f4')]))
off_hdu_name = 'INTOFF'
new_off_hdu = pyfits.BinTableHDU(data, name=off_hdu_name)
if off_hdu_name in psf_fits:
psf_fits[off_hdu_name] = new_off_hdu
# this will overwrite existing extension with same name
else:
psf_fits.append(new_off_hdu)
if external_offset_info is not None:
data = {}
dwave,dwave_err,wave=[external_offset_info[_] for _ in ['dwave','dwave_err','wave']]
data = np.rec.fromarrays((wave, dwave, dwave_err),
dtype=np.dtype([
('WAVE','f4'),
('DWAVE','f4'),
('DWAVE_ERR','f4')]))
off_hdu_name = 'EXTOFF'
new_off_hdu = pyfits.BinTableHDU(data, name=off_hdu_name)
if off_hdu_name in psf_fits:
psf_fits[off_hdu_name] = new_off_hdu
# this will overwrite existing extension with same name
else:
psf_fits.append(new_off_hdu)
tmpfile = get_tempfilename(output_psf_filename)
psf_fits.writeto(tmpfile, overwrite=True)
os.rename(tmpfile, output_psf_filename)
log.info("wrote traces and psf in %s"%output_psf_filename)
[docs]
def legx(wave,wavemin,wavemax) :
"""
Reduced coordinate (range [-1,1]) for calls to legval and legfit
Args:
wave : ND np.array
wavemin : float, min. val
wavemax : float, max. val
Returns:
array of same shape as wave
"""
return 2.*(wave-wavemin)/(wavemax-wavemin)-1.
# beginning of routines for cross-correlation method for trace shifts
[docs]
def resample_boxcar_frame(frame_flux,frame_ivar,frame_wave,oversampling=2) :
"""
Resamples the spectra in a frame obtained with boxcar extraction to the same wavelength grid, with oversampling.
Uses resample_flux routine.
Args:
frame_flux : 2D np.array of shape (nfibers,nwave), sum of pixel values per row of length=width per fiber
frame_ivar : 2D np.array of shape (nfibers,nwave), ivar[f,j] = 1/( sum_[j,b:e] (1/image.ivar) ), ivar=0 if at least 1 pixel in the row has image.ivar=0 or image.mask!=0
frame_wave : 2D np.array of shape (nfibers,nwave), determined from the traces
Optional:
oversampling : int , oversampling factor , default is 2
Returns:
flux : 2D np.array of shape (nfibers,nwave*oversampling)
ivar : 2D np.array of shape (nfibers,nwave*oversampling)
frame_wave : 1D np.array of size (nwave*oversampling)
"""
log=get_logger()
log.info("resampling with oversampling")
t0=time.time()
nfibers=frame_flux.shape[0]
wave=frame_wave[nfibers//2]
dwave=np.median(np.gradient(frame_wave))/oversampling
wave=np.linspace(wave[0],wave[-1],int((wave[-1]-wave[0])/dwave))
nwave=wave.size
flux=np.zeros((nfibers,nwave))
ivar=np.zeros((nfibers,nwave))
n1=frame_ivar.shape[1]
for i in range(nfibers) :
#log.info("resampling fiber #%03d"%i)
#flux[i],ivar[i] = resample_flux(wave, frame_wave[i],frame_flux[i],frame_ivar[i])
# because I am oversampling, a linear interpolation is sufficient
# increase the masked regions.
for d in [-2,-1,1,2] :
frame_ivar[i,2:n1-2][frame_ivar[i,2+d:n1-2+d]==0]=0
good=(frame_ivar[i]>0)
if np.sum(good)>0 :
flux[i] = np.interp(wave, frame_wave[i][good],frame_flux[i][good],left=0,right=0)
ivar[i] = np.interp(wave, frame_wave[i],frame_ivar[i],left=0,right=0)/oversampling # larger variance, approximately
else :
log.warning("fiber {} has no valid data".format(i))
t1=time.time()
log.info("Resampled {} fibers in {:3.1f} sec".format(nfibers,t1-t0))
return flux,ivar,wave
# @numba.jit no real gain
[docs]
def compute_dy_from_spectral_cross_correlation(flux, wave, refflux, ivar=None,
hw=3., calibrate=False,
prior_width_dy=None) :
"""
Measure y offsets from two spectra expected to be on the same wavelength grid.
refflux is the assumed well calibrated spectrum.
A relative flux calibration of the two spectra is done internally.
Args:
flux : 1D array of spectral flux as a function of wavelength
wave : 1D array of wavelength (in Angstrom)
refflux : 1D array of reference spectral flux
Optional:
ivar : 1D array of inverse variance of flux
hw : half width in Angstrom of the cross-correlation chi2 scan, default=3A corresponding approximatly to 5 pixels for DESI
prior_width_dy: applied sigma of the Gaussian prior on dy
Returns:
x : 1D array of x coordinates on CCD (axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
y : 1D array of y coordinates on CCD (axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
dx : 1D array of shifts along x coordinates on CCD
ex : 1D array of uncertainties on dx
fiber : 1D array of fiber ID (first fiber = 0)
wave : 1D array of wavelength
"""
# absorb differences of calibration (fiberflat not yet applied)
if calibrate:
x=(wave-wave[wave.size//2])/500.
kernel=np.exp(-x**2/2)
f1=fftconvolve(flux,kernel,mode='same')
f2=fftconvolve(refflux,kernel,mode='same')
if np.all(f2>0) :
scale=f1/f2
refflux *= scale
error_floor=0.01 #A
if ivar is None :
ivar = np.ones(flux.shape)
dwave = wave[1] - wave[0]
ihw = int(hw / dwave)+1
chi2 = np.zeros((2 * ihw + 1))
for d in range(-ihw, ihw + 1) :
b = d + ihw
e = wave.size + d - ihw
chi2[ihw + d] = np.sum(ivar[ihw:-ihw] *
(flux[ihw:-ihw] - refflux[b:e])**2)
if prior_width_dy is not None:
chi2[ihw + d] += (dwave * d / prior_width_dy)**2
i=np.argmin(chi2)
if i<2 or i>=chi2.size-2 :
# something went wrong
delta=0.
sigma=100.
else :
# refine minimum
hh=int(0.6/dwave)+1
b=i-hh
e=i+hh+1
if b<0 :
b=0
e=b+2*hh+1
if e>2*ihw+1 :
e=2*ihw+1
b=e-(2*hh+1)
x=dwave*(np.arange(b,e)-ihw)
c=np.polyfit(x,chi2[b:e],2)
if c[0]>0 :
delta=-c[1]/(2.*c[0])
sigma=np.sqrt(1./c[0] + error_floor**2)
# do not scale error bars using chi2pdf because the
# two spectra do not necessarily match
# (for instance dark vs bright time sky spectrum)
else :
# something else went wrong
delta=0.
sigma=100.
'''
print("dw= %f +- %f"%(delta,sigma))
if np.abs(delta)>1. :
print("chi2/ndf=%f/%d=%f"%(chi2[i],(ndata-1),chi2[i]/(ndata-1)))
import matplotlib.pyplot as plt
x=dwave*(np.arange(chi2.size)-ihw)
plt.plot(x,chi2,"o-")
pol=np.poly1d(c)
xx=np.linspace(x[b],x[e-1],20)
plt.plot(xx,pol(xx))
plt.axvline(delta)
plt.axvline(delta-sigma)
plt.axvline(delta+sigma)
plt.show()
'''
return delta,sigma
[docs]
def compute_dy_from_spectral_cross_correlations_of_frame(flux, ivar, wave , xcoef, ycoef, wavemin, wavemax, reference_flux , n_wavelength_bins = 4) :
"""
Measures y offsets from a set of resampled spectra and a reference spectrum that are on the same wavelength grid.
reference_flux is the assumed well calibrated spectrum.
Calls compute_dy_from_spectral_cross_correlation per fiber
Args:
flux : 2D np.array of shape (nfibers,nwave)
ivar : 2D np.array of shape (nfibers,nwave) , inverse variance of flux
wave : 1D array of wavelength (in Angstrom) of size nwave
refflux : 1D array of reference spectral flux of size nwave
Optional:
n_wavelength_bins : number of bins along wavelength
Returns:
x : 1D array of x coordinates on CCD (axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
y : 1D array of y coordinates on CCD (axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
dy : 1D array of shifts along y coordinates on CCD
ey : 1D array of uncertainties on dy
fiber : 1D array of fiber ID (first fiber = 0)
wave : 1D array of wavelength
dwave : 1D array of wavelength offsets
dwave_err: 1D array of wavelength offset uncertainties
"""
log=get_logger()
x_for_dy=np.array([])
y_for_dy=np.array([])
dy=np.array([])
ey=np.array([])
fiber_for_dy=np.array([])
wave_for_dy=np.array([])
dwave_list = np.array([])
dwave_err_list = np.array([])
nfibers = flux.shape[0]
for fiber in range(nfibers) :
for b in range(n_wavelength_bins) :
wmin=wave[0]+((wave[-1]-wave[0])/n_wavelength_bins)*b
if b<n_wavelength_bins-1 :
wmax=wave[0]+((wave[-1]-wave[0])/n_wavelength_bins)*(b+1)
else :
wmax=wave[-1]
ok=(wave>=wmin)&(wave<=wmax)
flux_weights = ivar[fiber,ok] * flux[fiber,ok]**2 * (flux[fiber,ok]>0)
flux_weights_sum = np.sum(flux_weights)
if flux_weights_sum <= 0 :
continue
block_wave = np.sum(flux_weights * wave[ok]) / flux_weights_sum
dwave,err = compute_dy_from_spectral_cross_correlation(flux[fiber,ok], wave[ok],
reference_flux[ok],
ivar=ivar[fiber,ok],
hw=3., calibrate=True)
if fiber % 100 == 50 :
log.info(f"Wavelength offset {dwave} +/- {err} for fiber {fiber:03d} at wave {block_wave}")
if err > 1 :
continue
dwave_list = np.append(dwave_list, dwave)
dwave_err_list = np.append(dwave_err_list, err)
rw = legx(block_wave,wavemin,wavemax)
tx = legval(rw,xcoef[fiber])
ty = legval(rw,ycoef[fiber])
eps=0.1
yp = legval(legx(block_wave+eps,wavemin,wavemax),ycoef[fiber])
dydw = (yp-ty)/eps
tdy = -dwave*dydw
tey = err*dydw
x_for_dy=np.append(x_for_dy,tx)
y_for_dy=np.append(y_for_dy,ty)
dy=np.append(dy,tdy)
ey=np.append(ey,tey)
fiber_for_dy=np.append(fiber_for_dy,fiber)
wave_for_dy=np.append(wave_for_dy,block_wave)
return x_for_dy,y_for_dy,dy,ey,fiber_for_dy,wave_for_dy, dwave_list, dwave_err_list
@numba.jit
def numba_cross_profile(image_flux,image_ivar,x,wave,hw=3) :
n0=image_flux.shape[0]
swdx=np.zeros(n0)
sw=np.zeros(n0)
svar=np.zeros(n0)
swy=np.zeros(n0)
swx=np.zeros(n0)
swl=np.zeros(n0)
for j in range(n0) :
for i in range(int(x[j]-hw),int(x[j]+hw+1)) :
if image_ivar[j,i]==0 :
swdx[j]=0
sw[j]=0
break
swdx[j] += (i-x[j])*image_flux[j,i]
sw[j] += image_flux[j,i]
svar[j] += 1./image_ivar[j,i]
swy[j] = sw[j]*j
swx[j] = sw[j]*x[j]
swl[j] = sw[j]*wave[j]
return swdx,sw,svar,swy,swx,swl
[docs]
def compute_dx_from_cross_dispersion_profiles(xcoef,ycoef,wavemin,wavemax, image, fibers=None, width=7,deg=2,image_rebin=4) :
"""
Measure x offsets from a preprocessed image and a trace set
Args:
xcoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to XCCD
ycoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to YCCD
wavemin : float
wavemax : float. wavemin and wavemax are used to define a reduced variable legx(wave,wavemin,wavemax)=2*(wave-wavemin)/(wavemax-wavemin)-1
used to compute the traces, xccd=legval(legx(wave,wavemin,wavemax),xtrace[fiber])
image : DESI preprocessed image object
Optional:
fibers : 1D np.array of int (default is all fibers, the first fiber is always = 0)
width : extraction boxcar width, default is 5
deg : degree of polynomial fit as a function of y, only used to find and mask outliers
image_rebin : rebinning of CCD rows to run faster (with rebin=4 loss of precision <0.01 pixel)
Returns:
x : 1D array of x coordinates on CCD (axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
y : 1D array of y coordinates on CCD (axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
dx : 1D array of shifts along x coordinates on CCD
ex : 1D array of uncertainties on dx
fiber : 1D array of fiber ID (first fiber = 0)
wave : 1D array of wavelength
"""
log=get_logger()
log.info("Starting compute_dx_from_cross_dispersion_profiles with width={} deg={} rebin={}...".format(width,deg,image_rebin))
t0=time.time()
if fibers is None :
fibers = np.arange(xcoef.shape[0])
log.debug("wavelength range : [%f,%f]"%(wavemin,wavemax))
if image.mask is not None :
image_ivar = image.ivar*(image.mask==0)
else :
image_ivar = image.ivar
error_floor = 0.04 # pixel
n0 = image.pix.shape[0]
n1 = image.pix.shape[1]
# image rebinning to got faster !!!
if image_rebin>1 :
pix=image.pix[:(n0//image_rebin)*image_rebin,:].reshape(n0//image_rebin,image_rebin,n1).sum(1)
ivar=image_ivar[:(n0//image_rebin)*image_rebin,:].reshape(n0//image_rebin,image_rebin,n1)
hasnozero=(np.sum(ivar==0,axis=1)==0)
ivar=ivar.sum(1)*hasnozero
n0 = pix.shape[0]
else :
pix = image.pix
ivar = image_ivar
y = np.arange(n0)+0.5 # this 0.5 is important when rebinning to avoid a bias on y (here y = CCD_rows//rebin + 0.5 )
xx = np.tile(np.arange(n1),(n0,1))
hw = width//2
ncoef=ycoef.shape[1]
twave=np.linspace(wavemin, wavemax, n0)
rwave=legx(twave, wavemin, wavemax)
ox=np.array([])
oy=np.array([])
odx=np.array([])
oex=np.array([])
of=np.array([])
ol=np.array([])
dt=0.
for f,fiber in enumerate(fibers) :
# log.info("computing dx for fiber #%03d"%fiber)
# the following 5 lines take 0.25 sec for all 500 fibers
ty = legval(rwave, ycoef[fiber])/image_rebin
tx = legval(rwave, xcoef[fiber])
wave_of_y = np.interp(y,ty,twave)
x_of_y = np.interp(y,ty,tx)
swdx,sw,svar,swy,swx,swl = numba_cross_profile(pix,ivar,x_of_y,wave_of_y,hw=hw)
# rebin
tn0 = sw.size
rebin = 100//image_rebin
sw = sw[:(tn0//rebin)*rebin].reshape(tn0//rebin,rebin).sum(-1)
swdx = swdx[:(tn0//rebin)*rebin].reshape(tn0//rebin,rebin).sum(-1)
svar = svar[:(tn0//rebin)*rebin].reshape(tn0//rebin,rebin).sum(-1)
swx = swx[:(tn0//rebin)*rebin].reshape(tn0//rebin,rebin).sum(-1)
swy = swy[:(tn0//rebin)*rebin].reshape(tn0//rebin,rebin).sum(-1)
swl = swl[:(tn0//rebin)*rebin].reshape(tn0//rebin,rebin).sum(-1)
snr = sw/np.sqrt(svar+(svar==0)) # signal to noise in bin
ok = (snr>3) # keep only high snr pixels
fex = np.sqrt( (20./snr[ok])**2 + 0.01**2) # uncertainties scale as snr
fdx = (swdx/(sw+(sw==0)))[ok]
fx = (swx/(sw+(sw==0)))[ok]
fy = (swy/(sw+(sw==0)))[ok]*image_rebin-0.5
fl = (swl/(sw+(sw==0)))[ok]
good_fiber=True
for loop in range(10) :
if fdx.size < deg+2 :
good_fiber=False
break
try :
c = np.polyfit(fy, fdx, deg, w=1. / fex)
pol = np.poly1d(c)
chi2 = (fdx-pol(fy))**2/fex**2
mchi2 = 2*np.median(chi2)
ok = np.where(chi2<=9.*mchi2)[0]
nbad = fdx.size-ok.size
if nbad > 0 :
log.debug("fiber {} loop {} mchi2 = {}, nbad= {}".format(fiber,loop,mchi2,nbad))
fex = fex[ok]
fdx = fdx[ok]
fx = fx[ok]
fy = fy[ok]
fl = fl[ok]
except LinAlgError :
good_fiber=False
break
if nbad==0 :
break
"""
plt.errorbar(fy,fdx,fex,fmt="o")
plt.plot(fy,pol(fy),"-")
plt.show()
"""
# we return the original sample of offset values
if good_fiber :
ox = np.append(ox,fx)
oy = np.append(oy,fy)
odx = np.append(odx,fdx)
oex = np.append(oex,fex)
of = np.append(of,fiber*np.ones(fy.size))
ol = np.append(ol,fl)
t1=time.time()
log.debug("computing dx for {} fibers in {} sec".format(len(fibers),t1-t0))
return ox,oy,odx,oex,of,ol
[docs]
def _prepare_ref_spectrum(ref_wave, ref_spectrum, psf, wave, mflux, nfibers):
"""
Prepare the reference spectrum to be used for wavelength offset
determination. Here we convolve it to the right LSF and rescale it
to match the measured flux.
Arguments:
ref_wave: np.array of wavelengths
ref_spectrum: np.array of reference spectrum flux
psf: PSF object
wave: np.array wavelength of extracted spectra
mflux: np.array flux of extracted spectra
nfibers: int
Returns:
ref_wave, ref_spectrum: tuple of wavelength and flux arrays
"""
log = get_logger()
# trim ref_spectrum
subset = (ref_wave >= wave[0]) & (ref_wave <= wave[-1])
ref_wave = ref_wave[subset]
ref_spectrum = ref_spectrum[subset]
# check wave is linear or make it linear
if (np.abs((ref_wave[1] - ref_wave[0]) - (ref_wave[-1] - ref_wave[-2])) >
0.0001 * (ref_wave[1]-ref_wave[0])):
log.info("reference spectrum wavelength is not on a linear grid, resample it")
dwave = np.min(np.gradient(ref_wave))
tmp_wave = np.linspace(ref_wave[0], ref_wave[-1],
int((ref_wave[-1]-ref_wave[0])/dwave))
ref_spectrum = resample_flux(tmp_wave, ref_wave, ref_spectrum)
ref_wave = tmp_wave
n_wave_bins = 20 # how many points along wavelength to use to get psf
n_representative_fibers = 20 # how many fibers to use to get psf
fiber_list = np.unique(np.linspace(0, nfibers - 1,
n_representative_fibers).astype(int))
wave_bins = np.linspace(wave[0], wave[-1], n_wave_bins+1)
wave_bins = wave_bins[:-1] + .5 * np.diff(wave_bins)
spectra = []
ref_spectrum0 = ref_spectrum * 1
# original before convolution
angstrom_hwidth = 3 # psf half width
for central_wave0 in wave_bins:
ipos = np.searchsorted(ref_wave, central_wave0)
central_wave = ref_wave[ipos] # actual value from the grid
dwave = ref_wave[ipos + 1] - ref_wave[ipos]
hw = int(angstrom_hwidth / dwave) + 1
wave_range = ref_wave[ipos - hw:ipos + hw + 1]
kernels = []
for fiber in fiber_list:
x, y = psf.xy(fiber, wave_range)
x = x[:,None] + np.linspace(-hw, hw, 2*hw+1)[None,:]
# original code below but I don't understand y[-1]-y[0] part
# x=np.tile(x[hw]+np.arange(-hw,hw+1)*(y[-1]-y[0])/(2*hw+1),(y.size,1))
y = np.tile(y, (2 * hw + 1, 1)).T
kernel2d = psf._value(x, y, fiber, central_wave)
kernel1d = np.sum(kernel2d, axis=1)
kernels.append(kernel1d)
kernels = np.mean(kernels, axis=0)
# average across fibers
ref_spectrum = fftconvolve(ref_spectrum0, kernels, mode='same')
spectra.append(ref_spectrum)
log.info("convolve reference spectrum using PSF")
spectra = np.array(spectra)
spectra_pos = np.searchsorted(wave_bins, ref_wave)
result = ref_spectrum * 0.
# Edges of the spectrum
result[spectra_pos == 0] = spectra[0][spectra_pos==0]
result[spectra_pos == n_wave_bins] = spectra[-1][spectra_pos == n_wave_bins]
inside = (spectra_pos > 0) & (spectra_pos < n_wave_bins)
weight = (ref_wave[inside] - wave_bins[spectra_pos[inside] - 1])/(
wave_bins[spectra_pos[inside]] - wave_bins[spectra_pos[inside] - 1])
# weight is zero on left edge one on right
# linearly stitching the spectra convolved with different kernel
result[inside] = (1 - weight) * spectra[spectra_pos[inside] - 1, inside]+(
weight * spectra[spectra_pos[inside],inside])
ref_spectrum = result
# resample input spectrum
log.info("resample convolved reference spectrum")
ref_spectrum = resample_flux(wave, ref_wave , ref_spectrum)
log.info("absorb difference of calibration")
x = (wave - wave[wave.size//2]) / 50.
kernel = np.exp(- x**2/2)
f1 = fftconvolve(mflux, kernel, mode='same')
f2 = fftconvolve(ref_spectrum, kernel, mode='same')
# We scale by a constant factor
scale = (f1 * f2).sum() / (f2 * f2).sum()
ref_spectrum *= scale
return ref_wave, ref_spectrum
[docs]
def shift_ycoef_using_external_spectrum(psf, xytraceset, image, fibers,
spectrum_filename, degyy=2, width=7,
prior_width_dy=0.1):
"""
Measure y offsets (external wavelength calibration) from a preprocessed image , a PSF + trace set using a cross-correlation of boxcar extracted spectra
and an external well-calibrated spectrum.
The PSF shape is used to convolve the input spectrum. It could also be used to correct for the PSF asymetry (disabled for now).
A relative flux calibration of the spectra is performed internally.
Args:
psf : specter PSF
xytraceset : XYTraceset object
image : DESI preprocessed image object
fibers : 1D np.array of fiber indices
spectrum_filename : path to input spectral file ( read with np.loadtxt , first column is wavelength (in vacuum and Angstrom) , second column in flux (arb. units)
Optional:
width : int, extraction boxcar width, default is 7
degyy : int, degree of polynomial fit of shifts as a function of y, used to reject outliers.
prior_width_dy: float with of the Gaussian prior on dy
Returns:
ycoef : 2D np.array of same shape as input, with modified Legendre coefficients for each fiber to convert wavelength to YCCD
wave: : 1D array of wavelengths for which offsets are computed
dwave: : 1D array of wavelength offsets
dwave_err: 1D array of wavelength offset uncertainties
"""
log = get_logger()
wavemin = xytraceset.wavemin
wavemax = xytraceset.wavemax
xcoef = xytraceset.x_vs_wave_traceset._coeff
ycoef = xytraceset.y_vs_wave_traceset._coeff
tmp=np.loadtxt(spectrum_filename).T
ref_wave=tmp[0]
ref_spectrum=tmp[1]
log.info("read reference spectrum in %s with %d entries"%(spectrum_filename,ref_wave.size))
log.info("rextract spectra with boxcar")
# boxcar extraction
qframe = qproc_boxcar_extraction(xytraceset, image, fibers=fibers, width=7)
# resampling on common finer wavelength grid
oversampling = 2 #
flux, ivar, wave = resample_boxcar_frame(qframe.flux, qframe.ivar, qframe.wave, oversampling=oversampling)
mflux, mivar, flux = _continuum_subtract_median(flux, ivar, continuum_win=oversampling*9)
ref_wave, ref_spectrum = _prepare_ref_spectrum(ref_wave, ref_spectrum, psf, wave, mflux, len(ivar))
log.info("fit shifts on wavelength bins")
# define bins
n_wavelength_bins = degyy + 4
y_for_dy = np.array([])
dy = np.array([])
ey = np.array([])
wave_for_dy = np.array([])
dwave_list = np.array([])
dwave_err_list = np.array([])
fiber_for_psf_evaluation = flux.shape[0] //2
wavelength_bins = np.linspace(wave[0], wave[-1], n_wavelength_bins+1)
for b in range(n_wavelength_bins) :
wmin, wmax = [wavelength_bins[_] for _ in [b, b + 1]]
ok = (wave >= wmin) & (wave <= wmax)
flux_weights = mflux[ok]**2 * (mflux[ok] > 0) * mivar[ok]
flux_weights_sum = np.sum(flux_weights)
if flux_weights_sum == 0 :
continue
dwave,err = compute_dy_from_spectral_cross_correlation(mflux[ok],
wave[ok], ref_spectrum[ok], ivar=mivar[ok], hw=10.,
prior_width_dy=prior_width_dy)
bin_wave = np.sum(flux_weights * wave[ok]) / flux_weights_sum
# flux weighted wavelength of the center
# Computing the derivative dy/dwavelength
x, y = psf.xy(fiber_for_psf_evaluation, bin_wave)
eps = 0.1
x, yp = psf.xy(fiber_for_psf_evaluation, bin_wave+eps)
dydw = (yp - y) / eps
if err * dydw < 1 :
dy = np.append(dy, -dwave * dydw)
ey = np.append(ey, err*dydw)
wave_for_dy = np.append(wave_for_dy,bin_wave)
dwave_list = np.append(dwave_list, dwave)
dwave_err_list = np.append(dwave_err_list, err)
y_for_dy=np.append(y_for_dy,y)
log.info(f"wave = {bin_wave}A , y={y}, measured dwave = {dwave} +- {err} A")
if False : # we don't need this for now
try :
log.info("correcting bias due to asymmetry of PSF")
hw=5
oversampling=4
xx=np.tile(np.arange(2*hw*oversampling+1)-hw*oversampling,(2*hw*oversampling+1,1))/float(oversampling)
yy=xx.T
x,y=psf.xy(fiber_for_psf_evaluation,central_wave_for_psf_evaluation)
prof=psf._value(xx+x,yy+y,fiber_for_psf_evaluation,central_wave_for_psf_evaluation)
dy_asym_central = np.sum(yy*prof)/np.sum(prof)
for i in range(dy.size) :
x,y=psf.xy(fiber_for_psf_evaluation,wave_for_dy[i])
prof=psf._value(xx+x,yy+y,fiber_for_psf_evaluation,wave_for_dy[i])
dy_asym = np.sum(yy*prof)/np.sum(prof)
log.info("y=%f, measured dy=%f , bias due to PSF asymetry = %f"%(y,dy[i],dy_asym-dy_asym_central))
dy[i] -= (dy_asym-dy_asym_central)
except :
log.warning("couldn't correct for asymmetry of PSF: %s %s"%(sys.exc_info()[0],sys.exc_info()[1]))
log.info("polynomial fit of shifts and modification of PSF ycoef")
# pol fit
coef = np.polyfit(wave_for_dy, dy, degyy, w=1. / ey)
pol = np.poly1d(coef)
for i in range(dy.size) :
log.info("wave=%fA y=%f, measured dy=%f+-%f , pol(wave) = %f"%(wave_for_dy[i],y_for_dy[i],dy[i],ey[i],pol(wave_for_dy[i])))
log.info("apply this to the PSF ycoef")
wave = np.linspace(wavemin,wavemax,100)
dy = pol(wave)
dycoef = legfit(legx(wave,wavemin,wavemax),dy,deg=ycoef.shape[1]-1)
for fiber in range(ycoef.shape[0]) :
ycoef[fiber] += dycoef
return ycoef, wave_for_dy, dwave_list, dwave_err_list
# end of routines for cross-correlation method for trace shifts
# beginning of routines for forward model method for trace shifts
[docs]
def compute_fiber_bundle_trace_shifts_using_psf(fibers,line,psf,image,maxshift=2.) :
"""
Computes trace shifts along x and y from a preprocessed image, a PSF (with trace coords), and a given emission line,
by doing a forward model of the image.
Args:
fibers : 1D array with list of fibers
line : float, wavelength of an emission line (in Angstrom)
psf : specter psf object
image : DESI preprocessed image object
Optional:
maxshift : float maximum shift in pixels for 2D chi2 scan
Returns:
x : 1D array of x coordinates on CCD (axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
y : 1D array of y coordinates on CCD (axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
dx : 1D array of shifts along x coordinates on CCD
dy : 1D array of shifts along y coordinates on CCD
sx : 1D array of uncertainties on dx
sy : 1D array of uncertainties on dy
"""
log=get_logger()
#log.info("compute_fiber_bundle_offsets fibers={} line={}".format(fibers,line))
# get central coordinates of bundle for interpolation of offsets on CCD
x,y = psf.xy([int(np.median(fibers)),],line)
try :
nfibers=len(fibers)
# compute stamp coordinates
xstart=None
xstop=None
ystart=None
ystop=None
xs=[]
ys=[]
pix=[]
xx=[]
yy=[]
for fiber in fibers :
txs,tys,tpix = psf.xypix(fiber,line)
xs.append(txs)
ys.append(tys)
pix.append(tpix)
if xstart is None :
xstart =txs.start
xstop =txs.stop
ystart =tys.start
ystop =tys.stop
else :
xstart =min(xstart,txs.start)
xstop =max(xstop,txs.stop)
ystart =min(ystart,tys.start)
ystop =max(ystop,tys.stop)
# load stamp data, with margins to avoid problems with shifted psf
margin=int(maxshift)+1
stamp=np.zeros((ystop-ystart+2*margin,xstop-xstart+2*margin))
stampivar=np.zeros(stamp.shape)
stamp[margin:-margin,margin:-margin]=image.pix[ystart:ystop,xstart:xstop]
stampivar[margin:-margin,margin:-margin]=image.ivar[ystart:ystop,xstart:xstop]
# will use a fixed footprint despite changes of psf stamps
# so that chi2 always based on same data set
footprint=np.zeros(stamp.shape)
for i in range(nfibers) :
footprint[margin-ystart+ys[i].start:margin-ystart+ys[i].stop,margin-xstart+xs[i].start:margin-xstart+xs[i].stop]=1
#plt.imshow(footprint) ; plt.show() ; sys.exit(12)
# define grid of shifts to test
res=0.5
nshift=int(maxshift/res)
dx=res*np.tile(np.arange(2*nshift+1)-nshift,(2*nshift+1,1))
dy=dx.T
original_shape=dx.shape
dx=dx.ravel()
dy=dy.ravel()
chi2=np.zeros(dx.shape)
A=np.zeros((nfibers,nfibers))
B=np.zeros((nfibers))
mods=np.zeros(np.zeros(nfibers).shape+stamp.shape)
debugging=False
if debugging : # FOR DEBUGGING KEEP MODELS
models=[]
# loop on possible shifts
# refit fluxes and compute chi2
for d in range(len(dx)) :
# print(d,dx[d],dy[d])
A *= 0
B *= 0
mods *= 0
for i,fiber in enumerate(fibers) :
# apply the PSF shift
psf._cache={} # reset cache !!
psf.coeff['X']._coeff[fiber][0] += dx[d]
psf.coeff['Y']._coeff[fiber][0] += dy[d]
# compute pix and paste on stamp frame
xx, yy, pix = psf.xypix(fiber,line)
mods[i][margin-ystart+yy.start:margin-ystart+yy.stop,margin-xstart+xx.start:margin-xstart+xx.stop]=pix
# undo the PSF shift
psf.coeff['X']._coeff[fiber][0] -= dx[d]
psf.coeff['Y']._coeff[fiber][0] -= dy[d]
B[i] = np.sum(stampivar*stamp*mods[i])
for j in range(i+1) :
A[i,j] = np.sum(stampivar*mods[i]*mods[j])
if j!=i :
A[j,i] = A[i,j]
Ai=np.linalg.inv(A)
flux=Ai.dot(B)
model=np.zeros(stamp.shape)
for i in range(nfibers) :
model += flux[i]*mods[i]
chi2[d]=np.sum(stampivar*(stamp-model)**2)
if debugging :
models.append(model)
if debugging :
schi2=chi2.reshape(original_shape).copy() # FOR DEBUGGING
sdx=dx.copy()
sdy=dy.copy()
# find minimum chi2 grid point
k = chi2.argmin()
j,i = np.unravel_index(k, ((2*nshift+1),(2*nshift+1)))
#print("node dx,dy=",dx.reshape(original_shape)[j,i],dy.reshape(original_shape)[j,i])
# cut a region around minimum
delta=1
istart=max(0,i-delta)
istop=min(2*nshift+1,i+delta+1)
jstart=max(0,j-delta)
jstop=min(2*nshift+1,j+delta+1)
chi2=chi2.reshape(original_shape)[jstart:jstop,istart:istop].ravel()
dx=dx.reshape(original_shape)[jstart:jstop,istart:istop].ravel()
dy=dy.reshape(original_shape)[jstart:jstop,istart:istop].ravel()
# fit 2D polynomial of deg2
m = np.array([dx*0+1, dx, dy, dx**2, dy**2, dx*dy ]).T
c, r, rank, s = np.linalg.lstsq(m, chi2)
if c[3]>0 and c[4]>0 :
# get minimum
# dchi2/dx=0 : c[1]+2*c[3]*dx+c[5]*dy = 0
# dchi2/dy=0 : c[2]+2*c[4]*dy+c[5]*dx = 0
a=np.array([[2*c[3],c[5]],[c[5],2*c[4]]])
b=np.array([c[1],c[2]])
t=-np.linalg.inv(a).dot(b)
dx=t[0]
dy=t[1]
sx=1./np.sqrt(c[3])
sy=1./np.sqrt(c[4])
#print("interp dx,dy=",dx,dy)
if debugging : # FOR DEBUGGING
import matplotlib.pyplot as plt
plt.figure()
plt.subplot(2,2,1,title="chi2")
plt.imshow(schi2,extent=(-nshift*res,nshift*res,-nshift*res,nshift*res),origin=0,interpolation="nearest")
plt.plot(dx,dy,"+",color="white",ms=20)
plt.xlabel("x")
plt.ylabel("y")
plt.subplot(2,2,2,title="data")
plt.imshow(stamp*footprint,origin=0,interpolation="nearest")
plt.grid()
k0=np.argmin(sdx**2+sdy**2)
plt.subplot(2,2,3,title="original psf")
plt.imshow(models[k0],origin=0,interpolation="nearest")
plt.grid()
plt.subplot(2,2,4,title="shifted psf")
plt.imshow(models[k],origin=0,interpolation="nearest")
plt.grid()
plt.show()
else :
log.warning("fit failed (bad chi2 surf.) for fibers [%d:%d] line=%dA"%(fibers[0],fibers[-1]+1,int(line)))
dx=0.
dy=0.
sx=10.
sy=10.
except LinAlgError :
log.warning("fit failed (masked or missing data) for fibers [%d:%d] line=%dA"%(fibers[0],fibers[-1]+1,int(line)))
dx=0.
dy=0.
sx=10.
sy=10.
return x,y,dx,dy,sx,sy
[docs]
def compute_dx_dy_using_psf(psf,image,fibers,lines) :
"""
Computes trace shifts along x and y from a preprocessed image, a PSF (with trace coords), and a set of emission lines,
by doing a forward model of the image.
Calls compute_fiber_bundle_trace_shifts_using_psf.
Args:
psf : specter psf object
image : DESI preprocessed image object
fibers : 1D array with list of fibers
lines : 1D array of wavelength of emission lines (in Angstrom)
Returns:
x : 1D array of x coordinates on CCD (axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
y : 1D array of y coordinates on CCD (axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
dx : 1D array of shifts along x coordinates on CCD
dy : 1D array of shifts along y coordinates on CCD
sx : 1D array of uncertainties on dx
sy : 1D array of uncertainties on dy
fiber : 1D array of fiber ID
wave : 1D array of wavelength
"""
log = get_logger()
nlines=len(lines)
nfibers=len(fibers)
log.info("computing spots coordinates and define bundles")
x=np.zeros((nfibers,nlines))
y=np.zeros((nfibers,nlines))
# load expected spots coordinates
for fiber in range(nfibers) :
for l,line in enumerate(lines) :
x[fiber,l],y[fiber,l] = psf.xy(fiber,line)
bundle_fibers=[]
bundle_xmin=[]
bundle_xmax=[]
xwidth=9.
bundle_xmin.append(x[0,nlines//2]-xwidth/2)
bundle_xmax.append(x[0,nlines//2]+xwidth/2)
bundle_fibers.append([0,])
for fiber in range(1,nfibers) :
tx=x[fiber,nlines//2]
found=False
for b in range(len(bundle_fibers)) :
if tx+xwidth/2 >= bundle_xmin[b] and tx-xwidth/2 <= bundle_xmax[b] :
found=True
bundle_fibers[b].append(fiber)
bundle_xmin[b]=min(bundle_xmin[b],tx-xwidth/2)
bundle_xmax[b]=max(bundle_xmax[b],tx+xwidth/2)
break
if not found :
bundle_fibers.append([fiber,])
bundle_xmin.append(tx-xwidth/2)
bundle_xmax.append(tx+xwidth/2)
log.info("measure offsets dx dy per bundle ({}) and spectral line ({})".format(len(bundle_fibers),len(lines)))
wave_xy=np.array([]) # line
fiber_xy=np.array([]) # central fiber in bundle
x=np.array([]) # central x in bundle at line wavelength
y=np.array([]) # central x in bundle at line wavelength
dx=np.array([]) # measured offset along x
dy=np.array([]) # measured offset along y
ex=np.array([]) # measured offset uncertainty along x
ey=np.array([]) # measured offset uncertainty along y
for b in range(len(bundle_fibers)) :
for l,line in enumerate(lines) :
tx,ty,tdx,tdy,tex,tey = compute_fiber_bundle_trace_shifts_using_psf(fibers=bundle_fibers[b],psf=psf,image=image,line=line)
log.info("fibers [%d:%d] %dA dx=%4.3f+-%4.3f dy=%4.3f+-%4.3f"%(bundle_fibers[b][0],bundle_fibers[b][-1]+1,int(line),tdx,tex,tdy,tey))
if tex<1. and tey<1. :
wave_xy=np.append(wave_xy,line)
fiber_xy=np.append(fiber_xy,int(np.median(bundle_fibers[b])))
x=np.append(x,tx)
y=np.append(y,ty)
dx=np.append(dx,tdx)
dy=np.append(dy,tdy)
ex=np.append(ex,tex)
ey=np.append(ey,tey)
return x,y,dx,ex,dy,ey,fiber_xy,wave_xy
# end of routines for forward model method
[docs]
def monomials(x,y,degx,degy) :
"""
Computes monomials as a function of x and y of a 2D polynomial of degrees degx and degy
Args:
x : ND array
y : ND array of same shape as x
degx : int (>=0), polynomial degree along x
degy : int (>=0), polynomial degree along y
Returns :
monomials : ND array of shape ( (degx+1)*(degy+1) , x shape )
"""
M=[]
for i in range(degx+1) :
for j in range(degy+1) :
M.append(x**i*y**j)
return np.array(M)
[docs]
def polynomial_fit(z,ez,xx,yy,degx,degy) :
"""
Computes and 2D polynomial fit of z as a function of (x,y) of degrees degx and degy
Args:
z : ND array
ez : ND array of same shape as z, uncertainties on z
x : ND array of same shape as z
y : ND array of same shape as z
degx : int (>=0), polynomial degree along x
degy : int (>=0), polynomial degree along y
Returns:
coeff : 1D array of size (degx+1)*(degy+1) with polynomial coefficients (as defined by routine monomials)
covariance : 2D array of covariance of coeff
error_floor : float , extra uncertainty needed to get chi2/ndf=1
polval : ND array of same shape as z with values of pol(x,y)
mask : ND array of same shape as z indicating the masked data points in the fit
"""
M=monomials(x=xx,y=yy,degx=degx,degy=degy)
error_floor = 0.
npar=M.shape[0]
A=np.zeros((npar,npar))
B=np.zeros((npar))
mask=np.ones(z.shape).astype(int)
for loop in range(100) : # loop to increase errors
w=1./(ez**2+error_floor**2)
w[mask==0]=0.
A *= 0.
B *= 0.
for k in range(npar) :
B[k]=np.sum(w*z*M[k])
for l in range(k+1) :
A[k,l]=np.sum(w*M[k]*M[l])
if l!=k : A[l,k]=A[k,l]
coeff=cholesky_solve(A,B)
polval = M.T.dot(coeff)
# compute rchi2 with median
ndata=np.sum(w>0)
rchi2=1.4826*np.median(np.sqrt(w)*np.abs(z-polval))*ndata/float(ndata-npar)
# std chi2
rchi2_std = np.sum(w*(z-polval)**2)/(ndata-npar)
#print("#%d rchi2=%f rchi2_std=%f ngood=%d nbad=%d error floor=%f"%(loop,rchi2,rchi2_std,ndata,np.sum(w==0),error_floor))
# reject huge outliers
nbad=0
rvar=w*(z-polval)**2
worst=np.argmax(rvar)
if rvar[worst] > 25*max(rchi2,1.2) : # cap rchi2 if starting point is very bad
#print("remove one bad measurement at %2.1f sigmas"%np.sqrt(rvar[worst]))
mask[worst]=0
nbad=1
if rchi2>1 :
if nbad==0 or loop>5 :
error_floor+=0.002
if rchi2<=1. and nbad==0 :
break
# rerun chol. solve to get covariance
coeff,covariance=cholesky_solve_and_invert(A,B)
return coeff,covariance,error_floor,polval,mask
[docs]
def recompute_legendre_coefficients(xcoef,ycoef,wavemin,wavemax,degxx,degxy,degyx,degyy,dx_coeff,dy_coeff) :
"""
Modifies legendre coefficients of an input trace set using polynomial coefficients (as defined by the routine monomials)
Args:
xcoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to XCCD
ycoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to YCCD
wavemin : float
wavemax : float. wavemin and wavemax are used to define a reduced variable legx(wave,wavemin,wavemax)=2*(wave-wavemin)/(wavemax-wavemin)-1
used to compute the traces, xccd=legval(legx(wave,wavemin,wavemax),xtrace[fiber])
degxx : int, degree of polynomial for x shifts as a function of x (x is axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
degxy : int, degree of polynomial for x shifts as a function of y (y is axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
degyx : int, degree of polynomial for y shifts as a function of x
degyy : int, degree of polynomial for y shifts as a function of y
dx_coeff : 1D np.array of polynomial coefficients of size (degxx*degxy) as defined by the routine monomials.
dy_coeff : 1D np.array of polynomial coefficients of size (degyx*degyy) as defined by the routine monomials.
Returns:
new_xcoef : 2D np.array of shape (nfibers,ncoef) with modified Legendre coefficients
new_ycoef : 2D np.array of shape (nfibers,ncoef) with modified Legendre coefficients
"""
new_xcoef = np.zeros_like(xcoef)
new_ycoef = np.zeros_like(ycoef)
wave=np.linspace(wavemin,wavemax,100)
nfibers=xcoef.shape[0]
rw=legx(wave,wavemin,wavemax)
for fiber in range(nfibers) :
x = legval(rw,xcoef[fiber])
y = legval(rw,ycoef[fiber])
m=monomials(x,y,degxx,degxy)
dx=m.T.dot(dx_coeff)
new_xcoef[fiber]=legfit(rw,x+dx,deg=xcoef.shape[1]-1)
m=monomials(x,y,degyx,degyy)
dy=m.T.dot(dy_coeff)
new_ycoef[fiber]=legfit(rw,y+dy,deg=ycoef.shape[1]-1)
return new_xcoef,new_ycoef
[docs]
def recompute_legendre_coefficients_for_x(xcoef,ycoef,wavemin,wavemax,degxx,degxy,dx_coeff) :
"""
Modifies legendre coefficients of an input trace set using polynomial coefficients (as defined by the routine monomials)
Args:
xcoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to XCCD
ycoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to YCCD
wavemin : float
wavemax : float. wavemin and wavemax are used to define a reduced variable legx(wave,wavemin,wavemax)=2*(wave-wavemin)/(wavemax-wavemin)-1
used to compute the traces, xccd=legval(legx(wave,wavemin,wavemax),xtrace[fiber])
degxx : int, degree of polynomial for x shifts as a function of x (x is axis=1 in numpy image array, AXIS=0 in FITS, cross-dispersion axis = fiber number direction)
degxy : int, degree of polynomial for x shifts as a function of y (y is axis=0 in numpy image array, AXIS=1 in FITS, wavelength dispersion axis)
dx_coeff : 1D np.array of polynomial coefficients of size (degxx*degxy) as defined by the routine monomials.
Returns:
new_xcoef : 2D np.array of shape (nfibers,ncoef) with modified Legendre coefficients
"""
new_xcoef = np.zeros_like(xcoef)
wave=np.linspace(wavemin,wavemax,100)
nfibers=xcoef.shape[0]
rw=legx(wave,wavemin,wavemax)
for fiber in range(nfibers) :
x = legval(rw,xcoef[fiber])
y = legval(rw,ycoef[fiber])
m=monomials(x,y,degxx,degxy)
dx=m.T.dot(dx_coeff)
new_xcoef[fiber]=legfit(rw,x+dx,deg=xcoef.shape[1]-1)
return new_xcoef
[docs]
def recompute_legendre_coefficients_for_y(xcoef,ycoef,wavemin,wavemax,degyx,degyy,dy_coeff) :
"""
Modifies legendre coefficients of an input trace set using polynomial coefficients (as defined by the routine monomials)
Args:
xcoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to XCCD
ycoef : 2D np.array of shape (nfibers,ncoef) containing Legendre coefficients for each fiber to convert wavelength to YCCD
wavemin : float
wavemax : float. wavemin and wavemax are used to define a reduced variable legx(wave,wavemin,wavemax)=2*(wave-wavemin)/(wavemax-wavemin)-1
used to compute the traces, xccd=legval(legx(wave,wavemin,wavemax),xtrace[fiber])
degyx : int, degree of polynomial for y shifts as a function of x
degyy : int, degree of polynomial for y shifts as a function of y
dy_coeff : 1D np.array of polynomial coefficients of size (degyx*degyy) as defined by the routine monomials.
Returns:
new_ycoef : 2D np.array of shape (nfibers,ncoef) with modified Legendre coefficients
"""
new_ycoef = np.zeros_like(ycoef)
wave=np.linspace(wavemin,wavemax,100)
nfibers=xcoef.shape[0]
rw=legx(wave,wavemin,wavemax)
for fiber in range(nfibers) :
x = legval(rw,xcoef[fiber])
y = legval(rw,ycoef[fiber])
m=monomials(x,y,degyx,degyy)
dy=m.T.dot(dy_coeff)
new_ycoef[fiber]=legfit(rw,y+dy,deg=ycoef.shape[1]-1)
return new_ycoef
[docs]
def list_of_expected_spot_positions(traceset,fibers=None,max_number_of_lines=50) :
"""
Computes a list of expected arc lamp spots position in the CCD image.
This function uses the data file data/spec-arc-lamps.dat that is already
used elsewhere.
Args:
traceset: desispec.xytracet instance
Optionnal:
fibers: numpy 1D array of int with fiber indices (between 0 and 500)
max_number_of_lines: max number of emission lines to consider (per fiber)
Returns:
x,y : two 1D arrays of floats with CCD coordinates of spots
"""
x=[]
y=[]
# get list of arc lamp spots from arc lamp spectrum
log = get_logger()
srch_file = "data/spec-arc-lamps.dat"
if not resources.files('desispec').joinpath(srch_file).is_file():
log.error("Cannot find arc lamps spectrum file {:s}".format(srch_file))
raise RuntimeError("Cannot find arc lamps spectrum file {:s}".format(srch_file))
spectrum_filename = resources.files('desispec').joinpath(srch_file)
tmp=np.loadtxt(spectrum_filename).T
ref_wave=tmp[0]
ref_spectrum=tmp[1]
log.info("read reference spectrum in %s with %d entries"%(spectrum_filename,ref_wave.size))
# convolve to get one peak per line
dw=np.median(np.gradient(ref_wave))
sigma=2. #A
hw=int(5*sigma/dw)+1
x=np.arange(-hw,hw+1)*dw
kernel=np.exp(-x**2/2/sigma**2)
ref_spectrum=fftconvolve(ref_spectrum,kernel,mode='same')
medflux=np.median(ref_spectrum)
nmad=1.4*np.median(np.abs(ref_spectrum-medflux))
maxflux=np.max(ref_spectrum)
threshold=min(medflux+10*nmad,maxflux/5.)
while (threshold<maxflux/2.) :
peaks=np.zeros(ref_spectrum.shape)
peaks[2:-2] = (ref_spectrum[2:-2]>threshold)\
*(ref_spectrum[2:-2]>ref_spectrum[3:-1])\
*(ref_spectrum[2:-2]>ref_spectrum[1:-3])\
*(ref_spectrum[2:-2]>ref_spectrum[4:])\
*(ref_spectrum[2:-2]>ref_spectrum[:-4])
peaks=np.where(peaks>0)[0]
if(peaks.size<=max_number_of_lines) :
break
threshold *= 1.5
# wavelength of peaks (will refine after selection)
peak_wavelengths=ref_wave[peaks]
inccd=np.zeros(peak_wavelengths.shape,dtype=bool)
for fiber in [0,traceset.nspec//2,traceset.nspec-1] :
y=traceset.y_vs_wave(fiber,peak_wavelengths)
inccd |= ( (y>0) & (y<traceset.npix_y))
peaks=peaks[inccd]
# refine wavelength with barycenter
peak_wavelengths=np.zeros(peaks.size)
for i,p in enumerate(peaks) :
peak_wavelengths[i] = np.sum(ref_wave[p-1:p+2]*ref_spectrum[p-1:p+2])/np.sum(ref_spectrum[p-1:p+2])
# create list of dots
xspot=[]
yspot=[]
if fibers is None :
fibers = np.arange(traceset.nspec)
for fiber in fibers :
x=traceset.x_vs_wave(fiber,peak_wavelengths)
y=traceset.y_vs_wave(fiber,peak_wavelengths)
ok=((y>0)&(y<traceset.npix_y))
xspot.append(x[ok])
yspot.append(y[ok])
xspot=np.hstack(xspot)
yspot=np.hstack(yspot)
return xspot,yspot
def compute_x_offset_from_central_band_cross_dispersion_profile(tset, image, fibers=None,halfwidth=50) :
log=get_logger()
bands=7 # measure delta_x in 7 horizontal bands of 100 pixel wide each and return the median offset
bandwidth=100
hb=bands//2
n0=image.pix.shape[0]
n1=image.pix.shape[1]
ivar=image.ivar*(image.mask==0)
if fibers is None :
fibers = np.arange(tset.nspec)
delta_x_array=[]
for b in range(-hb,hb+1) :
begin=int(n0//2+(b-0.5)*bandwidth)
end=begin+bandwidth
sw=np.sum(ivar[begin:end,:],axis=0)
swf=np.sum(image.pix[begin:end,:]*ivar[begin:end,:])
prof=(swf/(sw+(sw==0)))
y=int(n0//2+b*bandwidth)
xc=np.array([tset.x_vs_y(fiber,y) for fiber in fibers])
oversample=4
ox=np.arange(n1*oversample)/oversample
xx=np.arange(n1)
sigma=1.5 # pixel, approximate
mprof=np.exp(-(ox[:,None]-xc[None,:])**2/2./sigma**2).sum(axis=1)
norm = 1./np.sqrt(np.sum(prof**2)*(np.sum(mprof**2)/oversample))
ddx=np.linspace(-halfwidth,halfwidth,4*halfwidth+1)
xcorr=np.zeros(ddx.size)
for i,dx in enumerate(ddx) :
xcorr[i]=np.sum(prof*np.interp(xx,ox+dx,mprof))
xcorr *= norm
imax=np.argmax(xcorr)
log.info("horizontal band {}:{} delta x = {:.1f} pixels".format(begin,end,ddx[imax]))
delta_x_array.append(ddx[imax])
delta_x = np.median(delta_x_array)
log.info("median delta x = {:.1f} pixels".format(delta_x))
return delta_x