"""
desispec.workflow.processing
============================
"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
import numpy as np
import time, datetime
from collections import OrderedDict
import subprocess
from desispec.scripts.link_calibnight import derive_include_exclude
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
get_ztile_relpath, \
get_ztile_script_suffix
from desispec.workflow.queue import get_resubmission_states, update_from_queue, \
queue_info_from_qids, get_queue_states_from_qids, update_queue_state_cache
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
create_desi_proc_batch_script, \
get_desi_proc_batch_file_path, \
get_desi_proc_tilenight_batch_file_pathname, \
create_desi_proc_tilenight_batch_script, create_linkcal_batch_script
from desispec.workflow.utils import pathjoin, sleep_and_report, \
load_override_file
from desispec.workflow.tableio import write_table, load_table
from desispec.workflow.proctable import table_row_to_dict, erow_to_prow
from desiutil.log import get_logger
from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
difference_camwords, \
camword_to_spectros, camword_union, camword_intersection, parse_badamps
#################################################
############## Misc Functions ###################
#################################################
def night_to_starting_iid(night=None):
    """
    Creates an internal ID for a given night. The resulting integer is a 9 digit number.
    The digits are YYMMDDxxx where YY is the years since 2000, MM and DD are the month and day. xxx are 000,
    and are incremented for up to 1000 unique job ID's for a given night.

    Args:
        night (str or int, optional): YYYYMMDD of the night to get the starting
            internal ID for. Defaults to the current night.

    Returns:
        int: 9 digit number consisting of YYMMDD000. YY is years after 2000, MMDD is month and day.
        000 being the starting job number (0).
    """
    if night is None:
        night = what_night_is_it()
    night = int(night)
    ## e.g. 20240101 -> 240101 -> 240101000, leaving room for 1000 job ids per night
    internal_id = (night - 20000000) * 1000
    return internal_id
class ProcessingParams():
    """
    Bundle of the settings that control how processing jobs are created
    and submitted to the batch queue.
    """
    def __init__(self, dry_run_level=0, queue='realtime',
                 reservation=None, strictly_successful=True,
                 check_for_outputs=True,
                 resubmit_partial_complete=True,
                 system_name='perlmutter', use_specter=True):
        # Simulation level; 0 means run for real.
        self.dry_run_level = dry_run_level
        # Slurm queue and (optional) reservation to submit jobs to.
        self.queue = queue
        self.reservation = reservation
        # Batch system name, e.g. 'perlmutter'.
        self.system_name = system_name
        # Whether jobs require all of their inputs to have succeeded.
        self.strictly_successful = strictly_successful
        # Whether to check disk for pre-existing outputs before submitting,
        # and whether partially-complete jobs get pruned camwords.
        self.check_for_outputs = check_for_outputs
        self.resubmit_partial_complete = resubmit_partial_complete
        # If True use specter for extractions, otherwise gpu_specter.
        self.use_specter = use_specter
#################################################
############ Script Functions ###################
#################################################
def batch_script_name(prow):
    """
    Determine the batch script file pathname for a processing table row
    (or dictionary with NIGHT, EXPID, JOBDESC, PROCCAMWORD defined), as
    defined by desi_proc's helper functions.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.

    Returns:
        str: The complete pathname to the script file, as it is defined
        within the desi_proc ecosystem.
    """
    ## An empty exposure list is passed downstream as None
    exposures = prow['EXPID']
    if len(exposures) == 0:
        exposures = None

    if prow['JOBDESC'] == 'tilenight':
        pathname = get_desi_proc_tilenight_batch_file_pathname(
            night=prow['NIGHT'], tileid=prow['TILEID'])
    else:
        pathname = get_desi_proc_batch_file_pathname(
            night=prow['NIGHT'], exp=exposures,
            jobdesc=prow['JOBDESC'], cameras=prow['PROCCAMWORD'])
    return pathname + '.slurm'
def get_jobdesc_to_file_map():
    """
    Return a mapping of job descriptions to the filenames of the output files.

    Returns:
        dict: Dictionary with keys as lowercase job descriptions mapping to the
        filename of their expected outputs.
    """
    jobdesc_to_filetype = {
        'prestdstar': 'sframe',
        'stdstarfit': 'stdstars',
        'poststdstar': 'cframe',
        'nightlybias': 'biasnight',
        # 'ccdcalib': 'badcolumns',
        'badcol': 'badcolumns',
        'arc': 'fitpsf',
        'flat': 'fiberflat',
        'psfnight': 'psfnight',
        'nightlyflat': 'fiberflatnight',
        'spectra': 'spectra_tile',
        'coadds': 'coadds_tile',
        'redshift': 'redrock_tile',
    }
    return jobdesc_to_filetype
def get_file_to_jobdesc_map():
    """
    Return a mapping of output filenames to job descriptions.

    Returns:
        dict: Dictionary with keys as filenames of the expected outputs
        mapping to the lowercase job descriptions.
    """
    filemap = get_jobdesc_to_file_map()
    ## Drop jobs whose output files can also appear in a ccdcalib job, so
    ## each remaining filetype maps back to a unique job description
    for ambiguous_job in ('badcol', 'nightlybias'):
        filemap.pop(ambiguous_job)
    return {filetype: jobdesc for jobdesc, filetype in filemap.items()}
def check_for_outputs_on_disk(prow, resubmit_partial_complete=True):
    """
    Check whether the expected outputs of a job already exist on disk,
    setting prow['STATUS'] to 'COMPLETED' if everything is present and
    optionally pruning prow['PROCCAMWORD'] down to just the cameras or
    spectrographs whose outputs are still missing.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.
    """
    prow['STATUS'] = 'UNKNOWN'
    log = get_logger()

    ## linkcal and ccdcalib outputs depend on runtime inputs, so we can't
    ## know in advance which files to look for
    if prow['JOBDESC'] in ['linkcal', 'ccdcalib']:
        log.info(f"jobdesc={prow['JOBDESC']} has indeterminated outputs, so "
                 + "not checking for files on disk.")
        return prow

    job_to_file_map = get_jobdesc_to_file_map()
    night = prow['NIGHT']
    if prow['JOBDESC'] in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
        filetype = 'redrock_tile'
    else:
        filetype = job_to_file_map[prow['JOBDESC']]
    ## remember the full camword so we can tell later whether it was pruned
    orig_camword = prow['PROCCAMWORD']

    ## if spectro based, look for spectros, else look for cameras
    if prow['JOBDESC'] in ['stdstarfit', 'spectra', 'coadds', 'redshift']:
        ## Spectrograph based
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        ## Suppress outputs about using tile based files in findfile if only looking for stdstarfits
        if prow['JOBDESC'] == 'stdstarfit':
            tileid = None
        else:
            tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(findfile(filetype=filetype, night=night, expid=expid, spectrograph=spectro, tile=tileid)):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    elif prow['JOBDESC'] in ['cumulative', 'pernight-v0', 'pernight', 'perexp']:
        ## Spectrograph based redshift jobs; outputs live in the ztile
        ## output directory rather than the per-exposure tree
        spectros = camword_to_spectros(prow['PROCCAMWORD'])
        n_desired = len(spectros)
        tileid = prow['TILEID']
        expid = prow['EXPID'][0]
        redux_dir = specprod_root()
        outdir = os.path.join(redux_dir, get_ztile_relpath(tileid, group=prow['JOBDESC'], night=night, expid=expid))
        suffix = get_ztile_script_suffix(tileid, group=prow['JOBDESC'], night=night, expid=expid)
        existing_spectros = []
        for spectro in spectros:
            if os.path.exists(os.path.join(outdir, f"redrock-{spectro}-{suffix}.fits")):
                existing_spectros.append(spectro)
        completed = (len(existing_spectros) == n_desired)
        if not completed and resubmit_partial_complete and len(existing_spectros) > 0:
            existing_camword = 'a' + ''.join([str(spec) for spec in sorted(existing_spectros)])
            prow['PROCCAMWORD'] = difference_camwords(prow['PROCCAMWORD'], existing_camword)
    else:
        ## Otherwise camera based
        cameras = decode_camword(prow['PROCCAMWORD'])
        n_desired = len(cameras)
        if len(prow['EXPID']) > 0:
            expid = prow['EXPID'][0]
        else:
            expid = None
        if len(prow['EXPID']) > 1 and prow['JOBDESC'] not in ['psfnight', 'nightlyflat']:
            log.warning(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']}. This job type only makes " +
                        f"sense with a single exposure. Proceeding with {expid}.")
        missing_cameras = []
        for cam in cameras:
            if not os.path.exists(findfile(filetype=filetype, night=night, expid=expid, camera=cam)):
                missing_cameras.append(cam)
        completed = (len(missing_cameras) == 0)
        if not completed and resubmit_partial_complete and len(missing_cameras) < n_desired:
            prow['PROCCAMWORD'] = create_camword(missing_cameras)

    if completed:
        prow['STATUS'] = 'COMPLETED'
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"the desired {n_desired} {filetype}'s. Not submitting this job.")
    elif resubmit_partial_complete and orig_camword != prow['PROCCAMWORD']:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} already has " +
                 f"some {filetype}'s. Submitting smaller camword={prow['PROCCAMWORD']}.")
    elif not resubmit_partial_complete:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} doesn't have all " +
                 f"{filetype}'s and resubmit_partial_complete=False. " +
                 f"Submitting full camword={prow['PROCCAMWORD']}.")
    else:
        log.info(f"{prow['JOBDESC']} job with exposure(s) {prow['EXPID']} has no " +
                 f"existing {filetype}'s. Submitting full camword={prow['PROCCAMWORD']}.")
    return prow
def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0,
                      joint=False, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None, use_specter=False,
                      extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords, creates a submission script for the
    compute nodes, and then submits that script to the Slurm scheduler with appropriate dependencies.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit. Default is False.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcals, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated to reflect
        the change in job status after creating and submitting the job for processing.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    ## keep a copy so the full (unpruned) camword/scriptname can be restored below
    orig_prow = prow.copy()
    if check_for_outputs:
        prow = check_for_outputs_on_disk(prow, resubmit_partial_complete)
        if prow['STATUS'].upper() == 'COMPLETED':
            return prow

    prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint,
                               system_name=system_name, use_specter=use_specter,
                               extra_job_args=extra_job_args)
    prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run,
                               strictly_successful=strictly_successful)

    ## If resubmitted partial, the PROCCAMWORD and SCRIPTNAME will correspond
    ## to the pruned values. But we want to
    ## retain the full job's value, so get those from the old job.
    if resubmit_partial_complete:
        prow['PROCCAMWORD'] = orig_prow['PROCCAMWORD']
        prow['SCRIPTNAME'] = orig_prow['SCRIPTNAME']
    return prow
def desi_link_calibnight_command(prow, refnight, include=None):
    """
    Build the desi_link_calibnight command line for a processing table row
    (or dictionary with NIGHT and PROCCAMWORD defined) and a reference night.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions
            for 'NIGHT' and 'PROCCAMWORD'.
        refnight (str or int): The night with a valid set of calibrations
            to link from.
        include (list): The filetypes to include in the linking.

    Returns:
        str: The proper command to be submitted to desi_link_calibnight
        to process the job defined by the prow values.
    """
    tokens = ['desi_link_calibnight',
              f'--refnight={refnight}',
              f'--newnight={prow["NIGHT"]}',
              f'--cameras={prow["PROCCAMWORD"]}']
    if include is not None:
        tokens.append('--include=' + ','.join(list(include)))
    return ' '.join(tokens)
def desi_proc_command(prow, system_name, use_specter=False, queue=None):
    """
    Build the desi_proc command line for a processing table row (or
    dictionary with NIGHT, EXPID, OBSTYPE, JOBDESC, PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'JOBDESC', and 'PROCCAMWORD'.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc to process the job defined by the prow values.
    """
    obstype = prow['OBSTYPE'].lower()
    jobdesc = prow['JOBDESC']
    tokens = ['desi_proc', '--batch', '--nosubmit']
    if queue is not None:
        tokens.append(f'-q {queue}')
    if obstype == 'science':
        if jobdesc == 'prestdstar':
            tokens.append('--nostdstarfit --nofluxcalib')
        elif jobdesc == 'poststdstar':
            tokens.append('--noprestdstarfit --nostdstarfit')
        if use_specter:
            tokens.append('--use-specter')
    elif use_specter and jobdesc in ['flat', 'prestdstar']:
        tokens.append('--use-specter')
    tokens.append(f"--cameras={str(prow['PROCCAMWORD'])} -n {prow['NIGHT']}")
    if len(prow['EXPID']) > 0:
        ## If ccdcalib job without a dark exposure, don't assign the flat expid
        ## since it would incorrectly process the flat using desi_proc
        if obstype != 'flat' or jobdesc != 'ccdcalib':
            tokens.append(f"-e {prow['EXPID'][0]}")
    if prow['BADAMPS'] != '':
        tokens.append(f"--badamps={prow['BADAMPS']}")
    return ' '.join(tokens)
def desi_proc_joint_fit_command(prow, queue=None):
    """
    Build the desi_proc_joint_fit command line for a processing table row
    (or dictionary with NIGHT, EXPID, OBSTYPE, PROCCAMWORD defined).

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for 'NIGHT', 'EXPID', 'OBSTYPE', and 'PROCCAMWORD'.
        queue (str): The name of the NERSC Slurm queue to submit to. Default is None (which leaves it to the desi_proc default).

    Returns:
        str: The proper command to be submitted to desi_proc_joint_fit
        to process the job defined by the prow values.
    """
    obstype = prow['OBSTYPE'].lower()
    night = prow['NIGHT']
    camword = str(prow['PROCCAMWORD'])
    expid_csv = ','.join(str(eid) for eid in prow['EXPID'])

    tokens = ['desi_proc_joint_fit', '--batch', '--nosubmit']
    if queue is not None:
        tokens.append(f'-q {queue}')
    tokens.append(f'--obstype {obstype}')
    tokens.append(f'--cameras={camword} -n {night}')
    if len(expid_csv) > 0:
        tokens.append(f'-e {expid_csv}')
    return ' '.join(tokens)
def create_batch_script(prow, queue='realtime', dry_run=0, joint=False,
                        system_name=None, use_specter=False, extra_job_args=None):
    """
    Wrapper script that takes a processing table row and three modifier keywords and creates a submission script for the
    compute nodes.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        queue (str): The name of the NERSC Slurm queue to submit to. Default is the realtime queue.
        dry_run (int): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        joint (bool): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be
            run with desi_proc_joint_fit when not using tilenight. Default is False.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include refnight
            and include/exclude lists for linkcal, laststeps for tilenight, etc.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Raises:
        ValueError: If no redshift scripts could be generated, if a linkcal
            job lacks a valid refnight, or if a tilenight job lacks laststeps.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    if extra_job_args is None:
        extra_job_args = {}

    if prow['JOBDESC'] in ['perexp', 'pernight', 'pernight-v0', 'cumulative']:
        ## Redshift jobs: scripts are generated by the tile_redshifts machinery
        if dry_run > 1:
            scriptpathname = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                       night=prow['NIGHT'], expid=prow['EXPID'][0])
            log.info("Output file would have been: {}".format(scriptpathname))
        else:
            #- run zmtl for cumulative redshifts but not others
            run_zmtl = (prow['JOBDESC'] == 'cumulative')
            no_afterburners = False
            # NOTE(review): leftover debug print; consider log.debug instead
            print(f"entering tileredshiftscript: {prow}")
            scripts, failed_scripts = generate_tile_redshift_scripts(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                                     nights=[prow['NIGHT']], expids=prow['EXPID'],
                                                                     batch_queue=queue, system_name=system_name,
                                                                     run_zmtl=run_zmtl,
                                                                     no_afterburners=no_afterburners,
                                                                     nosubmit=True)
            if len(failed_scripts) > 0:
                log.error(f"Redshifts failed for group={prow['JOBDESC']}, night={prow['NIGHT']}, " +
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned failed scriptname is {failed_scripts}")
            elif len(scripts) > 1:
                log.error(f"More than one redshifts returned for group={prow['JOBDESC']}, night={prow['NIGHT']}, " +
                          f"tileid={prow['TILEID']}, expid={prow['EXPID']}.")
                log.info(f"Returned scriptnames were {scripts}")
            elif len(scripts) == 0:
                msg = f'No scripts were generated for {prow=}'
                log.critical(prow)
                raise ValueError(msg)
            else:
                scriptpathname = scripts[0]
    elif prow['JOBDESC'] == 'linkcal':
        ## Calibration-linking jobs: build a desi_link_calibnight command
        refnight, include, exclude = -99, None, None
        if 'refnight' in extra_job_args:
            refnight = extra_job_args['refnight']
        if 'include' in extra_job_args:
            include = extra_job_args['include']
        if 'exclude' in extra_job_args:
            exclude = extra_job_args['exclude']
        include, exclude = derive_include_exclude(include, exclude)
        ## Fiberflatnights shouldn't to be generated with psfs from same time, so
        ## shouldn't link psfs without also linking fiberflatnight
        ## However, this should be checked at a higher level. If set here,
        ## go ahead and do it
        # if 'psfnight' in include and not 'fiberflatnight' in include:
        #     err = "Must link fiberflatnight if linking psfnight"
        #     log.error(err)
        #     raise ValueError(err)
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info("Command to be run: {}".format(cmd.split()))
        else:
            if refnight == -99:
                err = f'For {prow=} asked to link calibration but not given' \
                      + ' a valid refnight'
                log.error(err)
                raise ValueError(err)
            cmd = desi_link_calibnight_command(prow, refnight, include)
            log.info(f"Running: {cmd.split()}")
            scriptpathname = create_linkcal_batch_script(newnight=prow['NIGHT'],
                                                         cameras=prow['PROCCAMWORD'],
                                                         queue=queue,
                                                         cmd=cmd,
                                                         system_name=system_name)
    else:
        ## Everything else: desi_proc / desi_proc_joint_fit / tilenight jobs
        if prow['JOBDESC'] != 'tilenight':
            nightlybias, nightlycte, cte_expids = False, False, None
            if 'nightlybias' in extra_job_args:
                nightlybias = extra_job_args['nightlybias']
            elif prow['JOBDESC'].lower() == 'nightlybias':
                nightlybias = True
            if 'nightlycte' in extra_job_args:
                nightlycte = extra_job_args['nightlycte']
            if 'cte_expids' in extra_job_args:
                cte_expids = extra_job_args['cte_expids']
            ## run known joint jobs as joint even if unspecified
            ## in the future we can eliminate the need for "joint"
            if joint or prow['JOBDESC'].lower() in ['psfnight', 'nightlyflat']:
                cmd = desi_proc_joint_fit_command(prow, queue=queue)
                ## For consistency with how we edit the other commands, do them
                ## here, but future TODO would be to move these into the command
                ## generation itself
                if 'extra_cmd_args' in extra_job_args:
                    cmd += ' ' + ' '.join(np.atleast_1d(extra_job_args['extra_cmd_args']))
            else:
                cmd = desi_proc_command(prow, system_name, use_specter, queue=queue)
                if nightlybias:
                    cmd += ' --nightlybias'
                if nightlycte:
                    cmd += ' --nightlycte'
                if cte_expids is not None:
                    cmd += ' --cte-expids '
                    cmd += ','.join(np.atleast_1d(cte_expids).astype(str))
        if dry_run > 1:
            scriptpathname = batch_script_name(prow)
            log.info("Output file would have been: {}".format(scriptpathname))
            if prow['JOBDESC'] != 'tilenight':
                log.info("Command to be run: {}".format(cmd.split()))
        else:
            expids = prow['EXPID']
            if len(expids) == 0:
                expids = None
            if prow['JOBDESC'] == 'tilenight':
                log.info("Creating tilenight script for tile {}".format(prow['TILEID']))
                if 'laststeps' in extra_job_args:
                    laststeps = extra_job_args['laststeps']
                else:
                    err = f'{prow=} job did not specify last steps to tilenight'
                    log.error(err)
                    raise ValueError(err)
                ncameras = len(decode_camword(prow['PROCCAMWORD']))
                scriptpathname = create_desi_proc_tilenight_batch_script(
                    night=prow['NIGHT'], exp=expids,
                    tileid=prow['TILEID'],
                    ncameras=ncameras,
                    queue=queue,
                    mpistdstars=True,
                    use_specter=use_specter,
                    system_name=system_name,
                    laststeps=laststeps)
            else:
                ## these jobs only process a single exposure
                if expids is not None and len(expids) > 1:
                    expids = expids[:1]
                log.info("Running: {}".format(cmd.split()))
                scriptpathname = create_desi_proc_batch_script(night=prow['NIGHT'], exp=expids,
                                                               cameras=prow['PROCCAMWORD'],
                                                               jobdesc=prow['JOBDESC'],
                                                               queue=queue, cmdline=cmd,
                                                               use_specter=use_specter,
                                                               system_name=system_name,
                                                               nightlybias=nightlybias,
                                                               nightlycte=nightlycte,
                                                               cte_expids=cte_expids)
        log.info("Outfile is: {}".format(scriptpathname))

    prow['SCRIPTNAME'] = os.path.basename(scriptpathname)
    return prow
_fake_qid = int(time.time() - 1.7e9)
[docs]def _get_fake_qid():
"""
Return fake slurm queue jobid to use for dry-run testing
"""
# Note: not implemented as a yield generator so that this returns a
# genuine int, not a generator object
global _fake_qid
_fake_qid += 1
return _fake_qid
def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=False):
    """
    Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm
    scheduler.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for processing_table columns found in
            desispect.workflow.proctable.get_processing_table_column_defs()
        dry_run (int): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted.
            If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with modified values updated values for
        scriptname.

    Raises:
        RuntimeError: If sbatch fails repeatedly when submitting the script.

    Note:
        This modifies the input. Though Table.Row objects are generally copied on modification, so the change to the
        input object in memory may or may not be changed. As of writing, a row from a table given to this function will
        not change during the execution of this function (but can be overwritten explicitly with the returned row if desired).
    """
    log = get_logger()
    dep_qids = prow['LATEST_DEP_QID']
    dep_list, dep_str = '', ''

    ## With desi_proc_night we now either resubmit failed jobs or exit, so this
    ## should no longer be necessary in the normal workflow.
    # workaround for sbatch --dependency bug not tracking jobs correctly
    # see NERSC TICKET INC0203024
    if len(dep_qids) > 0 and not dry_run:
        state_dict = get_queue_states_from_qids(dep_qids, dry_run=0, use_cache=True)
        still_depids = []
        for depid in dep_qids:
            if depid in state_dict.keys() and state_dict[int(depid)] == 'COMPLETED':
                log.info(f"removing completed jobid {depid}")
            else:
                still_depids.append(depid)
        dep_qids = np.array(still_depids)

    if len(dep_qids) > 0:
        jobtype = prow['JOBDESC']
        if strictly_successful:
            depcond = 'afterok'
        elif jobtype in ['arc', 'psfnight', 'prestdstar', 'stdstarfit']:
            ## (though psfnight and stdstarfit will require some inputs otherwise they'll go up in flames)
            depcond = 'afterany'
        else:
            ## if 'flat','nightlyflat','poststdstar', or any type of redshift, require strict success of inputs
            depcond = 'afterok'

        dep_str = f'--dependency={depcond}:'
        if np.isscalar(dep_qids):
            dep_list = str(dep_qids).strip(' \t')
            if dep_list == '':
                dep_str = ''
            else:
                dep_str += dep_list
        else:
            if len(dep_qids) > 1:
                dep_list = ':'.join(np.array(dep_qids).astype(str))
                dep_str += dep_list
            elif len(dep_qids) == 1 and dep_qids[0] not in [None, 0]:
                dep_str += str(dep_qids[0])
            else:
                dep_str = ''

    # script = f'{jobname}.slurm'
    # script_path = pathjoin(batchdir, script)
    if prow['JOBDESC'] in ['pernight-v0', 'pernight', 'perexp', 'cumulative']:
        script_path = get_ztile_script_pathname(tileid=prow['TILEID'], group=prow['JOBDESC'],
                                                night=prow['NIGHT'], expid=np.min(prow['EXPID']))
        jobname = os.path.basename(script_path)
    else:
        batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT'])
        jobname = batch_script_name(prow)
        script_path = pathjoin(batchdir, jobname)

    batch_params = ['sbatch', '--parsable']
    if dep_str != '':
        batch_params.append(f'{dep_str}')
    if reservation is not None:
        batch_params.append(f'--reservation={reservation}')
    batch_params.append(f'{script_path}')

    if dry_run:
        current_qid = _get_fake_qid()
    else:
        #- sbatch sometimes fails; try several times before giving up
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True)
                current_qid = int(current_qid.strip(' \t\n'))
                break
            except subprocess.CalledProcessError as err:
                log.error(f'{jobname} submission failure at {datetime.datetime.now()}')
                log.error(f'{jobname} {batch_params}')
                log.error(f'{jobname} {err.output=}')
                if attempt < max_attempts - 1:
                    log.info('Sleeping 60 seconds then retrying')
                    time.sleep(60)
        else: #- for/else happens if loop doesn't succeed
            msg = f'{jobname} submission failed {max_attempts} times; exiting'
            log.critical(msg)
            raise RuntimeError(msg)
        log.info(batch_params)

    log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}')

    ## Update prow with new information
    prow['LATEST_QID'] = current_qid
    prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'], current_qid)
    prow['STATUS'] = 'SUBMITTED'
    prow['SUBMIT_DATE'] = int(time.time())

    ## Update the Slurm jobid cache of job states
    update_queue_state_cache(qid=prow['LATEST_QID'], state=prow['STATUS'])
    return prow
