"""
desispec.workflow.tableio
=========================
"""
import os
import numpy as np
from astropy.table import Table
###################################################
################ Table Functions #################
###################################################
from desispec.workflow.utils import pathjoin
from desiutil.log import get_logger
def ensure_scalar(val, joinsymb='|', comma_replacement=';'):
    """
    Ensure that val is a scalar that can be saved to a single Table cell.

    Strings have their commas replaced so they survive a csv round trip.
    Lists and arrays are flattened and joined into one string using joinsymb.
    Other scalars, None, and the numpy masked constant are returned unchanged.

    Parameters
    ----------
    val : a scalar datatype, list, or array
        The value to be converted to a scalar quantity
        (returning the val if it is already a scalar).
    joinsymb : str
        A string symbol *other than comma* that will be used to
        join the multiple values of a list or array.
    comma_replacement : str
        A string symbol that should be used to replace any existing
        commas in the data, such that the value can be saved in a csv format.

    Returns
    -------
    val or outstr, any scalar type or string
        The output which is a scalar quantity capable of being
        written to a single table cell (in a csv or fits file, for example).
        Joined sequences always end with a trailing joinsymb, so even a
        one-element (or empty) sequence contains the join symbol.
    """
    # np.str_ is a subclass of str, so isinstance covers both (and any other
    # str subclass, which the exact-type check used to miss).
    if isinstance(val, str):
        if ',' in val:
            val = val.replace(',', comma_replacement)
        return val

    if val is None or isinstance(val, np.ma.core.MaskedConstant) or np.isscalar(val):
        return val

    # Flatten so that nested / multi-dimensional input yields a sequence of
    # strings; str.join raises TypeError on an array of arrays.
    pieces = np.atleast_1d(val).astype(str).ravel()
    outstr = joinsymb.join(pieces) + joinsymb
    if ',' in outstr:
        outstr = outstr.replace(',', comma_replacement)
    return outstr
def _is_float_text(text):
    """Return True if text is plain decimal notation with one '.', e.g. '1.5', '3.' or '.25'."""
    whole, dot, frac = text.partition('.')
    if not dot or not (whole or frac):
        return False
    return (whole == '' or whole.isdecimal()) and (frac == '' or frac.isdecimal())


def split_str(val, joinsymb='|', comma_replacement=';'):
    """
    Attempts to intelligently interpret an input scalar.

    If val is a string, check whether it is a list or array that was joined
    into a single string using joinsymb. If so, split it back into an array.
    Otherwise interpret it as an int, float, or bool where possible, or return
    the string itself. Non-string input is returned as-is.

    Parameters
    ----------
    val : any datatype
        The input to be checked to see if it is in fact a list/array that was joined into a string
        for saving in a Table.
    joinsymb : str
        The symbol used to join values in a list/array when saving. Should not be a comma.
    comma_replacement : str
        Replace instances of this symbol with commas when loading ONLY scalar columns in a table,
        as e.g. BADAMPS is used in the pipeline and symbols like ';' are problematic
        on the command line. Comment arrays do not need to be converted back and forth.

    Returns
    -------
    val or split_list, any datatype or np.array.
        For a joined string: an int array if every element is an unsigned
        integer, a float array if every element is an unsigned integer or
        decimal number, otherwise a string array with spaces and tabs
        stripped from each element. An empty joined string gives an empty
        object array.

    Notes
    -----
    Only unsigned digits (with at most one '.') are treated as numbers;
    signed values such as '-99' are left as strings.
    """
    if not isinstance(val, str):
        return val

    # isdecimal() only accepts characters that int() can parse; isnumeric()
    # also accepts e.g. fractions and superscripts, which make int() raise.
    if val.isdecimal():
        return int(val)

    if joinsymb not in val:
        # str.isnumeric() is False for any string containing '.', so floats
        # must be detected separately.
        if _is_float_text(val):
            return float(val)
        lowered = val.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        if comma_replacement in val:
            val = val.replace(comma_replacement, ',')
        return val

    stripped = val.strip(joinsymb)
    if stripped == '':
        return np.array([], dtype=object)

    parts = stripped.split(joinsymb)
    # Decide the dtype from every element rather than only the first, so a
    # mixed list such as '1|abc|' falls back to strings instead of raising.
    if all(part.isdecimal() for part in parts):
        return np.array([int(part) for part in parts], dtype=int)
    if all(part.isdecimal() or _is_float_text(part) for part in parts):
        return np.array([float(part) for part in parts], dtype=float)
    return np.array([part.strip('\t ') for part in parts]).astype(str)
