Spaces:
Sleeping
Sleeping
| import requests | |
| import json | |
| from typing import List, Dict, Optional | |
| from datetime import datetime | |
| class HAPIFHIRClient: | |
| """ | |
| Client for connecting to HAPI FHIR Test Server | |
| """ | |
| def __init__(self, base_url: str = "https://hapi.fhir.org/baseR4"): | |
| self.base_url = base_url | |
| self.session = requests.Session() | |
| self.session.headers.update({ | |
| 'Content-Type': 'application/fhir+json', | |
| 'Accept': 'application/fhir+json' | |
| }) | |
| def get_patients(self, limit: int = 50) -> List[Dict]: | |
| """ | |
| Fetch patients from HAPI FHIR Test Server | |
| """ | |
| try: | |
| url = f"{self.base_url}/Patient" | |
| params = { | |
| '_count': limit, | |
| '_format': 'json' | |
| } | |
| response = self.session.get(url, params=params) | |
| response.raise_for_status() | |
| data = response.json() | |
| patients = [] | |
| if 'entry' in data: | |
| for entry in data['entry']: | |
| patient = self._parse_patient(entry['resource']) | |
| if patient: | |
| patients.append(patient) | |
| return patients | |
| except requests.RequestException as e: | |
| print(f"Error fetching patients: {e}") | |
| return [] | |
| def get_patient_by_id(self, patient_id: str) -> Optional[Dict]: | |
| """ | |
| Fetch a specific patient by ID | |
| """ | |
| try: | |
| url = f"{self.base_url}/Patient/{patient_id}" | |
| response = self.session.get(url) | |
| response.raise_for_status() | |
| patient_data = response.json() | |
| return self._parse_patient(patient_data) | |
| except requests.RequestException as e: | |
| print(f"Error fetching patient {patient_id}: {e}") | |
| return None | |
| def get_patient_observations(self, patient_id: str) -> List[Dict]: | |
| """ | |
| Fetch observations (vital signs, lab results) for a patient | |
| """ | |
| try: | |
| url = f"{self.base_url}/Observation" | |
| params = { | |
| 'subject': f"Patient/{patient_id}", | |
| '_count': 100, | |
| '_format': 'json' | |
| } | |
| response = self.session.get(url, params=params) | |
| response.raise_for_status() | |
| data = response.json() | |
| observations = [] | |
| if 'entry' in data: | |
| for entry in data['entry']: | |
| observation = self._parse_observation(entry['resource']) | |
| if observation: | |
| observations.append(observation) | |
| return observations | |
| except requests.RequestException as e: | |
| print(f"Error fetching observations for patient {patient_id}: {e}") | |
| return [] | |
| def get_patient_medications(self, patient_id: str) -> List[Dict]: | |
| """ | |
| Fetch medications for a patient | |
| """ | |
| try: | |
| url = f"{self.base_url}/MedicationRequest" | |
| params = { | |
| 'subject': f"Patient/{patient_id}", | |
| '_count': 100, | |
| '_format': 'json' | |
| } | |
| response = self.session.get(url, params=params) | |
| response.raise_for_status() | |
| data = response.json() | |
| medications = [] | |
| if 'entry' in data: | |
| for entry in data['entry']: | |
| medication = self._parse_medication(entry['resource']) | |
| if medication: | |
| medications.append(medication) | |
| return medications | |
| except requests.RequestException as e: | |
| print(f"Error fetching medications for patient {patient_id}: {e}") | |
| return [] | |
| def get_patient_conditions(self, patient_id: str) -> List[Dict]: | |
| """ | |
| Fetch conditions (diagnoses) for a patient | |
| """ | |
| try: | |
| url = f"{self.base_url}/Condition" | |
| params = { | |
| 'subject': f"Patient/{patient_id}", | |
| '_count': 100, | |
| '_format': 'json' | |
| } | |
| response = self.session.get(url, params=params) | |
| response.raise_for_status() | |
| data = response.json() | |
| conditions = [] | |
| if 'entry' in data: | |
| for entry in data['entry']: | |
| condition = self._parse_condition(entry['resource']) | |
| if condition: | |
| conditions.append(condition) | |
| return conditions | |
| except requests.RequestException as e: | |
| print(f"Error fetching conditions for patient {patient_id}: {e}") | |
| return [] | |
| def _parse_patient(self, patient_data: Dict) -> Optional[Dict]: | |
| """ | |
| Parse FHIR Patient resource into our format | |
| """ | |
| try: | |
| # Extract basic demographics | |
| name = "" | |
| if 'name' in patient_data and patient_data['name']: | |
| name_parts = patient_data['name'][0] | |
| given = name_parts.get('given', []) | |
| family = name_parts.get('family', '') | |
| name = f"{' '.join(given)} {family}".strip() | |
| # Extract address | |
| address = "" | |
| if 'address' in patient_data and patient_data['address']: | |
| addr = patient_data['address'][0] | |
| line = addr.get('line', []) | |
| city = addr.get('city', '') | |
| state = addr.get('state', '') | |
| postal_code = addr.get('postalCode', '') | |
| address = f"{', '.join(line)}, {city}, {state} {postal_code}".strip() | |
| # Extract contact info | |
| phone = "" | |
| email = "" | |
| if 'telecom' in patient_data: | |
| for telecom in patient_data['telecom']: | |
| if telecom.get('system') == 'phone': | |
| phone = telecom.get('value', '') | |
| elif telecom.get('system') == 'email': | |
| email = telecom.get('value', '') | |
| return { | |
| 'id': patient_data.get('id', ''), | |
| 'fhir_id': patient_data.get('id', ''), | |
| 'full_name': name, | |
| 'gender': patient_data.get('gender', 'unknown'), | |
| 'date_of_birth': patient_data.get('birthDate', ''), | |
| 'address': address, | |
| 'phone': phone, | |
| 'email': email, | |
| 'marital_status': self._get_marital_status(patient_data), | |
| 'language': self._get_language(patient_data), | |
| 'source': 'hapi_fhir', | |
| 'status': 'active', | |
| 'created_at': datetime.now().isoformat(), | |
| 'updated_at': datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| print(f"Error parsing patient data: {e}") | |
| return None | |
| def _parse_observation(self, observation_data: Dict) -> Optional[Dict]: | |
| """ | |
| Parse FHIR Observation resource | |
| """ | |
| try: | |
| code = observation_data.get('code', {}) | |
| coding = code.get('coding', []) | |
| code_text = code.get('text', '') | |
| if coding: | |
| code_text = coding[0].get('display', code_text) | |
| value = observation_data.get('valueQuantity', {}) | |
| unit = value.get('unit', '') | |
| value_amount = value.get('value', '') | |
| return { | |
| 'id': observation_data.get('id', ''), | |
| 'code': code_text, | |
| 'value': f"{value_amount} {unit}".strip(), | |
| 'date': observation_data.get('effectiveDateTime', ''), | |
| 'category': self._get_observation_category(observation_data) | |
| } | |
| except Exception as e: | |
| print(f"Error parsing observation: {e}") | |
| return None | |
| def _parse_medication(self, medication_data: Dict) -> Optional[Dict]: | |
| """ | |
| Parse FHIR MedicationRequest resource | |
| """ | |
| try: | |
| medication = medication_data.get('medicationCodeableConcept', {}) | |
| coding = medication.get('coding', []) | |
| name = medication.get('text', '') | |
| if coding: | |
| name = coding[0].get('display', name) | |
| dosage = medication_data.get('dosageInstruction', []) | |
| dosage_text = "" | |
| if dosage: | |
| dosage_text = dosage[0].get('text', '') | |
| return { | |
| 'id': medication_data.get('id', ''), | |
| 'name': name, | |
| 'dosage': dosage_text, | |
| 'status': medication_data.get('status', 'active'), | |
| 'prescribed_date': medication_data.get('authoredOn', ''), | |
| 'requester': self._get_practitioner_name(medication_data) | |
| } | |
| except Exception as e: | |
| print(f"Error parsing medication: {e}") | |
| return None | |
| def _parse_condition(self, condition_data: Dict) -> Optional[Dict]: | |
| """ | |
| Parse FHIR Condition resource | |
| """ | |
| try: | |
| code = condition_data.get('code', {}) | |
| coding = code.get('coding', []) | |
| name = code.get('text', '') | |
| if coding: | |
| name = coding[0].get('display', name) | |
| return { | |
| 'id': condition_data.get('id', ''), | |
| 'code': name, | |
| 'status': condition_data.get('clinicalStatus', {}).get('coding', [{}])[0].get('code', 'active'), | |
| 'onset_date': condition_data.get('onsetDateTime', ''), | |
| 'recorded_date': condition_data.get('recordedDate', ''), | |
| 'notes': condition_data.get('note', [{}])[0].get('text', '') if condition_data.get('note') else '' | |
| } | |
| except Exception as e: | |
| print(f"Error parsing condition: {e}") | |
| return None | |
| def _get_marital_status(self, patient_data: Dict) -> str: | |
| """Extract marital status from patient data""" | |
| if 'maritalStatus' in patient_data: | |
| coding = patient_data['maritalStatus'].get('coding', []) | |
| if coding: | |
| return coding[0].get('display', 'Unknown') | |
| return 'Unknown' | |
| def _get_language(self, patient_data: Dict) -> str: | |
| """Extract language from patient data""" | |
| if 'communication' in patient_data and patient_data['communication']: | |
| language = patient_data['communication'][0].get('language', {}) | |
| coding = language.get('coding', []) | |
| if coding: | |
| return coding[0].get('display', 'English') | |
| return 'English' | |
| def _get_observation_category(self, observation_data: Dict) -> str: | |
| """Extract observation category""" | |
| category = observation_data.get('category', {}) | |
| coding = category.get('coding', []) | |
| if coding: | |
| return coding[0].get('display', 'Unknown') | |
| return 'Unknown' | |
| def _get_practitioner_name(self, medication_data: Dict) -> str: | |
| """Extract practitioner name from medication request""" | |
| requester = medication_data.get('requester', {}) | |
| reference = requester.get('reference', '') | |
| if reference.startswith('Practitioner/'): | |
| # In a real implementation, you'd fetch the practitioner details | |
| return 'Dr. Practitioner' | |
| return 'Unknown' |