Source code for desispec.pipeline.tasks.spectra

#
# See top-level LICENSE.rst file for Copyright information
#
# -*- coding: utf-8 -*-
"""
desispec.pipeline.tasks.spectra
===============================

"""

from __future__ import absolute_import, division, print_function

import numpy as np

from ..defs import (task_name_sep, task_state_to_int, task_int_to_state)

from ...util import option_list

from ...io import findfile

from .base import (BaseTask, task_classes)

from desiutil.log import get_logger

import os

# NOTE: only one class in this file should have a name that starts with "Task".

[docs]class TaskSpectra(BaseTask): """Class containing the properties of one spectra task. """ def __init__(self): super(TaskSpectra, self).__init__() # then put int the specifics of this class # _cols must have a state self._type = "spectra" self._cols = [ "nside", "pixel", "state" ] self._coltypes = [ "integer", "integer", "integer" ] # _name_fields must also be in _cols self._name_fields = ["nside","pixel"] self._name_formats = ["d","d"]
[docs] def _paths(self, name): """See BaseTask.paths. """ props = self.name_split(name) return [ findfile("spectra", night=None, expid=None, camera=None, groupname=props["pixel"], nside=props["nside"], band=None, spectrograph=None) ]
[docs] def _deps(self, name, db, inputs): """See BaseTask.deps. """ return dict()
def _run_max_procs(self): # This is a serial task. return 1 def _run_time(self, name, procs, db): # Run time on one proc on machine with scale factor == 1.0. # Get the list of frames and use the size of this list as # a proxy for determining the runtime. The run time is dominated by # I/O. if db is not None: props = self.name_split(name) entries = db.select_healpix_frame({"pixel":props["pixel"],"nside":props["nside"]}) nentry = len(entries) tm = 10.0 + 1.0 * (nentry / 50.0) else: tm = 60.0 return tm def _run_max_mem_proc(self, name, db): # Per-process memory requirements mem = 0.0 if db is not None: # Get the list of frames. The frame files touching this pixel will # be cached in RAM. props = self.name_split(name) entries = db.select_healpix_frame({"pixel":props["pixel"],"nside":props["nside"]}) nframe = len(entries) # Each frame is about 90MB mem = 0.090 * nframe return mem
[docs] def _run_defaults(self): """See BaseTask.run_defaults. """ return {}
def _option_list(self, name, opts, db): # we do need db access for spectra if db is None : log = get_logger() log.error("we do need db access for spectra") raise RuntimeError("we do need db access for spectra") from .base import task_classes, task_type # get pixel props = self.name_split(name) # get list of exposures and spectrographs by selecting entries in the # healpix_frame table with state = 1, which means that there is a new # cframe intersecting the pixel entries = db.select_healpix_frame({"pixel":props["pixel"],"nside":props["nside"],"state":1}) # now select cframe with same props cframes = [] for entry in entries : for band in ["b","r","z"] : entry_and_band = entry.copy() entry_and_band["band"] = band # this will match cframes with same expid and spectro taskname = task_classes["cframe"].name_join(entry_and_band) filename = task_classes["cframe"].paths(taskname)[0] cframes.append(filename) options = {} options["infiles"] = cframes options["outfile"] = self.paths(name)[0] options["healpix"] = props["pixel"] options["nside"] = props["nside"] return option_list(options)
[docs] def _run_cli(self, name, opts, procs, db): """See BaseTask.run_cli. """ entry = "desi_update_spectra" optlist = self._option_list(name, opts, db) return "{} {}".format(entry, " ".join(optlist))
[docs] def _run(self, name, opts, comm, db): """See BaseTask.run. """ from ...scripts import update_spectra optlist = self._option_list(name, opts, db) args = update_spectra.parse(optlist) update_spectra.main(args) return
[docs] def run_and_update(self, db, name, opts, comm=None): """Run the task and update DB state. The state of the task is marked as "done" if the command completes without raising an exception and if the output files exist. It is specific for spectra because the healpix_frame table has to be updated Args: db (pipeline.db.DB): The database. name (str): the name of this task. opts (dict): options to use for this task. comm (mpi4py.MPI.Comm): optional MPI communicator. Returns: int: the number of processes that failed. """ nproc = 1 rank = 0 if comm is not None: nproc = comm.size rank = comm.rank failed = self.run(name, opts, comm=comm, db=db) if rank == 0: if failed > 0: self.state_set(db, name, "failed") else: outputs = self.paths(name) done = True for out in outputs: if not os.path.isfile(out): done = False failed = nproc break if done: props=self.name_split(name) props["state"]=1 # selection, only those for which we had a cframe with db.cursor() as cur : self.state_set(db, name, "done",cur=cur) # 2=spectra has been updated db.update_healpix_frame_state(props,state=2,cur=cur) # directly set the corresponding redshift to ready cur.execute( 'update redshift set state={} where nside = {} and pixel = {}' .format( task_state_to_int["ready"], props["nside"], props["pixel"] ) ) # post processing is now done by a single rank in # run.run_task_list else: self.state_set(db, name, "failed") return failed