Source code for speechbrain.dataio.dataio

Data reading and writing.

 * Mirco Ravanelli 2020
 * Aku Rouhe 2020
 * Ju-Chieh Chou 2020
 * Samuele Cornell 2020
 * Abdel HEBA 2020

import os
import torch
import logging
import numpy as np
import pickle
import hashlib
import csv
import time
import torchaudio
import json
import re
from speechbrain.utils.torch_audio_backend import check_torchaudio_backend

logger = logging.getLogger(__name__)

[docs]def load_data_json(json_path, replacements={}): """Loads JSON and recursively formats string values. Arguments ---------- json_path : str Path to CSV file. replacements : dict (Optional dict), e.g., {"data_folder": "/home/speechbrain/data"}. This is used to recursively format all string values in the data. Returns ------- dict JSON data with replacements applied. Example ------- >>> json_spec = '''{ ... "ex1": {"files": ["{ROOT}/mic1/ex1.wav", "{ROOT}/mic2/ex1.wav"], "id": 1}, ... "ex2": {"files": [{"spk1": "{ROOT}/ex2.wav"}, {"spk2": "{ROOT}/ex2.wav"}], "id": 2} ... } ... ''' >>> tmpfile = getfixture('tmpdir') / "test.json" >>> with open(tmpfile, "w") as fo: ... _ = fo.write(json_spec) >>> data = load_data_json(tmpfile, {"ROOT": "/home"}) >>> data["ex1"]["files"][0] '/home/mic1/ex1.wav' >>> data["ex2"]["files"][1]["spk2"] '/home/ex2.wav' """ with open(json_path, "r") as f: out_json = json.load(f) _recursive_format(out_json, replacements) return out_json
def _recursive_format(data, replacements): # Data: dict or list, replacements : dict # Replaces string keys in replacements by their values # at all levels of data (in str values) # Works in-place. if isinstance(data, dict): for key, item in data.items(): if isinstance(item, dict) or isinstance(item, list): _recursive_format(item, replacements) elif isinstance(item, str): data[key] = item.format_map(replacements) # If not dict, list or str, do nothing if isinstance(data, list): for i, item in enumerate(data): if isinstance(item, dict) or isinstance(item, list): _recursive_format(item, replacements) elif isinstance(item, str): data[i] = item.format_map(replacements) # If not dict, list or str, do nothing
[docs]def load_data_csv(csv_path, replacements={}): """Loads CSV and formats string values. Uses the SpeechBrain legacy CSV data format, where the CSV must have an 'ID' field. If there is a field called duration, it is interpreted as a float. The rest of the fields are left as they are (legacy _format and _opts fields are not used to load the data in any special way). Bash-like string replacements with $to_replace are supported. Arguments ---------- csv_path : str Path to CSV file. replacements : dict (Optional dict), e.g., {"data_folder": "/home/speechbrain/data"} This is used to recursively format all string values in the data. Returns ------- dict CSV data with replacements applied. Example ------- >>> csv_spec = '''ID,duration,wav_path ... utt1,1.45,$data_folder/utt1.wav ... utt2,2.0,$data_folder/utt2.wav ... ''' >>> tmpfile = getfixture("tmpdir") / "test.csv" >>> with open(tmpfile, "w") as fo: ... _ = fo.write(csv_spec) >>> data = load_data_csv(tmpfile, {"data_folder": "/home"}) >>> data["utt1"]["wav_path"] '/home/utt1.wav' """ with open(csv_path, newline="") as csvfile: result = {} reader = csv.DictReader(csvfile, skipinitialspace=True) variable_finder = re.compile(r"\$([\w.]+)") for row in reader: # ID: try: data_id = row["ID"] del row["ID"] # This is used as a key in result, instead. except KeyError: raise KeyError( "CSV has to have an 'ID' field, with unique ids" " for all data points" ) if data_id in result: raise ValueError(f"Duplicate id: {data_id}") # Replacements: for key, value in row.items(): try: row[key] = variable_finder.sub( lambda match: str(replacements[match[1]]), value ) except KeyError: raise KeyError( f"The item {value} requires replacements " "which were not supplied." ) # Duration: if "duration" in row: row["duration"] = float(row["duration"]) result[data_id] = row return result
[docs]def read_audio(waveforms_obj): """General audio loading, based on a custom notation. Expected use case is in conjunction with Datasets specified by JSON. The custom notation: The annotation can be just a path to a file: "/path/to/wav1.wav" Or can specify more options in a dict: {"file": "/path/to/wav2.wav", "start": 8000, "stop": 16000 } Arguments ---------- waveforms_obj : str, dict Audio reading annotation, see above for format. Returns ------- torch.Tensor Audio tensor with shape: (samples, ). Example ------- >>> dummywav = torch.rand(16000) >>> import os >>> tmpfile = os.path.join(str(getfixture('tmpdir')), "wave.wav") >>> write_audio(tmpfile, dummywav, 16000) >>> asr_example = { "wav": tmpfile, "spk_id": "foo", "words": "foo bar"} >>> loaded = read_audio(asr_example["wav"]) >>> loaded.allclose(dummywav.squeeze(0),atol=1e-4) # replace with eq with sox_io backend True """ if isinstance(waveforms_obj, str): audio, _ = torchaudio.load(waveforms_obj) return audio.transpose(0, 1).squeeze(1) path = waveforms_obj["file"] start = waveforms_obj.get("start", 0) # Default stop to start -> if not specified, num_frames becomes 0, # which is the torchaudio default stop = waveforms_obj.get("stop", start) num_frames = stop - start audio, fs = torchaudio.load(path, num_frames=num_frames, frame_offset=start) audio = audio.transpose(0, 1) return audio.squeeze(1)
[docs]def read_audio_multichannel(waveforms_obj): """General audio loading, based on a custom notation. Expected use case is in conjunction with Datasets specified by JSON. The custom notation: The annotation can be just a path to a file: "/path/to/wav1.wav" Multiple (possibly multi-channel) files can be specified, as long as they have the same length: {"files": [ "/path/to/wav1.wav", "/path/to/wav2.wav" ] } Or you can specify a single file more succinctly: {"files": "/path/to/wav2.wav"} Offset number samples and stop number samples also can be specified to read only a segment within the files. {"files": [ "/path/to/wav1.wav", "/path/to/wav2.wav" ] "start": 8000 "stop": 16000 } Arguments ---------- waveforms_obj : str, dict Audio reading annotation, see above for format. Returns ------- torch.Tensor Audio tensor with shape: (samples, ). Example ------- >>> dummywav = torch.rand(16000, 2) >>> import os >>> tmpfile = os.path.join(str(getfixture('tmpdir')), "wave.wav") >>> write_audio(tmpfile, dummywav, 16000) >>> asr_example = { "wav": tmpfile, "spk_id": "foo", "words": "foo bar"} >>> loaded = read_audio(asr_example["wav"]) >>> loaded.allclose(dummywav.squeeze(0),atol=1e-4) # replace with eq with sox_io backend True """ if isinstance(waveforms_obj, str): audio, _ = torchaudio.load(waveforms_obj) return audio.transpose(0, 1) files = waveforms_obj["files"] if not isinstance(files, list): files = [files] waveforms = [] start = waveforms_obj.get("start", 0) # Default stop to start -> if not specified, num_frames becomes 0, # which is the torchaudio default stop = waveforms_obj.get("stop", start - 1) num_frames = stop - start for f in files: audio, fs = torchaudio.load( f, num_frames=num_frames, frame_offset=start ) waveforms.append(audio) out =, 0) return out.transpose(0, 1)
[docs]def write_audio(filepath, audio, samplerate): """Write audio on disk. It is basically a wrapper to support saving audio signals in the speechbrain format (audio, channels). Arguments --------- filepath: path Path where to save the audio file. audio : torch.Tensor Audio file in the expected speechbrain format (signal, channels). samplerate: int Sample rate (e.g., 16000). Example ------- >>> import os >>> tmpfile = os.path.join(str(getfixture('tmpdir')), "wave.wav") >>> dummywav = torch.rand(16000, 2) >>> write_audio(tmpfile, dummywav, 16000) >>> loaded = read_audio(tmpfile) >>> loaded.allclose(dummywav,atol=1e-4) # replace with eq with sox_io backend True """ if len(audio.shape) == 2: audio = audio.transpose(0, 1) elif len(audio.shape) == 1: audio = audio.unsqueeze(0), audio, samplerate)
[docs]def load_pickle(pickle_path): """Utility function for loading .pkl pickle files. Arguments --------- pickle_path : str Path to pickle file. Returns ------- out : object Python object loaded from pickle. """ with open(pickle_path, "rb") as f: out = pickle.load(f) return out
[docs]def to_floatTensor(x: (list, tuple, np.ndarray)): """ Arguments --------- x : (list, tuple, np.ndarray) Input data to be converted to torch float. Returns ------- tensor : torch.tensor Data now in torch.tensor float datatype. """ if isinstance(x, torch.Tensor): return x.float() if isinstance(x, np.ndarray): return torch.from_numpy(x).float() else: return torch.tensor(x, dtype=torch.float)
[docs]def to_doubleTensor(x: (list, tuple, np.ndarray)): """ Arguments --------- x : (list, tuple, np.ndarray) Input data to be converted to torch double. Returns ------- tensor : torch.tensor Data now in torch.tensor double datatype. """ if isinstance(x, torch.Tensor): return x.double() if isinstance(x, np.ndarray): return torch.from_numpy(x).double() else: return torch.tensor(x, dtype=torch.double)
[docs]def to_longTensor(x: (list, tuple, np.ndarray)): """ Arguments --------- x : (list, tuple, np.ndarray) Input data to be converted to torch long. Returns ------- tensor : torch.tensor Data now in torch.tensor long datatype. """ if isinstance(x, torch.Tensor): return x.long() if isinstance(x, np.ndarray): return torch.from_numpy(x).long() else: return torch.tensor(x, dtype=torch.long)
[docs]def convert_index_to_lab(batch, ind2lab): """Convert a batch of integer IDs to string labels. Arguments --------- batch : list List of lists, a batch of sequences. ind2lab : dict Mapping from integer IDs to labels. Returns ------- list List of lists, same size as batch, with labels from ind2lab. Example ------- >>> ind2lab = {1: "h", 2: "e", 3: "l", 4: "o"} >>> out = convert_index_to_lab([[4,1], [1,2,3,3,4]], ind2lab) >>> for seq in out: ... print("".join(seq)) oh hello """ return [[ind2lab[int(index)] for index in seq] for seq in batch]
[docs]def relative_time_to_absolute(batch, relative_lens, rate): """Converts SpeechBrain style relative length to the absolute duration. Operates on batch level. Arguments --------- batch : torch.tensor Sequences to determine the duration for. relative_lens : torch.tensor The relative length of each sequence in batch. The longest sequence in the batch needs to have relative length 1.0. rate : float The rate at which sequence elements occur in real-world time. Sample rate, if batch is raw wavs (recommended) or 1/frame_shift if batch is features. This has to have 1/s as the unit. Returns ------: torch.tensor Duration of each sequence in seconds. Example ------- >>> batch = torch.ones(2, 16000) >>> relative_lens = torch.tensor([3./4., 1.0]) >>> rate = 16000 >>> print(relative_time_to_absolute(batch, relative_lens, rate)) tensor([0.7500, 1.0000]) """ max_len = batch.shape[1] durations = torch.round(relative_lens * max_len) / rate return durations
[docs]class IterativeCSVWriter: """Write CSV files a line at a time. Arguments --------- outstream : file-object A writeable stream data_fields : list List of the optional keys to write. Each key will be expanded to the SpeechBrain format, producing three fields: key, key_format, key_opts. Example ------- >>> import io >>> f = io.StringIO() >>> writer = IterativeCSVWriter(f, ["phn"]) >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts >>> writer.write("UTT1",2.5,"sil hh ee ll ll oo sil","string","") >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, >>> writer.write(ID="UTT2",phn="sil ww oo rr ll dd sil",phn_format="string") >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, UTT2,,sil ww oo rr ll dd sil,string, >>> writer.set_default('phn_format', 'string') >>> writer.write_batch(ID=["UTT3","UTT4"],phn=["ff oo oo", "bb aa rr"]) >>> print(f.getvalue()) ID,duration,phn,phn_format,phn_opts UTT1,2.5,sil hh ee ll ll oo sil,string, UTT2,,sil ww oo rr ll dd sil,string, UTT3,,ff oo oo,string, UTT4,,bb aa rr,string, """ def __init__(self, outstream, data_fields, defaults={}): self._outstream = outstream self.fields = ["ID", "duration"] + self._expand_data_fields(data_fields) self.defaults = defaults self._outstream.write(",".join(self.fields))
[docs] def set_default(self, field, value): """Sets a default value for the given CSV field. Arguments --------- field : str A field in the CSV. value The default value. """ if field not in self.fields: raise ValueError(f"{field} is not a field in this CSV!") self.defaults[field] = value
[docs] def write(self, *args, **kwargs): """Writes one data line into the CSV. Arguments --------- *args Supply every field with a value in positional form OR. **kwargs Supply certain fields by key. The ID field is mandatory for all lines, but others can be left empty. """ if args and kwargs: raise ValueError( "Use either positional fields or named fields, but not both." ) if args: if len(args) != len(self.fields): raise ValueError("Need consistent fields") to_write = [str(arg) for arg in args] if kwargs: if "ID" not in kwargs: raise ValueError("I'll need to see some ID") full_vals = self.defaults.copy() full_vals.update(kwargs) to_write = [str(full_vals.get(field, "")) for field in self.fields] self._outstream.write("\n") self._outstream.write(",".join(to_write))
[docs] def write_batch(self, *args, **kwargs): """Writes a batch of lines into the CSV. Here each argument should be a list with the same length. Arguments --------- *args Supply every field with a value in positional form OR. **kwargs Supply certain fields by key. The ID field is mandatory for all lines, but others can be left empty. """ if args and kwargs: raise ValueError( "Use either positional fields or named fields, but not both." ) if args: if len(args) != len(self.fields): raise ValueError("Need consistent fields") for arg_row in zip(*args): self.write(*arg_row) if kwargs: if "ID" not in kwargs: raise ValueError("I'll need to see some ID") keys = kwargs.keys() for value_row in zip(*kwargs.values()): kwarg_row = dict(zip(keys, value_row)) self.write(**kwarg_row)
@staticmethod def _expand_data_fields(data_fields): expanded = [] for data_field in data_fields: expanded.append(data_field) expanded.append(data_field + "_format") expanded.append(data_field + "_opts") return expanded
[docs]def write_txt_file(data, filename, sampling_rate=None): """Write data in text format. Arguments --------- data : str, list, torch.tensor, numpy.ndarray The data to write in the text file. filename : str Path to file where to write the data. sampling_rate : None Not used, just here for interface compatibility. Returns ------- None Example ------- >>> tmpdir = getfixture('tmpdir') >>> signal=torch.tensor([1,2,3,4]) >>> write_txt_file(signal, os.path.join(tmpdir, 'example.txt')) """ del sampling_rate # Not used. # Check if the path of filename exists os.makedirs(os.path.dirname(filename), exist_ok=True) with open(filename, "w") as fout: if isinstance(data, torch.Tensor): data = data.tolist() if isinstance(data, np.ndarray): data = data.tolist() if isinstance(data, list): for line in data: print(line, file=fout) if isinstance(data, str): print(data, file=fout)
[docs]def write_stdout(data, filename=None, sampling_rate=None): """Write data to standard output. Arguments --------- data : str, list, torch.tensor, numpy.ndarray The data to write in the text file. filename : None Not used, just here for compatibility. sampling_rate : None Not used, just here for compatibility. Returns ------- None Example ------- >>> tmpdir = getfixture('tmpdir') >>> signal = torch.tensor([[1,2,3,4]]) >>> write_stdout(signal, tmpdir + '/example.txt') [1, 2, 3, 4] """ # Managing Torch.Tensor if isinstance(data, torch.Tensor): data = data.tolist() # Managing np.ndarray if isinstance(data, np.ndarray): data = data.tolist() if isinstance(data, list): for line in data: print(line) if isinstance(data, str): print(data)
[docs]def length_to_mask(length, max_len=None, dtype=None, device=None): """Creates a binary mask for each sequence. Reference: Arguments --------- length : torch.LongTensor Containing the length of each sequence in the batch. Must be 1D. max_len : int Max length for the mask, also the size of the second dimension. dtype : torch.dtype, default: None The dtype of the generated mask. device: torch.device, default: None The device to put the mask variable. Returns ------- mask : tensor The binary mask. Example ------- >>> length=torch.Tensor([1,2,3]) >>> mask=length_to_mask(length) >>> mask tensor([[1., 0., 0.], [1., 1., 0.], [1., 1., 1.]]) """ assert len(length.shape) == 1 if max_len is None: max_len = length.max().long().item() # using arange to generate mask mask = torch.arange( max_len, device=length.device, dtype=length.dtype ).expand(len(length), max_len) < length.unsqueeze(1) if dtype is None: dtype = length.dtype if device is None: device = length.device mask = torch.as_tensor(mask, dtype=dtype, device=device) return mask
[docs]def read_kaldi_lab(kaldi_ali, kaldi_lab_opts): """Read labels in kaldi format. Uses kaldi IO. Arguments --------- kaldi_ali : str Path to directory where kaldi alignments are stored. kaldi_lab_opts : str A string that contains the options for reading the kaldi alignments. Returns ------- lab : dict A dictionary containing the labels. Note ---- This depends on kaldi-io-for-python. Install it separately. See: Example ------- This example requires kaldi files. ``` lab_folder = '/home/kaldi/egs/TIMIT/s5/exp/dnn4_pretrain-dbn_dnn_ali' read_kaldi_lab(lab_folder, 'ali-to-pdf') ``` """ # EXTRA TOOLS try: import kaldi_io except ImportError: raise ImportError("Could not import kaldi_io. Install it to use this.") # Reading the Kaldi labels lab = { k: v for k, v in kaldi_io.read_vec_int_ark( "gunzip -c " + kaldi_ali + "/ali*.gz | " + kaldi_lab_opts + " " + kaldi_ali + "/final.mdl ark:- ark:-|" ) } return lab
[docs]def get_md5(file): """Get the md5 checksum of an input file. Arguments --------- file : str Path to file for which compute the checksum. Returns ------- md5 Checksum for the given filepath. Example ------- >>> get_md5('tests/samples/single-mic/example1.wav') 'c482d0081ca35302d30d12f1136c34e5' """ # Lets read stuff in 64kb chunks! BUF_SIZE = 65536 md5 = hashlib.md5() # Computing md5 with open(file, "rb") as f: while True: data = if not data: break md5.update(data) return md5.hexdigest()
[docs]def save_md5(files, out_file): """Saves the md5 of a list of input files as a pickled dict into a file. Arguments --------- files : list List of input files from which we will compute the md5. outfile : str The path where to store the output pkl file. Returns ------- None Example: >>> files = ['tests/samples/single-mic/example1.wav'] >>> tmpdir = getfixture('tmpdir') >>> save_md5(files, os.path.join(tmpdir, "md5.pkl")) """ # Initialization of the dictionary md5_dict = {} # Computing md5 for all the files in the list for file in files: md5_dict[file] = get_md5(file) # Saving dictionary in pkl format save_pkl(md5_dict, out_file)
[docs]def save_pkl(obj, file): """Save an object in pkl format. Arguments --------- obj : object Object to save in pkl format file : str Path to the output file sampling_rate : int Sampling rate of the audio file, TODO: this is not used? Example ------- >>> tmpfile = os.path.join(getfixture('tmpdir'), "example.pkl") >>> save_pkl([1, 2, 3, 4, 5], tmpfile) >>> load_pkl(tmpfile) [1, 2, 3, 4, 5] """ with open(file, "wb") as f: pickle.dump(obj, f)
[docs]def load_pkl(file): """Loads a pkl file. For an example, see `save_pkl`. Arguments --------- file : str Path to the input pkl file. Returns ------- The loaded object. """ # Deals with the situation where two processes are trying # to access the same label dictionary by creating a lock count = 100 while count > 0: if os.path.isfile(file + ".lock"): time.sleep(1) count -= 1 else: break try: open(file + ".lock", "w").close() with open(file, "rb") as f: return pickle.load(f) finally: if os.path.isfile(file + ".lock"): os.remove(file + ".lock")
[docs]def prepend_bos_token(label, bos_index): """Create labels with <bos> token at the beginning. Arguments --------- label : torch.IntTensor Containing the original labels. Must be of size: [batch_size, max_length]. bos_index : int The index for <bos> token. Returns ------- new_label : tensor The new label with <bos> at the beginning. Example ------- >>> label=torch.LongTensor([[1,0,0], [2,3,0], [4,5,6]]) >>> new_label=prepend_bos_token(label, bos_index=7) >>> new_label tensor([[7, 1, 0, 0], [7, 2, 3, 0], [7, 4, 5, 6]]) """ new_label = label.long().clone() batch_size = label.shape[0] bos = new_label.new_zeros(batch_size, 1).fill_(bos_index) new_label =[bos, new_label], dim=1) return new_label
[docs]def append_eos_token(label, length, eos_index): """Create labels with <eos> token appended. Arguments --------- label : torch.IntTensor Containing the original labels. Must be of size: [batch_size, max_length] length : torch.LongTensor Containing the original length of each label sequences. Must be 1D. eos_index : int The index for <eos> token. Returns ------- new_label : tensor The new label with <eos> appended. Example ------- >>> label=torch.IntTensor([[1,0,0], [2,3,0], [4,5,6]]) >>> length=torch.LongTensor([1,2,3]) >>> new_label=append_eos_token(label, length, eos_index=7) >>> new_label tensor([[1, 7, 0, 0], [2, 3, 7, 0], [4, 5, 6, 7]], dtype=torch.int32) """ new_label = batch_size = label.shape[0] pad = new_label.new_zeros(batch_size, 1) new_label =[new_label, pad], dim=1) new_label[torch.arange(batch_size), length.long()] = eos_index return new_label
[docs]def merge_char(sequences, space="_"): """Merge characters sequences into word sequences. Arguments --------- sequences : list Each item contains a list, and this list contains a character sequence. space : string The token represents space. Default: _ Returns ------- The list contains word sequences for each sentence. Example ------- >>> sequences = [["a", "b", "_", "c", "_", "d", "e"], ["e", "f", "g", "_", "h", "i"]] >>> results = merge_char(sequences) >>> results [['ab', 'c', 'de'], ['efg', 'hi']] """ results = [] for seq in sequences: words = "".join(seq).split(space) results.append(words) return results
[docs]def merge_csvs(data_folder, csv_lst, merged_csv): """Merging several csv files into one file. Arguments --------- data_folder : string The folder to store csv files to be merged and after merging. csv_lst : list Filenames of csv file to be merged. merged_csv : string The filename to write the merged csv file. Example ------- >>> merge_csvs("tests/samples/annotation/", ... ["speech.csv", "speech.csv"], ... "test_csv_merge.csv") """ write_path = os.path.join(data_folder, merged_csv) if os.path.isfile(write_path):"Skipping merging. Completed in previous run.") with open(os.path.join(data_folder, csv_lst[0])) as f: header = f.readline() lines = [] for csv_file in csv_lst: with open(os.path.join(data_folder, csv_file)) as f: for i, line in enumerate(f): if i == 0: # Checking header if line != header: raise ValueError( "Different header for " f"{csv_lst[0]} and {csv}." ) continue lines.append(line) with open(write_path, "w") as f: f.write(header) for line in lines: f.write(line)"{write_path} is created.")
[docs]def split_word(sequences, space="_"): """Split word sequences into character sequences. Arguments --------- sequences : list Each item contains a list, and this list contains a words sequence. space : string The token represents space. Default: _ Returns ------- The list contains word sequences for each sentence. Example ------- >>> sequences = [['ab', 'c', 'de'], ['efg', 'hi']] >>> results = split_word(sequences) >>> results [['a', 'b', '_', 'c', '_', 'd', 'e'], ['e', 'f', 'g', '_', 'h', 'i']] """ results = [] for seq in sequences: chars = list(space.join(seq)) results.append(chars) return results