#
# See top-level LICENSE.rst file for Copyright information
#
# -*- coding: utf-8 -*-
"""
desispec.pipeline.db
====================
Pipeline processing database
"""
from __future__ import absolute_import, division, print_function
import os
import re
from collections import OrderedDict
from contextlib import contextmanager
import numpy as np
from desiutil.log import get_logger
from .. import io
import fitsio
from .defs import (task_states, task_int_to_state, task_state_to_int, task_name_sep)
def all_task_types():
    """Get the list of possible task types that are supported.

    Returns:
        list: The list of supported task types, in run order.
    """
    from .tasks.base import default_task_chain
    # Raw inputs come first; they are not part of the processing chain.
    ttypes = ["fibermap", "rawdata"]
    # Use the imported name directly (it was previously imported but
    # unused, with the value reached through tasks.base instead).
    ttypes.extend(default_task_chain)
    # Insert qadata after cframe
    idx = ttypes.index('cframe')
    ttypes.insert(idx+1, 'qadata')
    return ttypes
def task_sort(tasks):
    """Sort a list of tasks by type.

    This takes a list of arbitrary tasks and sorts them by type.  The result
    is placed in an ordered dictionary of lists in run order.

    Args:
        tasks (list): the list of input tasks.

    Returns:
        (OrderedDict): ordered dictionary of tasks sorted by type.
    """
    from .tasks.base import task_classes, task_type
    ttypes = all_task_types()
    # Bucket every task under its type.
    buckets = {tt: [] for tt in ttypes}
    for tsk in tasks:
        buckets[task_type(tsk)].append(tsk)
    # Emit only the non-empty buckets, preserving run order.
    ordered = OrderedDict()
    for tt in ttypes:
        if buckets[tt]:
            ordered[tt] = buckets[tt]
    return ordered
