Source code for desispec.io.xytraceset

"""
desispec.io.xytraceset
======================

I/O routines for XYTraceSet objects.
"""

import os.path
import time
import numpy as np
from astropy.io import fits

from ..xytraceset import XYTraceSet
from desiutil.log import get_logger
from .util import makepath, get_tempfilename
from . import iotime

def _traceset_from_image(wavemin,wavemax,hdu,label=None) :
    log=get_logger()
    head=hdu.header
    extname=head["EXTNAME"]
    if wavemin is not None :
        if abs(head["WAVEMIN"]-wavemin)>0.001 :
            mess="WAVEMIN not matching in hdu {} {}!={}".format(extname,head["WAVEMIN"],wavemin)
            log.error(mess)
            raise ValueError(mess)
    else :
        wavemin=head["WAVEMIN"]
    if wavemax is not None :
        if abs(head["WAVEMAX"]-wavemax)>0.001 :
            mess="WAVEMAX not matching in hdu {} {}!={}".format(extname,head["WAVEMAX"],wavemax)
            log.error(mess)
            raise ValueError(mess)
    else :
        wavemax=head["WAVEMAX"]
    if label is not None :
        log.debug("read {} from hdu {}".format(label,extname))
    else :
        log.debug("read coefficients from hdu {}".format(label,extname))

    return hdu.data,wavemin,wavemax

def _traceset_from_table(wavemin,wavemax,hdu,pname) :
    log=get_logger()
    head=hdu.header
    table=hdu.data

    extname=head["EXTNAME"]
    i=np.where(table["PARAM"]==pname)[0][0]

    if "WAVEMIN" in table.dtype.names :
        twavemin=table["WAVEMIN"][i]
        if wavemin is not None :
            if abs(twavemin-wavemin)>0.001 :
                mess="WAVEMIN not matching in hdu {} {}!={}".format(extname,twavemin,wavemin)
                log.error(mess)
                raise ValueError(mess)
        else :
            wavemin=twavemin

    if "WAVEMAX" in table.dtype.names :
        twavemax=table["WAVEMAX"][i]
        if wavemax is not None :
            if abs(twavemax-wavemax)>0.001 :
                mess="WAVEMAX not matching in hdu {} {}!={}".format(extname,twavemax,wavemax)
                log.error(mess)
                raise ValueError(mess)
        else :
            wavemax=twavemax

    log.debug("read {} from hdu {}".format(pname,extname))
    return table["COEFF"][i],wavemin,wavemax

