"""ALCO liquidity and interest-rate risk dashboard (Gradio UI over MotherDuck)."""
import os
import sys
import traceback
from pathlib import Path
from typing import List, Tuple, Any

import duckdb
import pandas as pd
import numpy as np
import matplotlib

matplotlib.use("Agg")  # headless for Spaces
import matplotlib.pyplot as plt
import gradio as gr

# =========================
# Basic configuration
# =========================
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard"
TABLE_FQN = "my_db.main.masterdataset_v"  # source table
VIEW_FQN = "my_db.main.positions_v"  # normalized view created by this app

PRODUCT_ASSETS = [
    "loan", "overdraft", "advances", "bills", "bill",
    "tbond", "t-bond", "tbill", "t-bill", "repo_asset", "assets",
]
PRODUCT_SOF = [
    "fd", "term_deposit", "td", "savings", "current", "call", "repo_liab",
]

# Chronological order of the liquidity ladder buckets (used for chart and table).
LADDER_BUCKET_ORDER = ["T+1", "T+2..7", "T+8..30", "T+31+"]


# =========================
# Helpers
# =========================
def _quoted_csv(values: List[str]) -> str:
    """Render *values* as comma-separated, single-quoted SQL string literals."""
    return ", ".join("'" + str(v).replace("'", "''") + "'" for v in values)


def connect_md() -> duckdb.DuckDBPyConnection:
    """Open a MotherDuck connection using the MOTHERDUCK_TOKEN environment variable.

    Raises:
        RuntimeError: if the token is missing or blank.
    """
    # Secrets pasted into a UI often carry a trailing newline; strip it.
    token = os.environ.get("MOTHERDUCK_TOKEN", "").strip()
    if not token:
        # In a real environment, this token should be securely managed
        raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in Space → Settings → Secrets.")
    return duckdb.connect(f"md:?motherduck_token={token}")


def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]:
    """Return the lower-cased column names of *table_fqn*.

    Tries DESCRIBE first (fast) and falls back to information_schema when
    DESCRIBE fails for any reason.
    """
    try:
        df = conn.execute(f"DESCRIBE {table_fqn};").fetchdf()
        name_col = "column_name" if "column_name" in df.columns else df.columns[0]
        return [str(c).lower() for c in df[name_col].tolist()]
    except Exception:
        # Deliberate best-effort: any DESCRIBE failure triggers the fallback query.
        df = conn.execute(
            f"""
            SELECT lower(column_name) AS col
            FROM information_schema.columns
            WHERE table_catalog = split_part('{table_fqn}', '.', 1)
              AND table_schema = split_part('{table_fqn}', '.', 2)
              AND table_name = split_part('{table_fqn}', '.', 3)
            """
        ).fetchdf()
        return df["col"].tolist()


def build_view_sql(existing_cols: List[str]) -> str:
    """Build the CREATE VIEW statement that normalizes the source table.

    Columns missing from the source are emitted as typed NULLs so downstream
    queries can always reference them. A derived ``bucket`` column classifies
    each product as 'SoF', 'Assets' or 'Unknown'.

    Args:
        existing_cols: lower-cased column names present in the source table.
    """
    wanted = [
        "as_of_date", "product", "months", "segments", "currency",
        "Portfolio_value", "Interest_rate", "days_to_maturity",
    ]
    numeric_cols = {"Portfolio_value", "Interest_rate", "days_to_maturity", "months"}
    present = set(existing_cols)

    sel = []
    for col in wanted:
        if col.lower() in present:
            sel.append(col)
        elif col in numeric_cols:
            # Cast nulls for consistency, assuming most positions have these columns
            sel.append(f"CAST(NULL AS DOUBLE) AS {col}")
        else:
            sel.append(f"CAST(NULL AS VARCHAR) AS {col}")

    bucket_case = (
        f"CASE "
        f"WHEN lower(product) IN ({_quoted_csv(PRODUCT_SOF)}) THEN 'SoF' "
        f"WHEN lower(product) IN ({_quoted_csv(PRODUCT_ASSETS)}) THEN 'Assets' "
        f"ELSE 'Unknown' END AS bucket"
    )
    select_sql = ",\n    ".join(sel + [bucket_case])
    return f"""
CREATE OR REPLACE VIEW {VIEW_FQN} AS
SELECT
    {select_sql}
FROM {TABLE_FQN};
"""


