"""A pipeline for data transformations.
Example
-------
>>> from hyperpyyaml import load_hyperpyyaml
>>> yamlstring = '''
... pipeline: !new:speechbrain.utils.data_pipeline.DataPipeline
... static_data_keys: [a, b]
... dynamic_items:
... - func: !name:operator.add
... takes: ["a", "b"]
... provides: foo
... - func: !name:operator.sub
... takes: ["foo", "b"]
... provides: bar
... output_keys: ["foo", "bar"]
... '''
>>> hparams = load_hyperpyyaml(yamlstring)
>>> hparams["pipeline"]({"a": 1, "b": 2})
{'foo': 3, 'bar': 1}
Author:
* Aku Rouhe
* Peter Plantinga
"""
import inspect
import pathlib
from dataclasses import dataclass
import torch
from speechbrain.utils.depgraph import DependencyGraph
@dataclass
class StaticItem:
    """Marker for a piece of data that is handed to the pipeline as-is.

    A static item is already present in memory under ``key``, so the
    pipeline never has to compute it.

    Arguments
    ---------
    key : str
        Name under which the value is found in the incoming data.
    """

    key: str
class DynamicItem:
    """A data transformation whose value is computed on demand.

    A DynamicItem wraps a plain function together with the keys it consumes
    and the keys it produces. A typical use is lazy loading: take a path,
    provide the data read from that path.

    Instances are usually created implicitly through the ``@takes`` and
    ``@provides`` decorators, but can also be built directly from the taken
    keys, the provided keys and the function.

    For generator functions use the counterpart, GeneratorDynamicItem.

    Arguments
    ---------
    takes : list
        The keys of the items that this needs to compute its output.
    func : callable
        The function that is used to compute the output.
    provides : list
        The keys that this provides.
    """

    def __init__(self, takes=None, func=None, provides=None):
        # A fresh list per instance avoids sharing a mutable default.
        self.takes = [] if takes is None else takes
        self.func = func
        self.provides = [] if provides is None else provides

    def __call__(self, *args):
        """Run the wrapped function on the given positional arguments."""
        return self.func(*args)

    # The methods below exist mainly so that GeneratorDynamicItem can
    # override them; for a plain function they are trivial.

    def next_takes(self):
        """The next argkeys to provide to this, when called."""
        # A plain function needs the full set of arguments on every call.
        return self.takes

    def next_provides(self):
        """The next keys that this provides, when called."""
        # A plain function produces the full set of keys on every call.
        return self.provides

    def provided_in_order(self):
        """List the keys provided at each call, one entry per call.

        The length of the returned list equals the number of times this
        item may be called during one pipeline run.
        """
        # A plain function is called exactly once.
        return [self.provides]

    def reset(self):
        """Signal that this will not be called again in this pipeline call."""
        # Nothing to clean up for a plain function.
        return None
