"""
desispec.calibfinder
====================
Reading and selecting calibration data from $DESI_SPECTRO_CALIB using content of image headers
"""
import re
import os
import os.path
import numpy as np
import yaml
from astropy.table import Table
from desispec.util import parse_int_args, header2night
from desiutil.log import get_logger
[docs]def parse_date_obs(value):
'''
converts DATE-OBS keywork to int
with for instance DATE-OBS=2016-12-21T18:06:21.268371-05:00
'''
m = re.search(r'(\d+)-(\d+)-(\d+)T', value)
Y,M,D=tuple(map(int, m.groups()))
dateobs=int(Y*10000+M*100+D)
return dateobs
_sp2sm = None
_sm2sp = None
[docs]def _load_smsp():
"""
Loads $DESI_SPECTRO_CALIB/spec/smsp.txt into global _sp2sm and _sm2sp dicts
"""
global _sp2sm
global _sm2sp
tmp_sp2sm = dict()
tmp_sm2sp = dict()
filename = os.getenv('DESI_SPECTRO_CALIB') + "/spec/smsp.txt"
tmp = np.loadtxt(filename, dtype=str)
for sp, sm in tmp:
p = int(sp[2:])
m = int(sm[2:])
tmp_sp2sm[str(sp)] = str(sm)
tmp_sm2sp[str(sm)] = str(sp)
tmp_sp2sm[p] = m
tmp_sm2sp[m] = p
#- Assign to global variables only after successful loading and parsing
_sp2sm = tmp_sp2sm
_sm2sp = tmp_sm2sp
[docs]def sp2sm(sp):
"""
Converts spectrograph sp logical number to sm hardware number
Args:
sp : spectrograph int 0-9 or str sp[0-9]
Returns "smM" if input is str "spP", or int M if input is int P
Note: uses $DESI_SPECTRO_CALIB/spec/smsp.txt
TODO: add support for different mappings based on night
"""
global _sp2sm
if _sp2sm is None:
_load_smsp()
return _sp2sm[sp]
[docs]def sm2sp(sm, night=None):
"""
Converts spectrograph sm hardware number to sp logical number
Args:
sm : spectrograph sm number 1-10 or str sm[1-10]
Returns "spP" if input is str "smM", or int P if input is int M
Note: uses $DESI_SPECTRO_CALIB/spec/smsp.txt
TODO: add support for different mappings based on night
"""
global _sm2sp
if _sm2sp is None:
_load_smsp()
return _sm2sp[sm]
[docs]def findcalibfile(headers,key,yaml_file=None) :
"""
read and select calibration data file from $DESI_SPECTRO_CALIB using the keywords found in the headers
Args:
headers: list of fits headers, or list of dictionnaries
key: type of calib file, e.g. 'PSF' or 'FIBERFLAT'
Optional:
yaml_file: path to a specific yaml file. By default, the code will
automatically find the yaml file from the environment variable
DESI_SPECTRO_CALIB and the CAMERA keyword in the headers
Returns path to calibration file
"""
cfinder = CalibFinder(headers,yaml_file)
if cfinder.haskey(key) :
return cfinder.findfile(key)
else :
return None
[docs]def ccdregionmask(headers) :
"""
Looks for regions of CCD to mask for a given NIGHT EXPID CAMERA and returns a list of dictionnaries.
NIGHT EXPID CAMERA are retrieved from the input image headers and compared to corresponding columns
in the cvs table $DESI_SPECTRO_CALIB/ccd/ccd-region-mask.csv
Args:
headers: list of fits headers, or list of dictionnaries
Returns list of dictionnaries with keys XMIN, XMAX, YMIN,YMAX
"""
log = get_logger()
ccd_region_mask_filename = os.path.join(os.getenv('DESI_SPECTRO_CALIB'),"ccd/ccd-region-mask.csv")
if not os.path.isfile(ccd_region_mask_filename) :
log.warning(f"No file {ccd_region_mask_filename}")
return list() # empty list
mask_table = Table.read(ccd_region_mask_filename)
head=dict()
keys=["NIGHT","EXPID","CAMERA"]
for k in keys :
for header in headers :
if k in header :
head[k]=header[k]
break
if not k in head.keys() :
log.error(f"Missing key {k} in input headers")
return list() # empty list
entries=np.where((mask_table["NIGHT"]==head["NIGHT"])&(mask_table["EXPID"]==head["EXPID"])&(mask_table["CAMERA"]==head["CAMERA"]))[0]
masks=list()
for entry in entries :
mask=dict()
for k in ["XMIN","XMAX","YMIN","YMAX"] :
mask[k]=int(mask_table[k][entry])
masks.append(mask)
return masks
[docs]def badfibers(headers,keys=["BROKENFIBERS","BADCOLUMNFIBERS","LOWTRANSMISSIONFIBERS"],yaml_file=None) :
"""
find list of bad fibers from $DESI_SPECTRO_CALIB using the keywords found in the headers
Args:
headers: list of fits headers, or list of dictionnaries
Optional:
keys: list of keywords, among ["BROKENFIBERS","BADCOLUMNFIBERS","LOWTRANSMISSIONFIBERS"]. Default is all of them.
yaml_file: path to a specific yaml file. By default, the code will
automatically find the yaml file from the environment variable
DESI_SPECTRO_CALIB and the CAMERA keyword in the headers
Returns List of bad fibers as a 1D array of intergers
"""
cfinder = CalibFinder(headers,yaml_file)
return cfinder.badfibers(keys)
class CalibFinder() :
def __init__(self,headers,yaml_file=None, fallback_on_dark_not_found=False) :
"""
Class to read and select calibration data from $DESI_SPECTRO_CALIB using the keywords found in the headers
Args:
headers: list of fits headers, or list of dictionnaries
Optional:
yaml_file: path to a specific yaml file. By default, the code will
automatically find the yaml file from the environment variable
DESI_SPECTRO_CALIB and the CAMERA keyword in the headers
"""
log = get_logger()
old_version = False
self.fallback_on_dark_not_found=fallback_on_dark_not_found
# temporary backward compatibility
if not "DESI_SPECTRO_CALIB" in os.environ :
if "DESI_CCD_CALIBRATION_DATA" in os.environ :
log.warning("Using deprecated DESI_CCD_CALIBRATION_DATA env. variable to find calibration data\nPlease switch to DESI_SPECTRO_CALIB a.s.a.p.")
self.directory = os.environ["DESI_CCD_CALIBRATION_DATA"]
old_version = True
else :
log.error("Need environment variable DESI_SPECTRO_CALIB")
raise KeyError("Need environment variable DESI_SPECTRO_CALIB")
else :
self.directory = os.environ["DESI_SPECTRO_CALIB"]
if len(headers)==0 :
log.error("Need at least a header")
raise RuntimeError("Need at least a header")
header=dict()
for other_header in headers :
for k in other_header :
if k not in header :
try :
header[k]=other_header[k]
except KeyError :
# it happens with the current version of fitsio
# if the value = 'None'.
pass
# save header in object for reference
self.header = header
if "CAMERA" not in header :
log.error("no 'CAMERA' keyword in header, cannot find calib")
log.error("header is:")
for k in header :
log.error("{} : {}".format(k,header[k]))
raise KeyError("no 'CAMERA' keyword in header, cannot find calib")
log.debug("header['CAMERA']={}".format(header['CAMERA']))
camera=header["CAMERA"].strip().lower()
if "SPECID" in header :
log.debug("header['SPECID']={}".format(header['SPECID']))
specid=int(header["SPECID"])
else :
specid=None
dateobs = header2night(header)
detector=header["DETECTOR"].strip()
if "CCDCFG" in header :
ccdcfg = header["CCDCFG"].strip()
else :
ccdcfg = None
if "CCDTMING" in header :
ccdtming = header["CCDTMING"].strip()
else :
ccdtming = None
log.debug("camera=%s specid=%s detector=%s ccdcfg=%s ccdtming=%s",
camera, specid, detector, ccdcfg, ccdtming)
#if "DOSVER" in header :
# dosver = str(header["DOSVER"]).strip()
#else :
# dosver = None
#if "FEEVER" in header :
# feever = str(header["FEEVER"]).strip()
#else :
# feever = None
# Support simulated data even if $DESI_SPECTRO_CALIB points to
# real data calibrations
self.directory = os.path.normpath(self.directory) # strip trailing /
if detector == "SIM" and (not self.directory.endswith("sim")) :
newdir = os.path.join(self.directory, "sim")
if os.path.isdir(newdir) :
self.directory = newdir
if not os.path.isdir(self.directory):
raise IOError("Calibration directory {} not found".format(self.directory))
if dateobs < 20191211 or detector == 'SIM': # old spectro identifiers
cameraid = camera
spectro=int(camera[-1])
if yaml_file is None :
if old_version :
yaml_file = os.path.join(self.directory,"ccd_calibration.yaml")
else :
yaml_file = "{}/spec/sp{}/{}.yaml".format(self.directory,spectro,cameraid)
else :
if specid is None :
log.error("dateobs = {} >= 20191211 but no SPECID keyword in header!".format(dateobs))
raise RuntimeError("dateobs = {} >= 20191211 but no SPECID keyword in header!".format(dateobs))
log.debug("Use spectrograph hardware identifier SMY")
cameraid = "sm{}-{}".format(specid,camera[0].lower())
if yaml_file is None :
yaml_file = "{}/spec/sm{}/{}.yaml".format(self.directory,specid,cameraid)
if not os.path.isfile(yaml_file) :
log.error("Cannot read {}".format(yaml_file))
raise IOError("Cannot read {}".format(yaml_file))
log.debug("reading calib data in {}".format(yaml_file))
stream = open(yaml_file, 'r')
data = yaml.safe_load(stream)
stream.close()
if not cameraid in data :
log.error("Cannot find data for camera %s in filename %s"%(cameraid,yaml_file))
raise KeyError("Cannot find data for camera %s in filename %s"%(cameraid,yaml_file))
data=data[cameraid]
log.debug("Found %d data for camera %s in filename %s"%(len(data),cameraid,yaml_file))
log.debug("Finding matching version ...")
log.debug("DATE-OBS=%d"%dateobs)
found=False
matching_data=None
for version in data :
log.debug("Checking version %s"%version)
datebegin=int(data[version]["DATE-OBS-BEGIN"])
if dateobs < datebegin :
log.debug("Skip version %s with DATE-OBS-BEGIN=%d > DATE-OBS=%d"%(version,datebegin,dateobs))
continue
if "DATE-OBS-END" in data[version] and data[version]["DATE-OBS-END"].lower() != "none" :
dateend=int(data[version]["DATE-OBS-END"])
if dateobs > dateend :
log.debug("Skip version %s with DATE-OBS-END=%d < DATE-OBS=%d"%(version,datebegin,dateobs))
continue
if detector != data[version]["DETECTOR"].strip() :
log.debug("Skip version %s with DETECTOR=%s != %s"%(version,data[version]["DETECTOR"],detector))
continue
if "CCDCFG" in data[version] :
if ccdcfg is None or ccdcfg != data[version]["CCDCFG"].strip() :
log.debug("Skip version %s with CCDCFG=%s != %s "%(version,data[version]["CCDCFG"],ccdcfg))
continue
if "CCDTMING" in data[version] :
if ccdtming is None or ccdtming != data[version]["CCDTMING"].strip() :
log.debug("Skip version %s with CCDTMING=%s != %s "%(version,data[version]["CCDTMING"],ccdtming))
continue
#if dosver is not None and "DOSVER" in data[version] and dosver != str(data[version]["DOSVER"]).strip() :
# log.debug("Skip version %s with DOSVER=%s != %s "%(version,data[version]["DOSVER"],dosver))
# continue
#if feever is not None and "FEEVER" in data[version] and feever != str(data[version]["FEEVER"]).strip() :
# log.debug("Skip version %s with FEEVER=%s != %s"%(version,data[version]["FEEVER"],feever))
# continue
log.debug("Found data version %s for camera %s in %s"%(version,cameraid,yaml_file))
if found :
log.error("But we already has a match. Please fix this ambiguity in %s"%yaml_file)
raise KeyError("Duplicate possible calibration data. Please fix this ambiguity in %s"%yaml_file)
found=True
matching_data=data[version]
if not found :
log.error("Didn't find matching calibration data in %s"%(yaml_file))
raise KeyError("Didn't find matching calibration data in %s"%(yaml_file))
self.data = matching_data
if "DESI_SPECTRO_DARK" in os.environ:
self.find_darks_in_desi_spectro_dark(header)
def haskey(self,key) :
"""
Args:
key: keyword, string, like 'GAINA'
Returns:
yes or no, boolean
"""
return ( key in self.data )
def value(self,key) :
"""
Args:
key: header keyword, string, like 'GAINA'
Returns:
data found in yaml file
"""
return self.data[key]
def findfile(self,key) :
"""
Args:
key: header keyword, string, like 'DARK'
Returns:
path to calibration file
"""
return os.path.join(self.directory,self.data[key])
def badfibers(self,keys=["BROKENFIBERS","BADCOLUMNFIBERS","LOWTRANSMISSIONFIBERS","BADAMPFIBERS","EXCLUDEFIBERS"]) :
"""
Args:
keys: optional, list of keywords, among BROKENFIBERS,BADCOLUMNFIBERS,LOWTRANSMISSIONFIBERS,BADAMPFIBERS,EXCLUDEFIBERS. Default is all of them.
Returns:
List of bad fibers from yaml file as a 1D array of intergers
"""
log = get_logger()
fibers=[]
badfiber_keywords=["BROKENFIBERS","BADCOLUMNFIBERS","LOWTRANSMISSIONFIBERS","BADAMPFIBERS","EXCLUDEFIBERS"]
for key in keys :
if key not in badfiber_keywords :
log.error(f"key '{key}' not in the list of valid keys for bad fibers: {validkeys}")
continue
if self.haskey(key) :
val = self.value(key)
fibers.append(parse_int_args(val))
if len(fibers)==0 :
return np.array([],dtype=int)
return np.unique(np.hstack(fibers))
def find_darks_in_desi_spectro_dark(self, header):
"""
Function to select dark frames from $DESI_SPECTRO_DARK using the keywords found in the headers
Args:
header: header as created in calibfinder
Updates self in-place
"""
log = get_logger()
#temperature tolerance to be used in K
#only applicable for R,Z as B stores 850 deg throughout
#temperature tolerance set to 3K for the moment, this should be large enough
#to reduce all relevant data without choking (the largest outlier currently being r9 on
#20210610 with ~2.3K at the start and ~2.7K at the end of the night)
temperature_tolerance = 3.
#- Should only be called if $DESI_SPECTRO_DARK is set, but check that
#- to avoid accidentally creating paths like "None/dark_table.csv"
if 'DESI_SPECTRO_DARK' not in os.environ:
msg = '$DESI_SPECTRO_DARK not set'
log.critical(msg)
raise ValueError(msg)
self.dark_directory = f'{os.getenv("DESI_SPECTRO_DARK")}/'
if not os.path.isdir(self.dark_directory):
msg = "Dark directory {} not found".format(self.dark_directory)
log.critical(msg)
raise IOError(msg)
camera=header["CAMERA"].strip().lower()
if "SPECID" in header :
specid=int(header["SPECID"])
else :
specid=None
dateobs = header2night(header)
cameraid = "sm{}-{}".format(specid,camera[0].lower())
dark_table_file = f'{os.getenv("DESI_SPECTRO_DARK")}/dark_table.csv'
bias_table_file = f'{os.getenv("DESI_SPECTRO_DARK")}/bias_table.csv'
if os.path.exists(dark_table_file) and os.path.exists(bias_table_file):
dark_table = Table.read(dark_table_file)
bias_table = Table.read(bias_table_file)
dark_table_select = np.array([cameraid in fn for fn in dark_table["FILENAME"]])
bias_table_select = np.array([cameraid in fn for fn in bias_table["FILENAME"]])
dark_table=dark_table[dark_table_select]
bias_table=bias_table[bias_table_select]
dark_table.sort('FILENAME')
bias_table.sort('FILENAME')
if len(dark_table) == 0 or len(bias_table) == 0:
log.warning("Didn't find matching calibration darks/biases in $DESI_SPECTRO_DARK using from $DESI_SPECTRO_CALIB instead")
return
dark_dates = np.array([int(f.split('-')[-1].split('.')[0]) for f in dark_table['FILENAME']])
bias_dates = np.array([int(f.split('-')[-1].split('.')[0]) for f in bias_table['FILENAME']])
log.debug(f"Finding matching dark frames in {self.dark_directory} for camera {cameraid} ...")
#loop over all dark filenames
log.debug("DATE-OBS=%d"%dateobs)
found=False
for datebegin in sorted(dark_dates)[::-1]:
if dateobs >= datebegin :
#TODO: extra checks that evaluate if selection from yaml file is matching...
date_used=datebegin
dark_entry=dark_table[dark_dates == date_used]
if len(dark_entry)>0:
dark_entry=dark_entry[0]
else:
log.debug(f"no master dark model found for {datebegin}")
continue
bias_entry=bias_table[bias_dates == date_used]
if len(bias_entry)>0:
bias_entry=bias_entry[0]
else:
log.debug(f"no master bias model found for {datebegin}")
continue
#those check if the already matched ver (from calibfinder) that is stored in self.data is the same as the one from the dark file
if dark_entry["DETECTOR"].strip() != self.data["DETECTOR"].strip() :
log.debug("Skip file %s with DETECTOR=%s != %s"%(dark_entry["FILENAME"],dark_entry["DETECTOR"],self.data["DETECTOR"]))
continue
if "CCDCFG" in self.data :
if dark_entry["CCDCFG"].strip() != self.data["CCDCFG"].strip() :
log.debug("Skip file %s with CCDCFG=%s != %s "%(dark_entry["FILENAME"],dark_entry["CCDCFG"],self.data["CCDCFG"]))
continue
if "CCDTMING" in self.data :
if dark_entry["CCDTMING"].strip() != self.data["CCDTMING"].strip() :
log.debug("Skip file %s with CCDTMING=%s != %s "%(dark_entry["FILENAME"],dark_entry["CCDTMING"],self.data["CCDTMING"]))
continue
if "CCDTEMP" in header and "CCDTEMP" in dark_entry.colnames:
if np.abs(dark_entry["CCDTEMP"] - header["CCDTEMP"])>temperature_tolerance :
log.debug("Skip file %s with CCDTEMP=%s != %s "%(dark_entry["FILENAME"],dark_entry["CCDTEMP"],header["CCDTEMP"]))
continue
else:
log.debug(f'Temperature difference to selected dark is {np.abs(dark_entry["CCDTEMP"] - header["CCDTEMP"]):.5f}')
#same for bias
if bias_entry["DETECTOR"].strip() != self.data["DETECTOR"].strip() :
log.debug("Skip file %s with DETECTOR=%s != %s"%(bias_entry["FILENAME"],bias_entry["DETECTOR"],self.data["DETECTOR"]))
continue
if "CCDCFG" in self.data :
if bias_entry["CCDCFG"].strip() != self.data["CCDCFG"].strip() :
log.debug("Skip file %s with CCDCFG=%s != %s "%(bias_entry["FILENAME"],bias_entry["CCDCFG"],self.data["CCDCFG"]))
continue
if "CCDTMING" in self.data :
if bias_entry["CCDTMING"].strip() != self.data["CCDTMING"].strip() :
log.debug("Skip file %s with CCDTMING=%s != %s "%(bias_entry["FILENAME"],bias_entry["CCDTMING"],self.data["CCDTMING"]))
continue
if "CCDTEMP" in header and "CCDTEMP" in bias_entry.colnames:
if np.abs(bias_entry["CCDTEMP"] - header["CCDTEMP"])>temperature_tolerance :
log.debug("Skip file %s with CCDTEMP=%s != %s "%(bias_entry["FILENAME"],bias_entry["CCDTEMP"],header["CCDTEMP"]))
continue
else:
log.debug(f'Temperature difference to selected bias is {np.abs(bias_entry["CCDTEMP"] - header["CCDTEMP"]):.5f}')
found=True
log.debug(f"Found matching dark frames for camera {cameraid} created on {date_used}")
break
if found:
dark_filename=f"{self.dark_directory}{dark_entry['FILENAME']}"
bias_filename=f"{self.dark_directory}{bias_entry['FILENAME']}"
if not os.path.exists(dark_filename) or not os.path.exists(bias_filename):
log.critical(f"DESI_SPECTRO_DARK has been set, but dark/bias file not found in {self.dark_directory}")
raise IOError(f"DESI_SPECTRO_DARK has been set, but dark/bias file not found in {self.dark_directory}")
else: #this will only be done as long as files do not yet exist
if not self.fallback_on_dark_not_found:
log.critical(f"DESI_SPECTRO_DARK has been set, but dark/bias file tables not found in {self.dark_directory}")
raise IOError(f"DESI_SPECTRO_DARK has been set, but dark/bias file tables not found in {self.dark_directory}")
else:
#this would prevent nightwatch failures in case of problems with e.g. permissions
log.error(f"DESI_SPECTRO_DARK has been set, but dark/bias file tables not found in {self.dark_directory}, "
"falling back to DESI_SPECTRO_CALIB")
return
if found:
self.data.update({"DARK": dark_filename,
"BIAS": bias_filename})
else:
if not self.fallback_on_dark_not_found:
log.critical(f"Didn't find matching {camera} calibration darks in $DESI_SPECTRO_DARK, quitting")
raise IOError(f"Didn't find matching {camera} calibration darks in $DESI_SPECTRO_DARK, quitting")
else:
#this would prevent nightwatch failures in case of not-yet-existing files
log.error(f"Didn't find matching {camera} calibration darks in $DESI_SPECTRO_DARK, "
"falling back to $DESI_SPECTRO_CALIB")