"""A module containing a pipeline helper class.
This module contains the `Pipeline` class, which is used to run observable
generation pipelines on a set of galaxies. To use this functionality the user
needs to define the properties of the Pipeline and a function to load the
galaxies. The user can then call the various methods to generate the mock
data they need, simplifying a complex pipeline full of boilerplate code to a
handful of definitions and calls to the Pipeline object.
Example usage:
```python
from synthesizer import Pipeline
pipeline = Pipeline(
gal_loader_func=load_galaxy,
emission_model=emission_model,
instruments=[instrument1, instrument2],
n_galaxies=1000,
nthreads=4,
comm=None,
verbose=1,
)
pipeline.load_galaxies()
pipeline.get_spectra()
pipeline.get_photometry_luminosities()
pipeline.write("output.hdf5")
```
"""
import time
import numpy as np
from unyt import unyt_array
from synthesizer import check_openmp, exceptions
from synthesizer.instruments import InstrumentCollection
from synthesizer.particle import Galaxy as ParticleGalaxy
from synthesizer.pipeline.pipeline_io import PipelineIO
from synthesizer.pipeline.pipeline_utils import (
NO_MODEL_LABEL,
OperationKwargsHandler,
accumulate_pipeline_results_from_child,
clear_pipeline_outputs,
combine_list_of_dicts,
count_and_check_dict_recursive,
get_full_memory,
validate_noise_unit_compatibility,
)
from synthesizer.synth_warnings import warn
from synthesizer.utils.art import Art
[docs]
class Pipeline:
"""A class for running observable generation pipelines on galaxies.
To use this class the user must instantiate it with a galaxy loading
function, an emission model defining the different emissions that will be
included in the pipeline, any instruments that will be used to make
observations, and the number of galaxies that will be loaded.
Optionally the user can also specify the number of threads to use if
Synthesizer has been installed with OpenMP support, and an MPI communicator
if they are running over MPI.
Finally the verbosity level can be set to control the amount of output.
Once the Pipeline object has been instantiated the user can call the
various methods to generate the data they need.
For spectra:
- get_spectra (passing a cosmology object if redshifted spectra are
required)
- get_lnu_data_cubes (resolved spectral data cubes)
- get_fnu_data_cubes (resolved spectral data cubes)
For photometry:
- get_photometry_luminosities
- get_photometry_fluxes
For emission lines:
- get_lines (passing a list of line IDs to generate)
For images (with optional PSF and noise based on the instrument):
- get_images_luminosity
- get_images_flux
For the SFZH grid:
- get_sfzh (passing a Grid object)
The user can also add their own analysis functions to the pipeline which
will be run on each galaxy once all data has been generated. These
functions should take a galaxy object as the first argument and can take
any number of additional arguments and keyword arguments. The results of
these functions should be attached to the galaxy object, either as base
level attributes or dictionaries containing the computed values. These
attributes should be unique to the function to avoid overwriting existing
attributes (they should be named what is passed to the result_attribute
argument, see add_analysis_func for more details).
Finally the user can write out the data generated by the pipeline using the
write method. This will write out the data to an HDF5 file.
Attributes:
emission_model (EmissionModel):
The emission model to use for the pipeline.
n_galaxies (int):
How many galaxies will we load in total (i.e. not per rank if using
MPI)?
nthreads (int):
The number of threads to use for shared memory parallelism.
Default is 1.
comm (MPI.Comm):
The MPI communicator to use for MPI parallelism. Default is None.
verbose (int):
How talkative are we?
0: No output beyond hello and goodbye.
1: Outputs with timings but only on rank 0 (when using MPI).
2: Outputs with timings on all ranks (when using MPI).
galaxies (list):
A list of Galaxy objects that have been loaded.
"""
def __init__(
self,
emission_model,
nthreads=1,
comm=None,
verbose=1,
report_memory=False,
max_npart=None,
):
"""Initialise the Pipeline object.
This will not perform any part of the calculation, it only sets it up.
This will attach all the passed attributes of the pipeline and set up
anything we'll need later like MPI variables (if applicable), flags
to indicate what stages we've completed and containers for any
outputs and additional analysis functions.
This will also check arguments are sensible, e.g.
- The galaxy loader function is callable and takes at least one
argument (the galaxy index).
- Synthesizer has been installed with OpenMP support if multiple
threads are requested.
Args:
emission_model (EmissionModel): The emission model to use for the
pipeline.
nthreads (int): The number of threads to use for shared memory
parallelism. Default is 1.
comm (MPI.Comm): The MPI communicator to use for MPI parallelism.
Default is None.
verbose (int): How talkative are we? 0: No output beyond hello and
goodbye. 1: Outputs with timings but only on rank 0 (when using
MPI). 2: Outputs with timings on all ranks (when using MPI).
report_memory (bool): Should we report memory usage as we run?
This should only be turned on for debugging purposes. This
will report the memory usage of the galaxies, the results,
and the pipeline object itself in the progress table. This
can be very EXPENSIVE in terms of time since we recurse through
all objects to get the memory usage after processing each
galaxy. Default is False.
max_npart (int): (Only applicable for particle based galaxies) The
maximum number of particles to be processed at one time. Any
galaxies above this threshold will be split until each
individual "sub-galaxy" obeys the threshold. The subsequent
emissions will then be combined at the end. Note that any
operations that require the complete particle distribution
(e.g. column densities) will be done on the original galaxy
prior to splitting.
TODO: while gas particles don't carry spectra we only need to
consider stars and black holes in this threshold. If we have
gas spectra in the future we will need to amend this.
"""
# Attributes to track timing
self._start_time = time.perf_counter()
# Attach all the attributes we need to know what to do
self.emission_model = emission_model
# Set verbosity
self.verbose = verbose
# Are we reporting memory usage?
self._report_memory = report_memory
# How many galaxies are we going to be looking at?
self.n_galaxies = 0
self.n_galaxies_local = 0 # Only applicable when using MPI
self.n_galaxies_per_rank = 0 # Only applicable when using MPI
self.n_galaxies_offset = 0 # Only applicable when using MPI
# Attach the maximum number of particles we can process at one time.
if max_npart is not None and (
not isinstance(max_npart, int) or max_npart < 1
):
raise ValueError("max_npart must be an int >= 1")
self._max_npart = max_npart
# Define the container to hold the galaxies
self.galaxies = []
# Define an empty InstrumentCollection to hold the instruments we
# will use (this is only used for book keeping, the kwarg handler
# deals with where to apply different instruments)
self.instruments = InstrumentCollection()
self._instruments_prepared = False
# How many threads are we using for shared memory parallelism?
self.nthreads = nthreads
# Check if we can use OpenMP
if self.nthreads > 1 and not check_openmp():
raise exceptions.InconsistentArguments(
"Can't use multiple threads without OpenMP support. "
" Install with: `WITH_OPENMP=1 pip install .`"
)
# Define flags for what we will do
self._do_los_optical_depths = False
self._do_lnu_spectra = False
self._do_fnu_spectra = False
self._do_luminosities = False
self._do_fluxes = False
self._do_lum_lines = False
self._do_flux_lines = False
self._do_images_lum = False
self._do_images_flux = False
self._do_lnu_data_cubes = False
self._do_fnu_data_cubes = False
self._do_spectroscopy_lnu = False
self._do_spectroscopy_fnu = False
self._do_sfzh = False
self._do_sfh = False
# Define the container for all the kwargs needed by each operation
# This is a handler that manages kwargs for each
# (model_label, operation) pair. Each call to one of the signalling
# methods will add a set of kwargs to the relevant operation.
self._operation_kwargs = OperationKwargsHandler(
model_labels=emission_model.saved_labels,
)
# Define flags for what we will write out
self._write_lnu_spectra = False
self._write_fnu_spectra = False
self._write_luminosities = False
self._write_fluxes = False
self._write_lines = False
self._write_flux_lines = False
self._write_images_lum = False
self._write_images_lum_psf = False
self._write_images_lum_noise = False
self._write_images_flux = False
self._write_images_flux_psf = False
self._write_images_flux_noise = False
self._write_lnu_data_cubes = False
self._write_fnu_data_cubes = False
self._write_spectroscopy_lnu = False
self._write_spectroscopy_fnu = False
self._write_sfzh = False
self._write_sfh = False
# Define flags to indicate we read the galaxies
self._loaded_galaxies = False
self._analysis_complete = False
# Define containers for any additional analysis functions
self._analysis_funcs = []
self._analysis_args = []
self._analysis_kwargs = []
self._analysis_results_keys = []
self._analysis_results = {}
# Initialise containers for all the data we can generate
self.sfzhs = []
self.sfhs = []
self.lnu_spectra = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.fnu_spectra = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.luminosities = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.fluxes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.line_lums = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.line_cont_lums = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.line_lams = None
self.line_ids = None
self.line_fluxes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.line_cont_fluxes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.line_obs_lams = None
self.images_lum = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.images_lum_psf = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.images_lum_noise = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.images_flux = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.images_flux_psf = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.images_flux_noise = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.lnu_data_cubes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.fnu_data_cubes = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
self.spectroscopy = {"Galaxy": {}, "Stars": {}, "BlackHole": {}}
# We'll need to store the timing information to be reported at the
# end of the pipeline (this will be stored with the operation
# as the key and the value being the collected time taken).
self._op_timing = {
"LOS optical depths": 0.0,
"SFZH": 0.0,
"SFH": 0.0,
"Lnu Spectra": 0.0,
"Fnu Spectra": 0.0,
"Luminosities": 0.0,
"Fluxes": 0.0,
"Emission Line Luminosities": 0.0,
"Emission Line Fluxes": 0.0,
"Luminosity Images": 0.0,
"Luminosity Images (With PSF)": 0.0,
"Luminosity Images (With Noise)": 0.0,
"Flux Images": 0.0,
"Flux Images (With PSF)": 0.0,
"Flux Images (With Noise)": 0.0,
"Lnu Data Cubes": 0.0,
"Fnu Data Cubes": 0.0,
"Spectroscopy Lnu": 0.0,
"Spectroscopy Fnu": 0.0,
"Extra Analyses": 0.0,
"Unpacking results": 0.0,
}
# We'll also report total counts so define a container for those.
self._op_counts = {
"LOS optical depths": 0,
"SFZH": 0,
"SFH": 0,
"Lnu Spectra": 0,
"Fnu Spectra": 0,
"Luminosities": 0,
"Fluxes": 0,
"Emission Line Luminosities": 0,
"Emission Line Fluxes": 0,
"Luminosity Images": 0,
"Luminosity Images (With PSF)": 0,
"Luminosity Images (With Noise)": 0,
"Flux Images": 0,
"Flux Images (With PSF)": 0,
"Flux Images (With Noise)": 0,
"Lnu Data Cubes": 0,
"Fnu Data Cubes": 0,
"Spectroscopy Lnu": 0,
"Spectroscopy Fnu": 0,
"Extra Analyses": 0,
}
# Some constants we'll use to format the progress report
self._table_col_width = 12
# Everything that follows is only needed for hybrid parallelism
# (running with MPI in addition to shared memory parallelism)
# If we are running with hybrid parallelism, we need to know about
# the communicator for MPI
self.comm = comm
self.using_mpi = comm is not None and comm.Get_size() > 1
# Get some MPI information if we are using MPI
if self.using_mpi:
self.rank = comm.Get_rank()
self.size = comm.Get_size()
else:
self.rank = 0
self.size = 1
if self.rank == 0:
self._say_hello()
self._report()
# Attach the writer, we'll assign this later in the write method
self.io_helper = None
# If we are using MPI we need to make sure everyone agrees on the
# start time, take the minimum
if self.using_mpi:
self._start_time = min(self.comm.allgather(self._start_time))
# Hold back everyone in MPI land until we're all ready to go
if self.using_mpi:
comm.Barrier()
def _say_hello(self):
"""Print a nice welcome."""
if self.verbose == 0:
return
print()
print("\n".join([" " * 25 + s for s in Art.galaxy.split("\n")]))
print()
def _report(self):
"""Print a report containing the Pipeline setup."""
# Print the MPI setup if we are using MPI
if self.using_mpi:
self._print(f"Running with MPI on {self.size} ranks.")
# Print the shared memory parallelism setup
if self.nthreads > 1 and self.using_mpi:
self._print(f"Running with {self.nthreads} threads per rank.")
elif self.nthreads > 1:
self._print(f"Running with {self.nthreads} threads.")
# Print some information about the emission model
self._print(f"Root emission model: {self.emission_model.label}")
self._print(
f"EmissionModel contains {len(self.emission_model._models)} "
"individual models."
)
self._print("EmissionModels split by emitter:")
label_width = max(
len(" - galaxy"), len(" - stellar"), len(" - blackhole")
)
ngal_models = len(
[
m
for m in self.emission_model._models.values()
if m.emitter == "galaxy"
]
)
nstar_models = len(
[
m
for m in self.emission_model._models.values()
if m.emitter == "stellar"
]
)
nbh_models = len(
[
m
for m in self.emission_model._models.values()
if m.emitter == "blackhole"
]
)
self._print(f"{' - galaxy:'.ljust(label_width + 2)} {ngal_models}")
self._print(f"{' - stellar:'.ljust(label_width + 2)} {nstar_models}")
self._print(f"{' - blackhole:'.ljust(label_width + 2)} {nbh_models}")
self._print("EmissionModels split by operation type:")
label_width = max(
len(" - extraction"),
len(" - combination"),
len(" - attenuating"),
len(" - generation"),
)
nextract_models = len(
[
m
for m in self.emission_model._models.values()
if m._is_extracting
]
)
ncombine_models = len(
[
m
for m in self.emission_model._models.values()
if m._is_combining
]
)
nattenuate_models = len(
[
m
for m in self.emission_model._models.values()
if m._is_transforming
]
)
ngen_models = len(
[
m
for m in self.emission_model._models.values()
if m._is_generating
]
)
self._print(
f"{' - extraction:'.ljust(label_width + 2)} {nextract_models}"
)
self._print(
f"{' - combination:'.ljust(label_width + 2)} {ncombine_models}"
)
self._print(
f"{' - attenuating:'.ljust(label_width + 2)} {nattenuate_models}"
)
self._print(
f"{' - generation:'.ljust(label_width + 2)} {ngen_models}"
)
def _report_instruments(self):
"""Print a report containing the instruments setup."""
# Unpack the instruments to collect together all the unique instruments
unique_instruments = {}
for inst in self.instruments:
unique_instruments[inst.label] = inst
ninstruments = len(unique_instruments)
# Print the number of instruments we have
self._print(f"Using {ninstruments} instruments.")
# Print the number of filters we have
nfilters = 0
for inst in unique_instruments.values():
if getattr(inst, "filters", None) is not None:
nfilters += len(inst.filters)
self._print(f"Instruments have {nfilters} filters in total.")
# Make a breakdown of the instruments
self._print(
"Included instruments:",
", ".join(list(unique_instruments.keys())),
)
self._print("Instruments split by capability:")
label_width = max(
len(" - photometry"),
len(" - spectroscopy"),
len(" - imaging"),
len(" - resolved spectroscopy"),
)
nphot_inst = len(
[
inst
for inst in unique_instruments.values()
if inst.can_do_photometry
]
)
nspec_inst = len(
[
inst
for inst in unique_instruments.values()
if inst.can_do_spectroscopy
]
)
nimg_inst = len(
[
inst
for inst in unique_instruments.values()
if inst.can_do_imaging
]
)
nresspec_inst = len(
[
inst
for inst in unique_instruments.values()
if inst.can_do_resolved_spectroscopy
]
)
self._print(
f"{' - photometry:'.ljust(label_width + 2)} {nphot_inst}"
)
self._print(
" - spectroscopy:".ljust(label_width + 2),
nspec_inst,
)
self._print(f"{' - imaging:'.ljust(label_width + 2)} {nimg_inst}")
self._print(
f"{' - resolved spectroscopy:'.ljust(label_width + 2)}"
f" {nresspec_inst}"
)
def _say_goodbye(self):
"""Print a nice goodbye including timing."""
elapsed = time.perf_counter() - self._start_time
# Report in sensible units
if elapsed < 1:
elapsed *= 1000
units = "ms"
elif elapsed < 60:
units = "s"
elif elapsed < 3600:
elapsed /= 60
units = "mins"
else:
elapsed /= 3600
units = "hours"
# Report how blazingly fast we are
self._print(f"Total synthesis took {elapsed:.3f} {units}.")
self._print("Goodbye!")
def _print(self, *args, **kwargs):
"""Print a message to the screen with extra information.
The prints behave differently depending on whether we are using MPI or
not. We can also set the verbosity level at the Pipeline level which
will control the verbosity of the print statements.
Verbosity:
0: No output beyond hello and goodbye.
1: Outputs with timings but only on rank 0 (when using MPI).
2: Outputs with timings on all ranks (when using MPI).
Args:
*args: The message to print.
**kwargs: Any additional arguments to pass to the print function.
"""
# At verbosity 0 we are silent
if self.verbose == 0:
return
# Get the current time code in seconds with 0 padding and 2
# decimal places
now = time.perf_counter() - self._start_time
int_now = str(int(now)).zfill(
len(str(int(now))) + 1 if now > 9999 else 5
)
decimal = str(now).split(".")[-1][:2]
now_str = f"{int_now}.{decimal}"
# Create the prefix for the print, theres extra info to output if
# we are using MPI
if self.using_mpi:
# Only print on rank 0 if we are using MPI and have verbosity 1
if self.verbose == 1 and self.rank != 0:
return
prefix = (
f"[{str(self.rank).zfill(len(str(self.size)) + 1)}]"
f"[{now_str}]:"
)
else:
prefix = f"[{now_str}]:"
print(prefix, *args, **kwargs)
def _took(self, start, message):
"""Print a message with the time taken since the start time.
Args:
start (float): The start time of the process.
message (str): The message to print.
"""
elapsed = time.perf_counter() - start
# Report in sensible units
if elapsed < 1:
elapsed *= 1000
units = "ms"
elif elapsed < 60:
units = "s"
else:
elapsed /= 60
units = "mins"
# Report how blazingly fast we are
self._print(f"{message} took {elapsed:.3f} {units}.")
@property
def results_memory_usage(self):
"""Return the memory usage of the results stored on the Pipeline.
Returns:
float: The memory usage in Megabytes.
"""
results = [
self.sfzhs,
self.sfhs,
self.lnu_spectra,
self.fnu_spectra,
self.luminosities,
self.fluxes,
self.line_lums,
self.line_cont_lums,
self.line_lams,
self.line_ids,
self.line_fluxes,
self.line_cont_fluxes,
self.line_obs_lams,
self.images_lum,
self.images_lum_psf,
self.images_flux,
self.images_flux_psf,
self.lnu_data_cubes,
self.fnu_data_cubes,
self.spectroscopy,
]
return get_full_memory(results) / (1024**2)
@property
def galaxies_memory_usage(self):
"""Return the memory usage of the galaxies loaded into the Pipeline.
Returns:
float: The memory usage in Megabytes.
"""
return get_full_memory(self.galaxies) / (1024**2)
@property
def all_galaxies_memory_usage(self):
"""Return the memory usage of all galaxies across all ranks.
Returns:
float: The memory usage in Megabytes.
"""
if self.using_mpi:
return self.comm.allreduce(self.galaxies_memory_usage)
return self.galaxies_memory_usage
@property
def memory_usage(self):
"""Return the memory usage of the Pipeline object.
Returns:
float: The memory usage in Megabytes.
"""
return get_full_memory(self) / (1024**2)
def _print_progress_header(self):
"""Print the header for the progress report."""
# Nothing to do if we are not rank 0
if self.rank != 0:
return
# Obey 0 verbosity
if self.verbose == 0:
return
# Print the pipeline footprint regardless of report memory setting
self._print("Pipeline memory footprint (MB):", self.memory_usage)
self._print("Running the pipeline...")
self._print_progress_footer()
# Define the header for the progress report
header = (
"|"
+ f"{'Galaxy #':>{self._table_col_width}}"
+ "|"
+ f"{'Nstars':>{self._table_col_width}}"
+ "|"
+ f"{'Ngas':>{self._table_col_width}}"
+ "|"
+ f"{'Nbh':>{self._table_col_width}}"
+ "|"
+ f"{'dt (s)':>{self._table_col_width}}"
+ "|"
)
# If we are reporting memory usage we need to add the extra columns
if self._report_memory:
header += (
f"{'Memory footprint (MB)':>{self._table_col_width * 2}}"
+ "|"
+ f"{'Galaxy memory (MB)':>{self._table_col_width * 2}}"
+ "|"
+ f"{'Results memory (MB)':>{self._table_col_width * 2}}"
+ "|"
)
self._print(header)
self._print_progress_footer()
def _add_progress_row(self, gal, igal, start):
"""Print a row for the progress report.
Args:
gal (Galaxy): The galaxy object to report on.
igal (int): The index of the galaxy.
start (float): The start time of the process.
"""
# Obey 0 verbosity
if self.verbose == 0:
return
# Get the number of stars, for parametric galaxies we don't have this
# information but we do have the sfzh shape
if gal.stars is not None and hasattr(gal.stars, "nstars"):
nstars = gal.stars.nstars
elif gal.stars is not None:
nstars = str(gal.stars.sfzh.shape)
else:
nstars = "None"
# Get the number of blackholes (this is automatically 1 for parametric
# galaxies)
nbh = gal.black_holes.nbh if gal.black_holes is not None else "None"
# Get the number of gas particles (this should be None for parametric)
ngas = gal.gas.nparticles if gal.gas is not None else "None"
# Get the elapsed time for this galaxy
elapsed = time.perf_counter() - start
# Get the memory footprint if we are reporting memory usage
if self._report_memory:
res_mem_mb = self.results_memory_usage
self_mem_mb = self.memory_usage
gal_mem_mb = self.galaxies_memory_usage
# In MPI land we need to add the offset to this ranks galaxy index
if self.using_mpi:
igal += self.n_galaxies_offset
# We don't want to obey the verbosity level here, we always want to
# print this information so we'll use the print method directly
# Get the current time code in seconds with 0 padding and 2
# decimal places
now = time.perf_counter() - self._start_time
int_now = str(int(now)).zfill(
len(str(int(now))) + 1 if now > 9999 else 5
)
decimal = str(now).split(".")[-1][:2]
now_str = f"{int_now}.{decimal}"
# Create the prefix for the print, theres extra info to output if
# we are using MPI
if self.using_mpi:
# Only print on rank 0 if we are using MPI and have verbosity 1
if self.verbose == 1 and self.rank != 0:
return
prefix = (
f"[{str(self.rank).zfill(len(str(self.size)) + 1)}]"
f"[{now_str}]:"
)
else:
prefix = f"[{now_str}]:"
# Define the row for the progress report
row = (
"|"
+ f"{igal:>{self._table_col_width}}"
+ "|"
+ f"{nstars:>{self._table_col_width}}"
+ "|"
+ f"{ngas:>{self._table_col_width}}"
+ "|"
+ f"{nbh:>{self._table_col_width}}"
+ "|"
+ f"{elapsed:>{self._table_col_width}.2f}"
+ "|"
)
# If we are reporting memory usage we need to add the extra columns
if self._report_memory:
row += (
f"{self_mem_mb:>{self._table_col_width * 2}.2f}"
+ "|"
+ f"{gal_mem_mb:>{self._table_col_width * 2}.2f}"
+ "|"
+ f"{res_mem_mb:>{self._table_col_width * 2}.2f}"
+ "|"
)
print(prefix, row)
def _print_progress_footer(self):
"""Print the footer for the progress report."""
footer = (
"+"
+ "-" * self._table_col_width
+ "+"
+ "-" * self._table_col_width
+ "+"
+ "-" * self._table_col_width
+ "+"
+ "-" * self._table_col_width
+ "+"
+ "-" * self._table_col_width
+ "+"
)
# If we are reporting memory usage we need to add the extra columns
if self._report_memory:
footer += (
"-" * self._table_col_width * 2
+ "+"
+ "-" * self._table_col_width * 2
+ "+"
+ "-" * self._table_col_width * 2
+ "+"
)
self._print(footer)
def _report_total_timings(self):
"""Print the total timings for each operation."""
# If we are using MPI we need to sum the timings and counts across all
# ranks to get the total time taken and total counts
if self.using_mpi:
for key in self._op_timing:
self._op_timing[key] = self.comm.allreduce(
self._op_timing[key]
)
for key in self._op_counts:
self._op_counts[key] = self.comm.allreduce(
self._op_counts[key]
)
# We only want to report this on rank 0 so we'll return if we're
# not rank 0
if self.rank != 0:
return
# Loop over the timings and print them out
for key, elapsed in self._op_timing.items():
if elapsed == 0.0:
continue
# Report in sensible units
if elapsed < 1:
elapsed *= 1000
units = "ms"
elif elapsed < 60:
units = "s"
else:
elapsed /= 60
units = "mins"
# Handle reporting (extra analysis and unpacking need some special
# handling because they are not operations)
if key == "Unpacking results":
self._print(f"{key} took {elapsed:.2f} {units}")
elif key == "Extra Analyses":
self._print(
f"Running {self._op_counts[key]} extra analyses"
f" took {elapsed:.2f} {units}"
)
elif key in self._op_counts:
self._print(
f"Computing {self._op_counts[key]} {key}"
f" took {elapsed:.2f} {units}"
)
else:
self._print(f"Computing {key} took {elapsed:.2f} {units}")
[docs]
def report_operations(self):
"""Print the operations that will be performed by the Pipeline.
This will print out the operations that will be performed by the
pipeline, including which will be written out and which will just be
computed.
"""
self._print("-" * 60)
self._print("".ljust(30) + "Compute?".rjust(15) + "Write?".rjust(15))
# Print each operation and whether it will be written out
self._print(
"Line of Sight Optical Depths".ljust(30)
+ str(self._do_los_optical_depths).rjust(15)
+ "N/A".rjust(15)
)
self._print(
"SFZH".ljust(30)
+ str(self._do_sfzh).rjust(15)
+ str(self._write_sfzh).rjust(15)
)
self._print(
"SFH".ljust(30)
+ str(self._do_sfh).rjust(15)
+ str(self._write_sfh).rjust(15)
)
self._print(
"Lnu Spectra".ljust(30)
+ str(self._do_lnu_spectra).rjust(15)
+ str(self._write_lnu_spectra).rjust(15)
)
self._print(
"Fnu Spectra".ljust(30)
+ str(self._do_fnu_spectra).rjust(15)
+ str(self._write_fnu_spectra).rjust(15)
)
self._print(
"Photometric Luminosities".ljust(30)
+ str(self._do_luminosities).rjust(15)
+ str(self._write_luminosities).rjust(15)
)
self._print(
"Photometric Fluxes".ljust(30)
+ str(self._do_fluxes).rjust(15)
+ str(self._write_fluxes).rjust(15)
)
self._print(
"Emission Line Luminosities".ljust(30)
+ str(self._do_lum_lines).rjust(15)
+ str(self._write_lines).rjust(15)
)
self._print(
"Emission Line Fluxes".ljust(30)
+ str(self._do_flux_lines).rjust(15)
+ str(self._write_flux_lines).rjust(15)
)
self._print(
"Luminosity Images".ljust(30)
+ str(self._do_images_lum).rjust(15)
+ str(self._write_images_lum).rjust(15)
)
self._print(
"Flux Images".ljust(30)
+ str(self._do_images_flux).rjust(15)
+ str(self._write_images_flux).rjust(15)
)
self._print(
"Lnu Data Cubes".ljust(30)
+ str(self._do_lnu_data_cubes).rjust(15)
+ str(self._write_lnu_data_cubes).rjust(15)
)
self._print(
"Fnu Data Cubes".ljust(30)
+ str(self._do_fnu_data_cubes).rjust(15)
+ str(self._write_fnu_data_cubes).rjust(15)
)
self._print(
"Spectroscopy Lnu".ljust(30)
+ str(self._do_spectroscopy_lnu).rjust(15)
+ str(self._write_spectroscopy_lnu).rjust(15)
)
self._print(
"Spectroscopy Fnu".ljust(30)
+ str(self._do_spectroscopy_fnu).rjust(15)
+ str(self._write_spectroscopy_fnu).rjust(15)
)
self._print("-" * 60)
[docs]
def add_analysis_func(self, func, result_key, *args, **kwargs):
"""Add an analysis function to the Pipeline.
The provided function will be called on each galaxy in the Pipeline
once all data has been generated. The function should take a galaxy
object as the first argument and can take any number of additional
arguments and keyword arguments.
The results of the analysis function should be returned. This can
be a scalar, array, or a dictionary of arbitrary structure. We'll
store it in a dictionary on the Pipeline object with the key being the
result_key argument.
For example:
```python
def my_analysis_func(galaxy, *args, **kwargs):
return galaxy.some_attribute * 2
pipeline.add_analysis_func(my_analysis_func, "MyAnalysisResult")
```
Or for a specific component of the galaxy:
```python
def my_analysis_func(galaxy, *args, **kwargs):
return galaxy.stars.mass.sum()
pipeline.add_analysis_func(my_analysis_func, "Stars/Mass")
```
Args:
func (callable):
The analysis function to add to the Pipeline. This function
should take a galaxy object as the first argument and can take
any number of additional arguments and keyword arguments.
result_key (str):
The key to use when storing the results of the analysis
function in the output. This can include slashes to denote
nesting, e.g. "Gas/Nested/Result".
*args: Any additional arguments to pass to the analysis function.
**kwargs: Any additional keyword arguments to pass to the
analysis function.
"""
# Ensure we have a callable function
if not callable(func):
raise exceptions.InconsistentArguments(
"Analysis function is not a callable function"
f" (found {type(func)})."
)
# Warn the user if theres a name clash, we'll take the new one
if func.__name__ in self._analysis_funcs:
warn(
f"{func.__name__} already exists in the analysis functions. "
"Overwriting with the passed function."
)
# Add the function to the dictionary
self._analysis_funcs.append(func)
self._analysis_args.append(args)
self._analysis_kwargs.append(kwargs)
self._analysis_results_keys.append(result_key)
# Warn the user if theres a name clash, we'll take the new one
if result_key in self._analysis_results:
raise exceptions.InconsistentArguments(
f"{result_key} already exists in the analysis results. "
"Choose a different result_key"
)
else:
self._analysis_results[result_key] = []
self._print(f"Added analysis function: {result_key}")
[docs]
def add_galaxies(self, galaxies):
"""Add galaxies to the Pipeline.
This function will add the provided galaxies to the Pipeline. This is
useful if you have already loaded the galaxies and want to add them to
the Pipeline object.
Args:
galaxies (list):
A list of Galaxy objects to add to the Pipeline.
"""
start = time.perf_counter()
# Check there are actually galaxies... and tell the user
if len(galaxies) == 0:
if self.using_mpi:
self._print(
"No galaxies provided to add to the Pipeline on rank"
f" {self.rank}."
)
else:
self._print("No galaxies provided to add to the Pipeline.")
# Attach the galaxies
self.galaxies = galaxies
self.n_galaxies_local = len(galaxies)
# If we're in MPI land we need sum the local counts across all ranks
# to get the total number of galaxies
if self.using_mpi:
self.n_galaxies = self.comm.allreduce(self.n_galaxies_local)
self.n_galaxies_per_rank = self.comm.allgather(
self.n_galaxies_local
)
self.n_galaxies_offset = sum(self.n_galaxies_per_rank[: self.rank])
else:
self.n_galaxies = self.n_galaxies_local
self.n_galaxies_per_rank = [self.n_galaxies_local]
self.n_galaxies_offset = 0
# If we have MPI lets report the balance
if self.using_mpi:
self._report_balance()
# Report the galaxy memory footprint (in MPI land we'll report
# the total and local memory usage)
if self.using_mpi:
self._print(
"Local galaxies memory footprint:"
f" {self.galaxies_memory_usage:.2f} MB"
)
self._print(
"Distributed galaxies total memory footprint:"
f" {self.all_galaxies_memory_usage:.2f} MB"
)
else:
self._print(
"Galaxies memory footprint: "
f"{self.galaxies_memory_usage:.2f} MB"
)
# Done!
self._loaded_galaxies = True
self._took(start, f"Adding {self.n_galaxies} galaxies")
def _add_instruments(self, instruments):
"""Add instruments to the Pipeline.
This is a helprer used to attach instruments to the pipeline. Note
that these instruments are only used for book keeping. The machinery
to apply instruments to the right operation is handled by the
OperationKwargsHandler (self._operation_kwargs).
This method will convert the input to a flat list of Instruments.
Args:
instruments (list of Instrument/InstrumentCollection):
The instruments to attach to the pipeline.
Returns:
list of Instrument:
The flattened list of instruments.
"""
# Flatten the instruments into a single list
_instruments = []
for inst in instruments:
if isinstance(inst, InstrumentCollection):
_instruments.extend(list(inst.instruments.values()))
else:
_instruments.append(inst)
# Get a set for the currently instrument collection
current_instruments = self.instruments.to_set()
# Ensure we don't have duplicate labels (this can happen if the same
# label is being used for similar instruments with different
# properties)
all_labels = [inst.label for inst in current_instruments]
label_set = set(all_labels)
if len(label_set) != len(all_labels):
raise exceptions.InconsistentArguments(
"Duplicate instrument labels found when adding "
"instruments to the Pipeline. Ensure all instruments "
f"have unique labels. Labels found: {all_labels}"
)
# Add the new instruments to the instrument collection
new_instruments = set(_instruments) - current_instruments
self.instruments.add_instruments(*new_instruments)
if len(new_instruments) > 0:
self._instruments_prepared = False
return _instruments
def _prepare_instruments(self):
"""Prewarm instrument filters onto the emission-model grid.
This prepares filters once per pipeline setup so repeated photometry
operations can avoid interpolation and reuse precomputed integration
weights.
"""
if self._instruments_prepared:
return
# Nothing to do if we have no instruments attached.
if len(self.instruments) == 0:
self._instruments_prepared = True
return
# If the emission model has no wavelength grid we can only precompute
# on each filter's native grid.
model_lam = getattr(self.emission_model, "lam", None)
for inst in self.instruments:
if not inst.can_do_photometry:
continue
if model_lam is not None:
inst.filters.prepare_for_grid(lam=model_lam)
else:
inst.filters.prepare_for_grid()
self._instruments_prepared = True
[docs]
def get_los_optical_depths(
self,
kernel,
kernel_threshold=1.0,
kappa=0.0795,
tau_v_attr="tau_v",
):
"""Flag that the Pipeline should compute the LOS optical depths.
This will signal the Pipeline to compute the LOS optical depths when
the run method is called.
LOS optical depths are computed first.
Note that the LOS calculation requires a galaxy has a gas component and
either a stellar or black hole components emitting.
Args:
kernel (array-like):
The gas SPH kernel.
kernel_threshold (float):
The threshold of the kernel. Default is 1.0.
kappa (float):
The dust opacity coefficient in units of Msun / pc**2. Default
is 0.0795.
tau_v_attr (str):
The name of the attribute to store the V-band optical depth.
Default is "tau_v".
"""
# Store the arguments for the operation
self._operation_kwargs.add(
NO_MODEL_LABEL,
"get_los_optical_depths",
kernel=kernel,
kernel_threshold=kernel_threshold,
kappa=kappa,
tau_v_attr=tau_v_attr,
)
# Flag that we will compute the LOS optical depths
self._do_los_optical_depths = True
def _get_los_optical_depths(self, galaxy):
"""Compute the Line of Sight optical depths for all particles.
This will compute the optical depths based on the line of sight dust
column density for all non-gas components. We project a ray along the
z axis (LOS) and any gas kernels it intersects are evaluated at the
intersection and their contributions to the optical depth is included.
Args:
galaxy (Galaxy):
The galaxy to compute the optical depths for.
"""
start = time.perf_counter()
# Iterate over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_los_optical_depths"
]:
# Compute the optical depths for the present components.
if (
galaxy.stars is not None
and galaxy.gas is not None
and galaxy.stars.nstars > 0
):
galaxy.get_stellar_los_tau_v(
kernel=op_kwargs["kernel"],
threshold=op_kwargs["kernel_threshold"],
kappa=op_kwargs["kappa"],
tau_v_attr=op_kwargs["tau_v_attr"],
nthreads=self.nthreads,
)
if (
galaxy.black_holes is not None
and galaxy.gas is not None
and galaxy.black_holes.nbh > 0
):
galaxy.get_black_hole_los_tau_v(
kernel=op_kwargs["kernel"],
threshold=op_kwargs["kernel_threshold"],
kappa=op_kwargs["kappa"],
tau_v_attr=op_kwargs["tau_v_attr"],
nthreads=self.nthreads,
)
# Count how many optical depths we have generated (1 per particle) and
# increment the total counts
if galaxy.stars is not None and galaxy.gas is not None:
self._op_counts["LOS optical depths"] += galaxy.stars.nstars
if galaxy.black_holes is not None and galaxy.gas is not None:
self._op_counts["LOS optical depths"] += galaxy.black_holes.nbh
# Record the time taken
self._op_timing["LOS optical depths"] += time.perf_counter() - start
[docs]
def get_sfzh(self, log10ages, metallicities, write=True):
"""Flag that the Pipeline should compute the SFZH grid.
This will signal the Pipeline to compute the SFZH grid when the run
method is called.
The SFZH grid is the star formation history grid for each galaxy.
Args:
log10ages (array-like):
The log10 age axis of the SFZH grid.
metallicities (array-like):
The metallicity axis of the SFZH grid.
write (bool):
Whether to write out the SFZH grid. Default is True.
"""
# Store the arguments for the operation
self._operation_kwargs.add_unique(
"get_sfzh",
log10ages=log10ages,
metallicities=metallicities,
)
# Flag that we will compute the SFZH grid
self._do_sfzh = True
# Flag that we will want to write out the SFZH grid (calling the
# get_sfzh method is considered the the intent to write it out
# by default)
self._write_sfzh = write or self._write_sfzh
def _get_sfzh(self, galaxy):
"""Compute the SFZH grid for each galaxy.
This is also the integrated weights of each star particle onto the SPS
grid.
Args:
galaxy (Galaxy):
The galaxy to compute the SFZH grid for.
Returns:
array-like:
The computed or cached SFZH grid.
"""
start = time.perf_counter()
# Get the operation kwargs for this operation
op_kwargs = self._operation_kwargs.get_unique_kwargs("get_sfzh")
# Get the SFZH, skip any without stars.
# Parametric galaxies have this ready to go so we can skip them
if getattr(galaxy, "sfzh", None) is not None:
return galaxy.sfzh
elif galaxy.stars is not None and galaxy.stars.nstars > 0:
galaxy.stars.get_sfzh(
log10ages=op_kwargs["log10ages"],
metallicities=op_kwargs["metallicities"],
nthreads=self.nthreads,
)
galaxy.sfzh = galaxy.stars.sfzh
else:
# No stars, no SFZH, store a zeroed grid
galaxy.sfzh = np.zeros(
(
len(op_kwargs["log10ages"]),
len(op_kwargs["metallicities"]),
)
)
return galaxy.sfzh
# Count the number of SFZH grids we have generated
self._op_counts["SFZH"] += 1
# Record the time taken
self._op_timing["SFZH"] += time.perf_counter() - start
return galaxy.sfzh
[docs]
def get_sfh(self, log10ages, write=True):
"""Flag that the Pipeline should compute the binned SFH.
This will signal the Pipeline to compute the binned SFH when the run
method is called.
The SFH is the binned star formation history based on an arbitrary
set of age bins define din log10 space.
Args:
log10ages (array-like):
The log10 age axis of the SFH grid.
write (bool):
Whether to write out the SFH grid. Default is True.
"""
# Store the arguments for the operation
self._operation_kwargs.add_unique(
"get_sfh",
log10ages=log10ages,
)
# Flag that we will compute the SFH grid
self._do_sfh = True
# Flag that we will want to write out the SFH grid (calling the
# get_sfh method is considered the the intent to write it out
# by default)
self._write_sfh = write or self._write_sfh
def _get_sfh(self, galaxy):
"""Compute the binned SFH for each galaxy.
Args:
galaxy (Galaxy):
The galaxy to compute the SFH for.
Returns:
array-like:
The computed or cached SFH grid.
"""
start = time.perf_counter()
# Get the operation kwargs for this operation
op_kwargs = self._operation_kwargs.get_unique_kwargs("get_sfh")
# Get the SFH, skip any without stars.
# Parametric galaxies have this ready to go so we can skip them
if getattr(galaxy, "sfh", None) is not None:
return galaxy.sfh
elif galaxy.stars is not None and galaxy.stars.nstars > 0:
galaxy.stars.get_sfh(
log10ages=op_kwargs["log10ages"],
nthreads=self.nthreads,
)
galaxy.sfh = galaxy.stars.sfh
else:
# No stars, no SFH, store a zeroed grid
galaxy.sfh = np.zeros(len(op_kwargs["log10ages"]))
return galaxy.sfh
# Count the number of SFH grids we have generated
self._op_counts["SFH"] += 1
# Record the time taken
self._op_timing["SFH"] += time.perf_counter() - start
return galaxy.sfh
[docs]
def get_spectra(self, write=True):
"""Flag that the Pipeline should compute the rest frame spectra.
This will signal the Pipeline to compute the rest frame spectral
luminosity density for each galaxy when the run method is called.
The spectra are generated based on the EmissionModel and the galaxy
components.
Spectral flux densities can be computed with get_observed_spectra.
Note that there are no kwargs for this method and all models flagged
to be saved by the emission model will be computed.
Args:
write (bool):
Whether to write out the spectra. Default is True.
"""
# Flag that we will compute the spectra
self._do_lnu_spectra = True
# Flag that we will want to write out the spectra (calling the
# get_spectra method is considered the intent to write it out
# by default)
self._write_lnu_spectra = write or self._write_lnu_spectra
# Spectra has no kwargs so we don't need to store anything in
# the handler (self._operation_kwargs)
def _get_spectra(self, galaxy):
"""Generate the spectra for the galaxies based on the EmissionModel.
Args:
galaxy (Galaxy):
The galaxy to generate the spectra for.
"""
start = time.perf_counter()
# Get the spectra
galaxy.get_spectra(self.emission_model, nthreads=self.nthreads)
# Count the number of spectra we have generated
self._op_counts["Lnu Spectra"] += count_and_check_dict_recursive(
galaxy.spectra
)
if galaxy.stars is not None:
self._op_counts["Lnu Spectra"] += count_and_check_dict_recursive(
galaxy.stars.spectra
)
if galaxy.black_holes is not None:
self._op_counts["Lnu Spectra"] += count_and_check_dict_recursive(
galaxy.black_holes.spectra
)
# Record the time taken
self._op_timing["Lnu Spectra"] += time.perf_counter() - start
[docs]
def get_observed_spectra(self, cosmo, igm=None, write=True):
"""Flag that the Pipeline should compute the observed spectra.
This will signal the Pipeline to compute the observed spectral flux
density for each galaxy when the run method is called.
The observed spectra are generated based on the rest frame spectra and
the cosmology.
Args:
cosmo (astropy.cosmology.Cosmology):
The cosmology to use for the observed spectra.
igm (IGMBase):
The IGM model to use for the attenuation of the spectra.
write (bool):
Whether to write out the observed spectra. Default is True.
"""
# Store the cosmology for the operation
self._operation_kwargs.add_unique(
"get_observed_spectra",
cosmo=cosmo,
igm=igm,
)
# Flag that we will compute the observed spectra
self._do_fnu_spectra = True
# Flag that we will want to write out the observed spectra (calling the
# get_observed_spectra method is considered the intent to write it out
# by default)
self._write_fnu_spectra = write or self._write_fnu_spectra
# We need to ensure the lnu spectra are computed first
# NOTE: this is safe if the user has already called get_spectra, it
# will just leave the flag as True and respect the original intent to
# write or not write
self.get_spectra(write=False)
def _get_observed_spectra(self, galaxy):
"""Compute the observed spectra for each galaxy.
Args:
galaxy (Galaxy):
The galaxy to compute the observed spectra for.
"""
start = time.perf_counter()
# Get the operation kwargs for this operation
op_kwargs = self._operation_kwargs.get_unique_kwargs(
"get_observed_spectra"
)
# Get the observed spectra
galaxy.get_observed_spectra(
cosmo=op_kwargs["cosmo"],
igm=op_kwargs["igm"],
)
# Count the number of observed spectra we have generated
self._op_counts["Fnu Spectra"] += count_and_check_dict_recursive(
galaxy.spectra
)
if galaxy.stars is not None:
self._op_counts["Fnu Spectra"] += count_and_check_dict_recursive(
galaxy.stars.spectra
)
if galaxy.black_holes is not None:
self._op_counts["Fnu Spectra"] += count_and_check_dict_recursive(
galaxy.black_holes.spectra
)
# Record the time taken
self._op_timing["Fnu Spectra"] += time.perf_counter() - start
[docs]
def get_photometry_luminosities(
self,
*instruments,
labels=None,
write=True,
):
"""Flag that the Pipeline should compute the photometric luminosities.
This will signal the Pipeline to compute the photometric luminosities
for each galaxy when the run method is called using the passed
instrument. If multiple instruments are desired this method can
be called multiple times to add the new instruments.
The photometric luminosities are generated based on the lnu
spectra and the instrument filters.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the photometric luminosities.
This can be any number of instruments or instrument
collections, they will all be combined into a single
InstrumentCollection for this operation.
labels (str/list of str):
An optional label/set of labels for this set of instruments.
This is only needed if you want to apply different instrument
sets to different models. Default is None.
write (bool):
Whether to write out the photometric luminosities. Default is
True.
"""
# Flag that we will compute the photometric luminosities
self._do_luminosities = True
# Flag that we will want to write out the photometric luminosities
# (calling the get_photometry_luminosities method is considered the
# intent to write it out by default)
self._write_luminosities = write or self._write_luminosities
# Check that we have instruments to compute the photometry for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate photometry without instruments with filters! "
"Pass instruments to the get_photometry_luminosities method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do photometry
for inst in _instruments:
if not inst.can_do_photometry:
raise exceptions.PipelineNotReady(
f"Cannot generate photometry with {inst.label}!"
)
# Add the instruments to the operation kwargs
self._operation_kwargs.add(
labels if labels is not None else NO_MODEL_LABEL,
"get_photometry_luminosities",
instruments=_instruments,
)
# We need to ensure the lnu spectra are computed first
# NOTE: this is safe if the user has already called get_spectra, it
# will just leave the flag as True and respect the original intent to
# write or not write
self.get_spectra(write=False)
def _get_photometry_luminosities(self, galaxy):
"""Compute the photometric luminosities from the generated spectra.
Args:
galaxy (Galaxy):
The galaxy to compute the photometric luminosities for.
"""
start = time.perf_counter()
# Loop over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_photometry_luminosities"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Get the photometry.
galaxy.get_photo_lnu(
filters=instruments.all_filters,
nthreads=self.nthreads,
limit_to=model_label,
)
# Count the number of photometric luminosities we have generated
self._op_counts["Luminosities"] += count_and_check_dict_recursive(
galaxy.photo_lnu
)
if galaxy.stars is not None:
self._op_counts["Luminosities"] += count_and_check_dict_recursive(
galaxy.stars.photo_lnu
)
if galaxy.black_holes is not None:
self._op_counts["Luminosities"] += count_and_check_dict_recursive(
galaxy.black_holes.photo_lnu
)
# Record the time taken
self._op_timing["Luminosities"] += time.perf_counter() - start
[docs]
def get_photometry_fluxes(
self,
*instruments,
cosmo=None,
igm=None,
labels=None,
write=True,
):
"""Flag that the Pipeline should compute the photometric fluxes.
This will signal the Pipeline to compute the photometric fluxes for
each galaxy when the run method is called.
The photometric fluxes are generated based on the fnu spectra and the
instrument filters.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the photometric fluxes. This can be
any number of instruments or instrument collections, they will
all be combined into a single InstrumentCollection for this
operation.
cosmo (astropy.cosmology.Cosmology):
If get_spectra_observed has not been called explicitly, then
we will need the cosmology to compute the observed spectra
first. Default is None.
igm (IGMBase):
If get_spectra_observed has not been called explicitly, then
we will need the IGM model to compute the observed spectra
first. Unlike the cosmology, this is not required if IGM
attenuation is not needed. Default is None.
labels (str/list of str):
An optional label/set of labels for this set of instruments.
This is only needed if you want to apply different instrument
sets to different models. Default is None.
write (bool):
Whether to write out the photometric fluxes. Default is True.
"""
# Flag that we will compute the photometric fluxes
self._do_fluxes = True
# Flag that we will want to write out the photometric fluxes (calling
# the get_photometry_fluxes method is considered the intent to write it
# out by default)
self._write_fluxes = write or self._write_fluxes
# Check that we have instruments to compute the photometry for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate photometry without instruments with filters! "
"Pass instruments to the get_photometry_fluxes method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do photometry
for inst in _instruments:
if not inst.can_do_photometry:
raise exceptions.PipelineNotReady(
f"Cannot generate photometry with {inst.label}!"
)
# Add the instruments to the operation kwargs
self._operation_kwargs.add(
labels if labels is not None else NO_MODEL_LABEL,
"get_photometry_fluxes",
instruments=_instruments,
cosmo=cosmo,
igm=igm,
)
# We need to ensure the fnu spectra are computed first
# NOTE: this is safe if the user has already calculated
# get_observed_spectra, it will just leave the flag as True and respect
# the original intent to write or not write
self.get_observed_spectra(cosmo=cosmo, igm=igm, write=False)
def _get_photometry_fluxes(self, galaxy):
"""Compute the photometric fluxes from the generated spectra.
Args:
galaxy (Galaxy):
The galaxy to compute the photometric fluxes for.
"""
start = time.perf_counter()
# Loop over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_photometry_fluxes"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Get the photometry.
galaxy.get_photo_fnu(
filters=instruments.all_filters,
nthreads=self.nthreads,
limit_to=model_label,
)
# Count the number of photometric fluxes we have generated
self._op_counts["Fluxes"] += count_and_check_dict_recursive(
galaxy.photo_fnu
)
if galaxy.stars is not None:
self._op_counts["Fluxes"] += count_and_check_dict_recursive(
galaxy.stars.photo_fnu
)
if galaxy.black_holes is not None:
self._op_counts["Fluxes"] += count_and_check_dict_recursive(
galaxy.black_holes.photo_fnu
)
# Record the time taken
self._op_timing["Fluxes"] += time.perf_counter() - start
[docs]
def get_lines(self, line_ids, write=True):
"""Flag that the Pipeline should compute the emission lines.
This will signal the Pipeline to compute the emission lines for each
galaxy when the run method is called.
The emission lines are generated based on the lnu spectra and the
EmissionModel.
Args:
line_ids (list):
The emission line IDs to generate.
write (bool):
Whether to write out the emission lines. Default is True.
"""
# Store the line IDs for the operation
self._operation_kwargs.add_unique(
"get_lines",
line_ids=line_ids,
)
# Flag that we will compute the emission lines
self._do_lum_lines = True
# Some models require spectra to compute certain line emission
# contributions, if we have now spectra warn the user
if not self._do_lnu_spectra:
warn(
"Some generation emission models may require spectra to "
"in addition to lines. Currently no spectra will be "
"generated. This could be fine depending on the "
"emission model but if not call get_spectra before running "
"the pipeline."
)
# Flag that we will want to write out the emission lines (calling the
# get_lines method is considered the intent to write it out
# by default)
self._write_lines = write or self._write_lines
# Store the line IDs, we'll write these once later
self.line_ids = line_ids
def _get_lines(self, galaxy):
"""Generate the emission lines for the galaxies.
This function will generate the emission lines for all spectra types
that were saved when spectra were generated.
Args:
galaxy (Galaxy):
The galaxy to generate the emission lines for.
"""
start = time.perf_counter()
# Get the operation kwargs for this operation
op_kwargs = self._operation_kwargs.get_unique_kwargs("get_lines")
# Loop over the galaxies and get the spectra
galaxy.get_lines(
line_ids=op_kwargs["line_ids"],
emission_model=self.emission_model,
nthreads=self.nthreads,
)
# Store the line wavelengths for writing, we only do this once since
# they are the same for all galaxies but we need to find them first
# (either on the galaxy, stars, or black holes)
if self.line_lams is None:
# Get the line lams from the first line collection we can find
for lines in galaxy.lines.values():
self.line_lams = lines.lam
break
if self.line_lams is None and galaxy.stars is not None:
for lines in galaxy.stars.lines.values():
self.line_lams = lines.lam
break
if self.line_lams is None and galaxy.black_holes is not None:
for lines in galaxy.black_holes.lines.values():
self.line_lams = lines.lam
break
# Count the number of emission lines we have generated
self._op_counts["Emission Line Luminosities"] += (
count_and_check_dict_recursive(galaxy.lines)
)
if galaxy.stars is not None:
self._op_counts["Emission Line Luminosities"] += (
count_and_check_dict_recursive(galaxy.stars.lines)
)
if galaxy.black_holes is not None:
self._op_counts["Emission Line Luminosities"] += (
count_and_check_dict_recursive(galaxy.black_holes.lines)
)
# Record the time taken
self._op_timing["Emission Line Luminosities"] += (
time.perf_counter() - start
)
[docs]
def get_observed_lines(
self,
cosmo,
igm=None,
line_ids=None,
write=True,
):
"""Flag that the Pipeline should compute the observed emission lines.
This will signal the Pipeline to compute the observed emission lines
for each galaxy when the run method is called.
The observed emission lines are generated based on the emission lines
and the cosmology.
Args:
cosmo (astropy.cosmology.Cosmology):
The cosmology to use for the observed emission lines.
igm (IGMBase):
The IGM model to use for the observed emission lines.
line_ids (list):
If get_lines has not been called explicitly, then we will need
the line IDs to generate the emission lines. Default is None.
write (bool):
Whether to write out the observed emission lines. Default is
True.
"""
# Store the kwargs for the operation
self._operation_kwargs.add_unique(
"get_observed_lines",
cosmo=cosmo,
igm=igm,
)
# Flag that we will compute the observed emission lines
self._do_flux_lines = True
# Ensure we have line IDs if we need to compute the emission lines and
# get_lines has not been called
if "get_lines" not in self._operation_kwargs and line_ids is None:
raise exceptions.PipelineNotReady(
"Cannot generate observed lines without line IDs, "
"please pass a list of line IDs to the line_ids argument of "
"get_observed_lines."
)
self._operation_kwargs.add_unique(
"get_lines",
line_ids=line_ids,
)
# Flag that we will want to write out the observed emission lines
# (calling the get_observed_lines method is considered the intent to
# write it out by default)
self._write_flux_lines = write or self._write_flux_lines
# We need to ensure the emission lines are computed first
# NOTE: this is safe if the user has already called get_lines, it
# will just leave the flag as True and respect the original intent to
# write or not write
self.get_lines(line_ids=line_ids, write=False)
def _get_observed_lines(self, galaxy):
"""Compute the observed emission lines for each galaxy.
Args:
galaxy (Galaxy):
The galaxy to compute the observed emission lines for.
"""
start = time.perf_counter()
# Get the operation kwargs for this operation
op_kwargs = self._operation_kwargs.get_unique_kwargs(
"get_observed_lines"
)
# Get the observed emission lines
galaxy.get_observed_lines(
cosmo=op_kwargs["cosmo"],
igm=op_kwargs["igm"],
)
# Store the observed line wavelengths for writing, we only do this once
# since they are the same for all galaxies but we need to find them
# first (either on the galaxy, stars, or black holes)
if self.line_obs_lams is None:
# Get the line lams from the first line collection we can find
for lines in galaxy.lines.values():
self.line_obs_lams = lines.obslam
break
if self.line_obs_lams is None and galaxy.stars is not None:
for lines in galaxy.stars.lines.values():
self.line_obs_lams = lines.obslam
break
if self.line_obs_lams is None and galaxy.black_holes is not None:
for lines in galaxy.black_holes.lines.values():
self.line_obs_lams = lines.obslam
break
# Count the number of observed emission lines we have generated
self._op_counts["Emission Line Fluxes"] += (
count_and_check_dict_recursive(galaxy.lines)
)
if galaxy.stars is not None:
self._op_counts["Emission Line Fluxes"] += (
count_and_check_dict_recursive(galaxy.stars.lines)
)
if galaxy.black_holes is not None:
self._op_counts["Emission Line Fluxes"] += (
count_and_check_dict_recursive(galaxy.black_holes.lines)
)
# Record the time taken
self._op_timing["Emission Line Fluxes"] += time.perf_counter() - start
[docs]
def get_images_luminosity(
self,
*instruments,
fov=None,
img_type="smoothed",
kernel=None,
kernel_threshold=1.0,
labels=None,
psf_resample_factor=1,
cosmo=None,
write=True,
):
"""Flag that the Pipeline should compute the luminosity images.
This will signal the Pipeline to compute the luminosity images for each
galaxy when the run method is called.
The luminosity images are generated based on the luminosities, and
in turn the lnu spectra, and the instrument filters.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the luminosity images. This can be
any number of instruments or instrument collections, they will
all be combined into a single InstrumentCollection for this
operation.
fov (unyt_quantity):
The field of view of the image with units.
img_type (str):
The type of image to generate. Options are 'smoothed' or
'hist'. Default is 'smoothed'.
kernel (array-like):
The kernel to use for smoothing the image. Default is None.
Required for 'smoothed' images from a particle distribution.
kernel_threshold (float):
The threshold of the kernel. Default is 1.0.
labels (list/str):
The type of spectra to generate images for. By default this
is None and all saved spectra types will be used. This can
either be a list of strings or a single string.
psf_resample_factor (int):
(Only applicable for instruments with a PSF.) The resample
factor for the PSF. This should be a value greater than 1.
The image will be resampled by this factor before the
PSF is applied and then downsampled back to the original
after convolution. This can help minimize the effects of
using a generic PSF centred on the galaxy centre, a
simplification we make for performance reasons (the
effects are sufficiently small that this simplifications is
justified).
cosmo (astropy.cosmology.Cosmology):
The cosmology to use for the calculation of the luminosity
distance. Only needed for internal conversions from cartesian
to angular coordinates when an angular resolution is used.
Default is None.
write (bool):
Whether to write out the luminosity images. Default is True.
"""
# If we have no labels then use all saved models
if labels is None:
warn(
"No labels were passed to get_images_luminosity. We will "
f"generate images for: {self.emission_model.saved_labels}. "
"This could be very expensive depending on the number of "
" models and image sizes."
)
labels = self.emission_model.saved_labels
# Flag that we will compute the luminosity images
self._do_images_lum = True
# Ensure we have a field of view if we need to compute the images
if fov is None:
raise exceptions.InconsistentArguments(
"Cannot generate images without a field of view, please pass "
"one to the fov argument of get_images_luminosity."
)
# Check that we have instruments to compute the images for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate images without instruments with filters! "
"Pass instruments to the get_images_luminosity method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do imaging
for inst in _instruments:
if not inst.can_do_imaging:
raise exceptions.PipelineNotReady(
f"Cannot generate images with {inst.label}!"
)
# Store the arguments for the operation
self._operation_kwargs.add(
labels,
"get_images_luminosity",
instruments=_instruments,
fov=fov,
img_type=img_type,
kernel=kernel,
kernel_threshold=kernel_threshold,
psf_resample_factor=psf_resample_factor,
cosmo=cosmo,
)
# Validate noise attribute units for luminosity images
validate_noise_unit_compatibility(_instruments, "erg/s/Hz")
# Based on the instruments we have and the write argument, set the
# appropriate writing flags
if write:
self._write_images_lum = True
for inst in _instruments:
if inst.can_do_psf_imaging:
self._write_images_lum_psf = True
if inst.can_do_noisy_imaging:
self._write_images_lum_noise = True
# We need to ensure the photometric luminosities are computed first
# NOTE: this is safe if the user has already called
# get_photometry_luminosities, it will just leave the flag as True and
# respect the original intent to write or not write
self.get_photometry_luminosities(
*instruments,
labels=labels,
write=False,
)
def _get_images_luminosity(self, galaxy):
"""Compute the luminosity images for the galaxies.
This function will compute the luminosity images for all spectra types
that were saved when spectra were generated, in all filters included in
the Pipeline instruments.
Args:
galaxy (Galaxy):
The galaxy to generate the luminosity images for.
"""
start = time.perf_counter()
# We want to time PSF application and noise application separately
psf_time = 0
noise_time = 0
# Loop over all the queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_images_luminosity"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Loop over instruments and perform any imaging they define
for inst in instruments:
# Get the basic images for the requested spectra types
galaxy.get_images_luminosity(
*model_label,
fov=op_kwargs["fov"],
img_type=op_kwargs["img_type"],
kernel=op_kwargs["kernel"],
kernel_threshold=op_kwargs["kernel_threshold"],
nthreads=self.nthreads,
instrument=inst,
cosmo=op_kwargs["cosmo"],
)
# Apply the PSF if applicable to the instrument
if inst.can_do_psf_imaging:
psf_start = time.perf_counter()
galaxy.apply_psf_to_images_lnu(
instrument=inst,
psf_resample_factor=op_kwargs["psf_resample_factor"],
limit_to=model_label,
)
psf_time += time.perf_counter() - psf_start
# Apply the instrument noise if applicable to the instrument
if inst.can_do_noisy_imaging:
noise_start = time.perf_counter()
galaxy.apply_noise_to_images_lnu(
instrument=inst,
limit_to=model_label,
apply_to_psf=inst.can_do_psf_imaging,
)
noise_time += time.perf_counter() - noise_start
# Count the number of images we have generated
self._op_counts["Luminosity Images"] += count_and_check_dict_recursive(
galaxy.images_lnu
)
self._op_counts["Luminosity Images (With PSF)"] += (
count_and_check_dict_recursive(galaxy.images_psf_lnu)
)
self._op_counts["Luminosity Images (With Noise)"] += (
count_and_check_dict_recursive(galaxy.images_noise_lnu)
)
if galaxy.stars is not None:
self._op_counts["Luminosity Images"] += (
count_and_check_dict_recursive(galaxy.stars.images_lnu)
)
self._op_counts["Luminosity Images (With PSF)"] += (
count_and_check_dict_recursive(galaxy.stars.images_psf_lnu)
)
self._op_counts["Luminosity Images (With Noise)"] += (
count_and_check_dict_recursive(galaxy.stars.images_noise_lnu)
)
if galaxy.black_holes is not None:
self._op_counts["Luminosity Images"] += (
count_and_check_dict_recursive(galaxy.black_holes.images_lnu)
)
self._op_counts["Luminosity Images (With PSF)"] += (
count_and_check_dict_recursive(
galaxy.black_holes.images_psf_lnu
)
)
self._op_counts["Luminosity Images (With Noise)"] += (
count_and_check_dict_recursive(
galaxy.black_holes.images_noise_lnu
)
)
# Record the time taken
self._op_timing["Luminosity Images"] += (
time.perf_counter() - start - psf_time - noise_time
)
self._op_timing["Luminosity Images (With PSF)"] += psf_time
self._op_timing["Luminosity Images (With Noise)"] += noise_time
[docs]
def get_images_flux(
self,
*instruments,
fov=None,
img_type="smoothed",
kernel=None,
kernel_threshold=1.0,
cosmo=None,
igm=None,
labels=None,
psf_resample_factor=1,
write=True,
):
"""Flag that the Pipeline should compute the flux images.
This will signal the Pipeline to compute the flux images for each
galaxy when the run method is called.
The flux images are generated based on the fluxes, and in turn the fnu
spectra, and the instrument filters.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the flux images. This can be any
number of instruments or instrument collections, they will
all be combined into a single InstrumentCollection for this
operation.
fov (unyt_quantity):
The field of view of the image with units.
img_type (str):
The type of image to generate. Options are 'smoothed' or
'hist'. Default is 'smoothed'.
kernel (array-like):
The kernel to use for smoothing the image. Default is None.
Required for 'smoothed' images from a particle distribution.
kernel_threshold (float):
The threshold of the kernel. Default is 1.0.
cosmo (astropy.cosmology.Cosmology):
If get_spectra_observed has not been called explicitly, then
we will need the cosmology to compute the observed spectra
first. Default is None.
igm (IGMBase):
If get_spectra_observed has not been called explicitly, then
we will need the IGM model to compute the observed spectra
first. Unlike the cosmology, this is not required if IGM
attenuation is not needed. Default is None.
labels (list/str):
The type of spectra to generate images for. By default this
is None and all saved spectra types will be used. This can
either be a list of strings or a single string.
psf_resample_factor (int):
(Only applicable for instruments with a PSF.) The resample
factor for the PSF. This should be a value greater than 1.
The image will be resampled by this factor before the
PSF is applied and then downsampled back to the original
after convolution. This can help minimize the effects of
using a generic PSF centred on the galaxy centre, a
simplification we make for performance reasons (the
effects are sufficiently small that this simplifications is
justified).
write (bool):
Whether to write out the flux images. Default is True.
"""
# If we have no labels then use all saved models
if labels is None:
warn(
"No labels were passed to get_images_flux. We will generate "
f"images for: {self.emission_model.saved_labels}. This could "
"be very expensive depending on the number of models and "
"image sizes."
)
labels = self.emission_model.saved_labels
# Flag that we will compute the flux images
self._do_images_flux = True
# Ensure we have a field of view if we need to compute the images
if fov is None:
raise exceptions.InconsistentArguments(
"Cannot generate images without a field of view, please "
"pass one to the fov argument of get_images_flux."
)
# Ensure we have a cosmology if we need to compute the observed spectra
# and get_spectra_observed has not been called
if (
not self._operation_kwargs.has("get_observed_spectra")
and cosmo is None
):
raise exceptions.PipelineNotReady(
"Cannot generate flux images without an astropy.cosmology"
" object, please pass one to the cosmo argument of "
"get_images_flux."
)
# Check that we have instruments to compute the images for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate images without instruments with filters! "
"Pass instruments to the get_images_flux method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do imaging
for inst in _instruments:
if not inst.can_do_imaging:
raise exceptions.PipelineNotReady(
f"Cannot generate images with {inst.label}!"
)
# Store the arguments for the operation
self._operation_kwargs.add(
labels,
"get_images_flux",
instruments=_instruments,
fov=fov,
img_type=img_type,
kernel=kernel,
kernel_threshold=kernel_threshold,
psf_resample_factor=psf_resample_factor,
cosmo=cosmo,
)
# Validate noise attribute units for flux images
validate_noise_unit_compatibility(_instruments, "nJy")
# Based on the instruments we have and the write argument, set the
# appropriate writing flags
if write:
self._write_images_flux = True
for inst in _instruments:
if inst.can_do_psf_imaging:
self._write_images_flux_psf = True
if inst.can_do_noisy_imaging:
self._write_images_flux_noise = True
# We need to ensure the photometric fluxes are computed first
# NOTE: this is safe if the user has already called
# get_photometry_fluxes, it will just leave the flag as True and
# respect the original intent to write or not write
self.get_photometry_fluxes(
*instruments,
labels=labels,
cosmo=cosmo,
igm=igm,
write=False,
)
def _get_images_flux(self, galaxy):
"""Compute the flux images for the galaxies.
This function will compute the flux images for all spectra types that
were saved when spectra were generated, in all filters included in the
Pipeline instruments.
Args:
galaxy (Galaxy):
The galaxy to generate the flux images for (all components).
"""
start = time.perf_counter()
# We want to time PSF application and noise application separately
psf_time = 0
noise_time = 0
# Loop over all the queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_images_flux"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Loop over instruments and perform any imaging they define
for inst in instruments:
# Get the basic images for the requested spectra types
galaxy.get_images_flux(
*model_label,
fov=op_kwargs["fov"],
img_type=op_kwargs["img_type"],
kernel=op_kwargs["kernel"],
kernel_threshold=op_kwargs["kernel_threshold"],
cosmo=op_kwargs.get("cosmo", None),
nthreads=self.nthreads,
instrument=inst,
)
# Apply the PSF if applicable to the instrument
if inst.can_do_psf_imaging:
psf_start = time.perf_counter()
galaxy.apply_psf_to_images_fnu(
instrument=inst,
psf_resample_factor=op_kwargs["psf_resample_factor"],
limit_to=model_label,
)
psf_time += time.perf_counter() - psf_start
# Apply the instrument noise if applicable to the instrument
if inst.can_do_noisy_imaging:
noise_start = time.perf_counter()
galaxy.apply_noise_to_images_fnu(
instrument=inst,
limit_to=model_label,
apply_to_psf=inst.can_do_psf_imaging,
)
noise_time += time.perf_counter() - noise_start
# Count the number of images we have generated
self._op_counts["Flux Images"] += count_and_check_dict_recursive(
galaxy.images_fnu
)
self._op_counts["Flux Images (With PSF)"] += (
count_and_check_dict_recursive(galaxy.images_psf_fnu)
)
self._op_counts["Flux Images (With Noise)"] += (
count_and_check_dict_recursive(galaxy.images_noise_fnu)
)
if galaxy.stars is not None:
self._op_counts["Flux Images"] += count_and_check_dict_recursive(
galaxy.stars.images_fnu
)
self._op_counts["Flux Images (With PSF)"] += (
count_and_check_dict_recursive(galaxy.stars.images_psf_fnu)
)
self._op_counts["Flux Images (With Noise)"] += (
count_and_check_dict_recursive(galaxy.stars.images_noise_fnu)
)
if galaxy.black_holes is not None:
self._op_counts["Flux Images"] += count_and_check_dict_recursive(
galaxy.black_holes.images_fnu
)
self._op_counts["Flux Images (With PSF)"] += (
count_and_check_dict_recursive(
galaxy.black_holes.images_psf_fnu
)
)
self._op_counts["Flux Images (With Noise)"] += (
count_and_check_dict_recursive(
galaxy.black_holes.images_noise_fnu
)
)
# Record the time taken
self._op_timing["Flux Images"] += (
time.perf_counter() - start - psf_time - noise_time
)
self._op_timing["Flux Images (With PSF)"] += psf_time
self._op_timing["Flux Images (With Noise)"] += noise_time
[docs]
def get_data_cubes_lnu(
self,
*instruments,
fov,
cube_type="smoothed",
kernel=None,
kernel_threshold=1.0,
cosmo=None,
labels=None,
write=True,
):
"""Flag that the Pipeline should compute luminosity data cubes.
This will signal the Pipeline to compute the spectral luminosity
density data cubes for each galaxy when the run method is called.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the luminosity data cubes. This can
be any number of instruments or instrument collections, they
will all be combined into a single InstrumentCollection for
this operation. The resolution and wavelength array will be
taken from each instrument.
fov (unyt_quantity):
The width of the data cube in image coordinates.
cube_type (str):
The type of data cube to make. Either "smoothed" to smooth
particle spectra over a kernel or "hist" to sort particle
spectra into individual spaxels. Default is "smoothed".
kernel (array-like):
The kernel to use for smoothing. Default is None.
Required for 'smoothed' cubes from a particle distribution.
kernel_threshold (float):
The threshold of the kernel. Default is 1.0.
cosmo (astropy.cosmology.Cosmology):
The cosmology to use for unit conversions between angular
and Cartesian coordinates. Only needed when resolution and
fov have different unit systems. Default is None.
labels (list/str, optional):
The type of spectra to generate data cubes for. By default
this is None and all saved spectra types will be used. This
can either be a list of strings or a single string.
write (bool):
Whether to write out the luminosity data cubes. Default is
True.
"""
# If we have no labels then use all saved models
if labels is None:
warn(
"No labels were passed to get_data_cubes_lnu. We will "
f"generate data cubes for: {self.emission_model.saved_labels}."
" This could be very expensive depending on the number of "
"models and cube sizes."
)
labels = self.emission_model.saved_labels
# Flag that we will compute the lnu data cubes
self._do_lnu_data_cubes = True
# Flag that we will want to write out the lnu data cubes
self._write_lnu_data_cubes = write or self._write_lnu_data_cubes
# Check that we have instruments to compute the data cubes for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate data cubes without instruments! "
"Pass instruments to the get_data_cubes_lnu method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do resolved spectroscopy
for inst in _instruments:
if not inst.can_do_resolved_spectroscopy:
raise exceptions.PipelineNotReady(
f"Cannot generate data cubes with {inst.label}! "
"Instruments must have both resolution and lam defined."
)
# Ensure we have a field of view
if fov is None:
raise exceptions.InconsistentArguments(
"Cannot generate data cubes without a field of view, "
"please pass one to the fov argument."
)
# Validate kernel for smoothed data cubes
if cube_type == "smoothed" and kernel is None:
raise exceptions.InconsistentArguments(
"Cannot generate smoothed data cubes without a kernel! "
"Please pass a kernel to the get_data_cubes_lnu method. "
"Example: from synthesizer.kernel_functions import Kernel; "
"kernel = Kernel().get_kernel(). Available kernel names: "
"'uniform', 'sph_anarchy', 'gadget_2', 'cubic', 'quintic'. "
"Alternatively, use cube_type='hist' for histogram-based "
"data cubes."
)
# Add the instruments to the operation kwargs
self._operation_kwargs.add(
labels if labels is not None else NO_MODEL_LABEL,
"get_data_cubes_lnu",
instruments=_instruments,
fov=fov,
cube_type=cube_type,
kernel=kernel,
kernel_threshold=kernel_threshold,
cosmo=cosmo,
)
# We need to ensure the lnu spectra are computed first
# NOTE: this is safe if the user has already called
# get_spectra, it will just leave the flag as True and
# respect the original intent to write or not write
self.get_spectra(write=False)
def _get_data_cubes_lnu(self, galaxy):
"""Compute the luminosity data cubes for the galaxy.
Args:
galaxy (Galaxy):
The galaxy to generate data cubes for.
"""
start = time.perf_counter()
# Iterate over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_data_cubes_lnu"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Get the data cube for these labels
# Note: model_label can be a list of labels
if not isinstance(model_label, (list, tuple)):
model_label = [model_label]
# Loop over instruments
for inst in instruments:
# Loop over each label and create a data cube
for label in model_label:
# Determine which component this label belongs to
# and call get_data_cube appropriately
cube = galaxy.get_data_cube(
resolution=inst.resolution,
fov=op_kwargs["fov"],
lam=inst.lam,
cube_type=op_kwargs.get("cube_type", "smoothed"),
stellar_spectra=label
if label in getattr(galaxy.stars, "spectra", {})
or label
in getattr(galaxy.stars, "particle_spectra", {})
else None,
blackhole_spectra=label
if label in getattr(galaxy.black_holes, "spectra", {})
or label
in getattr(galaxy.black_holes, "particle_spectra", {})
else None,
kernel=op_kwargs.get("kernel"),
kernel_threshold=op_kwargs.get(
"kernel_threshold", 1.0
),
quantity="lnu",
cosmo=op_kwargs.get("cosmo"),
nthreads=self.nthreads,
)
# Store the cube (we'll need to add a data_cubes_lnu
# attribute to galaxies)
if not hasattr(galaxy, "data_cubes_lnu"):
galaxy.data_cubes_lnu = {}
# Use instrument label in the key to differentiate
# between data cubes from different instruments
key = f"{label}_{inst.label}"
galaxy.data_cubes_lnu[key] = cube
# Count the number of data cubes we have generated
if hasattr(galaxy, "data_cubes_lnu"):
self._op_counts["Lnu Data Cubes"] += len(galaxy.data_cubes_lnu)
# Record the time taken
self._op_timing["Lnu Data Cubes"] += time.perf_counter() - start
[docs]
def get_data_cubes_fnu(
self,
*instruments,
fov,
cube_type="smoothed",
kernel=None,
kernel_threshold=1.0,
cosmo=None,
igm=None,
labels=None,
write=True,
):
"""Flag that the Pipeline should compute flux density data cubes.
This will signal the Pipeline to compute the spectral flux density
data cubes for each galaxy when the run method is called.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the flux density data cubes. This
can be any number of instruments or instrument collections,
they will all be combined into a single InstrumentCollection
for this operation. The resolution and wavelength array will
be taken from each instrument.
fov (unyt_quantity):
The width of the data cube in image coordinates.
cube_type (str):
The type of data cube to make. Either "smoothed" to smooth
particle spectra over a kernel or "hist" to sort particle
spectra into individual spaxels. Default is "smoothed".
kernel (array-like):
The kernel to use for smoothing. Default is None.
Required for 'smoothed' cubes from a particle distribution.
kernel_threshold (float):
The threshold of the kernel. Default is 1.0.
cosmo (astropy.cosmology):
The cosmology object to use for the redshift conversion.
Default is None.
igm (str):
The IGM model to use for the attenuation. Default is None,
meaning no IGM attenuation is applied.
labels (list/str, optional):
The type of spectra to generate data cubes for. By default
this is None and all saved spectra types will be used. This
can either be a list of strings or a single string.
write (bool):
Whether to write out the flux density data cubes. Default is
True.
"""
# If we have no labels then use all saved models
if labels is None:
warn(
"No labels were passed to get_data_cubes_fnu. We will "
f"generate data cubes for: {self.emission_model.saved_labels}."
" This could be very expensive depending on the number of "
"models and cube sizes."
)
labels = self.emission_model.saved_labels
# Flag that we will compute the fnu data cubes
self._do_fnu_data_cubes = True
# Flag that we will want to write out the fnu data cubes
self._write_fnu_data_cubes = write or self._write_fnu_data_cubes
# Check that we have instruments to compute the data cubes for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate data cubes without instruments! "
"Pass instruments to the get_data_cubes_fnu method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do resolved spectroscopy
for inst in _instruments:
if not inst.can_do_resolved_spectroscopy:
raise exceptions.PipelineNotReady(
f"Cannot generate data cubes with {inst.label}! "
"Instruments must have both resolution and lam defined."
)
# Ensure we have a field of view
if fov is None:
raise exceptions.InconsistentArguments(
"Cannot generate data cubes without a field of view, "
"please pass one to the fov argument."
)
# Validate kernel for smoothed data cubes
if cube_type == "smoothed" and kernel is None:
raise exceptions.InconsistentArguments(
"Cannot generate smoothed data cubes without a kernel! "
"Please pass a kernel to the get_data_cubes_fnu method. "
"Example: from synthesizer.kernel_functions import Kernel; "
"kernel = Kernel().get_kernel(). Available kernel names: "
"'uniform', 'sph_anarchy', 'gadget_2', 'cubic', 'quintic'. "
"Alternatively, use cube_type='hist' for histogram-based "
"data cubes."
)
# Add the instruments to the operation kwargs
self._operation_kwargs.add(
labels if labels is not None else NO_MODEL_LABEL,
"get_data_cubes_fnu",
instruments=_instruments,
fov=fov,
cube_type=cube_type,
kernel=kernel,
kernel_threshold=kernel_threshold,
cosmo=cosmo,
igm=igm,
)
# We need to ensure the observed spectra are computed first
# NOTE: this is safe if the user has already called
# get_observed_spectra, it will just leave the flag as True and
# respect the original intent to write or not write
self.get_observed_spectra(write=False, cosmo=cosmo, igm=igm)
def _get_data_cubes_fnu(self, galaxy):
"""Compute the flux density data cubes for the galaxy.
Args:
galaxy (Galaxy):
The galaxy to generate data cubes for.
"""
start = time.perf_counter()
# Iterate over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_data_cubes_fnu"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Get the data cube for these labels
# Note: model_label can be a list of labels
if not isinstance(model_label, (list, tuple)):
model_label = [model_label]
# Loop over instruments
for inst in instruments:
# Loop over each label and create a data cube
for label in model_label:
# Determine which component this label belongs to
# and call get_data_cube appropriately
cube = galaxy.get_data_cube(
resolution=inst.resolution,
fov=op_kwargs["fov"],
lam=inst.lam,
cube_type=op_kwargs.get("cube_type", "smoothed"),
stellar_spectra=label
if label in getattr(galaxy.stars, "spectra", {})
or label
in getattr(galaxy.stars, "particle_spectra", {})
else None,
blackhole_spectra=label
if label in getattr(galaxy.black_holes, "spectra", {})
or label
in getattr(galaxy.black_holes, "particle_spectra", {})
else None,
kernel=op_kwargs.get("kernel"),
kernel_threshold=op_kwargs.get(
"kernel_threshold", 1.0
),
quantity="fnu",
cosmo=op_kwargs.get("cosmo"),
nthreads=self.nthreads,
)
# Store the cube
if not hasattr(galaxy, "data_cubes_fnu"):
galaxy.data_cubes_fnu = {}
# Use instrument label in the key to differentiate
# between data cubes from different instruments
key = f"{label}_{inst.label}"
galaxy.data_cubes_fnu[key] = cube
# Count the number of data cubes we have generated
if hasattr(galaxy, "data_cubes_fnu"):
self._op_counts["Fnu Data Cubes"] += len(galaxy.data_cubes_fnu)
# Record the time taken
self._op_timing["Fnu Data Cubes"] += time.perf_counter() - start
[docs]
def get_spectroscopy_lnu(self, *instruments, labels=None, write=True):
"""Flag that the Pipeline should compute spectral luminosity density.
This will signal the Pipeline to compute the spectral luminosity
density for each galaxy when the run method is called.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the spectral luminosity density.
This can be any number of instruments or instrument
collections, they will all be combined into a single
InstrumentCollection for this operation.
labels (list/str, optional):
The type of spectra to generate spectroscopy for. By default
this is None and all saved spectra types will be used. This
can either be a list of strings or a single string.
write (bool):
Whether to write out the spectral luminosity density. Default
is True.
"""
# If we have no labels then use all saved models
if labels is None:
labels = self.emission_model.saved_labels
# Flag that we will compute the spectral luminosity density
self._do_spectroscopy_lnu = True
# Flag that we will want to write out the spectral luminosity density
# (calling the get_spectroscopy_lnu method is considered the intent
# to write it out by default)
self._write_spectroscopy_lnu = write or self._write_spectroscopy_lnu
# Check that we have instruments to compute the spectroscopy for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate spectroscopy without instruments with "
"spectroscopy! Pass instruments to the get_spectroscopy_lnu "
"method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do spectroscopy
for inst in _instruments:
if not inst.can_do_spectroscopy:
raise exceptions.PipelineNotReady(
f"Cannot generate spectroscopy with {inst.label}!"
)
# Store the arguments for the operation
self._operation_kwargs.add(
labels,
"get_spectroscopy_lnu",
instruments=_instruments,
)
# We need to ensure the lnu spectra are computed first
# NOTE: this is safe if the user has already called
# get_spectra, it will just leave the flag as True and
# respect the original intent to write or not write
self.get_spectra(write=False)
def _get_spectroscopy_lnu(self, galaxy):
"""Compute the spectral luminosity density for the galaxy.
This function will compute the spectral luminosity density for all
spectra stored on the galaxy and its components.
Args:
galaxy (Galaxy):
The galaxy to generate the spectroscopy for.
"""
start = time.perf_counter()
# Iterate over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_spectroscopy_lnu"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Loop over instruments
for inst in instruments:
# Get the spectroscopy for this instrument
galaxy.get_spectroscopy(inst, limit_to=model_label)
# Count the number of spectroscopy items we have generated
self._op_counts["Spectroscopy Lnu"] += count_and_check_dict_recursive(
galaxy.spectroscopy
)
if galaxy.stars is not None:
self._op_counts["Spectroscopy Lnu"] += (
count_and_check_dict_recursive(galaxy.stars.spectroscopy)
)
if galaxy.black_holes is not None:
self._op_counts["Spectroscopy Lnu"] += (
count_and_check_dict_recursive(galaxy.black_holes.spectroscopy)
)
# Record the time taken
self._op_timing["Spectroscopy Lnu"] += time.perf_counter() - start
[docs]
def get_spectroscopy_fnu(
self,
*instruments,
labels=None,
cosmo=None,
igm=None,
write=True,
):
"""Flag that the Pipeline should compute the spectral flux density.
This will signal the Pipeline to compute the spectral flux density
for each galaxy when the run method is called.
Args:
instruments (Instrument/InstrumentCollection):
The instruments to use for the spectral flux density. This can
be any number of instruments or instrument collections, they
will all be combined into a single InstrumentCollection for
this operation.
labels (list/str, optional):
The type of spectra to generate spectroscopy for. By default
this is None and all saved spectra types will be used. This
can either be a list of strings or a single string.
cosmo (astropy.cosmology):
The cosmology object to use for the redshift conversion.
Default is None.
igm (str):
The IGM model to use for the attenuation. Default is None,
meaning no IGM attenuation is applied.
write (bool):
Whether to write out the spectral flux density. Default is
True.
"""
# If we have no labels then use all saved models
if labels is None:
labels = self.emission_model.saved_labels
# Flag that we will compute the spectral flux density
self._do_spectroscopy_fnu = True
# Flag that we will want to write out the spectral flux density
# (calling the get_spectroscopy_fnu method is considered the intent
# to write it out by default)
self._write_spectroscopy_fnu = write or self._write_spectroscopy_fnu
# Check that we have instruments to compute the spectroscopy for
if len(instruments) == 0:
raise exceptions.PipelineNotReady(
"Cannot generate spectroscopy without instruments with "
"spectroscopy! Pass instruments to the get_spectroscopy_fnu "
"method."
)
# Unpack and attach instruments to the pipeline
_instruments = self._add_instruments(instruments)
# Check that the instruments can do spectroscopy
for inst in _instruments:
if not inst.can_do_spectroscopy:
raise exceptions.PipelineNotReady(
f"Cannot generate spectroscopy with {inst.label}!"
)
# Store the arguments for the operation
self._operation_kwargs.add(
labels,
"get_spectroscopy_fnu",
instruments=_instruments,
)
# We need to ensure the fnu spectra are computed first
# NOTE: this is safe if the user has already called
# get_observed_spectra, it will just leave the flag as True and
# respect the original intent to write or not write
self.get_observed_spectra(write=False, cosmo=cosmo, igm=igm)
def _get_spectroscopy_fnu(self, galaxy):
"""Compute the spectral flux density for the galaxy.
This function will compute the spectral flux density for all
spectra stored on the galaxy and its components.
Args:
galaxy (Galaxy):
The galaxy to generate the spectroscopy for.
"""
start = time.perf_counter()
# Iterate over all queued operation configurations
for model_label, op_kwargs in self._operation_kwargs[
"get_spectroscopy_fnu"
]:
# Get the instruments for this operation
instruments = op_kwargs["instruments"]
# Loop over instruments
for inst in instruments:
# Get the spectroscopy for this instrument
galaxy.get_spectroscopy(inst, limit_to=model_label)
# Count the number of spectroscopy items we have generated
self._op_counts["Spectroscopy Fnu"] += count_and_check_dict_recursive(
galaxy.spectroscopy
)
if galaxy.stars is not None:
self._op_counts["Spectroscopy Fnu"] += (
count_and_check_dict_recursive(galaxy.stars.spectroscopy)
)
if galaxy.black_holes is not None:
self._op_counts["Spectroscopy Fnu"] += (
count_and_check_dict_recursive(galaxy.black_holes.spectroscopy)
)
# Record the time taken
self._op_timing["Spectroscopy Fnu"] += time.perf_counter() - start
def _run_extra_analysis(self, galaxy):
"""Call any user provided analysis functions.
We will call this last when running the pipeline. This ensures that
all data generated by the pipeline exists before performing the user
calculations. This is important since the user may want to use the
data generated by the pipeline in their analysis functions.
Args:
galaxy (Galaxy):
The galaxy to run the extra analysis functions on.
"""
start = time.perf_counter()
# Nothing to do if we have no analysis functions
if len(self._analysis_funcs) == 0:
return
# Create a temporary attribute on the galaxy to store the results
# of the analysis functions
galaxy._extra_analysis_results = {}
# Loop over the analysis functions and call them for this galaxy
for func, args, kwargs, key in zip(
self._analysis_funcs,
self._analysis_args,
self._analysis_kwargs,
self._analysis_results_keys,
):
# Run the analysis function on this galaxy
try:
func_start = time.perf_counter()
res = func(galaxy, *args, **kwargs)
# Count the number of times we have run this function
self._op_counts["Extra Analyses"] += 1
# Record the time taken
self._op_timing[key] = (
self._op_timing.get(key, 0.0)
+ time.perf_counter()
- func_start
)
except Exception as e:
raise type(e)(
f"Error running extra analysis function {key}: {e}"
).with_traceback(e.__traceback__)
# Store the result on the galaxy
galaxy._extra_analysis_results[key] = res
# Record the time taken
self._op_timing["Extra Analyses"] += time.perf_counter() - start
def _unpack_results(self, galaxy):
"""Unpack the results from the galaxy into the pipeline.
This will extract all the calculated data from the galaxy and store it
on the pipeline for writing out. All data not flagged to be saved will
be cleared when the galaxy is garbage collected.
Args:
galaxy (Galaxy):
The galaxy to unpack the results from.
"""
start = time.perf_counter()
# Do we need to unpack the SFZH?
if self._write_sfzh:
if galaxy.stars is not None:
self.sfzhs.append(galaxy.stars.sfzh)
else:
self.sfzhs.append(galaxy.sfzh)
# Do we need to unpack the SFH?
if self._write_sfh:
if galaxy.stars is not None:
self.sfhs.append(galaxy.stars.sfh)
else:
self.sfhs.append(galaxy.sfh)
# Do we need to unpack the lnu spectra?
if self._write_lnu_spectra:
for spec_type, spec in galaxy.spectra.items():
self.lnu_spectra["Galaxy"].setdefault(spec_type, []).append(
spec.lnu
)
if galaxy.stars is not None:
for spec_type, spec in galaxy.stars.spectra.items():
self.lnu_spectra["Stars"].setdefault(spec_type, []).append(
spec.lnu
)
if galaxy.black_holes is not None:
for spec_type, spec in galaxy.black_holes.spectra.items():
self.lnu_spectra["BlackHole"].setdefault(
spec_type, []
).append(spec.lnu)
# Do we need to unpack the fnu spectra?
if self._write_fnu_spectra:
for spec_type, spec in galaxy.spectra.items():
self.fnu_spectra["Galaxy"].setdefault(spec_type, []).append(
spec.fnu
)
if galaxy.stars is not None:
for spec_type, spec in galaxy.stars.spectra.items():
self.fnu_spectra["Stars"].setdefault(spec_type, []).append(
spec.fnu
)
if galaxy.black_holes is not None:
for spec_type, spec in galaxy.black_holes.spectra.items():
self.fnu_spectra["BlackHole"].setdefault(
spec_type, []
).append(spec.fnu)
# Do we need to unpack the photometric luminosities?
if self._write_luminosities:
for spec_type, phot in galaxy.photo_lnu.items():
for filt, lnu in phot.items():
self.luminosities["Galaxy"].setdefault(
spec_type, {}
).setdefault(filt, []).append(lnu)
if galaxy.stars is not None:
for spec_type, phot in galaxy.stars.photo_lnu.items():
for filt, lnu in phot.items():
self.luminosities["Stars"].setdefault(
spec_type, {}
).setdefault(filt, []).append(lnu)
if galaxy.black_holes is not None:
for spec_type, phot in galaxy.black_holes.photo_lnu.items():
for filt, lnu in phot.items():
self.luminosities["BlackHole"].setdefault(
spec_type, {}
).setdefault(filt, []).append(lnu)
# Do we need to unpack the photometric fluxes?
if self._write_fluxes:
for spec_type, phot in galaxy.photo_fnu.items():
for filt, fnu in phot.items():
self.fluxes["Galaxy"].setdefault(spec_type, {}).setdefault(
filt, []
).append(fnu)
if galaxy.stars is not None:
for spec_type, phot in galaxy.stars.photo_fnu.items():
for filt, fnu in phot.items():
self.fluxes["Stars"].setdefault(
spec_type, {}
).setdefault(filt, []).append(fnu)
if galaxy.black_holes is not None:
for spec_type, phot in galaxy.black_holes.photo_fnu.items():
for filt, fnu in phot.items():
self.fluxes["BlackHole"].setdefault(
spec_type, {}
).setdefault(filt, []).append(fnu)
# Do we need to unpack the emission lines?
if self._write_lines:
for spec_type, lines in galaxy.lines.items():
self.line_lums["Galaxy"].setdefault(spec_type, []).append(
lines.luminosity
)
self.line_cont_lums["Galaxy"].setdefault(spec_type, []).append(
lines.continuum
)
if galaxy.stars is not None:
for spec_type, lines in galaxy.stars.lines.items():
self.line_lums["Stars"].setdefault(spec_type, []).append(
lines.luminosity
)
self.line_cont_lums["Stars"].setdefault(
spec_type, []
).append(lines.continuum)
if galaxy.black_holes is not None:
for spec_type, lines in galaxy.black_holes.lines.items():
self.line_lums["BlackHole"].setdefault(
spec_type, []
).append(lines.luminosity)
self.line_cont_lums["BlackHole"].setdefault(
spec_type, []
).append(lines.continuum)
# Do we need to unpack the observed emission lines?
if self._write_flux_lines:
for spec_type, lines in galaxy.lines.items():
self.line_fluxes["Galaxy"].setdefault(spec_type, []).append(
lines.flux
)
self.line_cont_fluxes["Galaxy"].setdefault(
spec_type, []
).append(lines.continuum)
if galaxy.stars is not None:
for spec_type, lines in galaxy.stars.lines.items():
self.line_fluxes["Stars"].setdefault(spec_type, []).append(
lines.flux
)
self.line_cont_fluxes["Stars"].setdefault(
spec_type, []
).append(lines.continuum)
if galaxy.black_holes is not None:
for spec_type, lines in galaxy.black_holes.lines.items():
self.line_fluxes["BlackHole"].setdefault(
spec_type, []
).append(lines.flux)
self.line_cont_fluxes["BlackHole"].setdefault(
spec_type, []
).append(lines.continuum)
# Do we need to unpack the luminosity images?
if self._write_images_lum:
for inst_label, d in galaxy.images_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum["Galaxy"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(f, []).append(
img.arr * img.units
)
if galaxy.stars is not None:
for inst_label, d in galaxy.stars.images_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum["Stars"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
if galaxy.black_holes is not None:
for inst_label, d in galaxy.black_holes.images_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum["BlackHole"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
# Do we need to unpack the flux images?
if self._write_images_flux:
for inst_label, d in galaxy.images_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux["Galaxy"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(f, []).append(
img.arr * img.units
)
if galaxy.stars is not None:
for inst_label, d in galaxy.stars.images_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux["Stars"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
if galaxy.black_holes is not None:
for inst_label, d in galaxy.black_holes.images_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux["BlackHole"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
# Do we need to unpack the luminosity images With PSFs?
if self._write_images_lum_psf:
for inst_label, d in galaxy.images_psf_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum_psf["Galaxy"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(f, []).append(
img.arr * img.units
)
if galaxy.stars is not None:
for inst_label, d in galaxy.stars.images_psf_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum_psf["Stars"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
if galaxy.black_holes is not None:
for inst_label, d in galaxy.black_holes.images_psf_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum_psf["BlackHole"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
# Do we need to unpack the flux images With PSFs?
if self._write_images_flux_psf:
for inst_label, d in galaxy.images_psf_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux_psf["Galaxy"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(f, []).append(
img.arr * img.units
)
if galaxy.stars is not None:
for inst_label, d in galaxy.stars.images_psf_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux_psf["Stars"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
if galaxy.black_holes is not None:
for inst_label, d in galaxy.black_holes.images_psf_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux_psf["BlackHole"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
# Do we need to unpack the luminosity images With Noise?
if self._write_images_lum_noise:
for inst_label, d in galaxy.images_noise_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum_noise["Galaxy"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(f, []).append(
img.arr * img.units
)
if galaxy.stars is not None:
for inst_label, d in galaxy.stars.images_noise_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum_noise["Stars"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
if galaxy.black_holes is not None:
bhs = galaxy.black_holes
for inst_label, d in bhs.images_noise_lnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_lum_noise["BlackHole"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
# Do we need to unpack the flux images With Noise?
if self._write_images_flux_noise:
for inst_label, d in galaxy.images_noise_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux_noise["Galaxy"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(f, []).append(
img.arr * img.units
)
if galaxy.stars is not None:
for inst_label, d in galaxy.stars.images_noise_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux_noise["Stars"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
if galaxy.black_holes is not None:
bhs = galaxy.black_holes
for inst_label, d in bhs.images_noise_fnu.items():
for spec_type, imgs in d.items():
for f, img in imgs.items():
self.images_flux_noise["BlackHole"].setdefault(
inst_label, {}
).setdefault(spec_type, {}).setdefault(
f, []
).append(img.arr * img.units)
# Do we need to unpack the extra analysis results?
if hasattr(galaxy, "_extra_analysis_results"):
for key, res in galaxy._extra_analysis_results.items():
self._analysis_results[key].append(res)
# Done!
self._op_timing["Unpacking results"] += time.perf_counter() - start
[docs]
def run(self):
"""Run the pipeline.
This will churn through the attached galaxies generating all the data
requested using the get_* methods.
Only data flagged for saving will be held in memory with all other data
cleared out.
Once the pipeline has run, the data can be written out to a file using
the write method.
Note that as we loop over galaxies they will be removed from the
pipeline to free up memory. This means that once the pipeline has run
the galaxies will no longer be accessible from the pipeline object.
Raises:
PipelineNotReady:
If the pipeline is not ready to a specific operation.
"""
# Ensure we are ready
if not self._loaded_galaxies:
raise exceptions.PipelineNotReady(
"Cannot generate spectra before galaxies are loaded! "
"Call load_galaxies first."
)
# Ensure we have at least one operation signalled
signals = [
self._do_los_optical_depths,
self._do_lnu_spectra,
self._do_fnu_spectra,
self._do_luminosities,
self._do_fluxes,
self._do_lum_lines,
self._do_flux_lines,
self._do_images_lum,
self._do_images_flux,
self._do_lnu_data_cubes,
self._do_fnu_data_cubes,
self._do_spectroscopy_lnu,
self._do_spectroscopy_fnu,
self._do_sfzh,
self._do_sfh,
]
if not any(signals):
raise exceptions.PipelineNotReady(
"Cannot run pipeline without any operations signalled! "
"Use the get_* methods to signal operations."
)
# Ok we are good to go! Report the last metadata and then get going
if self.rank == 0:
self._report_instruments()
# Prewarm instruments before processing galaxies.
self._prepare_instruments()
# Print the header for the pipeline run to the console
self._print_progress_header()
# Loop over galaxies and compute what has been requested using get_*
# signalling methods
igal = 0
while len(self.galaxies) > 0:
start_gal = time.perf_counter()
# Pop the first galaxy from the list
gal = self.galaxies.pop(0)
# Are we generating LOS optical depths?
# Note that this can only be done on galaxies prior to any
# splitting done for memory optimisation
if self._do_los_optical_depths:
self._get_los_optical_depths(gal)
# If we have a particle galaxy, see if we need to split it
if isinstance(gal, ParticleGalaxy):
gals = gal.split(self._max_npart)
else:
gals = [gal]
# If chunking is active, clear any existing additive outputs on the
# parent before processing children and accumulating back onto it
# just to be sure, in case the galaxy has been used before
if len(gals) > 1:
clear_pipeline_outputs(gal)
# Loop over the split galaxies list (which may just be a list
# containing the original gal above)
while len(gals) > 0:
# Get the next galaxy
_gal = gals.pop(0)
# Are we generating SFZHs?
if self._do_sfzh:
self._get_sfzh(_gal)
# Are we generating SFHs?
if self._do_sfh:
self._get_sfh(_gal)
# Are we generating lnu spectra?
if self._do_lnu_spectra:
self._get_spectra(_gal)
# Are we generating fnu spectra?
if self._do_fnu_spectra:
self._get_observed_spectra(_gal)
# Are we generating photometric luminosities?
if self._do_luminosities:
self._get_photometry_luminosities(_gal)
# Are we generating photometric fluxes?
if self._do_fluxes:
self._get_photometry_fluxes(_gal)
# Are we generating emission lines?
if self._do_lum_lines:
self._get_lines(_gal)
# Are we generating observed emission lines?
if self._do_flux_lines:
self._get_observed_lines(_gal)
# Are we generating luminosity images?
if self._do_images_lum:
self._get_images_luminosity(_gal)
# Are we generating flux images?
if self._do_images_flux:
self._get_images_flux(_gal)
# Are we generating luminosity data cubes?
if self._do_lnu_data_cubes:
self._get_data_cubes_lnu(_gal)
# Are we generating flux data cubes?
if self._do_fnu_data_cubes:
self._get_data_cubes_fnu(_gal)
# Are we generating luminosity spectroscopy?
if self._do_spectroscopy_lnu:
self._get_spectroscopy_lnu(_gal)
# Are we generating flux spectroscopy?
if self._do_spectroscopy_fnu:
self._get_spectroscopy_fnu(_gal)
# Child chunk outputs are accumulated back onto the original
# parent galaxy once this chunk has been processed.
# Combine back onto the original galaxy if necessary
if _gal is not gal:
accumulate_pipeline_results_from_child(gal, _gal)
del _gal
# Run any extra analysis functions
self._run_extra_analysis(gal)
# Ok, we're done with this galaxy so we can unpack the results
self._unpack_results(gal)
# Output into the progress table
self._add_progress_row(gal, igal, start_gal)
igal += 1
# Now we can remove the galaxy to free up memory
del gal
# print the footer for the pipeline run to the console
self._print_progress_footer()
# We're done! Report the time taken
self._report_total_timings()
# Clean up the outputs
self._clean_outputs()
# Flag that we have run the pipeline
self._analysis_complete = True
def _clean_outputs(self):
"""Clean up the lists attached to the pipeline.
This prepares the results for writing out to a file.
"""
start = time.perf_counter()
# Convert the lists of SFZHs to unyt arrays
self.sfzhs = unyt_array(self.sfzhs)
# Convert the lists of SFHs to unyt arrays
self.sfhs = unyt_array(self.sfhs)
# Convert the lists of spectra to unyt arrays
for spec_type, spec in self.lnu_spectra["Galaxy"].items():
self.lnu_spectra["Galaxy"][spec_type] = unyt_array(spec)
for spec_type, spec in self.lnu_spectra["Stars"].items():
self.lnu_spectra["Stars"][spec_type] = unyt_array(spec)
for spec_type, spec in self.lnu_spectra["BlackHole"].items():
self.lnu_spectra["BlackHole"][spec_type] = unyt_array(spec)
for spec_type, spec in self.fnu_spectra["Galaxy"].items():
self.fnu_spectra[spec_type] = unyt_array(spec)
for spec_type, spec in self.fnu_spectra["Stars"].items():
self.fnu_spectra["Stars"][spec_type] = unyt_array(spec)
for spec_type, spec in self.fnu_spectra["BlackHole"].items():
self.fnu_spectra["BlackHole"][spec_type] = unyt_array(spec)
# Convert the lists of luminosities to unyt arrays
for spec_type, phot in self.luminosities["Galaxy"].items():
for filt, lnu in phot.items():
self.luminosities["Galaxy"][spec_type][filt] = unyt_array(lnu)
for spec_type, phot in self.luminosities["Stars"].items():
for filt, lnu in phot.items():
self.luminosities["Stars"][spec_type][filt] = unyt_array(lnu)
for spec_type, phot in self.luminosities["BlackHole"].items():
for filt, lnu in phot.items():
self.luminosities["BlackHole"][spec_type][filt] = unyt_array(
lnu
)
# Convert the lists of fluxes to unyt arrays
for spec_type, phot in self.fluxes["Galaxy"].items():
for filt, fnu in phot.items():
self.fluxes["Galaxy"][spec_type][filt] = unyt_array(fnu)
for spec_type, phot in self.fluxes["Stars"].items():
for filt, fnu in phot.items():
self.fluxes["Stars"][spec_type][filt] = unyt_array(fnu)
for spec_type, phot in self.fluxes["BlackHole"].items():
for filt, fnu in phot.items():
self.fluxes["BlackHole"][spec_type][filt] = unyt_array(fnu)
# Convert the lists of line luminosities and continua to unyt arrays
for spec_type, lines in self.line_lums["Galaxy"].items():
self.line_lums["Galaxy"][spec_type] = unyt_array(lines)
for spec_type, conts in self.line_cont_lums["Galaxy"].items():
self.line_cont_lums["Galaxy"][spec_type] = unyt_array(conts)
for spec_type, lines in self.line_lums["Stars"].items():
self.line_lums["Stars"][spec_type] = unyt_array(lines)
for spec_type, conts in self.line_cont_lums["Stars"].items():
self.line_cont_lums["Stars"][spec_type] = unyt_array(conts)
for spec_type, lines in self.line_lums["BlackHole"].items():
self.line_lums["BlackHole"][spec_type] = unyt_array(lines)
for spec_type, conts in self.line_cont_lums["BlackHole"].items():
self.line_cont_lums["BlackHole"][spec_type] = unyt_array(conts)
# Convert the lists of line fluxes and continua to unyt arrays
for spec_type, fluxes in self.line_fluxes["Galaxy"].items():
self.line_fluxes["Galaxy"][spec_type] = unyt_array(fluxes)
for spec_type, conts in self.line_cont_fluxes["Galaxy"].items():
self.line_cont_fluxes["Galaxy"][spec_type] = unyt_array(conts)
for spec_type, fluxes in self.line_fluxes["Stars"].items():
self.line_fluxes["Stars"][spec_type] = unyt_array(fluxes)
for spec_type, conts in self.line_cont_fluxes["Stars"].items():
self.line_cont_fluxes["Stars"][spec_type] = unyt_array(conts)
for spec_type, fluxes in self.line_fluxes["BlackHole"].items():
self.line_fluxes["BlackHole"][spec_type] = unyt_array(fluxes)
for spec_type, conts in self.line_cont_fluxes["BlackHole"].items():
self.line_cont_fluxes["BlackHole"][spec_type] = unyt_array(conts)
# Convert the lists of luminosity images to unyt arrays
for inst_label, inst_data in self.images_lum["Galaxy"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum["Galaxy"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_lum["Stars"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum["Stars"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_lum["BlackHole"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum["BlackHole"][inst_label][spec_type][f] = (
unyt_array(img)
)
# Convert the lists of flux images to unyt arrays
for inst_label, inst_data in self.images_flux["Galaxy"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux["Galaxy"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_flux["Stars"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux["Stars"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_flux["BlackHole"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux["BlackHole"][inst_label][spec_type][f] = (
unyt_array(img)
)
# Convert the lists of psf luminosity images to unyt arrays
for inst_label, inst_data in self.images_lum_psf["Galaxy"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum_psf["Galaxy"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_lum_psf["Stars"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum_psf["Stars"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_lum_psf["BlackHole"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum_psf["BlackHole"][inst_label][spec_type][
f
] = unyt_array(img)
# Convert the lists of psf flux images to unyt arrays
for inst_label, inst_data in self.images_flux_psf["Galaxy"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux_psf["Galaxy"][inst_label][spec_type][
f
] = unyt_array(img)
for inst_label, inst_data in self.images_flux_psf["Stars"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux_psf["Stars"][inst_label][spec_type][f] = (
unyt_array(img)
)
for inst_label, inst_data in self.images_flux_psf["BlackHole"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux_psf["BlackHole"][inst_label][spec_type][
f
] = unyt_array(img)
# Convert the lists of noise luminosity images to unyt arrays
for inst_label, inst_data in self.images_lum_noise["Galaxy"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum_noise["Galaxy"][inst_label][spec_type][
f
] = unyt_array(img)
for inst_label, inst_data in self.images_lum_noise["Stars"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum_noise["Stars"][inst_label][spec_type][
f
] = unyt_array(img)
for inst_label, inst_data in self.images_lum_noise[
"BlackHole"
].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_lum_noise["BlackHole"][inst_label][spec_type][
f
] = unyt_array(img)
# Convert the lists of noise flux images to unyt arrays
for inst_label, inst_data in self.images_flux_noise["Galaxy"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux_noise["Galaxy"][inst_label][spec_type][
f
] = unyt_array(img)
for inst_label, inst_data in self.images_flux_noise["Stars"].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux_noise["Stars"][inst_label][spec_type][
f
] = unyt_array(img)
for inst_label, inst_data in self.images_flux_noise[
"BlackHole"
].items():
for spec_type, imgs in inst_data.items():
for f, img in imgs.items():
self.images_flux_noise["BlackHole"][inst_label][spec_type][
f
] = unyt_array(img)
# Convert the lists of extra analysis results to unyt arrays
# Unlike the previous data we need to do some checks here to ensure
# that the data is in a consistent format. This is because the user
# can return any type of data from their analysis functions.
for key, res in self._analysis_results.items():
# Ensure the data of all results is in the same format.
types = set([type(r) for r in res])
if len(types) > 1:
raise exceptions.BadResult(
"All results from extra analysis functions must be "
f"of the same type. Got: {set(types)}"
)
# Combine the list of results into a dict of unyt arrays or a
# single unyt array. This will simply return a unyt array if we
# hand a list of values
combined_data = combine_list_of_dicts(res)
# Store the data
self._analysis_results[key] = combined_data
self._took(start, "Cleaning outputs")
[docs]
def write(self, outpath, verbose=None):
"""Write what we have produced to a HDF5 file.
Any get_* methods that have been called will have their results written
to the HDF5 file. We consider the call to get_* to be the signal to
write the data out. If a get_* method has not been called, but the data
was needed by a subsequent get_* method, it will have been run but
then discarded, e.g. calling get_photometry_fluxes will have run
get_observed_spectra, which will have run get_spectra, but the results
from get_spectra and get_observed_spectra will have been discarded.
Args:
outpath (str):
The path to the HDF5 file to write.
verbose (bool, optional):
If set, override the Pipeline verbose setting.
"""
# Ensure we have run the pipeline (this is mostly useful to help with
# running old pipelines that don't call run before write)
if not self._analysis_complete:
self._print(
"Pipeline has not been run. Running pipeline before writing."
)
self.run()
write_start = time.perf_counter()
# Get an instance of the HDF5 file writer
self.io_helper = PipelineIO(
outpath,
self.comm,
self.n_galaxies_local,
self._start_time,
verbose if verbose is not None else self.verbose,
)
# Write out the metadata
self.io_helper.create_file_with_metadata(
self.instruments, self.emission_model
)
# In MPI land we need to collect together the galaxy counts on each
# rank to make the indices for each rank consistent
if self.using_mpi:
galaxy_counts = self.comm.allgather(self.n_galaxies_local)
galaxy_stars = np.cumsum(galaxy_counts) - galaxy_counts
galaxy_ends = np.cumsum(galaxy_counts) - 1
galaxy_indices = np.arange(
galaxy_stars[self.rank],
galaxy_ends[self.rank] + 1,
)
else:
galaxy_indices = None
# Write spectral luminosity densities
if self._write_lnu_spectra:
self.io_helper.write_data(
self.lnu_spectra["Galaxy"],
"Galaxies/Spectra/SpectralLuminosityDensities",
galaxy_indices,
)
self.io_helper.write_data(
self.lnu_spectra["Stars"],
"Galaxies/Stars/Spectra/SpectralLuminosityDensities",
galaxy_indices,
)
self.io_helper.write_data(
self.lnu_spectra["BlackHole"],
"Galaxies/BlackHoles/Spectra/SpectralLuminosityDensities",
galaxy_indices,
)
# Write spectral flux densities
if self._write_fnu_spectra:
self.io_helper.write_data(
self.fnu_spectra["Galaxy"],
"Galaxies/Spectra/SpectralFluxDensities",
galaxy_indices,
)
self.io_helper.write_data(
self.fnu_spectra["Stars"],
"Galaxies/Stars/Spectra/SpectralFluxDensities",
galaxy_indices,
)
self.io_helper.write_data(
self.fnu_spectra["BlackHole"],
"Galaxies/BlackHoles/Photometry/SpectralFluxDensities",
galaxy_indices,
)
# Write photometric luminosities
if self._write_luminosities:
self.io_helper.write_data(
self.luminosities["Galaxy"],
"Galaxies/Photometry/Luminosities",
galaxy_indices,
)
self.io_helper.write_data(
self.luminosities["Stars"],
"Galaxies/Stars/Photometry/Luminosities",
galaxy_indices,
)
self.io_helper.write_data(
self.luminosities["BlackHole"],
"Galaxies/BlackHoles/Photometry/Luminosities",
galaxy_indices,
)
# Write photometric fluxes
if self._write_fluxes:
self.io_helper.write_data(
self.fluxes["Galaxy"],
"Galaxies/Photometry/Fluxes",
galaxy_indices,
)
self.io_helper.write_data(
self.fluxes["Stars"],
"Galaxies/Stars/Photometry/Fluxes",
galaxy_indices,
)
self.io_helper.write_data(
self.fluxes["BlackHole"],
"Galaxies/BlackHoles/Photometry/Fluxes",
galaxy_indices,
)
# Write emission line luminosities
if self._write_lines:
self.io_helper.write_data(
self.line_lums["Galaxy"],
"Galaxies/Lines/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.line_lums["Stars"],
"Galaxies/Stars/Lines/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.line_lums["BlackHole"],
"Galaxies/BlackHoles/Lines/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.line_cont_lums["Galaxy"],
"Galaxies/Lines/Continuum",
galaxy_indices,
)
self.io_helper.write_data(
self.line_cont_lums["Stars"],
"Galaxies/Stars/Lines/Continuum",
galaxy_indices,
)
self.io_helper.write_data(
self.line_cont_lums["BlackHole"],
"Galaxies/BlackHoles/Lines/Continuum",
galaxy_indices,
)
self.io_helper.write_data(
self.line_ids,
"Galaxies/Lines/IDs",
galaxy_indices,
)
self.io_helper.write_data(
self.line_lams,
"Galaxies/Lines/Wavelengths",
galaxy_indices,
)
# Write observed emission line fluxes
if self._write_flux_lines:
self.io_helper.write_data(
self.line_fluxes["Galaxy"],
"Galaxies/Lines/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.line_fluxes["Stars"],
"Galaxies/Stars/Lines/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.line_fluxes["BlackHole"],
"Galaxies/BlackHoles/Lines/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.line_cont_fluxes["Galaxy"],
"Galaxies/Lines/Continuum",
galaxy_indices,
)
self.io_helper.write_data(
self.line_cont_fluxes["Stars"],
"Galaxies/Stars/Lines/Continuum",
galaxy_indices,
)
self.io_helper.write_data(
self.line_cont_fluxes["BlackHole"],
"Galaxies/BlackHoles/Lines/Continuum",
galaxy_indices,
)
self.io_helper.write_data(
self.line_obs_lams,
"Galaxies/Lines/ObservedWavelengths",
galaxy_indices,
)
# Write luminosity images
if self._write_images_lum:
self.io_helper.write_data(
self.images_lum["Galaxy"],
"Galaxies/Images/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.images_lum["Stars"],
"Galaxies/Stars/Images/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.images_lum["BlackHole"],
"Galaxies/BlackHoles/Images/Luminosity",
galaxy_indices,
)
# Write PSF luminosity images
if self._write_images_lum_psf:
self.io_helper.write_data(
self.images_lum_psf["Galaxy"],
"Galaxies/PSFImages/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.images_lum_psf["Stars"],
"Galaxies/Stars/PSFImages/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.images_lum_psf["BlackHole"],
"Galaxies/BlackHoles/PSFImages/Luminosity",
galaxy_indices,
)
# Write noise luminosity images
if self._write_images_lum_noise:
self.io_helper.write_data(
self.images_lum_noise["Galaxy"],
"Galaxies/NoiseImages/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.images_lum_noise["Stars"],
"Galaxies/Stars/NoiseImages/Luminosity",
galaxy_indices,
)
self.io_helper.write_data(
self.images_lum_noise["BlackHole"],
"Galaxies/BlackHoles/NoiseImages/Luminosity",
galaxy_indices,
)
# Write flux images
if self._write_images_flux:
self.io_helper.write_data(
self.images_flux["Galaxy"],
"Galaxies/Images/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.images_flux["Stars"],
"Galaxies/Stars/Images/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.images_flux["BlackHole"],
"Galaxies/BlackHoles/Images/Flux",
galaxy_indices,
)
# Write PSF flux images
if self._write_images_flux_psf:
self.io_helper.write_data(
self.images_flux_psf["Galaxy"],
"Galaxies/PSFImages/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.images_flux_psf["Stars"],
"Galaxies/Stars/PSFImages/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.images_flux_psf["BlackHole"],
"Galaxies/BlackHoles/PSFImages/Flux",
galaxy_indices,
)
# Write noise flux images
if self._write_images_flux_noise:
self.io_helper.write_data(
self.images_flux_noise["Galaxy"],
"Galaxies/NoiseImages/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.images_flux_noise["Stars"],
"Galaxies/Stars/NoiseImages/Flux",
galaxy_indices,
)
self.io_helper.write_data(
self.images_flux_noise["BlackHole"],
"Galaxies/BlackHoles/NoiseImages/Flux",
galaxy_indices,
)
# Write out the extra analysis results
for key, res in self._analysis_results.items():
self.io_helper.write_data(
res,
f"Galaxies/{key}",
galaxy_indices,
)
self._took(write_start, "Writing data")
# Totally done!
self._say_goodbye()
[docs]
def combine_files(self):
"""Combine individual rank files into a single file.
Only applicable to MPI runs.
This will create a physical file on disk with all the data copied from
the individual rank files. The rank files themselves will be deleted.
Once all data has been copied.
This method is cleaner but has the potential to be very slow.
"""
start = time.perf_counter()
# Nothing to do if we're not using MPI
if not self.using_mpi:
self._print("Not using MPI, nothing to combine.")
return
# Ensure we have written the data
if self.io_helper is None:
raise exceptions.PipelineNotReady(
"Cannot combine files before writing data! Call write first."
)
# Nothing to do if we're using collective I/O
if self.io_helper.is_collective:
self._print("Using collective I/O, nothing to combine.")
return
# Combine the files
self.io_helper.combine_rank_files()
self._took(start, "Combining files")
[docs]
def combine_files_virtual(self):
"""Combine individual rank files into a single virtual file.
Only applicable to MPI runs.
This will create a file where all the data is accessible but not
physically copied. This is much faster than making a true copy but
requires each individual rank file remains accessible.
"""
start = time.perf_counter()
# Nothing to do if we're not using MPI
if not self.using_mpi:
self._print("Not using MPI, nothing to combine.")
return
# Ensure we have written the data
if self.io_helper is None:
raise exceptions.PipelineNotReady(
"Cannot combine files before writing data! Call write first."
)
# Nothing to do if we're using collective I/O
if self.io_helper.is_collective:
self._print("Using collective I/O, nothing to combine.")
return
# Combine the files
self.io_helper.combine_rank_files_virtual()
self._took(start, "Combining files (virtual)")
[docs]
def repartition_galaxies(self, galaxy_weights=None, random_seed=42):
"""Given the galaxies repartition them across the ranks."""
raise NotImplementedError("Repartitioning is not yet implemented.")
def _report_balance(self):
"""Report the balance of galaxies across the ranks.
This function will print out a nice horizontal bar graph showing the
distribution of galaxies across the ranks.
"""
# Communicate local counts to rank 0
counts = self.comm.gather(self.n_galaxies_local)
# Produce a nice horizontal bar graph to show the
# distribution of galaxies across the ranks. This only needs printing
# on rank 0 regardless of verbosity.
if self.rank == 0:
self._print("Galaxy balance across ranks:")
# Find the maximum list length for scaling
max_count = max(counts)
for rank, count in enumerate(counts):
# Calculate the length of the bar based on the relative size
bar_length = (
int((count / max_count) * 50) if max_count > 0 else 0
)
# Create the bar and append the list length in brackets
bar = "#" * bar_length
self._print(
f"Rank {str(rank).zfill(len(str(self.size)) + 1)} - "
f"{bar} ({count})"
)