ysharma's picture
ysharma HF Staff
Update app.py
f16db96 verified
"""
Space Keeper v4 - Keeps HuggingFace Spaces alive during hackathon evaluation
"""
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
import threading
import gradio as gr
import requests
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
# =============================================================================
# CONFIGURATION
# =============================================================================
# Every setting except LOG_FILE and MAX_LOG_ENTRIES is read from the
# environment once, at import time; the second argument to os.environ.get()
# is the value used when the variable is unset.

# Hub author (organization) whose Spaces are listed and pinged.
ORG_NAME = os.environ.get("ORG_NAME", "MCP-1st-Birthday")
# Hours between two scheduled ping runs.
PING_INTERVAL_HOURS = int(os.environ.get("PING_INTERVAL_HOURS", "12"))
# Per-request timeout, in seconds, passed to requests.get().
REQUEST_TIMEOUT = int(os.environ.get("REQUEST_TIMEOUT", "30"))
# Size of the thread pool used to ping Spaces concurrently.
PARALLEL_REQUESTS = int(os.environ.get("PARALLEL_REQUESTS", "10"))
# Optional Hub token; None when the variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN", None)
# JSON file (relative to the working directory) holding the run history.
LOG_FILE = Path("run_logs.json")
# save_logs() keeps only this many most-recent runs.
MAX_LOG_ENTRIES = 100
# =============================================================================
# GLOBAL STATE
# =============================================================================
# Background scheduler; the ping job is registered and started further down.
scheduler = BackgroundScheduler()
# Hub client; the token is only passed when HF_TOKEN is set.
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
# Import time of this module (timezone-aware, UTC); basis for get_uptime().
APP_START_TIME = datetime.now(timezone.utc)
# Held by load_logs() and save_logs() while they touch LOG_FILE.
log_lock = threading.Lock()
# =============================================================================
# LOGGING FUNCTIONS
# =============================================================================
def load_logs() -> list:
    """Read the persisted run history from ``LOG_FILE``.

    The file is read while holding ``log_lock``.

    Returns:
        The list stored in ``LOG_FILE``. An empty list is returned when the
        file is missing, cannot be read, is not valid JSON, or holds JSON
        that is not an array.
    """
    with log_lock:
        try:
            # Open directly instead of exists()-then-open: the file could
            # disappear between the two calls.
            with open(LOG_FILE, "r") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: missing/unreadable file.
            # ValueError: json.JSONDecodeError and UnicodeDecodeError.
            return []
    # Callers append to the result, so a non-list payload is treated as corrupt.
    return data if isinstance(data, list) else []
def save_logs(logs: list):
    """Persist the run history, keeping only the newest ``MAX_LOG_ENTRIES`` runs.

    Args:
        logs: Run-result dicts, oldest first. The list itself is not modified.

    Raises:
        TypeError: If an entry cannot be serialised to JSON. ``LOG_FILE`` is
            left untouched in that case.
        OSError: If the file cannot be written or replaced.
    """
    # Serialise before touching the disk, so a serialisation failure cannot
    # leave a truncated, half-written history behind.
    payload = json.dumps(logs[-MAX_LOG_ENTRIES:], indent=2)
    tmp_path = LOG_FILE.with_name(LOG_FILE.name + ".tmp")
    with log_lock:
        with open(tmp_path, "w") as f:
            f.write(payload)
        # Swap the finished file in atomically; an interrupted write never
        # exposes a partial LOG_FILE to the next load_logs().
        os.replace(tmp_path, LOG_FILE)
# =============================================================================
# PING FUNCTIONS
# =============================================================================
def _hf_host_label(part: str) -> str:
    """Lowercase *part* and map every non-alphanumeric ASCII character to ``-``."""
    return "".join(
        ch if (ch.isascii() and ch.isalnum()) else "-" for ch in part.lower()
    )


def _ping_result(space_id: str, status: str, url: str,
                 status_code=None, error=None) -> dict:
    """Build the result dict shared by every outcome of ping_space()."""
    return {
        "space_id": space_id,
        "status": status,
        "status_code": status_code,
        "url_pinged": url,
        "error": error,
    }


