Source code for desispec.scripts.trace_shifts

"""
desispec.scripts.trace_shifts
=============================

"""

import os, sys
import argparse
import numpy as np
from numpy.linalg import LinAlgError
import astropy.io.fits as pyfits
from numpy.polynomial.legendre import legval,legfit
from importlib import resources
from scipy.signal import fftconvolve
from astropy.table import Table

import specter.psf

from desispec.calibfinder import findcalibfile,CalibFinder
from desispec.xytraceset import XYTraceSet
from desispec.io.xytraceset import read_xytraceset
from desispec.io import read_image
from desispec.io import shorten_filename
from desiutil.log import get_logger
from desispec.util import parse_int_args
from desispec.trace_shifts import write_traces_in_psf,compute_dx_from_cross_dispersion_profiles,compute_dy_from_spectral_cross_correlation,monomials,polynomial_fit,compute_dy_using_boxcar_extraction,compute_dx_dy_using_psf,shift_ycoef_using_external_spectrum,recompute_legendre_coefficients,recompute_legendre_coefficients_for_x,recompute_legendre_coefficients_for_y,list_of_expected_spot_positions,compute_x_offset_from_central_band_cross_dispersion_profile
from desispec.large_trace_shifts import detect_spots_in_image,match_same_system

def parse(options=None):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter,
                                     description="""Measures trace shifts from a preprocessed image and an input psf, and writes the modified trace coordinates in an output psf file to be used for extractions.
Two methods are implemented.
1) cross-correlation : dx shifts are measured from cross-dispersion profiles of traces.
   dy shifts (wavelength calibration) are measured in two steps, an internal calibration determined from the cross-correlation of fiber spectra obtained from a resampled boxcar extraction with their median, and the final wavelength calibration is obtained from the  cross-correlation of the median fiber spectrum (after a second boxcar extraction) with an external spectrum (given with --spectrum option).
   This method is efficient for measuring trace shifts on science exposures with a large sky background.
2) forward model : dy,dy shifts are determined simultaneously by a forward modeling of the image around a given external list of lines (--lines option).
   This method is in principle statistically optimal, but it is slow and cannot be applied to blended and broad sky lines. It is useful to shift traces from arc lamp images (though specex does the same thing in C++).""")

    parser.add_argument('-i','--image', type = str, default = None, required=True,
                        help = 'path of DESI preprocessed fits image')
    parser.add_argument('--psf', type = str, default = None, required=False,
                        help = 'path of DESI psf fits file. If none, will use default PSF from calibration data.')
    parser.add_argument('--lines', type = str, default = None, required=False,
                        help = 'path of lines ASCII file. Using this option changes the fit method.')
    parser.add_argument('--spectrum', type = str, default = None, required=False,
                        help = 'path to an spectrum ASCII file for external wavelength calibration')
    parser.add_argument('--sky', action = 'store_true',
                        help = 'use sky spectrum desispec/data/spec-sky.dat for external wavelength calibration')
    parser.add_argument('--arc-lamps', action = 'store_true',
                        help = 'use arc lamps spectrum desispec/data/spec-arc-lamps.dat for external wavelength calibration')
    parser.add_argument('-o','--outpsf', type = str, default = None, required=True,
                        help = 'path of output PSF with shifted traces')
    parser.add_argument('--outoffsets', type = str, default = None, required=False,
                        help = 'path of output ASCII file with measured offsets for QA')
    parser.add_argument('--degxx', type = int, default = 0, required=False,
                        help = 'polynomial degree for x shifts along x')
    parser.add_argument('--degxy', type = int, default = 0, required=False,
                        help = 'polynomial degree for x shifts along y')
    parser.add_argument('--degyx', type = int, default = 0, required=False,
                        help = 'polynomial degree for y shifts along x')
    parser.add_argument('--degyy', type = int, default = 0, required=False,
                        help = 'polynomial degree for y shifts along y')
    parser.add_argument('--continuum', action='store_true',
                        help = 'only fit shifts along x for continuum or LED input image')
    parser.add_argument('--auto', action='store_true',
                        help = 'choose best method (sky,continuum or just internal calib) from the FLAVOR keyword in the input image header')

    parser.add_argument('--nfibers', type = int, default = None, required=False,
                        help = 'limit the number of fibers for debugging')
    parser.add_argument('--max-error', type = float, default = 0.05 , required=False,
                        help = "max statistical error threshold to automatically lower polynomial degree")
    parser.add_argument('--width', type = int, default = 7 , required=False,
                        help = "width of cross-dispersion profile")
    parser.add_argument('--ccd-rows-rebin', type = int, default = 4 , required=False,
                        help = "rebinning of CCD rows to run faster")
    parser.add_argument('--no-large-shift-scan', action="store_true",
                        help = "do not perform a large shift scan for arc lamp or continuum exposures")
    args = parser.parse_args(options)

    return args


