Source code for desispec.scripts.proc_night

"""
desispec.scripts.proc_night
=============================

"""
from desispec.io import findfile
from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.workflow.calibration_selection import \
    determine_calibrations_to_proc
from desispec.workflow.science_selection import determine_science_to_proc, \
    get_tiles_cumulative
from desiutil.log import get_logger
import numpy as np
import os
import sys
import time
import re
from socket import gethostname
from astropy.table import Table, vstack

## Import some helper functions; you can see their definitions by uncommenting the bash shell command
from desispec.scripts.update_exptable import update_exposure_table
from desispec.workflow.tableio import load_tables, write_table
from desispec.workflow.utils import sleep_and_report, \
    verify_variable_with_environment, load_override_file
from desispec.workflow.timing import what_night_is_it, during_operating_hours
from desispec.workflow.exptable import get_last_step_options
from desispec.workflow.proctable import default_obstypes_for_proctable, \
    erow_to_prow, default_prow
from desispec.workflow.processing import define_and_assign_dependency, \
    create_and_submit, \
    submit_tilenight_and_redshifts, \
    generate_calibration_dict, \
    night_to_starting_iid, make_joint_prow, \
    set_calibrator_flag, make_exposure_prow, \
    update_calibjobs_with_linking, all_calibs_submitted, \
    update_and_recurvsively_submit
from desispec.workflow.queue import update_from_queue, any_jobs_failed
from desispec.io.util import decode_camword, difference_camwords, \
    create_camword, replace_prefix


