# # -*- coding: utf-8 -*-
# """ML Engineer Assignment: Bangladeshi Bangla TTS Finetuning.ipynb
#
# Automatically generated by Colab.
#
# Original file is located at
#     https://colab.research.google.com/drive/12ZrU_dlECt3YzVZ7k7qpwySH3eXUS7bj
# """

import gradio as gr
from inference import run_tts


def text_to_speech(text):
    return run_tts(text)


demo = gr.Interface(
    fn=text_to_speech,
    inputs="text",
    outputs="audio",
    title="Bangla Text to Speech",
    description="Enter Bangla text and hear the generated audio."
)

if __name__ == "__main__":
    demo.launch()

# !pip install transformers datasets torch torchaudio librosa
# !pip install coqui-tts phonemizer espeak-ng
# !pip install wandb tensorboard matplotlib seaborn

# !git lfs install
# !git clone https://huggingface.co/bangla-speech-processing/bangla_tts_female

# !ls bangla_tts_female

# !tts --model_path bangla_tts_female/pytorch_model.pth \
#     --config_path bangla_tts_female/config.json \
#     --text "আমি বাংলাদেশ থেকে এসেছি।" \
#     --out_path baseline.wav

# from IPython.display import Audio
# Audio("baseline.wav")

# sentences = [
#     "আমি বাংলাদেশ থেকে এসেছি।",
#     "আজকের আবহাওয়া সুন্দর।",
#     "তুমি কোথায় যাচ্ছ?",
#     "আমরা ঢাকায় থাকি।",
#     "এটা আমার প্রিয় বই।"
# ]

# for i, text in enumerate(sentences, 1):
#     safe_text = text.replace('"', '\\"')
#     # {safe_text} and {i} are interpolated into the shell command by IPython.
#     !tts --model_path bangla_tts_female/pytorch_model.pth --config_path bangla_tts_female/config.json --text "{safe_text}" --out_path "baseline_{i}.wav"

# from IPython.display import Audio
# Audio("baseline_2.wav")

# """Checking the config.json"""

# import json

# with open("bangla_tts_female/config.json", "r", encoding="utf-8") as f:
#     config = json.load(f)

# print(json.dumps(config, indent=2, ensure_ascii=False))

# """Count parameters"""

# from TTS.utils.synthesizer import Synthesizer
# import torch

# synthesizer = Synthesizer(
#     tts_checkpoint="bangla_tts_female/pytorch_model.pth",
#     tts_config_path="bangla_tts_female/config.json",
#     use_cuda=torch.cuda.is_available()
# )

# model_params = sum(p.numel() for p in synthesizer.tts_model.parameters())
# print(f"Total parameters: {model_params:,}")

# """Check tokenizer / phoneme system"""

# print("Phonemizer:", config.get("phonemizer", "Not specified"))
# print("Characters:", config.get("characters", "Not specified"))
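# A possible alternative to shelling out once per sentence (sketch, not in the
# original notebook): reuse the Synthesizer object loaded above via its tts() and
# save_wav() methods. Output file names are illustrative.

# for i, text in enumerate(sentences, 1):
#     wav = synthesizer.tts(text)
#     synthesizer.save_wav(wav, f"baseline_api_{i}.wav")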
# """# Task 2"""

# !wget https://www.openslr.org/resources/53/asr_bengali_6.zip
# !unzip asr_bengali_6.zip -d openslr_53

# !find /content -type d -name "*asr_bengali*"
# !ls /content/openslr_53/asr_bengali

# import pandas as pd

# tsv_path = "/content/openslr_53/asr_bengali/utt_spk_text.tsv"
# df = pd.read_csv(tsv_path, sep="\t", header=None, names=["utt_id", "speaker_id", "text"])
# print(df.head())

# import os

# audio_dir = "/content/openslr_53/asr_bengali/data"
# df["audio_path"] = df["utt_id"].apply(lambda x: os.path.join(audio_dir, f"{x}.wav"))
# print(df.head())

# df = df[df["audio_path"].apply(os.path.exists)]
# print(f"Total usable audio files: {len(df)}")

# import os, glob
# import pandas as pd

# tsv_path = "/content/openslr_53/asr_bengali/utt_spk_text.tsv"
# df = pd.read_csv(tsv_path, sep="\t", header=None, names=["utt_id", "speaker_id", "text"])

# file_dict = {
#     os.path.splitext(os.path.basename(f))[0]: f
#     for f in glob.glob("/content/openslr_53/asr_bengali/data/**/*.flac", recursive=True)
# }
# df["audio_path"] = df["utt_id"].map(file_dict)
# df = df[df["audio_path"].notnull()]
# print(f"Usable audio files: {len(df)}")
# print(df.head())

# !find /content/openslr_53/asr_bengali/data -type f | head -20

# import librosa
# import numpy as np

# durations = []
# for path in df["audio_path"].sample(100):
#     y, sr = librosa.load(path, sr=None)
#     durations.append(len(y) / sr)

# print(f"Total samples: {len(df)}")
# print(f"Duration: min={np.min(durations):.2f}s, mean={np.mean(durations):.2f}s, max={np.max(durations):.2f}s")
# print(f"Unique speakers: {df['speaker_id'].nunique()}")

# import pandas as pd

# sample_df = df.sample(300, random_state=42)
# sample_df.to_csv("accent_labeling_sample.csv", index=False)

# from google.colab import files
# files.download("accent_labeling_sample.csv")

# from google.colab import files
# uploaded = files.upload()

# import pandas as pd

# labeled_df = pd.read_csv("accent_labeling_sample.csv")
# print(labeled_df.columns)

# sample_df = df.sample(300, random_state=42)
# sample_df.to_csv("accent_labeling_sample.csv", index=False)

# import pandas as pd

# label_df = df.sample(50, random_state=42).reset_index(drop=True)
# label_df["accent_label"] = None
# label_df.to_csv("labeling_in_progress.csv", index=False)

# from IPython.display import Audio, display
# import ipywidgets as widgets

# label_df = pd.read_csv("labeling_in_progress.csv")

# def label_clip(idx, label):
#     label_df.loc[idx, "accent_label"] = label
#     label_df.to_csv("labeling_in_progress.csv", index=False)
#     print(f"Labeled index {idx} as {'BD' if label == 1 else 'IN'}")

# def play_and_label(idx):
#     if idx >= len(label_df):
#         print("✅ All clips labeled!")
#         return
#     row = label_df.iloc[idx]
#     print(f"Index: {idx} | Speaker: {row['speaker_id']}")
#     print(f"Text: {row['text']}")
#     display(Audio(row["audio_path"]))

#     bd_btn = widgets.Button(description="BD Accent (1)", button_style='success')
#     in_btn = widgets.Button(description="IN Accent (0)", button_style='danger')
#     skip_btn = widgets.Button(description="Skip", button_style='warning')

#     def on_bd(b):
#         label_clip(idx, 1)
#         play_and_label(idx + 1)

#     def on_in(b):
#         label_clip(idx, 0)
#         play_and_label(idx + 1)

#     def on_skip(b):
#         label_clip(idx, None)
#         play_and_label(idx + 1)

#     bd_btn.on_click(on_bd)
#     in_btn.on_click(on_in)
#     skip_btn.on_click(on_skip)
#     display(widgets.HBox([bd_btn, in_btn, skip_btn]))

# play_and_label(0)

# final_labels = pd.read_csv("labeling_in_progress.csv")
# final_labels = final_labels.dropna(subset=["accent_label"])
# final_labels.to_csv("accent_labeling_sample_labeled.csv", index=False)
# print(f"Saved {len(final_labels)} labeled samples.")

# import librosa
# import numpy as np
# import pandas as pd
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import classification_report

# labeled_df = pd.read_csv("accent_labeling_sample_labeled.csv")

# def extract_mfcc(path, n_mfcc=13):
#     y, sr = librosa.load(path, sr=22050)
#     mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
#     return np.mean(mfcc, axis=1)

# X = np.array([extract_mfcc(p) for p in labeled_df["audio_path"]])
# y = np.array(labeled_df["accent_label"])

# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# clf = RandomForestClassifier(n_estimators=200, random_state=42)
# clf.fit(X_train, y_train)

# y_pred = clf.predict(X_test)
# print(classification_report(y_test, y_pred))

# df["accent_label"] = df["audio_path"].apply(lambda p: clf.predict([extract_mfcc(p)])[0])

# bd_df = df[df["accent_label"] == 1]
# print(f"Bangladeshi-accent samples: {len(bd_df)}")
# bd_df.to_csv("bd_openslr53.csv", index=False)
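# Sketch (not in the original notebook): calling clf.predict() once per clip is slow
# on a corpus of this size. Extracting features for all rows first and predicting on
# the stacked matrix in a single call gives the same labels with far less overhead.

# feats = np.vstack([extract_mfcc(p) for p in df["audio_path"]])
# df["accent_label"] = clf.predict(feats)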
# !wget https://www.openslr.org/resources/53/asr_bengali_a.zip
# !unzip asr_bengali_a.zip -d asr_bengali_a

# !ls asr_bengali_a
# !find asr_bengali_a -type f | head -20
# !find /content -type d -name "*asr_bengali*"
# !ls /content/asr_bengali_a/asr_bengali

# import pandas as pd
# import glob, os

# tsv_path = "/content/asr_bengali_a/asr_bengali/utt_spk_text.tsv"
# df_a = pd.read_csv(tsv_path, sep="\t", names=["utt_id", "speaker_id", "text"])

# audio_files = glob.glob("asr_bengali_a/data/**/*.flac", recursive=True)
# audio_map = {os.path.splitext(os.path.basename(f))[0]: f for f in audio_files}
# df_a["audio_path"] = df_a["utt_id"].map(audio_map)
# df_a = df_a.dropna(subset=["audio_path"])
# print(df_a.head())

# df_a["accent_label"] = df_a["audio_path"].apply(lambda p: clf.predict([extract_mfcc(p)])[0])

# bd_df_a = df_a[df_a["accent_label"] == 1]
# print(f"Bangladeshi-accent samples: {len(bd_df_a)}")
# bd_df_a.to_csv("bd_asr_bengali_a.csv", index=False)

# final_df = pd.concat([
#     pd.read_csv("bd_openslr53.csv"),
#     pd.read_csv("bd_asr_bengali_a.csv")
# ])
# final_df.to_csv("bd_combined_dataset.csv", index=False)

# import soundfile as sf
# import os

# os.makedirs("processed_bd_audio", exist_ok=True)
# meta_lines = []

# for i, row in final_df.iterrows():
#     y, sr = librosa.load(row["audio_path"], sr=22050)
#     y, _ = librosa.effects.trim(y)
#     y = y / (np.max(np.abs(y)) + 1e-9)
#     out_path = f"processed_bd_audio/{i}.wav"
#     sf.write(out_path, y, 22050)
#     meta_lines.append(f"{out_path}|{row['text']}|bd_speaker")

# with open("metadata.csv", "w", encoding="utf-8") as f:
#     f.write("\n".join(meta_lines))

# """# TASK 3"""

# !pip install librosa soundfile scikit-learn joblib numpy tqdm

# import os
# import numpy as np
# import pandas as pd
# import librosa
# from tqdm import tqdm
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
# import joblib

# SR = 22050
# N_MFCC = 13

# def extract_accent_features(audio_path, sr=SR, n_mfcc=N_MFCC):
#     try:
#         y, orig_sr = librosa.load(audio_path, sr=None)
#     except Exception:
#         return None

#     if orig_sr != sr:
#         y = librosa.resample(y=y, orig_sr=orig_sr, target_sr=sr)

#     y, _ = librosa.effects.trim(y, top_db=20)
#     if y.size == 0:
#         return None
#     y = y / (np.max(np.abs(y)) + 1e-9)

#     features = []

#     # MFCCs and their deltas (mean/std over time)
#     mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
#     delta = librosa.feature.delta(mfcc)
#     features += list(np.mean(mfcc, axis=1))
#     features += list(np.std(mfcc, axis=1))
#     features += list(np.mean(delta, axis=1))
#     features += list(np.std(delta, axis=1))

#     # Spectral and energy statistics
#     cent = librosa.feature.spectral_centroid(y=y, sr=sr)
#     bw = librosa.feature.spectral_bandwidth(y=y, sr=sr)
#     rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
#     zcr = librosa.feature.zero_crossing_rate(y)
#     rms = librosa.feature.rms(y=y)
#     features += [np.mean(cent), np.std(cent)]
#     features += [np.mean(bw), np.std(bw)]
#     features += [np.mean(rolloff), np.std(rolloff)]
#     features += [np.mean(zcr), np.std(zcr)]
#     features += [np.mean(rms), np.std(rms)]

#     # Pitch (F0) statistics via pYIN
#     try:
#         f0, voiced_flag, voiced_prob = librosa.pyin(y, fmin=50, fmax=600, sr=sr)
#         if f0 is None:
#             f0_stats = [0, 0, 0, 0]
#         else:
#             voiced = ~np.isnan(f0)
#             if voiced.sum() == 0:
#                 f0_stats = [0, 0, 0, 0]
#             else:
#                 f0_vals = f0[voiced]
#                 f0_stats = [
#                     np.mean(f0_vals),
#                     np.std(f0_vals),
#                     np.median(f0_vals),
#                     float(np.sum(voiced)) / len(f0)
#                 ]
#     except Exception:
#         f0_stats = [0, 0, 0, 0]
#     features += f0_stats

#     # Clip duration in seconds
#     features += [len(y) / sr]

#     return np.array(features)
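# Quick sanity check (sketch, not in the original notebook): confirm the feature
# vector has the expected fixed length before running the full extraction.
# With the defaults above it is 4 * N_MFCC + 10 + 4 + 1 = 67.

# demo_feats = extract_accent_features(final_df["audio_path"].iloc[0])
# if demo_feats is not None:
#     print("Feature vector length:", demo_feats.shape[0])  # expected: 67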
# labeled_df = pd.read_csv("accent_labeling_sample_labeled.csv")  # must have: audio_path, accent_label

# X, y = [], []
# for _, row in tqdm(labeled_df.iterrows(), total=len(labeled_df)):
#     feats = extract_accent_features(row["audio_path"])
#     if feats is not None:
#         X.append(feats)
#         y.append(int(row["accent_label"]))

# X = np.vstack(X)
# y = np.array(y)

# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.2, random_state=42
# )

# clf = RandomForestClassifier(
#     n_estimators=300, random_state=42, n_jobs=-1
# )
# clf.fit(X_train, y_train)

# y_pred = clf.predict(X_test)
# print("✅ Accuracy:", accuracy_score(y_test, y_pred))
# print(classification_report(y_test, y_pred))
# print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

# joblib.dump(clf, "accent_rf_model.joblib")
# np.save("feature_shape.npy", X.shape[1])
# print("💾 Model saved as accent_rf_model.joblib")

# """# TASK 4"""

# import torch
# from transformers import VitsModel

# class BDVitsModel(VitsModel):
#     def __init__(self, config):
#         super().__init__(config)
#         self.bd_accent_adapter = torch.nn.Linear(config.hidden_size, config.hidden_size)

#     def forward(self, input_ids, attention_mask=None, **kwargs):
#         outputs = super().forward(input_ids, attention_mask=attention_mask, **kwargs)
#         # Sketch only: the adapter is applied to the hidden states here, but the
#         # result is not wired back into the synthesis path.
#         hidden_states = outputs.last_hidden_state
#         hidden_states = self.bd_accent_adapter(hidden_states)
#         return outputs

# def bd_text_normalize(text):
#     # Map a few spellings toward common Bangladeshi pronunciations.
#     text = text.replace("ড়", "র")
#     text = text.replace("ঋ", "রি")
#     # Placeholder for a context-dependent rule (ই/ঈ substitution) from the draft:
#     # text = text.replace("ই", "ঈ")
#     return text

# sample_text = "ঋণী ছেলে বড় রাস্তা দিয়ে যাবে।"
# normalized_text = bd_text_normalize(sample_text)
# print("Original text:  ", sample_text)
# print("Normalized text:", normalized_text)

# import torch.nn.functional as F

# def bd_accent_loss(pred_mel, target_mel, pred_phonemes, target_phonemes,
#                    accent_weight=0.1, phoneme_weight=0.5):
#     mel_loss = F.mse_loss(pred_mel, target_mel)
#     phoneme_loss = F.cross_entropy(pred_phonemes, target_phonemes)
#     accent_loss = accent_discriminator_loss(pred_mel)  # assumed to be defined elsewhere
#     total_loss = mel_loss + phoneme_weight * phoneme_loss + accent_weight * accent_loss
#     print(f"Mel Loss: {mel_loss.item():.4f} | Phoneme Loss: {phoneme_loss.item():.4f} | "
#           f"Accent Loss: {accent_loss:.4f} | Total Loss: {total_loss.item():.4f}")
#     return total_loss

# """# TASK 5"""

# !pip install torch torchaudio transformers datasets librosa soundfile wandb accelerate
# !pip install tqdm librosa

# import os, time, math, random
# import torch
# import torch.nn.functional as F
# from torch import nn, optim
# from torch.utils.data import DataLoader, Dataset
# from torch.cuda.amp import autocast, GradScaler
# import librosa, soundfile as sf, numpy as np
# from tqdm.auto import tqdm
# import joblib
# import wandb

# training_config = {
#     "learning_rate": 1e-4,
#     "batch_size": 16,
#     "warmup_steps": 1000,
#     "gradient_accumulation_steps": 4,
#     "mixed_precision": True,
#     "save_strategy": "steps",
#     "save_steps": 500,
#     "eval_steps": 100,
#     "num_train_epochs": 3,
#     "device": "cuda" if torch.cuda.is_available() else "cpu",
#     "output_dir": "/content/drive/MyDrive/bd_tts_finetune",
# }
# os.makedirs(training_config["output_dir"], exist_ok=True)

# import pandas as pd

# df = pd.read_csv("metadata.csv", sep="|", names=["audio_path", "text", "accent_label"])
# print(df.head())
# print(df.shape)

# !head -n 10 metadata.csv

# # metadata.csv has three pipe-separated fields; keep only path and text.
# df = pd.read_csv("metadata.csv", sep="|", names=["audio_path", "text", "speaker"])
# df[["audio_path", "text"]].to_csv("metadata_clean.csv", index=False)
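# The TASK 5 cell imports Dataset/DataLoader but never defines a dataset. A minimal
# sketch of what that could look like for metadata_clean.csv (class name, mel settings
# and the trivial collate_fn are illustrative, not from the original notebook):

# class BDTTSDataset(Dataset):
#     def __init__(self, metadata_path="metadata_clean.csv", sr=22050, n_mels=80):
#         self.items = pd.read_csv(metadata_path)
#         self.sr = sr
#         self.n_mels = n_mels
#
#     def __len__(self):
#         return len(self.items)
#
#     def __getitem__(self, idx):
#         row = self.items.iloc[idx]
#         audio, _ = librosa.load(row["audio_path"], sr=self.sr)
#         mel = librosa.power_to_db(
#             librosa.feature.melspectrogram(y=audio, sr=self.sr, n_mels=self.n_mels)
#         )
#         return {"text": row["text"], "mel": torch.tensor(mel, dtype=torch.float32)}
#
# train_loader = DataLoader(
#     BDTTSDataset(),
#     batch_size=training_config["batch_size"],
#     shuffle=True,
#     collate_fn=lambda batch: batch,  # replace with a padding collate for real training
# )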
# """# TASK 6"""

# import torch
# import numpy as np

# sample = {
#     'text_input': "আমার নাম রাজি",
#     'mel_spectrogram': torch.randn(80, 200),
#     'audio_waveform': np.random.randn(44100).astype(np.float32),
#     'phonemes': ["a", "m", "a", "r", "n", "a", "m", "r", "a", "j", "i"]
# }

# import librosa

# audio_path = "/content/processed_bd_audio/audio.wav"
# audio, sr = librosa.load(audio_path, sr=22050)

# mel_spectrogram = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=80)
# mel_spectrogram_db = librosa.power_to_db(mel_spectrogram)

# import matplotlib.pyplot as plt

# plt.figure(figsize=(10, 4))
# plt.imshow(mel_spectrogram_db, aspect='auto', origin='lower', cmap='magma')
# plt.colorbar(format='%+2.0f dB')
# plt.title('Mel Spectrogram (dB)')
# plt.xlabel('Time frames')
# plt.ylabel('Mel frequency bins')
# plt.show()

# plt.figure(figsize=(10, 4))
# plt.imshow(mel_spectrogram_db, aspect='auto', origin='lower', cmap='magma')
# plt.colorbar(format='%+2.0f dB')
# plt.title('Mel Spectrogram (dB)')
# plt.xlabel('Time frames')
# plt.ylabel('Mel frequency bins')
# plt.savefig("/content/mel_spectrogram.png")
# plt.close()

# from IPython.display import Image
# Image("/content/mel_spectrogram.png")

# import torch

# mel_tensor = torch.tensor(mel_spectrogram_db).unsqueeze(0)  # add batch dim if needed
# torch.save(mel_tensor, "/content/mel_spectrogram.pt")

# """# TASK 7"""

# import torch
# import torch.nn as nn

# class RelativePositionMultiHeadAttention(nn.Module):
#     def __init__(self, num_heads=8, k_channels=64):
#         super().__init__()
#         self.num_heads = num_heads
#         self.k_channels = k_channels
#         self.conv_k = nn.Conv1d(in_channels=k_channels * num_heads, out_channels=k_channels * num_heads, kernel_size=1)
#         self.conv_v = nn.Conv1d(in_channels=k_channels * num_heads, out_channels=k_channels * num_heads, kernel_size=1)
#         self.conv_o = nn.Conv1d(in_channels=k_channels * num_heads, out_channels=k_channels * num_heads, kernel_size=1)

#     @torch.jit.ignore
#     def attention(self, query, key, value, mask=None):
#         b = key.size(0)
#         d = key.size(1)
#         t_s = key.size(2)
#         t_t = query.size(2)
#         query = query.view(b, self.num_heads, self.k_channels, t_t).transpose(2, 3)
#         key = key.view(b, self.num_heads, self.k_channels, t_s).transpose(2, 3)
#         value = value.view(b, self.num_heads, self.k_channels, t_s).transpose(2, 3)
#         scores = torch.matmul(query, key.transpose(-2, -1)) / (self.k_channels ** 0.5)
#         if mask is not None:
#             scores = scores.masked_fill(mask == 0, float('-inf'))
#         attn = torch.softmax(scores, dim=-1)
#         out = torch.matmul(attn, value)
#         out = out.transpose(2, 3).contiguous().view(b, d, t_t)
#         return out, attn

#     def forward(self, c, attn_mask=None):
#         q = c
#         k = self.conv_k(c)
#         v = self.conv_v(c)
#         x, self.attn = self.attention(q, k, v, mask=attn_mask)
#         x = self.conv_o(x)
#         return x

# if __name__ == "__main__":
#     batch_size = 2
#     d_model = 512
#     seq_len = 50
#     num_heads = 8
#     k_channels = d_model // num_heads

#     model = RelativePositionMultiHeadAttention(num_heads=num_heads, k_channels=k_channels)
#     c = torch.randn(batch_size, d_model, seq_len)
#     output = model(c)
#     print("Output shape:", output.shape)

#     scripted_model = torch.jit.script(model)
#     print("TorchScript model compiled successfully.")

# b, d, t = 2, 512, 50
# dummy_input = torch.randn(b, d, t)
# model = RelativePositionMultiHeadAttention(num_heads=8, k_channels=d // 8)
# output = model(dummy_input)
# print(output.shape)
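# How attn_mask is meant to be used (sketch, not in the original notebook): a boolean
# padding mask that broadcasts against the [batch, heads, t_t, t_s] score tensor, so
# padded key positions are filled with -inf before the softmax.

# lengths = torch.tensor([50, 30])                     # valid frames per batch item
# valid = torch.arange(t)[None, :] < lengths[:, None]  # [b, t_s], True = keep
# attn_mask = valid[:, None, None, :]                  # [b, 1, 1, t_s]
# masked_output = model(dummy_input, attn_mask=attn_mask)
# print(masked_output.shape)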
# import torch
# import torch.nn as nn
# import gradio as gr
# import numpy as np
# import librosa

# class RelativePositionMultiHeadAttention(nn.Module):
#     def __init__(self, d_model=512, num_heads=8):
#         super().__init__()
#         self.num_heads = num_heads
#         self.k_channels = d_model // num_heads
#         self.conv_k = nn.Conv1d(d_model, d_model, kernel_size=1)
#         self.conv_v = nn.Conv1d(d_model, d_model, kernel_size=1)
#         self.conv_o = nn.Conv1d(d_model, d_model, kernel_size=1)

#     @torch.jit.ignore
#     def attention(self, query, key, value, mask=None):
#         b = key.size(0)
#         d = key.size(1)
#         t_s = key.size(2)
#         t_t = query.size(2)
#         query = query.view(b, self.num_heads, self.k_channels, t_t).transpose(2, 3)
#         key = key.view(b, self.num_heads, self.k_channels, t_s).transpose(2, 3)
#         value = value.view(b, self.num_heads, self.k_channels, t_s).transpose(2, 3)
#         scores = torch.matmul(query, key.transpose(-2, -1)) / (self.k_channels ** 0.5)
#         if mask is not None:
#             scores = scores.masked_fill(mask == 0, float('-inf'))
#         attn = torch.softmax(scores, dim=-1)
#         out = torch.matmul(attn, value)
#         out = out.transpose(2, 3).contiguous().view(b, d, t_t)
#         return out, attn

#     def forward(self, c, attn_mask=None):
#         q = c
#         k = self.conv_k(c)
#         v = self.conv_v(c)
#         x, self.attn = self.attention(q, k, v, mask=attn_mask)
#         x = self.conv_o(x)
#         return x

# def preprocess_text(text):
#     bengali_chars = "অআইঈউঊঋএঐওঔকখগঘঙচছজঝঞটঠডঢণতথদধনপফবভমযরলশষসহড়ঢ়য়ড়"
#     char_to_idx = {ch: i + 1 for i, ch in enumerate(bengali_chars)}
#     tokens = [char_to_idx.get(ch, 0) for ch in text if ch.strip() != '']
#     return tokens

# class TokenEmbedding(nn.Module):
#     def __init__(self, vocab_size, d_model):
#         super().__init__()
#         self.embedding = nn.Embedding(vocab_size + 1, d_model, padding_idx=0)

#     def forward(self, tokens):
#         embedded = self.embedding(tokens)
#         return embedded.transpose(1, 2)  # [batch, d_model, seq_len] for the Conv1d layers

# def mel_to_audio(mel_spectrogram, n_iter=60, sr=22050, n_fft=1024, hop_length=256):
#     mel_power = librosa.db_to_power(mel_spectrogram)
#     S = librosa.feature.inverse.mel_to_stft(mel_power, sr=sr, n_fft=n_fft)
#     audio = librosa.griffinlim(S, n_iter=n_iter, hop_length=hop_length)
#     return audio

# d_model = 512
# vocab_size = 50

# embedding = TokenEmbedding(vocab_size=vocab_size, d_model=d_model)
# attention_model = RelativePositionMultiHeadAttention(d_model=d_model, num_heads=8)
# embedding.eval()
# attention_model.eval()

# # Note: the embedding and attention weights are randomly initialised, so this demo
# # produces noise-like audio; it only exercises the text -> mel -> audio plumbing.
# def tts_pipeline(user_text):
#     tokens = preprocess_text(user_text)
#     if len(tokens) == 0:
#         return None
#     input_tensor = torch.tensor(tokens).unsqueeze(0)
#     with torch.no_grad():
#         embedded = embedding(input_tensor)
#         output = attention_model(embedded)
#     mel = output.squeeze(0).cpu().numpy()
#     mel = mel[:80, :]
#     mel_db = 20 * np.log10(np.maximum(mel, 1e-5))
#     audio = mel_to_audio(mel_db)
#     return (22050, audio.astype(np.float32))

# import numpy as np
# import gradio as gr

# iface = gr.Interface(
#     fn=tts_pipeline,
#     inputs=gr.Textbox(label="Enter Bengali Text"),
#     outputs=gr.Audio(label="Generated Speech"),
#     title="Bangladeshi Bengali TTS Demo"
# )
# iface.launch()
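# Sketch (not in the original notebook): mel_to_audio can be sanity-checked by
# round-tripping a mel computed from a real recording with matching n_fft/hop_length.
# The file name below is illustrative.

# ref_audio, _ = librosa.load("processed_bd_audio/0.wav", sr=22050)
# ref_mel_db = librosa.power_to_db(
#     librosa.feature.melspectrogram(y=ref_audio, sr=22050, n_fft=1024, hop_length=256, n_mels=80)
# )
# recon = mel_to_audio(ref_mel_db)  # should sound like a degraded copy of ref_audio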
here..."), # outputs=gr.Audio(type="filepath"), # title="Bengali TTS with CLI Model" # ) # iface.launch()