def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None:
    """Validate the source schema and (re)create the normalized view.

    Raises:
        RuntimeError: if a required column is missing from *cols*.
    """
    required = {"product", "portfolio_value", "days_to_maturity"}
    if not required.issubset(set(cols)):
        raise RuntimeError(
            f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}"
        )
    conn.execute(build_view_sql(cols))


def safe_num(x) -> float:
    """Coerce *x* to float, mapping None, NaN and unconvertible values to 0.0."""
    if x is None:
        return 0.0
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # Check NaN after conversion so np.float32 / Decimal NaNs are caught too.
    return 0.0 if np.isnan(value) else value


def zeros_like_index(index) -> pd.Series:
    """Return a Series of zeros aligned to *index*."""
    return pd.Series(0, index=index)


def _release(fig):
    """Deregister *fig* from pyplot and return it.

    pyplot keeps every figure alive until closed; without this each refresh
    would leak one figure. The Figure object itself stays renderable.
    """
    plt.close(fig)
    return fig


def _message_figure(lines, height: float = 3):
    """Build an axis-less figure showing (x, y, text, kwargs) entries in *lines*."""
    fig, ax = plt.subplots(figsize=(7, height))
    for x, y, text, kwargs in lines:
        ax.text(x, y, text, **kwargs)
    ax.axis("off")
    return _release(fig)


def plot_ladder(df: pd.DataFrame):
    """Plot the maturity ladder: assets above zero, sources of funds below.

    Args:
        df: frame with columns ``time_bucket``, ``bucket`` and
            ``Amount (LKR Mn)``; may be None or empty.

    Returns:
        A matplotlib Figure. On empty input a "No data" placeholder is
        returned; on any plotting error a figure carrying the error text.
    """
    fig = None
    try:
        if df is None or df.empty:
            return _message_figure(
                [(0.5, 0.5, "No data", {"ha": "center", "va": "center"})]
            )

        pivot = df.pivot(
            index="time_bucket", columns="bucket", values="Amount (LKR Mn)"
        ).fillna(0)
        # Re-order the standard liquidity buckets; buckets with no rows must
        # plot as zero rather than NaN, hence fill_value.
        pivot = pivot.reindex(LADDER_BUCKET_ORDER, fill_value=0)

        fig, ax = plt.subplots(figsize=(7, 4))
        assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index)
        sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index)
        ax.bar(pivot.index, assets, label="Assets", color="#4CAF50")
        ax.bar(pivot.index, -sof, label="SoF", color="#FF9800")
        ax.axhline(0, color="gray", lw=1)
        ax.set_ylabel("LKR (Mn)")
        ax.set_title("Maturity Ladder (Assets vs SoF)")
        ax.legend()
        fig.tight_layout()
        return _release(fig)
    except Exception as e:
        if fig is not None:
            plt.close(fig)  # drop the half-built chart
        return _message_figure(
            [
                (0.01, 0.8, "Chart Error:", {"fontsize": 12, "ha": "left"}),
                (0.01, 0.5, str(e), {"fontsize": 10, "ha": "left", "wrap": True}),
            ]
        )


def _order_ladder(ladder: pd.DataFrame) -> pd.DataFrame:
    """Sort ladder rows chronologically by time bucket, then by bucket name.

    The SQL orders bucket labels alphabetically, which puts 'T+31+' before
    'T+8..30'.
    """
    if ladder.empty or not {"time_bucket", "bucket"}.issubset(ladder.columns):
        return ladder
    rank = {name: pos for pos, name in enumerate(LADDER_BUCKET_ORDER)}
    return (
        ladder.assign(_rank=ladder["time_bucket"].map(rank))
        .sort_values(["_rank", "bucket"], kind="stable")
        .drop(columns="_rank")
        .reset_index(drop=True)
    )


# =========================
# Query fragments
# =========================
# T+1 KPIs use stressed_pv so they agree with the ladder and gap drivers
# when a runoff scenario is active (identical to Portfolio_value in baseline).
KPI_SQL = """
SELECT
    COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN stressed_pv END),0) AS assets_t1,
    COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN stressed_pv END),0) AS sof_t1,
    COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN stressed_pv END),0)
      - COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN stressed_pv END),0) AS net_gap_t1
FROM positions_v_stressed;
"""

