Source code for desispec.fiberflat_vs_humidity

"""
desispec.fiberflat_vs_humidity
==============================

Utility functions to compute a fiber flat corrected for variations with humidity in the shack
"""
from __future__ import absolute_import, division


import numpy as np
import copy

from desiutil.log import get_logger
from desispec.fiberflat import apply_fiberflat
from desispec.fiberbitmasking import get_skysub_fiberbitmask_val

def fit_wave_of_dip(wave1d,flux2d) :

    assert(len(flux2d.shape)==2)
    assert(flux2d.shape[0]==500)
    assert(flux2d.shape[1]==wave1d.shape[0])

    # 2D -> 1D
    mflux=(np.median(flux2d[0:200],axis=0)+np.median(flux2d[300:500],axis=0))/2.
    flux1d=np.median(flux2d[200:300],axis=0)
    flux1d/=mflux

    # minimum
    ii=np.where((wave1d>4250)&(wave1d<4500))[0]
    i=np.argmin(flux1d[ii])
    mwave1=wave1d[ii[i]]

    # parabola fit
    ii=np.abs(wave1d-mwave1)<10
    x=wave1d[ii]
    y=flux1d[ii]
    c=np.polyfit(x,y,2)
    mwave2=-c[1]/(2*c[0])

    return mwave2

