"""
desispec.io.frame
=================
I/O routines for Frame objects
"""
import os.path
import time
import numpy as np
import scipy, scipy.sparse
from astropy.io import fits
from astropy.table import Table
import warnings
from desiutil.depend import add_dependencies
from desiutil.log import get_logger
from ..frame import Frame
from .fibermap import read_fibermap
from .meta import findfile, get_nights, get_exposures
from .util import fitsheader, native_endian, makepath
from . import iotime
[docs]def write_frame(outfile, frame, header=None, fibermap=None, units=None):
"""Write a frame fits file and returns path to file written.
Args:
outfile: full path to output file, or tuple (night, expid, channel)
frame: desispec.frame.Frame object with wave, flux, ivar...
Optional:
header: astropy.io.fits.Header or dict to override frame.header
fibermap: table to store as FIBERMAP HDU
Returns:
full filepath of output file that was written
Note:
to create a Frame object to pass into write_frame,
frame = Frame(wave, flux, ivar, resolution_data)
"""
log = get_logger()
outfile = makepath(outfile, 'frame')
#- Ignore some known and harmless units warnings
import warnings
warnings.filterwarnings('ignore', message="'.*nanomaggies.* did not parse as fits unit.*")
warnings.filterwarnings('ignore', message=r".*'10\*\*6 arcsec.* did not parse as fits unit.*")
if header is not None:
hdr = fitsheader(header)
else:
hdr = fitsheader(frame.meta)
add_dependencies(hdr)
# Vet
diagnosis = frame.vet()
if diagnosis != 0:
raise IOError("Frame did not pass simple vetting test. diagnosis={:d}".format(diagnosis))
hdus = fits.HDUList()
x = fits.PrimaryHDU(frame.flux.astype('f4'), header=hdr)
x.header['EXTNAME'] = 'FLUX'
if units is not None:
units = str(units)
if 'BUNIT' in hdr and hdr['BUNIT'] != units:
log.warning('BUNIT {bunit} != units {units}; using {units}'.format(
bunit=hdr['BUNIT'], units=units))
x.header['BUNIT'] = units
hdus.append(x)
hdus.append( fits.ImageHDU(frame.ivar.astype('f4'), name='IVAR') )
# hdus.append( fits.CompImageHDU(frame.mask, name='MASK') )
hdus.append( fits.ImageHDU(frame.mask, name='MASK') )
hdus.append( fits.ImageHDU(frame.wave.astype('f8'), name='WAVELENGTH') )
hdus[-1].header['BUNIT'] = 'Angstrom'
if frame.resolution_data is not None:
hdus.append( fits.ImageHDU(frame.resolution_data.astype('f4'), name='RESOLUTION' ) )
elif frame.wsigma is not None:
log.debug("Using ysigma from qproc")
qrimg=fits.ImageHDU(frame.wsigma.astype('f4'), name='YSIGMA' )
qrimg.header["NDIAG"] =frame.ndiag
hdus.append(qrimg)
if fibermap is not None:
fibermap = Table(fibermap)
fibermap.meta['EXTNAME'] = 'FIBERMAP'
add_dependencies(fibermap.meta)
hdus.append( fits.convenience.table_to_hdu(fibermap) )
elif frame.fibermap is not None:
fibermap = Table(frame.fibermap)
fibermap.meta['EXTNAME'] = 'FIBERMAP'
hdus.append( fits.convenience.table_to_hdu(fibermap) )
elif frame.spectrograph is not None:
x.header['FIBERMIN'] = 500*frame.spectrograph # Hard-coded (as in desispec.frame)
else:
log.error("You are likely writing a frame without sufficient fiber info")
if frame.chi2pix is not None:
hdus.append( fits.ImageHDU(frame.chi2pix.astype('f4'), name='CHI2PIX' ) )
if frame.scores is not None :
scores_tbl = Table(frame.scores)
scores_tbl.meta['EXTNAME'] = 'SCORES'
hdus.append( fits.convenience.table_to_hdu(scores_tbl) )
if frame.scores_comments is not None : # add comments in header
hdu=hdus['SCORES']
for i in range(1,999):
key = 'TTYPE'+str(i)
if key in hdu.header:
value = hdu.header[key]
if value in frame.scores_comments.keys() :
hdu.header[key] = (value, frame.scores_comments[value])
t0 = time.time()
hdus.writeto(outfile+'.tmp', overwrite=True, checksum=True)
os.rename(outfile+'.tmp', outfile)
duration = time.time() - t0
log.info(iotime.format('write', outfile, duration))
return outfile
[docs]def read_frame(filename, nspec=None, skip_resolution=False):
"""Reads a frame fits file and returns its data.
Args:
filename: path to a file, or (night, expid, camera) tuple where
night = string YEARMMDD
expid = integer exposure ID
camera = b0, r1, .. z9
skip_resolution: bool, option
Speed up read time (>5x) by avoiding the Resolution matrix
Returns:
desispec.Frame object with attributes wave, flux, ivar, etc.
"""
log = get_logger()
#- check if filename is (night, expid, camera) tuple instead
if not isinstance(filename, str):
night, expid, camera = filename
filename = findfile('frame', night, expid, camera)
if not os.path.isfile(filename):
raise FileNotFoundError("cannot open"+filename)
t0 = time.time()
fx = fits.open(filename, uint=True, memmap=False)
hdr = fx[0].header
flux = native_endian(fx['FLUX'].data.astype('f8'))
ivar = native_endian(fx['IVAR'].data.astype('f8'))
wave = native_endian(fx['WAVELENGTH'].data.astype('f8'))
if 'MASK' in fx:
mask = native_endian(fx['MASK'].data)
else:
mask = None #- let the Frame object create the default mask
# Init
resolution_data=None
qwsigma=None
qndiag=None
fibermap = None
chi2pix = None
scores = None
scores_comments = None
if skip_resolution:
pass
elif 'RESOLUTION' in fx:
resolution_data = native_endian(fx['RESOLUTION'].data.astype('f8'))
elif 'QUICKRESOLUTION' in fx:
qr=fx['QUICKRESOLUTION'].header
qndiag =qr['NDIAG']
qwsigma=native_endian(fx['QUICKRESOLUTION'].data.astype('f4'))
if 'FIBERMAP' in fx:
fibermap = read_fibermap(filename)
else:
fibermap = None
if 'CHI2PIX' in fx:
chi2pix = native_endian(fx['CHI2PIX'].data.astype('f8'))
else:
chi2pix = None
if 'SCORES' in fx:
scores = fx['SCORES'].data
# I need to open the header to read the comments
scores_comments = dict()
head = fx['SCORES'].header
for i in range(1,len(scores.columns)+1) :
k='TTYPE'+str(i)
scores_comments[head[k]]=head.comments[k]
else:
scores = None
scores_comments = None
fx.close()
duration = time.time() - t0
log.info(iotime.format('read', filename, duration))
if nspec is not None:
flux = flux[0:nspec]
ivar = ivar[0:nspec]
if resolution_data is not None:
resolution_data = resolution_data[0:nspec]
else:
qwsigma=qwsigma[0:nspec]
if chi2pix is not None:
chi2pix = chi2pix[0:nspec]
if mask is not None:
mask = mask[0:nspec]
# return flux,ivar,wave,resolution_data, hdr
frame = Frame(wave, flux, ivar, mask, resolution_data, meta=hdr, fibermap=fibermap, chi2pix=chi2pix,
scores=scores,scores_comments=scores_comments,
wsigma=qwsigma,ndiag=qndiag, suppress_res_warning=skip_resolution)
# This Frame came from a file, so set that
frame.filename = os.path.abspath(filename)
# Vette
diagnosis = frame.vet()
if diagnosis != 0:
warnings.warn("Frame did not pass simple vetting test. diagnosis={:d}".format(diagnosis))
log.error("Frame did not pass simple vetting test. diagnosis={:d}".format(diagnosis))
# Return
return frame
[docs]def search_for_framefile(frame_file, specprod_dir=None):
""" Search for an input frame_file in the desispec redux hierarchy
Args:
frame_file: str
specprod_dir: str, optional
Returns:
mfile: str, full path to frame_file if found else raise error
"""
log=get_logger()
# Parse frame file
path, ifile = os.path.split(frame_file)
splits = ifile.split('-')
root = splits[0]
camera = splits[1]
fexposure = int(splits[2].split('.')[0])
# Loop on nights
nights = get_nights(specprod_dir=specprod_dir)
for night in nights:
for exposure in get_exposures(night, specprod_dir=specprod_dir):
if exposure == fexposure:
mfile = findfile(root, camera=camera, night=night, expid=exposure, specprod_dir=specprod_dir)
if os.path.isfile(mfile):
return mfile
else:
log.error("Expected file {:s} not found..".format(mfile))