# Cascading <= comparisons so fractional tenors (e.g. 1.5 days) land in the
# next bucket instead of falling through to 'T+31+'. NULL tenors still map
# to 'T+31+' via ELSE.
LADDER_SQL = """
SELECT
    CASE
        WHEN days_to_maturity <= 1 THEN 'T+1'
        WHEN days_to_maturity <= 7 THEN 'T+2..7'
        WHEN days_to_maturity <= 30 THEN 'T+8..30'
        ELSE 'T+31+'
    END AS time_bucket,
    bucket,
    SUM(stressed_pv) / 1000000.0 AS "Amount (LKR Mn)"
FROM positions_v_stressed
GROUP BY 1,2
ORDER BY 1,2;
"""

GAP_DRIVERS_SQL = """
SELECT
    product,
    bucket,
    SUM(stressed_pv) / 1000000.0 AS "Amount (LKR Mn)"
FROM positions_v_stressed
WHERE days_to_maturity <= 1
GROUP BY 1, 2
ORDER BY 3 DESC;
"""


def get_duration_components_sql(cols: List[str]) -> str:
    """Build the query returning total PV and PV-weighted modified duration per bucket.

    Time to maturity in years stands in for Macaulay duration; modified
    duration is approximated as T / (1 + y).

    Args:
        cols: lower-cased column names present in the source table.
    """
    # Use days_to_maturity as the best proxy for repricing/duration tenor
    has_months = "months" in cols
    has_ir = "interest_rate" in cols

    # Time-to-Maturity (in years) used as proxy for Macaulay Duration (T)
    t_expr = "CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0"
    if has_months:
        t_expr += " WHEN months IS NOT NULL THEN months/12.0"
    t_expr += " ELSE 0.0001 END"  # minimal time if tenor is unknown

    # Yield (Interest Rate / 100). Assume 5% when the column is missing OR
    # the row's rate is NULL, so the position is not silently dropped from
    # the weighted-duration sum while still counting toward total PV.
    y_expr = "(COALESCE(Interest_rate, 5.0)/100.0)" if has_ir else "0.05"

    return f"""
WITH irr_calcs AS (
    SELECT
        bucket,
        stressed_pv,
        -- Approximate Modified Duration = (Time / (1 + Yield))
        ({t_expr}) / (1 + {y_expr}) AS mod_dur
    FROM positions_v_stressed
    WHERE bucket IN ('Assets', 'SoF')
)
SELECT
    bucket,
    SUM(stressed_pv) AS total_pv,
    SUM(stressed_pv * mod_dur) AS weighted_duration_sum
FROM irr_calcs
GROUP BY bucket;
"""


def get_nii_sensitivity_sql() -> str:
    """Build the query for the 1-year repricing gap (assets vs. liabilities).

    Positions with days_to_maturity <= 365 are treated as repricing within
    one year. This is a simplification used to estimate the NII change.
    """
    return """
WITH repricing_volume AS (
    SELECT
        bucket,
        -- Assume repricing happens within 1 year (365 days)
        SUM(CASE WHEN days_to_maturity <= 365 THEN stressed_pv ELSE 0 END) AS repricing_pv
    FROM positions_v_stressed
    WHERE bucket IN ('Assets', 'SoF')
    GROUP BY bucket
)
SELECT
    COALESCE(SUM(CASE WHEN bucket = 'Assets' THEN repricing_pv ELSE 0 END), 0) AS assets_repricing_pv,
    COALESCE(SUM(CASE WHEN bucket = 'SoF' THEN repricing_pv ELSE 0 END), 0) AS liabilities_repricing_pv,
    -- Repricing Gap = Repricing Assets - Repricing Liabilities
    (COALESCE(SUM(CASE WHEN bucket = 'Assets' THEN repricing_pv ELSE 0 END), 0)
     - COALESCE(SUM(CASE WHEN bucket = 'SoF' THEN repricing_pv ELSE 0 END), 0)) AS repricing_gap
FROM repricing_volume;
"""