def all_tasks(night, nside, expid=None):
    """Get all possible tasks for a single night.

    This uses the filesystem to query the raw data for a particular night and
    return a dictionary containing all possible tasks for each task type.  For
    objects which span multiple nights (e.g. spectra, redrock), this returns
    the tasks which are touched by the given night.

    Args:
        night (str): The night to scan for tasks.
        nside (int): The HEALPix NSIDE value to use.
        expid (int): Only get tasks for this single exposure.

    Returns:
        tuple: a dictionary whose keys are the task types and where each value
            is a list of task properties, and the list of healpix_frame row
            properties touched by this night.
    """
    import desimodel.footprint
    log = get_logger()
    log.debug("io.get_exposures night={}".format(night))
    expids = io.get_exposures(night, raw=True)
    # Initialize an empty task list for every supported task type.
    full = dict()
    for t in all_task_types():
        full[t] = list()
    healpix_frames = []
    if expid is not None:
        # Restrict the scan to one exposure, after validating it.
        if expid not in expids:
            raise RuntimeError("exposure ID {} not valid for night {}"\
                .format(expid, night))
        expids = [ expid ]
    for ex in sorted(expids):
        # get the fibermap for this exposure
        fibermap = io.get_raw_files("fibermap", night, ex)
        log.debug("read {}".format(fibermap))
        fmdata = io.read_fibermap(fibermap)
        header = fmdata.meta
        # fmdata, header = fitsio.read(fibermap, 'FIBERMAP', header=True)
        flavor = header["FLAVOR"].strip().lower()
        if flavor not in ["arc","flat","science"] :
            log.error("Do not know what do to with fibermap flavor '{}' for file '{}".format(flavor,fibermap))
            raise ValueError("Do not know what do to with fibermap flavor '{}' for file '{}".format(flavor,fibermap))
        fmpix = dict()
        if (flavor != "arc") and (flavor != "flat"):
            # This will be used to track which healpix pixels are
            # touched by fibers from each spectrograph.
            ra = np.array(fmdata["TARGET_RA"], dtype=np.float64)
            dec = np.array(fmdata["TARGET_DEC"], dtype=np.float64)
            # rm NaN (possible depending on versions of fiberassign)
            valid_coordinates = (np.isnan(ra)==False)&(np.isnan(dec)==False)
            for spectro in np.unique( fmdata["SPECTROID"] ) :
                ii=np.where(fmdata["SPECTROID"][valid_coordinates]==spectro)[0]
                if ii.size == 0 : continue
                pixels = desimodel.footprint.radec2pix(nside, ra[valid_coordinates][ii], dec[valid_coordinates][ii])
                # One healpix_frame row per (exposure, spectrograph, pixel).
                for pixel in np.unique(pixels) :
                    props = dict()
                    props["night"] = int(night)
                    props["expid"] = int(ex)
                    props["spec"] = spectro
                    props["nside"] = nside
                    props["pixel"] = pixel
                    props["ntargets"] = np.sum(pixels==pixel)
                    healpix_frames.append(props)
            # all spectro at once
            pixels = np.unique(desimodel.footprint.radec2pix(nside, ra[valid_coordinates], dec[valid_coordinates]))
            for pixel in pixels :
                # Spectra and redshift tasks are per-pixel; only add each
                # pixel once, even if several exposures touch it.
                props = dict()
                props["pixel"] = pixel
                props["nside"] = nside
                props["state"] = "waiting"
                exists=False
                for entry in full["spectra"] :
                    if entry["pixel"]==props["pixel"] :
                        exists=True
                        break
                if not exists : full["spectra"].append(props)
                exists=False
                for entry in full["redshift"] :
                    if entry["pixel"]==props["pixel"] :
                        exists=True
                        break
                if not exists : full["redshift"].append(props)
        # The fibermap and raw data files for this exposure already exist
        # on disk, so those tasks are created in the "done" state.
        fmprops = dict()
        fmprops["night"] = int(night)
        fmprops["expid"] = int(ex)
        fmprops["flavor"] = flavor
        fmprops["state"] = "done"
        full["fibermap"].append(fmprops)
        rdprops = dict()
        rdprops["night"] = int(night)
        rdprops["expid"] = int(ex)
        rdprops["flavor"] = flavor
        rdprops["state"] = "done"
        full["rawdata"].append(rdprops)
        # Add the preprocessed pixel files
        for band in ['b', 'r', 'z']:
            # need to open the rawdata file to see how many spectros
            # and cameras are there
            for spec in np.unique( fmdata["SPECTROID"] ) :
                pixprops = dict()
                pixprops["night"] = int(night)
                pixprops["band"] = band
                pixprops["spec"] = spec
                pixprops["expid"] = int(ex)
                pixprops["flavor"] = flavor
                pixprops["state"] = "ready"
                full["preproc"].append(pixprops)
                if flavor == "arc" :
                    # Add the PSF files
                    props = dict()
                    props["night"] = int(night)
                    props["band"] = band
                    props["spec"] = spec
                    props["expid"] = int(ex)
                    props["state"] = "waiting" # see defs.task_states
                    full["psf"].append(props)
                    # Add a PSF night file if does not exist
                    exists=False
                    for entry in full["psfnight"] :
                        if entry["night"]==props["night"] \
                            and entry["band"]==props["band"] \
                            and entry["spec"]==props["spec"] :
                            exists=True
                            break
                    if not exists :
                        props = dict()
                        props["night"] = int(night)
                        props["band"] = band
                        props["spec"] = spec
                        props["state"] = "waiting" # see defs.task_states
                        full["psfnight"].append(props)
                if flavor != "arc" :
                    # Add extractions
                    props = dict()
                    props["night"] = int(night)
                    props["band"] = band
                    props["spec"] = spec
                    props["expid"] = int(ex)
                    props["state"] = "waiting" # see defs.task_states
                    # Add traceshift
                    full["traceshift"].append(props)
                    # Add extractions
                    full["extract"].append(props)
                if flavor == "flat" :
                    # Add a fiberflat task
                    props = dict()
                    props["night"] = int(night)
                    props["band"] = band
                    props["spec"] = spec
                    props["expid"] = int(ex)
                    props["state"] = "waiting" # see defs.task_states
                    full["fiberflat"].append(props)
                    # Add a fiberflat night file if does not exist
                    exists=False
                    for entry in full["fiberflatnight"] :
                        if entry["night"]==props["night"] \
                            and entry["band"]==props["band"] \
                            and entry["spec"]==props["spec"] :
                            exists=True
                            break
                    if not exists :
                        props = dict()
                        props["night"] = int(night)
                        props["band"] = band
                        props["spec"] = spec
                        props["state"] = "waiting" # see defs.task_states
                        full["fiberflatnight"].append(props)
                if flavor != "arc" and flavor != "flat":
                    # Science exposure: one props dict is shared by the
                    # sky, fluxcalib, cframe and qadata task lists.
                    # Add sky
                    props = dict()
                    props["night"] = int(night)
                    props["band"] = band
                    props["spec"] = spec
                    props["expid"] = int(ex)
                    props["state"] = "waiting" # see defs.task_states
                    full["sky"].append(props)
                    # Add fluxcalib
                    full["fluxcalib"].append(props)
                    # Add cframe
                    full["cframe"].append(props)
                    # Add QA
                    full["qadata"].append(props)
                    # Add starfit if does not exist
                    exists=False
                    for entry in full["starfit"] :
                        if entry["night"]==props["night"] \
                            and entry["expid"]==props["expid"] \
                            and entry["spec"]==props["spec"] :
                            exists=True
                            break
                    if not exists :
                        props = dict()
                        props["night"] = int(night)
                        props["expid"] = int(ex)
                        props["spec"] = spec
                        props["state"] = "waiting" # see defs.task_states
                        full["starfit"].append(props)
    log.debug("done")
    return full , healpix_frames
def check_tasks(tasklist, db=None, inputs=None):
    """Check a list of tasks and return their state.

    If the database is specified, it is used to check the state of the tasks
    and their dependencies.  Otherwise the filesystem is checked.

    Args:
        tasklist (list): list of tasks.
        db (pipeline.db.DB): The optional database to use.
        inputs (dict): optional dictionary containing the only input
            dependencies that should be considered.

    Returns:
        dict: The current state of all tasks.
    """
    from .tasks.base import task_classes, task_type
    states = dict()
    if db is None:
        # Check the filesystem to see which tasks are done.  Since we don't
        # have a DB, we can only distinguish between "waiting", "ready", and
        # "done" states.
        for tsk in tasklist:
            tasktype = task_type(tsk)
            st = "waiting"
            # Check dependencies
            deps = task_classes[tasktype].deps(tsk, db=db, inputs=inputs)
            if len(deps)==0 :
                # do not set state to ready of tasks with 0 dependencies
                ready = False
            else :
                ready = True
                for k, v in deps.items():
                    if not isinstance(v, list):
                        v = [ v ]
                    for dp in v:
                        deptype = task_type(dp)
                        depfiles = task_classes[deptype].paths(dp)
                        for odep in depfiles:
                            if not os.path.isfile(odep):
                                # A single missing dependency file makes
                                # the task not ready.
                                ready = False
                                break
            if ready:
                st = "ready"
            done = True
            # Check outputs
            outfiles = task_classes[tasktype].paths(tsk)
            for out in outfiles:
                if not os.path.isfile(out):
                    done = False
                    break
            if done:
                # Existing outputs take precedence over the dependency
                # check: the task is "done" regardless of readiness.
                st = "done"
            states[tsk] = st
    else:
        states = db.get_states(tasklist)
    return states
