Source code for synthesizer.emissions.line

"""A module containing functionality for working with emission lines.

Emission lines in Synthesizer are represented by the LineCollection class. This
holds a collection of emission lines, each with a rest frame wavelength, a
luminosity, and a continuum. The LineCollection class also has the ability to
calculate the flux of the lines given a redshift and cosmology, including the
observed wavelength and flux density. It also provides a number of helpful
conversions and derived properties, such as the equivalent widths, photon
energies, and line ratios.

Note that emission lines held by a LineCollection object exist at a specific
wavelength and thus don't account for any broadening effects at this time. To
account for broadening effects, `Seds` including line contribution should be
used (i.e. spectra extracted from the "nebular" type on a `Grid`).

Example usages::

    # Create a LineCollection objects
    lines = LineCollection(
        line_ids=["O III 5007 A", "H 1 1215.67A"],
        lam=[5007, 1215.67] * angstrom,
        lum=[1e40, 1e39] * erg / s,
        cont=[1e39, 1e38] * erg / s / Hz,
    )

    # Get the flux of the lines at redshift z=0.1
    lines.get_flux(cosmo, z=0.1)

    # Get the flux of the lines at redshift z=0.1 with IGM absorption
    lines.get_flux(cosmo, z=0.1, igm=Inoue14)

    # Get a ratio
    lines.get_ratio("R23")

    # Get a diagram
    lines.get_diagram("OHNO")

"""

import matplotlib.pyplot as plt
import numpy as np
from unyt import (
    Hz,
    angstrom,
    c,
    erg,
    eV,
    h,
    pc,
    s,
    unyt_array,
    unyt_quantity,
)

from synthesizer import exceptions
from synthesizer.conversions import lnu_to_llam, standard_to_vacuum
from synthesizer.cosmology import get_luminosity_distance
from synthesizer.emissions import line_ratios
from synthesizer.emissions.scaling import (
    normalise_line_scaling,
    scale_line_arrays,
)
from synthesizer.emissions.sed import Sed
from synthesizer.emissions.utils import (
    alias_to_line_id,
    get_available_diagram_ids,
    get_available_ratio_ids,
    get_line2index,
    get_line_id_signature,
)
from synthesizer.extensions.spectra_operations import (
    apply_separable_attenuation_2d,
    multiply_array_by_vector_1d,
)
from synthesizer.synth_warnings import warn
from synthesizer.units import (
    Quantity,
    accepts,
    get_array_quantity_view,
    get_quantity_unit,
)
from synthesizer.utils import TableFormatter
from synthesizer.utils.operation_timers import timed


