Source code for jdaviz.configs.mosviz.plugins.parsers

import csv
import glob
import logging
import os
from pathlib import Path

import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.io.registry import IORegistryError
from astropy.wcs import WCS
from glue.core.data import Data
from glue.core.link_helpers import LinkSame
from specutils import Spectrum1D, SpectrumList, SpectrumCollection

from jdaviz.configs.imviz.plugins.parsers import get_image_data_iterator
from jdaviz.core.registries import data_parser_registry
from jdaviz.core.events import SnackbarMessage

__all__ = ['mos_spec1d_parser', 'mos_spec2d_parser', 'mos_image_parser']

FALLBACK_NAME = "Unspecified"


def _add_to_table(app, data, comp_label):
    """
    Creates a mos table instance in the application data collection is none
    currently exists.

    Parameters
    ----------
    app : `~jdaviz.app.Application`
        The Jdaviz application instance.
    data : array-list
        The set of data to added as a table (i.g. column) component.
    comp_label : str
        The label used to describe the data. Also used as the column header.
    """
    # Add data to the mos viz table object
    if 'MOS Table' not in app.data_collection:
        table_data = Data(label='MOS Table')
        app.add_data(table_data, notify_done=False)

        mos_table = app.data_collection['MOS Table']
        mos_table.add_component(data, comp_label)

        viewer = app.get_viewer("table-viewer")
        viewer.add_data(table_data)
    else:
        mos_table = app.data_collection['MOS Table']
        if comp_label not in mos_table.component_ids():
            mos_table.add_component(data, comp_label)


def _check_is_file(path):
    return isinstance(path, str) and Path(path).is_file()


def _warn_if_not_found(app, file_lists):
    """
    Take a list of labels and associated file lists and send a snackbar
    message if the length of any list is 0.
    """
    found = []
    not_found = []
    for key in file_lists:
        if len(file_lists[key]) == 0:
            not_found.append(key)
        else:
            found.append(key)

    if len(found) == 0:
        raise ValueError("No valid files found in specified directory")
    if len(not_found) > 0:
        warn_msg = "Some files not found: {}".format(", ".join(not_found))
        print(warn_msg)
        warn_msg = SnackbarMessage(warn_msg, color="warning", sender=app)
        app.hub.broadcast(warn_msg)

    return found


def _fields_from_ecsv(fname, fields, delimiter=","):
    parsed_fields = []
    with open(fname, "r") as f:
        reader = csv.DictReader(filter(lambda r: r[0] != "#", f),
                                delimiter=delimiter)
        for row in reader:
            temp_list = []
            for field in fields:
                temp_list.append(row[field])
            parsed_fields.append(temp_list)
    return parsed_fields


@data_parser_registry("mosviz-link-data")
def link_data_in_table(app, data_obj=None):
    """
    Batch links data in the mosviz table viewer.

    Parameters
    ----------
    app : `~jdaviz.app.Application`
        The application-level object used to reference the viewers.
    data_obj : None
        Passed in in order to use the data_parser_registry, otherwise
        not used.
    """
    mos_data = app.session.data_collection['MOS Table']
    wc_spec_ids = []

    # Optimize linking speed through a) delaying link manager updates with a
    # context manager, b) handling intra-row linkage of 1D and 2D spectra in a
    # loop, and c) handling inter-row linkage after that in one fell swoop.
    with app.data_collection.delay_link_manager_update():

        spectra_1d = mos_data.get_component('1D Spectra').data
        spectra_2d = mos_data.get_component('2D Spectra').data

        # Link each 1D spectrum with its corresponding 2D spectrum
        for index in range(len(spectra_1d)):

            spec_1d = spectra_1d[index]
            spec_2d = spectra_2d[index]

            wc_spec_1d = app.session.data_collection[spec_1d].world_component_ids
            wc_spec_2d = app.session.data_collection[spec_2d].world_component_ids

            wc_spec_ids.append(LinkSame(wc_spec_1d[0], wc_spec_2d[1]))

        # Link each 1D spectrum to all other 1D spectra
        first_spec_1d = spectra_1d[0]
        wc_first_spec_1d = app.session.data_collection[first_spec_1d].world_component_ids

        for index in range(1, len(spectra_1d)):
            spec_1d = spectra_1d[index]
            wc_spec_1d = app.session.data_collection[spec_1d].world_component_ids
            wc_spec_ids.append(LinkSame(wc_spec_1d[0], wc_first_spec_1d[0]))

    app.session.data_collection.add_link(wc_spec_ids)