class DataBase:
    """Class for tracking pipeline processing objects and state.

    This is the backend-agnostic base class.  Derived classes provide the
    ``cursor()`` context manager for a specific backend (sqlite3 or
    PostgreSQL); all the query methods here go through that interface.
    """
    def __init__(self):
        # Connection handle; opened lazily by the backend subclasses.
        self._conn = None
        return
[docs] def get_states_type(self, tasktype, tasks):
"""Efficiently get the state of many tasks of a single type.
Args:
tasktype (str): the type of these tasks.
tasks (list): list of task names.
Returns:
dict: the state of each task.
"""
states = None
namelist = ",".join([ "'{}'".format(x) for x in tasks ])
log = get_logger()
log.debug("opening db")
with self.cursor() as cur:
log.debug("selecting in db")
cur.execute(\
'select name, state from {} where name in ({})'.format(tasktype,
namelist))
st = cur.fetchall()
log.debug("done")
states = { x[0] : task_int_to_state[x[1]] for x in st }
return states
[docs] def count_task_states(self, tasktype):
"""Return a dictionary of how many tasks are in each state
Args:
tasktype (str): the type of these tasks.
Returns:
dict: keyed by state, values are number of tasks in that state0
"""
state_count = OrderedDict()
for state in task_states:
state_count[state] = 0
with self.cursor() as cur:
cur.execute( 'select name, state from {}'.format(tasktype))
for name, intstate in cur.fetchall():
state_count[task_int_to_state[intstate]] += 1
return state_count
[docs] def get_states(self, tasks):
"""Efficiently get the state of many tasks at once.
Args:
tasks (list): list of task names.
Returns:
dict: the state of each task.
"""
from .tasks.base import task_classes, task_type
# Sort by type
taskbytype = task_sort(tasks)
# Get state of each type
states = dict()
for t, tlist in taskbytype.items():
states.update(self.get_states_type(t, tlist))
return states
[docs] def set_states_type(self, tasktype, tasks, postprocessing=True):
"""Efficiently get the state of many tasks of a single type.
Args:
tasktype (str): the type of these tasks.
tasks (list): list of tuples containing the task name and the
state to set.
Returns:
Nothing.
"""
from .tasks.base import task_classes
log = get_logger()
log.debug("opening db")
with self.cursor() as cur:
log.debug("updating in db")
for tsk in tasks:
cur.execute("update {} set state = {} where name = '{}'".format(tasktype, task_state_to_int[tsk[1]], tsk[0]))
if postprocessing and tsk[1]=="done" :
task_classes[tasktype].postprocessing(db=self,name=tsk[0],cur=cur)
log.debug("done")
return
[docs] def set_states(self, tasks):
"""Efficiently set the state of many tasks at once.
Args:
tasks (list): list of tuples containing the task name and the
state to set.
Returns:
Nothing.
"""
from .tasks.base import task_classes, task_type
# First find the type of each task.
ttypes = dict()
for tsk in tasks:
ttypes[tsk[0]] = task_type(tsk[0])
# Sort tasks into types
taskbytype = dict()
for t in all_task_types():
taskbytype[t] = list()
for tsk in tasks:
taskbytype[ttypes[tsk[0]]].append(tsk)
# Process each type
for t, tlist in taskbytype.items():
if len(tlist) > 0:
self.set_states_type(t, tlist)
return
[docs] def get_submitted(self, tasks):
"""Return the submitted flag for the list of tasks.
Args:
tasks (list): list of task names.
Returns:
(dict): the boolean submitted state of each task (True means that
the task has been submitted).
"""
from .tasks.base import task_type
# Sort by type
taskbytype = task_sort(tasks)
# Process each type
submitted = dict()
for t, tlist in taskbytype.items():
if (t == "spectra") or (t == "redshift"):
raise RuntimeError("spectra and redshift tasks do not have submitted flag.")
namelist = ",".join([ "'{}'".format(x) for x in tlist ])
with self.cursor() as cur:
cur.execute(\
'select name, submitted from {} where name in ({})'.format(t, namelist))
sb = cur.fetchall()
submitted.update({ x[0] : x[1] for x in sb })
return submitted
[docs] def set_submitted_type(self, tasktype, tasks, unset=False):
"""Flag a list of tasks of a single type as submitted.
Args:
tasktype (str): the type of these tasks.
tasks (list): list of task names.
unset (bool): if True, invert the behavior and unset the submitted
flag for these tasks.
Returns:
Nothing.
"""
val = 1
if unset:
val = 0
with self.cursor() as cur:
for tsk in tasks:
cur.execute("update {} set submitted = {} where name = '{}'".format(tasktype, val, tsk))
return
[docs] def set_submitted(self, tasks, unset=False):
"""Flag a list of tasks as submitted.
Args:
tasks (list): list of task names.
unset (bool): if True, invert the behavior and unset the submitted
flag for these tasks.
Returns:
Nothing.
"""
from .tasks.base import task_type
# Sort by type
taskbytype = task_sort(tasks)
# Process each type
for t, tlist in taskbytype.items():
if (t == "spectra") or (t == "redshift"):
raise RuntimeError("spectra and redshift tasks do not have submitted flag.")
self.set_submitted_type(tlist, unset=unset)
return
    def update(self, night, nside, expid=None):
        """Update DB based on raw data.

        This will use the usual io.meta functions to find raw exposures.  For
        each exposure, the fibermap and all following objects will be added to
        the DB.

        Args:
            night (str): The night to scan for updates.
            nside (int): The current NSIDE value used for pixel grouping.
            expid (int): Only update the DB for this exposure.
        """
        from .tasks.base import task_classes, task_type
        log = get_logger()
        alltasks, healpix_frames = all_tasks(night, nside, expid=expid)
        with self.cursor() as cur:
            # insert or ignore all healpix_frames
            log.debug("updating healpix_frame ...")
            for entry in healpix_frames:
                # see if we already have this entry
                cmd = "select exists(select 1 from healpix_frame where (expid = {} and spec = {} and nside = {} and pixel = {} ))".format(entry["expid"], entry["spec"], entry["nside"], entry["pixel"])
                cur.execute(cmd)
                have_row = cur.fetchone()[0]
                if not have_row:
                    # New (exposure, spectrograph, pixel) combination:
                    # insert it with state 0.
                    cur.execute("insert into healpix_frame (night,expid,spec,nside,pixel,ntargets,state) values({},{},{},{},{},{},{})".format(entry["night"],entry["expid"],entry["spec"],entry["nside"],entry["pixel"],entry["ntargets"],0))
            # read what is already in db
            tasks_in_db = {}
            for tt in all_task_types():
                cur.execute("select name from {}".format(tt))
                tasks_in_db[tt] = [ x for (x, ) in cur.fetchall()]
            # Insert any task not already present in its table.
            for tt in all_task_types():
                log.debug("updating {} ...".format(tt))
                for tsk in alltasks[tt]:
                    tname = task_classes[tt].name_join(tsk)
                    if tname not in tasks_in_db[tt] :
                        log.debug("adding {}".format(tname))
                        task_classes[tt].insert(cur, tsk)
        return
    def sync(self, night, specdone=False):
        """Update states of tasks based on filesystem.

        Go through all tasks in the DB for the given night and determine their
        state on the filesystem.  Then update the DB state to match.

        Args:
            night (str): The night to scan for updates.
            specdone: If true, set spectra to done if files exist.
        """
        from .tasks.base import task_classes
        log = get_logger()
        # Get the list of task types excluding spectra and redshifts,
        # which will be handled separately.
        ttypes = [ t for t in all_task_types() if (t != "spectra") \
            and (t != "redshift") ]
        tasks_in_db = None
        # Grab existing nightly tasks
        with self.cursor() as cur:
            tasks_in_db = {}
            for tt in ttypes:
                cur.execute("select name from {} where night = {}"\
                    .format(tt, night))
                tasks_in_db[tt] = [ x for (x, ) in cur.fetchall() ]
        # For each task type, check status WITHOUT the DB, then set state.
        # Save out the cframe states for later use with the healpix_frame table
        cfstates = None
        for tt in ttypes:
            tstates = check_tasks(tasks_in_db[tt], db=None)
            st = [ (x, tstates[x]) for x in tasks_in_db[tt] ]
            self.set_states_type(tt, st)
            if tt == "cframe":
                cfstates = tstates.copy()
        # Now examine the spectra and redshift files.  If the files exist,
        # we assume they are done and completely up to date.  If the files
        # are not up to date, they must be manually deleted in order for the
        # sync to correctly reconstruct the database state.
        pixrows = self.select_healpix_frame({"night" : night})
        # First check the existence of the files touched by this night
        spec_exists = dict()
        red_exists = dict()
        for row in pixrows:
            # Each pixel only needs to be checked once.
            if row["pixel"] in spec_exists:
                continue
            spec_name = task_classes["spectra"].name_join(row)
            red_name = task_classes["redshift"].name_join(row)
            # Check spectra outputs
            outfiles = task_classes["spectra"].paths(spec_name)
            spec_exists[row["pixel"]] = True
            for out in outfiles:
                if not os.path.isfile(out):
                    spec_exists[row["pixel"]] = False
                    break
            # Check redshift outputs
            outfiles = task_classes["redshift"].paths(red_name)
            red_exists[row["pixel"]] = True
            for out in outfiles:
                if not os.path.isfile(out):
                    red_exists[row["pixel"]] = False
                    break
        # Now use all this info.  Some internal helpers to avoid code
        # duplication.  The healpix_frame states encode the progress of a
        # (frame, pixel) pair: 0 = no cframe, 1 = cframe exists,
        # 2 = spectra updated, 3 = redshift done.
        def set_hpx_frame_0(row, spec, red, cur):
            # No cframe yet: both downstream tasks go back to waiting.
            self.update_healpix_frame_state(row, 0, cur)
            task_classes["spectra"].state_set(
                self, spec, "waiting", cur)
            task_classes["redshift"].state_set(
                self, red, "waiting", cur)
            return
        def set_hpx_frame_1(row, spec, red, cur):
            # cframe exists but no spectra file yet.
            self.update_healpix_frame_state(row, 1, cur)
            # getready() will do this for us:
            #task_classes["spectra"].state_set(
            #    self, spec, "ready", cur)
            task_classes["redshift"].state_set(
                self, red, "waiting", cur)
            return
        def set_hpx_frame_2(row, spec, red, cur):
            # Spectra file exists but no redshift file yet.
            self.update_healpix_frame_state(row, 2, cur)
            task_classes["spectra"].state_set(
                self, spec, "done", cur)
            # getready() will do this:
            #task_classes["redshift"].state_set(
            #    self, red, "ready", cur)
            return
        def set_hpx_frame_3(row, spec, red, cur):
            # Both spectra and redshift files exist.
            self.update_healpix_frame_state(row, 3, cur)
            task_classes["spectra"].state_set(
                self, spec, "done", cur)
            task_classes["redshift"].state_set(
                self, red, "done", cur)
            return
        with self.cursor() as cur:
            for row in pixrows:
                # A row is "cframe done" only if all three band cframes
                # for this exposure / spectrograph are done.
                cfdone = True
                cfprops = row.copy()
                for band in ["b", "r", "z"]:
                    cfprops["band"] = band
                    cf_name = task_classes["cframe"].name_join(cfprops)
                    if cfstates[cf_name] != "done":
                        cfdone = False
                spec_name = task_classes["spectra"].name_join(row)
                red_name = task_classes["redshift"].name_join(row)
                if (not cfdone) and (not specdone) :
                    # The cframes do not exist, so reset the state of the
                    # spectra and redshift tasks.
                    set_hpx_frame_0(row, spec_name, red_name, cur)
                else:
                    # The cframe exists...
                    if spec_exists[row["pixel"]]:
                        if red_exists[row["pixel"]]:
                            # We are all done (state 3)
                            set_hpx_frame_3(row, spec_name, red_name, cur)
                        else:
                            # We are only at state 2
                            set_hpx_frame_2(row, spec_name, red_name, cur)
                    else:
                        # We are at just at state 1
                        set_hpx_frame_1(row, spec_name, red_name, cur)
        # Update ready state of tasks
        self.getready(night=night)
        return
    def cleanup(self, tasktypes=None, expid=None, cleanfailed=False,
        cleansubmitted=False):
        """Reset states of tasks.

        Any tasks that are marked as "running" will have their
        state reset to "ready".  This can be called if a job dies before
        completing all tasks.

        Args:
            tasktypes (list): if not None, clean up only tasks of these types.
            expid (int): if not None, only clean tasks related to this
                exposure ID.  Note that tasks which are independent of
                an expid (psfnight, fiberflatnight, spectra, redshift)
                will be ignored if this option is given.
            cleanfailed (bool): if True, also reset failed tasks to ready.
            cleansubmitted (bool): if True, set submitted flag to False.
        """
        tasks_running = None
        alltypes = all_task_types()
        ttypes = None
        if tasktypes is None:
            ttypes = alltypes
        else:
            # Validate the requested types before touching the DB.
            for tt in tasktypes:
                if tt not in alltypes:
                    raise RuntimeError("Cannot clean invalid task type {}"\
                        .format(tt))
            ttypes = tasktypes
        # Grab existing nightly tasks
        with self.cursor() as cur:
            tasks_running = {}
            for tt in ttypes:
                hasexpid = (tt not in ["psfnight", "fiberflatnight", "spectra",
                    "redshift"])
                if hasexpid:
                    # This task type has an expid property.
                    cmd = None
                    if expid is not None:
                        # We are cleaning only a single exposure.
                        cmd = "select name from {} where expid = {} and ( state = {}".format(tt, expid, task_state_to_int["running"])
                    else:
                        # We are cleaning all exposures for this task type.
                        cmd = "select name from {} where ( state = {}".format(tt, task_state_to_int["running"])
                    # Close the parenthesized state clause, optionally also
                    # matching failed tasks.
                    if cleanfailed:
                        cmd = "{} or state = {} )".format(cmd,
                            task_state_to_int["failed"])
                    else:
                        cmd = "{} )".format(cmd)
                    cur.execute(cmd)
                    tasks_running[tt] = [ x for (x, ) in cur.fetchall() ]
                    if cleansubmitted:
                        if expid is not None:
                            cmd = "update {} set submitted = 0 where expid = {}".format(tt, expid)
                        else:
                            cmd = "update {} set submitted = 0".format(tt)
                        cur.execute(cmd)
                else:
                    # This task type has no concept of an exposure ID
                    if expid is not None:
                        # We specified an exposure ID, which makes no sense
                        # for this task type.  Skip it.
                        tasks_running[tt] = list()
                        continue
                    else:
                        # cleanup this task type.
                        cmd = "select name from {} where ( state = {}".format(tt, task_state_to_int["running"])
                        if cleanfailed:
                            cmd = "{} or state = {} )".format(cmd,
                                task_state_to_int["failed"])
                        else:
                            cmd = "{} )".format(cmd)
                        cur.execute(cmd)
                        tasks_running[tt] = [ x for (x, ) in cur.fetchall() ]
                        if cleansubmitted:
                            # spectra / redshift tables have no submitted
                            # column.
                            if (tt != "spectra") and (tt != "redshift"):
                                cmd = "update {} set submitted = 0".format(tt)
                                cur.execute(cmd)
        # Set the collected tasks back to "waiting"; getready() below then
        # promotes those whose dependencies are satisfied to "ready".
        for tt in ttypes:
            if len(tasks_running[tt]) > 0:
                st = [ (x, "waiting") for x in tasks_running[tt] ]
                self.set_states_type(tt, st)
        self.getready()
        return
    def getready(self, night=None):
        """Update DB, changing waiting to ready depending on status of dependencies.

        Args:
            night (str): The night to process.
        """
        from .tasks.base import task_classes, task_type
        log = get_logger()
        # Get the list of task types excluding spectra and redshifts,
        # which will be handled separately.
        ttypes = [ t for t in all_task_types() if (t != "spectra") \
            and (t != "redshift") ]
        with self.cursor() as cur:
            for tt in ttypes:
                # for each type of task, get the list of tasks in waiting mode
                cmd = "select name from {} where state = {}".format(tt, task_state_to_int["waiting"])
                if night is not None:
                    cmd = "{} and night = {}".format(cmd, night)
                cur.execute(cmd)
                tasks = [ x for (x, ) in cur.fetchall()]
                if len(tasks) > 0:
                    log.debug("checking {} {} tasks ...".format(len(tasks),tt))
                    # Each task class decides whether its dependencies are
                    # met and flips its own state to ready.
                    for tsk in tasks:
                        task_classes[tt].getready(db=self, name=tsk, cur=cur)
            for tt in [ "spectra" , "redshift" ]:
                # Readiness of the healpix tasks is driven by the
                # healpix_frame table rather than per-task dependencies.
                if tt == "spectra":
                    required_healpix_frame_state = 1
                    # means we have a cframe
                elif tt == "redshift":
                    required_healpix_frame_state = 2
                    # means we have an updated spectra file
                cur.execute('select nside,pixel from healpix_frame where state = {}'.format(required_healpix_frame_state))
                entries = cur.fetchall()
                for entry in entries :
                    log.debug("{} of pixel {} is ready to run".format(tt,entry[1]))
                    cur.execute('update {} set state = {} where nside = {} and pixel = {}'.format(tt,task_state_to_int["ready"],entry[0],entry[1]))
                log.debug("checking waiting {} tasks to see if they are done...".format(tt))
                cmd = "select pixel from {} where state = {}".format(tt, task_state_to_int["waiting"])
                cur.execute(cmd)
                pixels = [ x for (x, ) in cur.fetchall()]
                if len(pixels) > 0:
                    log.debug("checking {} {} ...".format(len(pixels),tt))
                    if tt == "spectra":
                        required_healpix_frame_state = 2
                    elif tt == "redshift":
                        required_healpix_frame_state = 3
                    for pixel in pixels:
                        # The task is done when no frame of this pixel is
                        # in any state other than the required one.
                        cur.execute('select pixel from healpix_frame where pixel = {} and state != {}'.format(pixel,required_healpix_frame_state))
                        entries = cur.fetchall()
                        if len(entries)==0 :
                            log.debug("{} task of pixel {} is done".format(tt,pixel))
                            cur.execute('update {} set state = {} where pixel = {}'.format(tt,task_state_to_int["done"],pixel))
        return