def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
               queue=None, reservation=None, system_name=None,
               exp_table_pathname=None, proc_table_pathname=None,
               override_pathname=None, update_exptable=False,
               dry_run_level=0, dry_run=False, no_redshifts=False,
               ignore_proc_table_failures = False,
               dont_check_job_outputs=False, dont_resubmit_partial_jobs=False,
               tiles=None, surveys=None, science_laststeps=None,
               all_tiles=False, specstatus_path=None, use_specter=False,
               no_cte_flats=False, complete_tiles_thrunight=None,
               all_cumulatives=False, daily=False, specprod=None,
               path_to_data=None, exp_obstypes=None, camword=None,
               badcamword=None, badamps=None, exps_to_ignore=None,
               sub_wait_time=0.1, verbose=False, dont_require_cals=False,
               psf_linking_without_fflat=False, still_acquiring=False):
    """
    Process some or all exposures on a night. Can be used to process an entire
    night, or used to process data currently available on a given night using
    the '--daily' flag.

    Args:
        night (int): The night of data to be processed. Exposure table must exist.
            May be None only when daily=True, in which case the current night
            is used.
        proc_obstypes (list or np.array, optional): A list of exposure OBSTYPE's
            that should be processed (and therefore added to the processing table).
        z_submit_types (list of str): The "group" types of redshifts that should
            be submitted with each exposure. Allowed values are 'cumulative',
            'pernight-v0', 'pernight', and 'perexp'. If None, no redshifts are
            submitted, except in daily mode where the default is ['cumulative']
            (unless no_redshifts is set).
        queue (str, optional): The name of the queue to submit the jobs to.
            Default is "realtime" in daily mode and "regular" otherwise.
        reservation (str, optional): The reservation to submit jobs to.
            If None, it is not submitted to a reservation.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl,
            perlmutter-gpu
        exp_table_pathname (str): Full path to where to exposure tables are stored,
            including file name.
        proc_table_pathname (str): Full path to where to processing tables to be
            written, including file name
        override_pathname (str): Full path to the override file.
        update_exptable (bool): If true then the exposure table is updated.
            The default is False.
        dry_run_level (int, optional): If nonzero, this is a simulated run.
            If dry_run_level=1 the scripts will be written but not submitted.
            If dry_run_level=2, the scripts will not be written nor submitted
            but the processing_table is still created.
            If dry_run_level=3, no output files are written.
            Logging will remain the same for testing as though scripts are
            being submitted. Default is 0 (false).
        dry_run (bool, optional): When to run without submitting scripts or
            not. If dry_run_level is defined, then it over-rides this flag.
            If dry_run_level is not set and dry_run=True, dry_run_level is set
            to 2 (no scripts generated or run). Default for dry_run is False.
        no_redshifts (bool, optional): Whether to submit redshifts or not.
            If True, redshifts are not submitted.
        ignore_proc_table_failures (bool, optional): True if you want to submit
            other jobs even the loaded processing table has incomplete jobs in
            it. Use with caution. Default is False.
        dont_check_job_outputs (bool, optional): Default is False. If False,
            the code checks for the existence of the expected final data
            products for the script being submitted. If all files exist and
            this is False, then the script will not be submitted. If some
            files exist and this is False, only the subset of the cameras
            without the final data products will be generated and submitted.
        dont_resubmit_partial_jobs (bool, optional): Default is False. Must be
            used with dont_check_job_outputs=False. If this flag is False, jobs
            with some prior data are pruned using PROCCAMWORD to only process
            the remaining cameras not found to exist.
        tiles (array-like, optional): Only submit jobs for these TILEIDs.
        surveys (array-like, optional): Only submit science jobs for these
            surveys (lowercase)
        science_laststeps (array-like, optional): Only submit jobs for exposures
            with LASTSTEP in these science_laststeps (lowercase)
        all_tiles (bool, optional): Default is False. Set to NOT restrict to
            completed tiles as defined by the table pointed to by
            specstatus_path.
        specstatus_path (str, optional): Default is
            $DESI_SURVEYOPS/ops/tiles-specstatus.ecsv. Location of the
            surveyops specstatus table.
        use_specter (bool, optional): Default is False. If True, use specter,
            otherwise use gpu_specter by default.
        no_cte_flats (bool, optional): Default is False. If False, cte flats
            are used if available to correct for cte effects.
        complete_tiles_thrunight (int, optional): Default is None. Only tiles
            completed on or before the supplied YYYYMMDD are considered
            completed and will be processed. All complete tiles are submitted
            if None or all_tiles is True.
        all_cumulatives (bool, optional): Default is False. Set to run
            cumulative redshifts for all tiles even if the tile has
            observations on a later night.
        specprod: str. The name of the current production. If used, this will
            overwrite the SPECPROD environment variable.
        daily: bool. Flag that sets other flags for running this script for the
            daily pipeline.
        path_to_data: str. Path to the raw data.
        exp_obstypes: str or comma separated list of strings. The exposure
            OBSTYPE's that you want to include in the exposure table.
        camword: str. Camword that, if set, alters the set of cameras that will
            be set for processing. Examples: a0123456789, a1, a2b3r3, a2b3r4z3.
            Note this is only true for new exposures being added to the
            exposure_table in 'daily' mode.
        badcamword: str. Camword that, if set, will be removed from the camword
            defined in camword if given, or the camword inferred from the data
            if camword is not given. Note this is only true for new exposures
            being added to the exposure_table in 'daily' mode.
        badamps: str. Comma seperated list of bad amplifiers that should not be
            processed. Should be of the form "{camera}{petal}{amp}", i.e.
            "[brz][0-9][ABCD]". Example: 'b7D,z8A'. Note this is only true for
            new exposures being added to the exposure_table in 'daily' mode.
        exps_to_ignore: Passed through unchanged to update_exposure_table when
            the exposure table is updated (presumably exposure IDs to skip;
            confirm against update_exposure_table).
        sub_wait_time: int. Wait time in seconds between submission loops.
            Default 0.1 seconds.
        verbose: bool. True if you want more verbose output, false otherwise.
            Current not propagated to lower code, so it is only used in the
            main daily_processing script itself.
        dont_require_cals: bool. Default False. If set then the code doesn't
            require either a valid set of calibrations or a valid override file
            to link to calibrations in order to proceed with science processing.
        psf_linking_without_fflat: bool. Default False. If set then the code
            will NOT raise an error if asked to link psfnight calibrations
            without fiberflatnight calibrations.
        still_acquiring: bool. If True, assume more data might be coming, e.g.
            wait for additional exposures of latest tile. If False, auto-derive
            True/False based upon night and current time. Primarily for testing.

    Returns:
        tuple: (ptable, unproc_table). ptable is the processing table after
        any submissions. unproc_table holds the exposure-table rows whose
        EXPID does not appear in ptable; it is None when the function exits
        early because there is nothing new to run, or when nothing was
        written.

    Raises:
        ValueError: if night is not given outside daily mode, if a laststep or
            redshift type is not recognized, or if psfnight is linked without
            fiberflatnight and psf_linking_without_fflat is not set.
        IOError: if the exposure table is missing and update_exptable is
            False, or if an explicitly specified override file is missing.
        AssertionError: if calibration jobs in the loaded processing table
            have failed, resubmission attempts are exhausted, and
            ignore_proc_table_failures is not set.
        SystemExit: via sys.exit(1) if calibrations are required but not all
            accounted for and still_acquiring is False.
    """
    ## Get logger
    log = get_logger()
    log.info(f'----- Processing {night} at {time.asctime()} -----')
    log.info(f"SLURM_JOB_ID={os.getenv('SLURM_JOB_ID')} on {gethostname()}")

    ## Inform user of how some parameters will be used
    if camword is not None:
        log.info(f"Note custom {camword=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")
    if badcamword is not None:
        log.info(f"Note custom {badcamword=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")
    if badamps is not None:
        log.info(f"Note custom {badamps=} will only be used for new exposures"
                 f" being entered into the exposure_table, not all exposures"
                 f" to be processed.")

    ## Reconcile the dry_run and dry_run_level
    if dry_run and dry_run_level == 0:
        dry_run_level = 2
    elif dry_run_level > 0:
        dry_run = True

    ## If running in daily mode, change a bunch of defaults
    if daily:
        ## What night are we running on?
        true_night = what_night_is_it()
        if night is not None:
            night = int(night)
            if true_night != night:
                log.info(f"True night is {true_night}, but running daily for {night=}")
        else:
            night = true_night

        if science_laststeps is None:
            science_laststeps = ['all', 'skysub', 'fluxcal']

        ## Must be one of the group types accepted by the validation below;
        ## the previous value 'cumulatives' was rejected with a ValueError.
        if z_submit_types is None and not no_redshifts:
            z_submit_types = ['cumulative']

        ## still_acquiring is flag to determine whether to process the last tile in the exposure table
        ## or not. This is used in daily mode when processing and exiting mid-night.
        ## override still_acquiring==False if daily mode during observing hours
        if during_operating_hours(dry_run=dry_run) and (true_night == night):
            if still_acquiring is False:
                log.info(f'Daily mode during observing hours on current night, so assuming that more data might arrive and setting still_acquiring=True')
            still_acquiring = True

        update_exptable = True
        all_cumulatives = True
        all_tiles = True
        complete_tiles_thrunight = None
        ## Default for nightly processing is realtime queue
        if queue is None:
            queue = 'realtime'

    ## Default for normal processing is regular queue
    if queue is None:
        queue = 'regular'
    log.info(f"Submitting to the {queue} queue.")

    ## Set night
    if night is None:
        err = "Must specify night unless running in daily=True mode"
        log.error(err)
        raise ValueError(err)
    else:
        log.info(f"Processing {night=}")

    ## Recast booleans from double negative
    check_for_outputs = (not dont_check_job_outputs)
    resubmit_partial_complete = (not dont_resubmit_partial_jobs)
    require_cals = (not dont_require_cals)
    do_cte_flats = (not no_cte_flats)

    ## cte flats weren't available before 20211130 so hardcode that in
    if do_cte_flats and night < 20211130:
        log.info("Asked to do cte flat correction but before 20211130 no "
                 + "no cte flats are available to do the correction. "
                 + "Code will NOT perform cte flat corrections.")
        do_cte_flats = False

    ###################
    ## Set filenames ##
    ###################
    ## Ensure specprod is set in the environment and that it matches user
    ## specified value if given
    specprod = verify_variable_with_environment(specprod, var_name='specprod',
                                                env_name='SPECPROD')

    ## Determine where the exposure table will be written
    if exp_table_pathname is None:
        exp_table_pathname = findfile('exposure_table', night=night)
    if not os.path.exists(exp_table_pathname) and not update_exptable:
        raise IOError(f"Exposure table: {exp_table_pathname} not found. Exiting this night.")

    ## Determine where the processing table will be written
    if proc_table_pathname is None:
        proc_table_pathname = findfile('processing_table', night=night)
    proc_table_path = os.path.dirname(proc_table_pathname)
    if dry_run_level < 3:
        os.makedirs(proc_table_path, exist_ok=True)

    ## Determine where the unprocessed data table will be written
    unproc_table_pathname = replace_prefix(proc_table_pathname, 'processing', 'unprocessed')

    ## Require cal_override to exist if explcitly specified
    if override_pathname is None:
        override_pathname = findfile('override', night=night)
    elif not os.path.exists(override_pathname):
        raise IOError(f"Specified override file: "
                      f"{override_pathname} not found. Exiting this night.")

    #######################################
    ## Define parameters based on inputs ##
    #######################################
    ## If science_laststeps not defined, default is only LASTSTEP=='all' exposures
    if science_laststeps is None:
        science_laststeps = ['all']
    else:
        laststep_options = get_last_step_options()
        for laststep in science_laststeps:
            if laststep not in laststep_options:
                raise ValueError(f"Couldn't understand laststep={laststep} "
                                 + f"in science_laststeps={science_laststeps}.")
    log.info(f"Processing exposures with the following LASTSTEP's: {science_laststeps}")

    ## Define the group types of redshifts you want to generate for each tile
    if no_redshifts:
        log.info(f"no_redshifts set, so ignoring {z_submit_types=}")
        z_submit_types = None

    if z_submit_types is None:
        log.info("Not submitting scripts for redshift fitting")
    else:
        for ztype in z_submit_types:
            if ztype not in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
                raise ValueError(f"Couldn't understand ztype={ztype} "
                                 + f"in z_submit_types={z_submit_types}.")
        log.info(f"Redshift fitting with redshift group types: {z_submit_types}")

    ## Identify OBSTYPES to process
    if proc_obstypes is None:
        proc_obstypes = default_obstypes_for_proctable()

    #############################
    ## Start the Actual Script ##
    #############################
    ## If running in daily mode, or requested, then update the exposure table
    ## This reads in and writes out the exposure table to disk
    if update_exptable:
        log.info("Running update_exposure_table.")
        update_exposure_table(night=night, specprod=specprod,
                              exp_table_pathname=exp_table_pathname,
                              path_to_data=path_to_data,
                              exp_obstypes=exp_obstypes,
                              camword=camword, badcamword=badcamword,
                              badamps=badamps, exps_to_ignore=exps_to_ignore,
                              dry_run_level=dry_run_level, verbose=verbose)
        log.info("Done with update_exposure_table.\n\n")

    ## Combine the table names and types for easier passing to io functions
    table_pathnames = [exp_table_pathname, proc_table_pathname]
    table_types = ['exptable', 'proctable']

    ## Load in the files defined above
    etable, ptable = load_tables(tablenames=table_pathnames,
                                 tabletypes=table_types)
    ## Keep an uncut copy so unprocessed exposures of every OBSTYPE are reported
    full_etable = etable.copy()

    ## Cut on OBSTYPES
    log.info(f"Processing the following obstypes: {proc_obstypes}")
    good_types = np.isin(np.array(etable['OBSTYPE']).astype(str), proc_obstypes)
    etable = etable[good_types]

    ## Update processing table
    tableng = len(ptable)
    if tableng > 0:
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
        if dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname)
        if any_jobs_failed(ptable['STATUS']):
            ## Try up to two times to resubmit failures, afterwards give up
            ## unless explicitly told to proceed with the failures
            ## Note after 2 resubmissions, the code won't resubmit anymore even
            ## if given ignore_proc_table_failures
            if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3:
                log.info("Job failures were detected. Resubmitting those jobs "
                         + "before continuing with new submissions.")
                ## NOTE(review): the boolean dry_run is passed here while the
                ## other submission helpers receive dry_run_level -- confirm
                ## which one update_and_recurvsively_submit expects.
                ptable, nsubmits = update_and_recurvsively_submit(
                    ptable, ptab_name=proc_table_pathname,
                    dry_run=dry_run, reservation=reservation)
            elif not ignore_proc_table_failures:
                err = "Some jobs have an incomplete job status. This script " \
                      + "will not fix them. You should remedy those first. "
                log.error(err)
                ## if the failures are in calibrations, then crash since
                ## we need them for any new jobs
                if any_jobs_failed(ptable['STATUS'][ptable['CALIBRATOR'] > 0]):
                    err += "To proceed anyway use "
                    err += "'--ignore-proc-table-failures'. Exiting."
                    raise AssertionError(err)
            else:
                log.warning("Some jobs have an incomplete job status, but "
                            + "you entered '--ignore-proc-table-failures'. This "
                            + "script will not fix them. "
                            + "You should have fixed those first. Proceeding...")

        ## Exit early when there is no science exposure that isn't already
        ## in the processing table
        if np.sum(ptable['OBSTYPE']=='science') > 0:
            ptable_expids = set(np.concatenate(
                ptable['EXPID'][ptable['OBSTYPE']=='science']
            ))
        else:
            ptable_expids = set()
        etable_expids = set(etable['EXPID'][etable['OBSTYPE']=='science'])
        if len(etable_expids) == 0:
            log.info(f"No science exposures yet. Exiting at {time.asctime()}.")
            return ptable, None
        elif len(etable_expids.difference(ptable_expids)) == 0:
            log.info("All science EXPID's already present in processing table, "
                     + f"nothing to run. Exiting at {time.asctime()}.")
            return ptable, None

        int_id = np.max(ptable['INTID'])+1
    else:
        int_id = night_to_starting_iid(night=night)

    ################### Determine What to Process ###################
    ## Load calibration_override_file
    overrides = load_override_file(filepathname=override_pathname)
    cal_override = {}
    if 'calibration' in overrides:
        cal_override = overrides['calibration']

    ## Determine calibrations that will be linked
    if 'linkcal' in cal_override:
        files_to_link, files_not_linked = None, None
        if 'include' in cal_override['linkcal']:
            files_to_link = cal_override['linkcal']['include']
        if 'exclude' in cal_override['linkcal']:
            files_not_linked = cal_override['linkcal']['exclude']
        files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
        ## Fiberflatnights need to be generated with psfs from same time, so
        ## can't link psfs without also linking fiberflatnight
        if 'psfnight' in files_to_link and not 'fiberflatnight' in files_to_link \
                and not psf_linking_without_fflat:
            err = "Must link fiberflatnight if linking psfnight"
            log.error(err)
            raise ValueError(err)
    else:
        files_to_link = set()

    ## Identify what calibrations have been done
    calibjobs = generate_calibration_dict(ptable, files_to_link)

    ## Determine the appropriate set of calibrations
    ## Only run if we haven't already linked or done fiberflatnight's
    cal_etable = None
    if not all_calibs_submitted(calibjobs['accounted_for'], do_cte_flats):
        cal_etable = determine_calibrations_to_proc(etable,
                                                    do_cte_flats=do_cte_flats,
                                                    still_acquiring=still_acquiring)

    ## Determine the appropriate science exposures
    sci_etable, tiles_to_proc = determine_science_to_proc(
        etable=etable, tiles=tiles, surveys=surveys,
        laststeps=science_laststeps,
        processed_tiles=np.unique(ptable['TILEID']),
        all_tiles=all_tiles, ignore_last_tile=still_acquiring,
        complete_tiles_thrunight=complete_tiles_thrunight,
        specstatus_path=specstatus_path)

    ## For cumulative redshifts, identify tiles for which this is the last
    ## night that they were observed
    tiles_cumulative = get_tiles_cumulative(sci_etable, z_submit_types,
                                            all_cumulatives, night)

    ################### Process the data ###################
    ## Process Calibrations
    ## For now assume that a linkcal job links all files and we therefore
    ## don't need to submit anything more.
    def create_submit_add_and_save(prow, proctable,
                                   check_outputs=check_for_outputs,
                                   extra_job_args=None):
        """Submit prow, append it to proctable, save the table, then pause.

        Closes over the submission settings of the enclosing proc_night call
        and returns the updated (prow, proctable).
        """
        log.info(f"\nProcessing: {prow}\n")
        prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                 reservation=reservation,
                                 strictly_successful=True,
                                 check_for_outputs=check_outputs,
                                 resubmit_partial_complete=resubmit_partial_complete,
                                 system_name=system_name,
                                 use_specter=use_specter,
                                 extra_job_args=extra_job_args)
        ## Add the processing row to the processing table
        proctable.add_row(prow)
        if len(proctable) > 0 and dry_run_level < 3:
            write_table(proctable, tablename=proc_table_pathname)
        sleep_and_report(sub_wait_time,
                         message_suffix=f"to slow down the queue submission rate",
                         dry_run=dry_run, logfunc=log.info)
        return prow, proctable

    ## Actually process the calibrations
    ## Only run if we haven't already linked or done fiberflatnight's
    if not all_calibs_submitted(calibjobs['accounted_for'], do_cte_flats):
        ptable, calibjobs, int_id = submit_calibrations(cal_etable, ptable,
                                                        cal_override, calibjobs,
                                                        int_id, night, files_to_link,
                                                        create_submit_add_and_save)

    ## Require some minimal level of calibrations to process science exposures
    if require_cals and not all_calibs_submitted(calibjobs['accounted_for'], do_cte_flats):
        err = (f"Exiting because not all calibration files accounted for "
               + f"with links or submissions and require_cals is True.")
        log.error(err)
        ## If still acquiring new data in daily mode, don't exit with error code
        ## But do exit
        log.info(f'Stopping at {time.asctime()}\n')
        if still_acquiring:
            if len(ptable) > 0:
                processed = np.isin(full_etable['EXPID'],
                                    np.unique(np.concatenate(ptable['EXPID'])))
                unproc_table = full_etable[~processed]
            else:
                unproc_table = full_etable
            return ptable, unproc_table
        else:
            sys.exit(1)

    ## Process Sciences
    ## Loop over new tiles and process them
    for tile in tiles_to_proc:
        log.info(f'\n\n################# Submitting {tile} #####################')

        ## Identify the science exposures for the given tile
        tile_etable = sci_etable[sci_etable['TILEID'] == tile]

        ## Should change submit_tilenight_and_redshifts to take erows
        ## but for now will remain backward compatible and use prows
        ## Create list of prows from selected etable rows
        sciences = []
        for erow in tile_etable:
            prow = erow_to_prow(erow)
            prow['INTID'] = int_id
            int_id += 1
            prow['JOBDESC'] = prow['OBSTYPE']
            prow = define_and_assign_dependency(prow, calibjobs)
            sciences.append(prow)

        # don't submit cumulative redshifts for lasttile if it isn't in tiles_cumulative
        if z_submit_types is None:
            cur_z_submit_types = None
        else:
            ## list() gives a per-tile copy and also accepts tuples/arrays
            cur_z_submit_types = list(z_submit_types)
            if 'cumulative' in cur_z_submit_types and tile not in tiles_cumulative:
                cur_z_submit_types.remove('cumulative')

        ## No longer need to return sciences since this is always the
        ## full set of exposures, but will keep for now for backward
        ## compatibility
        if 'science' in overrides and 'tilenight' in overrides['science']:
            ## Shallow copy so the per-tile keys set below are not written
            ## back into the loaded overrides dictionary
            extra_job_args = dict(overrides['science']['tilenight'])
        else:
            extra_job_args = {}
        extra_job_args['z_submit_types'] = cur_z_submit_types
        extra_job_args['laststeps'] = science_laststeps

        ptable, sciences, int_id = submit_tilenight_and_redshifts(
            ptable, sciences, calibjobs, int_id,
            dry_run=dry_run_level, queue=queue,
            reservation=reservation,
            strictly_successful=True,
            check_for_outputs=check_for_outputs,
            resubmit_partial_complete=resubmit_partial_complete,
            system_name=system_name,
            use_specter=use_specter,
            extra_job_args=extra_job_args)

        if len(ptable) > 0 and dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname)

        sleep_and_report(sub_wait_time,
                         message_suffix=f"to slow down the queue submission rate",
                         dry_run=dry_run, logfunc=log.info)

        ## Flush the outputs
        sys.stdout.flush()
        sys.stderr.flush()

    ################### Wrap things up ###################
    unproc_table = None
    if len(ptable) > 0:
        ## All jobs now submitted, update information from job queue and save
        ptable = update_from_queue(ptable, dry_run=dry_run_level)
        if dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname)
            ## Now that processing is complete, lets identify what we didn't process
            if len(ptable) > 0:
                processed = np.isin(full_etable['EXPID'],
                                    np.unique(np.concatenate(ptable['EXPID'])))
                unproc_table = full_etable[~processed]
            else:
                unproc_table = full_etable
            write_table(unproc_table, tablename=unproc_table_pathname)
    elif dry_run_level < 3 and len(full_etable) > 0:
        ## Done determining what not to process, so write out unproc file
        unproc_table = full_etable
        write_table(unproc_table, tablename=unproc_table_pathname)

    if dry_run_level >= 3:
        log.info(f"{dry_run_level=} so not saving outputs.")
        log.info(f"\n{full_etable=}")
        log.info(f"\n{ptable=}")
        log.info(f"\n{unproc_table=}")

    if still_acquiring:
        log.info(f"Current submission of exposures "
                 + f"for {night=} are complete except for last tile at {time.asctime()}.\n\n\n\n")
    else:
        log.info(f"All done: Completed submission of exposures for night {night} at {time.asctime()}.\n")

    return ptable, unproc_table
