import pandas as pd import numpy as np from sklearn.ensemble import RandomForestClassifier from sklearn.preprocessing import LabelEncoder, StandardScaler from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, classification_report import joblib import re from typing import List, Dict, Tuple from database_connection import DatabaseConnection import os class CourseRecommender: def __init__(self): self.model = RandomForestClassifier(n_estimators=100, random_state=42) self.label_encoders = {} self.scaler = StandardScaler() self.db_connection = DatabaseConnection() self.is_trained = False self._available_courses = None # Cache for available courses self._last_data_count = 0 # Track data count for auto-retraining self._auto_retrain_threshold = 5 # Retrain every 5 new feedbacks self._min_samples_for_training = 10 # Minimum samples needed to train self._local_feedback = [] # Store feedback locally for learning def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame: """Preprocess the data for training""" df_processed = df.copy() # Normalize strand to uppercase for case-insensitive matching if 'strand' in df_processed.columns: df_processed['strand'] = df_processed['strand'].astype(str).str.upper() # Encode categorical variables categorical_columns = ['strand', 'hobbies'] for col in categorical_columns: if col not in self.label_encoders: self.label_encoders[col] = LabelEncoder() df_processed[col] = self.label_encoders[col].fit_transform(df_processed[col].astype(str)) else: # Handle unseen labels by using a default value try: df_processed[col] = self.label_encoders[col].transform(df_processed[col].astype(str)) except ValueError: # For unseen labels, use the most common label from training most_common = self.label_encoders[col].classes_[0] df_processed[col] = self.label_encoders[col].transform([most_common] * len(df_processed)) return df_processed def extract_hobbies_features(self, hobbies: str) -> Dict[str, int]: """Extract features from hobbies string""" if not hobbies or pd.isna(hobbies): hobbies = "" hobbies_lower = str(hobbies).lower() # Define hobby categories hobby_categories = { 'technical': ['programming', 'coding', 'computer', 'technology', 'software', 'gaming', 'electronics', 'math', 'mathematics'], 'creative': ['art', 'music', 'writing', 'design', 'photography', 'dancing', 'drawing', 'literature'], 'academic': ['reading', 'mathematics', 'science', 'research', 'studying', 'history', 'literature', 'books'], 'physical': ['sports', 'fitness', 'exercise', 'running', 'swimming', 'basketball', 'football', 'gym'], 'social': ['traveling', 'cooking', 'volunteering', 'community', 'leadership', 'social'] } features = {} for category, keywords in hobby_categories.items(): features[f'hobby_{category}'] = sum(1 for keyword in keywords if keyword in hobbies_lower) return features def prepare_features(self, df: pd.DataFrame) -> pd.DataFrame: """Prepare features for the model""" df_features = df.copy() # Extract hobby features hobby_features = [] for hobbies in df['hobbies']: features = self.extract_hobbies_features(hobbies) hobby_features.append(features) hobby_df = pd.DataFrame(hobby_features) df_features = pd.concat([df_features, hobby_df], axis=1) # Normalize GWA to 0-1 scale (75-100 -> 0-1) df_features['gwa_normalized'] = (df_features['gwa'] - 75) / 25 # Create stanine bins df_features['stanine_high'] = (df_features['stanine'] >= 7).astype(int) df_features['stanine_medium'] = ((df_features['stanine'] >= 4) & (df_features['stanine'] < 7)).astype(int) df_features['stanine_low'] = (df_features['stanine'] < 4).astype(int) return df_features def get_available_courses(self): """Get available courses with caching""" if self._available_courses is None: # Try to get courses from /courses endpoint first courses = self.db_connection.get_available_courses() if not courses: print("No courses found in /courses endpoint. Using courses from student feedback data...") # Get courses from student feedback data df_temp = self.db_connection.get_student_feedback_counts() if df_temp.empty: raise ValueError("No courses available in /courses endpoint and no student feedback data found.") courses = df_temp['course'].unique().tolist() print(f"Using courses from student feedback: {courses}") self._available_courses = courses print(f"Available courses cached: {len(courses)} courses") return self._available_courses def refresh_courses_cache(self): """Refresh the available courses cache""" self._available_courses = None return self.get_available_courses() def get_current_data_count(self): """Get current number of feedback records in database""" try: df = self.db_connection.get_student_feedback_counts() return len(df) if not df.empty else 0 except: return 0 def check_and_auto_retrain(self): """Check if enough new data exists and auto-retrain if needed""" # Use local feedback count for auto-retraining local_feedback_count = len(self._local_feedback) if local_feedback_count < self._min_samples_for_training: print(f"Not enough local feedback for training: {local_feedback_count} < {self._min_samples_for_training}") return False if local_feedback_count - self._last_data_count >= self._auto_retrain_threshold: print(f"Auto-retraining triggered: {local_feedback_count - self._last_data_count} new local feedbacks") try: accuracy = self.train_model(use_database=True) self._last_data_count = local_feedback_count print(f"Auto-retraining completed with accuracy: {accuracy:.3f}") return True except Exception as e: print(f"Auto-retraining failed: {e}") return False return False def add_feedback_with_learning(self, course: str, stanine: int, gwa: float, strand: str, rating: str, hobbies: str) -> bool: """Add feedback to database and trigger auto-learning if needed""" # Add feedback to database success = self.db_connection.add_feedback(course, stanine, gwa, strand, rating, hobbies) if success: print(f"Feedback added for course: {course}") # Store feedback locally for learning (since API has issues) feedback_record = { 'course': course, 'stanine': stanine, 'gwa': gwa, 'strand': strand, 'rating': rating, 'hobbies': hobbies, 'count': 1 } self._local_feedback.append(feedback_record) print(f"Feedback stored locally for learning: {len(self._local_feedback)} total") # Check if we should auto-retrain self.check_and_auto_retrain() return success def configure_auto_learning(self, retrain_threshold=5, min_samples=10): """Configure auto-learning parameters""" self._auto_retrain_threshold = retrain_threshold self._min_samples_for_training = min_samples print(f"Auto-learning configured: retrain every {retrain_threshold} new feedbacks, minimum {min_samples} samples") def get_learning_status(self): """Get current learning status""" current_count = self.get_current_data_count() return { 'current_data_count': current_count, 'last_trained_count': self._last_data_count, 'new_feedbacks': current_count - self._last_data_count, 'retrain_threshold': self._auto_retrain_threshold, 'min_samples': self._min_samples_for_training, 'ready_for_retrain': (current_count - self._last_data_count) >= self._auto_retrain_threshold } def train_model(self, use_database: bool = True): """Train the recommendation model using student feedback data""" print("Loading training data from student feedback...") # Get available courses with caching available_courses = self.get_available_courses() # Get training data from student feedback df = self.db_connection.get_student_feedback_counts() if df.empty: raise ValueError("No student feedback data available for training") print(f"Student feedback data: {len(df)} samples") print(f"Feedback courses: {df['course'].unique().tolist()}") # Filter training data to only include courses that are available in /courses df_filtered = df[df['course'].isin(available_courses)] if df_filtered.empty: raise ValueError("No training data available for courses that exist in /courses endpoint") print(f"Training with {len(df_filtered)} samples (filtered to available courses)") # Clean and prepare data df_clean = df_filtered.copy() # Convert data types df_clean['stanine'] = pd.to_numeric(df_clean['stanine'], errors='coerce') df_clean['gwa'] = pd.to_numeric(df_clean['gwa'], errors='coerce') df_clean['rating'] = df_clean['rating'].astype(str) # Remove rows with invalid data df_clean = df_clean.dropna(subset=['stanine', 'gwa']) if df_clean.empty: raise ValueError("No valid training data after cleaning") print(f"Training with {len(df_clean)} clean samples") # Prepare features df_features = self.prepare_features(df_clean) df_processed = self.preprocess_data(df_features) # Select features for training feature_columns = [ 'stanine', 'gwa_normalized', 'strand', 'hobby_technical', 'hobby_creative', 'hobby_academic', 'hobby_physical', 'hobby_social', 'stanine_high', 'stanine_medium', 'stanine_low' ] X = df_processed[feature_columns] y = df_processed['course'] # Split data X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) # Scale features X_train_scaled = self.scaler.fit_transform(X_train) X_test_scaled = self.scaler.transform(X_test) # Train model self.model.fit(X_train_scaled, y_train) # Evaluate y_pred = self.model.predict(X_test_scaled) accuracy = accuracy_score(y_test, y_pred) print(f"Model accuracy: {accuracy:.3f}") self.is_trained = True # Save model self.save_model() # Update data count tracking self._last_data_count = len(df_clean) return accuracy def predict_course(self, stanine: int, gwa: float, strand: str, hobbies: str) -> List[Tuple[str, float]]: """Predict course recommendations using student feedback data and available courses""" if not self.is_trained: self.load_model() if not self.is_trained: raise ValueError("Model not trained. Please train the model first.") # Get available courses with caching available_courses = self.get_available_courses() # Create input data input_data = pd.DataFrame({ 'stanine': [stanine], 'gwa': [gwa], 'strand': [strand], 'hobbies': [hobbies] }) # Prepare features input_features = self.prepare_features(input_data) input_processed = self.preprocess_data(input_features) # Select same features as training feature_columns = [ 'stanine', 'gwa_normalized', 'strand', 'hobby_technical', 'hobby_creative', 'hobby_academic', 'hobby_physical', 'hobby_social', 'stanine_high', 'stanine_medium', 'stanine_low' ] X = input_processed[feature_columns] X_scaled = self.scaler.transform(X) # Get predictions with probabilities probabilities = self.model.predict_proba(X_scaled)[0] classes = self.model.classes_ # Filter recommendations to only include courses available in /courses endpoint available_recommendations = [] for i, course in enumerate(classes): if course in available_courses: available_recommendations.append((course, probabilities[i])) # Sort by probability and get top 5 available_recommendations.sort(key=lambda x: x[1], reverse=True) recommendations = available_recommendations[:5] return recommendations def save_model(self): """Save the trained model and encoders""" os.makedirs('models', exist_ok=True) joblib.dump(self.model, 'models/course_recommender_model.pkl') joblib.dump(self.label_encoders, 'models/label_encoders.pkl') joblib.dump(self.scaler, 'models/scaler.pkl') print("Model saved successfully") def load_model(self): """Load the trained model and encoders""" try: self.model = joblib.load('models/course_recommender_model.pkl') self.label_encoders = joblib.load('models/label_encoders.pkl') self.scaler = joblib.load('models/scaler.pkl') self.is_trained = True # Initialize data count tracking self._last_data_count = self.get_current_data_count() print("Model loaded successfully") except FileNotFoundError: print("No saved model found. Please train the model first.") self.is_trained = False def add_feedback(self, course: str, stanine: int, gwa: float, strand: str, rating: int, hobbies: str) -> bool: """Add user feedback to the database""" return self.db_connection.add_feedback(course, stanine, gwa, strand, rating, hobbies)