# Spaces: Sleeping
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query, Path, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import JSONResponse, FileResponse, StreamingResponse | |
| from fastapi.encoders import jsonable_encoder | |
| from typing import Optional, List | |
| from pydantic import BaseModel | |
| from core.security import get_current_user | |
| import sys | |
| import os | |
| import re | |
| import io | |
| import asyncio | |
| import logging | |
| import base64 | |
| import tempfile | |
| import subprocess | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| from pathlib import Path as PathLib | |
# Make the directory three levels above this file importable (needed to import utils).
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(_project_root)

# Point the Hugging Face caches at a writable location to avoid permission issues.
import os

_HF_CACHE_DIR = '/tmp/huggingface_cache'
for _cache_var in ('HF_HOME', 'TRANSFORMERS_CACHE', 'HF_DATASETS_CACHE'):
    # A value that is already set (and non-empty) is left untouched.
    if not os.getenv(_cache_var):
        os.environ[_cache_var] = _HF_CACHE_DIR

# The default cache directory is created unconditionally.
os.makedirs(_HF_CACHE_DIR, exist_ok=True)
# Optional TxAgent import: TXAGENT_AVAILABLE records whether it succeeded.
sys.path.append('/app/src')
try:
    from src.txagent import TxAgent
except ImportError as e:
    logging.warning(f"TxAgent not available: {e}")
    TXAGENT_AVAILABLE = False
else:
    TXAGENT_AVAILABLE = True
try:
    from utils import clean_text_response
except ImportError:
    # Local stand-in used only when the utils module cannot be imported.
    def clean_text_response(text: str) -> str:
        """Collapse blank-line runs and repeated spaces, then drop ``**``/``__`` markers."""
        import re

        collapsed = re.sub(r'\n\s*\n', '\n\n', text)
        collapsed = re.sub(r'[ ]+', ' ', collapsed)
        for marker in ("**", "__"):
            collapsed = collapsed.replace(marker, "")
        return collapsed.strip()
try:
    from analysis import analyze_patient_report
except ImportError:
    # Mock used only when the analysis module cannot be imported.
    def analyze_patient_report(patient_data):
        """Return a fixed placeholder result; ``patient_data`` is ignored."""
        return dict(analysis="Mock analysis", status="success")
try:
    from voice import recognize_speech, text_to_speech, extract_text_from_pdf
except ImportError:
    # Mocks used only when the voice module cannot be imported; each ignores
    # its arguments and returns a fixed placeholder.
    def recognize_speech(audio_data):
        """Return a placeholder transcription payload."""
        return dict(transcription="Mock transcription")

    def text_to_speech(text, language="en-US"):
        """Return placeholder audio bytes."""
        return b"Mock audio data"

    def extract_text_from_pdf(pdf_data):
        """Return placeholder PDF text."""
        return "Mock PDF text"
# Document stays None when python-docx is not installed.
Document = None
try:
    from docx import Document
except ImportError:
    pass
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Initialize TxAgent instance
# Starts as None; get_txagent() assigns it and resets it to None if initialization fails.
txagent_instance = None
def _normalize_risk_level(risk_level):
    """Normalize a risk level name to one of the expected labels.

    Args:
        risk_level: Raw level, normally a string such as ``"Medium"`` or
            ``"CRITICAL"``. ``None`` and non-string values are accepted.

    Returns:
        One of ``'low'``, ``'moderate'``, ``'high'``, ``'severe'`` or
        ``'none'``. Anything unrecognized (including ``None``) maps to
        ``'none'``.
    """
    risk_level_mapping = {
        'low': 'low',
        'medium': 'moderate',
        'moderate': 'moderate',
        'high': 'high',
        'severe': 'severe',
        'critical': 'severe',
        'none': 'none',
        'unknown': 'none',
    }
    # Stored records may hold None or a non-string here; calling .lower() on
    # those would raise, so coerce first and treat a missing value as 'none'.
    if risk_level is None:
        return 'none'
    normalized_key = str(risk_level).strip().lower()
    return risk_level_mapping.get(normalized_key, 'none')
