import csv
import glob
import logging
import os
from pathlib import Path
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.io.registry import IORegistryError
from astropy.wcs import WCS
from glue.core.data import Data
from glue.core.link_helpers import LinkSame
from specutils import Spectrum1D, SpectrumList, SpectrumCollection
from jdaviz.configs.imviz.plugins.parsers import get_image_data_iterator
from jdaviz.core.registries import data_parser_registry
from jdaviz.core.events import SnackbarMessage
__all__ = ['mos_spec1d_parser', 'mos_spec2d_parser', 'mos_image_parser']
FALLBACK_NAME = "Unspecified"
def _add_to_table(app, data, comp_label):
"""
Creates a mos table instance in the application data collection is none
currently exists.
Parameters
----------
app : `~jdaviz.app.Application`
The Jdaviz application instance.
data : array-list
The set of data to added as a table (i.g. column) component.
comp_label : str
The label used to describe the data. Also used as the column header.
"""
# Add data to the mos viz table object
if 'MOS Table' not in app.data_collection:
table_data = Data(label='MOS Table')
app.add_data(table_data, notify_done=False)
mos_table = app.data_collection['MOS Table']
mos_table.add_component(data, comp_label)
viewer = app.get_viewer("table-viewer")
viewer.add_data(table_data)
else:
mos_table = app.data_collection['MOS Table']
if comp_label not in mos_table.component_ids():
mos_table.add_component(data, comp_label)
def _check_is_file(path):
return isinstance(path, str) and Path(path).is_file()
def _warn_if_not_found(app, file_lists):
"""
Take a list of labels and associated file lists and send a snackbar
message if the length of any list is 0.
"""
found = []
not_found = []
for key in file_lists:
if len(file_lists[key]) == 0:
not_found.append(key)
else:
found.append(key)
if len(found) == 0:
raise ValueError("No valid files found in specified directory")
if len(not_found) > 0:
warn_msg = "Some files not found: {}".format(", ".join(not_found))
print(warn_msg)
warn_msg = SnackbarMessage(warn_msg, color="warning", sender=app)
app.hub.broadcast(warn_msg)
return found
def _fields_from_ecsv(fname, fields, delimiter=","):
parsed_fields = []
with open(fname, "r") as f:
reader = csv.DictReader(filter(lambda r: r[0] != "#", f),
delimiter=delimiter)
for row in reader:
temp_list = []
for field in fields:
temp_list.append(row[field])
parsed_fields.append(temp_list)
return parsed_fields
@data_parser_registry("mosviz-link-data")
def link_data_in_table(app, data_obj=None):
"""
Batch links data in the mosviz table viewer.
Parameters
----------
app : `~jdaviz.app.Application`
The application-level object used to reference the viewers.
data_obj : None
Passed in in order to use the data_parser_registry, otherwise
not used.
"""
mos_data = app.session.data_collection['MOS Table']
wc_spec_ids = []
# Optimize linking speed through a) delaying link manager updates with a
# context manager, b) handling intra-row linkage of 1D and 2D spectra in a
# loop, and c) handling inter-row linkage after that in one fell swoop.
with app.data_collection.delay_link_manager_update():
spectra_1d = mos_data.get_component('1D Spectra').data
spectra_2d = mos_data.get_component('2D Spectra').data
# Link each 1D spectrum with its corresponding 2D spectrum
for index in range(len(spectra_1d)):
spec_1d = spectra_1d[index]
spec_2d = spectra_2d[index]
wc_spec_1d = app.session.data_collection[spec_1d].world_component_ids
wc_spec_2d = app.session.data_collection[spec_2d].world_component_ids
wc_spec_ids.append(LinkSame(wc_spec_1d[0], wc_spec_2d[1]))
# Link each 1D spectrum to all other 1D spectra
first_spec_1d = spectra_1d[0]
wc_first_spec_1d = app.session.data_collection[first_spec_1d].world_component_ids
for index in range(1, len(spectra_1d)):
spec_1d = spectra_1d[index]
wc_spec_1d = app.session.data_collection[spec_1d].world_component_ids
wc_spec_ids.append(LinkSame(wc_spec_1d[0], wc_first_spec_1d[0]))
app.session.data_collection.add_link(wc_spec_ids)
@data_parser_registry("mosviz-nirspec-directory-parser")
def mos_nirspec_directory_parser(app, data_obj, data_labels=None):
spectra_1d = []
spectra_2d = []
# Load spectra
level3_path = Path(data_obj)
for file_path in glob.iglob(str(level3_path / '*')):
if 'x1d' in file_path or 'c1d' in file_path:
spectra_1d.append(file_path)
elif 's2d' in file_path:
spectra_2d.append(file_path)
# Load images, if present
image_path = None
# Potential names of subdirectories where images are stored
for image_dir_name in ["cutouts", "mosviz_cutouts", "images"]:
if os.path.isdir(Path(str(level3_path / image_dir_name))):
image_path = Path(str(level3_path / image_dir_name))
break
if image_path is not None:
images = sorted([file_path for file_path in glob.iglob(str(image_path / '*'))])
# The amount of images needs to be equal to the amount of rows
# of the other columns in the table
if len(images) == len(spectra_1d):
mos_meta_parser(app, images, ids=images)
mos_image_parser(app, images)
else:
msg = "The number of images in this directory does not match the" \
" number of spectra 1d and 2d files, please make the " \
"amounts equal or load images separately."
logging.warning(msg)
msg = SnackbarMessage(msg, color='warning', sender=app)
app.hub.broadcast(msg)
spectra_1d.sort()
spectra_2d.sort()
mos_spec1d_parser(app, spectra_1d)
mos_spec2d_parser(app, spectra_2d)
mos_meta_parser(app, spectra_2d, spectra=True)
mos_meta_parser(app, spectra_1d, spectra=True, sp1d=True)
[docs]@data_parser_registry("mosviz-spec1d-parser")
def mos_spec1d_parser(app, data_obj, data_labels=None):
"""
Attempts to parse a 1D spectrum object.
Parameters
----------
app : `~jdaviz.app.Application`
The application-level object used to reference the viewers.
data_obj : str or list or spectrum-like
File path, list, or spectrum-like object to be read as a new row in
the mosviz table.
data_labels : str, optional
The label applied to the glue data component.
"""
if isinstance(data_labels, str):
data_labels = [data_labels]
# Coerce into list if needed
if not isinstance(data_obj, (list, tuple, SpectrumCollection)):
data_obj = [data_obj]
data_obj = [Spectrum1D.read(x) if _check_is_file(x) else x for x in data_obj]
if data_labels is None:
data_labels = [f"1D Spectrum {i}" for i in range(len(data_obj))]
elif len(data_obj) != len(data_labels):
data_labels = [f"{data_labels[0]} {i}" for i in range(len(data_obj))]
# Handle the case where the 1d spectrum is a collection of spectra
with app.data_collection.delay_link_manager_update():
for i in range(len(data_obj)):
app.add_data(data_obj[i], data_labels[i], notify_done=False)
_add_to_table(app, data_labels, '1D Spectra')
[docs]@data_parser_registry("mosviz-spec2d-parser")
def mos_spec2d_parser(app, data_obj, data_labels=None, add_to_table=True,
show_in_viewer=False):
"""
Attempts to parse a 2D spectrum object.
Notes
-----
This currently only works with JWST-type data in which the data is in the
second hdu of the fits file.
Parameters
----------
app : `~jdaviz.app.Application`
The application-level object used to reference the viewers.
data_obj : str or list or spectrum-like
File path, list, or spectrum-like object to be read as a new row in
the mosviz table.
data_labels : str, optional
The label applied to the glue data component.
"""
def _parse_as_spectrum1d(path):
# Parse as a FITS file and assume the WCS is correct
with fits.open(path) as hdulist:
data = hdulist[1].data
header = hdulist[1].header
wcs = WCS(header)
return Spectrum1D(data, wcs=wcs)
# Coerce into list-like object
if not isinstance(data_obj, (list, tuple, SpectrumCollection)):
data_obj = [data_obj]
# If we're given a string, repeat it for each object
if isinstance(data_labels, str):
if len(data_obj) > 1:
data_labels = [f"{data_labels} {i}" for i in range(len(data_obj))]
else:
data_labels = [data_labels]
elif data_labels is None:
if len(data_obj) > 1:
data_labels = [f"2D Spectrum {i}" for i in range(len(data_obj))]
else:
data_labels = ['2D Spectrum']
with app.data_collection.delay_link_manager_update():
for index, data in enumerate(data_obj):
# If we got a filepath, first try and parse using the Spectrum1D and
# SpectrumList parsers, and then fall back to parsing it as a generic
# FITS file.
if _check_is_file(data):
try:
data = Spectrum1D.read(data)
except IORegistryError:
try:
data = Spectrum1D.read(data)
except IORegistryError:
data = _parse_as_spectrum1d(data)
# Copy (if present) region to top-level meta object
if ('header' in data.meta and
'S_REGION' in data.meta['header'] and
'S_REGION' not in data.meta):
data.meta['S_REGION'] = data.meta['header']['S_REGION']
# Set the instrument
# TODO: this should not be set to nirspec for all datasets
data.meta['INSTRUME'] = 'nirspec'
# Get the corresponding label for this data product
label = data_labels[index]
app.data_collection[label] = data
if add_to_table:
_add_to_table(app, data_labels, '2D Spectra')
if show_in_viewer:
if len(data_labels) > 1:
raise ValueError("More than one data label provided, unclear " +
"which to show in viewer")
app.add_data_to_viewer("spectrum-2d-viewer", data_labels[0])
def _load_fits_image_from_filename(filename, app):
data_list = []
with fits.open(filename) as hdulist:
meta = dict(hdulist[0].header.copy())
data_iter = get_image_data_iterator(app, hdulist, "Image", ext=None)
for d, _ in data_iter: # We do not use the generated labels
d.meta.update(meta)
data_list.append(d)
return data_list
[docs]@data_parser_registry("mosviz-image-parser")
def mos_image_parser(app, data_obj, data_labels=None, share_image=0):
"""
Attempts to parse an image-like object or list of images.
Parameters
----------
app : `~jdaviz.app.Application`
The application-level object used to reference the viewers.
data_obj : str or list or image-like
File path, list, or image-like object to be read as a new row in
the mosviz table.
data_labels : str, optional
The label applied to the glue data component.
share_image : int, optional
If 0, images are treated as applying to individual spectra. If non-zero,
a single image will be shared by multiple spectra so that clicking a
different row in the table does not reload the displayed image.
Currently, if non-zero, the provided number must match the number of
spectra.
"""
if data_obj is None:
return
# The label does not matter here. We overwrite later.
if isinstance(data_obj, str):
data_obj = _load_fits_image_from_filename(data_obj, app)
elif isinstance(data_obj, (list, tuple)) and share_image == 0:
temp_data = []
for cur_data_obj in data_obj:
if isinstance(cur_data_obj, str):
temp_data += _load_fits_image_from_filename(cur_data_obj, app)
else:
data_iter = get_image_data_iterator(app, cur_data_obj, "Image", ext=None)
temp_data += [d[0] for d in data_iter]
data_obj = temp_data
else:
data_iter = get_image_data_iterator(app, data_obj, "Image", ext=None)
data_obj = [d[0] for d in data_iter]
n_data = len(data_obj)
n_data_range = range(n_data)
# TODO: Maybe should raise exception?
if share_image and n_data > 1: # Just use the first one
data_obj = [data_obj[0]]
if data_labels is None:
if share_image:
data_labels = ["Shared Image"]
else:
data_labels = [f"Image {i}" for i in n_data_range]
elif isinstance(data_labels, str):
if share_image:
data_labels = [data_labels]
else:
data_labels = [f"{data_labels} {i}" for i in n_data_range]
with app.data_collection.delay_link_manager_update():
for i in n_data_range:
data_obj[i].label = data_labels[i]
app.add_data(data_obj[i], data_labels[i], notify_done=False)
if share_image:
# Associate this image with multiple spectra
data_labels *= share_image
# Show it on viewer
app.add_data_to_viewer("image-viewer", data_labels[0])
_add_to_table(app, data_labels, 'Images')
@data_parser_registry("mosviz-metadata-parser")
def mos_meta_parser(app, data_obj, ids=None, spectra=False, sp1d=False):
"""
Attempts to parse MOS FITS header metadata.
Parameters
----------
app : `~jdaviz.app.Application`
The application-level object used to reference the viewers.
data_obj : str or list or HDUList
File path, list, or an HDUList to extract metadata from.
ids : list of str
A list with identification strings to be used to label mosviz
table rows. Typically, a list with file names.
spectra : Boolean
In case the FITS objects are related to spectral data.
"""
if data_obj is None:
return
# Coerce into list if needed
if not isinstance(data_obj, (list, tuple, SpectrumCollection)):
data_obj = [data_obj]
data_obj = [fits.open(x) if _check_is_file(x) else x for x in data_obj]
if np.all([isinstance(x, fits.HDUList) for x in data_obj]):
# metadata taken from 2d spectra
if spectra and not sp1d:
filters = [x[0].header.get("FILTER", FALLBACK_NAME) for x in data_obj]
gratings = [x[0].header.get("GRATING", FALLBACK_NAME) for x in data_obj]
filters_gratings = [(f+'/'+g) for f, g in zip(filters, gratings)]
[x.close() for x in data_obj]
# source name can be taken from 1d spectra
elif spectra and sp1d:
names = _get_object_keyword(data_obj, ids)
# source name and coordinates are taken from image headers, if present
else:
ra = [x[0].header.get("OBJ_RA", float("nan")) for x in data_obj]
dec = [x[0].header.get("OBJ_DEC", float("nan")) for x in data_obj]
names = _get_object_keyword(data_obj, ids)
[x.close() for x in data_obj]
else:
# TODO: Come up with more robust metadata parsing, perhaps from
# the spectra files.
logging.warning("Could not parse metadata from input images.")
return
with app.data_collection.delay_link_manager_update():
# this conditional must mirror the one above
if spectra and not sp1d:
_add_to_table(app, filters_gratings, "Filter/Grating")
elif spectra and sp1d:
_add_to_table(app, names, "Source Name")
else:
_add_to_table(app, names, "Source Name")
_add_to_table(app, ra, "R.A.")
_add_to_table(app, dec, "Dec.")
def _get_object_keyword(data_obj, ids):
if ids is not None:
# remove leading path to file name
ids_short = [os.path.basename(d) for d in ids]
names = [x[0].header.get("OBJECT", d) for x, d in zip(data_obj, ids_short)]
else:
names = [x[0].header.get("OBJECT", FALLBACK_NAME) for x in data_obj]
return names
@data_parser_registry("mosviz-niriss-parser")
def mos_niriss_parser(app, data_dir, obs_label=""):
"""
Attempts to parse all data for a NIRISS dataset in the specified
directory, which should include:
- *_direct_*_cal.fits : Direct 2D image
- *_direct_*_cat.ecsv : Source catalog
- *_WFSSR_*_cal.fits : 2D spectra in first orientation
- *_WFSSC_*_cal.fits : 2D spectra in second orientation
- *_WFSSR_*_x1d.fits : 1D spectra in first orientatiom
- *_WFSSC_*_x1d.fits : 1D spectra in second orientatiom
The spectra from the "C" files (horizontal orientation) are showed
in the viewers by default.
"""
p = Path(data_dir)
if not p.is_dir():
raise ValueError("{} is not a valid directory path".format(data_dir))
source_cat = sorted(list(p.glob("{}*_direct_*_cat.ecsv".format(obs_label))))
direct_image = sorted(list(p.glob("{}*_direct_dit1*_i2d.fits".format(obs_label))))
spec2d_r = sorted(list(p.glob("{}*_WFSSR_*_cal.fits".format(obs_label))))
spec2d_c = sorted(list(p.glob("{}*_WFSSC_*_cal.fits".format(obs_label))))
spec1d_r = sorted(list(p.glob("{}*_WFSSR_*_x1d.fits".format(obs_label))))
spec1d_c = sorted(list(p.glob("{}*_WFSSC_*_x1d.fits".format(obs_label))))
file_lists = {
"Source Catalog": source_cat,
"Direct Image": direct_image,
"2D Spectra C": spec2d_c,
"2D Spectra R": spec2d_r,
"1D Spectra C": spec1d_c,
"1D Spectra R": spec1d_r
}
# Convert from pathlib Paths back to strings
for key in file_lists:
file_lists[key] = [str(x) for x in file_lists[key]]
_warn_if_not_found(app, file_lists)
# Parse relevant information from source catalog
cat_fields = ["id", "sky_centroid.ra", "sky_centroid.dec"]
source_ids = []
ras = []
decs = []
image_add = []
pupil_id_dict = {}
# Retrieve source information
for source_catalog_num in range(0, len(file_lists["Source Catalog"])):
cat_file = file_lists["Source Catalog"][source_catalog_num]
parsed_cat_fields = _fields_from_ecsv(cat_file, cat_fields, delimiter=" ")
pupil = [x
for x in cat_file.split("/")[-1].split("_")
if x[0] == "F" or x[0] == "f"][0]
pupil_id_dict[pupil] = {}
for row in parsed_cat_fields:
pupil_id_dict[pupil][int(row[0])] = (row[1], row[2])
# Read in direct image filters
image_dict = {}
filter_wcs = {}
# Set up a dictionary of datasets to add to glue
add_to_glue = {}
print("Loading: Images")
for image_file in file_lists["Direct Image"]:
im_split = image_file.split("/")[-1].split("_")
pupil = [x
for x in image_file.split("/")[-1].split("_")
if x[0] == "F" or x[0] == "f"][0]
image_label = "Image {} {}".format(im_split[0], pupil)
with fits.open(image_file) as file_obj:
data_iter = get_image_data_iterator(app, file_obj, "Image", ext=None)
data_obj = [d[0] for d in data_iter] # We do not use the generated labels
image_data = data_obj[0] # Grab the first one. TODO: Error if multiple found?
with fits.open(image_file) as temp:
filter_wcs[pupil] = temp[1].header
image_data.label = image_label
add_to_glue[image_label] = image_data
image_dict[pupil] = image_label
# Parse 2D spectra
spec_labels_2d = []
filters = []
for f in ["2D Spectra C", "2D Spectra R"]:
for fname in file_lists[f]:
print(f"Loading: {f} sources")
orientation = f[-1]
filter_name = [x
for x in fname.split("/")[-1].split("_")
if x[0] == "F" or x[0] == "f"][0]
with fits.open(fname, memmap=False) as temp:
sci_hdus = []
wav_hdus = {}
for i in range(len(temp)):
if i == 0:
filter = temp[0].header["FILTER"]
if "EXTNAME" in temp[i].header:
if temp[i].header["EXTNAME"] == "SCI":
sci_hdus.append(i)
wav_hdus[i] = ('WAVELENGTH', temp[i].header['EXTVER'])
# Now get a Spectrum1D object for each SCI HDU
for sci in sci_hdus:
if temp[sci].header["SPORDER"] == 1:
data = temp[sci].data
meta = temp[sci].header
# The wavelength is stored in a WAVELENGTH HDU. This is
# a 2D array, but in order to be able to use Spectrum1D
# we use the average wavelength for all image rows
wav = temp[wav_hdus[sci]].data.mean(axis=0) * u.micron
spec2d = Spectrum1D(data * u.one, spectral_axis=wav, meta=meta)
spec2d.meta['INSTRUME'] = 'NIRISS'
label = "{} Source {} spec2d {}".format(filter_name,
temp[sci].header["SOURCEID"],
orientation
)
ra, dec = pupil_id_dict[filter_name][temp[sci].header["SOURCEID"]]
source_ids.append("Source Catalog: {} Source ID: {}".
format(filter_name, temp[sci].header["SOURCEID"]))
ras.append(ra)
decs.append(dec)
image_add.append(image_dict[filter_name])
spec_labels_2d.append(label)
add_to_glue[label] = spec2d
filters.append(filter)
spec_labels_1d = []
for f in ["1D Spectra C", "1D Spectra R"]:
for fname in file_lists[f]:
print(f"Loading: {f} sources")
with fits.open(fname, memmap=False) as temp:
# TODO: Remove this once valid SRCTYPE values are present in all headers
for hdu in temp:
if "SRCTYPE" in hdu.header and\
(hdu.header["SRCTYPE"] in ["POINT", "EXTENDED"]):
pass
else:
hdu.header["SRCTYPE"] = "EXTENDED"
specs = SpectrumList.read(temp, format="JWST x1d multi")
filter_name = [x
for x in fname.split("/")[-1].split("_")
if x[0] == "F" or x[0] == "f"][0]
# Orientation denoted by "C" or "R"
orientation = f[-1]
for spec in specs:
if spec.meta['header']['SPORDER'] == 1 and\
spec.meta['header']['EXTNAME'] == "EXTRACT1D":
label = "{} Source {} spec1d {}".format(filter_name,
spec.meta['header']['SOURCEID'],
orientation
)
spec_labels_1d.append(label)
add_to_glue[label] = spec
# Add the datasets to glue - we do this in one step so that we can easily
# optimize by avoiding recomputing the full link graph at every add
with app.data_collection.delay_link_manager_update():
for label, data in add_to_glue.items():
app.add_data(data, label, notify_done=False)
# We then populate the table inside this context manager as _add_to_table
# does operations that also trigger link manager updates.
_add_to_table(app, source_ids, "Source ID")
_add_to_table(app, ras, "R.A.")
_add_to_table(app, decs, "Dec.")
_add_to_table(app, image_add, "Images")
_add_to_table(app, spec_labels_1d, "1D Spectra")
_add_to_table(app, spec_labels_2d, "2D Spectra")
_add_to_table(app, filters, "Filter/Grating")
app.get_viewer('table-viewer')._shared_image = True