speechbrain.utils.parallel module
Parallel processing tools to help speed up certain tasks like data preprocessing.
- Authors
Sylvain de Langen 2023
Summary
Classes:
CancelFuturesOnExit | Context manager that .cancel()s all elements of a list upon exit.
Functions:
get_available_cpu_count | Return the number of CPUs available to the current process.
parallel_map | Maps iterable items with a function, processing chunks of items in parallel with multiple processes and displaying progress with tqdm.
Reference
- speechbrain.utils.parallel.get_available_cpu_count() -> int
Return the number of CPUs available to the current process.
This function provides a reliable way to determine the CPU count that respects:
1. User override via the SB_NUM_PROC environment variable
2. CPU affinity limits (e.g., SLURM allocations)
3. System CPU count as fallback
The fallback hierarchy is:
1. SB_NUM_PROC environment variable (if set and valid)
2. os.process_cpu_count() (Python 3.13+, respects affinity)
3. len(os.sched_getaffinity(0)) (Unix, respects SLURM/cgroups)
4. os.cpu_count() (fallback for Windows or when the above fail)
- Returns:
The number of CPUs available. Falls back to 1 if detection fails.
- Return type:
int
Examples
>>> # With environment variable override:
>>> import os
>>> from speechbrain.utils.parallel import get_available_cpu_count
>>> os.environ["SB_NUM_PROC"] = "2"
>>> get_available_cpu_count()
2
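The fallback hierarchy described above can be sketched with the standard library alone. This is a minimal illustration and not the SpeechBrain implementation: the function name `sketch_available_cpu_count` is hypothetical, and treating a "valid" SB_NUM_PROC value as a positive integer is an assumption.

```python
import os


def sketch_available_cpu_count() -> int:
    """Hypothetical sketch of the documented fallback order."""
    # 1. User override. Assumption: "valid" means a positive integer.
    override = os.environ.get("SB_NUM_PROC")
    if override is not None:
        try:
            value = int(override)
            if value > 0:
                return value
        except ValueError:
            pass  # Invalid override: fall through to detection.

    # 2. os.process_cpu_count() only exists on Python 3.13+.
    process_cpu_count = getattr(os, "process_cpu_count", None)
    if process_cpu_count is not None:
        count = process_cpu_count()
        if count:
            return count

    # 3. os.sched_getaffinity() is only available on some Unix systems.
    sched_getaffinity = getattr(os, "sched_getaffinity", None)
    if sched_getaffinity is not None:
        try:
            count = len(sched_getaffinity(0))
            if count:
                return count
        except OSError:
            pass

    # 4. System-wide count; os.cpu_count() may return None, so fall back to 1.
    return os.cpu_count() or 1
```

Looking the optional functions up with `getattr` keeps the sketch importable on platforms and Python versions where they are missing.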
- class speechbrain.utils.parallel.CancelFuturesOnExit(future_list)
Bases: object
Context manager that .cancel()s all elements of a list upon exit. This is used to abort futures faster when raising an exception.
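The behaviour described for this class can be illustrated with a small stand-in built on `concurrent.futures`. The class name `CancelOnExitSketch` is hypothetical, and the body is an assumption about how such a context manager might look, not the library's source.

```python
from concurrent.futures import Future
from typing import List


class CancelOnExitSketch:
    """Hypothetical stand-in: cancels every future in a list on exit."""

    def __init__(self, future_list: List[Future]):
        self.future_list = future_list

    def __enter__(self) -> "CancelOnExitSketch":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Future.cancel() only succeeds for futures that have not started
        # running; running or finished futures are left untouched.
        for future in self.future_list:
            future.cancel()
        # Returning None lets any exception raised in the block propagate.
```

If an exception is raised inside the `with` block, pending futures are cancelled before the exception propagates, so an executor that is shutting down does not have to work through queued tasks whose results will never be used.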
- speechbrain.utils.parallel.parallel_map(fn: Callable[[Any], Any], source: Iterable[Any], process_count: int | None = None, chunk_size: int = 8, queue_size: int = 128, executor: Executor | None = None, progress_bar: bool = True, progress_bar_kwargs: dict = {'smoothing': 0.02})
Maps iterable items with a function, processing chunks of items in parallel with multiple processes and displaying progress with tqdm.
Processed elements will always be returned in the original, correct order. Unlike ProcessPoolExecutor.map, elements are produced AND consumed lazily.
- Parameters:
fn (Callable) – The function that is called for every element of the source iterable. parallel_map yields the result of fn(elem) for each element.
source (Iterable) – Iterable whose elements are passed through the mapping function.
process_count (int, optional) – The number of processes to spawn. Ignored if a custom executor is provided. If None (the default), uses get_available_cpu_count(), which respects SLURM allocations, CPU affinity, and the SB_NUM_PROC environment variable. For CPU-bound tasks, it is generally not useful to exceed the logical core count. For IO-bound tasks, it may make sense to exceed it so as to limit the amount of time spent in iowait.
chunk_size (int) – How many elements are fed to the worker processes at once. A value of 8 is generally fine. Low values may increase overhead and reduce CPU occupancy.
queue_size (int) – Number of chunks to be waited for on the main process at a time. Low values increase the chance of the queue being starved, forcing workers to idle. Very high values may cause high memory usage, especially if the source iterable yields large objects.
executor (Optional[Executor]) – Allows providing an existing executor (preferably a ProcessPoolExecutor). If None (the default), a process pool will be spawned for this mapping task and will be shut down after.
progress_bar (bool) – Whether to show a tqdm progress bar.
progress_bar_kwargs (dict) – A dict of keyword arguments that is forwarded to tqdm when progress_bar == True. Allows overriding the defaults or, e.g., specifying total when it cannot be inferred from the source iterable.
- Yields:
The items from source processed by fn
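To make the roles of chunk_size and queue_size concrete, here is a rough sketch of ordered, lazy, chunked mapping over a bounded queue of futures. It is not the SpeechBrain implementation: `lazy_ordered_map_sketch` and `_apply_to_chunk` are hypothetical names, the progress bar is omitted, and the caller is assumed to supply the executor.

```python
from collections import deque
from concurrent.futures import Executor, ThreadPoolExecutor
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List


def _apply_to_chunk(fn: Callable[[Any], Any], chunk: List[Any]) -> List[Any]:
    # Runs on a worker: map fn over one chunk, preserving item order.
    return [fn(item) for item in chunk]


def lazy_ordered_map_sketch(
    fn: Callable[[Any], Any],
    source: Iterable[Any],
    executor: Executor,
    chunk_size: int = 8,
    queue_size: int = 128,
) -> Iterator[Any]:
    """Hypothetical sketch: yield fn(item) in source order, lazily."""
    if chunk_size < 1 or queue_size < 1:
        raise ValueError("chunk_size and queue_size must be >= 1")

    source_iter = iter(source)
    pending = deque()  # Futures, oldest chunk first.

    def submit_next_chunk() -> bool:
        # Pull at most chunk_size items from the source and submit them.
        chunk = list(islice(source_iter, chunk_size))
        if not chunk:
            return False
        pending.append(executor.submit(_apply_to_chunk, fn, chunk))
        return True

    try:
        # Prime the queue with at most queue_size chunks in flight.
        while len(pending) < queue_size and submit_next_chunk():
            pass
        while pending:
            # Waiting on the oldest chunk first preserves the source order.
            results = pending.popleft().result()
            # Refill the freed slot before handing results to the consumer.
            submit_next_chunk()
            yield from results
    finally:
        # Abort outstanding work if the consumer stops early or fn raises.
        for future in pending:
            future.cancel()
```

For example, `list(lazy_ordered_map_sketch(abs, [-1, -2, -3], ThreadPoolExecutor(2), chunk_size=2))` evaluates to `[1, 2, 3]`. Because at most queue_size chunks are in flight, roughly chunk_size * queue_size source items are held at any time, which is why very large values can raise memory usage.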