Source code for desispec.io.skycorr

"""
desispec.io.skycorr
===================

IO routines for sky.
"""
from __future__ import absolute_import, division
import os
import time
from astropy.io import fits
from desiutil.log import get_logger
import numpy as np

from . import iotime
from .util import get_tempfilename

[docs]def write_skycorr(outfile, skycorr): """Write sky model. Args: outfile : filename or (night, expid, camera) tuple skycorr : SkyCorr object """ from .util import fitsheader, makepath log = get_logger() outfile = makepath(outfile, 'skycorr') #- Convert header to fits.Header if needed hdr = fitsheader(skycorr.header) hx = fits.HDUList() hdr['EXTNAME'] = 'WAVELENGTH' hx.append( fits.PrimaryHDU(skycorr.wave.astype('f8'), header=hdr) ) hx.append( fits.ImageHDU(skycorr.dwave.astype('f4'), name='DWAVE') ) hx.append( fits.ImageHDU(skycorr.dlsf.astype('f4'), name='DLSF') ) hx['DWAVE'].header['BUNIT'] = 'Angstrom' hx['DLSF'].header['BUNIT'] = 'Angstrom' t0 = time.time() tmpfile = get_tempfilename(outfile) hx.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) duration = time.time() - t0 log.info(iotime.format('write', outfile, duration)) return outfile
[docs]def read_skycorr(filename) : """Read sky correction and return SkyCorr object with attributes wave, flux, ivar, mask, header. skymodel.wave is 1D common wavelength grid, the others are 2D[nspec, nwave] """ from .meta import findfile from .util import native_endian, checkgzip from ..skycorr import SkyCorr log = get_logger() #- check if filename is (night, expid, camera) tuple instead if not isinstance(filename, str): night, expid, camera = filename filename = findfile('skycorr', night, expid, camera) t0 = time.time() filename = checkgzip(filename) fx = fits.open(filename, memmap=False, uint=True) hdr = fx[0].header wave = native_endian(fx["WAVELENGTH"].data.astype('f8')) dwave = native_endian(fx["DWAVE"].data.astype('f8')) dlsf = native_endian(fx["DLSF"].data.astype('f8')) fx.close() duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) return SkyCorr(wave, dwave, dlsf, header=hdr)
[docs]def write_skycorr_pca(outfile, skycorrpca): """Write sky model. Args: outfile : filename or (night, expid, camera) tuple skycorrpca : SkyCorrPCA object """ from .util import fitsheader, makepath log = get_logger() outfile = makepath(outfile, 'skycorr') #- Convert header to fits.Header if needed hdr = fitsheader(skycorrpca.header) hx = fits.HDUList() hdr['EXTNAME'] = 'DWAVE_MEAN' hx.append( fits.PrimaryHDU(skycorrpca.dwave_mean.astype('f4'), header=hdr) ) for i in range(skycorrpca.dwave_eigenvectors.shape[0]) : hx.append( fits.ImageHDU(skycorrpca.dwave_eigenvectors[i].astype('f4'), name='DWAVE_EIG{}'.format(i+1))) hx.append( fits.ImageHDU(skycorrpca.dwave_eigenvalues.astype('f8'), name='DWAVE_EIGENVALS')) hx.append( fits.ImageHDU(skycorrpca.dlsf_mean.astype('f4'), name='DLSF_MEAN')) for i in range(skycorrpca.dlsf_eigenvectors.shape[0]) : hx.append( fits.ImageHDU(skycorrpca.dlsf_eigenvectors[i].astype('f4'), name='DLSF_EIG{}'.format(i+1))) hx.append( fits.ImageHDU(skycorrpca.dlsf_eigenvalues.astype('f8'), name='DLSF_EIGENVALS')) hx.append( fits.ImageHDU(skycorrpca.wave.astype('f8'), name='WAVELENGTH')) t0 = time.time() tmpfile = get_tempfilename(outfile) hx.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) duration = time.time() - t0 log.info(iotime.format('write', outfile, duration)) return outfile
[docs]def read_skycorr_pca(filename) : """Read sky correction pca file and return SkyCorrPCA object. """ from .meta import findfile from .util import native_endian, checkgzip from ..skycorr import SkyCorrPCA log = get_logger() t0 = time.time() filename = checkgzip(filename) fx = fits.open(filename, memmap=False, uint=True) hdr = fx[0].header wave = native_endian(fx["WAVELENGTH"].data.astype('f8')) dwave_mean = native_endian(fx["DWAVE_MEAN"].data.astype('f8')) dwave_eigenvectors = [] for i in range(1,12): key="DWAVE_EIG{}".format(i) if key in fx : dwave_eigenvectors.append(fx[key].data.astype('f8')) else: break dwave_eigenvectors = np.array(dwave_eigenvectors) dwave_eigenvalues = native_endian(fx["DWAVE_EIGENVALS"].data.astype('f8')) dlsf_mean = native_endian(fx["DLSF_MEAN"].data.astype('f8')) dlsf_eigenvectors = [] for i in range(1,12): key="DLSF_EIG{}".format(i) if key in fx : dlsf_eigenvectors.append(fx[key].data.astype('f8')) else: break dlsf_eigenvectors = np.array(dlsf_eigenvectors) dlsf_eigenvalues = native_endian(fx["DLSF_EIGENVALS"].data.astype('f8')) fx.close() duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) return SkyCorrPCA(dwave_mean=dwave_mean, dwave_eigenvectors=dwave_eigenvectors, dwave_eigenvalues=dwave_eigenvalues, dlsf_mean=dlsf_mean, dlsf_eigenvectors=dlsf_eigenvectors, dlsf_eigenvalues=dlsf_eigenvalues, wave=wave, header=hdr)