#############################################
########## Row Manipulations ############
#############################################
def define_and_assign_dependency(prow, calibjobs, use_tilenight=False,
                                 refnight=None):
    """
    Given input processing row and possible calibjobs, this defines the
    JOBDESC keyword and assigns the dependency appropriate for the job type of
    prow.

    Args:
        prow (Table.Row or dict): Must include keyword accessible definitions for
            'OBSTYPE'. A row must have column names for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The table.Row() values are for the corresponding
            calibration job. Each value that isn't None must contain
            'INTID', and 'LATEST_QID'. If None, it assumes the
            dependency doesn't exist and no dependency is assigned.
        use_tilenight (bool): Default is False. If True, use desi_proc_tilenight
            for prestdstar, stdstar, and poststdstar steps for
            science exposures.
        refnight (int): The reference night for linking jobs

    Returns:
        Table.Row or dict: The same prow type and keywords as input except
        with modified values updated values for 'JOBDESC', 'INT_DEP_IDS'. and 'LATEST_DEP_ID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not be changed. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    ## Each branch picks the most-processed calibration job available as the
    ## dependency, falling back toward linkcal.
    if prow['OBSTYPE'] in ['science', 'twiflat']:
        if calibjobs['nightlyflat'] is not None:
            dependency = calibjobs['nightlyflat']
        elif calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
        if not use_tilenight:
            prow['JOBDESC'] = 'prestdstar'
    elif prow['OBSTYPE'] == 'flat':
        if calibjobs['psfnight'] is not None:
            dependency = calibjobs['psfnight']
        elif calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'arc':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] in ['badcol', 'nightlybias', 'ccdcalib']:
        dependency = calibjobs['linkcal']
    elif prow['OBSTYPE'] == 'dark':
        if calibjobs['ccdcalib'] is not None:
            dependency = calibjobs['ccdcalib']
        elif calibjobs['nightlybias'] is not None:
            dependency = calibjobs['nightlybias']
        elif calibjobs['badcol'] is not None:
            dependency = calibjobs['badcol']
        else:
            dependency = calibjobs['linkcal']
    elif prow['JOBDESC'] == 'linkcal' and refnight is not None:
        dependency = None
        ## For link cals only, enable cross-night dependencies if available
        refproctable = findfile('proctable', night=refnight)
        if os.path.exists(refproctable):
            ptab = load_table(tablename=refproctable, tabletype='proctable')
            ## This isn't perfect because we may depend on jobs that aren't
            ## actually being linked
            ## Also allows us to proceed even if jobs don't exist yet
            deps = []
            for job in ['nightlybias', 'ccdcalib', 'psfnight', 'nightlyflat']:
                if job in ptab['JOBDESC']:
                    ## add prow to dependencies
                    deps.append(ptab[ptab['JOBDESC'] == job][0])
            if len(deps) > 0:
                dependency = deps
    else:
        dependency = None

    prow = assign_dependency(prow, dependency)
    return prow
def assign_dependency(prow, dependency):
    """
    Assigns the dependency columns ('INT_DEP_IDS' and 'LATEST_DEP_QID') of a
    processing row from the given dependency job row(s).

    Args:
        prow, Table.Row or dict. Must include keyword accessible definitions for
            'JOBDESC', 'INT_DEP_IDS', and 'LATEST_DEP_ID'.
        dependency, NoneType or scalar/list/array of Table.Row or dict.
            Processing row(s) corresponding to the required input for the job
            in prow. Each must contain keyword accessible values for 'INTID'
            and 'LATEST_QID'. If None, it assumes the dependency doesn't exist
            and no dependency is assigned.

    Returns:
        Table.Row or dict: The same prow type and keywords as input except with
        modified values updated for 'INT_DEP_IDS' and 'LATEST_DEP_QID'.

    Note:
        This modifies the input. Though Table.Row objects are generally copied
        on modification, so the change to the input object in memory may or may
        not be changed. As of writing, a row from a table given to this function
        will not change during the execution of this function (but can be
        overwritten explicitly with the returned row if desired).
    """
    ## Default to "no dependencies"; overwritten below if any are still active
    prow['INT_DEP_IDS'] = np.array([], dtype=int)
    prow['LATEST_DEP_QID'] = np.array([], dtype=int)
    if dependency is not None:
        ## Bug fix: the old check `type(dependency) in [list, np.array]` never
        ## matched numpy arrays (np.array is a factory function, not a type),
        ## so array-valued dependency lists were silently ignored. Use
        ## isinstance against the actual sequence types instead.
        if isinstance(dependency, (list, tuple, np.ndarray)):
            ids, qids = [], []
            for curdep in dependency:
                ## Only keep dependencies that are still pending/blocking
                if still_a_dependency(curdep):
                    ids.append(curdep['INTID'])
                    qids.append(curdep['LATEST_QID'])
            prow['INT_DEP_IDS'] = np.array(ids, dtype=int)
            prow['LATEST_DEP_QID'] = np.array(qids, dtype=int)
        elif still_a_dependency(dependency):
            ## Scalar dependency: any row-like mapping (dict, OrderedDict,
            ## or astropy Table.Row) with 'INTID'/'LATEST_QID' access works.
            prow['INT_DEP_IDS'] = np.array([dependency['INTID']], dtype=int)
            prow['LATEST_DEP_QID'] = np.array([dependency['LATEST_QID']], dtype=int)
    return prow
def still_a_dependency(dependency):
    """
    Defines the criteria for which a dependency is deemed complete (and
    therefore no longer a dependency).

    Args:
        dependency, Table.Row or dict. Processing row corresponding to the
            required input for the job in prow. This must contain keyword
            accessible values for 'STATUS', and 'LATEST_QID'.

    Returns:
        bool: False if the criteria indicate that the dependency is completed
        and no longer a blocking factor (ie no longer a genuine dependency).
        Returns True if the dependency is still a blocking factor such that
        the slurm scheduler needs to be aware of the pending job.
    """
    ## A job never submitted to the queue cannot block anything
    if dependency['LATEST_QID'] <= 0:
        return False
    ## Otherwise it blocks until the queue reports it COMPLETED
    return dependency['STATUS'] != 'COMPLETED'
def get_type_and_tile(erow):
    """
    Trivial helper returning the OBSTYPE and the TILEID from an exposure
    table row.

    Args:
        erow, Table.Row or dict. Must contain 'OBSTYPE' and 'TILEID' as
            keywords.

    Returns:
        tuple (str, str), corresponding to the OBSTYPE and TILEID values of
        the input erow.
    """
    obstype = str(erow['OBSTYPE']).lower()
    tileid = erow['TILEID']
    return obstype, tileid
#############################################
######### Table manipulators ############
#############################################
[docs]def parse_previous_tables(etable, ptable, night):
"""
This takes in the exposure and processing tables and regenerates all the working memory variables needed for the
daily processing script.
Used by the daily processing to define most of its state-ful variables into working memory.
If the processing table is empty, these are simply declared and returned for use.
If the code had previously run and exited (or crashed), however, this will allow the code to
re-establish itself by redefining these values.
Args:
etable, Table, Exposure table of all exposures that have been dealt with thus far.
ptable, Table, Processing table of all exposures that have been processed.
night, str or int, the night the data was taken.
Returns:
tuple: A tuple containing:
* arcs, list of dicts, list of the individual arc jobs used for the psfnight (NOT all
the arcs, if multiple sets existed)
* flats, list of dicts, list of the individual flat jobs used for the nightlyflat (NOT
all the flats, if multiple sets existed)
* sciences, list of dicts, list of the most recent individual prestdstar science exposures
(if currently processing that tile)
* calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'badcol', 'psfnight'
and 'nightlyflat'. Each key corresponds to a Table.Row or
None. The table.Row() values are for the corresponding
calibration job.
* curtype, None, the obstype of the current job being run. Always None as first new job will define this.
* lasttype, str or None, the obstype of the last individual exposure row to be processed.
* curtile, None, the tileid of the current job (if science). Otherwise None. Always None as first
new job will define this.
* lasttile, str or None, the tileid of the last job (if science). Otherwise None.
* internal_id, int, an internal identifier unique to each job. Increments with each new job. This
is the latest unassigned value.
"""
log = get_logger()
## Working-memory state to be rebuilt: individual exposure jobs and calib jobs
arcs, flats, sciences = [], [], []
calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
'psfnight': None, 'nightlyflat': None, 'linkcal': None,
'accounted_for': dict()}
curtype,lasttype = None,None
curtile,lasttile = None,None
## If prior processing exists, resume state from the last processing-table row
if len(ptable) > 0:
prow = ptable[-1]
## Next unassigned internal id is one past the last assigned id
internal_id = int(prow['INTID'])+1
lasttype,lasttile = get_type_and_tile(ptable[-1])
jobtypes = ptable['JOBDESC']
if 'nightlybias' in jobtypes:
calibjobs['nightlybias'] = table_row_to_dict(ptable[jobtypes=='nightlybias'][0])
log.info("Located nightlybias job in exposure table: {}".format(calibjobs['nightlybias']))
if 'ccdcalib' in jobtypes:
calibjobs['ccdcalib'] = table_row_to_dict(ptable[jobtypes=='ccdcalib'][0])
log.info("Located ccdcalib job in exposure table: {}".format(calibjobs['ccdcalib']))
if 'psfnight' in jobtypes:
calibjobs['psfnight'] = table_row_to_dict(ptable[jobtypes=='psfnight'][0])
log.info("Located joint fit psfnight job in exposure table: {}".format(calibjobs['psfnight']))
elif lasttype == 'arc':
## psfnight not yet run: recover the individual arc jobs of the most
## recent arc sequence by walking the table backwards while SEQNUM
## decreases. NOTE(review): seqnum=10 is presumably one more than the
## largest SEQNUM in an arc sequence -- confirm.
seqnum = 10
for row in ptable[::-1]:
erow = etable[etable['EXPID']==row['EXPID'][0]]
if row['OBSTYPE'].lower() == 'arc' and int(erow['SEQNUM'])<seqnum:
arcs.append(table_row_to_dict(row))
seqnum = int(erow['SEQNUM'])
else:
break
## Because we work backward to fill in, we need to reverse them to get chronological order back
arcs = arcs[::-1]
if 'nightlyflat' in jobtypes:
calibjobs['nightlyflat'] = table_row_to_dict(ptable[jobtypes=='nightlyflat'][0])
log.info("Located joint fit nightlyflat job in exposure table: {}".format(calibjobs['nightlyflat']))
elif lasttype == 'flat':
## nightlyflat not yet run: recover the recent individual flat jobs.
## NOTE(review): the SEQTOT < 5 and EXPTIME > 100 cuts appear to select
## the standard calibration flats (excluding short/CTE flats) -- confirm
## against the exposure-table conventions.
for row in ptable[::-1]:
erow = etable[etable['EXPID']==row['EXPID'][0]]
if row['OBSTYPE'].lower() == 'flat' and int(erow['SEQTOT']) < 5:
if float(erow['EXPTIME']) > 100.:
flats.append(table_row_to_dict(row))
else:
break
flats = flats[::-1]
if lasttype.lower() == 'science':
## Recover the prestdstar science jobs of the tile currently in progress
for row in ptable[::-1]:
if row['OBSTYPE'].lower() == 'science' and row['TILEID'] == lasttile and \
row['JOBDESC'] == 'prestdstar' and row['LASTSTEP'] != 'skysub':
sciences.append(table_row_to_dict(row))
else:
break
sciences = sciences[::-1]
else:
## Empty processing table: start the internal id counter fresh for this night
internal_id = night_to_starting_iid(night)
return arcs,flats,sciences, \
calibjobs, \
curtype, lasttype, \
curtile, lasttile,\
internal_id
[docs]def generate_calibration_dict(ptable, files_to_link=None):
"""
This takes in a processing table and regenerates the working memory calibration
dictionary for dependency tracking. Used by the daily processing to define
most of its state-ful variables into working memory.
If the processing table is empty, these are simply declared and returned for use.
If the code had previously run and exited (or crashed), however, this will allow the code to
re-establish itself by redefining these values.
Args:
ptable, Table, Processing table of all exposures that have been processed.
files_to_link, set, Set of filenames that the linkcal job will link.
Returns:
calibjobs, dict. Dictionary containing 'nightlybias', 'badcol', 'ccdcalib',
'psfnight', 'nightlyflat', 'linkcal', and 'accounted_for'. Each key corresponds to a
Table.Row or None. The table.Row() values are for the corresponding
calibration job.
Raises:
ValueError, if a linkcal job exists in ptable but files_to_link is None or empty.
"""
log = get_logger()
## Maps each calibration JOBDESC to the calibration filetype it produces
job_to_file_map = get_jobdesc_to_file_map()
## Tracks which calibration filetypes are already handled for the night
accounted_for = {'biasnight': False, 'badcolumns': False,
'ctecorrnight': False, 'psfnight': False,
'fiberflatnight': False}
calibjobs = {'nightlybias': None, 'ccdcalib': None, 'badcol': None,
'psfnight': None, 'nightlyflat': None, 'linkcal': None}
ptable_jobtypes = ptable['JOBDESC']
for jobtype in calibjobs.keys():
if jobtype in ptable_jobtypes:
## Keep the first matching row for each calibration job type
calibjobs[jobtype] = table_row_to_dict(ptable[ptable_jobtypes==jobtype][0])
log.info(f"Located {jobtype} job in exposure table: {calibjobs[jobtype]}")
if jobtype == 'linkcal':
if files_to_link is not None and len(files_to_link) > 0:
## NOTE(review): calibjobs does not yet contain the
## 'accounted_for' key at this point (it is only attached
## after the loop), yet update_calibjobs_with_linking reads
## calibjobs['accounted_for'] -- verify this call path.
log.info(f"Assuming existing linkcal job processed "
+ f"{files_to_link} since given in override file.")
calibjobs = update_calibjobs_with_linking(calibjobs, files_to_link)
else:
## A linkcal job without knowing what it linked is unrecoverable
err = f"linkcal job exists but no files given: {files_to_link=}"
log.error(err)
raise ValueError(err)
elif jobtype == 'ccdcalib':
## ccdcalib covers the ccd-level files, except any being linked
possible_ccd_files = set(['biasnight', 'badcolumns', 'ctecorrnight'])
if files_to_link is None:
files_accounted_for = possible_ccd_files
else:
files_accounted_for = possible_ccd_files.difference(files_to_link)
ccd_files_linked = possible_ccd_files.intersection(files_to_link)
log.info(f"Assuming existing ccdcalib job processed "
+ f"{files_accounted_for} since {ccd_files_linked} "
+ f"are linked.")
for fil in files_accounted_for:
accounted_for[fil] = True
else:
## Single-output jobs: mark their one filetype as handled
accounted_for[job_to_file_map[jobtype]] = True
calibjobs['accounted_for'] = accounted_for
return calibjobs
def update_calibjobs_with_linking(calibjobs, files_to_link):
    """
    Mark linked calibration filetypes as already accounted for.

    This takes in a dictionary summarizing the calibration jobs and updates it
    based on the files_to_link, which are assumed to have already been linked
    such that those files already exist on disk and don't need to be generated.

    Parameters
    ----------
    calibjobs: dict
        Dictionary containing "nightlybias", "badcol", "ccdcalib",
        "psfnight", "nightlyflat", "linkcal", and "accounted_for". Each key
        corresponds to a Table.Row or None. The table.Row() values are for
        the corresponding calibration job.
    files_to_link: set
        Set of filenames that the linkcal job will link.

    Returns
    -------
    calibjobs, dict
        The input dictionary with calibjobs['accounted_for'] set to True for
        every filetype present in files_to_link.

    Raises
    ------
    ValueError
        If an entry of files_to_link is not a recognized calibration filetype.
    """
    log = get_logger()
    accounted_for = calibjobs['accounted_for']
    for filename in files_to_link:
        ## Reject filetypes we don't know how to track
        if filename not in accounted_for:
            err = f"{filename} doesn't match an expected filetype: "
            err += f"{accounted_for.keys()}"
            log.error(err)
            raise ValueError(err)
        accounted_for[filename] = True
    return calibjobs
def all_calibs_submitted(accounted_for, do_cte_flats):
    """
    Boolean logic determining whether every necessary calibration job has
    been submitted (or otherwise handled) for the night.

    Args:
        accounted_for, dict, Dictionary with keys corresponding to the
            calibration filenames and values of True or False.
        do_cte_flats, bool, whether ctecorrnight files are expected or not.

    Returns:
        bool, True if all necessary calibrations have been submitted or
        handled, False otherwise.
    """
    ## Work on a copy so the caller's dict is left untouched
    relevant = dict(accounted_for)
    ## ctecorrnight is only required when CTE flats are expected
    if not do_cte_flats:
        del relevant['ctecorrnight']
    return np.all(list(relevant.values()))
[docs]def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
ptab_name=None, dry_run=0,reservation=None):
"""
Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
follows dependencies until it finds the first job without a failed dependency and resubmits that. Then resubmits the
other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
Args:
proc_table, Table, the processing table with a row per job.
submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
possible Slurm scheduler state, where you wish for jobs with that
outcome to be resubmitted
ptab_name, str, the full pathname where the processing table should be saved.
dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
dry_run=2, the scripts will not be written or submitted. Logging will remain the same
for testing as though scripts are being submitted. Default is 0 (false).
reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
Returns:
tuple: A tuple containing:
* proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
been updated for those jobs that needed to be resubmitted.
* submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
the number of submissions made from this function call plus the input submits value.
Note:
This modifies the inputs of both proc_table and submits and returns them.
The function name contains a long-standing typo ("recurvsively") that is
kept as-is for backward compatibility with existing callers.
"""
log = get_logger()
if resubmission_states is None:
resubmission_states = get_resubmission_states()
log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
## NOTE(review): dry_run is not forwarded to update_from_queue here -- confirm
## that querying the real queue even during dry runs is intentional.
proc_table = update_from_queue(proc_table, dry_run=False)
log.info("Updated processing table queue information:")
cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
log.info(np.array(cols))
for row in proc_table:
log.info(np.array(row[cols]))
log.info("\n")
## Map internal job ids to row numbers for fast dependency lookups
id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
for rown in range(len(proc_table)):
if proc_table['STATUS'][rown] in resubmission_states:
proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
id_to_row_map, ptab_name,
resubmission_states,
reservation, dry_run)
## Refresh queue states once more after all resubmissions
proc_table = update_from_queue(proc_table)
return proc_table, submits
[docs]def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
resubmission_states=None, reservation=None, dry_run=0):
"""
Given a row of a processing table and the full processing table, this resubmits the given job.
Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
be resubmitted, it recursively follows dependencies until it finds the first job without a failed dependency and
resubmits that. Then resubmits the other jobs with the new Slurm jobID's for proper dependency coordination within Slurm.
Args:
rown, Table.Row, the row of the processing table that you want to resubmit.
proc_table, Table, the processing table with a row per job.
submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
in the processing table.
ptab_name, str, the full pathname where the processing table should be saved.
resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
possible Slurm scheduler state, where you wish for jobs with that
outcome to be resubmitted
reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
dry_run=2, the scripts will not be written or submitted. Logging will remain the same
for testing as though scripts are being submitted. Default is 0 (false).
Returns:
tuple: A tuple containing:
* proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
been updated for those jobs that needed to be resubmitted.
* submits: int, the number of submissions made to the queue. This is incremented from the input submits, so it is
the number of submissions made from this function call plus the input submits value.
Note:
This modifies the inputs of both proc_table and submits and returns them.
"""
log = get_logger()
row = proc_table[rown]
log.info(f"Identified row {row['INTID']} as needing resubmission.")
log.info(f"{row['INTID']}: Expid(s): {row['EXPID']} Job: {row['JOBDESC']}")
if resubmission_states is None:
resubmission_states = get_resubmission_states()
ideps = proc_table['INT_DEP_IDS'][rown]
if ideps is None:
## No dependencies: clear the dependency queue-id list
proc_table['LATEST_DEP_QID'][rown] = np.ndarray(shape=0).astype(int)
else:
## States in which a dependency does NOT preclude resubmitting this job
all_valid_states = list(resubmission_states.copy())
all_valid_states.extend(['RUNNING','PENDING','SUBMITTED','COMPLETED'])
## First pass: verify every dependency is in a workable state
for idep in np.sort(np.atleast_1d(ideps)):
## Internal ids encode the night as INTID // 1000 (see
## night_to_starting_iid), so a differing prefix means the
## dependency comes from another night and isn't in this table.
if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
log.warning(f"Internal ID: {idep} not in id_to_row_map. "
+ "This is expected since it's from another day. "
+ f" This dependency will not be checked or "
+ "resubmitted")
elif proc_table['STATUS'][id_to_row_map[idep]] not in all_valid_states:
log.warning(f"Proc INTID: {proc_table['INTID'][rown]} depended on" +
f" INTID {proc_table['INTID'][id_to_row_map[idep]]}" +
f" but that exposure has state" +
f" {proc_table['STATUS'][id_to_row_map[idep]]} that" +
f" isn't in the list of resubmission states." +
f" Exiting this job's resubmission attempt.")
proc_table['STATUS'][rown] = "DEP_NOT_SUBD"
return proc_table, submits
## Second pass: resubmit failed dependencies (recursively) and collect
## their latest Slurm queue ids for this job's dependency list
qdeps = []
for idep in np.sort(np.atleast_1d(ideps)):
if idep not in id_to_row_map and idep // 1000 != row['INTID'] // 1000:
log.warning(f"Internal ID: {idep} not in id_to_row_map. "
+ "This is expected since it's from another day. "
+ f" This dependency will not be checked or "
+ "resubmitted")
continue
elif proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
proc_table, submits = recursive_submit_failed(id_to_row_map[idep],
proc_table, submits,
id_to_row_map,
reservation=reservation,
dry_run=dry_run)
qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
qdeps = np.atleast_1d(qdeps)
if len(qdeps) > 0:
proc_table['LATEST_DEP_QID'][rown] = qdeps
else:
log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")
proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
strictly_successful=True, dry_run=dry_run)
submits += 1
## Persist the updated table after each real submission so a crash doesn't
## lose queue ids; throttle submissions slightly to be kind to the scheduler
if not dry_run:
if ptab_name is None:
write_table(proc_table, tabletype='processing', overwrite=True)
else:
write_table(proc_table, tablename=ptab_name, overwrite=True)
sleep_and_report(0.1 + 0.1*(submits % 10 == 0),
message_suffix=f"after submitting job to queue and writing proctable")
return proc_table, submits
#########################################
######## Joint fit ##############
#########################################
[docs]def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_submit_types=None,
dry_run=0, strictly_successful=False, check_for_outputs=True, resubmit_partial_complete=True,
system_name=None):
"""
DEPRECATED
Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
for all the individual exposures also created and submitted. The returned ptable has all of these rows added to the
table given as input.
Args:
ptable (Table): The processing table where each row is a processed job.
prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
inputs to the joint fit.
internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
descriptor (str): Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
or 'flat' or 'nightlyflat'.
z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each
exposure. If not specified or None, then no redshifts are submitted.
dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
dry_run=2, the scripts will not be written or submitted. Logging will remain the same
for testing as though scripts are being submitted. Default is 0 (false).
strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
less desirable because e.g. the sciences can run with SVN default calibrations rather
than failing completely from failed calibrations. Default is False.
check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
data products for the script being submitted. If all files exist and this is True,
then the script will not be submitted. If some files exist and this is True, only the
subset of the cameras without the final data products will be generated and submitted.
resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
jobs with some prior data are pruned using PROCCAMWORD to only process the
remaining cameras not found to exist.
system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
Returns:
tuple: A tuple containing:
* ptable, Table. The same processing table as input except with added rows for the joint fit job and, in the case
of a stdstarfit, the poststdstar science exposure jobs.
* joint_prow, dict. Row of a processing table corresponding to the joint fit job.
* internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
"""
log = get_logger()
if len(prows) < 1:
return ptable, None, internal_id
if descriptor is None:
## NOTE(review): this exit returns a 2-tuple while every other exit
## returns a 3-tuple; a caller unpacking three values would fail here.
return ptable, None
elif descriptor == 'arc':
descriptor = 'psfnight'
elif descriptor == 'flat':
descriptor = 'nightlyflat'
elif descriptor == 'science':
if z_submit_types is None or len(z_submit_types) == 0:
descriptor = 'stdstarfit'
if descriptor not in ['psfnight', 'nightlyflat', 'science','stdstarfit']:
return ptable, None, internal_id
log.info(" ")
log.info(f"Joint fit criteria found. Running {descriptor}.\n")
## The joint step for science exposures is the standard star fit
if descriptor == 'science':
joint_prow, internal_id = make_joint_prow(prows, descriptor='stdstarfit', internal_id=internal_id)
else:
joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
ptable.add_row(joint_prow)
## After a stdstar fit, submit the per-exposure poststdstar jobs that depend on it
if descriptor in ['science','stdstarfit']:
if descriptor == 'science':
zprows = []
log.info(" ")
log.info(f"Submitting individual science exposures now that joint fitting of standard stars is submitted.\n")
for row in prows:
if row['LASTSTEP'] == 'stdstarfit':
continue
row['JOBDESC'] = 'poststdstar'
# poststdstar job can't process cameras not included in its stdstar joint fit
stdcamword = joint_prow['PROCCAMWORD']
thiscamword = row['PROCCAMWORD']
proccamword = camword_intersection([stdcamword, thiscamword])
if proccamword != thiscamword:
dropcams = difference_camwords(thiscamword, proccamword)
assert dropcams != '' #- i.e. if they differ, we should be dropping something
log.warning(f"Dropping exp {row['EXPID']} poststdstar cameras {dropcams} since they weren't included in stdstar fit {stdcamword}")
row['PROCCAMWORD'] = proccamword
row['INTID'] = internal_id
internal_id += 1
row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
row = assign_dependency(row, joint_prow)
row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
ptable.add_row(row)
if descriptor == 'science' and row['LASTSTEP'] == 'all':
zprows.append(row)
## Now run redshifts
if descriptor == 'science' and len(zprows) > 0 and z_submit_types is not None:
## Gather all poststdstar jobs of this tile from the night; fall back
## to table rows if the in-memory zprows don't cover them all
prow_selection = ( (ptable['OBSTYPE'] == 'science')
& (ptable['LASTSTEP'] == 'all')
& (ptable['JOBDESC'] == 'poststdstar')
& (ptable['TILEID'] == int(zprows[0]['TILEID'])) )
nightly_zprows = []
if np.sum(prow_selection) == len(zprows):
nightly_zprows = zprows.copy()
else:
for prow in ptable[prow_selection]:
nightly_zprows.append(table_row_to_dict(prow))
for zsubtype in z_submit_types:
if zsubtype == 'perexp':
## One redshift job per exposure
for zprow in zprows:
log.info(" ")
log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
joint_prow, internal_id = make_joint_prow([zprow], descriptor=zsubtype, internal_id=internal_id)
joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
ptable.add_row(joint_prow)
else:
## One joint redshift job over all of the tile's exposures
log.info(" ")
log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {nightly_zprows[0]['TILEID']}.")
expids = [prow['EXPID'][0] for prow in nightly_zprows]
log.info(f"Expids: {expids}.\n")
joint_prow, internal_id = make_joint_prow(nightly_zprows, descriptor=zsubtype, internal_id=internal_id)
joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
ptable.add_row(joint_prow)
if descriptor in ['psfnight', 'nightlyflat']:
log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
ptable = set_calibrator_flag(prows, ptable)
return ptable, joint_prow, internal_id
def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation,
                  dry_run=0, strictly_successful=False, check_for_outputs=True,
                  resubmit_partial_complete=True, system_name=None):
    """
    Given a set of prows, this generates a processing table row, creates a
    batch script, and submits the appropriate joint calibration fitting job
    given by descriptor. The returned ptable has the joint job row added to
    the table given as input.

    Args:
        descriptor (str): Description of the joint fitting job. Can either be
            'arc' or 'psfnight', or 'flat' or 'nightlyflat'.
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): The rows corresponding to the individual
            exposure jobs that are inputs to the joint fit.
        internal_id (int): the next internal id to be used for assignment
            (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is
            given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is
            not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If
            dry_run=1 the scripts will be written or submitted. If dry_run=2,
            the scripts will not be written or submitted. Logging will remain
            the same for testing as though scripts are being submitted.
            Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all
            inputs to have succeeded. For daily processing, this is less
            desirable because e.g. the sciences can run with SVN default
            calibrations rather than failing completely from failed
            calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code
            checks for the existence of the expected final data products for
            the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and
            this is True, only the subset of the cameras without the final
            data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be
            used with check_for_outputs=True. If this flag is True, jobs with
            some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or
            perlmutter-gpu.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with an
          added row for the joint fit job.
        * joint_prow, dict or None. Row of a processing table corresponding
          to the joint fit job, or None if nothing was submitted.
        * internal_id, int, the next internal id to be used for assignment
          (already incremented up from the last used id number used).
    """
    log = get_logger()
    ## Nothing to jointly fit
    if len(prows) < 1:
        return ptable, None, internal_id

    if descriptor is None:
        ## Bug fix: this previously returned the 2-tuple (ptable, None),
        ## inconsistent with the documented 3-tuple return and every other
        ## exit path, which would break callers unpacking three values.
        return ptable, None, internal_id
    elif descriptor == 'arc':
        descriptor = 'psfnight'
    elif descriptor == 'flat':
        descriptor = 'nightlyflat'

    ## Only psfnight and nightlyflat joint calibration fits are supported
    if descriptor not in ['psfnight', 'nightlyflat']:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")

    joint_prow, internal_id = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                   strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                   resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
    ptable.add_row(joint_prow)

    ## Mark the input exposures as calibrators now that the joint job exists
    if descriptor in ['psfnight', 'nightlyflat']:
        log.info(f"Setting the calibration exposures as calibrators in the processing table.\n")
        ptable = set_calibrator_flag(prows, ptable)

    return ptable, joint_prow, internal_id
#########################################
######## Redshifts ##############
#########################################
def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation,
                     dry_run=0, strictly_successful=False,
                     check_for_outputs=True, resubmit_partial_complete=True,
                     z_submit_types=None, system_name=None):
    """
    Given a set of prows, this generates processing table rows, creates batch
    scripts, and submits the requested redshift jobs that depend on the given
    tilenight job. The returned ptable has all of these rows added to the
    table given as input.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        tnight (Table.Row): The processing table row of the tilenight job on which the redshifts depend.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input except with added rows for the redshift jobs.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    log = get_logger()
    ## Nothing to do if there are no jobs or no redshift types were requested.
    ## (identity check: "is None" is the correct None comparison, not "==")
    if len(prows) < 1 or z_submit_types is None:
        return ptable, internal_id

    log.info(" ")
    log.info(f"Running redshifts.\n")

    ## Only exposures that completed every processing step are redshifted
    zprows = [row for row in prows if row['LASTSTEP'] == 'all']

    if len(zprows) > 0:
        for zsubtype in z_submit_types:
            if zsubtype == 'perexp':
                ## One redshift job per individual exposure
                for zprow in zprows:
                    log.info(" ")
                    log.info(f"Submitting redshift fit of type {zsubtype} for TILEID {zprow['TILEID']} and EXPID {zprow['EXPID']}.\n")
                    redshift_prow = make_redshift_prow([zprow], tnight, descriptor=zsubtype, internal_id=internal_id)
                    internal_id += 1
                    redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                      strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                      resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                    ptable.add_row(redshift_prow)
            else:
                ## One joint redshift job over all eligible exposures of the tile
                log.info(" ")
                log.info(f"Submitting joint redshift fits of type {zsubtype} for TILEID {zprows[0]['TILEID']}.")
                expids = [prow['EXPID'][0] for prow in zprows]
                log.info(f"Expids: {expids}.\n")
                redshift_prow = make_redshift_prow(zprows, tnight, descriptor=zsubtype, internal_id=internal_id)
                internal_id += 1
                redshift_prow = create_and_submit(redshift_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
                                                  strictly_successful=strictly_successful, check_for_outputs=check_for_outputs,
                                                  resubmit_partial_complete=resubmit_partial_complete, system_name=system_name)
                ptable.add_row(redshift_prow)

    return ptable, internal_id
#########################################
######## Tilenight ##############
#########################################
def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation,
                     dry_run=0, strictly_successful=False, resubmit_partial_complete=True,
                     system_name=None, use_specter=False, extra_job_args=None):
    """
    Build, script, and submit a single tilenight job from a set of
    unsubmitted prestdstar prows. The returned ptable has the new tilenight
    row appended.

    Args:
        ptable (Table): The processing table where each row is a processed job.
        prows (list or array of dict): Unsubmitted prestdstar jobs that are first steps in tilenight.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or None. The
            Table.Row values are for the corresponding calibration job.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. Default is False.
        resubmit_partial_complete (bool, optional): Default is True. If True, jobs with some prior data are
            pruned using PROCCAMWORD to only process the remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict): Dictionary with key-value pairs that specify additional
            information used for a specific type of job, e.g. laststeps for tilenight.

    Returns:
        tuple: A tuple containing:

        * ptable, Table. The same processing table as input with the tilenight row added.
        * tnight_prow, dict. Row of a processing table corresponding to the tilenight job.
        * internal_id, int, the next internal id to be used for assignment.
    """
    log = get_logger()

    ## Without any science exposures there is no tilenight job to create
    if len(prows) == 0:
        return ptable, None, internal_id

    log.info(" ")
    log.info(f"Running tilenight.\n")

    tnight_prow = make_tnight_prow(prows, calibjobs, internal_id=internal_id)
    internal_id += 1
    tnight_prow = create_and_submit(
        tnight_prow, queue=queue, reservation=reservation, dry_run=dry_run,
        strictly_successful=strictly_successful, check_for_outputs=False,
        resubmit_partial_complete=resubmit_partial_complete,
        system_name=system_name, use_specter=use_specter,
        extra_job_args=extra_job_args)
    ptable.add_row(tnight_prow)

    return ptable, tnight_prow, internal_id
