Source code for desispec.tpcorrparam

"""
desispec.tpcorrparam
====================

This module implements a model for how the throughput of a fiber varies.

It has the following components:

1. We find that the fibers don't follow the nighly flat field exactly, and
   apply a fixed mean correction from the nightly flat field.  This is thought
   to reflect the different range of angles entering the fiber from the flat
   field relative to the sky.
2. We find that fibers' throughput depends modestly on the fibers' location
   within their patrol radii.  We apply a cubic polynomial in (x, y) within
   the patrol disk to correct for this.
3. We find additional coherent variations in throughput from exposure to
   exposure.  These are modeled with a PCA and fit exposure-by-exposure.

The TPCorrParam object has mean, spatial, and pca attributes describing
each of these three components.
"""


import os
import numpy as np
from astropy.io import fits
from desiutil.log import get_logger
import desispec.calibfinder


[docs]def cubic(p, x, y): """Returns a cubic polynomial with coefficients p in x & y. Args: p (list[float]): 10 element floating point list with poly coefficients x (ndarray[float]): x coordinates y (ndarray[float]): y coordinates Returns: cubic polynomial in two variables with coefficients p evaluated at x, y """ return (p[0] + p[1]*x + p[2]*y + p[3]*x**2 + p[4]*x*y + p[5]*y**2 + p[6]*x**3 + p[7]*x**2*y + p[8]*x*y**2 + p[9]*y**3)
[docs]def tpcorrspatialmodel(param, xfib, yfib): """Evaluate spatial tpcorr throughput model at xfib, yfib. Evaluates the throughput model at xfib, yfib. param carries the polynomial coefficients of the model as well as the central fiber location. Args: param: ndarray describing fit. Contains at least the following fields: PARAM: cubic polynomial coefficients X: central fiber X location (mm) Y: central fiber Y location (mm) xfib: x location at which model is to be evaluated (mm) yfib: y location at which model is to be evaluated (mm) Returns: throughput model evaluated at xfib, yfib """ return cubic(param['PARAM'].T, (xfib-param['X'])/6, (yfib-param['Y'])/6)
[docs]def tpcorrmodel(tpcorrparam, xfib, yfib, pcacoeff=None): """Evaluates full TPCORR throughput model. The full model consistents of a PCA and a spatial within-patrol-radius component. These two components are evaluated and summed. Args: tpcorrparam: TPCorrParam object describing the model xfib: x coordinates of fibers (mm) yfib: y coordinates of fibers (mm) pcacoeff (optional): PCA coefficients of throughput model If not set, no TPCORR PCA terms are fit. Returns: Throughput model evaluated at xfib, yfib with pca coefficients pcacoeff. """ xfib = xfib.copy() yfib = yfib.copy() m = ~np.isfinite(xfib) | ~np.isfinite(yfib) | (xfib == 0) | (yfib == 0) xfib[m] = tpcorrparam.spatial['X'][m] yfib[m] = tpcorrparam.spatial['Y'][m] res = tpcorrparam.mean + tpcorrspatialmodel( tpcorrparam.spatial, xfib, yfib) - 1 if pcacoeff is not None: for cc, vv in zip(pcacoeff, tpcorrparam.pca): res += cc * vv return res
class TPCorrParam: def __init__(self, mean, spatial, pca): """Create TPCorrParam object. Args: mean: mean throughput difference between flat and sky for each fiber spatial (ndarray): parameters of spatial model describing variation of throughput within patrol radius. Contains the following fields: PARAM: cubic polynomial coefficients X: central X coordinate of fiber (mm) Y: central Y coordinate of fiber (mm) pca (ndarray[float]): principal components of throughput variations between fibers in an exposure. """ self.mean = mean self.spatial = spatial self.pca = pca
[docs]def gather_tpcorr(recs): """Gather TPCORR measurements for exposures. Args: recs (ndarray): ndarray array containing information about exposures from which to gather TPCORR information. Includes at least the following fields: NIGHT (int): night on which exposure was observed EXPID (int): exposure id Returns: ndarray with the following fields for each exposure X: x coordinate of fiber (mm) Y: y coordinate of fiber (mm) TPCORR: tpcorr of fiber CAMERA: camera for exposure NIGHT: night for exposure EXPID: expid of exposure """ out = np.zeros(3*len(recs), dtype=[ ('X', '5000f4'), ('Y', '5000f4'), ('TPCORR', '5000f4'), ('CAMERA', 'U1'), ('NIGHT', 'i4'), ('EXPID', 'i4')]) count = -1 for rec in recs: for camera in 'brz': count += 1 out['CAMERA'][count] = camera out['NIGHT'][count] = rec['NIGHT'] out['EXPID'][count] = rec['EXPID'] for petal in range(10): address = (rec['NIGHT'], rec['EXPID'], f'{camera}{petal}') try: sky = desispec.io.read_sky(address) frame = desispec.io.read_frame(address) except Exception as e: print(e) continue fiber = frame.fibermap['FIBER'] out['TPCORR'][count, fiber] = sky.throughput_corrections out['X'][count, fiber] = frame.fibermap['FIBER_X'] out['Y'][count, fiber] = frame.fibermap['FIBER_Y'] return out
[docs]def gather_dark_tpcorr(specprod=None): """Gathers TPCORR for dark exposures expected to be well behaved. This wraps gather_tpcorr, making a sensible selection of dark exposures from the given specprod. Args: specprod (str): specprod to use Returns: ndarray from gather_tpcorr with tpcorr information from selected exposures. """ if specprod is None: specprod = os.environ.get('SPECPROD', 'daily') expfn = os.path.join(os.environ['DESI_SPECTRO_REDUX'], specprod, 'exposures-{specprod}.fits') exps = fits.getdata(expfn) m = ((exps['EXPTIME'] > 300) & (exps['SKY_MAG_R_SPEC'] > 20.5) & (exps['SKY_MAG_R_SPEC'] < 30) & (exps['FAPRGRM'] == 'dark')) if specprod == 'daily': exps = exps[exps['NIGHT'] >= 20210901] return gather_tpcorr(exps[m])
[docs]def pca_tpcorr(tpcorr): """Run a PCA on TPCORR measurements. This uses the tpcorr gathered by gather_tpcorr and runs a PCA on them. Args: tpcorr (ndarray): result of gather_tpcorr Returns: dict: A dictionary containing: * dict[filter] of (tpcorrmed, pc_info, exp_info), describing the results of the fit. * tpcorrmed: the median TPCORR of the whole sample; i.e., how different the nightly flat usually is from the sky. * pc_info: ndarray with: - pca: the principal components - amplitude: their corresponding singular values * exp_info: ndarray with: - expid: exposure id - coeff: pc component coefficients for this exposure """ from astropy.stats import mad_std out = dict() for f in 'brz': # z5 is ugly in 1st PC; what would we want to do here? # r7, 3994 in mean, 1st PC # 4891 is a mystery and shows very large variability m = ((tpcorr['CAMERA'] == f) & (np.sum(tpcorr['TPCORR'] == 0, axis=1) < 100)) tpcorr0 = tpcorr['TPCORR'][m] tpcorr0[tpcorr0 == 0] = 1 rms = mad_std(tpcorr0, axis=1) tpcorr0[rms > 0.025, :] = 1 rms = mad_std(tpcorr0, axis=0) tpcorr0[:, rms > 0.1] = 1 # zero 4891 if f == 'r': # zero bad CTE region on r4 tpcorr0[:, 2250:2272] = 1 tpcorr0 = np.clip(tpcorr0, 0.9, 1/0.9) tpcorrmed = np.median(tpcorr0, axis=0) tpcorr0 -= tpcorrmed[None, :] uu, ss, vv = np.linalg.svd(tpcorr0) eid = tpcorr['EXPID'][m] resvec = np.zeros(vv.shape[0], dtype=[ ('pca', '%df4' % vv.shape[1]), ('amplitude', 'f4')]) resvec['pca'] = vv if len(ss) < len(resvec): ss = np.concatenate([ss, np.zeros(len(resvec)-len(ss))]) resvec['amplitude'] = ss resexp = np.zeros(uu.shape[0], dtype=[ ('expid', 'i4'), ('coeff', '%df4' % uu.shape[1])]) resexp['expid'] = eid resexp['coeff'] = uu out[f] = tpcorrmed, resvec, resexp return out
[docs]def fit_tpcorr_per_fiber(tpcorr): """Fit the spatial throughput variations for each fiber. Args: tpcorr (ndarray): output from gather_tpcorr with tpcorr measurements and related metadata for each fiber Returns: dict[camera] of ndarray including the following fields: X (float): central X position of each fiber used in fit (mm) Y (float): central Y position of each fiber used in fit (mm) FIBER (int): fiber number PARAM (ndarray[float], length 10): 2D cubic polynomial coefficients """ from scipy.optimize import least_squares guess = np.zeros(10, dtype='f4') out = dict() log = get_logger() for camera in 'brz': res = np.zeros(5000, dtype=[ ('FIBER', 'f4'), ('X', 'f4'), ('Y', 'f4'), ('PARAM', '10f4')]) m = (tpcorr['CAMERA'] == camera) log.info('Starting spatial tpcorr analysis for %s camera' % camera) tpcorr0 = tpcorr[m] for i in range(5000): if (i % 1000) == 0: log.info('Fiber %d of 5000' % i) res['FIBER'][i] = i xx = tpcorr0['X'][:, i] yy = tpcorr0['Y'][:, i] mok = np.isfinite(xx) & np.isfinite(yy) & (xx != 0) & (yy != 0) if np.sum(mok) == 0: res['X'][i] = 0 res['Y'][i] = 0 res['PARAM'][i, :] = 0 res['PARAM'][i, 0] = 1 log.info( 'Dummy entry for fiber %d with no good measurements.' % i) continue res['X'][i] = np.median(xx[mok]) res['Y'][i] = np.median(yy[mok]) xx = (xx - res['X'][i]) / 6 # 6 mm fiber patrol radius yy = (yy - res['Y'][i]) / 6 bb = tpcorr0['TPCORR'][:, i] m = (bb != 0) & (bb > 0.5) & (bb < 2) & mok # a few fibers (e.g., 1680) have some weird historical TPCORR # data. Don't use those horrible points. # a large chunk of fibers on r4 are bad (250 - 272). # This is the bad CTE region. We probably don't want to use # these in the mean adjustment. See if these have IVAR=0 or # something? xx = xx[m] yy = yy[m] bb = bb[m] def model(param): return cubic(param, xx, yy) def chi(param): return (bb - model(param)) fit = least_squares(chi, guess, loss='soft_l1') res['PARAM'][i, :] = fit.x # do something sane for stationary fibers cov = np.cov(xx*6, yy*6) det = np.linalg.det(cov) # normal positioner has a determinant of ~60 # this flags stuck ones, or ones that have been stuck for # a very long time. # covariance does a bit better than stdev for occasional # positioners that have been stuck in a few different positions if det < 10: res['PARAM'][i, 0] = np.median(bb) res['PARAM'][i, 1:] = 0 out[camera] = res return out
# note! one worry: in the current PR, the meaning of TPCORR will change # moving forward, and we won't have any of the current throughput measurements # for computing these coefficients in the future. We'd have to develop # an alternative scheme for computing these coefficients or would have # to change this PR so that the new and old TPCORR have similar meanings. # That option would mean passing calculate_throughput_corrections some # goofy sky model that doesn't include the new model TPCORR from this PR. # Or we add another model extension with the new TPCORR from this PR, # and then this routine would require the product of the model TPCORR and # the measured TPCORR.
[docs]def make_tpcorrparam_files(tpcorr=None, spatial=None, dirname=None): """Gather, fit, and write tpcorrparam files to output directory. This function is intended for producing tpcorrparam files to DESI_SPECTRO_CALIB. It can gather and fit tpcorr measurements and write the results. Args: tpcorr (optional): result of gather_tpcorr. If None, this is populated with the result of gather_dark_tpcorr. spatial (optional): spatial fit results. If None, this is populated with fit_tpcorr_per_fiber(tpcorr). dirname (optional): directory to which to write files. Defaults to DESI_SPECTRO_CALIB. """ if dirname is None: dirname = os.environ['DESI_SPECTRO_CALIB'] if tpcorr is None: tpcorr = gather_dark_tpcorr() if spatial is None: spatial = fit_tpcorr_per_fiber(tpcorr) tpcorrspatmod = np.zeros_like(tpcorr['TPCORR']) for f in 'brz': m = tpcorr['CAMERA'] == f tpcorrspatmod[m] = tpcorrspatialmodel( spatial[f], tpcorr['X'][m], tpcorr['Y'][m]) m = ((tpcorr['X'] == 0) | (tpcorr['Y'] == 0) | ~np.isfinite(tpcorr['X']) | ~np.isfinite(tpcorr['Y']) | ~np.isfinite(tpcorr['TPCORR'])) tpcorrresid = tpcorr.copy() tpcorrresid['TPCORR'] -= tpcorrspatmod tpcorrresid['TPCORR'] += 1 tpcorrresid['TPCORR'][m] = 0 # invalid value pca = pca_tpcorr(tpcorrresid) for f in 'brz': constant = spatial[f]['PARAM'][:, 0].copy() spatial[f]['PARAM'][:, 0] = 1 for i in range(10): sm = desispec.calibfinder.sp2sm(i) sm = 'sm%d' % sm fn = os.path.join(dirname, 'spec', sm, f'tpcorrparam-{sm}-{f}{i}.fits') os.makedirs(os.path.dirname(fn), exist_ok=True) fits.writeto(fn, spatial[f][i*500:(i+1)*500], fits.Header(dict(extname='SPATIAL'))) fits.append(fn, constant[i*500:(i+1)*500], fits.Header(dict(extname='MEAN'))) fits.append(fn, pca[f][1]['pca'][:2, i*500:(i+1)*500], fits.Header(dict(extname='PCA')))