def ping_space(space_id: str) -> dict:
    """Ping a single Space and return the result.

    The Space's direct app URL (``https://<owner>-<name>.hf.space``) is
    requested first. If that request fails for any reason other than a
    timeout, the Space's page on huggingface.co is requested instead.

    Args:
        space_id: Repo id in ``owner/name`` form.

    Returns:
        A dict with the keys ``space_id``, ``status`` (``"success"``,
        ``"timeout"`` or ``"error"``), ``status_code``, ``url_pinged`` and
        ``error``. Any HTTP response counts as ``"success"``, whatever its
        status code.

    Raises:
        ValueError: If *space_id* does not contain exactly one ``/``.
    """
    org, name = space_id.split("/")
    # Hostnames cannot carry "_" or ".", so the direct URL uses hyphens for
    # them, e.g. HuggingFaceH4/open_llm_leaderboard is served from
    # huggingfaceh4-open-llm-leaderboard.hf.space.
    # NOTE(review): SpaceInfo may expose the authoritative subdomain when
    # full info is requested - confirm and prefer it if available.
    app_url = f"https://{_hf_host_label(org)}-{_hf_host_label(name)}.hf.space"
    hf_page_url = f"https://huggingface.co/spaces/{space_id}"
    try:
        response = requests.get(app_url, timeout=REQUEST_TIMEOUT)
    except requests.Timeout:
        return _ping_result(
            space_id, "timeout", app_url,
            error=f"Timed out after {REQUEST_TIMEOUT}s",
        )
    except Exception as app_error:
        # Direct URL unreachable: fall back to the Hub page.
        try:
            response = requests.get(hf_page_url, timeout=REQUEST_TIMEOUT)
        except Exception:
            # Report the original failure, not the fallback's.
            return _ping_result(space_id, "error", app_url, error=str(app_error))
        return _ping_result(
            space_id, "success", hf_page_url, status_code=response.status_code
        )
    return _ping_result(
        space_id, "success", app_url, status_code=response.status_code
    )
def scheduled_job():
    """Run one scheduler-triggered ping of every Space owned by ``ORG_NAME``.

    Lists the organization's Spaces, pings them concurrently through
    ``ping_space`` and appends a summary record to the run log. When the
    Space listing itself fails, an ``"error"`` record is logged instead and
    nothing is pinged.
    """
    print(f"[{datetime.now(timezone.utc).isoformat()}] Running scheduled ping job...")
    start_time = datetime.now(timezone.utc)

    try:
        spaces = list(api.list_spaces(author=ORG_NAME))
    except Exception as exc:
        history = load_logs()
        history.append({
            "timestamp": start_time.isoformat(),
            "triggered_by": "scheduler",
            "status": "error",
            "error": f"Failed to list Spaces: {str(exc)}",
            "total_spaces": 0,
            "successful": 0,
            "failed": 0,
            "duration_seconds": 0,
            "private_spaces": [],
            "results": [],
        })
        save_logs(history)
        return

    # Private Spaces are pinged like the others; their short names are only
    # recorded separately for display.
    private_space_names = []
    space_ids = []
    for space in spaces:
        if getattr(space, 'private', False):
            private_space_names.append(space.id.split("/")[1])
        space_ids.append(space.id)

    with ThreadPoolExecutor(max_workers=PARALLEL_REQUESTS) as pool:
        pending = [pool.submit(ping_space, sid) for sid in space_ids]
        # Results are collected in completion order, not submission order.
        results = [done.result() for done in as_completed(pending)]

    end_time = datetime.now(timezone.utc)
    duration = (end_time - start_time).total_seconds()
    failed = sum(1 for outcome in results if outcome["status"] != "success")
    successful = len(results) - failed

    history = load_logs()
    history.append({
        "timestamp": start_time.isoformat(),
        "triggered_by": "scheduler",
        "status": "completed",
        "error": None,
        "total_spaces": len(results),
        "successful": successful,
        "failed": failed,
        "duration_seconds": round(duration, 2),
        "private_spaces": private_space_names,
        "results": results,
    })
    save_logs(history)
    print(f"[{end_time.isoformat()}] Completed: {successful}/{len(results)} successful in {duration:.1f}s")
# =============================================================================
# UI HELPER FUNCTIONS
# =============================================================================
def get_uptime() -> str:
    """Return the time elapsed since ``APP_START_TIME`` as a short label.

    Only the two largest units are shown: ``"<h>h <m>m"`` from one hour up,
    ``"<m>m <s>s"`` from one minute up, otherwise ``"<s>s"``.
    """
    elapsed = datetime.now(timezone.utc) - APP_START_TIME
    total_minutes, seconds = divmod(int(elapsed.total_seconds()), 60)
    hours, minutes = divmod(total_minutes, 60)
    if hours > 0:
        return f"{hours}h {minutes}m"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"