def update_healpix_frame_state(self, props, state, cur):
if "expid" in props :
# update from a cframe
cmd = "update healpix_frame set state = {} where expid = {} and spec = {} and state = {}".format(state,props["expid"],props["spec"],props["state"])
else :
# update from a spectra or redshift task
cmd = "update healpix_frame set state = {} where nside = {} and pixel = {} and state = {}".format(state,props["nside"],props["pixel"],props["state"])
if cur is None :
with self.cursor() as cur:
cur.execute(cmd)
else :
cur.execute(cmd)
return
def select_healpix_frame(self, props):
res = []
with self.cursor() as cur:
cmd = "select * from healpix_frame where "
first=True
for k in props.keys() :
if not first : cmd += " and "
first=False
cmd += "{}={}".format(k,props[k])
cur.execute(cmd)
entries = cur.fetchall()
# convert that to list of dictionaries
for entry in entries :
tmp = dict()
for i, k in enumerate(["night", "expid", "spec", "nside",
"pixel", "ntargets", "state"]):
tmp[k] = entry[i]
res.append(tmp)
return res
def create_healpix_frame_table(self) :
with self.cursor() as cur:
cmd = "create table healpix_frame (night integer, expid integer, spec integer, nside integer, pixel integer, ntargets integer, state integer, unique(expid, spec, nside, pixel))"
cur.execute(cmd)
return
class DataBaseSqlite(DataBase):
    """Pipeline database using sqlite3 as the backend.

    Args:
        path (str): the filesystem path of the database to open.  If None, then
            a temporary database is created in memory.
        mode (str): if "r", the database is open in read-only mode.  If "w",
            the database is open in read-write mode and created if necessary.
    """
    def __init__(self, path, mode):
        super(DataBaseSqlite, self).__init__()
        self._path = path
        self._mode = mode
        # Create the schema unless an on-disk database already exists;
        # an in-memory DB (path None) is always freshly created.
        create = True
        if (self._path is not None) and os.path.exists(self._path):
            create = False
        if self._mode == 'r' and create:
            raise RuntimeError("cannot open a non-existent DB in read-only "
                " mode")
        self._connstr = None
        # This timeout is in seconds
        self._busytime = 1000
        # Journaling options
        self._journalmode = "persist"
        self._syncmode = "normal"
        if create:
            self.initdb()
        return