@data_parser_registry("mosviz-nirspec-directory-parser")
def mos_nirspec_directory_parser(app, data_obj, data_labels=None):

    spectra_1d = []
    spectra_2d = []

    # Load spectra
    level3_path = Path(data_obj)
    for file_path in glob.iglob(str(level3_path / '*')):
        if 'x1d' in file_path or 'c1d' in file_path:
            spectra_1d.append(file_path)
        elif 's2d' in file_path:
            spectra_2d.append(file_path)

    # Load images, if present
    image_path = None

    # Potential names of subdirectories where images are stored
    for image_dir_name in ["cutouts", "mosviz_cutouts", "images"]:
        if os.path.isdir(Path(str(level3_path / image_dir_name))):
            image_path = Path(str(level3_path / image_dir_name))
            break
    if image_path is not None:
        images = sorted([file_path for file_path in glob.iglob(str(image_path / '*'))])

        # The amount of images needs to be equal to the amount of rows
        # of the other columns in the table
        if len(images) == len(spectra_1d):
            mos_meta_parser(app, images, ids=images)
            mos_image_parser(app, images)
        else:
            msg = "The number of images in this directory does not match the" \
                  " number of spectra 1d and 2d files, please make the " \
                  "amounts equal or load images separately."
            logging.warning(msg)
            msg = SnackbarMessage(msg, color='warning', sender=app)
            app.hub.broadcast(msg)

    spectra_1d.sort()
    spectra_2d.sort()
    mos_spec1d_parser(app, spectra_1d)
    mos_spec2d_parser(app, spectra_2d)
    mos_meta_parser(app, spectra_2d, spectra=True)
    mos_meta_parser(app, spectra_1d, spectra=True, sp1d=True)