def get_next_run_time() -> str:
    """Return the next run time of the first scheduled job, formatted in UTC.

    Returns:
        ``"YYYY-MM-DD HH:MM:SS UTC"``, or ``"Not scheduled"`` when the
        scheduler has no job or the job has no next run time.
    """
    jobs = scheduler.get_jobs()
    # getattr(): a job that has not been scheduled yet may lack the attribute.
    next_run = getattr(jobs[0], "next_run_time", None) if jobs else None
    if not next_run:
        return "Not scheduled"
    # The scheduler reports times in its own timezone; convert before
    # labelling the value as UTC.
    return next_run.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
def get_stats() -> dict:
    """Summarise the run history for display.

    Returns:
        A dict with the keys ``total_runs``, ``last_run`` (timestamp string or
        ``"Never"``), ``last_run_status`` and ``last_run_spaces``. All four
        keys are present even when no run has been logged yet.
    """
    logs = load_logs()
    if not logs:
        return {
            "total_runs": 0,
            "last_run": "Never",
            "last_run_status": "N/A",
            "last_run_spaces": 0,
        }
    last_run = logs[-1]
    failed = last_run.get("failed", 0)
    if last_run.get("status") == "error":
        # A run that could not even list the Spaces is logged with
        # failed == 0; it must not be reported as a success.
        last_run_status = "❌ Error"
    elif failed == 0:
        last_run_status = "✅ Success"
    else:
        last_run_status = f"⚠️ {failed} failed"
    return {
        "total_runs": len(logs),
        "last_run": last_run["timestamp"],
        "last_run_status": last_run_status,
        "last_run_spaces": last_run.get("total_spaces", 0),
    }
def format_status_panel() -> str:
    """Render the status panel as Markdown.

    The panel has three sections: the active configuration, run statistics
    (uptime, number of logged runs, timestamp of the last run) and the next
    scheduled run time.
    """
    stats = get_stats()
    if HF_TOKEN:
        token_status = "✅ Set (can see private Spaces)"
    else:
        token_status = "❌ Not set (only public Spaces visible)"
    # The literal's lines stay at column 0 because they are Markdown source.
    return f"""## ⚙️ Configuration
| Setting | Value |
|---------|-------|
| **Organization** | `{ORG_NAME}` |
| **Ping Interval** | Every {PING_INTERVAL_HOURS} hours |
| **Parallel Requests** | {PARALLEL_REQUESTS} at once |
| **Request Timeout** | {REQUEST_TIMEOUT}s |
| **HF Token** | {token_status} |
## 📊 Statistics
| Metric | Value |
|--------|-------|
| **App Uptime** | {get_uptime()} |
| **Total Runs** | {stats['total_runs']} |
| **Last Run** | {stats['last_run']} |
## ⏰ Next Scheduled Run
**{get_next_run_time()}**
"""
def format_logs_panel() -> str:
    """Render the 15 most recent runs as Markdown, newest first.

    Runs with status ``"error"`` show their error message. Other runs show
    their counts and duration, followed by the short names of up to five
    failed Spaces.
    """
    logs = load_logs()
    if not logs:
        return "## 📜 Run History\n\n*No runs yet. Click **🚀 Run Now** to start!*"

    parts = ["## 📜 Run History (most recent first)\n\n"]
    for run in reversed(logs[-15:]):
        ts = run["timestamp"]
        by = run["triggered_by"]
        if run["status"] == "error":
            parts.append(f"❌ **{ts}** | {by} | Error: {run['error']}\n\n")
            continue

        failed = run["failed"]
        emoji = "✅" if failed == 0 else "⚠️"
        private_count = len(run.get("private_spaces", []))
        private_info = f" | 🔒 {private_count} private" if private_count > 0 else ""
        parts.append(
            f"{emoji} **{ts}** | {by} | "
            f"**{run['total_spaces']}** spaces | "
            f"{run['successful']} ok, {failed} failed{private_info} | "
            f"{run['duration_seconds']}s\n"
        )
        if failed > 0:
            # Only the first five failures are named; the rest are counted.
            failed_names = [
                r["space_id"].split("/")[1]
                for r in run.get("results", [])
                if r["status"] != "success"
            ][:5]
            joined = "`, `".join(failed_names)
            parts.append(f" ↳ Failed: `{joined}`")
            if failed > 5:
                parts.append(f" +{failed - 5} more")
            parts.append("\n")
        parts.append("\n")
    return "".join(parts)
def format_private_spaces_accordion(private_spaces: list) -> str:
    """Format the private spaces list for the accordion.

    Args:
        private_spaces: Short names of the private Spaces; not modified.

    Returns:
        A Markdown bullet list of the names in sorted order, preceded by a
        count line, or a placeholder sentence when the list is empty.
    """
    if not private_spaces:
        return "*No private Spaces found in this run.*"
    header = f"**{len(private_spaces)} private Spaces** were pinged:\n\n"
    return header + "".join(f"- `{name}`\n" for name in sorted(private_spaces))