## wrapper functions for joint fitting
def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservation=None,
                      z_submit_types=None, dry_run=0, strictly_successful=False,
                      check_for_outputs=True, resubmit_partial_complete=True,
                      system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    stdstarfit joint fit and redshift fitting.

    All arguments are forwarded to joint_fit unchanged except::

        'sciences' is passed as the prows argument of joint_fit.
        The joint_fit 'descriptor' argument is pre-defined as 'science'.
    """
    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id,
                     queue=queue, reservation=reservation, descriptor='science',
                     z_submit_types=z_submit_types, dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
                   reservation=None, dry_run=0, strictly_successful=False,
                   check_for_outputs=True, resubmit_partial_complete=True,
                   system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    nightlyflat joint fit.

    All arguments are forwarded to joint_fit unchanged except::

        'flats' is passed as the prows argument of joint_fit.
        The joint_fit 'descriptor' argument is pre-defined as 'nightlyflat'.
    """
    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id,
                     queue=queue, reservation=reservation,
                     descriptor='nightlyflat', dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
                  reservation=None, dry_run=0, strictly_successful=False,
                  check_for_outputs=True, resubmit_partial_complete=True,
                  system_name=None):
    """
    Thin wrapper around desispec.workflow.processing.joint_fit for the
    psfnight joint fit.

    All arguments are forwarded to joint_fit unchanged except::

        'arcs' is passed as the prows argument of joint_fit.
        The joint_fit 'descriptor' argument is pre-defined as 'psfnight'.
    """
    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id,
                     queue=queue, reservation=reservation,
                     descriptor='psfnight', dry_run=dry_run,
                     strictly_successful=strictly_successful,
                     check_for_outputs=check_for_outputs,
                     resubmit_partial_complete=resubmit_partial_complete,
                     system_name=system_name)
