from fastapi import APIRouter, HTTPException, Depends, Query, status, Body from db.mongo import patients_collection, db # Added db import from core.security import get_current_user from utils.db import create_indexes from utils.helpers import calculate_age, standardize_language from models.entities import Note, PatientCreate from models.schemas import PatientListResponse # Fixed import from api.services.fhir_integration import HAPIFHIRIntegrationService from datetime import datetime from bson import ObjectId from bson.errors import InvalidId from typing import Optional, List, Dict, Any from pymongo import UpdateOne, DeleteOne from pymongo.errors import BulkWriteError import json from pathlib import Path import glob import uuid import re import logging import time import os from pydantic import BaseModel, Field from motor.motor_asyncio import AsyncIOMotorClient # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s' ) logger = logging.getLogger(__name__) router = APIRouter() # Configuration BASE_DIR = Path(__file__).resolve().parent.parent.parent SYNTHEA_DATA_DIR = BASE_DIR / "output" / "fhir" try: os.makedirs(SYNTHEA_DATA_DIR, exist_ok=True) except PermissionError: # In containerized environments, we might not have write permissions # Use a temporary directory instead import tempfile SYNTHEA_DATA_DIR = Path(tempfile.gettempdir()) / "fhir" os.makedirs(SYNTHEA_DATA_DIR, exist_ok=True) # Pydantic models for update validation class ConditionUpdate(BaseModel): id: Optional[str] = None code: Optional[str] = None status: Optional[str] = None onset_date: Optional[str] = None recorded_date: Optional[str] = None verification_status: Optional[str] = None notes: Optional[str] = None class MedicationUpdate(BaseModel): id: Optional[str] = None name: Optional[str] = None status: Optional[str] = None prescribed_date: Optional[str] = None requester: Optional[str] = None dosage: Optional[str] = None class EncounterUpdate(BaseModel): id: Optional[str] = None type: Optional[str] = None status: Optional[str] = None period: Optional[Dict[str, str]] = None service_provider: Optional[str] = None class NoteUpdate(BaseModel): id: Optional[str] = None title: Optional[str] = None date: Optional[str] = None author: Optional[str] = None content: Optional[str] = None class PatientUpdate(BaseModel): full_name: Optional[str] = None gender: Optional[str] = None date_of_birth: Optional[str] = None address: Optional[str] = None city: Optional[str] = None state: Optional[str] = None postal_code: Optional[str] = None country: Optional[str] = None marital_status: Optional[str] = None language: Optional[str] = None conditions: Optional[List[ConditionUpdate]] = None medications: Optional[List[MedicationUpdate]] = None encounters: Optional[List[EncounterUpdate]] = None notes: Optional[List[NoteUpdate]] = None @router.get("/debug/count") async def debug_patient_count(): """Debug endpoint to verify patient counts""" try: total = await patients_collection.count_documents({}) synthea = await patients_collection.count_documents({"source": "synthea"}) manual = await patients_collection.count_documents({"source": "manual"}) return { "total": total, "synthea": synthea, "manual": manual, "message": f"Found {total} total patients ({synthea} from synthea, {manual} manual)" } except Exception as e: logger.error(f"Error counting patients: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error counting patients: {str(e)}" ) @router.post("/patients", status_code=status.HTTP_201_CREATED) async def create_patient( patient_data: PatientCreate, current_user: dict = Depends(get_current_user) ): """Create a new patient in the database""" logger.info(f"Creating new patient by user {current_user.get('email')}") if not any(role in current_user.get('roles', []) for role in ['admin', 'doctor']): logger.warning(f"Unauthorized create attempt by {current_user.get('email')}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only administrators and doctors can create patients" ) try: # Prepare the patient document patient_doc = patient_data.dict() now = datetime.utcnow().isoformat() # Add system-generated fields patient_doc.update({ "fhir_id": str(uuid.uuid4()), "import_date": now, "last_updated": now, "source": "manual", "created_by": current_user.get('email') }) # Ensure arrays exist even if empty for field in ['conditions', 'medications', 'encounters', 'notes']: if field not in patient_doc: patient_doc[field] = [] # Insert the patient document result = await patients_collection.insert_one(patient_doc) # Return the created patient with the generated ID created_patient = await patients_collection.find_one( {"_id": result.inserted_id} ) if not created_patient: logger.error("Failed to retrieve created patient") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve created patient" ) created_patient["id"] = str(created_patient["_id"]) del created_patient["_id"] logger.info(f"Successfully created patient {created_patient['fhir_id']}") return created_patient except Exception as e: logger.error(f"Failed to create patient: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create patient: {str(e)}" ) @router.delete("/patients/{patient_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_patient( patient_id: str, current_user: dict = Depends(get_current_user) ): """Delete a patient from the database""" logger.info(f"Deleting patient {patient_id} by user {current_user.get('email')}") if not any(role in current_user.get('roles', []) for role in ['admin', 'doctor']): logger.warning(f"Unauthorized delete attempt by {current_user.get('email')}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only administrators can delete patients" ) try: # Build the query based on whether patient_id is a valid ObjectId query = {"fhir_id": patient_id} if ObjectId.is_valid(patient_id): query = { "$or": [ {"_id": ObjectId(patient_id)}, {"fhir_id": patient_id} ] } # Check if patient exists patient = await patients_collection.find_one(query) if not patient: logger.warning(f"Patient not found for deletion: {patient_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Patient not found" ) # Perform deletion result = await patients_collection.delete_one(query) if result.deleted_count == 0: logger.error(f"Failed to delete patient {patient_id}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete patient" ) logger.info(f"Successfully deleted patient {patient_id}") return None except HTTPException: raise except Exception as e: logger.error(f"Failed to delete patient {patient_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete patient: {str(e)}" ) async def process_synthea_patient(bundle: dict, file_path: str) -> Optional[dict]: logger.debug(f"Processing patient from file: {file_path}") patient_data = {} notes = [] conditions = [] medications = [] encounters = [] # Validate bundle structure if not isinstance(bundle, dict) or 'entry' not in bundle: logger.error(f"Invalid FHIR bundle structure in {file_path}") return None for entry in bundle.get('entry', []): resource = entry.get('resource', {}) resource_type = resource.get('resourceType') if not resource_type: logger.warning(f"Skipping entry with missing resourceType in {file_path}") continue try: if resource_type == 'Patient': name = resource.get('name', [{}])[0] address = resource.get('address', [{}])[0] # Construct full name and remove numbers raw_full_name = f"{' '.join(name.get('given', ['']))} {name.get('family', '')}".strip() clean_full_name = re.sub(r'\d+', '', raw_full_name).strip() patient_data = { 'fhir_id': resource.get('id'), 'full_name': clean_full_name, 'gender': resource.get('gender', 'unknown'), 'date_of_birth': resource.get('birthDate', ''), 'address': ' '.join(address.get('line', [''])), 'city': address.get('city', ''), 'state': address.get('state', ''), 'postal_code': address.get('postalCode', ''), 'country': address.get('country', ''), 'marital_status': resource.get('maritalStatus', {}).get('text', ''), 'language': standardize_language(resource.get('communication', [{}])[0].get('language', {}).get('text', '')), 'source': 'synthea', 'last_updated': datetime.utcnow().isoformat() } elif resource_type == 'Encounter': encounter = { 'id': resource.get('id'), 'type': resource.get('type', [{}])[0].get('text', ''), 'status': resource.get('status'), 'period': resource.get('period', {}), 'service_provider': resource.get('serviceProvider', {}).get('display', '') } encounters.append(encounter) for note in resource.get('note', []): if note.get('text'): notes.append({ 'date': resource.get('period', {}).get('start', datetime.utcnow().isoformat()), 'type': resource.get('type', [{}])[0].get('text', 'Encounter Note'), 'text': note.get('text'), 'context': f"Encounter: {encounter.get('type')}", 'author': 'System Generated' }) elif resource_type == 'Condition': conditions.append({ 'id': resource.get('id'), 'code': resource.get('code', {}).get('text', ''), 'status': resource.get('clinicalStatus', {}).get('text', ''), 'onset_date': resource.get('onsetDateTime'), 'recorded_date': resource.get('recordedDate'), 'verification_status': resource.get('verificationStatus', {}).get('text', '') }) elif resource_type == 'MedicationRequest': medications.append({ 'id': resource.get('id'), 'name': resource.get('medicationCodeableConcept', {}).get('text', ''), 'status': resource.get('status'), 'prescribed_date': resource.get('authoredOn'), 'requester': resource.get('requester', {}).get('display', ''), 'dosage': resource.get('dosageInstruction', [{}])[0].get('text', '') }) except Exception as e: logger.error(f"Error processing {resource_type} in {file_path}: {str(e)}") continue if patient_data: patient_data.update({ 'notes': notes, 'conditions': conditions, 'medications': medications, 'encounters': encounters, 'import_date': datetime.utcnow().isoformat() }) logger.info(f"Successfully processed patient {patient_data.get('fhir_id')} from {file_path}") return patient_data logger.warning(f"No valid patient data found in {file_path}") return None @router.post("/import", status_code=status.HTTP_201_CREATED) async def import_patients( limit: int = Query(100, ge=1, le=1000), current_user: dict = Depends(get_current_user) ): request_id = str(uuid.uuid4()) logger.info(f"Starting import request {request_id} by user {current_user.get('email')}") start_time = time.time() if current_user.get('role') not in ['admin', 'doctor']: logger.warning(f"Unauthorized import attempt by {current_user.get('email')}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only administrators and doctors can import data" ) try: await create_indexes() if not SYNTHEA_DATA_DIR.exists(): logger.error(f"Synthea data directory not found: {SYNTHEA_DATA_DIR}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Data directory not found" ) # Filter out non-patient files files = [ f for f in glob.glob(str(SYNTHEA_DATA_DIR / "*.json")) if not re.search(r'(hospitalInformation|practitionerInformation)\d+\.json$', f) ] if not files: logger.warning("No valid patient JSON files found in synthea data directory") return { "status": "success", "message": "No patient data files found", "imported": 0, "request_id": request_id } operations = [] imported = 0 errors = [] for file_path in files[:limit]: try: logger.debug(f"Processing file: {file_path}") # Check file accessibility if not os.path.exists(file_path): logger.error(f"File not found: {file_path}") errors.append(f"File not found: {file_path}") continue # Check file size file_size = os.path.getsize(file_path) if file_size == 0: logger.warning(f"Empty file: {file_path}") errors.append(f"Empty file: {file_path}") continue with open(file_path, 'r', encoding='utf-8') as f: try: bundle = json.load(f) except json.JSONDecodeError as je: logger.error(f"Invalid JSON in {file_path}: {str(je)}") errors.append(f"Invalid JSON in {file_path}: {str(je)}") continue patient = await process_synthea_patient(bundle, file_path) if patient: if not patient.get('fhir_id'): logger.warning(f"Missing FHIR ID in patient data from {file_path}") errors.append(f"Missing FHIR ID in {file_path}") continue operations.append(UpdateOne( {"fhir_id": patient['fhir_id']}, {"$setOnInsert": patient}, upsert=True )) imported += 1 else: logger.warning(f"No valid patient data in {file_path}") errors.append(f"No valid patient data in {file_path}") except Exception as e: logger.error(f"Error processing {file_path}: {str(e)}") errors.append(f"Error in {file_path}: {str(e)}") continue response = { "status": "success", "imported": imported, "errors": errors, "request_id": request_id, "duration_seconds": time.time() - start_time } if operations: try: result = await patients_collection.bulk_write(operations, ordered=False) response.update({ "upserted": result.upserted_count, "existing": len(operations) - result.upserted_count }) logger.info(f"Import request {request_id} completed: {imported} patients processed, " f"{result.upserted_count} upserted, {len(errors)} errors") except BulkWriteError as bwe: logger.error(f"Partial bulk write failure for request {request_id}: {str(bwe.details)}") response.update({ "upserted": bwe.details.get('nUpserted', 0), "existing": len(operations) - bwe.details.get('nUpserted', 0), "write_errors": [ f"Index {err['index']}: {err['errmsg']}" for err in bwe.details.get('writeErrors', []) ] }) logger.info(f"Import request {request_id} partially completed: {imported} patients processed, " f"{response['upserted']} upserted, {len(errors)} errors") except Exception as e: logger.error(f"Bulk write failed for request {request_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database operation failed: {str(e)}" ) else: logger.info(f"Import request {request_id} completed: No new patients to import, {len(errors)} errors") response["message"] = "No new patients found to import" return response except HTTPException: raise except Exception as e: logger.error(f"Import request {request_id} failed: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Import failed: {str(e)}" ) @router.post("/patients/import-ehr", status_code=status.HTTP_201_CREATED) async def import_ehr_patients( ehr_data: List[dict], ehr_system: str = Query(..., description="Name of the EHR system"), current_user: dict = Depends(get_current_user), db_client: AsyncIOMotorClient = Depends(lambda: db) ): """Import patients from external EHR system""" logger.info(f"Importing {len(ehr_data)} patients from EHR system: {ehr_system}") if not any(role in current_user.get('roles', []) for role in ['admin', 'doctor']): logger.warning(f"Unauthorized EHR import attempt by {current_user.get('email')}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only administrators and doctors can import EHR patients" ) try: imported_patients = [] skipped_patients = [] for patient_data in ehr_data: # Check if patient already exists by multiple criteria existing_patient = await patients_collection.find_one({ "$or": [ {"ehr_id": patient_data.get("ehr_id"), "ehr_system": ehr_system}, {"full_name": patient_data.get("full_name"), "date_of_birth": patient_data.get("date_of_birth")}, {"national_id": patient_data.get("national_id")} if patient_data.get("national_id") else {} ] }) if existing_patient: skipped_patients.append(patient_data.get("full_name", "Unknown")) logger.info(f"Patient {patient_data.get('full_name', 'Unknown')} already exists, skipping...") continue # Prepare patient document for EHR import patient_doc = { "full_name": patient_data.get("full_name"), "date_of_birth": patient_data.get("date_of_birth"), "gender": patient_data.get("gender"), "address": patient_data.get("address"), "national_id": patient_data.get("national_id"), "blood_type": patient_data.get("blood_type"), "allergies": patient_data.get("allergies", []), "chronic_conditions": patient_data.get("chronic_conditions", []), "medications": patient_data.get("medications", []), "emergency_contact_name": patient_data.get("emergency_contact_name"), "emergency_contact_phone": patient_data.get("emergency_contact_phone"), "insurance_provider": patient_data.get("insurance_provider"), "insurance_policy_number": patient_data.get("insurance_policy_number"), "contact": patient_data.get("contact"), "source": "ehr", "ehr_id": patient_data.get("ehr_id"), "ehr_system": ehr_system, "status": "active", "registration_date": datetime.now(), "created_by": current_user.get('email'), "created_at": datetime.now(), "updated_at": datetime.now() } # Insert patient result = await patients_collection.insert_one(patient_doc) imported_patients.append(patient_data.get("full_name", "Unknown")) logger.info(f"Successfully imported {len(imported_patients)} patients, skipped {len(skipped_patients)}") return { "message": f"Successfully imported {len(imported_patients)} patients from {ehr_system}", "imported_count": len(imported_patients), "skipped_count": len(skipped_patients), "imported_patients": imported_patients, "skipped_patients": skipped_patients } except Exception as e: logger.error(f"Error importing EHR patients: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error importing EHR patients: {str(e)}" ) @router.get("/patients/sources", response_model=List[dict]) async def get_patient_sources( current_user: dict = Depends(get_current_user) ): """Get available patient sources and their counts""" try: # Get counts for each source source_counts = await patients_collection.aggregate([ { "$group": { "_id": "$source", "count": {"$sum": 1} } } ]).to_list(length=None) # Format the response sources = [] for source_count in source_counts: source_name = source_count["_id"] or "unknown" sources.append({ "source": source_name, "count": source_count["count"], "label": source_name.replace("_", " ").title() }) return sources except Exception as e: logger.error(f"Error getting patient sources: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error getting patient sources: {str(e)}" ) @router.get("/patients", response_model=PatientListResponse) async def get_patients( page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), search: Optional[str] = Query(None), source: Optional[str] = Query(None), # Filter by patient source patient_status: Optional[str] = Query(None), # Filter by patient status doctor_id: Optional[str] = Query(None), # Filter by assigned doctor current_user: dict = Depends(get_current_user), db_client: AsyncIOMotorClient = Depends(lambda: db) ): """Get patients with filtering options""" skip = (page - 1) * limit user_id = current_user["_id"] # Debug logging logger.info(f"🔍 Getting patients for user: {current_user.get('email')} with roles: {current_user.get('roles', [])}") # Build filter query filter_query = {} # Role-based access - apply this first if 'admin' not in current_user.get('roles', []): if 'doctor' in current_user.get('roles', []): # Doctors can see all patients for now (temporarily simplified) logger.info("👨‍⚕️ Doctor access - no restrictions applied") pass # No restrictions for doctors else: # Patients can only see their own record logger.info(f"👤 Patient access - restricting to own record: {user_id}") filter_query["_id"] = ObjectId(user_id) # Build additional filters additional_filters = {} # Add search filter if search: additional_filters["$or"] = [ {"full_name": {"$regex": search, "$options": "i"}}, {"national_id": {"$regex": search, "$options": "i"}}, {"ehr_id": {"$regex": search, "$options": "i"}} ] # Add source filter if source: additional_filters["source"] = source # Add status filter if patient_status: additional_filters["status"] = patient_status # Add doctor assignment filter if doctor_id: additional_filters["assigned_doctor_id"] = ObjectId(doctor_id) # Combine filters if additional_filters: if filter_query.get("$or"): # If we have role-based $or, we need to combine with additional filters # Create a new $and condition filter_query = { "$and": [ filter_query, additional_filters ] } else: # No role-based restrictions, just use additional filters filter_query.update(additional_filters) logger.info(f"🔍 Final filter query: {filter_query}") try: # Get total count total = await patients_collection.count_documents(filter_query) logger.info(f"📊 Total patients matching filter: {total}") # Get patients with pagination patients_cursor = patients_collection.find(filter_query).skip(skip).limit(limit) patients = await patients_cursor.to_list(length=limit) logger.info(f"📋 Retrieved {len(patients)} patients") # Process patients to include doctor names and format dates processed_patients = [] for patient in patients: # Get assigned doctor name if exists assigned_doctor_name = None if patient.get("assigned_doctor_id"): doctor = await db_client.users.find_one({"_id": patient["assigned_doctor_id"]}) if doctor: assigned_doctor_name = doctor.get("full_name") # Convert ObjectId to string patient["id"] = str(patient["_id"]) del patient["_id"] # Add assigned doctor name patient["assigned_doctor_name"] = assigned_doctor_name processed_patients.append(patient) logger.info(f"✅ Returning {len(processed_patients)} processed patients") return PatientListResponse( patients=processed_patients, total=total, page=page, limit=limit, source_filter=source, status_filter=patient_status ) except Exception as e: logger.error(f"❌ Error fetching patients: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error fetching patients: {str(e)}" ) @router.get("/patients/{patient_id}", response_model=dict) async def get_patient(patient_id: str): logger.info(f"Retrieving patient: {patient_id}") try: patient = await patients_collection.find_one({ "$or": [ {"_id": ObjectId(patient_id)}, {"fhir_id": patient_id} ] }) if not patient: logger.warning(f"Patient not found: {patient_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Patient not found" ) response = { "demographics": { "id": str(patient["_id"]), "fhir_id": patient.get("fhir_id"), "full_name": patient.get("full_name"), "gender": patient.get("gender"), "date_of_birth": patient.get("date_of_birth"), "age": calculate_age(patient.get("date_of_birth")), "address": { "line": patient.get("address"), "city": patient.get("city"), "state": patient.get("state"), "postal_code": patient.get("postal_code"), "country": patient.get("country") }, "marital_status": patient.get("marital_status"), "language": patient.get("language") }, "clinical_data": { "notes": patient.get("notes", []), "conditions": patient.get("conditions", []), "medications": patient.get("medications", []), "encounters": patient.get("encounters", []) }, "metadata": { "source": patient.get("source"), "import_date": patient.get("import_date"), "last_updated": patient.get("last_updated") } } logger.info(f"Successfully retrieved patient: {patient_id}") return response except ValueError as ve: logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid patient ID format" ) except Exception as e: logger.error(f"Failed to retrieve patient {patient_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve patient: {str(e)}" ) @router.post("/{patient_id}/notes", status_code=status.HTTP_201_CREATED) async def add_note( patient_id: str, note: Note, current_user: dict = Depends(get_current_user) ): logger.info(f"Adding note for patient {patient_id} by user {current_user.get('email')}") if current_user.get('role') not in ['doctor', 'admin']: logger.warning(f"Unauthorized note addition attempt by {current_user.get('email')}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only clinicians can add notes" ) try: note_data = note.dict() note_data.update({ "author": current_user.get('full_name', 'System'), "timestamp": datetime.utcnow().isoformat() }) result = await patients_collection.update_one( {"$or": [ {"_id": ObjectId(patient_id)}, {"fhir_id": patient_id} ]}, { "$push": {"notes": note_data}, "$set": {"last_updated": datetime.utcnow().isoformat()} } ) if result.modified_count == 0: logger.warning(f"Patient not found for note addition: {patient_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Patient not found" ) logger.info(f"Note added successfully for patient {patient_id}") return {"status": "success", "message": "Note added"} except ValueError as ve: logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid patient ID format" ) except Exception as e: logger.error(f"Failed to add note for patient {patient_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to add note: {str(e)}" ) @router.put("/patients/{patient_id}", status_code=status.HTTP_200_OK) async def update_patient( patient_id: str, update_data: PatientUpdate, current_user: dict = Depends(get_current_user) ): """Update a patient's record in the database""" logger.info(f"Updating patient {patient_id} by user {current_user.get('email')}") if not any(role in current_user.get('roles', []) for role in ['admin', 'doctor']): logger.warning(f"Unauthorized update attempt by {current_user.get('email')}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only administrators and doctors can update patients" ) try: # Build the query based on whether patient_id is a valid ObjectId query = {"fhir_id": patient_id} if ObjectId.is_valid(patient_id): query = { "$or": [ {"_id": ObjectId(patient_id)}, {"fhir_id": patient_id} ] } # Check if patient exists patient = await patients_collection.find_one(query) if not patient: logger.warning(f"Patient not found for update: {patient_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Patient not found" ) # Prepare update operations update_ops = {"$set": {"last_updated": datetime.utcnow().isoformat()}} # Handle demographic updates demographics = { "full_name": update_data.full_name, "gender": update_data.gender, "date_of_birth": update_data.date_of_birth, "address": update_data.address, "city": update_data.city, "state": update_data.state, "postal_code": update_data.postal_code, "country": update_data.country, "marital_status": update_data.marital_status, "language": update_data.language } for key, value in demographics.items(): if value is not None: update_ops["$set"][key] = value # Handle array updates (conditions, medications, encounters, notes) array_fields = { "conditions": update_data.conditions, "medications": update_data.medications, "encounters": update_data.encounters, "notes": update_data.notes } for field, items in array_fields.items(): if items is not None: # Fetch existing items existing_items = patient.get(field, []) updated_items = [] for item in items: item_dict = item.dict(exclude_unset=True) if not item_dict: continue # Generate ID for new items if not item_dict.get("id"): item_dict["id"] = str(uuid.uuid4()) # Validate required fields if field == "conditions" and not item_dict.get("code"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Condition code is required for {field}" ) if field == "medications" and not item_dict.get("name"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Medication name is required for {field}" ) if field == "encounters" and not item_dict.get("type"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Encounter type is required for {field}" ) if field == "notes" and not item_dict.get("content"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Note content is required for {field}" ) updated_items.append(item_dict) # Replace the entire array update_ops["$set"][field] = updated_items # Perform the update result = await patients_collection.update_one(query, update_ops) if result.modified_count == 0 and result.matched_count == 0: logger.warning(f"Patient not found for update: {patient_id}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Patient not found" ) # Retrieve and return the updated patient updated_patient = await patients_collection.find_one(query) if not updated_patient: logger.error(f"Failed to retrieve updated patient: {patient_id}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve updated patient" ) response = { "id": str(updated_patient["_id"]), "fhir_id": updated_patient.get("fhir_id"), "full_name": updated_patient.get("full_name"), "gender": updated_patient.get("gender"), "date_of_birth": updated_patient.get("date_of_birth"), "address": updated_patient.get("address"), "city": updated_patient.get("city"), "state": updated_patient.get("state"), "postal_code": updated_patient.get("postal_code"), "country": updated_patient.get("country"), "marital_status": updated_patient.get("marital_status"), "language": updated_patient.get("language"), "conditions": updated_patient.get("conditions", []), "medications": updated_patient.get("medications", []), "encounters": updated_patient.get("encounters", []), "notes": updated_patient.get("notes", []), "source": updated_patient.get("source"), "import_date": updated_patient.get("import_date"), "last_updated": updated_patient.get("last_updated") } logger.info(f"Successfully updated patient {patient_id}") return response except ValueError as ve: logger.error(f"Invalid patient ID format: {patient_id}, error: {str(ve)}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid patient ID format" ) except HTTPException: raise except Exception as e: logger.error(f"Failed to update patient {patient_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update patient: {str(e)}" ) # FHIR Integration Endpoints @router.post("/patients/import-hapi-fhir", status_code=status.HTTP_201_CREATED) async def import_hapi_patients( limit: int = Query(20, ge=1, le=100, description="Number of patients to import"), current_user: dict = Depends(get_current_user) ): """ Import patients from HAPI FHIR Test Server """ try: service = HAPIFHIRIntegrationService() result = await service.import_patients_from_hapi(limit=limit) # Create detailed message message_parts = [] if result["imported_count"] > 0: message_parts.append(f"Successfully imported {result['imported_count']} patients") if result["skipped_count"] > 0: message_parts.append(f"Skipped {result['skipped_count']} duplicate patients") if result["errors"]: message_parts.append(f"Encountered {len(result['errors'])} errors") message = ". ".join(message_parts) + " from HAPI FHIR" return { "message": message, "imported_count": result["imported_count"], "skipped_count": result["skipped_count"], "total_found": result["total_found"], "imported_patients": result["imported_patients"], "skipped_patients": result["skipped_patients"], "errors": result["errors"], "source": "hapi_fhir" } except Exception as e: logger.error(f"Error importing HAPI FHIR patients: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to import patients from HAPI FHIR: {str(e)}" ) @router.post("/patients/sync-patient/{patient_id}") async def sync_patient_data( patient_id: str, current_user: dict = Depends(get_current_user) ): """ Sync a specific patient's data from HAPI FHIR """ try: service = HAPIFHIRIntegrationService() success = await service.sync_patient_data(patient_id) if success: return { "message": f"Successfully synced patient {patient_id} from HAPI FHIR", "patient_id": patient_id, "success": True } else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Patient {patient_id} not found in HAPI FHIR or sync failed" ) except HTTPException: raise except Exception as e: logger.error(f"Error syncing patient {patient_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to sync patient: {str(e)}" ) @router.get("/patients/hapi-fhir/patients") async def get_hapi_patients( limit: int = Query(50, ge=1, le=200, description="Number of patients to fetch"), current_user: dict = Depends(get_current_user) ): """ Get patients from HAPI FHIR without importing them """ try: service = HAPIFHIRIntegrationService() patients = await service.get_hapi_patients(limit=limit) return { "patients": patients, "count": len(patients), "source": "hapi_fhir" } except Exception as e: logger.error(f"Error fetching HAPI FHIR patients: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to fetch patients from HAPI FHIR: {str(e)}" ) @router.get("/patients/hapi-fhir/patients/{patient_id}") async def get_hapi_patient_details( patient_id: str, current_user: dict = Depends(get_current_user) ): """ Get detailed information for a specific HAPI FHIR patient """ try: service = HAPIFHIRIntegrationService() patient_details = await service.get_hapi_patient_details(patient_id) if not patient_details: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Patient {patient_id} not found in HAPI FHIR" ) return patient_details except HTTPException: raise except Exception as e: logger.error(f"Error fetching HAPI FHIR patient details: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to fetch patient details from HAPI FHIR: {str(e)}" ) @router.get("/patients/hapi-fhir/statistics") async def get_hapi_statistics( current_user: dict = Depends(get_current_user) ): """ Get statistics about HAPI FHIR imported patients """ try: service = HAPIFHIRIntegrationService() stats = await service.get_patient_statistics() return { "statistics": stats, "source": "hapi_fhir" } except Exception as e: logger.error(f"Error getting HAPI FHIR statistics: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get HAPI FHIR statistics: {str(e)}" ) # Export the router as 'patients' for api.__init__.py patients = router