
"""
desispec.fluxcalibration
========================

Flux calibration routines.
"""
from __future__ import absolute_import
import numpy as np
from .resolution import Resolution
from .linalg import cholesky_solve, cholesky_solve_and_invert, spline_fit
from .interpolation import resample_flux
from desiutil.log import get_logger
from .io.filters import load_legacy_survey_filter
from desispec import util
from desispec.frame import Frame
from desispec.io.fluxcalibration import read_average_flux_calibration
from desispec.calibfinder import findcalibfile
from desitarget.targets import main_cmx_or_sv
from desispec.fiberfluxcorr import flat_to_psf_flux_correction,psf_to_fiber_flux_correction
from desispec.gpu import is_gpu_available, NoGPU
import scipy, scipy.sparse, scipy.ndimage
import sys
import time
from astropy import units
import multiprocessing
from importlib import resources
import numpy.linalg
import copy

try:
    from scipy import constants
    C_LIGHT = constants.c/1000.0
except TypeError: # This can happen during documentation builds.
    C_LIGHT = 299792458.0/1000.0

def isStdStar(fibermap, bright=None):
    """Determines if target(s) are standard stars.

    Args:
        fibermap: table including DESI_TARGET or SV1_DESI_TARGET bit mask(s)

    Optional:
        bright: if True, only bright-time standards; if False, only dark-time
            standards; otherwise both.

    Returns:
        bool or array of bool
    """
    log = get_logger()
    target_colnames, target_masks, survey = main_cmx_or_sv(fibermap)
    desi_target = fibermap[target_colnames[0]]  # (SV1_)DESI_TARGET
    desi_mask = target_masks[0]                 # (sv1_)desi_mask
    if survey != 'cmx':
        mws_target = fibermap[target_colnames[2]]  # (SV1_)MWS_TARGET
        mws_mask = target_masks[2]                 # (sv1_)mws_mask

    # mapping of which stdstar bits to use depending upon `bright` input
    # NOTE: STD_WD and GAIA_STD_WD not yet included in stdstar fitting
    desiDict = {
        None: ['STD_FAINT', 'STD_BRIGHT', 'SV0_STD_FAINT', 'SV0_STD_BRIGHT'],
        True: ['STD_BRIGHT', 'SV0_STD_BRIGHT'],
        False: ['STD_FAINT', 'SV0_STD_FAINT']
    }
    mwsDict = {
        None: ['GAIA_STD_FAINT', 'GAIA_STD_BRIGHT'],
        True: ['GAIA_STD_BRIGHT'],
        False: ['GAIA_STD_FAINT'],
    }

    yes = np.zeros_like(desi_target, dtype=bool)
    for k in desiDict[bright]:
        if k in desi_mask.names():
            yes = yes | ((desi_target & desi_mask[k]) != 0)

    if survey != 'cmx':
        yes_mws = np.zeros_like(desi_target, dtype=bool)
        for k in mwsDict[bright]:
            if k in mws_mask.names():
                yes_mws |= ((mws_target & mws_mask[k]) != 0)
        yes = yes | yes_mws

    #- Hack for data on 20201214 where some fiberassign files had targeting
    #- bits set to 0, but column FA_TYPE was still ok
    #- Hardcode mask to avoid fiberassign dependency loop
    FA_STDSTAR_MASK = 2  # fiberassign.targets.TARGET_TYPE_STANDARD
    if np.count_nonzero(yes) == 0:
        log.error(f'No standard stars found in {target_colnames[0]} or {target_colnames[2]}')
        if 'FA_TYPE' in fibermap.dtype.names and \
                np.sum((fibermap['FA_TYPE'] & FA_STDSTAR_MASK) != 0) > 0:
            log.warning('Using FA_TYPE to find standard stars instead')
            yes = (fibermap['FA_TYPE'] & FA_STDSTAR_MASK) != 0

    return yes
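
# Usage sketch (illustrative only, not part of the pipeline): selecting the
# standard-star fibers from a frame. `frame` is a hypothetical
# desispec.frame.Frame instance read elsewhere (e.g. desispec.io.read_frame):
#
#   stdfibers = np.where(isStdStar(frame.fibermap, bright=False))[0]
#   stdstars = frame[stdfibers]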

def applySmoothingFilter(flux, width=200):
    """Return a smoothed version of the input flux array using a median filter.

    Args:
        flux : 1D array of flux
        width : size of the median filter box

    Returns:
        smooth_flux : median filtered flux of same size as input
    """
    # it was checked that the width of the median_filter has little impact on
    # the best fit stars; smoothing the output (with a spline for instance)
    # does not improve the fit
    if is_gpu_available():
        import cupy
        import cupyx.scipy.ndimage
        # move flux array to device
        device_flux = cupy.asarray(flux)
        # smooth flux array using median filter
        device_smoothed = cupyx.scipy.ndimage.median_filter(device_flux, width, mode='constant')
        # move smoothed flux array back to host and return
        return cupy.asnumpy(device_smoothed)
    else:
        return scipy.ndimage.median_filter(flux, width, mode='constant')
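
# Minimal sketch of what the filter does (illustrative, synthetic data): a
# width-200 median filter tracks the slowly varying continuum while being
# insensitive to narrow features such as cosmics or absorption lines.
#
#   wave = np.linspace(3600., 5800., 2000)
#   spec = 1. + 1e-3*wave                         # slowly varying continuum
#   spec[1000] += 50.                             # narrow spike (e.g. cosmic)
#   cont = applySmoothingFilter(spec, width=200)  # spike barely affects result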

#
# Import some global constants.
#
# Why not use astropy constants?
#
# This is VERY inconvenient when trying to build documentation!
# The documentation may be built in an environment that does not have
# scipy installed. There is no obvious reason why this has to be a module-level
# calculation.
#
import scipy.constants as const
h = const.h
pi = const.pi
e = const.e
c = const.c
erg = const.erg
try:
    hc = const.h/const.erg*const.c*1.e10  # (in units of erg A)
except TypeError:
    hc = 1.9864458241717586e-08
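
# Unit check for the fallback value above (back-of-the-envelope sketch):
#   h*c = 6.626e-34 J s * 2.998e8 m/s = 1.986e-25 J m
#       = 1.986e-25 / 1e-7 erg m        (1 erg = 1e-7 J)
#       = 1.986e-18 erg m = 1.986e-08 erg A   (1 m = 1e10 A)
# which matches hc = 1.9864458241717586e-08 erg A.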

def resample_template(data_wave_per_camera, resolution_obj_per_camera,
                      template_wave, template_flux, template_id):
    """Resample a spectral template on the data wavelength grid,
    then convolve the spectra by the resolution for each camera.
    Also returns the result of applySmoothingFilter.
    This routine is used internally in a call to multiprocessing.Pool.

    Args:
        data_wave_per_camera : A dictionary of 1D arrays of vacuum wavelengths [Angstroms], one entry per camera and exposure.
        resolution_obj_per_camera : A dictionary of Resolution objects corresponding to the fiber, one entry per camera and exposure.
        template_wave : 1D array, input spectral template wavelength [Angstroms] (arbitrary spacing).
        template_flux : 1D array, input spectral template flux density.
        template_id : int, template identification index, used to ensure matching of input/output after a multiprocessing run.

    Returns:
        template_id : int, template identification index, same as input.
        output_wave : 1D array of vacuum wavelengths (concatenated over cameras)
        output_flux : 1D array of output template flux (concatenated over cameras)
        output_norm : 1D array of output template smoothed flux (concatenated over cameras)
    """
    output_wave = np.array([])
    output_flux = np.array([])
    output_norm = np.array([])
    sorted_keys = list(data_wave_per_camera.keys())
    sorted_keys.sort()  # force sorting the keys to agree with data (found unpredictable ordering in tests)
    for cam in sorted_keys:
        flux1 = resample_flux(data_wave_per_camera[cam], template_wave, template_flux)  # this is slow
        flux2 = resolution_obj_per_camera[cam].dot(flux1)
        norme = applySmoothingFilter(flux2)  # this is fast
        flux3 = flux2/(norme + (norme == 0))
        output_flux = np.append(output_flux, flux3)
        output_norm = np.append(output_norm, norme)
        output_wave = np.append(output_wave, data_wave_per_camera[cam])  # need to add wave to avoid wave/flux matching errors
    return template_id, output_wave, output_flux, output_norm

def _func(arg):
    """ Used for multiprocessing.Pool """
    return resample_template(**arg)


def _smooth_template(template_id, camera_index, template_flux):
    """ Used for multiprocessing.Pool """
    norme = applySmoothingFilter(template_flux)
    return template_id, camera_index, norme


def _func2(arg):
    """ Used for multiprocessing.Pool """
    return _smooth_template(**arg)

def redshift_fit(wave, flux, ivar, resolution_data, stdwave, stdflux,
                 z_max=0.005, z_res=0.00005, template_error=0.):
    """Redshift fit of a single template.

    Args:
        wave : A dictionary of 1D arrays of vacuum wavelengths [Angstroms]
        flux : A dictionary of 1D observed flux for the star
        ivar : A dictionary of 1D inverse variance of flux
        resolution_data : resolution corresponding to the star's fiber
        stdwave : 1D standard star template wavelengths [Angstroms]
        stdflux : 1D[nwave] template flux
        z_max : float, maximum blueshift and redshift in scan, has to be positive
        z_res : float, step of redshift scan between [-z_max, +z_max]
        template_error : float, assumed template flux relative error

    Returns:
        redshift : redshift of standard star

    Notes:
      - wave and stdwave can be on different grids that don't necessarily overlap
      - wave does not have to be uniform or monotonic. Multiple cameras can be
        supported by concatenating their wave and flux arrays
    """
    cameras = list(flux.keys())
    log = get_logger()
    log.debug(time.asctime())

    # resampling on a log wavelength grid
    #####################################
    # need to go fast so we resample both data and model on a log grid

    # define grid
    minwave = 100000.
    maxwave = 0.
    for cam in cameras:
        minwave = min(minwave, np.min(wave[cam]))
        maxwave = max(maxwave, np.max(wave[cam]))
    # ala boss
    lstep = np.log10(1+z_res)
    margin = int(np.log10(1+z_max)/lstep)+1
    minlwave = np.log10(minwave)
    maxlwave = np.log10(maxwave)  # desired, but readjusted
    nstep = (maxlwave-minlwave)/lstep

    resampled_lwave = minlwave+lstep*np.arange(nstep)
    resampled_wave = 10**resampled_lwave

    # map data on grid
    resampled_data = {}
    resampled_ivar = {}
    resampled_model = {}
    for cam in cameras:
        tmp_flux, tmp_ivar = resample_flux(resampled_wave, wave[cam], flux[cam], ivar[cam])
        resampled_data[cam] = tmp_flux
        resampled_ivar[cam] = tmp_ivar

        # we need to have the model on a larger grid than the data wave for redshifting
        dwave = wave[cam][-1]-wave[cam][-2]
        npix = int((wave[cam][-1]*z_max)/dwave+2)
        extended_cam_wave = np.append(wave[cam][0]+dwave*np.arange(-npix, 0), wave[cam])
        extended_cam_wave = np.append(extended_cam_wave, wave[cam][-1]+dwave*np.arange(1, npix+1))
        # ok now we also need to increase the resolution
        tmp_res = np.zeros((resolution_data[cam].shape[0], resolution_data[cam].shape[1]+2*npix))
        tmp_res[:, :npix] = np.tile(resolution_data[cam][:, 0], (npix, 1)).T
        tmp_res[:, npix:-npix] = resolution_data[cam]
        tmp_res[:, -npix:] = np.tile(resolution_data[cam][:, -1], (npix, 1)).T
        # resampled model at camera resolution, with margin
        tmp = resample_flux(extended_cam_wave, stdwave, stdflux)
        tmp = Resolution(tmp_res).dot(tmp)
        # map on log lam grid
        resampled_model[cam] = resample_flux(resampled_wave, extended_cam_wave, tmp)

        # we now normalize both model and data
        tmp = applySmoothingFilter(resampled_data[cam])
        resampled_data[cam] /= (tmp+(tmp == 0))
        resampled_ivar[cam] *= tmp**2

        if template_error > 0:
            ok = np.where(resampled_ivar[cam] > 0)[0]
            if ok.size > 0:
                resampled_ivar[cam][ok] = 1./(1/resampled_ivar[cam][ok] + template_error**2)

        tmp = applySmoothingFilter(resampled_model[cam])
        resampled_model[cam] /= (tmp+(tmp == 0))
        resampled_ivar[cam] *= (tmp != 0)

    # fit the best redshift
    chi2 = np.zeros((2*margin+1))
    ndata = np.zeros((2*margin+1))
    for i in range(-margin, margin+1):
        for cam in cameras:
            ndata[i+margin] += np.sum(resampled_ivar[cam][margin:-margin] > 0)
            if i < margin:
                chi2[i+margin] += np.sum(resampled_ivar[cam][margin:-margin]*(resampled_data[cam][margin:-margin]-resampled_model[cam][margin+i:-margin+i])**2)
            else:
                chi2[i+margin] += np.sum(resampled_ivar[cam][margin:-margin]*(resampled_data[cam][margin:-margin]-resampled_model[cam][margin+i:])**2)

    i = np.argmin(chi2)-margin
    z = 10**(-i*lstep)-1
    log.debug("Best z=%f" % z)
    '''
    log.debug("i=%d"%i)
    log.debug("lstep=%f"%lstep)
    log.debug("margin=%d"%margin)
    plt.figure()
    #plt.plot(chi2)
    for cam in cameras :
        ok=np.where(resampled_ivar[cam]>0)[0]
        #plt.plot(resampled_wave[ok],resampled_data[cam][ok],"o",c="gray")
        plt.errorbar(resampled_wave[ok],resampled_data[cam][ok],1./np.sqrt(resampled_ivar[cam][ok]),fmt="o",color="gray")
        plt.plot(resampled_wave[margin:-margin],resampled_model[cam][margin+i:-margin+i],"-",c="r")
    plt.show()
    '''
    return z
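
# Why the log-wavelength grid works (sketch of the bookkeeping above): with
# lstep = log10(1+z_res), shifting the model by one grid bin multiplies its
# wavelengths by (1+z_res), so the scan over integer shifts i in
# [-margin, margin] samples redshifts in steps of ~z_res:
#
#   z = 10**(-i*lstep) - 1    # e.g. for i = -1: z = (1+z_res) - 1 = z_res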

def _compute_coef(coord, node_coords):
    """Function used by interpolate_on_parameter_grid.

    Args:
        coord : 1D array of coordinates of size n_axis
        node_coords : 2D array of coordinates of nodes, shape = (n_nodes, n_axis)

    Returns:
        coef : 1D array of linear coefficients for each node, size = n_nodes
    """
    n_nodes = node_coords.shape[0]
    npar = node_coords.shape[1]
    coef = np.ones(n_nodes)
    for s in range(n_nodes):
        coef[s] = 1.
        for a in range(npar):
            dist = np.abs(node_coords[s, a]-coord[a])  # distance between model point and node along axis a
            # piece-wise linear version
            if dist > 1:
                coef[s] = 0.
                break
            coef[s] *= (1.-dist)
            # we could alternatively have used a b-spline of higher order

    norme = np.sum(coef)
    if norme <= 0:  # we are outside of valid grid
        return np.zeros(coef.shape)  # will be detected in fitter
    coef /= norme
    return coef
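
# Worked example (illustrative): one axis, nodes at coordinates -1, 0, +1,
# evaluated at coord = [0.25]. Distances are 1.25, 0.25, 0.75, so the
# piece-wise linear weights are 0, 0.75, 0.25 (the node beyond unit distance
# drops out, and the sum is already normalized to 1):
#
#   nodes = np.array([[-1.], [0.], [1.]])
#   _compute_coef(np.array([0.25]), nodes)   # -> array([0.  , 0.75, 0.25])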

def interpolate_on_parameter_grid(data_wave, data_flux, data_ivar, template_flux, teff, logg, feh, template_chi2):
    """3D interpolation routine among templates based on a grid of parameters teff, logg, feh.
    The tricky part is to define a cube on the parameter grid populated with templates, and it is not always possible.
    The routine never extrapolates, so that we stay in the range of input parameters.

    Args:
        data_wave : 1D[nwave] array of wavelength (concatenated list of input wavelengths of different cameras and exposures)
        data_flux : 1D[nwave] array of normalized flux = (input flux)/median_filter(input flux) (concatenated list)
        data_ivar : 1D[nwave] array of inverse variance of normalized flux
        template_flux : 2D[ntemplates, nwave] array of normalized flux of templates (after resample, convolution and division by median_filter)
        teff : 1D[ntemplates]
        logg : 1D[ntemplates]
        feh : 1D[ntemplates]
        template_chi2 : 1D[ntemplates] array of precomputed chi2 = sum(data_ivar*(data_flux-template_flux)**2)

    Returns:
        coefficients : best fit coefficients of linear combination of templates
        chi2 : chi2 of the linear combination
    """
    log = get_logger()
    log.debug("starting interpolation on grid")

    best_model_id = np.argmin(template_chi2)
    ndata = np.sum(data_ivar > 0)
    log.debug("best model id=%d chi2/ndata=%f teff=%d logg=%2.1f feh=%2.1f" % (best_model_id, template_chi2[best_model_id]/ndata, teff[best_model_id], logg[best_model_id], feh[best_model_id]))

    ntemplates = template_flux.shape[0]

    log_linear = False  # if True, model = exp( sum_i a_i * log(template_flux_i) ), else model = sum_i a_i * template_flux_i

    # physical parameters define axes
    npar = 3
    param = np.zeros((npar, ntemplates))
    param[0] = teff
    param[1] = logg
    param[2] = feh

    # grid nodes coordinates (unique values of the parameters)
    uparam = []
    for a in range(npar):
        uparam.append(np.unique(param[a]))
    #for a in range(npar) :
    #    log.debug("param %d : %s"%(a,str(uparam[a])))

    node_grid_coords = np.zeros((npar, 3)).astype(int)
    for a in range(npar):  # a is an axis
        # this is the coordinate on axis 'a' of the best node
        i = np.where(uparam[a] == param[a, best_model_id])[0][0]
        node_grid_coords[a] = np.array([i-1, i, i+1])
        log.debug("node_grid_coords[%d]=%s" % (a, node_grid_coords[a]))

    # we don't always have a template on all nodes
    node_template_ids = []
    node_cube_coords = []
    for i0, j0 in zip(node_grid_coords[0], [-1, 0, 1]):
        for i1, j1 in zip(node_grid_coords[1], [-1, 0, 1]):
            for i2, j2 in zip(node_grid_coords[2], [-1, 0, 1]):

                # check whether coord is in grid
                in_grid = (i0 >= 0)&(i0 < uparam[0].size)&(i1 >= 0)&(i1 < uparam[1].size)&(i2 >= 0)&(i2 < uparam[2].size)
                if not in_grid:
                    continue
                # check whether there is a template on this node
                selection = np.where((param[0] == uparam[0][i0])&(param[1] == uparam[1][i1])&(param[2] == uparam[2][i2]))[0]
                if selection.size == 0:  # no template on node
                    log.debug("no template for params = %f,%f,%f" % (uparam[0][i0], uparam[1][i1], uparam[2][i2]))
                    continue
                # we have one
                node_cube_coords.append([j0, j1, j2])
                node_template_ids.append(selection[0])
    node_template_ids = np.array(node_template_ids).astype(int)
    node_cube_coords = np.array(node_cube_coords).astype(int)

    # the parameters of the fit are npar coordinates in the range [-1,1] centered on best fit node
    coord = np.zeros(npar)

    n_templates = node_template_ids.size

    # we are done with the indexing and choice of template nodes
    node_template_flux = template_flux[node_template_ids]

    # compute all weighted scalar products among templates (only works if linear combination, not the log version)
    HB = np.zeros(n_templates)
    HA = np.zeros((n_templates, n_templates))
    for t in range(n_templates):
        HB[t] = np.sum(data_ivar*data_flux*node_template_flux[t])
        for t2 in range(n_templates):
            if HA[t2, t] != 0:
                HA[t, t2] = HA[t2, t]
            else:
                HA[t, t2] = np.sum(data_ivar*node_template_flux[t]*node_template_flux[t2])

    chi2_0 = np.sum(data_ivar*data_flux**2)

    # chi2 = np.sum(data_ivar*(data_flux-model)**2)
    #      = chi2_0 - 2*np.sum(data_ivar*data_flux*model) + np.sum(data_ivar*model**2)
    # model = sum_i coef_i model_i
    # chi2 = chi2_0 - 2*sum_i coef_i*HB[i] + sum_ij coef_i*coef_j*HA[i,j]
    # chi2 = chi2_0 - 2*np.inner(coef,HB) + np.inner(coef,HA.dot(coef))

    # initial state
    coef = _compute_coef(coord, node_cube_coords)
    chi2 = chi2_0 - 2*np.inner(coef, HB) + np.inner(coef, HA.dot(coef))
    log.debug("init coord=%s chi2/ndata=%f" % (coord, chi2/ndata))

    # now we have to do the fit
    # fitting one axis at a time (simultaneous fit of 3 axes was tested and found inefficient : rapidly stuck on edges)
    # it has to be iterative because the model is a non-linear combination of parameters w, ex: w[0]*(1-w[1])*(1-w[2])
    for loop in range(50):
        previous_chi2 = chi2.copy()
        previous_coord = coord.copy()
        for a in range(npar):
            previous_chi2_a = chi2.copy()

            # it's a linear combination of templates, but the model is a non-linear function of coordinates
            # so there is no gain in trying to fit robustly with Gauss-Newton; we simply do a scan
            # it is converging rapidly (need however to iterate on axes)
            xcoord = coord.copy()
            xx = np.linspace(-1, 1, 41)  # keep points on nodes; 41 is the resolution, 0.05 of node inter-distance
            chi2 = np.zeros(xx.shape)
            for i, x in enumerate(xx):
                xcoord[a] = x
                coef = _compute_coef(xcoord, node_cube_coords)
                if np.sum(coef) == 0:  # outside valid range
                    chi2[i] = 1e20
                else:
                    chi2[i] = chi2_0 - 2*np.inner(coef, HB) + np.inner(coef, HA.dot(coef))
            ibest = np.argmin(chi2)
            chi2 = chi2[ibest]
            coord[a] = xx[ibest]

        log.debug("loop #%d coord=%s chi2/ndata=%f (-dchi2_loop=%f -dchi2_tot=%f)" % (loop, coord, chi2/ndata, previous_chi2-chi2, template_chi2[best_model_id]-chi2))
        diff = np.max(np.abs(coord-previous_coord))
        if diff < 0.001:
            break

    # finally perform an exact best fit per axis
    for loop in range(50):
        previous_chi2 = chi2.copy()
        previous_coord = coord.copy()
        for a in range(npar):
            if coord[a] == -1 or coord[a] == 1:
                continue  # we are on edge, no gain in refitting
            xcoord = coord.copy()
            coef_minus = _compute_coef(xcoord, node_cube_coords)
            eps = 0.001
            xcoord[a] += eps
            coef_plus = _compute_coef(xcoord, node_cube_coords)
            dcoef_dcoord = (coef_plus-coef_minus)/eps  # do a numeric derivative
            #log.debug("dcoef_dcoord=%s"%dcoef_dcoord)
            B = np.inner(dcoef_dcoord, HB) - np.inner(dcoef_dcoord, HA.dot(coef_minus))
            A = np.inner(dcoef_dcoord, HA.dot(dcoef_dcoord))
            if A > 0:
                dcoord = B/A
                #log.debug("dcoord=%f"%dcoord)
                tmp_coord = coord.copy()
                tmp_coord[a] += dcoord
                if tmp_coord[a] < -1 or tmp_coord[a] > 1:
                    #log.debug("do not allow extrapolations")
                    continue
                coef = _compute_coef(tmp_coord, node_cube_coords)
                tmp_chi2 = chi2_0 - 2*np.inner(coef, HB) + np.inner(coef, HA.dot(coef))
                if tmp_chi2 < chi2:
                    log.debug("Improved chi2 by %f with a shift along %d of %f" % (chi2-tmp_chi2, a, dcoord))
                    coord = tmp_coord
                    chi2 = tmp_chi2
        diff = np.max(np.abs(coord-previous_coord))
        if diff < 0.001:
            break

    coef = _compute_coef(coord, node_cube_coords)
    chi2 = chi2_0 - 2*np.inner(coef, HB) + np.inner(coef, HA.dot(coef))

    input_number_of_templates = template_flux.shape[0]
    final_coefficients = np.zeros(input_number_of_templates)
    final_coefficients[node_template_ids] = coef

    log.debug("COORD=%s" % coord)
    log.debug("COEF=%s" % coef)
    #for i in np.where(final_coefficients>0)[0] :
    #    log.debug("TEFF[%d]=%f"%(i,teff[i]))
    #    log.debug("LOGG[%d]=%f"%(i,logg[i]))
    #    log.debug("FEH[%d]=%f"%(i,feh[i]))
    log.debug("TEFF=%f" % np.inner(final_coefficients, teff))
    log.debug("LOGG=%f" % np.inner(final_coefficients, logg))
    log.debug("FEH=%f" % np.inner(final_coefficients, feh))
    log.debug("Contributing template Ids=%s" % np.where(final_coefficients != 0)[0])

    '''
    # useful debugging plot
    import matplotlib.pyplot as plt
    plt.figure()
    ok=np.where(data_ivar>0)[0]
    ii=np.argsort(data_wave[ok])
    twave=data_wave[ok][ii]
    tflux=data_flux[ok][ii]
    tivar=data_ivar[ok][ii]
    #plt.errorbar(twave,tflux,1./np.sqrt(tivar),fmt="o")
    plt.plot(twave,tflux,".",c="gray",alpha=0.2)
    dw=np.min(twave[twave>twave[0]+0.5]-twave[0])
    bins=np.linspace(twave[0],twave[-1],(twave[-1]-twave[0])/dw+1)
    sw,junk=np.histogram(twave,bins=bins,weights=tivar)
    swx,junk=np.histogram(twave,bins=bins,weights=tivar*twave)
    swy,junk=np.histogram(twave,bins=bins,weights=tivar*tflux)
    tflux=swy[sw>0]/sw[sw>0]
    twave2=swx[sw>0]/sw[sw>0]
    terr=1./np.sqrt(sw[sw>0])
    plt.errorbar(twave2,tflux,terr,fmt="o",alpha=0.5)
    model = np.zeros(data_flux.shape)
    for c,t in zip(coef,node_template_flux) :
        model += c*t
    plt.plot(twave,model[ok][ii],"-",c="r")
    plt.show()
    '''

    return final_coefficients, chi2

def match_templates(wave, flux, ivar, resolution_data, stdwave, stdflux, teff, logg, feh,
                    ncpu=1, z_max=0.005, z_res=0.00002, template_error=0, comm=None):
    """For each input spectrum, identify which standard star template is the
    closest match, factoring out broadband throughput/calibration differences.

    Args:
        wave : A dictionary of 1D arrays of vacuum wavelengths [Angstroms]. Example below.
        flux : A dictionary of 1D observed flux for the star
        ivar : A dictionary of 1D inverse variance of flux
        resolution_data : resolution corresponding to the star's fiber
        stdwave : 1D standard star template wavelengths [Angstroms]
        stdflux : 2D[nstd, nwave] template flux
        teff : 1D[nstd] effective model temperature
        logg : 1D[nstd] model surface gravity
        feh : 1D[nstd] model metallicity
        ncpu : number of cpus for multiprocessing
        comm : MPI communicator; if given, ncpu will be ignored and only rank 0
            will return results that are not None

    Returns:
        coef : numpy.array of linear coefficients of standard stars
        redshift : redshift of standard star
        chi2pdf : reduced chi2

    Notes:
      - wave and stdwave can be on different grids that don't necessarily overlap
      - wave does not have to be uniform or monotonic. Multiple cameras can be
        supported by concatenating their wave and flux arrays
    """
    # I am treating the input arguments from three frame files as dictionaries. For example
    # wave{"r":rwave,"b":bwave,"z":zwave}
    # Each data (3 channels) is compared to every model.
    # flux should be already flat fielded and sky subtracted.

    cameras = list(flux.keys())
    log = get_logger()
    log.debug(time.asctime())

    # Initialize MPI
    if comm is not None:
        from mpi4py import MPI
        size, rank = comm.Get_size(), comm.Get_rank()
        log.debug("Using MPI. rank: {}, size: {}".format(rank, size))

    # fit continuum and save it
    continuum = {}
    for cam in wave.keys():
        tmp = applySmoothingFilter(flux[cam])  # this is fast
        continuum[cam] = tmp

    # mask out wavelengths that could bias the fit
    log.debug("mask potential cosmics (3 sigma positive fluctuations)")
    for cam in wave.keys():
        ok = np.where((ivar[cam] > 0))[0]
        if ok.size > 0:
            ivar[cam][ok] *= (flux[cam][ok] < (continuum[cam][ok]+3/np.sqrt(ivar[cam][ok])))

    log.debug("mask sky lines")
    # in vacuum
    # mask blue lines that can affect fit of Balmer series
    # line at 5577 has a stellar line close to it !
    # line at 7853. has a stellar line close to it !
    # mask everything above 8270A because it can bias the star redshift
    # all of this is based on analysis of a few exposures of BOSS data
    # in vacuum
    skylines = np.array([4047.5, 4359.3, 5462.3, 5578.9, 5891.3, 5897.3, 6301.8, 6365.4, 7823.3, 7855.2])

    hw = 6.  # A
    for cam in wave.keys():
        for line in skylines:
            ivar[cam][(wave[cam] >= (line-hw))&(wave[cam] <= (line+hw))] = 0.
        ivar[cam][wave[cam] > 8270] = 0.

    # mask telluric lines
    srch_filename = "data/arc_lines/telluric_lines.txt"
    if not resources.files('desispec').joinpath(srch_filename).is_file():
        log.error("Cannot find telluric mask file {:s}".format(srch_filename))
        raise Exception("Cannot find telluric mask file {:s}".format(srch_filename))
    telluric_mask_filename = resources.files('desispec').joinpath(srch_filename)
    telluric_features = np.loadtxt(telluric_mask_filename)
    log.debug("Masking telluric features from file %s" % telluric_mask_filename)
    for cam in wave.keys():
        for feature in telluric_features:
            ivar[cam][(wave[cam] >= feature[0])&(wave[cam] <= feature[1])] = 0.

    # add error proportional to flux to account for model error
    if template_error > 0:
        for cam in wave.keys():
            ok = np.where(ivar[cam] > 0)[0]
            if ok.size > 0:
                ivar[cam][ok] = 1./(1./ivar[cam][ok] + (template_error*continuum[cam][ok])**2)

    # normalize data and store them in a single array
    data_wave = np.array([])
    data_flux = np.array([])
    data_continuum = np.array([])
    data_ivar = np.array([])
    data_index = np.array([])
    sorted_keys = list(wave.keys())
    sorted_keys.sort()  # force sorting the keys to agree with models (found unpredictable ordering in tests)
    for index, cam in enumerate(sorted_keys):
        data_index = np.append(data_index, np.ones(wave[cam].size)*index)
        data_wave = np.append(data_wave, wave[cam])
        data_flux = np.append(data_flux, flux[cam]/(continuum[cam]+(continuum[cam] == 0)))
        data_continuum = np.append(data_continuum, continuum[cam])
        data_ivar = np.append(data_ivar, ivar[cam]*continuum[cam]**2)
    data_index = data_index.astype(int)

    ndata = np.sum(data_ivar > 0)

    # start looking at models

    # find canonical f-type model: Teff=6000, logg=4, Fe/H=-1.5
    canonical_model = np.argmin((teff-6000.0)**2+(logg-4.0)**2+(feh+1.5)**2)

    # fit redshift on canonical model
    # we use the original data to do this
    # because we resample both the data and model on a logarithmic grid in the routine

    if True:  # mask Ca H&K lines. Present in ISM, can bias the stellar redshift fit
        log.debug("Mask ISM lines for redshift")
        ismlines = np.array([3934.77, 3969.59])
        hw = 6.  # A
        for cam in wave.keys():
            for line in ismlines:
                ivar[cam][(wave[cam] >= (line-hw))&(wave[cam] <= (line+hw))] = 0.

    z = redshift_fit(wave, flux, ivar, resolution_data, stdwave, stdflux[canonical_model], z_max, z_res)

    # now we go back to the model spectra, redshift them, resample, apply resolution, normalize and chi2 match

    ntemplates = stdflux.shape[0]

    # here we take into account the redshift once and for all
    shifted_stdwave = stdwave*(1+z)

    # convert resolution_data to sparse R once; re-use for every template
    R = dict()
    for cam in resolution_data:
        R[cam] = Resolution(resolution_data[cam])

    func_args = []
    # need to parallelize the model resampling
    for template_id in range(ntemplates):
        arguments = {"data_wave_per_camera": wave,
                     "resolution_obj_per_camera": R,
                     "template_wave": shifted_stdwave,
                     "template_flux": stdflux[template_id],
                     "template_id": template_id}
        func_args.append(arguments)

    if comm is not None and comm.Get_size() > 1:
        # MPI mode & more than one rank per star
        results = list(map(_func, func_args[rank::size]))
        # All reduce here because we'll need to divide the work out again
        results = comm.allreduce(results, op=MPI.SUM)
    elif ncpu > 1:
        log.debug("Running pool(%d).map() for %d items", ncpu, len(func_args)); sys.stdout.flush()
        with NoGPU():
            with multiprocessing.Pool(ncpu) as pool:
                results = pool.map(_func, func_args)
        log.debug("Finished pool.map()"); sys.stdout.flush()
    else:
        log.debug("Not using multiprocessing for {} cpus".format(ncpu))
        results = [_func(x) for x in func_args]
        log.debug("Finished serial loop")

    # collect results
    # in case the exit of the multiprocessing pool is not ordered as the input
    # we returned the template_id
    template_flux = np.zeros((ntemplates, data_flux.size))
    template_norm = np.zeros((ntemplates, data_flux.size))
    for result in results:
        template_id = result[0]
        template_tmp_wave = result[1]
        template_tmp_flux = result[2]
        template_tmp_norm = result[3]
        mdiff = np.max(np.abs(data_wave-template_tmp_wave))  # just a safety check
        if mdiff > 1.e-5:
            log.error("error indexing of wave and flux somewhere above, checking if it's just an ordering issue, max diff=%f" % mdiff)
            raise ValueError("wavelength array difference cannot be fixed with reordering, ordered max diff=%f" % mdiff)
        template_flux[template_id] = template_tmp_flux
        template_norm[template_id] = template_tmp_norm

    # compute model chi2
    template_chi2 = np.zeros(ntemplates)
    for template_id in range(ntemplates):
        template_chi2[template_id] = np.sum(data_ivar*(data_flux-template_flux[template_id])**2)

    best_model_id = np.argmin(template_chi2)
    best_chi2 = template_chi2[best_model_id]
    log.debug("selected best model {} chi2/ndf {}".format(best_model_id, best_chi2/ndata))

    # interpolate around best model using parameter grid
    coef, chi2 = interpolate_on_parameter_grid(data_wave, data_flux, data_ivar, template_flux, teff, logg, feh, template_chi2)
    log.debug("after interpolation chi2/ndf {}".format(chi2/ndata))

    log.debug("use best fit to derive calibration and apply it to the templates before refitting the star ...")
    # the division by the median filtered spectrum leaves some imprint of the input transmission
    # so we will apply calibration to the model and redo the whole fit
    # to make sure this is not driving the stellar model selection.
    log.debug("remultiply templates by their norm")
    template_flux *= template_norm

    log.debug("compute best fit model")
    model = np.zeros(data_wave.size)
    for c, t in zip(coef, template_flux):
        if c > 0:
            model += c*t

    func_args = []
    for index in np.unique(data_index):
        log.debug("compute calib for cam index %d" % index)
        ii = np.where(data_index == index)[0]
        calib = (data_flux[ii]*data_continuum[ii])/(model[ii]+(model[ii] == 0))
        scalib = applySmoothingFilter(calib, width=400)

        min_scalib = 0.
        bad = scalib <= min_scalib
        if np.sum(bad) > 0:
            scalib[bad] = min_scalib

        log.debug("multiply templates by calib for cam index %d" % index)
        template_flux[:, ii] *= scalib

        # apply this to all the templates and recompute median filter
        for t in range(template_flux.shape[0]):
            arguments = {"template_id": t, "camera_index": index, "template_flux": template_flux[t][ii]}
            func_args.append(arguments)

    if comm is not None and comm.Get_size() > 1:
        # MPI mode & more than one rank per star
        results = list(map(_func2, func_args[rank::size]))
        results = comm.reduce(results, op=MPI.SUM, root=0)
    elif ncpu > 1:
        log.debug("divide templates by median filters using multiprocessing.Pool of ncpu=%d", ncpu)
        with NoGPU():
            with multiprocessing.Pool(ncpu) as pool:
                results = pool.map(_func2, func_args)
        log.debug("finished pool.map()"); sys.stdout.flush()
    else:
        log.debug("divide templates serially")
        results = [_func2(x) for x in func_args]
        log.debug("Finished serial loop")

    if comm is not None and rank != 0:
        # All the work left belongs to rank 0
        return None

    # collect results
    for result in results:
        template_id = result[0]
        index = result[1]
        template_flux[template_id][data_index == index] /= (result[2] + (result[2] == 0))

    log.debug("refit the model ...")
    template_chi2 = np.zeros(ntemplates)
    for template_id in range(ntemplates):
        template_chi2[template_id] = np.sum(data_ivar*(data_flux-template_flux[template_id])**2)

    best_model_id = np.argmin(template_chi2)
    best_chi2 = template_chi2[best_model_id]
    log.debug("selected best model {} chi2/ndf {}".format(best_model_id, best_chi2/ndata))

    # interpolate around best model using parameter grid
    coef, chi2 = interpolate_on_parameter_grid(data_wave, data_flux, data_ivar, template_flux, teff, logg, feh, template_chi2)
    log.debug("after interpolation chi2/ndf {}".format(chi2/ndata))

    return coef, z, chi2/ndata
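
# Usage sketch (illustrative; the per-camera dictionaries follow the docstring
# example, and `stdwave`, `stdflux`, `teff`, `logg`, `feh` are a hypothetical
# template grid loaded elsewhere):
#
#   wave = {"b": bwave, "r": rwave, "z": zwave}
#   flux = {"b": bflux, "r": rflux, "z": zflux}
#   ivar = {"b": bivar, "r": rivar, "z": zivar}
#   res  = {"b": bres,  "r": rres,  "z": zres}   # per-camera resolution data
#   coef, z, chi2pdf = match_templates(wave, flux, ivar, res,
#                                      stdwave, stdflux, teff, logg, feh, ncpu=4)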

def normalize_templates(stdwave, stdflux, mag, band, photsys):
    """Returns spectra normalized to input magnitudes.

    Args:
        stdwave : 1D array of standard star wavelengths [Angstroms]
        stdflux : 1D observed flux
        mag : float, desired magnitude
        band : G, R, Z, W1 or W2
        photsys : N or S (for Legacy Survey North or South)

    Returns:
        normflux : normalized flux array, on the same wavelength grid as stdwave
    """
    log = get_logger()
    fluxunits = 1e-17 * units.erg / units.s / units.cm**2 / units.Angstrom
    filter_response = load_legacy_survey_filter(band, photsys)
    apMag = filter_response.get_ab_magnitude(stdflux*fluxunits, stdwave)
    scalefac = 10**((apMag-mag)/2.5)
    log.debug('scaling mag {:.3f} to {:.3f} using scalefac {:.3f}'.format(apMag, mag, scalefac))
    normflux = stdflux*scalefac

    return normflux
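
# Worked example of the scale factor (sketch): the synthetic AB magnitude of
# the template through the filter is apMag; to reach the desired magnitude
# `mag` the flux is multiplied by 10**((apMag-mag)/2.5). For instance,
# apMag=18.0 and mag=17.5 give
#
#   scalefac = 10**((18.0-17.5)/2.5)   # ~1.585, i.e. brighten by 0.5 mag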

def compute_flux_calibration(frame, input_model_wave, input_model_flux, input_model_fibers,
                             nsig_clipping=10., deg=2, debug=False, highest_throughput_nstars=0,
                             exposure_seeing_fwhm=1.1, stdcheck=True, nsig_flux_scale=3):
    """Compute average frame throughput based on data frame.(wave,flux,ivar,resolution_data)
    and spectro-photometrically calibrated stellar models (model_wave,model_flux).
    Wave and model_wave are not necessarily on the same grid.

    Args:
        frame : Frame object with attributes wave, flux, ivar, resolution_data
        input_model_wave : 1D[nwave] array of model wavelengths
        input_model_flux : 2D[nstd, nwave] array of model fluxes
        input_model_fibers : 1D[nstd] array of model fibers
        nsig_clipping : (optional) sigma clipping level
        exposure_seeing_fwhm : (optional) seeing FWHM in arcsec of the exposure
        stdcheck : check if the model stars are actually standards according
            to the fibermap and only rely on those
        nsig_flux_scale : n sigma cutoff on the flux scale among standard stars

    Returns:
        desispec.FluxCalib object
        calibration: mean calibration (without resolution)

    Notes:
      - we first resample the model on the input flux wave grid
      - then convolve it to the data resolution (the input wave grid is
        supposed finer than the spectral resolution)
      - then iteratively:
        - fit the mean throughput (deconvolved, this is needed because of
          sharp atmospheric absorption lines)
        - compute a scale factor to fibers (to correct for small
          mis-alignment for instance)
        - perform outlier rejection

    There is one subtlety with the relation between calibration and resolution.
      - The input frame flux is on average flux^frame_fiber = R_fiber*C*flux^true
        where C is the true calibration (or throughput) which is a function of
        wavelength. This is the system we solve.
      - But we want to return a calibration vector per fiber C_fiber defined by
        flux^cframe_fiber = flux^frame_fiber/C_fiber, such that flux^cframe can
        be compared with a convolved model of the truth,
        flux^cframe_fiber = R_fiber*flux^true,
        i.e. (R_fiber*C*flux^true)/C_fiber = R_fiber*flux^true,
        giving C_fiber = (R_fiber*C*flux^true)/(R_fiber*flux^true)
      - There is no solution for this for all possible input spectra. The
        solution for a flat spectrum is returned, which is very close to
        C_fiber = R_fiber*C (but not exactly).
    """
    log = get_logger()

    if frame.meta is not None and 'CAMERA' in frame.meta:
        camera = frame.meta['CAMERA']
    else:
        camera = 'cam?'
        log.warning(f'CAMERA not in frame header; using {camera}')

    log.info(f"starting {camera} flux calibration")

    # add margin to frame
    def add_margin_2d_dim1(iarray, margin):
        shape = (iarray.shape[0], iarray.shape[1]+2*margin)
        oarray = np.zeros(shape, dtype=iarray.dtype)
        oarray[:, :margin] = iarray[:, 0][:, None]
        oarray[:, margin:-margin] = iarray
        oarray[:, -margin:] = iarray[:, -1][:, None]
        return oarray

    def add_margin_3d_dim2(iarray, margin):
        shape = (iarray.shape[0], iarray.shape[1], iarray.shape[2]+2*margin)
        oarray = np.zeros(shape, dtype=iarray.dtype)
        oarray[:, :, :margin] = iarray[:, :, 0][:, :, None]
        oarray[:, :, margin:-margin] = iarray
        oarray[:, :, -margin:] = iarray[:, :, -1][:, :, None]
        return oarray

    margin = 3
    log.info(f"{camera} adding margin of {margin} pixels on each side")

    nwave = frame.wave.size
    dw = frame.wave[1]-frame.wave[0]
    wave_with_margin = np.zeros(nwave+2*margin)
    wave_with_margin[margin:nwave+margin] = frame.wave
    wave_with_margin[0:margin] = frame.wave[0]+dw*np.arange(-margin, 0)
    wave_with_margin[nwave+margin:] = frame.wave[-1]+dw*np.arange(1, margin+1)

    tframe = copy.deepcopy(frame)
    tframe.wave = wave_with_margin
    tframe.nwave = tframe.wave.size
    tframe.flux = add_margin_2d_dim1(frame.flux, margin)
    tframe.ivar = add_margin_2d_dim1(frame.ivar, margin)
    tframe.mask = add_margin_2d_dim1(frame.mask, margin)
    tframe.resolution_data = add_margin_3d_dim2(frame.resolution_data, margin)
    tframe.R = np.array([Resolution(r) for r in tframe.resolution_data])

    #- Compute point source flux correction and fiber flux correction
    log.info("{} compute point source flux correction for seeing FWHM = {:4.2f} arcsec".format(camera, exposure_seeing_fwhm))
    point_source_correction = flat_to_psf_flux_correction(frame.fibermap, exposure_seeing_fwhm)

    good = (point_source_correction != 0)
    bad = ~good

    #- Set temporary ivar=0 for the targets with bad point source correction
    tframe.ivar[bad] = 0.

    if np.sum(good) > 1:
        log.info("{} point source correction mean = {} rms = {}, nbad = {}".format(camera, np.mean(point_source_correction[good]), np.std(point_source_correction[good]), np.sum(bad)))
    else:
        log.warning("{} bad point source correction for most fibers ngood = {} nbad = {}".format(camera, np.sum(good), np.sum(bad)))

    #- Set back to 1 the correction for bad fibers
    point_source_correction[bad] = 1.

    #- Apply point source flux correction
    tframe.flux *= point_source_correction[:, None]

    #- Pull out just the standard stars for convenience, but keep the
    #- full frame of spectra around because we will later need to convolve
    #- the calibration vector for each fiber individually
    if stdcheck:
        stdfibers = np.where(isStdStar(tframe.fibermap))[0]
        assert len(stdfibers) > 0

        if not np.all(np.in1d(stdfibers, input_model_fibers)):
            bad = set(input_model_fibers) - set(stdfibers)
            if len(bad) > 0:
                log.error('Discarding {} input_model_fibers that are not standards: {}'.format(camera, bad))
            stdfibers = np.intersect1d(stdfibers, input_model_fibers)

        # also other way around
        stdfibers = np.intersect1d(input_model_fibers, stdfibers)
    else:
        stdfibers = input_model_fibers

    log.info(f"{camera} stdstar fibers: {stdfibers}")
    stdstars = tframe[stdfibers]
    nwave = stdstars.nwave
    nstds = stdstars.flux.shape[0]

    dwave = (stdstars.wave-np.mean(stdstars.wave))/(stdstars.wave[-1]-stdstars.wave[0])  # normalized wave for polynomial fit

    # resample model to data grid and convolve by resolution
    model_flux = np.zeros((nstds, nwave))
    convolved_model_flux = np.zeros((nstds, nwave))

    for star in range(nstds):
        model_flux_index = np.where(input_model_fibers == stdfibers[star])[0][0]
        model_flux[star] = resample_flux(stdstars.wave, input_model_wave, input_model_flux[model_flux_index])
        convolved_model_flux[star] = stdstars.R[star].dot(model_flux[star])

    # check S/N before doing anything else
    ii = (stdstars.ivar > 0)*(stdstars.mask == 0)
    median_snr = np.median(stdstars.flux[ii]*np.sqrt(stdstars.ivar[ii]))
    log.info(f"{camera} median S/N per flux bin in stars = {median_snr}")

    if median_snr < 2.:
        log.warning(f"{camera} very low S/N, use simple scaled version of the average throughput")
        fluxcalib_filename = findcalibfile([frame.meta], "FLUXCALIB")
        log.warning(f"{camera} will scale throughput starting from {fluxcalib_filename}")

        acal = read_average_flux_calibration(fluxcalib_filename)
        if "AIRMASS" in frame.meta:
            airmass = frame.meta["AIRMASS"]
        else:
            airmass = 1.0

        # apply heliocentric/barycentric correction if it exists
        if 'HELIOCOR' in stdstars.meta:
            heliocor = stdstars.meta['HELIOCOR']
            log.info(f"{camera} also apply heliocentric correction scale factor {heliocor} to calibration")
            acal.wave *= heliocor  # now the calibration wave are also in the solar system frame

        # interpolate wavelength
        fluxcal = np.interp(stdstars.wave, acal.wave, acal.value(airmass=airmass, seeing=1.1), left=0, right=0)

        # it's not essential because we will fit a scale factor, but multiplying by the exposure time
        # as it should be will give us a meaningful value for the scale factor
        if "EXPTIME" in frame.meta:
            fluxcal *= frame.meta["EXPTIME"]

        # fit scale factor
        # use a median instead of an optimal fit here
        waveindices = np.where(fluxcal > 0.5*np.median(fluxcal))[0]
        scale = np.median(stdstars.flux[:, waveindices]/(fluxcal[waveindices][None, :]*convolved_model_flux[:, waveindices]*point_source_correction[stdfibers, None]))
        log.info(f"{camera} scale factor = {scale:4.3f}")

        minscale = 0.0001
        if scale < minscale:
            scale = minscale
            log.warning(f"{camera} force scale factor = {scale:5.4f}")

        fluxcal *= scale

        # return FluxCalib object
        stdstars.wave = stdstars.wave[margin:-margin]
        fluxcal = fluxcal[margin:-margin]
        nfibers = tframe.flux.shape[0]
        ccalibration = np.tile(fluxcal, (nfibers, 1))
        ccalibivar = 1/(ccalibration**2+(ccalibration == 0))  # 100% uncertainty!
        mask = np.ones(ccalibration.shape, dtype=int)

        fibercorr = dict()
        fibercorr_comments = dict()
        fibercorr["FLAT_TO_PSF_FLUX"] = point_source_correction
        fibercorr_comments["FLAT_TO_PSF_FLUX"] = "correction for point sources already applied to the flux vector"
        fibercorr["PSF_TO_FIBER_FLUX"] = psf_to_fiber_flux_correction(frame.fibermap, exposure_seeing_fwhm)
        fibercorr_comments["PSF_TO_FIBER_FLUX"] = "multiplication correction to apply to get the fiber flux spectrum"

        return FluxCalib(stdstars.wave, ccalibration, ccalibivar, mask, fluxcal, fibercorr=fibercorr)

    input_model_flux = None  # I shall not use the input_model_flux any more here

    # iterative fitting and clipping to get precise mean spectrum
    current_ivar = stdstars.ivar*(stdstars.mask == 0)

    #- Start with a first pass median rejection
    calib = (convolved_model_flux != 0)*(stdstars.flux/(convolved_model_flux + (convolved_model_flux == 0)))
    median_calib = np.median(calib, axis=0)

    # Fit one normalization per fiber, add a generous model error to the variance,
    # and perform a first outlier rejection
    scale = np.ones((nstds))
    chi2 = np.zeros((stdstars.flux.shape))
    badfiber = np.zeros(nstds, dtype=int)

    number_of_stars_with_negative_correction = 0
    for star in range(nstds):
        if badfiber[star]:
            continue
        if np.sum(current_ivar[star]) == 0:
            log.warning(f"{camera} null inverse variance for star {star}")
            badfiber[star] = 1
            continue

        M = median_calib*stdstars.R[star].dot(model_flux[star])

        a = np.sum(current_ivar[star]*M**2)
        b = np.sum(current_ivar[star]*stdstars.flux[star]*M)
        if a > 0:
            scale[star] = b/a
        else:
            scale[star] = 0
        log.info("{} star {} initial scale = {}".format(camera, star, scale[star]))
        if scale[star] < 0.2:
            log.warning("{} ignore star {} with scale = {}".format(camera, star, scale[star]))
            badfiber[star] = 1
            current_ivar[star] = 0.
            continue

        # add generous multiplicative error to ivar for sigma clipping (because we only want to discard cosmics or bad pixels here)
        current_ivar[star] = (current_ivar[star] > 0)/(1./(current_ivar[star] + (current_ivar[star] == 0))+(0.2*stdstars.flux[star])**2)
        chi2[star] = current_ivar[star]*(stdstars.flux[star]-scale[star]*M)**2

    # normalize to preserve the average throughput
    # throughput = < data/(model*scale) >
    # but we would like to have throughput = < data/model >
    # (we don't do that directly to reduce noise)
    # so we want to average the inverse of the scale
    mscale = 1./np.mean(1./scale[badfiber == 0])
    log.info(f"{camera} mean scale = {mscale:4.3f}")
    scale /= mscale
    median_calib *= mscale

    bad = (chi2 > nsig_clipping**2)
    current_ivar[bad] = 0

    sqrtw = np.sqrt(current_ivar)
    sqrtwflux = np.sqrt(current_ivar)*stdstars.flux

    # diagonal sparse matrices
    D1 = scipy.sparse.lil_matrix((nwave, nwave))
    D2 = scipy.sparse.lil_matrix((nwave, nwave))

    nout_tot = 0
    for iteration in range(20):

        # fit mean calibration
        A = scipy.sparse.lil_matrix((nwave, nwave)).tocsr()
        B = np.zeros((nwave))

        # loop on stars to handle resolution
        for star in range(nstds):
            if badfiber[star]:
                continue

            R = stdstars.R[star]

            # diagonal sparse matrix with content = sqrt(ivar)*flat
            D1.setdiag(sqrtw[star]*scale[star])
            D2.setdiag(model_flux[star])
            sqrtwmodelR = D1.dot(R.dot(D2))  # chi2 = sum (sqrtw*data_flux - diag(sqrtw)*scale*R*diag(model_flux)*calib)

            A = A+(sqrtwmodelR.T*sqrtwmodelR).tocsr()
            B += sqrtwmodelR.T*sqrtwflux[star]

        if np.sum(current_ivar > 0) == 0:
            msg = f"{camera} null ivar, cannot calibrate this frame"
            log.error(msg)
            raise ValueError(msg)

        #- Add a weak prior that calibration = median_calib
        #- to keep A well conditioned
        minivar = np.min(current_ivar[current_ivar > 0])
        log.debug('%s min(ivar[ivar>0]) = %f', camera, minivar)
        epsilon = minivar/10000
        A = epsilon*np.eye(nwave) + A  #- converts sparse A -> dense A
        B += median_calib*epsilon

        log.info("%s iter %d solving", camera, iteration)
        w = np.diagonal(A) > 0
        A_pos_def = A[w, :]
        A_pos_def = A_pos_def[:, w]
        calibration = B*0
        try:
            calibration[w] = cholesky_solve(A_pos_def, B[w])
        except np.linalg.LinAlgError:
            log.info('{} cholesky fails in iteration {}, trying svd'.format(camera, iteration))
            calibration[w] = np.linalg.lstsq(A_pos_def, B[w])[0]

        wmask = (np.diagonal(A) <= 0)
        if np.sum(wmask) > 0:
            wmask = wmask.astype(float)
            wmask = R.dot(R.dot(wmask))
            bad = np.where(wmask != 0)[0]
            log.info("{} nbad={}".format(camera, bad.size))
            good = np.where(wmask == 0)[0]
            calibration[bad] = np.interp(bad, good, calibration[good], left=0, right=0)

        log.info("%s iter %d fit scale per fiber", camera, iteration)
        for star in range(nstds):
            if badfiber[star]:
                continue

            M = stdstars.R[star].dot(calibration*model_flux[star])

            a = np.sum(current_ivar[star]*M**2)
            b = np.sum(current_ivar[star]*stdstars.flux[star]*M)
            if a > 0:
                scale[star] = b/a
            else:
                scale[star] = 0
            log.debug("%s iter %d star %d scale = %f", camera, iteration, star, scale[star])
            if scale[star] < 0.2:
                log.warning("{} iter {} ignore star {} with scale = {}".format(camera, iteration, star, scale[star]))
                badfiber[star] = 1
                current_ivar[star] = 0.
                continue

            chi2[star] = current_ivar[star]*(stdstars.flux[star]-scale[star]*M)**2

        log.info(f"{camera} iter {iteration} rejecting")

        nout_iter = 0
        if iteration < 1:
            # only remove worst outlier per wave
            # apply rejection iteratively, only one entry per wave among stars
            # find waves with outlier (fastest way)
            nout_per_wave = np.sum(chi2 > nsig_clipping**2, axis=0)
            selection = np.where(nout_per_wave > 0)[0]
            for i in selection:
                worst_entry = np.argmax(chi2[:, i])
                current_ivar[worst_entry, i] = 0
                sqrtw[worst_entry, i] = 0
                #sqrtwmodel[worst_entry,i]=0
                sqrtwflux[worst_entry, i] = 0
                nout_iter += 1
        else:
            # remove all of them at once
            bad = (chi2 > nsig_clipping**2)
            current_ivar *= (bad == 0)
            sqrtw *= (bad == 0)
            #sqrtwmodel *= (bad==0)
            sqrtwflux *= (bad == 0)
            nout_iter += np.sum(bad)

        nout_tot += nout_iter

        sum_chi2 = float(np.sum(chi2))
        ndf = int(np.sum(chi2 > 0)-nwave-nstds*2)
        chi2pdf = 0.
        if ndf > 0:
            chi2pdf = sum_chi2/ndf

        # see comment above about why we perform this averaging
        mscale = 1./np.mean(1./scale[badfiber == 0])

        if highest_throughput_nstars > 0:
            log.info("{} keeping {} stars with highest throughput".format(camera, highest_throughput_nstars))
            ii = np.argsort(scale)[::-1][:highest_throughput_nstars]
            log.info("{} use those fibers = {}".format(camera, stdfibers[ii]))
            log.info("{} with scale = {}".format(camera, scale[ii]))
            mscale = 1./np.mean(1./scale[ii][badfiber[ii] == 0])
        else:
            medscale = np.median(scale[badfiber == 0])
            rmscorr = 1.4*np.median(np.abs(scale[badfiber == 0]-medscale))
            log.info("{} iter {} mean median rms scale = {:4.3f} {:4.3f} {:4.3f}".format(camera, iteration, mscale, medscale, rmscorr))
            if nsig_flux_scale > 0:
                bad = (np.abs(scale-medscale) > nsig_flux_scale*rmscorr)
            else:
                bad = np.repeat(False, scale.size)
            if np.sum(bad) > 0:
                good = ~bad
                log.info("{} iter {} use {} stars, discarding {} sigma outlier stars with scale = {}".format(camera, iteration, np.sum(good), nsig_flux_scale, scale[bad]))
                mscale = 1./np.mean(1./scale[good][badfiber[good] == 0])
                newly_bad = bad&(badfiber == 0)&(np.abs(scale-1) > 0.05)
                if np.sum(newly_bad) > 0:
                    log.info("{} newly bad fiber(s) at scale(s) = {}".format(camera, list(scale[newly_bad])))
                    i = np.argmax(np.abs(scale[newly_bad]-1))
                    new_badfiber_index = np.where(newly_bad)[0][i]
                    log.info("{} set worst fiber ({}) as badfiber".format(camera, new_badfiber_index))
                    badfiber[new_badfiber_index] = 1
            else:
                log.info("{} iter {} use {} stars".format(camera, iteration, np.sum(badfiber == 0)))

        scale /= mscale
        calibration *= mscale

        log.info("%s iter %d chi2=%4.3f ndf=%d chi2pdf=%4.3f nout=%d mscale=%4.3f", camera, iteration, sum_chi2, ndf, chi2pdf, nout_iter, mscale)

        if nout_iter == 0 and np.abs(mscale-1.) < 0.0001:
            break

    for star in range(nstds):
        if badfiber[star] == 0:
            log.info("{} star {} final scale = {}".format(camera, star, scale[star]))
    log.info("{} n stars kept = {}".format(camera, np.sum(badfiber == 0)))
    log.info("{} rms of scales = {:4.3f}".format(camera, np.std(scale[badfiber == 0])))
    log.info("{} n stars excluded = {}".format(camera, np.sum(badfiber > 0)))
    log.info("{} n flux values excluded = {}".format(camera, nout_tot))

    # solve once again to get deconvolved variance
    #calibration,calibcovar=cholesky_solve_and_invert(A.todense(),B)
    calibcovar = np.linalg.inv(A)
    calibvar = np.diagonal(calibcovar)
    log.info("{} mean(var)={:f}".format(camera, np.mean(calibvar)))

    calibvar = np.array(np.diagonal(calibcovar))
    # apply the mean (as in the iterative loop)
    calibivar = (calibvar > 0)/(calibvar+(calibvar == 0))

    # we also want to save the convolved calibration and a calibration variance
    # first compute average resolution
    mean_res_data = np.mean(tframe.resolution_data, axis=0)
    R = Resolution(mean_res_data)

    # compute convolved calib
    ccalibration = np.zeros(tframe.flux.shape)
    for i in range(tframe.nspec):
        norme = tframe.R[i].dot(np.ones(calibration.shape))
        ok = np.where(norme > 0)[0]
        if ok.size:
            ccalibration[i][ok] = tframe.R[i].dot(calibration)[ok]/norme[ok]

    # Use diagonal of mean calibration covariance for output.
    ccalibcovar = R.dot(calibcovar).dot(R.T.todense())
    ccalibvar = np.array(np.diagonal(ccalibcovar))

    # apply the mean (as in the iterative loop)
    ccalibivar = (ccalibvar > 0)/(ccalibvar+(ccalibvar == 0))

    # at least a few stars at each wavelength
    min_number_of_stars = min(3, max(1, nstds//2))
    nstars_with_signal = np.sum(current_ivar > 0, axis=0)
    bad = (nstars_with_signal < min_number_of_stars)
    nallbad = np.sum(nstars_with_signal == 0)
    # increase by 1 pixel
    bad[1:-1] |= bad[2:]
    bad[1:-1] |= bad[:-2]
    nbad = np.sum(bad > 0)
    log.info("{} requesting at least {} star spectra at each wavelength results in masking {} add. flux bins ({} already masked)".format(camera, min_number_of_stars, nbad-nallbad, nallbad))

    ccalibivar[bad] = 0.
    ccalibration[:, bad] = 0.

    # convert to 2D
    # For now this is the same for all fibers; in the future it may not be
    ccalibivar = np.tile(ccalibivar, tframe.nspec).reshape(tframe.nspec, tframe.nwave)

    # need to do better here
    mask = tframe.mask.copy()

    mccalibration = R.dot(calibration)

    #- Apply point source flux correction
    ccalibration /= point_source_correction[:, None]

    log.info(f"{camera} interpolate calibration over Ca and Na ISM lines")
    # do this after convolution with resolution
    ismlines = np.array([3934.77, 3969.59, 5891.58, 5897.56])  # in vacuum
    hw = 6.  # A
    good = np.ones(tframe.nwave, dtype=bool)
    for line in ismlines:
        good &= (np.abs(tframe.wave-line) > hw)
    bad = ~good
    if np.sum(bad) > 0:
        mccalibration[bad] = np.interp(tframe.wave[bad], tframe.wave[good], mccalibration[good])
        for spec in range(tframe.nspec):
            ccalibration[spec, bad] = np.interp(tframe.wave[bad], tframe.wave[good], ccalibration[spec, good])

    # trim back
    ccalibration = ccalibration[:, margin:-margin]
    ccalibivar = ccalibivar[:, margin:-margin]
    mask = mask[:, margin:-margin]
    mccalibration = mccalibration[margin:-margin]
    stdstars.wave = stdstars.wave[margin:-margin]

    fibercorr = dict()
    fibercorr_comments = dict()
    fibercorr["FLAT_TO_PSF_FLUX"] = point_source_correction
    fibercorr_comments["FLAT_TO_PSF_FLUX"] = "correction for point sources already applied to the flux vector"
    fibercorr["PSF_TO_FIBER_FLUX"] = psf_to_fiber_flux_correction(frame.fibermap, exposure_seeing_fwhm)
    fibercorr_comments["PSF_TO_FIBER_FLUX"] = "multiplication correction to apply to get the fiber flux spectrum"

    # save information about the standard stars used in the fit
    stdstar_fibermap = stdstars.fibermap[badfiber == 0]
    #log.info("number of stars used in fit = {}".format(len(stdstar_fibermap)))

    # return calibration, calibivar, mask, ccalibration, ccalibivar
    return FluxCalib(stdstars.wave, ccalibration, ccalibivar, mask, mccalibration,
                     fibercorr=fibercorr, stdstar_fibermap=stdstar_fibermap)
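
# Usage sketch (illustrative): chaining the standard-star models with the frame
# calibration. `frame` and the model arrays are hypothetical inputs read
# elsewhere in the pipeline (e.g. from a stdstars fit file):
#
#   fluxcalib = compute_flux_calibration(frame, model_wave, model_flux,
#                                        model_fibers, exposure_seeing_fwhm=1.1)
#   apply_flux_calibration(frame, fluxcalib)  # frame.flux now in 1e-17 erg/s/cm2/A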

class FluxCalib(object):
    def __init__(self, wave, calib, ivar, mask, meancalib=None,
                 fibercorr=None, fibercorr_comments=None, stdstar_fibermap=None):
        """Lightweight wrapper object for flux calibration vectors.

        Args:
            wave : 1D[nwave] input wavelength (Angstroms)
            calib : 2D[nspec, nwave] calibration vectors for each spectrum
            ivar : 2D[nspec, nwave] inverse variance of calib
            mask : 2D[nspec, nwave] mask of calib (0=good)
            meancalib : 1D[nwave] mean convolved calibration (optional)
            fibercorr : dictionary of 1D arrays of size nspec (optional)
            fibercorr_comments : dictionary of strings (explaining the fibercorr)
            stdstar_fibermap : table with the fibermap of the std stars actually used

        All arguments become attributes, plus nspec, nwave = calib.shape

        The calib vector should be such that

            [1e-17 erg/s/cm^2/A] = [photons/A] / calib
        """
        assert wave.ndim == 1
        assert calib.ndim == 2
        assert calib.shape == ivar.shape
        assert calib.shape == mask.shape
        assert np.all(ivar >= 0)

        self.nspec, self.nwave = calib.shape
        self.wave = wave
        self.calib = calib
        self.ivar = ivar
        self.mask = util.mask32(mask)
        self.meancalib = meancalib
        self.fibercorr = fibercorr
        self.fibercorr_comments = fibercorr_comments
        self.stdstar_fibermap = stdstar_fibermap

        self.meta = dict(units='photons/(erg/s/cm^2)')

    def __repr__(self):
        txt = '<{:s}: nspec={:d}, nwave={:d}, units={:s}'.format(
            self.__class__.__name__, self.nspec, self.nwave, self.meta['units'])

        # Finish
        txt = txt + '>'
        return (txt)

def apply_flux_calibration(frame, fluxcalib):
    """Applies flux calibration to input flux and ivar.

    Args:
        frame : Spectra object with attributes wave, flux, ivar, resolution_data
        fluxcalib : FluxCalib object with wave, calib, ...

    Modifies frame.flux and frame.ivar
    """
    log = get_logger()
    log.info("starting")

    # check same wavelength, die if not the case
    mval = np.max(np.abs(frame.wave-fluxcalib.wave))
    #if mval > 0.00001 :
    if mval > 0.001:
        log.error("not same wavelength (should raise an error instead)")
        sys.exit(12)

    nwave = frame.nwave
    nfibers = frame.nspec

    """
    F' = F/C
    Var(F') = Var(F)/C**2 + F**2*( d(1/C)/dC )**2*Var(C)
            = 1/(ivar(F)*C**2) + F**2*(1/C**2)**2*Var(C)
            = 1/(ivar(F)*C**2) + F**2*Var(C)/C**4
            = 1/(ivar(F)*C**2) + F**2/(ivar(C)*C**4)
    """
    # for fiber in range(nfibers) :
    #     C = fluxcalib.calib[fiber]
    #     flux[fiber]=frame.flux[fiber]*(C>0)/(C+(C==0))
    #     ivar[fiber]=(ivar[fiber]>0)*(civar[fiber]>0)*(C>0)/( 1./((ivar[fiber]+(ivar[fiber]==0))*(C**2+(C==0))) + flux[fiber]**2/(civar[fiber]*C**4+(civar[fiber]*(C==0))) )

    C = fluxcalib.calib
    frame.flux = frame.flux * (C > 0) / (C+(C == 0))
    frame.ivar *= (fluxcalib.ivar > 0) * (C > 0)
    for i in range(nfibers):
        ok = np.where(frame.ivar[i] > 0)[0]
        if ok.size > 0:
            frame.ivar[i, ok] = 1./(1./(frame.ivar[i, ok]*C[i, ok]**2)+frame.flux[i, ok]**2/(fluxcalib.ivar[i, ok]*C[i, ok]**4))

    if fluxcalib.fibercorr is not None and frame.fibermap is not None:
        if "PSF_TO_FIBER_FLUX" in fluxcalib.fibercorr.dtype.names:
            log.info("add a column PSF_TO_FIBER_SPECFLUX to fibermap")
            frame.fibermap["PSF_TO_FIBER_SPECFLUX"] = fluxcalib.fibercorr["PSF_TO_FIBER_FLUX"]
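
# Sanity check of the variance propagation above (illustrative numbers):
# with F=100, ivar(F)=1e-2 (sigma_F=10), C=2, ivar(C)=1 (sigma_C=1):
#
#   F' = 100/2 = 50
#   Var(F') = 1/(1e-2 * 2**2) + 100**2/(1 * 2**4) = 25 + 625 = 650
#
# i.e. frame.ivar becomes 1/650 for that flux bin.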

def ZP_from_calib(exptime, wave, calib):
    """Calculate the ZP in AB magnitudes given the calibration and the wavelength arrays.

    Args:
        exptime : float; exposure time in seconds
        wave : 1D array (A)
        calib : 1D array (converts erg/s/A to photons/s/A)

    Returns:
        ZP_AB : 1D array of ZP values in AB magnitudes
    """
    ZP_flambda = 1e-17 / (calib/exptime)  # erg/s/cm^2/A
    ZP_fnu = ZP_flambda * wave**2 / (2.9979e18)  # c in A/s
    # Avoid 0 values
    ZP_AB = np.zeros_like(ZP_fnu)
    gdZ = ZP_fnu > 0.
    ZP_AB[gdZ] = -2.5 * np.log10(ZP_fnu[gdZ]) - 48.6
    # Return
    return ZP_AB
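
# Worked example (illustrative numbers): the zero point is the AB magnitude of
# a source producing 1 photon/s/A. For calib/exptime = 1e12 at wave = 5000 A:
#
#   ZP_flambda = 1e-17/1e12 = 1e-29 erg/s/cm^2/A
#   ZP_fnu = 1e-29 * 5000**2 / 2.9979e18 ~ 8.3e-41 erg/s/cm^2/Hz
#   ZP_AB = -2.5*log10(8.3e-41) - 48.6 ~ 51.6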

def qa_fluxcalib(param, frame, fluxcalib):
    """
    Args:
        param : dict of QA parameters
        frame : Frame
        fluxcalib : FluxCalib

    Returns:
        qadict : dict of QA outputs
            Need to record simple Python objects for yaml (str, float, int)
    """
    log = get_logger()
    qadict = {}

    # Unpack model
    exptime = frame.meta['EXPTIME']

    # Standard stars
    stdfibers = np.where(isStdStar(frame.fibermap))[0]
    stdstars = frame[stdfibers]
    nstds = len(stdfibers)

    # Calculate ZP for mean spectrum
    #medcalib = np.median(fluxcalib.calib,axis=0)
    medcalib = np.median(fluxcalib.calib[stdfibers], axis=0)
    ZP_AB = ZP_from_calib(exptime, fluxcalib.wave, medcalib)  # erg/s/cm^2/A

    # ZP at fiducial wavelength (AB mag for 1 photon/s/A)
    iZP = np.argmin(np.abs(fluxcalib.wave-param['ZP_WAVE']))
    qadict['ZP'] = float(np.median(ZP_AB[iZP-10:iZP+10]))

    # Unpack star data
    #sqrtwmodel, sqrtwflux, current_ivar, chi2 = indiv_stars

    # RMS
    qadict['NSTARS_FIBER'] = int(nstds)
    ZP_fiducial = np.zeros(nstds)

    for ii in range(nstds):
        # Good pixels
        gdp = stdstars.ivar[ii, :] > 0.
        if not np.any(gdp):
            continue
        icalib = fluxcalib.calib[stdfibers[ii]][gdp]
        i_wave = fluxcalib.wave[gdp]
        # ZP
        ZP_stars = ZP_from_calib(exptime, i_wave, icalib)
        iZP = np.argmin(np.abs(i_wave-param['ZP_WAVE']))
        ZP_fiducial[ii] = float(np.median(ZP_stars[iZP-10:iZP+10]))
    #import pdb; pdb.set_trace()
    qadict['RMS_ZP'] = float(np.std(ZP_fiducial))

    # MAX ZP Offset
    ZPoffset = ZP_fiducial-qadict['ZP']
    imax = np.argmax(np.abs(ZPoffset))
    qadict['MAX_ZP_OFF'] = [float(ZPoffset[imax]), int(stdfibers[np.argmax(ZPoffset)])]
    if qadict['MAX_ZP_OFF'][0] > param['MAX_ZP_OFF']:
        log.warning("Bad standard star ZP {:g}, in fiber {:d}".format(
            qadict['MAX_ZP_OFF'][0], qadict['MAX_ZP_OFF'][1]))

    # Return
    return qadict