def make_joint_prow(prows, descriptor, internal_id):
    """
    Given an input list or array of processing table rows and a descriptor, this creates a joint fit processing job row.
    It starts by copying the first input row, overwrites relevant columns, and defines the new dependencies (based on the
    input prows).

    Args:
        prows (list or array of dict): The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        descriptor (str): Description of the joint fitting job. Can either be 'stdstarfit', 'psfnight', or 'nightlyflat'.
        internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used).

    Returns:
        tuple: A tuple containing:

        * joint_prow, dict. Row of a processing table corresponding to the joint fit job.
        * internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
    """
    first_row = table_row_to_dict(prows[0])
    joint_prow = first_row.copy()
    joint_prow['INTID'] = internal_id
    internal_id += 1
    joint_prow['JOBDESC'] = descriptor
    joint_prow['LATEST_QID'] = -99
    ## np.array([], dtype=int) is the idiomatic empty integer array;
    ## np.ndarray(shape=0) allocates an uninitialized float array first.
    joint_prow['ALL_QIDS'] = np.array([], dtype=int)
    joint_prow['SUBMIT_DATE'] = -99
    joint_prow['STATUS'] = 'U'
    joint_prow['SCRIPTNAME'] = ''
    joint_prow['EXPID'] = np.array([currow['EXPID'][0] for currow in prows], dtype=int)

    ## Assign the PROCCAMWORD based on the descriptor and the input exposures.
    ## UPDATE 2024-04-24: badamps are now included in arc/flat joint fits,
    ## so grab all PROCCAMWORDs instead of filtering out BADAMP cameras.
    pcamwords = [prow['PROCCAMWORD'] for prow in prows]
    if descriptor == 'stdstarfit':
        joint_prow['PROCCAMWORD'] = camword_union(pcamwords,
                                                  full_spectros_only=True)
    elif descriptor == 'nightlyflat':
        ## For flats we want any camera that exists in all 12 exposures
        joint_prow['PROCCAMWORD'] = camword_intersection(pcamwords,
                                                         full_spectros_only=False)
    elif descriptor == 'psfnight':
        ## For arcs we want any camera that exists in at least 3 exposures.
        ## Count the number of exposures each camera is present for.
        camcheck = {}
        for camword in pcamwords:
            for cam in decode_camword(camword):
                camcheck[cam] = camcheck.get(cam, 0) + 1
        ## if a camera exists in 3 or more exposures, then include it
        goodcams = [cam for cam, camcount in camcheck.items() if camcount >= 3]
        joint_prow['PROCCAMWORD'] = create_camword(goodcams)

    joint_prow = assign_dependency(joint_prow, dependency=prows)
    return joint_prow, internal_id
