Source code for desispec.correct_cte

"""
desispec.correct_cte
====================

Methods to fit CTE effects and remove them from images.
"""

import os
import copy
from functools import partial

import numpy as np
from scipy.optimize import least_squares
from scipy.ndimage import median_filter
from astropy.stats import sigma_clipped_stats
from astropy.table import Table
import fitsio

from desispec.io import findfile
from desispec.util import difference_nights
from desiutil.log import get_logger

import desispec.preproc
from desispec.calibfinder import CalibFinder
from desispec.trace_shifts import compute_dx_from_cross_dispersion_profiles
from desispec.qproc import qfiberflat, qsky, rowbyrowextract
from desispec.image_model import compute_image_model
from desispec.workflow.tableio import load_table

[docs]def apply_multiple_cte_effects(amp, locations, ctefuns): """Apply a series of traps to an amp. This function assumes that it is only being given a single amp that has some traps on it. The readout direction is always assumed to be toward smaller numbers on the second axis. Parameters ---------- amp : np.ndarray image of amplifier locations : np.ndarray [int] locations of traps ctefuns : callables functions that can be called to apply CTE to an amp Returns ------- amplifier after applying traps """ # amp is affected by a series of traps. # apply them all. # I don't think we want to have more than one, though! # assume that the read out is such that we don't have to # reverse the amplifier to apply in the right direction; # that's been done for us, as has the corresponding # reversing of the locations. locations = np.array(np.atleast_1d(locations)) s = np.argsort(-locations) amp_out = amp.copy() # sequentially step down the serial register, # trap-by-trap, applying the appropriate CTE # function to all of the pixels affected by that trap. # in the picture in my head, the serial register is read # out in the downward direction. All of the pixels # above the trap are affected. So we start with the # highest trap and walk down. Those later traps # see the CTE-affected image from the earlier traps. # for the one trap case, which is the only one we actually # have right now, this just applies the trap function to the # portion of the amplifier above the trap. for ii in s: affected = np.s_[:, locations[ii]:] amp_out[affected] = ctefuns[ii](amp_out[affected]) return amp_out
[docs]def correct_amp(image, ctefun, niter=1, apply_only=False): """Correct an amp for CTE. In the limit that CTE is perturbative, one can correct for CTE as follows: for i in range(niter): I' = CTE(I) I = I - (I' - I) This function implements that approach. However, for large traps, this approach correlates the noise and leads to poor performance; we do not use it for DESI. Parameters ---------- image : np.ndarray CTE-affected amplifier ctefun : callable function that applies CTE to an amplifier niter : int number of iterations to run apply_only : bool Do not correct, just apply the CTE function to the image. Returns ------- CTE-corrected image """ # more than one iteration makes the noise properties worse, but # does make things more self-consistent. corrected_image = image.copy() log = get_logger() for i in range(niter): cte_image = ctefun(corrected_image) if apply_only: return cte_image correction = cte_image - corrected_image mn, med, rms = sigma_clipped_stats(correction) log.info(f'Correcting CTE, iteration {i}, correction rms {rms:6.3f}') corrected_image = image - correction return corrected_image
[docs]def add_cte(img, cte_transfer_func=None, **cteparam): """Add CTE to a region affected by CTE. Assumes all of img is affected. Parameters ---------- img : np.ndarray input image cte_transfer_func : callable function describing how many electrons trap soaks up and emits **cteparam : dictionary additional arguments to cte_transfer_func Returns ------- np.ndarray img with additional CTE applied """ if cte_transfer_func is None: cte_transfer_func = simplified_regnault out = img.copy() in_trap = img[:, 0] * 0.0 for i in range(img.shape[1]): transfer_amount = cte_transfer_func( np.clip(img[:, i], 0, np.inf), in_trap, **cteparam) in_trap -= transfer_amount out[:, i] += transfer_amount return out
[docs]def needs_ctecorr(header=None, cfinder=None): """ Return True/False for whether this CCD needs CTE corrections Args: header: dict-like header from raw data HDU for this CCD cfinder: CalibFinder object for this CCD Returns True/False Must provide header or cfinder but not both; providing a pre-generated cfinder avoids re-reading the $DESI_SPECTRO_CALIB yaml file. """ if cfinder is not None and header is not None: raise ValueError('provide header or cfinder but not both') if cfinder is None: cfinder = CalibFinder([header]) else: header = cfinder.header docte = False for amp in desispec.preproc.get_amp_ids(header): key = "CTECOLS"+amp.upper() if cfinder.haskey(key): docte = True break return docte
[docs]def get_amp_regions_to_cte_correct(header): """ Get CTE corrections parameters for amps on this image Parameters ---------- header (dict-like): image header Returns ------- amp_regions, cte_regions amp_regions : dict dictionary of slices defining each amp cte_regions : dict The keys are the names of the amplifier, matching amp_regions. Each entry is a list containing one entry for each trap on that amp. Each item in the list is dictionary with the start and stop locations, Uses `header` and CalibFinder to identify any regions to correct Returns pair of empty dictionaries if there are no amps needing CTE corrections. """ log = get_logger() cfinder = CalibFinder([header]) amps = desispec.preproc.get_amp_ids(header) night = desispec.preproc.header2night(header) camera = header['CAMERA'].lower() amp_regions = dict() cte_regions = dict() for amp in amps: key = "CTECOLS"+amp if not cfinder.haskey(key) : # that's ok, we don't expect this keyword for each camera and amplifier luckily log.debug(f"No {key} for {camera} on {night}") continue value = cfinder.value(key) amp_sec = desispec.preproc.parse_sec_keyword(header["CCDSEC"+amp]) yb = amp_sec[0].start ye = amp_sec[0].stop cte_regions_in_amp = list() for ctecols in value.split(","): if len(ctecols)==0 : continue vals = ctecols.split(":") nvals = len(vals) if nvals != 2 : mess = "cannot decode {}={}".format(key, value) log.critical(mess) raise KeyError(mess) start, stop = int(vals[0]), int(vals[1]) xb = max(amp_sec[1].start, start) xe = min(amp_sec[1].stop, stop) sector = [yb, ye, xb, xe] cteparam={"ctecols":ctecols, "start":start, "stop":stop} cte_regions_in_amp.append(cteparam) # only add if we have a model for it if len(cte_regions_in_amp)>0 : amp_regions[amp] = amp_sec cte_regions[amp] = cte_regions_in_amp return amp_regions, cte_regions
[docs]def get_cte_params(header, cte_params_filename=None): """ Get CTE corrections parameters for amps on this image Parameters ---------- header (dict-like): image header Optional -------- cte_params_filename (str): Alternate filename with nightly CTE parameters instead of default findfile('ctecorrnight', night) Returns ------- amp_regions, cte_regions amp_regions : dict dictionary of slices defining each amp cte_regions : dict The keys are the names of the amplifier, matching amp_regions. Each entry is a list containing one entry for each trap on that amp. Each item in the list is dictionary with the start and stop locations, and CTE correction parameters. Uses `get_amp_regions_to_cte_correct` (which uses CalibFinder and header) to identify any regions to correct, then reads parameters for those regions from `cte_params_filename` or default ctecorrnight file. Returns pair of empty dictionaries if there are no amps needing CTE corrections. """ import desispec.io # inside func to avoid circular import log = get_logger() amp_regions, cte_regions = get_amp_regions_to_cte_correct(header) if len(cte_regions) == 0: return amp_regions, cte_regions night = desispec.preproc.header2night(header) camera = header['CAMERA'].lower() if cte_params_filename is None : cte_params_filename = desispec.io.findfile('ctecorrnight', night=night, readonly=True) if not os.path.isfile(cte_params_filename): msg = f'Missing {cte_params_filename}; Generate it with desi_fit_cte_night -n {night}, or run preproc with --no-cte-correction or specify an alternate file with --cte-params FILENAME' log.critical(msg) raise RuntimeError(msg) # CTE table has columns NIGHT CAMERA AMPLIFIER SECTOR to identify regions # and columns FUNC AMPLITUDE FRACLEAK with CTE parameters ctecorrnight_table = Table.read(cte_params_filename) ## For each row of the input table, check if the row data was derived from ## a night within 2 weeks of the current night, otherwise it isn't valid valid_night = np.array([np.abs(difference_nights(rownight, night)) < 14 for rownight in ctecorrnight_table["NIGHT"]]) ## Check for rows that match the camera we want valid_camera = (ctecorrnight_table["CAMERA"] == camera) #- augment cte_regions with CTE correction parameters for amp in cte_regions: ## check for rows that are from a valid not and on the desired camera ## and amplifier selection = valid_night & valid_camera & (ctecorrnight_table["AMPLIFIER"] == amp) if np.sum(selection)==0 : # we do expect a set of CTE parameter for the amplifier because we know the effect is there and # we asked for the parameters, this is an error mess = f"No CTE correction in {cte_params_filename} for night {night} camera {camera} amplifier {amp}" log.critical(mess) raise RuntimeError(mess) for i in range(len(cte_regions[amp])): ctecols = cte_regions[amp][i]['ctecols'] selection2 = selection & (ctecorrnight_table["SECTOR"] == ctecols) if np.sum(selection2)==0 : mess = f"No CTE correction in {cte_params_filename} for night {night} camera {camera} amplifier {amp} sector {ctecols}" log.critical(mess) raise RuntimeError(mess) #- Having identified which row we want, add params to cte_regions entry=np.where(selection2)[0][0] for k in ["FUNC","AMPLITUDE","FRACLEAK"] : cte_regions[amp][i][k] = ctecorrnight_table[k][entry] cteparam = cte_regions[amp][i] log.info(f"CTE correction in amplifier {amp}, sector {ctecols}, {cteparam}") return amp_regions, cte_regions
[docs]def simplified_regnault(pixval, in_trap, amplitude, fracleak): """CTE transfer function of Regnault+. The model is transfer = in_trap * fracleak - pixval * (1 - in_trap / amplitude) with slight elaborations to prevent more electrons than exist from entering or leaving the trap. Parameters ---------- pixval : float value of uncorrupted image in_trap : float number of electrons presently in trap amplitude : float amplitude of trap fracleak : float fraction of electrons that leak out of trap each transfer Returns ------- int number of electrons that leak out of the trap into the pixel """ maxin = np.minimum(pixval, amplitude - in_trap) # leak out of trap transfer_amount = np.clip(in_trap * fracleak, 0, in_trap) # leak into trap transfer_amount -= np.clip(pixval * (1 - in_trap / amplitude), 0, maxin) return transfer_amount
# when full, always leaking ~amplitude * fracleak # and stealing back up to this amount.
[docs]def chi_simplified_regnault(param, cleantraces=None, ctetraces=None, uncertainties=None): """Chi loss function for a Regnault transfer function.""" models = [add_cte(trace, amplitude=param[0], fracleak=param[1]) for trace in cleantraces] res = np.array([ (m - c)/u for (m, c, u) in zip(models, ctetraces, uncertainties)]) return res.reshape(-1)
[docs]def get_transfer_function(function_name) : """Returns a CTE correction function from its name. This maps the FUNC name in $DESI_SPECTRO_CALIB yaml files to the Python function that should be called. Only 'simplified_regnault' so far. """ if function_name == "simplified_regnault" : return simplified_regnault else : raise KeyError(f"No transfer function called '{function_name}'")
[docs]def fit_cte(images): """Fits CTE models to a list of images. This fits the parameters of the Regnault transfer function model to an image with a trap. It works by comparing an unaffected amplifier to an amplifier with a trap along the boundary between the two amplifiers, solving for the trap parameters that make the unaffected image + the CTE effect a good match to the CTE-affected image. It will not work if both amplifiers have CTE, or if there are multiple CTE effects to solve for. It assumes that amp A is across from amp C and amp B is across from amp D. It would likely fail badly for a two-amp mode image. Parameters ---------- images : list A list of images. Usually these are all flats with different exposure lengths on the same device. Returns ------- astropy.Table with columns "NIGHT","CAMERA","AMPLIFIER","SECTOR","FUNC", "AMPLITUDE","FRACLEAK","CHI2PDF" "NIGHT","CAMERA","AMPLIFIER" are properties of the image "SECTOR" is a string of the form 'BEGIN:END' defining a range of CCD cols. "FUNC" is a transfer function, only 'simplified_regnault' implemented for now. "AMPLITUDE","FRACLEAK" are parameters of the transfer function "CHI2PDF" is the reduced chi2 of the fit """ # take a bunch of preproc images on one camera # these should all be flats and the same device # compare areas above and below the amp boundary # only works if only one of the two amps has a CTE effect! # if both have CTE effects then we need another approach. # assume that A <-> C and B <-> D are paired for these purposes. # take an uncontaminated line from one of the pair. Apply the CTE # model with some parameters. Get the contaminated line. # compare with the actual contaminated line. Minimize. # we need to extract only the relevant region and have # some robustness / noise tracking. log = get_logger() log.debug("begin fit_cte") keys = ["NIGHT","CAMERA","AMPLIFIER","SECTOR","FUNC","AMPLITUDE","FRACLEAK","CHI2PDF"] dtypes = [int,str,str,str,str,float,float,float] if images is None or len(images)==0: # nothing to do empty_table = Table(names=keys, dtype=dtypes) return empty_table assert len(images) > 0 night = desispec.preproc.header2night(images[0].meta) camera = images[0].meta['CAMERA'] obstype = images[0].meta['OBSTYPE'] if obstype != 'FLAT': log.warning('Really should not use this function with a non-flat?!') # only use with flats; otherwise the matching above and below the # boundary is particularly fraught. for image in images: assert image.meta['CAMERA'] == camera assert image.meta['OBSTYPE'] == obstype matching_amps = { 'A': 'C', 'C': 'A', 'B': 'D', 'D': 'B', } amp, cte = get_amp_regions_to_cte_correct(images[0].meta) res = dict() for k in keys : res[k]=list() for ampname in amp: tcte = cte[ampname] if len(tcte) == 0: continue if len(tcte) > 1: raise ValueError('Two CTE effect fitting not yet implemented.') if matching_amps[ampname] in cte and len(cte[matching_amps[ampname]]) != 0: raise ValueError('CTE effect on amp and its mirror not yet ' 'implemented.') tcte = tcte[0] # okay! we should be able to do the fit. # we need to select out the region near the amp boundary. ampreg = amp[ampname] on_bottom = ampreg[0].start == 0 if on_bottom: ampbd = ampreg[0].stop else: ampbd = ampreg[0].start npix = 11 need_to_reverse = ampreg[1].stop == image.pix.shape[1] start, stop = tcte['start'], tcte['stop'] step = 1 if need_to_reverse: step = -1 start, stop = stop, start scte = np.s_[ampbd:ampbd+npix, start:stop:step] sclean = np.s_[ampbd-npix:ampbd, start:stop:step] if on_bottom: scte, sclean = sclean, scte cleantraces = [np.median(im.pix[sclean], axis=0, keepdims=True) for im in images] ctetraces = [np.median(im.pix[scte], axis=0, keepdims=True) for im in images] # variance in median of a normal distribution is sigma^2 * pi / 2 / n # this is more careful than it makes sense to be here, but I # wanted to look it up again and figured I might as well. fac = np.sqrt(np.pi / 2 / npix) uncertainties = [ fac * np.sqrt(np.median( im.ivar[sclean]**-1 + im.ivar[scte]**-1, axis=0, keepdims=True)) for im in images] chi = partial(chi_simplified_regnault, cleantraces=cleantraces, ctetraces=ctetraces, uncertainties=uncertainties) startguesses = [1, 20, 50, 100] chiguesses = np.array([chi([g, 0.2]) for g in startguesses]) bestguess = np.argmin(np.sum(chiguesses**2, axis=1)) par = least_squares(chi, [startguesses[bestguess], 0.2], diff_step=[0.2, 0.01], loss='huber') amplitude = par.x[0] fracleak = par.x[1] offcols = f'{tcte["start"]}:{tcte["stop"]}' chi2dof = par.cost / len(par.fun) log.info(f'CTE fit {night} {camera} {amplitude=:.3f} {fracleak=:.3f} chi^2/dof={chi2dof:5.2f}') res["NIGHT"].append(night) res["CAMERA"].append(camera) res["AMPLIFIER"].append(ampname) res["SECTOR"].append(offcols) res["FUNC"].append("simplified_regnault") res["AMPLITUDE"].append(amplitude) res["FRACLEAK"].append(fracleak) res["CHI2PDF"].append(chi2dof) table = Table() for k, dt in zip(keys, dtypes): table[k] = np.array(res[k], dtype=dt) return table
[docs]def get_cte_images(night, camera, expids=None): """Get the images needed for a CTE fit for a particular night. This function looks up the appropriate exposure tables to find the CTE-detection image and the previous image, which is usually a normal flat field image. Parameters ---------- night : int the night YEARMMDD integer camera : str the camera, e.g., z1 Options ------- expids : array-like list of exposure IDs to use; cte first, 120s flat second if None, determine from exposure table Returns ------- List of preprocessed Images without CTE correction applied """ # inside func to avoid circular import import desispec.io from desispec.io.util import decode_camword, erow_to_goodcamword, get_tempfilename log = get_logger() if expids is not None: filename = desispec.io.findfile('raw', night=night, expid=expids[0], readonly=True) header = fitsio.read_header(filename, camera.upper()) if not needs_ctecorr(header): log.info(f"No CTE correction needed for {night} {camera}") return None else: #- Look up exposures in production exposure table exptablefn = findfile('exptable', night=night) if not os.path.isfile(exptablefn) : mess = f"Cannot find exposure table file '{exptablefn}'. Because of that the flat exposures needed for the CTE correction modeling cannot be identified. Maybe check env. variables DESI_SPECTRO_REDUX and SPECPROD?" log.critical(mess) raise RuntimeError(mess) #- Read the exposure table and trim to just the good exposures covering this camera exptable = load_table(exptablefn, tabletype='exptable') keep = np.ones(len(exptable), dtype=bool) for i, erow in enumerate(exptable): if ((erow['LASTSTEP'] == 'ignore') or (camera not in decode_camword(erow_to_goodcamword(erow, suppress_logging=True)))): keep[i] = False if np.sum(keep) == 0: log.info(f'No good exposures on {night} for {camera}, so no CTE correction needed') return None else: exptable = exptable[keep] #- check the first exp of any OBSTYPE covering this camera to see if CTE corr is needed filename = desispec.io.findfile('raw', night=night, expid=exptable['EXPID'][0], readonly=True) header = fitsio.read_header(filename, camera.upper()) if not needs_ctecorr(header): log.info(f"No CTE correction needed for {night} {camera}") return None #- CTE correction is needed, so trim to OBSTYPE=flat keep = exptable['OBSTYPE'] == 'flat' exptable = exptable[keep] #- Find a 1sec and 120sec flat selection = (np.abs(exptable['EXPTIME'] - 1) < 0.1) if np.sum(selection)<1 : mess = f"No good {camera} 1 sec flat for {night} (in {exptablefn}), needed for CTE correction model fit" log.critical(mess) raise RuntimeError(mess) index1 = np.where(selection)[0][0] selection = (np.abs(exptable['EXPTIME'] - 120) < 10) if np.sum(selection)<1 : mess = f"No good {camera} 120 sec flat for {night} (in {exptablefn}), needed for CTE correction model fit" log.critical(mess) raise RuntimeError(mess) index2 = np.where(selection)[0][-1] expids = list(exptable['EXPID'][ [index1, index2] ]) log.info(f"Will use exposures {expids} for {night} {camera} CTE corrections") images = list() for expid in expids: preproc_filename = desispec.io.findfile('preproc_for_cte', night=night, expid=expid, camera=camera) if not os.path.isfile(preproc_filename) : log.info(f"Computing {preproc_filename}") infile = desispec.io.findfile('raw', night=night, expid=expid, readonly=True) image = desispec.io.read_raw(infile, camera, no_cte_corr=True) tmpfile = get_tempfilename(preproc_filename) desispec.io.write_image(tmpfile,image) os.rename(tmpfile, preproc_filename) log.info(f"Wrote {preproc_filename}") images.append(image) else : images.append(desispec.io.read_image(preproc_filename)) return images
[docs]def fit_cte_night(night, camera, expids=None): """Fit the CTE parameters for a particular night. Parameters ---------- night : int the night YEARMMDD integer camera : str the camera, e.g., z1 Options ------- expids : array-like list of exposure IDs to use; if None, determine from exposure table Returns ------- Fit results; see fit_cte for details. """ images = get_cte_images(night, camera, expids=expids) return fit_cte(images)
[docs]def get_image_model(preproc, psf=None): """Compute model for an image using aperture extraction. This computes a simple model for an image based on an aperture extraction. Parameters ---------- preproc : Image Image to model Returns ------- np.ndarray Model image """ # inside func to avoid circular import import desispec.io import desispec.io.xytraceset import desispec.io.fiberflat meta = preproc.meta cfinder = CalibFinder([meta]) psf_filename = cfinder.findfile("PSF") xyset = desispec.io.xytraceset.read_xytraceset(psf_filename) fiberflat_filename = cfinder.findfile("FIBERFLAT") fiberflat = desispec.io.fiberflat.read_fiberflat(fiberflat_filename) with_sky_model = True with_spectral_smoothing = True spectral_smoothing_sigma_length = 71 no_traceshift = False mask = preproc.mask mimage = compute_image_model( preproc, xyset, fiberflat=fiberflat, with_spectral_smoothing=with_spectral_smoothing, spectral_smoothing_sigma_length=spectral_smoothing_sigma_length, with_sky_model=with_sky_model, psf=psf, fit_x_shift=(not no_traceshift)) preproc.mask = mask # compute_image_model sets this to None for some reason. # we're restoring it. return mimage
[docs]def get_rowbyrow_image_model(preproc, fibermap=None, spectral_smoothing_sigma_length=31, nspec=500, psf=None): """Compute row-by-row image model. This model uses a simultaneous PSF fit in each row to get better performance than get_image_model at the expense of reduced speed. The extracted fluxes are then combined with the PSF to produce a 2D model image. Parameters ---------- preproc : Image image to model fibermap : astropy.Table fibermap to use with image spectral_smoothing_sigma_length : int amount to smooth source spectra in model nspec : int number of spectra to extract and model psf : specter.psf.gausshermite.GaussHermitePSF PSF to use Returns ------- np.ndarray Model image. """ # load specter only if needed to simplify required dependencies import specter.psf import desispec.io # inside func to avoid circular import meta = preproc.meta cfinder = CalibFinder([meta]) if fibermap is None and hasattr(preproc, 'fibermap'): fibermap = preproc.fibermap if psf is None: psf_filename = cfinder.findfile("PSF") psf = specter.psf.load_psf(psf_filename) # try to update the trace shifts first? xytraceset = desispec.io.xytraceset.read_xytraceset(psf_filename) x, y, dx, ex, fiber, wave = compute_dx_from_cross_dispersion_profiles( xcoef=xytraceset.x_vs_wave_traceset._coeff, ycoef=xytraceset.y_vs_wave_traceset._coeff, wavemin=xytraceset.wavemin, wavemax=xytraceset.wavemax, image=preproc, fibers=np.arange(xytraceset.nspec, dtype=int)) dx = np.median(dx) psf._x._coeff[:, 0] += dx res = rowbyrowextract.extract(preproc, psf, nspec=nspec, fibermap=fibermap, return_model=True) qframe, model, profile, profilepix = res fiberflat_filename = cfinder.findfile("FIBERFLAT") fiberflat = desispec.io.fiberflat.read_fiberflat(fiberflat_filename) fqframe = copy.deepcopy(qframe) flat = qfiberflat.qproc_apply_fiberflat( fqframe, fiberflat, return_flat=True) sfqframe = copy.deepcopy(fqframe) sky = qsky.qproc_sky_subtraction(sfqframe, return_skymodel=True) sfflux = median_filter( sfqframe.flux, size=(1, spectral_smoothing_sigma_length), mode='nearest') modflux = (sfflux + sky) * flat mframe = copy.deepcopy(sfqframe) mframe.flux = modflux return rowbyrowextract.model(mframe, profile, profilepix, preproc.pix.shape)
[docs]def correct_image_via_model(image, niter=5, cte_params_filename=None): """Correct for CTE via an image model. The idea here is that you can roughly extract spectra from a CTE-affected image just by extracting as usual. You can then create a noise-free image from that extraction in the context of a PSF and traces. You can then apply CTE to that noise-free image. The difference between the CTE-affected image and the original image is a noise-free version of what CTE is doing to your data, which you can then subtract. You can then re-extract from the corrected image, and repeat, improving the CTE correction. As pseudocode, this corresponds to: for i in range(niter): M = get_model(I) M' = CTE(M) I = I - (M' - M) This function implements that approach. Parameters ---------- image : Image input image niter : int number of iterations to run Optional -------- cte_params_filename : str or None (default) if filename is not None, use this one instead of the default one found with find_file Returns ------- outimage : Image image after correction for CTE """ log = get_logger() # here we get the list of amplifiers and the list # of sectors per amplifiers that are affected by CTE issues # and for which we have a model to apply # (only amplifers and sectors with a model are in this list) amp, cte = get_cte_params(image.meta, cte_params_filename=cte_params_filename) if len(cte) == 0 : log.info("No CTE correction to do for this image, return original") return image outimage = copy.deepcopy(image) previous_rms = 0. for i in range(niter): outmodel = get_rowbyrow_image_model(outimage) cteimage = outmodel.copy() for ampname in amp: ampreg = amp[ampname] imamp = outmodel[ampreg] cteamp = cte[ampname] if len(cteamp) == 0: # don't need to do anything in this case. continue need_to_reverse = ampreg[1].stop == image.pix.shape[1] if need_to_reverse: field, offset, sign = 'stop', ampreg[1].stop, -1 else: field, offset, sign = 'start', 0, 1 ctelocs = [sign * (x[field] - offset) for x in cteamp] individual_ctefuns = [] for entry in cteamp : individual_ctefuns.append(partial( add_cte, cte_transfer_func=get_transfer_function(entry['FUNC']), amplitude=entry['AMPLITUDE'], fracleak=entry['FRACLEAK'])) cteimage[ampreg] = apply_multiple_cte_effects( imamp[:, ::sign], locations=ctelocs, ctefuns=individual_ctefuns)[:, ::sign] correction_amp = imamp - cteimage[ampreg] mn, med, rms = sigma_clipped_stats(correction_amp) log.info( f'Correcting CTE, iteration {i}, correction rms {rms:6.3f}') if abs(rms-previous_rms)<0.1 : break previous_rms =rms correction = cteimage - outmodel outimage.pix = image.pix - correction return outimage