Spaces:
Sleeping
Sleeping
import asyncio
import logging
import os
import re
import subprocess
from datetime import datetime
from string import Template
from tempfile import TemporaryDirectory

from bson import ObjectId
from bson.errors import InvalidId
from fastapi import APIRouter, HTTPException, Depends, Response

from core.security import get_current_user
from db.mongo import patients_collection
from utils.helpers import calculate_age, escape_latex_special_chars, hyphenate_long_strings, format_timestamp
# Process-wide logging setup: INFO and above, each record stamped with
# time, level and the emitting logger's name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)

# Router that this module exposes to the API package.
router = APIRouter()

# Logger dedicated to this module.
logger = logging.getLogger(__name__)
# NOTE(review): no route decorator is visible directly above this handler in
# the reviewed source; confirm it is registered on `router` as intended.

# Upper bound (seconds) for one latexmk invocation, so a runaway compile
# cannot occupy a worker thread forever.
_LATEXMK_TIMEOUT_SECONDS = 120

# Number of latexmk invocations per report. latexmk normally re-runs the
# engine as needed by itself; the second pass is kept from the original
# implementation as a safeguard for longtable layout.
_LATEXMK_PASSES = 2

# Anything outside this set is replaced before the FHIR id is placed in the
# Content-Disposition header (prevents header injection / broken filenames).
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]")

# LaTeX source of the report; `$name` placeholders are filled in by
# `generate_patient_pdf` via `string.Template.substitute`.
_REPORT_TEMPLATE = Template(r"""
\documentclass[a4paper,12pt]{article}
\usepackage[utf8]{inputenc}
\usepackage[T1]{fontenc}
\usepackage{geometry}
\geometry{margin=1in}
\usepackage{booktabs,longtable,fancyhdr}
\usepackage{array}
\usepackage{microtype}
\microtypesetup{expansion=false}
\setlength{\headheight}{14.5pt}
\pagestyle{fancy}
\fancyhf{}
\fancyhead[L]{Patient Report}
\fancyhead[R]{Generated: \today}
\fancyfoot[C]{\thepage}
\begin{document}
\begin{center}
\Large\textbf{Patient Medical Report} \\
\vspace{0.2cm}
\textit{Generated on $generated_on}
\end{center}
\section*{Demographics}
\begin{itemize}
\item \textbf{FHIR ID:} $fhir_id
\item \textbf{Full Name:} $full_name
\item \textbf{Gender:} $gender
\item \textbf{Date of Birth:} $dob
\item \textbf{Age:} $age
\item \textbf{Address:} $address
\item \textbf{Marital Status:} $marital_status
\item \textbf{Language:} $language
\end{itemize}
\section*{Clinical Notes}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{6.5cm}}
\caption{Clinical Notes} \\
\toprule
\textbf{Date} & \textbf{Type} & \textbf{Text} \\
\midrule
$notes
\bottomrule
\end{longtable}
\section*{Conditions}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}}
\caption{Conditions} \\
\toprule
\textbf{ID} & \textbf{Code} & \textbf{Status} & \textbf{Onset} & \textbf{Verification} \\
\midrule
$conditions
\bottomrule
\end{longtable}
\section*{Medications}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{4cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}}
\caption{Medications} \\
\toprule
\textbf{ID} & \textbf{Name} & \textbf{Status} & \textbf{Date} & \textbf{Dosage} \\
\midrule
$medications
\bottomrule
\end{longtable}
\section*{Encounters}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{3.5cm}}
\caption{Encounters} \\
\toprule
\textbf{ID} & \textbf{Type} & \textbf{Status} & \textbf{Start} & \textbf{Provider} \\
\midrule
$encounters
\bottomrule
\end{longtable}
\end{document}
""")