[docs]def _interpolated_fiberflat_vs_humidity(fiberflat_vs_humidity , humidity_indices, humidity_index) : """ Interpolates between fiberflat templates indexed by humidity. Args: fiberflat_vs_humidity: 3D numpy array (n_humidity,n_fibers,n_wavelength) humidity_indices: 1D numpy array (n_humidity) humidity_index: float, humidity index value (same unit as humidity_indices) Returns 2D numpy array (n_fibers,n_wavelength) """ if humidity_index<=humidity_indices[0] : i1=0 else : i1=np.where(humidity_indices<humidity_index)[0][-1] i2=i1+1 if i2>=humidity_indices.size : # return largest value return fiberflat_vs_humidity[-1] w1=(humidity_indices[i2]-humidity_index)/(humidity_indices[i2]-humidity_indices[i1]) w2=(humidity_index-humidity_indices[i1])/(humidity_indices[i2]-humidity_indices[i1]) return w1*fiberflat_vs_humidity[i1]+w2*fiberflat_vs_humidity[i2]
[docs]def _fit_flat(wavelength,flux,ivar,fibers,mean_fiberflat_vs_humidity,humidity_array) : """ Finds best fit interpolation of fiberflat templates that matches an input flux frame Works only if wavelength array intersects the range [4000,4600]A, i.e. the blue cameras Args: wavelength: 1D numpy array (n_wavelength) in Angstrom flux: 2D numpy array (n_fibers,n_wavelength) unit does not matter ivar: 2D numpy array (n_fibers,n_wavelength) inverse variance of flux fibers: list or 1D number arrays of fibers to use among range(n_fibers) mean_fiberflat_vs_humidity: 3D numpy array (n_humidity,n_fibers,n_wavelength) humidity_array: 1D numpy array (n_humidity) Returns best_fit_flat best_fit_humidity (2D numpy array (n_fibers,n_wavelength) and float) """ log = get_logger() selection = (wavelength > 4000.) & (wavelength < 4600) if np.sum(selection)==0 : message="incorrect wavelength range" log.error(message) raise RuntimeError(message) waveindex = np.where(selection)[0] tmp_flux = flux[fibers][:,waveindex].copy() tmp_ivar = ivar[fibers][:,waveindex].copy() for loop in range(2) : # remove mean variation from fiber to fiber med = np.median(tmp_flux,axis=-1) tmp_flux /= med[:,None] tmp_ivar *= med[:,None]**2 # remove average over fibers med = np.median(tmp_flux,axis=0) tmp_flux /= med[None,:] tmp_ivar *= med[None,:]**2 tmp_flat = mean_fiberflat_vs_humidity[:,fibers][:,:,waveindex].copy() for loop in range(2) : # remove mean variation from fiber to fiber med = np.median(tmp_flat,axis=-1) tmp_flat /= med[:,:,None] # remove average over fibers for index in range(tmp_flat.shape[0]) : med = np.median(tmp_flat[index],axis=0) tmp_flat[index] /= med[None,:] # chi2 between all fiberflat templates (one per humidity bin) # with current flux value # summed over all fibers and all wavelength # after having both the fiberflat and the flux normalized to 1 # per fiber when averaged over all wavelength # and per wavelength when averaged over all fibers chi2 = np.sum(np.sum(tmp_ivar*(tmp_flux-tmp_flat)**2,axis=-1),axis=-1) # chi2 is a 1D array with size = number of humidity bins # index of minimum, but then we refine best_index=np.argmin(chi2) humidity_indices = np.arange(humidity_array.size,dtype=float) if best_index==0 or best_index==humidity_array.size-1 : best_humidity = humidity_array[best_index] flat = mean_fiberflat_vs_humidity[best_index] log.warning("best fit at edge of model humidity range") else : bb=best_index-1 ee=best_index+2 if bb<0 : bb+=1 ee+=1 if ee>chi2.size : bb-=1 ee-=1 # get the chi2 minimum, using the indices, not the humidity values which can fluctuate c=np.polyfit(humidity_indices[bb:ee],chi2[bb:ee],2) best_index = -c[1]/2./c[0] best_index = max(0,best_index) best_index = min(chi2.size-1,best_index) best_humidity = np.interp(best_index,humidity_indices,humidity_array) # simple linear interpolation indexed by the humidity flat = _interpolated_fiberflat_vs_humidity(mean_fiberflat_vs_humidity , humidity_indices, best_index) log.info("best fit index = {} , humidity = {:.2f}".format(best_index, best_humidity)) #import matplotlib.pyplot as plt #plt.figure() #plt.plot(humidity_indices,chi2,"o") #plt.axvline(best_index) #plt.show() return flat , best_humidity
[docs]def compute_humidity_corrected_fiberflat(calib_fiberflat, mean_fiberflat_vs_humidity , humidity_array, current_humidity, frame) : """ Apply a humidity-dependent correction to an input fiber flat Returns frame_fiberflat = calib_fiberflat / flat_vs_humidity_model(calib) * flat_vs_humidity_model(frame) Args: calib_fiberflat: desispec.FiberFlat object mean_fiberflat_vs_humidity: 3D numpy array (n_humidity,n_fibers,n_wavelength) humidity_array: 1D numpy array (n_humidity) current_humidity: float (same unit as humidity_array) frame: desispec.Frame object Returns modified desispec.FiberFlat object """ log = get_logger() best_humidity = current_humidity # only consider model humidity within 20% of measured humidity humidity_selection = np.abs(humidity_array - current_humidity)<20 log.info("using nightly flat to fit for the best fit nightly flat humidity") selection = np.sum(calib_fiberflat.ivar!=0,axis=1)>10 good_flat_fibers = np.where(selection)[0] flat2, hum2 = _fit_flat(calib_fiberflat.wave, calib_fiberflat.fiberflat, calib_fiberflat.ivar, good_flat_fibers, mean_fiberflat_vs_humidity[humidity_selection], humidity_array[humidity_selection]) flat1 = None hum1 = current_humidity if frame is not None : log.info("using frame to fit for the best fit current humidity") ivar = frame.ivar*(frame.mask==0) band = 'brz' # all by default if frame.meta is not None : if "CAMERA" in frame.meta.keys() : camera = frame.meta["CAMERA"].lower() band = camera[0] badfibermask = get_skysub_fiberbitmask_val(band=band) selection = (frame.fibermap["OBJTYPE"]=="SKY") & (frame.fibermap["FIBERSTATUS"] & badfibermask == 0) & (np.sum(ivar!=0,axis=1)>10) if np.sum(selection)>0 : good_sky_fibers = np.where(selection)[0] heliocor=frame.meta['HELIOCOR'] frame_wave_in_fiberflat_system = frame.wave/heliocor tmp_flux = frame.flux.copy() tmp_ivar = ivar.copy() for fiber in good_sky_fibers: ok=(ivar[fiber]>0) tmp_flux[fiber] = np.interp(frame.wave,frame_wave_in_fiberflat_system[ok],frame.flux[fiber][ok]) tmp_ivar[fiber] = np.interp(frame.wave,frame_wave_in_fiberflat_system[ok],ivar[fiber][ok]) flat1, hum1 = _fit_flat(frame.wave, tmp_flux*flat2/calib_fiberflat.fiberflat, tmp_ivar, good_sky_fibers, mean_fiberflat_vs_humidity[humidity_selection], humidity_array[humidity_selection]) if flat1 is None : log.info("use input humidity = {:.2f}".format(current_humidity)) flat1 = _interpolated_fiberflat_vs_humidity(mean_fiberflat_vs_humidity , humidity_array, current_humidity) # apply humidity correction to current calib fiberflat fiberflat = copy.deepcopy(calib_fiberflat) fiberflat.fiberflat = calib_fiberflat.fiberflat/flat2*flat1 fiberflat.header["EXPTHUM"] = (current_humidity,"exposure humidity from telemetry") fiberflat.header["EXPFHUM"] = (hum1,"exposure humidity from flat fit") fiberflat.header["CALFHUM"] = (hum2,"dome flat humidity from flat fit") if np.abs(hum1-current_humidity)>10 : message="large difference between best fit humidity during science exposure ({:.1f}) and value from telemetry ({:.1f})".format(hum1,current_humidity) if np.abs(hum1-current_humidity)>20 : log.error(message) raise RuntimeError(message) log.warning(message) return fiberflat