synthesizer.pipeline.pipeline_utils¶
A submodule with helpers for writing out Synthesizer pipeline results.
Functions
- synthesizer.pipeline.pipeline_utils.accumulate_pipeline_results_from_child(parent, *children)[source]¶
Accumulate additive pipeline outputs from child galaxies.
- Parameters:
parent – The parent galaxy receiving accumulated outputs.
*children – Child galaxies whose additive pipeline outputs should be combined onto the parent.
- Returns:
The parent galaxy after accumulation.
- Return type:
object
- synthesizer.pipeline.pipeline_utils.cached_split(split_key)[source]¶
Split a key into a list of keys.
This is a cached version of the split function to avoid repeated splitting of the same key.
- Parameters:
split_key (str) – The key to split in “key1/key2/…/keyN” format.
- Returns:
A list of the split keys.
- Return type:
list
- synthesizer.pipeline.pipeline_utils.clear_pipeline_outputs(gal)[source]¶
Clear additive pipeline outputs from a galaxy and its components.
- Parameters:
gal – The galaxy whose additive pipeline outputs should be reset.
- Returns:
None
- synthesizer.pipeline.pipeline_utils.combine_atomic_timing_snapshots(comm, using_mpi, rank, timing_data, total_elapsed)[source]¶
Combine timing snapshots across ranks when running under MPI.
- Parameters:
comm – The MPI communicator associated with the Pipeline.
using_mpi (bool) – Whether the Pipeline is currently running under MPI.
rank (int) – The rank of the current process.
timing_data (dict) – The local timing snapshot for this rank.
total_elapsed (float) – The wall-clock time elapsed on this rank since Pipeline instantiation.
- Returns:
A pair (timing_data, total_elapsed). On non-MPI runs this is just the input data. On MPI runs rank 0 receives the aggregated timings and summed elapsed time, while all other ranks receive (None, None).
- Return type:
tuple
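The aggregation step on rank 0 can be illustrated without MPI by merging a list of per-rank snapshots, as a gather would deliver them. This is only a sketch: the function name `merge_timing_snapshots` is hypothetical, and summing the `seconds` and `count` entries per operation is an assumption about how the timings are aggregated.

```python
def merge_timing_snapshots(snapshots, elapsed_times):
    """Merge per-rank timing snapshots into one aggregate.

    snapshots: list of dicts mapping operation name -> {"seconds", "count"}.
    elapsed_times: list of per-rank wall-clock times.
    """
    merged = {}
    for snapshot in snapshots:
        for name, entry in snapshot.items():
            # Assumption: seconds and counts are simply summed across ranks.
            target = merged.setdefault(name, {"seconds": 0.0, "count": 0})
            target["seconds"] += entry["seconds"]
            target["count"] += entry["count"]
    return merged, sum(elapsed_times)


rank0 = {"get_spectra": {"seconds": 1.5, "count": 2}}
rank1 = {
    "get_spectra": {"seconds": 2.5, "count": 3},
    "get_images": {"seconds": 0.5, "count": 1},
}
merged, total = merge_timing_snapshots([rank0, rank1], [10.0, 12.0])
```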
- synthesizer.pipeline.pipeline_utils.combine_list_of_dicts(dicts)[source]¶
Combine a list of dictionaries into a single dictionary.
- Parameters:
dicts (list) – A list of dictionaries to combine.
- Returns:
The combined dictionary.
- Return type:
dict
- synthesizer.pipeline.pipeline_utils.count_and_check_dict_recursive(data, prefix='')[source]¶
Recursively count the number of leaves in a dictionary.
- Parameters:
data (dict) – The dictionary to search.
prefix (str) – A prefix to add to the keys of the arrays.
- Returns:
A dictionary of all the numpy arrays in the input dictionary.
- Return type:
dict
- synthesizer.pipeline.pipeline_utils.discover_attr_paths_recursive(obj, prefix='', output_set=None)[source]¶
Recursively discover all outputs attached to an object.
This function will collate all paths to attributes at any level within the input object.
If the object is a dictionary, we will loop over all keys and values recursing where appropriate.
If the object is a class instance (e.g. Galaxy, Stars, ImageCollection, etc.), we will loop over all attributes and recurse where appropriate.
If the object is a “value” (i.e. an array or a scalar), we will add the full path to the output set.
NOTE: this function is currently unused but is kept for debugging purposes since it is extremely useful to see the nesting of attributes on objects.
- Parameters:
obj (dict or object) – The dictionary or class instance to search.
prefix (str) – A prefix to add to the keys of the arrays.
output_set (set) – A set to store the output paths in.
- Returns:
A dictionary of all the numpy arrays in the input dictionary.
- Return type:
dict
- synthesizer.pipeline.pipeline_utils.discover_dict_recursive(data, prefix='', output_set=None)[source]¶
Recursively discover all leaves in a dictionary.
- Parameters:
data (dict) – The dictionary to search.
prefix (str) – A prefix to add to the keys of the arrays.
output_set (set) – A set to store the output paths in.
- Returns:
A dictionary of all the numpy arrays in the input dictionary.
- Return type:
dict
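A recursive leaf search of this kind can be sketched as follows. The function name `discover_leaf_paths`, the `/` path separator, and the choice to return the set of paths are assumptions made for illustration, not the library's confirmed behaviour.

```python
def discover_leaf_paths(data, prefix="", output_set=None):
    """Collect "a/b/c" style paths to every non-dict leaf in a nested dict."""
    if output_set is None:
        output_set = set()
    for key, value in data.items():
        # Build the full path to this entry from the running prefix.
        path = f"{prefix}/{key}" if prefix else str(key)
        if isinstance(value, dict):
            # Nested dictionary: recurse with the extended prefix.
            discover_leaf_paths(value, prefix=path, output_set=output_set)
        else:
            # Anything else is treated as a leaf.
            output_set.add(path)
    return output_set


results = {"Stars": {"Spectra": {"intrinsic": [1.0, 2.0]}, "Mass": 3.0}}
paths = discover_leaf_paths(results)
```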
- synthesizer.pipeline.pipeline_utils.discover_dict_structure(data)[source]¶
Recursively discover the structure of a dictionary.
- Parameters:
data (dict) – The dictionary to search.
- Returns:
A dictionary of all the paths in the input dictionary.
- Return type:
dict
- synthesizer.pipeline.pipeline_utils.divide_dicts_recursive(data, divisors)[source]¶
Divide nested dictionary leaves by matching nested divisors.
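As an illustration of the idea, the sketch below divides each leaf by the leaf found at the same path in a second dictionary. The name `divide_nested` is hypothetical, and returning a new dictionary (rather than modifying the input in place) is an assumption.

```python
def divide_nested(data, divisors):
    """Divide each leaf of ``data`` by the leaf at the same path in ``divisors``."""
    if isinstance(data, dict):
        # Recurse into matching sub-dictionaries key by key.
        return {
            key: divide_nested(value, divisors[key])
            for key, value in data.items()
        }
    # Leaf: plain division, which also works for array-like leaves.
    return data / divisors


totals = {"Stars": {"flux": 10.0, "mass": 6.0}}
counts = {"Stars": {"flux": 4.0, "mass": 3.0}}
means = divide_nested(totals, counts)
```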
- synthesizer.pipeline.pipeline_utils.get_atomic_timing_snapshot()[source]¶
Return the current atomic timing snapshot for this process.
- Parameters:
None
- Returns:
A dictionary keyed by operation name containing cumulative timing information with seconds, count, and source entries.
- Return type:
dict
- synthesizer.pipeline.pipeline_utils.get_dataset_properties(data, comm, root=0)[source]¶
Return the shapes, dtypes and units of all data arrays in a dictionary.
- Parameters:
data (dict) – The data to get the shapes of.
comm (mpi.Comm) – The MPI communicator.
root (int) – The root rank to gather data to.
- Returns:
A dictionary of the shapes of all data arrays, a dictionary of the dtypes of all data arrays, and a dictionary of the units of all data arrays.
- Return type:
dict
- synthesizer.pipeline.pipeline_utils.get_full_memory(obj, seen=None)[source]¶
Estimate memory usage of a Python object, including NumPy arrays.
- Parameters:
obj – The object to inspect.
seen – Set of seen object ids to avoid double-counting.
- Returns:
Approximate size in bytes.
- Return type:
int
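A rough sketch of a recursive memory estimate with a `seen` set is given below. It uses only the standard library and treats any object exposing an integer `nbytes` attribute (as NumPy arrays do) as an array-like buffer. The name `estimate_memory` and the exact traversal rules are assumptions, not the library's implementation.

```python
import sys


def estimate_memory(obj, seen=None):
    """Approximate the memory footprint of ``obj`` in bytes."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return 0  # already counted, avoid double-counting shared references
    seen.add(id(obj))

    # Array-like objects (e.g. NumPy arrays) report their buffer size via
    # nbytes; this sketch counts the data buffer only.
    nbytes = getattr(obj, "nbytes", None)
    if isinstance(nbytes, int):
        return nbytes

    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(
            estimate_memory(k, seen) + estimate_memory(v, seen)
            for k, v in obj.items()
        )
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(estimate_memory(item, seen) for item in obj)
    elif hasattr(obj, "__dict__"):
        # Class instances: follow their attribute dictionary.
        size += estimate_memory(vars(obj), seen)
    return size


shared = list(range(100))
once = estimate_memory([shared])
twice = estimate_memory([shared, shared])
```

Because the second reference to `shared` is skipped, `twice` exceeds `once` only by the cost of the extra slot in the outer list.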
- synthesizer.pipeline.pipeline_utils.plot_timing_analysis(rows, outdir)[source]¶
Write timing analysis plots to disk.
- Parameters:
rows (list) – The timing rows produced by build_timing_analysis_rows.
outdir (str or Path) – The directory where the timing plots should be written.
- Returns:
None
- synthesizer.pipeline.pipeline_utils.sanitise_hdf5_key_part(value)[source]¶
Return an HDF5-safe string fragment for generated labels.
- synthesizer.pipeline.pipeline_utils.sum_dicts_recursive(dicts)[source]¶
Sum a list of nested dictionaries with additive leaves.
- Parameters:
dicts (list) – A list of dictionaries or additive leaf values.
- Returns:
The recursively summed dictionary or leaf value.
- Return type:
dict or object
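The recursion described here can be sketched as below. The name `sum_nested` is hypothetical, and the sketch assumes every dictionary in the list shares the same keys and that leaves support `+`.

```python
def sum_nested(dicts):
    """Sum a list of nested dicts (or plain additive leaves) element-wise."""
    first = dicts[0]
    if not isinstance(first, dict):
        # Leaf values: add them up in order.
        total = first
        for leaf in dicts[1:]:
            total = total + leaf
        return total
    # Dictionaries: recurse on the values stored under each shared key.
    return {key: sum_nested([d[key] for d in dicts]) for key in first}


per_child = [
    {"Stars": {"flux": 1.0}, "n": 2},
    {"Stars": {"flux": 2.5}, "n": 3},
]
combined = sum_nested(per_child)
```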
- synthesizer.pipeline.pipeline_utils.unify_dict_structure_across_ranks(data, comm, root=0)[source]¶
Recursively unify the structure of a dictionary across all ranks.
This function will ensure that all ranks have the same structure in their dictionaries. This is necessary for writing out the data in parallel.
- Parameters:
data (dict) – The data to unify.
comm (mpi.Comm) – The MPI communicator.
root (int) – The root rank to gather data to.
- synthesizer.pipeline.pipeline_utils.validate_noise_unit_compatibility(instruments, expected_unit)[source]¶
Validate that noise attributes have compatible units.
This function checks that instruments with noise capabilities have depth and noise_maps attributes with units compatible with the expected unit for the image type (luminosity or flux).
- Note: depth can be specified as:
Plain float/dict of floats: apparent magnitudes (dimensionless, valid for both luminosity and flux images)
unyt_quantity/dict of unyt_quantity: flux/luminosity with units (must match image type)
- Parameters:
instruments (list) – A list of Instrument objects to validate.
expected_unit (unyt.Unit) – The expected unit for the image type (e.g., “erg/s/Hz” for luminosity images or “nJy” for flux images).
- Raises:
InconsistentArguments – If an instrument has depth or noise_maps with incompatible units.
- synthesizer.pipeline.pipeline_utils.write_timing_analysis_summary(rows, outdir)[source]¶
Write the timing analysis summary CSV.
- Parameters:
rows (list) – The timing rows produced by build_timing_analysis_rows.
outdir (str or Path) – The directory where the timing summary CSV should be written.
- Returns:
None
Classes
- class synthesizer.pipeline.pipeline_utils.OperationKwargs(**kwargs)[source]¶
A container class holding the kwargs needed by any pipeline operation.
- _kwargs¶
dict – The original kwargs dict used to build this object. Values are not copied; only the references are held.
- property kwargs¶
Return the underlying kwargs dict.
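For a kwargs container to be deduplicated in sets or used as a dictionary key, it needs equality and hashing that depend on its contents. The sketch below shows one way this could work; the class name `KwargsBox` and the use of sorted `(key, repr(value))` pairs as the identity are assumptions, not the actual OperationKwargs implementation.

```python
class KwargsBox:
    """Hypothetical stand-in for a hashable kwargs container."""

    def __init__(self, **kwargs):
        self._kwargs = kwargs  # references are held, values are not copied
        # Assumption: identity is built from sorted (key, repr(value)) pairs
        # so that unhashable values such as lists can still be compared.
        self._key = tuple(sorted((k, repr(v)) for k, v in kwargs.items()))

    @property
    def kwargs(self):
        """Return the underlying kwargs dict."""
        return self._kwargs

    def __eq__(self, other):
        return isinstance(other, KwargsBox) and self._key == other._key

    def __hash__(self):
        return hash(self._key)


a = KwargsBox(fov=10, kernel=None)
b = KwargsBox(kernel=None, fov=10)
unique = {a, b}
```

With this scheme, keyword order does not matter, so `a` and `b` collapse to a single entry in the set.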
- class synthesizer.pipeline.pipeline_utils.OperationKwargsHandler(model_labels)[source]¶
Container for Pipeline operation kwargs.
This handler enables running pipeline operations multiple times with different parameters for different models in a clean, expandable and organized manner.
Internally it stores unique OperationKwargs objects per operation (func_name) and associates them with one or more model labels and their instruments:
self._func_map[func_name][OperationKwargs][label] -> list[instruments]
This avoids duplicating identical kwargs sets across labels and provides a clean interface to loop over:
all (label, OperationKwargs) for a given operation, or
all OperationKwargs for a given (label, operation), or
groups of labels that share the same OperationKwargs.
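The nested mapping described above can be illustrated with a simplified stand-in. Everything in this sketch is hypothetical: the class `HandlerSketch`, the `NO_LABEL` placeholder, the explicit `instruments` parameter, and the use of a frozenset of kwargs items in place of an OperationKwargs object (which restricts it to hashable kwarg values).

```python
from collections import defaultdict

NO_LABEL = "no_model"  # hypothetical placeholder for a missing model label


class HandlerSketch:
    """Simplified stand-in for the handler's nested mapping."""

    def __init__(self):
        # func_name -> kwargs key -> label -> list of instruments
        self._func_map = defaultdict(lambda: defaultdict(dict))

    def add(self, model_label, func_name, instruments=(), **kwargs):
        # Assumption: a frozenset of items stands in for OperationKwargs.
        key = frozenset(kwargs.items())
        if model_label is None:
            labels = [NO_LABEL]
        elif isinstance(model_label, str):
            labels = [model_label]
        else:
            labels = list(model_label)
        for label in labels:
            # Identical kwargs share one key, so nothing is duplicated.
            self._func_map[func_name][key].setdefault(label, []).extend(
                instruments
            )
        return key

    def iter_all(self, func_name):
        # Non-consuming: only reads the stored mapping.
        for key, labels in self._func_map.get(func_name, {}).items():
            for label in labels:
                yield label, dict(key)


handler = HandlerSketch()
handler.add(["intrinsic", "attenuated"], "get_images_luminosity", fov=10)
handler.add("intrinsic", "get_images_luminosity", fov=10)
handler.add("intrinsic", "get_images_luminosity", fov=20)
pairs = sorted(
    (label, kw["fov"])
    for label, kw in handler.iter_all("get_images_luminosity")
)
```

Here three `add` calls produce only two distinct kwargs sets, because the first two share `fov=10`.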
- add(model_label, func_name, **kwargs)[source]¶
Add a kwargs set for a given func_name and one or more labels.
This wraps the kwargs in an OperationKwargs and deduplicates them based on its hashing / equality semantics.
- Parameters:
model_label (str or iterable of str or None) – Emission model label(s) or None for NO_MODEL_LABEL.
func_name (str) – Operation / method name, e.g. “get_images_luminosity”.
**kwargs – Arbitrary keyword arguments to store for this func.
- Returns:
The OperationKwargs instance representing this kwargs set.
- Return type:
OperationKwargs
- add_unique(func_name, **kwargs)[source]¶
Add a single unique kwargs set for a given func_name.
This is used for operations that should only have one configuration per pipeline run (e.g., get_sfzh, get_sfh, get_observed_spectra).
- Parameters:
func_name (str) – Operation / method name, e.g. “get_sfzh”.
**kwargs – Arbitrary keyword arguments to store for this func.
- Returns:
The OperationKwargs instance representing this kwargs set.
- Return type:
OperationKwargs
- get_unique_kwargs(func_name)[source]¶
Return the unique OperationKwargs for a given func_name.
This only applies to operations added via add_unique(), which can never have multiple variations.
- Parameters:
func_name (str) – Operation / method name.
- Returns:
The unique OperationKwargs for this operation.
- Return type:
OperationKwargs
- has(func_name, model_label=None)[source]¶
Return True if any kwargs are stored for the given operation.
- Parameters:
func_name (str) – Operation / method name.
model_label (str, optional) – If provided, restrict the check to this model. If omitted, all models are searched.
- Returns:
True if at least one OperationKwargs exists matching the query.
- Return type:
bool
- iter_all(func_name)[source]¶
Iterate over (model_label, OperationKwargs) pairs for an operation.
This is the main entry point for Pipeline methods that want to process all configs for a given operation, regardless of model.
Non-consuming: internal state is left unchanged.
- Parameters:
func_name (str) – Operation / method name.
- Yields:
(model_label, OperationKwargs) – Tuples of model label and OperationKwargs object.