#
# See top-level LICENSE.rst file for Copyright information
#
# -*- coding: utf-8 -*-
"""
desispec.pipeline.tasks.redshift
================================
"""
from __future__ import absolute_import, division, print_function
import numpy as np
from .base import BaseTask, task_classes, task_type
from ...io import findfile
from ...util import option_list
from redrock.external.desi import rrdesi
from desiutil.log import get_logger
import os
# NOTE: only one class in this file should have a name that starts with "Task".
[docs]class TaskRedshift(BaseTask):
"""Class containing the properties of one spectra task.
"""
def __init__(self):
super(TaskRedshift, self).__init__()
# then put int the specifics of this class
# _cols must have a state
self._type = "redshift"
self._cols = [
"nside",
"pixel",
"state"
]
self._coltypes = [
"integer",
"integer",
"integer"
]
# _name_fields must also be in _cols
self._name_fields = ["nside","pixel"]
self._name_formats = ["d","d"]
[docs] def _paths(self, name):
"""See BaseTask.paths.
"""
props = self.name_split(name)
hpix = props["pixel"]
nside = props["nside"]
redrock = findfile("redrock", groupname=hpix, nside=nside)
rrdetails = findfile("rrdetails", groupname=hpix, nside=nside)
return [redrock, rrdetails]
[docs] def _deps(self, name, db, inputs):
"""See BaseTask.deps.
"""
props = self.name_split(name)
deptasks = {
"infile" : task_classes["spectra"].name_join(props)
}
return deptasks
def _run_max_procs(self):
# Redshifts can run on any number of procs.
return 0
def _run_time(self, name, procs, db):
# Run time on one task on machine with scale factor == 1.0.
# This should depend on the total number of unique targets, which is
# not known a priori. Instead, we compute the total targets and reduce
# this by some factor.
if db is not None:
props = self.name_split(name)
entries = db.select_healpix_frame(
{"pixel":props["pixel"],
"nside":props["nside"]}
)
ntarget = np.sum([x["ntargets"] for x in entries])
neff = 0.3 * ntarget
# 2.5 seconds per targets
tm = 1 + 2.5 * 0.0167 * neff
else:
tm = 60
return tm
def _run_max_mem_proc(self, name, db):
# Per-process memory requirements. This is determined by the largest
# Spectra file that must be read and broadcast. We compute that size
# assuming no coadd and using the total number of targets falling in
# our pixel.
mem = 0.0
if db is not None:
props = self.name_split(name)
entries = db.select_healpix_frame(
{"pixel":props["pixel"],
"nside":props["nside"]}
)
ntarget = np.sum([x["ntargets"] for x in entries])
# DB entry is for one exposure and spectrograph.
mem = 0.2 + 0.0002 * 3 * ntarget
return mem
def _run_max_mem_task(self, name, db):
# This returns the total aggregate memory needed for the task,
# which should be based on the larger of:
# 1) the total number of unique (coadded) targets.
# 2) the largest spectra file times the number of processes
# Since it is not easy to calculate (1), and the constraint for (2)
# is already encapsulated in the per-process memory requirements,
# we return zero here. This effectively selects one node.
mem = 0.0
return mem
[docs] def _run_defaults(self):
"""See BaseTask.run_defaults.
"""
return {'no-mpi-abort': True}
[docs] def _option_list(self, name, opts):
"""Build the full list of options.
This includes appending the filenames and incorporating runtime
options.
"""
redrockfile, rrdetailsfile = self.paths(name)
outdir = os.path.dirname(redrockfile)
options = {}
options["details"] = rrdetailsfile
options["outfile"] = redrockfile
options.update(opts)
optarray = option_list(options)
deps = self.deps(name)
specfile = task_classes["spectra"].paths(deps["infile"])[0]
optarray.append(specfile)
return optarray
[docs] def _run_cli(self, name, opts, procs, db):
"""See BaseTask.run_cli.
"""
entry = "rrdesi_mpi"
optlist = self._option_list(name, opts)
return "{} {}".format(entry, " ".join(optlist))
[docs] def _run(self, name, opts, comm, db):
"""See BaseTask.run.
"""
optlist = self._option_list(name, opts)
rrdesi(options=optlist, comm=comm)
return
[docs] def run_and_update(self, db, name, opts, comm=None):
"""Run the task and update DB state.
The state of the task is marked as "done" if the command completes
without raising an exception and if the output files exist.
It is specific for redshift because the healpix_frame table has to be updated
Args:
db (pipeline.db.DB): The database.
name (str): the name of this task.
opts (dict): options to use for this task.
comm (mpi4py.MPI.Comm): optional MPI communicator.
Returns:
int: the number of processes that failed.
"""
nproc = 1
rank = 0
if comm is not None:
nproc = comm.size
rank = comm.rank
failed = self.run(name, opts, comm=comm, db=db)
if rank == 0:
if failed > 0:
self.state_set(db, name, "failed")
else:
outputs = self.paths(name)
done = True
for out in outputs:
if not os.path.isfile(out):
done = False
failed = nproc
break
if done:
props=self.name_split(name)
props["state"]=2 # selection, only those for which we had already updated the spectra
with db.cursor() as cur :
self.state_set(db, name, "done",cur=cur)
db.update_healpix_frame_state(props,state=3,cur=cur) # 3=redshifts have been updated
else:
self.state_set(db, name, "failed")
return failed