class GeneratorDynamicItem(DynamicItem):
    """Essentially represents a multi-step data transformation.

    This is the generator function counterpart for DynamicItem (which should
    be used for regular functions).

    A GeneratorDynamicItem first takes some arguments and then uses those in
    multiple steps to incrementally compute some values when called.

    A typical use-case is a pipeline of transformations on data: e.g. taking
    in text as a string, first providing a tokenized version, and then on the
    second call providing an integer-encoded version. This can be used even
    though the integer-encoder needs to be trained on the first outputs.

    The main benefit is to be able to define the pipeline in a clear
    function, even if parts of the pipeline depend on others for their
    initialization.

    Arguments
    ---------
    *args : tuple
        Forwarded to parent class
    **kwargs : dict
        Forwarded to parent class

    Example
    -------
    >>> lab2ind = {}
    >>> def text_pipeline(text):
    ...     text = text.lower().strip()
    ...     text = "".join(c for c in text if c.isalpha() or c == " ")
    ...     words = text.split()
    ...     yield words
    ...     encoded = [lab2ind[word] for word in words]
    ...     yield encoded
    >>> item = GeneratorDynamicItem(
    ...     func=text_pipeline,
    ...     takes=["text"],
    ...     provides=["words", "words_encoded"],
    ... )
    >>> # First create the integer-encoding:
    >>> ind = 1
    >>> for token in item("Is this it? - This is it."):
    ...     if token not in lab2ind:
    ...         lab2ind[token] = ind
    ...         ind += 1
    >>> # Now the integers can be encoded!
    >>> item()
    [1, 2, 3, 2, 1, 3]
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Doesn't generate electricity, only stores the currently active
        # generator:
        self.current_generator = None
        # How many values the active generator has yielded so far.
        self.num_provided_items = 0

    def __call__(self, *args):
        """Advance the generator by one step and return the yielded value.

        The arguments are only used on the first call after a reset, when
        the generator is created; later calls ignore them.

        Raises
        ------
        RuntimeError
            If called more times than there are entries in ``provides``, or
            if the generator finishes before yielding all promised values.
        """
        if self.num_provided_items == len(self.provides):
            raise RuntimeError("DynamicItemPipeline called too many times!")
        if self.current_generator is None:
            self.current_generator = self.func(*args)
        # NOTE: Not supporting sending new values to the pipeline.
        try:
            out = next(self.current_generator)
        except StopIteration:
            # A bare StopIteration must not escape: if this call happens
            # inside another iteration protocol it would silently end that
            # loop instead of signalling the mismatch.
            raise RuntimeError(
                "Generator finished after yielding "
                f"{self.num_provided_items} value(s), but it is declared "
                f"to provide {len(self.provides)}: {list(self.provides)}"
            ) from None
        self.num_provided_items += 1
        return out

    def next_takes(self):
        """The next argkeys to provide to this, when called."""
        # Arguments are consumed only when the generator is created.
        if self.current_generator is None:
            return self.takes
        return []

    def next_provides(self):
        """The next keys that this provides, when called."""
        keys = self.provides[self.num_provided_items]
        # Support multiple yielded values like:
        # @provides("wav_read", ["left_ch", "right_ch"])
        if isinstance(keys, str):
            return [keys]
        return keys

    def provided_in_order(self):
        """List the keys provided at each call, one entry per call.

        The length of the returned list equals the number of times this
        item may be called during one pipeline run.
        """
        # Support multiple yielded values like:
        # @provides("wav_read", ["left_ch", "right_ch"])
        return [
            [keys] if isinstance(keys, str) else keys
            for keys in self.provides
        ]

    def reset(self):
        """Signal that this will not be called again in this pipeline call.

        Closes the active generator (if any) and rewinds the step counter so
        the next call starts a fresh generator.
        """
        if self.current_generator is not None:
            self.current_generator.close()
        self.current_generator = None
        self.num_provided_items = 0
class CachedDynamicItem(DynamicItem):
    """Stores the output of a data transform on disk so it runs only once.

    NOTE: The location on disk is derived from each item's unique "id". The
    id therefore has to be a valid filename on your system, and only one
    result can be stored per id -- every cached item needs its own storage
    location.

    Caching relies on PyTorch save() and load(). File storage tree after
    caching:

        cache_location/
            <id_1>.pt
            <id_2>.pt
            ...

    Arguments
    ---------
    cache_location : os.PathLike
        Storage folder for containing each item's cached output.
    *args
    **kwargs
        Forwarded to DynamicItem constructor
    """

    def __init__(self, cache_location, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not self.takes:
            raise ValueError(
                "Expected 'takes' list to have at least one item, but 'takes' is empty"
            )
        # The id doubles as the cache file name, so it must come first.
        if self.takes[0] != "id":
            raise ValueError("First item in 'takes' list must be 'id'")
        self.cache_location = pathlib.Path(cache_location)
        self.cache_location.mkdir(parents=True, exist_ok=True)

    def __call__(self, *args):
        """Return the cached result if present; else compute, cache, return."""
        uid = args[0]
        if self._is_cached(uid):
            return self._load(uid)
        # Cache miss: run the transform and remember its output.
        computed = self.func(*args)
        self._cache(computed, uid)
        return computed

    def _is_cached(self, uid):
        """Test whether uid is cached."""
        cache_file = self._uid2path(uid)
        return cache_file.exists()

    def _load(self, uid):
        """Load result from cache"""
        cache_file = self._uid2path(uid)
        return torch.load(cache_file)

    def _cache(self, result, uid):
        """Save the result to the cache"""
        cache_file = self._uid2path(uid)
        torch.save(result, cache_file)

    def _uid2path(self, uid):
        """Convert a uid to a cache location"""
        filename = uid + ".pt"
        return self.cache_location / filename

    @classmethod
    def cache(cls, save_dir):
        """Decorator that turns a DynamicItem into a CachedDynamicItem.

        Arguments
        ---------
        save_dir : os.PathLike
            Path to the directory where the cache should be stored.

        Example
        -------
        >>> import os
        >>> tempdir = getfixture("tmpdir")
        >>> @CachedDynamicItem.cache(tempdir)
        ... @takes("id", "text")
        ... @provides("tokenized")
        ... def tokenize(id, text):
        ...     return text.strip().lower().split()
        >>> os.listdir(tempdir)
        []
        >>> tokenize("utt_id", "\tThis Example gets tokenized")
        ['this', 'example', 'gets', 'tokenized']
        >>> os.listdir(tempdir)
        ['utt_id.pt']
        >>> torch.load(tempdir / "utt_id.pt")
        ['this', 'example', 'gets', 'tokenized']
        >>> # The output shouldn't change on the second call
        >>> tokenize("utt_id", "\tThis Example gets tokenized")
        ['this', 'example', 'gets', 'tokenized']
        >>> # NOTE: NO INVALID CACHE DETECTION
        >>> tokenize("utt_id", "Different sentence but same result")
        ['this', 'example', 'gets', 'tokenized']
        """

        def decorator(obj):
            """Decorator definition."""
            if isinstance(obj, DynamicItem):
                # Rebuild the item as a cached one with the same wiring.
                return cls(
                    save_dir,
                    takes=obj.takes,
                    func=obj.func,
                    provides=obj.provides,
                )
            raise ValueError("Can only cache a DynamicItem")

        return decorator
def takes(*argkeys):
    """Decorator which makes a DynamicItem and specifies its argkeys.

    Three cases are handled, depending on the wrapped object:

    * an existing DynamicItem only gets its argkeys filled in;
    * a generator function (one containing ``yield``) becomes a
      GeneratorDynamicItem;
    * anything else becomes a regular DynamicItem.

    The args are always passed to the function at the start. Generators
    could support sending new arguments, but for such use cases, simply
    create a new dynamic item. The GeneratorDynamicItem class is meant for
    pipelines which take in an input and transform it in multiple ways, where
    the intermediate representations may be needed for e.g. fitting a BPE
    segmenter.

    Arguments
    ---------
    *argkeys : tuple
        The data keys expected as input

    Returns
    -------
    The decorated function, with input argkeys specified

    Example
    -------
    >>> @takes("text")
    ... def tokenize(text):
    ...     return text.strip().lower().split()
    >>> tokenize.provides = ["tokenized"]
    >>> tokenize("\tThis Example gets tokenized")
    ['this', 'example', 'gets', 'tokenized']
    """

    def decorator(obj):
        """Decorator definition."""
        if not isinstance(obj, DynamicItem):
            # Wrap a bare callable in the matching item type.
            if inspect.isgeneratorfunction(obj):
                return GeneratorDynamicItem(takes=argkeys, func=obj)
            return DynamicItem(takes=argkeys, func=obj)
        # Already an item: only fill in argkeys that are still unset.
        if obj.takes:
            raise ValueError("Can't overwrite DynamicItem.takes")
        obj.takes = argkeys
        return obj

    return decorator
# Alias used by DataPipeline.add_dynamic_item, whose own ``takes`` parameter
# shadows the module-level decorator name inside that method.
takes_decorator = takes
def provides(*output_keys):
    """Decorator which makes a DynamicItem and specifies what keys it provides.

    Three cases are handled, depending on the wrapped object:

    * an existing DynamicItem only gets its provided keys filled in;
    * a generator function (one containing ``yield``) becomes a
      GeneratorDynamicItem;
    * anything else becomes a regular DynamicItem.

    Arguments
    ---------
    *output_keys : tuple
        The data keys to be produced by this function

    Returns
    -------
    The decorated function, with output keys specified

    NOTE
    ----
    The behavior is slightly different for generators and regular functions,
    if many output keys are specified, e.g. @provides("signal", "mfcc").
    Regular functions should return a tuple with len equal to
    len(output_keys), while generators should yield the items one by one.

    >>> @provides("signal", "feat")
    ... def read_feat():
    ...     wav = [0.1, 0.2, -0.1]
    ...     feat = [s**2 for s in wav]
    ...     return wav, feat
    >>> @provides("signal", "feat")
    ... def read_feat():
    ...     wav = [0.1, 0.2, -0.1]
    ...     yield wav
    ...     feat = [s**2 for s in wav]
    ...     yield feat

    If multiple keys are yielded at once, write e.g.,

    >>> @provides("wav_read", ["left_channel", "right_channel"])
    ... def read_multi_channel():
    ...     wav = [[0.1, 0.2, -0.1], [0.2, 0.1, -0.1]]
    ...     yield wav
    ...     yield wav[0], wav[1]
    """

    def decorator(obj):
        """Decorator definition."""
        if not isinstance(obj, DynamicItem):
            # Wrap a bare callable in the matching item type.
            if inspect.isgeneratorfunction(obj):
                return GeneratorDynamicItem(func=obj, provides=output_keys)
            return DynamicItem(func=obj, provides=output_keys)
        # Already an item: only fill in provided keys that are still unset.
        if obj.provides:
            raise ValueError("Can't overwrite DynamicItem provides-list.")
        obj.provides = output_keys
        return obj

    return decorator
# Alias used by DataPipeline.add_dynamic_item, whose own ``provides``
# parameter shadows the module-level decorator name inside that method.
provides_decorator = provides
class DataPipeline:
    """Organises data transformations into a pipeline.

    Arguments
    ---------
    static_data_keys: list
        The keys which are provided as data
    dynamic_items: list
        A list of mappings with "func", "takes", and "provides"
    output_keys: list
        The keys to use as outputs

    Example
    -------
    >>> pipeline = DataPipeline(
    ...     static_data_keys=["text"],
    ...     dynamic_items=[
    ...         {
    ...             "func": lambda x: x.lower(),
    ...             "takes": "text",
    ...             "provides": "foo",
    ...         },
    ...         {"func": lambda x: x[::-1], "takes": "foo", "provides": "bar"},
    ...     ],
    ...     output_keys=["bar"],
    ... )
    >>> pipeline({"text": "Test"})
    {'bar': 'tset'}
    """

    def __init__(self, static_data_keys, dynamic_items=None, output_keys=None):
        if dynamic_items is None:
            dynamic_items = []
        if output_keys is None:
            output_keys = []
        self.dg = DependencyGraph()
        # Cached evaluation order; None means "recompute before next run".
        self._exec_order = None
        # Maps each data key to the dependency-graph node that produces it.
        self.key_to_node = {}
        # Keys that some item takes but nothing provides (yet), mapped to
        # the keys of the items waiting for them.
        self.unaccounted_keys = {}
        self.dynamic_items = []
        self.output_mapping = {}
        self.add_static_keys(static_data_keys)
        self.add_dynamic_items(dynamic_items)
        self.set_output_keys(output_keys)

    def add_static_keys(self, static_keys):
        """Informs the pipeline about static items.

        Static items are the ones provided to __call__ as data.

        Arguments
        ---------
        static_keys : list
            Keys that will be present in the data passed to the pipeline.
        """
        for key in static_keys:
            node_id = self.dg.add_node(data=StaticItem(key=key))
            self.key_to_node[key] = node_id

    def add_dynamic_items(self, dynamic_items):
        """Add multiple dynamic items at once.

        Arguments
        ---------
        dynamic_items : list
            Each entry is either a DynamicItem, or a mapping holding the
            keyword arguments of ``add_dynamic_item`` ("func", "takes",
            "provides").
        """
        for item in dynamic_items:
            # Dispatch on type rather than catching TypeError, so that a
            # genuine TypeError raised while adding an item is not masked
            # by a confusing retry.
            if isinstance(item, DynamicItem):
                self.add_dynamic_item(item)
            else:
                self.add_dynamic_item(**item)

    def add_dynamic_item(self, func, takes=None, provides=None):
        """Adds a dynamic item to the Pipeline.

        Two calling conventions. For DynamicItem objects, just use:
        add_dynamic_item(dynamic_item)
        But otherwise, should use:
        add_dynamic_item(func, takes, provides)

        Arguments
        ---------
        func : callable, DynamicItem
            If a DynamicItem is given, adds that directly. Otherwise a
            DynamicItem is created, and this specifies the callable to use.
            If a generator function is given, then create a
            GeneratorDynamicItem. Otherwise creates a normal DynamicItem.
        takes : list, str
            List of keys. When func is called, each key is resolved to
            either an entry in the data or the output of another
            dynamic_item. The func is then called with these as positional
            arguments, in the same order as specified here.
            A single key can be given as a bare string. ``None`` means that
            func takes no arguments.
        provides : str, list
            For regular functions, the key or list of keys that it provides.
            If you give a generator function, key or list of keys that it
            yields, in order. Also see the provides decorator.
            A single key can be given as a bare string.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If a DynamicItem is given together with takes or provides, or
            if the resulting item provides nothing.
        TypeError
            If func is not a DynamicItem and provides is not given.
        """
        if isinstance(func, DynamicItem):
            if takes is not None or provides is not None:
                raise ValueError(
                    "If providing a DynamicItem directly, don't "
                    "specify takes or provides"
                )
            self._add_dynamic_item_object(func)
            return
        if provides is None:
            raise TypeError(
                "provides must be specified when func is not a DynamicItem"
            )
        if takes is None:
            # The documented default: a function without inputs.
            takes = []
        elif isinstance(takes, str):
            takes = [takes]
        if isinstance(provides, str):
            provides = [provides]
        di = takes_decorator(*takes)(provides_decorator(*provides)(func))
        self._add_dynamic_item_object(di)

    def _add_dynamic_item_object(self, obj):
        """Internally adds the object.

        There is a node in the dependency graph for each call of the
        DynamicItem. Each call may return multiple keys and depend on
        multiple keys. An internal dict maps key to the id of the node that
        produces it.
        """
        if not obj.provides:
            raise ValueError(
                "Won't add redundant dynamic item which doesn't "
                "provide anything."
            )
        depended = []
        for key in obj.takes:
            # Might not be accounted for, yet:
            if key not in self.key_to_node:
                dependee_keys = self.unaccounted_keys.setdefault(key, [])
                dependee_keys.extend(obj.next_provides())
            else:
                depended.append(self.key_to_node[key])
        for provided in obj.provided_in_order():
            node_id = self.dg.add_node(data=obj)
            for key in provided:
                self.key_to_node[key] = node_id
                # This key may also be unaccounted for, so account for it now:
                if key in self.unaccounted_keys:
                    for dependee_key in self.unaccounted_keys[key]:
                        dependee_node = self.key_to_node[dependee_key]
                        self.dg.add_edge(dependee_node, node_id)
                    del self.unaccounted_keys[key]  # Now accounted for!
            for dep_id in depended:
                self.dg.add_edge(node_id, dep_id)
            # Next call will depend on this call:
            depended = [node_id]
        # Keep a reference to the item in this object, as well:
        self.dynamic_items.append(obj)

    def set_output_keys(self, keys):
        """Use this to change the output keys.

        Also invalidates the cached execution order, so it is re-evaluated
        on the next run. So if you request different outputs, some parts of
        the data pipeline may be skipped.

        Arguments
        ---------
        keys : dict, list, None
            List of keys (str) to produce in output.

            If a dict is given; it is used to map internal keys to output
            keys. From the output_keys dict key:value pairs the key appears
            outside, and value is the internal key.
        """
        self.output_mapping = self._output_keys_to_mapping(keys)
        self._exec_order = None

    @staticmethod
    def _output_keys_to_mapping(keys):
        """Normalize keys to a dict of outside key -> internal key."""
        # Ensure a mapping (accept a list for convenience, too)
        if keys is None:
            return {}
        if isinstance(keys, dict):
            return keys
        return {key: key for key in keys}

    def compute_outputs(self, data):
        """Compute the currently configured output keys for one data point.

        Arguments
        ---------
        data : dict
            Dictionary with data entries by key.

        Returns
        -------
        dict
            With the keys that were set.
        """
        if self._exec_order is None:
            self._prepare_run(data)
        return self._compute(data, self._exec_order, self.output_mapping)

    def compute_specific(self, keys, data):
        """Compute output of specific item, without changing output_keys.

        Arguments
        ---------
        keys : dict, list, None
            Same format as accepted by ``set_output_keys``.
        data : dict
            Dictionary with data entries by key.

        Returns
        -------
        dict
            With the requested keys.
        """
        output_mapping = self._output_keys_to_mapping(keys)
        # Graph nodes are registered under the internal keys, i.e. the
        # values of the mapping; the mapping's keys are only outside names.
        order = self.dg.get_evaluation_order(
            selected_keys=self.get_selected_node_ids(output_mapping.values())
        )
        return self._compute(data, order, output_mapping)

    def _compute(self, data, order, output_mapping):
        """Evaluate the nodes in order and collect the mapped outputs."""
        if self.unaccounted_keys:
            MSG = "These keys are still unaccounted for in the data pipeline: "
            MSG += ", ".join(self.unaccounted_keys)
            raise RuntimeError(MSG)
        intermediate = {}
        try:
            for node_id, edges, item in order:
                if isinstance(item, StaticItem):
                    # Static item in data.
                    # Just check that key is found.
                    try:
                        data[item.key]
                        continue
                    except KeyError:
                        raise KeyError(f"Expected key {item.key} in data!")
                # A dynamic item, which we should compute:
                args = [
                    data[argkey] if argkey in data else intermediate[argkey]
                    for argkey in item.next_takes()
                ]
                # This needs to be called BEFORE the dynamic item is called.
                provided_keys = item.next_provides()
                values = item(*args)  # Call the DynamicItem to produce output
                # If there is just one output value, wrap in a list so that
                # it can be zipped as well:
                if len(provided_keys) == 1:
                    values = [values]
                intermediate.update(zip(provided_keys, values))
        finally:
            # Reset even when an item raised; otherwise generator items
            # stay half-consumed and corrupt the next pipeline call.
            for dynamic_item in self.dynamic_items:
                dynamic_item.reset()
        return {
            outkey: data[inkey] if inkey in data else intermediate[inkey]
            for outkey, inkey in output_mapping.items()
        }

    def get_selected_node_ids(self, selected_keys):
        """Translates selected keys to dependency graph keys."""
        return [self.key_to_node[key] for key in selected_keys]

    def __call__(self, data):
        """Shorthand for ``compute_outputs(data)``."""
        return self.compute_outputs(data)

    def _prepare_run(self, data):
        """Cache the evaluation order needed for the current output keys."""
        self._exec_order = list(
            self.dg.get_evaluation_order(
                self.get_selected_node_ids(self.output_mapping.values())
            )
        )