def make_exposure_prow(erow, int_id, calibjobs, jobdesc=None):
    """
    Create a processing table row for a single exposure.

    Args:
        erow: exposure table row (or dict) to convert via erow_to_prow.
        int_id (int): the next internal id to be used for assignment.
        calibjobs (dict): dictionary of calibration jobs used to define the
            dependencies of the new row (passed to define_and_assign_dependency).
        jobdesc (str, optional): job description to assign. If None, the
            exposure's OBSTYPE is used instead.

    Returns:
        tuple: A tuple containing:

        * prow, dict. The new processing table row for the exposure.
        * int_id, int. The next unused internal id (input incremented by one).
    """
    prow = erow_to_prow(erow)
    prow['INTID'] = int_id
    int_id += 1
    ## Default the job description to the exposure's observation type
    if jobdesc is None:
        prow['JOBDESC'] = prow['OBSTYPE']
    else:
        prow['JOBDESC'] = jobdesc
    prow = define_and_assign_dependency(prow, calibjobs)
    return prow, int_id
def make_tnight_prow(prows, calibjobs, internal_id):
    """
    Create a tilenight processing-table row from a tile's unsubmitted
    prestdstar rows. The first input row is copied, the bookkeeping columns
    are reset, and the calibration dependencies are wired up.

    Args:
        prows (list or array of dict): Unsubmitted rows corresponding to the individual prestdstar
            jobs that are the first steps of tilenight.
        calibjobs (dict): Dictionary containing keys that each correspond to a Table.Row or
            None, with each Table.Row value corresponding to a calibration job
            on which the tilenight job depends.
        internal_id (int): the internal id to assign to the new row.

    Returns:
        dict: Row of a processing table corresponding to the tilenight job.
    """
    tnight_prow = table_row_to_dict(prows[0]).copy()
    ## Reset bookkeeping columns for the not-yet-submitted tilenight job
    tnight_prow['INTID'] = internal_id
    tnight_prow['JOBDESC'] = 'tilenight'
    tnight_prow['LATEST_QID'] = -99
    tnight_prow['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
    tnight_prow['SUBMIT_DATE'] = -99
    tnight_prow['STATUS'] = 'U'
    tnight_prow['SCRIPTNAME'] = ''
    ## Record the first EXPID of every constituent exposure job
    tnight_prow['EXPID'] = np.array([row['EXPID'][0] for row in prows], dtype=int)
    tnight_prow = define_and_assign_dependency(tnight_prow, calibjobs, use_tilenight=True)
    return tnight_prow
def make_redshift_prow(prows, tnight, descriptor, internal_id):
    """
    Create a redshift processing-table row that depends on the given
    tilenight job. The first input row is copied, the bookkeeping columns
    are reset, and the dependency on tnight is assigned.

    Args:
        prows (list or array of dict): Unsubmitted rows corresponding to the individual prestdstar
            jobs that are the first steps of tilenight.
        tnight (Table.Row): Row corresponding to the tilenight job on which the redshift job depends.
        descriptor (str): the redshift "group" type used as the job description (e.g. 'perexp').
        internal_id (int): the internal id to assign to the new row.

    Returns:
        dict: Row of a processing table corresponding to the redshift job.
    """
    zfit_prow = table_row_to_dict(prows[0]).copy()
    ## Reset bookkeeping columns for the not-yet-submitted redshift job
    zfit_prow.update({'INTID': internal_id,
                      'JOBDESC': descriptor,
                      'LATEST_QID': -99,
                      'ALL_QIDS': np.ndarray(shape=0).astype(int),
                      'SUBMIT_DATE': -99,
                      'STATUS': 'U',
                      'SCRIPTNAME': ''})
    ## Record the first EXPID of every constituent exposure job
    zfit_prow['EXPID'] = np.array([row['EXPID'][0] for row in prows], dtype=int)
    zfit_prow = assign_dependency(zfit_prow, dependency=tnight)
    return zfit_prow
def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs,
                                  lasttype, internal_id, z_submit_types=None, dry_run=0,
                                  queue='realtime', reservation=None, strictly_successful=False,
                                  check_for_outputs=True, resubmit_partial_complete=True,
                                  system_name=None):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted. Places
    the decision criteria into a single function for easier maintainability over time. These are separate from the
    new standard manifest*.json method of indicating a calibration sequence is complete. That is checked independently
    elsewhere and doesn't interact with this.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        arcs (list of dict): list of the individual arc jobs to be used for the psfnight (NOT all
            the arcs, if multiple sets existed). May be empty if none identified yet.
        flats (list of dict): list of the individual flat jobs to be used for the nightlyflat (NOT
            all the flats, if multiple sets existed). May be empty if none identified yet.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
            and 'nightlyflat'. Each key corresponds to a Table.Row or
            None. The Table.Row values are for the corresponding
            calibration job.
        lasttype (str or None): the obstype of the last individual exposure row to be processed.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        z_submit_types (list of str): The "group" types of redshifts that should be submitted with each
            exposure. If not specified or None, then no redshifts are submitted.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * calibjobs, dict. Dictionary containing 'nightlybias', 'ccdcalib', 'psfnight'
          and 'nightlyflat'. Each key corresponds to a Table.Row or
          None. The Table.Row values are for the corresponding
          calibration job.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and len(sciences) > 0:
        log = get_logger()
        ## Identify exposures that stopped at skysub -- they cannot be
        ## inputs to the stdstar joint fit.
        skysubonly = np.array([sci['LASTSTEP'] == 'skysub' for sci in sciences])
        if np.all(skysubonly):
            log.error("Identified all exposures in joint fitting request as skysub-only. Not submitting")
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        if np.any(skysubonly):
            ## Prune skysub-only exposures but keep the rest for fitting
            log.error("Identified skysub-only exposures in joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))
            sciences = (np.array(sciences,dtype=object)[~skysubonly]).tolist()
            log.info("Removed skysub only exposures in joint fitting:")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("LASTSTEP's: {}".format([row['LASTSTEP'] for row in sciences]))

        ## A joint fit only makes sense for a single tile: count tiles and
        ## bail out (clearing the accumulated sciences) if more than one.
        from collections import Counter
        tiles = np.array([sci['TILEID'] for sci in sciences])
        counts = Counter(tiles)
        if len(counts.most_common()) > 1:
            log.error("Identified more than one tile in a joint fitting request")
            log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            log.info("Tileid's: {}".format(tiles))
            log.info("Returning without joint fitting any of these exposures.")
            # most_common, nmost_common = counts.most_common()[0]
            # if most_common == -99:
            #     most_common, nmost_common = counts.most_common()[1]
            # log.warning(f"Given multiple tiles to jointly fit: {counts}. "+
            #             "Only processing the most common non-default " +
            #             f"tile: {most_common} with {nmost_common} exposures")
            # sciences = (np.array(sciences,dtype=object)[tiles == most_common]).tolist()
            # log.info("Tiles and exposure id's being submitted for joint fitting:")
            # log.info("Expid's: {}".format([row['EXPID'] for row in sciences]))
            # log.info("Tileid's: {}".format([row['TILEID'] for row in sciences]))
            sciences = []
            return ptable, calibjobs, sciences, internal_id

        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, z_submit_types=z_submit_types,
                                                         dry_run=dry_run, queue=queue, reservation=reservation,
                                                         strictly_successful=strictly_successful,
                                                         check_for_outputs=check_for_outputs,
                                                         resubmit_partial_complete=resubmit_partial_complete,
                                                         system_name=system_name)
        ## Once submitted, the accumulated science list starts over
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and calibjobs['nightlyflat'] is None and len(flats) == 12:
        ## Note here we have an assumption about the number of expected flats being greater than 11
        ptable, calibjobs['nightlyflat'], internal_id \
            = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
                             reservation=reservation, strictly_successful=strictly_successful,
                             check_for_outputs=check_for_outputs,
                             resubmit_partial_complete=resubmit_partial_complete,
                             system_name=system_name
                             )

    elif lasttype == 'arc' and calibjobs['psfnight'] is None and len(arcs) == 5:
        ## Note here we have an assumption about the number of expected arcs being greater than 4
        ptable, calibjobs['psfnight'], internal_id \
            = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
                            reservation=reservation, strictly_successful=strictly_successful,
                            check_for_outputs=check_for_outputs,
                            resubmit_partial_complete=resubmit_partial_complete,
                            system_name=system_name
                            )

    return ptable, calibjobs, sciences, internal_id
