Source code for desispec.qproc.io

"""
desispec.qproc.io
=================

I/O routines for qproc objects
"""

import os.path

import numpy as np
import scipy, scipy.sparse
from astropy.io import fits
import warnings

from desiutil.depend import add_dependencies
from desiutil.io import encode_table

from desispec.qproc.qframe import QFrame
from desispec.io.util import fitsheader, native_endian, makepath
from desispec.io.util import get_tempfilename
from desiutil.log import get_logger


[docs]def write_qframe(outfile, qframe, header=None, fibermap=None, units=None): """Write a frame fits file and returns path to file written. Args: outfile: full path to output file, or tuple (night, expid, channel) qframe: desispec.qproc.QFrame object with wave, flux, ivar... Optional: header: astropy.io.fits.Header or dict to override frame.header fibermap: table to store as FIBERMAP HDU Returns: full filepath of output file that was written Note: to create a QFrame object to pass into write_qframe, qframe = QFrame(wave, flux, ivar) """ log = get_logger() outfile = makepath(outfile, 'qframe') if header is not None: hdr = fitsheader(header) else: hdr = fitsheader(qframe.meta) add_dependencies(hdr) hdus = fits.HDUList() x = fits.PrimaryHDU(qframe.flux.astype('f4'), header=hdr) x.header['EXTNAME'] = 'FLUX' if units is not None: units = str(units) if 'BUNIT' in hdr and hdr['BUNIT'] != units: log.warning('BUNIT {bunit} != units {units}; using {units}'.format( bunit=hdr['BUNIT'], units=units)) x.header['BUNIT'] = units hdus.append(x) hdus.append( fits.ImageHDU(qframe.ivar.astype('f4'), name='IVAR') ) if qframe.mask is None : qframe.mask=np.zeros(qframe.flux.shape,dtype=np.uint32) # hdus.append( fits.CompImageHDU(qframe.mask, name='MASK') ) hdus.append( fits.ImageHDU(qframe.mask, name='MASK') ) if qframe.sigma is None : qframe.sigma=np.zeros(qframe.flux.shape,dtype=np.float32) hdus.append( fits.ImageHDU(qframe.sigma.astype('f4'), name='YSIGMA') ) hdus.append( fits.ImageHDU(qframe.wave.astype('f8'), name='WAVELENGTH') ) hdus[-1].header['BUNIT'] = 'Angstrom' if fibermap is not None: fibermap = encode_table(fibermap) #- unicode -> bytes fibermap.meta['EXTNAME'] = 'FIBERMAP' hdus.append( fits.convenience.table_to_hdu(fibermap) ) elif qframe.fibermap is not None: fibermap = encode_table(qframe.fibermap) #- unicode -> bytes fibermap.meta['EXTNAME'] = 'FIBERMAP' hdus.append( fits.convenience.table_to_hdu(fibermap) ) elif qframe.spectrograph is not None: x.header['FIBERMIN'] = 500*qframe.spectrograph # Hard-coded (as in desispec.qproc.qframe) else: log.error("You are likely writing a qframe without sufficient fiber info") raise ValueError('no fibermap') tmpfile = get_tempfilename(outfile) hdus.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) return outfile
[docs]def read_qframe(filename, nspec=None, skip_resolution=False): """Reads a frame fits file and returns its data. Args: filename: path to a file skip_resolution: bool, option Speed up read time (>5x) by avoiding the Resolution matrix Returns: desispec.Frame object with attributes wave, flux, ivar, etc. """ log = get_logger() if not os.path.isfile(filename) : raise IOError("cannot open"+filename) fx = fits.open(filename, uint=True, memmap=False) hdr = fx[0].header flux = native_endian(fx['FLUX'].data.astype('f8')) ivar = native_endian(fx['IVAR'].data.astype('f8')) wave = native_endian(fx['WAVELENGTH'].data.astype('f8')) if wave.shape != flux.shape : log.error("{} is not a valid QFrame file because wave.shape != flux.shape".format(filename)) return None if 'MASK' in fx: mask = native_endian(fx['MASK'].data) else: mask = None #- let the Frame object create the default mask if 'SIGMA' in fx: sigma = native_endian(fx['SIGMA'].data.astype('f8')) else: sigma = None if 'FIBERMAP' in fx: fibermap = fx['FIBERMAP'].data fibers = fibermap['FIBER'] else: fibermap = None fibers = None fx.close() if nspec is not None: flux = flux[0:nspec] ivar = ivar[0:nspec] if mask is not None: mask = mask[0:nspec] if fibermap is not None: fibermap = fibermap[:][0:nspec] if fibers is not None: fibers = fibers[:][0:nspec] # return flux,ivar,wave,resolution_data, hdr qframe = QFrame(wave, flux, ivar, mask=mask, sigma=sigma, meta=hdr, fibermap=fibermap, fibers=fibers) # Return return qframe