def write_table(origtable, tablename=None, tabletype=None, joinsymb='|', overwrite=True, verbose=False,
                comma_replacement=';', write_empty=False, use_specprod=True):
    """
    Workflow function to write exposure, processing, and unprocessed tables.

    It allows for multi-valued table cells, which are reduced to strings using
    the joinsymb. It writes to a temp file before moving the fully written
    file to the name given by tablename (or the default for table of type
    tabletype). The input table is copied, so origtable itself is not modified.

    Parameters
    ----------
    origtable : Table
        Either exposure table or processing table.
    tablename : str
        Full pathname of where the table should be saved, including the extension. Originally saved to
        ``*.temp.{ext}`` and then moved to ``*.{ext}``. If ``None``, it looks up the default for tabletype.
    tabletype : str
        Used if tablename is None to get the default name for the type of table.
    joinsymb : str
        The symbol used to join values in a list/array when saving. Should not be a comma.
    overwrite : bool
        Passed to the write of the temporary file for csv/ecsv output. Note that the final
        move onto tablename replaces any existing file there. Default is currently True.
    verbose : bool
        Whether to give verbose amounts of information (True) or succinct/no outputs (False). Default is False.
    comma_replacement : str
        Symbol substituted for commas in string and multi-valued cells when writing csv/ecsv.
        It is turned back into a comma when loading scalar columns,
        as e.g. BADAMPS is used in the pipeline and symbols like ';' are problematic
        on the command line.
    write_empty : bool
        Whether to write an empty table to disk. The default is False. Warning: code is less robust
        to column datatypes on read/write if the table is empty. May cause issues if this is set to True.
    use_specprod : bool
        If True and tablename not specified and tabletype is exposure table, this looks for the
        table in the SPECPROD rather than the exptab repository. Default is True.
    """
    log = get_logger()
    if tablename is None and tabletype is None:
        log.error("Pathname or type of table is required to save the table")
        return

    if tabletype is not None:
        tabletype = standardize_tabletype(tabletype)

    if tablename is None:
        tablename = translate_type_to_pathname(tabletype, use_specprod=use_specprod)

    if not write_empty and len(origtable) == 0:
        log.warning(f'NOT writing zero length table to {tablename}')
        return

    if verbose:
        # Build the whole message up front: extra positional arguments to
        # log.info are treated as %-format arguments by the logging module.
        log.info(f"In write table {tablename}\n{tabletype}")
        log.info(origtable[0:2])

    basename, ext = os.path.splitext(tablename)
    temp_name = f'{basename}.temp{ext}'
    if verbose:
        log.info(f"{ext} {temp_name}")

    table = origtable.copy()

    if ext in ['.csv', '.ecsv']:
        if verbose:
            log.info(f"Given table: {table.info}")

        for nam in table.colnames:
            column = table[nam]
            # Guard the first-row peek so empty tables (write_empty=True)
            # do not raise IndexError.
            first = column[0] if len(column) > 0 else None
            multivalued = (column.ndim > 1
                           or isinstance(first, (list, np.ndarray))
                           or column.dtype == object)
            if multivalued:
                if verbose:
                    log.info(f'{nam} is {column.ndim} dimensions, changing to string')
            elif not isinstance(first, str):
                # Plain numeric/bool (or masked-first) column: written as-is.
                continue

            # ensure_scalar joins multi-valued cells, swaps commas in strings,
            # and passes masked/None cells through untouched.
            values = [ensure_scalar(row, joinsymb=joinsymb, comma_replacement=comma_replacement)
                      for row in column]
            if isinstance(column, Table.MaskedColumn):
                column_class = Table.MaskedColumn
            else:
                column_class = Table.Column
            if values:
                newcol = column_class(name=nam, data=values)
            else:
                # No rows to infer a type from; the converted column holds strings.
                newcol = column_class(name=nam, data=values, dtype=str)
            table.replace_column(nam, newcol)

        if verbose and any(col.ndim > 1 or (len(col) > 0 and isinstance(col[0], (list, np.ndarray)))
                           for col in table.itercols()):
            log.warning("A column was still more than one dimensional")
            log.info(f"{table.info}")

        table.write(temp_name, format=f'ascii{ext}', overwrite=overwrite)
    else:
        table.write(temp_name, overwrite=True)

    # os.replace overwrites an existing destination on every platform.
    os.replace(temp_name, tablename)

    if verbose:
        log.info(f"Written table: {table.info}")
