"""
desispec.workflow.science_selection
===================================
"""
import sys, os, glob
import json
from astropy.io import fits
from astropy.table import Table, join
import numpy as np
import time, datetime
from collections import OrderedDict
import subprocess
from copy import deepcopy
from desispec.scripts.tile_redshifts import generate_tile_redshift_scripts
from desispec.workflow.redshifts import get_ztile_script_pathname, \
get_ztile_relpath, \
get_ztile_script_suffix, read_minimal_exptables_columns
from desispec.workflow.queue import get_resubmission_states, update_from_queue, queue_info_from_qids
from desispec.workflow.timing import what_night_is_it
from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_pathname, \
create_desi_proc_batch_script, \
get_desi_proc_batch_file_path, \
get_desi_proc_tilenight_batch_file_pathname, \
create_desi_proc_tilenight_batch_script
from desispec.workflow.utils import pathjoin, sleep_and_report
from desispec.workflow.tableio import write_table
from desispec.workflow.proctable import table_row_to_dict
from desiutil.log import get_logger
from desispec.io import findfile, specprod_root
from desispec.io.util import decode_camword, create_camword, \
difference_camwords, \
camword_to_spectros, camword_union, camword_intersection, parse_badamps
#################################################
############## Misc Functions ###################
#################################################
import numpy as np
from astropy.table import Table, vstack
def determine_science_to_proc(etable, tiles, surveys, laststeps,
                              processed_tiles=None,
                              all_tiles=True,
                              ignore_last_tile=False,
                              complete_tiles_thrunight=None,
                              specstatus_path=None):
    """
    Selects the science exposures that should be processed from a populated
    exposure table given the details and flags given as inputs.

    The cuts are applied in this order: OBSTYPE == 'science', EXPTIME >= 60,
    optional removal of the last observed tile, LASTSTEP, already-processed
    tiles, requested TILEIDs, requested surveys, and finally (only when
    all_tiles is False) tiles deemed complete by the specstatus table.

    Args:
        etable (astropy.table.Table): A DESI exposure_table. Must provide the
            columns OBSTYPE, EXPTIME, EXPID, TILEID and LASTSTEP, plus SURVEY
            if ``surveys`` is given.
        tiles (array-like or None): Only submit jobs for these TILEIDs.
            No TILEID cut is applied if None.
        surveys (array-like or None): Only submit science jobs for these
            surveys (lowercase). No SURVEY cut is applied if None.
        laststeps (array-like): Only submit jobs for exposures with
            LASTSTEP in these science_laststeps (lowercase).
        processed_tiles (array-like, optional): TILEIDs that have already
            been processed; their exposures are removed. Default is None,
            in which case nothing is removed.
        all_tiles (bool, optional): Default is True. If True, do NOT restrict
            to completed tiles. If False, keep only tiles deemed complete by
            the table pointed to by specstatus_path.
        ignore_last_tile (bool): Default is False. Whether to ignore the last
            observed tile. Generally used with daily operations when expecting
            more data to come in the future.
        complete_tiles_thrunight (int, optional): Default is None. Only
            tiles completed on or before the supplied YYYYMMDD are considered
            completed and will be processed. All complete tiles are submitted
            if None. Ignored if all_tiles is True.
        specstatus_path (str, optional): Location of the surveyops specstatus
            table. Default is $DESI_SURVEYOPS/ops/tiles-specstatus.ecsv.

    Returns:
        tuple: A tuple containing:

        * astropy.table.Table: A DESI exposure_table only containing the
          science exposures that should be processed.
        * list: A list of the tiles that should be processed, in the order
          they first appear in the input exposure_table.

    Raises:
        ValueError: If a survey filter is requested but the exposure table
            has no SURVEY column.
    """
    log = get_logger()

    ## Keep only the science exposures; calibrations are not handled here
    sci_etable = etable[etable['OBSTYPE'] == 'science']

    ## Cut on exposure time
    if len(sci_etable) > 0:
        sci_etable = sci_etable[sci_etable['EXPTIME'] >= 60]

    ## Remove any exposure related to the last tile when in daily mode
    ## and during the nightly processing. The last exposure is looked up in
    ## the full input table so that a trailing calibration exposure means
    ## no tile is dropped.
    if ignore_last_tile and len(sci_etable) > 0:
        last_ind = np.argmax(etable['EXPID'])
        if etable['OBSTYPE'][last_ind] == 'science':
            last_tile = etable['TILEID'][last_ind]
            log.info(f"Ignoring exposures associated with tile {last_tile} since it"
                     + f" was the last exposure observed and {ignore_last_tile=}")
            sci_etable = sci_etable[sci_etable['TILEID'] != last_tile]

    ## Cut on LASTSTEP
    if len(sci_etable) > 0:
        good_exps = np.isin(np.array(sci_etable['LASTSTEP']).astype(str), laststeps)
        sci_etable = sci_etable[good_exps]

    ## Identify tiles that have already been processed and remove them
    if processed_tiles is not None and len(sci_etable) > 0:
        keep = ~np.isin(sci_etable['TILEID'], processed_tiles)
        sci_etable = sci_etable[keep]

    ## filter by TILEID if requested
    if tiles is not None and len(sci_etable) > 0:
        log.info(f'Filtering by tiles={tiles}')
        keep = np.isin(sci_etable['TILEID'], tiles)
        sci_etable = sci_etable[keep]

    ## filter by SURVEY if requested
    if surveys is not None and len(sci_etable) > 0:
        log.info(f'Filtering by surveys={surveys}')
        if 'SURVEY' not in etable.dtype.names:
            raise ValueError(f'surveys={surveys} filter requested, but no '
                             + f'SURVEY column in exposure_table')

        keep = np.zeros(len(sci_etable), dtype=bool)
        # np.isin doesn't work with bytes vs. str from Tables but direct
        # comparison does, so loop
        for survey in surveys:
            keep |= sci_etable['SURVEY'] == survey

        sci_etable = sci_etable[keep]

    ## If asked to do so, only process tiles deemed complete by the specstatus
    ## file. complete_tiles_thrunight=None means no night cut, i.e. every
    ## complete tile is accepted.
    if not all_tiles and len(sci_etable) > 0:
        all_completed_tiles = get_completed_tiles(
            specstatus_path,
            complete_tiles_thrunight=complete_tiles_thrunight)
        keep = np.isin(sci_etable['TILEID'], all_completed_tiles)
        sci_tiles = np.unique(sci_etable['TILEID'][keep])
        n_tiles_before = np.count_nonzero(np.unique(sci_etable['TILEID']) > 0)
        n_exps_before = np.count_nonzero(sci_etable['TILEID'] > 0)
        log.info(f"Processing completed science tiles: "
                 + f"{', '.join(sci_tiles.astype(str))}")
        log.info(f"Filtering by completed tiles retained "
                 + f"{len(sci_tiles)}/{n_tiles_before} science tiles")
        log.info(f"Filtering by completed tiles retained "
                 + f"{np.count_nonzero(keep)}/{n_exps_before} science exposures")
        sci_etable = sci_etable[keep]

    ## Identify tiles to be processed, in chronological order.
    ## dict preserves insertion order, giving an order-stable de-duplication.
    tiles_to_proc = list(dict.fromkeys(sci_etable['TILEID']))

    return sci_etable, tiles_to_proc