def get_txagent():
    """Return the shared TxAgent instance, creating it on first use.

    Returns:
        The module-level ``txagent_instance``. It is ``None`` when TxAgent
        could not be imported or when initialization raised; a later call
        attempts initialization again in the latter case.
    """
    global txagent_instance

    # Nothing to do when an instance already exists or TxAgent is unavailable.
    if txagent_instance is not None or not TXAGENT_AVAILABLE:
        return txagent_instance

    try:
        # Start from the more accessible fallback models.
        model_name, rag_model_name = (
            "microsoft/DialoGPT-medium",
            "sentence-transformers/all-MiniLM-L6-v2",
        )

        # Switch to the original models only if their tokenizer can be loaded.
        try:
            import torch  # noqa: F401  (import doubles as an availability check)
            from transformers import AutoTokenizer

            AutoTokenizer.from_pretrained(
                "mims-harvard/TxAgent-T1-Llama-3.1-8B", trust_remote_code=True
            )
            model_name, rag_model_name = (
                "mims-harvard/TxAgent-T1-Llama-3.1-8B",
                "mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B",
            )
            logger.info("✅ Original TxAgent models are accessible")
        except Exception as model_error:
            logger.warning(f"⚠️ Original models not accessible, using fallback: {model_error}")

        agent_options = dict(
            model_name=model_name,
            rag_model_name=rag_model_name,
            enable_finish=True,
            enable_rag=False,  # Set to True if you want RAG functionality
            force_finish=True,
            enable_checker=True,
            step_rag_num=4,
            seed=42,
        )
        txagent_instance = TxAgent(**agent_options)
        txagent_instance.init_model()

        # Set the same chat prompt as the original
        txagent_instance.chat_prompt = (
            "You are a clinical assistant AI. Analyze the patient's data and provide clear clinical recommendations."
        )
        logger.info(f"✅ TxAgent initialized successfully with model: {model_name}")
    except Exception as e:
        logger.error(f"❌ Failed to initialize TxAgent: {e}")
        txagent_instance = None

    return txagent_instance
# Define the ChatRequest model with an optional patient_id
class ChatRequest(BaseModel):
    """Request body accepted by the chat handlers in this module.

    Only ``message`` is required; every other field has a default.
    NOTE(review): the handlers visible in this file read only ``message``
    and ``patient_id``; the remaining fields are not consumed here.
    """
    message: str
    history: Optional[List[dict]] = None
    format: Optional[str] = "clean"
    temperature: Optional[float] = 0.7
    max_new_tokens: Optional[int] = 512
    # Saved with the chat record by chat_stream_with_txagent.
    patient_id: Optional[str] = None
class VoiceOutputRequest(BaseModel):
    """Request body accepted by ``synthesize_speech``.

    Only ``text`` is required.
    NOTE(review): ``synthesize_speech`` currently returns mock audio and
    does not read any of these fields.
    """
    text: str
    language: str = "en-US"
    slow: bool = False
    return_format: str = "mp3"
class RiskLevel(BaseModel):
    """Risk assessment with a level name, a numeric score and optional factors.

    NOTE(review): not referenced by the code visible in this file.
    """
    level: str
    score: float
    factors: Optional[List[str]] = None
# APIRouter whose routes are served under the "/txagent" prefix and tagged "TxAgent".
router = APIRouter(prefix="/txagent", tags=["TxAgent"])
async def status(current_user: dict = Depends(get_current_user)):
    """Report that the service is running.

    Returns:
        A dict with ``status``, the current UTC ``timestamp`` (ISO format),
        ``version`` and the list of supported ``features``.
    """
    logger.info(f"Status endpoint accessed by {current_user['email']}")

    supported_features = [
        "chat",
        "voice-input",
        "voice-output",
        "patient-analysis",
        "report-upload",
        "patient-reports-pdf",
        "all-patients-reports-pdf",
    ]
    payload = dict(
        status="running",
        timestamp=datetime.utcnow().isoformat(),
        version="2.6.0",
        features=supported_features,
    )
    return payload
