Ali2206's picture
Initial CPS-API deployment with TxAgent integration
11102bd
raw
history blame
10.1 kB
from fastapi import APIRouter, HTTPException, Depends, Response
from db.mongo import patients_collection
from core.security import get_current_user
from utils.helpers import calculate_age, escape_latex_special_chars, hyphenate_long_strings, format_timestamp
from datetime import datetime
from bson import ObjectId
from bson.errors import InvalidId
import os
import subprocess
from tempfile import TemporaryDirectory
from string import Template
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/{patient_id}/pdf", response_class=Response)
async def generate_patient_pdf(patient_id: str, current_user: dict = Depends(get_current_user)):
# Suppress logging for this route
logger.setLevel(logging.CRITICAL)
try:
if current_user.get('role') not in ['doctor', 'admin']:
raise HTTPException(status_code=403, detail="Only clinicians can generate patient PDFs")
# Determine if patient_id is ObjectId or fhir_id
try:
obj_id = ObjectId(patient_id)
query = {"$or": [{"_id": obj_id}, {"fhir_id": patient_id}]}
except InvalidId:
query = {"fhir_id": patient_id}
patient = await patients_collection.find_one(query)
if not patient:
raise HTTPException(status_code=404, detail="Patient not found")
# Prepare table content with proper LaTeX formatting
def prepare_table_content(items, columns, default_message):
if not items:
return f"\\multicolumn{{{columns}}}{{l}}{{{default_message}}} \\\\"
content = []
for item in items:
row = []
for field in item:
value = item.get(field, "") or ""
row.append(escape_latex_special_chars(hyphenate_long_strings(value)))
content.append(" & ".join(row) + " \\\\")
return "\n".join(content)
# Notes table
notes = patient.get("notes", [])
notes_content = prepare_table_content(
[{
"date": format_timestamp(n.get("date", "")),
"type": n.get("type", ""),
"text": n.get("text", "")
} for n in notes],
3,
"No notes available"
)
# Conditions table
conditions = patient.get("conditions", [])
conditions_content = prepare_table_content(
[{
"id": c.get("id", ""),
"code": c.get("code", ""),
"status": c.get("status", ""),
"onset": format_timestamp(c.get("onset_date", "")),
"verification": c.get("verification_status", "")
} for c in conditions],
5,
"No conditions available"
)
# Medications table
medications = patient.get("medications", [])
medications_content = prepare_table_content(
[{
"id": m.get("id", ""),
"name": m.get("name", ""),
"status": m.get("status", ""),
"date": format_timestamp(m.get("prescribed_date", "")),
"dosage": m.get("dosage", "")
} for m in medications],
5,
"No medications available"
)
# Encounters table
encounters = patient.get("encounters", [])
encounters_content = prepare_table_content(
[{
"id": e.get("id", ""),
"type": e.get("type", ""),
"status": e.get("status", ""),
"start": format_timestamp(e.get("period", {}).get("start", "")),
"provider": e.get("service_provider", "")
} for e in encounters],
5,
"No encounters available"
)
# LaTeX template with improved table formatting
latex_template = Template(r"""
\documentclass[a4paper,12pt]{article}
\usepackage[utf8]{inputenc}
\usepackage[T1]{fontenc}
\usepackage{geometry}
\geometry{margin=1in}
\usepackage{booktabs,longtable,fancyhdr}
\usepackage{array}
\usepackage{microtype}
\microtypesetup{expansion=false}
\setlength{\headheight}{14.5pt}
\pagestyle{fancy}
\fancyhf{}
\fancyhead[L]{Patient Report}
\fancyhead[R]{Generated: \today}
\fancyfoot[C]{\thepage}
\begin{document}
\begin{center}
\Large\textbf{Patient Medical Report} \\
\vspace{0.2cm}
\textit{Generated on $generated_on}
\end{center}
\section*{Demographics}
\begin{itemize}
\item \textbf{FHIR ID:} $fhir_id
\item \textbf{Full Name:} $full_name
\item \textbf{Gender:} $gender
\item \textbf{Date of Birth:} $dob
\item \textbf{Age:} $age
\item \textbf{Address:} $address
\item \textbf{Marital Status:} $marital_status
\item \textbf{Language:} $language
\end{itemize}
\section*{Clinical Notes}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{6.5cm}}
\caption{Clinical Notes} \\
\toprule
\textbf{Date} & \textbf{Type} & \textbf{Text} \\
\midrule
$notes
\bottomrule
\end{longtable}
\section*{Conditions}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}}
\caption{Conditions} \\
\toprule
\textbf{ID} & \textbf{Code} & \textbf{Status} & \textbf{Onset} & \textbf{Verification} \\
\midrule
$conditions
\bottomrule
\end{longtable}
\section*{Medications}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{4cm}>{\raggedright\arraybackslash}p{2cm}>{\raggedright\arraybackslash}p{3.5cm}>{\raggedright\arraybackslash}p{3cm}}
\caption{Medications} \\
\toprule
\textbf{ID} & \textbf{Name} & \textbf{Status} & \textbf{Date} & \textbf{Dosage} \\
\midrule
$medications
\bottomrule
\end{longtable}
\section*{Encounters}
\begin{longtable}[l]{>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{2.5cm}>{\raggedright\arraybackslash}p{4.5cm}>{\raggedright\arraybackslash}p{3.5cm}}
\caption{Encounters} \\
\toprule
\textbf{ID} & \textbf{Type} & \textbf{Status} & \textbf{Start} & \textbf{Provider} \\
\midrule
$encounters
\bottomrule
\end{longtable}
\end{document}
""")
# Set the generated_on date to 02:54 PM CET, May 17, 2025
generated_on = datetime.strptime("2025-05-17 14:54:00+02:00", "%Y-%m-%d %H:%M:%S%z").strftime("%A, %B %d, %Y at %I:%M %p %Z")
latex_filled = latex_template.substitute(
generated_on=generated_on,
fhir_id=escape_latex_special_chars(hyphenate_long_strings(patient.get("fhir_id", "") or "")),
full_name=escape_latex_special_chars(patient.get("full_name", "") or ""),
gender=escape_latex_special_chars(patient.get("gender", "") or ""),
dob=escape_latex_special_chars(patient.get("date_of_birth", "") or ""),
age=escape_latex_special_chars(str(calculate_age(patient.get("date_of_birth", "")) or "N/A")),
address=escape_latex_special_chars(", ".join(filter(None, [
patient.get("address", ""),
patient.get("city", ""),
patient.get("state", ""),
patient.get("postal_code", ""),
patient.get("country", "")
]))),
marital_status=escape_latex_special_chars(patient.get("marital_status", "") or ""),
language=escape_latex_special_chars(patient.get("language", "") or ""),
notes=notes_content,
conditions=conditions_content,
medications=medications_content,
encounters=encounters_content
)
# Compile LaTeX in a temporary directory
with TemporaryDirectory() as tmpdir:
tex_path = os.path.join(tmpdir, "report.tex")
pdf_path = os.path.join(tmpdir, "report.pdf")
with open(tex_path, "w", encoding="utf-8") as f:
f.write(latex_filled)
try:
# Run latexmk twice to ensure proper table rendering
for _ in range(2):
result = subprocess.run(
["latexmk", "-pdf", "-interaction=nonstopmode", tex_path],
cwd=tmpdir,
check=False,
capture_output=True,
text=True
)
if result.returncode != 0:
raise HTTPException(
status_code=500,
detail=f"LaTeX compilation failed: stdout={result.stdout}, stderr={result.stderr}"
)
except subprocess.CalledProcessError as e:
raise HTTPException(
status_code=500,
detail=f"LaTeX compilation failed: stdout={e.stdout}, stderr={e.stderr}"
)
if not os.path.exists(pdf_path):
raise HTTPException(
status_code=500,
detail="PDF file was not generated"
)
with open(pdf_path, "rb") as f:
pdf_bytes = f.read()
response = Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename=patient_{patient.get('fhir_id', 'unknown')}_report.pdf"}
)
return response
except HTTPException as http_error:
raise http_error
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unexpected error generating PDF: {str(e)}"
)
finally:
# Restore the logger level for other routes
logger.setLevel(logging.INFO)
# Export the router as 'pdf' for api.__init__.py
pdf = router