# =========================
# Dashboard callback
# =========================
def run_dashboard(
    scenario: str,
    runoff_pct: float,
    rate_shock_bps_input: float,
    nii_shock_bps: float,
) -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame, pd.DataFrame, str, pd.DataFrame]:
    """Run every dashboard query for the selected scenario.

    Args:
        scenario: one of the scenario dropdown labels.
        runoff_pct: deposit runoff percentage (liquidity-stress scenario only).
        rate_shock_bps_input: parallel rate shock in bps (IRR-stress scenario only).
        nii_shock_bps: rate shock in bps applied to the 1-year repricing gap.

    Returns:
        status, as_of, a1_text, a2_text, a3_text, figure, ladder_df,
        irr_df (BPV), nii_df, explain_text, drivers_df.
        Any failure is caught and reported through ``status`` with empty
        tables; this function does not raise.
    """
    conn = None
    try:
        conn = connect_md()

        # 1) Discover columns & ensure view is created
        cols = discover_columns(conn, TABLE_FQN)
        ensure_view(conn, cols)

        # --- Scenario Application ---
        stressed_view_fqn = "positions_v_stressed"
        runoff_factor = 1.0
        rate_shock_bps = 0.0  # Used for EVE (BPV) sensitivity

        if scenario == "Liquidity Stress: High Deposit Runoff" and runoff_pct > 0:
            runoff_factor = (100.0 - runoff_pct) / 100.0
        elif scenario == "IRR Stress: Rate Shock" and rate_shock_bps_input != 0:
            rate_shock_bps = rate_shock_bps_input

        # Create temporary view with scenario adjustments for both PV and Rate
        # NOTE: Rate shock is currently only applied to derived metrics, not stored PV
        scenario_sql = f"""
CREATE OR REPLACE TEMP VIEW {stressed_view_fqn} AS
SELECT
    *,
    -- Apply runoff only to liabilities (SoF)
    CASE
        WHEN lower(product) IN ({_quoted_csv(PRODUCT_SOF)}) THEN Portfolio_value * {runoff_factor}
        ELSE Portfolio_value
    END AS stressed_pv,
    -- Apply rate shock to Interest_rate for NII/Duration modeling (optional, but good practice)
    Interest_rate + ({rate_shock_bps} / 100.0) AS stressed_ir
FROM {VIEW_FQN};
"""
        conn.execute(scenario_sql)

        # 2) As-of (optional)
        as_of = "N/A"
        if "as_of_date" in cols:
            tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf()
            if not tmp.empty and not pd.isna(tmp["d"].iloc[0]):
                as_of = str(tmp["d"].iloc[0])[:10]

        # 3) KPIs (Liquidity Gap)
        kpi = conn.execute(KPI_SQL).fetchdf()
        assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0
        sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0
        net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0

        # 4) Ladder and Gap Drivers
        ladder = _order_ladder(conn.execute(LADDER_SQL).fetchdf())
        drivers = conn.execute(GAP_DRIVERS_SQL).fetchdf()

        # 5) Duration Gap & BPV (IRR - EVE)
        duration_components = conn.execute(get_duration_components_sql(cols)).fetchdf()
        asset_rows = duration_components[duration_components["bucket"] == "Assets"]
        liab_rows = duration_components[duration_components["bucket"] == "SoF"]

        pv_assets = asset_rows["total_pv"].sum()
        pv_liab = liab_rows["total_pv"].sum()
        wd_assets = asset_rows["weighted_duration_sum"].sum()
        wd_liab = liab_rows["weighted_duration_sum"].sum()

        mod_dur_assets = wd_assets / pv_assets if pv_assets > 0 else 0.0
        mod_dur_liab = wd_liab / pv_liab if pv_liab > 0 else 0.0

        # L/A Ratio (Liabilities / Assets)
        l_a_ratio = pv_liab / pv_assets if pv_assets > 0 else 0.0

        # Duration Gap = D_A – D_L × (L/A)
        duration_gap = mod_dur_assets - (mod_dur_liab * l_a_ratio)

        # BPV (Basis Point Value) / DV01: combined sensitivity
        # (SUM(PV * Mod_Dur)) * 0.0001
        net_bpv = (wd_assets - wd_liab) * 0.0001

        # EVE impact: value falls when rates rise on a positive duration gap,
        # so dEVE = -BPV * shock(bps).
        eve_impact = -net_bpv * rate_shock_bps

        # Create EVE/BPV display table
        irr_df = pd.DataFrame({
            "Metric": [
                "Assets Mod. Duration (Yrs)",
                "Liabilities Mod. Duration (Yrs)",
                "Duration Gap (Yrs)",
                "Net BPV (LKR)",
            ],
            "Value": [mod_dur_assets, mod_dur_liab, duration_gap, net_bpv],
        })
        irr_df["Value"] = irr_df["Value"].map("{:,.4f}".format)

        # 6) NII Sensitivity (IRR - NII)
        nii_data = conn.execute(get_nii_sensitivity_sql()).fetchdf()
        assets_repricing_pv = safe_num(nii_data["assets_repricing_pv"].iloc[0])
        liabilities_repricing_pv = safe_num(nii_data["liabilities_repricing_pv"].iloc[0])
        repricing_gap = safe_num(nii_data["repricing_gap"].iloc[0])

        # NII Delta = Repricing Gap * (Rate Shock / 10000)
        nii_delta = repricing_gap * (nii_shock_bps / 10000.0)

        # Create NII display table (in Mn); ':+' renders the shock's own sign
        # so a negative shock no longer shows as '+-100'.
        nii_df = pd.DataFrame({
            "Metric": [
                "Assets Repricing (LKR Mn)",
                "Liabilities Repricing (LKR Mn)",
                "1-Year Repricing Gap (LKR Mn)",
                f"NII Delta ({nii_shock_bps:+.0f}bps Shock) (LKR Mn)",
            ],
            "Value": [
                assets_repricing_pv / 1000000.0,
                liabilities_repricing_pv / 1000000.0,
                repricing_gap / 1000000.0,
                nii_delta / 1000000.0,
            ],
        })
        nii_df["Value"] = nii_df["Value"].map("{:,.2f}".format)

        # 7) Format output dataframes for UI
        if "Amount (LKR Mn)" in ladder.columns:
            ladder_display = ladder.copy()
            ladder_display["Amount (LKR Mn)"] = ladder_display["Amount (LKR Mn)"].map("{:,.2f}".format)
        else:
            ladder_display = pd.DataFrame()

        if "Amount (LKR Mn)" in drivers.columns:
            drivers_display = drivers.copy()
            drivers_display["Amount (LKR Mn)"] = drivers_display["Amount (LKR Mn)"].map("{:,.2f}".format)
        else:
            drivers_display = pd.DataFrame()

        # 8) Chart
        fig = plot_ladder(ladder)

        # 9) Explanations
        assets_t1_mn_str = f"{(assets_t1 / 1_000_000):,.2f}"
        sof_t1_mn_str = f"{(sof_t1 / 1_000_000):,.2f}"
        net_gap_mn_str = f"{(net_gap / 1_000_000):,.2f}"
        gap_sign_str = "positive (surplus)" if net_gap >= 0 else "negative (deficit)"

        a1_text = f"The amount of Assets maturing tomorrow (T+1) is **LKR {assets_t1_mn_str} Mn**."
        a2_text = f"The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is **LKR {sof_t1_mn_str} Mn**."
        a3_text = f"The resulting Net Liquidity Gap for tomorrow (T+1) is **LKR {net_gap_mn_str} Mn**."

        # Build "Why" text (drivers are already sorted by amount, descending)
        sof_drivers = drivers[drivers["bucket"] == "SoF"]
        asset_drivers = drivers[drivers["bucket"] == "Assets"]
        top_sof_prod = sof_drivers.iloc[0] if not sof_drivers.empty else None
        top_asset_prod = asset_drivers.iloc[0] if not asset_drivers.empty else None

        explain_text = "### Liquidity Gap Analysis (T+1)\n"
        explain_text += f"The T+1 Net Liquidity Gap is **LKR {net_gap_mn_str} Mn** ({gap_sign_str}).\n\n"
        if top_sof_prod is not None:
            explain_text += (
                f"* **Largest Outflow:** From `{top_sof_prod['product']}` at "
                f"**LKR {top_sof_prod['Amount (LKR Mn)']:,.2f} Mn**.\n"
            )
        if top_asset_prod is not None:
            explain_text += (
                f"* **Largest Inflow:** From `{top_asset_prod['product']}` at "
                f"**LKR {top_asset_prod['Amount (LKR Mn)']:,.2f} Mn**.\n"
            )

        # Add EVE/NII analysis to explanation
        explain_text += "\n### Interest Rate Risk (IRR) Analysis\n"

        # NII Explain
        nii_delta_mn = safe_num(nii_delta / 1000000.0)
        repricing_gap_mn = safe_num(repricing_gap / 1000000.0)
        explain_text += (
            f"* **NII Sensitivity:** Based on the 1-Year Repricing Gap (LKR {repricing_gap_mn:,.2f} Mn), "
            f"a **{nii_shock_bps:+.0f} bps** rate shock suggests a **LKR {nii_delta_mn:,.2f} Mn** "
            f"change in 1-year Net Interest Income.\n"
        )

        # EVE Explain
        eve_impact_mn = safe_num(eve_impact / 1000000.0)
        explain_text += (
            f"* **EVE Sensitivity:** The Duration Gap is **{duration_gap:,.2f} years**. "
            f"A **{rate_shock_bps:+.0f} bps** parallel rate shock is projected to change the portfolio's "
            f"Economic Value (EVE) by **LKR {eve_impact_mn:,.2f} Mn**."
        )

        if scenario != "Baseline":
            explain_text += f"\n\n**SCENARIO ACTIVE:** Results reflect the '{scenario}' scenario."

        status = f"✅ OK (as of {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')})"

        return (
            status,
            as_of,
            a1_text,
            a2_text,
            a3_text,
            fig,
            ladder_display,
            irr_df,
            nii_df,
            explain_text,
            drivers_display,
        )
    except Exception as e:
        # Top-level UI boundary: surface the failure in the status box
        # instead of crashing the Gradio callback.
        tb = traceback.format_exc()
        empty_df = pd.DataFrame()
        fig = plot_ladder(empty_df)
        return (
            f"❌ Error: {e}\n\n{tb}",
            "N/A",
            "0",
            "0",
            "0",
            fig,
            empty_df,
            empty_df,
            empty_df,
            "Analysis could not be performed.",
            empty_df,
        )
    finally:
        # One connection per refresh; release it on success and on failure.
        if conn is not None:
            conn.close()


