# cps-api-tx / services /fhir_integration.py
# Ali2206's picture
# Initial CPS-API deployment with TxAgent integration
# 11102bd
# raw
# history blame
# 9.48 kB
from datetime import datetime
from typing import List, Dict, Optional
from api.utils.fhir_client import HAPIFHIRClient
from db.mongo import db
class HAPIFHIRIntegrationService:
    """
    Service to integrate HAPI FHIR data with your existing database.

    Patients are read through ``HAPIFHIRClient`` and stored in the
    ``db.patients`` collection.

    NOTE(review): the ``fhir_client`` methods are called without ``await``,
    so they appear to be synchronous; if they perform network I/O they will
    block the event loop while running -- confirm against the client.
    """

    def __init__(self):
        self.fhir_client = HAPIFHIRClient()

    @staticmethod
    def _import_summary(
        imported_patients: Optional[List[str]] = None,
        skipped_patients: Optional[List[str]] = None,
        total_found: int = 0,
        errors: Optional[List[str]] = None,
    ) -> dict:
        """
        Build the result dictionary returned by ``import_patients_from_hapi``.

        The counts are derived from the name lists, which grow in lockstep
        with them. Omitted arguments yield the "nothing happened" summary.
        """
        imported = imported_patients if imported_patients is not None else []
        skipped = skipped_patients if skipped_patients is not None else []
        return {
            "imported_count": len(imported),
            "skipped_count": len(skipped),
            "total_found": total_found,
            "imported_patients": imported,
            "skipped_patients": skipped,
            "errors": errors if errors is not None else [],
        }

    @staticmethod
    def _duplicate_filter(patient: Dict) -> Dict:
        """
        Build the query used to detect an already-imported patient.

        A record is a duplicate when its FHIR id matches (top level or under
        ``demographics``), or when both name and date of birth match.

        Raises:
            KeyError: if ``fhir_id``, ``full_name`` or ``date_of_birth`` is
                missing from ``patient``.
        """
        fhir_id = patient['fhir_id']
        full_name = patient['full_name']
        date_of_birth = patient['date_of_birth']

        clauses = [
            {"fhir_id": fhir_id},
            {"demographics.fhir_id": fhir_id},
        ]
        # Only compare on name + birth date when both are actually present;
        # matching on empty/None values would treat unrelated patients with
        # missing data as the same person.
        if full_name and date_of_birth:
            clauses.append({"full_name": full_name, "date_of_birth": date_of_birth})
        return {"$or": clauses}

    async def import_patients_from_hapi(self, limit: int = 20) -> dict:
        """
        Import patients from HAPI FHIR Test Server with detailed feedback.

        Args:
            limit: Maximum number of patients requested from the FHIR client.

        Returns:
            A dictionary with ``imported_count``, ``skipped_count``,
            ``total_found``, ``imported_patients``, ``skipped_patients`` and
            ``errors``. Failures never propagate: per-patient errors are
            collected in ``errors`` and the loop continues; a failure outside
            the loop yields an all-zero summary carrying the error text.
        """
        try:
            print(f"Fetching {limit} patients from HAPI FHIR...")
            patients = self.fhir_client.get_patients(limit=limit)

            if not patients:
                print("No patients found in HAPI FHIR")
                return self._import_summary()

            print(f"Found {len(patients)} patients, checking for duplicates...")

            imported_patients = []
            skipped_patients = []
            errors = []

            for patient in patients:
                try:
                    # Check if patient already exists by multiple criteria
                    existing = await db.patients.find_one(self._duplicate_filter(patient))
                    if existing:
                        skipped_patients.append(patient['full_name'])
                        print(f"Patient {patient['full_name']} already exists (fhir_id: {patient['fhir_id']}), skipping...")
                        continue

                    # Enhance patient data with additional FHIR data
                    enhanced_patient = await self._enhance_patient_data(patient)

                    # Insert into database
                    result = await db.patients.insert_one(enhanced_patient)
                    if result.inserted_id:
                        imported_patients.append(patient['full_name'])
                        print(f"Imported patient: {patient['full_name']} (ID: {result.inserted_id})")
                except Exception as e:
                    # The handler must not raise itself (e.g. when the entry
                    # is not a dict), otherwise one bad entry would abort the
                    # batch and discard the progress made so far.
                    if isinstance(patient, dict):
                        patient_name = patient.get('full_name', 'Unknown')
                    else:
                        patient_name = 'Unknown'
                    error_msg = f"Error importing patient {patient_name}: {e}"
                    errors.append(error_msg)
                    print(error_msg)
                    continue

            print(f"Import completed: {len(imported_patients)} imported, {len(skipped_patients)} skipped")
            return self._import_summary(
                imported_patients=imported_patients,
                skipped_patients=skipped_patients,
                total_found=len(patients),
                errors=errors,
            )
        except Exception as e:
            print(f"Error importing patients: {e}")
            return self._import_summary(errors=[str(e)])

    async def _enhance_patient_data(self, patient: Dict) -> Dict:
        """
        Enhance patient data with additional FHIR resources.

        Fetches observations, medications and conditions for the patient and
        returns a new dictionary holding the original fields plus
        ``demographics``, ``clinical_data`` and ``metadata`` sections.

        Returns:
            The enhanced record. If anything fails (a client call raises or a
            required key is missing) the error is printed and the original
            ``patient`` object is returned unchanged.
        """
        try:
            patient_id = patient['fhir_id']

            # Fetch additional data from HAPI FHIR
            observations = self.fhir_client.get_patient_observations(patient_id)
            medications = self.fhir_client.get_patient_medications(patient_id)
            conditions = self.fhir_client.get_patient_conditions(patient_id)

            # Take one timestamp so import_date and last_updated are
            # identical on a freshly built record.
            timestamp = datetime.now().isoformat()

            # Structure the enhanced patient data
            enhanced_patient = {
                # Basic demographics
                **patient,
                # Clinical data
                'demographics': {
                    'id': patient['id'],
                    'fhir_id': patient['fhir_id'],
                    'full_name': patient['full_name'],
                    'gender': patient['gender'],
                    'date_of_birth': patient['date_of_birth'],
                    'address': patient['address'],
                    'phone': patient.get('phone', ''),
                    'email': patient.get('email', ''),
                    'marital_status': patient.get('marital_status', 'Unknown'),
                    'language': patient.get('language', 'English'),
                },
                'clinical_data': {
                    'observations': observations,
                    'medications': medications,
                    'conditions': conditions,
                    'notes': [],  # Will be populated separately
                    'encounters': [],  # Will be populated separately
                },
                'metadata': {
                    'source': 'hapi_fhir',
                    'import_date': timestamp,
                    'last_updated': timestamp,
                    'fhir_server': 'https://hapi.fhir.org/baseR4',
                },
            }
            return enhanced_patient
        except Exception as e:
            print(f"Error enhancing patient data: {e}")
            return patient

    async def sync_patient_data(self, patient_id: str) -> bool:
        """
        Sync a specific patient's data from HAPI FHIR.

        The patient is fetched, enhanced and upserted into ``db.patients``
        keyed on ``fhir_id``.

        Returns:
            True when a document was modified or inserted; False when the
            patient was not found, nothing changed, or an error occurred.
        """
        try:
            # Get patient from HAPI FHIR
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                print(f"Patient {patient_id} not found in HAPI FHIR")
                return False

            # Enhance with additional data
            enhanced_patient = await self._enhance_patient_data(patient)

            # Update in database
            # NOTE(review): $set replaces the whole ``metadata`` section, so
            # ``import_date`` is refreshed on every sync -- confirm intended.
            result = await db.patients.update_one(
                {"fhir_id": patient_id},
                {"$set": enhanced_patient},
                upsert=True,
            )

            # Use .get so a record without a name cannot turn a successful
            # write into a reported failure.
            display_name = patient.get('full_name', 'Unknown')
            if result.modified_count > 0 or result.upserted_id:
                print(f"Synced patient: {display_name}")
                return True

            print(f"No changes for patient: {display_name}")
            return False
        except Exception as e:
            print(f"Error syncing patient {patient_id}: {e}")
            return False

    async def get_patient_statistics(self) -> Dict:
        """
        Get statistics about imported patients.

        Returns:
            A dictionary with ``total_patients``, ``hapi_fhir_patients`` and
            ``sample_patient`` (one HAPI record with ``_id`` converted to a
            string, or None). An empty dictionary is returned on error.
        """
        try:
            # Records built by _enhance_patient_data carry the source tag
            # under ``metadata``; a top-level ``source`` is also accepted in
            # case the FHIR client supplies one.
            hapi_filter = {
                "$or": [
                    {"source": "hapi_fhir"},
                    {"metadata.source": "hapi_fhir"},
                ]
            }

            total_patients = await db.patients.count_documents({})
            hapi_patients = await db.patients.count_documents(hapi_filter)

            # Get sample patient data and convert ObjectId to string
            sample_patient = await db.patients.find_one(hapi_filter)
            if sample_patient:
                # Convert ObjectId to string for JSON serialization
                sample_patient['_id'] = str(sample_patient['_id'])

            return {
                'total_patients': total_patients,
                'hapi_fhir_patients': hapi_patients,
                'sample_patient': sample_patient,
            }
        except Exception as e:
            print(f"Error getting statistics: {e}")
            return {}

    async def get_hapi_patients(self, limit: int = 50) -> List[Dict]:
        """
        Get patients from HAPI FHIR without importing them.

        Returns:
            Whatever the client returns for ``get_patients(limit=limit)``, or
            an empty list if the call raises.
        """
        try:
            return self.fhir_client.get_patients(limit=limit)
        except Exception as e:
            print(f"Error fetching HAPI patients: {e}")
            return []

    async def get_hapi_patient_details(self, patient_id: str) -> Optional[Dict]:
        """
        Get detailed information for a specific HAPI FHIR patient.

        Returns:
            A dictionary with ``patient``, ``observations``, ``medications``
            and ``conditions``; None when the patient is not found or an
            error occurs.
        """
        try:
            patient = self.fhir_client.get_patient_by_id(patient_id)
            if not patient:
                return None

            # Get additional data
            observations = self.fhir_client.get_patient_observations(patient_id)
            medications = self.fhir_client.get_patient_medications(patient_id)
            conditions = self.fhir_client.get_patient_conditions(patient_id)

            return {
                'patient': patient,
                'observations': observations,
                'medications': medications,
                'conditions': conditions,
            }
        except Exception as e:
            print(f"Error fetching patient details: {e}")
            return None