Source code for desispec.quicklook.qas

"""
desispec.quicklook.qas
======================

"""
from desispec.quicklook import qllogger
from desispec.quicklook import qlexceptions
import collections
import numpy as np
from enum import Enum
from astropy.io import fits


[docs]class QASeverity(Enum): ALARM=30 WARNING=20 NORMAL=0
[docs]class MonitoringAlg: """ Simple base class for monitoring algorithms """ def __init__(self,name,inptype,config,logger=None): if logger is None: self.m_log=qllogger.QLLogger().getlog(name) else: self.m_log=logger self.__inpType__=type(inptype) self.name=name self.config=config self.__deviation = None self.m_log.debug("initializing Monitoring alg {}".format(name)) def __call__(self,*args,**kwargs): res=self.run(*args,**kwargs) cargs=self.config['kwargs'] params=cargs['param'] metrics=res["METRICS"] if 'METRICS' in res else None if metrics is None: metrics={} res["METRICS"]=metrics reskey="RESULT" QARESULTKEY="QA_STATUS" if res['FLAVOR'] == 'science': REFNAME = cargs["RESULTKEY"]+'_'+format(res['PROGRAM']).upper()+'_REF' # SE: get the REF name from cargs else: REFNAME = cargs["RESULTKEY"]+'_REF' NORM_range = cargs["RESULTKEY"]+'_NORMAL_RANGE' WARN_range = cargs["RESULTKEY"]+'_WARN_RANGE' norm_range_val = [0,0] warn_range_val = [0,0] if "QASTATUSKEY" in cargs: QARESULTKEY=cargs["QASTATUSKEY"] if "RESULTKEY" in cargs: reskey=cargs["RESULTKEY"] if cargs["RESULTKEY"] == 'CHECKHDUS': stats=[] stats.append(metrics['CHECKHDUS_STATUS']) stats.append(metrics['EXPNUM_STATUS']) if np.isin(stats,'NORMAL').all(): metrics[QARESULTKEY]='NORMAL' elif np.isin(stats,'ALARM').any(): metrics[QARESULTKEY] = 'ALARM' self.m_log.info("{}: {}".format(QARESULTKEY,metrics[QARESULTKEY])) if reskey in metrics: current = metrics[reskey] #SE: Replacing this chunk (between the dashed lines) with an alternative that accomodates receiving the REF keys from the configuration ----------------------------------------------------------------------------------------------------------------- #if "REFERENCE" in cargs: #refval=cargs["REFERENCE"] ## print(refval,"MA inside if") #else: #- For absolute value checks #self.m_log.warning("No reference given. STATUS will be assigned for the Absolute Value. Confirm your ranges.") ##- check the data type #if isinstance(current,float) or isinstance(current,np.float32) or isinstance(current,int): #refval=0 #else: #refval=np.zeros(len(current)) #- 1D list or array ##- Update PARAMS ref key #res["PARAMS"][reskey+'_REF']=refval #currlist=isinstance(current,(np.ndarray,collections.Sequence)) #reflist=isinstance(refval,(np.ndarray,collections.Sequence)) #if currlist != reflist: # different types #self.m_log.critical("QL {} : REFERENCE({}) and RESULT({}) are of different types!".format(self.name,type(refval),type(current))) #elif currlist: #both are lists #if len(refval)==len(current): #self.__deviation=[c-r for c,r in zip(current,refval)] #else: #self.m_log.critical("QL {} : REFERENCE({}) and RESULT({}) are of different length!".format(self.name,len(refval),len(current))) #else: # both are scalars #self.__deviation=sorted(current)-sorted(refval) ## check RANGES given in config and set QA_STATUS keyword ## it should be a sorted overlapping list of range tuples in the form [ ((interval),QASeverity),((-1.0,1.0),QASeverity.NORMAL),(-2.0,2.0),QAStatus.WARNING)] ## for multiple results, thresholds should be a list of lists as given above (one range list per result) ## intervals should be non overlapping. ## lower bound is inclusive upper bound is exclusive ## first matching interval will be used ## if no interval contains the deviation, it will be set to QASeverity.ALARM ## if RANGES or REFERENCE are not given in config, QA_STATUS will be set to UNKNOWN #def findThr(d,t): #val=QASeverity.ALARM #for l in list(t): #if d>=l[0][0] and d<l[0][1]: #val=l[1] #return val #metrics[QARESULTKEY]='NORMAL' #if self.__deviation is not None and "RANGES" in cargs: #self.m_log.info("QL Reference checking for QA {}".format(self.name)) #thr=cargs["RANGES"] #print(thr) #metrics[QARESULTKEY]="ERROR" #thrlist=isinstance(thr[0][0][0],(np.ndarray,collections.Sequence)) #multiple threshols for multiple results #devlist=isinstance(self.__deviation,(np.ndarray,collections.Sequence)) ##if devlist!=thrlist and len(thr)!=1: #different types and thresholds are a list ## self.m_log.critical("QL {} : dimension of RANGES({}) and RESULTS({}) are incompatible! Check configuration RANGES={}, RESULTS={}".format(self.name,len(thr),len(self.__deviation), thr,current)) ## return res ##else: #they are of the same type #if devlist: # if results are a list #if len(thr)==2: # check all results against same thresholds ##- maximum deviation #kk=np.argmax(np.abs(self.__deviation).flatten()) #- flatten for > 1D array #metrics[QARESULTKEY]=findThr(np.array(self.__deviation).flatten()[kk],thr) ##metrics[QARESULTKEY]=[findThr(d,thr) for d in self.__deviation] ##else: # each result has its own thresholds ## metrics[QARESULTKEY]=[str(findThr(d,t)) for d,t in zip(self.__deviation,thr)] #else: #result is a scalar #metrics[QARESULTKEY]=findThr(self.__deviation,thr) #if metrics[QARESULTKEY]==QASeverity.NORMAL: #metrics[QARESULTKEY]='NORMAL' #elif metrics[QARESULTKEY]==QASeverity.WARNING: #metrics[QARESULTKEY]='WARNING' #else: #metrics[QARESULTKEY]='ALARM' #else: #self.m_log.warning("No Reference checking for QA {}".format(self.name)) #self.m_log.info("{}: {}".format(QARESULTKEY,metrics[QARESULTKEY])) #return res #def run(self,*argv,**kwargs): #pass #def is_compatible(self,Type): #return isinstance(Type,self.__inpType__) #def check_reference(): #return self.__deviation #def get_default_config(self): #""" return a dictionary of 3-tuples, #field 0 is the name of the parameter #field 1 is the default value of the parameter #field 2 is the comment for human readable format. #Field 2 can be used for QLF to dynamically setup the display""" #return None #---------------------------------------------------------------------------------------------------------- if REFNAME in params: #SE: get the REF value/ranges from params refval=params[REFNAME] if len(refval) ==1: refval = refval[0] refval = np.asarray(refval) current = np.asarray(current) norm_range_val=params[NORM_range] warn_range_val=params[WARN_range] #SE: just in case any nan value sneaks in the array of the scalar metrics ind = np.argwhere(np.isnan(current)) if (ind.shape[0] > 0 and refval.shape[0] == current.shape[0]): self.m_log.critical("QL {} : elements({}) of the result are returned as NaN! STATUS is determined for the real values".format(self.name,str(ind))) ind = list(np.hstack(ind)) for index in sorted(ind, reverse=True): del current[index] del refval[index] else: self.m_log.warning("No reference given. Update the configuration file to include reference value for QA: {}".format(self.name)) currlist=isinstance(current,(np.ndarray,collections.Sequence)) reflist=isinstance(refval,(np.ndarray,collections.Sequence)) if currlist != reflist: self.m_log.critical("QL {} : REFERENCE({}) and RESULT({}) are of different types!".format(self.name,type(refval),type(current))) elif currlist: if refval.size == current.size and current.size >1: self.__deviation=[c-r for c,r in zip(np.sort(current),np.sort(refval))] elif refval.size == current.size and current.size and current.size == 1: self.__deviation = current - refval elif np.size(current) == 0 or np.size(refval) == 0: self.m_log.warning("No measurement is done or no reference is available for this QA!- check the configuration file for references!") metrics[QARESULTKEY]='UNKNOWN' self.m_log.info("{}: {}".format(QARESULTKEY,metrics[QARESULTKEY])) elif refval.size != current.size: self.m_log.critical("QL {} : REFERENCE({}) and RESULT({}) are of different length!".format(self.name,refval.size,current.size)) metrics[QARESULTKEY]='UNKNOWN' self.m_log.info("{}: {}".format(QARESULTKEY,metrics[QARESULTKEY])) else: #SE "sorting" eliminate the chance of randomly shuffling items in the list that we observed in the past self.__deviation=(np.sort(current)-np.sort(refval))/np.sort(current) def findThr(d,t): if d != None and len(list(t)) >1: val=QASeverity.ALARM for l in list(t): if d>=l[0][0] and d<l[0][1]: val=l[1] else: if d>=l and d<l: val=l return val devlist = self.__deviation thr = norm_range_val wthr = warn_range_val if devlist is None: pass #SE: temporarily here until we know OBJLIST is ['SCIENCE', 'STD'] or anything else----------- line below should only be "elif len(thr)==2 and len(wthr)==2:" # RS: if one fit fails SNR but the rest pass, return normal elif (cargs["RESULTKEY"] == 'FIDSNR_TGT'): devlist = current stats = [] nofit = np.where(devlist==0.0)[0] if len(nofit) >= 2: stats.append('ALARM') else: for i,val in enumerate(devlist): if len(nofit) != 0 and i == nofit[0]: stats.append('NORMAL') else: diff = refval[i] - val if thr[0]<= diff <= thr[1]: stats.append('NORMAL') elif wthr[0] <= diff <= wthr[1]: stats.append('WARNING') else: stats.append('ALARM') if np.isin(stats,'NORMAL').all(): metrics[QARESULTKEY]='NORMAL' elif np.isin(stats,'WARNING').any() and np.isin(stats,'ALARM').any(): metrics[QARESULTKEY] = 'ALARM' elif np.isin(stats,'ALARM').any(): metrics[QARESULTKEY] = 'ALARM' elif np.isin(stats,'WARNING').any(): metrics[QARESULTKEY] = 'WARNING' self.m_log.info("{}: {}".format(QARESULTKEY,metrics[QARESULTKEY])) elif (len(thr)==2 and len(wthr)==2): if np.size(devlist)== 1: d=[] d.append(devlist) devlist = d stats = [] for val in devlist: if thr[0] <= val <= thr[1]: stats.append('NORMAL') elif wthr[0] <= val <= wthr[1]: stats.append('WARNING') else: stats.append('ALARM') if np.isin(stats,'NORMAL').all(): metrics[QARESULTKEY]='NORMAL' elif np.isin(stats,'WARNING').any() and np.isin(stats,'ALARM').any(): metrics[QARESULTKEY] = 'ALARM' elif np.isin(stats,'ALARM').any(): metrics[QARESULTKEY] = 'ALARM' elif np.isin(stats,'WARNING').any(): metrics[QARESULTKEY] = 'WARNING' self.m_log.info("{}: {}".format(QARESULTKEY,metrics[QARESULTKEY])) return res def run(self,*argv,**kwargs): pass def is_compatible(self,Type): return isinstance(Type,self.__inpType__) def check_reference(): return self.__deviation def get_default_config(self): return None