# Source code for desispec.scripts.submit_prod

"""
desispec.scripts.submit_prod
============================

"""
import yaml
import numpy as np
import os
import sys
import time
import re
import glob
from astropy.table import Table

from desispec.parallel import stdouterr_redirected
from desiutil.log import get_logger
from desispec.io import findfile
from desispec.scripts.proc_night import proc_night
## Import some helper functions; you can see their definitions by uncommenting the bash shell command
from desispec.workflow.utils import verify_variable_with_environment, listpath, \
    remove_slurm_environment_variables
from desispec.workflow.exptable import read_minimal_science_exptab_cols
from desispec.scripts.submit_night import submit_night
from desispec.workflow.queue import check_queue_count
import desispec.workflow.proctable
from desispec.util import wrap_long_logs


def get_nights_in_date_range(first_night, last_night):
    """
    Returns all nights that have an exposure table on disk.

    Nights are discovered by globbing the exposure-table tree built from the
    ``findfile('exptable', ...)`` template, restricted to nights starting
    with "202".

    Args:
        first_night, int. First night to include (inclusive).
        last_night, int. Last night to include (inclusive).

    Returns:
        nights, np.ndarray. Sorted int array of nights (YYYYMMDD) between
        first_night and last_night, inclusive, that have an exposure table.
    """
    etab_path = findfile('exptable', night='99999999', readonly=True)
    ## Replace the 8-digit night first so that the 6-digit (month directory)
    ## replacement cannot match inside it
    glob_path = etab_path.replace('99999999', '202?????').replace('999999', '202???')
    etab_files = sorted(glob.glob(glob_path))

    ## glob returns full pathnames, so the night must be recovered from the
    ## file basename by stripping the template text around the night
    template_name = os.path.basename(etab_path)
    prefix, _, suffix = template_name.partition('99999999')

    nights = []
    for etab_file in etab_files:
        fname = os.path.basename(etab_file)
        if not (fname.startswith(prefix) and fname.endswith(suffix)):
            continue
        night_str = fname[len(prefix):len(fname) - len(suffix)]
        # - nights are 20YYMMDD
        if re.match(r'^20\d{6}$', night_str):
            nights.append(int(night_str))

    nights = np.array(nights, dtype=int)
    nights = nights[((nights >= first_night) & (nights <= last_night))]
    return nights
def get_all_valid_nights(first_night, last_night):
    """
    Return every night in a date range that has at least one valid science
    exposure.

    Args:
        first_night, int. Earliest night to keep (inclusive).
        last_night, int. Latest night to keep (inclusive).

    Returns:
        nights, np.ndarray. Sorted, unique values of the NIGHT column from
        read_minimal_science_exptab_cols() lying within
        [first_night, last_night].
    """
    unique_nights = np.unique(read_minimal_science_exptab_cols()['NIGHT'])
    in_range = (unique_nights >= first_night) & (unique_nights <= last_night)
    return unique_nights[in_range]