def _open(self):
import sqlite3
if self._path is None:
# We are opening an in-memory DB
self._conn = sqlite3.connect(":memory:")
else:
try:
# only python3 supports uri option
if self._mode == 'r':
self._connstr = 'file:{}?mode=ro'.format(self._path)
else:
self._connstr = 'file:{}?mode=rwc'.format(self._path)
self._conn = sqlite3.connect(self._connstr, uri=True,
timeout=self._busytime)
except:
self._conn = sqlite3.connect(self._path, timeout=self._busytime)
if self._mode == 'w':
# In read-write mode, set the journaling
self._conn.execute("pragma journal_mode={}"\
.format(self._journalmode))
self._conn.execute("pragma synchronous={}".format(self._syncmode))
# Other tuning options
self._conn.execute("pragma temp_store=memory")
self._conn.execute("pragma page_size=4096")
self._conn.execute("pragma cache_size=4000")
return
    def _close(self):
        # Drop our reference to the connection; the handle is released when
        # the object is garbage collected.  NOTE(review): an explicit
        # self._conn.close() would be more deterministic -- confirm.
        del self._conn
        self._conn = None
        return
    @contextmanager
    def cursor(self):
        """Context manager yielding a cursor wrapped in a transaction.

        The connection is opened on entry and closed on exit.  On an
        sqlite DatabaseError the transaction is rolled back and the error
        re-raised; otherwise the transaction is committed.
        """
        import sqlite3
        self._open()
        cur = self._conn.cursor()
        cur.execute("begin transaction")
        try:
            yield cur
        except sqlite3.DatabaseError as err:
            log = get_logger()
            log.error(err)
            cur.execute("rollback")
            raise err
        else:
            try:
                cur.execute("commit")
            except sqlite3.OperationalError:
                #- sqlite3 in py3.5 can't commit a read-only finished transaction
                pass
        finally:
            del cur
            self._close()
