# Source code for desispec.workflow.submission

"""
desispec.workflow.submission
============================

Utilities for submitting jobs to Slurm.
"""

import os
from importlib import resources
import yaml

from desispec.workflow.tableio import load_table, write_table
from astropy.table import Table, vstack
from desispec.scripts.compute_dark import compute_dark_parser, get_stacked_dark_exposure_table
from desispec.workflow.proctable import default_prow, get_pdarks_from_ptable
import numpy as np

from desispec.io.util import all_impacted_cameras, columns_to_goodcamword, difference_camwords, erow_to_goodcamword, \
    camword_intersection, camword_union
from desispec.scripts.link_calibnight import derive_include_exclude

from desispec.io.meta import findfile
from desispec.workflow.processing import create_and_submit, assign_dependency, define_and_assign_dependency, \
    generate_calibration_dict, night_to_starting_iid, filename_to_jobname
from desispec.workflow.utils import load_override_file, sleep_and_report
from desiutil.log import get_logger


def submit_linkcal_jobs(night, ptable, cal_override=None, override_pathname=None,
                        psf_linking_without_fflat=False, proccamword='a0123456789',
                        dry_run_level=0, queue=None, reservation=None,
                        check_outputs=True, system_name=None):
    """
    Submit linkcal jobs for the current night.

    This function will read the override file (unless ``cal_override`` is
    given), determine which calibrations are to be linked, and submit a
    ``linkcal`` job for them if the processing table does not already
    contain one. The new job row is appended to the processing table.

    Args:
        night (int): The night to process, in YYYYMMDD format.
        ptable (Table): Processing table to update with new jobs.
        cal_override (dict, optional): Dictionary of calibration overrides.
            If None, will read the 'calibration' section of the override file.
        override_pathname (str, optional): Path to the override file. If None,
            will search for it. Only used when cal_override is None.
        psf_linking_without_fflat (bool, optional): If True, allows linking
            psfnight without fiberflatnight.
        proccamword (str, optional): Camera word defining the cameras to
            process. Used when the override's linkcal entry has no 'camword'.
        dry_run_level (int, optional): Level of dry run to perform. Default is 0.
        queue (str, optional): Slurm queue to submit jobs to. Default is None.
        reservation (str, optional): Slurm reservation to use. Default is None.
        check_outputs (bool, optional): If True, checks for job outputs before
            submitting. Default is True.
        system_name (str, optional): Name of the system to use for batch
            submission. Default is None.

    Returns:
        ptable (Table): Updated processing table with new jobs.
        files_to_link (set): Set of calibration files that will be linked.
            Empty set if the overrides contain no 'linkcal' entry.

    Raises:
        IOError: If cal_override is None and the override file does not exist.
        ValueError: If psfnight would be linked without fiberflatnight and
            psf_linking_without_fflat is False.
    """
    log = get_logger()

    if cal_override is None:
        ## The override file is required whenever we have to read it
        if override_pathname is None:
            override_pathname = findfile('override', night=night)
        if not os.path.exists(override_pathname):
            raise IOError(f"Specified override file: "
                          f"{override_pathname} not found. Exiting this night.")

        ## Load calibration_override_file
        overrides = load_override_file(filepathname=override_pathname)
        cal_override = {}
        if 'calibration' in overrides:
            cal_override = overrides['calibration']

    ## Internal IDs continue from the existing table, otherwise start fresh
    ## for this night
    if len(ptable) > 0:
        int_id = np.max(ptable['INTID']) + 1
    else:
        int_id = night_to_starting_iid(night=night)

    ## Determine calibrations that will be linked
    if 'linkcal' in cal_override:
        files_to_link, files_not_linked = None, None
        if 'include' in cal_override['linkcal']:
            files_to_link = cal_override['linkcal']['include']
        if 'exclude' in cal_override['linkcal']:
            files_not_linked = cal_override['linkcal']['exclude']
        files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
        ## Fiberflatnights need to be generated with psfs from same time, so
        ## can't link psfs without also linking fiberflatnight
        if ('psfnight' in files_to_link
                and 'fiberflatnight' not in files_to_link
                and not psf_linking_without_fflat):
            err = "Must link fiberflatnight if linking psfnight"
            log.error(err)
            raise ValueError(err)
    else:
        files_to_link = set()

    ## Only submit a linkcal job if one was requested and none exists yet
    if 'linkcal' in cal_override and 'linkcal' not in ptable['JOBDESC']:
        log.info("Linking calibration files listed in override files: "
                 + f"{files_to_link}")
        prow = default_prow()
        prow['INTID'] = int_id
        prow['JOBDESC'] = 'linkcal'
        prow['OBSTYPE'] = 'link'
        prow['CALIBRATOR'] = 1
        prow['NIGHT'] = night
        if 'camword' in cal_override['linkcal']:
            prow['PROCCAMWORD'] = cal_override['linkcal']['camword']
        else:
            ## If no camword is specified, use the provided camword
            ## (whose default covers all cameras)
            prow['PROCCAMWORD'] = proccamword

        if 'refnight' in cal_override['linkcal']:
            refnight = int(cal_override['linkcal']['refnight'])
            ## For link cals only, enable cross-night dependencies if available
            refproctable = findfile('proctable', night=refnight)
            if os.path.exists(refproctable):
                ptab = load_table(tablename=refproctable, tabletype='proctable')
                ## This isn't perfect because we may depend on jobs that aren't
                ## actually being linked.
                ## Also allows us to proceed even if jobs don't exist yet.
                deps, expids, proccamwords = [], [], []
                for filename in files_to_link:
                    job = filename_to_jobname(filename)
                    ## this returns 'biaspdark' but that isn't always
                    ## what is used for early nights and those without pdarks
                    ## so check for biasnight as well if biaspdark isn't found
                    if 'bias' in job and job not in ptab['JOBDESC']:
                        job = 'biasnight'
                    if job in ptab['JOBDESC']:
                        ## add the reference-night row to the dependencies
                        deprow = ptab[ptab['JOBDESC'] == job][0]
                        deps.append(deprow)
                        proccamwords.append(deprow['PROCCAMWORD'])
                        expids.extend(list(deprow['EXPID']))
                ## Also depend on the reference night's own linkcal, if any
                if 'linkcal' in ptab['JOBDESC']:
                    linkcalprow = ptab[ptab['JOBDESC'] == 'linkcal'][0]
                    deps.append(linkcalprow)
                    proccamwords.append(linkcalprow['PROCCAMWORD'])
                    expids.extend(list(linkcalprow['EXPID']))
                if len(deps) > 0:
                    prow['EXPID'] = np.unique(np.array(expids, dtype=int))
                    ## The proccamword for the linking job is the largest set
                    ## available from the reference night, but restricted back to
                    ## those requested for the current night, if fewer cameras
                    ## are available
                    prow['PROCCAMWORD'] = camword_intersection(
                        [prow['PROCCAMWORD'], camword_union(proccamwords)])
                    prow = assign_dependency(prow, deps)

        ## The linkcal override section itself carries the linking information
        linkcalargs = cal_override['linkcal']
        log.info(f"\nProcessing: {prow}\n")
        prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                 reservation=reservation,
                                 strictly_successful=True,
                                 check_for_outputs=check_outputs,
                                 system_name=system_name,
                                 extra_job_args=linkcalargs)

        ## Add the processing row to the processing table
        ptable.add_row(prow)

    return ptable, files_to_link