async def get_patient_analysis_results(
    name: Optional[str] = Query(None),
    current_user: dict = Depends(get_current_user)
):
    """Return the most recent analysis results, enriched with patient names.

    Args:
        name: Optional case-insensitive substring of the patient's
            ``full_name``. It is matched literally, not as a regular
            expression.
        current_user: Authenticated user; must have the ``doctor`` or
            ``admin`` role.

    Returns:
        Up to 100 analysis records, newest first, each with a normalized
        ``suicide_risk`` object. Analyses whose patient no longer exists are
        skipped. An empty list is returned when nothing matches or when an
        unexpected error occurs (the error is logged).

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin.
    """
    logger.info(f"Fetching analysis results by {current_user['email']}")
    try:
        # Check if user has appropriate permissions
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can access analysis results")

        # Import database collections
        from db.mongo import db
        patients_collection = db.patients
        analysis_collection = db.patient_analysis_results

        query = {}
        if name:
            # The name comes straight from the query string: escape it so it
            # is matched literally and cannot inject or break a regex.
            name_regex = re.compile(re.escape(name), re.IGNORECASE)
            matching_patients = await patients_collection.find({"full_name": name_regex}).to_list(length=None)
            patient_ids = [p["fhir_id"] for p in matching_patients if "fhir_id" in p]
            if not patient_ids:
                return []
            query = {"patient_id": {"$in": patient_ids}}

        analyses = await analysis_collection.find(query).sort("timestamp", -1).to_list(length=100)

        enriched_results = []
        # Several analyses usually belong to the same patient; look each one up once.
        patients_by_id = {}
        for analysis in analyses:
            patient_id = analysis.get("patient_id")
            if patient_id not in patients_by_id:
                patients_by_id[patient_id] = await patients_collection.find_one({"fhir_id": patient_id})
            patient = patients_by_id[patient_id]
            if not patient:
                continue  # Skip if patient no longer exists

            # New format keeps the risk in a suicide_risk object; old records
            # store risk_level / risk_score / risk_factors at the top level.
            # A missing or empty suicide_risk must fall back to the old fields.
            suicide_risk_data = analysis.get("suicide_risk")
            if isinstance(suicide_risk_data, dict) and suicide_risk_data:
                risk_level = suicide_risk_data.get("level", "none")
                risk_score = suicide_risk_data.get("score", 0.0)
                risk_factors = suicide_risk_data.get("factors", [])
            else:
                risk_level = analysis.get("risk_level", "none")
                risk_score = analysis.get("risk_score", 0.0)
                risk_factors = analysis.get("risk_factors", [])

            enriched_results.append({
                "_id": str(analysis["_id"]),
                "patient_id": patient_id,
                "full_name": patient.get("full_name", "Unknown"),
                "timestamp": analysis.get("timestamp"),
                "created_at": analysis.get("created_at"),
                "analysis_date": analysis.get("analysis_date"),
                "suicide_risk": {
                    # Guard against a stored None / non-string level.
                    "level": _normalize_risk_level(str(risk_level or "none")),
                    "score": risk_score,
                    "factors": risk_factors
                },
                "summary": analysis.get("summary", ""),
                "recommendations": analysis.get("recommendations", [])
            })
        return enriched_results
    except HTTPException:
        # Let the 403 reach the client instead of being turned into [].
        raise
    except Exception as e:
        logger.error(f"Error fetching analysis results: {e}")
        return []
async def analyze_patients(
    current_user: dict = Depends(get_current_user)
):
    """Trigger analysis for all patients.

    Every document in ``db.patients`` is passed to ``analyze_patient`` in
    turn. A failure for one patient is logged and does not stop the run.

    Returns:
        A dict with ``message`` and ``analyzed_count``; ``total_patients`` is
        included when at least one patient exists.

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin,
            500 on any other failure.
    """
    logger.info(f"Triggering analysis for all patients by {current_user['email']}")
    try:
        # Check if user has appropriate permissions
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can trigger analysis")

        # Import database collections and analysis function
        from db.mongo import db
        from analysis import analyze_patient

        patients = await db.patients.find({}).to_list(length=None)
        if not patients:
            return {"message": "No patients found to analyze", "analyzed_count": 0}

        analyzed_count = 0
        for patient in patients:
            patient_name = patient.get('full_name', 'Unknown')
            try:
                await analyze_patient(patient)
            except Exception as e:
                logger.error(f"❌ Failed to analyze patient {patient_name}: {e}")
                continue
            analyzed_count += 1
            logger.info(f"✅ Analyzed patient: {patient_name}")

        return {
            "message": f"Analysis completed for {analyzed_count} patients",
            "analyzed_count": analyzed_count,
            "total_patients": len(patients)
        }
    except HTTPException:
        # Keep the 403 as-is; the generic handler below would make it a 500.
        raise
    except Exception as e:
        logger.error(f"Error triggering analysis: {e}")
        raise HTTPException(status_code=500, detail="Failed to trigger analysis")