def _prepare_table_content(items, columns, default_message):
    """Render a list of dict rows as the body of a LaTeX table.

    Args:
        items: Row dicts; the values of each dict become the cells, in the
            dict's insertion order.
        columns: Column count, used only for the empty-table placeholder row.
        default_message: Text shown in a single spanning cell when `items`
            is empty.

    Returns:
        LaTeX rows joined by newlines, each terminated with ``\\\\``.
    """
    if not items:
        return f"\\multicolumn{{{columns}}}{{l}}{{{default_message}}} \\\\"
    rows = []
    for item in items:
        # Falsy cell values (None, "", 0) are rendered as an empty cell.
        cells = [
            escape_latex_special_chars(hyphenate_long_strings(value or ""))
            for value in item.values()
        ]
        rows.append(" & ".join(cells) + " \\\\")
    return "\n".join(rows)


def _compile_latex_to_pdf(latex_source):
    """Compile LaTeX source with latexmk in a temp directory and return the PDF bytes.

    This function blocks (file I/O and a subprocess), so async callers
    should run it in a worker thread.

    Raises:
        HTTPException: 500 if latexmk times out, exits non-zero, or leaves
            no ``report.pdf`` behind.
    """
    with TemporaryDirectory() as tmpdir:
        tex_path = os.path.join(tmpdir, "report.tex")
        pdf_path = os.path.join(tmpdir, "report.pdf")
        with open(tex_path, "w", encoding="utf-8") as tex_file:
            tex_file.write(latex_source)

        for _ in range(_LATEXMK_PASSES):
            try:
                result = subprocess.run(
                    ["latexmk", "-pdf", "-interaction=nonstopmode", tex_path],
                    cwd=tmpdir,
                    check=False,
                    capture_output=True,
                    text=True,
                    # LaTeX logs are not guaranteed to be valid UTF-8; never
                    # let a decode error mask the real compilation result.
                    errors="replace",
                    timeout=_LATEXMK_TIMEOUT_SECONDS,
                )
            except subprocess.TimeoutExpired as exc:
                raise HTTPException(
                    status_code=500,
                    detail=f"LaTeX compilation timed out after {_LATEXMK_TIMEOUT_SECONDS} seconds",
                ) from exc
            # check=False means failures surface through the return code,
            # not through CalledProcessError.
            if result.returncode != 0:
                raise HTTPException(
                    status_code=500,
                    detail=f"LaTeX compilation failed: stdout={result.stdout}, stderr={result.stderr}",
                )

        if not os.path.exists(pdf_path):
            raise HTTPException(
                status_code=500,
                detail="PDF file was not generated",
            )
        with open(pdf_path, "rb") as pdf_file:
            return pdf_file.read()


