Source code for desispec.pipeline.control

#
# See top-level LICENSE.rst file for Copyright information
#
# -*- coding: utf-8 -*-
"""
desispec.pipeline.control
=========================

Tools for controlling pipeline production.
"""

from __future__ import absolute_import, division, print_function

import os
import sys
import re
import time

from collections import OrderedDict

import numpy as np

from desiutil.log import get_logger

from .. import io

from ..parallel import (dist_uniform, dist_discrete, dist_discrete_all,
    stdouterr_redirected)

from .defs import (task_states, prod_options_name,
    task_state_to_int, task_int_to_state)

from . import prod as pipeprod
from . import db as pipedb
from . import run as piperun
from . import tasks as pipetasks
from . import scriptgen as scriptgen


class clr:
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"
    def disable(self):
        self.HEADER = ""
        self.OKBLUE = ""
        self.OKGREEN = ""
        self.WARNING = ""
        self.FAIL = ""
        self.ENDC = ""


def create(root=None, data=None, redux=None, prod=None, force=False,
           basis=None, calib=None, db_sqlite=False, db_sqlite_path=None,
           db_postgres=False, db_postgres_host="nerscdb03.nersc.gov",
           db_postgres_port=5432, db_postgres_name="desidev",
           db_postgres_user="desidev_admin",
           db_postgres_authorized="desidev_ro", nside=64):
    """Create (or re-create) a production.

    Args:
        root (str): value to use for DESI_ROOT.
        data (str): value to use for DESI_SPECTRO_DATA.
        redux (str): value to use for DESI_SPECTRO_REDUX.
        prod (str): value to use for SPECPROD.
        force (bool): if True, overwrite existing production DB.
        basis (str): value to use for DESI_BASIS_TEMPLATES.
        calib (str): value to use for DESI_SPECTRO_CALIB.
        db_sqlite (bool): if True, use SQLite for the DB.
        db_sqlite_path (str): override path to SQLite DB.
        db_postgres (bool): if True, use PostgreSQL for the DB.
        db_postgres_host (str): PostgreSQL hostname.
        db_postgres_port (int): PostgreSQL connection port number.
        db_postgres_name (str): PostgreSQL DB name.
        db_postgres_user (str): PostgreSQL user name.
        db_postgres_authorized (str): additional PostgreSQL users to
            authorize.
        nside (int): HEALPix nside value used for spectral grouping.

    """
    log = get_logger()

    # Check desi root location
    desiroot = None
    if root is not None:
        desiroot = os.path.abspath(root)
        os.environ["DESI_ROOT"] = desiroot
    elif "DESI_ROOT" in os.environ:
        desiroot = os.environ["DESI_ROOT"]
    else:
        log.error("You must set DESI_ROOT in your environment or "
                  "set the root keyword argument")
        raise RuntimeError("Invalid DESI_ROOT")

    # Check raw data location
    rawdir = None
    if data is not None:
        rawdir = os.path.abspath(data)
        os.environ["DESI_SPECTRO_DATA"] = rawdir
    elif "DESI_SPECTRO_DATA" in os.environ:
        rawdir = os.environ["DESI_SPECTRO_DATA"]
    else:
        log.error("You must set DESI_SPECTRO_DATA in your environment or "
                  "set the data keyword argument")
        raise RuntimeError("Invalid DESI_SPECTRO_DATA")

    # Check production name
    prodname = None
    if prod is not None:
        prodname = prod
        os.environ["SPECPROD"] = prodname
    elif "SPECPROD" in os.environ:
        prodname = os.environ["SPECPROD"]
    else:
        log.error("You must set SPECPROD in your environment or "
                  "set the prod keyword argument")
        raise RuntimeError("Invalid SPECPROD")

    # Check spectro redux location
    specdir = None
    if redux is not None:
        specdir = os.path.abspath(redux)
        os.environ["DESI_SPECTRO_REDUX"] = specdir
    elif "DESI_SPECTRO_REDUX" in os.environ:
        specdir = os.environ["DESI_SPECTRO_REDUX"]
    else:
        log.error("You must set DESI_SPECTRO_REDUX in your environment or "
                  "set the redux keyword argument")
        raise RuntimeError("Invalid DESI_SPECTRO_REDUX")

    proddir = os.path.join(specdir, prodname)
    if os.path.exists(proddir) and not force:
        log.error("Production {} exists.\n"
                  "Either remove this directory if you want to start fresh\n"
                  "or use 'desi_pipe update' to update a production\n"
                  "or rerun with --force option.".format(proddir))
        raise RuntimeError("production already exists")

    # Check basis template location
    if basis is not None:
        basis = os.path.abspath(basis)
        os.environ["DESI_BASIS_TEMPLATES"] = basis
    elif "DESI_BASIS_TEMPLATES" in os.environ:
        basis = os.environ["DESI_BASIS_TEMPLATES"]
    else:
        log.error("You must set DESI_BASIS_TEMPLATES in your environment or "
                  "set the basis keyword argument")
        raise RuntimeError("Invalid DESI_BASIS_TEMPLATES")

    # Check calibration location
    if calib is not None:
        calib = os.path.abspath(calib)
        os.environ["DESI_SPECTRO_CALIB"] = calib
    elif "DESI_SPECTRO_CALIB" in os.environ:
        calib = os.environ["DESI_SPECTRO_CALIB"]
    else:
        log.error("You must set DESI_SPECTRO_CALIB in your environment or "
                  "set the calib keyword argument")
        raise RuntimeError("Invalid DESI_SPECTRO_CALIB")

    # Construct our DB connection string
    dbpath = None
    if db_postgres:
        # We are creating a new postgres backend.  Explicitly create the
        # database, so that we can get the schema key.
        db = pipedb.DataBasePostgres(host=db_postgres_host,
            port=db_postgres_port, dbname=db_postgres_name,
            user=db_postgres_user, schema=None,
            authorize=db_postgres_authorized)

        dbprops = [
            "postgresql",
            db_postgres_host,
            "{}".format(db_postgres_port),
            db_postgres_name,
            db_postgres_user,
            db.schema
        ]
        dbpath = ":".join(dbprops)
        os.environ["DESI_SPECTRO_DB"] = dbpath

    elif db_sqlite:
        # We are creating a new sqlite backend
        if db_sqlite_path is not None:
            # We are using a non-default path
            dbpath = os.path.abspath(db_sqlite_path)
        else:
            # We are using sqlite with the default location
            dbpath = os.path.join(proddir, "desi.db")
            if not os.path.isdir(proddir):
                os.makedirs(proddir)

        # Create the database
        db = pipedb.DataBaseSqlite(dbpath, "w")

        os.environ["DESI_SPECTRO_DB"] = dbpath

    elif "DESI_SPECTRO_DB" in os.environ:
        # We are using an existing prod
        dbpath = os.environ["DESI_SPECTRO_DB"]

    else:
        # Error- we have to get the DB info from somewhere
        log.error("You must set DESI_SPECTRO_DB in your environment or "
                  "use the db_sqlite or db_postgres arguments")
        raise RuntimeError("Invalid DESI_SPECTRO_DB")

    pipeprod.update_prod(nightstr=None, hpxnside=nside)

    # create setup shell snippet
    setupfile = os.path.abspath(os.path.join(proddir, "setup.sh"))
    with open(setupfile, "w") as s:
        s.write("# Generated by desi_pipe\n")
        s.write("export DESI_ROOT={}\n\n".format(desiroot))
        s.write("export DESI_BASIS_TEMPLATES={}\n".format(basis))
        s.write("export DESI_SPECTRO_CALIB={}\n\n".format(calib))
        s.write("export DESI_SPECTRO_DATA={}\n\n".format(rawdir))
        s.write("# Production originally created at\n")
        s.write("# $DESI_SPECTRO_REDUX={}\n".format(specdir))
        s.write("# $SPECPROD={}\n".format(prodname))
        s.write("#\n")
        s.write("# Support the ability to move the production\n")
        s.write("# - get abspath to directory where this script is located\n")
        s.write("# - unpack proddir=$DESI_SPECTRO_REDUX/$SPECPROD\n\n")
        s.write('proddir=$(cd $(dirname "$BASH_SOURCE"); pwd)\n')
        s.write("export DESI_SPECTRO_REDUX=$(dirname $proddir)\n")
        s.write("export SPECPROD=$(basename $proddir)\n\n")
        # s.write("export DESI_SPECTRO_REDUX={}\n".format(specdir))
        # s.write("export SPECPROD={}\n".format(specprod))
        s.write("export DESI_SPECTRO_DB={}\n".format(dbpath))
        s.write("\n")
        if "DESI_LOGLEVEL" in os.environ:
            s.write("export DESI_LOGLEVEL=\"{}\"\n\n"\
                .format(os.environ["DESI_LOGLEVEL"]))
        else:
            s.write("#export DESI_LOGLEVEL=\"DEBUG\"\n\n")

    log.info("\n\nTo use this production, you should do:\n%> source {}\n\n"\
        .format(setupfile))

    return
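
# A usage sketch (the paths and production name below are hypothetical):
#
#   from desispec.pipeline import control
#   control.create(root="/project/desi",
#       data="/project/desi/spectro/data",
#       redux="/project/desi/spectro/redux",
#       prod="mini", db_sqlite=True)
#
# This initializes the production directory and writes
# $DESI_SPECTRO_REDUX/$SPECPROD/setup.sh, which should be sourced before
# running any other pipeline commands.
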
def update(nightstr=None, nside=64, expid=None):
    """Update a production.

    Args:
        nightstr (str): comma separated (YYYYMMDD) or regex pattern.  Only
            nights matching these patterns will be considered.
        nside (int): HEALPix nside value used for spectral grouping.
        expid (int): only update the production for a single exposure ID.

    """
    pipeprod.update_prod(nightstr=nightstr, hpxnside=nside, expid=expid)
    return
def get_tasks_type(db, tasktype, states, nights, expid=None, spec=None):
    """Get tasks of one type that match certain criteria.

    Args:
        db (DataBase): the production DB.
        tasktype (str): a valid task type.
        states (list): list of task states to select.
        nights (list): list of nights to select.
        expid (int): exposure ID to select.
        spec (int): spectrograph to select.

    Returns:
        (list): list of tasks meeting the criteria.

    """
    ntlist = ",".join(nights)
    if (expid is not None) and (len(nights) > 1):
        raise RuntimeError("Only one night should be specified when "
                           "getting tasks for a single exposure.")
    tasks = list()
    with db.cursor() as cur:
        if tasktype == "spectra" or tasktype == "redshift":
            cmd = "select pixel from healpix_frame where night in ({})"\
                .format(ntlist)
            cur.execute(cmd)
            pixels = np.unique([ x for (x,) in cur.fetchall() ]).tolist()
            pixlist = ",".join([ str(p) for p in pixels ])
            cmd = "select name, state from {} where pixel in ({})"\
                .format(tasktype, pixlist)
            cur.execute(cmd)
            tasks = [ x for (x, y) in cur.fetchall()
                      if task_int_to_state[y] in states ]
        else:
            cmd = "select name, state from {} where night in ({})"\
                .format(tasktype, ntlist)
            if expid is not None:
                cmd = "{} and expid = {}".format(cmd, expid)
            if spec is not None:
                cmd = "{} and spec = {}".format(cmd, spec)
            cur.execute(cmd)
            tasks = [ x for (x, y) in cur.fetchall()
                      if task_int_to_state[y] in states ]
    return tasks
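
# For reference, a nightly task type such as "psf" results in a query of
# the form (night and exposure values here are hypothetical):
#
#   select name, state from psf where night in (20200219) and expid = 52
#
# while "spectra" and "redshift" tasks are first mapped from nights to
# HEALPix pixels through the healpix_frame table and then selected by pixel.
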
def get_tasks(db, tasktypes, nights, states=None, expid=None, spec=None,
              nosubmitted=False, taskfile=None):
    """Get tasks of multiple types that match certain criteria.

    Args:
        db (DataBase): the production DB.
        tasktypes (list): list of valid task types.
        nights (list): list of nights to select.
        states (list): list of task states to select.
        expid (int): exposure ID to select.
        spec (int): spectrograph to select.
        nosubmitted (bool): if True, ignore tasks that were already
            submitted.

    Returns:
        list: all tasks of all types.

    """
    all_tasks = list()
    for tt in tasktypes:
        tasks = get_tasks_type(db, tt, states, nights, expid=expid, spec=spec)
        if nosubmitted:
            if (tt != "spectra") and (tt != "redshift"):
                sb = db.get_submitted(tasks)
                tasks = [ x for x in tasks if not sb[x] ]
        all_tasks.extend(tasks)
    return all_tasks
def tasks(tasktypes, nightstr=None, states=None, expid=None, spec=None,
          nosubmitted=False, db_postgres_user="desidev_ro", taskfile=None):
    """Get tasks of multiple types that match certain criteria.

    Args:
        tasktypes (list): list of valid task types.
        nightstr (str): comma separated (YYYYMMDD) or regex pattern.
        states (list): list of task states to select.
        expid (int): exposure ID to select.
        spec (int): spectrograph to select.
        nosubmitted (bool): if True, ignore tasks that were already
            submitted.
        db_postgres_user (str): if using postgres, connect as this user
            for read-only access.
        taskfile (str): if set, write to this file, else write to STDOUT.

    """
    if states is None:
        states = task_states
    else:
        for s in states:
            if s not in task_states:
                raise RuntimeError("Task state '{}' is not valid".format(s))

    dbpath = io.get_pipe_database()
    db = pipedb.load_db(dbpath, mode="r", user=db_postgres_user)

    allnights = io.get_nights(strip_path=True)
    nights = pipeprod.select_nights(allnights, nightstr)

    ttypes = list()
    for tt in pipedb.all_task_types():
        if tt in tasktypes:
            ttypes.append(tt)

    all_tasks = get_tasks(db, ttypes, nights, states=states, expid=expid,
                          spec=spec, nosubmitted=nosubmitted)

    pipeprod.task_write(taskfile, all_tasks)

    return
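
# A usage sketch (the night and output file are hypothetical): select all
# ready preproc and psf tasks for one night and write them to a task file
# that can be fed to script() or run() below:
#
#   tasks(["preproc", "psf"], nightstr="20200219", states=["ready"],
#         taskfile="ready_tasks.txt")
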
def getready(db, nightstr=None):
    """Update forward dependencies in the database.

    Update the database for one or more nights to ensure that forward
    dependencies know that they are ready to run.

    Args:
        db (DataBase): the production DB.
        nightstr (str): comma separated (YYYYMMDD) or regex pattern.

    """
    allnights = io.get_nights(strip_path=True)
    nights = pipeprod.select_nights(allnights, nightstr)
    for nt in nights:
        db.getready(night=nt)
    return
def check_tasks(tasks, db=None):
    """Check the state of pipeline tasks.

    If the database handle is given, use the DB for checking.  Otherwise
    use the filesystem.

    Args:
        tasks (list): list of tasks to check.
        db (DataBase): the database to use.

    Returns:
        OrderedDict: dictionary of the state of each task.

    """
    states = pipedb.check_tasks(tasks, db=db)
    tskstate = OrderedDict()
    for tsk in tasks:
        tskstate[tsk] = states[tsk]
    return tskstate
def sync(db, nightstr=None, specdone=False):
    """Synchronize DB state based on the filesystem.

    This scans the filesystem for all tasks for the specified nights,
    and updates the states accordingly.

    Args:
        db (DataBase): the production DB.
        nightstr (str): comma separated (YYYYMMDD) or regex pattern.
        specdone (bool): if True, set spectra tasks to done if the files
            exist.

    """
    allnights = io.get_nights(strip_path=True)
    nights = pipeprod.select_nights(allnights, nightstr)
    for nt in nights:
        db.sync(nt, specdone=specdone)
    return
def cleanup(db, tasktypes, failed=False, submitted=False, expid=None):
    """Clean up stale tasks in the DB.

    Args:
        db (DataBase): the production DB.
        tasktypes (list): list of valid task types.
        failed (bool): also clear failed states.
        submitted (bool): also clear submitted flag.
        expid (int): only clean this exposure ID.

    """
    exid = None
    if expid is not None and expid >= 0:
        exid = expid
    db.cleanup(tasktypes=tasktypes, expid=exid, cleanfailed=failed,
               cleansubmitted=submitted)
    return
def dryrun(tasks, nersc=None, nersc_queue="regular", nersc_maxtime=0,
           nersc_maxnodes=0, nersc_shifter=None, mpi_procs=1, mpi_run="",
           procs_per_node=0, nodb=False, db_postgres_user="desidev_ro",
           force=False):
    """Print equivalent command line jobs.

    For the specified tasks, print the equivalent stand-alone commands
    that would be run on each task.  A pipeline job calls the internal
    desispec.scripts entrypoints directly.

    Args:
        tasks (list): list of tasks to run.
        nersc (str): if not None, the name of the nersc machine to use
            (cori-haswell | cori-knl).
        nersc_queue (str): the name of the queue to use
            (regular | debug | realtime).
        nersc_maxtime (int): if specified, restrict the runtime to this
            number of minutes.
        nersc_maxnodes (int): if specified, restrict the job to use this
            number of nodes.
        nersc_shifter (str): the name of the shifter image to use.
        mpi_run (str): if specified, and if not using NERSC, use this
            command to launch MPI executables in the shell scripts.
            Default is to not use MPI.
        mpi_procs (int): if not using NERSC, the number of MPI processes
            to use in shell scripts.
        procs_per_node (int): if specified, use only this number of
            processes per node.  Default runs one process per core.
        nodb (bool): if True, do not use the production DB.
        db_postgres_user (str): if using postgres, connect as this user
            for read-only access.
        force (bool): if True, print commands for all tasks, not just the
            ones in a ready state.

    """
    tasks_by_type = pipedb.task_sort(tasks)

    (db, opts) = pipeprod.load_prod("r", user=db_postgres_user)
    if nodb:
        db = None

    ppn = None
    if procs_per_node > 0:
        ppn = procs_per_node

    if nersc is None:
        # Not running at NERSC
        if ppn is None:
            ppn = mpi_procs
        for tt, tlist in tasks_by_type.items():
            piperun.dry_run(tt, tlist, opts, mpi_procs, ppn, db=db,
                            launch="mpirun -n", force=force)
    else:
        # Running at NERSC
        hostprops = scriptgen.nersc_machine(nersc, nersc_queue)
        for tt, tlist in tasks_by_type.items():
            joblist = scriptgen.nersc_job_size(tt, tlist, nersc,
                nersc_queue, nersc_maxtime, nersc_maxnodes, nodeprocs=ppn,
                db=db)
            launch = "srun -n"
            for (jobnodes, jobppn, jobtime, jobworkers, jobtasks) in joblist:
                jobprocs = jobnodes * jobppn
                piperun.dry_run(tt, jobtasks, opts, jobprocs, jobppn,
                                db=db, launch=launch, force=force)
    return
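
# A usage sketch (the file name and machine are hypothetical): print the
# stand-alone commands for tasks read from a task file without running
# anything:
#
#   tasklist = pipeprod.task_read("ready_tasks.txt")
#   dryrun(tasklist, nersc="cori-haswell", nersc_queue="debug")
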
def gen_scripts(tasks_by_type, nersc=None, nersc_queue="regular",
                nersc_maxtime=0, nersc_maxnodes=0, nersc_shifter=None,
                mpi_procs=1, mpi_run="", procs_per_node=0, nodb=False,
                out=None, debug=False, db_postgres_user="desidev_ro"):
    """Generate scripts to run tasks of one or more types.

    If multiple task type keys are contained in the dictionary, they will
    be packed into a single batch job.

    Args:
        tasks_by_type (dict): each key is the task type and the value is
            a list of tasks.
        nersc (str): if not None, the name of the nersc machine to use
            (cori-haswell | cori-knl).
        nersc_queue (str): the name of the queue to use
            (regular | debug | realtime).
        nersc_maxtime (int): if specified, restrict the runtime to this
            number of minutes.
        nersc_maxnodes (int): if specified, restrict the job to use this
            number of nodes.
        nersc_shifter (str): the name of the shifter image to use.
        mpi_run (str): if specified, and if not using NERSC, use this
            command to launch MPI executables in the shell scripts.
            Default is to not use MPI.
        mpi_procs (int): if not using NERSC, the number of MPI processes
            to use in shell scripts.
        procs_per_node (int): if specified, use only this number of
            processes per node.  Default runs one process per core.
        nodb (bool): if True, do not use the production DB.
        out (str): put task scripts and logs in this directory relative
            to the production 'scripts' directory.  Default puts the task
            directory in the main scripts directory.
        debug (bool): if True, enable DEBUG log level in generated
            scripts.
        db_postgres_user (str): if using postgres, connect as this user
            for read-only access.

    Returns:
        list: the generated script files.

    """
    ttypes = list(tasks_by_type.keys())
    if len(ttypes) == 0:
        return None

    jobname = ttypes[0]
    if len(ttypes) > 1:
        jobname = "{}-{}".format(ttypes[0], ttypes[-1])

    proddir = os.path.abspath(io.specprod_root())

    import datetime
    now = datetime.datetime.now()
    outtaskdir = "{}_{:%Y%m%d-%H%M%S-%f}".format(jobname, now)

    if out is None:
        outdir = os.path.join(proddir, io.get_pipe_rundir(),
                              io.get_pipe_scriptdir(), outtaskdir)
    else:
        outdir = os.path.join(proddir, io.get_pipe_rundir(),
                              io.get_pipe_scriptdir(), out, outtaskdir)

    if not os.path.isdir(outdir):
        os.makedirs(outdir)

    mstr = "run"
    if nersc is not None:
        mstr = nersc

    outscript = os.path.join(outdir, mstr)
    outlog = os.path.join(outdir, mstr)

    (db, opts) = pipeprod.load_prod("r", user=db_postgres_user)
    if nodb:
        db = None

    ppn = None
    if procs_per_node > 0:
        ppn = procs_per_node

    # FIXME: Add openmp / multiproc function to task classes and
    # call them here.

    scripts = None
    if nersc is None:
        # Not running at NERSC
        scripts = scriptgen.batch_shell(tasks_by_type, outscript, outlog,
            mpirun=mpi_run, mpiprocs=mpi_procs, openmp=1, db=db)
    else:
        # Running at NERSC
        scripts = scriptgen.batch_nersc(tasks_by_type, outscript, outlog,
            jobname, nersc, nersc_queue, nersc_maxtime, nersc_maxnodes,
            nodeprocs=ppn, openmp=False, multiproc=False, db=db,
            shifterimg=nersc_shifter, debug=debug)

    return scripts
def script(taskfile, nersc=None, nersc_queue="regular", nersc_maxtime=0,
           nersc_maxnodes=0, nersc_shifter=None, mpi_procs=1, mpi_run="",
           procs_per_node=0, nodb=False, out=None, debug=False,
           db_postgres_user="desidev_ro"):
    """Generate pipeline scripts for a taskfile.

    This gets tasks from the taskfile and sorts them by type.  Then it
    generates the scripts.

    Args:
        taskfile (str): read tasks from this file (if not specified, read
            from STDIN).
        nersc (str): if not None, the name of the nersc machine to use
            (cori-haswell | cori-knl).
        nersc_queue (str): the name of the queue to use
            (regular | debug | realtime).
        nersc_maxtime (int): if specified, restrict the runtime to this
            number of minutes.
        nersc_maxnodes (int): if specified, restrict the job to use this
            number of nodes.
        nersc_shifter (str): the name of the shifter image to use.
        mpi_run (str): if specified, and if not using NERSC, use this
            command to launch MPI executables in the shell scripts.
            Default is to not use MPI.
        mpi_procs (int): if not using NERSC, the number of MPI processes
            to use in shell scripts.
        procs_per_node (int): if specified, use only this number of
            processes per node.  Default runs one process per core.
        nodb (bool): if True, do not use the production DB.
        out (str): put task scripts and logs in this directory relative
            to the production 'scripts' directory.  Default puts the task
            directory in the main scripts directory.
        debug (bool): if True, enable DEBUG log level in generated
            scripts.
        db_postgres_user (str): if using postgres, connect as this user
            for read-only access.

    Returns:
        list: the generated script files.

    """
    tasks = pipeprod.task_read(taskfile)

    scripts = list()
    if len(tasks) > 0:
        tasks_by_type = pipedb.task_sort(tasks)
        scripts = gen_scripts(
            tasks_by_type,
            nersc=nersc,
            nersc_queue=nersc_queue,
            nersc_maxtime=nersc_maxtime,
            nersc_maxnodes=nersc_maxnodes,
            nersc_shifter=nersc_shifter,
            mpi_procs=mpi_procs,
            mpi_run=mpi_run,
            procs_per_node=procs_per_node,
            nodb=nodb,
            out=out,
            debug=debug,
            db_postgres_user=db_postgres_user)
    else:
        import warnings
        warnings.warn("Input task list is empty", RuntimeWarning)

    return scripts
def run_scripts(scripts, deps=None, slurm=False):
    """Run job scripts with optional dependencies.

    This either submits the jobs to the scheduler or simply runs them in
    order with subprocess.

    Args:
        scripts (list): list of pathnames of the scripts to run.
        deps (list): optional list of job IDs which are dependencies for
            these scripts.
        slurm (bool): if True, use slurm to submit the jobs.

    Returns:
        list: the job IDs returned by the scheduler.

    """
    import subprocess as sp

    log = get_logger()

    depstr = ""
    if deps is not None and len(deps) > 0:
        depstr = "-d afterok"
        for d in deps:
            depstr = "{}:{}".format(depstr, d)

    jobids = list()
    if slurm:
        # submit each job and collect the job IDs
        for scr in scripts:
            scom = "sbatch {} {}".format(depstr, scr)
            log.debug(time.asctime())
            log.info(scom)
            sout = sp.check_output(scom, shell=True, universal_newlines=True)
            log.info(sout)
            p = sout.split()
            jid = re.sub(r'[^\d]', '', p[3])
            jobids.append(jid)
    else:
        # run the scripts one at a time
        for scr in scripts:
            rcode = sp.call(scr, shell=True)
            if rcode != 0:
                log.warning("script {} had return code = {}"\
                    .format(scr, rcode))
    return jobids
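
# A usage sketch (the script path and job ID are hypothetical):
#
#   jobids = run_scripts(["/path/to/scripts/cori-haswell.slurm"],
#                        deps=["1234567"], slurm=True)
#
# With slurm=True this executes "sbatch -d afterok:1234567 <script>" and
# parses the new job ID from the sbatch output; otherwise the scripts are
# run serially with subprocess.
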
def run(taskfile, nosubmitted=False, depjobs=None, nersc=None,
        nersc_queue="regular", nersc_maxtime=0, nersc_maxnodes=0,
        nersc_shifter=None, mpi_procs=1, mpi_run="", procs_per_node=0,
        nodb=False, out=None, debug=False):
    """Create job scripts and run them.

    This gets tasks from the taskfile and sorts them by type.  Then it
    generates the scripts.  Finally, it runs or submits those scripts to
    the scheduler.

    Args:
        taskfile (str): read tasks from this file (if not specified, read
            from STDIN).
        nosubmitted (bool): if True, do not run jobs that have already
            been submitted.
        depjobs (list): list of job ID dependencies.
        nersc (str): if not None, the name of the nersc machine to use
            (cori-haswell | cori-knl).
        nersc_queue (str): the name of the queue to use
            (regular | debug | realtime).
        nersc_maxtime (int): if specified, restrict the runtime to this
            number of minutes.
        nersc_maxnodes (int): if specified, restrict the job to use this
            number of nodes.
        nersc_shifter (str): the name of the shifter image to use.
        mpi_run (str): if specified, and if not using NERSC, use this
            command to launch MPI executables in the shell scripts.
            Default is to not use MPI.
        mpi_procs (int): if not using NERSC, the number of MPI processes
            to use in shell scripts.
        procs_per_node (int): if specified, use only this number of
            processes per node.  Default runs one process per core.
        nodb (bool): if True, do not use the production DB.
        out (str): put task scripts and logs in this directory relative
            to the production 'scripts' directory.  Default puts the task
            directory in the main scripts directory.
        debug (bool): if True, enable DEBUG log level in generated
            scripts.

    Returns:
        list: the job IDs returned by the scheduler.

    """
    log = get_logger()

    tasks = pipeprod.task_read(taskfile)

    jobids = list()
    if len(tasks) > 0:
        tasks_by_type = pipedb.task_sort(tasks)
        tasktypes = list(tasks_by_type.keys())

        # We are packing everything into one job
        scripts = gen_scripts(
            tasks_by_type,
            nersc=nersc,
            nersc_queue=nersc_queue,
            nersc_maxtime=nersc_maxtime,
            nersc_maxnodes=nersc_maxnodes,
            nersc_shifter=nersc_shifter,
            mpi_procs=mpi_procs,
            mpi_run=mpi_run,
            procs_per_node=procs_per_node,
            nodb=nodb,
            out=out,
            debug=debug)
        log.info("wrote scripts {}".format(scripts))

        deps = None
        slurm = False
        if nersc is not None:
            slurm = True
        if depjobs is not None:
            deps = depjobs

        # Run the jobs
        if not nodb:
            # We can use the DB; mark tasks as submitted.
            if slurm:
                dbpath = io.get_pipe_database()
                db = pipedb.load_db(dbpath, mode="w")
                for tt in tasktypes:
                    if (tt != "spectra") and (tt != "redshift"):
                        db.set_submitted_type(tt, tasks_by_type[tt])

        jobids = run_scripts(scripts, deps=deps, slurm=slurm)
    else:
        import warnings
        warnings.warn("Input task list is empty", RuntimeWarning)

    return jobids
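
# A usage sketch (the task file and node count are hypothetical): generate
# one packed job for the tasks in a file and submit it to the scheduler:
#
#   jobids = run("ready_tasks.txt", nersc="cori-haswell",
#                nersc_queue="regular", nersc_maxnodes=10)
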
def chain(tasktypes, nightstr=None, states=None, expid=None, spec=None,
          pack=False, nosubmitted=False, depjobs=None, nersc=None,
          nersc_queue="regular", nersc_maxtime=0, nersc_maxnodes=0,
          nersc_shifter=None, mpi_procs=1, mpi_run="", procs_per_node=0,
          nodb=False, out=None, debug=False, dryrun=False):
    """Run a chain of jobs for multiple pipeline steps.

    For the list of task types, get all ready tasks meeting the selection
    criteria.  Then either pack all tasks into one job or submit each task
    type as its own job.  Input job dependencies can be specified, and
    dependencies are tracked between jobs in the chain.

    Args:
        tasktypes (list): list of valid task types.
        nightstr (str): comma separated (YYYYMMDD) or regex pattern.  Only
            nights matching these patterns will be considered.
        states (list): list of task states to select.
        expid (int): exposure ID to select.
        spec (int): spectrograph to select.
        pack (bool): if True, pack all tasks into a single job.
        nosubmitted (bool): if True, do not run jobs that have already
            been submitted.
        depjobs (list): list of job ID dependencies.
        nersc (str): if not None, the name of the nersc machine to use
            (cori-haswell | cori-knl).
        nersc_queue (str): the name of the queue to use
            (regular | debug | realtime).
        nersc_maxtime (int): if specified, restrict the runtime to this
            number of minutes.
        nersc_maxnodes (int): if specified, restrict the job to use this
            number of nodes.
        nersc_shifter (str): the name of the shifter image to use.
        mpi_run (str): if specified, and if not using NERSC, use this
            command to launch MPI executables in the shell scripts.
            Default is to not use MPI.
        mpi_procs (int): if not using NERSC, the number of MPI processes
            to use in shell scripts.
        procs_per_node (int): if specified, use only this number of
            processes per node.  Default runs one process per core.
        nodb (bool): if True, do not use the production DB.
        out (str): put task scripts and logs in this directory relative
            to the production 'scripts' directory.  Default puts the task
            directory in the main scripts directory.
        debug (bool): if True, enable DEBUG log level in generated
            scripts.
        dryrun (bool): if True, do not submit the jobs.

    Returns:
        list: the job IDs from the final step in the chain.

    """
    log = get_logger()

    machprops = None
    if nersc is not None:
        machprops = scriptgen.nersc_machine(nersc, nersc_queue)

    if states is None:
        states = task_states
    else:
        for s in states:
            if s not in task_states:
                raise RuntimeError("Task state '{}' is not valid".format(s))

    ttypes = list()
    for tt in pipetasks.base.default_task_chain:
        if tt in tasktypes:
            ttypes.append(tt)

    if (machprops is not None) and (not pack):
        if len(ttypes) > machprops["submitlimit"]:
            log.error("Queue {} on machine {} limited to {} jobs."\
                .format(nersc_queue, nersc, machprops["submitlimit"]))
            log.error("Use a different queue or shorter chains of tasks.")
            raise RuntimeError("Too many jobs")

    slurm = False
    if nersc is not None:
        slurm = True

    dbpath = io.get_pipe_database()
    db = pipedb.load_db(dbpath, mode="w")

    allnights = io.get_nights(strip_path=True)
    nights = pipeprod.select_nights(allnights, nightstr)

    outdeps = None
    indeps = None
    if depjobs is not None:
        indeps = depjobs

    tasks_by_type = OrderedDict()
    for tt in ttypes:
        # Get the tasks.  We select by state and submitted status.
        tasks = get_tasks_type(db, tt, states, nights, expid=expid,
                               spec=spec)
        if nosubmitted:
            if (tt != "spectra") and (tt != "redshift"):
                sb = db.get_submitted(tasks)
                tasks = [ x for x in tasks if not sb[x] ]

        if len(tasks) == 0:
            import warnings
            warnings.warn("Input task list for '{}' is empty".format(tt),
                          RuntimeWarning)
            continue  # might be tasks to do in other ttype
        tasks_by_type[tt] = tasks

    scripts = None
    tscripts = None
    if pack:
        # We are packing everything into one job
        scripts = gen_scripts(
            tasks_by_type,
            nersc=nersc,
            nersc_queue=nersc_queue,
            nersc_maxtime=nersc_maxtime,
            nersc_maxnodes=nersc_maxnodes,
            nersc_shifter=nersc_shifter,
            mpi_procs=mpi_procs,
            mpi_run=mpi_run,
            procs_per_node=procs_per_node,
            nodb=nodb,
            out=out,
            debug=debug)
        if scripts is not None and len(scripts) > 0:
            log.info("wrote scripts {}".format(scripts))
    else:
        # Generate individual scripts
        tscripts = dict()
        for tt in ttypes:
            if tt not in tasks_by_type:
                # No selected tasks of this type; nothing to generate.
                continue
            onetype = OrderedDict()
            onetype[tt] = tasks_by_type[tt]
            tscripts[tt] = gen_scripts(
                onetype,
                nersc=nersc,
                nersc_queue=nersc_queue,
                nersc_maxtime=nersc_maxtime,
                nersc_maxnodes=nersc_maxnodes,
                nersc_shifter=nersc_shifter,
                mpi_procs=mpi_procs,
                mpi_run=mpi_run,
                procs_per_node=procs_per_node,
                nodb=nodb,
                out=out,
                debug=debug)
            if tscripts[tt] is not None:
                log.info("wrote script {}".format(tscripts[tt]))

    if dryrun:
        log.warning("dry run: do not submit the jobs")
        return None

    # Run the jobs
    if slurm:
        for tt in ttypes:
            if (tt != "spectra") and (tt != "redshift"):
                if tt in tasks_by_type.keys():
                    db.set_submitted_type(tt, tasks_by_type[tt])

    outdeps = None
    if pack:
        # Submit one job
        if scripts is not None and len(scripts) > 0:
            outdeps = run_scripts(scripts, deps=indeps, slurm=slurm)
    else:
        # Loop over task types, submitting jobs and tracking dependencies.
        for tt in ttypes:
            if tt not in tscripts or tscripts[tt] is None:
                continue
            outdeps = run_scripts(tscripts[tt], deps=indeps, slurm=slurm)
            if outdeps is not None and len(outdeps) > 0:
                indeps = outdeps
            else:
                indeps = None

    return outdeps
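
# A usage sketch (the night and dependency ID are hypothetical): submit one
# packed job covering three consecutive steps, waiting on a prior job:
#
#   chain(["preproc", "psf", "psfnight"], nightstr="20200219", pack=True,
#         nosubmitted=True, depjobs=["1234567"], nersc="cori-haswell")
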
def status_color(state):
    col = clr.ENDC
    if state == "done":
        col = clr.OKGREEN
    elif state == "running":
        col = clr.WARNING
    elif state == "failed":
        col = clr.FAIL
    elif state == "ready":
        col = clr.OKBLUE
    return col


def status_task(task, ttype, state, logdir):
    fields = pipetasks.base.task_classes[ttype].name_split(task)
    tasklog = None
    if "night" in fields:
        tasklogdir = os.path.join(logdir, io.get_pipe_nightdir(),
                                  "{:08d}".format(fields["night"]))
        tasklog = os.path.join(tasklogdir, "{}.log".format(task))
    elif "pixel" in fields:
        tasklogdir = os.path.join(logdir, "healpix",
            io.healpix_subdirectory(fields["nside"], fields["pixel"]))
        tasklog = os.path.join(tasklogdir, "{}.log".format(task))
    col = status_color(state)
    print("Task {}".format(task))
    print("State = {}{}{}".format(col, state, clr.ENDC))
    # Guard against task types with neither a night nor a pixel field.
    if tasklog is not None and os.path.isfile(tasklog):
        print("Dumping task log {}".format(tasklog))
        print("=========== Begin Log =============")
        print("")
        with open(tasklog, "r") as f:
            logdata = f.read()
            print(logdata)
        print("")
        print("============ End Log ==============")
        print("", flush=True)
    else:
        print("Task log {} does not exist".format(tasklog), flush=True)
    return


def status_taskname(tsklist):
    for tsk in tsklist:
        st = tsk[1]
        col = status_color(st)
        print(" {:20s}: {}{}{}".format(tsk[0], col, st, clr.ENDC),
              flush=True)
    return


def status_night_totals(tasktypes, nights, tasks, tskstates):
    # Accumulate totals for each night and type
    sep = "------------------+---------+---------+---------+---------+---------+"
    ntlist = list()
    nighttot = OrderedDict()
    for tt in tasktypes:
        if tt == "spectra" or tt == "redshift":
            # This function only prints nightly tasks
            continue
        for tsk in tasks[tt]:
            fields = pipetasks.base.task_classes[tt].name_split(tsk)
            nt = fields["night"]
            if nt not in nighttot:
                nighttot[nt] = OrderedDict()
            if tt not in nighttot[nt]:
                nighttot[nt][tt] = OrderedDict()
                for s in task_states:
                    nighttot[nt][tt][s] = 0
            st = tskstates[tt][tsk]
            nighttot[nt][tt][st] += 1

    for nt, ttstates in nighttot.items():
        ntstr = "{:08d}".format(nt)
        if ntstr in nights:
            ntlist.append(nt)
    ntlist = list(sorted(ntlist))

    for nt in ntlist:
        ttstates = nighttot[nt]
        ntstr = "{:08d}".format(nt)
        if ntstr in nights:
            header = "{:18s}|".format(ntstr)
            for s in task_states:
                col = status_color(s)
                header = "{} {}{:8s}{}|".format(header, col, s, clr.ENDC)
            print(sep)
            print(header)
            print(sep)
            for tt, totst in ttstates.items():
                line = " {:16s}|".format(tt)
                for s in task_states:
                    line = "{}{:9d}|".format(line, totst[s])
                print(line)
            print("", flush=True)
    return


def status_pixel_totals(tasktypes, tasks, tskstates):
    # Accumulate totals for each type
    sep = "------------------+---------+---------+---------+---------+---------+"
    pixtot = OrderedDict()
    for tt in tasktypes:
        if (tt != "spectra") and (tt != "redshift"):
            # This function only prints pixel tasks
            continue
        for tsk in tasks[tt]:
            if tt not in pixtot:
                pixtot[tt] = OrderedDict()
                for s in task_states:
                    pixtot[tt][s] = 0
            st = tskstates[tt][tsk]
            pixtot[tt][st] += 1

    header = "{:18s}|".format("Pixel Tasks")
    for s in task_states:
        col = status_color(s)
        header = "{} {}{:8s}{}|".format(header, col, s, clr.ENDC)
    print(sep)
    print(header)
    print(sep)
    for tt, totst in pixtot.items():
        line = " {:16s}|".format(tt)
        for s in task_states:
            line = "{}{:9d}|".format(line, totst[s])
        print(line)
    print("", flush=True)
    return


def status_night_tasks(tasktypes, nights, tasks, tskstates):
    # Sort the tasks into nights
    nighttasks = OrderedDict()
    ntlist = list()
    for tt in tasktypes:
        if tt == "spectra" or tt == "redshift":
            # This function only prints nightly tasks
            continue
        for tsk in tasks[tt]:
            fields = pipetasks.base.task_classes[tt].name_split(tsk)
            nt = fields["night"]
            if nt not in nighttasks:
                nighttasks[nt] = list()
            nighttasks[nt].append((tsk, tskstates[tt][tsk]))

    for nt, tsklist in nighttasks.items():
        ntstr = "{:08d}".format(nt)
        if ntstr in nights:
            ntlist.append(nt)
    ntlist = list(sorted(ntlist))

    for nt in ntlist:
        tsklist = nighttasks[nt]
        ntstr = "{:08d}".format(nt)
        if ntstr in nights:
            print(nt)
            status_taskname(tsklist)
    return


def status_pixel_tasks(tasktypes, tasks, tskstates):
    for tt in tasktypes:
        tsklist = list()
        if (tt != "spectra") and (tt != "redshift"):
            # This function only prints pixel tasks
            continue
        for tsk in tasks[tt]:
            tsklist.append((tsk, tskstates[tt][tsk]))
        print(tt)
        status_taskname(tsklist)
    return


def status_summary(tasktypes, nights, tasks, tskstates):
    sep = "----------------+---------+---------+---------+---------+---------+"
    hline = "-----------------------------------------------"
    print(sep)
    header_state = "{:16s}|".format(" Task Type")
    for s in task_states:
        col = status_color(s)
        header_state = "{} {}{:8s}{}|".format(header_state, col, s, clr.ENDC)
    print(header_state)
    print(sep)
    for tt in tasktypes:
        line = "{:16s}|".format(tt)
        for s in task_states:
            tsum = np.sum(np.array(
                [1 for x, y in tskstates[tt].items() if y == s],
                dtype=np.int32))
            line = "{}{:9d}|".format(line, tsum)
        print(line, flush=True)
    return
def status(task=None, tasktypes=None, nightstr=None, states=None,
           expid=None, spec=None, db_postgres_user="desidev_ro"):
    """Check the status of pipeline tasks.

    Args:
        task (str): a single task name to check.
        tasktypes (list): list of valid task types.
        nightstr (str): comma separated (YYYYMMDD) or regex pattern.
        states (list): list of task states to select.
        expid (int): exposure ID to select.
        spec (int): spectrograph to select.
        db_postgres_user (str): if using postgres, connect as this user
            for read-only access.

    Returns:
        None

    """
    dbpath = io.get_pipe_database()
    db = pipedb.load_db(dbpath, mode="r", user=db_postgres_user)

    rundir = io.get_pipe_rundir()
    logdir = os.path.join(rundir, io.get_pipe_logdir())

    tasks = OrderedDict()

    summary = False
    if (tasktypes is None) and (nightstr is None):
        summary = True

    if task is None:
        ttypes = None
        if tasktypes is not None:
            ttypes = list()
            for tt in pipetasks.base.default_task_chain:
                if tt in tasktypes:
                    ttypes.append(tt)
        else:
            ttypes = list(pipetasks.base.default_task_chain)

        if states is None:
            states = task_states
        else:
            for s in states:
                if s not in task_states:
                    raise RuntimeError("Task state '{}' is not valid"\
                        .format(s))

        allnights = io.get_nights(strip_path=True)
        nights = pipeprod.select_nights(allnights, nightstr)

        for tt in ttypes:
            tasks[tt] = get_tasks(db, [tt], nights, states=states,
                                  expid=expid, spec=spec)
    else:
        ttypes = [pipetasks.base.task_type(task)]
        tasks[ttypes[0]] = [task]

    tstates = OrderedDict()
    for typ, tsks in tasks.items():
        tstates[typ] = pipedb.check_tasks(tsks, db=db)

    if len(ttypes) == 1 and len(tasks[ttypes[0]]) == 1:
        # Print status of this specific task
        thistype = ttypes[0]
        thistask = tasks[thistype][0]
        status_task(thistask, thistype, tstates[thistype][thistask], logdir)
    else:
        if len(ttypes) > 1 and len(nights) > 1:
            # We have multiple nights and multiple task types.
            # Just print totals.
            if summary:
                status_summary(ttypes, nights, tasks, tstates)
            else:
                status_night_totals(ttypes, nights, tasks, tstates)
                status_pixel_totals(ttypes, tasks, tstates)
        elif len(ttypes) > 1:
            # Multiple task types for one night.  Print the totals for
            # each task type.
            thisnight = nights[0]
            status_night_totals(ttypes, nights, tasks, tstates)
        elif len(nights) > 1:
            # We have just one task type; print the state totals for each
            # night OR the full task list for redshift or spectra tasks.
            thistype = ttypes[0]
            print("Task type {}".format(thistype))
            if thistype == "spectra" or thistype == "redshift":
                status_pixel_tasks(ttypes, tasks, tstates)
            else:
                status_night_totals(ttypes, nights, tasks, tstates)
        else:
            # We have one type and one night; print the full state of
            # every task.
            thistype = ttypes[0]
            thisnight = nights[0]
            print("Task type {}".format(thistype))
            status_night_tasks(ttypes, nights, tasks, tstates)
            status_pixel_tasks(ttypes, tasks, tstates)
    return
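
# A usage sketch (the night and task name are hypothetical): print
# per-night state totals for one task type, or the full log of one task:
#
#   status(tasktypes=["psf"], nightstr="20200219")
#   status(task="psf_20200219_z0_00052000")
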