Source code for jdaviz.configs.specviz.plugins.line_analysis.line_analysis

import os
import warnings

import numpy as np
from glue.core.message import (SubsetDeleteMessage,
                               SubsetUpdateMessage)
from glue_jupyter.common.toolbar_vuetify import read_icon
from traitlets import Bool, List, Float, Unicode, observe
from astropy import units as u
from specutils import analysis, Spectrum1D
from specutils.manipulation import extract_region

from jdaviz.core.custom_traitlets import FloatHandleEmpty
from jdaviz.core.events import (AddDataMessage,
                                RemoveDataMessage,
                                SpectralMarksChangedMessage,
                                LineIdentifyMessage,
                                RedshiftMessage)
from jdaviz.core.marks import (LineAnalysisContinuum,
                               LineAnalysisContinuumCenter,
                               LineAnalysisContinuumLeft,
                               LineAnalysisContinuumRight,
                               ShadowLine)
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin,
                                        DatasetSelectMixin,
                                        SpectralSubsetSelectMixin,
                                        DatasetSpectralSubsetValidMixin,
                                        SubsetSelect,
                                        SPATIAL_DEFAULT_TEXT)
from jdaviz.core.user_api import PluginUserApi
from jdaviz.core.tools import ICON_DIR


__all__ = ['LineAnalysis']

FUNCTIONS = {"Line Flux": analysis.line_flux,
             "Equivalent Width": analysis.equivalent_width,
             "Gaussian Sigma Width": analysis.gaussian_sigma_width,
             "Gaussian FWHM": analysis.gaussian_fwhm,
             "Centroid": analysis.centroid}


def _coerce_unit(quantity):
    """
    coerce the unit on a quantity to have a single length unit (will take the first length
    unit with a power of 1) and to strip any constants from the units.
    """
    # for some reason, quantity.unit.powers gives floats which then raise an error in
    # quantity.to and we want to avoid casting to integer in case of fractional powers
    unit = u.Unit(str(quantity.unit))
    unit_types = [str(subunit.physical_type) for subunit in unit.bases]
    length_inds = [ind for ind, (base, power, unit_type)
                   in enumerate(zip(unit.bases, unit.powers, unit_types))
                   if unit_type == 'length' and abs(power) == 1]
    # we want to force all length units (not area) to use the same base unit so they can
    # combine/cancel appropriately
    coerced_bases = [unit.bases[i if i not in length_inds else length_inds[0]]
                     for i in range(len(unit.bases))]
    coerced_unit_string = ' * '.join([f'{base}**{power}'
                                      for base, power in zip(coerced_bases, unit.powers)])
    coerced_quantity = quantity.to(coerced_unit_string)
    if getattr(quantity, 'uncertainty', None) is not None:
        coerced_quantity.uncertainty = quantity.uncertainty.to(coerced_unit_string)
    return coerced_quantity


