import os
import csv
import json
import torch
import shutil
import requests
import textwrap
import numpy as np
import pandas as pd
import streamlit as st
from tqdm.auto import tqdm
from collections import Counter
from tokenizers import Tokenizer
import plotly.graph_objects as go
from huggingface_hub import whoami, HfApi
from transformers import AutoModel, AutoTokenizer, PreTrainedTokenizerFast, pipeline

LANGUAGES = {
    "french":     {"emoji": "🇫🇷", "nllb_code": "fra_Latn", "hf_code": "fr"},
    "english":    {"emoji": "🇬🇧", "nllb_code": "eng_Latn", "hf_code": "en"},
    "german":     {"emoji": "🇩🇪", "nllb_code": "deu_Latn", "hf_code": "de"},
    "italian":    {"emoji": "🇮🇹", "nllb_code": "ita_Latn", "hf_code": "it"},
    "spanish":    {"emoji": "🇪🇸", "nllb_code": "spa_Latn", "hf_code": "es"},
    "portuguese": {"emoji": "🇵🇹", "nllb_code": "por_Latn", "hf_code": "pt"},
}

MODELS = [
    "intfloat/multilingual-e5-small",
    "intfloat/multilingual-e5-base",
    "intfloat/multilingual-e5-large",
    "BAAI/bge-m3",
    "Alibaba-NLP/gte-multilingual-base",
    # "jinaai/jina-embeddings-v3",  # TODO: uses ParametrizedEmbedding
]

def estimate_pruned_vocabulary(tokenizer: PreTrainedTokenizerFast, language: str):
    """
    Estimate the most common tokens in the language. You should first download the 1M-sentences
    dataset for the desired language. Source: https://wortschatz.uni-leipzig.de/en/download/English
    """
    sentences_file = f'data.nosync/{language}_news_2020_1M-sentences.txt'
    if os.path.exists(sentences_file):
        df = pd.read_csv(sentences_file, sep='\t', header=None, quoting=csv.QUOTE_NONE, names=['id', 'text'])
        # Always keep the special tokens, then count every token occurring in the corpus.
        counter = Counter(tokenizer.all_special_tokens)
        counter.update(tok for t in tqdm(df.text) for tok in tokenizer.tokenize(t))
        with open(f"data.nosync/{language}_filtered_tokens.txt", "w") as f:
            f.write("\n".join(map(str, set(counter))))
    else:
        raise FileNotFoundError(f"Sentences file not found: {sentences_file}")

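# Hypothetical offline usage sketch (assumes the Leipzig corpus file was downloaded into data.nosync/):
#   tok = AutoTokenizer.from_pretrained("intfloat/multilingual-e5-small", use_fast=True)
#   estimate_pruned_vocabulary(tok, "french")
#   # -> writes data.nosync/french_filtered_tokens.txt with the tokens to keep
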
def get_pruned_vocabulary(language: str):
    filtered_tokens_file = f"data.nosync/{language}_filtered_tokens.txt"
    if os.path.exists(filtered_tokens_file):
        with open(filtered_tokens_file, "r") as f:
            return set(f.read().splitlines())
    else:
        raise FileNotFoundError(f"No filtered tokens file found for language {language}. Please run `estimate_pruned_vocabulary` first.")

def load_model_and_tokenizer(model_name: str):
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=True)
    return model, tokenizer

def count_parameters(model, layer_name: str = None):
    return sum(p.numel() for name, p in model.named_parameters() if layer_name is None or name.startswith(layer_name))

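# Illustrative breakdown (approximate figures, assuming intfloat/multilingual-e5-small):
#   count_parameters(model)                            -> ~118M parameters in total
#   count_parameters(model, layer_name="embeddings")   -> ~96M  (250k-token embedding matrix x 384 dims)
#   count_parameters(model, layer_name="encoder")      -> ~21M  (12-layer Transformer encoder)
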
def get_test_sentence(target_lang: str, source_lang: str = "eng_Latn"):
    text = """
    Alan Mathison Turing (23 June 1912 - 7 June 1954) was an English mathematician,
    computer scientist, logician, cryptanalyst, philosopher and theoretical biologist.
    """
    if target_lang == "eng_Latn":
        return text
    model_name = "facebook/nllb-200-distilled-600M"
    translator = pipeline(task="translation", tokenizer=model_name, model=model_name)
    return translator(text, src_lang=source_lang, tgt_lang=target_lang)[0]['translation_text']

def push_to_hub(username: str, token: str, model_dir: str, private: bool = False):
    _ = whoami(token=token)  # fails early if the token is invalid
    api = HfApi(endpoint="https://huggingface.co", token=token)
    repo_id = f"{username}/{model_dir.split('/')[-1]}"
    api.create_repo(repo_id=repo_id, repo_type="model", private=private)
    api.upload_folder(repo_id=repo_id, folder_path=model_dir, commit_message="Upload pruned model")

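# Hypothetical call (placeholder values; the token needs write access):
#   push_to_hub("antoinelouis", "hf_xxxxxxxxxxxx", "french-multilingual-e5-small", private=True)
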
def prune_model(model_name: str, language: str, username: str, token: str):
    st.markdown(f"- Pruning the [**{model_name}**](https://huggingface.co/{model_name}) model to keep its **{language.capitalize()}** tokens only. *Let's go!*")

    # Load the model and its tokenizer
    model, tokenizer = load_model_and_tokenizer(model_name)

    # Count the parameters of the original model
    all_params = count_parameters(model)
    encoder_params = count_parameters(model, layer_name="encoder")
    embedding_params = count_parameters(model, layer_name="embeddings")
    st.markdown(
        f"- The model has **{all_params/1e6:.1f}M** parameters, of which **{embedding_params/all_params*100:.0f}%** " +
        f"(i.e., {embedding_params/1e6:.1f}M params) come from the *embedding matrix* and its {tokenizer.vocab_size} token entries. " +
        f"This means that the contextualization of text sequences is actually done by a *{model.config.num_hidden_layers}-layer Transformer encoder* " +
        f"with only **{encoder_params/1e6:.1f}M** parameters."
    )

    # Load the set of tokens that are commonly used in the target language
    filtered_tokens = get_pruned_vocabulary(language)
    st.markdown(
        f"- {language.capitalize()} seems to use only **{len(filtered_tokens)/tokenizer.vocab_size*100:.0f}%** " +
        f"of the model vocabulary (i.e., {len(filtered_tokens)} out of the original {tokenizer.vocab_size} tokens)."
    )

| st.markdown("- *Updating the tokenizer...*") | |
| outdir = f"{language}-{model_name.split('/')[-1]}" | |
| # Export the tokenizer to a JSON string and access its vocabulary (list of lists: [[token, score], ...]) | |
| tokenizer_json = json.loads(tokenizer.backend_tokenizer.to_str()) | |
| original_vocab = tokenizer_json['model']['vocab'] | |
| # Build a mapping from tokens to their original IDs | |
| original_token_to_id = {entry[0]: idx for idx, entry in enumerate(original_vocab)} | |
| # Filter out the tokens to remove and reassign new IDs | |
| new_id = 0 | |
| new_token_to_id = {} | |
| new_id_to_original_id = {} | |
| filtered_vocab_entries = [] | |
| for token, score in original_vocab: | |
| if token in filtered_tokens: | |
| filtered_vocab_entries.append([token, score]) | |
| new_token_to_id[token] = new_id | |
| new_id_to_original_id[new_id] = original_token_to_id[token] | |
| new_id += 1 | |
| # Update the vocab in the tokenizer JSON and rebuild the tokenizer from the modified JSON | |
| tokenizer_json['model']['vocab'] = filtered_vocab_entries | |
| new_backend_tokenizer = Tokenizer.from_str(json.dumps(tokenizer_json)) | |
| # Create a new tokenizer instance and save it | |
| new_tokenizer = PreTrainedTokenizerFast(tokenizer_object=new_backend_tokenizer, **tokenizer.init_kwargs) | |
| new_tokenizer.save_pretrained(outdir) | |
| st.markdown("- *Updating the embedding matrix...*") | |
| new_model = AutoModel.from_pretrained(model_name, trust_remote_code=True) | |
| # Create a new embedding matrix and map the original vectors to their new IDs | |
| original_embeddings = new_model.get_input_embeddings().weight.data | |
| new_embeddings = torch.nn.Embedding( | |
| num_embeddings=new_tokenizer.vocab_size, | |
| embedding_dim=model.config.hidden_size, | |
| padding_idx=new_tokenizer.pad_token_id, | |
| ) | |
| for new_id in range(new_tokenizer.vocab_size): | |
| original_id = new_id_to_original_id.get(new_id) | |
| new_embeddings.weight.data[new_id] = original_embeddings[original_id] | |
| new_model.set_input_embeddings(new_embeddings) | |
| new_model.config.vocab_size = new_tokenizer.vocab_size | |
| new_model.save_pretrained(outdir) | |
    # Test the conversion
    test_sentence = get_test_sentence(LANGUAGES[language]['nllb_code'])
    st.markdown(f"""- *Verifying everything worked as expected with the following test sentence: "{test_sentence}"*""")
    assert len(new_tokenizer) == len(filtered_tokens), f"ERROR: new tokenizer size ({len(new_tokenizer)}) != number of filtered tokens ({len(filtered_tokens)})"
    assert filtered_tokens == set(new_tokenizer.convert_ids_to_tokens(range(len(new_tokenizer)))), "ERROR: the new tokenizer vocabulary doesn't match the set of filtered tokens"
    with torch.inference_mode():
        emb1 = model(**tokenizer(test_sentence, return_tensors='pt')).last_hidden_state[:, 0][0].numpy()
        emb2 = new_model(**new_tokenizer(test_sentence, return_tensors='pt')).last_hidden_state[:, 0][0].numpy()
    diff = np.abs(emb1 - emb2).max()
    assert diff < 1e-6, f"ERROR: Some dimensions of the two vectors have a non-negligible difference ({diff})"
    st.success("The conversion **succeeded**! You can verify it by looking at the output *[CLS]* token embedding:")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("Original model:")
        st.code(f"{emb1.tolist()}")
    with col2:
        st.markdown("Pruned model:")
        st.code(f"{emb2.tolist()}")

    # Show visually the result of the pruning process
    pruned_all_params = count_parameters(new_model)
    pruned_encoder_params = count_parameters(new_model, layer_name="encoder")
    pruned_embedding_params = count_parameters(new_model, layer_name="embeddings")
    st.markdown(f"The pruned model is **{pruned_all_params/all_params*100:.1f}%** of the original model size.")
    data = {
        'Model': ['Original', 'Pruned'],
        'Embedding': [embedding_params / 1e6, pruned_embedding_params / 1e6],
        'Encoder': [encoder_params / 1e6, pruned_encoder_params / 1e6],
    }
    fig = go.Figure(data=[
        go.Bar(name='Embedding matrix', x=data['Model'], y=data['Embedding'], text=data['Embedding'], textposition='inside', marker_color='#E5B4B4'),
        go.Bar(name='Transformer encoder', x=data['Model'], y=data['Encoder'], text=data['Encoder'], textposition='inside', marker_color='#7FBFE0'),
    ])
    fig.update_layout(barmode='stack', yaxis_title='# Params (M)', height=400, margin=dict(t=10, b=10))
    fig.update_traces(texttemplate='%{text:.1f}M', textposition='inside', insidetextanchor='middle')
    st.plotly_chart(fig)

    # Add a README to the pruned model repo
    new_model_name = f"{username}/{outdir.split('/')[-1]}"
    readme_content = textwrap.dedent(f"""
    ---
    pipeline_tag: sentence-similarity
    language: {LANGUAGES[language]['hf_code']}
    license: mit
    tags:
    - passage-retrieval
    - sentence-similarity
    - pruned
    library_name: sentence-transformers
    base_model: {model_name}
    base_model_relation: pruned
    ---
    # {new_model_name.split('/')[-1]}

    This model is a pruned version of [{model_name}](https://huggingface.co/{model_name}) for the {language.capitalize()} language.

    It was created by the [Multilingual Text Embedding Model Pruner](https://huggingface.co/spaces/antoinelouis/mteb-pruner) space,
    which removed tokens not commonly used in {language.capitalize()} from the original multilingual model's vocabulary and adjusted
    the model's embedding matrix accordingly.

    This pruned model should perform similarly to the original model for {language.capitalize()} language tasks, but with a much smaller
    memory footprint ({100 - pruned_all_params/all_params*100:.1f}% smaller). However, it may not perform well for other languages present
    in the original multilingual model.

    ## Usage

    You can use this model with the Transformers library:

    ```python
    from transformers import AutoModel, AutoTokenizer

    model_name = "{new_model_name}"
    model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=True)
    ```
    """)
    with open(os.path.join(outdir, "README.md"), "w") as f:
        f.write(readme_content)

| st.markdown("- *Pushing the pruned model to your Hugging Face account...*") | |
| push_to_hub(username, token, outdir) | |
| shutil.rmtree(outdir) | |
| st.markdown("Done! You can now load your pruned model like this:") | |
| st.code(f""" | |
| from transformers import AutoModel, AutoTokenizer | |
| model_name = "{new_model_name}" | |
| model = AutoModel.from_pretrained(model_name, trust_remote_code=True) | |
| tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=True) | |
| """, language="python") | |
def main():
    st.header("Multilingual Text Embedding Model Pruner")
    st.markdown("""
        This space helps you create a smaller, language-specific version of a multilingual text embedding model. Here's what it does:

        1. 🌎 Takes a popular text embedding model that was trained on many languages
        2. ✂️ Trims it down to focus on just one language by removing unused tokens from its vocabulary
        3. 🚀 Gives you a smaller model that works just as well for your chosen language

        #### Why is this useful?

        - 💾 Get the same performance in your language with a much smaller model size
        - 🌐 Great for low-resource environments with limited RAM

        Ready to shrink your model? Let's get started!
    """)
    model_name = st.selectbox("Choose a multilingual model", MODELS)
    language = st.selectbox(
        "Pick your target language",
        options=list(LANGUAGES.keys()),
        format_func=lambda x: f"{LANGUAGES[x]['emoji']} {x.capitalize()}",
    )
    username = st.text_input("Your Hugging Face username", placeholder="antoinelouis")
    token = st.text_input("Your Hugging Face access token", type="password", placeholder="hf_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

    if st.button("Prune Model"):
        if not username or not token:
            st.error("Your HF username and access token are required to save the pruned model to your account.")
        else:
            prune_model(model_name, language, username, token)

    st.markdown(
        """
        <style>
        .credits {
            position: fixed;
            right: 10px;
            bottom: 10px;
            color: #888888;
            font-size: 11px;
        }
        </style>
        <div class="credits">
            Credits to <a href="https://gist.github.com/avidale/44cd35bfcdaf8bedf51d97c468cc8001" target="_blank">@avidale</a> for inspiration.
        </div>
        """,
        unsafe_allow_html=True,
    )

if __name__ == "__main__":
    main()