Source code for desispec.io.sky

"""
desispec.io.sky
===============

IO routines for sky.
"""
from __future__ import absolute_import, division
import os
import time
from astropy.io import fits
from desiutil.log import get_logger

from . import iotime
from .util import get_tempfilename

[docs]def write_sky(outfile, skymodel, header=None): """Write sky model. Args: outfile : filename or (night, expid, camera) tuple skymodel : SkyModel object, with the following attributes wave : 1D wavelength in vacuum Angstroms flux : 2D[nspec, nwave] sky flux ivar : 2D inverse variance of sky flux mask : 2D mask for sky flux stat_ivar : 2D inverse variance of sky flux (statistical only) dwavecoeff : 1D[ncoeff] array of PCA dwavelength coefficients (optional) dlsfcoeff : 1D[ncoeff] array of PCA dlsf coefficients (optional) header : optional fits header data (fits.Header, dict, or list) """ from desiutil.depend import add_dependencies from .util import fitsheader, makepath log = get_logger() outfile = makepath(outfile, 'sky') #- Convert header to fits.Header if needed if header is not None: hdr = fitsheader(header) else: hdr = fitsheader(skymodel.header) add_dependencies(hdr) hx = fits.HDUList() hdr['EXTNAME'] = ('SKY', 'no dimension') hx.append( fits.PrimaryHDU(skymodel.flux.astype('f4'), header=hdr) ) hx.append( fits.ImageHDU(skymodel.ivar.astype('f4'), name='IVAR') ) # hx.append( fits.CompImageHDU(skymodel.mask, name='MASK') ) hx.append( fits.ImageHDU(skymodel.mask, name='MASK') ) hx.append( fits.ImageHDU(skymodel.wave.astype('f4'), name='WAVELENGTH') ) hx[-1].header['BUNIT'] = 'Angstrom' if skymodel.stat_ivar is not None : hx.append( fits.ImageHDU(skymodel.stat_ivar.astype('f4'), name='STATIVAR') ) if skymodel.throughput_corrections is not None: hx.append( fits.ImageHDU(skymodel.throughput_corrections.astype('f4'), name='THRPUTCORR') ) if skymodel.throughput_corrections_model is not None: hx.append( fits.ImageHDU(skymodel.throughput_corrections_model.astype('f4'), name='THRPUTCORR_MOD') ) if skymodel.dwavecoeff is not None: hx.append(fits.ImageHDU(skymodel.dwavecoeff.astype('f4'), name='DWAVECOEFF')) if skymodel.dlsfcoeff is not None: hx.append(fits.ImageHDU(skymodel.dlsfcoeff.astype('f4'), name='DLSFCOEFF')) if skymodel.skygradpcacoeff is not None: hx.append(fits.ImageHDU(skymodel.skygradpcacoeff.astype('f4'), name='SKYGRADPCACOEFF')) if skymodel.skytargetid is not None: hx.append(fits.ImageHDU(skymodel.skytargetid, name='SKYTARGETID')) t0 = time.time() tmpfile = get_tempfilename(outfile) hx.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) duration = time.time() - t0 log.info(iotime.format('write', outfile, duration)) return outfile
[docs]def read_sky(filename): """Read sky model and return SkyModel object with attributes wave, flux, ivar, mask, header. skymodel.wave is 1D common wavelength grid, the others are 2D[nspec, nwave] """ from .meta import findfile from .util import native_endian, checkgzip from ..sky import SkyModel log = get_logger() #- check if filename is (night, expid, camera) tuple instead if not isinstance(filename, str): night, expid, camera = filename filename = findfile('sky', night, expid, camera) t0 = time.time() filename = checkgzip(filename) fx = fits.open(filename, memmap=False, uint=True) hdr = fx[0].header wave = native_endian(fx["WAVELENGTH"].data.astype('f8')) skyflux = native_endian(fx["SKY"].data.astype('f8')) ivar = native_endian(fx["IVAR"].data.astype('f8')) mask = native_endian(fx["MASK"].data) if "STATIVAR" in fx : stat_ivar = native_endian(fx["STATIVAR"].data.astype('f8')) else : stat_ivar = None if "THRPUTCORR" in fx : throughput_corrections = native_endian(fx["THRPUTCORR"].data.astype('f8')) else : throughput_corrections = None if "THRPUTCORR_MOD" in fx : throughput_corrections_model = native_endian(fx["THRPUTCORR_MOD"].data.astype('f8')) else : throughput_corrections_model = None if "DWAVECOEFF" in fx : dwavecoeff = native_endian(fx["DWAVECOEFF"].data.astype('f8')) else : dwavecoeff = None if "DLSFCOEFF" in fx : dlsfcoeff = native_endian(fx["DLSFCOEFF"].data.astype('f8')) else : dlsfcoeff = None if "SKYGRADPCACOEFF" in fx: skygradpcacoeff = native_endian(fx['SKYGRADPCACOEFF'].data.astype('f8')) else: skygradpcacoeff = None fx.close() duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) skymodel = SkyModel(wave, skyflux, ivar, mask, header=hdr,stat_ivar=stat_ivar, throughput_corrections=throughput_corrections, throughput_corrections_model=throughput_corrections_model, dwavecoeff=dwavecoeff, dlsfcoeff=dlsfcoeff, skygradpcacoeff=skygradpcacoeff) return skymodel