def read_specter_psf(filename) :
    hdulist = pyfits.open(filename)
    head=hdulist[0].header
    if "PSFTYPE" not in head :
        raise KeyError("No PSFTYPE in PSF header, cannot load this PSF in specter")
    psftype=head["PSFTYPE"]
    hdulist.close()

    if psftype=="GAUSS-HERMITE" :
        psf = specter.psf.GaussHermitePSF(filename)
    elif psftype=="SPOTGRID" :
        psf = specter.psf.SpotGridPSF(filename)
    else :
        raise ValueError("Unknown PSFTYPE='{}'".format(psftype))
    return psf


[docs] def fit_xcoeff_ycoeff(dx,ex,x_for_dx,y_for_dx,degxx,degxy, dy,ey,x_for_dy,y_for_dy,degyx,degyy,tset,max_error) : ''' Fit offsets dx and dy as a function of x and y using low order Legendre polynomials and then refit using those the full Legendre coefficients (per fiber and wavelength) of the trace sets. Args : dx = 1D array of offsets along X (in pixels) ex = 1D array of errors on offsets along X (in pixels), same size as dx x_for_dx = 1D array of X coordinates of points where the dx offsets are measured (in pixels), same size as dx y_for_dx = 1D array of Y coordinates of points where the dx offsets are measured (in pixels), same size as dx degxx = prefered Legendre polynomial degree along X for the fit of dx(x,y) degxy = prefered Legendre polynomial degree along Y for the fit of dx(x,y) dy = 1D array of offsets along Y (in pixels) ey = 1D array of errors on offsets along Y (in pixels), same size as dy x_for_dy = 1D array of X coordinates of points where the dy offsets are measured (in pixels), same size as dy y_for_dy = 1D array of Y coordinates of points where the dy offsets are measured (in pixels), same size as dy degyx = prefered Legendre polynomial degree along X for the fit of dy(x,y) degyy = prefered Legendre polynomial degree along Y for the fit of dy(x,y) tset = xytracset instance max_error = max allowed uncertainty on dx or dy before automatically reducing the Legendre polynomial degree Returns : xcoeff = 2D array with new array of Legendre coeffcient for the traceset tsec for X(fiber,wavelength) ycoeff = 2D array with new array of Legendre coeffcient for the traceset tsec for Y(fiber,wavelength) ''' log = get_logger() xcoef = tset.x_vs_wave_traceset._coeff ycoef = tset.y_vs_wave_traceset._coeff nfibers = xcoef.shape[0] n = 0 nloops = max(degxx, degyx) + max(degxy, degyy) while True: # loop because polynomial degrees could be reduced # Try fitting offsets. log.info("polynomial fit of measured offsets with degx=(%d,%d) degy=(%d,%d)"%(degxx,degxy,degyx,degyy)) try : dx_coeff,dx_coeff_covariance,dx_errorfloor,dx_mod,dx_mask=polynomial_fit(z=dx,ez=ex,xx=x_for_dx,yy=y_for_dx,degx=degxx,degy=degxy) dy_coeff,dy_coeff_covariance,dy_errorfloor,dy_mod,dy_mask=polynomial_fit(z=dy,ez=ey,xx=x_for_dy,yy=y_for_dy,degx=degyx,degy=degyy) log.info("dx dy error floor = %4.3f %4.3f pixels"%(dx_errorfloor,dy_errorfloor)) #log.info("check fit uncertainties are ok on edge of CCD") merr=0. for fiber in [0,nfibers-1] : for rw in [-1,1] : tx = legval(rw,xcoef[fiber]) ty = legval(rw,ycoef[fiber]) m=monomials(tx,ty,degxx,degxy) tdx=np.inner(dx_coeff,m) tsx=np.sqrt(np.inner(m,dx_coeff_covariance.dot(m))) m=monomials(tx,ty,degyx,degyy) tdy=np.inner(dy_coeff,m) tsy=np.sqrt(np.inner(m,dy_coeff_covariance.dot(m))) merr=max(merr,tsx) merr=max(merr,tsy) log.info("max edge shift error = %4.3f pixels"%merr) if degxx==0 and degxy==0 and degyx==0 and degyy==0 : break except ( LinAlgError , ValueError ) : log.warning("polynomial fit failed with degx=(%d,%d) degy=(%d,%d)"%(degxx,degxy,degyx,degyy)) if degxx==0 and degxy==0 and degyx==0 and degyy==0 : log.error("polynomial degrees are already 0. we can't fit the offsets") raise RuntimeError("polynomial degrees are already 0. we can't fit the offsets") merr = 100000. # this will lower the pol. degree. if merr > max_error : if merr != 100000. : log.warning("max edge shift error = %4.3f pixels is too large, reducing degrees"%merr) if (degxy>0 or degyy>0) and (degxy>degxx or degyy>degyx): # first along wavelength if degxy>0 : degxy-=1 if degyy>0 : degyy-=1 else : # then along fiber if degxx>0 : degxx-=1 if degyx>0 : degyx-=1 else : # error is ok, so we quit the loop break # Sanity check to ensure looping is not infinite. n += 1 if n > nloops: raise RuntimeError(f'Maximum fit iterations {nloops} exceeded.') # print central shift mx=np.median(x_for_dx) my=np.median(y_for_dx) m=monomials(mx,my,degxx,degxy) mdx=np.inner(dx_coeff,m) mex=np.sqrt(np.inner(m,dx_coeff_covariance.dot(m))) mx=np.median(x_for_dy) my=np.median(y_for_dy) m=monomials(mx,my,degyx,degyy) mdy=np.inner(dy_coeff,m) mey=np.sqrt(np.inner(m,dy_coeff_covariance.dot(m))) log.info("central shifts dx = %4.3f +- %4.3f dy = %4.3f +- %4.3f "%(mdx,mex,mdy,mey)) new_xcoeff , new_ycoeff = recompute_legendre_coefficients(xcoef=tset.x_vs_wave_traceset._coeff, ycoef=tset.y_vs_wave_traceset._coeff, wavemin=tset.wavemin, wavemax=tset.wavemax, degxx=degxx,degxy=degxy,degyx=degyx,degyy=degyy, dx_coeff=dx_coeff,dy_coeff=dy_coeff) return new_xcoeff , new_ycoeff
[docs] def fit_xcoeff(dx,ex,x_for_dx,y_for_dx,degxx,degxy,tset,max_error) : ''' Fit offsets dx as a function of x and y using low order Legendre polynomials and then refit using those the full Legendre coefficients (per fiber and wavelength) of the trace sets. Args : dx = 1D array of offsets along X (in pixels) ex = 1D array of errors on offsets along X (in pixels), same size as dx x_for_dx = 1D array of X coordinates of points where the dx offsets are measured (in pixels), same size as dx y_for_dx = 1D array of Y coordinates of points where the dx offsets are measured (in pixels), same size as dx degxx = prefered Legendre polynomial degree along X for the fit of dx(x,y) degxy = prefered Legendre polynomial degree along Y for the fit of dx(x,y) tset = xytracset instance max_error = max allowed uncertainty on dx or dy before automatically reducing the Legendre polynomial degree Returns : xcoeff = 2D array with new array of Legendre coeffcient for the traceset tsec for X(fiber,wavelength) ''' log = get_logger() xcoef = tset.x_vs_wave_traceset._coeff ycoef = tset.x_vs_wave_traceset._coeff nfibers = xcoef.shape[0] n = 0 nloops = degxx + degxy while True: # loop because polynomial degrees could be reduced # Try fitting offsets. log.debug("polynomial fit of measured offsets with degx=(%d,%d)"%(degxx,degxy)) try : dx_coeff,dx_coeff_covariance,dx_errorfloor,dx_mod,dx_mask=polynomial_fit(z=dx,ez=ex,xx=x_for_dx,yy=y_for_dx,degx=degxx,degy=degxy) if dx_errorfloor>0.1 : log.info("dx error floor = %4.3f pixels"%(dx_errorfloor)) log.debug("check fit uncertainties are ok on edge of CCD") merr=0. for fiber in [0,nfibers-1] : for rw in [-1,1] : tx = legval(rw,xcoef[fiber]) ty = legval(rw,ycoef[fiber]) m=monomials(tx,ty,degxx,degxy) tdx=np.inner(dx_coeff,m) tsx=np.sqrt(np.inner(m,dx_coeff_covariance.dot(m))) merr=max(merr,tsx) log.debug("max edge shift error = %4.3f pixels"%merr) if degxx==0 and degxy==0 : break except ( LinAlgError , ValueError ) : log.warning("polynomial fit failed with degx=(%d,%d)"%(degxx,degxy)) if degxx==0 and degxy==0 : log.error("polynomial degrees are already 0. we can't fit the offsets") raise RuntimeError("polynomial degrees are already 0. we can't fit the offsets") merr = 100000. # this will lower the pol. degree. if merr > max_error : if merr != 100000. : log.warning("max edge shift error = %4.3f pixels is too large, reducing degrees"%merr) if degxy>0 and degxy>degxx : # first along wavelength if degxy>0 : degxy-=1 else : # then along fiber if degxx>0 : degxx-=1 else : # error is ok, so we quit the loop break # Sanity check to ensure looping is not infinite. n += 1 if n > nloops: raise RuntimeError(f'Maximum fit iterations {nloops} exceeded.') # print central shift mx=np.median(x_for_dx) my=np.median(y_for_dx) m=monomials(mx,my,degxx,degxy) mdx=np.inner(dx_coeff,m) mex=np.sqrt(np.inner(m,dx_coeff_covariance.dot(m))) log.info("central shifts dx = %4.3f +- %4.3f"%(mdx,mex)) new_xcoeff = recompute_legendre_coefficients_for_x(xcoef=tset.x_vs_wave_traceset._coeff, ycoef=tset.y_vs_wave_traceset._coeff, wavemin=tset.wavemin, wavemax=tset.wavemax, degxx=degxx,degxy=degxy, dx_coeff=dx_coeff) return new_xcoeff
[docs] def fit_ycoeff(dy,ey,x_for_dy,y_for_dy,degyx,degyy,tset,max_error) : ''' Fit offsets dy as a function of x and y using low order Legendre polynomials and then refit using those the full Legendre coefficients (per fiber and wavelength) of the trace sets. Args : dy = 1D array of offsets along Y (in pixels) ey = 1D array of errors on offsets along Y (in pixels), same size as dy x_for_dy = 1D array of X coordinates of points where the dy offsets are measured (in pixels), same size as dy y_for_dy = 1D array of Y coordinates of points where the dy offsets are measured (in pixels), same size as dy degyx = prefered Legendre polynomial degree along X for the fit of dy(x,y) degyy = prefered Legendre polynomial degree along Y for the fit of dy(x,y) tset = xytracset instance max_error = max allowed uncertainty on dx or dy before automatically reducing the Legendre polynomial degree Returns : ycoeff = 2D array with new array of Legendre coeffcient for the traceset tsec for Y(fiber,wavelength) ''' log = get_logger() xcoef = tset.x_vs_wave_traceset._coeff ycoef = tset.y_vs_wave_traceset._coeff nfibers = xcoef.shape[0] n = 0 nloops = degyx + degyy while True: # loop because polynomial degrees could be reduced # Try fitting offsets. log.info("polynomial fit of measured offsets with degy=(%d,%d)"%(degyx,degyy)) try : dy_coeff,dy_coeff_covariance,dy_errorfloor,dy_mod,dy_mask=polynomial_fit(z=dy,ez=ey,xx=x_for_dy,yy=y_for_dy,degx=degyx,degy=degyy) if dy_errorfloor>0.1 : log.info("dx error floor = %4.3f pixels"%(dy_errorfloor)) #log.info("check fit uncertainties are ok on edge of CCD") merr=0. for fiber in [0,nfibers-1] : for rw in [-1,1] : tx = legval(rw,xcoef[fiber]) ty = legval(rw,ycoef[fiber]) m=monomials(tx,ty,degyx,degyy) tdy=np.inner(dy_coeff,m) tsy=np.sqrt(np.inner(m,dy_coeff_covariance.dot(m))) merr=max(merr,tsy) log.info("max edge shift error = %4.3f pixels"%merr) if degyx==0 and degyy==0 : break except ( LinAlgError , ValueError ) : log.warning("polynomial fit failed with degy=(%d,%d)"%(degyx,degyy)) if degyx==0 and degyy==0 : log.error("polynomial degrees are already 0. we can't fit the offsets") raise RuntimeError("polynomial degrees are already 0. we can't fit the offsets") merr = 100000. # this will lower the pol. degree. if merr > max_error : if merr != 100000. : log.warning("max edge shift error = %4.3f pixels is too large, reducing degrees"%merr) if degyy>0 and degyy>degyx: # first along wavelength if degyy>0 : degyy-=1 else : # then along fiber if degyx>0 : degyx-=1 else : # error is ok, so we quit the loop break # Sanity check to ensure looping is not infinite. n += 1 if n > nloops: raise RuntimeError(f'Maximum fit iterations {nloops} exceeded.') # print central shift mx=np.median(x_for_dy) my=np.median(y_for_dy) m=monomials(mx,my,degyx,degyy) mdy=np.inner(dy_coeff,m) mey=np.sqrt(np.inner(m,dy_coeff_covariance.dot(m))) log.info("central shifts dy = %4.3f +- %4.3f "%(mdy,mey)) new_ycoeff = recompute_legendre_coefficients_for_y(xcoef=tset.x_vs_wave_traceset._coeff, ycoef=tset.y_vs_wave_traceset._coeff, wavemin=tset.wavemin, wavemax=tset.wavemax, degyx=degyx,degyy=degyy, dy_coeff=dy_coeff) return new_ycoeff
[docs] def fit_trace_shifts(image, args): """ Perform the fitting of shifts of spectral traces This consists of two steps, one is internal, by cross-correlating spectra to themselves, and then cross-correlating to external (ususally sky) spectrum Return updated traceset and two dictionaies with offset information to be written in the PSF file """ global psfs log=get_logger() log.info("starting") if args.psf is None : args.psf = findcalibfile([image.meta],"PSF") log.info(f"Will use default PSF: {args.psf}") tset = read_xytraceset(args.psf) wavemin = tset.wavemin wavemax = tset.wavemax xcoef = tset.x_vs_wave_traceset._coeff ycoef = tset.y_vs_wave_traceset._coeff # keep a copy to compare final shifts at the end benchmark_wave = np.linspace(tset.wavemin,tset.wavemax,5) benchmark_x_in = np.zeros((tset.nspec,benchmark_wave.size)) benchmark_y_in = np.zeros((tset.nspec,benchmark_wave.size)) for s in range(tset.nspec) : benchmark_x_in[s]=tset.x_vs_wave(s,benchmark_wave) benchmark_y_in[s]=tset.y_vs_wave(s,benchmark_wave) nfibers=xcoef.shape[0] log.info("read PSF trace with xcoef.shape = {} , ycoef.shape = {} , and wavelength range {}:{}".format(xcoef.shape,ycoef.shape,int(wavemin),int(wavemax))) lines=None if args.lines is not None : log.info("We will fit the image using the psf model and lines") # read lines lines=np.loadtxt(args.lines,usecols=[0]) ok=(lines>wavemin)&(lines<wavemax) log.info("read {} lines in {}, with {} of them in traces wavelength range".format(len(lines),args.lines,np.sum(ok))) lines=lines[ok] else : log.info("We will do an internal calibration of trace coordinates without using the psf shape in a first step") internal_wavelength_calib = (not args.continuum) if args.auto : log.debug("read flavor of input image") if "FLAVOR" not in image.meta : log.error("no FLAVOR keyword in image header, cannot run with --auto option") raise KeyError("no FLAVOR keyword in image header, cannot run with --auto option") flavor = image.meta["FLAVOR"].strip().lower() log.info("Input is a '{}' image".format(flavor)) if flavor == "flat" : internal_wavelength_calib = False elif flavor == "arc" : internal_wavelength_calib = True args.arc_lamps = True else : internal_wavelength_calib = True args.sky = True log.info("wavelength calib, internal={}, sky={} , arc_lamps={}".format(internal_wavelength_calib,args.sky,args.arc_lamps)) cfinder = CalibFinder([image.meta]) fibers=np.arange(tset.nspec) if cfinder.haskey("BROKENFIBERS") : brokenfibers=parse_int_args(cfinder.value("BROKENFIBERS"))%500 log.debug(f"brokenfibers={brokenfibers}") fibers=fibers[np.isin(fibers, brokenfibers, invert=True)] # shift of traces in PSF delta_xref=0. delta_yref=0. # for arc lamp images, we can predict where we expect to see spots on the CCD image # given an input traceset (variable tset), and use the comparison between the prediction # and actual spots locations to derive a first correction to the coordinates saved in the traceset # this is disabled by the option --no-large-shift-scan if args.arc_lamps and (not args.no_large_shift_scan) : log.info("for arc lamps, find a first solution by comparing expected spots positions with detections over the whole CCD") xref,yref = list_of_expected_spot_positions(tset,fibers) xin,yin = detect_spots_in_image(image) log.info("start match ...") fibersep=7. oversampling=2 nfiberexcursion=25 bestnmatch=0 bestdx=0 bestindices=None bestdistances=None # iterate over max allowed distance for maxdistance in [ 4*fibersep,2*fibersep,fibersep ] : # loop over possible offsets bestnmatch=0 for dx in np.arange(-nfiberexcursion*oversampling,nfiberexcursion*oversampling+1)*fibersep/oversampling : indices,distances = match_same_system(xref+delta_xref+dx,yref+delta_yref,xin,yin,remove_duplicates=True) nmatch=np.sum( (indices>=0)&(distances<maxdistance) ) if nmatch > bestnmatch : bestnmatch = nmatch bestindices = indices bestdistances = distances bestdx=dx # refine measure of best offset log.info(f"best number of match={bestnmatch} for deltax={bestdx+delta_xref}") ok=(bestindices>=0)&(bestdistances<maxdistance) delta_xref = np.median(-xref[ok]+xin[bestindices[ok]]) delta_yref = np.median(-yref[ok]+yin[bestindices[ok]]) median_dist = np.median(np.sqrt((xref[ok]+delta_xref-xin[indices[ok]])**2+(yref[ok]+delta_yref-yin[indices[ok]])**2)) log.info(f"delta x = {delta_xref} , delta y = {delta_yref} , median_dist = {median_dist}") log.info(f"apply best shift delta x = {delta_xref} , delta y = {delta_yref} to traceset") xcoef[:,0] += delta_xref ycoef[:,0] += delta_yref # for continuum images, we can predict where we expect to see spectral traces on the CCD image # given an input traceset (variable tset), and use the comparison between the prediction # and actual spectral traces to derive a first correction to the X coordinates saved in the traceset # this is disabled by the option --no-large-shift-scan if args.continuum and (not args.no_large_shift_scan) : log.info("for continuum or LED exposures, find a first solution on delta_X by comparing a wide cross-dispersion profile with expectations") delta_xref = compute_x_offset_from_central_band_cross_dispersion_profile(tset, image, fibers=fibers) log.info(f"apply best shift delta x = {delta_xref} to traceset") xcoef[:,0] += delta_xref spectrum_filename = args.spectrum continuum_subtract = False if args.sky : continuum_subtract = True srch_file = "data/spec-sky.dat" if not resources.files('desispec').joinpath(srch_file).is_file(): log.error("Cannot find sky spectrum file {:s}".format(srch_file)) raise RuntimeError("Cannot find sky spectrum file {:s}".format(srch_file)) spectrum_filename = resources.files('desispec').joinpath(srch_file) elif args.arc_lamps : srch_file = "data/spec-arc-lamps.dat" if not resources.files('desispec').joinpath(srch_file).is_file(): log.error("Cannot find arc lamps spectrum file {:s}".format(srch_file)) raise RuntimeError("Cannot find arc lamps spectrum file {:s}".format(srch_file)) spectrum_filename = resources.files('desispec').joinpath(srch_file) if spectrum_filename is not None : log.info("Use external calibration from cross-correlation with {}".format(spectrum_filename)) if args.nfibers is not None : nfibers = args.nfibers # FOR DEBUGGING fibers=np.arange(nfibers) internal_offset_info = None specter_psf = None if lines is not None : # use a forward modeling of the image # it's slower and works only for individual lines # it's in principle more accurate # but gives systematic residuals for complex spectra like the sky if specter_psf is None : specter_psf = read_specter_psf(args.psf) x,y,dx,ex,dy,ey,fiber_xy,wave_xy=compute_dx_dy_using_psf(specter_psf,image,fibers,lines) x_for_dx=x y_for_dx=y fiber_for_dx=fiber_xy wave_for_dx=wave_xy x_for_dy=x y_for_dy=y fiber_for_dy=fiber_xy wave_for_dy=wave_xy else : # internal calibration method that does not use the psf # nor a prior set of lines. this method is much faster degxx=args.degxx degxy=args.degxy degyx=args.degyx degyy=args.degyy for iteration in range(10) : # measure x shifts x_for_dx,y_for_dx,dx,ex,fiber_for_dx,wave_for_dx = compute_dx_from_cross_dispersion_profiles(xcoef,ycoef,wavemin,wavemax, image=image, fibers=fibers, width=args.width, deg=args.degxy,image_rebin=args.ccd_rows_rebin) if iteration == 0 : # if any quadrant is masked, reduce to a single offset hy = image.pix.shape[0] // 2 hx = image.pix.shape[1] // 2 allgood = True for curxop in [np.less, np.greater]: for curyop in [np.less, np.greater]: allgood &= np.any(curxop(x_for_dx, hx) & curyop(y_for_dx, hy)) # some data in this quadrant if not allgood : log.warning("No shift data for at least one quadrant of the CCD, falls back to deg=0 shift") degxx=0 degxy=0 degyx=0 degyy=0 new_xcoeff = fit_xcoeff(dx,ex,x_for_dx,y_for_dx,degxx,degxy,tset,args.max_error) maxdx = np.max(np.abs(new_xcoeff[:,0]-xcoef[:,0])) tset.x_vs_wave_traceset._coeff = new_xcoeff xcoef = tset.x_vs_wave_traceset._coeff if maxdx<0.001 : break if internal_wavelength_calib and (degyx>0 or degyy>0) : for iteration in range(10) : # measure y shifts x_for_dy,y_for_dy,dy,ey,fiber_for_dy,wave_for_dy,dwave,dwave_err = compute_dy_using_boxcar_extraction(tset, image=image, fibers=fibers, width=args.width, continuum_subtract=continuum_subtract) mdy = np.median(dy) log.info("Subtract median(dy)={}".format(mdy)) dy -= mdy # remove median, because this is an internal calibration new_ycoeff = fit_ycoeff(dy,ey,x_for_dy,y_for_dy,degyx,degyy,tset,args.max_error) meandy = np.mean(new_ycoeff[:,0]-ycoef[:,0]) maxdy = np.max(np.abs(new_ycoeff[:,0]-ycoef[:,0]-meandy)) # remove mean because of the median dy correction above log.info("iteration = {} maxdy={:0.4f}".format(iteration,maxdy)) tset.y_vs_wave_traceset._coeff = new_ycoeff ycoef = tset.y_vs_wave_traceset._coeff if maxdy<0.001 : break internal_offset_info = dict(wave=wave_for_dy, fiber=fiber_for_dy, dwave=dwave, dwave_err=dwave_err) else : # duplicate dx results with zero shift to avoid write special case code below x_for_dy = x_for_dx.copy() y_for_dy = y_for_dx.copy() dy = np.zeros(dx.shape) ey = 1.e-6*np.ones(ex.shape) fiber_for_dy = fiber_for_dx.copy() wave_for_dy = wave_for_dx.copy() # write this for debugging if args.outoffsets : t = Table() t["AXIS"]=np.hstack([np.zeros(dy.size),np.ones(dx.size)]) t["WAVELENGTH"]=np.hstack([wave_for_dy,wave_for_dx]) t["FIBER"]=np.hstack([fiber_for_dy,fiber_for_dx]) t["X"]=np.hstack([x_for_dy,x_for_dx]) t["Y"]=np.hstack([y_for_dy,y_for_dx]) t["DELTA"]=np.hstack([dy+delta_yref,dx+delta_xref]) t["ERROR"]=np.hstack([ey,ex]) t.write(args.outoffsets,overwrite=True) log.info("wrote offsets table in %s"%args.outoffsets) #tset.x_vs_wave_traceset._coeff,tset.y_vs_wave_traceset._coeff = fit_xcoeff_ycoeff(dx,ex,x_for_dx,y_for_dx,degxx,degxy, # dy,ey,x_for_dy,y_for_dy,degyx,degyy,tset,args.max_error) # compute x y to record max deviations wave = np.linspace(tset.wavemin,tset.wavemax,5) x0 = np.zeros((tset.nspec,wave.size)) y0 = np.zeros((tset.nspec,wave.size)) for s in range(tset.nspec) : x0[s]=tset.x_vs_wave(s,wave) y0[s]=tset.y_vs_wave(s,wave) # use an input spectrum as an external calibration of wavelength if spectrum_filename is not None : # the psf is used only to convolve the input spectrum # the traceset of the psf is not used here psf = read_specter_psf(args.psf) (tset.y_vs_wave_traceset._coeff, wave_external, dwave_external, dwave_err_external) = shift_ycoef_using_external_spectrum(psf=psf, xytraceset=tset, image=image, fibers=fibers, spectrum_filename=spectrum_filename, degyy=args.degyy, width=7) external_offset_info = dict(wave=wave_external, dwave=dwave_external, dwave_err=dwave_err_external) else: external_offset_info = None benchmark_x_out = np.zeros((tset.nspec,benchmark_wave.size)) benchmark_y_out = np.zeros((tset.nspec,benchmark_wave.size)) for s in range(tset.nspec) : benchmark_x_out[s]=tset.x_vs_wave(s,benchmark_wave) benchmark_y_out[s]=tset.y_vs_wave(s,benchmark_wave) dx = benchmark_x_out - benchmark_x_in dy = benchmark_y_out - benchmark_y_in if tset.meta is None : tset.meta = dict() meandx=np.mean(dx) meandy=np.mean(dy) tset.meta["MEANDX"]=meandx tset.meta["MINDX"]=np.min(dx) tset.meta["MAXDX"]=np.max(dx) tset.meta["MEANDY"]=meandy tset.meta["MINDY"]=np.min(dy) tset.meta["MAXDY"]=np.max(dy) log.info("MEANDX = {:+0.2f} pixel".format(meandx)) log.info("MEANDY = {:+0.2f} pixel".format(meandy)) return tset, internal_offset_info, external_offset_info
def main(args=None) : log= get_logger() if not isinstance(args, argparse.Namespace): args = parse(args) log.info("degxx={} degxy={} degyx={} degyy={}".format(args.degxx,args.degxy,args.degyx,args.degyy)) # read preprocessed image image=read_image(args.image) log.info("read image {}".format(args.image)) if image.mask is not None : image.ivar *= (image.mask==0) if np.all(image.ivar == 0.0): log.critical(f"Entire {os.path.basename(args.image)} image is masked; can't fit traceshifts") sys.exit(1) tset, internal_offset_info, external_offset_info = fit_trace_shifts(image=image, args=args) tset.meta['IN_PSF'] = shorten_filename(args.psf) tset.meta['IN_IMAGE'] = shorten_filename(args.image) if args.outpsf is not None : write_traces_in_psf(args.psf,args.outpsf,tset, internal_offset_info=internal_offset_info, external_offset_info=external_offset_info) log.info("wrote modified PSF in %s"%args.outpsf)