Source code for desispec.io.frame

"""
desispec.io.frame
=================

I/O routines for Frame objects
"""
import os.path
import time

import numpy as np
import scipy, scipy.sparse
from astropy.io import fits
from astropy.table import Table
import warnings

from desiutil.depend import add_dependencies
from desiutil.log import get_logger

from ..frame import Frame
from .fibermap import read_fibermap
from .meta import findfile, get_nights, get_exposures
from .util import fitsheader, native_endian, makepath
from . import iotime

[docs]def write_frame(outfile, frame, header=None, fibermap=None, units=None): """Write a frame fits file and returns path to file written. Args: outfile: full path to output file, or tuple (night, expid, channel) frame: desispec.frame.Frame object with wave, flux, ivar... Optional: header: astropy.io.fits.Header or dict to override frame.header fibermap: table to store as FIBERMAP HDU Returns: full filepath of output file that was written Note: to create a Frame object to pass into write_frame, frame = Frame(wave, flux, ivar, resolution_data) """ log = get_logger() outfile = makepath(outfile, 'frame') #- Ignore some known and harmless units warnings import warnings warnings.filterwarnings('ignore', message="'.*nanomaggies.* did not parse as fits unit.*") warnings.filterwarnings('ignore', message=r".*'10\*\*6 arcsec.* did not parse as fits unit.*") if header is not None: hdr = fitsheader(header) else: hdr = fitsheader(frame.meta) add_dependencies(hdr) # Vet diagnosis = frame.vet() if diagnosis != 0: raise IOError("Frame did not pass simple vetting test. diagnosis={:d}".format(diagnosis)) hdus = fits.HDUList() x = fits.PrimaryHDU(frame.flux.astype('f4'), header=hdr) x.header['EXTNAME'] = 'FLUX' if units is not None: units = str(units) if 'BUNIT' in hdr and hdr['BUNIT'] != units: log.warning('BUNIT {bunit} != units {units}; using {units}'.format( bunit=hdr['BUNIT'], units=units)) x.header['BUNIT'] = units hdus.append(x) hdus.append( fits.ImageHDU(frame.ivar.astype('f4'), name='IVAR') ) # hdus.append( fits.CompImageHDU(frame.mask, name='MASK') ) hdus.append( fits.ImageHDU(frame.mask, name='MASK') ) hdus.append( fits.ImageHDU(frame.wave.astype('f8'), name='WAVELENGTH') ) hdus[-1].header['BUNIT'] = 'Angstrom' if frame.resolution_data is not None: hdus.append( fits.ImageHDU(frame.resolution_data.astype('f4'), name='RESOLUTION' ) ) elif frame.wsigma is not None: log.debug("Using ysigma from qproc") qrimg=fits.ImageHDU(frame.wsigma.astype('f4'), name='YSIGMA' ) qrimg.header["NDIAG"] =frame.ndiag hdus.append(qrimg) if fibermap is not None: fibermap = Table(fibermap) fibermap.meta['EXTNAME'] = 'FIBERMAP' add_dependencies(fibermap.meta) hdus.append( fits.convenience.table_to_hdu(fibermap) ) elif frame.fibermap is not None: fibermap = Table(frame.fibermap) fibermap.meta['EXTNAME'] = 'FIBERMAP' hdus.append( fits.convenience.table_to_hdu(fibermap) ) elif frame.spectrograph is not None: x.header['FIBERMIN'] = 500*frame.spectrograph # Hard-coded (as in desispec.frame) else: log.error("You are likely writing a frame without sufficient fiber info") if frame.chi2pix is not None: hdus.append( fits.ImageHDU(frame.chi2pix.astype('f4'), name='CHI2PIX' ) ) if frame.scores is not None : scores_tbl = Table(frame.scores) scores_tbl.meta['EXTNAME'] = 'SCORES' hdus.append( fits.convenience.table_to_hdu(scores_tbl) ) if frame.scores_comments is not None : # add comments in header hdu=hdus['SCORES'] for i in range(1,999): key = 'TTYPE'+str(i) if key in hdu.header: value = hdu.header[key] if value in frame.scores_comments.keys() : hdu.header[key] = (value, frame.scores_comments[value]) t0 = time.time() hdus.writeto(outfile+'.tmp', overwrite=True, checksum=True) os.rename(outfile+'.tmp', outfile) duration = time.time() - t0 log.info(iotime.format('write', outfile, duration)) return outfile
[docs]def read_meta_frame(filename, extname=0): """ Load the meta information of a Frame Args: filename: path to a file extname: int, optional; Extension for grabbing header info Returns: meta: dict or astropy.fits.header """ with fits.open(filename, uint=True, memmap=False) as fx: hdr = fx[extname].header return hdr
[docs]def read_frame(filename, nspec=None, skip_resolution=False): """Reads a frame fits file and returns its data. Args: filename: path to a file, or (night, expid, camera) tuple where night = string YEARMMDD expid = integer exposure ID camera = b0, r1, .. z9 skip_resolution: bool, option Speed up read time (>5x) by avoiding the Resolution matrix Returns: desispec.Frame object with attributes wave, flux, ivar, etc. """ log = get_logger() #- check if filename is (night, expid, camera) tuple instead if not isinstance(filename, str): night, expid, camera = filename filename = findfile('frame', night, expid, camera) if not os.path.isfile(filename): raise FileNotFoundError("cannot open"+filename) t0 = time.time() fx = fits.open(filename, uint=True, memmap=False) hdr = fx[0].header flux = native_endian(fx['FLUX'].data.astype('f8')) ivar = native_endian(fx['IVAR'].data.astype('f8')) wave = native_endian(fx['WAVELENGTH'].data.astype('f8')) if 'MASK' in fx: mask = native_endian(fx['MASK'].data) else: mask = None #- let the Frame object create the default mask # Init resolution_data=None qwsigma=None qndiag=None fibermap = None chi2pix = None scores = None scores_comments = None if skip_resolution: pass elif 'RESOLUTION' in fx: resolution_data = native_endian(fx['RESOLUTION'].data.astype('f8')) elif 'QUICKRESOLUTION' in fx: qr=fx['QUICKRESOLUTION'].header qndiag =qr['NDIAG'] qwsigma=native_endian(fx['QUICKRESOLUTION'].data.astype('f4')) if 'FIBERMAP' in fx: fibermap = read_fibermap(filename) else: fibermap = None if 'CHI2PIX' in fx: chi2pix = native_endian(fx['CHI2PIX'].data.astype('f8')) else: chi2pix = None if 'SCORES' in fx: scores = fx['SCORES'].data # I need to open the header to read the comments scores_comments = dict() head = fx['SCORES'].header for i in range(1,len(scores.columns)+1) : k='TTYPE'+str(i) scores_comments[head[k]]=head.comments[k] else: scores = None scores_comments = None fx.close() duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) if nspec is not None: flux = flux[0:nspec] ivar = ivar[0:nspec] if resolution_data is not None: resolution_data = resolution_data[0:nspec] else: qwsigma=qwsigma[0:nspec] if chi2pix is not None: chi2pix = chi2pix[0:nspec] if mask is not None: mask = mask[0:nspec] # return flux,ivar,wave,resolution_data, hdr frame = Frame(wave, flux, ivar, mask, resolution_data, meta=hdr, fibermap=fibermap, chi2pix=chi2pix, scores=scores,scores_comments=scores_comments, wsigma=qwsigma,ndiag=qndiag, suppress_res_warning=skip_resolution) # This Frame came from a file, so set that frame.filename = os.path.abspath(filename) # Vette diagnosis = frame.vet() if diagnosis != 0: warnings.warn("Frame did not pass simple vetting test. diagnosis={:d}".format(diagnosis)) log.error("Frame did not pass simple vetting test. diagnosis={:d}".format(diagnosis)) # Return return frame
[docs]def search_for_framefile(frame_file, specprod_dir=None): """ Search for an input frame_file in the desispec redux hierarchy Args: frame_file: str specprod_dir: str, optional Returns: mfile: str, full path to frame_file if found else raise error """ log=get_logger() # Parse frame file path, ifile = os.path.split(frame_file) splits = ifile.split('-') root = splits[0] camera = splits[1] fexposure = int(splits[2].split('.')[0]) # Loop on nights nights = get_nights(specprod_dir=specprod_dir) for night in nights: for exposure in get_exposures(night, specprod_dir=specprod_dir): if exposure == fexposure: mfile = findfile(root, camera=camera, night=night, expid=exposure, specprod_dir=specprod_dir) if os.path.isfile(mfile): return mfile else: log.error("Expected file {:s} not found..".format(mfile))