def get_all_science_nights_for_prod(production_yaml, verbose=False):
    """
    Derives all the nights with valid science exposures that should be
    processed based on a production yaml file and returns a list of int nights.

    The yaml file must contain either NIGHTS or LAST_NIGHT. If NIGHTS is
    given, it defines the nights exactly, and LAST_NIGHT/FIRST_NIGHT are
    ignored. NIGHTS may be a yaml list, a single night, or a comma-separated
    string. Otherwise LAST_NIGHT is used together with FIRST_NIGHT (default
    20201214), and every night with valid science exposures in that inclusive
    range is included.

    Args:
        production_yaml (str or dict): Production yaml or pathname of the yaml
            file that defines the production.
        verbose (bool): Whether to be verbose in log outputs.

    Returns:
        nights, list. Sorted list of int nights in the production.

    Raises:
        IOError: If production_yaml is a path that does not exist.
        ValueError: If neither NIGHTS nor LAST_NIGHT is defined, or NIGHTS
            is empty.
    """
    log = get_logger()
    ## If production_yaml not loaded, load the file
    if isinstance(production_yaml, str):
        if not os.path.exists(production_yaml):
            raise IOError(f"Prod yaml file doesn't exist: {production_yaml} not found.")
        with open(production_yaml, 'rb') as yamlfile:
            config = yaml.safe_load(yamlfile)
    else:
        config = production_yaml

    all_nights, first_night = None, None
    if 'NIGHTS' in config and 'LAST_NIGHT' in config:
        log.error(f"Both NIGHTS and LAST_NIGHT specified. Using NIGHTS "
                  + f"and ignoring LAST_NIGHT.")
    if 'NIGHTS' in config:
        nights_conf = config['NIGHTS']
        ## Accept a comma-separated string (as used for SURVEYS and
        ## Z_SUBMIT_TYPES) or a single night in addition to a yaml list.
        ## Without this, list("20240101") would split into single digits.
        if isinstance(nights_conf, str):
            nights_conf = [n.strip() for n in nights_conf.split(',') if n.strip()]
        elif np.isscalar(nights_conf):
            nights_conf = [nights_conf]
        all_nights = np.array(list(nights_conf)).astype(int)
        if len(all_nights) == 0:
            raise ValueError(f"NIGHTS is empty in yaml file {production_yaml}")
        if verbose:
            log.info(f"Setting all_nights to NIGHTS: {all_nights}")
            log.info("Setting first_night to earliest night in NIGHTS:"
                     + f" {np.min(all_nights)}")
        first_night = np.min(all_nights)
        if verbose:
            log.info("Setting last_night to latest night in NIGHTS: "
                     + f"{np.max(all_nights)}")
        last_night = np.max(all_nights)
    elif 'LAST_NIGHT' in config:
        last_night = int(config['LAST_NIGHT'])
        if verbose:
            log.info(f"Setting last_night to LAST_NIGHT: {last_night}")
    else:
        raise ValueError("Either NIGHTS or LAST_NIGHT required in yaml "
                         + f"file {production_yaml}")

    if first_night is None:
        if 'FIRST_NIGHT' in config:
            first_night = int(config['FIRST_NIGHT'])
            if verbose:
                log.info(f"Setting first_night to FIRST_NIGHT: {first_night}")
        else:
            if verbose:
                log.info("Setting first_night to earliest in a normal prod: 20201214")
            first_night = 20201214

    if all_nights is None:
        if verbose:
            log.info("Populating all_nights with all of the nights with valid science "
                     + f"exposures between {first_night} and {last_night} inclusive")
        all_nights = get_all_valid_nights(first_night, last_night)

    all_nights = np.sort(all_nights).tolist()
    log.info(wrap_long_logs(f"All nights in production: {all_nights}"))
    return all_nights
def get_nights_to_process(production_yaml, verbose=False):
    """
    Determine which production nights still need to be submitted.

    Starts from every science night in the production (see
    get_all_science_nights_for_prod). Nights with no processing table are
    always kept.

    Nights that do have a processing table are examined from newest to
    oldest. Each is kept until the first one whose table contains a
    'science' OBSTYPE entry. That night, and every older night with a
    processing table, is treated as already complete, skipped, and reported
    in the log.

    Args:
        production_yaml (str or dict): Production yaml or pathname of the
            yaml file that defines the production.
        verbose (bool): Whether to be verbose in log outputs.

    Returns:
        list: Sorted nights that still need processing.
    """
    log = get_logger()
    all_nights = get_all_science_nights_for_prod(production_yaml=production_yaml,
                                                 verbose=verbose)
    log.info("Assuming nights with science jobs in proctable are complete "
             "and removing from the list of nights to process.")

    nights_to_process = []
    ## Newest night first; a dict also collapses any duplicated nights
    proctable_paths = {}
    for night in reversed(all_nights):
        path = findfile('proctable', night=night, readonly=True)
        if os.path.exists(path):
            proctable_paths[night] = path
        else:
            nights_to_process.append(night)

    ## Once a night with science jobs is found, every older night is assumed
    ## complete. Those nights are recorded as skipped without re-reading their
    ## tables. A plain astropy Table.read is enough since only the OBSTYPE
    ## column is needed.
    skipped_nights = []
    found_complete_night = False
    for night, path in proctable_paths.items():
        if not found_complete_night and 'science' not in Table.read(path)['OBSTYPE']:
            nights_to_process.append(night)
        else:
            found_complete_night = True
            skipped_nights.append(night)

    log.info(wrap_long_logs("Skipped the following nights that already had a "
                            f"proctable with science jobs: {sorted(skipped_nights)}"))
    return sorted(nights_to_process)
