# -*- coding: utf-8 -*- """ Skrypt do masowego przetwarzania plików parquet w celu klasyfikacji jakości tekstu. Ten moduł jest przeznaczony do wydajnej analizy dużych zbiorów danych. Skanuje folder wejściowy w poszukiwaniu plików .parquet, przetwarza każdy z nich równolegle z użyciem wielu procesów (`multiprocessing`), a następnie zapisuje wyniki do nowego pliku w folderze wyjściowym, zachowując oryginalną strukturę danych i dodając wyniki klasyfikacji. """ import os import glob import time import pickle import joblib import pandas as pd import json import numpy as np from tqdm import tqdm from typing import List from text_analyzer.analyzer import TextAnalyzer from text_analyzer import constants # --- Ładowanie modeli i konfiguracja --- with open('models/scaler.pkl', 'rb') as f: scaler = pickle.load(f) classifier = joblib.load("models/model.joblib") text_analyzer = TextAnalyzer() batch_size = 10 class NumpyJSONEncoder(json.JSONEncoder): """ Specjalny enkoder JSON do obsługi typów danych z NumPy, które nie są domyślnie serializowalne. """ def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super(NumpyJSONEncoder, self).default(obj) # --- Definicje funkcji --- def predict_batch(texts: List[str], analyzer: TextAnalyzer, scaler_model, classifier_model) -> List[tuple[str | None, float | None]]: """ Przetwarza całą listę tekstów wsadowo i zwraca listę predykcji. """ all_features = [] # Krok 1: Ekstrakcja cech dla wszystkich tekstów feature_generator = analyzer.analyze_batch(texts, batch_size=batch_size) for features_dict in tqdm(feature_generator, total=len(texts), desc="Analiza cech"): ordered_features = [features_dict.get(fname, 0.0) for fname in constants.COLUMN_ORDER] all_features.append(ordered_features) if not all_features: return [] # Krok 2: Przygotowanie i skalowanie wszystkich wektorów naraz features_df = pd.DataFrame(all_features, columns=constants.COLUMN_ORDER) features_scaled = scaler_model.transform(features_df) # Krok 3: Predykcja dla całej paczki pred_probas = classifier_model.predict_proba(features_scaled) # Krok 4: Przetworzenie wyników results = [] labels = ["LOW", "MEDIUM", "HIGH"] for single_pred_proba in pred_probas: category_prob = { label: prob for label, prob in zip(labels, single_pred_proba) } # Sortujemy, aby znaleźć kategorię z najwyższym prawdopodobieństwem sorted_category_prob = sorted(category_prob.items(), key=lambda item: item[1], reverse=True) most_probable_category, confidence = sorted_category_prob[0] results.append((most_probable_category, round(float(confidence) * 100, 2))) return results def process_parquet_file(input_file: str, output_file: str): """ Orkiestruje proces przetwarzania pojedynczego pliku .parquet wsadowo. Wczytuje plik, przetwarza kolumnę 'text', a następnie dopisuje wynikowe kolumny 'quality_ai' i 'confidence' do nowego pliku Parquet. """ try: # Krok 1: Wczytaj cały plik Parquet do ramki danych pandas df = pd.read_parquet(input_file) except Exception as e: print(f"Nie udało się wczytać pliku {input_file}. Błąd: {e}") return # Sprawdzenie, czy kolumna 'text' istnieje if 'text' not in df.columns: print(f"Błąd: W pliku {input_file} brakuje wymaganej kolumny 'text'.") return # Krok 2: Przygotuj dane do przetwarzania wsadowego texts_to_process = df['text'].tolist() print(f"Wczytano {len(texts_to_process)} wierszy. Rozpoczynam przetwarzanie wsadowe...") # Krok 3: Wywołaj funkcję wsadową (ta część pozostaje bez zmian) # Zakładamy, że predict_batch zwraca listę tuple: [(kategoria, pewność), ...] results = predict_batch(texts_to_process, text_analyzer, scaler, classifier) # Krok 4: Dodaj wyniki jako nowe kolumny do ramki danych categories = [res[0] for res in results] confidences = [res[1] for res in results] df['quality_ai'] = categories df['confidence'] = confidences # Krok 5: Zapisz zmodyfikowaną ramkę danych do nowego pliku Parquet try: df.to_parquet(output_file, index=False) print(df.head(10)) print(f"Pomyślnie zapisano przetworzone dane do pliku {output_file}") except Exception as e: print(f"Nie udało się zapisać pliku {output_file}. Błąd: {e}") # --- Główny blok wykonawczy --- if __name__ == '__main__': print("Inicjalizacja skryptu przetwarzania wsadowego...") INPUT_FOLDER = 'input_parquet' OUTPUT_FOLDER = 'output' os.makedirs(OUTPUT_FOLDER, exist_ok=True) # Skanowanie plików parquet_files = glob.glob(os.path.join(INPUT_FOLDER, '*.parquet')) for file_path in parquet_files: start_time = time.time() output_file = os.path.join(OUTPUT_FOLDER, os.path.basename(file_path)) if os.path.exists(output_file): print(f"POMIJAM - plik już istnieje: {output_file}") continue print(f"\n--- Przetwarzanie pliku: {file_path} ---") process_parquet_file(file_path, output_file) end_time = time.time() print(f"Processing time: {end_time - start_time:.4f} seconds") print("\nWszystkie pliki zostały przetworzone!")