Add files via upload

This commit is contained in:
Sam Khoze
2024-06-18 13:18:37 -07:00
committed by GitHub
parent 9e69cc24ea
commit 6af40bc6cf
74 changed files with 8332 additions and 0 deletions
+18
View File
@@ -0,0 +1,18 @@
### Speaker Encoder
This is an implementation of https://arxiv.org/abs/1710.10467. This model can be used for voice and speaker embedding.
With the code here you can generate d-vectors for both multi-speaker and single-speaker TTS datasets, then visualise and explore them along with the associated audio files in an interactive chart.
Below is an example showing embedding results of various speakers. You can generate the same plot with the provided notebook as demonstrated in [this video](https://youtu.be/KW3oO7JVa7Q).
![](umap.png)
Download a pretrained model from [Released Models](https://github.com/mozilla/TTS/wiki/Released-Models) page.
To run the code, you need to follow the same flow as in TTS.
- Define 'config.json' for your needs. Note that, audio parameters should match your TTS model.
- Example training call ```python speaker_encoder/train.py --config_path speaker_encoder/config.json --data_path ~/Data/Libri-TTS/train-clean-360```
- Generate embedding vectors ```python speaker_encoder/compute_embeddings.py --use_cuda true /model/path/best_model.pth model/config/path/config.json dataset/path/ output_path``` . This code parses all .wav files at the given dataset path and generates the same folder structure under the output path with the generated embedding files.
- Watch training on Tensorboard as in TTS
View File
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,61 @@
from dataclasses import asdict, dataclass, field
from typing import Dict, List
from coqpit import MISSING
from TTS.config.shared_configs import BaseAudioConfig, BaseDatasetConfig, BaseTrainingConfig
@dataclass
class BaseEncoderConfig(BaseTrainingConfig):
"""Defines parameters for a Generic Encoder model."""
model: str = None
audio: BaseAudioConfig = field(default_factory=BaseAudioConfig)
datasets: List[BaseDatasetConfig] = field(default_factory=lambda: [BaseDatasetConfig()])
# model params
model_params: Dict = field(
default_factory=lambda: {
"model_name": "lstm",
"input_dim": 80,
"proj_dim": 256,
"lstm_dim": 768,
"num_lstm_layers": 3,
"use_lstm_with_projection": True,
}
)
audio_augmentation: Dict = field(default_factory=lambda: {})
# training params
epochs: int = 10000
loss: str = "angleproto"
grad_clip: float = 3.0
lr: float = 0.0001
optimizer: str = "radam"
optimizer_params: Dict = field(default_factory=lambda: {"betas": [0.9, 0.999], "weight_decay": 0})
lr_decay: bool = False
warmup_steps: int = 4000
# logging params
tb_model_param_stats: bool = False
steps_plot_stats: int = 10
save_step: int = 1000
print_step: int = 20
run_eval: bool = False
# data loader
num_classes_in_batch: int = MISSING
num_utter_per_class: int = MISSING
eval_num_classes_in_batch: int = None
eval_num_utter_per_class: int = None
num_loader_workers: int = MISSING
voice_len: float = 1.6
def check_values(self):
super().check_values()
c = asdict(self)
assert (
c["model_params"]["input_dim"] == self.audio.num_mels
), " [!] model input dimendion must be equal to melspectrogram dimension."
@@ -0,0 +1,12 @@
from dataclasses import asdict, dataclass
from TTS.encoder.configs.base_encoder_config import BaseEncoderConfig
@dataclass
class EmotionEncoderConfig(BaseEncoderConfig):
"""Defines parameters for Emotion Encoder model."""
model: str = "emotion_encoder"
map_classid_to_classname: dict = None
class_name_key: str = "emotion_name"
@@ -0,0 +1,11 @@
from dataclasses import asdict, dataclass
from TTS.encoder.configs.base_encoder_config import BaseEncoderConfig
@dataclass
class SpeakerEncoderConfig(BaseEncoderConfig):
"""Defines parameters for Speaker Encoder model."""
model: str = "speaker_encoder"
class_name_key: str = "speaker_name"
+147
View File
@@ -0,0 +1,147 @@
import random
import torch
from torch.utils.data import Dataset
from TTS.encoder.utils.generic_utils import AugmentWAV
class EncoderDataset(Dataset):
def __init__(
self,
config,
ap,
meta_data,
voice_len=1.6,
num_classes_in_batch=64,
num_utter_per_class=10,
verbose=False,
augmentation_config=None,
use_torch_spec=None,
):
"""
Args:
ap (TTS.tts.utils.AudioProcessor): audio processor object.
meta_data (list): list of dataset instances.
seq_len (int): voice segment length in seconds.
verbose (bool): print diagnostic information.
"""
super().__init__()
self.config = config
self.items = meta_data
self.sample_rate = ap.sample_rate
self.seq_len = int(voice_len * self.sample_rate)
self.num_utter_per_class = num_utter_per_class
self.ap = ap
self.verbose = verbose
self.use_torch_spec = use_torch_spec
self.classes, self.items = self.__parse_items()
self.classname_to_classid = {key: i for i, key in enumerate(self.classes)}
# Data Augmentation
self.augmentator = None
self.gaussian_augmentation_config = None
if augmentation_config:
self.data_augmentation_p = augmentation_config["p"]
if self.data_augmentation_p and ("additive" in augmentation_config or "rir" in augmentation_config):
self.augmentator = AugmentWAV(ap, augmentation_config)
if "gaussian" in augmentation_config.keys():
self.gaussian_augmentation_config = augmentation_config["gaussian"]
if self.verbose:
print("\n > DataLoader initialization")
print(f" | > Classes per Batch: {num_classes_in_batch}")
print(f" | > Number of instances : {len(self.items)}")
print(f" | > Sequence length: {self.seq_len}")
print(f" | > Num Classes: {len(self.classes)}")
print(f" | > Classes: {self.classes}")
def load_wav(self, filename):
audio = self.ap.load_wav(filename, sr=self.ap.sample_rate)
return audio
def __parse_items(self):
class_to_utters = {}
for item in self.items:
path_ = item["audio_file"]
class_name = item[self.config.class_name_key]
if class_name in class_to_utters.keys():
class_to_utters[class_name].append(path_)
else:
class_to_utters[class_name] = [
path_,
]
# skip classes with number of samples >= self.num_utter_per_class
class_to_utters = {k: v for (k, v) in class_to_utters.items() if len(v) >= self.num_utter_per_class}
classes = list(class_to_utters.keys())
classes.sort()
new_items = []
for item in self.items:
path_ = item["audio_file"]
class_name = item["emotion_name"] if self.config.model == "emotion_encoder" else item["speaker_name"]
# ignore filtered classes
if class_name not in classes:
continue
# ignore small audios
if self.load_wav(path_).shape[0] - self.seq_len <= 0:
continue
new_items.append({"wav_file_path": path_, "class_name": class_name})
return classes, new_items
def __len__(self):
return len(self.items)
def get_num_classes(self):
return len(self.classes)
def get_class_list(self):
return self.classes
def set_classes(self, classes):
self.classes = classes
self.classname_to_classid = {key: i for i, key in enumerate(self.classes)}
def get_map_classid_to_classname(self):
return dict((c_id, c_n) for c_n, c_id in self.classname_to_classid.items())
def __getitem__(self, idx):
return self.items[idx]
def collate_fn(self, batch):
# get the batch class_ids
labels = []
feats = []
for item in batch:
utter_path = item["wav_file_path"]
class_name = item["class_name"]
# get classid
class_id = self.classname_to_classid[class_name]
# load wav file
wav = self.load_wav(utter_path)
offset = random.randint(0, wav.shape[0] - self.seq_len)
wav = wav[offset : offset + self.seq_len]
if self.augmentator is not None and self.data_augmentation_p:
if random.random() < self.data_augmentation_p:
wav = self.augmentator.apply_one(wav)
if not self.use_torch_spec:
mel = self.ap.melspectrogram(wav)
feats.append(torch.FloatTensor(mel))
else:
feats.append(torch.FloatTensor(wav))
labels.append(class_id)
feats = torch.stack(feats)
labels = torch.LongTensor(labels)
return feats, labels
+226
View File
@@ -0,0 +1,226 @@
import torch
import torch.nn.functional as F
from torch import nn
# adapted from https://github.com/cvqluu/GE2E-Loss
class GE2ELoss(nn.Module):
def __init__(self, init_w=10.0, init_b=-5.0, loss_method="softmax"):
"""
Implementation of the Generalized End-to-End loss defined in https://arxiv.org/abs/1710.10467 [1]
Accepts an input of size (N, M, D)
where N is the number of speakers in the batch,
M is the number of utterances per speaker,
and D is the dimensionality of the embedding vector (e.g. d-vector)
Args:
- init_w (float): defines the initial value of w in Equation (5) of [1]
- init_b (float): definies the initial value of b in Equation (5) of [1]
"""
super().__init__()
# pylint: disable=E1102
self.w = nn.Parameter(torch.tensor(init_w))
# pylint: disable=E1102
self.b = nn.Parameter(torch.tensor(init_b))
self.loss_method = loss_method
print(" > Initialized Generalized End-to-End loss")
assert self.loss_method in ["softmax", "contrast"]
if self.loss_method == "softmax":
self.embed_loss = self.embed_loss_softmax
if self.loss_method == "contrast":
self.embed_loss = self.embed_loss_contrast
# pylint: disable=R0201
def calc_new_centroids(self, dvecs, centroids, spkr, utt):
"""
Calculates the new centroids excluding the reference utterance
"""
excl = torch.cat((dvecs[spkr, :utt], dvecs[spkr, utt + 1 :]))
excl = torch.mean(excl, 0)
new_centroids = []
for i, centroid in enumerate(centroids):
if i == spkr:
new_centroids.append(excl)
else:
new_centroids.append(centroid)
return torch.stack(new_centroids)
def calc_cosine_sim(self, dvecs, centroids):
"""
Make the cosine similarity matrix with dims (N,M,N)
"""
cos_sim_matrix = []
for spkr_idx, speaker in enumerate(dvecs):
cs_row = []
for utt_idx, utterance in enumerate(speaker):
new_centroids = self.calc_new_centroids(dvecs, centroids, spkr_idx, utt_idx)
# vector based cosine similarity for speed
cs_row.append(
torch.clamp(
torch.mm(
utterance.unsqueeze(1).transpose(0, 1),
new_centroids.transpose(0, 1),
)
/ (torch.norm(utterance) * torch.norm(new_centroids, dim=1)),
1e-6,
)
)
cs_row = torch.cat(cs_row, dim=0)
cos_sim_matrix.append(cs_row)
return torch.stack(cos_sim_matrix)
# pylint: disable=R0201
def embed_loss_softmax(self, dvecs, cos_sim_matrix):
"""
Calculates the loss on each embedding $L(e_{ji})$ by taking softmax
"""
N, M, _ = dvecs.shape
L = []
for j in range(N):
L_row = []
for i in range(M):
L_row.append(-F.log_softmax(cos_sim_matrix[j, i], 0)[j])
L_row = torch.stack(L_row)
L.append(L_row)
return torch.stack(L)
# pylint: disable=R0201
def embed_loss_contrast(self, dvecs, cos_sim_matrix):
"""
Calculates the loss on each embedding $L(e_{ji})$ by contrast loss with closest centroid
"""
N, M, _ = dvecs.shape
L = []
for j in range(N):
L_row = []
for i in range(M):
centroids_sigmoids = torch.sigmoid(cos_sim_matrix[j, i])
excl_centroids_sigmoids = torch.cat((centroids_sigmoids[:j], centroids_sigmoids[j + 1 :]))
L_row.append(1.0 - torch.sigmoid(cos_sim_matrix[j, i, j]) + torch.max(excl_centroids_sigmoids))
L_row = torch.stack(L_row)
L.append(L_row)
return torch.stack(L)
def forward(self, x, _label=None):
"""
Calculates the GE2E loss for an input of dimensions (num_speakers, num_utts_per_speaker, dvec_feats)
"""
assert x.size()[1] >= 2
centroids = torch.mean(x, 1)
cos_sim_matrix = self.calc_cosine_sim(x, centroids)
torch.clamp(self.w, 1e-6)
cos_sim_matrix = self.w * cos_sim_matrix + self.b
L = self.embed_loss(x, cos_sim_matrix)
return L.mean()
# adapted from https://github.com/clovaai/voxceleb_trainer/blob/master/loss/angleproto.py
class AngleProtoLoss(nn.Module):
"""
Implementation of the Angular Prototypical loss defined in https://arxiv.org/abs/2003.11982
Accepts an input of size (N, M, D)
where N is the number of speakers in the batch,
M is the number of utterances per speaker,
and D is the dimensionality of the embedding vector
Args:
- init_w (float): defines the initial value of w
- init_b (float): definies the initial value of b
"""
def __init__(self, init_w=10.0, init_b=-5.0):
super().__init__()
# pylint: disable=E1102
self.w = nn.Parameter(torch.tensor(init_w))
# pylint: disable=E1102
self.b = nn.Parameter(torch.tensor(init_b))
self.criterion = torch.nn.CrossEntropyLoss()
print(" > Initialized Angular Prototypical loss")
def forward(self, x, _label=None):
"""
Calculates the AngleProto loss for an input of dimensions (num_speakers, num_utts_per_speaker, dvec_feats)
"""
assert x.size()[1] >= 2
out_anchor = torch.mean(x[:, 1:, :], 1)
out_positive = x[:, 0, :]
num_speakers = out_anchor.size()[0]
cos_sim_matrix = F.cosine_similarity(
out_positive.unsqueeze(-1).expand(-1, -1, num_speakers),
out_anchor.unsqueeze(-1).expand(-1, -1, num_speakers).transpose(0, 2),
)
torch.clamp(self.w, 1e-6)
cos_sim_matrix = cos_sim_matrix * self.w + self.b
label = torch.arange(num_speakers).to(cos_sim_matrix.device)
L = self.criterion(cos_sim_matrix, label)
return L
class SoftmaxLoss(nn.Module):
"""
Implementation of the Softmax loss as defined in https://arxiv.org/abs/2003.11982
Args:
- embedding_dim (float): speaker embedding dim
- n_speakers (float): number of speakers
"""
def __init__(self, embedding_dim, n_speakers):
super().__init__()
self.criterion = torch.nn.CrossEntropyLoss()
self.fc = nn.Linear(embedding_dim, n_speakers)
print("Initialised Softmax Loss")
def forward(self, x, label=None):
# reshape for compatibility
x = x.reshape(-1, x.size()[-1])
label = label.reshape(-1)
x = self.fc(x)
L = self.criterion(x, label)
return L
def inference(self, embedding):
x = self.fc(embedding)
activations = torch.nn.functional.softmax(x, dim=1).squeeze(0)
class_id = torch.argmax(activations)
return class_id
class SoftmaxAngleProtoLoss(nn.Module):
"""
Implementation of the Softmax AnglePrototypical loss as defined in https://arxiv.org/abs/2009.14153
Args:
- embedding_dim (float): speaker embedding dim
- n_speakers (float): number of speakers
- init_w (float): defines the initial value of w
- init_b (float): definies the initial value of b
"""
def __init__(self, embedding_dim, n_speakers, init_w=10.0, init_b=-5.0):
super().__init__()
self.softmax = SoftmaxLoss(embedding_dim, n_speakers)
self.angleproto = AngleProtoLoss(init_w, init_b)
print("Initialised SoftmaxAnglePrototypical Loss")
def forward(self, x, label=None):
"""
Calculates the SoftmaxAnglePrototypical loss for an input of dimensions (num_speakers, num_utts_per_speaker, dvec_feats)
"""
Lp = self.angleproto(x)
Ls = self.softmax(x, label)
return Ls + Lp
Binary file not shown.
+161
View File
@@ -0,0 +1,161 @@
import numpy as np
import torch
import torchaudio
from coqpit import Coqpit
from torch import nn
from TTS.encoder.losses import AngleProtoLoss, GE2ELoss, SoftmaxAngleProtoLoss
from TTS.utils.generic_utils import set_init_dict
from TTS.utils.io import load_fsspec
class PreEmphasis(nn.Module):
def __init__(self, coefficient=0.97):
super().__init__()
self.coefficient = coefficient
self.register_buffer("filter", torch.FloatTensor([-self.coefficient, 1.0]).unsqueeze(0).unsqueeze(0))
def forward(self, x):
assert len(x.size()) == 2
x = torch.nn.functional.pad(x.unsqueeze(1), (1, 0), "reflect")
return torch.nn.functional.conv1d(x, self.filter).squeeze(1)
class BaseEncoder(nn.Module):
"""Base `encoder` class. Every new `encoder` model must inherit this.
It defines common `encoder` specific functions.
"""
# pylint: disable=W0102
def __init__(self):
super(BaseEncoder, self).__init__()
def get_torch_mel_spectrogram_class(self, audio_config):
return torch.nn.Sequential(
PreEmphasis(audio_config["preemphasis"]),
# TorchSTFT(
# n_fft=audio_config["fft_size"],
# hop_length=audio_config["hop_length"],
# win_length=audio_config["win_length"],
# sample_rate=audio_config["sample_rate"],
# window="hamming_window",
# mel_fmin=0.0,
# mel_fmax=None,
# use_htk=True,
# do_amp_to_db=False,
# n_mels=audio_config["num_mels"],
# power=2.0,
# use_mel=True,
# mel_norm=None,
# )
torchaudio.transforms.MelSpectrogram(
sample_rate=audio_config["sample_rate"],
n_fft=audio_config["fft_size"],
win_length=audio_config["win_length"],
hop_length=audio_config["hop_length"],
window_fn=torch.hamming_window,
n_mels=audio_config["num_mels"],
),
)
@torch.no_grad()
def inference(self, x, l2_norm=True):
return self.forward(x, l2_norm)
@torch.no_grad()
def compute_embedding(self, x, num_frames=250, num_eval=10, return_mean=True, l2_norm=True):
"""
Generate embeddings for a batch of utterances
x: 1xTxD
"""
# map to the waveform size
if self.use_torch_spec:
num_frames = num_frames * self.audio_config["hop_length"]
max_len = x.shape[1]
if max_len < num_frames:
num_frames = max_len
offsets = np.linspace(0, max_len - num_frames, num=num_eval)
frames_batch = []
for offset in offsets:
offset = int(offset)
end_offset = int(offset + num_frames)
frames = x[:, offset:end_offset]
frames_batch.append(frames)
frames_batch = torch.cat(frames_batch, dim=0)
embeddings = self.inference(frames_batch, l2_norm=l2_norm)
if return_mean:
embeddings = torch.mean(embeddings, dim=0, keepdim=True)
return embeddings
def get_criterion(self, c: Coqpit, num_classes=None):
if c.loss == "ge2e":
criterion = GE2ELoss(loss_method="softmax")
elif c.loss == "angleproto":
criterion = AngleProtoLoss()
elif c.loss == "softmaxproto":
criterion = SoftmaxAngleProtoLoss(c.model_params["proj_dim"], num_classes)
else:
raise Exception("The %s not is a loss supported" % c.loss)
return criterion
def load_checkpoint(
self,
config: Coqpit,
checkpoint_path: str,
eval: bool = False,
use_cuda: bool = False,
criterion=None,
cache=False,
):
state = load_fsspec(checkpoint_path, map_location=torch.device("cpu"), cache=cache)
try:
self.load_state_dict(state["model"])
print(" > Model fully restored. ")
except (KeyError, RuntimeError) as error:
# If eval raise the error
if eval:
raise error
print(" > Partial model initialization.")
model_dict = self.state_dict()
model_dict = set_init_dict(model_dict, state["model"], c)
self.load_state_dict(model_dict)
del model_dict
# load the criterion for restore_path
if criterion is not None and "criterion" in state:
try:
criterion.load_state_dict(state["criterion"])
except (KeyError, RuntimeError) as error:
print(" > Criterion load ignored because of:", error)
# instance and load the criterion for the encoder classifier in inference time
if (
eval
and criterion is None
and "criterion" in state
and getattr(config, "map_classid_to_classname", None) is not None
):
criterion = self.get_criterion(config, len(config.map_classid_to_classname))
criterion.load_state_dict(state["criterion"])
if use_cuda:
self.cuda()
if criterion is not None:
criterion = criterion.cuda()
if eval:
self.eval()
assert not self.training
if not eval:
return criterion, state["step"]
return criterion
+99
View File
@@ -0,0 +1,99 @@
import torch
from torch import nn
from TTS.encoder.models.base_encoder import BaseEncoder
class LSTMWithProjection(nn.Module):
def __init__(self, input_size, hidden_size, proj_size):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.proj_size = proj_size
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
self.linear = nn.Linear(hidden_size, proj_size, bias=False)
def forward(self, x):
self.lstm.flatten_parameters()
o, (_, _) = self.lstm(x)
return self.linear(o)
class LSTMWithoutProjection(nn.Module):
def __init__(self, input_dim, lstm_dim, proj_dim, num_lstm_layers):
super().__init__()
self.lstm = nn.LSTM(input_size=input_dim, hidden_size=lstm_dim, num_layers=num_lstm_layers, batch_first=True)
self.linear = nn.Linear(lstm_dim, proj_dim, bias=True)
self.relu = nn.ReLU()
def forward(self, x):
_, (hidden, _) = self.lstm(x)
return self.relu(self.linear(hidden[-1]))
class LSTMSpeakerEncoder(BaseEncoder):
def __init__(
self,
input_dim,
proj_dim=256,
lstm_dim=768,
num_lstm_layers=3,
use_lstm_with_projection=True,
use_torch_spec=False,
audio_config=None,
):
super().__init__()
self.use_lstm_with_projection = use_lstm_with_projection
self.use_torch_spec = use_torch_spec
self.audio_config = audio_config
self.proj_dim = proj_dim
layers = []
# choise LSTM layer
if use_lstm_with_projection:
layers.append(LSTMWithProjection(input_dim, lstm_dim, proj_dim))
for _ in range(num_lstm_layers - 1):
layers.append(LSTMWithProjection(proj_dim, lstm_dim, proj_dim))
self.layers = nn.Sequential(*layers)
else:
self.layers = LSTMWithoutProjection(input_dim, lstm_dim, proj_dim, num_lstm_layers)
self.instancenorm = nn.InstanceNorm1d(input_dim)
if self.use_torch_spec:
self.torch_spec = self.get_torch_mel_spectrogram_class(audio_config)
else:
self.torch_spec = None
self._init_layers()
def _init_layers(self):
for name, param in self.layers.named_parameters():
if "bias" in name:
nn.init.constant_(param, 0.0)
elif "weight" in name:
nn.init.xavier_normal_(param)
def forward(self, x, l2_norm=True):
"""Forward pass of the model.
Args:
x (Tensor): Raw waveform signal or spectrogram frames. If input is a waveform, `torch_spec` must be `True`
to compute the spectrogram on-the-fly.
l2_norm (bool): Whether to L2-normalize the outputs.
Shapes:
- x: :math:`(N, 1, T_{in})` or :math:`(N, D_{spec}, T_{in})`
"""
with torch.no_grad():
with torch.cuda.amp.autocast(enabled=False):
if self.use_torch_spec:
x.squeeze_(1)
x = self.torch_spec(x)
x = self.instancenorm(x).transpose(1, 2)
d = self.layers(x)
if self.use_lstm_with_projection:
d = d[:, -1]
if l2_norm:
d = torch.nn.functional.normalize(d, p=2, dim=1)
return d
+198
View File
@@ -0,0 +1,198 @@
import torch
from torch import nn
# from TTS.utils.audio.torch_transforms import TorchSTFT
from TTS.encoder.models.base_encoder import BaseEncoder
class SELayer(nn.Module):
def __init__(self, channel, reduction=8):
super(SELayer, self).__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.fc = nn.Sequential(
nn.Linear(channel, channel // reduction),
nn.ReLU(inplace=True),
nn.Linear(channel // reduction, channel),
nn.Sigmoid(),
)
def forward(self, x):
b, c, _, _ = x.size()
y = self.avg_pool(x).view(b, c)
y = self.fc(y).view(b, c, 1, 1)
return x * y
class SEBasicBlock(nn.Module):
expansion = 1
def __init__(self, inplanes, planes, stride=1, downsample=None, reduction=8):
super(SEBasicBlock, self).__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.relu = nn.ReLU(inplace=True)
self.se = SELayer(planes, reduction)
self.downsample = downsample
self.stride = stride
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.relu(out)
out = self.bn1(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.se(out)
if self.downsample is not None:
residual = self.downsample(x)
out += residual
out = self.relu(out)
return out
class ResNetSpeakerEncoder(BaseEncoder):
"""Implementation of the model H/ASP without batch normalization in speaker embedding. This model was proposed in: https://arxiv.org/abs/2009.14153
Adapted from: https://github.com/clovaai/voxceleb_trainer
"""
# pylint: disable=W0102
def __init__(
self,
input_dim=64,
proj_dim=512,
layers=[3, 4, 6, 3],
num_filters=[32, 64, 128, 256],
encoder_type="ASP",
log_input=False,
use_torch_spec=False,
audio_config=None,
):
super(ResNetSpeakerEncoder, self).__init__()
self.encoder_type = encoder_type
self.input_dim = input_dim
self.log_input = log_input
self.use_torch_spec = use_torch_spec
self.audio_config = audio_config
self.proj_dim = proj_dim
self.conv1 = nn.Conv2d(1, num_filters[0], kernel_size=3, stride=1, padding=1)
self.relu = nn.ReLU(inplace=True)
self.bn1 = nn.BatchNorm2d(num_filters[0])
self.inplanes = num_filters[0]
self.layer1 = self.create_layer(SEBasicBlock, num_filters[0], layers[0])
self.layer2 = self.create_layer(SEBasicBlock, num_filters[1], layers[1], stride=(2, 2))
self.layer3 = self.create_layer(SEBasicBlock, num_filters[2], layers[2], stride=(2, 2))
self.layer4 = self.create_layer(SEBasicBlock, num_filters[3], layers[3], stride=(2, 2))
self.instancenorm = nn.InstanceNorm1d(input_dim)
if self.use_torch_spec:
self.torch_spec = self.get_torch_mel_spectrogram_class(audio_config)
else:
self.torch_spec = None
outmap_size = int(self.input_dim / 8)
self.attention = nn.Sequential(
nn.Conv1d(num_filters[3] * outmap_size, 128, kernel_size=1),
nn.ReLU(),
nn.BatchNorm1d(128),
nn.Conv1d(128, num_filters[3] * outmap_size, kernel_size=1),
nn.Softmax(dim=2),
)
if self.encoder_type == "SAP":
out_dim = num_filters[3] * outmap_size
elif self.encoder_type == "ASP":
out_dim = num_filters[3] * outmap_size * 2
else:
raise ValueError("Undefined encoder")
self.fc = nn.Linear(out_dim, proj_dim)
self._init_layers()
def _init_layers(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
def create_layer(self, block, planes, blocks, stride=1):
downsample = None
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(planes * block.expansion),
)
layers = []
layers.append(block(self.inplanes, planes, stride, downsample))
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(block(self.inplanes, planes))
return nn.Sequential(*layers)
# pylint: disable=R0201
def new_parameter(self, *size):
out = nn.Parameter(torch.FloatTensor(*size))
nn.init.xavier_normal_(out)
return out
def forward(self, x, l2_norm=False):
"""Forward pass of the model.
Args:
x (Tensor): Raw waveform signal or spectrogram frames. If input is a waveform, `torch_spec` must be `True`
to compute the spectrogram on-the-fly.
l2_norm (bool): Whether to L2-normalize the outputs.
Shapes:
- x: :math:`(N, 1, T_{in})` or :math:`(N, D_{spec}, T_{in})`
"""
x.squeeze_(1)
# if you torch spec compute it otherwise use the mel spec computed by the AP
if self.use_torch_spec:
x = self.torch_spec(x)
if self.log_input:
x = (x + 1e-6).log()
x = self.instancenorm(x).unsqueeze(1)
x = self.conv1(x)
x = self.relu(x)
x = self.bn1(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = x.reshape(x.size()[0], -1, x.size()[-1])
w = self.attention(x)
if self.encoder_type == "SAP":
x = torch.sum(x * w, dim=2)
elif self.encoder_type == "ASP":
mu = torch.sum(x * w, dim=2)
sg = torch.sqrt((torch.sum((x**2) * w, dim=2) - mu**2).clamp(min=1e-5))
x = torch.cat((mu, sg), 1)
x = x.view(x.size()[0], -1)
x = self.fc(x)
if l2_norm:
x = torch.nn.functional.normalize(x, p=2, dim=1)
return x
+2
View File
@@ -0,0 +1,2 @@
umap-learn
numpy>=1.17.0
View File
+136
View File
@@ -0,0 +1,136 @@
import glob
import os
import random
import numpy as np
from scipy import signal
from TTS.encoder.models.lstm import LSTMSpeakerEncoder
from TTS.encoder.models.resnet import ResNetSpeakerEncoder
class AugmentWAV(object):
def __init__(self, ap, augmentation_config):
self.ap = ap
self.use_additive_noise = False
if "additive" in augmentation_config.keys():
self.additive_noise_config = augmentation_config["additive"]
additive_path = self.additive_noise_config["sounds_path"]
if additive_path:
self.use_additive_noise = True
# get noise types
self.additive_noise_types = []
for key in self.additive_noise_config.keys():
if isinstance(self.additive_noise_config[key], dict):
self.additive_noise_types.append(key)
additive_files = glob.glob(os.path.join(additive_path, "**/*.wav"), recursive=True)
self.noise_list = {}
for wav_file in additive_files:
noise_dir = wav_file.replace(additive_path, "").split(os.sep)[0]
# ignore not listed directories
if noise_dir not in self.additive_noise_types:
continue
if not noise_dir in self.noise_list:
self.noise_list[noise_dir] = []
self.noise_list[noise_dir].append(wav_file)
print(
f" | > Using Additive Noise Augmentation: with {len(additive_files)} audios instances from {self.additive_noise_types}"
)
self.use_rir = False
if "rir" in augmentation_config.keys():
self.rir_config = augmentation_config["rir"]
if self.rir_config["rir_path"]:
self.rir_files = glob.glob(os.path.join(self.rir_config["rir_path"], "**/*.wav"), recursive=True)
self.use_rir = True
print(f" | > Using RIR Noise Augmentation: with {len(self.rir_files)} audios instances")
self.create_augmentation_global_list()
def create_augmentation_global_list(self):
if self.use_additive_noise:
self.global_noise_list = self.additive_noise_types
else:
self.global_noise_list = []
if self.use_rir:
self.global_noise_list.append("RIR_AUG")
def additive_noise(self, noise_type, audio):
clean_db = 10 * np.log10(np.mean(audio**2) + 1e-4)
noise_list = random.sample(
self.noise_list[noise_type],
random.randint(
self.additive_noise_config[noise_type]["min_num_noises"],
self.additive_noise_config[noise_type]["max_num_noises"],
),
)
audio_len = audio.shape[0]
noises_wav = None
for noise in noise_list:
noiseaudio = self.ap.load_wav(noise, sr=self.ap.sample_rate)[:audio_len]
if noiseaudio.shape[0] < audio_len:
continue
noise_snr = random.uniform(
self.additive_noise_config[noise_type]["min_snr_in_db"],
self.additive_noise_config[noise_type]["max_num_noises"],
)
noise_db = 10 * np.log10(np.mean(noiseaudio**2) + 1e-4)
noise_wav = np.sqrt(10 ** ((clean_db - noise_db - noise_snr) / 10)) * noiseaudio
if noises_wav is None:
noises_wav = noise_wav
else:
noises_wav += noise_wav
# if all possible files is less than audio, choose other files
if noises_wav is None:
return self.additive_noise(noise_type, audio)
return audio + noises_wav
def reverberate(self, audio):
audio_len = audio.shape[0]
rir_file = random.choice(self.rir_files)
rir = self.ap.load_wav(rir_file, sr=self.ap.sample_rate)
rir = rir / np.sqrt(np.sum(rir**2))
return signal.convolve(audio, rir, mode=self.rir_config["conv_mode"])[:audio_len]
def apply_one(self, audio):
noise_type = random.choice(self.global_noise_list)
if noise_type == "RIR_AUG":
return self.reverberate(audio)
return self.additive_noise(noise_type, audio)
def setup_encoder_model(config: "Coqpit"):
if config.model_params["model_name"].lower() == "lstm":
model = LSTMSpeakerEncoder(
config.model_params["input_dim"],
config.model_params["proj_dim"],
config.model_params["lstm_dim"],
config.model_params["num_lstm_layers"],
use_torch_spec=config.model_params.get("use_torch_spec", False),
audio_config=config.audio,
)
elif config.model_params["model_name"].lower() == "resnet":
model = ResNetSpeakerEncoder(
input_dim=config.model_params["input_dim"],
proj_dim=config.model_params["proj_dim"],
log_input=config.model_params.get("log_input", False),
use_torch_spec=config.model_params.get("use_torch_spec", False),
audio_config=config.audio,
)
return model
+219
View File
@@ -0,0 +1,219 @@
# coding=utf-8
# Copyright (C) 2020 ATHENA AUTHORS; Yiping Peng; Ne Luo
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# Only support eager mode and TF>=2.0.0
# pylint: disable=no-member, invalid-name, relative-beyond-top-level
# pylint: disable=too-many-locals, too-many-statements, too-many-arguments, too-many-instance-attributes
""" voxceleb 1 & 2 """
import hashlib
import os
import subprocess
import sys
import zipfile
import pandas
import soundfile as sf
from absl import logging
SUBSETS = {
"vox1_dev_wav": [
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partaa",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partab",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partac",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_dev_wav_partad",
],
"vox1_test_wav": ["https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox1_test_wav.zip"],
"vox2_dev_aac": [
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partaa",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partab",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partac",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partad",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partae",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partaf",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partag",
"https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_dev_aac_partah",
],
"vox2_test_aac": ["https://thor.robots.ox.ac.uk/~vgg/data/voxceleb/vox1a/vox2_test_aac.zip"],
}
MD5SUM = {
"vox1_dev_wav": "ae63e55b951748cc486645f532ba230b",
"vox2_dev_aac": "bbc063c46078a602ca71605645c2a402",
"vox1_test_wav": "185fdc63c3c739954633d50379a3d102",
"vox2_test_aac": "0d2b3ea430a821c33263b5ea37ede312",
}
USER = {"user": "", "password": ""}
speaker_id_dict = {}
def download_and_extract(directory, subset, urls):
"""Download and extract the given split of dataset.
Args:
directory: the directory where to put the downloaded data.
subset: subset name of the corpus.
urls: the list of urls to download the data file.
"""
os.makedirs(directory, exist_ok=True)
try:
for url in urls:
zip_filepath = os.path.join(directory, url.split("/")[-1])
if os.path.exists(zip_filepath):
continue
logging.info("Downloading %s to %s" % (url, zip_filepath))
subprocess.call(
"wget %s --user %s --password %s -O %s" % (url, USER["user"], USER["password"], zip_filepath),
shell=True,
)
statinfo = os.stat(zip_filepath)
logging.info("Successfully downloaded %s, size(bytes): %d" % (url, statinfo.st_size))
# concatenate all parts into zip files
if ".zip" not in zip_filepath:
zip_filepath = "_".join(zip_filepath.split("_")[:-1])
subprocess.call("cat %s* > %s.zip" % (zip_filepath, zip_filepath), shell=True)
zip_filepath += ".zip"
extract_path = zip_filepath.strip(".zip")
# check zip file md5sum
with open(zip_filepath, "rb") as f_zip:
md5 = hashlib.md5(f_zip.read()).hexdigest()
if md5 != MD5SUM[subset]:
raise ValueError("md5sum of %s mismatch" % zip_filepath)
with zipfile.ZipFile(zip_filepath, "r") as zfile:
zfile.extractall(directory)
extract_path_ori = os.path.join(directory, zfile.infolist()[0].filename)
subprocess.call("mv %s %s" % (extract_path_ori, extract_path), shell=True)
finally:
# os.remove(zip_filepath)
pass
def exec_cmd(cmd):
"""Run a command in a subprocess.
Args:
cmd: command line to be executed.
Return:
int, the return code.
"""
try:
retcode = subprocess.call(cmd, shell=True)
if retcode < 0:
logging.info(f"Child was terminated by signal {retcode}")
except OSError as e:
logging.info(f"Execution failed: {e}")
retcode = -999
return retcode
def decode_aac_with_ffmpeg(aac_file, wav_file):
"""Decode a given AAC file into WAV using ffmpeg.
Args:
aac_file: file path to input AAC file.
wav_file: file path to output WAV file.
Return:
bool, True if success.
"""
cmd = f"ffmpeg -i {aac_file} {wav_file}"
logging.info(f"Decoding aac file using command line: {cmd}")
ret = exec_cmd(cmd)
if ret != 0:
logging.error(f"Failed to decode aac file with retcode {ret}")
logging.error("Please check your ffmpeg installation.")
return False
return True
def convert_audio_and_make_label(input_dir, subset, output_dir, output_file):
"""Optionally convert AAC to WAV and make speaker labels.
Args:
input_dir: the directory which holds the input dataset.
subset: the name of the specified subset. e.g. vox1_dev_wav
output_dir: the directory to place the newly generated csv files.
output_file: the name of the newly generated csv file. e.g. vox1_dev_wav.csv
"""
logging.info("Preprocessing audio and label for subset %s" % subset)
source_dir = os.path.join(input_dir, subset)
files = []
# Convert all AAC file into WAV format. At the same time, generate the csv
for root, _, filenames in os.walk(source_dir):
for filename in filenames:
name, ext = os.path.splitext(filename)
if ext.lower() == ".wav":
_, ext2 = os.path.splitext(name)
if ext2:
continue
wav_file = os.path.join(root, filename)
elif ext.lower() == ".m4a":
# Convert AAC to WAV.
aac_file = os.path.join(root, filename)
wav_file = aac_file + ".wav"
if not os.path.exists(wav_file):
if not decode_aac_with_ffmpeg(aac_file, wav_file):
raise RuntimeError("Audio decoding failed.")
else:
continue
speaker_name = root.split(os.path.sep)[-2]
if speaker_name not in speaker_id_dict:
num = len(speaker_id_dict)
speaker_id_dict[speaker_name] = num
# wav_filesize = os.path.getsize(wav_file)
wav_length = len(sf.read(wav_file)[0])
files.append((os.path.abspath(wav_file), wav_length, speaker_id_dict[speaker_name], speaker_name))
# Write to CSV file which contains four columns:
# "wav_filename", "wav_length_ms", "speaker_id", "speaker_name".
csv_file_path = os.path.join(output_dir, output_file)
df = pandas.DataFrame(data=files, columns=["wav_filename", "wav_length_ms", "speaker_id", "speaker_name"])
df.to_csv(csv_file_path, index=False, sep="\t")
logging.info("Successfully generated csv file {}".format(csv_file_path))
def processor(directory, subset, force_process):
"""download and process"""
urls = SUBSETS
if subset not in urls:
raise ValueError(subset, "is not in voxceleb")
subset_csv = os.path.join(directory, subset + ".csv")
if not force_process and os.path.exists(subset_csv):
return subset_csv
logging.info("Downloading and process the voxceleb in %s", directory)
logging.info("Preparing subset %s", subset)
download_and_extract(directory, subset, urls[subset])
convert_audio_and_make_label(directory, subset, directory, subset + ".csv")
logging.info("Finished downloading and processing")
return subset_csv
if __name__ == "__main__":
logging.set_verbosity(logging.INFO)
if len(sys.argv) != 4:
print("Usage: python prepare_data.py save_directory user password")
sys.exit()
DIR, USER["user"], USER["password"] = sys.argv[1], sys.argv[2], sys.argv[3]
for SUBSET in SUBSETS:
processor(DIR, SUBSET, False)
+99
View File
@@ -0,0 +1,99 @@
import os
from dataclasses import dataclass, field
from coqpit import Coqpit
from trainer import TrainerArgs, get_last_checkpoint
from trainer.io import copy_model_files
from trainer.logging import logger_factory
from trainer.logging.console_logger import ConsoleLogger
from TTS.config import load_config, register_config
from TTS.tts.utils.text.characters import parse_symbols
from TTS.utils.generic_utils import get_experiment_folder_path, get_git_branch
@dataclass
class TrainArgs(TrainerArgs):
config_path: str = field(default=None, metadata={"help": "Path to the config file."})
def getarguments():
train_config = TrainArgs()
parser = train_config.init_argparse(arg_prefix="")
return parser
def process_args(args, config=None):
"""Process parsed comand line arguments and initialize the config if not provided.
Args:
args (argparse.Namespace or dict like): Parsed input arguments.
config (Coqpit): Model config. If none, it is generated from `args`. Defaults to None.
Returns:
c (TTS.utils.io.AttrDict): Config paramaters.
out_path (str): Path to save models and logging.
audio_path (str): Path to save generated test audios.
c_logger (TTS.utils.console_logger.ConsoleLogger): Class that does
logging to the console.
dashboard_logger (WandbLogger or TensorboardLogger): Class that does the dashboard Logging
TODO:
- Interactive config definition.
"""
if isinstance(args, tuple):
args, coqpit_overrides = args
if args.continue_path:
# continue a previous training from its output folder
experiment_path = args.continue_path
args.config_path = os.path.join(args.continue_path, "config.json")
args.restore_path, best_model = get_last_checkpoint(args.continue_path)
if not args.best_path:
args.best_path = best_model
# init config if not already defined
if config is None:
if args.config_path:
# init from a file
config = load_config(args.config_path)
else:
# init from console args
from TTS.config.shared_configs import BaseTrainingConfig # pylint: disable=import-outside-toplevel
config_base = BaseTrainingConfig()
config_base.parse_known_args(coqpit_overrides)
config = register_config(config_base.model)()
# override values from command-line args
config.parse_known_args(coqpit_overrides, relaxed_parser=True)
experiment_path = args.continue_path
if not experiment_path:
experiment_path = get_experiment_folder_path(config.output_path, config.run_name)
audio_path = os.path.join(experiment_path, "test_audios")
config.output_log_path = experiment_path
# setup rank 0 process in distributed training
dashboard_logger = None
if args.rank == 0:
new_fields = {}
if args.restore_path:
new_fields["restore_path"] = args.restore_path
new_fields["github_branch"] = get_git_branch()
# if model characters are not set in the config file
# save the default set to the config file for future
# compatibility.
if config.has("characters") and config.characters is None:
used_characters = parse_symbols()
new_fields["characters"] = used_characters
copy_model_files(config, experiment_path, new_fields)
dashboard_logger = logger_factory(config, experiment_path)
c_logger = ConsoleLogger()
return config, experiment_path, audio_path, c_logger, dashboard_logger
def init_arguments():
train_config = TrainArgs()
parser = train_config.init_argparse(arg_prefix="")
return parser
def init_training(config: Coqpit = None):
"""Initialization of a training run."""
parser = init_arguments()
args = parser.parse_known_args()
config, OUT_PATH, AUDIO_PATH, c_logger, dashboard_logger = process_args(args, config)
return args[0], config, OUT_PATH, AUDIO_PATH, c_logger, dashboard_logger
+50
View File
@@ -0,0 +1,50 @@
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import umap
matplotlib.use("Agg")
colormap = (
np.array(
[
[76, 255, 0],
[0, 127, 70],
[255, 0, 0],
[255, 217, 38],
[0, 135, 255],
[165, 0, 165],
[255, 167, 255],
[0, 255, 255],
[255, 96, 38],
[142, 76, 0],
[33, 0, 127],
[0, 0, 0],
[183, 183, 183],
],
dtype=float,
)
/ 255
)
def plot_embeddings(embeddings, num_classes_in_batch):
num_utter_per_class = embeddings.shape[0] // num_classes_in_batch
# if necessary get just the first 10 classes
if num_classes_in_batch > 10:
num_classes_in_batch = 10
embeddings = embeddings[: num_classes_in_batch * num_utter_per_class]
model = umap.UMAP()
projection = model.fit_transform(embeddings)
ground_truth = np.repeat(np.arange(num_classes_in_batch), num_utter_per_class)
colors = [colormap[i] for i in ground_truth]
fig, ax = plt.subplots(figsize=(16, 10))
_ = ax.scatter(projection[:, 0], projection[:, 1], c=colors)
plt.gca().set_aspect("equal", "datalim")
plt.title("UMAP projection")
plt.tight_layout()
plt.savefig("umap")
return fig