def get_completed_tiles(specstatus_path=None, complete_tiles_thrunight=None):
    """
    Uses a tiles-specstatus.ecsv file and selection criteria to determine
    what tiles have been completed. Takes an optional argument to point
    to a custom specstatus file. Returns an array of TILEIDs.

    A tile is considered complete if its ZDONE entry equals 'true', or if its
    EFFTIME_SPEC exceeds 10% of its GOALTIME and it is either not in the
    'main' survey or belongs to the 'backup' program.

    Args:
        specstatus_path (str, optional): Location of the surveyops specstatus
            table. Default is $DESI_SURVEYOPS/ops/tiles-specstatus.ecsv.
        complete_tiles_thrunight (int, optional): Default is None. Only
            tiles completed on or before the supplied YYYYMMDD are considered
            completed and will be processed. All complete tiles are
            submitted if None.

    Returns:
        array-like: The tiles from the specstatus file determined by the
        selection criteria to be completed.

    Raises:
        ValueError: If specstatus_path is None and DESI_SURVEYOPS is not
            defined in the environment.
        IOError: If the specstatus file does not exist.
    """
    log = get_logger()

    if specstatus_path is None:
        if 'DESI_SURVEYOPS' not in os.environ:
            raise ValueError("DESI_SURVEYOPS is not defined in your environment. " +
                             "You must set it or specify --specstatus-path explicitly.")
        specstatus_path = os.path.join(os.environ['DESI_SURVEYOPS'], 'ops',
                                       'tiles-specstatus.ecsv')
        log.info(f"specstatus_path not defined, setting default to {specstatus_path}.")

    if not os.path.exists(specstatus_path):
        raise IOError(f"Couldn't find {specstatus_path}.")

    specstatus = Table.read(specstatus_path)

    ## good tile selection
    iszdone = (specstatus['ZDONE'] == 'true')
    isnotmain = (specstatus['SURVEY'] != 'main')
    isbackup = (specstatus['FAPRGRM'] == 'backup')
    enoughfraction = 0.1  # 10% rather than specstatus['MINTFRAC']
    isenoughtime = (specstatus['EFFTIME_SPEC'] >
                    specstatus['GOALTIME'] * enoughfraction)

    ## Build the mask as a new array so that iszdone is not modified in place.
    ##  - main tiles: only take the approved QA tiles (ZDONE)
    ##  - not all special and cmx/SV tiles have zdone set, so also pass
    ##    those with enough time
    ##  - main backup also don't have zdone set, so also pass those with
    ##    enough time
    goodtiles = iszdone | (isenoughtime & isnotmain) | (isenoughtime & isbackup)

    if complete_tiles_thrunight is not None:
        goodtiles = goodtiles & (specstatus['LASTNIGHT'] <= complete_tiles_thrunight)

    return np.array(specstatus['TILEID'][goodtiles])