# =============================================================================
# MAIN RUN FUNCTION WITH LIVE PROGRESS
# =============================================================================
def _render_progress(completed: int, total: int, successful: int,
                     failed: int, result: dict) -> str:
    """Render the live progress Markdown after *result* has come back."""
    pct = int((completed / total) * 100)
    filled = pct // 10
    bar = "█" * filled + "░" * (10 - filled)
    succeeded = result["status"] == "success"
    status_emoji = "✅" if succeeded else "❌"
    space_name = result["space_id"].split("/")[1]
    # `or ""` keeps a None error from breaking the concatenation.
    detail = "" if succeeded else "— " + (result.get("error") or "")
    return (
        "## 🔄 Running...\n"
        f"**Progress:** `[{completed}/{total}]` {bar} {pct}%\n"
        f"**Last:** {status_emoji} `{space_name}` {detail}\n"
        f"**Running total:** ✅ {successful} successful, ❌ {failed} failed\n"
    )


def _render_final_summary(results: list, total: int, public_count: int,
                          private_count: int, successful: int, failed: int,
                          duration: float) -> str:
    """Render the closing Markdown summary of a manual run."""
    headline = (
        f"**Pinged {total} Spaces** ({public_count} public, "
        f"{private_count} private) in {duration:.1f}s\n"
    )
    if failed == 0:
        return (
            "## ✅ Completed Successfully!\n"
            + headline
            + "All Spaces responded! 🎉\n"
        )
    failed_spaces = [r for r in results if r["status"] != "success"]
    # Name at most ten failures; summarise the remainder as a count.
    failed_list = "\n".join(
        f"- `{r['space_id'].split('/')[1]}` — {r.get('error', 'Unknown error')}"
        for r in failed_spaces[:10]
    )
    if len(failed_spaces) > 10:
        failed_list += f"\n- ... and {len(failed_spaces) - 10} more"
    return (
        f"## ⚠️ Completed with {failed} issue(s)\n"
        + headline
        + f"✅ {successful} successful\n"
        + f"❌ {failed} failed\n"
        + "### Failed Spaces:\n"
        + f"{failed_list}\n"
        + "---\n"
        + "*Failures usually mean the Space is paused, has an error, or uses a non-standard SDK.*\n"
    )


def manual_trigger_with_progress():
    """Manually trigger a ping run with live progress updates.

    Generator used as a Gradio event handler. Every yielded value is a
    4-tuple ``(live_output, status_panel, logs_panel, private_spaces)`` of
    Markdown strings. Progress is yielded after every fifth completed ping
    and after the last one. The run is appended to the log with
    ``triggered_by="manual"`` before the final summary is yielded; if the
    Space listing fails, an ``"error"`` record is logged and the generator
    ends after yielding the failure message.
    """
    yield (
        "## 🔄 Starting...\n\nFetching Space list from HuggingFace API...",
        format_status_panel(),
        format_logs_panel(),
        "*Run in progress...*",
    )
    start_time = datetime.now(timezone.utc)

    # Get spaces from API
    try:
        spaces = list(api.list_spaces(author=ORG_NAME))
    except Exception as e:
        error_msg = f"## ❌ Failed\n\nCould not list Spaces: {str(e)}"
        logs = load_logs()
        logs.append({
            "timestamp": start_time.isoformat(),
            "triggered_by": "manual",
            "status": "error",
            "error": str(e),
            "total_spaces": 0,
            "successful": 0,
            "failed": 0,
            "duration_seconds": 0,
            "private_spaces": [],
            "results": [],
        })
        save_logs(logs)
        yield error_msg, format_status_panel(), format_logs_panel(), "*Error during run*"
        return

    # Separate private and public spaces (all of them are pinged; the split
    # is for display only).
    private_spaces = []
    public_spaces = []
    for s in spaces:
        name = s.id.split("/")[1]
        if getattr(s, 'private', False):
            private_spaces.append(name)
        else:
            public_spaces.append(name)
    space_ids = [space.id for space in spaces]
    total = len(space_ids)
    # The private list does not change during the run; render it once.
    private_md = format_private_spaces_accordion(private_spaces)

    info_msg = (
        "## 🔄 Running...\n\n"
        f"**Found {total} Spaces** ({len(public_spaces)} public, {len(private_spaces)} private)\n\n"
        f"Pinging {PARALLEL_REQUESTS} Spaces in parallel...\n\n"
        f"`[0/{total}]` {'░' * 10} 0%"
    )
    yield info_msg, format_status_panel(), format_logs_panel(), private_md

    # Run pings
    results = []
    completed = 0
    successful = 0
    failed = 0
    with ThreadPoolExecutor(max_workers=PARALLEL_REQUESTS) as executor:
        futures = [executor.submit(ping_space, sid) for sid in space_ids]
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            completed += 1
            if result["status"] == "success":
                successful += 1
            else:
                failed += 1
            # Render only when an update is actually sent to the UI.
            if completed % 5 == 0 or completed == total:
                yield (
                    _render_progress(completed, total, successful, failed, result),
                    format_status_panel(),
                    format_logs_panel(),
                    private_md,
                )

    # Save results
    end_time = datetime.now(timezone.utc)
    duration = (end_time - start_time).total_seconds()
    logs = load_logs()
    logs.append({
        "timestamp": start_time.isoformat(),
        "triggered_by": "manual",
        "status": "completed",
        "error": None,
        "total_spaces": total,
        "successful": successful,
        "failed": failed,
        "duration_seconds": round(duration, 2),
        "private_spaces": private_spaces,
        "results": results,
    })
    save_logs(logs)

    # Final summary
    final = _render_final_summary(
        results, total, len(public_spaces), len(private_spaces),
        successful, failed, duration,
    )
    yield final, format_status_panel(), format_logs_panel(), private_md
