"""Text-cleaning, section-extraction, hashing and notification helpers."""

import hashlib
import io
import json
import logging
import re
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

try:
    from bson import ObjectId
except ImportError:  # bson ships with pymongo; keep the text helpers importable without it
    ObjectId = None

logger = logging.getLogger(__name__)

# (result key, candidate headings tried in order) for structure_medical_response.
_SECTION_HEADINGS = (
    ("summary", ("Summary of Patient's Medical History",
                 "Summarize the patient's medical history")),
    ("risks", ("Identify Risks or Red Flags",
               "Risks or Red Flags")),
    ("missed_issues", ("Missed Diagnoses or Treatments",
                       "What the doctor might have missed")),
    ("recommendations", ("Suggest Next Clinical Steps",
                         "Suggested Clinical Actions")),
)

# Accepted spellings of a risk level mapped to their canonical name.
_RISK_LEVEL_ALIASES = {
    'low': 'low',
    'medium': 'moderate',
    'moderate': 'moderate',
    'high': 'high',
    'severe': 'severe',
    'critical': 'severe',
    'none': 'none',
    'unknown': 'none',
}


def clean_text_response(text: str) -> str:
    """Clean and format a text response.

    Collapses runs of blank lines to a single blank line, collapses runs of
    spaces to one space, removes ``**`` / ``__`` emphasis markers and strips
    leading/trailing whitespace.
    """
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'[ ]+', ' ', text)
    return text.replace("**", "").replace("__", "").strip()


def extract_section(text: str, heading: str) -> str:
    """Extract the body that follows ``"<heading>:"`` in *text*.

    The heading is matched case-insensitively. The section ends at the next
    line that starts with an uppercase letter and contains a colon, or at the
    end of the text.

    Returns:
        The stripped section body, or ``""`` when the heading is not found or
        the extraction fails (the failure is logged).
    """
    try:
        # Case-insensitivity is scoped to the heading only; applying it to the
        # whole pattern would make [A-Z] match lowercase letters and end the
        # section at any "word: value" line.
        pattern = rf"(?i:{re.escape(heading)}):\s*\n(.*?)(?=\n[A-Z][^\n]*:|\Z)"
        match = re.search(pattern, text, re.DOTALL)
        return match.group(1).strip() if match else ""
    except Exception as e:
        # Deliberate best-effort: callers get an empty section, not a crash.
        logger.error("Section extraction failed for heading '%s': %s", heading, e)
        return ""


def _extract_improved(text: str, heading: str) -> str:
    """Return the paragraph that follows *heading*, with bullet markers removed.

    Several heading layouts are tried in order; the first that matches wins.
    Returns ``""`` when none match.
    """
    escaped = re.escape(heading)
    patterns = [
        rf"{escaped}:\s*\n(.*?)(?=\n\s*\n|\Z)",
        rf"\*\*{escaped}\*\*:\s*\n(.*?)(?=\n\s*\n|\Z)",
        rf"{escaped}[\s\-]+(.*?)(?=\n\s*\n|\Z)",
        rf"\n{escaped}\s*\n(.*?)(?=\n\s*\n|\Z)",
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        if match:
            content = match.group(1).strip()
            # Drop a leading "-" or "*" bullet marker from every line.
            return re.sub(r'^\s*[\-\*]\s*', '', content, flags=re.MULTILINE)
    return ""


def structure_medical_response(text: str) -> Dict:
    """Structure a medical response into sections.

    Returns:
        A dict with the keys ``summary``, ``risks``, ``missed_issues`` and
        ``recommendations``. Each value is the text found under the first
        matching candidate heading, or ``""`` when none is present.
    """
    text = text.replace('**', '').replace('__', '')
    result = {}
    for key, headings in _SECTION_HEADINGS:
        section = ""
        for heading in headings:
            section = _extract_improved(text, heading)
            if section:
                break
        result[key] = section
    return result


def serialize_patient(patient: dict) -> dict:
    """Serialize patient data for a JSON response.

    Returns a shallow copy of *patient* whose ``_id`` (when present) is
    converted to ``str``; the input dict is not modified.
    """
    patient_copy = patient.copy()
    if "_id" in patient_copy:
        patient_copy["_id"] = str(patient_copy["_id"])
    return patient_copy


class _PatientJSONEncoder(json.JSONEncoder):
    """JSON encoder that also serializes ``datetime`` and ``ObjectId`` values."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        if ObjectId is not None and isinstance(obj, ObjectId):
            return str(obj)
        return super().default(obj)


def compute_patient_data_hash(data: dict) -> str:
    """Compute a hash of patient data for change detection.

    The data is dumped as JSON with sorted keys, so key order does not
    affect the result.

    Returns:
        The SHA-256 hex digest of the serialized data.

    Raises:
        TypeError: if *data* contains a value the encoder cannot serialize.
    """
    serialized = json.dumps(data, sort_keys=True, cls=_PatientJSONEncoder)
    return hashlib.sha256(serialized.encode()).hexdigest()


def compute_file_content_hash(file_content: bytes) -> str:
    """Return the SHA-256 hex digest of *file_content*."""
    return hashlib.sha256(file_content).hexdigest()


def create_notification(user_id: str, title: str, message: str,
                        notification_type: str = "info",
                        patient_id: Optional[str] = None) -> dict:
    """Create a notification object.

    Returns:
        A dict with ``user_id``, ``title``, ``message``, ``type``,
        ``read`` (always ``False``), ``timestamp`` (naive UTC ``datetime``)
        and ``patient_id``.
    """
    # Same naive-UTC value datetime.utcnow() produced, without the call that
    # is deprecated since Python 3.12.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "user_id": user_id,
        "title": title,
        "message": message,
        "type": notification_type,
        "read": False,
        "timestamp": timestamp,
        "patient_id": patient_id,
    }


def format_risk_level(risk_level: str) -> str:
    """Normalize risk level names.

    Matching ignores case and surrounding whitespace. Unrecognized values,
    ``None`` and non-string inputs all normalize to ``'none'``.
    """
    if not isinstance(risk_level, str):
        return 'none'
    return _RISK_LEVEL_ALIASES.get(risk_level.strip().lower(), 'none')