
"""
Losses for training neural networks.

Authors
 * Mirco Ravanelli 2020
 * Samuele Cornell 2020
 * Hwidong Na 2020
 * Yan Gao 2020
 * Titouan Parcollet 2020
"""

import math
import torch
import logging
import functools
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from itertools import permutations
from speechbrain.dataio.dataio import length_to_mask
from speechbrain.decoders.ctc import filter_ctc_output


logger = logging.getLogger(__name__)


def transducer_loss(
    logits,
    targets,
    input_lens,
    target_lens,
    blank_index,
    reduction="mean",
    use_torchaudio=True,
):
    """Transducer loss, see `speechbrain/nnet/loss/transducer_loss.py`.

    Arguments
    ---------
    logits : torch.Tensor
        Predicted tensor, of shape [batch, maxT, maxU, num_labels].
    targets : torch.Tensor
        Target tensor, without any blanks, of shape [batch, target_len].
    input_lens : torch.Tensor
        Length of each utterance.
    target_lens : torch.Tensor
        Length of each target sequence.
    blank_index : int
        The location of the blank symbol among the label indices.
    reduction : str
        Specifies the reduction to apply to the output:
        'mean' | 'batchmean' | 'sum'.
    use_torchaudio : bool
        If True, use the Transducer loss implementation from torchaudio;
        otherwise, use the SpeechBrain Numba implementation.
    """
    input_lens = (input_lens * logits.shape[1]).round().int()
    target_lens = (target_lens * targets.shape[1]).round().int()

    if use_torchaudio:
        try:
            from torchaudio.functional import rnnt_loss
        except ImportError:
            err_msg = "The dependency torchaudio >= 0.10.0 is needed to use Transducer Loss\n"
            err_msg += "Cannot import torchaudio.functional.rnnt_loss.\n"
            err_msg += "To use it, please install torchaudio >= 0.10.0\n"
            err_msg += "==================\n"
            err_msg += "Otherwise, you can use our numba implementation, set `use_torchaudio=False`.\n"
            raise ImportError(err_msg)

        return rnnt_loss(
            logits,
            targets.int(),
            input_lens,
            target_lens,
            blank=blank_index,
            reduction=reduction,
        )
    else:
        from speechbrain.nnet.loss.transducer_loss import Transducer

        # The Transducer.apply function takes a log_probs tensor.
        log_probs = logits.log_softmax(-1)
        return Transducer.apply(
            log_probs, targets, input_lens, target_lens, blank_index, reduction,
        )

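# A minimal usage sketch (not part of the original module, kept commented out
# so the module body is unchanged at import): lengths are given as fractions
# of the padded axes, since the function rescales them by logits.shape[1] and
# targets.shape[1]. Shapes and values below are illustrative only; note that
# maxU is usually target_len + 1.
#
#   logits = torch.randn(2, 10, 5, 6)        # [batch, maxT, maxU, num_labels]
#   targets = torch.randint(1, 6, (2, 4))    # [batch, target_len], no blanks
#   input_lens = torch.tensor([1.0, 0.8])    # relative lengths in [0, 1]
#   target_lens = torch.tensor([1.0, 0.75])
#   loss = transducer_loss(
#       logits, targets, input_lens, target_lens, blank_index=0
#   )
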
class PitWrapper(nn.Module):
    """
    Permutation Invariant Wrapper to allow Permutation Invariant Training
    (PIT) with existing losses.

    Permutation invariance is calculated over the sources/classes axis which
    is assumed to be the rightmost dimension: predictions and targets tensors
    are assumed to have shape [batch, ..., channels, sources].

    Arguments
    ---------
    base_loss : function
        Base loss function, e.g. torch.nn.MSELoss. It is assumed that it takes
        two arguments (predictions and targets) and performs no reduction
        (if a PyTorch loss is used, the user must specify reduction="none").

    Returns
    -------
    pit_loss : torch.nn.Module
        Torch module supporting forward method for PIT.

    Example
    -------
    >>> pit_mse = PitWrapper(nn.MSELoss(reduction="none"))
    >>> targets = torch.rand((2, 32, 4))
    >>> p = (3, 0, 2, 1)
    >>> predictions = targets[..., p]
    >>> loss, opt_p = pit_mse(predictions, targets)
    >>> loss
    tensor([0., 0.])
    """

    def __init__(self, base_loss):
        super(PitWrapper, self).__init__()
        self.base_loss = base_loss

    def _fast_pit(self, loss_mat):
        """
        Arguments
        ---------
        loss_mat : torch.Tensor
            Tensor of shape [sources, sources] containing loss values for each
            possible permutation of predictions.

        Returns
        -------
        loss : torch.Tensor
            Permutation invariant loss for the current batch, tensor of
            shape [1].
        assigned_perm : tuple
            Indexes for optimal permutation of the input over sources which
            minimizes the loss.
        """
        loss = None
        assigned_perm = None
        for p in permutations(range(loss_mat.shape[0])):
            c_loss = loss_mat[range(loss_mat.shape[0]), p].mean()
            if loss is None or loss > c_loss:
                loss = c_loss
                assigned_perm = p
        return loss, assigned_perm

    def _opt_perm_loss(self, pred, target):
        """
        Arguments
        ---------
        pred : torch.Tensor
            Network prediction for the current example, tensor of
            shape [..., sources].
        target : torch.Tensor
            Target for the current example, tensor of shape [..., sources].

        Returns
        -------
        loss : torch.Tensor
            Permutation invariant loss for the current example, tensor of
            shape [1].
        assigned_perm : tuple
            Indexes for optimal permutation of the input over sources which
            minimizes the loss.
        """
        n_sources = pred.size(-1)

        pred = pred.unsqueeze(-2).repeat(
            *[1 for x in range(len(pred.shape) - 1)], n_sources, 1
        )
        target = target.unsqueeze(-1).repeat(
            1, *[1 for x in range(len(target.shape) - 1)], n_sources
        )

        loss_mat = self.base_loss(pred, target)
        assert (
            len(loss_mat.shape) >= 2
        ), "Base loss should not perform any reduction operation"
        mean_over = [x for x in range(len(loss_mat.shape))]
        loss_mat = loss_mat.mean(dim=mean_over[:-2])

        return self._fast_pit(loss_mat)

    def reorder_tensor(self, tensor, p):
        """
        Arguments
        ---------
        tensor : torch.Tensor
            Tensor to reorder given the optimal permutation, of shape
            [batch, ..., sources].
        p : list of tuples
            List of optimal permutations, e.g. for batch=2 and n_sources=3
            [(0, 1, 2), (0, 2, 1)].

        Returns
        -------
        reordered : torch.Tensor
            Reordered tensor given permutation p.
        """
        reordered = torch.zeros_like(tensor, device=tensor.device)
        for b in range(tensor.shape[0]):
            reordered[b] = tensor[b][..., p[b]].clone()
        return reordered

    def forward(self, preds, targets):
        """
        Arguments
        ---------
        preds : torch.Tensor
            Network predictions tensor, of shape
            [batch, channels, ..., sources].
        targets : torch.Tensor
            Target tensor, of shape [batch, channels, ..., sources].

        Returns
        -------
        loss : torch.Tensor
            Permutation invariant loss for current examples, tensor of
            shape [batch].
        perms : list
            List of indexes for optimal permutation of the inputs over
            sources.
            e.g., [(0, 1, 2), (2, 1, 0)] for three sources and 2 examples
            per batch.
        """
        losses = []
        perms = []
        for pred, label in zip(preds, targets):
            loss, p = self._opt_perm_loss(pred, label)
            perms.append(p)
            losses.append(loss)
        loss = torch.stack(losses)
        return loss, perms

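# A usage sketch for reorder_tensor (illustrative, not from the original
# source): the permutations returned by forward() can be used to realign
# the predictions with the targets.
#
#   pit_mse = PitWrapper(nn.MSELoss(reduction="none"))
#   targets = torch.rand((2, 32, 4))
#   predictions = targets[..., (3, 0, 2, 1)]
#   loss, perms = pit_mse(predictions, targets)
#   aligned = pit_mse.reorder_tensor(predictions, perms)
#   assert torch.allclose(aligned, targets)
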
def ctc_loss(
    log_probs, targets, input_lens, target_lens, blank_index, reduction="mean"
):
    """CTC loss.

    Arguments
    ---------
    log_probs : torch.Tensor
        Predicted tensor, of shape [batch, time, chars].
    targets : torch.Tensor
        Target tensor, without any blanks, of shape [batch, target_len].
    input_lens : torch.Tensor
        Length of each utterance.
    target_lens : torch.Tensor
        Length of each target sequence.
    blank_index : int
        The location of the blank symbol among the character indexes.
    reduction : str
        What reduction to apply to the output. 'mean', 'sum', 'batch',
        'batchmean', 'none'.
        See pytorch for 'mean', 'sum', 'none'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.
    """
    input_lens = (input_lens * log_probs.shape[1]).round().int()
    target_lens = (target_lens * targets.shape[1]).round().int()
    log_probs = log_probs.transpose(0, 1)

    if reduction == "batchmean":
        reduction_loss = "sum"
    elif reduction == "batch":
        reduction_loss = "none"
    else:
        reduction_loss = reduction
    loss = torch.nn.functional.ctc_loss(
        log_probs,
        targets,
        input_lens,
        target_lens,
        blank_index,
        zero_infinity=True,
        reduction=reduction_loss,
    )

    if reduction == "batchmean":
        return loss / targets.shape[0]
    elif reduction == "batch":
        N = loss.size(0)
        return loss.view(N, -1).sum(1) / target_lens.view(N, -1).sum(1)
    else:
        return loss

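# A minimal usage sketch (illustrative values, kept as comments): relative
# lengths in [0, 1] are rescaled internally to absolute frame/label counts.
#
#   log_probs = torch.randn(1, 4, 5).log_softmax(-1)  # [batch, time, chars]
#   targets = torch.tensor([[1, 2]])                  # no blanks
#   input_lens = torch.tensor([1.0])
#   target_lens = torch.tensor([1.0])
#   loss = ctc_loss(log_probs, targets, input_lens, target_lens, blank_index=0)
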
def l1_loss(
    predictions, targets, length=None, allowed_len_diff=3, reduction="mean"
):
    """Compute the true l1 loss, accounting for length differences.

    Arguments
    ---------
    predictions : torch.Tensor
        Predicted tensor, of shape ``[batch, time, *]``.
    targets : torch.Tensor
        Target tensor with the same size as predicted tensor.
    length : torch.Tensor
        Length of each utterance for computing true error with a mask.
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    reduction : str
        Options are 'mean', 'batch', 'batchmean', 'sum'.
        See pytorch for 'mean', 'sum'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.

    Example
    -------
    >>> probs = torch.tensor([[0.9, 0.1, 0.1, 0.9]])
    >>> l1_loss(probs, torch.tensor([[1., 0., 0., 1.]]))
    tensor(0.1000)
    """
    predictions, targets = truncate(predictions, targets, allowed_len_diff)
    loss = functools.partial(torch.nn.functional.l1_loss, reduction="none")
    return compute_masked_loss(
        loss, predictions, targets, length, reduction=reduction
    )

def mse_loss(
    predictions, targets, length=None, allowed_len_diff=3, reduction="mean"
):
    """Compute the true mean squared error, accounting for length differences.

    Arguments
    ---------
    predictions : torch.Tensor
        Predicted tensor, of shape ``[batch, time, *]``.
    targets : torch.Tensor
        Target tensor with the same size as predicted tensor.
    length : torch.Tensor
        Length of each utterance for computing true error with a mask.
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    reduction : str
        Options are 'mean', 'batch', 'batchmean', 'sum'.
        See pytorch for 'mean', 'sum'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.

    Example
    -------
    >>> probs = torch.tensor([[0.9, 0.1, 0.1, 0.9]])
    >>> mse_loss(probs, torch.tensor([[1., 0., 0., 1.]]))
    tensor(0.0100)
    """
    predictions, targets = truncate(predictions, targets, allowed_len_diff)
    loss = functools.partial(torch.nn.functional.mse_loss, reduction="none")
    return compute_masked_loss(
        loss, predictions, targets, length, reduction=reduction
    )

def classification_error(
    probabilities, targets, length=None, allowed_len_diff=3, reduction="mean"
):
    """Computes the classification error at frame or batch level.

    Arguments
    ---------
    probabilities : torch.Tensor
        The posterior probabilities of shape
        [batch, prob] or [batch, frames, prob].
    targets : torch.Tensor
        The targets, of shape [batch] or [batch, frames].
    length : torch.Tensor
        Length of each utterance, if frame-level loss is desired.
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    reduction : str
        Options are 'mean', 'batch', 'batchmean', 'sum'.
        See pytorch for 'mean', 'sum'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.

    Example
    -------
    >>> probs = torch.tensor([[[0.9, 0.1], [0.1, 0.9]]])
    >>> classification_error(probs, torch.tensor([1, 1]))
    tensor(0.5000)
    """
    if len(probabilities.shape) == 3 and len(targets.shape) == 2:
        probabilities, targets = truncate(
            probabilities, targets, allowed_len_diff
        )

    def error(predictions, targets):
        """Computes the classification error."""
        predictions = torch.argmax(predictions, dim=-1)
        return (predictions != targets).float()

    return compute_masked_loss(
        error, probabilities, targets.long(), length, reduction=reduction
    )

def nll_loss(
    log_probabilities,
    targets,
    length=None,
    label_smoothing=0.0,
    allowed_len_diff=3,
    reduction="mean",
):
    """Computes negative log likelihood loss.

    Arguments
    ---------
    log_probabilities : torch.Tensor
        The probabilities after log has been applied.
        Format is [batch, log_p] or [batch, frames, log_p].
    targets : torch.Tensor
        The targets, of shape [batch] or [batch, frames].
    length : torch.Tensor
        Length of each utterance, if frame-level loss is desired.
    label_smoothing : float
        The amount of label smoothing to apply (default 0.0, i.e. none).
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    reduction : str
        Options are 'mean', 'batch', 'batchmean', 'sum'.
        See pytorch for 'mean', 'sum'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.

    Example
    -------
    >>> probs = torch.tensor([[0.9, 0.1], [0.1, 0.9]])
    >>> nll_loss(torch.log(probs), torch.tensor([1, 1]))
    tensor(1.2040)
    """
    if len(log_probabilities.shape) == 3:
        log_probabilities, targets = truncate(
            log_probabilities, targets, allowed_len_diff
        )
        log_probabilities = log_probabilities.transpose(1, -1)

    # Pass the loss function but apply reduction="none" first
    loss = functools.partial(torch.nn.functional.nll_loss, reduction="none")
    return compute_masked_loss(
        loss,
        log_probabilities,
        targets.long(),
        length,
        label_smoothing=label_smoothing,
        reduction=reduction,
    )

def bce_loss(
    inputs,
    targets,
    length=None,
    weight=None,
    pos_weight=None,
    reduction="mean",
    allowed_len_diff=3,
    label_smoothing=0.0,
):
    """Computes binary cross-entropy (BCE) loss. It also applies the sigmoid
    function directly (this improves the numerical stability).

    Arguments
    ---------
    inputs : torch.Tensor
        The output before applying the final sigmoid.
        Format is [batch[, 1]?] or [batch, frames[, 1]?].
        (Works with or without a singleton dimension at the end).
    targets : torch.Tensor
        The targets, of shape [batch] or [batch, frames].
    length : torch.Tensor
        Length of each utterance, if frame-level loss is desired.
    weight : torch.Tensor
        A manual rescaling weight; if provided, it is repeated to
        match the input tensor shape.
    pos_weight : torch.Tensor
        A weight of positive examples. Must be a vector with length equal
        to the number of classes.
    reduction : str
        Options are 'mean', 'batch', 'batchmean', 'sum'.
        See pytorch for 'mean', 'sum'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    label_smoothing : float
        The amount of label smoothing to apply (default 0.0, i.e. none).

    Example
    -------
    >>> inputs = torch.tensor([10.0, -6.0])
    >>> targets = torch.tensor([1, 0])
    >>> bce_loss(inputs, targets)
    tensor(0.0013)
    """
    # Squeeze singleton dimension so inputs + targets match
    if len(inputs.shape) == len(targets.shape) + 1:
        inputs = inputs.squeeze(-1)

    # Make sure tensor lengths match
    if len(inputs.shape) >= 2:
        inputs, targets = truncate(inputs, targets, allowed_len_diff)
    elif length is not None:
        raise ValueError("length can be passed only for >= 2D inputs.")

    # Pass the loss function but apply reduction="none" first
    loss = functools.partial(
        torch.nn.functional.binary_cross_entropy_with_logits,
        weight=weight,
        pos_weight=pos_weight,
        reduction="none",
    )
    return compute_masked_loss(
        loss,
        inputs,
        targets.float(),
        length,
        label_smoothing=label_smoothing,
        reduction=reduction,
    )

def kldiv_loss(
    log_probabilities,
    targets,
    length=None,
    label_smoothing=0.0,
    allowed_len_diff=3,
    pad_idx=0,
    reduction="mean",
):
    """Computes the KL-divergence error at the batch level.
    This loss applies label smoothing directly to the targets.

    Arguments
    ---------
    log_probabilities : torch.Tensor
        The posterior log-probabilities of shape
        [batch, prob] or [batch, frames, prob].
    targets : torch.Tensor
        The targets, of shape [batch] or [batch, frames].
    length : torch.Tensor
        Length of each utterance, if frame-level loss is desired.
    label_smoothing : float
        The amount of label smoothing to apply (default 0.0, i.e. none).
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    pad_idx : int
        Index of the padding token, ignored when smoothing the targets.
    reduction : str
        Options are 'mean', 'batch', 'batchmean', 'sum'.
        See pytorch for 'mean', 'sum'. The 'batch' option returns
        one loss per item in the batch, 'batchmean' returns sum / batch size.

    Example
    -------
    >>> probs = torch.tensor([[0.9, 0.1], [0.1, 0.9]])
    >>> kldiv_loss(torch.log(probs), torch.tensor([1, 1]))
    tensor(1.2040)
    """
    if label_smoothing > 0:
        if log_probabilities.dim() == 2:
            log_probabilities = log_probabilities.unsqueeze(1)

        bz, time, n_class = log_probabilities.shape
        targets = targets.long().detach()

        confidence = 1 - label_smoothing

        log_probabilities = log_probabilities.view(-1, n_class)
        targets = targets.view(-1)
        with torch.no_grad():
            true_distribution = log_probabilities.clone()
            true_distribution.fill_(label_smoothing / (n_class - 1))
            ignore = targets == pad_idx
            targets = targets.masked_fill(ignore, 0)
            true_distribution.scatter_(1, targets.unsqueeze(1), confidence)

        loss = torch.nn.functional.kl_div(
            log_probabilities, true_distribution, reduction="none"
        )
        loss = loss.masked_fill(ignore.unsqueeze(1), 0)

        # return loss according to reduction specified
        if reduction == "mean":
            return loss.sum().mean()
        elif reduction == "batchmean":
            return loss.sum() / bz
        elif reduction == "batch":
            return loss.view(bz, -1).sum(1) / length
        elif reduction == "sum":
            return loss.sum()
        else:
            return loss
    else:
        return nll_loss(log_probabilities, targets, length, reduction=reduction)

def truncate(predictions, targets, allowed_len_diff=3):
    """Ensure that predictions and targets are the same length.

    Arguments
    ---------
    predictions : torch.Tensor
        First tensor for checking length.
    targets : torch.Tensor
        Second tensor for checking length.
    allowed_len_diff : int
        Length difference that will be tolerated before raising an exception.
    """
    len_diff = predictions.shape[1] - targets.shape[1]
    if len_diff == 0:
        return predictions, targets
    elif abs(len_diff) > allowed_len_diff:
        raise ValueError(
            "Predictions and targets should be same length, but got %s and "
            "%s respectively." % (predictions.shape[1], targets.shape[1])
        )
    elif len_diff < 0:
        return predictions, targets[:, : predictions.shape[1]]
    else:
        return predictions[:, : targets.shape[1]], targets

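# A small sketch of the truncation behavior (illustrative): a one-step
# mismatch, e.g. from strided convolutions, is silently truncated, while a
# difference larger than allowed_len_diff raises a ValueError.
#
#   predictions = torch.rand(2, 101, 10)
#   targets = torch.rand(2, 100, 10)
#   predictions, targets = truncate(predictions, targets, allowed_len_diff=3)
#   predictions.shape   # torch.Size([2, 100, 10])
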
def compute_masked_loss(
    loss_fn,
    predictions,
    targets,
    length=None,
    label_smoothing=0.0,
    reduction="mean",
):
    """Compute the true average loss of a set of waveforms of unequal length.

    Arguments
    ---------
    loss_fn : function
        A function for computing the loss taking just predictions and targets.
        Should return all the losses, not a reduction (e.g. reduction="none").
    predictions : torch.Tensor
        First argument to loss function.
    targets : torch.Tensor
        Second argument to loss function.
    length : torch.Tensor
        Length of each utterance to compute mask. If None, global average is
        computed and returned.
    label_smoothing : float
        The proportion of label smoothing. Should only be used for NLL loss.
        Ref: Regularizing Neural Networks by Penalizing Confident Output
        Distributions. https://arxiv.org/abs/1701.06548
    reduction : str
        One of 'mean', 'batch', 'batchmean', 'none' where 'mean' returns a
        single value and 'batch' returns one per item in the batch and
        'batchmean' is sum / batch_size and 'none' returns all.
    """
    mask = torch.ones_like(targets)
    if length is not None:
        length_mask = length_to_mask(
            length * targets.shape[1], max_len=targets.shape[1],
        )

        # Handle any dimensionality of input
        while len(length_mask.shape) < len(mask.shape):
            length_mask = length_mask.unsqueeze(-1)
        length_mask = length_mask.type(mask.dtype)
        mask *= length_mask

    # Compute, then reduce loss
    loss = loss_fn(predictions, targets) * mask
    N = loss.size(0)
    if reduction == "mean":
        loss = loss.sum() / torch.sum(mask)
    elif reduction == "batchmean":
        loss = loss.sum() / N
    elif reduction == "batch":
        loss = loss.reshape(N, -1).sum(1) / mask.reshape(N, -1).sum(1)

    if label_smoothing == 0:
        return loss
    else:
        loss_reg = torch.mean(predictions, dim=1) * mask
        if reduction == "mean":
            loss_reg = torch.sum(loss_reg) / torch.sum(mask)
        elif reduction == "batchmean":
            loss_reg = torch.sum(loss_reg) / targets.shape[0]
        elif reduction == "batch":
            loss_reg = loss_reg.sum(1) / mask.sum(1)

        return -label_smoothing * loss_reg + (1 - label_smoothing) * loss

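# A worked sketch of the masking (illustrative): the padded half of the
# second utterance is excluded, so the mean is taken over 6 valid frames
# (4 + 2), not 8, giving exactly 1.0 for an all-ones absolute error.
#
#   loss_fn = functools.partial(torch.nn.functional.l1_loss, reduction="none")
#   predictions = torch.zeros(2, 4)
#   targets = torch.ones(2, 4)
#   length = torch.tensor([1.0, 0.5])   # second utterance is half padding
#   compute_masked_loss(loss_fn, predictions, targets, length)   # tensor(1.)
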
def get_si_snr_with_pitwrapper(source, estimate_source):
    """This function wraps si_snr calculation with the speechbrain pit-wrapper.

    Arguments
    ---------
    source : torch.Tensor
        Shape is [B, T, C], where B is the batch size, T is the length of
        the sources, C is the number of sources. The ordering is made so
        that this loss is compatible with the class PitWrapper.
    estimate_source : torch.Tensor
        The estimated source, of shape [B, T, C].

    Example
    -------
    >>> x = torch.arange(600).reshape(3, 100, 2)
    >>> xhat = x[:, :, (1, 0)]
    >>> si_snr = -get_si_snr_with_pitwrapper(x, xhat)
    >>> print(si_snr)
    tensor([135.2284, 135.2284, 135.2284])
    """
    pit_si_snr = PitWrapper(cal_si_snr)
    loss, perms = pit_si_snr(source, estimate_source)

    return loss

def get_snr_with_pitwrapper(source, estimate_source):
    """This function wraps snr calculation with the speechbrain pit-wrapper.

    Arguments
    ---------
    source : torch.Tensor
        Shape is [B, T, E, C], where B is the batch size, T is the length of
        the sources, E is binaural channels, C is the number of sources.
        The ordering is made so that this loss is compatible with the class
        PitWrapper.
    estimate_source : torch.Tensor
        The estimated source, of shape [B, T, E, C].
    """
    pit_snr = PitWrapper(cal_snr)
    loss, perms = pit_snr(source, estimate_source)

    return loss

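# A usage sketch (illustrative shapes, not from the original source): the
# PIT wrapper recovers a permutation of the sources over the last axis.
#
#   source = torch.randn(2, 100, 2, 3)       # [B, T, E, C]
#   estimate = source[..., (2, 0, 1)]        # sources permuted over C
#   loss = get_snr_with_pitwrapper(source, estimate)   # shape [B], negated SNR
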
def cal_si_snr(source, estimate_source):
    """Calculate SI-SNR.

    Arguments
    ---------
    source : torch.Tensor
        Shape is [T, B, C], where B is the batch size, T is the length of
        the sources, C is the number of sources. The ordering is made so
        that this loss is compatible with the class PitWrapper.
    estimate_source : torch.Tensor
        The estimated source, of shape [T, B, C].

    Example
    -------
    >>> import numpy as np
    >>> x = torch.Tensor([[1, 0], [123, 45], [34, 5], [2312, 421]])
    >>> xhat = x[:, (1, 0)]
    >>> x = x.unsqueeze(-1).repeat(1, 1, 2)
    >>> xhat = xhat.unsqueeze(1).repeat(1, 2, 1)
    >>> si_snr = -cal_si_snr(x, xhat)
    >>> print(si_snr)
    tensor([[[ 25.2142, 144.1789],
             [130.9283,  25.2142]]])
    """
    EPS = 1e-8
    assert source.size() == estimate_source.size()
    device = estimate_source.device.type

    source_lengths = torch.tensor(
        [estimate_source.shape[0]] * estimate_source.shape[-2], device=device
    )
    mask = get_mask(source, source_lengths)
    estimate_source *= mask

    num_samples = (
        source_lengths.contiguous().reshape(1, -1, 1).float()
    )  # [1, B, 1]
    mean_target = torch.sum(source, dim=0, keepdim=True) / num_samples
    mean_estimate = (
        torch.sum(estimate_source, dim=0, keepdim=True) / num_samples
    )
    zero_mean_target = source - mean_target
    zero_mean_estimate = estimate_source - mean_estimate
    # mask padding position along T
    zero_mean_target *= mask
    zero_mean_estimate *= mask

    # Step 2. SI-SNR with PIT
    # reshape to use broadcast
    s_target = zero_mean_target  # [T, B, C]
    s_estimate = zero_mean_estimate  # [T, B, C]
    # s_target = <s', s>s / ||s||^2
    dot = torch.sum(s_estimate * s_target, dim=0, keepdim=True)  # [1, B, C]
    s_target_energy = (
        torch.sum(s_target ** 2, dim=0, keepdim=True) + EPS
    )  # [1, B, C]
    proj = dot * s_target / s_target_energy  # [T, B, C]
    # e_noise = s' - s_target
    e_noise = s_estimate - proj  # [T, B, C]
    # SI-SNR = 10 * log_10(||s_target||^2 / ||e_noise||^2)
    si_snr_beforelog = torch.sum(proj ** 2, dim=0) / (
        torch.sum(e_noise ** 2, dim=0) + EPS
    )
    si_snr = 10 * torch.log10(si_snr_beforelog + EPS)  # [B, C]

    return -si_snr.unsqueeze(0)

def cal_snr(source, estimate_source):
    """Calculate binaural channel SNR.

    Arguments
    ---------
    source : torch.Tensor
        Shape is [T, E, B, C], where B is the batch size, T is the length of
        the sources, E is binaural channels, C is the number of sources.
        The ordering is made so that this loss is compatible with the class
        PitWrapper.
    estimate_source : torch.Tensor
        The estimated source, of shape [T, E, B, C].
    """
    EPS = 1e-8
    assert source.size() == estimate_source.size()
    device = estimate_source.device.type

    source_lengths = torch.tensor(
        [estimate_source.shape[0]] * estimate_source.shape[-2], device=device
    )
    mask = get_mask(source, source_lengths)  # [T, E, 1]
    estimate_source *= mask

    num_samples = (
        source_lengths.contiguous().reshape(1, -1, 1).float()
    )  # [1, B, 1]
    mean_target = torch.sum(source, dim=0, keepdim=True) / num_samples
    mean_estimate = (
        torch.sum(estimate_source, dim=0, keepdim=True) / num_samples
    )
    zero_mean_target = source - mean_target
    zero_mean_estimate = estimate_source - mean_estimate
    # mask padding position along T
    zero_mean_target *= mask
    zero_mean_estimate *= mask

    # Step 2. SNR with PIT
    # reshape to use broadcast
    s_target = zero_mean_target  # [T, E, B, C]
    s_estimate = zero_mean_estimate  # [T, E, B, C]
    # SNR = 10 * log_10(||s_target||^2 / ||e_noise||^2)
    snr_beforelog = torch.sum(s_target ** 2, dim=0) / (
        torch.sum((s_estimate - s_target) ** 2, dim=0) + EPS
    )
    snr = 10 * torch.log10(snr_beforelog + EPS)  # [B, C]

    return -snr.unsqueeze(0)

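# A direct-call sketch (illustrative): note that cal_snr masks
# estimate_source in place and returns the *negated* SNR in dB, so lower
# is better when minimizing.
#
#   source = torch.randn(100, 2, 3, 4)                 # [T, E, B, C]
#   estimate = source + 0.1 * torch.randn_like(source)
#   neg_snr = cal_snr(source, estimate.clone())        # [1, E, B, C]
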
def get_mask(source, source_lengths):
    """
    Arguments
    ---------
    source : torch.Tensor
        Shape [T, B, C].
    source_lengths : torch.Tensor
        Shape [B].

    Returns
    -------
    mask : torch.Tensor
        Shape [T, B, 1].

    Example
    -------
    >>> source = torch.randn(4, 3, 2)
    >>> source_lengths = torch.Tensor([2, 1, 4]).int()
    >>> mask = get_mask(source, source_lengths)
    >>> print(mask)
    tensor([[[1.],
             [1.],
             [1.]],
    <BLANKLINE>
            [[1.],
             [0.],
             [1.]],
    <BLANKLINE>
            [[0.],
             [0.],
             [1.]],
    <BLANKLINE>
            [[0.],
             [0.],
             [1.]]])
    """
    mask = source.new_ones(source.size()[:-1]).unsqueeze(-1).transpose(1, -2)
    B = source.size(-2)
    for i in range(B):
        mask[source_lengths[i] :, i] = 0
    return mask.transpose(-2, 1)

class AngularMargin(nn.Module):
    """
    An implementation of Angular Margin (AM) proposed in the following
    paper: '''Margin Matters: Towards More Discriminative Deep Neural Network
    Embeddings for Speaker Recognition''' (https://arxiv.org/abs/1906.07317)

    Arguments
    ---------
    margin : float
        The margin for cosine similarity.
    scale : float
        The scale for cosine similarity.

    Returns
    -------
    predictions : torch.Tensor

    Example
    -------
    >>> pred = AngularMargin()
    >>> outputs = torch.tensor([ [1., -1.], [-1., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> targets = torch.tensor([ [1., 0.], [0., 1.], [ 1., 0.], [0., 1.] ])
    >>> predictions = pred(outputs, targets)
    >>> predictions[:,0] > predictions[:,1]
    tensor([ True, False,  True, False])
    """

    def __init__(self, margin=0.0, scale=1.0):
        super(AngularMargin, self).__init__()
        self.margin = margin
        self.scale = scale

    def forward(self, outputs, targets):
        """Compute AM between two tensors.

        Arguments
        ---------
        outputs : torch.Tensor
            The outputs of shape [N, C], cosine similarity is required.
        targets : torch.Tensor
            The targets of shape [N, C], where the margin is applied.

        Returns
        -------
        predictions : torch.Tensor
        """
        outputs = outputs - self.margin * targets
        return self.scale * outputs

class AdditiveAngularMargin(AngularMargin):
    """
    An implementation of Additive Angular Margin (AAM) proposed
    in the following paper: '''Margin Matters: Towards More Discriminative
    Deep Neural Network Embeddings for Speaker Recognition'''
    (https://arxiv.org/abs/1906.07317)

    Arguments
    ---------
    margin : float
        The margin for cosine similarity.
    scale : float
        The scale for cosine similarity.
    easy_margin : bool
        If True, the margin is applied only where the cosine is positive.

    Returns
    -------
    predictions : torch.Tensor
        Tensor.

    Example
    -------
    >>> outputs = torch.tensor([ [1., -1.], [-1., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> targets = torch.tensor([ [1., 0.], [0., 1.], [ 1., 0.], [0., 1.] ])
    >>> pred = AdditiveAngularMargin()
    >>> predictions = pred(outputs, targets)
    >>> predictions[:,0] > predictions[:,1]
    tensor([ True, False,  True, False])
    """

    def __init__(self, margin=0.0, scale=1.0, easy_margin=False):
        super(AdditiveAngularMargin, self).__init__(margin, scale)
        self.easy_margin = easy_margin

        self.cos_m = math.cos(self.margin)
        self.sin_m = math.sin(self.margin)
        self.th = math.cos(math.pi - self.margin)
        self.mm = math.sin(math.pi - self.margin) * self.margin

    def forward(self, outputs, targets):
        """
        Compute AAM between two tensors.

        Arguments
        ---------
        outputs : torch.Tensor
            The outputs of shape [N, C], cosine similarity is required.
        targets : torch.Tensor
            The targets of shape [N, C], where the margin is applied.

        Returns
        -------
        predictions : torch.Tensor
        """
        cosine = outputs.float()
        cosine = torch.clamp(cosine, -1 + 1e-7, 1 - 1e-7)
        sine = torch.sqrt(1.0 - torch.pow(cosine, 2))
        phi = cosine * self.cos_m - sine * self.sin_m  # cos(theta + m)
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)
        outputs = (targets * phi) + ((1.0 - targets) * cosine)
        return self.scale * outputs

class LogSoftmaxWrapper(nn.Module):
    """
    Arguments
    ---------
    loss_fn : Callable
        The loss function that operates on the (outputs, targets) pair,
        e.g. AngularMargin or AdditiveAngularMargin.

    Returns
    -------
    loss : torch.Tensor
        Learning loss.
    predictions : torch.Tensor
        Log probabilities.

    Example
    -------
    >>> outputs = torch.tensor([ [1., -1.], [-1., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> outputs = outputs.unsqueeze(1)
    >>> targets = torch.tensor([ [0], [1], [0], [1] ])
    >>> log_prob = LogSoftmaxWrapper(nn.Identity())
    >>> loss = log_prob(outputs, targets)
    >>> 0 <= loss < 1
    tensor(True)
    >>> log_prob = LogSoftmaxWrapper(AngularMargin(margin=0.2, scale=32))
    >>> loss = log_prob(outputs, targets)
    >>> 0 <= loss < 1
    tensor(True)
    >>> outputs = torch.tensor([ [1., -1.], [-1., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> log_prob = LogSoftmaxWrapper(AdditiveAngularMargin(margin=0.3, scale=32))
    >>> loss = log_prob(outputs, targets)
    >>> 0 <= loss < 1
    tensor(True)
    """

    def __init__(self, loss_fn):
        super(LogSoftmaxWrapper, self).__init__()
        self.loss_fn = loss_fn
        self.criterion = torch.nn.KLDivLoss(reduction="sum")

    def forward(self, outputs, targets, length=None):
        """
        Arguments
        ---------
        outputs : torch.Tensor
            Network output tensor, of shape [batch, 1, outdim].
        targets : torch.Tensor
            Target tensor, of shape [batch, 1].
        length : torch.Tensor
            Unused here; accepted for compatibility with other losses.

        Returns
        -------
        loss : torch.Tensor
            Loss for current examples.
        """
        outputs = outputs.squeeze(1)
        targets = targets.squeeze(1)
        targets = F.one_hot(targets.long(), outputs.shape[1]).float()
        try:
            predictions = self.loss_fn(outputs, targets)
        except TypeError:
            predictions = self.loss_fn(outputs)

        predictions = F.log_softmax(predictions, dim=1)
        loss = self.criterion(predictions, targets) / targets.sum()
        return loss

def ctc_loss_kd(log_probs, targets, input_lens, blank_index, device):
    """Knowledge distillation for CTC loss.

    Reference
    ---------
    Distilling Knowledge from Ensembles of Acoustic Models for Joint
    CTC-Attention End-to-End Speech Recognition.
    https://arxiv.org/abs/2005.09310

    Arguments
    ---------
    log_probs : torch.Tensor
        Predicted tensor from student model, of shape [batch, time, chars].
    targets : torch.Tensor
        Predicted tensor from single teacher model, of shape
        [batch, time, chars].
    input_lens : torch.Tensor
        Length of each utterance.
    blank_index : int
        The location of the blank symbol among the character indexes.
    device : str
        Device for computing.
    """
    scores, predictions = torch.max(targets, dim=-1)

    pred_list = []
    pred_len_list = []
    for j in range(predictions.shape[0]):
        # Getting current predictions
        current_pred = predictions[j]

        actual_size = (input_lens[j] * log_probs.shape[1]).round().int()
        current_pred = current_pred[0:actual_size]
        current_pred = filter_ctc_output(
            list(current_pred.cpu().numpy()), blank_id=blank_index
        )
        current_pred_len = len(current_pred)
        pred_list.append(current_pred)
        pred_len_list.append(current_pred_len)

    max_pred_len = max(pred_len_list)
    for j in range(predictions.shape[0]):
        diff = max_pred_len - pred_len_list[j]
        for n in range(diff):
            pred_list[j].append(0)

    # generate soft labels from the teacher model
    # (note: Tensor.to is not in-place, so the result must be re-assigned)
    fake_lab = torch.from_numpy(np.array(pred_list))
    fake_lab = fake_lab.to(device)
    fake_lab = fake_lab.int()
    fake_lab_lengths = torch.from_numpy(np.array(pred_len_list)).int()
    fake_lab_lengths = fake_lab_lengths.to(device)

    input_lens = (input_lens * log_probs.shape[1]).round().int()
    log_probs = log_probs.transpose(0, 1)
    return torch.nn.functional.ctc_loss(
        log_probs,
        fake_lab,
        input_lens,
        fake_lab_lengths,
        blank_index,
        zero_infinity=True,
    )

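# A usage sketch (illustrative shapes): the teacher's frame-level posteriors
# are decoded greedily and used as CTC targets for the student.
#
#   student_log_probs = torch.randn(2, 10, 5).log_softmax(-1)
#   teacher_probs = torch.randn(2, 10, 5).softmax(-1)
#   input_lens = torch.tensor([1.0, 0.9])
#   loss = ctc_loss_kd(
#       student_log_probs, teacher_probs, input_lens, blank_index=0, device="cpu"
#   )
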
def ce_kd(inp, target):
    """Simple version of distillation for cross-entropy loss.

    Arguments
    ---------
    inp : torch.Tensor
        The probabilities from the student model, of shape
        [batch_size * length, feature].
    target : torch.Tensor
        The probabilities from the teacher model, of shape
        [batch_size * length, feature].
    """
    return (-target * inp).sum(1)

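# A worked sketch: with teacher target [[0.9, 0.1]] and student input
# [[0.8, 0.2]], the result is -(0.9 * 0.8 + 0.1 * 0.2) = -0.74, matching the
# nll_loss_kd doctest below.
#
#   ce_kd(torch.tensor([[0.8, 0.2]]), torch.tensor([[0.9, 0.1]]))
#   # tensor([-0.7400])
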
def nll_loss_kd(
    probabilities, targets, rel_lab_lengths,
):
    """Knowledge distillation for negative log-likelihood loss.

    Reference
    ---------
    Distilling Knowledge from Ensembles of Acoustic Models for Joint
    CTC-Attention End-to-End Speech Recognition.
    https://arxiv.org/abs/2005.09310

    Arguments
    ---------
    probabilities : torch.Tensor
        The predicted probabilities from the student model.
        Format is [batch, frames, p].
    targets : torch.Tensor
        The target probabilities from the teacher model.
        Format is [batch, frames, p].
    rel_lab_lengths : torch.Tensor
        Length of each utterance, if the frame-level loss is desired.

    Example
    -------
    >>> probabilities = torch.tensor([[[0.8, 0.2], [0.2, 0.8]]])
    >>> targets = torch.tensor([[[0.9, 0.1], [0.1, 0.9]]])
    >>> rel_lab_lengths = torch.tensor([1.])
    >>> nll_loss_kd(probabilities, targets, rel_lab_lengths)
    tensor(-0.7400)
    """
    # Getting the number of sentences in the minibatch
    N_snt = probabilities.shape[0]

    # Getting the maximum length of label sequence
    max_len = probabilities.shape[1]

    # Getting the label lengths
    lab_lengths = torch.round(rel_lab_lengths * targets.shape[1]).int()

    # Reshape to [batch_size * length, feature]
    prob_curr = probabilities.reshape(N_snt * max_len, probabilities.shape[-1])

    # Generating mask
    mask = length_to_mask(
        lab_lengths, max_len=max_len, dtype=torch.float, device=prob_curr.device
    )

    # Reshape to [batch_size * length, feature]
    lab_curr = targets.reshape(N_snt * max_len, targets.shape[-1])

    # Compute the distillation loss
    loss = ce_kd(prob_curr, lab_curr)

    # Loss averaging
    loss = torch.sum(loss.reshape(N_snt, max_len) * mask) / torch.sum(mask)
    return loss