[docs] def initdb(self):
"""Create DB tables for all tasks if they do not exist.
"""
# check existing tables
tables_in_db = None
with self.cursor() as cur:
cur.execute("select name FROM sqlite_master WHERE type='table'")
tables_in_db = [x for (x, ) in cur.fetchall()]
# Create a table for every task type
from .tasks.base import task_classes, task_type
for tt, tc in task_classes.items():
if tt not in tables_in_db:
tc.create(self)
if "healpix_frame" not in tables_in_db:
self.create_healpix_frame_table()
return
class DataBasePostgres(DataBase):
    """Pipeline database using PostgreSQL as the backend.

    Args:
        host (str): The database server.
        port (int): The connection port.
        dbname (str): The database to connect.
        user (str): The user name for the connection.  The password should be
            stored in the ~/.pgpass file.
        schema (str): The schema within the database.  If this is specified,
            then the database is assumed to exist.  Otherwise the schema is
            computed from a hash of the production location and will be
            created.
        authorize (str): If creating the schema, this is the list of
            additional roles that should be granted access.
    """
    def __init__(self, host, port, dbname, user, schema=None, authorize=None):
        super(DataBasePostgres, self).__init__()
        self._schema = schema
        self._user = user
        self._dbname = dbname
        self._host = host
        self._port = port
        self._authorize = authorize
        # The production directory is hashed to derive the schema name.
        self._proddir = os.path.abspath(io.specprod_root())
        create = False
        if self._schema is None:
            # No schema given: this is a new production, so compute the
            # schema name and create it below.
            create = True
            self._schema = self._compute_schema()
        if create:
            self.initdb()
        return