def submit_biasnight_and_preproc_darks(night, dark_expids, proc_obstypes, camword,
                                       badcamword, badamps=None, exp_table_path=None,
                                       proc_table_path=None, override_path=None,
                                       queue=None, reservation=None,
                                       check_for_outputs=True, system_name=None,
                                       specprod=None, path_to_data=None,
                                       psf_linking_without_fflat=None,
                                       sub_wait_time=0.1, dry_run_level=0):
    """
    Submit a biasnight and/or preproc_darks jobs for the given night.

    This function will read the override file, determine what calibrations
    have been done, and submit jobs to process the bias and dark frames.
    If the overrides request linking and no linkcal job exists yet, a linkcal
    job is submitted first. When both bias and darks are needed a single
    joint 'biaspdark' job is submitted; otherwise a 'biasnight' or 'pdark'
    job.

    Args:
        night (int): The night to process, in YYYYMMDD format.
        dark_expids (list): List of exposure IDs for the dark frames to process.
        proc_obstypes (list): List of obstypes to process.
        camword (str): Camera word defining the cameras to process. Replaced by
            the union of the exposure table CAMWORDs when that table is non-empty.
        badcamword (str): Camera word defining the bad cameras. Replaced by the
            intersection of the exposure table BADCAMWORDs when that table is
            non-empty.
        badamps (list, optional): List of bad amps to exclude. Default is None.
        exp_table_path (str, optional): Directory above the month subdirectory
            holding the exposure table files. If None, will search for it.
        proc_table_path (str, optional): Directory of the processing table
            files. If None, will search for it.
        override_path (str, optional): Directory of the override file. If None,
            will search for it.
        queue (str, optional): Slurm queue to submit jobs to. Default is None.
        reservation (str, optional): Slurm reservation to use. Default is None.
        check_for_outputs (bool, optional): If True, checks for job outputs
            before submitting. Default is True.
        system_name (str, optional): Name of the system to use for batch
            submission. Default is None.
        specprod (str, optional): Name of the spectroscopic production.
            Currently unused by this function. Default is None.
        path_to_data (str, optional): Path to the data directory.
            Currently unused by this function. Default is None.
        psf_linking_without_fflat (bool, optional): If set then the code will
            NOT raise an error if asked to link psfnight calibrations without
            fiberflatnight calibrations. Default None, which behaves like False.
        sub_wait_time (float, optional): Time to wait between submissions.
            Default is 0.1 seconds.
        dry_run_level (int, optional): Level of dry run to perform. Default is 0.
            At level 3 or higher no directories are created and no tables are
            written.

    Returns:
        ptable (Table): Updated processing table with new jobs.

    Raises:
        IOError: If the exposure table or the override file does not exist.
        ValueError: If a requested dark has LASTSTEP=ignore.
    """
    log = get_logger()

    ## Determine where the processing table will be written
    proc_table_pathname = findfile('processing_table', night=night, readonly=True)
    if proc_table_path is None:
        proc_table_path = os.path.dirname(proc_table_pathname)
    else:
        proc_table_name = os.path.basename(proc_table_pathname)
        proc_table_pathname = os.path.join(proc_table_path, proc_table_name)
    if dry_run_level < 3:
        os.makedirs(proc_table_path, exist_ok=True)

    ## Load the processing table and drop darks it already accounts for
    ptable = load_table(tablename=proc_table_pathname, tabletype='proctable')
    dark_expid_to_process = np.asarray(dark_expids)
    if len(ptable) > 0:
        processed_dark_expids = get_pdarks_from_ptable(ptable)
        dark_expid_to_process = np.setdiff1d(dark_expid_to_process,
                                             processed_dark_expids)

    ## Nothing left to do if no darks remain and a bias job already exists
    if len(ptable) > 0 and len(dark_expid_to_process) == 0:
        if 'biaspdark' in ptable['JOBDESC'] or 'biasnight' in ptable['JOBDESC']:
            log.info(f"Bias and all preproc darks are already accounted for on {night=}.")
            return ptable

    ## Determine where the exposure table is located; a custom root keeps the
    ## standard month subdirectory and file name
    exp_table_pathname = findfile('exposure_table', night=night, readonly=True)
    if exp_table_path is not None:
        exp_table_name = os.path.basename(exp_table_pathname)
        exp_month_dir = os.path.basename(os.path.dirname(exp_table_pathname))
        exp_table_pathname = os.path.join(exp_table_path, exp_month_dir,
                                          exp_table_name)
    if not os.path.exists(exp_table_pathname):
        raise IOError(f"Exposure table: {exp_table_pathname} not found. Exiting this night.")

    etable = load_table(tablename=exp_table_pathname, tabletype='exptable')

    ## Remove exposures that we shouldn't process, refusing outright if any
    ## explicitly requested dark is flagged LASTSTEP=ignore
    bad = etable['LASTSTEP'] == 'ignore'
    badetable = etable[bad]
    if np.any(np.isin(dark_expid_to_process, badetable['EXPID'].data)):
        baddarks = badetable[np.isin(badetable['EXPID'].data, dark_expid_to_process)]
        log.critical(f"Asked to process exposure that has LASTSTEP=ignore: {baddarks}")
        raise ValueError(f"Asked to process exposure that has LASTSTEP=ignore: {baddarks}")
    etable = etable[~bad]

    ## HACK: derive the camword from the exposure table and ignore the input.
    ## Eventually we want the input camword to be None except when we actually
    ## want to restrict the processing to a subset of cameras, but for now this
    ## is easier and less error prone as a quick-fix.
    if len(etable) > 0:
        ## camword is any camera that appears on that night
        log.info(f"Deriving the CAMWORD for night {night} as union of CAMWORD's from the exposure table")
        camword = camword_union(etable['CAMWORD'].data.astype(str))
        ## badcamword is any camera that is bad in all exposures for the night
        log.info(f"Deriving the BADCAMWORD for night {night} as intersection of BADCAMWORD's from the exposure table")
        badcamword = camword_intersection(etable['BADCAMWORD'].data.astype(str))
    else:
        log.error(f"No exposures for night {night}. Using provided camword and badcamword.")

    ## The override file is required
    if override_path is None:
        override_pathname = findfile('override', night=night, readonly=True)
    else:
        override_pathname = os.path.join(override_path, f'override_{night}.yaml')
    if not os.path.exists(override_pathname):
        raise IOError(f"Specified override file: "
                      f"{override_pathname} not found. Exiting this night.")

    ## Load calibration_override_file
    overrides = load_override_file(filepathname=override_pathname)
    cal_override = {}
    if 'calibration' in overrides:
        cal_override = overrides['calibration']

    ## Identify what calibrations are (or will be) linked
    if 'linkcal' in cal_override:
        ## define files_to_link even if we already have linkcal
        files_to_link, files_not_linked = None, None
        if 'include' in cal_override['linkcal']:
            files_to_link = cal_override['linkcal']['include']
        if 'exclude' in cal_override['linkcal']:
            files_not_linked = cal_override['linkcal']['exclude']
        files_to_link, files_not_linked = derive_include_exclude(files_to_link,
                                                                 files_not_linked)
        ## run linkcal if we haven't already
        if 'linkcal' not in ptable['JOBDESC']:
            proccamword = difference_camwords(camword, badcamword)
            ptable, files_to_link = submit_linkcal_jobs(
                night, ptable, cal_override=cal_override,
                proccamword=proccamword, dry_run_level=dry_run_level,
                queue=queue, reservation=reservation,
                psf_linking_without_fflat=psf_linking_without_fflat,
                check_outputs=check_for_outputs, system_name=system_name)
            if len(ptable) > 0 and dry_run_level < 3:
                write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
            sleep_and_report(sub_wait_time,
                             message_suffix="to slow down the queue submission rate",
                             dry_run=dry_run_level > 0, logfunc=log.info)
    else:
        files_to_link = set()

    zeros = etable[etable['OBSTYPE'] == 'zero']
    zero_expids = np.array(zeros['EXPID'].data, dtype=int)
    darks = etable[np.isin(etable['EXPID'].data, dark_expid_to_process)]

    ## A bias is covered if it is linked via linkcal or already has its own job
    bias_accounted_for = (('biasnight' in files_to_link and 'linkcal' in ptable['JOBDESC'])
                          or 'biasnight' in ptable['JOBDESC']
                          or 'biaspdark' in ptable['JOBDESC'])
    dobias = (not bias_accounted_for) and 'zero' in proc_obstypes and len(zero_expids) > 0
    ## Only submit pdark if it is after 30 days before 20240509 (see desispec
    ## issue #2571). Technically this is no longer needed, but left for belt
    ## and suspenders.
    dark_date = night > 20240408
    dodarks = 'dark' in proc_obstypes and len(dark_expid_to_process) > 0 and dark_date

    ## Next derive the full badcamword from that supplied plus the erows for
    ## the exposure type being processed
    if dobias:
        expset = zeros
    elif dodarks:
        expset = darks
    else:
        expset = []

    missingcamwords = []
    for erow in expset:
        good_camword = erow_to_goodcamword(erow, suppress_logging=True,
                                           exclude_badamps=False)
        missingcamwords.append(difference_camwords(camword, good_camword))
    if len(missingcamwords) > 0:
        ## cameras missing from every exposure in the set are treated as bad
        derived_badcam = camword_intersection(missingcamwords)
        if badcamword is None:
            badcamword = derived_badcam
        else:
            badcamword = camword_union([badcamword, derived_badcam])

    extra_job_args = {'steps': []}
    if dobias:
        extra_job_args['steps'].append('biasnight')
    if dodarks:
        extra_job_args['steps'].append('pdark')

    if len(ptable) > 0:
        int_id = np.max(ptable['INTID']) + 1
    else:
        int_id = night_to_starting_iid(night=night)

    prow = None
    if dobias or dodarks:
        prow = default_prow()
        prow['INTID'] = int_id
        prow['CALIBRATOR'] = 1
        prow['NIGHT'] = night
        prow['PROCCAMWORD'] = columns_to_goodcamword(camword, badcamword, badamps,
                                                     suppress_logging=True,
                                                     exclude_badamps=True)

        ## If submitting bias and darks, submit a joint job, otherwise one or
        ## the other
        if dobias and dodarks:
            log.info(f"Submitting biaspdark for night {night}.")
            prow['JOBDESC'] = 'biaspdark'
            prow['OBSTYPE'] = 'dark'
            prow['EXPID'] = dark_expid_to_process
        elif dobias:
            log.info(f"Submitting biasnight for night {night}.")
            prow['JOBDESC'] = 'biasnight'
            prow['OBSTYPE'] = 'zero'
            prow['EXPID'] = zero_expids[:1]  # set as first zero expid
        else:
            log.info(f"Submitting pdark for night {night}.")
            prow['JOBDESC'] = 'pdark'
            prow['OBSTYPE'] = 'dark'
            prow['EXPID'] = dark_expid_to_process

    if prow is not None:
        prow = define_and_assign_dependency(prow, ptable)
        ## Honor the caller's check_for_outputs (previously hard-coded to True)
        prow = create_and_submit(prow, dry_run=dry_run_level, queue=queue,
                                 reservation=reservation,
                                 strictly_successful=True,
                                 check_for_outputs=check_for_outputs,
                                 system_name=system_name,
                                 extra_job_args=extra_job_args)
        ## Add the processing row to the processing table
        ptable.add_row(prow)
        if len(ptable) > 0 and dry_run_level < 3:
            write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
        sleep_and_report(sub_wait_time,
                         message_suffix="to slow down the queue submission rate",
                         dry_run=(dry_run_level > 0), logfunc=log.info)
        log.info(f"Successfully submitted {prow['JOBDESC']} job for night {night}.")
    else:
        log.info(f"No biasnight or preproc_darks jobs submitted for night {night}.")

    return ptable