[docs] class LineCollection: """A class holding a collection of emission lines. The LineCollection class holds a collection of emission lines, each with a rest frame wavelength, a luminosity, and a continuum. The LineCollection class also has the ability to calculate the flux of the lines given a redshift and cosmology, including the observed wavelength and flux density. It also provides a number of helpful conversions and derived properties, such as the equivalent widths, photon energies, and line ratios. Note that emission lines held by a LineCollection object exist at a specific wavelength and thus don't account for any broadening effects at this time. Attributes: line_ids (list): A list of the line ids. lam (unyt_array): An array of rest frame line wavelengths. luminosity (unyt_array): An array of rest frame line luminosities. continuum (unyt_array): An array of rest frame line continuum. description (str): An optional description of the line collection. obslam (unyt_array): An array of observed line wavelengths. flux (unyt_array): An array of line fluxes. continuum_flux (unyt_array): An array of line continuum fluxes. vacuum_wavelength (unyt_array): An array of vacuum wavelengths. available_ratios (list): A list of the available line ratios based on the lines in the collection. available_diagrams (list): A list of the available line diagrams based on the lines in the collection. nlines (int): The number of lines in the collection. """ # Define quantities, for details see units.py lam = Quantity("wavelength") luminosity = Quantity("luminosity") continuum = Quantity("luminosity_density_frequency") flux = Quantity("flux") continuum_flux = Quantity("flux_density_frequency") obslam = Quantity("wavelength") vacuum_wavelength = Quantity("wavelength") @accepts(lam=angstrom, lum=erg / s, cont=erg / s / Hz) @timed("LineCollection.__init__") def __init__(self, line_ids, lam, lum, cont, description=None): """Initialise the collection of emission lines. Args: line_ids (list): A list or array of line ids. lam (unyt_array): An array of rest frame line wavelengths. lum (unyt_array): An array of rest frame line luminosities. cont (unyt_array): An array of rest frame line continuum. description (str): A optional description of the line collection. """ # Set the description self.description = description # Set the line line IDs (these include the element, the ionisation # state, and the vacuum wavelength). Handle if we have a single line # passed as a string or a list of lines. if isinstance(line_ids, str): line_ids = [line_ids] self.line_ids = np.asarray(line_ids, dtype=str) # How many lines do we have? self.nlines = len(self.line_ids) # Set the line wavelengths self.lam = lam # Set the line luminosities self.luminosity = lum # Set the line continuum self.continuum = cont # Ensure the luminosity and continuum are the same shape if self._luminosity.shape != self._continuum.shape: raise exceptions.InconsistentArguments( "Luminosity and continuum arrays must have the same shape" ) # We'll populate observer frame properties when get_flux is called self.obslam = None self.flux = None self.continuum_flux = None # Atrributes to enable looping self._current_ind = 0 # Delay all constructor-side metadata work until first access self._line2index = None self._available_ratios = None self._available_diagrams = None @property def id(self): """Return the line id. This is an alias to be used when theres a single line id. Returns: line_ids (list): A list of the line ids. """ if self.nlines == 1: return str(self.line_ids[0]) raise exceptions.UnimplementedFunctionality( "id only applicable for a single line. Use line_ids instead." ) @property def lum(self): """Return the luminosity of the lines. This is an alias for the luminosity property. Returns: luminosity (unyt_array): The luminosity of the lines. """ return self.luminosity @property def cont(self): """Return the continuum of the lines. This is an alias for the continuum property. Returns: continuum (unyt_array): The continuum of the lines. """ return self.continuum @property def elements(self): """Return the elements of the lines. We relegate this to a property method to reduce the duplication of memory. This is rarely used in expensive use cases and is inexpensive enough that when it is calculating the elements on the fly is not prohibitive. Returns: elements (list): A list of the elements of the lines. """ return list( lid.strip().split(" ")[0] for lids in self.line_ids for lid in lids.split(",") ) @property def available_ratios(self): """Return ratio ids lazily derived from the available lines. Returns: list: The predefined ratio ids available for this collection. """ if self._available_ratios is None: # Get the hashable signature for the line ids signature = get_line_id_signature(self.line_ids) # Get the available ratio ids for this signature and remember # it for later self._available_ratios = list(get_available_ratio_ids(signature)) return self._available_ratios @available_ratios.setter def available_ratios(self, value): """Allow explicit override of the cached ratio list. Args: value (iterable or None): The ratio ids to cache on the instance, or None to clear the cached value. """ self._available_ratios = list(value) if value is not None else None @property def available_diagrams(self): """Return diagram ids lazily derived from the available lines. Returns: list: The predefined diagram ids available for this collection. """ if self._available_diagrams is None: # Get the hashable signature for the line ids signature = get_line_id_signature(self.line_ids) # Get the available diagram ids for this signature and remember it # for later self._available_diagrams = list( get_available_diagram_ids(signature) ) return self._available_diagrams @available_diagrams.setter def available_diagrams(self, value): """Allow explicit override of the cached diagram list. Args: value (iterable or None): The diagram ids to cache on the instance, or None to clear the cached value. """ self._available_diagrams = list(value) if value is not None else None @property def line2index(self): """Return the line-id to index mapping lazily. Returns: Mapping: A mapping from each stored line id to its array index. """ if self._line2index is None: # Get the hashable signature for the line ids signature = get_line_id_signature(self.line_ids) # Get the line2index mapping for this signature and remember it # for later self._line2index = get_line2index(signature) return self._line2index @property def vacuum_wavelengths(self): """Return the vacuum wavelengths of the lines. We relegate this to a property method to reduce the duplication of memory. This is rarely used in expensive use cases and is inexpensive enough that when it is calculating the vacuum wavelengths on the fly is not prohibitive. Returns: vacuum_wavelengths (unyt_array): The vacuum wavelengths of the lines. """ return standard_to_vacuum(self.lam) @property def _individual_line_ids(self): """Return the individual line ids. We relegate this to a property method to reduce the duplication of memory. This is rarely used in expensive use cases and is inexpensive enough that when it is calculating the individual line ids on the fly is not prohibitive. Returns: individual_line_ids (list): A list of the individual line ids. """ return np.array( [lid for lids in self.line_ids for lid in lids.split(",")] ) @property def nu(self): """Return the frequency of the line. Returns: nu (unyt_quantity): The frequency of the line in Hz. """ return (c / self.lam).to(Hz) @property def obsnu(self): """Return the observed frequency of the line. Returns: obsnu (unyt_quantity): The observed frequency of the line in Hz. """ if self.obslam is None: return None return (c / self.obslam).to(Hz) @property def energy(self): """Get the wavelengths in terms of photon energies in eV. Returns: energy (unyt_array): The energy coordinate. """ return (h * c / self.lam).to(eV) @property def continuum_llam(self): """Return the continuum in units of Llam (erg/s/AA). Returns: continuum_llam (unyt_quantity): The continuum in units of Llam (erg/s/AA). """ return lnu_to_llam(self.lam, self.continuum) @property def equivalent_width(self): """Return the equivalent width. Returns: equivalent_width (unyt_quantity): The equivalent width of the line. """ return self.luminosity / self.continuum_llam @property def ndim(self): """Get the dimensions of the spectra array. Returns: Tuple The shape of self.lnu """ return np.ndim(self.lum) @property def shape(self): """Get the shape of the spectra array. Returns: Tuple The shape of self.lnu """ return self._luminosity.shape @property def nlam(self): """Get the number of wavelength bins. Returns: int The number of wavelength bins. """ return len(self.line_ids) def __str__(self): """Return a string representation of the SED object. Returns: table (str): A string representation of the SED object. """ # Intialise the table formatter formatter = TableFormatter(self) return formatter.get_table("LineCollection") def __iter__(self): """Allow iteration over the LineCollection object. Overload iteration to allow simple looping over Line objects, combined with __next__ this enables for l in LineCollection syntax Returns: self (LineCollection) The LineCollection object. """ return self def __next__(self): """Return the next line ID in the LineCollection object. Overload iteration to allow simple looping over Line objects, combined with __iter__ this enables for l in LineCollection syntax Returns: line_id (str): The line ID. """ # Check we haven't finished if self._current_ind >= self.nlines: self._current_ind = 0 raise StopIteration else: # Increment index self._current_ind += 1 # Return the filter return self[self.line_ids[self._current_ind - 1]] def __len__(self): """Return the number of lines in the collection.""" return self.nlines def __mul__(self, scaling): """Scale the line by a given factor. Overloads * operator to allow direct scaling of Line objects. Returns: (Line) New instance of Line containing the scaled line. """ return self.scale(scaling) def __rmul__(self, scaling): """Scale the line by a given factor. Overloads * operator to allow direct scaling of Line objects. Returns: (Line) New instance of Line containing the scaled line. """ return self.scale(scaling) def __add__(self, other): """Add two Line objects together. Overloads + operator to allow direct addition of Line objects. Note, this will only act on the rest frame quantities. To get the flux of the summed lines get_flux must be called on the new Line object. Returns: (Line) New instance of Line containing the summed lines. """ # Check if the other object is a LineCollection if not isinstance(other, LineCollection): raise exceptions.InconsistentArguments( "Addition only supported between LineCollection objects" ) # Check the LineCollections are compatible if np.any(self.line_ids != other.line_ids): raise exceptions.InconsistentArguments( "LineCollections must contain the same lines to be added" ) new_line = LineCollection( line_ids=self.line_ids, lam=self.lam, lum=self.luminosity + other.luminosity, cont=self.continuum + other.continuum, ) # Preserve observed-frame quantities when both inputs already have # them populated, which is required when chunked pipeline outputs are # recombined after get_flux has been called. if (self.flux is not None) and (other.flux is not None): new_line.flux = self.flux + other.flux if ( self.continuum_flux is not None and other.continuum_flux is not None ): new_line.continuum_flux = ( self.continuum_flux + other.continuum_flux ) if (self.obslam is not None) and (other.obslam is not None): new_line.obslam = self.obslam return new_line def __getitem__(self, key): """Return a subset of lines from the collection. This method enables the extraction of various stored and derived line properties. The key can be one of the following: 1. A single line id (e.g. lines["O III 5007 A"]), to return a single. 2. A list of line ids (e.g. lines[["O III 5007 A", "H 1 1215.67A"]]), to return a subset of lines. 3. A string containing a comma separated list of line ids (e.g. lines["O III 5007 A, H 1 1215.67A"]), to return a composite line (e.g. a doublet, triplet, etc) where each line is summed. 4. A string defining a ratio_id from the line_ratios module (e.g. lines["R23"]), to return the line ratio (see get_ratio). 5. A string defining a diagram_id from the line_ratios module (e.g. lines["OHNO"]), to return the line diagram (see get_diagram). Args: key (str/list): The id/s of the line/ratio/diagram. If a string contains a comma separated list of line_ids a composite line will be returned containing the sum of the luminosities and the mean of the wavelengths. If a list of line_ids is provided a subset of lines will be returned. If a single line_id is passed a single line will be returned. If a ratio_id or diagram_id is passed the result of the ratio or diagram will be returned. Returns: line (Line) The line object. """ # Is the key a ratio_id? if isinstance(key, str) and key in self.available_ratios: return self.get_ratio(key) # Is the key a diagram_id? elif isinstance(key, str) and key in self.available_diagrams: return self.get_diagram(key) # Otherwise, we have a line_id line_id = key # Do we have a list of lines? if isinstance(line_id, (list, tuple, np.ndarray)): # Define lists to hold the indices we'll need to extract line_ids = [] lam = [] lum = [] cont = [] flux = [] obslam = [] cont_flux = [] # Loop over each line_id and extract the relevant properties for li in line_id: single_line = self[li] line_ids.extend(single_line.line_ids) lam.extend(single_line.lam) lum.extend(single_line.luminosity) cont.extend(single_line.continuum) if single_line.flux is not None: flux.extend(single_line.flux) obslam.extend(single_line.obslam) cont_flux.extend(single_line.continuum_flux) # Convert to arrays line_ids = np.array(line_ids) lam = np.array(lam) lum = np.array(lum).T cont = np.array(cont).T flux = np.array(flux).T obslam = np.array(obslam).T cont_flux = np.array(cont_flux).T # Create the new line (converting to unyt_arrays) new_line = LineCollection( line_ids=line_ids, lam=unyt_array(lam, self.lam.units), lum=unyt_array(lum, self.luminosity.units), cont=unyt_array(cont, self.continuum.units), ) # Copy over the flux and observed wavelength if they exist if len(flux) > 0: new_line.flux = unyt_array(flux, self.flux.units) new_line.obslam = unyt_array(obslam, self.obslam.units) new_line.continuum_flux = unyt_array( cont_flux, self.continuum_flux.units ) return new_line # Do we have a single line? elif ( isinstance(line_id, str) and alias_to_line_id(line_id) in self.line_ids ): # Are we dealing with an alias? line_id = alias_to_line_id(line_id) # Return the single line new_line = LineCollection( line_ids=[line_id], lam=unyt_array( [self.lam[self.line2index[line_id]]], self.lam.units, ), lum=unyt_array( [self.luminosity[..., self.line2index[line_id]]], self.luminosity.units, ), cont=unyt_array( [self.continuum[..., self.line2index[line_id]]], self.continuum.units, ), ) # Copy over the flux and observed wavelength if they exist if self.flux is not None: new_line.flux = unyt_array( [self.flux[..., self.line2index[line_id]]], self.flux.units, ) new_line.continuum_flux = unyt_array( [self.continuum_flux[..., self.line2index[line_id]]], self.continuum_flux.units, ) new_line.obslam = unyt_array( [self.obslam[self.line2index[line_id]]], self.obslam.units, ) return new_line # Do we have a blended line? elif isinstance(line_id, str) and "," in line_id: # Split the line id into a list of individual lines, handling any # aliases line_ids = [ alias_to_line_id(li.strip()) for li in line_id.split(",") ] # Loop over the lines and combine them into a single line new_lam = self.lam[self.line2index[line_ids[0]]] new_lum = self.luminosity[..., self.line2index[line_ids[0]]] new_cont = self.continuum[..., self.line2index[line_ids[0]]] new_flux = ( self.flux[..., self.line2index[line_ids[0]]] if self.flux is not None else None ) new_obs_cont = ( self.continuum_flux[..., self.line2index[line_ids[0]]] if self.continuum_flux is not None else None ) new_obslam = ( self.obslam[self.line2index[line_ids[0]]] if self.obslam is not None else None ) for li in line_ids[1:]: new_lam += self.lam[self.line2index[li]] new_lum += self.luminosity[..., self.line2index[li]] new_cont += self.continuum[..., self.line2index[li]] if self.flux is not None: new_flux += self.flux[..., self.line2index[li]] new_obs_cont += self.continuum_flux[ ..., self.line2index[li] ] new_obslam += self.obslam[self.line2index[li]] # Create the new line (converting the wavelength to the mean of the # individual lines) new_line = LineCollection( line_ids=[line_id], lam=unyt_array([new_lam / len(line_ids)], self.lam.units), lum=unyt_array([new_lum], self.luminosity.units), cont=unyt_array([new_cont], self.continuum.units), ) # Copy over the flux and observed wavelength if they exist # (converting the wavelength to the mean of the individual lines) if self.flux is not None: new_line.flux = unyt_array([new_flux], self.flux.units) new_line.continuum_flux = unyt_array( [new_obs_cont], self.continuum_flux.units ) new_line.obslam = unyt_array( [new_obslam / len(line_ids)], self.obslam.units ) return new_line # Otherwise, raise an exception, we have two options here, either the # line_id is not recognised or the type is not recognised if isinstance(line_id, str): raise exceptions.UnrecognisedOption( f"Unrecognised line_id (line_id={line_id})" ) raise exceptions.UnrecognisedOption( "Unrecognised line_id type. Please provide a string, list, or " f"comma separated string (type={type(line_id)} line_id={line_id})" )
[docs] def sum(self, axis=None): """Sum the lines in the collection. Args: axis (int/tuple): The axis/axes to sum over. By default this will sum over all but the final axis (the axis containing different lines). """ # First lets check if we have a multidimensional line collection, if # not we can just return the single line object if self.ndim == 1 and self.nlines == 1: return self # Having a single line is a special case where we still need to sum # but ndim > 1 if self.ndim == 1: if axis is not None and axis != 0: raise exceptions.InconsistentArguments( f"Axis {axis} not compatible with LineCollection of ndim=1" ) return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=np.nansum(self.luminosity, axis=axis), cont=np.nansum(self.continuum, axis=axis), ) # If no axes are passed we will sum over all but the last axis if axis is None: axis = tuple(range(self.ndim - 1)) return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=np.nansum(self.luminosity, axis=axis), cont=np.nansum(self.continuum, axis=axis), )
[docs] def concat(self, *other_lines): """Concatenate LineCollections together. This will combine the array along the first axis. For example, if we have two LineCollections with LineCollection.shape = (10, 100) and (100, 100) the resulting LineCollection will have shape (110, 100). This cannot combine LineCollections with different line_ids. Args: other_lines (LineCollection): The other LineCollections to concatenate. Returns: LineCollection The concatenated LineCollection. """ # Check the LineCollections are compatible for other_line in other_lines: if np.any(self.line_ids != other_line.line_ids): raise exceptions.InconsistentArguments( "LineCollections must contain the same lines to be added" ) # Concatenate the LineCollections return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=np.vstack( [self.luminosity, *[ol.luminosity for ol in other_lines]], ), cont=np.vstack( [self.continuum, *[ol.continuum for ol in other_lines]], ), )
[docs] def extend(self, *other_lines): """Extend the LineCollection with additional lines. This will add the lines to the end of the LineCollection. The combined LineCollections must not contain any duplicate lines. Args: other_lines (LineCollection): The other LineCollections to extend with. Returns: LineCollection The extended LineCollection. """ # Check the LineCollections are compatible lst_ids = [*self.line_ids] set_ids = set(self.line_ids) for other_line in other_lines: lst_ids.extend(other_line.line_ids) set_ids.update(other_line.line_ids) # Check for duplicates if len(lst_ids) != len(set_ids): raise exceptions.InconsistentArguments( "LineCollections must contain different lines to be combined" ) # Extend the LineCollection new_line_ids = np.concatenate( [self.line_ids, *[ol.line_ids for ol in other_lines]] ) new_lam = np.concatenate( [self.lam, *[ol.lam for ol in other_lines]], axis=0 ) new_lum = np.concatenate( [self.luminosity, *[ol.luminosity for ol in other_lines]], axis=-1, ) new_cont = np.concatenate( [self.continuum, *[ol.continuum for ol in other_lines]], axis=-1, ) return LineCollection( line_ids=new_line_ids, lam=new_lam, lum=new_lum, cont=new_cont, )
def _which_ratios(self): """Determine the available line ratios for this LineCollection.""" signature = get_line_id_signature(self.line_ids) self.available_ratios = get_available_ratio_ids(signature) return self.available_ratios def _which_diagrams(self): """Determine the available line diagrams for this LineCollection.""" signature = get_line_id_signature(self.line_ids) self.available_diagrams = get_available_diagram_ids(signature) return self.available_diagrams
[docs] def get_flux0(self): """Calculate the rest frame line flux. Uses a standard distance of 10pc to calculate the flux. This will also populate the observed_wavelength attribute with the wavelength of the line when observed (which in the rest frame is the same as the emitted wavelength). Returns: flux (unyt_quantity): Flux of the line in units of erg/s/cm2 by default. """ # Compute flux self.flux = self.luminosity / (4 * np.pi * (10 * pc) ** 2) self.continuum_flux = self.continuum / (4 * np.pi * (10 * pc) ** 2) # Set the observed wavelength (in this case this is the rest frame # wavelength) self.obslam = self.lam return self.flux
[docs] @timed("LineCollection.get_flux") def get_flux(self, cosmo, z, igm=None): """Calculate the line flux given a redshift and cosmology. This will also populate the observed_wavelength attribute with the wavelength of the line when observed. NOTE: if a redshift of 0 is passed the flux return will be calculated assuming a distance of 10 pc omitting IGM since at this distance IGM contribution makes no sense. Args: cosmo (astropy.cosmology.Cosmology): Astropy cosmology object. z (float): The redshift. igm (igm): The IGM class. e.g. `synthesizer.igm.Inoue14`. Defaults to None. Returns: flux (unyt_quantity): Flux of the line in units of erg/s/cm2 by default. """ # If the redshift is 0 we can assume a distance of 10pc and ignore # the IGM if z == 0: return self.get_flux0() # Get the luminosity distance luminosity_distance = get_luminosity_distance(cosmo, z).to("cm") # Compute flux and observed continuum self.flux = self.luminosity / (4 * np.pi * luminosity_distance**2) self.continuum_flux = self.continuum / ( 4 * np.pi * luminosity_distance**2 ) # Set the observed wavelength self.obslam = self.lam * (1 + z) # If we are applying an IGM model apply it if igm is not None: # Support both class references and instantiated objects if callable(igm): igm_transmission = igm().get_transmission(z, self.obslam) else: igm_transmission = igm.get_transmission(z, self.obslam) self.flux *= igm_transmission self.continuum_flux *= igm_transmission return self.flux
def _get_ratio(self, numer_line, denom_line): """Measure (and return) a line ratio. Args: numer_line (str/list): The line or lines in the numerator. denom_line (str/list): The line or lines in the denominator. Returns: float a line ratio """ # Handle the different cases for the numerator if isinstance(numer_line, str) and "," in numer_line: # Split the line id into a list of individual lines numer_line = [lid.strip() for lid in numer_line.split(",")] elif isinstance(numer_line, str): numer_line = [ numer_line.strip(), ] elif isinstance(numer_line, list): numer_line = [lid.strip() for lid in numer_line] else: raise exceptions.UnrecognisedOption( f"Unrecognised line_id type (type={type(numer_line)}). " "Please provide a string or list of strings." ) # And the same for the denominator if isinstance(denom_line, str) and "," in denom_line: # Split the line id into a list of individual lines denom_line = [lid.strip() for lid in denom_line.split(",")] elif isinstance(denom_line, str): denom_line = [ denom_line.strip(), ] elif isinstance(denom_line, list): denom_line = [lid.strip() for lid in denom_line] else: raise exceptions.UnrecognisedOption( f"Unrecognised line_id type (type={type(denom_line)}). " "Please provide a string or list of strings." ) # Ensure we have the lines required for the ratio if not set(numer_line).issubset(self.line_ids): raise exceptions.UnrecognisedOption( "LineCollection is missing the lines required for " f"this ratio (missing={set(numer_line) - set(self.line_ids)})" ) if not set(denom_line).issubset(self.line_ids): raise exceptions.UnrecognisedOption( "LineCollection is missing the lines required for " f"this ratio (missing={set(denom_line) - set(self.line_ids)})" ) # Sum the luminosities of the numerator and denominator numer_lum = np.sum( [self[lid].luminosity for lid in numer_line], axis=0, ) denom_lum = np.sum( [self[lid].luminosity for lid in denom_line], axis=0, ) # Calculate the ratio ratio = numer_lum / denom_lum # Return a single float if we have a scalar result if ratio.ndim == 0: return float(ratio) elif ratio.size == 1: return float(ratio.item()) # Otherwise return the ratio array return ratio
[docs] def get_ratio(self, ratio_id): """Measure (and return) a line ratio. Args: ratio_id (str, list): Either a ratio_id where the ratio lines are defined in line_ratios or a list of lines. Returns: float a line ratio """ # If ratio_id is a string we interpret it as a ratio_id for the ratios # defined in the line_ratios module if isinstance(ratio_id, str): # Check if ratio_id exists if ratio_id not in line_ratios.available_ratios: raise exceptions.UnrecognisedOption( f"ratio_id not recognised ({ratio_id})" ) # Check we have the lines required for this ratio elif ratio_id not in self.available_ratios: raise exceptions.UnrecognisedOption( "LineCollection is missing the lines required for " f"this ratio ({ratio_id})" ) line1, line2 = line_ratios.ratios[ratio_id] # Otherwise interpret it as a list, it must only contain two elements # for a ratio though, each element can be a list of lines or a single # string elif isinstance(ratio_id, list): # Ensure the list only contains two lines if len(ratio_id) != 2: raise exceptions.InconsistentArguments( "A ratio must contain two elements, the first defining" " the numerator and the second the denominator. Either " " can be a single string or a list of strings defining " " the lines." ) line1, line2 = ratio_id return self._get_ratio(line1, line2)
[docs] def get_diagram(self, diagram_id): """Return a pair of line ratios for a given diagram_id (E.g. BPT). Args: diagram_id (str, list): Either a diagram_id where the pairs of ratio lines are defined in line_ratios or a list of lists defining the ratios. Returns: tuple (float): a pair of line ratios """ # If ratio_id is a string interpret as a ratio_id for the ratios # defined in the line_ratios module... if isinstance(diagram_id, str): # check if ratio_id exists if diagram_id not in line_ratios.available_diagrams: raise exceptions.UnrecognisedOption( f"diagram_id not recognised ({diagram_id})" ) # check if ratio_id exists elif diagram_id not in self.available_diagrams: raise exceptions.UnrecognisedOption( "LineCollection is missing the lines required for " f"this diagram ({diagram_id})" ) ab, cd = line_ratios.diagrams[diagram_id] # Otherwise interpret as a list elif isinstance(diagram_id, list): ab, cd = diagram_id return self._get_ratio(*ab), self._get_ratio(*cd)
[docs] @timed("LineCollection.scale") def scale( self, scaling, inplace=False, mask=None, lam_mask=None, nthreads=1, ): """Scale the lines by a given factor. Note: this will only scale the rest frame continuum and luminosity. To get the scaled flux get_flux must be called on the new LineCollection object. Args: scaling (float, np.ndarray, or unyt quantity): The factor by which to scale the line. inplace (bool): If True the LineCollection will be scaled in place, otherwise a new LineCollection will be returned. mask (array-like, bool): A mask array with an entry for each line. Masked out spectra will not be scaled. Only applicable for multidimensional lines. lam_mask (array-like, bool): A mask array with an entry for each wavelength bin. Masked out wavelengths will not be scaled. Only applicable for multidimensional lines. nthreads (int): The number of threads to use for compatible 2D scaling paths. Returns: LineCollection A new LineCollection object containing the scaled lines, unless inplace is True in which case the LineCollection object will be scaled in place. """ # Get the units without making a copy lum_units = get_quantity_unit(self, "luminosity") cont_units = get_quantity_unit(self, "continuum") # Let the shared helper decide whether the incoming scaling belongs to # luminosity units, continuum units, or is already unitless. scaling_lum, scaling_cont = normalise_line_scaling( scaling, lambda: ( c / get_array_quantity_view( self._lam, get_quantity_unit(self, "lam"), ) ).to(Hz), lum_units, cont_units, ) # In-place scaling can hand the existing buffers down as outputs. out_lum = self._luminosity if inplace else None out_cont = self._continuum if inplace else None # The shared helper decides whether this should go through the fused # lum+cont kernel or two independent array scalings. lum, cont = scale_line_arrays( self._luminosity, self._continuum, scaling_lum, scaling_cont, mask=mask, lam_mask=lam_mask, nthreads=nthreads, out_lum=out_lum, out_cont=out_cont, ) # For in-place scaling we have already updated the existing buffers # and can just return self if inplace: return self # Otherwise we need to build a new LineCollection return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=get_array_quantity_view(lum, lum_units), cont=get_array_quantity_view(cont, cont_units), )
[docs] @timed("LineCollection.apply_attenuation") def apply_attenuation( self, tau_v=None, dust_curve=None, mask=None, nthreads=1, **dust_curve_kwargs, ): """Apply attenuation to this LineCollection. Args: tau_v (float/np.ndarray of float): The V-band optical depth for every star particle. Optional for attenuation laws that do not require it. dust_curve (synthesizer.emission_models.attenuation.*): An instance of one of the dust attenuation models. (defined in synthesizer/emission_models.attenuation.py) mask (array-like, bool): A mask array with an entry for each line. Masked out spectra will be ignored when applying the attenuation. Only applicable for multidimensional lines. nthreads (int): The number of threads to use for compatible array kernels. dust_curve_kwargs (dict): A dictionary of extra parameters set at runtime on the attenuation model. Returns: LineCollection A new LineCollection object containing the attenuated lines. """ # Avoid cyclic imports. from synthesizer.emission_models.transformers.dust_attenuation import ( AttenuationLaw, ) # Ensure we have a dust curve to apply if dust_curve is None: raise exceptions.MissingArgument("dust_curve must be provided") # Ensure we have tau_v if the dust curve requires it if tau_v is None and "tau_v" in getattr( dust_curve, "_required_params", () ): raise exceptions.MissingArgument( "tau_v is required by the selected attenuation law: " f"{dust_curve.__class__.__name__}" ) # Ensure the mask is compatible with the spectra if mask is not None: if self._luminosity.ndim < 1: raise exceptions.InconsistentArguments( "Masks are only applicable for Lines containing " "multiple elements" ) if self._luminosity.shape[0] != mask.size: raise exceptions.InconsistentArguments( "Mask and lines are incompatible shapes " f"({mask.shape}, {self.lum.shape})" ) # If tau_v is an array it needs to match the spectra shape, note # that we need a special case here because unyt_quantity resolves # to true in isinstance checks despite being a scalar quantity if isinstance(tau_v, np.ndarray) and not isinstance( tau_v, unyt_quantity ): if self._luminosity.ndim < 1: raise exceptions.InconsistentArguments( "Arrays of tau_v values are only applicable for Lines" " containing multiple elements" ) if self._luminosity.shape[0] != tau_v.size: raise exceptions.InconsistentArguments( "tau_v and lines are incompatible shapes " f"({tau_v.shape}, {self.lum.shape})" ) # Get the units without making a copy lum_units = get_quantity_unit(self, "luminosity") cont_units = get_quantity_unit(self, "continuum") # For the standard AttenuationLaw implementation with per-row tau_v we # can avoid materialising a full 2D transmission array and instead use # the separable attenuation kernel directly. # As in Sed.apply_attenuation, the separable AttenuationLaw case lets # us keep tau_v and the wavelength curve split until the inner loop. if ( self._luminosity.ndim == 2 and isinstance(tau_v, np.ndarray) and tau_v.ndim == 1 and tau_v.shape[0] == self._luminosity.shape[0] and type(dust_curve).get_transmission is AttenuationLaw.get_transmission ): # Pull out just the wavelength-dependent extinction curve once and # reuse it for both luminosity and continuum. tau_x_v = dust_curve.get_extinction_curve( self.lam, **dust_curve_kwargs, ) # Both arrays see the same attenuation structure, so we run the # same kernel twice rather than building two transmission matrices. att_lum = apply_separable_attenuation_2d( self._luminosity, tau_v, tau_x_v, mask, nthreads, ) att_cont = apply_separable_attenuation_2d( self._continuum, tau_v, tau_x_v, mask, nthreads, ) return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=get_array_quantity_view(att_lum, lum_units), cont=get_array_quantity_view(att_cont, cont_units), ) # Compute the transmission for the remaining generic cases. transmission = dust_curve.get_transmission( tau_v, self.lam, **dust_curve_kwargs ) # When attenuation reduces to a wavelength-only transmission curve we # can apply it with the dedicated last-axis scaling kernel. Here we # have a row mask so we need to copy the arrays and apply the # transmission only to the masked rows if ( self._luminosity.ndim == 2 and isinstance(transmission, np.ndarray) and transmission.ndim == 1 and mask is not None ): # We only want to touch the masked rows here, so start from copies # and then patch the attenuated rows back in. att_lum = np.array(self._luminosity, copy=True) att_cont = np.array(self._continuum, copy=True) att_lum[mask] = multiply_array_by_vector_1d( self._luminosity[mask], transmission, nthreads, ) att_cont[mask] = multiply_array_by_vector_1d( self._continuum[mask], transmission, nthreads, ) return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=get_array_quantity_view(att_lum, lum_units), cont=get_array_quantity_view(att_cont, cont_units), ) # If the transmission is a wavelength-only curve and we don't have a # row mask we can apply it directly using the dedicated 1D scaling # kernel without a copy if isinstance(transmission, np.ndarray) and transmission.ndim == 1: # With no row mask, both arrays can go straight through the # wavelength-vector kernel. att_lum = multiply_array_by_vector_1d( self._luminosity, transmission, nthreads, ) att_cont = multiply_array_by_vector_1d( self._continuum, transmission, nthreads, ) return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=get_array_quantity_view(att_lum, lum_units), cont=get_array_quantity_view(att_cont, cont_units), ) # If neither fast path applies we fall back to NumPy broadcasting, # copying the arrays and applying the transmission with or without # a mask att_lum = self.luminosity att_cont = self.continuum if mask is None: att_lum *= transmission att_cont *= transmission else: # Full transmission matrices already line up with the row mask, so # we can attenuate only the selected rows in both arrays. att_lum[mask] *= transmission[mask] att_cont[mask] *= transmission[mask] return LineCollection( line_ids=self.line_ids, lam=self.lam, lum=att_lum, cont=att_cont, )
[docs] def plot_lines( self, subset=None, figsize=(8, 6), show=False, xlimits=(), ylimits=(), ): """Plot the lines in the LineCollection. Args: subset (str/list): The line_id or list of line_ids to plot. If None, all lines will be plotted. figsize (tuple): The size of the figure (width, height) in inches. Defaults to (8, 6). show (bool): Whether to show the plot. xlimits (tuple): The x-axis limits. Must be a length 2 tuple. Defaults to (), in which case the default limits are used. ylimits (tuple): The y-axis limits. Must be a length 2 tuple. Defaults to (), in which case the default limits are used. Returns: fig (matplotlib.figure.Figure) The figure object. ax (matplotlib.axes.Axes) The axis object. """ # Are we doing all lines? if subset is None: subset = self.line_ids plot_lines = self else: plot_lines = self[subset] # Collect luminosities and wavelengths line_ids = plot_lines.line_ids luminosities = plot_lines.luminosity wavelengths = plot_lines.lam.to("angstrom").value # Remove 0s and nans mask = np.logical_and(luminosities > 0, ~np.isnan(luminosities)) luminosities = luminosities[mask] wavelengths = wavelengths[mask] # Warn the user if we removed anything if np.sum(~mask) > 0: warn( f"Removed {np.sum(~mask)} lines with zero or NaN luminosities" ) # Set up the plot fig, ax = plt.subplots(figsize=figsize) ax.semilogy() # Plot vertical lines ax.vlines( x=wavelengths, ymin=min(luminosities) / 10, ymax=luminosities, color="C0", ) # If we haven't been given a lower lim, set it to the minimum if len(xlimits) == 0: xlimits = ( np.min(wavelengths), np.max(wavelengths), ) if len(ylimits) == 0: ylimits = ( min(luminosities) / 10 / (erg / s), max(luminosities) * 10 / (erg / s), ) elif ylimits[0] is None: ylimits = list(ylimits) ylimits[0] = min(luminosities) / 10 # Optionally label each line at the tip # (assuming self.line_ids is in the same order) for x, y, label in zip(wavelengths, luminosities, line_ids): # On a log scale, you might want a small offset (e.g., y*1.05) ax.text(x, y, label, rotation=45, ha="left", va="bottom") # Set the x-axis to be in Angstroms ax.set_xlabel(r"$ \lambda / \AA$") ax.set_ylabel("$L / $erg s$^{-1}$") # Apply limits if requested if len(xlimits) > 0: ax.set_xlim(xlimits) if len(ylimits) > 0: ax.set_ylim(ylimits) # Show the plot if requested if show: plt.show() return fig, ax
[docs] @accepts(wavelength_bins=angstrom) def get_blended_lines(self, wavelength_bins): """Blend lines based on a set of wavelength bins. We use a set of wavelength bins to enable the user to control exactly which lines are blended together. This also enables an array to be used emulating an instrument's resolution. A simple resolution would lead to ambiguity in situations where A and B are blended, and B and C are blended, but A and C are not. Args: wavelength_bins (unyt_array): The wavelength bin edges into which the lines will be blended. Any lines outside the range of the bins will be ignored. Returns: LineCollection A new LineCollection object containing the blended lines. """ # Ensure the bins are sorted and actually have a length wavelength_bins = np.sort(wavelength_bins) if len(wavelength_bins) < 2: raise exceptions.InconsistentArguments( "Wavelength bins must have a length of at least 2" ) # Sort wavelengths into the bins getting the indices in each bin bin_inds = np.digitize(self.lam, wavelength_bins) # Define the blended lines new shape new_shape = list(self.shape) new_shape[-1] = len(wavelength_bins) - 1 # Create the arrays we'll need to store the blended lines blended_lines_counts = np.zeros(len(wavelength_bins) - 1, dtype=int) blended_line_ids = [[] for _ in range(len(wavelength_bins) - 1)] blended_line_lams = np.zeros(len(wavelength_bins) - 1, dtype=float) blended_line_lums = np.zeros(new_shape, dtype=float) blended_line_conts = np.zeros(new_shape, dtype=float) # Loop bin indices and combine the lines into the blended_lines array for i, bin_ind in enumerate(bin_inds): # If the bin index is 0 or the length of the bins then it lay # outside the range of the bins if bin_ind == 0 or bin_ind == len(wavelength_bins): continue # Ok, now we can handle the off by 1 error that digitize gives us bin_ind -= 1 # Added this lines contribution to the arrays blended_lines_counts[bin_ind] += 1 blended_line_ids[bin_ind].append(self.line_ids[i]) blended_line_lams[bin_ind] += self._lam[i] blended_line_lums[..., bin_ind] += self._luminosity[..., i] blended_line_conts[..., bin_ind] += self._continuum[..., i] # Now we need to clean up and concatenate the ids of the blended lines # into comma separated strings for i in range(len(wavelength_bins) - 1): if blended_lines_counts[i] == 0: blended_line_ids[i] = None continue # Combine the line ids into a single string blended_line_ids[i] = ", ".join(blended_line_ids[i]) # Remove any bins with no lines mask = blended_lines_counts > 0 blended_lines_counts = blended_lines_counts[mask] blended_line_ids = np.array(blended_line_ids)[mask] blended_line_lams = blended_line_lams[mask] blended_line_lums = blended_line_lums[..., mask] blended_line_conts = blended_line_conts[..., mask] # Convert the wavelengths into the means blended_line_lams /= blended_lines_counts return LineCollection( line_ids=blended_line_ids, lam=blended_line_lams * self.lam.units, lum=blended_line_lums * self.luminosity.units, cont=blended_line_conts * self.continuum.units, )
[docs] @accepts(sed_lam=angstrom) def create_sed(self, sed_lam): """Create a synthesizer.sed.Sed object from the LineCollection. Arguments: sed_lam (unyt_array): Wavelength grid. Returns: sed (synthesizer.sed.Sed): synthesizer.sed.Sed object. """ # Create empty spectra with correct units sed_lnu = np.zeros(len(sed_lam)) * erg / s / Hz # Loop over the vacuum wavelengths and luminosities in the collection # and add them to the spectra for lam, lum in zip(self.vacuum_wavelengths, self.lum): # Identify the element to place the line's luminosity lam_index = (np.abs(sed_lam - lam)).argmin() # Skip lines outside the wavelength range (at boundaries) if lam_index == 0 or lam_index == len(sed_lam) - 1: continue # The wavelength resolution at this wavelength delta_lambda = 0.5 * ( sed_lam[lam_index + 1] - sed_lam[lam_index - 1] ) # Frequency of the line nu = c / lam # Calculate the luminosity of the line in units of lnu lnu_ = lam * (lum / nu) / delta_lambda # Add into the spectrum sed_lnu[lam_index] += lnu_ # Create and return new synthesizer.sed.Sed object return Sed(lam=sed_lam, lnu=sed_lnu)