speechbrain.utils.parallel module

Parallel processing tools to help speed up certain tasks like data preprocessing.

Authors
  • Sylvain de Langen 2023

Summary

Classes:

CancelFuturesOnExit

Context manager that .cancel()s all elements of a list upon exit.

Functions:

get_available_cpu_count

Return the number of CPUs available to the current process.

parallel_map

Maps iterable items with a function, processing chunks of items in parallel with multiple processes and displaying progress with tqdm.

Reference

speechbrain.utils.parallel.get_available_cpu_count() → int

Return the number of CPUs available to the current process.

This function provides a reliable way to determine CPU count that respects:

  1. User override via SB_NUM_PROC environment variable
  2. CPU affinity limits (e.g., SLURM allocations)
  3. System CPU count as fallback

The fallback hierarchy is:

  1. SB_NUM_PROC environment variable (if set and valid)
  2. os.process_cpu_count() (Python 3.13+, respects affinity)
  3. len(os.sched_getaffinity(0)) (Unix, respects SLURM/cgroups)
  4. os.cpu_count() (fallback for Windows or when the above fail)
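The fallback hierarchy above can be sketched in plain Python. This is a minimal illustration, not SpeechBrain's implementation: the function name available_cpu_count_sketch is hypothetical, and the sketch assumes that a "valid" SB_NUM_PROC value means a positive integer.

```python
import os


def available_cpu_count_sketch(env_var: str = "SB_NUM_PROC") -> int:
    # Hypothetical re-creation of the documented fallback order.
    # 1. Explicit user override, if it parses as a positive integer (assumed rule).
    override = os.environ.get(env_var)
    if override is not None:
        try:
            value = int(override)
            if value > 0:
                return value
        except ValueError:
            pass  # Invalid override: fall through to auto-detection.

    # 2. Python 3.13+: process_cpu_count() respects the CPU affinity mask.
    process_cpu_count = getattr(os, "process_cpu_count", None)
    if process_cpu_count is not None:
        count = process_cpu_count()
        if count:
            return count

    # 3. Unix: the affinity mask reflects CPU pinning such as SLURM allocations.
    if hasattr(os, "sched_getaffinity"):
        try:
            return len(os.sched_getaffinity(0))
        except OSError:
            pass

    # 4. Last resort: total logical CPUs; os.cpu_count() may return None.
    return os.cpu_count() or 1
```

Because the override is read on every call, setting SB_NUM_PROC (for example in a job script) takes effect without code changes, while getattr/hasattr keep the sketch working on Python versions and platforms that lack process_cpu_count() or sched_getaffinity().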

Returns:

The number of CPUs available. Falls back to 1 if detection fails.

Return type:

int

Examples

>>> # With environment variable override:
>>> import os
>>> os.environ["SB_NUM_PROC"] = "2"
>>> get_available_cpu_count()
2
>>> del os.environ["SB_NUM_PROC"]  # restore automatic detection

class speechbrain.utils.parallel.CancelFuturesOnExit(future_list)

Bases: object

Context manager that calls .cancel() on every element of future_list upon exit. This is used to abort pending futures faster when an exception is raised.
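A context manager with the behavior described above can be sketched as follows. The class name CancelFuturesOnExitSketch is hypothetical and its body is an assumption based on this description, not the library source. Note that Future.cancel() only succeeds for futures that have not started running; running or finished futures are left untouched.

```python
from concurrent.futures import Future
from typing import List


class CancelFuturesOnExitSketch:
    """Hypothetical sketch: cancel every future in a list when the block exits."""

    def __init__(self, future_list: List[Future]):
        # Keep a reference to the caller's list so futures appended later
        # inside the with-block are cancelled too.
        self.future_list = future_list

    def __enter__(self) -> "CancelFuturesOnExitSketch":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        for future in self.future_list:
            # Only pending futures can be cancelled; others return False.
            future.cancel()
        # Returning None lets any in-flight exception propagate.


# Usage: bare Future objects stand in for executor results, for illustration only.
futures = [Future() for _ in range(3)]
try:
    with CancelFuturesOnExitSketch(futures):
        raise RuntimeError("simulated failure while consuming results")
except RuntimeError:
    pass
print([f.cancelled() for f in futures])  # [True, True, True]
```

Because __exit__ returns None, the exception that triggered the exit still propagates after the remaining futures have been cancelled.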

speechbrain.utils.parallel.parallel_map(fn: Callable[[Any], Any], source: Iterable[Any], process_count: int | None = None, chunk_size: int = 8, queue_size: int = 128, executor: Executor | None = None, progress_bar: bool = True, progress_bar_kwargs: dict = {'smoothing': 0.02})

Maps iterable items with a function, processing chunks of items in parallel with multiple processes and displaying progress with tqdm.

Processed elements are always returned in their original order. Unlike ProcessPoolExecutor.map, which by default collects the whole source iterable up front, elements are produced AND consumed lazily.
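The ordering and laziness guarantees can be illustrated with a bounded FIFO of chunk futures. The sketch below is a simplified assumption of how such a map may work, not the parallel_map source: lazy_ordered_map and _apply_to_chunk are hypothetical names, chunk_size and queue_size mirror the parameters of the same name, and there is no progress bar. It only calls Executor.submit, so it accepts any concurrent.futures executor.

```python
import itertools
from collections import deque
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable, Deque, Iterable, Iterator, List


def _apply_to_chunk(fn: Callable[[Any], Any], chunk: List[Any]) -> List[Any]:
    # Runs inside a worker: map fn over one chunk of source elements.
    return [fn(item) for item in chunk]


def lazy_ordered_map(
    fn: Callable[[Any], Any],
    source: Iterable[Any],
    executor: Executor,
    chunk_size: int = 8,
    queue_size: int = 128,
) -> Iterator[Any]:
    source_iter = iter(source)
    # FIFO of chunk futures: popping from the left preserves source order.
    pending: Deque[Future] = deque()

    def submit_next_chunk() -> bool:
        # Pull at most chunk_size elements; the source is only consumed here.
        chunk = list(itertools.islice(source_iter, chunk_size))
        if not chunk:
            return False
        pending.append(executor.submit(_apply_to_chunk, fn, chunk))
        return True

    try:
        # Fill the queue with up to queue_size chunks before yielding anything.
        while len(pending) < queue_size and submit_next_chunk():
            pass
        while pending:
            # Block on the oldest chunk, even if later chunks finished first.
            results = pending.popleft().result()
            # Refill one slot so workers stay busy while the caller consumes.
            submit_next_chunk()
            yield from results
    finally:
        # On error or early close(), cancel chunks that have not started yet.
        for future in pending:
            future.cancel()


# Usage: squares of 0..19 computed by 4 threads, printed in source order.
with ThreadPoolExecutor(max_workers=4) as pool:
    for value in lazy_ordered_map(lambda x: x * x, range(20), pool, chunk_size=3, queue_size=4):
        print(value)
```

Waiting on the oldest chunk first is what keeps results in source order: a later chunk that finishes early simply waits in the queue. With a ProcessPoolExecutor instead of the thread pool used here, fn must be picklable, so a module-level function would replace the lambda.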

Parameters:
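  • fn (Callable) – The function that is called on every element of source. The output of parallel_map is an iterator over fn(elem) for each elem in source. When a process pool is used, fn must be picklable (e.g. defined at module level rather than as a lambda).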

  • source (Iterable) – Iterable whose elements are passed through the mapping function.

  • process_count (int, optional) – The number of processes to spawn. Ignored if a custom executor is provided. If None (the default), uses get_available_cpu_count(), which respects SLURM allocations, CPU affinity, and the SB_NUM_PROC environment variable. For CPU-bound tasks, it is generally not useful to exceed the logical core count. For IO-bound tasks, it may make sense to exceed it, so as to limit the amount of time spent in iowait.

  • chunk_size (int) – How many elements are fed to the worker processes at once. A value of 8 is generally fine. Low values may increase overhead and reduce CPU occupancy.

  • queue_size (int) – Number of chunks to be waited for on the main process at a time. Low values increase the chance of the queue being starved, forcing workers to idle. Very high values may cause high memory usage, especially if the source iterable yields large objects.

  • executor (Optional[Executor]) – Allows providing an existing executor (preferably a ProcessPoolExecutor). If None (the default), a process pool will be spawned for this mapping task and will be shut down after.

  • progress_bar (bool) – Whether to show a tqdm progress bar.

  • progress_bar_kwargs (dict) – A dict of keyword arguments that is forwarded to tqdm when progress_bar == True. Allows overriding the defaults or e.g. specifying total when it cannot be inferred from the source iterable.

Yields:

The items from source processed by fn, in their original order.
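The queue_size trade-off can be put in numbers. Under the simplifying assumption that at most queue_size chunks of chunk_size elements are awaited by the main process at once, up to chunk_size × queue_size source elements are buffered, i.e. 8 × 128 = 1024 with the defaults. The helpers below are hypothetical (not part of speechbrain.utils.parallel) and only illustrate that arithmetic.

```python
def max_buffered_elements(chunk_size: int = 8, queue_size: int = 128) -> int:
    # Assumed bound: queue_size chunks of chunk_size elements awaited at once.
    return chunk_size * queue_size


def queue_size_for_budget(budget_bytes: int, bytes_per_element: int, chunk_size: int = 8) -> int:
    # Largest queue_size whose buffered elements fit the budget, but at least 1.
    bytes_per_chunk = chunk_size * bytes_per_element
    return max(1, budget_bytes // bytes_per_chunk)


print(max_buffered_elements())  # 1024 with the defaults
# About 1 MiB per source element and a 256 MiB budget:
print(queue_size_for_budget(256 * 2**20, 2**20))  # 32
```

This estimate only counts source elements; results of fn waiting in completed chunks add to the footprint as well, so leaving some headroom is prudent.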