[docs]@data_parser_registry("mosviz-spec1d-parser") def mos_spec1d_parser(app, data_obj, data_labels=None): """ Attempts to parse a 1D spectrum object. Parameters ---------- app : `~jdaviz.app.Application` The application-level object used to reference the viewers. data_obj : str or list or spectrum-like File path, list, or spectrum-like object to be read as a new row in the mosviz table. data_labels : str, optional The label applied to the glue data component. """ if isinstance(data_labels, str): data_labels = [data_labels] # Coerce into list if needed if not isinstance(data_obj, (list, tuple, SpectrumCollection)): data_obj = [data_obj] data_obj = [Spectrum1D.read(x) if _check_is_file(x) else x for x in data_obj] if data_labels is None: data_labels = [f"1D Spectrum {i}" for i in range(len(data_obj))] elif len(data_obj) != len(data_labels): data_labels = [f"{data_labels[0]} {i}" for i in range(len(data_obj))] # Handle the case where the 1d spectrum is a collection of spectra with app.data_collection.delay_link_manager_update(): for i in range(len(data_obj)): app.add_data(data_obj[i], data_labels[i], notify_done=False) _add_to_table(app, data_labels, '1D Spectra')
[docs]@data_parser_registry("mosviz-spec2d-parser") def mos_spec2d_parser(app, data_obj, data_labels=None, add_to_table=True, show_in_viewer=False): """ Attempts to parse a 2D spectrum object. Notes ----- This currently only works with JWST-type data in which the data is in the second hdu of the fits file. Parameters ---------- app : `~jdaviz.app.Application` The application-level object used to reference the viewers. data_obj : str or list or spectrum-like File path, list, or spectrum-like object to be read as a new row in the mosviz table. data_labels : str, optional The label applied to the glue data component. """ def _parse_as_spectrum1d(path): # Parse as a FITS file and assume the WCS is correct with fits.open(path) as hdulist: data = hdulist[1].data header = hdulist[1].header wcs = WCS(header) return Spectrum1D(data, wcs=wcs) # Coerce into list-like object if not isinstance(data_obj, (list, tuple, SpectrumCollection)): data_obj = [data_obj] # If we're given a string, repeat it for each object if isinstance(data_labels, str): if len(data_obj) > 1: data_labels = [f"{data_labels} {i}" for i in range(len(data_obj))] else: data_labels = [data_labels] elif data_labels is None: if len(data_obj) > 1: data_labels = [f"2D Spectrum {i}" for i in range(len(data_obj))] else: data_labels = ['2D Spectrum'] with app.data_collection.delay_link_manager_update(): for index, data in enumerate(data_obj): # If we got a filepath, first try and parse using the Spectrum1D and # SpectrumList parsers, and then fall back to parsing it as a generic # FITS file. if _check_is_file(data): try: data = Spectrum1D.read(data) except IORegistryError: try: data = Spectrum1D.read(data) except IORegistryError: data = _parse_as_spectrum1d(data) # Copy (if present) region to top-level meta object if ('header' in data.meta and 'S_REGION' in data.meta['header'] and 'S_REGION' not in data.meta): data.meta['S_REGION'] = data.meta['header']['S_REGION'] # Set the instrument # TODO: this should not be set to nirspec for all datasets data.meta['INSTRUME'] = 'nirspec' # Get the corresponding label for this data product label = data_labels[index] app.data_collection[label] = data if add_to_table: _add_to_table(app, data_labels, '2D Spectra') if show_in_viewer: if len(data_labels) > 1: raise ValueError("More than one data label provided, unclear " + "which to show in viewer") app.add_data_to_viewer("spectrum-2d-viewer", data_labels[0])
def _load_fits_image_from_filename(filename, app): data_list = [] with fits.open(filename) as hdulist: meta = dict(hdulist[0].header.copy()) data_iter = get_image_data_iterator(app, hdulist, "Image", ext=None) for d, _ in data_iter: # We do not use the generated labels d.meta.update(meta) data_list.append(d) return data_list
[docs]@data_parser_registry("mosviz-image-parser") def mos_image_parser(app, data_obj, data_labels=None, share_image=0): """ Attempts to parse an image-like object or list of images. Parameters ---------- app : `~jdaviz.app.Application` The application-level object used to reference the viewers. data_obj : str or list or image-like File path, list, or image-like object to be read as a new row in the mosviz table. data_labels : str, optional The label applied to the glue data component. share_image : int, optional If 0, images are treated as applying to individual spectra. If non-zero, a single image will be shared by multiple spectra so that clicking a different row in the table does not reload the displayed image. Currently, if non-zero, the provided number must match the number of spectra. """ if data_obj is None: return # The label does not matter here. We overwrite later. if isinstance(data_obj, str): data_obj = _load_fits_image_from_filename(data_obj, app) elif isinstance(data_obj, (list, tuple)) and share_image == 0: temp_data = [] for cur_data_obj in data_obj: if isinstance(cur_data_obj, str): temp_data += _load_fits_image_from_filename(cur_data_obj, app) else: data_iter = get_image_data_iterator(app, cur_data_obj, "Image", ext=None) temp_data += [d[0] for d in data_iter] data_obj = temp_data else: data_iter = get_image_data_iterator(app, data_obj, "Image", ext=None) data_obj = [d[0] for d in data_iter] n_data = len(data_obj) n_data_range = range(n_data) # TODO: Maybe should raise exception? if share_image and n_data > 1: # Just use the first one data_obj = [data_obj[0]] if data_labels is None: if share_image: data_labels = ["Shared Image"] else: data_labels = [f"Image {i}" for i in n_data_range] elif isinstance(data_labels, str): if share_image: data_labels = [data_labels] else: data_labels = [f"{data_labels} {i}" for i in n_data_range] with app.data_collection.delay_link_manager_update(): for i in n_data_range: data_obj[i].label = data_labels[i] app.add_data(data_obj[i], data_labels[i], notify_done=False) if share_image: # Associate this image with multiple spectra data_labels *= share_image # Show it on viewer app.add_data_to_viewer("image-viewer", data_labels[0]) _add_to_table(app, data_labels, 'Images')
@data_parser_registry("mosviz-metadata-parser") def mos_meta_parser(app, data_obj, ids=None, spectra=False, sp1d=False): """ Attempts to parse MOS FITS header metadata. Parameters ---------- app : `~jdaviz.app.Application` The application-level object used to reference the viewers. data_obj : str or list or HDUList File path, list, or an HDUList to extract metadata from. ids : list of str A list with identification strings to be used to label mosviz table rows. Typically, a list with file names. spectra : Boolean In case the FITS objects are related to spectral data. """ if data_obj is None: return # Coerce into list if needed if not isinstance(data_obj, (list, tuple, SpectrumCollection)): data_obj = [data_obj] data_obj = [fits.open(x) if _check_is_file(x) else x for x in data_obj] if np.all([isinstance(x, fits.HDUList) for x in data_obj]): # metadata taken from 2d spectra if spectra and not sp1d: filters = [x[0].header.get("FILTER", FALLBACK_NAME) for x in data_obj] gratings = [x[0].header.get("GRATING", FALLBACK_NAME) for x in data_obj] filters_gratings = [(f+'/'+g) for f, g in zip(filters, gratings)] [x.close() for x in data_obj] # source name can be taken from 1d spectra elif spectra and sp1d: names = _get_object_keyword(data_obj, ids) # source name and coordinates are taken from image headers, if present else: ra = [x[0].header.get("OBJ_RA", float("nan")) for x in data_obj] dec = [x[0].header.get("OBJ_DEC", float("nan")) for x in data_obj] names = _get_object_keyword(data_obj, ids) [x.close() for x in data_obj] else: # TODO: Come up with more robust metadata parsing, perhaps from # the spectra files. logging.warning("Could not parse metadata from input images.") return with app.data_collection.delay_link_manager_update(): # this conditional must mirror the one above if spectra and not sp1d: _add_to_table(app, filters_gratings, "Filter/Grating") elif spectra and sp1d: _add_to_table(app, names, "Source Name") else: _add_to_table(app, names, "Source Name") _add_to_table(app, ra, "R.A.") _add_to_table(app, dec, "Dec.") def _get_object_keyword(data_obj, ids): if ids is not None: # remove leading path to file name ids_short = [os.path.basename(d) for d in ids] names = [x[0].header.get("OBJECT", d) for x, d in zip(data_obj, ids_short)] else: names = [x[0].header.get("OBJECT", FALLBACK_NAME) for x in data_obj] return names @data_parser_registry("mosviz-niriss-parser") def mos_niriss_parser(app, data_dir, obs_label=""): """ Attempts to parse all data for a NIRISS dataset in the specified directory, which should include: - *_direct_*_cal.fits : Direct 2D image - *_direct_*_cat.ecsv : Source catalog - *_WFSSR_*_cal.fits : 2D spectra in first orientation - *_WFSSC_*_cal.fits : 2D spectra in second orientation - *_WFSSR_*_x1d.fits : 1D spectra in first orientatiom - *_WFSSC_*_x1d.fits : 1D spectra in second orientatiom The spectra from the "C" files (horizontal orientation) are showed in the viewers by default. """ p = Path(data_dir) if not p.is_dir(): raise ValueError("{} is not a valid directory path".format(data_dir)) source_cat = sorted(list(p.glob("{}*_direct_*_cat.ecsv".format(obs_label)))) direct_image = sorted(list(p.glob("{}*_direct_dit1*_i2d.fits".format(obs_label)))) spec2d_r = sorted(list(p.glob("{}*_WFSSR_*_cal.fits".format(obs_label)))) spec2d_c = sorted(list(p.glob("{}*_WFSSC_*_cal.fits".format(obs_label)))) spec1d_r = sorted(list(p.glob("{}*_WFSSR_*_x1d.fits".format(obs_label)))) spec1d_c = sorted(list(p.glob("{}*_WFSSC_*_x1d.fits".format(obs_label)))) file_lists = { "Source Catalog": source_cat, "Direct Image": direct_image, "2D Spectra C": spec2d_c, "2D Spectra R": spec2d_r, "1D Spectra C": spec1d_c, "1D Spectra R": spec1d_r } # Convert from pathlib Paths back to strings for key in file_lists: file_lists[key] = [str(x) for x in file_lists[key]] _warn_if_not_found(app, file_lists) # Parse relevant information from source catalog cat_fields = ["id", "sky_centroid.ra", "sky_centroid.dec"] source_ids = [] ras = [] decs = [] image_add = [] pupil_id_dict = {} # Retrieve source information for source_catalog_num in range(0, len(file_lists["Source Catalog"])): cat_file = file_lists["Source Catalog"][source_catalog_num] parsed_cat_fields = _fields_from_ecsv(cat_file, cat_fields, delimiter=" ") pupil = [x for x in cat_file.split("/")[-1].split("_") if x[0] == "F" or x[0] == "f"][0] pupil_id_dict[pupil] = {} for row in parsed_cat_fields: pupil_id_dict[pupil][int(row[0])] = (row[1], row[2]) # Read in direct image filters image_dict = {} filter_wcs = {} # Set up a dictionary of datasets to add to glue add_to_glue = {} print("Loading: Images") for image_file in file_lists["Direct Image"]: im_split = image_file.split("/")[-1].split("_") pupil = [x for x in image_file.split("/")[-1].split("_") if x[0] == "F" or x[0] == "f"][0] image_label = "Image {} {}".format(im_split[0], pupil) with fits.open(image_file) as file_obj: data_iter = get_image_data_iterator(app, file_obj, "Image", ext=None) data_obj = [d[0] for d in data_iter] # We do not use the generated labels image_data = data_obj[0] # Grab the first one. TODO: Error if multiple found? with fits.open(image_file) as temp: filter_wcs[pupil] = temp[1].header image_data.label = image_label add_to_glue[image_label] = image_data image_dict[pupil] = image_label # Parse 2D spectra spec_labels_2d = [] filters = [] for f in ["2D Spectra C", "2D Spectra R"]: for fname in file_lists[f]: print(f"Loading: {f} sources") orientation = f[-1] filter_name = [x for x in fname.split("/")[-1].split("_") if x[0] == "F" or x[0] == "f"][0] with fits.open(fname, memmap=False) as temp: sci_hdus = [] wav_hdus = {} for i in range(len(temp)): if i == 0: filter = temp[0].header["FILTER"] if "EXTNAME" in temp[i].header: if temp[i].header["EXTNAME"] == "SCI": sci_hdus.append(i) wav_hdus[i] = ('WAVELENGTH', temp[i].header['EXTVER']) # Now get a Spectrum1D object for each SCI HDU for sci in sci_hdus: if temp[sci].header["SPORDER"] == 1: data = temp[sci].data meta = temp[sci].header # The wavelength is stored in a WAVELENGTH HDU. This is # a 2D array, but in order to be able to use Spectrum1D # we use the average wavelength for all image rows wav = temp[wav_hdus[sci]].data.mean(axis=0) * u.micron spec2d = Spectrum1D(data * u.one, spectral_axis=wav, meta=meta) spec2d.meta['INSTRUME'] = 'NIRISS' label = "{} Source {} spec2d {}".format(filter_name, temp[sci].header["SOURCEID"], orientation ) ra, dec = pupil_id_dict[filter_name][temp[sci].header["SOURCEID"]] source_ids.append("Source Catalog: {} Source ID: {}". format(filter_name, temp[sci].header["SOURCEID"])) ras.append(ra) decs.append(dec) image_add.append(image_dict[filter_name]) spec_labels_2d.append(label) add_to_glue[label] = spec2d filters.append(filter) spec_labels_1d = [] for f in ["1D Spectra C", "1D Spectra R"]: for fname in file_lists[f]: print(f"Loading: {f} sources") with fits.open(fname, memmap=False) as temp: # TODO: Remove this once valid SRCTYPE values are present in all headers for hdu in temp: if "SRCTYPE" in hdu.header and\ (hdu.header["SRCTYPE"] in ["POINT", "EXTENDED"]): pass else: hdu.header["SRCTYPE"] = "EXTENDED" specs = SpectrumList.read(temp, format="JWST x1d multi") filter_name = [x for x in fname.split("/")[-1].split("_") if x[0] == "F" or x[0] == "f"][0] # Orientation denoted by "C" or "R" orientation = f[-1] for spec in specs: if spec.meta['header']['SPORDER'] == 1 and\ spec.meta['header']['EXTNAME'] == "EXTRACT1D": label = "{} Source {} spec1d {}".format(filter_name, spec.meta['header']['SOURCEID'], orientation ) spec_labels_1d.append(label) add_to_glue[label] = spec # Add the datasets to glue - we do this in one step so that we can easily # optimize by avoiding recomputing the full link graph at every add with app.data_collection.delay_link_manager_update(): for label, data in add_to_glue.items(): app.add_data(data, label, notify_done=False) # We then populate the table inside this context manager as _add_to_table # does operations that also trigger link manager updates. _add_to_table(app, source_ids, "Source ID") _add_to_table(app, ras, "R.A.") _add_to_table(app, decs, "Dec.") _add_to_table(app, image_add, "Images") _add_to_table(app, spec_labels_1d, "1D Spectra") _add_to_table(app, spec_labels_2d, "2D Spectra") _add_to_table(app, filters, "Filter/Grating") app.get_viewer('table-viewer')._shared_image = True