def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry_run=0,
                                   queue='realtime', reservation=None, strictly_successful=False,
                                   check_for_outputs=True, resubmit_partial_complete=True,
                                   system_name=None, use_specter=False, extra_job_args=None):
    """
    Takes all the state-ful data from daily processing and determines whether a tilenight job needs to be submitted,
    then submits any requested redshift jobs that depend on it.

    Args:
        ptable (Table): Processing table of all exposures that have been processed.
        sciences (list of dict): list of the most recent individual prestdstar science exposures
            (if currently processing that tile). May be empty if none identified yet.
        calibjobs (dict): Dictionary of calibration jobs (Table.Row or None) that the
            tilenight job depends on; passed through to submit_tilenight.
        internal_id (int): an internal identifier unique to each job. Increments with each new job. This
            is the smallest unassigned value.
        dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
            dry_run=2, the scripts will not be written or submitted. Logging will remain the same
            for testing as though scripts are being submitted. Default is 0 (false).
        queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
        reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is
            less desirable because e.g. the sciences can run with SVN default calibrations rather
            than failing completely from failed calibrations. Default is False.
        check_for_outputs (bool, optional): Default is True. If True, the code checks for the existence of the expected final
            data products for the script being submitted. If all files exist and this is True,
            then the script will not be submitted. If some files exist and this is True, only the
            subset of the cameras without the final data products will be generated and submitted.
        resubmit_partial_complete (bool, optional): Default is True. Must be used with check_for_outputs=True. If this flag is True,
            jobs with some prior data are pruned using PROCCAMWORD to only process the
            remaining cameras not found to exist.
        system_name (str): batch system name, e.g. cori-haswell, cori-knl, perlmutter-gpu
        use_specter (bool, optional): Default is False. If True, use specter, otherwise use gpu_specter by default.
        extra_job_args (dict, optional): Dictionary with key-value pairs that specify additional
            information used for a specific type of job. Examples include
            laststeps for tilenight, z_submit_types for redshifts, etc.

    Returns:
        tuple: A tuple containing:

        * ptable, Table, Processing table of all exposures that have been processed.
        * sciences, list of dicts, list of the most recent individual prestdstar science exposures
          (if currently processing that tile). May be empty if none identified yet or
          we just submitted them for processing.
        * internal_id, int, if no job is submitted, this is the same as the input, otherwise it is incremented upward from
          from the input such that it represents the smallest unused ID.
    """
    ptable, tnight, internal_id = submit_tilenight(ptable, sciences, calibjobs, internal_id,
                                                   queue=queue, reservation=reservation,
                                                   dry_run=dry_run, strictly_successful=strictly_successful,
                                                   resubmit_partial_complete=resubmit_partial_complete,
                                                   system_name=system_name, use_specter=use_specter,
                                                   extra_job_args=extra_job_args)

    ## BUGFIX: extra_job_args defaults to None, so guard against None before
    ## the membership test (previously raised TypeError when not provided).
    z_submit_types = None
    if extra_job_args is not None and 'z_submit_types' in extra_job_args:
        z_submit_types = extra_job_args['z_submit_types']

    ptable, internal_id = submit_redshifts(ptable, sciences, tnight, internal_id,
                                           queue=queue, reservation=reservation,
                                           dry_run=dry_run, strictly_successful=strictly_successful,
                                           check_for_outputs=check_for_outputs,
                                           resubmit_partial_complete=resubmit_partial_complete,
                                           z_submit_types=z_submit_types,
                                           system_name=system_name)

    ## Once the tilenight job is submitted, the accumulated science list resets
    if tnight is not None:
        sciences = []

    return ptable, sciences, internal_id
def set_calibrator_flag(prows, ptable):
    """
    Sets the "CALIBRATOR" column of a processing table to 1 (integer
    representation of True) for every row whose INTID matches one of the
    input rows. Used within joint fitting code to flag the exposures that
    were input to the psfnight or nightlyflat for later reference.

    Args:
        prows, list or array of Table.Rows or dicts. The rows corresponding to the individual exposure jobs that are
            inputs to the joint fit.
        ptable, Table. The processing table where each row is a processed job.

    Returns:
        Table: The same processing table as input, with CALIBRATOR set to 1
        for the rows matching the input jobs.
    """
    for calib_row in prows:
        matches = (ptable['INTID'] == calib_row['INTID'])
        ptable['CALIBRATOR'][matches] = 1
    return ptable