def _compute_schema(self):
import hashlib
md = hashlib.md5()
md.update(self._proddir.encode())
return "pipe_{}".format(md.hexdigest())
def _open(self):
import psycopg2 as pg2
import time
import numpy.random
# Open connection. If psycopg2 raises an exception, then sleep
# for a random time interval and keep trying.
maxtry = 10
ntry = 0
while True:
try:
self._conn = pg2.connect(host=self._host, port=self._port,
user=self._user, dbname=self._dbname)
except pg2.OperationalError as err:
log = get_logger()
log.debug("PostgreSQL connection failed with '{}', will sleep and retry".format(err))
if ntry > maxtry:
log.error(err)
break
numpy.random.seed(int(time.time()))
sec = numpy.random.uniform() * 3.0
time.sleep(sec)
ntry += 1
else:
break
return
    def _close(self):
        # Drop our reference to the connection; the handle is released when
        # the object is garbage collected.  NOTE(review): an explicit
        # self._conn.close() would be more deterministic -- confirm.
        del self._conn
        self._conn = None
        return
    @property
    def schema(self):
        # The schema name used for this production (read-only).
        return self._schema
def _have_schema(self, cur):
com = "select exists(select 1 from pg_namespace where nspname = '{}')".format(self._schema)
cur.execute(com)
return cur.fetchone()[0]
    @contextmanager
    def cursor(self, skipcheck=False):
        """Context manager yielding a cursor wrapped in a transaction.

        The search path is set to the production schema before yielding.
        On a DatabaseError the transaction is rolled back and the error
        re-raised; otherwise it is committed.

        Args:
            skipcheck (bool): if True, do not verify that the production
                schema exists (used by initdb while creating it).

        Raises:
            RuntimeError: if the schema does not exist and skipcheck is
                False.
        """
        import psycopg2
        self._open()
        cur = self._conn.cursor()
        if not skipcheck:
            have_schema = self._have_schema(cur)
            if not have_schema:
                raise RuntimeError("Postgres schema for production {} does"
                    " not exist. Make sure you create the production with"
                    " postgres options and source the top-level setup.sh"
                    " file.".format(self._proddir))
        cur.execute("set search_path to '{}'".format(self._schema))
        cur.execute("begin transaction")
        try:
            yield cur
        except psycopg2.DatabaseError as err:
            log = get_logger()
            log.error(err)
            cur.execute("rollback")
            raise err
        else:
            cur.execute("commit")
        finally:
            del cur
            self._close()
    def initdb(self):
        """Create DB tables for all tasks if they do not exist.
        """
        log = get_logger()
        # Check existence of the schema.  If we were not passed the schema
        # in the constructor, it means that we are creating a new prod, so any
        # existing schema should be wiped and recreated.
        tables_in_db = None
        with self.cursor(skipcheck=True) as cur:
            # See if our schema already exists...
            have_schema = self._have_schema(cur)
            if have_schema:
                # We need to wipe it first
                com = "drop schema {} cascade".format(self._schema)
                log.debug(com)
                cur.execute(com)
            com = "create schema {} authorization {}"\
                .format(self._schema, self._user)
            log.debug(com)
            cur.execute(com)
            if self._authorize is not None:
                # Grant usage to the additional roles, for both existing
                # objects and (via default privileges) future ones.
                com = "grant usage on schema {} to {}"\
                    .format(self._schema, self._authorize)
                log.debug(com)
                cur.execute(com)
                com = "alter default privileges in schema {} grant select on tables to {}".format(self._schema, self._authorize)
                log.debug(com)
                cur.execute(com)
                com = "alter default privileges in schema {} grant select,usage on sequences to {}".format(self._schema, self._authorize)
                log.debug(com)
                cur.execute(com)
                com = "alter default privileges in schema {} grant execute on functions to {}".format(self._schema, self._authorize)
                log.debug(com)
                cur.execute(com)
                com = "alter default privileges in schema {} grant usage on types to {}".format(self._schema, self._authorize)
                log.debug(com)
                cur.execute(com)
            # Create a table of information about this prod
            com = "create table {}.info (key text unique, val text)"\
                .format(self._schema)
            log.debug(com)
            cur.execute(com)
            com = "insert into {}.info values ('{}', '{}')"\
                .format(self._schema, "path", self._proddir)
            log.debug(com)
            cur.execute(com)
            if 'USER' in os.environ:
                com = "insert into {}.info values ('{}', '{}')"\
                    .format(self._schema, "created_by", os.environ['USER'])
                log.debug(com)
                cur.execute(com)
            # check existing tables
            cur.execute("select tablename from pg_tables where schemaname = '{}'".format(self.schema))
            tables_in_db = [x for (x, ) in cur.fetchall()]
        # Create a table for every task type
        from .tasks.base import task_classes, task_type
        for tt, tc in task_classes.items():
            if tt not in tables_in_db:
                tc.create(self)
        if "healpix_frame" not in tables_in_db:
            self.create_healpix_frame_table()
        return