def refresh_all():
    """Re-render the status panel, run history and private-Spaces list.

    The private-Spaces list is taken from the most recent logged run; it is
    empty when no run has been logged or the run has no such entry.

    Returns:
        A 3-tuple of Markdown strings: status panel, logs panel, private list.
    """
    history = load_logs()
    latest_run = history[-1] if history else {}
    return (
        format_status_panel(),
        format_logs_panel(),
        format_private_spaces_accordion(latest_run.get("private_spaces", [])),
    )
# =============================================================================
# GRADIO UI
# =============================================================================
# NOTE(review): the nesting below (status panel and buttons in the left column,
# live output in the right column) is reconstructed - confirm the layout.
with gr.Blocks() as demo:
    gr.Markdown("""# 🔄 Space Keeper
Keeps HuggingFace Spaces alive by pinging them to prevent sleeping during hackathon evaluation.
""")
    with gr.Row():
        with gr.Column(scale=1):
            status_panel = gr.Markdown(format_status_panel())
            with gr.Row():
                run_btn = gr.Button("🚀 Run Now", variant="primary", size="lg")
                refresh_btn = gr.Button("🔄 Refresh", size="lg")
        with gr.Column(scale=1):
            # Show the configured interval rather than a hard-coded "12".
            live_output = gr.Markdown(
                "## Ready\n\nClick **🚀 Run Now** to ping all Spaces.\n\n"
                f"Auto-runs every {PING_INTERVAL_HOURS} hours when this Space is awake."
            )
    gr.Markdown("---")
    with gr.Accordion("🔒 Private Spaces", open=False):
        private_spaces_display = gr.Markdown("*Run a ping to see private Spaces list.*")
    logs_panel = gr.Markdown(format_logs_panel())

    # The generator handler streams four outputs per yield.
    run_btn.click(
        fn=manual_trigger_with_progress,
        outputs=[live_output, status_panel, logs_panel, private_spaces_display]
    )
    # Refresh leaves the live output untouched and updates the other three.
    refresh_btn.click(
        fn=refresh_all,
        outputs=[status_panel, logs_panel, private_spaces_display]
    )
# =============================================================================
# START SCHEDULER
# =============================================================================
# Register the recurring ping job and start the scheduler. This runs at import
# time, not only under the __main__ guard below.
# NOTE(review): with a plain IntervalTrigger the first run presumably fires one
# interval after start rather than immediately - confirm against APScheduler.
scheduler.add_job(
    scheduled_job,
    trigger=IntervalTrigger(hours=PING_INTERVAL_HOURS),
    id="ping_job",
    name="Ping all Spaces",
    replace_existing=True
)
scheduler.start()
# Startup banner: start timestamp, organization, interval and next run time.
print(f"[{APP_START_TIME.isoformat()}] Space Keeper started for {ORG_NAME}")
print(f" Ping interval: {PING_INTERVAL_HOURS}h | Next run: {get_next_run_time()}")
# Launch the Gradio UI only when executed as a script.
if __name__ == "__main__":
    demo.launch()