Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| from api.utils.fhir_client import HAPIFHIRClient | |
| from db.mongo import db | |
class HAPIFHIRIntegrationService:
    """
    Service to integrate HAPI FHIR data with your existing database.

    Patients and their clinical resources are read through
    ``HAPIFHIRClient`` and stored in the ``db.patients`` collection.

    NOTE(review): the FHIR client methods are called without ``await``
    inside coroutines, so they look synchronous and would block the event
    loop while they run -- confirm against ``HAPIFHIRClient``.
    """

    # Values written into the ``metadata`` sub-document of imported patients.
    _SOURCE_TAG = 'hapi_fhir'
    _FHIR_SERVER = 'https://hapi.fhir.org/baseR4'

    # Fields a re-sync may create but must never overwrite: notes and
    # encounters are populated separately, and import_date records the
    # first import only.
    _INSERT_ONLY_FIELDS = {
        'clinical_data': ('notes', 'encounters'),
        'metadata': ('import_date',),
    }

    def __init__(self):
        self.fhir_client = HAPIFHIRClient()

    @staticmethod
    def _import_result(total_found: int = 0,
                       imported_patients: Optional[List[str]] = None,
                       skipped_patients: Optional[List[str]] = None,
                       errors: Optional[List[str]] = None) -> dict:
        """
        Build the summary dict returned by ``import_patients_from_hapi``.
        """
        imported_patients = imported_patients or []
        skipped_patients = skipped_patients or []
        return {
            "imported_count": len(imported_patients),
            "skipped_count": len(skipped_patients),
            "total_found": total_found,
            "imported_patients": imported_patients,
            "skipped_patients": skipped_patients,
            "errors": errors or []
        }

    async def import_patients_from_hapi(self, limit: int = 20) -> dict:
        """
        Import patients from HAPI FHIR Test Server with detailed feedback.

        A patient is skipped when a stored document already has the same
        ``fhir_id`` (top-level or under ``demographics``) or the same
        ``full_name`` + ``date_of_birth`` pair. A failure on one patient is
        recorded in ``errors`` and does not stop the rest of the batch.

        Args:
            limit: Maximum number of patients requested from the client.

        Returns:
            Dict with ``imported_count``, ``skipped_count``, ``total_found``,
            ``imported_patients``, ``skipped_patients`` (lists of full names)
            and ``errors`` (list of messages). Never raises; an unexpected
            failure yields zero counts and the error text in ``errors``.
        """
        try:
            print(f"Fetching {limit} patients from HAPI FHIR...")
            patients = self.fhir_client.get_patients(limit=limit)
            if not patients:
                print("No patients found in HAPI FHIR")
                return self._import_result()

            print(f"Found {len(patients)} patients, checking for duplicates...")
            imported_patients = []
            skipped_patients = []
            errors = []

            for patient in patients:
                try:
                    # Check if patient already exists by multiple criteria
                    existing = await db.patients.find_one({
                        "$or": [
                            {"fhir_id": patient['fhir_id']},
                            {"full_name": patient['full_name'],
                             "date_of_birth": patient['date_of_birth']},
                            {"demographics.fhir_id": patient['fhir_id']}
                        ]
                    })
                    if existing:
                        skipped_patients.append(patient['full_name'])
                        print(f"Patient {patient['full_name']} already exists (fhir_id: {patient['fhir_id']}), skipping...")
                        continue

                    # Enhance patient data with additional FHIR data
                    enhanced_patient = await self._enhance_patient_data(patient)

                    # Insert into database
                    result = await db.patients.insert_one(enhanced_patient)
                    if result.inserted_id:
                        imported_patients.append(patient['full_name'])
                        print(f"Imported patient: {patient['full_name']} (ID: {result.inserted_id})")
                except Exception as e:
                    # Best effort: one bad record must not abort the batch.
                    error_msg = f"Error importing patient {patient.get('full_name', 'Unknown')}: {e}"
                    errors.append(error_msg)
                    print(error_msg)

            print(f"Import completed: {len(imported_patients)} imported, {len(skipped_patients)} skipped")
            return self._import_result(
                total_found=len(patients),
                imported_patients=imported_patients,
                skipped_patients=skipped_patients,
                errors=errors,
            )
        except Exception as e:
            print(f"Error importing patients: {e}")
            return self._import_result(errors=[str(e)])

    def _fetch_clinical_data(self, patient_id: str) -> Dict:
        """
        Fetch observations, medications and conditions for one patient.
        """
        return {
            'observations': self.fhir_client.get_patient_observations(patient_id),
            'medications': self.fhir_client.get_patient_medications(patient_id),
            'conditions': self.fhir_client.get_patient_conditions(patient_id),
        }

    async def _enhance_patient_data(self, patient: Dict) -> Dict:
        """
        Enhance patient data with additional FHIR resources.

        Returns a new dict holding every key of ``patient`` plus
        ``demographics``, ``clinical_data`` and ``metadata`` sub-documents.
        Only ``fhir_id`` is required; other demographic fields that are
        absent are stored as ``None`` (or the defaults below) so a missing
        optional field does not discard the fetched clinical data.

        If anything fails (missing ``fhir_id``, client error) the error is
        printed and the original ``patient`` dict is returned unchanged.
        """
        try:
            patient_id = patient['fhir_id']

            # Fetch additional data from HAPI FHIR
            clinical_data = self._fetch_clinical_data(patient_id)

            # One timestamp so import_date == last_updated on a fresh record.
            timestamp = datetime.now().isoformat()

            # Structure the enhanced patient data
            return {
                # Basic demographics
                **patient,
                'demographics': {
                    'id': patient.get('id'),
                    'fhir_id': patient_id,
                    'full_name': patient.get('full_name'),
                    'gender': patient.get('gender'),
                    'date_of_birth': patient.get('date_of_birth'),
                    'address': patient.get('address'),
                    'phone': patient.get('phone', ''),
                    'email': patient.get('email', ''),
                    'marital_status': patient.get('marital_status', 'Unknown'),
                    'language': patient.get('language', 'English')
                },
                # Clinical data
                'clinical_data': {
                    **clinical_data,
                    'notes': [],  # Will be populated separately
                    'encounters': []  # Will be populated separately
                },
                'metadata': {
                    'source': self._SOURCE_TAG,
                    'import_date': timestamp,
                    'last_updated': timestamp,
                    'fhir_server': self._FHIR_SERVER
                }
            }
        except Exception as e:
            print(f"Error enhancing patient data: {e}")
            return patient

    @classmethod
    def _build_sync_update(cls, enhanced_patient: Dict) -> Dict:
        """
        Translate an enhanced patient into an upsert-safe update document.

        ``clinical_data`` and ``metadata`` are flattened to dotted paths so
        that insert-only fields (notes, encounters, import_date) can go in
        ``$setOnInsert`` while everything else is refreshed via ``$set``.
        A plain ``$set`` of the whole document would reset those fields on
        every sync.
        """
        always_set = {}
        insert_only = {}
        for key, value in enhanced_patient.items():
            guarded = cls._INSERT_ONLY_FIELDS.get(key)
            if guarded is None or not isinstance(value, dict):
                always_set[key] = value
                continue
            for sub_key, sub_value in value.items():
                target = insert_only if sub_key in guarded else always_set
                target[f"{key}.{sub_key}"] = sub_value

        update = {"$set": always_set}
        # Omit the operator when empty; an empty operator document is
        # rejected by some server versions.
        if insert_only:
            update["$setOnInsert"] = insert_only
        return update

    async def sync_patient_data(self, patient_id: str) -> bool:
        """
        Sync a specific patient's data from HAPI FHIR.

        Upserts the document whose ``fhir_id`` equals ``patient_id``.
        Existing notes, encounters and the original import date are kept.

        Returns:
            True when a document was modified or inserted; False when the
            patient is not found, nothing changed, or an error occurred.
        """
        try:
            # Get patient from HAPI FHIR
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                print(f"Patient {patient_id} not found in HAPI FHIR")
                return False

            # Enhance with additional data
            enhanced_patient = await self._enhance_patient_data(patient)

            # Update in database
            result = await db.patients.update_one(
                {"fhir_id": patient_id},
                self._build_sync_update(enhanced_patient),
                upsert=True
            )
            if result.modified_count > 0 or result.upserted_id:
                print(f"Synced patient: {patient['full_name']}")
                return True

            print(f"No changes for patient: {patient['full_name']}")
            return False
        except Exception as e:
            print(f"Error syncing patient {patient_id}: {e}")
            return False

    async def get_patient_statistics(self) -> Dict:
        """
        Get statistics about imported patients.

        Returns:
            Dict with ``total_patients``, ``hapi_fhir_patients`` and
            ``sample_patient`` (one HAPI patient with ``_id`` as a string,
            or None). An empty dict is returned on error.
        """
        try:
            # This class writes the tag to metadata.source; the top-level
            # ``source`` form is kept for documents that carry it there.
            hapi_filter = {
                "$or": [
                    {"source": self._SOURCE_TAG},
                    {"metadata.source": self._SOURCE_TAG}
                ]
            }
            total_patients = await db.patients.count_documents({})
            hapi_patients = await db.patients.count_documents(hapi_filter)

            # Get sample patient data and convert ObjectId to string
            sample_patient = await db.patients.find_one(hapi_filter)
            if sample_patient:
                # Convert ObjectId to string for JSON serialization
                sample_patient['_id'] = str(sample_patient['_id'])

            return {
                'total_patients': total_patients,
                'hapi_fhir_patients': hapi_patients,
                'sample_patient': sample_patient
            }
        except Exception as e:
            print(f"Error getting statistics: {e}")
            return {}

    async def get_hapi_patients(self, limit: int = 50) -> List[Dict]:
        """
        Get patients from HAPI FHIR without importing them.

        Returns the client's result, or an empty list on error.
        """
        try:
            return self.fhir_client.get_patients(limit=limit)
        except Exception as e:
            print(f"Error fetching HAPI patients: {e}")
            return []

    async def get_hapi_patient_details(self, patient_id: str) -> Optional[Dict]:
        """
        Get detailed information for a specific HAPI FHIR patient.

        Returns:
            Dict with ``patient``, ``observations``, ``medications`` and
            ``conditions``; None when the patient is not found or on error.
        """
        try:
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                return None

            # Get additional data
            return {'patient': patient, **self._fetch_clinical_data(patient_id)}
        except Exception as e:
            print(f"Error fetching patient details: {e}")
            return None