def get_tiles_cumulative(sci_etable, z_submit_types, all_cumulatives, night):
    """
    Takes an exposure table, list of redshift types to submit, and a boolean
    defining whether to return all cumulatives or not, and returns the list
    of tiles for which cumulative redshifts should be performed based on
    whether it is the last known night in which that tile was observed.

    Args:
        sci_etable (Table): An exposure table with column TILEID.
        z_submit_types (list or None): List of strings identifying the
            redshift types to run. An empty list is returned unless this
            contains 'cumulative'.
        all_cumulatives (bool): If True all positive tile ids in the
            sci_etable are returned, otherwise only those that were observed
            last on the given night are returned for cumulative redshifts.
        night (int): The night in question, in YYYYMMDD format.

    Returns:
        list: List of tile ids that should get cumulative redshifts.
    """
    log = get_logger()

    # Nothing to do unless cumulative redshifts were explicitly requested
    if z_submit_types is None or 'cumulative' not in z_submit_types:
        return list()

    tiles_this_night = np.unique(np.asarray(sci_etable['TILEID']))
    # select only science tiles, not calibs
    tiles_this_night = tiles_this_night[tiles_this_night > 0]

    if all_cumulatives:
        tiles_cumulative = list(tiles_this_night)
        log.info(f'Submitting cumulative redshifts for all tiles: {tiles_cumulative}')
        return tiles_cumulative

    # Look up every known exposure of tonight's tiles to find, per tile,
    # the most recent night on which it was observed.
    allexp = read_minimal_exptables_columns(tileids=tiles_this_night)
    tiles_cumulative = list()
    for tileid in tiles_this_night:
        nights_with_tile = allexp['NIGHT'][allexp['TILEID'] == tileid]
        if len(nights_with_tile) > 0 and night == np.max(nights_with_tile):
            tiles_cumulative.append(tileid)

    log.info(f'Submitting cumulative redshifts for {len(tiles_cumulative)}'
             + f'/{len(tiles_this_night)} tiles for '
             + f'which {night} is the last night: {tiles_cumulative}')

    return tiles_cumulative