async def generate_patient_pdf(patient_id: str, current_user: dict = Depends(get_current_user)):
    """Build a PDF medical report for one patient and return it as a download.

    The patient is looked up by MongoDB ``_id`` (when `patient_id` parses as
    an ObjectId) or by ``fhir_id``. Demographics, notes, conditions,
    medications and encounters are rendered into a LaTeX document that is
    compiled with latexmk.

    Args:
        patient_id: MongoDB ObjectId string or FHIR id of the patient.
        current_user: Authenticated user injected by `get_current_user`;
            its ``role`` must be ``doctor`` or ``admin``.

    Returns:
        A `Response` with media type ``application/pdf`` and an attachment
        Content-Disposition header.

    Raises:
        HTTPException: 403 for non-clinician roles, 404 when no patient
            matches, 500 for LaTeX failures or any unexpected error.
    """
    # The original toggled this module's logger level around each request.
    # The handler emits no records through that logger, and mutating shared
    # logger state from concurrent requests is racy and overwrote any
    # configured level, so the toggle has been dropped.
    try:
        if current_user.get('role') not in ['doctor', 'admin']:
            raise HTTPException(status_code=403, detail="Only clinicians can generate patient PDFs")

        # Accept either a MongoDB ObjectId or a FHIR id.
        try:
            obj_id = ObjectId(patient_id)
            query = {"$or": [{"_id": obj_id}, {"fhir_id": patient_id}]}
        except InvalidId:
            query = {"fhir_id": patient_id}

        patient = await patients_collection.find_one(query)
        if not patient:
            raise HTTPException(status_code=404, detail="Patient not found")

        # `or []` / `or {}` below: a field stored as null must behave like a
        # missing field instead of raising on iteration or attribute access.
        notes_content = _prepare_table_content(
            [{
                "date": format_timestamp(n.get("date", "")),
                "type": n.get("type", ""),
                "text": n.get("text", ""),
            } for n in patient.get("notes") or []],
            3,
            "No notes available",
        )

        conditions_content = _prepare_table_content(
            [{
                "id": c.get("id", ""),
                "code": c.get("code", ""),
                "status": c.get("status", ""),
                "onset": format_timestamp(c.get("onset_date", "")),
                "verification": c.get("verification_status", ""),
            } for c in patient.get("conditions") or []],
            5,
            "No conditions available",
        )

        medications_content = _prepare_table_content(
            [{
                "id": m.get("id", ""),
                "name": m.get("name", ""),
                "status": m.get("status", ""),
                "date": format_timestamp(m.get("prescribed_date", "")),
                "dosage": m.get("dosage", ""),
            } for m in patient.get("medications") or []],
            5,
            "No medications available",
        )

        encounters_content = _prepare_table_content(
            [{
                "id": e.get("id", ""),
                "type": e.get("type", ""),
                "status": e.get("status", ""),
                "start": format_timestamp((e.get("period") or {}).get("start", "")),
                "provider": e.get("service_provider", ""),
            } for e in patient.get("encounters") or []],
            5,
            "No encounters available",
        )

        # Stamp the report with the actual generation time (local timezone)
        # rather than a fixed date.
        generated_on = datetime.now().astimezone().strftime("%A, %B %d, %Y at %I:%M %p %Z")

        # 0 is a legitimate age (infants), so only substitute "N/A" when no
        # age was produced. Presumably calculate_age returns an int or None
        # -- verify against utils.helpers.
        age = calculate_age(patient.get("date_of_birth", ""))
        age_text = "N/A" if age in (None, "") else str(age)

        latex_filled = _REPORT_TEMPLATE.substitute(
            generated_on=generated_on,
            fhir_id=escape_latex_special_chars(hyphenate_long_strings(patient.get("fhir_id", "") or "")),
            full_name=escape_latex_special_chars(patient.get("full_name", "") or ""),
            gender=escape_latex_special_chars(patient.get("gender", "") or ""),
            dob=escape_latex_special_chars(patient.get("date_of_birth", "") or ""),
            age=escape_latex_special_chars(age_text),
            address=escape_latex_special_chars(", ".join(filter(None, [
                patient.get("address", ""),
                patient.get("city", ""),
                patient.get("state", ""),
                patient.get("postal_code", ""),
                patient.get("country", ""),
            ]))),
            marital_status=escape_latex_special_chars(patient.get("marital_status", "") or ""),
            language=escape_latex_special_chars(patient.get("language", "") or ""),
            notes=notes_content,
            conditions=conditions_content,
            medications=medications_content,
            encounters=encounters_content,
        )

        # LaTeX compilation is blocking work; run it in the default executor
        # so the event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        pdf_bytes = await loop.run_in_executor(None, _compile_latex_to_pdf, latex_filled)

        # Only header-safe characters may reach the filename.
        safe_id = _UNSAFE_FILENAME_CHARS.sub("_", str(patient.get("fhir_id") or "unknown"))
        return Response(
            content=pdf_bytes,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=patient_{safe_id}_report.pdf"},
        )
    except HTTPException:
        # Already carries the intended status code; pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Unexpected error generating PDF: {str(e)}",
        ) from e
# Alias under which the API package (api.__init__.py) picks up this router.
pdf = router