def standardize_tabletype(tabletype):
    """
    Given the user defined type of table it returns the proper 'tabletype' expected by the pipeline

    Parameters
    ----------
    tabletype : str
        Allows for a flexible number of input options, but should refer to either the 'exposure',
        'processing', or 'unprocessed' table types. Matching is case-insensitive.

    Returns
    -------
    tabletype : str
        Standardized tabletype values. Either "exptable", "proctable", "unproctable".
        Input that is not a string, or is not a recognized alias, is returned unchanged.
    """
    # Pass through None and other non-strings instead of failing on .lower().
    if not isinstance(tabletype, str):
        return tabletype

    key = tabletype.lower()
    if key in ('exp', 'exposure', 'etable', 'exptable', 'exptab', 'exposuretable', 'exposure_table'):
        return 'exptable'
    if key in ('proc', 'processing', 'proctable', 'proctab', 'int', 'ptable', 'internal'):
        return 'proctable'
    if key in ('unproc', 'unproctable', 'unproctab', 'unprocessed', 'unprocessing', 'unproc_table'):
        return 'unproctable'
    return tabletype
def translate_type_to_pathname(tabletype, use_specprod=True):
    """
    Given the type of table it returns the proper file pathname

    Parameters
    ----------
    tabletype : str
        Allows for a flexible number of input options, but should refer to either the 'exposure',
        'processing', or 'unprocessed' table types.
    use_specprod : bool
        Passed as ``usespecprod`` to get_exposure_table_pathname when tabletype is the
        exposure table; unused for the other table types. Default is True.

    Returns
    -------
    tablename : str
        Full pathname including extension of the table type, as given by the
        exptable/proctable helper functions.

    Raises
    ------
    ValueError
        If tabletype does not standardize to 'exptable', 'proctable', or 'unproctable'.
    """
    # Imported at call time, as in the original code.
    # NOTE(review): presumably to avoid a circular import with exptable/proctable - confirm.
    from desispec.workflow.exptable import get_exposure_table_pathname
    from desispec.workflow.proctable import get_processing_table_path, get_processing_table_pathname, \
        get_processing_table_name

    tabletype = standardize_tabletype(tabletype)
    if tabletype == 'exptable':
        return get_exposure_table_pathname(night=None, usespecprod=use_specprod)
    if tabletype == 'proctable':
        return get_processing_table_pathname()
    if tabletype == 'unproctable':
        # The unprocessed table lives next to the processing table, with
        # "processing" swapped for "unprocessed" in the file name.
        tablepath = get_processing_table_path()
        tablename = get_processing_table_name().replace("processing", 'unprocessed')
        return pathjoin(tablepath, tablename)

    # Previously this fell through to `return tablename` with the name unbound.
    raise ValueError(f"Unknown table type {tabletype!r}; "
                     "expected 'exptable', 'proctable', or 'unproctable'")
