Source code for desispec.pipeline.tasks.cframe

#
# See top-level LICENSE.rst file for Copyright information
#
# -*- coding: utf-8 -*-
"""
desispec.pipeline.tasks.cframe
==============================

"""

from __future__ import absolute_import, division, print_function

from collections import OrderedDict

from ..defs import (task_name_sep, task_state_to_int, task_int_to_state)

from ...util import option_list

from ...io import findfile

from .base import (BaseTask, task_classes)

from desiutil.log import get_logger

import sys,re,os,copy

# NOTE: only one class in this file should have a name that starts with "Task".

[docs]class TaskCFrame(BaseTask): """Class containing the properties of a sky fit task. """ def __init__(self): super(TaskCFrame, self).__init__() # then put int the specifics of this class # _cols must have a state self._type = "cframe" self._cols = [ "night", "band", "spec", "expid", "state" ] self._coltypes = [ "integer", "text", "integer", "integer", "integer" ] # _name_fields must also be in _cols self._name_fields = ["night","band","spec","expid"] self._name_formats = ["08d","s","d","08d"]
[docs] def _paths(self, name): """See BaseTask.paths. """ props = self.name_split(name) camera = "{}{}".format(props["band"], props["spec"]) return [ findfile("cframe", night=props["night"], expid=props["expid"], camera=camera, groupname=None, nside=None, band=props["band"], spectrograph=props["spec"]) ]
[docs] def _deps(self, name, db, inputs): """See BaseTask.deps. """ from .base import task_classes props = self.name_split(name) deptasks = { "infile" : task_classes["extract"].name_join(props), "fiberflat" : task_classes["fiberflatnight"].name_join(props), "sky" : task_classes["sky"].name_join(props), "calib" : task_classes["fluxcalib"].name_join(props) } return deptasks
[docs] def _run_max_procs(self): """See BaseTask.run_max_procs. """ return 1
[docs] def _run_time(self, name, procs, db): """See BaseTask.run_time. """ return 2
[docs] def _run_defaults(self): """See BaseTask.run_defaults. """ opts = {} #opts["sky-throughput-correction"] = True return opts
[docs] def _option_list(self, name, opts): """Build the full list of options. This includes appending the filenames and incorporating runtime options. """ from .base import task_classes, task_type deps = self.deps(name) options = {} options["infile"] = task_classes["extract"].paths(deps["infile"])[0] options["fiberflat"] = task_classes["fiberflatnight"].paths(deps["fiberflat"])[0] options["sky"] = task_classes["sky"].paths(deps["sky"])[0] options["calib"] = task_classes["fluxcalib"].paths(deps["calib"])[0] options["outfile"] = self.paths(name)[0] options.update(opts) return option_list(options)
[docs] def _run_cli(self, name, opts, procs, db): """See BaseTask.run_cli. """ entry = "desi_process_exposure" optlist = self._option_list(name, opts) com = "{} {}".format(entry, " ".join(optlist)) return com
[docs] def _run(self, name, opts, comm, db): """See BaseTask.run. """ from ...scripts import procexp optlist = self._option_list(name, opts) args = procexp.parse(optlist) procexp.main(args) return
[docs] def postprocessing(self, db, name, cur): """For successful runs, postprocessing on DB""" props=self.name_split(name) props["state"]=0 # selection db.update_healpix_frame_state(props,state=1,cur=cur) # 1=has a cframe log = get_logger() # call getready on all spectra ... might be super inefficient tt="spectra" required_healpix_frame_state = 1 # means we have a cframe cur.execute('select nside,pixel from healpix_frame where state = {} and expid = {} and spec = {}'.format(required_healpix_frame_state,props["expid"],props["spec"])) entries = cur.fetchall() log.debug("from {} set spectra to ready : {}".format(name,entries)) for entry in entries : cur.execute('update {} set state = {} where nside = {} and pixel = {}'.format(tt,task_state_to_int["ready"],entry[0],entry[1]))