# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
desispec.database.redshift
==========================
Code for loading a spectroscopic production database. This includes both
targeting and redshift data.
Notes
-----
* EDR development:
- Load as much data as possible from patched fiberassign files.
- If necessary, create intermediate files for ingestion by running
the lsdr9-photometry_ code.
- In the tractorphot files, TARGETID really is unique. However, some
TARGETIDs are not present because they do not match an object to within
1 arcsec. For these, fill in what can be filled in from target files or
fiberassign files.
- For every duplicate potential targetid, use only the most recent desitarget
version containing that targetid, rather than the default associated with
the fiberassign file.
- Read redshifts from ``zall-*.fits`` files to get the ``*_primary``
columns, or run the primary generation code on the data
(:func:`~desispec.zcatalog.find_primary_spectra`).
* Future development:
- Plan for how to support fuji+guadalupe combined analysis. May need to look
into cross-schema views, or daughter tables that inherit from both schemas.
- Anticipating loading afterburners and VACs into the database.
- Load redshifts from all redrock files in ``tiles/cumulative``, rather than
from the ``ztile-*-cumulative.fits`` summary file.
.. _lsdr9-photometry: https://github.com/moustakas/lsdr9-photometry
"""
import os
import re
import glob
import sys
import numpy as np
from astropy.io import fits
from astropy.table import Table, MaskedColumn, join
from astropy.time import Time
from pytz import utc
from sqlalchemy import (create_engine, event, ForeignKey, Column, DDL,
BigInteger, Boolean, Integer, String, Float, DateTime,
SmallInteger, bindparam, Numeric)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (declarative_base, declarative_mixin, declared_attr,
scoped_session, sessionmaker, relationship)
from sqlalchemy.schema import CreateSchema, Index
from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION, REAL
from desiutil.iers import freeze_iers
from desiutil.log import get_logger, DEBUG, INFO
# from desitarget.targets import decode_targetid
from ..io.meta import specprod_root, faflavor2program
from ..io.util import checkgzip
from .util import convert_dateobs, parse_pgpass, cameraid, surveyid, programid, spgrpid
Base = declarative_base()  # Shared declarative base for every mapped table below.
engine = None  # SQLAlchemy engine; expected to be created at runtime by setup code elsewhere in this module.
dbSession = scoped_session(sessionmaker())  # Thread-local session factory; bound to ``engine`` once it exists.
schemaname = None  # Schema for all tables; must be set *before* the model classes are declared (see SchemaMixin).
log = None  # Module-level logger; presumably initialized at runtime via desiutil.log.get_logger — confirm in setup code.
@declarative_mixin
class SchemaMixin(object):
    """Mixin class to allow schema name to be changed at runtime. Also
    automatically sets the table name.
    """

    @declared_attr
    def __tablename__(cls):
        # The table name is simply the lowercased class name.
        return cls.__name__.lower()

    @declared_attr
    def __table_args__(cls):
        # The schema is read from the module-level ``schemaname`` at class
        # declaration time, so ``schemaname`` must be set before the model
        # classes are created.
        return {'schema': schemaname}
class Photometry(SchemaMixin, Base):
    """Contains *only* photometric quantities associated with a ``TARGETID``.

    This table is deliberately designed so that ``TARGETID`` can serve as a
    primary key. Any quantities created or modified by desitarget are
    defined in the :class:`~desispec.database.redshift.Target` class.

    However we *avoid* the use of the term "tractor" for this table,
    because not every target will have *tractor* photometry.

    Notes
    -----
    The various ``LC`` (light curve) columns,
    which are vector-valued, are not yet implemented.
    """
    ls_id = Column(BigInteger, nullable=False, index=True)  # (release << 40) | (brickid << 16) | brick_objid
    release = Column(SmallInteger, nullable=False)  # targetphot, fiberassign
    brickid = Column(Integer, nullable=False)  # targetphot, fiberassign
    brickname = Column(String(8), nullable=False)  # targetphot, fiberassign
    brick_objid = Column(Integer, nullable=False)  # targetphot, fiberassign
    morphtype = Column(String(4), nullable=False)  # targetphot, fiberassign
    ra = Column(DOUBLE_PRECISION, nullable=False)  # targetphot, fiberassign: target_ra?
    ra_ivar = Column(REAL, nullable=False)  # targetphot
    dec = Column(DOUBLE_PRECISION, nullable=False)  # targetphot, fiberassign: target_dec?
    dec_ivar = Column(REAL, nullable=False)  # targetphot
    dchisq_psf = Column(REAL, nullable=False)  # targetphot
    dchisq_rex = Column(REAL, nullable=False)  # targetphot
    dchisq_dev = Column(REAL, nullable=False)  # targetphot
    dchisq_exp = Column(REAL, nullable=False)  # targetphot
    dchisq_ser = Column(REAL, nullable=False)  # targetphot
    ebv = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_g = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_r = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_z = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_ivar_g = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_ivar_r = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_ivar_z = Column(REAL, nullable=False)  # targetphot, fiberassign
    mw_transmission_g = Column(REAL, nullable=False)  # targetphot
    mw_transmission_r = Column(REAL, nullable=False)  # targetphot
    mw_transmission_z = Column(REAL, nullable=False)  # targetphot
    fracflux_g = Column(REAL, nullable=False)  # targetphot
    fracflux_r = Column(REAL, nullable=False)  # targetphot
    fracflux_z = Column(REAL, nullable=False)  # targetphot
    fracmasked_g = Column(REAL, nullable=False)  # targetphot
    fracmasked_r = Column(REAL, nullable=False)  # targetphot
    fracmasked_z = Column(REAL, nullable=False)  # targetphot
    fracin_g = Column(REAL, nullable=False)  # targetphot
    fracin_r = Column(REAL, nullable=False)  # targetphot
    fracin_z = Column(REAL, nullable=False)  # targetphot
    nobs_g = Column(SmallInteger, nullable=False)  # targetphot
    nobs_r = Column(SmallInteger, nullable=False)  # targetphot
    nobs_z = Column(SmallInteger, nullable=False)  # targetphot
    psfdepth_g = Column(REAL, nullable=False)  # targetphot
    psfdepth_r = Column(REAL, nullable=False)  # targetphot
    psfdepth_z = Column(REAL, nullable=False)  # targetphot
    galdepth_g = Column(REAL, nullable=False)  # targetphot
    galdepth_r = Column(REAL, nullable=False)  # targetphot
    galdepth_z = Column(REAL, nullable=False)  # targetphot
    flux_w1 = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_w2 = Column(REAL, nullable=False)  # targetphot, fiberassign
    flux_w3 = Column(REAL, nullable=False)  # targetphot
    flux_w4 = Column(REAL, nullable=False)  # targetphot
    flux_ivar_w1 = Column(REAL, nullable=False)  # fiberassign
    flux_ivar_w2 = Column(REAL, nullable=False)  # fiberassign
    flux_ivar_w3 = Column(REAL, nullable=False)  # targetphot
    flux_ivar_w4 = Column(REAL, nullable=False)  # targetphot
    mw_transmission_w1 = Column(REAL, nullable=False)  # targetphot
    mw_transmission_w2 = Column(REAL, nullable=False)  # targetphot
    mw_transmission_w3 = Column(REAL, nullable=False)  # targetphot
    mw_transmission_w4 = Column(REAL, nullable=False)  # targetphot
    allmask_g = Column(SmallInteger, nullable=False)  # targetphot
    allmask_r = Column(SmallInteger, nullable=False)  # targetphot
    allmask_z = Column(SmallInteger, nullable=False)  # targetphot
    fiberflux_g = Column(REAL, nullable=False)  # targetphot, fiberassign
    fiberflux_r = Column(REAL, nullable=False)  # targetphot, fiberassign
    fiberflux_z = Column(REAL, nullable=False)  # targetphot, fiberassign
    fibertotflux_g = Column(REAL, nullable=False)  # targetphot, fiberassign
    fibertotflux_r = Column(REAL, nullable=False)  # targetphot, fiberassign
    fibertotflux_z = Column(REAL, nullable=False)  # targetphot, fiberassign
    ref_epoch = Column(REAL, nullable=False)  # targetphot, fiberassign
    wisemask_w1 = Column(SmallInteger, nullable=False)  # targetphot
    wisemask_w2 = Column(SmallInteger, nullable=False)  # targetphot
    maskbits = Column(SmallInteger, nullable=False)  # targetphot, fiberassign
    # LC_... (light-curve, vector-valued) columns are deliberately omitted; see Notes.
    shape_r = Column(REAL, nullable=False)  # targetphot, fiberassign
    shape_e1 = Column(REAL, nullable=False)  # targetphot, fiberassign
    shape_e2 = Column(REAL, nullable=False)  # targetphot, fiberassign
    shape_r_ivar = Column(REAL, nullable=False)  # targetphot
    shape_e1_ivar = Column(REAL, nullable=False)  # targetphot
    shape_e2_ivar = Column(REAL, nullable=False)  # targetphot
    sersic = Column(REAL, nullable=False)  # targetphot, fiberassign
    sersic_ivar = Column(REAL, nullable=False)  # targetphot
    ref_id = Column(BigInteger, nullable=False)  # targetphot, fiberassign
    ref_cat = Column(String(2), nullable=False)  # targetphot, fiberassign
    gaia_phot_g_mean_mag = Column(REAL, nullable=False)  # targetphot, fiberassign
    gaia_phot_g_mean_flux_over_error = Column(REAL, nullable=False)  # targetphot
    gaia_phot_bp_mean_mag = Column(REAL, nullable=False)  # targetphot, fiberassign
    gaia_phot_bp_mean_flux_over_error = Column(REAL, nullable=False)  # targetphot
    gaia_phot_rp_mean_mag = Column(REAL, nullable=False)  # targetphot, fiberassign
    gaia_phot_rp_mean_flux_over_error = Column(REAL, nullable=False)  # targetphot
    gaia_phot_bp_rp_excess_factor = Column(REAL, nullable=False)  # targetphot
    gaia_duplicated_source = Column(Boolean, nullable=False)  # targetphot
    gaia_astrometric_sigma5d_max = Column(REAL, nullable=False)  # targetphot
    gaia_astrometric_params_solved = Column(SmallInteger, nullable=False)  # targetphot, but inconsistent type!
    parallax = Column(REAL, nullable=False)  # targetphot, fiberassign
    parallax_ivar = Column(REAL, nullable=False)  # targetphot
    pmra = Column(REAL, nullable=False)  # targetphot, fiberassign
    pmra_ivar = Column(REAL, nullable=False)  # targetphot
    pmdec = Column(REAL, nullable=False)  # targetphot, fiberassign
    pmdec_ivar = Column(REAL, nullable=False)  # targetphot
    targetid = Column(BigInteger, primary_key=True, autoincrement=False)  # targetphot, fiberassign
    # One-to-many relationships to tables that reference TARGETID.
    targets = relationship("Target", back_populates="photometry")
    fiberassign = relationship("Fiberassign", back_populates="photometry")
    potential = relationship("Potential", back_populates="photometry")
    zpix_redshifts = relationship("Zpix", back_populates="photometry")
    ztile_redshifts = relationship("Ztile", back_populates="photometry")

    def __repr__(self):
        return "Photometry(targetid={0.targetid:d})".format(self)
class Target(SchemaMixin, Base):
    """Representation of the pure-desitarget quantities in the
    ``TARGETPHOT`` table in the targetphot files.

    Notes
    -----
    The composite primary key ``id`` is a :class:`~sqlalchemy.types.Numeric`
    wide enough (39 digits) to hold an identifier composed from several
    integer columns; rows are unique on (``targetid``, ``survey``, ``tileid``).
    """
    @declared_attr
    def __table_args__(cls):
        # Extend the mixin's schema setting with a uniqueness constraint:
        # a target may appear once per (survey, tileid).
        return (Index(f'ix_{cls.__tablename__}_unique', "targetid", "survey", "tileid", unique=True),
                SchemaMixin.__table_args__)

    id = Column(Numeric(39), primary_key=True, autoincrement=False)
    targetid = Column(BigInteger, ForeignKey('photometry.targetid'), nullable=False, index=True)  # fiberassign
    photsys = Column(String(1), nullable=False)  # fiberassign
    subpriority = Column(DOUBLE_PRECISION, nullable=False)  # fiberassign
    obsconditions = Column(BigInteger, nullable=False)  # fiberassign
    priority_init = Column(BigInteger, nullable=False)  # fiberassign
    numobs_init = Column(BigInteger, nullable=False)  # fiberassign
    hpxpixel = Column(BigInteger, nullable=False, index=True)
    cmx_target = Column(BigInteger, nullable=False)
    desi_target = Column(BigInteger, nullable=False)
    bgs_target = Column(BigInteger, nullable=False)
    mws_target = Column(BigInteger, nullable=False)
    sv1_desi_target = Column(BigInteger, nullable=False)
    sv1_bgs_target = Column(BigInteger, nullable=False)
    sv1_mws_target = Column(BigInteger, nullable=False)
    sv2_desi_target = Column(BigInteger, nullable=False)
    sv2_bgs_target = Column(BigInteger, nullable=False)
    sv2_mws_target = Column(BigInteger, nullable=False)
    sv3_desi_target = Column(BigInteger, nullable=False)
    sv3_bgs_target = Column(BigInteger, nullable=False)
    sv3_mws_target = Column(BigInteger, nullable=False)
    scnd_target = Column(BigInteger, nullable=False)
    sv1_scnd_target = Column(BigInteger, nullable=False)
    sv2_scnd_target = Column(BigInteger, nullable=False)
    sv3_scnd_target = Column(BigInteger, nullable=False)
    survey = Column(String(7), nullable=False, index=True)
    program = Column(String(6), nullable=False, index=True)
    tileid = Column(Integer, ForeignKey('tile.tileid'), nullable=False, index=True)  # fiberassign
    photometry = relationship("Photometry", back_populates="targets")
    tile = relationship("Tile", back_populates="targets")

    def __repr__(self):
        return "Target(targetid={0.targetid:d}, tileid={0.tileid:d}, survey='{0.survey}')".format(self)
class Tile(SchemaMixin, Base):
    """Representation of the tiles file.

    Notes
    -----
    Most of the data that are currently in the tiles file are derivable
    from the exposures table with much greater precision::

        CREATE VIEW f5.tile AS SELECT tileid,
            -- SURVEY, FAPRGRM, FAFLAVOR?
            COUNT(*) AS nexp, SUM(exptime) AS exptime,
            MIN(tilera) AS tilera, MIN(tiledec) AS tiledec,
            SUM(efftime_etc) AS efftime_etc, SUM(efftime_spec) AS efftime_spec,
            SUM(efftime_gfa) AS efftime_gfa, MIN(goaltime) AS goaltime,
            -- OBSSTATUS?
            SUM(lrg_efftime_dark) AS lrg_efftime_dark,
            SUM(elg_efftime_dark) AS elg_efftime_dark,
            SUM(bgs_efftime_bright) AS bgs_efftime_bright,
            SUM(lya_efftime_dark) AS lya_efftime_dark,
            -- GOALTYPE?
            MIN(mintfrac) AS mintfrac, MAX(night) AS lastnight
            FROM f5.exposure GROUP BY tileid;

    However because of some unresolved discrepancies, we'll just load the
    full tiles file for now.
    """
    tileid = Column(Integer, primary_key=True, autoincrement=False)
    survey = Column(String(20), nullable=False)
    program = Column(String(6), nullable=False)
    faprgrm = Column(String(20), nullable=False)
    faflavor = Column(String(20), nullable=False)
    nexp = Column(BigInteger, nullable=False)  # In principle this could be replaced by a count of exposures
    exptime = Column(DOUBLE_PRECISION, nullable=False)
    tilera = Column(DOUBLE_PRECISION, nullable=False)  #- Calib exposures don't have RA, dec
    tiledec = Column(DOUBLE_PRECISION, nullable=False)
    efftime_etc = Column(DOUBLE_PRECISION, nullable=False)
    efftime_spec = Column(DOUBLE_PRECISION, nullable=False)
    efftime_gfa = Column(DOUBLE_PRECISION, nullable=False)
    goaltime = Column(DOUBLE_PRECISION, nullable=False)
    obsstatus = Column(String(20), nullable=False)
    lrg_efftime_dark = Column(DOUBLE_PRECISION, nullable=False)
    elg_efftime_dark = Column(DOUBLE_PRECISION, nullable=False)
    bgs_efftime_bright = Column(DOUBLE_PRECISION, nullable=False)
    lya_efftime_dark = Column(DOUBLE_PRECISION, nullable=False)
    goaltype = Column(String(20), nullable=False)
    mintfrac = Column(DOUBLE_PRECISION, nullable=False)
    lastnight = Column(Integer, nullable=False)  # In principle this could be replaced by MAX(night) grouped by exposures.
    exposures = relationship("Exposure", back_populates="tile")
    fiberassign = relationship("Fiberassign", back_populates="tile")
    potential = relationship("Potential", back_populates="tile")
    targets = relationship("Target", back_populates="tile")
    ztile_redshifts = relationship("Ztile", back_populates="tile")

    def __repr__(self):
        return "Tile(tileid={0.tileid:d})".format(self)
class Exposure(SchemaMixin, Base):
    """Representation of the EXPOSURES HDU in the exposures file.

    Notes
    -----
    The column ``program`` is filled in via :func:`~desispec.io.meta.faflavor2program`.
    """
    night = Column(Integer, nullable=False, index=True)
    expid = Column(Integer, primary_key=True, autoincrement=False)
    tileid = Column(Integer, ForeignKey('tile.tileid'), nullable=False, index=True)
    tilera = Column(DOUBLE_PRECISION, nullable=False)  #- Calib exposures don't have RA, dec
    tiledec = Column(DOUBLE_PRECISION, nullable=False)
    date_obs = Column(DateTime(True), nullable=False)  # timezone-aware timestamp
    mjd = Column(DOUBLE_PRECISION, nullable=False)
    survey = Column(String(7), nullable=False)
    program = Column(String(6), nullable=False)
    faprgrm = Column(String(16), nullable=False)
    faflavor = Column(String(19), nullable=False)
    exptime = Column(DOUBLE_PRECISION, nullable=False)
    efftime_spec = Column(DOUBLE_PRECISION, nullable=False)
    goaltime = Column(DOUBLE_PRECISION, nullable=False)
    goaltype = Column(String(6), nullable=False)
    mintfrac = Column(DOUBLE_PRECISION, nullable=False)
    airmass = Column(REAL, nullable=False)
    ebv = Column(DOUBLE_PRECISION, nullable=False)
    seeing_etc = Column(DOUBLE_PRECISION, nullable=False)
    efftime_etc = Column(REAL, nullable=False)
    tsnr2_elg = Column(REAL, nullable=False)
    tsnr2_qso = Column(REAL, nullable=False)
    tsnr2_lrg = Column(REAL, nullable=False)
    tsnr2_lya = Column(DOUBLE_PRECISION, nullable=False)
    tsnr2_bgs = Column(REAL, nullable=False)
    tsnr2_gpbdark = Column(REAL, nullable=False)
    tsnr2_gpbbright = Column(REAL, nullable=False)
    tsnr2_gpbbackup = Column(REAL, nullable=False)
    lrg_efftime_dark = Column(REAL, nullable=False)
    elg_efftime_dark = Column(REAL, nullable=False)
    bgs_efftime_bright = Column(REAL, nullable=False)
    lya_efftime_dark = Column(DOUBLE_PRECISION, nullable=False)
    gpb_efftime_dark = Column(REAL, nullable=False)
    gpb_efftime_bright = Column(REAL, nullable=False)
    gpb_efftime_backup = Column(REAL, nullable=False)
    transparency_gfa = Column(DOUBLE_PRECISION, nullable=False)
    seeing_gfa = Column(DOUBLE_PRECISION, nullable=False)
    fiber_fracflux_gfa = Column(DOUBLE_PRECISION, nullable=False)
    fiber_fracflux_elg_gfa = Column(DOUBLE_PRECISION, nullable=False)
    fiber_fracflux_bgs_gfa = Column(DOUBLE_PRECISION, nullable=False)
    fiberfac_gfa = Column(DOUBLE_PRECISION, nullable=False)
    fiberfac_elg_gfa = Column(DOUBLE_PRECISION, nullable=False)
    fiberfac_bgs_gfa = Column(DOUBLE_PRECISION, nullable=False)
    airmass_gfa = Column(DOUBLE_PRECISION, nullable=False)
    sky_mag_ab_gfa = Column(DOUBLE_PRECISION, nullable=False)
    sky_mag_g_spec = Column(DOUBLE_PRECISION, nullable=False)
    sky_mag_r_spec = Column(DOUBLE_PRECISION, nullable=False)
    sky_mag_z_spec = Column(DOUBLE_PRECISION, nullable=False)
    efftime_gfa = Column(DOUBLE_PRECISION, nullable=False)
    efftime_dark_gfa = Column(DOUBLE_PRECISION, nullable=False)
    efftime_bright_gfa = Column(DOUBLE_PRECISION, nullable=False)
    efftime_backup_gfa = Column(DOUBLE_PRECISION, nullable=False)
    tile = relationship("Tile", back_populates="exposures")
    frames = relationship("Frame", back_populates="exposure")

    def __repr__(self):
        return "Exposure(night={0.night:d}, expid={0.expid:d}, tileid={0.tileid:d})".format(self)
class Frame(SchemaMixin, Base):
    """Representation of the FRAMES HDU in the exposures file.

    Notes
    -----
    The column ``frameid`` is a combination of ``expid`` and the camera name::

        frameid = 100*expid + cameraid(camera)

    where ``cameraid()`` is :func:`desispec.database.util.cameraid`.
    """
    frameid = Column(Integer, primary_key=True, autoincrement=False)  # Arbitrary integer composed from expid + cameraid
    # frameid = Column(BigInteger, primary_key=True, autoincrement=True)
    night = Column(Integer, nullable=False, index=True)
    expid = Column(Integer, ForeignKey('exposure.expid'), nullable=False)
    tileid = Column(Integer, nullable=False, index=True)
    # The numbered comments below record the FITS column index and TFORM of
    # the source HDU; columns without a mapped attribute are not loaded.
    #  4 TILERA D
    #  5 TILEDEC D
    #  6 MJD D
    mjd = Column(DOUBLE_PRECISION, nullable=False)
    #  7 EXPTIME E
    exptime = Column(REAL, nullable=False)
    #  8 AIRMASS E
    #  9 EBV E
    ebv = Column(REAL, nullable=False)
    # 10 SEEING_ETC D
    # 11 EFFTIME_ETC E
    # 12 CAMERA 2A
    camera = Column(String(2), nullable=False)
    # 13 TSNR2_GPBDARK E
    # 14 TSNR2_ELG E
    # 15 TSNR2_GPBBRIGHT E
    # 16 TSNR2_LYA D
    # 17 TSNR2_BGS E
    # 18 TSNR2_GPBBACKUP E
    # 19 TSNR2_QSO E
    # 20 TSNR2_LRG E
    tsnr2_gpbdark = Column(REAL, nullable=False)
    tsnr2_elg = Column(REAL, nullable=False)
    tsnr2_gpbbright = Column(REAL, nullable=False)
    tsnr2_lya = Column(DOUBLE_PRECISION, nullable=False)
    tsnr2_bgs = Column(REAL, nullable=False)
    tsnr2_gpbbackup = Column(REAL, nullable=False)
    tsnr2_qso = Column(REAL, nullable=False)
    tsnr2_lrg = Column(REAL, nullable=False)
    # 21 SURVEY 7A
    # 22 GOALTYPE 6A
    # 23 FAPRGRM 15A
    # 24 FAFLAVOR 18A
    # 25 MINTFRAC D
    # 26 GOALTIME D
    exposure = relationship("Exposure", back_populates="frames")

    def __repr__(self):
        return "Frame(expid={0.expid:d}, camera='{0.camera}')".format(self)
class Fiberassign(SchemaMixin, Base):
    """Representation of the FIBERASSIGN table in a fiberassign file.

    Notes
    -----
    * Targets are assigned to a ``location``.  A ``location`` happens to
      correspond to a ``fiber``, but this correspondence could change over
      time, and therefore should not be assumed to be a rigid 1:1 mapping.
    * ``PLATE_RA``, ``PLATE_DEC`` are sometimes missing.  These can be
      copies of ``TARGET_RA``, ``TARGET_DEC``, but in principle they could
      be different if chromatic offsets in targeting positions were
      ever implemented.
    """
    @declared_attr
    def __table_args__(cls):
        # Extend the mixin's schema setting with a uniqueness constraint:
        # at most one assignment per (tileid, targetid, location).
        return (Index(f'ix_{cls.__tablename__}_unique', "tileid", "targetid", "location", unique=True),
                SchemaMixin.__table_args__)

    id = Column(Numeric(39), primary_key=True, autoincrement=False)
    tileid = Column(Integer, ForeignKey('tile.tileid'), nullable=False, index=True)
    targetid = Column(BigInteger, ForeignKey('photometry.targetid'), nullable=False, index=True)
    petal_loc = Column(SmallInteger, nullable=False)
    device_loc = Column(Integer, nullable=False)
    location = Column(Integer, nullable=False, index=True)
    fiber = Column(Integer, nullable=False)
    fiberstatus = Column(Integer, nullable=False)
    target_ra = Column(DOUBLE_PRECISION, nullable=False)
    target_dec = Column(DOUBLE_PRECISION, nullable=False)
    lambda_ref = Column(REAL, nullable=False)
    fa_target = Column(BigInteger, nullable=False)
    fa_type = Column(SmallInteger, nullable=False)
    fiberassign_x = Column(REAL, nullable=False)
    fiberassign_y = Column(REAL, nullable=False)
    priority = Column(Integer, nullable=False)
    plate_ra = Column(DOUBLE_PRECISION, nullable=False)
    plate_dec = Column(DOUBLE_PRECISION, nullable=False)
    photometry = relationship("Photometry", back_populates="fiberassign")
    tile = relationship("Tile", back_populates="fiberassign")

    def __repr__(self):
        return "Fiberassign(tileid={0.tileid:d}, targetid={0.targetid:d}, location={0.location:d})".format(self)
class Potential(SchemaMixin, Base):
    """Representation of the POTENTIAL_ASSIGNMENTS table in a fiberassign file.
    """
    @declared_attr
    def __table_args__(cls):
        # Extend the mixin's schema setting with a uniqueness constraint:
        # at most one potential assignment per (tileid, targetid, location).
        return (Index(f'ix_{cls.__tablename__}_unique', "tileid", "targetid", "location", unique=True),
                SchemaMixin.__table_args__)

    id = Column(Numeric(39), primary_key=True, autoincrement=False)
    tileid = Column(Integer, ForeignKey('tile.tileid'), nullable=False, index=True)
    targetid = Column(BigInteger, ForeignKey('photometry.targetid'), nullable=False, index=True)
    fiber = Column(Integer, nullable=False)
    location = Column(Integer, nullable=False, index=True)
    photometry = relationship("Photometry", back_populates="potential")
    tile = relationship("Tile", back_populates="potential")

    def __repr__(self):
        return "Potential(tileid={0.tileid:d}, targetid={0.targetid:d}, location={0.location:d})".format(self)
class Zpix(SchemaMixin, Base):
    """Representation of the ``ZCATALOG`` table in zpix files.
    """
    @declared_attr
    def __table_args__(cls):
        # Extend the mixin's schema setting with a uniqueness constraint:
        # one redshift per (targetid, survey, program).
        return (Index(f'ix_{cls.__tablename__}_unique', "targetid", "survey", "program", unique=True),
                SchemaMixin.__table_args__)

    id = Column(Numeric(39), primary_key=True, autoincrement=False)
    targetid = Column(BigInteger, ForeignKey('photometry.targetid'), nullable=False, index=True)
    survey = Column(String(7), nullable=False, index=True)
    program = Column(String(6), nullable=False, index=True)
    spgrp = Column(String(10), nullable=False, index=True)
    spgrpval = Column(Integer, nullable=False, index=True)
    healpix = Column(Integer, nullable=False, index=True)
    z = Column(DOUBLE_PRECISION, index=True, nullable=False)
    zerr = Column(DOUBLE_PRECISION, nullable=False)
    zwarn = Column(BigInteger, index=True, nullable=False)
    chi2 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_0 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_1 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_2 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_3 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_4 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_5 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_6 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_7 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_8 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_9 = Column(DOUBLE_PRECISION, nullable=False)
    npixels = Column(BigInteger, nullable=False)
    spectype = Column(String(6), index=True, nullable=False)
    subtype = Column(String(20), index=True, nullable=False)
    ncoeff = Column(BigInteger, nullable=False)
    deltachi2 = Column(DOUBLE_PRECISION, nullable=False)
    coadd_fiberstatus = Column(Integer, nullable=False)
    #
    # Skipping columns that are in other tables.
    #
    coadd_numexp = Column(SmallInteger, nullable=False)
    coadd_exptime = Column(REAL, nullable=False)
    coadd_numnight = Column(SmallInteger, nullable=False)
    coadd_numtile = Column(SmallInteger, nullable=False)
    mean_delta_x = Column(REAL, nullable=False)
    rms_delta_x = Column(REAL, nullable=False)
    mean_delta_y = Column(REAL, nullable=False)
    rms_delta_y = Column(REAL, nullable=False)
    mean_fiber_ra = Column(DOUBLE_PRECISION, nullable=False)
    std_fiber_ra = Column(REAL, nullable=False)
    mean_fiber_dec = Column(DOUBLE_PRECISION, nullable=False)
    std_fiber_dec = Column(REAL, nullable=False)
    mean_psf_to_fiber_specflux = Column(REAL, nullable=False)
    tsnr2_gpbdark_b = Column(REAL, nullable=False)
    tsnr2_elg_b = Column(REAL, nullable=False)
    tsnr2_gpbbright_b = Column(REAL, nullable=False)
    tsnr2_lya_b = Column(REAL, nullable=False)
    tsnr2_bgs_b = Column(REAL, nullable=False)
    tsnr2_gpbbackup_b = Column(REAL, nullable=False)
    tsnr2_qso_b = Column(REAL, nullable=False)
    tsnr2_lrg_b = Column(REAL, nullable=False)
    tsnr2_gpbdark_r = Column(REAL, nullable=False)
    tsnr2_elg_r = Column(REAL, nullable=False)
    tsnr2_gpbbright_r = Column(REAL, nullable=False)
    tsnr2_lya_r = Column(REAL, nullable=False)
    tsnr2_bgs_r = Column(REAL, nullable=False)
    tsnr2_gpbbackup_r = Column(REAL, nullable=False)
    tsnr2_qso_r = Column(REAL, nullable=False)
    tsnr2_lrg_r = Column(REAL, nullable=False)
    tsnr2_gpbdark_z = Column(REAL, nullable=False)
    tsnr2_elg_z = Column(REAL, nullable=False)
    tsnr2_gpbbright_z = Column(REAL, nullable=False)
    tsnr2_lya_z = Column(REAL, nullable=False)
    tsnr2_bgs_z = Column(REAL, nullable=False)
    tsnr2_gpbbackup_z = Column(REAL, nullable=False)
    tsnr2_qso_z = Column(REAL, nullable=False)
    tsnr2_lrg_z = Column(REAL, nullable=False)
    tsnr2_gpbdark = Column(REAL, nullable=False)
    tsnr2_elg = Column(REAL, nullable=False)
    tsnr2_gpbbright = Column(REAL, nullable=False)
    tsnr2_lya = Column(REAL, nullable=False)
    tsnr2_bgs = Column(REAL, nullable=False)
    tsnr2_gpbbackup = Column(REAL, nullable=False)
    tsnr2_qso = Column(REAL, nullable=False)
    tsnr2_lrg = Column(REAL, nullable=False)
    sv_nspec = Column(SmallInteger, nullable=False)
    sv_primary = Column(Boolean, nullable=False)
    main_nspec = Column(SmallInteger, nullable=False)
    main_primary = Column(Boolean, nullable=False)
    zcat_nspec = Column(SmallInteger, nullable=False)
    zcat_primary = Column(Boolean, nullable=False)
    photometry = relationship("Photometry", back_populates="zpix_redshifts")

    def __repr__(self):
        return "Zpix(targetid={0.targetid:d}, survey='{0.survey}', program='{0.program}')".format(self)
class Ztile(SchemaMixin, Base):
    """Representation of the ``ZCATALOG`` table in ztile files.
    """
    @declared_attr
    def __table_args__(cls):
        # Extend the mixin's schema setting with a uniqueness constraint:
        # one redshift per (targetid, spgrp, spgrpval, tileid).
        return (Index(f'ix_{cls.__tablename__}_unique', "targetid", "spgrp", "spgrpval", "tileid", unique=True),
                SchemaMixin.__table_args__)

    id = Column(Numeric(39), primary_key=True, autoincrement=False)
    targetphotid = Column(Numeric(39), ForeignKey("target.id"), nullable=False, index=True)
    targetid = Column(BigInteger, ForeignKey('photometry.targetid'), nullable=False, index=True)
    survey = Column(String(7), nullable=False, index=True)
    program = Column(String(6), nullable=False, index=True)
    spgrp = Column(String, nullable=False, index=True)
    spgrpval = Column(Integer, nullable=False, index=True)
    z = Column(DOUBLE_PRECISION, index=True, nullable=False)
    zerr = Column(DOUBLE_PRECISION, nullable=False)
    zwarn = Column(BigInteger, index=True, nullable=False)
    chi2 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_0 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_1 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_2 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_3 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_4 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_5 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_6 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_7 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_8 = Column(DOUBLE_PRECISION, nullable=False)
    coeff_9 = Column(DOUBLE_PRECISION, nullable=False)
    npixels = Column(BigInteger, nullable=False)
    spectype = Column(String(6), index=True, nullable=False)
    subtype = Column(String(20), index=True, nullable=False)
    ncoeff = Column(BigInteger, nullable=False)
    deltachi2 = Column(DOUBLE_PRECISION, nullable=False)
    coadd_fiberstatus = Column(Integer, nullable=False)
    #
    # Skipping columns that are in other tables.
    #
    tileid = Column(Integer, ForeignKey("tile.tileid"), nullable=False, index=True)
    coadd_numexp = Column(SmallInteger, nullable=False)
    coadd_exptime = Column(REAL, nullable=False)
    coadd_numnight = Column(SmallInteger, nullable=False)
    coadd_numtile = Column(SmallInteger, nullable=False)
    mean_delta_x = Column(REAL, nullable=False)
    rms_delta_x = Column(REAL, nullable=False)
    mean_delta_y = Column(REAL, nullable=False)
    rms_delta_y = Column(REAL, nullable=False)
    mean_fiber_ra = Column(DOUBLE_PRECISION, nullable=False)
    std_fiber_ra = Column(REAL, nullable=False)
    mean_fiber_dec = Column(DOUBLE_PRECISION, nullable=False)
    std_fiber_dec = Column(REAL, nullable=False)
    mean_psf_to_fiber_specflux = Column(REAL, nullable=False)
    tsnr2_gpbdark_b = Column(REAL, nullable=False)
    tsnr2_elg_b = Column(REAL, nullable=False)
    tsnr2_gpbbright_b = Column(REAL, nullable=False)
    tsnr2_lya_b = Column(REAL, nullable=False)
    tsnr2_bgs_b = Column(REAL, nullable=False)
    tsnr2_gpbbackup_b = Column(REAL, nullable=False)
    tsnr2_qso_b = Column(REAL, nullable=False)
    tsnr2_lrg_b = Column(REAL, nullable=False)
    tsnr2_gpbdark_r = Column(REAL, nullable=False)
    tsnr2_elg_r = Column(REAL, nullable=False)
    tsnr2_gpbbright_r = Column(REAL, nullable=False)
    tsnr2_lya_r = Column(REAL, nullable=False)
    tsnr2_bgs_r = Column(REAL, nullable=False)
    tsnr2_gpbbackup_r = Column(REAL, nullable=False)
    tsnr2_qso_r = Column(REAL, nullable=False)
    tsnr2_lrg_r = Column(REAL, nullable=False)
    tsnr2_gpbdark_z = Column(REAL, nullable=False)
    tsnr2_elg_z = Column(REAL, nullable=False)
    tsnr2_gpbbright_z = Column(REAL, nullable=False)
    tsnr2_lya_z = Column(REAL, nullable=False)
    tsnr2_bgs_z = Column(REAL, nullable=False)
    tsnr2_gpbbackup_z = Column(REAL, nullable=False)
    tsnr2_qso_z = Column(REAL, nullable=False)
    tsnr2_lrg_z = Column(REAL, nullable=False)
    tsnr2_gpbdark = Column(REAL, nullable=False)
    tsnr2_elg = Column(REAL, nullable=False)
    tsnr2_gpbbright = Column(REAL, nullable=False)
    tsnr2_lya = Column(REAL, nullable=False)
    tsnr2_bgs = Column(REAL, nullable=False)
    tsnr2_gpbbackup = Column(REAL, nullable=False)
    tsnr2_qso = Column(REAL, nullable=False)
    tsnr2_lrg = Column(REAL, nullable=False)
    sv_nspec = Column(SmallInteger, nullable=False)
    sv_primary = Column(Boolean, nullable=False)
    main_nspec = Column(SmallInteger, nullable=False)
    main_primary = Column(Boolean, nullable=False)
    zcat_nspec = Column(SmallInteger, nullable=False)
    zcat_primary = Column(Boolean, nullable=False)
    photometry = relationship("Photometry", back_populates="ztile_redshifts")
    tile = relationship("Tile", back_populates="ztile_redshifts")

    def __repr__(self):
        return "Ztile(targetid={0.targetid:d}, tileid={0.tileid:d}, spgrp='{0.spgrp}', spgrpval={0.spgrpval:d})".format(self)
def _frameid(data):
    """Add a ``FRAMEID`` column computed from ``EXPID`` and ``CAMERA``.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`astropy.table.Table`
        Updated data table.
    """
    expid = data['EXPID']
    # Encode (exposure, camera) as a single integer: 100*EXPID + cameraid.
    camera_codes = np.array([cameraid(c) for c in data['CAMERA']],
                            dtype=expid.dtype)
    data.add_column(100*expid + camera_codes, name='FRAMEID', index=0)
    return data
def _tileid(data):
    """Add ``TILEID`` and a composite ``ID`` column. Also check for the
    presence of ``PLATE_RA``, ``PLATE_DEC``.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`astropy.table.Table`
        Updated data table.

    Raises
    ------
    KeyError
        If ``TILEID`` is not present in the file metadata.
    """
    try:
        tileid_value = data.meta['TILEID']
    except KeyError:
        log.error("Could not find TILEID in metadata!")
        raise
    data.add_column(tileid_value*np.ones(len(data), dtype=np.int32),
                    name='TILEID', index=0)
    # Older fiberassign files lack PLATE_RA/PLATE_DEC; copy from TARGET_*.
    if 'TARGET_RA' in data.colnames and 'PLATE_RA' not in data.colnames:
        log.debug("Adding PLATE_RA, PLATE_DEC.")
        data['PLATE_RA'] = data['TARGET_RA']
        data['PLATE_DEC'] = data['TARGET_DEC']
    # High 64 bits pack (LOCATION, TILEID); low 64 bits are TARGETID.
    location = data['LOCATION'].base.astype(np.int64)
    tile = data['TILEID'].base.astype(np.int64)
    high_word = (location << 32) | tile
    data.add_column(np.array([high_word, data['TARGETID'].base]).T,
                    name='ID', index=0)
    return data
def _survey_program(data):
    """Add ``SURVEY``, ``PROGRAM``, ``SPGRP`` columns to zpix and ztile tables.

    Also adds placeholder ``*_NSPEC``/``*_PRIMARY`` columns when missing, and
    a two-part composite ``ID`` column.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`astropy.table.Table`
        Updated data table.

    Raises
    ------
    KeyError
        If a necessary header could not be found.
    """
    # for i, key in enumerate(('SURVEY', 'PROGRAM', 'SPGRP')):
    for i, key in enumerate(('SURVEY', 'PROGRAM')):
        if key in data.colnames:
            log.info("Column %s is already in the table.", key)
        else:
            try:
                val = data.meta[key]
            except KeyError:
                log.error("Could not find %s in metadata!", key)
                raise
            log.debug("Adding %s column.", key)
            data.add_column(np.array([val]*len(data)), name=key, index=i+1)
    # objid, brickid, release, mock, sky, gaiadr = decode_targetid(data['TARGETID'])
    # data.add_column(sky, name='SKY', index=0)
    if 'MAIN_NSPEC' not in data.colnames:
        data.add_column(np.array([0]*len(data), dtype=np.int16),
                        name='MAIN_NSPEC',
                        index=data.colnames.index('SV_PRIMARY')+1)
        # Fixed: the *_PRIMARY placeholders are boolean columns, so build
        # them with dtype=bool rather than int16 (previous copy-paste from
        # the *_NSPEC lines above).
        data.add_column(np.array([False]*len(data), dtype=bool),
                        name='MAIN_PRIMARY',
                        index=data.colnames.index('MAIN_NSPEC')+1)
    if 'SV_NSPEC' not in data.colnames:
        data.add_column(np.array([0]*len(data), dtype=np.int16),
                        name='SV_NSPEC',
                        index=data.colnames.index('TSNR2_LRG')+1)
        data.add_column(np.array([False]*len(data), dtype=bool),
                        name='SV_PRIMARY',
                        index=data.colnames.index('SV_NSPEC')+1)
    if 'TILEID' in data.colnames:
        # Tile-based catalog: the composite ID packs SPGRP, SPGRPVAL, TILEID
        # into the high 64 bits; the original survey+tile ID is preserved as
        # TARGETPHOTID.
        data.add_column(np.array(['cumulative']*len(data)), name='SPGRP',
                        index=data.colnames.index('PROGRAM')+1)
        data = _target_unique_id(data)
        data.rename_column('ID', 'TARGETPHOTID')
        grp = np.array([spgrpid(g) for g in data['SPGRP']], dtype=np.int64)
        id0 = (grp << 27 | data['SPGRPVAL'].base.astype(np.int64)) << 32 | data['TILEID'].base.astype(np.int64)
    else:
        # Healpix-based catalog: the composite ID packs PROGRAM and SURVEY.
        data.add_column(np.array(['healpix']*len(data)), name='SPGRP',
                        index=data.colnames.index('PROGRAM')+1)
        srv = np.array([surveyid(v) for v in data['SURVEY']], dtype=np.int64)
        prg = np.array([programid(v) for v in data['PROGRAM']], dtype=np.int64)
        id0 = prg << 32 | srv
    composite_id = np.array([id0, data['TARGETID'].base]).T
    data.add_column(composite_id, name='ID', index=0)
    return data
def _target_unique_id(data):
    """Add a composite ``ID`` column for later conversion.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`astropy.table.Table`
        Updated data table.
    """
    # High 64 bits pack (survey code, TILEID); low 64 bits are TARGETID.
    survey_codes = np.array([surveyid(v) for v in data['SURVEY']],
                            dtype=np.int64)
    high_word = (survey_codes << 32) | data['TILEID'].base.astype(np.int64)
    data.add_column(np.array([high_word, data['TARGETID'].base]).T,
                    name='ID', index=0)
    return data
def _add_ls_id(data):
    """Add ``LS_ID`` to targetphot data.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`astropy.table.Table`
        Updated data table.
    """
    # Pack RELEASE | BRICKID | BRICK_OBJID into a single 64-bit integer.
    release = data['RELEASE'].data.astype(np.int64)
    brickid = data['BRICKID'].data.astype(np.int64)
    objid = data['BRICK_OBJID'].data.astype(np.int64)
    data.add_column((release << 40) | (brickid << 16) | objid,
                    name='LS_ID', index=0)
    return data
def _deduplicate_targetid(data):
    """Find targetphot rows that are not already loaded into the Photometry
    table *and* resolve any duplicate TARGETID.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`numpy.array`
        An array of rows that are safe to load.
    """
    # All (targetid, ls_id) pairs already present in the database.
    rows = dbSession.query(Photometry.targetid, Photometry.ls_id).order_by(Photometry.targetid).all()
    loaded_targetid = Table()
    loaded_targetid['TARGETID'] = np.array([r[0] for r in rows])
    loaded_targetid['LS_ID'] = np.array([r[1] for r in rows])
    #
    # Find TARGETIDs that do not exist in Photometry
    #
    # After a left join, LS_ID is masked exactly where the TARGETID was
    # *not* found among the already-loaded rows.
    j = join(data['TARGETID', 'RELEASE'], loaded_targetid, join_type='left', keys='TARGETID')
    load_targetids = j['TARGETID'][j['LS_ID'].mask]
    load_rows = np.zeros((len(data),), dtype=bool)
    # np.unique(return_index=True) yields the index of the *first*
    # occurrence of each TARGETID, so duplicates within the file resolve
    # to a single row. NOTE(review): "first occurrence" may not match the
    # "most recent desitarget version" policy in the module notes -- confirm.
    unique_targetid, targetid_index = np.unique(data['TARGETID'].data, return_index=True)
    for t in load_targetids:
        load_rows[targetid_index[unique_targetid == t]] = True
    return load_rows
def _remove_loaded_targetid(data):
    """Remove rows with TARGETID already loaded into the database.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file.

    Returns
    -------
    :class:`numpy.array`
        An array of rows that are safe to load.
    """
    targetid = data['TARGETID'].data
    # Only query for the TARGETIDs actually present in this file.
    matches = dbSession.query(Photometry.targetid).filter(Photometry.targetid.in_(targetid.tolist())).all()
    already_loaded = [row[0] for row in matches]
    # A row is good exactly when its TARGETID is not already in the table.
    return ~np.isin(targetid, already_loaded)
def _remove_loaded_unique_id(data):
    """Remove rows with unique ``ID`` already loaded into the database.

    Parameters
    ----------
    data : :class:`astropy.table.Table`
        The initial data read from the file. The ``ID`` column holds the
        (high, low) 64-bit halves of the composite 128-bit identifier.

    Returns
    -------
    :class:`numpy.array`
        An array of rows that are safe to load.
    """
    rows = dbSession.query(Target.id).all()
    loaded_id = [r[0] for r in rows]
    # Reassemble the 128-bit composite ID from its two 64-bit halves.
    data_id = [(int(data['ID'][k][0]) << 64) | int(data['ID'][k][1])
               for k in range(len(data))]
    id_index = dict(zip(data_id, range(len(data))))
    good_rows = np.ones((len(data),), dtype=bool)
    for i in loaded_id:
        # The query above is unfiltered, so the database may contain IDs
        # loaded from *other* files; those are simply absent from this
        # file and must not raise KeyError (previously they did).
        k = id_index.get(i)
        if k is not None:
            good_rows[k] = False
    return good_rows
def load_file(filepaths, tcls, hdu=1, preload=None, expand=None, insert=None, convert=None,
              index=None, rowfilter=None, q3c=None,
              chunksize=50000, maxrows=0):
    """Load data file into the database, assuming that column names map
    to database column names with no surprises.

    Parameters
    ----------
    filepaths : :class:`str` or :class:`list`
        Full path to the data file or set of data files.
    tcls : :class:`sqlalchemy.ext.declarative.api.DeclarativeMeta`
        The table to load, represented by its class.
    hdu : :class:`int` or :class:`str`, optional
        Read a data table from this HDU (default 1).
    preload : callable, optional
        A function that takes a :class:`~astropy.table.Table` as an argument.
        Use this for more complicated manipulation of the data before loading,
        for example a function that depends on multiple columns. The return
        value should be the updated Table.
    expand : :class:`dict`, optional
        If set, map FITS column names to one or more alternative column names.
    insert : :class:`dict`, optional
        If set, insert one or more columns, before an existing column. The
        existing column will be copied into the new column(s).
    convert : :class:`dict`, optional
        If set, convert the data for a named (database) column using the
        supplied function.
    index : :class:`str`, optional
        If set, add a column that just counts the number of rows.
    rowfilter : callable, optional
        If set, apply this filter to the rows to be loaded. The function
        should return :class:`bool`, with ``True`` meaning a good row.
    q3c : :class:`str`, optional
        If set, create q3c index on the table, using the RA column
        named `q3c`.
    chunksize : :class:`int`, optional
        If set, load database `chunksize` rows at a time (default 50000).
    maxrows : :class:`int`, optional
        If set, stop loading after `maxrows` are loaded. Alteratively,
        set `maxrows` to zero (0) to load all rows.

    Returns
    -------
    :class:`int`
        The grand total of rows loaded.
    """
    tn = tcls.__tablename__
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    log.info("Identified %d files for ingestion.", len(filepaths))
    loaded_rows = 0
    for filepath in filepaths:
        if filepath.endswith('.fits') or filepath.endswith('.fits.gz'):
            data = Table.read(filepath, hdu=hdu, format='fits')
            log.info("Read %d rows of data from %s HDU %s.", len(data), filepath, hdu)
        elif filepath.endswith('.ecsv'):
            data = Table.read(filepath, format='ascii.ecsv')
            log.info("Read %d rows of data from %s.", len(data), filepath)
        elif filepath.endswith('.csv'):
            data = Table.read(filepath, format='ascii.csv')
            log.info("Read %d rows of data from %s.", len(data), filepath)
        else:
            log.error("Unrecognized data file, %s!", filepath)
            # Abort as before, but return the count so far so the return
            # type is always :class:`int` as documented (was ``None``).
            return loaded_rows
        # mr is the effective number of rows to consider from this file.
        if maxrows == 0 or len(data) < maxrows:
            mr = len(data)
        else:
            mr = maxrows
        if preload is not None:
            data = preload(data)
            log.info("Preload function complete on %s.", tn)
        try:
            colnames = data.names
        except AttributeError:
            colnames = data.colnames
        #
        # Check floating-point columns for NaN and replace with a sentinel.
        # ``masked`` records which columns need access via .data.data.
        #
        masked = dict()
        for col in colnames:
            if data[col].dtype.kind == 'f':
                if isinstance(data[col], MaskedColumn):
                    bad = np.isnan(data[col].data.data[0:mr])
                    masked[col] = True
                else:
                    bad = np.isnan(data[col][0:mr])
                if np.any(bad):
                    if bad.ndim == 1:
                        log.warning("%d rows of bad data detected in column " +
                                    "%s of %s.", bad.sum(), col, filepath)
                    elif bad.ndim == 2:
                        nbadrows = len(bad.sum(1).nonzero()[0])
                        nbaditems = bad.sum(1).sum()
                        log.warning("%d rows (%d items) of bad data detected in column " +
                                    "%s of %s.", nbadrows, nbaditems, col, filepath)
                    else:
                        log.warning("Bad data detected in high-dimensional column %s of %s.", col, filepath)
                    #
                    # TODO: is this replacement appropriate for all columns?
                    #
                    if col in masked:
                        data[col].data.data[0:mr][bad] = -9999.0
                    else:
                        data[col][0:mr][bad] = -9999.0
        log.info("Integrity check complete on %s.", tn)
        if rowfilter is None:
            good_rows = np.ones((mr,), dtype=bool)
        else:
            good_rows = rowfilter(data[0:mr])
        log.info("Row filter applied on %s; %d rows remain.", tn, good_rows.sum())
        #
        # Convert to a list of columns, applying the row limit and filter.
        #
        data_list = list()
        for col in colnames:
            if col in masked:
                data_list.append(data[col].data.data[0:mr][good_rows].tolist())
            else:
                data_list.append(data[col][0:mr][good_rows].tolist())
        data_names = [col.lower() for col in colnames]
        finalrows = len(data_list[0])
        log.info("Initial column conversion complete on %s.", tn)
        if expand is not None:
            for col in expand:
                i = data_names.index(col.lower())
                if isinstance(expand[col], str):
                    #
                    # Just rename a column.
                    #
                    log.debug("Renaming column %s (at index %d) to %s.", data_names[i], i, expand[col])
                    data_names[i] = expand[col]
                else:
                    #
                    # Assume this is an expansion of an array-valued column
                    # into individual columns.
                    #
                    del data_names[i]
                    del data_list[i]
                    for j, n in enumerate(expand[col]):
                        log.debug("Expanding column %d of %s (at index %d) to %s.", j, col, i, n)
                        data_names.insert(i + j, n)
                        # Fixed: apply the same row limit and row filter as
                        # the scalar columns above. Previously the full,
                        # unfiltered column was inserted, which misaligned
                        # the expanded columns with the rest of the row data
                        # whenever maxrows or rowfilter dropped rows.
                        data_list.insert(i + j, data[col][0:mr][good_rows][:, j].tolist())
            log.debug(data_names)
            log.info("Column expansion complete on %s.", tn)
        del data
        if insert is not None:
            for col in insert:
                i = data_names.index(col)
                for item in insert[col]:
                    data_names.insert(i, item)
                    data_list.insert(i, data_list[i].copy())  # Dummy values
            log.info("Column insertion complete on %s.", tn)
        if convert is not None:
            for col in convert:
                i = data_names.index(col)
                data_list[i] = [convert[col](x) for x in data_list[i]]
            log.info("Column conversion complete on %s.", tn)
        if index is not None:
            data_list.insert(0, list(range(1, finalrows+1)))
            data_names.insert(0, index)
            log.info("Added index column '%s'.", index)
        data_rows = list(zip(*data_list))
        del data_list
        log.info("Converted columns into rows on %s.", tn)
        n_chunks = finalrows//chunksize
        if finalrows % chunksize:
            n_chunks += 1
        for k in range(n_chunks):
            data_chunk = [dict(zip(data_names, row))
                          for row in data_rows[k*chunksize:(k+1)*chunksize]]
            if len(data_chunk) > 0:
                loaded_rows += len(data_chunk)
                engine.execute(tcls.__table__.insert(), data_chunk)
                log.info("Inserted %d rows in %s.",
                         min((k+1)*chunksize, finalrows), tn)
            else:
                log.error("Detected empty data chunk in %s!", tn)
        dbSession.commit()
    if q3c is not None:
        q3c_index(tn, ra=q3c)
    return loaded_rows
def update_truth(filepath, hdu=2, chunksize=50000, skip=('SLOPES', 'EMLINES')):
    """Add data from columns in other HDUs of the Truth table.

    Parameters
    ----------
    filepath : :class:`str`
        Full path to the data file.
    hdu : :class:`int` or :class:`str`, optional
        Read a data table from this HDU (default 2).
    chunksize : :class:`int`, optional
        If set, update database `chunksize` rows at a time (default 50000).
    skip : :func:`tuple`, optional
        Do not load columns with these names (default, ``('SLOPES', 'EMLINES')``)
    """
    tcls = Truth
    tn = tcls.__tablename__
    t = tcls.__table__
    if filepath.endswith( ('.fits', '.fits.gz') ):
        with fits.open(filepath) as hdulist:
            data = hdulist[hdu].data
    elif filepath.endswith('.ecsv'):
        data = Table.read(filepath, format='ascii.ecsv')
    else:
        log.error("Unrecognized data file, %s!", filepath)
        return
    log.info("Read data from %s HDU %s", filepath, hdu)
    try:
        colnames = data.names
    except AttributeError:
        colnames = data.colnames
    # NaN check is informational only here; unlike load_file(), the bad
    # values are *not* replaced before the update.
    for col in colnames:
        if data[col].dtype.kind == 'f':
            bad = np.isnan(data[col])
            if np.any(bad):
                nbad = bad.sum()
                log.warning("%d rows of bad data detected in column " +
                            "%s of %s.", nbad, col, filepath)
    log.info("Integrity check complete on %s.", tn)
    data_list = [data[col].tolist() for col in colnames if col not in skip]
    data_names = [col.lower() for col in colnames if col not in skip]
    # Rename the first column so it can serve as the bind parameter in the
    # UPDATE ... WHERE targetid == :b_targetid statement below.
    # NOTE(review): assumes the first column is TARGETID and is not listed
    # in `skip` -- confirm against the Truth file layout.
    data_names[0] = 'b_targetid'
    finalrows = len(data_list[0])
    log.info("Initial column conversion complete on %s.", tn)
    del data
    data_rows = list(zip(*data_list))
    del data_list
    log.info("Converted columns into rows on %s.", tn)
    # Execute the UPDATE statement chunk by chunk ("executemany" style).
    for k in range(finalrows//chunksize + 1):
        data_chunk = [dict(zip(data_names, row))
                      for row in data_rows[k*chunksize:(k+1)*chunksize]]
        q = t.update().where(t.c.targetid == bindparam('b_targetid'))
        if len(data_chunk) > 0:
            engine.execute(q, data_chunk)
            log.info("Updated %d rows in %s.",
                     min((k+1)*chunksize, finalrows), tn)
def load_redrock(datapath=None, hdu='REDSHIFTS', q3c=False):
    """Load redrock files into the zcat table.

    This function is deprecated since there should now be a single
    redshift catalog file.

    Parameters
    ----------
    datapath : :class:`str`
        Full path to the directory containing redrock files.
    hdu : :class:`int` or :class:`str`, optional
        Read a data table from this HDU (default 'REDSHIFTS').
    q3c : :class:`bool`, optional
        If set, create q3c index on the table.
    """
    if datapath is None:
        datapath = specprod_root()
    redrockpath = os.path.join(datapath, 'spectra-64', '*', '*', 'redrock-64-*.fits')
    log.info("Using redrock file search path: %s.", redrockpath)
    redrock_files = glob.glob(redrockpath)
    if len(redrock_files) == 0:
        log.error("No redrock files found!")
        return
    log.info("Found %d redrock files.", len(redrock_files))
    #
    # Read the identified redrock files.
    #
    for f in redrock_files:
        brickname = os.path.basename(os.path.dirname(f))
        with fits.open(f) as hdulist:
            data = hdulist[hdu].data
        log.info("Read data from %s HDU %s.", f, hdu)
        # Drop sky/invalid targets.
        good_targetids = ((data['TARGETID'] != 0) & (data['TARGETID'] != -1))
        #
        # If there are too many targetids, the in_ clause will blow up.
        # Disabling this test, and crossing fingers.
        #
        # q = dbSession.query(ZCat).filter(ZCat.targetid.in_(data['TARGETID'].tolist())).all()
        # if len(q) != 0:
        #     log.warning("Duplicate TARGETID found in %s.", f)
        #     for z in q:
        #         log.warning("Duplicate TARGETID = %d.", z.targetid)
        #         good_targetids = good_targetids & (data['TARGETID'] != z.targetid)
        data_list = [data[col][good_targetids].tolist()
                     for col in data.names]
        data_names = [col.lower() for col in data.names]
        log.info("Initial column conversion complete on brick = %s.", brickname)
        #
        # Expand COEFF
        #
        col = 'COEFF'
        expand = ('coeff_0', 'coeff_1', 'coeff_2', 'coeff_3', 'coeff_4',
                  'coeff_5', 'coeff_6', 'coeff_7', 'coeff_8', 'coeff_9',)
        i = data_names.index(col.lower())
        del data_names[i]
        del data_list[i]
        for j, n in enumerate(expand):
            log.debug("Expanding column %d of %s (at index %d) to %s.", j, col, i, n)
            data_names.insert(i + j, n)
            # Fixed: apply the same good_targetids filter as the scalar
            # columns above. Previously the full, unfiltered column was
            # inserted, misaligning the coeff_* columns whenever bad
            # TARGETIDs were dropped.
            data_list.insert(i + j, data[col][good_targetids][:, j].tolist())
        log.debug(data_names)
        #
        # redrock files don't contain the same columns as zcatalog.
        #
        for col in ZCat.__table__.columns:
            if col.name not in data_names:
                data_names.append(col.name)
                data_list.append([0]*len(data_list[0]))
        data_rows = list(zip(*data_list))
        log.info("Converted columns into rows on brick = %s.", brickname)
        try:
            dbSession.bulk_insert_mappings(ZCat, [dict(zip(data_names, row))
                                                  for row in data_rows])
        except IntegrityError as e:
            log.error("Integrity Error detected!")
            log.error(e)
            dbSession.rollback()
        else:
            log.info("Inserted %d rows in %s for brick = %s.",
                     len(data_rows), ZCat.__tablename__, brickname)
            dbSession.commit()
    if q3c:
        q3c_index('zcat')
    return
def q3c_index(table, ra='ra'):
    """Create a q3c index on a table.

    Parameters
    ----------
    table : :class:`str`
        Name of the table to index.
    ra : :class:`str`, optional
        If the RA, Dec columns are called something besides "ra" and "dec",
        set its name. For example, ``ra='target_ra'``.
    """
    # Derive the Dec column name from the RA column name.
    dec = ra.lower().replace('ra', 'dec')
    q3c_sql = """CREATE INDEX ix_{table}_q3c_ang2ipix ON {schema}.{table} (q3c_ang2ipix({ra}, {dec}));
    CLUSTER {schema}.{table} USING ix_{table}_q3c_ang2ipix;
    ANALYZE {schema}.{table};
    """.format(ra=ra, dec=dec, schema=schemaname, table=table)
    log.info("Creating q3c index on %s.%s.", schemaname, table)
    dbSession.execute(q3c_sql)
    log.info("Finished q3c index on %s.%s.", schemaname, table)
    dbSession.commit()
    return
def setup_db(options=None, **kwargs):
    """Initialize the database connection.

    Parameters
    ----------
    options : :class:`argparse.Namespace`
        Parsed command-line options.
    kwargs : keywords
        If present, use these instead of `options`. This is more
        user-friendly than setting up a :class:`~argparse.Namespace`
        object in, *e.g.* a Jupyter Notebook.

    Returns
    -------
    :class:`bool`
        ``True`` if the configured database is a PostgreSQL database.
    """
    global engine, schemaname
    #
    # Schema creation
    #
    if options is None:
        if len(kwargs) > 0:
            dbfile = kwargs.get('dbfile', 'redshift.db')
            # Fixed: datapath was previously only read from `options`
            # (below), so the kwargs-only call path raised AttributeError
            # for a relative SQLite dbfile name.
            datapath = kwargs.get('datapath', '.')
            hostname = kwargs.get('hostname', None)
            overwrite = kwargs.get('overwrite', False)
            schema = kwargs.get('schema', None)
            username = kwargs.get('username', 'desidev_admin')
            verbose = kwargs.get('verbose', False)
        else:
            raise ValueError("No options specified!")
    else:
        dbfile = options.dbfile
        datapath = options.datapath
        hostname = options.hostname
        overwrite = options.overwrite
        schema = options.schema
        username = options.username
        verbose = options.verbose
    if schema:
        schemaname = schema
        # event.listen(Base.metadata, 'before_create', CreateSchema(schemaname))
        # if overwrite:
        #     event.listen(Base.metadata, 'before_create',
        #                  DDL('DROP SCHEMA IF EXISTS {0} CASCADE'.format(schemaname)))
        event.listen(Base.metadata, 'before_create',
                     DDL(f'CREATE SCHEMA IF NOT EXISTS {schema};'))
        event.listen(Base.metadata, 'after_create',
                     DDL(f'GRANT USAGE ON SCHEMA {schema} TO desi; GRANT SELECT ON ALL TABLES IN SCHEMA {schema} TO desi; GRANT SELECT ON ALL SEQUENCES IN SCHEMA {schema} TO desi;'))
    #
    # Create the file.
    #
    postgresql = False
    if hostname:
        postgresql = True
        db_connection = parse_pgpass(hostname=hostname,
                                     username=username)
        if db_connection is None:
            log.critical("Could not load database information!")
            # NOTE(review): 1 is truthy, so callers using the return value
            # as the "is PostgreSQL" flag cannot detect this failure;
            # preserved for backward compatibility.
            return 1
    else:
        # A bare filename is placed in datapath; an explicit path is used as-is.
        if os.path.basename(dbfile) == dbfile:
            db_file = os.path.join(datapath, dbfile)
        else:
            db_file = dbfile
        if overwrite and os.path.exists(db_file):
            log.info("Removing file: %s.", db_file)
            os.remove(db_file)
        db_connection = 'sqlite:///'+db_file
    #
    # SQLAlchemy stuff.
    #
    engine = create_engine(db_connection, echo=verbose)
    dbSession.remove()
    dbSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    for tab in Base.metadata.tables.values():
        tab.schema = schemaname
    if overwrite:
        log.info("Begin creating tables.")
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)
        log.info("Finished creating tables.")
    return postgresql
def get_options(*args):
    """Parse command-line options.

    Parameters
    ----------
    args : iterable
        If arguments are passed, use them instead of ``sys.argv``.

    Returns
    -------
    :class:`argparse.Namespace`
        The parsed options.
    """
    from argparse import ArgumentParser
    # sys is imported at module level; the former local
    # ``from sys import argv`` was redundant.
    prsr = ArgumentParser(description="Load redshift data into a database.",
                          prog=os.path.basename(sys.argv[0]))
    prsr.add_argument('-f', '--filename', action='store', dest='dbfile',
                      default='redshift.db', metavar='FILE',
                      help="Store data in FILE (default %(default)s).")
    prsr.add_argument('-H', '--hostname', action='store', dest='hostname',
                      metavar='HOSTNAME', default='nerscdb03.nersc.gov',
                      help='If specified, connect to a PostgreSQL database on HOSTNAME (default %(default)s).')
    prsr.add_argument('-l', '--load', action='store', dest='load',
                      default='exposures', metavar='STAGE',
                      help='Load the set of files associated with STAGE (default "%(default)s").')
    prsr.add_argument('-m', '--max-rows', action='store', dest='maxrows',
                      type=int, default=0, metavar='M',
                      help="Load up to M rows in the tables (default is all rows).")
    prsr.add_argument('-o', '--overwrite', action='store_true', dest='overwrite',
                      help='Delete any existing file(s) before loading.')
    prsr.add_argument('-r', '--rows', action='store', dest='chunksize',
                      type=int, default=50000, metavar='N',
                      help="Load N rows at a time (default %(default)s).")
    prsr.add_argument('-s', '--schema', action='store', dest='schema',
                      metavar='SCHEMA',
                      help='Set the schema name in the PostgreSQL database.')
    # NOTE(review): this default reads DESI_TARGET at call time and raises
    # KeyError if it is unset -- confirm that is the intended behavior.
    prsr.add_argument('-t', '--tiles-path', action='store', dest='tilespath', metavar='PATH',
                      default=os.path.join(os.environ['DESI_TARGET'], 'fiberassign', 'tiles', 'trunk'),
                      help="Load fiberassign data from PATH (default %(default)s).")
    prsr.add_argument('-T', '--target-path', action='store', dest='targetpath', metavar='PATH',
                      help="Load target photometry data from PATH.")
    prsr.add_argument('-U', '--username', action='store', dest='username',
                      metavar='USERNAME', default='desidev_admin',
                      help="If specified, connect to a PostgreSQL database with USERNAME (default %(default)s).")
    prsr.add_argument('-v', '--verbose', action='store_true', dest='verbose',
                      help='Print extra information.')
    prsr.add_argument('datapath', metavar='DIR', help='Load the data in DIR.')
    if len(args) > 0:
        options = prsr.parse_args(args)
    else:
        options = prsr.parse_args()
    # The target photometry path defaults to the main data path.
    if options.targetpath is None:
        options.targetpath = options.datapath
    return options
def main():
    """Entry point for command-line script.

    Returns
    -------
    :class:`int`
        An integer suitable for passing to :func:`sys.exit`.
    """
    global log
    freeze_iers()
    #
    # command-line arguments
    #
    options = get_options()
    #
    # Logging
    #
    if options.verbose:
        log = get_logger(DEBUG, timestamp=True)
    else:
        log = get_logger(INFO, timestamp=True)
    #
    # Initialize DB
    #
    postgresql = setup_db(options)
    #
    # Load configuration
    #
    # Each stage name maps to a list of load_file() keyword dictionaries,
    # executed in order. All file paths depend on the SPECPROD environment
    # variable being set.
    loaders = {'exposures': [{'filepaths': os.path.join(options.datapath, 'spectro', 'redux', os.environ['SPECPROD'], 'tiles-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                              'tcls': Tile,
                              'hdu': 'TILE_COMPLETENESS',
                              'q3c': 'tilera',
                              'chunksize': options.chunksize,
                              'maxrows': options.maxrows
                              },
                             {'filepaths': os.path.join(options.datapath, 'spectro', 'redux', os.environ['SPECPROD'], 'exposures-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                              'tcls': Exposure,
                              'hdu': 'EXPOSURES',
                              # Copy the MJD column into DATE_OBS, then convert it to a UTC datetime.
                              'insert': {'mjd': ('date_obs',)},
                              'convert': {'date_obs': lambda x: Time(x, format='mjd').to_value('datetime').replace(tzinfo=utc)},
                              'q3c': 'tilera',
                              'chunksize': options.chunksize,
                              'maxrows': options.maxrows
                              },
                             {'filepaths': os.path.join(options.datapath, 'spectro', 'redux', os.environ['SPECPROD'], 'exposures-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                              'tcls': Frame,
                              'hdu': 'FRAMES',
                              'preload': _frameid,
                              'chunksize': options.chunksize,
                              'maxrows': options.maxrows
                              }],
               #
               # The potential targets are supposed to include data for all targets.
               # In other words, every actual target is also a potential target.
               #
               'photometry': [{'filepaths': glob.glob(os.path.join(options.targetpath, 'vac', 'lsdr9-photometry', os.environ['SPECPROD'], 'v1.0', 'potential-targets', 'tractorphot', 'tractorphot*.fits')),
                               'tcls': Photometry,
                               'hdu': 'TRACTORPHOT',
                               'expand': {'DCHISQ': ('dchisq_psf', 'dchisq_rex', 'dchisq_dev', 'dchisq_exp', 'dchisq_ser',),
                                          'OBJID': 'brick_objid',
                                          'TYPE': 'morphtype'},
                               # 'rowfilter': _remove_loaded_targetid,
                               'chunksize': options.chunksize,
                               'maxrows': options.maxrows
                               }],
               #
               # This stage loads targets, and such photometry as they have, that did not
               # successfully match to a known LS DR9 object.
               #
               'targetphot': [{'filepaths': os.path.join(options.targetpath, 'vac', 'lsdr9-photometry', os.environ['SPECPROD'], 'v1.0', 'potential-targets', 'targetphot-potential-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                               'tcls': Photometry,
                               'hdu': 'TARGETPHOT',
                               'preload': _add_ls_id,
                               'expand': {'DCHISQ': ('dchisq_psf', 'dchisq_rex', 'dchisq_dev', 'dchisq_exp', 'dchisq_ser',)},
                               'convert': {'gaia_astrometric_params_solved': lambda x: int(x)},
                               'rowfilter': _deduplicate_targetid,
                               'q3c': 'ra',
                               'chunksize': options.chunksize,
                               'maxrows': options.maxrows
                               }],
               'target': [{'filepaths': os.path.join(options.targetpath, 'vac', 'lsdr9-photometry', os.environ['SPECPROD'], 'v1.0', 'potential-targets', 'targetphot-potential-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                           'tcls': Target,
                           'hdu': 'TARGETPHOT',
                           'preload': _target_unique_id,
                           # Collapse the two-part composite ID into a single 128-bit integer.
                           'convert': {'id': lambda x: x[0] << 64 | x[1]},
                           # 'rowfilter': _remove_loaded_unique_id,
                           'chunksize': options.chunksize,
                           'maxrows': options.maxrows
                           }],
               'redshift': [{'filepaths': os.path.join(options.datapath, 'spectro', 'redux', os.environ['SPECPROD'], 'zcatalog', 'zall-pix-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                             'tcls': Zpix,
                             'hdu': 'ZCATALOG',
                             'preload': _survey_program,
                             'expand': {'COEFF': ('coeff_0', 'coeff_1', 'coeff_2', 'coeff_3', 'coeff_4',
                                                  'coeff_5', 'coeff_6', 'coeff_7', 'coeff_8', 'coeff_9',)},
                             'convert': {'id': lambda x: x[0] << 64 | x[1]},
                             # Keep only valid science TARGETIDs (bit 59 marks sky targets).
                             'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0),
                             'chunksize': options.chunksize,
                             'maxrows': options.maxrows
                             },
                            {'filepaths': os.path.join(options.datapath, 'spectro', 'redux', os.environ['SPECPROD'], 'zcatalog', 'zall-tilecumulative-{specprod}.fits'.format(specprod=os.environ['SPECPROD'])),
                             'tcls': Ztile,
                             'hdu': 'ZCATALOG',
                             'preload': _survey_program,
                             'expand': {'COEFF': ('coeff_0', 'coeff_1', 'coeff_2', 'coeff_3', 'coeff_4',
                                                  'coeff_5', 'coeff_6', 'coeff_7', 'coeff_8', 'coeff_9',)},
                             'convert': {'id': lambda x: x[0] << 64 | x[1],
                                         'targetphotid': lambda x: x[0] << 64 | x[1]},
                             'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0),
                             'chunksize': options.chunksize,
                             'maxrows': options.maxrows
                             }],
               # The fiberassign filepaths are filled in below from the Tile table.
               'fiberassign': [{'filepaths': None,
                                'tcls': Fiberassign,
                                'hdu': 'FIBERASSIGN',
                                'preload': _tileid,
                                'convert': {'id': lambda x: x[0] << 64 | x[1]},
                                'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0),
                                'q3c': 'target_ra',
                                'chunksize': options.chunksize,
                                'maxrows': options.maxrows
                                },
                               {'filepaths': None,
                                'tcls': Potential,
                                'hdu': 'POTENTIAL_ASSIGNMENTS',
                                'preload': _tileid,
                                'convert': {'id': lambda x: x[0] << 64 | x[1]},
                                'rowfilter': lambda x: (x['TARGETID'] > 0) & ((x['TARGETID'] & 2**59) == 0),
                                'chunksize': options.chunksize,
                                'maxrows': options.maxrows
                                }]
               }
    try:
        loader = loaders[options.load]
    except KeyError:
        log.critical("Unknown loading stage '%s'!", options.load)
        return 1
    #
    # Find the tiles that need to be loaded. Not all fiberassign files are compressed!
    #
    if options.load == 'fiberassign':
        try:
            fiberassign_files = [checkgzip(os.path.join(options.tilespath, (f"{tileid[0]:06d}")[0:3], f"fiberassign-{tileid[0]:06d}.fits"))
                                 for tileid in dbSession.query(Tile.tileid).order_by(Tile.tileid)]
        except FileNotFoundError:
            log.error("Some fiberassign files were not found!")
            return 1
        log.debug(fiberassign_files)
        for k in range(len(loader)):
            loader[k]['filepaths'] = fiberassign_files
    #
    # Load the tables that correspond to a set of files.
    #
    for l in loader:
        tn = l['tcls'].__tablename__
        log.info("Loading %s from %s.", tn, str(l['filepaths']))
        load_file(**l)
        log.info("Finished loading %s.", tn)
    return 0
# Allow this module to be executed directly as a command-line script.
if __name__ == '__main__':
    sys.exit(main())