import os
import shutil

import torch
import torch.nn as nn
import torch.nn.functional as F
import joblib
from transformers import AutoModel, AutoTokenizer
from huggingface_hub import hf_hub_download
from torch.utils.data import Dataset, DataLoader
from config import CONFIG

# Import scikit-learn for sentiment label encoding
try:
    from sklearn.preprocessing import LabelEncoder
except ImportError:
    print("⚠️ scikit-learn is not installed, attempting to install it...")
    import subprocess
    import sys
    # Use the current interpreter's pip so the package lands in the right environment
    subprocess.check_call([sys.executable, "-m", "pip", "install", "scikit-learn"])
    from sklearn.preprocessing import LabelEncoder

class ABSADataset(Dataset):
    """
    Custom Dataset for ABSA batch processing.
    Pairs every sentence with every aspect so the model predicts one
    sentiment per (sentence, aspect) combination.
    """

    def __init__(self, sentences, aspects, tokenizer, max_len):
        """
        Args:
            sentences (list): List of input sentences.
            aspects (list): List of aspects to predict.
            tokenizer (AutoTokenizer): IndoBERT tokenizer.
            max_len (int): Maximum token length.
        """
        self.sentences = sentences
        self.aspects = aspects
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        # Total combinations = number of sentences × number of aspects
        return len(self.sentences) * len(self.aspects)

    def __getitem__(self, idx):
        """
        Returns the encoded input for one sentence-aspect pair.
        """
        # Recover the sentence and aspect indices from the flat index
        sent_idx = idx // len(self.aspects)
        aspect_idx = idx % len(self.aspects)
        sentence = self.sentences[sent_idx]
        aspect = self.aspects[aspect_idx]

        # Combine the aspect and the sentence in the special input format
        combined = f"[ASPEK] {aspect} [TEXT] {sentence}"

        # Tokenize and encode the text
        encoded = self.tokenizer.encode_plus(
            combined,
            add_special_tokens=True,   # Add [CLS] and [SEP] tokens
            padding="max_length",      # Pad to max_length
            max_length=self.max_len,
            truncation=True,           # Truncate if longer than max_length
            return_attention_mask=True,
            return_tensors="pt",
        )

        return {
            'input_ids': encoded['input_ids'].flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
            'sent_idx': sent_idx,      # Keep the indices to map results back
            'aspect_idx': aspect_idx
        }
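
# Worked example of the flat-index mapping used by ABSADataset: with 2
# sentences and 3 aspects, len(dataset) == 6, and index 4 maps to
# sent_idx = 4 // 3 = 1 and aspect_idx = 4 % 3 = 1, i.e. the second
# sentence paired with the second aspect.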

def load_model_and_tokenizer():
    """
    Loads the IndoBERT ABSA model, tokenizer, and label encoder.
    If the files are not available locally, they are downloaded from the
    HuggingFace Hub.

    Returns:
        model (nn.Module): The loaded ABSA model.
        tokenizer (AutoTokenizer): Tokenizer for IndoBERT.
        label_encoder (LabelEncoder): Encoder for the sentiment labels.
        device (torch.device): Device (cuda/cpu) in use.
    """
    # Set up the model and tokenizer directories
    base_path = os.path.abspath(os.path.dirname(__file__))
    model_dir = os.path.join(base_path, "assets", "model")
    tokenizer_dir = os.path.join(base_path, "assets", "tokenizer")

    # Create the directories if they do not exist yet
    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(tokenizer_dir, exist_ok=True)

    model_path = os.path.join(model_dir, "indobert_absa_model.pth")
    label_path = os.path.join(model_dir, "label_encoder.joblib")

    # === DOWNLOAD THE MODEL IF IT IS NOT PRESENT ===
    if not os.path.exists(model_path):
        print("📥 Downloading model from HuggingFace...")
        try:
            # Download from the HuggingFace Hub
            downloaded_model = hf_hub_download(
                repo_id=CONFIG["hf_model_repo"],
                filename="indobert_absa_model.pth",
                subfolder=CONFIG["hf_model_subfolder"],
                cache_dir=None
            )
            # Copy into the local directory
            shutil.copy(downloaded_model, model_path)
            print(f"✅ Model downloaded to {model_path}")
        except Exception as e:
            print(f"❌ Error downloading model: {e}")
            raise

    # === DOWNLOAD THE LABEL ENCODER IF IT IS NOT PRESENT ===
    if not os.path.exists(label_path):
        print("📥 Downloading label encoder from HuggingFace...")
        try:
            downloaded_label = hf_hub_download(
                repo_id=CONFIG["hf_model_repo"],
                filename="label_encoder.joblib",
                subfolder=CONFIG["hf_model_subfolder"],
                cache_dir=None
            )
            shutil.copy(downloaded_label, label_path)
            print(f"✅ Label encoder downloaded to {label_path}")
        except Exception as e:
            print(f"❌ Error downloading label encoder: {e}")
            raise

    # === DOWNLOAD THE TOKENIZER IF IT IS NOT PRESENT ===
    tokenizer_files = ["special_tokens_map.json", "tokenizer.json",
                       "tokenizer_config.json", "vocab.txt"]
    # Check whether all tokenizer files already exist
    all_tokenizer_exists = all(
        os.path.exists(os.path.join(tokenizer_dir, f)) for f in tokenizer_files
    )
    if not all_tokenizer_exists:
        print("📥 Downloading tokenizer from HuggingFace...")
        try:
            for file in tokenizer_files:
                if not os.path.exists(os.path.join(tokenizer_dir, file)):
                    # Download each tokenizer file
                    downloaded_file = hf_hub_download(
                        repo_id=CONFIG["hf_model_repo"],
                        filename=file,
                        subfolder=CONFIG["hf_tokenizer_subfolder"],
                        cache_dir=None
                    )
                    shutil.copy(downloaded_file,
                                os.path.join(tokenizer_dir, file))
            print(f"✅ Tokenizer downloaded to {tokenizer_dir}")
        except Exception as e:
            # Fall back to the pretrained tokenizer below if the download fails
            print(f"❌ Error downloading tokenizer: {e}")

    # === LOAD THE TOKENIZER ===
    try:
        # Try the local directory first
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir)
        print("✅ Tokenizer loaded from local files")
    except Exception as e:
        # Fallback: load the pretrained tokenizer
        print(f"⚠️ Failed to load the local tokenizer ({e}), using the pretrained one...")
        tokenizer = AutoTokenizer.from_pretrained(CONFIG["model_name"])

    # === LOAD THE LABEL ENCODER ===
    try:
        label_encoder = joblib.load(label_path)
        print("✅ Label encoder loaded successfully")
    except Exception as e:
        print(f"❌ Error loading label encoder: {e}")
        raise RuntimeError(
            f"Failed to load label_encoder.joblib. "
            f"Make sure the file is valid and scikit-learn is installed. Error: {e}"
        )

    # === MODEL ARCHITECTURE DEFINITION ===
    class IndoBertForABSA(nn.Module):
        """
        IndoBERT-based aspect sentiment classifier for ABSA.
        Architecture: IndoBERT -> LayerNorm -> Dropout -> Linear classifier
        """

        def __init__(self, num_labels):
            super().__init__()
            # Load the pretrained IndoBERT encoder
            self.bert = AutoModel.from_pretrained(
                CONFIG["model_name"], trust_remote_code=True, use_safetensors=True
            )
            # Layer normalization for training stability
            self.norm = nn.LayerNorm(self.bert.config.hidden_size)
            # Dropout to reduce overfitting
            self.dropout = nn.Dropout(CONFIG["dropout_rate"])
            # Linear layer for sentiment classification
            self.classifier = nn.Linear(
                self.bert.config.hidden_size, num_labels)

        def forward(self, input_ids, attention_mask):
            """
            Forward pass of the ABSA model.

            Args:
                input_ids (torch.Tensor): Input token ID tensor.
                attention_mask (torch.Tensor): Attention mask tensor.

            Returns:
                torch.Tensor: Prediction logits.
            """
            # Run the input through BERT
            output = self.bert(input_ids=input_ids,
                               attention_mask=attention_mask)
            # Use the pooler output (the [CLS] token representation)
            pooled = output.pooler_output
            # Normalize
            normed = self.norm(pooled)
            # Apply dropout
            dropped = self.dropout(normed)
            # Classify
            return self.classifier(dropped)

    # === SET UP THE DEVICE AND LOAD THE MODEL ===
    # Use the GPU if available, otherwise fall back to the CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Initialize the model with the number of labels known to the label encoder
    model = IndoBertForABSA(num_labels=len(label_encoder.classes_))
    try:
        # Load the trained model weights
        model.load_state_dict(torch.load(model_path, map_location=device))
        print("✅ Model state dict loaded successfully")
    except Exception as e:
        print(f"❌ Error loading model state dict: {e}")
        raise
    # Move the model to the device (GPU/CPU)
    model.to(device)
    # Put the model in evaluation mode (disables dropout, etc.)
    model.eval()

    return model, tokenizer, label_encoder, device

def predict_multi_aspect(model, tokenizer, sentence, aspek_list, label_encoder, device, max_len):
    """
    Predicts the sentiment of each aspect for a single sentence.
    Predictions are made one at a time (non-batched).

    Args:
        model (nn.Module): The loaded ABSA model.
        tokenizer (AutoTokenizer): IndoBERT tokenizer.
        sentence (str): Input sentence.
        aspek_list (list): List of aspects to predict.
        label_encoder (LabelEncoder): Label encoder.
        device (torch.device): Device (cuda/cpu).
        max_len (int): Maximum token length.

    Returns:
        dict: Predictions in the form {aspect: sentiment_label}.
    """
    results = {}
    # Loop over every aspect
    for aspek in aspek_list:
        # Combine the aspect and the sentence
        combined = f"[ASPEK] {aspek} [TEXT] {sentence}"
        # Tokenize the input
        encoded = tokenizer.encode_plus(
            combined,
            add_special_tokens=True,
            padding="max_length",
            max_length=max_len,
            truncation=True,
            return_attention_mask=True,
            return_tensors="pt",
        )
        # Move the tensors to the device
        input_ids = encoded["input_ids"].to(device)
        attention_mask = encoded["attention_mask"].to(device)
        # Predict without computing gradients (inference mode)
        with torch.no_grad():
            # Forward pass
            outputs = model(input_ids, attention_mask)
            # Convert the logits to probabilities with softmax
            probs = F.softmax(outputs, dim=1).squeeze()
            # Take the index with the highest probability
            idx = torch.argmax(probs).item()
            # Convert the index back to a sentiment label
            label = label_encoder.inverse_transform([idx])[0]
        # Store the result
        results[aspek] = label
    return results

def predict_multi_aspect_batch(model, tokenizer, sentences, aspek_list, label_encoder, device, max_len, batch_size=None):
    """
    Predicts the sentiment of each aspect for multiple sentences using batch
    processing. More efficient when many sentences are processed at once.

    Args:
        model (nn.Module): The loaded ABSA model.
        tokenizer (AutoTokenizer): IndoBERT tokenizer.
        sentences (list): List of input sentences.
        aspek_list (list): List of aspects to predict.
        label_encoder (LabelEncoder): Label encoder.
        device (torch.device): Device (cuda/cpu).
        max_len (int): Maximum token length.
        batch_size (int, optional): Batch size. If None, taken from CONFIG.

    Returns:
        list: List of prediction dicts [{aspect: sentiment_label}, ...].
    """
    # Use the batch size from CONFIG if none is given
    if batch_size is None:
        batch_size = CONFIG.get("batch_size", 32)

    # === BUILD THE DATASET AND DATALOADER ===
    # The dataset enumerates every combination of sentence × aspect
    dataset = ABSADataset(sentences, aspek_list, tokenizer, max_len)
    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,  # Process in batches for efficiency
        shuffle=False,          # Do not shuffle, to preserve the order
        num_workers=CONFIG.get("num_workers", 0)
    )

    # === INITIALIZE THE RESULT CONTAINER ===
    num_sentences = len(sentences)
    num_aspects = len(aspek_list)
    # Matrix that stores the predictions [num_sentences x num_aspects]
    all_predictions = [[None] * num_aspects for _ in range(num_sentences)]

    # === BATCH PREDICTION ===
    model.eval()  # Put the model in evaluation mode
    with torch.no_grad():  # Disable gradient calculation
        for batch in dataloader:
            # Move the batch to the device
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            sent_indices = batch['sent_idx'].numpy()
            aspect_indices = batch['aspect_idx'].numpy()

            # Forward pass for the whole batch
            outputs = model(input_ids, attention_mask)
            # Convert the logits to probabilities
            probs = F.softmax(outputs, dim=1)
            # Take the highest-probability index for each item
            pred_indices = torch.argmax(probs, dim=1).cpu().numpy()
            # Convert the indices back to sentiment labels
            labels = label_encoder.inverse_transform(pred_indices)

            # Store the results in the matrix at their original positions
            for sent_idx, aspect_idx, label in zip(sent_indices, aspect_indices, labels):
                all_predictions[sent_idx][aspect_idx] = label

    # === CONVERT TO DICTIONARY FORMAT ===
    results = []
    for predictions in all_predictions:
        # Build an {aspect: label} dict for every sentence
        result_dict = dict(zip(aspek_list, predictions))
        results.append(result_dict)

    return results
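
# Minimal usage sketch. The sentences, aspect names, and max_len value below
# are illustrative placeholders, not values taken from CONFIG or the project's
# data; adjust them to the aspects the model was actually trained on.
if __name__ == "__main__":
    model, tokenizer, label_encoder, device = load_model_and_tokenizer()

    example_sentences = [
        "Makanannya enak tapi pelayanannya lambat.",   # placeholder sentence
        "Harganya terjangkau dan tempatnya nyaman.",   # placeholder sentence
    ]
    example_aspects = ["makanan", "pelayanan", "harga"]  # placeholder aspects
    example_max_len = 128                                # placeholder length

    # Per-aspect prediction for a single sentence
    single = predict_multi_aspect(
        model, tokenizer, example_sentences[0], example_aspects,
        label_encoder, device, example_max_len,
    )
    print(single)

    # Batched prediction over multiple sentences
    batched = predict_multi_aspect_batch(
        model, tokenizer, example_sentences, example_aspects,
        label_encoder, device, example_max_len,
    )
    print(batched)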