
"""A module containing a pipeline helper class.

This module contains the `Pipeline` class, which is used to run observable
generation pipelines on a set of galaxies. To use this functionality the user
needs to define an emission model, load their galaxies, and attach them to
the Pipeline. The user can then call the various methods to flag the mock
data they need, simplifying a complex pipeline full of boilerplate code to a
handful of definitions and calls to the Pipeline object.

Example usage:
```python
    from synthesizer import Pipeline

    pipeline = Pipeline(
        emission_model=emission_model,
        nthreads=4,
        comm=None,
        verbose=1,
    )

    pipeline.add_galaxies(galaxies)
    pipeline.get_spectra()
    pipeline.get_photometry_luminosities(instrument1, instrument2)
    pipeline.run()
    pipeline.write("output.hdf5")
```
"""

import time

import numpy as np
from unyt import unyt_array

from synthesizer import check_openmp, exceptions
from synthesizer.instruments import InstrumentCollection
from synthesizer.particle import Galaxy as ParticleGalaxy
from synthesizer.pipeline.pipeline_io import PipelineIO
from synthesizer.pipeline.pipeline_utils import (
    NO_MODEL_LABEL,
    OperationKwargsHandler,
    accumulate_pipeline_results_from_child,
    clear_pipeline_outputs,
    combine_list_of_dicts,
    count_and_check_dict_recursive,
    get_full_memory,
    validate_noise_unit_compatibility,
)
from synthesizer.synth_warnings import warn
from synthesizer.utils.art import Art


class Pipeline:
    """A class for running observable generation pipelines on galaxies.

    To use this class the user must instantiate it with an emission model
    defining the different emissions that will be included in the pipeline.
    Galaxies are then attached with the add_galaxies method, and any
    instruments needed to make observations are passed to the relevant
    get_* methods. Optionally the user can also specify the number of
    threads to use if Synthesizer has been installed with OpenMP support,
    and an MPI communicator if they are running over MPI. Finally, the
    verbosity level can be set to control the amount of output.

    Once the Pipeline object has been instantiated the user can call the
    various methods to generate the data they need.

    For spectra:
      - get_spectra
      - get_observed_spectra (passing a cosmology object if redshifted
        spectra are required)
      - get_lnu_data_cubes (resolved spectral data cubes)
      - get_fnu_data_cubes (resolved spectral data cubes)

    For photometry:
      - get_photometry_luminosities
      - get_photometry_fluxes

    For emission lines:
      - get_lines (passing a list of line IDs to generate)

    For images (with optional PSF and noise based on the instrument):
      - get_images_luminosity
      - get_images_flux

    For the SFZH grid:
      - get_sfzh (passing the log10 age and metallicity axes)

    The user can also add their own analysis functions to the pipeline,
    which will be run on each galaxy once all data has been generated.
    These functions should take a galaxy object as the first argument and
    can take any number of additional arguments and keyword arguments. The
    results of these functions should be returned, and will be stored on
    the Pipeline keyed by the result_key argument (see add_analysis_func
    for more details).

    Finally, the user can write out the data generated by the pipeline
    using the write method. This will write out the data to an HDF5 file.

    Attributes:
        emission_model (EmissionModel):
            The emission model to use for the pipeline.
        n_galaxies (int):
            How many galaxies will we load in total (i.e. not per rank if
            using MPI)?
        nthreads (int):
            The number of threads to use for shared memory parallelism.
            Default is 1.
        comm (MPI.Comm):
            The MPI communicator to use for MPI parallelism. Default is
            None.
        verbose (int):
            How talkative are we?
            0: No output beyond hello and goodbye.
            1: Outputs with timings but only on rank 0 (when using MPI).
            2: Outputs with timings on all ranks (when using MPI).
        galaxies (list):
            A list of Galaxy objects that have been loaded.
    """

    def __init__(
        self,
        emission_model,
        nthreads=1,
        comm=None,
        verbose=1,
        report_memory=False,
        max_npart=None,
    ):
        """Initialise the Pipeline object.

        This will not perform any part of the calculation, it only sets it
        up.

        This will attach all the passed attributes of the pipeline and set
        up anything we'll need later, like MPI variables (if applicable),
        flags to indicate what stages we've completed, and containers for
        any outputs and additional analysis functions.

        This will also check arguments are sensible, e.g. that Synthesizer
        has been installed with OpenMP support if multiple threads are
        requested.

        Args:
            emission_model (EmissionModel):
                The emission model to use for the pipeline.
            nthreads (int):
                The number of threads to use for shared memory parallelism.
                Default is 1.
            comm (MPI.Comm):
                The MPI communicator to use for MPI parallelism. Default is
                None.
            verbose (int):
                How talkative are we?
                0: No output beyond hello and goodbye.
                1: Outputs with timings but only on rank 0 (when using
                MPI).
                2: Outputs with timings on all ranks (when using MPI).
            report_memory (bool):
                Should we report memory usage as we run? This should only
                be turned on for debugging purposes. This will report the
                memory usage of the galaxies, the results, and the pipeline
                object itself in the progress table. This can be very
                EXPENSIVE in terms of time since we recurse through all
                objects to get the memory usage after processing each
                galaxy. Default is False.
            max_npart (int):
                (Only applicable for particle based galaxies) The maximum
                number of particles to be processed at one time. Any
                galaxies above this threshold will be split until each
                individual "sub-galaxy" obeys the threshold. The subsequent
                emissions will then be combined at the end. Note that any
                operations that require the complete particle distribution
                (e.g. column densities) will be done on the original galaxy
                prior to splitting.

                TODO: While gas particles don't carry spectra we only need
                to consider stars and black holes in this threshold. If we
                have gas spectra in the future we will need to amend this.
        """
        # Attributes to track timing
        self._start_time = time.perf_counter()

        # Attach all the attributes we need to know what to do
        self.emission_model = emission_model

        # Set verbosity
        self.verbose = verbose

        # Are we reporting memory usage?
        self._report_memory = report_memory

        # How many galaxies are we going to be looking at?
        self.n_galaxies = 0
        self.n_galaxies_local = 0  # Only applicable when using MPI
        self.n_galaxies_per_rank = 0  # Only applicable when using MPI
        self.n_galaxies_offset = 0  # Only applicable when using MPI

        # Attach the maximum number of particles we can process at one time
        if max_npart is not None and (
            not isinstance(max_npart, int) or max_npart < 1
        ):
            raise ValueError("max_npart must be an int >= 1")
        self._max_npart = max_npart

        # Define the container to hold the galaxies
        self.galaxies = []

        # Define an empty InstrumentCollection to hold the instruments we
        # will use (this is only used for bookkeeping, the kwarg handler
        # deals with where to apply different instruments)
        self.instruments = InstrumentCollection()
        self._instruments_prepared = False

        # How many threads are we using for shared memory parallelism?
        self.nthreads = nthreads

        # Check if we can use OpenMP
        if self.nthreads > 1 and not check_openmp():
            raise exceptions.InconsistentArguments(
                "Can't use multiple threads without OpenMP support. "
                "Install with: `WITH_OPENMP=1 pip install .`"
            )

        # Define flags for what we will do
        self._do_los_optical_depths = False
        self._do_lnu_spectra = False
        self._do_fnu_spectra = False
        self._do_luminosities = False
        self._do_fluxes = False
        self._do_lum_lines = False
        self._do_flux_lines = False
        self._do_images_lum = False
        self._do_images_flux = False
        self._do_lnu_data_cubes = False
        self._do_fnu_data_cubes = False
        self._do_spectroscopy_lnu = False
        self._do_spectroscopy_fnu = False
        self._do_sfzh = False
        self._do_sfh = False

        # Define the container for all the kwargs needed by each operation.
        # This is a handler that manages kwargs for each
        # (model_label, operation) pair. Each call to one of the signalling
        # methods will add a set of kwargs to the relevant operation.
        self._operation_kwargs = OperationKwargsHandler(
            model_labels=emission_model.saved_labels,
        )

        # Define flags for what we will write out
        self._write_lnu_spectra = False
        self._write_fnu_spectra = False
        self._write_luminosities = False
        self._write_fluxes = False
        self._write_lines = False
        self._write_flux_lines = False
        self._write_images_lum = False
        self._write_images_lum_psf = False
        self._write_images_lum_noise = False
        self._write_images_flux = False
        self._write_images_flux_psf = False
        self._write_images_flux_noise = False
        self._write_lnu_data_cubes = False
        self._write_fnu_data_cubes = False
        self._write_spectroscopy_lnu = False
        self._write_spectroscopy_fnu = False
        self._write_sfzh = False
        self._write_sfh = False

        # Define flags to indicate we read the galaxies
        self._loaded_galaxies = False
        self._analysis_complete = False

        # Define containers for any additional analysis functions
        self._analysis_funcs = []
        self._analysis_args = []
        self._analysis_kwargs = []
        self._analysis_results_keys = []
        self._analysis_results = {}

        # Initialise containers for all the data we can generate
        self.sfzhs = []
        self.sfhs = []
        self.lnu_spectra = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.fnu_spectra = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.luminosities = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.fluxes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.line_lums = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.line_cont_lums = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.line_lams = None
        self.line_ids = None
        self.line_fluxes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.line_cont_fluxes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.line_obs_lams = None
        self.images_lum = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.images_lum_psf = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.images_lum_noise = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.images_flux = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.images_flux_psf = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.images_flux_noise = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.lnu_data_cubes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.fnu_data_cubes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
        self.spectroscopy = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}

        # We'll need to store the timing information to be reported at the
        # end of the pipeline (this will be stored with the operation
        # as the key and the value being the collected time taken).
        self._op_timing = {
            "LOS optical depths": 0.0,
            "SFZH": 0.0,
            "SFH": 0.0,
            "Lnu Spectra": 0.0,
            "Fnu Spectra": 0.0,
            "Luminosities": 0.0,
            "Fluxes": 0.0,
            "Emission Line Luminosities": 0.0,
            "Emission Line Fluxes": 0.0,
            "Luminosity Images": 0.0,
            "Luminosity Images (With PSF)": 0.0,
            "Luminosity Images (With Noise)": 0.0,
            "Flux Images": 0.0,
            "Flux Images (With PSF)": 0.0,
            "Flux Images (With Noise)": 0.0,
            "Lnu Data Cubes": 0.0,
            "Fnu Data Cubes": 0.0,
            "Spectroscopy Lnu": 0.0,
            "Spectroscopy Fnu": 0.0,
            "Extra Analyses": 0.0,
            "Unpacking results": 0.0,
        }

        # We'll also report total counts so define a container for those.
        self._op_counts = {
            "LOS optical depths": 0,
            "SFZH": 0,
            "SFH": 0,
            "Lnu Spectra": 0,
            "Fnu Spectra": 0,
            "Luminosities": 0,
            "Fluxes": 0,
            "Emission Line Luminosities": 0,
            "Emission Line Fluxes": 0,
            "Luminosity Images": 0,
            "Luminosity Images (With PSF)": 0,
            "Luminosity Images (With Noise)": 0,
            "Flux Images": 0,
            "Flux Images (With PSF)": 0,
            "Flux Images (With Noise)": 0,
            "Lnu Data Cubes": 0,
            "Fnu Data Cubes": 0,
            "Spectroscopy Lnu": 0,
            "Spectroscopy Fnu": 0,
            "Extra Analyses": 0,
        }

        # Some constants we'll use to format the progress report
        self._table_col_width = 12

        # Everything that follows is only needed for hybrid parallelism
        # (running with MPI in addition to shared memory parallelism)

        # If we are running with hybrid parallelism, we need to know about
        # the communicator for MPI
        self.comm = comm
        self.using_mpi = comm is not None and comm.Get_size() > 1

        # Get some MPI information if we are using MPI
        if self.using_mpi:
            self.rank = comm.Get_rank()
            self.size = comm.Get_size()
        else:
            self.rank = 0
            self.size = 1

        if self.rank == 0:
            self._say_hello()
            self._report()

        # Attach the writer, we'll assign this later in the write method
        self.io_helper = None

        # If we are using MPI we need to make sure everyone agrees on the
        # start time, take the minimum
        if self.using_mpi:
            self._start_time = min(self.comm.allgather(self._start_time))

        # Hold back everyone in MPI land until we're all ready to go
        if self.using_mpi:
            comm.Barrier()

    def _say_hello(self):
        """Print a nice welcome."""
        if self.verbose == 0:
            return
        print()
        print("\n".join([" " * 25 + s for s in Art.galaxy.split("\n")]))
        print()

    def _report(self):
        """Print a report containing the Pipeline setup."""
        # Print the MPI setup if we are using MPI
        if self.using_mpi:
            self._print(f"Running with MPI on {self.size} ranks.")

        # Print the shared memory parallelism setup
        if self.nthreads > 1 and self.using_mpi:
            self._print(f"Running with {self.nthreads} threads per rank.")
        elif self.nthreads > 1:
            self._print(f"Running with {self.nthreads} threads.")

        # Print some information about the emission model
        self._print(f"Root emission model: {self.emission_model.label}")
        self._print(
            f"EmissionModel contains {len(self.emission_model._models)} "
            "individual models."
        )
        self._print("EmissionModels split by emitter:")
        label_width = max(
            len(" - galaxy"), len(" - stellar"), len(" - blackhole")
        )
        ngal_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m.emitter == "galaxy"
            ]
        )
        nstar_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m.emitter == "stellar"
            ]
        )
        nbh_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m.emitter == "blackhole"
            ]
        )
        self._print(f"{' - galaxy:'.ljust(label_width + 2)} {ngal_models}")
        self._print(f"{' - stellar:'.ljust(label_width + 2)} {nstar_models}")
        self._print(f"{' - blackhole:'.ljust(label_width + 2)} {nbh_models}")
        self._print("EmissionModels split by operation type:")
        label_width = max(
            len(" - extraction"),
            len(" - combination"),
            len(" - attenuating"),
            len(" - generation"),
        )
        nextract_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m._is_extracting
            ]
        )
        ncombine_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m._is_combining
            ]
        )
        nattenuate_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m._is_transforming
            ]
        )
        ngen_models = len(
            [
                m
                for m in self.emission_model._models.values()
                if m._is_generating
            ]
        )
        self._print(
            f"{' - extraction:'.ljust(label_width + 2)} {nextract_models}"
        )
        self._print(
            f"{' - combination:'.ljust(label_width + 2)} {ncombine_models}"
        )
        self._print(
            f"{' - attenuating:'.ljust(label_width + 2)} {nattenuate_models}"
        )
        self._print(
            f"{' - generation:'.ljust(label_width + 2)} {ngen_models}"
        )

    def _report_instruments(self):
        """Print a report containing the instruments setup."""
        # Unpack the instruments to collect all the unique instruments
        unique_instruments = {}
        for inst in self.instruments:
            unique_instruments[inst.label] = inst
        ninstruments = len(unique_instruments)

        # Print the number of instruments we have
        self._print(f"Using {ninstruments} instruments.")

        # Print the number of filters we have
        nfilters = 0
        for inst in unique_instruments.values():
            if getattr(inst, "filters", None) is not None:
                nfilters += len(inst.filters)
        self._print(f"Instruments have {nfilters} filters in total.")

        # Make a breakdown of the instruments
        self._print(
            "Included instruments:",
            ", ".join(list(unique_instruments.keys())),
        )
        self._print("Instruments split by capability:")
        label_width = max(
            len(" - photometry"),
            len(" - spectroscopy"),
            len(" - imaging"),
            len(" - resolved spectroscopy"),
        )
        nphot_inst = len(
            [
                inst
                for inst in unique_instruments.values()
                if inst.can_do_photometry
            ]
        )
        nspec_inst = len(
            [
                inst
                for inst in unique_instruments.values()
                if inst.can_do_spectroscopy
            ]
        )
        nimg_inst = len(
            [
                inst
                for inst in unique_instruments.values()
                if inst.can_do_imaging
            ]
        )
        nresspec_inst = len(
            [
                inst
                for inst in unique_instruments.values()
                if inst.can_do_resolved_spectroscopy
            ]
        )
        self._print(
            f"{' - photometry:'.ljust(label_width + 2)} {nphot_inst}"
        )
        self._print(
            " - spectroscopy:".ljust(label_width + 2),
            nspec_inst,
        )
        self._print(f"{' - imaging:'.ljust(label_width + 2)} {nimg_inst}")
        self._print(
            f"{' - resolved spectroscopy:'.ljust(label_width + 2)}"
            f" {nresspec_inst}"
        )

    def _say_goodbye(self):
        """Print a nice goodbye including timing."""
        elapsed = time.perf_counter() - self._start_time

        # Report in sensible units
        if elapsed < 1:
            elapsed *= 1000
            units = "ms"
        elif elapsed < 60:
            units = "s"
        elif elapsed < 3600:
            elapsed /= 60
            units = "mins"
        else:
            elapsed /= 3600
            units = "hours"

        # Report how blazingly fast we are
        self._print(f"Total synthesis took {elapsed:.3f} {units}.")
        self._print("Goodbye!")

    def _print(self, *args, **kwargs):
        """Print a message to the screen with extra information.

        The prints behave differently depending on whether we are using
        MPI or not. We can also set the verbosity level at the Pipeline
        level, which will control the verbosity of the print statements.

        Verbosity:
            0: No output beyond hello and goodbye.
            1: Outputs with timings but only on rank 0 (when using MPI).
            2: Outputs with timings on all ranks (when using MPI).

        Args:
            *args: The message to print.
            **kwargs: Any additional arguments to pass to the print
                function.
        """
        # At verbosity 0 we are silent
        if self.verbose == 0:
            return

        # Get the current time code in seconds with 0 padding and 2
        # decimal places
        now = time.perf_counter() - self._start_time
        int_now = str(int(now)).zfill(
            len(str(int(now))) + 1 if now > 9999 else 5
        )
        decimal = str(now).split(".")[-1][:2]
        now_str = f"{int_now}.{decimal}"

        # Create the prefix for the print, there's extra info to output if
        # we are using MPI
        if self.using_mpi:
            # Only print on rank 0 if we are using MPI and have verbosity 1
            if self.verbose == 1 and self.rank != 0:
                return
            prefix = (
                f"[{str(self.rank).zfill(len(str(self.size)) + 1)}]"
                f"[{now_str}]:"
            )
        else:
            prefix = f"[{now_str}]:"

        print(prefix, *args, **kwargs)

    def _took(self, start, message):
        """Print a message with the time taken since the start time.

        Args:
            start (float): The start time of the process.
            message (str): The message to print.
        """
        elapsed = time.perf_counter() - start

        # Report in sensible units
        if elapsed < 1:
            elapsed *= 1000
            units = "ms"
        elif elapsed < 60:
            units = "s"
        else:
            elapsed /= 60
            units = "mins"

        # Report how blazingly fast we are
        self._print(f"{message} took {elapsed:.3f} {units}.")

    @property
    def results_memory_usage(self):
        """Return the memory usage of the results stored on the Pipeline.

        Returns:
            float: The memory usage in Megabytes.
        """
        results = [
            self.sfzhs,
            self.sfhs,
            self.lnu_spectra,
            self.fnu_spectra,
            self.luminosities,
            self.fluxes,
            self.line_lums,
            self.line_cont_lums,
            self.line_lams,
            self.line_ids,
            self.line_fluxes,
            self.line_cont_fluxes,
            self.line_obs_lams,
            self.images_lum,
            self.images_lum_psf,
            self.images_flux,
            self.images_flux_psf,
            self.lnu_data_cubes,
            self.fnu_data_cubes,
            self.spectroscopy,
        ]
        return get_full_memory(results) / (1024**2)

    @property
    def galaxies_memory_usage(self):
        """Return the memory usage of the galaxies loaded into the Pipeline.

        Returns:
            float: The memory usage in Megabytes.
        """
        return get_full_memory(self.galaxies) / (1024**2)

    @property
    def all_galaxies_memory_usage(self):
        """Return the memory usage of all galaxies across all ranks.

        Returns:
            float: The memory usage in Megabytes.
        """
        if self.using_mpi:
            return self.comm.allreduce(self.galaxies_memory_usage)
        return self.galaxies_memory_usage

    @property
    def memory_usage(self):
        """Return the memory usage of the Pipeline object.

        Returns:
            float: The memory usage in Megabytes.
        """
        return get_full_memory(self) / (1024**2)

    def _print_progress_header(self):
        """Print the header for the progress report."""
        # Nothing to do if we are not rank 0
        if self.rank != 0:
            return

        # Obey 0 verbosity
        if self.verbose == 0:
            return

        # Print the pipeline footprint regardless of report memory setting
        self._print("Pipeline memory footprint (MB):", self.memory_usage)

        self._print("Running the pipeline...")
        self._print_progress_footer()

        # Define the header for the progress report
        header = (
            "|"
            + f"{'Galaxy #':>{self._table_col_width}}"
            + "|"
            + f"{'Nstars':>{self._table_col_width}}"
            + "|"
            + f"{'Ngas':>{self._table_col_width}}"
            + "|"
            + f"{'Nbh':>{self._table_col_width}}"
            + "|"
            + f"{'dt (s)':>{self._table_col_width}}"
            + "|"
        )

        # If we are reporting memory usage we need to add the extra columns
        if self._report_memory:
            header += (
                f"{'Memory footprint (MB)':>{self._table_col_width * 2}}"
                + "|"
                + f"{'Galaxy memory (MB)':>{self._table_col_width * 2}}"
                + "|"
                + f"{'Results memory (MB)':>{self._table_col_width * 2}}"
                + "|"
            )

        self._print(header)
        self._print_progress_footer()

    def _add_progress_row(self, gal, igal, start):
        """Print a row for the progress report.

        Args:
            gal (Galaxy): The galaxy object to report on.
            igal (int): The index of the galaxy.
            start (float): The start time of the process.
        """
        # Obey 0 verbosity
        if self.verbose == 0:
            return

        # Get the number of stars, for parametric galaxies we don't have
        # this information but we do have the sfzh shape
        if gal.stars is not None and hasattr(gal.stars, "nstars"):
            nstars = gal.stars.nstars
        elif gal.stars is not None:
            nstars = str(gal.stars.sfzh.shape)
        else:
            nstars = "None"

        # Get the number of black holes (this is automatically 1 for
        # parametric galaxies)
        nbh = gal.black_holes.nbh if gal.black_holes is not None else "None"

        # Get the number of gas particles (this should be None for
        # parametric galaxies)
        ngas = gal.gas.nparticles if gal.gas is not None else "None"

        # Get the elapsed time for this galaxy
        elapsed = time.perf_counter() - start

        # Get the memory footprint if we are reporting memory usage
        if self._report_memory:
            res_mem_mb = self.results_memory_usage
            self_mem_mb = self.memory_usage
            gal_mem_mb = self.galaxies_memory_usage

        # In MPI land we need to add the offset to this rank's galaxy index
        if self.using_mpi:
            igal += self.n_galaxies_offset

        # We don't want to obey the verbosity level here, we always want
        # to print this information so we'll use the print method directly

        # Get the current time code in seconds with 0 padding and 2
        # decimal places
        now = time.perf_counter() - self._start_time
        int_now = str(int(now)).zfill(
            len(str(int(now))) + 1 if now > 9999 else 5
        )
        decimal = str(now).split(".")[-1][:2]
        now_str = f"{int_now}.{decimal}"

        # Create the prefix for the print, there's extra info to output if
        # we are using MPI
        if self.using_mpi:
            # Only print on rank 0 if we are using MPI and have verbosity 1
            if self.verbose == 1 and self.rank != 0:
                return
            prefix = (
                f"[{str(self.rank).zfill(len(str(self.size)) + 1)}]"
                f"[{now_str}]:"
            )
        else:
            prefix = f"[{now_str}]:"

        # Define the row for the progress report
        row = (
            "|"
            + f"{igal:>{self._table_col_width}}"
            + "|"
            + f"{nstars:>{self._table_col_width}}"
            + "|"
            + f"{ngas:>{self._table_col_width}}"
            + "|"
            + f"{nbh:>{self._table_col_width}}"
            + "|"
            + f"{elapsed:>{self._table_col_width}.2f}"
            + "|"
        )

        # If we are reporting memory usage we need to add the extra columns
        if self._report_memory:
            row += (
                f"{self_mem_mb:>{self._table_col_width * 2}.2f}"
                + "|"
                + f"{gal_mem_mb:>{self._table_col_width * 2}.2f}"
                + "|"
                + f"{res_mem_mb:>{self._table_col_width * 2}.2f}"
                + "|"
            )

        print(prefix, row)

    def _print_progress_footer(self):
        """Print the footer for the progress report."""
        footer = (
            "+"
            + "-" * self._table_col_width
            + "+"
            + "-" * self._table_col_width
            + "+"
            + "-" * self._table_col_width
            + "+"
            + "-" * self._table_col_width
            + "+"
            + "-" * self._table_col_width
            + "+"
        )

        # If we are reporting memory usage we need to add the extra columns
        if self._report_memory:
            footer += (
                "-" * self._table_col_width * 2
                + "+"
                + "-" * self._table_col_width * 2
                + "+"
                + "-" * self._table_col_width * 2
                + "+"
            )

        self._print(footer)

    def _report_total_timings(self):
        """Print the total timings for each operation."""
        # If we are using MPI we need to sum the timings and counts across
        # all ranks to get the total time taken and total counts
        if self.using_mpi:
            for key in self._op_timing:
                self._op_timing[key] = self.comm.allreduce(
                    self._op_timing[key]
                )
            for key in self._op_counts:
                self._op_counts[key] = self.comm.allreduce(
                    self._op_counts[key]
                )

        # We only want to report this on rank 0 so we'll return if we're
        # not rank 0
        if self.rank != 0:
            return

        # Loop over the timings and print them out
        for key, elapsed in self._op_timing.items():
            if elapsed == 0.0:
                continue

            # Report in sensible units
            if elapsed < 1:
                elapsed *= 1000
                units = "ms"
            elif elapsed < 60:
                units = "s"
            else:
                elapsed /= 60
                units = "mins"

            # Handle reporting (extra analysis and unpacking need some
            # special handling because they are not operations)
            if key == "Unpacking results":
                self._print(f"{key} took {elapsed:.2f} {units}")
            elif key == "Extra Analyses":
                self._print(
                    f"Running {self._op_counts[key]} extra analyses"
                    f" took {elapsed:.2f} {units}"
                )
            elif key in self._op_counts:
                self._print(
                    f"Computing {self._op_counts[key]} {key}"
                    f" took {elapsed:.2f} {units}"
                )
            else:
                self._print(f"Computing {key} took {elapsed:.2f} {units}")
    def report_operations(self):
        """Print the operations that will be performed by the Pipeline.

        This will print out the operations that will be performed by the
        pipeline, including which will be written out and which will just
        be computed.
        """
        self._print("-" * 60)
        self._print("".ljust(30) + "Compute?".rjust(15) + "Write?".rjust(15))

        # Print each operation and whether it will be written out
        self._print(
            "Line of Sight Optical Depths".ljust(30)
            + str(self._do_los_optical_depths).rjust(15)
            + "N/A".rjust(15)
        )
        self._print(
            "SFZH".ljust(30)
            + str(self._do_sfzh).rjust(15)
            + str(self._write_sfzh).rjust(15)
        )
        self._print(
            "SFH".ljust(30)
            + str(self._do_sfh).rjust(15)
            + str(self._write_sfh).rjust(15)
        )
        self._print(
            "Lnu Spectra".ljust(30)
            + str(self._do_lnu_spectra).rjust(15)
            + str(self._write_lnu_spectra).rjust(15)
        )
        self._print(
            "Fnu Spectra".ljust(30)
            + str(self._do_fnu_spectra).rjust(15)
            + str(self._write_fnu_spectra).rjust(15)
        )
        self._print(
            "Photometric Luminosities".ljust(30)
            + str(self._do_luminosities).rjust(15)
            + str(self._write_luminosities).rjust(15)
        )
        self._print(
            "Photometric Fluxes".ljust(30)
            + str(self._do_fluxes).rjust(15)
            + str(self._write_fluxes).rjust(15)
        )
        self._print(
            "Emission Line Luminosities".ljust(30)
            + str(self._do_lum_lines).rjust(15)
            + str(self._write_lines).rjust(15)
        )
        self._print(
            "Emission Line Fluxes".ljust(30)
            + str(self._do_flux_lines).rjust(15)
            + str(self._write_flux_lines).rjust(15)
        )
        self._print(
            "Luminosity Images".ljust(30)
            + str(self._do_images_lum).rjust(15)
            + str(self._write_images_lum).rjust(15)
        )
        self._print(
            "Flux Images".ljust(30)
            + str(self._do_images_flux).rjust(15)
            + str(self._write_images_flux).rjust(15)
        )
        self._print(
            "Lnu Data Cubes".ljust(30)
            + str(self._do_lnu_data_cubes).rjust(15)
            + str(self._write_lnu_data_cubes).rjust(15)
        )
        self._print(
            "Fnu Data Cubes".ljust(30)
            + str(self._do_fnu_data_cubes).rjust(15)
            + str(self._write_fnu_data_cubes).rjust(15)
        )
        self._print(
            "Spectroscopy Lnu".ljust(30)
            + str(self._do_spectroscopy_lnu).rjust(15)
            + str(self._write_spectroscopy_lnu).rjust(15)
        )
        self._print(
            "Spectroscopy Fnu".ljust(30)
            + str(self._do_spectroscopy_fnu).rjust(15)
            + str(self._write_spectroscopy_fnu).rjust(15)
        )
        self._print("-" * 60)
    def add_analysis_func(self, func, result_key, *args, **kwargs):
        """Add an analysis function to the Pipeline.

        The provided function will be called on each galaxy in the
        Pipeline once all data has been generated. The function should
        take a galaxy object as the first argument and can take any number
        of additional arguments and keyword arguments.

        The results of the analysis function should be returned. This can
        be a scalar, array, or a dictionary of arbitrary structure. We'll
        store it in a dictionary on the Pipeline object with the key being
        the result_key argument.

        For example:

        ```python
        def my_analysis_func(galaxy, *args, **kwargs):
            return galaxy.some_attribute * 2

        pipeline.add_analysis_func(my_analysis_func, "MyAnalysisResult")
        ```

        Or for a specific component of the galaxy:

        ```python
        def my_analysis_func(galaxy, *args, **kwargs):
            return galaxy.stars.mass.sum()

        pipeline.add_analysis_func(my_analysis_func, "Stars/Mass")
        ```

        Args:
            func (callable):
                The analysis function to add to the Pipeline. This
                function should take a galaxy object as the first argument
                and can take any number of additional arguments and
                keyword arguments.
            result_key (str):
                The key to use when storing the results of the analysis
                function in the output. This can include slashes to denote
                nesting, e.g. "Gas/Nested/Result".
            *args: Any additional arguments to pass to the analysis
                function.
            **kwargs: Any additional keyword arguments to pass to the
                analysis function.
        """
        # Ensure we have a callable function
        if not callable(func):
            raise exceptions.InconsistentArguments(
                "Analysis function is not a callable function"
                f" (found {type(func)})."
            )

        # Warn the user if there's a name clash, we'll take the new one
        if func.__name__ in [f.__name__ for f in self._analysis_funcs]:
            warn(
                f"{func.__name__} already exists in the analysis functions."
                " Overwriting with the passed function."
            )

        # Add the function to the containers
        self._analysis_funcs.append(func)
        self._analysis_args.append(args)
        self._analysis_kwargs.append(kwargs)
        self._analysis_results_keys.append(result_key)

        # Error out if there's a result_key clash, we can't store two
        # results under the same key
        if result_key in self._analysis_results:
            raise exceptions.InconsistentArguments(
                f"{result_key} already exists in the analysis results. "
                "Choose a different result_key"
            )
        else:
            self._analysis_results[result_key] = []

        self._print(f"Added analysis function: {result_key}")
    def add_galaxies(self, galaxies):
        """Add galaxies to the Pipeline.

        This function will add the provided galaxies to the Pipeline. This
        is useful if you have already loaded the galaxies and want to add
        them to the Pipeline object.

        Args:
            galaxies (list):
                A list of Galaxy objects to add to the Pipeline.
        """
        start = time.perf_counter()

        # Check there are actually galaxies... and tell the user
        if len(galaxies) == 0:
            if self.using_mpi:
                self._print(
                    "No galaxies provided to add to the Pipeline on rank"
                    f" {self.rank}."
                )
            else:
                self._print("No galaxies provided to add to the Pipeline.")

        # Attach the galaxies
        self.galaxies = galaxies
        self.n_galaxies_local = len(galaxies)

        # If we're in MPI land we need to sum the local counts across all
        # ranks to get the total number of galaxies
        if self.using_mpi:
            self.n_galaxies = self.comm.allreduce(self.n_galaxies_local)
            self.n_galaxies_per_rank = self.comm.allgather(
                self.n_galaxies_local
            )
            self.n_galaxies_offset = sum(
                self.n_galaxies_per_rank[: self.rank]
            )
        else:
            self.n_galaxies = self.n_galaxies_local
            self.n_galaxies_per_rank = [self.n_galaxies_local]
            self.n_galaxies_offset = 0

        # If we have MPI let's report the balance
        if self.using_mpi:
            self._report_balance()

        # Report the galaxy memory footprint (in MPI land we'll report
        # the total and local memory usage)
        if self.using_mpi:
            self._print(
                "Local galaxies memory footprint:"
                f" {self.galaxies_memory_usage:.2f} MB"
            )
            self._print(
                "Distributed galaxies total memory footprint:"
                f" {self.all_galaxies_memory_usage:.2f} MB"
            )
        else:
            self._print(
                "Galaxies memory footprint: "
                f"{self.galaxies_memory_usage:.2f} MB"
            )

        # Done!
        self._loaded_galaxies = True
        self._took(start, f"Adding {self.n_galaxies} galaxies")
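    # A minimal usage sketch for attaching galaxies (illustrative, not
    # part of the module; `load_galaxy` is a hypothetical user-defined
    # loader):
    #
    #     galaxies = [load_galaxy(i) for i in range(100)]
    #     pipeline.add_galaxies(galaxies)
    #
    # Under MPI each rank should add only its own subset; the total count
    # is summed across ranks internally.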
    def _add_instruments(self, instruments):
        """Add instruments to the Pipeline.

        This is a helper used to attach instruments to the pipeline. Note
        that these instruments are only used for bookkeeping. The machinery
        to apply instruments to the right operation is handled by the
        OperationKwargsHandler (self._operation_kwargs).

        This method will convert the input to a flat list of Instruments.

        Args:
            instruments (list of Instrument/InstrumentCollection):
                The instruments to attach to the pipeline.

        Returns:
            list of Instrument: The flattened list of instruments.
        """
        # Flatten the instruments into a single list
        _instruments = []
        for inst in instruments:
            if isinstance(inst, InstrumentCollection):
                _instruments.extend(list(inst.instruments.values()))
            else:
                _instruments.append(inst)

        # Get a set for the current instrument collection
        current_instruments = self.instruments.to_set()

        # Ensure we don't have duplicate labels (this can happen if the
        # same label is being used for similar instruments with different
        # properties)
        all_labels = [inst.label for inst in current_instruments]
        label_set = set(all_labels)
        if len(label_set) != len(all_labels):
            raise exceptions.InconsistentArguments(
                "Duplicate instrument labels found when adding "
                "instruments to the Pipeline. Ensure all instruments "
                f"have unique labels. Labels found: {all_labels}"
            )

        # Add the new instruments to the instrument collection
        new_instruments = set(_instruments) - current_instruments
        self.instruments.add_instruments(*new_instruments)
        if len(new_instruments) > 0:
            self._instruments_prepared = False

        return _instruments

    def _prepare_instruments(self):
        """Prewarm instrument filters onto the emission-model grid.

        This prepares filters once per pipeline setup so repeated
        photometry operations can avoid interpolation and reuse
        precomputed integration weights.
        """
        if self._instruments_prepared:
            return

        # Nothing to do if we have no instruments attached.
        if len(self.instruments) == 0:
            self._instruments_prepared = True
            return

        # If the emission model has no wavelength grid we can only
        # precompute on each filter's native grid.
        model_lam = getattr(self.emission_model, "lam", None)
        for inst in self.instruments:
            if not inst.can_do_photometry:
                continue
            if model_lam is not None:
                inst.filters.prepare_for_grid(lam=model_lam)
            else:
                inst.filters.prepare_for_grid()

        self._instruments_prepared = True
    def get_los_optical_depths(
        self,
        kernel,
        kernel_threshold=1.0,
        kappa=0.0795,
        tau_v_attr="tau_v",
    ):
        """Flag that the Pipeline should compute the LOS optical depths.

        This will signal the Pipeline to compute the LOS optical depths
        when the run method is called. LOS optical depths are computed
        first.

        Note that the LOS calculation requires that a galaxy has a gas
        component and either a stellar or black hole component emitting.

        Args:
            kernel (array-like):
                The gas SPH kernel.
            kernel_threshold (float):
                The threshold of the kernel. Default is 1.0.
            kappa (float):
                The dust opacity coefficient in units of Msun / pc**2.
                Default is 0.0795.
            tau_v_attr (str):
                The name of the attribute to store the V-band optical
                depth. Default is "tau_v".
        """
        # Store the arguments for the operation
        self._operation_kwargs.add(
            NO_MODEL_LABEL,
            "get_los_optical_depths",
            kernel=kernel,
            kernel_threshold=kernel_threshold,
            kappa=kappa,
            tau_v_attr=tau_v_attr,
        )

        # Flag that we will compute the LOS optical depths
        self._do_los_optical_depths = True
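    # Illustrative sketch of flagging LOS optical depths. The kernel is
    # any array-like SPH kernel lookup table; here we assume it comes
    # from synthesizer's kernel_functions module:
    #
    #     from synthesizer.kernel_functions import Kernel
    #
    #     kernel = Kernel().get_kernel()
    #     pipeline.get_los_optical_depths(kernel=kernel, kappa=0.0795)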
    def _get_los_optical_depths(self, galaxy):
        """Compute the Line of Sight optical depths for all particles.

        This will compute the optical depths based on the line of sight
        dust column density for all non-gas components. We project a ray
        along the z axis (LOS) and any gas kernels it intersects are
        evaluated at the intersection, and their contributions to the
        optical depth are included.

        Args:
            galaxy (Galaxy): The galaxy to compute the optical depths for.
        """
        start = time.perf_counter()

        # Iterate over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_los_optical_depths"
        ]:
            # Compute the optical depths for the present components.
            if (
                galaxy.stars is not None
                and galaxy.gas is not None
                and galaxy.stars.nstars > 0
            ):
                galaxy.get_stellar_los_tau_v(
                    kernel=op_kwargs["kernel"],
                    threshold=op_kwargs["kernel_threshold"],
                    kappa=op_kwargs["kappa"],
                    tau_v_attr=op_kwargs["tau_v_attr"],
                    nthreads=self.nthreads,
                )
            if (
                galaxy.black_holes is not None
                and galaxy.gas is not None
                and galaxy.black_holes.nbh > 0
            ):
                galaxy.get_black_hole_los_tau_v(
                    kernel=op_kwargs["kernel"],
                    threshold=op_kwargs["kernel_threshold"],
                    kappa=op_kwargs["kappa"],
                    tau_v_attr=op_kwargs["tau_v_attr"],
                    nthreads=self.nthreads,
                )

        # Count how many optical depths we have generated (1 per particle)
        # and increment the total counts
        if galaxy.stars is not None and galaxy.gas is not None:
            self._op_counts["LOS optical depths"] += galaxy.stars.nstars
        if galaxy.black_holes is not None and galaxy.gas is not None:
            self._op_counts["LOS optical depths"] += galaxy.black_holes.nbh

        # Record the time taken
        self._op_timing["LOS optical depths"] += time.perf_counter() - start
    def get_sfzh(self, log10ages, metallicities, write=True):
        """Flag that the Pipeline should compute the SFZH grid.

        This will signal the Pipeline to compute the SFZH grid when the
        run method is called. The SFZH grid is the star formation history
        grid for each galaxy.

        Args:
            log10ages (array-like):
                The log10 age axis of the SFZH grid.
            metallicities (array-like):
                The metallicity axis of the SFZH grid.
            write (bool):
                Whether to write out the SFZH grid. Default is True.
        """
        # Store the arguments for the operation
        self._operation_kwargs.add_unique(
            "get_sfzh",
            log10ages=log10ages,
            metallicities=metallicities,
        )

        # Flag that we will compute the SFZH grid
        self._do_sfzh = True

        # Flag that we will want to write out the SFZH grid (calling the
        # get_sfzh method is considered the intent to write it out
        # by default)
        self._write_sfzh = write or self._write_sfzh
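    # Illustrative sketch of flagging the SFZH calculation, assuming the
    # axes are taken from the SPS grid used by the emission model (axis
    # attribute names may differ between grids):
    #
    #     pipeline.get_sfzh(
    #         log10ages=grid.log10ages,
    #         metallicities=grid.metallicities,
    #     )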
    def _get_sfzh(self, galaxy):
        """Compute the SFZH grid for each galaxy.

        This is also the integrated weights of each star particle onto the
        SPS grid.

        Args:
            galaxy (Galaxy): The galaxy to compute the SFZH grid for.

        Returns:
            array-like: The computed or cached SFZH grid.
        """
        start = time.perf_counter()

        # Get the operation kwargs for this operation
        op_kwargs = self._operation_kwargs.get_unique_kwargs("get_sfzh")

        # Get the SFZH, skip any without stars.
        # Parametric galaxies have this ready to go so we can skip them
        if getattr(galaxy, "sfzh", None) is not None:
            return galaxy.sfzh
        elif galaxy.stars is not None and galaxy.stars.nstars > 0:
            galaxy.stars.get_sfzh(
                log10ages=op_kwargs["log10ages"],
                metallicities=op_kwargs["metallicities"],
                nthreads=self.nthreads,
            )
            galaxy.sfzh = galaxy.stars.sfzh
        else:
            # No stars, no SFZH, store a zeroed grid
            galaxy.sfzh = np.zeros(
                (
                    len(op_kwargs["log10ages"]),
                    len(op_kwargs["metallicities"]),
                )
            )
            return galaxy.sfzh

        # Count the number of SFZH grids we have generated
        self._op_counts["SFZH"] += 1

        # Record the time taken
        self._op_timing["SFZH"] += time.perf_counter() - start

        return galaxy.sfzh
    def get_sfh(self, log10ages, write=True):
        """Flag that the Pipeline should compute the binned SFH.

        This will signal the Pipeline to compute the binned SFH when the
        run method is called. The SFH is the binned star formation history
        based on an arbitrary set of age bins defined in log10 space.

        Args:
            log10ages (array-like):
                The log10 age axis of the SFH grid.
            write (bool):
                Whether to write out the SFH grid. Default is True.
        """
        # Store the arguments for the operation
        self._operation_kwargs.add_unique(
            "get_sfh",
            log10ages=log10ages,
        )

        # Flag that we will compute the SFH grid
        self._do_sfh = True

        # Flag that we will want to write out the SFH grid (calling the
        # get_sfh method is considered the intent to write it out
        # by default)
        self._write_sfh = write or self._write_sfh
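    # Illustrative sketch of flagging a binned SFH with custom log10(age)
    # bins (the bin values here are arbitrary):
    #
    #     import numpy as np
    #
    #     pipeline.get_sfh(log10ages=np.linspace(6.0, 10.5, 46))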
    def _get_sfh(self, galaxy):
        """Compute the binned SFH for each galaxy.

        Args:
            galaxy (Galaxy): The galaxy to compute the SFH for.

        Returns:
            array-like: The computed or cached SFH grid.
        """
        start = time.perf_counter()

        # Get the operation kwargs for this operation
        op_kwargs = self._operation_kwargs.get_unique_kwargs("get_sfh")

        # Get the SFH, skip any without stars.
        # Parametric galaxies have this ready to go so we can skip them
        if getattr(galaxy, "sfh", None) is not None:
            return galaxy.sfh
        elif galaxy.stars is not None and galaxy.stars.nstars > 0:
            galaxy.stars.get_sfh(
                log10ages=op_kwargs["log10ages"],
                nthreads=self.nthreads,
            )
            galaxy.sfh = galaxy.stars.sfh
        else:
            # No stars, no SFH, store a zeroed grid
            galaxy.sfh = np.zeros(len(op_kwargs["log10ages"]))
            return galaxy.sfh

        # Count the number of SFH grids we have generated
        self._op_counts["SFH"] += 1

        # Record the time taken
        self._op_timing["SFH"] += time.perf_counter() - start

        return galaxy.sfh
    def get_spectra(self, write=True):
        """Flag that the Pipeline should compute the rest frame spectra.

        This will signal the Pipeline to compute the rest frame spectral
        luminosity density for each galaxy when the run method is called.
        The spectra are generated based on the EmissionModel and the
        galaxy components. Spectral flux densities can be computed with
        get_observed_spectra.

        Note that there are no kwargs for this method and all models
        flagged to be saved by the emission model will be computed.

        Args:
            write (bool):
                Whether to write out the spectra. Default is True.
        """
        # Flag that we will compute the spectra
        self._do_lnu_spectra = True

        # Flag that we will want to write out the spectra (calling the
        # get_spectra method is considered the intent to write it out
        # by default)
        self._write_lnu_spectra = write or self._write_lnu_spectra
    # Spectra has no kwargs so we don't need to store anything in
    # the handler (self._operation_kwargs)

    def _get_spectra(self, galaxy):
        """Generate the spectra for the galaxies based on the EmissionModel.

        Args:
            galaxy (Galaxy): The galaxy to generate the spectra for.
        """
        start = time.perf_counter()

        # Get the spectra
        galaxy.get_spectra(self.emission_model, nthreads=self.nthreads)

        # Count the number of spectra we have generated
        self._op_counts["Lnu Spectra"] += count_and_check_dict_recursive(
            galaxy.spectra
        )
        if galaxy.stars is not None:
            self._op_counts["Lnu Spectra"] += count_and_check_dict_recursive(
                galaxy.stars.spectra
            )
        if galaxy.black_holes is not None:
            self._op_counts["Lnu Spectra"] += count_and_check_dict_recursive(
                galaxy.black_holes.spectra
            )

        # Record the time taken
        self._op_timing["Lnu Spectra"] += time.perf_counter() - start
    def get_observed_spectra(self, cosmo, igm=None, write=True):
        """Flag that the Pipeline should compute the observed spectra.

        This will signal the Pipeline to compute the observed spectral
        flux density for each galaxy when the run method is called. The
        observed spectra are generated based on the rest frame spectra and
        the cosmology.

        Args:
            cosmo (astropy.cosmology.Cosmology):
                The cosmology to use for the observed spectra.
            igm (IGMBase):
                The IGM model to use for the attenuation of the spectra.
            write (bool):
                Whether to write out the observed spectra. Default is
                True.
        """
        # Store the cosmology for the operation
        self._operation_kwargs.add_unique(
            "get_observed_spectra",
            cosmo=cosmo,
            igm=igm,
        )

        # Flag that we will compute the observed spectra
        self._do_fnu_spectra = True

        # Flag that we will want to write out the observed spectra
        # (calling the get_observed_spectra method is considered the
        # intent to write it out by default)
        self._write_fnu_spectra = write or self._write_fnu_spectra

        # We need to ensure the lnu spectra are computed first
        # NOTE: this is safe if the user has already called get_spectra,
        # it will just leave the flag as True and respect the original
        # intent to write or not write
        self.get_spectra(write=False)
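    # Illustrative sketch of flagging observed spectra with an astropy
    # cosmology (the IGM model is optional and defaults to None):
    #
    #     from astropy.cosmology import Planck18
    #
    #     pipeline.get_observed_spectra(cosmo=Planck18)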
    def _get_observed_spectra(self, galaxy):
        """Compute the observed spectra for each galaxy.

        Args:
            galaxy (Galaxy): The galaxy to compute the observed spectra
                for.
        """
        start = time.perf_counter()

        # Get the operation kwargs for this operation
        op_kwargs = self._operation_kwargs.get_unique_kwargs(
            "get_observed_spectra"
        )

        # Get the observed spectra
        galaxy.get_observed_spectra(
            cosmo=op_kwargs["cosmo"],
            igm=op_kwargs["igm"],
        )

        # Count the number of observed spectra we have generated
        self._op_counts["Fnu Spectra"] += count_and_check_dict_recursive(
            galaxy.spectra
        )
        if galaxy.stars is not None:
            self._op_counts["Fnu Spectra"] += count_and_check_dict_recursive(
                galaxy.stars.spectra
            )
        if galaxy.black_holes is not None:
            self._op_counts["Fnu Spectra"] += count_and_check_dict_recursive(
                galaxy.black_holes.spectra
            )

        # Record the time taken
        self._op_timing["Fnu Spectra"] += time.perf_counter() - start
    def get_photometry_luminosities(
        self,
        *instruments,
        labels=None,
        write=True,
    ):
        """Flag that the Pipeline should compute the photometric luminosities.

        This will signal the Pipeline to compute the photometric
        luminosities for each galaxy when the run method is called, using
        the passed instruments. If multiple instruments are desired this
        method can be called multiple times to add the new instruments.
        The photometric luminosities are generated based on the lnu
        spectra and the instrument filters.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the photometric luminosities.
                This can be any number of instruments or instrument
                collections, they will all be combined into a single
                InstrumentCollection for this operation.
            labels (str/list of str):
                An optional label/set of labels for this set of
                instruments. This is only needed if you want to apply
                different instrument sets to different models. Default is
                None.
            write (bool):
                Whether to write out the photometric luminosities. Default
                is True.
        """
        # Flag that we will compute the photometric luminosities
        self._do_luminosities = True

        # Flag that we will want to write out the photometric luminosities
        # (calling the get_photometry_luminosities method is considered
        # the intent to write it out by default)
        self._write_luminosities = write or self._write_luminosities

        # Check that we have instruments to compute the photometry for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate photometry without instruments with "
                "filters! Pass instruments to the "
                "get_photometry_luminosities method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do photometry
        for inst in _instruments:
            if not inst.can_do_photometry:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate photometry with {inst.label}!"
                )

        # Add the instruments to the operation kwargs
        self._operation_kwargs.add(
            labels if labels is not None else NO_MODEL_LABEL,
            "get_photometry_luminosities",
            instruments=_instruments,
        )

        # We need to ensure the lnu spectra are computed first
        # NOTE: this is safe if the user has already called get_spectra,
        # it will just leave the flag as True and respect the original
        # intent to write or not write
        self.get_spectra(write=False)
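    # Illustrative sketch of flagging photometric luminosities. The
    # instrument names are placeholders, and "intrinsic" stands in for a
    # saved model label in your EmissionModel:
    #
    #     pipeline.get_photometry_luminosities(nircam, miri)
    #     pipeline.get_photometry_luminosities(uvj_inst, labels="intrinsic")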
    def _get_photometry_luminosities(self, galaxy):
        """Compute the photometric luminosities from the generated spectra.

        Args:
            galaxy (Galaxy): The galaxy to compute the photometric
                luminosities for.
        """
        start = time.perf_counter()

        # Loop over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_photometry_luminosities"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Get the photometry.
            galaxy.get_photo_lnu(
                filters=instruments.all_filters,
                nthreads=self.nthreads,
                limit_to=model_label,
            )

        # Count the number of photometric luminosities we have generated
        self._op_counts["Luminosities"] += count_and_check_dict_recursive(
            galaxy.photo_lnu
        )
        if galaxy.stars is not None:
            self._op_counts["Luminosities"] += count_and_check_dict_recursive(
                galaxy.stars.photo_lnu
            )
        if galaxy.black_holes is not None:
            self._op_counts["Luminosities"] += count_and_check_dict_recursive(
                galaxy.black_holes.photo_lnu
            )

        # Record the time taken
        self._op_timing["Luminosities"] += time.perf_counter() - start
    def get_photometry_fluxes(
        self,
        *instruments,
        cosmo=None,
        igm=None,
        labels=None,
        write=True,
    ):
        """Flag that the Pipeline should compute the photometric fluxes.

        This will signal the Pipeline to compute the photometric fluxes
        for each galaxy when the run method is called. The photometric
        fluxes are generated based on the fnu spectra and the instrument
        filters.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the photometric fluxes. This
                can be any number of instruments or instrument
                collections, they will all be combined into a single
                InstrumentCollection for this operation.
            cosmo (astropy.cosmology.Cosmology):
                If get_observed_spectra has not been called explicitly,
                then we will need the cosmology to compute the observed
                spectra first. Default is None.
            igm (IGMBase):
                If get_observed_spectra has not been called explicitly,
                then we will need the IGM model to compute the observed
                spectra first. Unlike the cosmology, this is not required
                if IGM attenuation is not needed. Default is None.
            labels (str/list of str):
                An optional label/set of labels for this set of
                instruments. This is only needed if you want to apply
                different instrument sets to different models. Default is
                None.
            write (bool):
                Whether to write out the photometric fluxes. Default is
                True.
        """
        # Flag that we will compute the photometric fluxes
        self._do_fluxes = True

        # Flag that we will want to write out the photometric fluxes
        # (calling the get_photometry_fluxes method is considered the
        # intent to write it out by default)
        self._write_fluxes = write or self._write_fluxes

        # Check that we have instruments to compute the photometry for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate photometry without instruments with "
                "filters! Pass instruments to the get_photometry_fluxes "
                "method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do photometry
        for inst in _instruments:
            if not inst.can_do_photometry:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate photometry with {inst.label}!"
                )

        # Add the instruments to the operation kwargs
        self._operation_kwargs.add(
            labels if labels is not None else NO_MODEL_LABEL,
            "get_photometry_fluxes",
            instruments=_instruments,
            cosmo=cosmo,
            igm=igm,
        )

        # We need to ensure the fnu spectra are computed first
        # NOTE: this is safe if the user has already called
        # get_observed_spectra, it will just leave the flag as True and
        # respect the original intent to write or not write
        self.get_observed_spectra(cosmo=cosmo, igm=igm, write=False)
    def _get_photometry_fluxes(self, galaxy):
        """Compute the photometric fluxes from the generated spectra.

        Args:
            galaxy (Galaxy): The galaxy to compute the photometric fluxes
                for.
        """
        start = time.perf_counter()

        # Loop over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_photometry_fluxes"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Get the photometry.
            galaxy.get_photo_fnu(
                filters=instruments.all_filters,
                nthreads=self.nthreads,
                limit_to=model_label,
            )

        # Count the number of photometric fluxes we have generated
        self._op_counts["Fluxes"] += count_and_check_dict_recursive(
            galaxy.photo_fnu
        )
        if galaxy.stars is not None:
            self._op_counts["Fluxes"] += count_and_check_dict_recursive(
                galaxy.stars.photo_fnu
            )
        if galaxy.black_holes is not None:
            self._op_counts["Fluxes"] += count_and_check_dict_recursive(
                galaxy.black_holes.photo_fnu
            )

        # Record the time taken
        self._op_timing["Fluxes"] += time.perf_counter() - start
    def get_lines(self, line_ids, write=True):
        """Flag that the Pipeline should compute the emission lines.

        This will signal the Pipeline to compute the emission lines for
        each galaxy when the run method is called. The emission lines are
        generated based on the lnu spectra and the EmissionModel.

        Args:
            line_ids (list):
                The emission line IDs to generate.
            write (bool):
                Whether to write out the emission lines. Default is True.
        """
        # Store the line IDs for the operation
        self._operation_kwargs.add_unique(
            "get_lines",
            line_ids=line_ids,
        )

        # Flag that we will compute the emission lines
        self._do_lum_lines = True

        # Some models require spectra to compute certain line emission
        # contributions, if we have no spectra warn the user
        if not self._do_lnu_spectra:
            warn(
                "Some generation emission models may require spectra in "
                "addition to lines. Currently no spectra will be "
                "generated. This could be fine depending on the "
                "emission model but if not call get_spectra before "
                "running the pipeline."
            )

        # Flag that we will want to write out the emission lines (calling
        # the get_lines method is considered the intent to write it out
        # by default)
        self._write_lines = write or self._write_lines

        # Store the line IDs, we'll write these once later
        self.line_ids = line_ids
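    # Illustrative sketch of flagging emission lines. Line IDs follow the
    # grid's line naming convention; these particular IDs are examples:
    #
    #     pipeline.get_lines(line_ids=["H 1 6562.80A", "O 3 5006.84A"])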
    def _get_lines(self, galaxy):
        """Generate the emission lines for the galaxies.

        This function will generate the emission lines for all spectra
        types that were saved when spectra were generated.

        Args:
            galaxy (Galaxy): The galaxy to generate the emission lines
                for.
        """
        start = time.perf_counter()

        # Get the operation kwargs for this operation
        op_kwargs = self._operation_kwargs.get_unique_kwargs("get_lines")

        # Loop over the galaxies and get the spectra
        galaxy.get_lines(
            line_ids=op_kwargs["line_ids"],
            emission_model=self.emission_model,
            nthreads=self.nthreads,
        )

        # Store the line wavelengths for writing, we only do this once
        # since they are the same for all galaxies but we need to find
        # them first (either on the galaxy, stars, or black holes)
        if self.line_lams is None:
            # Get the line lams from the first line collection we can find
            for lines in galaxy.lines.values():
                self.line_lams = lines.lam
                break
            if self.line_lams is None and galaxy.stars is not None:
                for lines in galaxy.stars.lines.values():
                    self.line_lams = lines.lam
                    break
            if self.line_lams is None and galaxy.black_holes is not None:
                for lines in galaxy.black_holes.lines.values():
                    self.line_lams = lines.lam
                    break

        # Count the number of emission lines we have generated
        self._op_counts["Emission Line Luminosities"] += (
            count_and_check_dict_recursive(galaxy.lines)
        )
        if galaxy.stars is not None:
            self._op_counts["Emission Line Luminosities"] += (
                count_and_check_dict_recursive(galaxy.stars.lines)
            )
        if galaxy.black_holes is not None:
            self._op_counts["Emission Line Luminosities"] += (
                count_and_check_dict_recursive(galaxy.black_holes.lines)
            )

        # Record the time taken
        self._op_timing["Emission Line Luminosities"] += (
            time.perf_counter() - start
        )
    def get_observed_lines(
        self,
        cosmo,
        igm=None,
        line_ids=None,
        write=True,
    ):
        """Flag that the Pipeline should compute the observed emission lines.

        This will signal the Pipeline to compute the observed emission
        lines for each galaxy when the run method is called. The observed
        emission lines are generated based on the emission lines and the
        cosmology.

        Args:
            cosmo (astropy.cosmology.Cosmology):
                The cosmology to use for the observed emission lines.
            igm (IGMBase):
                The IGM model to use for the observed emission lines.
            line_ids (list):
                If get_lines has not been called explicitly, then we will
                need the line IDs to generate the emission lines. Default
                is None.
            write (bool):
                Whether to write out the observed emission lines. Default
                is True.
        """
        # Store the kwargs for the operation
        self._operation_kwargs.add_unique(
            "get_observed_lines",
            cosmo=cosmo,
            igm=igm,
        )

        # Flag that we will compute the observed emission lines
        self._do_flux_lines = True

        # Ensure we have line IDs if we need to compute the emission lines
        # and get_lines has not been called
        if "get_lines" not in self._operation_kwargs and line_ids is None:
            raise exceptions.PipelineNotReady(
                "Cannot generate observed lines without line IDs, "
                "please pass a list of line IDs to the line_ids argument "
                "of get_observed_lines."
            )
        self._operation_kwargs.add_unique(
            "get_lines",
            line_ids=line_ids,
        )

        # Flag that we will want to write out the observed emission lines
        # (calling the get_observed_lines method is considered the intent
        # to write it out by default)
        self._write_flux_lines = write or self._write_flux_lines

        # We need to ensure the emission lines are computed first
        # NOTE: this is safe if the user has already called get_lines, it
        # will just leave the flag as True and respect the original intent
        # to write or not write
        self.get_lines(line_ids=line_ids, write=False)
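    # Illustrative sketch of flagging observed lines. If get_lines has
    # not been called, the IDs must be supplied here (assumes an astropy
    # cosmology; the line ID is an example):
    #
    #     from astropy.cosmology import Planck18
    #
    #     pipeline.get_observed_lines(
    #         cosmo=Planck18,
    #         line_ids=["H 1 6562.80A"],
    #     )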
    def _get_observed_lines(self, galaxy):
        """Compute the observed emission lines for each galaxy.

        Args:
            galaxy (Galaxy): The galaxy to compute the observed emission
                lines for.
        """
        start = time.perf_counter()

        # Get the operation kwargs for this operation
        op_kwargs = self._operation_kwargs.get_unique_kwargs(
            "get_observed_lines"
        )

        # Get the observed emission lines
        galaxy.get_observed_lines(
            cosmo=op_kwargs["cosmo"],
            igm=op_kwargs["igm"],
        )

        # Store the observed line wavelengths for writing, we only do this
        # once since they are the same for all galaxies but we need to
        # find them first (either on the galaxy, stars, or black holes)
        if self.line_obs_lams is None:
            # Get the line lams from the first line collection we can find
            for lines in galaxy.lines.values():
                self.line_obs_lams = lines.obslam
                break
            if self.line_obs_lams is None and galaxy.stars is not None:
                for lines in galaxy.stars.lines.values():
                    self.line_obs_lams = lines.obslam
                    break
            if self.line_obs_lams is None and galaxy.black_holes is not None:
                for lines in galaxy.black_holes.lines.values():
                    self.line_obs_lams = lines.obslam
                    break

        # Count the number of observed emission lines we have generated
        self._op_counts["Emission Line Fluxes"] += (
            count_and_check_dict_recursive(galaxy.lines)
        )
        if galaxy.stars is not None:
            self._op_counts["Emission Line Fluxes"] += (
                count_and_check_dict_recursive(galaxy.stars.lines)
            )
        if galaxy.black_holes is not None:
            self._op_counts["Emission Line Fluxes"] += (
                count_and_check_dict_recursive(galaxy.black_holes.lines)
            )

        # Record the time taken
        self._op_timing["Emission Line Fluxes"] += (
            time.perf_counter() - start
        )
[docs]
    def get_images_luminosity(
        self,
        *instruments,
        fov=None,
        img_type="smoothed",
        kernel=None,
        kernel_threshold=1.0,
        labels=None,
        psf_resample_factor=1,
        cosmo=None,
        write=True,
    ):
        """Flag that the Pipeline should compute the luminosity images.

        This will signal the Pipeline to compute the luminosity images for
        each galaxy when the run method is called. The luminosity images are
        generated based on the luminosities, and in turn the lnu spectra,
        and the instrument filters.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the luminosity images. This can
                be any number of instruments or instrument collections; they
                will all be combined into a single InstrumentCollection for
                this operation.
            fov (unyt_quantity):
                The field of view of the image with units.
            img_type (str):
                The type of image to generate. Options are 'smoothed' or
                'hist'. Default is 'smoothed'.
            kernel (array-like):
                The kernel to use for smoothing the image. Default is None.
                Required for 'smoothed' images from a particle distribution.
            kernel_threshold (float):
                The threshold of the kernel. Default is 1.0.
            labels (list/str):
                The type of spectra to generate images for. By default this
                is None and all saved spectra types will be used. This can
                either be a list of strings or a single string.
            psf_resample_factor (int):
                (Only applicable for instruments with a PSF.) The resample
                factor for the PSF. This should be a value greater than 1.
                The image will be resampled by this factor before the PSF is
                applied and then downsampled back to the original after
                convolution. This can help minimize the effects of using a
                generic PSF centred on the galaxy centre, a simplification
                we make for performance reasons (the effects are
                sufficiently small that this simplification is justified).
            cosmo (astropy.cosmology.Cosmology):
                The cosmology to use for the calculation of the luminosity
                distance. Only needed for internal conversions from
                cartesian to angular coordinates when an angular resolution
                is used. Default is None.
            write (bool):
                Whether to write out the luminosity images. Default is True.
        """
        # If we have no labels then use all saved models
        if labels is None:
            warn(
                "No labels were passed to get_images_luminosity. We will "
                f"generate images for: {self.emission_model.saved_labels}. "
                "This could be very expensive depending on the number of "
                "models and image sizes."
            )
            labels = self.emission_model.saved_labels

        # Flag that we will compute the luminosity images
        self._do_images_lum = True

        # Ensure we have a field of view if we need to compute the images
        if fov is None:
            raise exceptions.InconsistentArguments(
                "Cannot generate images without a field of view, please pass "
                "one to the fov argument of get_images_luminosity."
            )

        # Check that we have instruments to compute the images for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate images without instruments with filters! "
                "Pass instruments to the get_images_luminosity method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do imaging
        for inst in _instruments:
            if not inst.can_do_imaging:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate images with {inst.label}!"
                )

        # Store the arguments for the operation
        self._operation_kwargs.add(
            labels,
            "get_images_luminosity",
            instruments=_instruments,
            fov=fov,
            img_type=img_type,
            kernel=kernel,
            kernel_threshold=kernel_threshold,
            psf_resample_factor=psf_resample_factor,
            cosmo=cosmo,
        )

        # Validate noise attribute units for luminosity images
        validate_noise_unit_compatibility(_instruments, "erg/s/Hz")

        # Based on the instruments we have and the write argument, set the
        # appropriate writing flags
        if write:
            self._write_images_lum = True
            for inst in _instruments:
                if inst.can_do_psf_imaging:
                    self._write_images_lum_psf = True
                if inst.can_do_noisy_imaging:
                    self._write_images_lum_noise = True

        # We need to ensure the photometric luminosities are computed first
        # NOTE: this is safe if the user has already called
        # get_photometry_luminosities, it will just leave the flag as True
        # and respect the original intent to write or not write
        self.get_photometry_luminosities(
            *instruments,
            labels=labels,
            write=False,
        )
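A hedged usage sketch for the method above (assuming `pipeline` and `instrument1` as in the module docstring; the kernel recipe follows the one quoted in the data-cube error messages later in this module, and the field of view and label are illustrative):

```python
from unyt import kpc

from synthesizer.kernel_functions import Kernel

# Build a smoothing kernel and queue luminosity imaging; this implicitly
# queues get_photometry_luminosities for the same instruments.
kernel = Kernel().get_kernel()
pipeline.get_images_luminosity(
    instrument1,
    fov=60 * kpc,
    img_type="smoothed",
    kernel=kernel,
    labels="intrinsic",  # illustrative emission model label
)
```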
    def _get_images_luminosity(self, galaxy):
        """Compute the luminosity images for the galaxies.

        This function will compute the luminosity images for all spectra
        types that were saved when spectra were generated, in all filters
        included in the Pipeline instruments.

        Args:
            galaxy (Galaxy):
                The galaxy to generate the luminosity images for.
        """
        start = time.perf_counter()

        # We want to time PSF application and noise application separately
        psf_time = 0
        noise_time = 0

        # Loop over all the queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_images_luminosity"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Loop over instruments and perform any imaging they define
            for inst in instruments:
                # Get the basic images for the requested spectra types
                galaxy.get_images_luminosity(
                    *model_label,
                    fov=op_kwargs["fov"],
                    img_type=op_kwargs["img_type"],
                    kernel=op_kwargs["kernel"],
                    kernel_threshold=op_kwargs["kernel_threshold"],
                    nthreads=self.nthreads,
                    instrument=inst,
                    cosmo=op_kwargs["cosmo"],
                )

                # Apply the PSF if applicable to the instrument
                if inst.can_do_psf_imaging:
                    psf_start = time.perf_counter()
                    galaxy.apply_psf_to_images_lnu(
                        instrument=inst,
                        psf_resample_factor=op_kwargs["psf_resample_factor"],
                        limit_to=model_label,
                    )
                    psf_time += time.perf_counter() - psf_start

                # Apply the instrument noise if applicable to the instrument
                if inst.can_do_noisy_imaging:
                    noise_start = time.perf_counter()
                    galaxy.apply_noise_to_images_lnu(
                        instrument=inst,
                        limit_to=model_label,
                        apply_to_psf=inst.can_do_psf_imaging,
                    )
                    noise_time += time.perf_counter() - noise_start

        # Count the number of images we have generated
        self._op_counts["Luminosity Images"] += count_and_check_dict_recursive(
            galaxy.images_lnu
        )
        self._op_counts["Luminosity Images (With PSF)"] += (
            count_and_check_dict_recursive(galaxy.images_psf_lnu)
        )
        self._op_counts["Luminosity Images (With Noise)"] += (
            count_and_check_dict_recursive(galaxy.images_noise_lnu)
        )
        if galaxy.stars is not None:
            self._op_counts["Luminosity Images"] += (
                count_and_check_dict_recursive(galaxy.stars.images_lnu)
            )
            self._op_counts["Luminosity Images (With PSF)"] += (
                count_and_check_dict_recursive(galaxy.stars.images_psf_lnu)
            )
            self._op_counts["Luminosity Images (With Noise)"] += (
                count_and_check_dict_recursive(galaxy.stars.images_noise_lnu)
            )
        if galaxy.black_holes is not None:
            self._op_counts["Luminosity Images"] += (
                count_and_check_dict_recursive(galaxy.black_holes.images_lnu)
            )
            self._op_counts["Luminosity Images (With PSF)"] += (
                count_and_check_dict_recursive(
                    galaxy.black_holes.images_psf_lnu
                )
            )
            self._op_counts["Luminosity Images (With Noise)"] += (
                count_and_check_dict_recursive(
                    galaxy.black_holes.images_noise_lnu
                )
            )

        # Record the time taken
        self._op_timing["Luminosity Images"] += (
            time.perf_counter() - start - psf_time - noise_time
        )
        self._op_timing["Luminosity Images (With PSF)"] += psf_time
        self._op_timing["Luminosity Images (With Noise)"] += noise_time
[docs]
    def get_images_flux(
        self,
        *instruments,
        fov=None,
        img_type="smoothed",
        kernel=None,
        kernel_threshold=1.0,
        cosmo=None,
        igm=None,
        labels=None,
        psf_resample_factor=1,
        write=True,
    ):
        """Flag that the Pipeline should compute the flux images.

        This will signal the Pipeline to compute the flux images for each
        galaxy when the run method is called. The flux images are generated
        based on the fluxes, and in turn the fnu spectra, and the instrument
        filters.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the flux images. This can be any
                number of instruments or instrument collections; they will
                all be combined into a single InstrumentCollection for this
                operation.
            fov (unyt_quantity):
                The field of view of the image with units.
            img_type (str):
                The type of image to generate. Options are 'smoothed' or
                'hist'. Default is 'smoothed'.
            kernel (array-like):
                The kernel to use for smoothing the image. Default is None.
                Required for 'smoothed' images from a particle distribution.
            kernel_threshold (float):
                The threshold of the kernel. Default is 1.0.
            cosmo (astropy.cosmology.Cosmology):
                If get_spectra_observed has not been called explicitly, then
                we will need the cosmology to compute the observed spectra
                first. Default is None.
            igm (IGMBase):
                If get_spectra_observed has not been called explicitly, then
                we will need the IGM model to compute the observed spectra
                first. Unlike the cosmology, this is not required if IGM
                attenuation is not needed. Default is None.
            labels (list/str):
                The type of spectra to generate images for. By default this
                is None and all saved spectra types will be used. This can
                either be a list of strings or a single string.
            psf_resample_factor (int):
                (Only applicable for instruments with a PSF.) The resample
                factor for the PSF. This should be a value greater than 1.
                The image will be resampled by this factor before the PSF is
                applied and then downsampled back to the original after
                convolution. This can help minimize the effects of using a
                generic PSF centred on the galaxy centre, a simplification
                we make for performance reasons (the effects are
                sufficiently small that this simplification is justified).
            write (bool):
                Whether to write out the flux images. Default is True.
        """
        # If we have no labels then use all saved models
        if labels is None:
            warn(
                "No labels were passed to get_images_flux. We will generate "
                f"images for: {self.emission_model.saved_labels}. This could "
                "be very expensive depending on the number of models and "
                "image sizes."
            )
            labels = self.emission_model.saved_labels

        # Flag that we will compute the flux images
        self._do_images_flux = True

        # Ensure we have a field of view if we need to compute the images
        if fov is None:
            raise exceptions.InconsistentArguments(
                "Cannot generate images without a field of view, please "
                "pass one to the fov argument of get_images_flux."
            )

        # Ensure we have a cosmology if we need to compute the observed
        # spectra and get_spectra_observed has not been called
        if (
            not self._operation_kwargs.has("get_observed_spectra")
            and cosmo is None
        ):
            raise exceptions.PipelineNotReady(
                "Cannot generate flux images without an astropy.cosmology"
                " object, please pass one to the cosmo argument of "
                "get_images_flux."
            )

        # Check that we have instruments to compute the images for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate images without instruments with filters! "
                "Pass instruments to the get_images_flux method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do imaging
        for inst in _instruments:
            if not inst.can_do_imaging:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate images with {inst.label}!"
                )

        # Store the arguments for the operation
        self._operation_kwargs.add(
            labels,
            "get_images_flux",
            instruments=_instruments,
            fov=fov,
            img_type=img_type,
            kernel=kernel,
            kernel_threshold=kernel_threshold,
            psf_resample_factor=psf_resample_factor,
            cosmo=cosmo,
        )

        # Validate noise attribute units for flux images
        validate_noise_unit_compatibility(_instruments, "nJy")

        # Based on the instruments we have and the write argument, set the
        # appropriate writing flags
        if write:
            self._write_images_flux = True
            for inst in _instruments:
                if inst.can_do_psf_imaging:
                    self._write_images_flux_psf = True
                if inst.can_do_noisy_imaging:
                    self._write_images_flux_noise = True

        # We need to ensure the photometric fluxes are computed first
        # NOTE: this is safe if the user has already called
        # get_photometry_fluxes, it will just leave the flag as True and
        # respect the original intent to write or not write
        self.get_photometry_fluxes(
            *instruments,
            labels=labels,
            cosmo=cosmo,
            igm=igm,
            write=False,
        )
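The flux variant needs a cosmology unless observed spectra were already requested, as the check above enforces. A sketch under the same assumptions as the luminosity example (reusing the `kernel` built there; label and field of view remain illustrative):

```python
from astropy.cosmology import Planck18
from unyt import kpc

# Queue flux imaging; this implicitly queues get_photometry_fluxes.
pipeline.get_images_flux(
    instrument1,
    fov=60 * kpc,
    kernel=kernel,
    cosmo=Planck18,
    labels="intrinsic",  # illustrative emission model label
)
```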
    def _get_images_flux(self, galaxy):
        """Compute the flux images for the galaxies.

        This function will compute the flux images for all spectra types
        that were saved when spectra were generated, in all filters included
        in the Pipeline instruments.

        Args:
            galaxy (Galaxy):
                The galaxy to generate the flux images for (all components).
        """
        start = time.perf_counter()

        # We want to time PSF application and noise application separately
        psf_time = 0
        noise_time = 0

        # Loop over all the queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_images_flux"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Loop over instruments and perform any imaging they define
            for inst in instruments:
                # Get the basic images for the requested spectra types
                galaxy.get_images_flux(
                    *model_label,
                    fov=op_kwargs["fov"],
                    img_type=op_kwargs["img_type"],
                    kernel=op_kwargs["kernel"],
                    kernel_threshold=op_kwargs["kernel_threshold"],
                    cosmo=op_kwargs.get("cosmo", None),
                    nthreads=self.nthreads,
                    instrument=inst,
                )

                # Apply the PSF if applicable to the instrument
                if inst.can_do_psf_imaging:
                    psf_start = time.perf_counter()
                    galaxy.apply_psf_to_images_fnu(
                        instrument=inst,
                        psf_resample_factor=op_kwargs["psf_resample_factor"],
                        limit_to=model_label,
                    )
                    psf_time += time.perf_counter() - psf_start

                # Apply the instrument noise if applicable to the instrument
                if inst.can_do_noisy_imaging:
                    noise_start = time.perf_counter()
                    galaxy.apply_noise_to_images_fnu(
                        instrument=inst,
                        limit_to=model_label,
                        apply_to_psf=inst.can_do_psf_imaging,
                    )
                    noise_time += time.perf_counter() - noise_start

        # Count the number of images we have generated
        self._op_counts["Flux Images"] += count_and_check_dict_recursive(
            galaxy.images_fnu
        )
        self._op_counts["Flux Images (With PSF)"] += (
            count_and_check_dict_recursive(galaxy.images_psf_fnu)
        )
        self._op_counts["Flux Images (With Noise)"] += (
            count_and_check_dict_recursive(galaxy.images_noise_fnu)
        )
        if galaxy.stars is not None:
            self._op_counts["Flux Images"] += count_and_check_dict_recursive(
                galaxy.stars.images_fnu
            )
            self._op_counts["Flux Images (With PSF)"] += (
                count_and_check_dict_recursive(galaxy.stars.images_psf_fnu)
            )
            self._op_counts["Flux Images (With Noise)"] += (
                count_and_check_dict_recursive(galaxy.stars.images_noise_fnu)
            )
        if galaxy.black_holes is not None:
            self._op_counts["Flux Images"] += count_and_check_dict_recursive(
                galaxy.black_holes.images_fnu
            )
            self._op_counts["Flux Images (With PSF)"] += (
                count_and_check_dict_recursive(
                    galaxy.black_holes.images_psf_fnu
                )
            )
            self._op_counts["Flux Images (With Noise)"] += (
                count_and_check_dict_recursive(
                    galaxy.black_holes.images_noise_fnu
                )
            )

        # Record the time taken
        self._op_timing["Flux Images"] += (
            time.perf_counter() - start - psf_time - noise_time
        )
        self._op_timing["Flux Images (With PSF)"] += psf_time
        self._op_timing["Flux Images (With Noise)"] += noise_time
[docs]
    def get_data_cubes_lnu(
        self,
        *instruments,
        fov,
        cube_type="smoothed",
        kernel=None,
        kernel_threshold=1.0,
        cosmo=None,
        labels=None,
        write=True,
    ):
        """Flag that the Pipeline should compute luminosity data cubes.

        This will signal the Pipeline to compute the spectral luminosity
        density data cubes for each galaxy when the run method is called.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the luminosity data cubes. This
                can be any number of instruments or instrument collections;
                they will all be combined into a single InstrumentCollection
                for this operation. The resolution and wavelength array will
                be taken from each instrument.
            fov (unyt_quantity):
                The width of the data cube in image coordinates.
            cube_type (str):
                The type of data cube to make. Either "smoothed" to smooth
                particle spectra over a kernel or "hist" to sort particle
                spectra into individual spaxels. Default is "smoothed".
            kernel (array-like):
                The kernel to use for smoothing. Default is None. Required
                for 'smoothed' cubes from a particle distribution.
            kernel_threshold (float):
                The threshold of the kernel. Default is 1.0.
            cosmo (astropy.cosmology.Cosmology):
                The cosmology to use for unit conversions between angular
                and Cartesian coordinates. Only needed when resolution and
                fov have different unit systems. Default is None.
            labels (list/str, optional):
                The type of spectra to generate data cubes for. By default
                this is None and all saved spectra types will be used. This
                can either be a list of strings or a single string.
            write (bool):
                Whether to write out the luminosity data cubes. Default is
                True.
        """
        # If we have no labels then use all saved models
        if labels is None:
            warn(
                "No labels were passed to get_data_cubes_lnu. We will "
                f"generate data cubes for: {self.emission_model.saved_labels}."
                " This could be very expensive depending on the number of "
                "models and cube sizes."
            )
            labels = self.emission_model.saved_labels

        # Flag that we will compute the lnu data cubes
        self._do_lnu_data_cubes = True

        # Flag that we will want to write out the lnu data cubes
        self._write_lnu_data_cubes = write or self._write_lnu_data_cubes

        # Check that we have instruments to compute the data cubes for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate data cubes without instruments! "
                "Pass instruments to the get_data_cubes_lnu method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do resolved spectroscopy
        for inst in _instruments:
            if not inst.can_do_resolved_spectroscopy:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate data cubes with {inst.label}! "
                    "Instruments must have both resolution and lam defined."
                )

        # Ensure we have a field of view
        if fov is None:
            raise exceptions.InconsistentArguments(
                "Cannot generate data cubes without a field of view, "
                "please pass one to the fov argument."
            )

        # Validate kernel for smoothed data cubes
        if cube_type == "smoothed" and kernel is None:
            raise exceptions.InconsistentArguments(
                "Cannot generate smoothed data cubes without a kernel! "
                "Please pass a kernel to the get_data_cubes_lnu method. "
                "Example: from synthesizer.kernel_functions import Kernel; "
                "kernel = Kernel().get_kernel(). Available kernel names: "
                "'uniform', 'sph_anarchy', 'gadget_2', 'cubic', 'quintic'. "
                "Alternatively, use cube_type='hist' for histogram-based "
                "data cubes."
            )

        # Store the arguments for the operation
        self._operation_kwargs.add(
            labels if labels is not None else NO_MODEL_LABEL,
            "get_data_cubes_lnu",
            instruments=_instruments,
            fov=fov,
            cube_type=cube_type,
            kernel=kernel,
            kernel_threshold=kernel_threshold,
            cosmo=cosmo,
        )

        # We need to ensure the lnu spectra are computed first
        # NOTE: this is safe if the user has already called get_spectra, it
        # will just leave the flag as True and respect the original intent
        # to write or not write
        self.get_spectra(write=False)
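The kernel recipe quoted in the error message above can be used directly. A minimal sketch (assuming `pipeline` and `instrument1` as in the module docstring, with an illustrative field of view):

```python
from unyt import kpc

from synthesizer.kernel_functions import Kernel

# Build a kernel and queue spectral luminosity density cubes for an
# instrument that defines both a resolution and a wavelength array.
kernel = Kernel().get_kernel()
pipeline.get_data_cubes_lnu(
    instrument1,
    fov=30 * kpc,
    cube_type="smoothed",
    kernel=kernel,
)
```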
    def _get_data_cubes_lnu(self, galaxy):
        """Compute the luminosity data cubes for the galaxy.

        Args:
            galaxy (Galaxy):
                The galaxy to generate data cubes for.
        """
        start = time.perf_counter()

        # Iterate over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_data_cubes_lnu"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Get the data cube for these labels
            # Note: model_label can be a list of labels
            if not isinstance(model_label, (list, tuple)):
                model_label = [model_label]

            # Loop over instruments
            for inst in instruments:
                # Loop over each label and create a data cube
                for label in model_label:
                    # Determine which component this label belongs to and
                    # call get_data_cube appropriately
                    cube = galaxy.get_data_cube(
                        resolution=inst.resolution,
                        fov=op_kwargs["fov"],
                        lam=inst.lam,
                        cube_type=op_kwargs.get("cube_type", "smoothed"),
                        stellar_spectra=label
                        if label in getattr(galaxy.stars, "spectra", {})
                        or label
                        in getattr(galaxy.stars, "particle_spectra", {})
                        else None,
                        blackhole_spectra=label
                        if label in getattr(galaxy.black_holes, "spectra", {})
                        or label
                        in getattr(galaxy.black_holes, "particle_spectra", {})
                        else None,
                        kernel=op_kwargs.get("kernel"),
                        kernel_threshold=op_kwargs.get(
                            "kernel_threshold", 1.0
                        ),
                        quantity="lnu",
                        cosmo=op_kwargs.get("cosmo"),
                        nthreads=self.nthreads,
                    )

                    # Store the cube (we'll need to add a data_cubes_lnu
                    # attribute to galaxies)
                    if not hasattr(galaxy, "data_cubes_lnu"):
                        galaxy.data_cubes_lnu = {}

                    # Use instrument label in the key to differentiate
                    # between data cubes from different instruments
                    key = f"{label}_{inst.label}"
                    galaxy.data_cubes_lnu[key] = cube

        # Count the number of data cubes we have generated
        if hasattr(galaxy, "data_cubes_lnu"):
            self._op_counts["Lnu Data Cubes"] += len(galaxy.data_cubes_lnu)

        # Record the time taken
        self._op_timing["Lnu Data Cubes"] += time.perf_counter() - start
[docs]
    def get_data_cubes_fnu(
        self,
        *instruments,
        fov,
        cube_type="smoothed",
        kernel=None,
        kernel_threshold=1.0,
        cosmo=None,
        igm=None,
        labels=None,
        write=True,
    ):
        """Flag that the Pipeline should compute flux density data cubes.

        This will signal the Pipeline to compute the spectral flux density
        data cubes for each galaxy when the run method is called.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the flux density data cubes. This
                can be any number of instruments or instrument collections;
                they will all be combined into a single InstrumentCollection
                for this operation. The resolution and wavelength array will
                be taken from each instrument.
            fov (unyt_quantity):
                The width of the data cube in image coordinates.
            cube_type (str):
                The type of data cube to make. Either "smoothed" to smooth
                particle spectra over a kernel or "hist" to sort particle
                spectra into individual spaxels. Default is "smoothed".
            kernel (array-like):
                The kernel to use for smoothing. Default is None. Required
                for 'smoothed' cubes from a particle distribution.
            kernel_threshold (float):
                The threshold of the kernel. Default is 1.0.
            cosmo (astropy.cosmology):
                The cosmology object to use for the redshift conversion.
                Default is None.
            igm (str):
                The IGM model to use for the attenuation. Default is None,
                meaning no IGM attenuation is applied.
            labels (list/str, optional):
                The type of spectra to generate data cubes for. By default
                this is None and all saved spectra types will be used. This
                can either be a list of strings or a single string.
            write (bool):
                Whether to write out the flux density data cubes. Default is
                True.
        """
        # If we have no labels then use all saved models
        if labels is None:
            warn(
                "No labels were passed to get_data_cubes_fnu. We will "
                f"generate data cubes for: {self.emission_model.saved_labels}."
                " This could be very expensive depending on the number of "
                "models and cube sizes."
            )
            labels = self.emission_model.saved_labels

        # Flag that we will compute the fnu data cubes
        self._do_fnu_data_cubes = True

        # Flag that we will want to write out the fnu data cubes
        self._write_fnu_data_cubes = write or self._write_fnu_data_cubes

        # Check that we have instruments to compute the data cubes for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate data cubes without instruments! "
                "Pass instruments to the get_data_cubes_fnu method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do resolved spectroscopy
        for inst in _instruments:
            if not inst.can_do_resolved_spectroscopy:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate data cubes with {inst.label}! "
                    "Instruments must have both resolution and lam defined."
                )

        # Ensure we have a field of view
        if fov is None:
            raise exceptions.InconsistentArguments(
                "Cannot generate data cubes without a field of view, "
                "please pass one to the fov argument."
            )

        # Validate kernel for smoothed data cubes
        if cube_type == "smoothed" and kernel is None:
            raise exceptions.InconsistentArguments(
                "Cannot generate smoothed data cubes without a kernel! "
                "Please pass a kernel to the get_data_cubes_fnu method. "
                "Example: from synthesizer.kernel_functions import Kernel; "
                "kernel = Kernel().get_kernel(). Available kernel names: "
                "'uniform', 'sph_anarchy', 'gadget_2', 'cubic', 'quintic'. "
                "Alternatively, use cube_type='hist' for histogram-based "
                "data cubes."
            )

        # Store the arguments for the operation
        self._operation_kwargs.add(
            labels if labels is not None else NO_MODEL_LABEL,
            "get_data_cubes_fnu",
            instruments=_instruments,
            fov=fov,
            cube_type=cube_type,
            kernel=kernel,
            kernel_threshold=kernel_threshold,
            cosmo=cosmo,
            igm=igm,
        )

        # We need to ensure the observed spectra are computed first
        # NOTE: this is safe if the user has already called
        # get_observed_spectra, it will just leave the flag as True and
        # respect the original intent to write or not write
        self.get_observed_spectra(write=False, cosmo=cosmo, igm=igm)
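The fnu variant additionally needs a cosmology so the observed spectra can be generated first; `igm` may stay None if no IGM attenuation is wanted. A sketch reusing the `kernel` and assumptions from the lnu example:

```python
from astropy.cosmology import Planck18
from unyt import kpc

# Queue spectral flux density cubes; implicitly queues the observed
# spectra via get_observed_spectra.
pipeline.get_data_cubes_fnu(
    instrument1,
    fov=30 * kpc,
    kernel=kernel,
    cosmo=Planck18,
)
```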
    def _get_data_cubes_fnu(self, galaxy):
        """Compute the flux density data cubes for the galaxy.

        Args:
            galaxy (Galaxy):
                The galaxy to generate data cubes for.
        """
        start = time.perf_counter()

        # Iterate over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_data_cubes_fnu"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Get the data cube for these labels
            # Note: model_label can be a list of labels
            if not isinstance(model_label, (list, tuple)):
                model_label = [model_label]

            # Loop over instruments
            for inst in instruments:
                # Loop over each label and create a data cube
                for label in model_label:
                    # Determine which component this label belongs to and
                    # call get_data_cube appropriately
                    cube = galaxy.get_data_cube(
                        resolution=inst.resolution,
                        fov=op_kwargs["fov"],
                        lam=inst.lam,
                        cube_type=op_kwargs.get("cube_type", "smoothed"),
                        stellar_spectra=label
                        if label in getattr(galaxy.stars, "spectra", {})
                        or label
                        in getattr(galaxy.stars, "particle_spectra", {})
                        else None,
                        blackhole_spectra=label
                        if label in getattr(galaxy.black_holes, "spectra", {})
                        or label
                        in getattr(galaxy.black_holes, "particle_spectra", {})
                        else None,
                        kernel=op_kwargs.get("kernel"),
                        kernel_threshold=op_kwargs.get(
                            "kernel_threshold", 1.0
                        ),
                        quantity="fnu",
                        cosmo=op_kwargs.get("cosmo"),
                        nthreads=self.nthreads,
                    )

                    # Store the cube
                    if not hasattr(galaxy, "data_cubes_fnu"):
                        galaxy.data_cubes_fnu = {}

                    # Use instrument label in the key to differentiate
                    # between data cubes from different instruments
                    key = f"{label}_{inst.label}"
                    galaxy.data_cubes_fnu[key] = cube

        # Count the number of data cubes we have generated
        if hasattr(galaxy, "data_cubes_fnu"):
            self._op_counts["Fnu Data Cubes"] += len(galaxy.data_cubes_fnu)

        # Record the time taken
        self._op_timing["Fnu Data Cubes"] += time.perf_counter() - start
[docs]
    def get_spectroscopy_lnu(self, *instruments, labels=None, write=True):
        """Flag that the Pipeline should compute spectral luminosity density.

        This will signal the Pipeline to compute the spectral luminosity
        density for each galaxy when the run method is called.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the spectral luminosity density.
                This can be any number of instruments or instrument
                collections; they will all be combined into a single
                InstrumentCollection for this operation.
            labels (list/str, optional):
                The type of spectra to generate spectroscopy for. By default
                this is None and all saved spectra types will be used. This
                can either be a list of strings or a single string.
            write (bool):
                Whether to write out the spectral luminosity density.
                Default is True.
        """
        # If we have no labels then use all saved models
        if labels is None:
            labels = self.emission_model.saved_labels

        # Flag that we will compute the spectral luminosity density
        self._do_spectroscopy_lnu = True

        # Flag that we will want to write out the spectral luminosity
        # density (calling the get_spectroscopy_lnu method is considered the
        # intent to write it out by default)
        self._write_spectroscopy_lnu = write or self._write_spectroscopy_lnu

        # Check that we have instruments to compute the spectroscopy for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate spectroscopy without instruments with "
                "spectroscopy! Pass instruments to the get_spectroscopy_lnu "
                "method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do spectroscopy
        for inst in _instruments:
            if not inst.can_do_spectroscopy:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate spectroscopy with {inst.label}!"
                )

        # Store the arguments for the operation
        self._operation_kwargs.add(
            labels,
            "get_spectroscopy_lnu",
            instruments=_instruments,
        )

        # We need to ensure the lnu spectra are computed first
        # NOTE: this is safe if the user has already called get_spectra, it
        # will just leave the flag as True and respect the original intent
        # to write or not write
        self.get_spectra(write=False)
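A minimal sketch of flagging this operation (assuming `pipeline` and `instrument2` as in the module docstring, with `instrument2` able to do spectroscopy):

```python
# Queue spectral luminosity density spectroscopy for all saved spectra
# types; this implicitly queues get_spectra as well.
pipeline.get_spectroscopy_lnu(instrument2)
```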
    def _get_spectroscopy_lnu(self, galaxy):
        """Compute the spectral luminosity density for the galaxy.

        This function will compute the spectral luminosity density for all
        spectra stored on the galaxy and its components.

        Args:
            galaxy (Galaxy):
                The galaxy to generate the spectroscopy for.
        """
        start = time.perf_counter()

        # Iterate over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_spectroscopy_lnu"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Loop over instruments
            for inst in instruments:
                # Get the spectroscopy for this instrument
                galaxy.get_spectroscopy(inst, limit_to=model_label)

        # Count the number of spectroscopy items we have generated
        self._op_counts["Spectroscopy Lnu"] += count_and_check_dict_recursive(
            galaxy.spectroscopy
        )
        if galaxy.stars is not None:
            self._op_counts["Spectroscopy Lnu"] += (
                count_and_check_dict_recursive(galaxy.stars.spectroscopy)
            )
        if galaxy.black_holes is not None:
            self._op_counts["Spectroscopy Lnu"] += (
                count_and_check_dict_recursive(galaxy.black_holes.spectroscopy)
            )

        # Record the time taken
        self._op_timing["Spectroscopy Lnu"] += time.perf_counter() - start
[docs]
    def get_spectroscopy_fnu(
        self,
        *instruments,
        labels=None,
        cosmo=None,
        igm=None,
        write=True,
    ):
        """Flag that the Pipeline should compute the spectral flux density.

        This will signal the Pipeline to compute the spectral flux density
        for each galaxy when the run method is called.

        Args:
            instruments (Instrument/InstrumentCollection):
                The instruments to use for the spectral flux density. This
                can be any number of instruments or instrument collections;
                they will all be combined into a single InstrumentCollection
                for this operation.
            labels (list/str, optional):
                The type of spectra to generate spectroscopy for. By default
                this is None and all saved spectra types will be used. This
                can either be a list of strings or a single string.
            cosmo (astropy.cosmology):
                The cosmology object to use for the redshift conversion.
                Default is None.
            igm (str):
                The IGM model to use for the attenuation. Default is None,
                meaning no IGM attenuation is applied.
            write (bool):
                Whether to write out the spectral flux density. Default is
                True.
        """
        # If we have no labels then use all saved models
        if labels is None:
            labels = self.emission_model.saved_labels

        # Flag that we will compute the spectral flux density
        self._do_spectroscopy_fnu = True

        # Flag that we will want to write out the spectral flux density
        # (calling the get_spectroscopy_fnu method is considered the intent
        # to write it out by default)
        self._write_spectroscopy_fnu = write or self._write_spectroscopy_fnu

        # Check that we have instruments to compute the spectroscopy for
        if len(instruments) == 0:
            raise exceptions.PipelineNotReady(
                "Cannot generate spectroscopy without instruments with "
                "spectroscopy! Pass instruments to the get_spectroscopy_fnu "
                "method."
            )

        # Unpack and attach instruments to the pipeline
        _instruments = self._add_instruments(instruments)

        # Check that the instruments can do spectroscopy
        for inst in _instruments:
            if not inst.can_do_spectroscopy:
                raise exceptions.PipelineNotReady(
                    f"Cannot generate spectroscopy with {inst.label}!"
                )

        # Store the arguments for the operation
        self._operation_kwargs.add(
            labels,
            "get_spectroscopy_fnu",
            instruments=_instruments,
        )

        # We need to ensure the fnu spectra are computed first
        # NOTE: this is safe if the user has already called
        # get_observed_spectra, it will just leave the flag as True and
        # respect the original intent to write or not write
        self.get_observed_spectra(write=False, cosmo=cosmo, igm=igm)
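The fnu counterpart, under the same assumptions, needs a cosmology for the observed-spectra step:

```python
from astropy.cosmology import Planck18

# igm=None (the default) means no IGM attenuation is applied.
pipeline.get_spectroscopy_fnu(instrument2, cosmo=Planck18)
```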
    def _get_spectroscopy_fnu(self, galaxy):
        """Compute the spectral flux density for the galaxy.

        This function will compute the spectral flux density for all spectra
        stored on the galaxy and its components.

        Args:
            galaxy (Galaxy):
                The galaxy to generate the spectroscopy for.
        """
        start = time.perf_counter()

        # Iterate over all queued operation configurations
        for model_label, op_kwargs in self._operation_kwargs[
            "get_spectroscopy_fnu"
        ]:
            # Get the instruments for this operation
            instruments = op_kwargs["instruments"]

            # Loop over instruments
            for inst in instruments:
                # Get the spectroscopy for this instrument
                galaxy.get_spectroscopy(inst, limit_to=model_label)

        # Count the number of spectroscopy items we have generated
        self._op_counts["Spectroscopy Fnu"] += count_and_check_dict_recursive(
            galaxy.spectroscopy
        )
        if galaxy.stars is not None:
            self._op_counts["Spectroscopy Fnu"] += (
                count_and_check_dict_recursive(galaxy.stars.spectroscopy)
            )
        if galaxy.black_holes is not None:
            self._op_counts["Spectroscopy Fnu"] += (
                count_and_check_dict_recursive(galaxy.black_holes.spectroscopy)
            )

        # Record the time taken
        self._op_timing["Spectroscopy Fnu"] += time.perf_counter() - start

    def _run_extra_analysis(self, galaxy):
        """Call any user provided analysis functions.

        We will call this last when running the pipeline. This ensures that
        all data generated by the pipeline exists before performing the user
        calculations. This is important since the user may want to use the
        data generated by the pipeline in their analysis functions.

        Args:
            galaxy (Galaxy):
                The galaxy to run the extra analysis functions on.
        """
        start = time.perf_counter()

        # Nothing to do if we have no analysis functions
        if len(self._analysis_funcs) == 0:
            return

        # Create a temporary attribute on the galaxy to store the results
        # of the analysis functions
        galaxy._extra_analysis_results = {}

        # Loop over the analysis functions and call them for this galaxy
        for func, args, kwargs, key in zip(
            self._analysis_funcs,
            self._analysis_args,
            self._analysis_kwargs,
            self._analysis_results_keys,
        ):
            # Run the analysis function on this galaxy
            try:
                func_start = time.perf_counter()
                res = func(galaxy, *args, **kwargs)

                # Count the number of times we have run this function
                self._op_counts["Extra Analyses"] += 1

                # Record the time taken
                self._op_timing[key] = (
                    self._op_timing.get(key, 0.0)
                    + time.perf_counter()
                    - func_start
                )
            except Exception as e:
                raise type(e)(
                    f"Error running extra analysis function {key}: {e}"
                ).with_traceback(e.__traceback__)

            # Store the result on the galaxy
            galaxy._extra_analysis_results[key] = res

        # Record the time taken
        self._op_timing["Extra Analyses"] += time.perf_counter() - start

    def _unpack_results(self, galaxy):
        """Unpack the results from the galaxy into the pipeline.

        This will extract all the calculated data from the galaxy and store
        it on the pipeline for writing out. All data not flagged to be saved
        will be cleared when the galaxy is garbage collected.

        Args:
            galaxy (Galaxy):
                The galaxy to unpack the results from.
        """
        start = time.perf_counter()

        # Do we need to unpack the SFZH?
        if self._write_sfzh:
            if galaxy.stars is not None:
                self.sfzhs.append(galaxy.stars.sfzh)
            else:
                self.sfzhs.append(galaxy.sfzh)

        # Do we need to unpack the SFH?
        if self._write_sfh:
            if galaxy.stars is not None:
                self.sfhs.append(galaxy.stars.sfh)
            else:
                self.sfhs.append(galaxy.sfh)

        # Do we need to unpack the lnu spectra?
        if self._write_lnu_spectra:
            for spec_type, spec in galaxy.spectra.items():
                self.lnu_spectra["Galaxy"].setdefault(spec_type, []).append(
                    spec.lnu
                )
            if galaxy.stars is not None:
                for spec_type, spec in galaxy.stars.spectra.items():
                    self.lnu_spectra["Stars"].setdefault(spec_type, []).append(
                        spec.lnu
                    )
            if galaxy.black_holes is not None:
                for spec_type, spec in galaxy.black_holes.spectra.items():
                    self.lnu_spectra["BlackHole"].setdefault(
                        spec_type, []
                    ).append(spec.lnu)

        # Do we need to unpack the fnu spectra?
        if self._write_fnu_spectra:
            for spec_type, spec in galaxy.spectra.items():
                self.fnu_spectra["Galaxy"].setdefault(spec_type, []).append(
                    spec.fnu
                )
            if galaxy.stars is not None:
                for spec_type, spec in galaxy.stars.spectra.items():
                    self.fnu_spectra["Stars"].setdefault(spec_type, []).append(
                        spec.fnu
                    )
            if galaxy.black_holes is not None:
                for spec_type, spec in galaxy.black_holes.spectra.items():
                    self.fnu_spectra["BlackHole"].setdefault(
                        spec_type, []
                    ).append(spec.fnu)

        # Do we need to unpack the photometric luminosities?
        if self._write_luminosities:
            for spec_type, phot in galaxy.photo_lnu.items():
                for filt, lnu in phot.items():
                    self.luminosities["Galaxy"].setdefault(
                        spec_type, {}
                    ).setdefault(filt, []).append(lnu)
            if galaxy.stars is not None:
                for spec_type, phot in galaxy.stars.photo_lnu.items():
                    for filt, lnu in phot.items():
                        self.luminosities["Stars"].setdefault(
                            spec_type, {}
                        ).setdefault(filt, []).append(lnu)
            if galaxy.black_holes is not None:
                for spec_type, phot in galaxy.black_holes.photo_lnu.items():
                    for filt, lnu in phot.items():
                        self.luminosities["BlackHole"].setdefault(
                            spec_type, {}
                        ).setdefault(filt, []).append(lnu)

        # Do we need to unpack the photometric fluxes?
        if self._write_fluxes:
            for spec_type, phot in galaxy.photo_fnu.items():
                for filt, fnu in phot.items():
                    self.fluxes["Galaxy"].setdefault(spec_type, {}).setdefault(
                        filt, []
                    ).append(fnu)
            if galaxy.stars is not None:
                for spec_type, phot in galaxy.stars.photo_fnu.items():
                    for filt, fnu in phot.items():
                        self.fluxes["Stars"].setdefault(
                            spec_type, {}
                        ).setdefault(filt, []).append(fnu)
            if galaxy.black_holes is not None:
                for spec_type, phot in galaxy.black_holes.photo_fnu.items():
                    for filt, fnu in phot.items():
                        self.fluxes["BlackHole"].setdefault(
                            spec_type, {}
                        ).setdefault(filt, []).append(fnu)

        # Do we need to unpack the emission lines?
        if self._write_lines:
            for spec_type, lines in galaxy.lines.items():
                self.line_lums["Galaxy"].setdefault(spec_type, []).append(
                    lines.luminosity
                )
                self.line_cont_lums["Galaxy"].setdefault(spec_type, []).append(
                    lines.continuum
                )
            if galaxy.stars is not None:
                for spec_type, lines in galaxy.stars.lines.items():
                    self.line_lums["Stars"].setdefault(spec_type, []).append(
                        lines.luminosity
                    )
                    self.line_cont_lums["Stars"].setdefault(
                        spec_type, []
                    ).append(lines.continuum)
            if galaxy.black_holes is not None:
                for spec_type, lines in galaxy.black_holes.lines.items():
                    self.line_lums["BlackHole"].setdefault(
                        spec_type, []
                    ).append(lines.luminosity)
                    self.line_cont_lums["BlackHole"].setdefault(
                        spec_type, []
                    ).append(lines.continuum)

        # Do we need to unpack the observed emission lines?
        if self._write_flux_lines:
            for spec_type, lines in galaxy.lines.items():
                self.line_fluxes["Galaxy"].setdefault(spec_type, []).append(
                    lines.flux
                )
                self.line_cont_fluxes["Galaxy"].setdefault(
                    spec_type, []
                ).append(lines.continuum)
            if galaxy.stars is not None:
                for spec_type, lines in galaxy.stars.lines.items():
                    self.line_fluxes["Stars"].setdefault(
                        spec_type, []
                    ).append(lines.flux)
                    self.line_cont_fluxes["Stars"].setdefault(
                        spec_type, []
                    ).append(lines.continuum)
            if galaxy.black_holes is not None:
                for spec_type, lines in galaxy.black_holes.lines.items():
                    self.line_fluxes["BlackHole"].setdefault(
                        spec_type, []
                    ).append(lines.flux)
                    self.line_cont_fluxes["BlackHole"].setdefault(
                        spec_type, []
                    ).append(lines.continuum)

        # Do we need to unpack the luminosity images?
        if self._write_images_lum:
            for inst_label, d in galaxy.images_lnu.items():
                for spec_type, imgs in d.items():
                    for f, img in imgs.items():
                        self.images_lum["Galaxy"].setdefault(
                            inst_label, {}
                        ).setdefault(spec_type, {}).setdefault(f, []).append(
                            img.arr * img.units
                        )
            if galaxy.stars is not None:
                for inst_label, d in galaxy.stars.images_lnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_lum["Stars"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)
            if galaxy.black_holes is not None:
                for inst_label, d in galaxy.black_holes.images_lnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_lum["BlackHole"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)

        # Do we need to unpack the flux images?
        if self._write_images_flux:
            for inst_label, d in galaxy.images_fnu.items():
                for spec_type, imgs in d.items():
                    for f, img in imgs.items():
                        self.images_flux["Galaxy"].setdefault(
                            inst_label, {}
                        ).setdefault(spec_type, {}).setdefault(f, []).append(
                            img.arr * img.units
                        )
            if galaxy.stars is not None:
                for inst_label, d in galaxy.stars.images_fnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_flux["Stars"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)
            if galaxy.black_holes is not None:
                for inst_label, d in galaxy.black_holes.images_fnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_flux["BlackHole"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)

        # Do we need to unpack the luminosity images with PSFs?
        if self._write_images_lum_psf:
            for inst_label, d in galaxy.images_psf_lnu.items():
                for spec_type, imgs in d.items():
                    for f, img in imgs.items():
                        self.images_lum_psf["Galaxy"].setdefault(
                            inst_label, {}
                        ).setdefault(spec_type, {}).setdefault(f, []).append(
                            img.arr * img.units
                        )
            if galaxy.stars is not None:
                for inst_label, d in galaxy.stars.images_psf_lnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_lum_psf["Stars"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)
            if galaxy.black_holes is not None:
                for inst_label, d in galaxy.black_holes.images_psf_lnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_lum_psf["BlackHole"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)

        # Do we need to unpack the flux images with PSFs?
        if self._write_images_flux_psf:
            for inst_label, d in galaxy.images_psf_fnu.items():
                for spec_type, imgs in d.items():
                    for f, img in imgs.items():
                        self.images_flux_psf["Galaxy"].setdefault(
                            inst_label, {}
                        ).setdefault(spec_type, {}).setdefault(f, []).append(
                            img.arr * img.units
                        )
            if galaxy.stars is not None:
                for inst_label, d in galaxy.stars.images_psf_fnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_flux_psf["Stars"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)
            if galaxy.black_holes is not None:
                for inst_label, d in galaxy.black_holes.images_psf_fnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_flux_psf["BlackHole"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)

        # Do we need to unpack the luminosity images with noise?
        if self._write_images_lum_noise:
            for inst_label, d in galaxy.images_noise_lnu.items():
                for spec_type, imgs in d.items():
                    for f, img in imgs.items():
                        self.images_lum_noise["Galaxy"].setdefault(
                            inst_label, {}
                        ).setdefault(spec_type, {}).setdefault(f, []).append(
                            img.arr * img.units
                        )
            if galaxy.stars is not None:
                for inst_label, d in galaxy.stars.images_noise_lnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_lum_noise["Stars"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)
            if galaxy.black_holes is not None:
                bhs = galaxy.black_holes
                for inst_label, d in bhs.images_noise_lnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_lum_noise["BlackHole"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)

        # Do we need to unpack the flux images with noise?
        if self._write_images_flux_noise:
            for inst_label, d in galaxy.images_noise_fnu.items():
                for spec_type, imgs in d.items():
                    for f, img in imgs.items():
                        self.images_flux_noise["Galaxy"].setdefault(
                            inst_label, {}
                        ).setdefault(spec_type, {}).setdefault(f, []).append(
                            img.arr * img.units
                        )
            if galaxy.stars is not None:
                for inst_label, d in galaxy.stars.images_noise_fnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_flux_noise["Stars"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)
            if galaxy.black_holes is not None:
                bhs = galaxy.black_holes
                for inst_label, d in bhs.images_noise_fnu.items():
                    for spec_type, imgs in d.items():
                        for f, img in imgs.items():
                            self.images_flux_noise["BlackHole"].setdefault(
                                inst_label, {}
                            ).setdefault(spec_type, {}).setdefault(
                                f, []
                            ).append(img.arr * img.units)

        # Do we need to unpack the extra analysis results?
        if hasattr(galaxy, "_extra_analysis_results"):
            for key, res in galaxy._extra_analysis_results.items():
                self._analysis_results[key].append(res)

        # Done!
        self._op_timing["Unpacking results"] += time.perf_counter() - start
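To make the extra-analysis hooks above concrete, here is a sketch of a user function and its registration via add_analysis_func (named in the class docstring). The exact registration signature, the result key, and the stellar-mass attribute are assumptions for illustration only:

```python
def total_stellar_mass(galaxy):
    """Return the total stellar mass of a galaxy (illustrative)."""
    # initial_masses is an assumed attribute of the stars component
    return galaxy.stars.initial_masses.sum()

# Assumed registration pattern: the function runs once per galaxy after
# all pipeline data exists, and its result is collected under the key.
pipeline.add_analysis_func(total_stellar_mass, "Stars/TotalMass")
```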
[docs]
    def run(self):
        """Run the pipeline.

        This will churn through the attached galaxies generating all the
        data requested using the get_* methods. Only data flagged for
        saving will be held in memory, with all other data cleared out.

        Once the pipeline has run, the data can be written out to a file
        using the write method.

        Note that as we loop over galaxies they will be removed from the
        pipeline to free up memory. This means that once the pipeline has
        run the galaxies will no longer be accessible from the pipeline
        object.

        Raises:
            PipelineNotReady:
                If the pipeline is not ready to perform a specific
                operation.
        """
        # Ensure we are ready
        if not self._loaded_galaxies:
            raise exceptions.PipelineNotReady(
                "Cannot generate spectra before galaxies are loaded! "
                "Call load_galaxies first."
            )

        # Ensure we have at least one operation signalled
        signals = [
            self._do_los_optical_depths,
            self._do_lnu_spectra,
            self._do_fnu_spectra,
            self._do_luminosities,
            self._do_fluxes,
            self._do_lum_lines,
            self._do_flux_lines,
            self._do_images_lum,
            self._do_images_flux,
            self._do_lnu_data_cubes,
            self._do_fnu_data_cubes,
            self._do_spectroscopy_lnu,
            self._do_spectroscopy_fnu,
            self._do_sfzh,
            self._do_sfh,
        ]
        if not any(signals):
            raise exceptions.PipelineNotReady(
                "Cannot run pipeline without any operations signalled! "
                "Use the get_* methods to signal operations."
            )

        # Ok, we are good to go! Report the last metadata and then get going
        if self.rank == 0:
            self._report_instruments()

        # Prewarm instruments before processing galaxies
        self._prepare_instruments()

        # Print the header for the pipeline run to the console
        self._print_progress_header()

        # Loop over galaxies and compute what has been requested using the
        # get_* signalling methods
        igal = 0
        while len(self.galaxies) > 0:
            start_gal = time.perf_counter()

            # Pop the first galaxy from the list
            gal = self.galaxies.pop(0)

            # Are we generating LOS optical depths?
            # Note that this can only be done on galaxies prior to any
            # splitting done for memory optimisation
            if self._do_los_optical_depths:
                self._get_los_optical_depths(gal)

            # If we have a particle galaxy, see if we need to split it
            if isinstance(gal, ParticleGalaxy):
                gals = gal.split(self._max_npart)
            else:
                gals = [gal]

            # If chunking is active, clear any existing additive outputs on
            # the parent before processing children and accumulating back
            # onto it, just to be sure, in case the galaxy has been used
            # before
            if len(gals) > 1:
                clear_pipeline_outputs(gal)

            # Loop over the split galaxies list (which may just be a list
            # containing the original gal above)
            while len(gals) > 0:
                # Get the next galaxy
                _gal = gals.pop(0)

                # Are we generating SFZHs?
                if self._do_sfzh:
                    self._get_sfzh(_gal)

                # Are we generating SFHs?
                if self._do_sfh:
                    self._get_sfh(_gal)

                # Are we generating lnu spectra?
                if self._do_lnu_spectra:
                    self._get_spectra(_gal)

                # Are we generating fnu spectra?
                if self._do_fnu_spectra:
                    self._get_observed_spectra(_gal)

                # Are we generating photometric luminosities?
                if self._do_luminosities:
                    self._get_photometry_luminosities(_gal)

                # Are we generating photometric fluxes?
                if self._do_fluxes:
                    self._get_photometry_fluxes(_gal)

                # Are we generating emission lines?
                if self._do_lum_lines:
                    self._get_lines(_gal)

                # Are we generating observed emission lines?
                if self._do_flux_lines:
                    self._get_observed_lines(_gal)

                # Are we generating luminosity images?
                if self._do_images_lum:
                    self._get_images_luminosity(_gal)

                # Are we generating flux images?
                if self._do_images_flux:
                    self._get_images_flux(_gal)

                # Are we generating luminosity data cubes?
                if self._do_lnu_data_cubes:
                    self._get_data_cubes_lnu(_gal)

                # Are we generating flux data cubes?
                if self._do_fnu_data_cubes:
                    self._get_data_cubes_fnu(_gal)

                # Are we generating luminosity spectroscopy?
                if self._do_spectroscopy_lnu:
                    self._get_spectroscopy_lnu(_gal)

                # Are we generating flux spectroscopy?
                if self._do_spectroscopy_fnu:
                    self._get_spectroscopy_fnu(_gal)

                # Child chunk outputs are accumulated back onto the original
                # parent galaxy once this chunk has been processed. Combine
                # back onto the original galaxy if necessary
                if _gal is not gal:
                    accumulate_pipeline_results_from_child(gal, _gal)
                    del _gal

            # Run any extra analysis functions
            self._run_extra_analysis(gal)

            # Ok, we're done with this galaxy so we can unpack the results
            self._unpack_results(gal)

            # Output into the progress table
            self._add_progress_row(gal, igal, start_gal)
            igal += 1

            # Now we can remove the galaxy to free up memory
            del gal

        # Print the footer for the pipeline run to the console
        self._print_progress_footer()

        # We're done! Report the time taken
        self._report_total_timings()

        # Clean up the outputs
        self._clean_outputs()

        # Flag that we have run the pipeline
        self._analysis_complete = True
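A sketch of the intended call order (assuming a configured `pipeline` and `instrument1` as in the module docstring): run() consumes the attached galaxies, so queue every operation first and only then run and write.

```python
# Queue operations, then run; galaxies are popped as they are processed.
pipeline.get_spectra()
pipeline.get_photometry_luminosities(instrument1)
pipeline.run()
assert len(pipeline.galaxies) == 0  # consumed to free memory
pipeline.write("pipeline_results.hdf5")
```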
    def _clean_outputs(self):
        """Clean up the lists attached to the pipeline.

        This prepares the results for writing out to a file.
        """
        start = time.perf_counter()

        # Convert the lists of SFZHs and SFHs to unyt arrays
        self.sfzhs = unyt_array(self.sfzhs)
        self.sfhs = unyt_array(self.sfhs)

        # Convert the lists of spectra to unyt arrays
        for component in ("Galaxy", "Stars", "BlackHole"):
            for spec_type, spec in self.lnu_spectra[component].items():
                self.lnu_spectra[component][spec_type] = unyt_array(spec)
            for spec_type, spec in self.fnu_spectra[component].items():
                self.fnu_spectra[component][spec_type] = unyt_array(spec)

        # Convert the lists of photometric luminosities and fluxes to
        # unyt arrays
        for component in ("Galaxy", "Stars", "BlackHole"):
            for spec_type, phot in self.luminosities[component].items():
                for filt, lnu in phot.items():
                    self.luminosities[component][spec_type][filt] = (
                        unyt_array(lnu)
                    )
            for spec_type, phot in self.fluxes[component].items():
                for filt, fnu in phot.items():
                    self.fluxes[component][spec_type][filt] = unyt_array(fnu)

        # Convert the lists of line luminosities and continua to unyt arrays
        for component in ("Galaxy", "Stars", "BlackHole"):
            for spec_type, lines in self.line_lums[component].items():
                self.line_lums[component][spec_type] = unyt_array(lines)
            for spec_type, conts in self.line_cont_lums[component].items():
                self.line_cont_lums[component][spec_type] = unyt_array(conts)

        # Convert the lists of line fluxes and continua to unyt arrays
        for component in ("Galaxy", "Stars", "BlackHole"):
            for spec_type, fluxes in self.line_fluxes[component].items():
                self.line_fluxes[component][spec_type] = unyt_array(fluxes)
            for spec_type, conts in self.line_cont_fluxes[component].items():
                self.line_cont_fluxes[component][spec_type] = unyt_array(
                    conts
                )

        # Convert the lists of images (raw, PSF-convolved and noisy, in
        # both luminosity and flux) to unyt arrays
        for images in (
            self.images_lum,
            self.images_flux,
            self.images_lum_psf,
            self.images_flux_psf,
            self.images_lum_noise,
            self.images_flux_noise,
        ):
            for component in ("Galaxy", "Stars", "BlackHole"):
                for inst_label, inst_data in images[component].items():
                    for spec_type, imgs in inst_data.items():
                        for f, img in imgs.items():
                            imgs[f] = unyt_array(img)

        # Convert the lists of extra analysis results to unyt arrays.
        # Unlike the previous data we need to do some checks here to ensure
        # the data is in a consistent format, because the user can return
        # any type of data from their analysis functions.
        for key, res in self._analysis_results.items():
            # Ensure all results share a single type
            types = {type(r) for r in res}
            if len(types) > 1:
                raise exceptions.BadResult(
                    "All results from extra analysis functions must be "
                    f"of the same type. Got: {types}"
                )

            # Combine the list of results into a dict of unyt arrays or a
            # single unyt array (a plain list of values simply becomes a
            # unyt array)
            self._analysis_results[key] = combine_list_of_dicts(res)

        self._took(start, "Cleaning outputs")
[docs]
    def write(self, outpath, verbose=None):
        """Write what we have produced to a HDF5 file.

        Any get_* methods that have been called will have their results
        written to the HDF5 file. We consider the call to get_* to be the
        signal to write the data out. If a get_* method has not been
        called, but the data was needed by a subsequent get_* method, it
        will have been run but then discarded. For example, calling
        get_photometry_fluxes will have run get_observed_spectra, which
        will have run get_spectra, but the results from get_spectra and
        get_observed_spectra will have been discarded.

        Args:
            outpath (str):
                The path to the HDF5 file to write.
            verbose (bool, optional):
                If set, override the Pipeline verbose setting.
        """
        # Ensure we have run the pipeline (this is mostly useful to help
        # with running old pipelines that don't call run before write)
        if not self._analysis_complete:
            self._print(
                "Pipeline has not been run. Running pipeline before writing."
            )
            self.run()

        write_start = time.perf_counter()

        # Get an instance of the HDF5 file writer
        self.io_helper = PipelineIO(
            outpath,
            self.comm,
            self.n_galaxies_local,
            self._start_time,
            verbose if verbose is not None else self.verbose,
        )

        # Write out the metadata
        self.io_helper.create_file_with_metadata(
            self.instruments, self.emission_model
        )

        # In MPI land we need to collect together the galaxy counts on each
        # rank to make the indices for each rank consistent
        if self.using_mpi:
            galaxy_counts = self.comm.allgather(self.n_galaxies_local)
            galaxy_starts = np.cumsum(galaxy_counts) - galaxy_counts
            galaxy_ends = np.cumsum(galaxy_counts) - 1
            galaxy_indices = np.arange(
                galaxy_starts[self.rank],
                galaxy_ends[self.rank] + 1,
            )
        else:
            galaxy_indices = None

        def _write_component_data(data, subpath):
            """Write a dataset for the combined, stellar and BH components."""
            self.io_helper.write_data(
                data["Galaxy"],
                f"Galaxies/{subpath}",
                galaxy_indices,
            )
            self.io_helper.write_data(
                data["Stars"],
                f"Galaxies/Stars/{subpath}",
                galaxy_indices,
            )
            self.io_helper.write_data(
                data["BlackHole"],
                f"Galaxies/BlackHoles/{subpath}",
                galaxy_indices,
            )

        # Write spectral luminosity densities
        if self._write_lnu_spectra:
            _write_component_data(
                self.lnu_spectra, "Spectra/SpectralLuminosityDensities"
            )

        # Write spectral flux densities
        if self._write_fnu_spectra:
            _write_component_data(
                self.fnu_spectra, "Spectra/SpectralFluxDensities"
            )

        # Write photometric luminosities
        if self._write_luminosities:
            _write_component_data(
                self.luminosities, "Photometry/Luminosities"
            )

        # Write photometric fluxes
        if self._write_fluxes:
            _write_component_data(self.fluxes, "Photometry/Fluxes")

        # Write emission line luminosities and continua
        if self._write_lines:
            _write_component_data(self.line_lums, "Lines/Luminosity")
            _write_component_data(self.line_cont_lums, "Lines/Continuum")
            self.io_helper.write_data(
                self.line_ids,
                "Galaxies/Lines/IDs",
                galaxy_indices,
            )
            self.io_helper.write_data(
                self.line_lams,
                "Galaxies/Lines/Wavelengths",
                galaxy_indices,
            )

        # Write observed emission line fluxes and continua (the continuum
        # fluxes get their own dataset so they don't collide with the
        # continuum luminosities above)
        if self._write_flux_lines:
            _write_component_data(self.line_fluxes, "Lines/Flux")
            _write_component_data(
                self.line_cont_fluxes, "Lines/ContinuumFlux"
            )
            self.io_helper.write_data(
                self.line_obs_lams,
                "Galaxies/Lines/ObservedWavelengths",
                galaxy_indices,
            )

        # Write luminosity images (raw, PSF-convolved and noisy)
        if self._write_images_lum:
            _write_component_data(self.images_lum, "Images/Luminosity")
        if self._write_images_lum_psf:
            _write_component_data(
                self.images_lum_psf, "PSFImages/Luminosity"
            )
        if self._write_images_lum_noise:
            _write_component_data(
                self.images_lum_noise, "NoiseImages/Luminosity"
            )

        # Write flux images (raw, PSF-convolved and noisy)
        if self._write_images_flux:
            _write_component_data(self.images_flux, "Images/Flux")
        if self._write_images_flux_psf:
            _write_component_data(self.images_flux_psf, "PSFImages/Flux")
        if self._write_images_flux_noise:
            _write_component_data(
                self.images_flux_noise, "NoiseImages/Flux"
            )

        # Write out the extra analysis results
        for key, res in self._analysis_results.items():
            self.io_helper.write_data(
                res,
                f"Galaxies/{key}",
                galaxy_indices,
            )

        self._took(write_start, "Writing data")

        # Totally done!
        self._say_goodbye()
[docs]
    def combine_files(self):
        """Combine individual rank files into a single file.

        Only applicable to MPI runs. This will create a physical file on
        disk with all the data copied from the individual rank files. The
        rank files themselves will be deleted once all the data has been
        copied.

        This approach is cleaner than a virtual combination (the rank
        files are not needed afterwards) but has the potential to be very
        slow.
        """
        start = time.perf_counter()

        # Nothing to do if we're not using MPI
        if not self.using_mpi:
            self._print("Not using MPI, nothing to combine.")
            return

        # Ensure we have written the data
        if self.io_helper is None:
            raise exceptions.PipelineNotReady(
                "Cannot combine files before writing data! Call write first."
            )

        # Nothing to do if we're using collective I/O
        if self.io_helper.is_collective:
            self._print("Using collective I/O, nothing to combine.")
            return

        # Combine the files
        self.io_helper.combine_rank_files()

        self._took(start, "Combining files")
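A sketch of the intended MPI workflow, assuming `emission_model` is defined as in the module docstring and that the galaxy loading and `get_*` configuration steps (omitted here) have been wired up:

```python
from mpi4py import MPI

from synthesizer import Pipeline

pipeline = Pipeline(
    emission_model=emission_model,
    nthreads=4,
    comm=MPI.COMM_WORLD,
)

# ... load galaxies and request outputs here ...

pipeline.run()                 # generate everything that was requested
pipeline.write("output.hdf5")  # each rank writes its own file
pipeline.combine_files()       # physically merge the rank files
```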
[docs]
    def combine_files_virtual(self):
        """Combine individual rank files into a single virtual file.

        Only applicable to MPI runs. This will create a file where all the
        data is accessible but not physically copied. This is much faster
        than making a true copy but requires that each individual rank
        file remains accessible.
        """
        start = time.perf_counter()

        # Nothing to do if we're not using MPI
        if not self.using_mpi:
            self._print("Not using MPI, nothing to combine.")
            return

        # Ensure we have written the data
        if self.io_helper is None:
            raise exceptions.PipelineNotReady(
                "Cannot combine files before writing data! Call write first."
            )

        # Nothing to do if we're using collective I/O
        if self.io_helper.is_collective:
            self._print("Using collective I/O, nothing to combine.")
            return

        # Combine the files
        self.io_helper.combine_rank_files_virtual()

        self._took(start, "Combining files (virtual)")
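For background, the virtual combination relies on HDF5 virtual datasets. The snippet below is a conceptual illustration using h5py directly, not the `PipelineIO` internals; the file and dataset names are hypothetical.

```python
import h5py

# Two per-rank files, each holding a 1D dataset "data" of length 3.
layout = h5py.VirtualLayout(shape=(6,), dtype="f8")
layout[0:3] = h5py.VirtualSource("rank0.hdf5", "data", shape=(3,))
layout[3:6] = h5py.VirtualSource("rank1.hdf5", "data", shape=(3,))

# The combined file stores only the mapping; reads are forwarded to the
# rank files, which is why they must remain accessible.
with h5py.File("combined.hdf5", "w") as f:
    f.create_virtual_dataset("data", layout, fillvalue=0.0)
```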
[docs]
    def repartition_galaxies(self, galaxy_weights=None, random_seed=42):
        """Given the galaxies, repartition them across the ranks."""
        raise NotImplementedError("Repartitioning is not yet implemented.")
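Since the method above is only a stub, the following is purely an illustrative sketch of what a weight-balanced assignment could look like (a greedy longest-processing-time heuristic). It is not the planned implementation, and all names are hypothetical.

```python
import numpy as np

# Hypothetical inputs: one weight per galaxy (e.g. a particle count)
# and the number of ranks to balance across.
rng = np.random.default_rng(42)
weights = rng.random(10)
n_ranks = 3

# Visit galaxies heaviest first, always placing the next one on the
# least-loaded rank.
order = np.argsort(weights)[::-1]
loads = np.zeros(n_ranks)
assignment = np.empty(weights.size, dtype=int)
for idx in order:
    target = int(np.argmin(loads))
    assignment[idx] = target
    loads[target] += weights[idx]
```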
    def _report_balance(self):
        """Report the balance of galaxies across the ranks.

        This function will print out a nice horizontal bar graph showing
        the distribution of galaxies across the ranks.
        """
        # Communicate local counts to rank 0
        counts = self.comm.gather(self.n_galaxies_local)

        # Produce a nice horizontal bar graph to show the distribution of
        # galaxies across the ranks. This only needs printing on rank 0
        # regardless of verbosity.
        if self.rank == 0:
            self._print("Galaxy balance across ranks:")

            # Find the maximum count for scaling
            max_count = max(counts)

            for rank, count in enumerate(counts):
                # Calculate the length of the bar based on the relative size
                bar_length = (
                    int((count / max_count) * 50) if max_count > 0 else 0
                )

                # Create the bar and append the count in brackets
                bar = "#" * bar_length
                self._print(
                    f"Rank {str(rank).zfill(len(str(self.size)) + 1)} - "
                    f"{bar} ({count})"
                )
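The bar scaling is simple enough to run standalone; with made-up counts it behaves as the trailing comment describes.

```python
counts = [10, 50, 25]
max_count = max(counts)
for rank, count in enumerate(counts):
    # Bars are normalised to the busiest rank, at most 50 characters wide
    bar = "#" * (int((count / max_count) * 50) if max_count > 0 else 0)
    print(f"Rank {rank} - {bar} ({count})")
# Rank 0 gets a 10-character bar, rank 1 the full 50, rank 2 a 25.
```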