def submit_necessary_biasnights_and_preproc_darks(reference_night, proc_obstypes, camword,
                                                  badcamword, badamps=None,
                                                  exp_table_pathname=None,
                                                  proc_table_pathname=None,
                                                  specprod=None, path_to_data=None,
                                                  sub_wait_time=0.1, dry_run_level=0,
                                                  psf_linking_without_fflat=False,
                                                  n_nights_before=None,
                                                  n_nights_after=None, queue=None,
                                                  system_name=None, reservation=None):
    """
    Submit biasnight and preproc_darks jobs for the given reference night.

    When darks are requested, the set of nights is taken from the dark
    exposure table that the compute_dark machinery would stack for
    ``reference_night``, plus the reference night itself. Otherwise only the
    reference night is processed. Each night is handed to
    submit_biasnight_and_preproc_darks.

    Args:
        reference_night (int): The reference night to process, in YYYYMMDD format.
        proc_obstypes (list): List of obstypes to process.
        camword (str): Camera word defining the cameras to process.
        badcamword (str): Camera word defining the bad cameras.
        badamps (list, optional): List of bad amps to exclude. Default is None.
        exp_table_pathname (str, optional): Path to the exposure table file.
            If None, will search for it.
        proc_table_pathname (str, optional): Path to the processing table file.
            If None, will search for it.
        specprod (str, optional): Name of the spectroscopic production. Default is None.
        path_to_data (str, optional): Path to the data directory. Default is None.
        sub_wait_time (float, optional): Time to wait between submissions.
            Default is 0.1 seconds.
        dry_run_level (int, optional): Level of dry run to perform. Default is 0.
        psf_linking_without_fflat (bool, optional): Default False. If set then
            the code will NOT raise an error if asked to link psfnight
            calibrations without fiberflatnight calibrations.
        n_nights_before (int, optional): Number of nights before the reference
            night to process. Default is None.
        n_nights_after (int, optional): Number of nights after the reference
            night to process. Default is None.
        queue (str, optional): Queue to be used.
        system_name (str, optional): name of batch system, e.g. cori-haswell,
            perlmutter
        reservation (str, optional): Slurm reservation to use. Default is None.

    Returns:
        ptable (Table): The processing table for the reference night, as
            returned by submit_biasnight_and_preproc_darks.
    """
    log = get_logger()

    if 'dark' in proc_obstypes:
        compdarkparser = compute_dark_parser()
        ## Dummy output/camera values: only the night selection is used here
        options = ['--reference-night', str(reference_night), '-o', 'dummy',
                   '-c', 'b1', '--skip-camera-check', '--dont-search-filesystem']
        if n_nights_before is not None:
            options.extend(['--before', str(n_nights_before)])
        if n_nights_after is not None:
            options.extend(['--after', str(n_nights_after)])
        compdarkargs = compdarkparser.parse_args(options)

        exptab_for_dark_night = get_stacked_dark_exposure_table(compdarkargs)

        ## We might not have darks on the reference night, but we still want
        ## to process the biasnight's
        nights = np.unique(np.append(exptab_for_dark_night['NIGHT'].data,
                                     [reference_night]))
    else:
        log.info(f"{proc_obstypes=}, so not submitting darks for preprocessing."
                 + " Only submitting biasnight for the reference night if requested.")
        exptab_for_dark_night = Table(names=['NIGHT', 'EXPID'])
        nights = [reference_night]

    ## The per-night function expects directories: the exposure-table root
    ## (above the month subdirectory) and the processing-table directory.
    ## Passing None lets it locate them itself; calling os.path.dirname(None)
    ## would raise a TypeError.
    if exp_table_pathname is None:
        exp_table_path = None
    else:
        exp_table_path = os.path.dirname(os.path.dirname(exp_table_pathname))
    if proc_table_pathname is None:
        proc_table_path = None
    else:
        proc_table_path = os.path.dirname(proc_table_pathname)

    ## Loop over nights and submit biasnight or biaspdark jobs
    refnight_ptable = None
    for night in nights:
        log.info(f"Processing night {night} for biasnight and preproc_darks.")
        night_mask = exptab_for_dark_night['NIGHT'] == night
        dark_expids = np.array(exptab_for_dark_night[night_mask]['EXPID'].data, dtype=int)
        ptable = submit_biasnight_and_preproc_darks(
            night=night, dark_expids=dark_expids, proc_obstypes=proc_obstypes,
            camword=camword, badcamword=badcamword, badamps=badamps,
            exp_table_path=exp_table_path,
            proc_table_path=proc_table_path,
            specprod=specprod, path_to_data=path_to_data,
            sub_wait_time=sub_wait_time, dry_run_level=dry_run_level,
            psf_linking_without_fflat=psf_linking_without_fflat,
            queue=queue, reservation=reservation, system_name=system_name)
        if night == reference_night:
            refnight_ptable = ptable

    return refnight_ptable