Source code for desispec.io.fluxcalibration

"""
desispec.io.fluxcalibration
===========================

IO routines for flux calibration.
"""
from __future__ import absolute_import, print_function
import os
import time
from astropy.io import fits
from astropy.io.fits.convenience import table_to_hdu
from astropy.table import Table
import numpy,scipy

from desiutil.depend import add_dependencies
from desiutil.log import get_logger
from desiutil.io import encode_table

from .util import fitsheader, native_endian, makepath, checkgzip
from .util import get_tempfilename
from . import iotime

[docs]def write_stdstar_models(norm_modelfile, normalizedFlux, wave, fibers, data, fibermap, input_frames, header=None): """Writes the normalized flux for the best models. Args: norm_modelfile : output file path normalizedFlux : 2D array of flux[nstdstars, nwave] wave : 1D array of wavelengths[nwave] in Angstroms fibers : 1D array of fiberids for these spectra data : meta data table about which templates best fit fibermap : fibermaps rows for the input standard stars input_frames : Table with NIGHT, EXPID, CAMERA of input frames used """ log = get_logger() hdr = fitsheader(header) add_dependencies(hdr) #- support input Table, np.array, and dict data = Table(data) hdr['EXTNAME'] = ('FLUX', '[10**-17 erg/(s cm2 Angstrom)]') hdr['BUNIT'] = ('10**-17 erg/(s cm2 Angstrom)', 'Flux units') hdu1=fits.PrimaryHDU(normalizedFlux.astype('f4'), header=hdr) hdu2 = fits.ImageHDU(wave.astype('f4')) hdu2.header['EXTNAME'] = ('WAVELENGTH', '[Angstrom]') hdu2.header['BUNIT'] = ('Angstrom', 'Wavelength units') hdu3 = fits.ImageHDU(fibers, name='FIBERS') # Create metadata table, using table_to_hdu(Table(data)) to handle # different column dtypes. # Also remove 2D columns like COEFF, which are stored separately metatable = Table(data) for col in list(metatable.colnames): ndim = len(metatable[col].shape) if ndim != 1: log.debug('Dropping %d-dim metadata column %s', ndim, col) metatable.remove_column(col) tbhdu = table_to_hdu(metatable) tbhdu.name = 'METADATA' hdulist=fits.HDUList([hdu1,hdu2,hdu3,tbhdu]) # add coefficients if "COEFF" in data.colnames: hdulist.append(fits.ImageHDU(data["COEFF"],name="COEFF")) fmhdu = table_to_hdu(Table(fibermap)) fmhdu.name = 'FIBERMAP' hdulist.append(fmhdu) inhdu = table_to_hdu(Table(input_frames)) inhdu.name = 'INPUT_FRAMES' hdulist.append(inhdu) t0 = time.time() tmpfile = get_tempfilename(norm_modelfile) hdulist.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, norm_modelfile) duration = time.time() - t0 log.info(iotime.format('write', norm_modelfile, duration))
[docs]def read_stdstar_models(filename): """Read stdstar models from filename. Args: filename (str): File containing standard star models. Returns: read_stdstar_models (tuple): flux[nspec, nwave], wave[nwave], fibers[nspec] """ log = get_logger() t0 = time.time() filename = checkgzip(filename) with fits.open(filename, memmap=False) as fx: flux = native_endian(fx['FLUX'].data.astype('f8')) wave = native_endian(fx['WAVELENGTH'].data.astype('f8')) fibers = native_endian(fx['FIBERS'].data) metadata = fx['METADATA'].data duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) return flux, wave, fibers, metadata
[docs]def write_flux_calibration(outfile, fluxcalib, header=None): """Writes flux calibration. Args: outfile : output file name fluxcalib : FluxCalib object Options: header : dict-like object of key/value pairs to include in header """ log = get_logger() hx = fits.HDUList() hdr = fitsheader(header) add_dependencies(hdr) hdr['EXTNAME'] = 'FLUXCALIB' hdr['BUNIT'] = ('10**+17 cm2 count s / erg', 'i.e. (elec/A) / (1e-17 erg/s/cm2/A)') hx.append( fits.PrimaryHDU(fluxcalib.calib.astype('f4'), header=hdr) ) hx.append( fits.ImageHDU(fluxcalib.ivar.astype('f4'), name='IVAR') ) # hx.append( fits.CompImageHDU(fluxcalib.mask, name='MASK') ) hx.append( fits.ImageHDU(fluxcalib.mask, name='MASK') ) hx.append( fits.ImageHDU(fluxcalib.wave.astype('f4'), name='WAVELENGTH') ) hx[-1].header['BUNIT'] = 'Angstrom' if fluxcalib.fibercorr is not None : tbl = encode_table(fluxcalib.fibercorr) #- unicode -> bytes tbl.meta['EXTNAME'] = 'FIBERCORR' hx.append( fits.convenience.table_to_hdu(tbl) ) if fluxcalib.fibercorr_comments is not None : # add comments in header hdu=hx['FIBERCORR'] for i in range(1,999): key = 'TTYPE'+str(i) if key in hdu.header: value = hdu.header[key] if value in fluxcalib.fibercorr_comments.keys() : hdu.header[key] = (value, fluxcalib.fibercorr_comments[value]) if fluxcalib.stdstar_fibermap is not None : fibermap = encode_table(fluxcalib.stdstar_fibermap) #- unicode -> bytes fibermap.meta['EXTNAME'] = 'STDSTAR_FIBERMAP' hx.append( fits.convenience.table_to_hdu(fibermap) ) t0 = time.time() tmpfile = get_tempfilename(outfile) hx.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) duration = time.time() - t0 log.info(iotime.format('write', outfile, duration)) return outfile
[docs]def read_flux_calibration(filename): """Read flux calibration file; returns a FluxCalib object """ # Avoid a circular import conflict at package install/build_sphinx time. from ..fluxcalibration import FluxCalib log = get_logger() t0 = time.time() filename = checkgzip(filename) with fits.open(filename, memmap=False, uint=True) as fx: calib = native_endian(fx[0].data.astype('f8')) ivar = native_endian(fx["IVAR"].data.astype('f8')) mask = native_endian(fx["MASK"].data) wave = native_endian(fx["WAVELENGTH"].data.astype('f8')) header = fx[0].header if 'FIBERCORR' in fx: fibercorr = fx['FIBERCORR'].data # I need to open the header to read the comments fibercorr_comments = dict() head = fx['FIBERCORR'].header for i in range(1,len(fibercorr.columns)+1) : k='TTYPE'+str(i) fibercorr_comments[head[k]]=head.comments[k] else: fibercorr = None fibercorr_comments = None if 'STDSTAR_FIBERMAP' in fx: stdstar_fibermap = fx['STDSTAR_FIBERMAP'].data else : stdstar_fibermap = None duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) fluxcalib = FluxCalib(wave, calib, ivar, mask, fibercorr=fibercorr, fibercorr_comments=fibercorr_comments, stdstar_fibermap = stdstar_fibermap) fluxcalib.header = header return fluxcalib
[docs]def write_average_flux_calibration(outfile, averagefluxcalib): """Writes average flux calibration. Args: outfile : output file name averagefluxcalib : AverageFluxCalib object Options: header : dict-like object of key/value pairs to include in header """ log = get_logger() hx = fits.HDUList() hx.append( fits.PrimaryHDU(averagefluxcalib.average_calib.astype('f4')) ) hx[-1].header['EXTNAME'] = 'FLUXCALIB' hx[-1].header['BUNIT'] = ('10**+17 cm2 count / erg', 'i.e. (elec/A/s) / (1e-17 erg/s/cm2/A)') hx.append( fits.ImageHDU(averagefluxcalib.atmospheric_extinction.astype('f4'), name='ATERM') ) hx[-1].header['PAIRMASS'] = averagefluxcalib.pivot_airmass hx.append( fits.ImageHDU(averagefluxcalib.seeing_term.astype('f4'), name='STERM') ) hx[-1].header['PSEEING'] = averagefluxcalib.pivot_seeing hx.append( fits.ImageHDU(averagefluxcalib.wave.astype('f4'), name='WAVELENGTH') ) hx[-1].header['BUNIT'] = 'Angstrom' if averagefluxcalib.atmospheric_extinction_uncertainty is not None : hx.append( fits.ImageHDU(averagefluxcalib.atmospheric_extinction_uncertainty.astype('f4'), name='ATERM_ERR') ) if averagefluxcalib.seeing_term_uncertainty is not None : hx.append( fits.ImageHDU(averagefluxcalib.seeing_term_uncertainty.astype('f4'), name='STERM_ERR') ) # AR wave-dependent FIBER_FRACFLUX curve for the median seeing and median FIBER_FRACFLUX of the used exposures if averagefluxcalib.ffracflux_wave is not None: hx.append( fits.ImageHDU(averagefluxcalib.ffracflux_wave.astype('f4'), name='FFRACFLUX_WAVE') ) hx[-1].header['MDSEEING'] = averagefluxcalib.median_seeing hx[-1].header['MDFFRACF'] = averagefluxcalib.median_ffracflux hx[-1].header['FACWPOW'] = averagefluxcalib.fac_wave_power hx[-1].header['FSTNIGHT'] = averagefluxcalib.first_night t0 = time.time() tmpfile = get_tempfilename(outfile) hx.writeto(tmpfile, overwrite=True, checksum=True) os.rename(tmpfile, outfile) duration = time.time() - t0 log.info(iotime.format('write', outfile, duration)) return outfile
[docs]def read_average_flux_calibration(filename): """Read average flux calibration file; returns an AverageFluxCalib object """ # Avoid a circular import conflict at package install/build_sphinx time. from ..averagefluxcalibration import AverageFluxCalib log = get_logger() t0 = time.time() filename = checkgzip(filename) with fits.open(filename, memmap=False, uint=True) as fx: average_calib = native_endian(fx[0].data.astype('f8')) atmospheric_extinction = native_endian(fx["ATERM"].data.astype('f8')) seeing_term = native_endian(fx["STERM"].data.astype('f8')) pivot_airmass = fx["ATERM"].header["PAIRMASS"] pivot_seeing = fx["STERM"].header["PSEEING"] wave = native_endian(fx["WAVELENGTH"].data.astype('f8')) if "ATERM_ERR" in fx : atmospheric_extinction_uncertainty = native_endian(fx["ATERM_ERR"].data.astype('f8')) else : atmospheric_extinction_uncertainty = None if "STERM_ERR" in fx : seeing_term_uncertainty = native_endian(fx["STERM_ERR"].data.astype('f8')) else : seeing_term_uncertainty = None if "FFRACFLUX_WAVE" in fx: median_seeing = fx["FFRACFLUX_WAVE"].header["MDSEEING"] median_ffracflux = fx["FFRACFLUX_WAVE"].header["MDFFRACF"] fac_wave_power = fx["FFRACFLUX_WAVE"].header["FACWPOW"] ffracflux_wave = native_endian(fx["FFRACFLUX_WAVE"].data.astype('f8')) first_night = fx["FFRACFLUX_WAVE"].header["FSTNIGHT"] else: median_seeing, median_ffracflux, fac_wave_power, ffracflux_wave = None, None, None, None first_night = None duration = time.time() - t0 log.info(iotime.format('read', filename, duration)) afluxcalib = AverageFluxCalib(wave=wave, average_calib=average_calib, atmospheric_extinction=atmospheric_extinction, seeing_term=seeing_term, pivot_airmass=pivot_airmass, pivot_seeing=pivot_seeing, atmospheric_extinction_uncertainty=atmospheric_extinction_uncertainty, seeing_term_uncertainty=seeing_term_uncertainty, median_seeing=median_seeing, median_ffracflux=median_ffracflux, fac_wave_power=fac_wave_power, ffracflux_wave=ffracflux_wave, first_night=first_night) return afluxcalib
[docs]def read_stdstar_templates(stellarmodelfile): """ Reads an input stellar model file Args: stellarmodelfile : input filename Returns (wave, flux, templateid, teff, logg, feh) tuple: wave : 1D[nwave] array of wavelengths [Angstroms] flux : 2D[nmodel, nwave] array of model fluxes templateid : 1D[nmodel] array of template IDs for each spectrum teff : 1D[nmodel] array of effective temperature for each model logg : 1D[nmodel] array of surface gravity for each model feh : 1D[nmodel] array of metallicity for each model """ log = get_logger() t0 = time.time() stellarmodelfile = checkgzip(stellarmodelfile) phdu=fits.open(stellarmodelfile, memmap=False) #- New templates have wavelength in HDU 2 if len(phdu) >= 3: wavebins = native_endian(phdu[2].data) #- Old templates define wavelength grid in HDU 0 keywords else: hdr0=phdu[0].header crpix1=hdr0['CRPIX1'] crval1=hdr0['CRVAL1'] cdelt1=hdr0['CDELT1'] if hdr0["LOGLAM"]==1: #log bins wavebins=10**(crval1+cdelt1*numpy.arange(len(phdu[0].data[0]))) else: #lin bins model_wave_step = cdelt1 model_wave_offset = (crval1-cdelt1*(crpix1-1)) n_model_wave = phdu[0].data.shape[1] wavebins=model_wave_step*numpy.arange(n_model_wave) + model_wave_offset paramData=phdu[1].data templateid=paramData["TEMPLATEID"] teff=paramData["TEFF"] logg=paramData["LOGG"] feh=paramData["FEH"] fluxData=native_endian(phdu[0].data) phdu.close() duration = time.time() - t0 log.info(iotime.format('read', stellarmodelfile, duration)) return wavebins,fluxData,templateid,teff,logg,feh