def load_table(tablename=None, tabletype=None, joinsymb='|', verbose=False,
               process_mixins=True, use_specprod=True, suppress_logging=False):
    """
    Workflow function to read in exposure, processing, and unprocessed tables.

    It allows for multi-valued table cells, which are generated from strings
    using the joinsymb. It reads from the file given by tablename (or the
    default for table of type tabletype).

    Parameters
    ----------
    tablename : str
        Full pathname of the table to read, including the extension. If None, it looks up the
        default for tabletype. If tabletype is None, the file name is used to try and identify
        the tabletype, which is then used to get the default column names and types.
    tabletype : str
        'exptable', 'proctable', or 'unproctable'. Used if tablename is None to get the default name
        for the type of table. Also used to get the column datatypes and defaults.
    joinsymb : str
        The symbol used to join values in a list/array when saving. Should not be a comma.
    verbose : bool
        Whether to give verbose amounts of information (True) or succinct/no outputs (False). Default is False.
    process_mixins : bool
        Whether to look for and try to split strings into lists/arrays. The default is True.
        Warning: The exposure and processing tables have default data types which are multi-value.
        If this is set to False, the default data types will be incorrect and issues are likely
        to arise.
    use_specprod : bool
        If True and tablename not specified and tabletype is exposure table, this looks for the
        table in the SPECPROD rather than the exptab repository. Default is True.
    suppress_logging : bool
        If True, the log.info() messages are skipped. This
        is useful in scripts looping over many tables to reduce the
        amount of things printed to the screen.

    Returns
    -------
    table : Table
        The table loaded from tablename (or from the default name based on tabletype).
        If the file doesn't exist and the table type is known, a newly instantiated table of that
        type is returned instead. Returns None if neither tablename nor tabletype is given, or if
        the file doesn't exist and the table type is unknown.
    """
    # Imported at call time, as in the original code.
    # NOTE(review): presumably to avoid a circular import with exptable/proctable - confirm.
    from desispec.workflow.exptable import instantiate_exposure_table, get_exposure_table_column_defs
    from desispec.workflow.proctable import instantiate_processing_table, get_processing_table_column_defs

    log = get_logger()

    if tabletype is not None:
        tabletype = standardize_tabletype(tabletype)

    if tablename is None:
        if tabletype is None:
            log.error("Must specify either tablename or tabletype in load_table()")
            return None
        tablename = translate_type_to_pathname(tabletype, use_specprod=use_specprod)
    elif tabletype is None:
        if not suppress_logging:
            log.info("tabletype not given in load_table(), trying to guess based on filename")
        filename = os.path.split(tablename)[-1]
        # 'unproc' must be tested before 'proc', which it contains.
        if 'exp' in filename or 'etable' in filename:
            tabletype = 'exptable'
        elif 'unproc' in filename:
            tabletype = 'unproctable'
        elif 'proc' in filename or 'ptable' in filename:
            tabletype = 'proctable'

        if tabletype is None:
            log.warning(f"Couldn't identify type based on filename {filename}")
        elif not suppress_logging:
            log.info(f"Based on filename {filename}, identified type as {tabletype}")

    if os.path.isfile(tablename):
        if not suppress_logging:
            log.info(f"Found table: {tablename}")
    elif tabletype is not None:
        if not suppress_logging:
            log.info(f'Table {tablename} not found, creating new table of type {tabletype}')
        if tabletype == 'exptable':
            return instantiate_exposure_table()
        elif tabletype == 'unproctable':
            # Unprocessed tables use the exposure table definition.
            return instantiate_exposure_table()
        elif tabletype == 'proctable':
            return instantiate_processing_table()
        else:
            log.warning(f"Couldn't create type {tabletype}, unknown table type")
            return None
    else:
        # Report the missing file; tabletype is always None on this branch.
        log.error(f"In load_table:\n\tCouldn't find: {tablename} and tabletype not specified, returning None")
        return None

    basename, ext = os.path.splitext(tablename)
    if ext in ['.csv', '.ecsv']:
        table = Table.read(tablename, format=f'ascii{ext}')
        if verbose:
            # Single pre-formatted message: extra positional arguments to
            # log.info are treated as %-format arguments by the logging module.
            log.info(f"Raw loaded table: {table.info}")

        if tabletype in ['exptable', 'unproctable']:
            colnames, coltypes, coldefaults = get_exposure_table_column_defs(return_default_values=True)
        elif tabletype == 'proctable':
            colnames, coltypes, coldefaults = get_processing_table_column_defs(return_default_values=True)
        else:
            # Unknown type: fall back on whatever the file itself provides.
            colnames = table.colnames
            coltypes = [table[nam].dtype for nam in colnames]
            coldefaults = [guess_default_by_dtype(typ) for typ in coltypes]
        colnames, coltypes = np.array(colnames), np.array(coltypes)

        if len(table) > 0:
            outcolumns = []
            first_err = True
            for nam, typ, default in zip(colnames, coltypes, coldefaults):
                if nam not in table.colnames:
                    # Only list the full set of loaded column names once.
                    if first_err:
                        log.warning(f"{nam} not in column names of loaded table: {table.colnames}")
                        first_err = False
                    else:
                        log.warning(f"{nam} not in column names of loaded table")
                    continue
                elif isinstance(table[nam], Table.MaskedColumn):
                    data, mask = table[nam].data, table[nam].mask
                else:
                    data, mask = table[nam].data, None

                col, dtyp = process_column(data, typ, mask=mask, default=default, joinsymb=joinsymb,
                                           process_mixins=process_mixins, verbose=verbose)

                if dtyp in [list, np.array, np.ndarray]:
                    # Fill an object array cell by cell so that each cell
                    # holds its own (possibly different length) 1-d array.
                    out = np.ndarray(shape=(len(col),), dtype=object)
                    for ii, cell in enumerate(col):
                        out[ii] = np.atleast_1d(cell)
                    newcol = Table.Column(name=nam, data=out, dtype=dtyp)
                else:
                    newcol = Table.Column(name=nam, data=col, dtype=dtyp)
                outcolumns.append(newcol)
            table = Table(outcolumns)
        else:
            table = Table(names=colnames, dtype=coltypes)
    else:
        table = Table.read(tablename)

    if verbose:
        log.info(f"Expanded table: {table.info}")

    return table