[docs]def read_xytraceset(filename) : """ Reads traces in PSF fits file Args: filename : Path to input fits file which has to contain XTRACE and YTRACE HDUs Returns: XYTraceSet object """ #- specter import isolated within function so specter only loaded if #- really needed from specter.util.traceset import TraceSet,fit_traces log=get_logger() xcoef=None ycoef=None xsigcoef=None ysigcoef=None wsigmacoef=None wavemin=None wavemax=None log.info("reading traces in '%s'"%filename) t0 = time.time() fits_file = fits.open(filename) # npix_y, needed for boxcar extractions npix_y=0 for hdu in [0,"XTRACE","PSF"] : if npix_y > 0 : break if hdu in fits_file : head = fits_file[hdu].header if "NPIX_Y" in head : npix_y=int(head["NPIX_Y"]) if npix_y == 0 : raise KeyError("Didn't find head entry NPIX_Y in hdu 0, XTRACE or PSF") log.debug("npix_y={}".format(npix_y)) try : psftype=fits_file[0].header["PSFTYPE"] except KeyError : psftype="" # now read trace coefficients log.debug("psf is a '%s'"%psftype) if psftype == "bootcalib" : xcoef,wavemin,wavemax =_traceset_from_image(wavemin,wavemax,fits_file[0],"xcoef") ycoef,wavemin,wavemax =_traceset_from_image(wavemin,wavemax,fits_file[1],"ycoef") else : for k in ["XTRACE","XCOEF","XCOEFF"] : if k in fits_file : xcoef,wavemin,wavemax =_traceset_from_image(wavemin,wavemax,fits_file[k],"xcoef") for k in ["YTRACE","YCOEF","YCOEFF"] : if k in fits_file : ycoef,wavemin,wavemax =_traceset_from_image(wavemin,wavemax,fits_file[k],"ycoef") for k in ["XSIG"] : if k in fits_file : xsigcoef,wavemin,wavemax =_traceset_from_image(wavemin,wavemax,fits_file[k],"xsigcoef") for k in ["YSIG"] : if k in fits_file : ysigcoef,wavemin,wavemax =_traceset_from_image(wavemin,wavemax,fits_file[k],"ysigcoef") if "WSIGMA" in fits_file : wsigmacoef = fits_file["WSIGMA"].data if psftype == "GAUSS-HERMITE" : # older version where XTRACE and YTRACE are not saved in separate HDUs hdu=fits_file["PSF"] if xcoef is None : xcoef,wavemin,wavemax =_traceset_from_table(wavemin,wavemax,hdu,"X") if ycoef is None : ycoef,wavemin,wavemax =_traceset_from_table(wavemin,wavemax,hdu,"Y") if xsigcoef is None : xsigcoef,wavemin,wavemax =_traceset_from_table(wavemin,wavemax,hdu,"GHSIGX") if ysigcoef is None : ysigcoef,wavemin,wavemax =_traceset_from_table(wavemin,wavemax,hdu,"GHSIGY") log.debug("wavemin={} wavemax={}".format(wavemin,wavemax)) if xcoef is None or ycoef is None : raise ValueError("could not find xcoef and ycoef in psf file %s"%filename) if xcoef.shape[0] != ycoef.shape[0] : raise ValueError("XCOEF and YCOEF don't have same number of fibers %d %d"%(xcoef.shape[0],ycoef.shape[0])) fits_file.close() duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) if wsigmacoef is not None : log.warning("Converting deprecated WSIGMA coefficents (in Ang.) into YSIG (in CCD pixels)") nfiber = wsigmacoef.shape[0] ncoef = wsigmacoef.shape[1] nw = 100 # to get accurate dydw wave = np.linspace(wavemin,wavemax,nw) wsig_set = TraceSet(wsigmacoef,[wavemin,wavemax]) y_set = TraceSet(ycoef,[wavemin,wavemax]) wsig_vals = np.zeros((nfiber,nw)) for f in range(nfiber) : y_vals = y_set.eval(f,wave) dydw = np.gradient(y_vals)/np.gradient(wave) wsig_vals[f]=wsig_set.eval(f,wave)*dydw tset = fit_traces(wave, wsig_vals, deg=ncoef-1, domain=(wavemin,wavemax)) ysigcoef = tset._coeff return XYTraceSet(xcoef,ycoef,wavemin,wavemax,npix_y,xsigcoef=xsigcoef,ysigcoef=ysigcoef)
[docs]def write_xytraceset(outfile,xytraceset) : """ Write a traceset fits file and returns path to file written. Args: outfile: full path to output file xytraceset: desispec.xytraceset.XYTraceSet object Returns: full filepath of output file that was written """ log=get_logger() outfile = makepath(outfile, 'frame') hdus = fits.HDUList() x = fits.PrimaryHDU(xytraceset.x_vs_wave_traceset._coeff.astype('f4')) x.header['EXTNAME'] = "XTRACE" if xytraceset.meta is not None : for k in xytraceset.meta : if not k in x.header : x.header[k]=xytraceset.meta[k] hdus.append(x) hdus.append( fits.ImageHDU(xytraceset.y_vs_wave_traceset._coeff.astype('f4'), name="YTRACE") ) if xytraceset.xsig_vs_wave_traceset is not None : hdus.append( fits.ImageHDU(xytraceset.xsig_vs_wave_traceset._coeff.astype('f4'), name='XSIG') ) if xytraceset.ysig_vs_wave_traceset is not None : hdus.append( fits.ImageHDU(xytraceset.ysig_vs_wave_traceset._coeff.astype('f4'), name='YSIG') ) for hdu in ["XTRACE","YTRACE","XSIG","YSIG"] : if hdu in hdus : hdus[hdu].header["WAVEMIN"] = xytraceset.wavemin hdus[hdu].header["WAVEMAX"] = xytraceset.wavemax hdus[hdu].header["NPIX_Y"] = xytraceset.npix_y t0 = time.time() tmpfile = get_tempfilename(outfile) hdus.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) duration = time.time() - t0 log.info("wrote a xytraceset in {}".format(outfile)) log.info(iotime.format('write', outfile, duration)) return outfile