# =========================
# Build Gradio UI
# =========================
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` → `{VIEW_FQN}`")
    status = gr.Textbox(label="Status", interactive=False, lines=8)

    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh/Calculate", variant="primary")
        theme_btn = gr.Button("🌗 Toggle Theme")

    theme_btn.click(
        None,
        None,
        js="() => { document.querySelector('html').classList.toggle('dark'); }"
    )

    with gr.Row():
        # --- Left Column: Controls and Explanations ---
        with gr.Column(scale=1):
            scenario_dd = gr.Dropdown(
                label="Select Stress Scenario",
                choices=["Baseline", "Liquidity Stress: High Deposit Runoff", "IRR Stress: Rate Shock"],
                value="Baseline"
            )
            with gr.Accordion("Stress Scenario Parameters", open=True):
                runoff_slider = gr.Slider(
                    label="Deposit Runoff (%)",
                    minimum=0, maximum=100, step=5, value=20,
                    info="For Liquidity Stress: Percentage of key deposits that run off."
                )
                shock_slider = gr.Slider(
                    label="EVE Rate Shock (bps)",
                    minimum=-500, maximum=500, step=25, value=200,
                    info="For IRR Stress: Parallel shift in the yield curve for EVE (Duration) calculation."
                )
                nii_shock_slider = gr.Slider(
                    label="NII Rate Shock (bps)",
                    minimum=-500, maximum=500, step=25, value=100,
                    info="For NII Sensitivity: Shock applied to 1-Year Repricing Gap."
                )
            explain_text = gr.Markdown("Analysis of the T+1 gap and IRR will appear here...")

        # --- Right Column: KPIs, Charts, and Tables ---
        with gr.Column(scale=3):
            with gr.Row():
                as_of = gr.Textbox(label="As of date", interactive=False)
                a1 = gr.Markdown("The amount of Assets maturing tomorrow (T+1) is...")
                a2 = gr.Markdown("The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is...")
                a3 = gr.Markdown("The resulting Net Liquidity Gap for tomorrow (T+1) is...")

            chart = gr.Plot(label="Maturity Ladder")

            with gr.Tabs():
                with gr.TabItem("Liquidity Gap Detail"):
                    ladder_df = gr.Dataframe(
                        headers=["Time Bucket", "Bucket", "Amount (LKR Mn)"],
                        type="pandas"
                    )
                with gr.TabItem("T+1 Gap Drivers"):
                    drivers_df = gr.Dataframe(
                        headers=["Product", "Bucket", "Amount (LKR Mn)"],
                        type="pandas"
                    )
                with gr.TabItem("IRR - EVE (Duration Gap)"):
                    irr_df = gr.Dataframe(
                        headers=["Metric", "Value"],
                        type="pandas"
                    )
                with gr.TabItem("IRR - NII (Repricing Gap)"):
                    nii_df = gr.Dataframe(
                        headers=["Metric", "Value"],
                        type="pandas"
                    )

    refresh_btn.click(
        fn=run_dashboard,
        inputs=[scenario_dd, runoff_slider, shock_slider, nii_shock_slider],
        outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, nii_df, explain_text, drivers_df],
    )

if __name__ == "__main__":
    demo.launch()