def guess_default_by_dtype(typ):
    """
    Returns a default value given a data type. To be used in filling a table if no default is given.

    Parameters
    ----------
    typ : DataType
        The datatype of the element you want a default value for. May be a Python type,
        a numpy scalar type, or a numpy dtype instance.

    Returns
    -------
    default
        Default value for that type: -99 for integers, -99.0 for floats, 'unknown' for strings,
        an empty list for list, and an empty string array for np.array/np.ndarray.
        If it can't guess, it returns the integer -99 .
    """
    if typ in [int, np.int8, np.int16, np.int32, np.int64]:
        return -99
    elif typ in [float, np.float32, np.float64]:
        return -99.0
    elif typ in [str, np.str_]:
        return 'unknown'
    elif typ == list:
        return []
    elif typ in [np.array, np.ndarray]:
        return np.array([], dtype=str)

    # The membership tests above miss sized string dtypes (e.g. dtype('<U5'))
    # and other float widths, so fall back on the numpy dtype kind.
    # np.dtype(None) is float64, hence the explicit None guard.
    if typ is not None:
        try:
            kind = np.dtype(typ).kind
        except (TypeError, ValueError):
            kind = ''
        if kind == 'U':
            return 'unknown'
        if kind == 'f':
            return -99.0
    return -99
def process_column(data, typ, mask=None, default=None, joinsymb='|', process_mixins=True,
                   comma_replacement=';', verbose=False):
    """
    Used with load_table to process a Table.Column after being read in.

    It fills in masked values with defaults, and identifies and splits mixin
    columns (columns that should be a list/array) back into their list/array
    from their string representation.

    Parameters
    ----------
    data : Table.Column or Table.MaskedColumn
        Column of data to be checked for masked rows (to be filled with
        default) and string-ed versions of lists/arrays that need to be
        expanded out.
    typ : DataType
        The expected datatype of the data in data. May differ from the type of the input data, in which
        case the data will be transformed.
    mask : np.array
        A mask array with True in row elements of the input data array that are masked and False in
        row elements that are not masked. If None, no rows are treated as masked.
    default : any type
        The default value to be used for masked rows. If None, it is guessed from typ.
    joinsymb : str
        The symbol used to join values in a list/array when saving. Should not be a comma.
    process_mixins : bool
        Whether to look for and try to split strings into lists/arrays. The default is True.
        Warning: The exposure and processing tables have default data types which are multi-value.
        If this is set to False, the default data types will be incorrect and issues are likely
        to arise.
    comma_replacement : str
        Replace instances of this symbol with commas when loading scalar columns in a table,
        as e.g. BADAMPS is used in the pipeline and symbols like ';' are problematic
        on the command line.
    verbose : bool
        Whether to give verbose amounts of information (True) or succinct/no outputs (False). Default is False.

    Returns
    -------
    col, list
        A new data vector similar to input 'data' except with masked values filled in and
        mixin strings expanded back into np.array's.
    DataType
        The data type of a row element in the return col. This is typ, unless a mixin
        column was found with a scalar typ, in which case it is np.ndarray.
    """
    log = get_logger()
    if default is None:
        default = guess_default_by_dtype(typ)

    nrows = len(data)
    if mask is None:
        mask = np.zeros(nrows, dtype=bool)

    # With no unmasked rows (fully masked or empty data) there is no first
    # value to inspect, so every row simply takes the default.
    unmasked = np.bitwise_not(mask)
    if np.sum(unmasked) == 0:
        return [default] * nrows, typ

    array_like = (typ in [list, np.array, np.ndarray])
    dtyp = typ

    # The first unmasked value decides whether the column is a joined mixin.
    first = data[unmasked][0]
    firsttype = type(first)
    if verbose:
        log.debug(f"{first} {firsttype} {firsttype in [str, np.str_]}")

    if process_mixins and firsttype in [str, np.str_] and joinsymb in first:
        do_split_str = True
        if not array_like:
            log.warning("Found mixin column with scalar datatype:")
            log.info(f"\tfirst={first}, typefirst={firsttype}, dtype={typ}")
            log.info("\tchanging to np.array datatype")
            dtyp = np.ndarray
    else:
        do_split_str = False

    col = []
    for rowdat, rowmsk in zip(data, mask):
        if rowmsk:
            col.append(default)
        elif do_split_str:
            col.append(split_str(rowdat, joinsymb=joinsymb, comma_replacement=comma_replacement))
        elif array_like:
            # Expected an array but the value was never joined: wrap it.
            col.append(np.array([rowdat]))
        elif type(rowdat) in [str, np.str_] and comma_replacement in rowdat:
            col.append(rowdat.replace(comma_replacement, ','))
        else:
            col.append(rowdat)

    if verbose:
        log.info(col)

    return col, dtyp