async def analyze_specific_patient(
    patient_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Run the analysis for the patient whose ``fhir_id`` equals ``patient_id``.

    Returns:
        A dict with ``message``, ``patient_id`` and ``patient_name``.

    Raises:
        HTTPException: 403 for users without the doctor/admin role, 404 when
            the patient does not exist, 500 on any other failure.
    """
    logger.info(f"Triggering analysis for patient {patient_id} by {current_user['email']}")
    try:
        # Only doctors and admins may trigger an analysis.
        user_roles = current_user.get('roles', [])
        if 'doctor' not in user_roles and 'admin' not in user_roles:
            raise HTTPException(status_code=403, detail="Only doctors and admins can trigger analysis")

        # Database handle and analysis routine are imported lazily.
        from db.mongo import db
        from analysis import analyze_patient

        patient_doc = await db.patients.find_one({"fhir_id": patient_id})
        if not patient_doc:
            raise HTTPException(status_code=404, detail="Patient not found")

        await analyze_patient(patient_doc)

        display_name = patient_doc.get('full_name', 'Unknown')
        return {
            "message": f"Analysis completed for patient {display_name}",
            "patient_id": patient_id,
            "patient_name": display_name
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error analyzing patient {patient_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to analyze patient")
async def chat_with_txagent(
    request: ChatRequest,
    current_user: dict = Depends(get_current_user)
):
    """Chat with the integrated TxAgent.

    The full TxAgent is not wired in yet, so the reply simply echoes the
    incoming message.

    Returns:
        A dict with ``status``, ``response`` and ``mode``.

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin,
            500 on any other failure.
    """
    try:
        # Check that the user is a doctor or an admin
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use TxAgent")

        # For now, return a simple response since the full TxAgent is not yet implemented
        response = f"TxAgent integrated response: {request.message}"
        return {
            "status": "success",
            "response": response,
            "mode": "integrated"
        }
    except HTTPException:
        # Keep the 403 as-is; the generic handler below would make it a 500.
        raise
    except Exception as e:
        logger.error(f"Error in TxAgent chat: {e}")
        raise HTTPException(status_code=500, detail="Failed to process chat request")
async def chat_stream_with_txagent(
    request: ChatRequest,
    current_user: dict = Depends(get_current_user)
):
    """Stream a chat reply from the integrated TxAgent.

    A canned reply is built from the incoming message, stored in
    ``db.chats`` on a best-effort basis, and streamed back as plain text
    three words at a time.

    Returns:
        A ``StreamingResponse`` with media type ``text/plain``.

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin,
            500 on any other failure.
    """
    try:
        # Check that the user is a doctor or an admin
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use TxAgent")

        logger.info(f"Chat stream initiated by {current_user['email']}: {request.message}")

        # Generate a response (for now, a simple response)
        response_text = f"Hello! I'm your clinical assistant. You said: '{request.message}'. How can I help you with patient care today?"

        # Store the chat in the database
        try:
            from db.mongo import db
            chat_entry = {
                "message": request.message,
                "response": response_text,
                "user_id": current_user.get('_id'),
                "user_email": current_user.get('email'),
                "timestamp": datetime.utcnow(),
                "patient_id": getattr(request, 'patient_id', None),
                "chat_type": "text_chat"
            }
            await db.chats.insert_one(chat_entry)
            logger.info(f"Chat stored in database for user {current_user['email']}")
        except Exception as db_error:
            # Continue even if database storage fails
            logger.error(f"Failed to store chat in database: {str(db_error)}")

        async def generate_response():
            # Simulate streaming by sending the response in chunks
            words = response_text.split()
            chunk_size = 3  # Send 3 words at a time
            for start in range(0, len(words), chunk_size):
                chunk = " ".join(words[start:start + chunk_size])
                if start + chunk_size < len(words):
                    chunk += " "  # Add space if not the last chunk
                yield chunk
                await asyncio.sleep(0.1)  # Small delay to simulate streaming

        return StreamingResponse(
            generate_response(),
            media_type="text/plain"
        )
    except HTTPException:
        # Keep the 403 as-is; the generic handler below would make it a 500.
        raise
    except Exception as e:
        logger.error(f"Error in TxAgent chat stream: {e}")
        raise HTTPException(status_code=500, detail="Failed to process chat stream request")
async def transcribe_audio(
    audio: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Voice transcription with the integrated TxAgent.

    The uploaded ``audio`` is not processed yet; a fixed mock transcription
    is returned.

    Returns:
        A dict with ``status``, ``transcription`` and ``mode``.

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin,
            500 on any other failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use voice features")

        # For now, return mock transcription
        return {
            "status": "success",
            "transcription": "Mock voice transcription from integrated TxAgent",
            "mode": "integrated"
        }
    except HTTPException:
        # Keep the 403 as-is; the generic handler below would make it a 500.
        raise
    except Exception as e:
        logger.error(f"Error in voice transcription: {e}")
        raise HTTPException(status_code=500, detail="Failed to transcribe audio")
async def synthesize_speech(
    request: VoiceOutputRequest,
    current_user: dict = Depends(get_current_user)
):
    """Speech synthesis with the integrated TxAgent.

    The request fields are not used yet; fixed mock bytes are returned as an
    ``audio/mpeg`` attachment named ``speech.mp3``.

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin,
            500 on any other failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use voice features")

        # For now, return mock audio data
        audio_data = b"Mock audio data from integrated TxAgent"
        return StreamingResponse(
            iter([audio_data]),
            media_type="audio/mpeg",
            headers={"Content-Disposition": "attachment; filename=speech.mp3"}
        )
    except HTTPException:
        # Keep the 403 as-is; the generic handler below would make it a 500.
        raise
    except Exception as e:
        logger.error(f"Error in voice synthesis: {e}")
        raise HTTPException(status_code=500, detail="Failed to synthesize speech")
async def get_chats(current_user: dict = Depends(get_current_user)):
    """Return the chat history.

    Reads the 50 most recent documents from ``db.chats`` (newest first).
    NOTE(review): the query is not filtered by user, so every caller with
    the required role sees all stored chats — confirm this is intended.

    Returns:
        A list of dicts with ``id``, ``message``, ``response``,
        ``timestamp``, ``user_id`` and ``patient_id``.

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin,
            500 on any other failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can access chat history")

        # Import database collections
        from db.mongo import db

        # Query local database for chat history
        cursor = db.chats.find().sort("timestamp", -1).limit(50)
        chats = await cursor.to_list(length=50)
        return [
            {
                "id": str(chat["_id"]),
                "message": chat.get("message", ""),
                "response": chat.get("response", ""),
                "timestamp": chat.get("timestamp"),
                "user_id": str(chat.get("user_id", "")),
                "patient_id": str(chat.get("patient_id", "")) if chat.get("patient_id") else None
            }
            for chat in chats
        ]
    except HTTPException:
        # Keep the 403 as-is; the generic handler below would make it a 500.
        raise
    except Exception as e:
        logger.error(f"Error getting chats: {e}")
        raise HTTPException(status_code=500, detail="Failed to get chats")
def _pdf_title_style(styles):
    """Return the centered 16pt title style used by both PDF reports."""
    from reportlab.lib.styles import ParagraphStyle

    return ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=16,
        spaceAfter=30,
        alignment=1  # Center alignment
    )


def _pdf_two_column_table(rows, emphasized=False):
    """Build a gridded label/value table; ``emphasized`` selects the header look."""
    from reportlab.lib import colors
    from reportlab.lib.units import inch
    from reportlab.platypus import Table, TableStyle

    if emphasized:
        label_bg, label_fg, value_bg = colors.grey, colors.whitesmoke, colors.beige
        font_name, font_size, first_row_padding = 'Helvetica-Bold', 10, 12
    else:
        label_bg, label_fg, value_bg = colors.lightblue, colors.black, colors.white
        font_name, font_size, first_row_padding = 'Helvetica', 9, 6

    table = Table(rows, colWidths=[2 * inch, 4 * inch])
    table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (0, -1), label_bg),
        ('TEXTCOLOR', (0, 0), (0, -1), label_fg),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONTNAME', (0, 0), (-1, -1), font_name),
        ('FONTSIZE', (0, 0), (-1, -1), font_size),
        ('BOTTOMPADDING', (0, 0), (-1, 0), first_row_padding),
        ('BACKGROUND', (1, 0), (1, -1), value_bg),
        ('GRID', (0, 0), (-1, -1), 1, colors.black)
    ]))
    return table


def _pdf_append_analysis(story, styles, index, result):
    """Append the heading, risk table and optional summary for one analysis record."""
    from xml.sax.saxutils import escape
    from reportlab.platypus import Paragraph, Spacer

    # Stored records may hold None or unexpected types; coerce defensively so
    # one odd record does not abort the whole report.
    suicide_risk = result.get('suicide_risk')
    if not isinstance(suicide_risk, dict):
        suicide_risk = {}
    risk_level = str(suicide_risk.get('level') or 'none')
    try:
        risk_score = float(suicide_risk.get('score') or 0.0)
    except (TypeError, ValueError):
        risk_score = 0.0
    raw_factors = suicide_risk.get('factors') or []
    if isinstance(raw_factors, str):
        raw_factors = [raw_factors]
    risk_factors = [str(factor) for factor in raw_factors]

    story.append(Paragraph(f"Analysis #{index + 1}", styles['Heading3']))
    story.append(Spacer(1, 6))
    story.append(_pdf_two_column_table([
        ["Risk Level:", risk_level.upper()],
        ["Risk Score:", f"{risk_score:.2f}"],
        ["Risk Factors:", ", ".join(risk_factors) if risk_factors else "None identified"],
        ["Analysis Date:", str(result.get('timestamp') or 'N/A')],
    ]))
    story.append(Spacer(1, 12))

    # Summary if available
    summary = result.get('summary')
    if summary:
        story.append(Paragraph("Summary:", styles['Heading4']))
        # Paragraph parses inline markup, so literal '<' / '&' must be escaped.
        story.append(Paragraph(escape(str(summary)), styles['Normal']))
        story.append(Spacer(1, 12))


async def get_patient_analysis_reports_pdf(
    patient_id: str = Path(...),
    current_user: dict = Depends(get_current_user)
):
    """Generate PDF analysis reports for a specific patient.

    Args:
        patient_id: Value matched against ``patient_id`` in
            ``db.patient_analysis_results``.
        current_user: Authenticated user; must be doctor or admin.

    Returns:
        A ``StreamingResponse`` carrying the PDF as an attachment.

    Raises:
        HTTPException: 403 for users without the required role, 404 when the
            patient has no analysis results, 500 on any other failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can generate PDF reports")

        logger.info(f"Generating PDF analysis reports for patient {patient_id} by {current_user['email']}")

        # Import database collections
        from db.mongo import db

        # Find analysis results for the patient
        analysis_results = await db.patient_analysis_results.find({"patient_id": patient_id}).to_list(length=None)
        if not analysis_results:
            raise HTTPException(status_code=404, detail="No analysis results found for this patient")

        # reportlab is imported lazily so the module loads without it.
        from reportlab.lib.pagesizes import letter
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
        from reportlab.lib.styles import getSampleStyleSheet
        import io

        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title
        story.append(Paragraph("Patient Analysis Report", _pdf_title_style(styles)))
        story.append(Spacer(1, 12))

        # Patient Information (date taken from the first analysis result)
        story.append(Paragraph("Patient Information", styles['Heading2']))
        story.append(Spacer(1, 12))
        story.append(_pdf_two_column_table([
            ["Patient ID:", patient_id],
            ["Analysis Date:", str(analysis_results[0].get('timestamp') or 'N/A')],
        ], emphasized=True))
        story.append(Spacer(1, 20))

        # Analysis Results
        story.append(Paragraph("Analysis Results", styles['Heading2']))
        story.append(Spacer(1, 12))
        for index, result in enumerate(analysis_results):
            _pdf_append_analysis(story, styles, index, result)

        # Build PDF
        doc.build(story)
        buffer.seek(0)

        # Keep only filename-safe characters from the client-supplied id.
        safe_patient_id = re.sub(r'[^A-Za-z0-9_.-]', '_', patient_id)
        return StreamingResponse(
            buffer,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=patient_{safe_patient_id}_analysis_reports.pdf"}
        )
    except HTTPException:
        # Keep 403/404 as-is; the generic handler below would make them 500.
        raise
    except Exception as e:
        logger.error(f"Error generating PDF report for patient {patient_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to generate PDF report: {str(e)}")


async def get_all_patients_analysis_reports_pdf(
    current_user: dict = Depends(get_current_user)
):
    """Generate PDF analysis reports for all patients.

    All documents in ``db.patient_analysis_results`` are grouped by
    ``patient_id`` and rendered one section per patient.

    Returns:
        A ``StreamingResponse`` carrying the PDF as an attachment.

    Raises:
        HTTPException: 403 for users without the required role, 404 when no
            analysis results exist, 500 on any other failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ('doctor', 'admin')):
            raise HTTPException(status_code=403, detail="Only doctors and admins can generate PDF reports")

        logger.info(f"Generating PDF analysis reports for all patients by {current_user['email']}")

        # Import database collections
        from db.mongo import db

        # Find all analysis results
        analysis_results = await db.patient_analysis_results.find({}).to_list(length=None)
        if not analysis_results:
            raise HTTPException(status_code=404, detail="No analysis results found")

        # reportlab is imported lazily so the module loads without it.
        from xml.sax.saxutils import escape
        from reportlab.lib.pagesizes import letter
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
        from reportlab.lib.styles import getSampleStyleSheet
        import io

        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title
        story.append(Paragraph("All Patients Analysis Reports", _pdf_title_style(styles)))
        story.append(Spacer(1, 12))

        # Summary
        story.append(Paragraph("Summary", styles['Heading2']))
        story.append(Spacer(1, 12))
        story.append(_pdf_two_column_table([
            ["Total Analysis Reports:", str(len(analysis_results))],
            ["Generated Date:", datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
            ["Generated By:", current_user['email']],
        ], emphasized=True))
        story.append(Spacer(1, 20))

        # Group results by patient, preserving first-seen order
        patient_results = {}
        for result in analysis_results:
            patient_results.setdefault(result.get('patient_id', 'unknown'), []).append(result)

        # Patient Reports
        for patient_id, results in patient_results.items():
            story.append(Paragraph(f"Patient: {escape(str(patient_id))}", styles['Heading2']))
            story.append(Spacer(1, 12))
            for index, result in enumerate(results):
                _pdf_append_analysis(story, styles, index, result)
            story.append(Spacer(1, 20))

        # Build PDF
        doc.build(story)
        buffer.seek(0)
        return StreamingResponse(
            buffer,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=all_patients_analysis_reports_{datetime.now().strftime('%Y%m%d')}.pdf"}
        )
    except HTTPException:
        # Keep 403/404 as-is; the generic handler below would make them 500.
        raise
    except Exception as e:
        logger.error(f"Error generating PDF report for all patients: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to generate PDF report: {str(e)}")
# Voice synthesis endpoint
async def synthesize_voice(
    request: dict,
    current_user: dict = Depends(get_current_user)
):
    """Convert text to speech via ``text_to_speech`` and return the audio.

    Args:
        request: JSON body. ``text`` is required; ``language`` defaults to
            ``'en-US'`` and ``return_format`` to ``'mp3'``.
        current_user: Authenticated user.

    Returns:
        A ``StreamingResponse`` with media type ``audio/<return_format>``,
        sent as an attachment named ``speech.<return_format>``.

    Raises:
        HTTPException: 400 when ``text`` is missing/blank or a parameter is
            malformed, 500 when speech generation fails.

    NOTE(review): unlike the other voice handlers in this file, no
    doctor/admin role check is performed here — confirm this is intended.
    NOTE(review): ``return_format`` only labels the response; nothing here
    shows the audio being encoded in that format.
    """
    try:
        logger.info(f"Voice synthesis initiated by {current_user['email']}")

        # Extract parameters from request; null values fall back to defaults.
        text = request.get('text', '')
        language = request.get('language') or 'en-US'
        return_format = request.get('return_format') or 'mp3'

        if not isinstance(text, str) or not text.strip():
            raise HTTPException(status_code=400, detail="Text is required")
        if not isinstance(language, str):
            raise HTTPException(status_code=400, detail="Invalid language")
        # return_format ends up in response headers, so accept only a plain
        # alphanumeric token to rule out header/content-type injection.
        if not isinstance(return_format, str) or not re.fullmatch(r'[A-Za-z0-9]+', return_format):
            raise HTTPException(status_code=400, detail="Invalid return_format")

        # Convert language code for gTTS (e.g., 'en-US' -> 'en')
        language_code = language.split('-')[0]

        # Generate speech
        audio_data = text_to_speech(text, language=language_code)

        # Return audio data
        return StreamingResponse(
            io.BytesIO(audio_data),
            media_type=f"audio/{return_format}",
            headers={"Content-Disposition": f"attachment; filename=speech.{return_format}"}
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in voice synthesis: {e}")
        raise HTTPException(status_code=500, detail="Error generating voice output")