def load_db(dbstring, mode="w", user=None):
    """Load a database from a connection string.

    This instantiates either an sqlite or postgresql database using a string.
    If this string begins with "postgresql:", then it is taken to be the
    information needed to connect to a postgres server.  Otherwise it is
    assumed to be a filesystem path to use with sqlite.  The mode is only
    meaningful when using sqlite.  Postgres permissions are controlled through
    the user permissions.

    Args:
        dbstring (str): either a filesystem path (sqlite) or a colon-separated
            string of connection properties in the form
            "postgresql:<host>:<port>:<dbname>:<user>:<schema>".
        mode (str): for sqlite, the mode.
        user (str): for postgresql, an alternate user name for opening the DB.
            This can be used to connect as a user with read-only access.

    Returns:
        DataBase: a derived database class of the appropriate type.
    """
    # Bug fix: match the documented "begins with" contract.  The previous
    # re.search matched "postgresql:" anywhere in the string, which could
    # misroute an sqlite path that happens to contain that substring.
    if dbstring.startswith("postgresql:"):
        props = dbstring.split(":")
        host = props[1]
        port = int(props[2])
        dbname = props[3]
        username = props[4]
        if user is not None:
            # An explicit user overrides the one in the connection string.
            username = user
        schema = None
        if len(props) > 5:
            # Our DB string also contains the name of an existing
            # schema.
            schema = props[5]
        return DataBasePostgres(host=host, port=port, dbname=dbname,
            user=username, schema=schema)
    else:
        return DataBaseSqlite(dbstring, mode)