def write_tables(tables, tablenames=None, tabletypes=None, write_empty=False, verbose=False, overwrite=True):
    """
    Workflow function to write several exposure, processing, and unprocessed tables in one call.

    Each table is handed to write_table, either with its explicit pathname
    (when tablenames is given) or with its table type (when only tabletypes
    is given). Zero-length tables are skipped unless write_empty is set.

    Parameters
    ----------
    tables : list/array of Table
        List or array of exposure tables, unprocessed tables, and/or processing table.
    tablenames : list/array of str
        List or array of the full pathnames to where the tables should be saved,
        including the extension. If None, tabletypes is used instead.
    tabletypes : list/array of str
        List or array of table types, used only if tablenames is None.
    write_empty : bool
        Whether to write an empty table to disk. The default is False. Warning: code is less robust
        to column datatypes on read/write if the table is empty. May cause issues if this is set to True.
    verbose : bool
        Whether to give verbose amounts of information (True) or succinct/no outputs (False). Default is False.
    overwrite : bool
        Passed through to write_table. Default is currently True.
    """
    log = get_logger()
    if tablenames is None and tabletypes is None:
        log.error("Need to define either tablenames or the table types in write_tables")
        return

    # Explicit pathnames take precedence; otherwise go by table type.
    if tablenames is None:
        keyword, targets = 'tabletype', tabletypes
    else:
        keyword, targets = 'tablename', tablenames

    for current, target in zip(tables, targets):
        if not write_empty and len(current) == 0:
            continue
        write_table(current, verbose=verbose, overwrite=overwrite,
                    write_empty=write_empty, **{keyword: target})
def load_tables(tablenames=None, tabletypes=None, verbose=False):
    """
    Workflow function to read in multiple exposure, processing, and unprocessed tables.

    Each table is read with load_table, from the files given by tablenames
    and/or the defaults for the types in tabletypes.

    Parameters
    ----------
    tablenames : list/array of str
        List or array of the full pathnames of the tables to read,
        including the extension.
    tabletypes : list/array of str
        List or array of the table types, which are used if tablenames is None to get
        the default name for the type of table. They are also used to get the
        column datatypes and defaults.
    verbose : bool
        Whether to give verbose amounts of information (True) or succinct/no outputs (False). Default is False.

    Returns
    -------
    tabs : list
        One entry per requested table, each being whatever load_table returned
        (which can be None). An empty list if neither tablenames nor tabletypes is given.
        If both are given they are paired up, stopping at the shorter of the two.
    """
    if tablenames is None and tabletypes is None:
        # Report the misuse instead of silently returning nothing.
        get_logger().error("Need to define either tablenames or the table types in load_tables")
        return []

    if tablenames is None:
        return [load_table(tabletype=tabltyp, verbose=verbose) for tabltyp in tabletypes]

    if tabletypes is None:
        return [load_table(tablename=tablname, verbose=verbose) for tablname in tablenames]

    return [load_table(tablename=tablname, tabletype=tabltyp, verbose=verbose)
            for tablname, tabltyp in zip(tablenames, tabletypes)]