def submit_calibrations(cal_etable, ptable, cal_override, calibjobs, int_id,
                        curnight, files_to_link, create_submit_add_and_save):
    """
    Submit the calibration jobs for a night and record them in calibjobs.

    In order, this submits (when applicable and not already accounted for):
    a linkcal job, a ccdcalib or nightlybias job, the individual arcs plus
    the joint psfnight job, the individual flats plus the joint nightlyflat
    job, and finally the individual cte flats.

    Args:
        cal_etable: Table of calibration exposures to consider; rows are
            selected on the OBSTYPE and PROGRAM columns. If None or empty,
            only the linkcal step is attempted.
        ptable: Processing table. Submitted rows are added to it through
            create_submit_add_and_save.
        cal_override (dict): Calibration section of the override file. The
            keys 'linkcal' and 'nightlyflat' are read here.
        calibjobs (dict): Known calibration jobs keyed by job description,
            plus an 'accounted_for' dict of booleans. Updated in place as jobs
            are submitted.
        int_id (int): Next internal ID to assign to a processing row.
        curnight (int): Night recorded in the NIGHT field of the linkcal row.
        files_to_link: Calibration file types that are linked rather than
            generated; passed to update_calibjobs_with_linking.
        create_submit_add_and_save (callable): Called as
            f(prow, ptable, check_outputs=..., extra_job_args=...) and
            expected to return the updated (prow, ptable).

    Returns:
        tuple: (ptable, calibjobs, int_id) after all submissions.
    """
    log = get_logger()

    ## we use this to check for individual jobs rather than combination
    ## jobs, so only check for scalar jobs where JOBDESC == OBSTYPE
    ## ex. dark, zero, arc, and flat
    processed_cal_expids = np.array([]).astype(int)
    if len(ptable) > 0:
        explists = ptable['EXPID'][ptable['JOBDESC']==ptable['OBSTYPE']]
        ## np.concatenate raises ValueError on an empty sequence, which
        ## happens when the table holds only combination/link jobs
        if len(explists) > 0:
            processed_cal_expids = np.unique(np.concatenate(explists).astype(int))

    ######## Submit caliblink if requested ########
    if 'linkcal' in cal_override and calibjobs['linkcal'] is None:
        log.info("Linking calibration files listed in override files: "
                 + f"{files_to_link}")
        prow = default_prow()
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'linkcal'
        prow['OBSTYPE'] = 'link'
        prow['CALIBRATOR'] = 1
        prow['NIGHT'] = curnight
        if 'refnight' in cal_override['linkcal']:
            refnight = int(cal_override['linkcal']['refnight'])
            prow = define_and_assign_dependency(prow, calibjobs, refnight=refnight)
        ## create dictionary to carry linking information
        linkcalargs = cal_override['linkcal']
        prow, ptable = create_submit_add_and_save(prow, ptable,
                                                  check_outputs=False,
                                                  extra_job_args=linkcalargs)
        calibjobs[prow['JOBDESC']] = prow.copy()
        calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)

    ## Nothing more to do without calibration exposures
    if cal_etable is None or len(cal_etable) == 0:
        return ptable, calibjobs, int_id

    ## Otherwise proceed with submitting the calibrations
    ## Define objects to process
    darks, flats, ctes = list(), list(), list()
    zeros = cal_etable[cal_etable['OBSTYPE']=='zero']
    arcs = cal_etable[cal_etable['OBSTYPE']=='arc']
    if 'dark' in cal_etable['OBSTYPE']:
        darks = cal_etable[cal_etable['OBSTYPE']=='dark']
    if 'flat' in cal_etable['OBSTYPE']:
        allflats = cal_etable[cal_etable['OBSTYPE']=='flat']
        ## cte flats are identified by 'cte' appearing in PROGRAM
        is_cte = np.array(['cte' in prog.lower() for prog in allflats['PROGRAM']])
        flats = allflats[~is_cte]
        ctes = allflats[is_cte]

    have_flats_for_cte = len(ctes) > 0 and len(flats) > 0

    do_bias = len(zeros) > 0 and not calibjobs['accounted_for']['biasnight']
    do_badcol = len(darks) > 0 and not calibjobs['accounted_for']['badcolumns']
    do_cte = have_flats_for_cte and not calibjobs['accounted_for']['ctecorrnight']

    ## if do badcol or cte, then submit a ccdcalib job, otherwise submit a
    ## nightlybias job
    if do_badcol or do_cte:
        ######## Submit ccdcalib ########
        ## process dark for bad columns even if we don't have zeros for nightlybias
        ## ccdcalib = nightlybias(zeros) + badcol(dark) + cte correction
        jobdesc = 'ccdcalib'

        if calibjobs[jobdesc] is None:
            ## Define which erow to use to create the processing table row
            all_expids = []
            if do_badcol:
                ## first exposure is a 300s dark
                job_erow = darks[0]
                all_expids.append(job_erow['EXPID'])
            else:
                job_erow = ctes[-1]

            ## if doing cte correction, create expid list of last 120s flat
            ## and all ctes provided by the calibration selection function
            if do_cte:
                cte_expids = np.array([flats[-1]['EXPID'], *ctes['EXPID']])
                all_expids.extend(cte_expids)
            else:
                cte_expids = None

            prow, int_id = make_exposure_prow(job_erow, int_id,
                                              calibjobs, jobdesc=jobdesc)

            if len(all_expids) > 1:
                prow['EXPID'] = np.array(all_expids)

            prow['CALIBRATOR'] = 1

            extra_job_args = {'nightlybias': do_bias,
                              'nightlycte': do_cte,
                              'cte_expids': cte_expids}
            prow, ptable = create_submit_add_and_save(prow, ptable,
                                                      extra_job_args=extra_job_args)
            calibjobs[prow['JOBDESC']] = prow.copy()
            log.info(f"Submitted ccdcalib job with {do_bias=}, "
                     + f"{do_badcol=}, {do_cte=}")
    elif do_bias:
        log.info("\nNo dark or cte found. Submitting nightlybias before "
                 "processing exposures.\n")
        prow = erow_to_prow(zeros[0])
        prow['EXPID'] = np.array([])
        prow['INTID'] = int_id
        int_id += 1
        prow['JOBDESC'] = 'nightlybias'
        prow['CALIBRATOR'] = 1
        ## Process only the cameras that are good in every calib zero
        cams = set(decode_camword('a0123456789'))
        for zero in zeros:
            if 'calib' in zero['PROGRAM']:
                proccamword = difference_camwords(zero['CAMWORD'],
                                                  zero['BADCAMWORD'])
                cams = cams.intersection(set(decode_camword(proccamword)))
        prow['PROCCAMWORD'] = create_camword(list(cams))
        prow = define_and_assign_dependency(prow, calibjobs)
        prow, ptable = create_submit_add_and_save(prow, ptable)
        calibjobs[prow['JOBDESC']] = prow.copy()
        log.info("Performed nightly bias as no dark or cte passed cuts.")

    if do_bias:
        calibjobs['accounted_for']['biasnight'] = True
    if do_badcol:
        calibjobs['accounted_for']['badcolumns'] = True
    if do_cte:
        calibjobs['accounted_for']['ctecorrnight'] = True

    ######## Submit arcs and psfnight ########
    if len(arcs)>0 and not calibjobs['accounted_for']['psfnight']:
        arc_prows = []
        for arc_erow in arcs:
            ## Skip arcs that already have an individual job in ptable
            if arc_erow['EXPID'] in processed_cal_expids:
                continue
            prow, int_id = make_exposure_prow(arc_erow, int_id, calibjobs)
            prow, ptable = create_submit_add_and_save(prow, ptable)
            arc_prows.append(prow)

        ## NOTE(review): arc_prows only holds arcs submitted in this call;
        ## confirm make_joint_prow copes if some or all were skipped above.
        joint_prow, int_id = make_joint_prow(arc_prows, descriptor='psfnight',
                                             internal_id=int_id)
        ptable = set_calibrator_flag(arc_prows, ptable)
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable)
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
        calibjobs['accounted_for']['psfnight'] = True

    ######## Submit flats and nightlyflat ########
    ## If nightlyflat defined we don't need to process more normal flats
    if len(flats) > 0 and not calibjobs['accounted_for']['fiberflatnight']:
        flat_prows = []
        for flat_erow in flats:
            ## Skip flats that already have an individual job in ptable
            if flat_erow['EXPID'] in processed_cal_expids:
                continue

            jobdesc = 'flat'
            prow, int_id = make_exposure_prow(flat_erow, int_id, calibjobs,
                                              jobdesc=jobdesc)
            prow, ptable = create_submit_add_and_save(prow, ptable)
            flat_prows.append(prow)

        joint_prow, int_id = make_joint_prow(flat_prows, descriptor='nightlyflat',
                                             internal_id=int_id)
        ptable = set_calibrator_flag(flat_prows, ptable)
        if 'nightlyflat' in cal_override:
            extra_args = cal_override['nightlyflat']
        else:
            extra_args = None
        joint_prow, ptable = create_submit_add_and_save(joint_prow, ptable,
                                                        extra_job_args=extra_args)
        calibjobs[joint_prow['JOBDESC']] = joint_prow.copy()
        calibjobs['accounted_for']['fiberflatnight'] = True

    ######## Submit cte flats ########
    jobdesc = 'flat'
    for cte_erow in ctes:
        if cte_erow['EXPID'] in processed_cal_expids:
            continue
        prow, int_id = make_exposure_prow(cte_erow, int_id, calibjobs,
                                          jobdesc=jobdesc)
        prow, ptable = create_submit_add_and_save(prow, ptable)

    return ptable, calibjobs, int_id