def submit_production(production_yaml, queue_threshold=4500, dry_run_level=0):
    """
    Interprets a production_yaml file and submits the respective nights for
    processing within the defined production.

    The yaml file must contain SPECPROD and either NIGHTS or LAST_NIGHT
    (optionally with FIRST_NIGHT). Optional keys read here are THRU_NIGHT,
    Z_SUBMIT_TYPES, SURVEYS, QUEUE and RESERVATION.

    Args:
        production_yaml (str): Pathname of the yaml file that defines the
            production.
        queue_threshold (int): The number of jobs for the current user in the
            queue at which the script stops submitting new jobs.
        dry_run_level (int, optional): Default is 0. Passed directly to
            desi_proc_night. Within this function:
            - levels >= 1 check the queue count once rather than per night;
            - levels >= 4 also skip creating the log directory, running
              desi_proc_night and writing the sentinel file.

    Returns:
        0 if the sentinel file already exists (all nights previously
        submitted), otherwise None.

    Raises:
        IOError: If production_yaml does not exist.
        ValueError: If SPECPROD is not defined in the yaml file.
    """
    log = get_logger()

    ## Load the yaml file
    if not os.path.exists(production_yaml):
        raise IOError(f"Prod yaml file doesn't exist: {production_yaml} not found.")
    with open(production_yaml, 'rb') as yamlfile:
        conf = yaml.safe_load(yamlfile)

    ## Unset Slurm environment variables set when running in scrontab
    remove_slurm_environment_variables()

    ## Make sure the specprod matches, if not set it to that in the file
    if 'SPECPROD' not in conf:
        raise ValueError(f"SPECPROD required in yaml file {production_yaml}")
    specprod = str(conf['SPECPROD']).lower()
    specprod = verify_variable_with_environment(var=specprod, var_name='specprod',
                                                env_name='SPECPROD')

    ## Define the user
    user = os.environ['USER']

    ## Look for sentinel
    sentinel_file = os.path.join(os.environ['DESI_SPECTRO_REDUX'],
                                 os.environ['SPECPROD'], 'run',
                                 'prod_submission_complete.txt')
    if os.path.exists(sentinel_file):
        log.info(f"Sentinel file {sentinel_file} exists, therefore all "
                 + f"nights already submitted.")
        return 0

    ## Load the nights to process
    all_nights = get_nights_to_process(production_yaml=conf, verbose=True)

    ## Load the other parameters for running desi_proc_night
    if 'THRU_NIGHT' in conf:
        thru_night = int(conf['THRU_NIGHT'])
        log.info(f"Setting thru_night to THRU_NIGHT: {thru_night}")
    elif len(all_nights) > 0:
        thru_night = int(np.max(all_nights))
        log.warning(f"Setting thru_night to last night: {thru_night}")
    else:
        ## np.max raises on an empty list. With no nights left to process,
        ## thru_night is never used, so leave it unset.
        thru_night = None
        log.info("No nights left to process, so not setting thru_night.")

    ## If not specified, run "cumulative" redshifts, otherwise do
    ## as directed
    no_redshifts = False
    if 'Z_SUBMIT_TYPES' in conf:
        z_submit_types_str = str(conf['Z_SUBMIT_TYPES'])
        if z_submit_types_str.lower() in ['false', 'none']:
            z_submit_types = None
            no_redshifts = True
        else:
            z_submit_types = [ztype.strip().lower()
                              for ztype in z_submit_types_str.split(',')]
    else:
        z_submit_types = ['cumulative']

    if 'SURVEYS' in conf:
        surveys_str = str(conf['SURVEYS'])
        if surveys_str.lower() in ['false', 'none']:
            surveys = None
        else:
            surveys = [survey.strip().lower() for survey in surveys_str.split(',')]
    else:
        surveys = None

    ## Bring in the queue and reservation information, if any
    if 'QUEUE' in conf:
        queue = conf['QUEUE']
    else:
        queue = 'regular'

    if 'RESERVATION' in conf:
        reservation = str(conf['RESERVATION'])
        if reservation.lower() == 'none':
            reservation = None
    else:
        reservation = None

    ## Let user know what was defined
    if z_submit_types is not None:
        log.info(f'Using z_submit_types: {z_submit_types}')
    if surveys is not None:
        log.info(f'Using surveys: {surveys}')
    log.info(f'Using queue: {queue}')
    if reservation is not None:
        log.info(f'Using reservation: {reservation}')

    ## Define log location
    logpath = os.path.join(os.environ['DESI_SPECTRO_REDUX'],
                           os.environ['SPECPROD'], 'run', 'logs')
    if dry_run_level < 4:
        os.makedirs(logpath, exist_ok=True)
    else:
        log.info(f"{dry_run_level=} so not creating {logpath}")

    ## If in dryrun mode, get the number of jobs in the queue once to
    ## properly simulate stopping if the queue is too full, but don't
    ## keep rechecking since we're not submitting new jobs.
    num_in_queue = 0
    if dry_run_level >= 1:
        num_in_queue = check_queue_count(user=user, include_scron=False,
                                         dry_run_level=dry_run_level)

    ## Do the main processing
    finished = False
    processed_nights = []
    log.info(wrap_long_logs(f"Processing {all_nights=}"))
    for night in sorted(all_nights):
        ## If the queue is too full, stop submitting nights.
        ## Don't keep checking if in dry run mode since we're not submitting new jobs.
        if dry_run_level < 1:
            num_in_queue = check_queue_count(user=user, include_scron=False,
                                             dry_run_level=dry_run_level)
        else:
            log.info(f"{dry_run_level=} so not checking queue count each iteration. "
                     + f"Would have checked for user {user}.")

        ## In Jura the largest night had 115 jobs, to be conservative we submit
        ## up to 4500 jobs (out of a 5000 limit) by default
        if num_in_queue > queue_threshold:
            log.info(f"{num_in_queue} jobs in the queue > {queue_threshold},"
                     + " so stopping the job submissions.")
            break

        ## We don't expect exposure tables to change during code execution here
        ## but we do expect processing tables to evolve, so clear that cache
        log.info(f"Processing {night=}")
        ## Belt-and-suspenders: reset the processing table cache to force a re-read.
        ## This shouldn't be necessary, but resetting the cache is conservative.
        desispec.workflow.proctable.reset_tilenight_ptab_cache()
        if dry_run_level < 4:
            time.sleep(2)  # Sleep to ensure any file system changes have time to propagate
            logfile = os.path.join(logpath, f'night-{night}.log')
            with stdouterr_redirected(logfile):
                proc_night(night=night, z_submit_types=z_submit_types,
                           no_redshifts=no_redshifts,
                           complete_tiles_thrunight=thru_night,
                           surveys=surveys, dry_run_level=dry_run_level,
                           queue=queue, reservation=reservation)
        else:
            log.info(f"{dry_run_level=} so not running desi_proc_night. "
                     + f"Would have run for {night=}")

        processed_nights.append(night)
        log.info(f"Completed {night=}.")
    else:
        ## I.e. the above loop didn't "break" because of exceeding the queue,
        ## so all nights finished
        finished = True
        ## Write the sentinel
        if dry_run_level < 4:
            ## Compute the contents BEFORE opening the file. Otherwise a
            ## failure here would leave an empty sentinel behind, and future
            ## runs would wrongly believe every night was already submitted.
            all_prod_nights = get_all_science_nights_for_prod(production_yaml=production_yaml,
                                                              verbose=False)
            with open(sentinel_file, 'w') as sentinel:
                sentinel.write(f"All done with processing for {production_yaml}\n")
                sentinel.write(f"Nights processed: {all_prod_nights}\n")
        else:
            log.info(f"{dry_run_level=} so not creating {sentinel_file}")

    log.info(wrap_long_logs(f"Processed the following nights: {processed_nights}"))
    if finished:
        log.info('\n\n\n')
        log.info("All nights submitted")