[docs]@tray_registry('specviz-line-analysis', label="Line Analysis", viewer_requirements='spectrum') class LineAnalysis(PluginTemplateMixin, DatasetSelectMixin, SpectralSubsetSelectMixin, DatasetSpectralSubsetValidMixin): """ The Line Analysis plugin returns specutils analysis for a single spectral line. See the :ref:`Line Analysis Plugin Documentation <line-analysis>` for more details. Only the following attributes and methods are available through the :ref:`public plugin API <plugin-apis>`: * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray` * ``dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`): Dataset to use for computing line statistics. * ``spatial_subset`` (:class:`~jdaviz.core.template_mixin.SubsetSelect`): Select which region's collapsed spectrum to analyze or ``Entire Cube``. * ``spectral_subset`` (:class:`~jdaviz.core.template_mixin.SubsetSelect`): Subset to use for the line, or ``Entire Spectrum``. * ``continuum`` (:class:`~jdaviz.core.template_mixin.SubsetSelect`): Subset to use for the continuum, or ``Surrounding`` to use a region surrounding the subset set in ``spectral_subset``. * :attr:`width`: Width, relative to the overall line spectral region, to fit the linear continuum (excluding the region containing the line). If 1, will use endpoints within line region only. * :meth:`show_continuum_marks` * :meth:`get_results` """ dialog = Bool(False).tag(sync=True) template_file = __file__, "line_analysis.vue" spatial_subset_items = List().tag(sync=True) spatial_subset_selected = Unicode().tag(sync=True) continuum_subset_items = List().tag(sync=True) continuum_subset_selected = Unicode().tag(sync=True) width = FloatHandleEmpty(3).tag(sync=True) results_computing = Bool(False).tag(sync=True) results = List().tag(sync=True) results_centroid = Float().tag(sync=True) # stored in AA units line_items = List([]).tag(sync=True) sync_identify = Bool(True).tag(sync=True) sync_identify_icon_enabled = Unicode(read_icon(os.path.join(ICON_DIR, 'line_select.svg'), 'svg+xml')).tag(sync=True) # noqa sync_identify_icon_disabled = Unicode(read_icon(os.path.join(ICON_DIR, 'line_select_disabled.svg'), 'svg+xml')).tag(sync=True) # noqa identified_line = Unicode("").tag(sync=True) selected_line = Unicode("").tag(sync=True) selected_line_redshift = Float(0).tag(sync=True) def __init__(self, *args, **kwargs): self._default_spectrum_viewer_reference_name = kwargs.get( "spectrum_viewer_reference_name", "spectrum-viewer" ) super().__init__(**kwargs) self.update_results(None) self.continuum = SubsetSelect(self, 'continuum_subset_items', 'continuum_subset_selected', default_text='Surrounding', allowed_type='spectral') # when accessing the selected data, access the spectrum-viewer version self.dataset._viewers = [self._default_spectrum_viewer_reference_name] # require entries to be in spectrum-viewer (not other cubeviz images, etc) self.dataset.add_filter('layer_in_spectrum_viewer') if self.app.state.settings.get("configuration") == "cubeviz": self.spatial_subset = SubsetSelect(self, 'spatial_subset_items', 'spatial_subset_selected', default_text=SPATIAL_DEFAULT_TEXT, allowed_type='spatial') else: self.spatial_subset = None self.hub.subscribe(self, AddDataMessage, handler=self._on_viewer_data_changed) self.hub.subscribe(self, RemoveDataMessage, handler=self._on_viewer_data_changed) self.hub.subscribe(self, SubsetDeleteMessage, handler=self._on_viewer_subsets_changed) self.hub.subscribe(self, SubsetUpdateMessage, handler=self._on_viewer_subsets_changed) self.hub.subscribe(self, SpectralMarksChangedMessage, handler=self._on_plotted_lines_changed) self.hub.subscribe(self, LineIdentifyMessage, handler=self._on_identified_line_changed) @property def user_api(self): return PluginUserApi(self, expose=('dataset', 'spatial_subset', 'spectral_subset', 'continuum', 'width', 'show_continuum_marks', 'get_results')) def _on_viewer_data_changed(self, msg): viewer_id = self.app._viewer_item_by_reference( self._default_spectrum_viewer_reference_name ).get('id') if msg is None or msg.viewer_id != viewer_id or msg.data is None: return viewer = self.app.get_viewer(self._default_spectrum_viewer_reference_name) get_data_kwargs = {'data_label': msg.data.label} if self.config == 'cubeviz': get_data_kwargs['function'] = getattr(viewer.state, 'function', None) viewer_data = self.app._jdaviz_helper.get_data(**get_data_kwargs) # If no data is currently plotted, don't attempt to update if viewer_data is None: return if viewer_data.spectral_axis.unit == u.pix: # disable the plugin until we can address this properly (either using the wavelength # solution to support pixels in line-lists, or properly displaying the extracted # 1d spectrum in wavelength-space) self.disabled_msg = 'Line Analysis unavailable when x-axis is in pixels' else: self.disabled_msg = '' def _on_viewer_subsets_changed(self, msg): """ Update the statistics if any of the referenced regions have changed Parameters ---------- msg : `glue.core.Message` The glue message passed to this callback method. """ if (msg.subset.label in [self.spectral_subset_selected, self.spatial_subset_selected, self.continuum_subset_selected] and self.plugin_opened): self._calculate_statistics() @observe('plugin_opened') def _on_plugin_opened_changed(self, *args): if self.disabled_msg: return # toggle continuum lines in spectrum viewer based on whether this plugin # is currently open in the tray self.show_continuum_marks(self.plugin_opened)
[docs] def show_continuum_marks(self, show=True): """ Show (or hide) the marks indicating the continuum on the spectrum viewer. Parameters ---------- show : bool Whether to show (or hide) the marks """ for pos, mark in self.marks.items(): mark.visible = show if show: self._calculate_statistics(ignore_plugin_closed=True)
@property def marks(self): marks = {} viewer = self.app.get_viewer(self._default_spectrum_viewer_reference_name) for mark in viewer.figure.marks: if isinstance(mark, LineAnalysisContinuum): # NOTE: we don't use isinstance anymore because of nested inheritance if mark.__class__.__name__ == 'LineAnalysisContinuumLeft': marks['left'] = mark elif mark.__class__.__name__ == 'LineAnalysisContinuumCenter': marks['center'] = mark elif mark.__class__.__name__ == 'LineAnalysisContinuumRight': marks['right'] = mark if not len(marks): if not viewer.state.reference_data: # we don't have data yet for scales, defer initializing return {} # then haven't been initialized yet, so initialize with empty # marks that will be populated once the first analysis is done. marks = {'left': LineAnalysisContinuumLeft(viewer, visible=self.plugin_opened), 'center': LineAnalysisContinuumCenter(viewer, visible=self.plugin_opened), 'right': LineAnalysisContinuumRight(viewer, visible=self.plugin_opened)} shadows = [ShadowLine(mark, shadow_width=2) for mark in marks.values()] # NOTE: += won't trigger the figure to notice new marks viewer.figure.marks = viewer.figure.marks + shadows + list(marks.values()) return marks
[docs] def update_results(self, results=None, mark_x={}, mark_y={}): for pos, mark in self.marks.items(): mark.update_xy(mark_x.get(pos, []), mark_y.get(pos, [])) if results is None: self.results = [{'function': function, 'result': ''} for function in FUNCTIONS] else: self.results = results self.results_computing = False
[docs] def get_results(self): # user-facing API call to force updating and retrieving results, even if the plugin # is closed if not self.spectral_subset_valid: valid, spec_range, subset_range = self._check_dataset_spectral_subset_valid(return_ranges=True) # noqa raise ValueError(f"spectral subset '{self.spectral_subset.selected}' {subset_range} is outside data range of '{self.dataset.selected}' {spec_range}") # noqa self._calculate_statistics(ignore_plugin_closed=True) return self.results
def _on_plotted_lines_changed(self, msg): self.line_marks = msg.marks self.line_items = msg.names_rest if self.selected_line not in self.line_items: # default to identified line if available self.selected_line = self.identified_line def _on_identified_line_changed(self, msg): self.identified_line = msg.name_rest if self.sync_identify or not self.selected_line: # then we should follow the identified line, either because of sync # or because nothing has been selected yet. # if results aren't available yet, then we'll wait until they are # in which case we'll default to the identified line self.selected_line = self.identified_line @observe("spatial_subset_selected", "spectral_subset_selected", "dataset_selected", "continuum_subset_selected", "width") def _calculate_statistics(self, *args, **kwargs): """ Run the line analysis functions on the selected data/subset and display the results. """ if not hasattr(self, 'dataset') or self.app._jdaviz_helper is None or self.dataset_selected == '': # noqa # during initial init, this can trigger before the component is initialized return # call directly since this observe may be triggered before the spectral_subset_valid # traitlet is updated if not self._check_dataset_spectral_subset_valid(): # skip gracefully, if the user called from get_results, and error would be raised there self.update_results(None) return # show spinner with overlay self.results_computing = True full_spectrum = self.dataset.selected_spectrum_for_spatial_subset(self.spatial_subset_selected) # noqa if (full_spectrum is None or self.width == "" or (not self.plugin_opened and not kwargs.get('ignore_plugin_closed'))): # this can happen DURING a unit conversion change self.update_results(None) return spectral_axis = full_spectrum.spectral_axis if self.continuum_subset_selected == self.spectral_subset_selected: # already raised a validation error in the UI self.update_results(None) return sr = self.app.get_subsets().get(self.spectral_subset_selected) if self.spectral_subset_selected == "Entire Spectrum": spectrum = full_spectrum else: spectrum = extract_region(full_spectrum, sr, return_single_spectrum=True) sr_lower = spectrum.spectral_axis[spectrum.spectral_axis.value >= sr.lower.value][0] sr_upper = spectrum.spectral_axis[spectrum.spectral_axis.value <= sr.upper.value][-1] # compute continuum if self.continuum_subset_selected == "Surrounding" and self.spectral_subset_selected == "Entire Spectrum": # noqa # we know we'll just use the endpoints, so let's be efficient and not even # try extracting from the region continuum_mask = np.array([0, len(spectral_axis)-1]) mark_x = {'left': np.array([]), 'center': np.array([min(spectral_axis.value), max(spectral_axis.value)]), 'right': np.array([])} elif self.continuum_subset_selected == "Surrounding": # self.spectral_subset_selected != "Entire Spectrum" if self.width > 10 or self.width < 1: # DEV NOTE: if changing the limits, make sure to also update the form validation # rules in line_analysis.vue self.update_results(None) return spectral_region_width = sr_upper - sr_lower # convert width from total relative width, to width per "side" width = (self.width - 1) / 2 left, = np.where((spectral_axis < sr_lower) & (spectral_axis > sr_lower - spectral_region_width*width)) if not len(left): # then no points matching the width are available outside the line region, # so we'll default to the left-most point of the line region. left, = np.where(spectral_axis == min(spectrum.spectral_axis)) right, = np.where((spectral_axis > sr_upper) & (spectral_axis < sr_upper + spectral_region_width*width)) if not len(right): # then no points matching the width are available outside the line region, # so we'll default to the right-most point of the line region. right, = np.where(spectral_axis == max(spectrum.spectral_axis)) continuum_mask = np.concatenate((left, right)) mark_x = {'left': np.array([min(spectral_axis.value[continuum_mask]), sr_lower.value]), 'center': np.array([sr_lower.value, sr_upper.value]), 'right': np.array([sr_upper.value, max(spectral_axis.value[continuum_mask])])} else: # we'll access the mask of the continuum and then apply that to the spectrum. For a # spatially-collapsed spectrum in cubeviz, this will access the mask from the full # cube, but still apply that to the spatially-collapsed spectrum. continuum_mask = ~self._specviz_helper.get_data( self.dataset.selected, spectral_subset=self.continuum_subset_selected).mask spectral_axis_nanmasked = spectral_axis.value.copy() spectral_axis_nanmasked[~continuum_mask] = np.nan if self.spectral_subset_selected == "Entire Spectrum": mark_x = {'left': spectral_axis_nanmasked, 'center': spectral_axis.value, 'right': []} else: mark_x = {'left': spectral_axis_nanmasked[spectral_axis.value < sr_lower.value], 'right': spectral_axis_nanmasked[spectral_axis.value > sr_upper.value]} # Center should extend (at least) across the line region between the full # range defined by the continuum subset(s). # OK for mark_x to be all NaNs. with warnings.catch_warnings(): warnings.simplefilter('ignore', category=RuntimeWarning) mark_x_min = np.nanmin(mark_x['left']) mark_x_max = np.nanmax(mark_x['right']) left_min = np.nanmin([mark_x_min, sr_lower.value]) right_max = np.nanmax([mark_x_max, sr_upper.value]) mark_x['center'] = np.array([left_min, right_max]) continuum_x = spectral_axis[continuum_mask].value min_x = min(spectral_axis.value) continuum_y = full_spectrum.flux[continuum_mask].value # DEV NOTE: could replace this with internal calls to the model fitting infrastructure # to enable other model-types and to give visual feedback (by labeling the model # as line_analysis:continuum, for example) slope, intercept = np.polyfit(continuum_x-min_x, continuum_y, deg=1) continuum = slope * (spectrum.spectral_axis.value-min_x) + intercept mark_y = {k: slope * (v-min_x) + intercept for k, v in mark_x.items()} def _uncertainty(result): if getattr(result, 'uncertainty', None) is not None: # we'll keep the uncertainty and result in the same unit (so # we only have to show the unit at the end) if np.isnan(result.uncertainty.value) or np.isinf(result.uncertainty.value): return '' return str(result.uncertainty.to_value(result.unit)) else: return '' temp_results = [] spec_subtracted = spectrum - continuum if spec_subtracted.mask is not None: # temporary fix while mask may contain None: spec_subtracted.mask = spec_subtracted.mask.astype(bool) for function in FUNCTIONS: # TODO: update specutils to allow ALL analysis to take regions and continuum so we # don't need these if statements if function == "Line Flux": flux_unit = spec_subtracted.flux.unit # If the flux unit is equivalent to Jy, or Jy per spaxel for Cubeviz, # enforce integration in frequency space if (flux_unit.is_equivalent(u.Jy) or flux_unit.is_equivalent(u.Jy/u.sr)): # Perform integration in frequency space freq_spec = Spectrum1D( spectral_axis=spec_subtracted.spectral_axis.to(u.Hz, equivalencies=u.spectral()), flux=spec_subtracted.flux, uncertainty=spec_subtracted.uncertainty) raw_result = analysis.line_flux(freq_spec) # When flux is equivalent to Jy, lineflux result should be shown in W/m2 if flux_unit.is_equivalent(u.Jy/u.sr): final_unit = u.Unit('W/(m2 sr)') else: final_unit = u.Unit('W/m2') temp_result = raw_result.to(final_unit) if getattr(raw_result, 'uncertainty', None) is not None: temp_result.uncertainty = raw_result.uncertainty.to(final_unit) # If the flux unit is instead equivalent to power density # (Jy, but defined in wavelength), enforce integration in wavelength space elif (flux_unit.is_equivalent(u.Unit('W/(m2 m)')) or flux_unit.is_equivalent(u.Unit('W/(m2 m sr)'))): # Perform integration in wavelength space using MKS unit (meters) wave_spec = Spectrum1D( spectral_axis=spec_subtracted.spectral_axis.to(u.m, equivalencies=u.spectral()), flux=spec_subtracted.flux, uncertainty=spec_subtracted.uncertainty) raw_result = analysis.line_flux(wave_spec) # When flux is equivalent to Jy, lineflux result should be shown in W/m2 if flux_unit.is_equivalent(u.Unit('W/(m2 m)'/u.sr)): final_unit = u.Unit('W/(m2 sr)') else: final_unit = u.Unit('W/m2') temp_result = raw_result.to(final_unit) if getattr(raw_result, 'uncertainty', None) is not None: temp_result.uncertainty = raw_result.uncertainty.to(final_unit) # Otherwise, just rely on the default specutils line_flux result else: temp_result = analysis.line_flux(spec_subtracted) elif function == "Equivalent Width": if np.any(continuum <= 0): temp_results.append({'function': function, 'result': '', 'error_msg': 'N/A (continuum <= 0)', 'uncertainty': '', 'unit': ''}) continue else: spec_normalized = spectrum / continuum if spec_normalized.mask is not None: spec_normalized.mask = spec_normalized.mask.astype(bool) temp_result = FUNCTIONS[function](spec_normalized) elif function == "Centroid": # TODO: update specutils to be consistent with region vs regions and default to # regions=None so this elif can be removed temp_result = FUNCTIONS[function](spec_subtracted, region=None) self.results_centroid = temp_result.to_value(u.AA, equivalencies=u.spectral()) else: temp_result = FUNCTIONS[function](spec_subtracted) temp_result = _coerce_unit(temp_result) temp_results.append({'function': function, 'result': str(temp_result.value), 'uncertainty': _uncertainty(temp_result), 'unit': str(temp_result.unit)}) if not self.selected_line and self.identified_line: # default to the identified line self.selected_line = self.identified_line self.update_results(temp_results, mark_x, mark_y) def _compute_redshift_for_selected_line(self): index = self.line_items.index(self.selected_line) line_mark = self.line_marks[index] rest_value = (line_mark.rest_value * line_mark._x_unit).to_value(u.AA, equivalencies=u.spectral()) return (self.results_centroid - rest_value) / rest_value @observe('sync_identify') def _sync_identify_changed(self, event={}): if not event.get('new', self.sync_identify): return if not self.identified_line and self.selected_line: # then we just enabled the sync, but no line is currently # identified, so we'll identify the current selection msg = LineIdentifyMessage(self.selected_line, sender=self) self.hub.broadcast(msg) elif self.identified_line: # then update the selection the the identified line self.selected_line = self.identified_line @observe('selected_line') def _selected_line_changed(self, event): if self.sync_identify: msg = LineIdentifyMessage(event.get('new', self.selected_line), sender=self) self.hub.broadcast(msg) @observe('results_centroid', 'selected_line') def _update_selected_line_redshift(self, event): if self.selected_line and self.results_centroid is not None: # compute redshift that WILL be applied if clicking assign self.selected_line_redshift = self._compute_redshift_for_selected_line()
[docs] def vue_line_assign(self, msg=None): z = self._compute_redshift_for_selected_line() msg = RedshiftMessage('redshift', z, sender=self) self.hub.broadcast(msg)