Source code for desispec.preproc

'''
desispec.preproc
================

Preprocess raw DESI exposures.
'''

import re
import os
import numpy as np
import scipy.interpolate
import numba
import time

from scipy import signal
from scipy.ndimage import median_filter
from scipy.signal import fftconvolve

from desispec.image import Image
from desispec import cosmics
from desispec.maskbits import ccdmask
from desiutil.log import get_logger
from desiutil import depend
from desispec.calibfinder import CalibFinder,ccdregionmask
from desispec.darktrail import correct_dark_trail
from desispec.scatteredlight import model_scattered_light
from desispec.io.xytraceset import read_xytraceset
from desispec.io import read_fiberflat, shorten_filename, findfile
from desispec.io.util import addkeys
from desispec.maskedmedian import masked_median
from desispec.image_model import compute_image_model
from desispec.util import header2night

[docs]def get_amp_ids(header): ''' Return list of amp names based upon header keywords ''' amp_ids = [] for a in ['A', 'B', 'C', 'D', '1', '2', '3', '4'] : if 'BIASSEC'+a in header : amp_ids.append(a) if len(amp_ids)==0 : raise KeyError("No keyword BIASSECX with X in A,B,C,D,1,2,3,4 in header") return amp_ids
[docs]def get_readout_mode(header): """ Derive CCD readout mode from CCD header Args: header: dict-like FITS header object with BIASSEC keywrds Returns "4Amp", "2AmpLeftRight", or "2AmpUpDown" "4Amp" means all 4 amps (ABCD) were used for CCD readout; "2AmpLeftRight" means 1 left amp (AC) and 1 right amp (BD) were used; "2AmpUpDown" means 1 upper amp (CD) and one lower (AB) were used. """ # Amp arrangement: # C D 3 4 # A B or 1 2 # python note: set('ABCD') == set(['A', 'B', 'C', 'D']), not set(['ABCD',]) ampids = set(get_amp_ids(header)) if ampids in [set('ABCD'), set('1234')]: return "4Amp" elif ampids in [set('AB'), set('CD'), set('12'), set('34')]: return "2AmpLeftRight" elif ampids in [set('AC'), set('BD'), set('13'), set('24')]: return "2AmpUpDown" else: log = get_logger() msg = f"Unknown CCD readout mode with amps {ampids}" log.error(msg) raise ValueError(msg)
def _parse_sec_keyword(value): log = get_logger() log.warning('please use parse_sec_keyword (no underscore)') return parse_sec_keyword(value)
[docs]def parse_sec_keyword(value): ''' parse keywords like BIASSECB='[7:56,51:4146]' into python slices python and FITS have almost opposite conventions, * FITS 1-indexed vs. python 0-indexed * FITS upperlimit-inclusive vs. python upperlimit-exclusive * FITS[x,y] vs. python[y,x] i.e. BIASSEC2='[7:56,51:4146]' -> (slice(50,4146), slice(6,56)) ''' m = re.search(r'\[(\d+):(\d+)\,(\d+):(\d+)\]', value) if m is None: m = re.search(r'\[(\d+):(\d+)\, (\d+):(\d+)\]', value) if m is None : raise ValueError('unable to parse {} as [a:b, c:d]'.format(value)) xmin, xmax, ymin, ymax = tuple(map(int, m.groups())) return np.s_[ymin-1:ymax, xmin-1:xmax]
[docs]def _clipped_std_bias(nsigma): ''' Returns the bias on the standard deviation of a sigma-clipped dataset Divide by the returned bias to get a corrected value:: a = nsigma bias = sqrt((integrate x^2 exp(-x^2/2), x=-a..a) / (integrate exp(-x^2/2), x=-a..a)) = sqrt(1 - 2a exp(-a^2/2) / (sqrt(2pi) erf(a/sqrt(2)))) See http://www.wolframalpha.com/input/?i=(integrate+x%5E2+exp(-x%5E2%2F2),+x+%3D+-a+to+a)+%2F+(integrate+exp(-x%5E2%2F2),+x%3D-a+to+a) ''' from scipy.special import erf a = float(nsigma) stdbias = np.sqrt(1 - 2*a*np.exp(-a**2/2.) / (np.sqrt(2*np.pi) * erf(a/np.sqrt(2)))) return stdbias
[docs]def compute_overscan_step(overscan_col, median_size=7, edge_margin=50) : """ Compute the overscan step score 'OSTEP' from an array of overscan values averaged per CCD row Args: overscan_col: 1D numpy.array Options: median_size (int): window size for median pre-filter of overscan_col edge_margin (int): ignore this number of rows at the CCD edges Returns: OSTEP value (float scalar) """ # median filter to futher reduce the noise med_overscan_col = median_filter(overscan_col, median_size) # use diff. because we want to detect steps, not a continuous variation diff_med_overscan_col = np.zeros_like(med_overscan_col) diff_med_overscan_col[:-1] = med_overscan_col[1:]-med_overscan_col[:-1] # measure the range of variation of overscan while ignoring # the edges where we can measure offsets which do not impact the spectroscopy diff = diff_med_overscan_col[edge_margin:-edge_margin] overscan_step = np.max(diff)-np.min(diff) return overscan_step
[docs]def _overscan(pix, nsigma=5, niter=3): """DEPRECATED: See calc_overscan""" log = get_logger() log.warning('_overscan is deprecated; please use calc_overscan') return calc_overscan(pix, nsigma=nsigma, niter=niter)
[docs]def calc_overscan(pix, nsigma=5, niter=3): """ Calculates overscan, readnoise from overscan image pixels Args: pix (ndarray) : overscan pixels from CCD image Optional: nsigma (float) : number of standard deviations for sigma clipping niter (int) : number of iterative refits Returns: overscan (float): Mean, sigma-clipped value readnoise (float): """ log=get_logger() #- normalized median absolute deviation as robust version of RMS #- see https://en.wikipedia.org/wiki/Median_absolute_deviation overscan = np.median(pix) absdiff = np.abs(pix - overscan) readnoise = 1.4826*np.median(absdiff) #- input pixels are integers, so iteratively refit for i in range(niter): absdiff = np.abs(pix - overscan) good = absdiff < nsigma*readnoise if np.sum(good)<5 : log.error("error in sigma clipping for overscan measurement, return result without clipping") overscan = np.median(pix) absdiff = np.abs(pix - overscan) readnoise = 1.4826*np.median(absdiff) return overscan,readnoise overscan = np.mean(pix[good]) readnoise = np.std(pix[good]) #- correct for bias from sigma clipping readnoise /= _clipped_std_bias(nsigma) return overscan, readnoise
[docs]def subtract_peramp_overscan(image, hdr): """Subtract per-amp overscan using BIASSEC* keywords Args: image: 2D image array, modified in-place hdr: FITS header with BIASSEC[ABCD] or BIASSEC[1234] keywords Note: currently used in desispec.ccdcalib.compute_bias_file to model bias image, but not preproc itself (which subtracts that bias, and has more complex support for row-by-row, col-overscan, etc.) """ amp_ids = get_amp_ids(hdr) for a,amp in enumerate(amp_ids) : ii=parse_sec_keyword(hdr['BIASSEC'+amp]) s0,s1=ii[0],ii[1] for k in ["DATASEC","PRESEC","ORSEC","PRRSEC"] : if k+amp in hdr : t0,t1=parse_sec_keyword(hdr[k+amp]) s0 = slice(min(s0.start,t0.start),max(s0.stop,t0.stop)) s1 = slice(min(s1.start,t1.start),max(s1.stop,t1.stop)) overscan_image = image[ii].copy() overscan,rdnoise = calc_overscan(overscan_image) image[s0,s1] -= overscan
[docs]def _savgol_clipped(data, window=15, polyorder=5, niter=0, threshold=3.): """ Simple method to iteratively do a SavGol filter with rejection and replacing rejected pixels by nearest neighbors Args: data (ndarray): window (int): Window parameter for savgol polyorder (int): niter (int): threshold (float): Returns: """ ### 1st estimation array = data.copy() fitted = signal.savgol_filter(array, window, polyorder) filtered = array - fitted ### nth iteration nrej = 0 for i in range(niter): sigma = filtered.std(axis=0) mask = np.abs(filtered) >= threshold*sigma good = np.where(~mask)[0] # Replace with nearest neighbors new_nrej = np.sum(mask) if new_nrej == nrej: break else: nrej = new_nrej for imask in np.where(mask)[0]: # Replace with nearest neighbors i0 = np.max(good[good < imask]) i1 = np.min(good[good > imask]) array[imask] = np.mean([array[i0], array[i1]]) ### Refit fitted = signal.savgol_filter(array, window, polyorder) # Return return fitted
[docs]def _global_background(image,patch_width=200) : ''' determine background using a 2D median with square patches of width = width that are interpolated (does not subtract the background) Args: image (ndarray) is expected to be already preprocessed ( image = ((rawimage-bias-overscan)*gain)/pixflat ) Options: patch_width (integer) size in pixels of the median square patches Returns background image with same shape as input image ''' bkg=np.zeros_like(image) bins0=np.linspace(0,image.shape[0],image.shape[0]//patch_width).astype(int) bins1=np.linspace(0,image.shape[1],image.shape[1]//patch_width).astype(int) bkg_grid=np.zeros((bins0.size-1,bins1.size-1)) for j in range(bins1.size-1) : for i in range(bins0.size-1) : bkg_grid[i,j]=np.median(image[bins0[i]:bins0[i+1],bins1[j]:bins1[j+1]]) nodes0=bins0[:-1]+(bins0[1]-bins0[0])/2. nodes1=bins1[:-1]+(bins1[1]-bins0[0])/2. spline=scipy.interpolate.RectBivariateSpline(nodes0,nodes1,bkg_grid,kx=2, ky=2, s=0) return spline(np.arange(0,image.shape[0]),np.arange(0,image.shape[1]))
[docs]def _background(image,header,patch_width=200,stitch_width=10,stitch=False) : ''' determine background using a 2D median with square patches of width = width that are interpolated and optionnally try and match the level of amplifiers (does not subtract the background) Args: image (ndarray) is expected to be already preprocessed ( image = ((rawimage-bias-overscan)*gain)/pixflat ) header is used to read CCDSEC Options: patch_width (integer) size in pixels of the median square patches stitch_width (integer) width in pixels of amplifier edges to match level of amplifiers stitch : do match level of amplifiers Returns background image with same shape as input image ''' log=get_logger() log.info("fit a smooth background over the whole image with median patches of size %dx%d"%(patch_width,patch_width)) bkg=_global_background(image,patch_width) if stitch : tmp_image=image-bkg log.info("stitch amps one with the other using median patches of size %dx%d"%(stitch_width,patch_width)) tmp_bkg=np.zeros_like(image) for edge in [[1,2,1],[3,4,1],[1,3,0],[2,4,0]] : amp0=edge[0] amp1=edge[1] axis=edge[2] ii0=parse_sec_keyword(header['CCDSEC%d'%amp0]) ii1=parse_sec_keyword(header['CCDSEC%d'%amp1]) pos=ii0[axis].stop bins=np.linspace(ii0[axis-1].start,ii0[axis-1].stop,float(ii0[axis-1].stop-ii0[axis-1].start)/patch_width).astype(int) delta=np.zeros((bins.size-1)) for i in range(bins.size-1) : if axis==0 : delta[i]=np.median(tmp_image[pos-stitch_width:pos,bins[i]:bins[i+1]])-np.median(tmp_image[pos:pos+stitch_width,bins[i]:bins[i+1]]) else : delta[i]=np.median(tmp_image[bins[i]:bins[i+1],pos-stitch_width:pos])-np.median(tmp_image[bins[i]:bins[i+1],pos:pos+stitch_width]) nodes=bins[:-1]+(bins[1]-bins[0])/2. log.info("AMPS %d:%d mean diff=%f"%(amp0,amp1,np.mean(delta))) delta=np.interp(np.arange(ii0[axis-1].stop-ii0[axis-1].start),nodes,delta) # smooth ramp along axis of scale patch_width w=float(patch_width) x=np.arange(ii1[axis].stop-ii1[axis].start) #ramp=(x<w)*(x>w/2)*((x-w)/(w))**2+(x<=w/2)*(0.5-(x/(w))**2) # peaks at 0.5 ramp=0.5*np.ones(x.shape) if axis==0 : tmp_bkg[ii1] -= np.outer(ramp,delta) else : tmp_bkg[ii1] -= np.outer(delta,ramp) x=np.arange(ii0[axis].stop-ii0[axis].start) #ramp=(x<w)*(x>w/2)*((x-w)/(w))**2+(x<=w/2)*(0.5-(x/(w))**2) # peaks at 0.5 ramp=0.5*np.ones(x.shape) if axis==0 : tmp_bkg[ii0] += np.outer(ramp[::-1],delta) else : tmp_bkg[ii0] += np.outer(delta,ramp[::-1]) bkg += tmp_bkg tmp_image=image-bkg log.info("refit smooth background over the whole image with median patches of size %dx%d"%(patch_width,patch_width)) bkg+=_global_background(tmp_image,patch_width) log.info("done") return bkg
@numba.jit(nopython=True) def numba_mean(image_flux,image_ivar,x,hw=3) : """ Returns mean of pixels vs. row about x+-hw Args: image_flux: 2D array of CCD image pixels image_ivar: 2D array of inverse variance of image_flux x: 1D array of x location per row, len(x) = image_flux.shape[0] Options: hw (int): halfwidth over which to average Returns (flux, ivar) 1D arrays with weighted mean and inverse variance of pixels[i, int(x-hw):int(x+hw)+1] per row i """ n0=image_flux.shape[0] flux=np.zeros(n0) ivar=np.zeros(n0) for j in range(n0) : for i in range(int(x[j]-hw),int(x[j]+hw+1)) : flux[j] += image_ivar[j,i]*image_flux[j,i] ivar[j] += image_ivar[j,i] if ivar[j]>0 : flux[j] = flux[j]/ivar[j] return flux,ivar
[docs]def compute_background_between_fiber_blocks(image,xyset) : """ Computes CCD background between blocks of fibers Args: image: desispec.image.Image object xyset: desispec.xytraceset.XYTraceSet object Returns (model, qadict): model: np.array of same shape as image.pix qadict: dictionary of keywords for QA with min/max per amplifier Notes: Has hardcoded number of blocks and fibers and typical spacing between blocked tuned to DESI spectrographs. """ log = get_logger() if 'CAMERA' in image.meta: camera = image.meta['CAMERA'] else: camera = 'unknown' log.info(f"Camera {camera} estimating CCD background between blocks of fiber traces") ivar=image.ivar*(image.mask==0) bkg=np.zeros_like(image.pix) t0=time.time() # first estimate contribution of light from bright fibers # inspect only central part of image ny=image.pix.shape[0] image_yy=np.arange(ny) yb=ny//2-300 ye=ny//2+300 # we are convolving the image by a lorentzian profile along the cross-dispersion profile # as a proxy for the scattered light from bright stars # parameters (width of kernel, amplitude, tuned on b7 exposure 117268) hw=30 nw=2*hw+1 kern=np.zeros((3,nw)) x=np.arange(-hw,hw+1) kern[1]=1/(1+x**2) kern *= 0.05/np.sum(kern) # approx normalization cimg=fftconvolve(image.pix[yb:ye]*(ivar[yb:ye]>0),kern,mode="same") t1=time.time() log.info(f"Camera {camera} convolution to estimate contribution of light from bright fibers took {t1-t0:.2f} sec") # measure scattered light between blocks nblock=21 scattered_light=np.zeros(nblock) for block in range(nblock) : # x coordinate of band between fiber blocks if block==0 : image_xb = xyset.x_vs_y(0,image_yy)-7.5 elif block==20 : image_xb = xyset.x_vs_y(499,image_yy)+7.5 else : image_xb = (xyset.x_vs_y(block*25-1,image_yy)+xyset.x_vs_y(block*25,image_yy))/2. scattered_light[block] = np.median(numba_mean(cimg,ivar[yb:ye],image_xb[yb:ye])) # remove median across interblocks scattered_light -= np.median(scattered_light) # anything that is higher than 0.5 electron is masked # and will be set to the average value of the other blocks (average per amp) masked_interblocks=np.where(scattered_light>0.5)[0] if masked_interblocks.size>0 : log.warning(f"Camera {camera} masking inter blocks {masked_interblocks} because of scattered light from bright fibers") qadict = dict() for amp in get_amp_ids(image.meta) : sec=parse_sec_keyword(image.meta['CCDSEC'+amp]) log.info(f"Camera {camera} amp {amp} fitting bkg for {sec}") # compute value between blocks of fibers nblock=21 xinterblock=[] vinterblock=[] mwidth=400 image_yy=np.arange(sec[0].start,sec[0].stop) for block in range(nblock) : # x coordinate of band between fiber blocks if block==0 : image_xb = xyset.x_vs_y(0,image_yy)-7.5 elif block==20 : image_xb = xyset.x_vs_y(499,image_yy)+7.5 else : image_xb = (xyset.x_vs_y(block*25-1,image_yy)+xyset.x_vs_y(block*25,image_yy))/2. # boxcar extraction half width (narrow to avoid pollution by tail of fiber traces for bright stars) hw=1 # use only values in this amplifier (interpolate for others) inamp = (image_xb-hw>=sec[1].start)&(image_xb+hw<sec[1].stop) if np.all(~inamp) : continue if block in masked_interblocks : xinterblock.append(image_xb) vinterblock.append(np.zeros(image_xb.shape)) continue # extract vb,vb_ivar = numba_mean(image.pix[sec[0]],ivar[sec[0]],image_xb) # mask out region of brightest sky line in blue camera skyline_wave=5578.9 if block==0 : skyline_y = xyset.y_vs_wave(0,skyline_wave) elif block==20 : skyline_y = xyset.y_vs_wave(499,skyline_wave) else : skyline_y = (xyset.y_vs_wave(block*25-1,skyline_wave)+xyset.y_vs_wave(block*25,skyline_wave))/2. #log.info(f"interblock {block} y({skyline_wave})={skyline_y}") inamp &= np.abs(image_yy-skyline_y)>5. # keep only in amp values if np.any(~inamp) : vb = np.interp(image_yy,image_yy[inamp],vb[inamp]) # median filter vb = median_filter(vb,mwidth) xinterblock.append(image_xb) vinterblock.append(vb) xinterblock=np.array(xinterblock) vinterblock=np.array(vinterblock) #- BBKG = Bundle Background vmin, vmax = np.min(vinterblock), np.max(vinterblock) qadict['BBKGMIN'+amp] = vmin qadict['BBKGMAX'+amp] = vmax log.info(f'Camera {camera} amp {amp} interbundle CCD bkg min/max {vmin:.3f} to {vmax:.3f}') if np.any(vinterblock!=0) : # set average value to masked interblocks if np.any(vinterblock==0) : vinterblock[vinterblock==0] = np.median(vinterblock[vinterblock!=0]) # interpolate along x xx=np.arange(sec[1].start,sec[1].stop) for k,y in enumerate(image_yy) : bkg[y,sec[1]]=np.interp(xx,xinterblock[:,k],vinterblock[:,k]) dt = time.time() - t0 log.info(f"Camera {camera} computing time = {dt:.3f} sec") return bkg, qadict
[docs]def get_calibration_image(cfinder, keyword, entry, header=None): """Reads a calibration file Args: cfinder : None or CalibFinder object keyword : BIAS, MASK, or PIXFLAT entry : boolean or filename or image if entry==False return False if entry==True use calibration filename from calib. config and read it if entry==str use this for the filename if entry==image return input Options: header : if not None, update header['CAL...'] = calib provenance returns: 2D numpy array with calibration image For the case of keyword='BIAS', check for nightly bias before using default bias in $DESI_SPECTRO_CALIB """ log=get_logger() #- set the header to something so that we don't have to keep checking it if header is None: header = dict() calkey = 'CCD_CALIB_{}'.format(keyword.upper()) if entry is False: depend.setdep(header, calkey, 'None') return False # we don't want do anything filename = None if entry is True : # we have to find the filename if keyword.upper() == 'BIAS': # try biasnight first night = header2night(header) expid = header['EXPID'] camera = header['CAMERA'].lower() if 'DESI_SPECTRO_REDUX' in os.environ and 'SPECPROD' in os.environ: biasnight = findfile('biasnight', night, expid, camera) if os.path.exists(biasnight): log.info(f'Using {night} nightly bias for {expid} {camera}') filename = biasnight else: log.warning(f'{night} nightly bias not found; using default bias for {expid} {camera}') else: log.warning(f'SPECPROD not set; using default bias instead of nightly bias for {expid} {camera}') if filename is None: if cfinder is None : log.error("no calibration data was found") raise ValueError("no calibration data was found") if cfinder.haskey(keyword) : filename = cfinder.findfile(keyword) depend.setdep(header, calkey, shorten_filename(filename)) else : depend.setdep(header, calkey, 'None') return False # we say in the calibration data we don't need this elif isinstance(entry,str) : filename = entry depend.setdep(header, calkey, shorten_filename(filename)) else : depend.setdep(header, calkey, 'Unknown image') return entry # it's expected to be an image array log.info("Using %s %s"%(keyword,filename)) if keyword == "BIAS" : return read_bias(filename=filename) elif keyword == "MASK" : return read_mask(filename=filename) elif keyword == "PIXFLAT" : return read_pixflat(filename=filename) elif keyword == "DARK" : raise ValueError("Dark are now treated separately.") else : log.error("Don't known how to read %s in %s"%(keyword,path)) raise ValueError("Don't known how to read %s in %s"%(keyword,path)) return False
[docs]def find_overscan_cosmic_trails(rawimage, ov_col, overscan_values, col_width=300, threshold=25000., smooth=100): """ Find overscan columns that might be impacted by a trail from bright cosmic Args: rawimage: numpy 2D array of raw image ov_col: tuple(yslice, xslice) from parse_sec_keyword('BIASSECx') defining overscan region Options: col_width: number of pixels from overscan region to consider threshold: ADU threshold for what might cause a problematic trail smooth: median filter smoothing scale Returns (badrows, active_col_val) where badrows is a boolean array of whether each row is bad or not, and active_col_val is an array of column-summed and row median-filtered from the active region of the CCD next to the overscan region. """ # define a band in the active CCD region next to the overscan left_amp = ov_col[1].start < rawimage.shape[1]//2 if left_amp : if ov_col[1].start > rawimage.shape[1]//4 : # overscan is on the right of the active region active_col = np.s_[ov_col[0].start:ov_col[0].stop, ov_col[1].start-col_width:ov_col[1].start] else : # overscan is on the left of the active region which happens for some 2 amp read mode. active_col = np.s_[ov_col[0].start:ov_col[0].stop, ov_col[1].stop:ov_col[1].stop+col_width] else : active_col = np.s_[ov_col[0].start:ov_col[0].stop, ov_col[1].stop:ov_col[1].stop+col_width] # measure sum over columns in band active_col_val = np.max(rawimage[active_col].astype(float),axis=1) # subtract median filter (to limit effect of neighboring truly bright fiber) active_col_val -= median_filter(active_col_val, smooth) # flag rows with large signal in active region badrows=(active_col_val>threshold) med_overscan_col = median_filter(overscan_values, 20) badrows &= np.abs(overscan_values-med_overscan_col) > 2. # add 2 pixel margins to the list of badrows for _ in range(2) : badrows[1:] |= badrows[:-1] badrows[:-1] |= badrows[1:] return badrows, active_col_val
[docs]def find_masked_rows(mask, header, amp): """ Find rows that are entirely masked on this amp Args: mask: 2D array of mask values; 0=good header: dict-like header keywords (to look up amp regions) amp: CCD amplifier 'A', 'B', 'C', or 'D' Returns: boolean array of masked rows only for this amp (not entire CCD) """ yy, xx = parse_sec_keyword(header['CCDSEC'+amp.upper()]) masked_rows = np.all(mask[yy,xx] != 0, axis=1) return masked_rows
[docs]def preproc(rawimage, header, primary_header, bias=True, dark=True, pixflat=True, mask=True, bkgsub_dark=False, nocosmic=False, cosmics_nsig=6, cosmics_cfudge=3., cosmics_c2fudge=None, ccd_calibration_filename=None, nocrosstalk=False, nogain=False, overscan_per_row=False, use_overscan_row=False, use_savgol=None, nodarktrail=False,remove_scattered_light=False,psf_filename=None, bias_img=None,model_variance=False,no_traceshift=False,bkgsub_science=False, keep_overscan_cols=False,no_overscan_per_row=False,no_ccd_region_mask=False, no_cte_corr=False,cte_params_filename=None, fallback_on_dark_not_found=False): ''' preprocess image using metadata in header image = ((rawimage-bias-overscan)*gain)/pixflat Args: rawimage : 2D numpy array directly from raw data file header : dict-like metadata, e.g. from FITS header, with keywords CAMERA, BIASSECx, DATASECx, CCDSECx where x = A, B, C, D for each of the 4 amplifiers (also supports old naming convention 1, 2, 3, 4). primary_header: dict-like metadata fit keywords EXPTIME, DOSVER DATE-OBS is also required if bias, pixflat, or mask=True Optional bias, pixflat, and mask can each be: False: don't apply that step True: use default calibration data for that night ndarray: use that array filename (str or unicode): read HDU 0 and use that Optional overscan features: overscan_per_row : bool, Subtract the overscan_col values row by row from the data. use_overscan_row : bool, Subtract off the overscan_row from the data (default: False). Requires ORSEC in the Header use_savgol : bool, Specify whether to use Savitsky-Golay filter for the overscan. (default: False). Requires use_overscan_row=True to have any effect. Optional variance model if model_variance=True Optional background subtraction with median filtering accross the whole CCD if bkgsub_dark=True Optional background subtraction with median filtering between groups of fiber traces if bkgsub_science=True Optional disabling of cosmic ray rejection if nocosmic=True Optional disabling of dark trail correction if nodarktrail=True Optionally falling back to DESI_SPECTRO_CALIB if fallback_on_dark_not_found=True and files are not found in DESI_SPECTRO_DARK (else failing in that case) Optional bias image (testing only) may be provided by bias_img= Optional tuning of cosmic ray rejection parameters: cosmics_nsig: number of sigma above background required cosmics_cfudge: number of sigma inconsistent with PSF required cosmics_c2fudge: fudge factor applied to PSF (default is 0.5, set in code) Optional fit and subtraction of scattered light Optional disabling of overscan subtraction per row if no_overscan_per_row=True Optional disabling of CTE correction if no_cte_corr=True Returns Image object with member variables: pix : 2D preprocessed image in units of electrons per pixel ivar : 2D inverse variance of image mask : 2D mask of image (0=good) readnoise : 2D per-pixel readnoise of image meta : metadata dictionary TODO: define what keywords are included preprocessing includes the following steps: - bias image subtraction - overscan subtraction (from BIASSEC* keyword defined regions) - readnoise estimation (from BIASSEC* keyword defined regions) - gain correction (from GAIN* keywords) - pixel flat correction - cosmic ray masking - propagation of input known bad pixel mask - inverse variance estimation Notes: The bias image is subtracted before any other calculation to remove any non-uniformities in the overscan regions prior to calculating overscan levels and readnoise. The readnoise is an image not just one number per amp, because the pixflat image also affects the interpreted readnoise. The inverse variance is estimated from the readnoise and the image itself, and thus is biased. ''' log=get_logger() if keep_overscan_cols : if dark is not False : mess="need dark=False because keep_overscal_col=True, try option --nodark" log.error(mess) raise RuntimeError(mess) if mask is not False : mess="need mask=False because keep_overscal_col=True, try option --nomask" log.error(mess) raise RuntimeError(mess) if pixflat is not False : mess="need pixflat=False because keep_overscal_col=True, try option --nopixflat" log.error(mess) raise RuntimeError(mess) header = header.copy() depend.setdep(header, 'DESI_SPECTRO_CALIB', os.getenv('DESI_SPECTRO_CALIB')) for key in ['DESI_SPECTRO_REDUX', 'SPECPROD']: if key in os.environ: depend.setdep(header, key, os.environ[key]) cfinder = None if ccd_calibration_filename is not False: cfinder = CalibFinder([header, primary_header], yaml_file=ccd_calibration_filename, fallback_on_dark_not_found=fallback_on_dark_not_found) #- Check if this file uses amp names 1,2,3,4 (old) or A,B,C,D (new) amp_ids = get_amp_ids(header) #- if CAMERA is missing, this will raise an exception in a few lines, #- but allows CAMERA logging in the meantime if it is present try: camera = header['CAMERA'].lower() except KeyError: camera = 'unknown' #- Double check that we have the necessary keywords missing_keywords = list() for key in ['CAMERA', 'EXPID']: if key not in header: missing_keywords.append(key) for prefix in ['CCDSEC', 'BIASSEC']: for amp in amp_ids : key = prefix+amp if not key in header : log.error(f'Camera {camera} No {key} keyword in header') missing_keywords.append(key) if len(missing_keywords) > 0: raise KeyError("Camera {} missing keywords {}".format(camera, ' '.join(missing_keywords))) #- don't use CTE correction for DARK and ZERO images if 'OBSTYPE' in primary_header and primary_header['OBSTYPE'] in ('ZERO', 'DARK'): if no_cte_corr is False: log.info(f"Not applying CTE corrections for OBSTYPE={primary_header['OBSTYPE']}") no_cte_corr = True #- Fail early if we need to CTE correct but don't have enough info do_cte_corr = not no_cte_corr if do_cte_corr: from desispec.correct_cte import needs_ctecorr if needs_ctecorr(cfinder=cfinder): log.info(f'Camera {camera} needs CTE corrections') if cte_params_filename is None: if not ('DESI_SPECTRO_REDUX' in os.environ and 'SPECPROD' in os.environ): mess = f"No DESI_SPECTRO_REDUX or no SPECPROD defined, and no external specified with option --cte-params. Cannot find calibration data for {camera}, so cannot do a CTE correction. Run with --no-cte-corr to skip." log.critical(mess) raise RuntimeError(mess) else: cte_params_filename = findfile('ctecorrnight', night=header['NIGHT'], readonly=True) if not os.path.isfile(cte_params_filename): mess = f'Missing {cte_params_filename}; either generate that file, provide a different one with --cte-params, or run with --no-cte-corr to skip' log.critical(mess) raise RuntimeError(mess) else: log.info(f'Camera {camera} does not need CTE corrections') do_cte_corr = False #- Subtract bias image #- convert rawimage to float64 : this is the output format of read_image rawimage = rawimage.astype(np.float64) # Savgol if cfinder and cfinder.haskey("USE_ORSEC"): use_overscan_row = cfinder.value("USE_ORSEC") if cfinder and cfinder.haskey("SAVGOL"): use_savgol = cfinder.value("SAVGOL") # Set bias image, as desired if bias_img is None: #- will try biasnight first, then default bias bias = get_calibration_image(cfinder,"BIAS",bias,header) else: bias = bias_img overscan_col_width = 0 #- Output arrays ny=0 nx=0 for amp in amp_ids : yy, xx = parse_sec_keyword(header['CCDSEC%s'%amp]) ny=max(ny,yy.stop) nx=max(nx,xx.stop) if keep_overscan_cols : amp = amp_ids[0] tt = parse_sec_keyword(header['DATASEC'+amp]) ov_col = parse_sec_keyword(header['BIASSEC%s'%amp]) overscan_col_width = max((tt[1].start-ov_col[1].start),(ov_col[1].stop-tt[1].stop)) log.info(f"will keep overscan columns of width = {overscan_col_width} pixels") nx += 2*overscan_col_width image = np.zeros((ny,nx)) readnoise = np.zeros_like(image) #- Load dark if cfinder and cfinder.haskey("DARK") and (dark is not False): #- Exposure time if cfinder and cfinder.haskey("EXPTIMEKEY") : exptime_key=cfinder.value("EXPTIMEKEY") log.info(f"Camera {camera} Using exposure time keyword {exptime_key} for dark normalization") else : exptime_key="EXPTIME" exptime = primary_header[exptime_key] log.info(f"Camera {camera} use exptime = {exptime:.1f} sec to compute the dark current") if isinstance(dark,str): dark_filename=dark if not os.path.exists(dark_filename): message=f"Supplied a filename for the dark to be used for preprocessing ({dark}), but does not exist" log.error(message) raise ValueError(message) else: dark_filename = cfinder.findfile("DARK") depend.setdep(header, 'CCD_CALIB_DARK', shorten_filename(dark_filename)) log.info(f'Camera {camera} using DARK model from {dark_filename}') # dark is multipled by exptime, or we use the non-linear dark model in the routine dark = read_dark(filename=dark_filename,exptime=exptime) if dark.shape == image.shape : log.info(f"Camera {camera} dark is trimmed") trimmed_dark_in_electrons = dark dark_is_trimmed = True elif dark.shape == rawimage.shape : log.info(f"Camera {camera} dark is not trimmed") trimmed_dark_in_electrons = np.zeros_like(image) dark_is_trimmed = False else : message="Camera {} incompatible dark shape={} when raw shape={} and preproc shape={}".format( camera, dark.shape, rawimage.shape, image.shape) log.error(message) raise ValueError(message) if np.all(dark==0.0): if exptime == 0.0: log.info(f'Camera {camera} dark model for exptime=0 is all zeros; not applying') else: log.error(f'Camera {camera} dark model for exptime={exptime} unexpectedly all zeros; not applying') dark = False else: dark = False if bias is not False : #- it's an array if bias.shape == rawimage.shape : log.info(f"Camera {camera} subtracting bias") rawimage = rawimage - bias else: raise ValueError('shape mismatch bias {} != rawimage {}'.format(bias.shape, rawimage.shape)) #- Load mask mask = get_calibration_image(cfinder,"MASK",mask,header) if mask is False : mask = np.zeros(image.shape, dtype=np.int32) else : if mask.shape != image.shape : raise ValueError('shape mismatch mask {} != image {}'.format(mask.shape, image.shape)) if not no_ccd_region_mask : regionmasks = ccdregionmask([header, primary_header]) for regionmask in regionmasks : log.info(f"masking region {regionmask}") mask[regionmask["YMIN"]:regionmask["YMAX"],regionmask["XMIN"]:regionmask["XMAX"]] |= ccdmask.BAD if no_overscan_per_row : log.debug("Option no_overscan_per_row is set") for amp in amp_ids: # Grab the sections ov_col = parse_sec_keyword(header['BIASSEC'+amp]) if 'ORSEC'+amp in header.keys(): ov_row = parse_sec_keyword(header['ORSEC'+amp]) elif use_overscan_row: log.error(f'Camera {camera} no ORSEC{amp} keyword; not using overscan_row') use_overscan_row = False if nogain : gain = 1. else : #- Initial teststand data may be missing GAIN* keywords; don't crash if 'GAIN'+amp in header: gain = header['GAIN'+amp] #- gain = electrons / ADU else: if cfinder and cfinder.haskey('GAIN'+amp) : gain = float(cfinder.value('GAIN'+amp)) log.info(f'Camera {camera} using GAIN{amp}={gain} from calibration data') else : gain = 1.0 log.error(f'Camera {camera} missing keyword GAIN{amp} in header and nothing in calib data; using {gain}') #- Record what gain value was actually used header['GAIN'+amp] = gain #- Add saturation level if 'SATURLEV'+amp in header: saturlev_adu = header['SATURLEV'+amp] # in ADU else: if cfinder and cfinder.haskey('SATURLEV'+amp) : saturlev_adu = float(cfinder.value('SATURLEV'+amp)) log.info(f'Camera {camera} using SATURLEV{amp}={saturlev_adu} from calibration data') else : saturlev_adu = 2**16-1 # 65535 is the max value in the images log.warning(f'Camera {camera} Missing keyword SATURLEV{amp} in header and nothing in calib data; using {saturlev_adu} ADU') header['SATULEV'+amp] = (saturlev_adu,"saturation or non lin. level, in ADU, inc. bias") # Generate the overscan images raw_overscan_col = rawimage[ov_col].copy() kk = parse_sec_keyword(header['CCDSEC'+amp]) if keep_overscan_cols : if kk[1].stop>image.shape[1]//2 : start = kk[1].start + overscan_col_width stop = kk[1].stop + 2*overscan_col_width else : start = kk[1].start stop = kk[1].stop + overscan_col_width kk = np.s_[kk[0].start:kk[0].stop, start:stop] # Now remove the overscan_col nrows=raw_overscan_col.shape[0] log.info(f"Camera {camera} {nrows} rows in overscan") if not nodarktrail and cfinder is not None : if cfinder.haskey("DARKTRAILAMP%s"%amp) : log.info("Perform a dark trail correction before fitting the overscan region") amplitude = cfinder.value("DARKTRAILAMP%s"%amp) width = cfinder.value("DARKTRAILWIDTH%s"%amp) # Region is BIASSEC+DATASEC ii = parse_sec_keyword(header["BIASSEC"+amp]) jj = parse_sec_keyword(header["DATASEC"+amp]) start = min(ii[1].start,jj[1].start) stop = max(ii[1].stop,jj[1].stop) jj=np.s_[jj[0].start:jj[0].stop, start:stop] o,r = calc_overscan(rawimage[jj]) # tmp copy or rawimage tmp=rawimage[jj].copy()-o ll=np.s_[0:tmp.shape[0],0:tmp.shape[1]] correct_dark_trail(tmp,ll,left=((amp=="B")|(amp=="D")),width=width,amplitude=amplitude) tmp -= (rawimage[jj].copy()-o) # subtract input to keep only the correction start = ii[1].start-jj[1].start stop = ii[1].stop-jj[1].start raw_overscan_col += tmp[:,start:stop] # apply the correction only to the overscan cols overscan_col = np.zeros(nrows) rdnoise = np.zeros(nrows) for j in range(nrows) : if np.isnan(np.sum(overscan_col[j])) : log.warning(f"Camera {camera} amp {amp} NaN values in row {j} of overscan") continue o,r = calc_overscan(raw_overscan_col[j]) overscan_col[j]=o rdnoise[j]=r # find rows impacted by a large cosmic charge deposit badrows, active_col_val = find_overscan_cosmic_trails(rawimage, ov_col, overscan_values = overscan_col) # also mask overscan rows that are entirely masked in the active region masked_rows = find_masked_rows(mask, header, amp) num_masked_rows = np.sum(masked_rows) log.info(f'{num_masked_rows} rows entirely masked on amp {amp} of camera {camera}') badrows |= masked_rows if np.any(badrows) : log.warning("Camera {} amp {}, ignore overscan rows = {} because of large charge deposit = {} ADUs".format( camera,amp,np.where(badrows)[0],active_col_val[badrows])) # do not use overscan value for those, use interpolation goodrows = ~badrows rr=np.arange(nrows) try: overscan_col[badrows] = np.interp(rr[badrows],rr[goodrows],overscan_col[goodrows]) except ValueError: # If can't interpolate, log error but don't crash and let ostep do the flagging ngood = np.sum(goodrows) nbad = np.sum(badrows) log.error(f'Camera {camera} amp {amp} unable to interpolate overscan_col over {nbad} bad rows using {ngood} good rows') overscan_step = compute_overscan_step(overscan_col) header['OSTEP'+amp] = (overscan_step,'ADUs (max-min of median overscan per row)') log.info(f"Camera {camera} amp {amp} overscan max-min per row (OSTEP) = {overscan_step:2f} ADU") if overscan_step < 2 or no_overscan_per_row : # tuned to trig on the worst few log.info(f"Camera {camera} amp {amp} subtracting average overscan") o,r = calc_overscan(raw_overscan_col) # replace by single value overscan_col = np.repeat(o,nrows) rdnoise = np.repeat(r,nrows) header['OMETH'+amp]=("AVERAGE","use average overscan") else : header['OMETH'+amp]=("PER_ROW","use average overscan per row") log.info(f"Camera {camera} amp {amp} subtracting overscan per row") # The threshold of 5 ADUs discards 20% of the r8-A data # from Oct 2021. But it is necessary to discard some # exposures with OSTEPA>=8 ADU where bias variation # residuals were the cause of some bad redshifts. if overscan_step > 5. : mask[kk] |= ccdmask.BADREADNOISE log.warning(f"Camera {camera} amp {amp} OSTEP={overscan_step:.2f} is too large, set ccdmask.BADREADNOISE bit mask") # We use the overscan per row but we still compute a single readnoise value for the whole amplifier row_subtracted_overscan_col = raw_overscan_col - overscan_col[:,None] o,r = calc_overscan(row_subtracted_overscan_col) rdnoise = np.repeat(r,nrows) if bias is not False : # the master bias noise is already in the raw data # (because we already subtracted the bias) # so we only need to add the quadratic difference of the master bias read noise # between the active region and the overscan columns jj = parse_sec_keyword(header['DATASEC'+amp]) o,biasnoise_datasec = calc_overscan(bias[jj]) o,biasnoise_ovcol = calc_overscan(bias[ov_col]) new_rdnoise = np.sqrt(rdnoise**2+biasnoise_datasec**2-biasnoise_ovcol**2) log.info("Camera {} amp {} master bias noise {:4.3f} ADU, rdnoise {:4.3f} -> {:4.3f} ADU".format( camera, amp, biasnoise_datasec, np.mean(rdnoise), np.mean(new_rdnoise))) rdnoise = new_rdnoise if use_overscan_row: raw_overscan_row = rawimage[ov_row].copy() # Remove overscan_col from overscan_row o,r = calc_overscan(raw_overscan_col) overscan_row = raw_overscan_row - o rdnoise *= gain median_rdnoise = np.median(rdnoise) median_overscan = np.median(overscan_col) log.info(f"Camera {camera} amp {amp} Median rdnoise and overscan= {median_rdnoise:.3f} {median_overscan:.3f}") for j in range(nrows) : readnoise[kk][j] = rdnoise[j] header['OVERSCN'+amp] = (median_overscan,'ADUs (gain not applied)') if gain != 1 : rdnoise_message = 'electrons (gain is applied)' gain_message = 'e/ADU (gain applied to image)' else : rdnoise_message = 'ADUs (gain not applied)' gain_message = 'gain not applied to image' header['OBSRDN'+amp] = (median_rdnoise,rdnoise_message) header['GAIN'+amp] = (gain,gain_message) #- Warn/error if measured readnoise is very different from expected if exists if 'RDNOISE'+amp in header: expected_readnoise = header['RDNOISE'+amp] if median_rdnoise < 0.5*expected_readnoise: log.error('Camera {} amp {} measured readnoise {:.2f} < 0.5 * expected readnoise {:.2f}'.format( camera, amp, median_rdnoise, expected_readnoise)) elif median_rdnoise < 0.9*expected_readnoise: log.warning('Camera {} amp {} measured readnoise {:.2f} < 0.9 * expected readnoise {:.2f}'.format( camera, amp, median_rdnoise, expected_readnoise)) elif median_rdnoise > 2.0*expected_readnoise: log.error('Camera {} amp {} measured readnoise {:.2f} > 2 * expected readnoise {:.2f}'.format( camera, amp, median_rdnoise, expected_readnoise)) elif median_rdnoise > 1.2*expected_readnoise: log.warning('Camera {} amp {} measured readnoise {:.2f} > 1.2 * expected readnoise {:.2f}'.format( camera, amp, median_rdnoise, expected_readnoise)) log.info("Camera {} amp {} measured readnoise = {:.3f}".format( camera, amp, median_rdnoise)) #- subtract overscan from data region and apply gain jj = parse_sec_keyword(header['DATASEC'+amp]) if keep_overscan_cols : start=min(jj[1].start,ov_col[1].start) stop=max(jj[1].stop,ov_col[1].stop) jj = np.s_[jj[0].start:jj[0].stop, start:stop] data = rawimage[jj].copy() # Subtract columns for k in range(nrows): data[k] -= overscan_col[k] saturlev_elec = gain*(saturlev_adu - np.mean(overscan_col)) header['SATUELE'+amp] = (saturlev_elec,"saturation or non lin. level, in electrons") # And now the rows if use_overscan_row: # Savgol? if use_savgol: log.info(f"Camera {camera} amp {amp} Using savgol") collapse_oscan_row = np.zeros(overscan_row.shape[1]) for col in range(overscan_row.shape[1]): o, _ = calc_overscan(overscan_row[:,col]) collapse_oscan_row[col] = o oscan_row = _savgol_clipped(collapse_oscan_row, niter=0) oimg_row = np.outer(np.ones(data.shape[0]), oscan_row) data -= oimg_row else: o,r = calc_overscan(overscan_row) log.info("Camera {} amp {} removing overscan rows value = {:.2f}".format(camera,amp,o)) data -= o #- apply saturlev (defined in ADU), prior to multiplication by gain saturated = (rawimage[jj]>=saturlev_adu) mask[kk][saturated] |= ccdmask.SATURATED #- ADC to electrons image[kk] = data*gain #- add Poisson noise of bias if ( bias is not False ) and ( bias is not None ) : trimmed_bias_in_electrons = bias[jj]*gain readnoise[kk] = np.sqrt(readnoise[kk]**2+trimmed_bias_in_electrons*(trimmed_bias_in_electrons>0)) if dark is not False : if not dark_is_trimmed : trimmed_dark_in_electrons[kk] = dark[jj]*gain if not nocrosstalk : #- apply cross-talk # the ccd looks like : # C D # A B # for cross talk, we need a symmetric 4x4 flip_matrix # of coordinates ABCD giving flip of both axis # when computing crosstalk of # A B C D # # A AA AB AC AD # B BA BB BC BD # C CA CB CC CD # D DA DB DC BB # orientation_matrix_defines change of orientation # fip_axis_0= np.array([[1,1,-1,-1], [1,1,-1,-1], [-1,-1,1,1], [-1,-1,1,1]]) fip_axis_1= np.array([[1,-1,1,-1], [-1,1,-1,1], [1,-1,1,-1], [-1,1,-1,1]]) for a1 in range(len(amp_ids)) : amp1=amp_ids[a1] ii1 = parse_sec_keyword(header['CCDSEC'+amp1]) a1flux=image[ii1] #a1mask=mask[ii1] for a2 in range(len(amp_ids)) : if a1==a2 : continue amp2=amp_ids[a2] if cfinder is None : continue if not cfinder.haskey("CROSSTALK%s%s"%(amp1,amp2)) : continue crosstalk=cfinder.value("CROSSTALK%s%s"%(amp1,amp2)) if crosstalk==0. : continue log.info("Camera {} correct for crosstalk={} from AMP {} into {}".format( camera, crosstalk, amp1, amp2)) a12flux=crosstalk*a1flux.copy() #a12mask=a1mask.copy() if fip_axis_0[a1,a2]==-1 : a12flux=a12flux[::-1] #a12mask=a12mask[::-1] if fip_axis_1[a1,a2]==-1 : a12flux=a12flux[:,::-1] #a12mask=a12mask[:,::-1] ii2 = parse_sec_keyword(header['CCDSEC'+amp2]) image[ii2] -= a12flux # mask[ii2] |= a12mask (not sure we really need to propagate the mask) #- Poisson noise variance (prior to dark subtraction and prior to pixel flat field) #- This is biasing, but that's what we have for now poisson_var = image.clip(0) #- subtract dark after multiplication by gain if dark is not False : log.info(f"Camera {camera} subtracting dark") image -= trimmed_dark_in_electrons # measure its noise new_readnoise = np.zeros(readnoise.shape) for amp in amp_ids: kk = parse_sec_keyword(header['CCDSEC'+amp]) o,darknoise = calc_overscan(trimmed_dark_in_electrons[kk]) new_readnoise[kk] = np.sqrt(readnoise[kk]**2+darknoise**2) log.info("Camera {} amp {} master dark noise = {:4.3f} elec, rdnoise {:4.3f} -> {:4.3f} elec".format( camera, amp, darknoise, np.mean(readnoise[kk]), np.mean(new_readnoise[kk]))) readnoise = new_readnoise #- Correct for dark trails if any if not nodarktrail and cfinder is not None : for amp in amp_ids : if cfinder.haskey("DARKTRAILAMP%s"%amp) : amplitude = cfinder.value("DARKTRAILAMP%s"%amp) width = cfinder.value("DARKTRAILWIDTH%s"%amp) ii = parse_sec_keyword(header["CCDSEC"+amp]) if keep_overscan_cols and ii[1].stop>image.shape[1]//2 : start = ii[1].start + overscan_col_width stop = ii[1].stop + 2*overscan_col_width ii = np.s_[ii[0].start:ii[0].stop, start:stop] else : start = ii[1].start stop = ii[1].stop + overscan_col_width ii = np.s_[ii[0].start:ii[0].stop, start:stop] log.info("Camera {} amp {} removing dark trails with width={:3.1f} and amplitude={:5.4f}".format( camera, amp, width, amplitude)) correct_dark_trail(image,ii,left=((amp=="B")|(amp=="D")),width=width,amplitude=amplitude) #- Divide by pixflat image pixflat = get_calibration_image(cfinder,"PIXFLAT",pixflat,header) if pixflat is not False : if pixflat.shape != image.shape: raise ValueError('shape mismatch pixflat {} != image {}'.format(pixflat.shape, image.shape)) almost_zero = 0.001 if np.all(pixflat > almost_zero ): image /= pixflat readnoise /= pixflat poisson_var /= pixflat**2 else: good = (pixflat > almost_zero ) image[good] /= pixflat[good] readnoise[good] /= pixflat[good] poisson_var[good] /= pixflat[good]**2 mask[~good] |= ccdmask.PIXFLATZERO lowpixflat = (0 < pixflat) & (pixflat < 0.1) if np.any(lowpixflat): mask[lowpixflat] |= ccdmask.PIXFLATLOW #- Inverse variance, estimated directly from the data (BEWARE: biased!) var = poisson_var + readnoise**2 ivar = np.zeros(var.shape) ivar[var>0] = 1.0 / var[var>0] #- High readnoise is bad mask[readnoise>15] |= ccdmask.BADREADNOISE if bkgsub_dark : bkg = _background(image,header) image -= bkg img = Image(image, ivar=ivar, mask=mask, meta=header, readnoise=readnoise, camera=camera) #- force using img object (which gets further updated, but not original inputs) del image del ivar del mask del header #- CTE correction if any exist if do_cte_corr: # import only if needed to simplify required dependencies import desispec.correct_cte log.info(f"Applying CTE correction for camera {camera}") img = desispec.correct_cte.correct_image_via_model(img,niter=5,cte_params_filename=cte_params_filename) #- update img.mask to mask cosmic rays if not nocosmic : if cfinder is not None and cfinder.haskey("COSMICS_C2") : if cosmics_c2fudge is None : cosmics_c2fudge = cfinder.value("COSMICS_C2") log.info(f"Using custom cosmics C2 parameter = {cosmics_c2fudge}") else : cosmics_c2fudge_in_calib = cfinder.value("COSMICS_C2") log.warning(f"Cosmics C2 parameter from calibration ({cosmics_c2fudge_in_calib}) overridden by function arg = {cosmics_c2fudge}") if cosmics_c2fudge is None : cosmics_c2fudge = 0.5 log.debug(f"Using default cosmics C2 parameter = {cosmics_c2fudge}") cosmics.reject_cosmic_rays(img,nsig=cosmics_nsig,cfudge=cosmics_cfudge,c2fudge=cosmics_c2fudge) mask = img.mask xyset = None if model_variance : psf = None if psf_filename is None : psf_filename = cfinder.findfile("PSF") depend.setdep(img.meta, 'CCD_CALIB_PSF', shorten_filename(psf_filename)) xyset = read_xytraceset(psf_filename) fiberflat = None with_spectral_smoothing=True with_sky_model = True if with_sky_model : log.debug(f"Camera {camera} will use a sky model to model the spectra") fiberflat_filename = cfinder.findfile("FIBERFLAT") depend.setdep(img.meta, 'CCD_CALIB_FIBERFLAT', shorten_filename(fiberflat_filename)) if fiberflat_filename is not None : fiberflat = read_fiberflat(fiberflat_filename) log.info(f"Camera {camera} compute an image model after dark correction and pixel flat") nsig = 5. mimage = compute_image_model(img, xyset, fiberflat=fiberflat, with_spectral_smoothing=with_spectral_smoothing, with_sky_model=with_sky_model, spectral_smoothing_nsig=nsig, psf=psf, fit_x_shift=(not no_traceshift)) # here we bring back original image for large outliers # this allows to have a correct ivar for cosmic rays and bright sources eps = 0.1 out = (((img.ivar>0)*(img.pix-mimage)**2/(1./(img.ivar+(img.ivar==0))+(0.1*mimage)**2))>nsig**2) # out &= (image>mimage) # could request this to be conservative on the variance ... but this could cause other issues mimage[out] = img.pix[out] log.info(f"Camera {camera} use image model to compute variance") if bkgsub_dark : mimage += bkg if pixflat is not False : # undo pixflat mimage *= pixflat if dark is not False : mimage += dark poisson_var = mimage.clip(0) if pixflat is not False : if np.all(pixflat > almost_zero ): poisson_var /= pixflat**2 else: poisson_var[good] /= pixflat[good]**2 var = poisson_var + readnoise**2 img.ivar[var>0] = 1.0 / var[var>0] if np.all(img.mask>0): log.error(f'Camera {camera} is entirely masked (i.e. unusable)') if remove_scattered_light : if xyset is None : if psf_filename is None : psf_filename = cfinder.findfile("PSF") depend.setdep(img.meta, 'SCATTERED_LIGHT_PSF', shorten_filename(psf_filename)) xyset = read_xytraceset(psf_filename) img.pix -= model_scattered_light(img,xyset) if bkgsub_science : if xyset is None : if psf_filename is None : psf_filename = cfinder.findfile("PSF") depend.setdep(img.meta, 'SCATTERED_LIGHT_PSF', shorten_filename(psf_filename)) xyset = read_xytraceset(psf_filename) ccdbkg, bkgqa = compute_background_between_fiber_blocks(img,xyset) img.pix -= ccdbkg #- Adds BBKG (Bundle Background) MIN/MAX per amp for QA/debugging addkeys(img.meta, bkgqa) #- Extend header with primary header keywords too try: img.meta.extend(primary_header, strip=True, unique=True) except AttributeError: addkeys(img.meta, primary_header) return img
#------------------------------------------------------------------------- #- The following I/O routines are here instead of desispec.io to avoid a #- circular dependency between io and preproc: #- io.read_raw -> preproc.preproc -> io.read_bias (bad)
[docs]def read_bias(filename=None, camera=None, dateobs=None): ''' Return calibration bias filename for camera on dateobs or night Options: filename : input filename to read camera : e.g. 'b0', 'r1', 'z9' dateobs : DATE-OBS string, e.g. '2018-09-23T08:17:03.988' Notes: must provide filename, or both camera and dateobs ''' from astropy.io import fits if filename is None: #- use camera and dateobs to derive what bias file should be used raise NotImplementedError else: return fits.getdata(filename, 0)
[docs]def read_pixflat(filename=None, camera=None, dateobs=None): ''' Read calibration pixflat image for camera on dateobs. Options: filename : input filename to read camera : e.g. 'b0', 'r1', 'z9' dateobs : DATE-OBS string, e.g. '2018-09-23T08:17:03.988' Notes: must provide filename, or both camera and dateobs ''' from astropy.io import fits if filename is None: #- use camera and dateobs to derive what pixflat file should be used raise NotImplementedError else: return fits.getdata(filename, 0)
def recover_2d_dark(hdus,exptime,extname): log=get_logger() shape=hdus[0].data.shape nx=shape[0] ny=shape[1] profileLeft=hdus[extname].data[0] profileRight=hdus[extname].data[1] profile_2d_Left=np.transpose(np.tile(profileLeft,(int(ny/2),1))) profile_2d_Right=np.transpose(np.tile(profileRight,(int(ny/2),1))) profile_2d=np.concatenate((profile_2d_Left,profile_2d_Right),axis=1) return profile_2d
[docs]def read_dark(filename=None, camera=None, dateobs=None, exptime=None): ''' Return dark current 2D image ( accounting for exposure time) Args: filename : input filename to read camera : e.g. 'b0', 'r1', 'z9' dateobs : DATE-OBS string, e.g. '2018-09-23T08:17:03.988' exptime : the exposure time of the image in seconds Notes: must provide filename ''' from astropy.io import fits log=get_logger() if filename is None: #- use camera and dateobs to derive what pixflat file should be used raise NotImplementedError if exptime is None : #- make sure we have the exposure time set to avoid trouble raise ValueError("Need exposure time for dark") exptime = float(exptime) # will throw exception if cannot cast to float hdus = fits.open(filename) if len(hdus)==1 : log.info("Single dark frame") return exptime * hdus[0].data else : log.info("Exposure time dependent dark") exptime_arr=[] ext_arr={} for hdu in hdus: if hdu.header['EXTNAME'] == 'DARK' or hdu.header['EXTNAME'] == 'ZERO': pass #elif hdu.header['EXTNAME'] == 'ZERO': # ext_arr['0']=hdu.header['EXTNAME'] # exptime_arr.append(0) else: ext_arr[hdu.header['EXTNAME'][1:]]=hdu.header['EXTNAME'] exptime_arr.append(float(hdu.header['EXTNAME'][1:])) exptime_arr = np.array(exptime_arr) min_exptime = np.min(exptime_arr) max_exptime = np.max(exptime_arr) if exptime==0.: profile_2d=0. elif exptime in exptime_arr: profile_2d = recover_2d_dark(hdus,exptime,ext_arr[str(int(exptime))]) elif exptime < min_exptime : log.warning("Use 2D dark profile at min. exptime={}".format(min_exptime)) profile_2d = recover_2d_dark(hdus,min_exptime,ext_arr[str(int(min_exptime))]) elif exptime > max_exptime : log.warning("Use 2D dark profile at max. exptime={}".format(max_exptime)) profile_2d = recover_2d_dark(hdus,max_exptime,ext_arr[str(int(max_exptime))]) else: # Interpolate exptime_arr=np.sort(exptime_arr) ind=np.where(exptime_arr>exptime) ind1=ind[0][0]-1 ind2=ind[0][0] log.info('Interpolate between '+str(exptime_arr[ind1])+' and '+str(exptime_arr[ind2])) precision=(exptime-exptime_arr[ind1])/(exptime_arr[ind2]-exptime_arr[ind1]) # Run interpolation image1=recover_2d_dark(hdus,exptime_arr[ind1],ext_arr[str(int(exptime_arr[ind1]))]) image2=recover_2d_dark(hdus,exptime_arr[ind2],ext_arr[str(int(exptime_arr[ind2]))]) profile_2d = image1*(1-precision)+image2*precision return profile_2d + exptime * hdus['DARK'].data
[docs]def read_mask(filename=None, camera=None, dateobs=None): ''' Read bad pixel mask image for camera on dateobs. Options: filename : input filename to read camera : e.g. 'b0', 'r1', 'z9' dateobs : DATE-OBS string, e.g. '2018-09-23T08:17:03.988' Notes: must provide filename, or both camera and dateobs ''' from astropy.io import fits if filename is None: #- use camera and dateobs to derive what mask file should be used raise NotImplementedError else: return fits.getdata(filename, 0)