import os, re, json, codecs, pathlib, logging, threading, traceback, inspect, unicodedata
from datetime import datetime, timezone, timedelta
from functools import wraps
from flask import Flask, render_template, request, redirect, url_for, session, jsonify, flash
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.exceptions import HTTPException
from huggingface_hub import snapshot_download

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
log = logging.getLogger("papua-app")

app = Flask(__name__, template_folder="frontend", static_folder="static")
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
app.config.update(
    SECRET_KEY=os.getenv("SECRET_KEY", "dev-secret-change-me"),
    SESSION_COOKIE_NAME="hfspace_session",
    # SameSite=None + Secure so the session cookie survives the cross-site
    # iframe that Hugging Face Spaces embeds the app in.
    SESSION_COOKIE_SAMESITE="None",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_PATH="/",
    PREFERRED_URL_SCHEME="https",
)
app.permanent_session_lifetime = timedelta(hours=8)
PRELOAD_MODEL = os.getenv("PRELOAD_MODEL", "true").lower() in ("1", "true", "yes")
FALLBACK_TRANSLATE = os.getenv("FALLBACK_TRANSLATE", "false").lower() in ("1", "true", "yes")
PUBLIC_APP_URL = os.getenv("PUBLIC_APP_URL", "").strip()
@app.context_processor
def inject_globals():
    return {"public_app_url": PUBLIC_APP_URL}
from sqlalchemy import create_engine, Column, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session, relationship

DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("DB_URL")
if not DATABASE_URL:
    DATABASE_URL = "sqlite:////tmp/app.db"
    log.warning("[DB] DATABASE_URL not set; falling back to SQLite at /tmp/app.db")
else:
    # Normalize legacy postgres:// URLs to the psycopg2 dialect and force TLS.
    if DATABASE_URL.startswith("postgres://"):
        DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1)
    elif DATABASE_URL.startswith("postgresql://"):
        DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1)
    if DATABASE_URL.startswith("postgresql+psycopg2") and "sslmode=" not in DATABASE_URL:
        sep = "&" if "?" in DATABASE_URL else "?"
        DATABASE_URL = f"{DATABASE_URL}{sep}sslmode=require"
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = scoped_session(sessionmaker(bind=engine, autoflush=False, autocommit=False))
Base = declarative_base()
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(Text, unique=True, nullable=False)
    pass_hash = Column(Text, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())


class Translation(Base):
    __tablename__ = "translations"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    src = Column(Text, nullable=False)
    mt = Column(Text, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    user = relationship("User")
try:
    Base.metadata.create_all(engine)
    log.info("[DB] Ready")
except Exception as e:
    log.exception("[DB] init error: %s", e)
from werkzeug.security import generate_password_hash, check_password_hash


def set_password(user: User, raw: str):
    user.pass_hash = generate_password_hash(raw)


def verify_password(user: User, raw: str) -> bool:
    try:
        return check_password_hash(user.pass_hash, raw)
    except Exception:
        return False
def login_required(fn):
    @wraps(fn)  # preserve the wrapped view's name so url_for() endpoints stay correct
    def _wrap(*args, **kwargs):
        if not session.get("uid"):
            return redirect(url_for("login_get"))
        return fn(*args, **kwargs)
    return _wrap
PRENORM_LEVEL = os.getenv("PRENORM_LEVEL", "basic").lower()
PRENORM_DEBUG = os.getenv("PRENORM_DEBUG", "0") == "1"

# Raw strings so \s, \1, \b are regex escapes rather than Python string escapes.
WS_RE = re.compile(r"\s+")
ELONG_RE = re.compile(r"([bcdfghjklmnpqrstvwxyz])\1{2,}", flags=re.IGNORECASE)
PUNC_RE = re.compile(r"[^\w\s,.;:?!%()\-—/]|_")
MULTI_PUNC = re.compile(r"([,.;:?!])\1+")
DASH_SPACES = re.compile(r"\s*([-—/])\s*")
WHITELIST_KEEP_ELONG = {"papua", "koteka", "wamena", "sarmi", "sorong"}
PAPUA_MAP = {
    "sa": "saya", "sy": "saya", "beta": "saya",
    "ko": "kamu", "kau": "kamu",
    "dong": "mereka", "kam": "kalian", "kamong": "kalian",
    "kitong": "kita", "kitorang": "kita", "torang": "kita",
    "tra": "tidak", "tr": "tidak", "trada": "tidak ada", "son": "tidak", "ndak": "tidak", "tid": "tidak",
    "mo": "mau", "su": "sudah", "uda": "sudah",
    "skarang": "sekarang", "td": "tadi", "tar": "nanti", "tarlah": "nanti",
    "pigi": "pergi", "pi": "pergi",
    "ma": "sama", "deng": "dengan", "dgn": "dengan",
    "kira2": "kira-kira", "bgmn": "bagaimana", "gmn": "bagaimana",
    "tau": "tahu", "tao": "tahu",
}
PRON_MAP = {
    "sa": "saya", "saya": "saya",
    "ko": "kamu", "kamu": "kamu",
    "dia": "dia", "dong": "mereka",
    "kam": "kalian", "kalian": "kalian",
    "kitong": "kita", "kitorang": "kita", "kita": "kita", "torang": "kita",
}
def _normalize_unicode(text: str) -> str:
    return unicodedata.normalize("NFKC", text)


def _strip_emoji_and_noise(text: str) -> str:
    text = PUNC_RE.sub(" ", text)
    text = MULTI_PUNC.sub(r"\1", text)
    text = DASH_SPACES.sub(r" \1 ", text)
    return text


def _normalize_ws(text: str) -> str:
    return WS_RE.sub(" ", text).strip()


def _reduce_elongation(token: str) -> str:
    base = token.lower()
    if base in WHITELIST_KEEP_ELONG:
        return token
    return ELONG_RE.sub(r"\1\1", token)


def _apply_papua_map(token: str) -> str:
    low = token.lower()
    return PAPUA_MAP.get(low, token)
def _handle_pu_constructs(text: str) -> str:
    # "<pronoun> pu X" possessive -> "punya <pronoun> X"; raw strings keep \b as a word boundary.
    def repl(m):
        pron = m.group(1).lower()
        rest = m.group(2).strip()
        pron_std = PRON_MAP.get(pron, pron)
        return f"punya {pron_std} {rest}"
    return re.sub(r"\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s*pu\s+([^.,;:!?]+)",
                  repl, text, flags=re.IGNORECASE)


def _handle_mo_bigram(text: str) -> str:
    def repl(m):
        pron = m.group(1).lower()
        pron_std = PRON_MAP.get(pron, pron)
        return f"{pron_std} mau"
    return re.sub(r"\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s+mo\b",
                  repl, text, flags=re.IGNORECASE)


def _handle_negation_bigrams(text: str) -> str:
    def repl_pron(m):
        pron = m.group(1).lower()
        pron_std = PRON_MAP.get(pron, pron)
        return f"{pron_std} tidak"
    text = re.sub(r"\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s+(tra|ndak|son|tid)\b",
                  repl_pron, text, flags=re.IGNORECASE)
    text = re.sub(r"\btra\s+ada\b", "tidak ada", text, flags=re.IGNORECASE)
    text = re.sub(r"\bndak\s+ada\b", "tidak ada", text, flags=re.IGNORECASE)
    text = re.sub(r"\btid\s+ada\b", "tidak ada", text, flags=re.IGNORECASE)
    text = re.sub(r"\bson\s+ada\b", "tidak ada", text, flags=re.IGNORECASE)
    text = re.sub(r"\btidak\s+tau\b", "tidak tahu", text, flags=re.IGNORECASE)
    return text
def _token_level_ops(text: str, aggressive: bool) -> str:
    tokens = text.split()
    out = []
    for t in tokens:
        t2 = _reduce_elongation(t) if aggressive else t
        t3 = _apply_papua_map(t2)
        out.append(t3)
    return " ".join(out)
def papua_prenorm(inp: str, level: str = "basic", return_trace: bool = False):
    if level == "off":
        return (inp, {}) if return_trace else inp
    s1 = _normalize_unicode(inp)
    s2 = _strip_emoji_and_noise(s1) if level == "aggressive" else s1
    s3 = _normalize_ws(s2)
    s3a = _handle_mo_bigram(s3)
    s3b = _handle_negation_bigrams(s3a)
    s4 = _handle_pu_constructs(s3b)
    s5 = _token_level_ops(s4, aggressive=(level == "aggressive"))
    s6 = _normalize_ws(s5)
    return (s6, {"final": s6}) if return_trace else s6


def prenorm(text: str) -> str:
    if PRENORM_DEBUG:
        out, tr = papua_prenorm(text, level=PRENORM_LEVEL, return_trace=True)
        log.info("[PRENORM][%s] %s -> %s | trace=%s", PRENORM_LEVEL, text, out, json.dumps(tr, ensure_ascii=False))
        return out
    return papua_prenorm(text, level=PRENORM_LEVEL, return_trace=False)
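# Rough illustration of the pipeline above (assumed input/output, not from a test run):
#   papua_prenorm("sa pu rumah tra ada listrik", level="basic")
#   -> "punya saya rumah tidak ada listrik"
# i.e. the possessive "<pronoun> pu X" becomes "punya <pronoun> X", the negation bigram
# "tra ada" becomes "tidak ada", and remaining tokens go through PAPUA_MAP.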
BASE_MODEL_ID = os.getenv("BASE_MODEL_ID", "amosnbn/cendol-mt5-base-inst")
ADAPTER_ID = os.getenv("ADAPTER_ID", "amosnbn/papua-lora-ckpt-168")
DEVICE = "cuda" if os.getenv("DEVICE", "cpu") == "cuda" else "cpu"

TOK = None
MODEL = None
_MODEL_LOCK = threading.Lock()
_MODEL_READY = False
_MODEL_ERROR = None
def _strip_bom_in_dir(root_dir: str):
    root = pathlib.Path(root_dir)
    for p in root.rglob("*.json"):
        try:
            with codecs.open(p, "r", encoding="utf-8-sig") as f:
                data = json.load(f)
            with open(p, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            log.info(f"[BOM] stripped: {p}")
        except Exception as e:
            log.warning(f"[BOM] skip {p}: {e}")
def _sanitize_adapter_config(adapter_dir: str):
    # Drop adapter_config.json keys that this peft version's LoraConfig does not accept,
    # so checkpoints saved with a different peft version still load.
    try:
        from peft import LoraConfig
        sig = inspect.signature(LoraConfig.__init__)
        allowed = set(p.name for p in sig.parameters.values())
    except Exception as e:
        log.warning(f"[SAN] Could not inspect LoraConfig: {e}")
        return
    cfg_path = pathlib.Path(adapter_dir) / "adapter_config.json"
    if not cfg_path.exists():
        for alt in ("adapter_config.json", "adapter_config_0.json", "config.json"):
            c = pathlib.Path(adapter_dir) / alt
            if c.exists():
                cfg_path = c
                break
    if not cfg_path.exists():
        log.warning(f"[SAN] adapter_config not found in {adapter_dir}")
        return
    try:
        with codecs.open(cfg_path, "r", encoding="utf-8-sig") as f:
            cfg = json.load(f)
        cleaned = {k: v for k, v in cfg.items() if k in allowed}
        if set(cleaned.keys()) != set(cfg.keys()):
            with open(cfg_path, "w", encoding="utf-8") as f:
                json.dump(cleaned, f, ensure_ascii=False, indent=2)
            log.info("[SAN] adapter_config cleaned of unrecognized fields")
    except Exception as e:
        log.warning(f"[SAN] failed to read/clean adapter_config: {e}")
def _load_model():
    global TOK, MODEL, _MODEL_READY, _MODEL_ERROR
    try:
        log.info("[MODEL] downloading base=%s adapter=%s", BASE_MODEL_ID, ADAPTER_ID or "-")
        base_dir = snapshot_download(
            repo_id=BASE_MODEL_ID,
            local_dir="/tmp/hf_base",
            local_dir_use_symlinks=False,
            allow_patterns=None,
        )
        _strip_bom_in_dir(base_dir)
        adapter_dir = None
        if ADAPTER_ID:
            adapter_dir = snapshot_download(
                repo_id=ADAPTER_ID,
                local_dir="/tmp/hf_adapter",
                local_dir_use_symlinks=False,
                allow_patterns=None,
            )
            _strip_bom_in_dir(adapter_dir)
            _sanitize_adapter_config(adapter_dir)
        # Heavy imports are deferred so the web server can come up before the model stack loads.
        import torch
        from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
        from peft import PeftModel
        TOK = AutoTokenizer.from_pretrained(base_dir)
        base = AutoModelForSeq2SeqLM.from_pretrained(base_dir)
        MODEL = PeftModel.from_pretrained(base, adapter_dir) if adapter_dir else base
        MODEL.eval().to(DEVICE)
        _MODEL_READY = True
        _MODEL_ERROR = None
        log.info("[MODEL] ready on %s", DEVICE)
    except Exception as e:
        _MODEL_READY = False
        _MODEL_ERROR = f"{type(e).__name__}: {e}"
        log.exception("[MODEL] load error")
def get_model():
    global MODEL
    if MODEL is None:
        with _MODEL_LOCK:
            if MODEL is None:
                _load_model()
    return TOK, MODEL
def translate_with_model(text: str, max_new_tokens: int = 48) -> str:
    import torch
    tok, m = get_model()
    if not _MODEL_READY or m is None:
        raise RuntimeError(f"Model not ready: {_MODEL_ERROR or 'unknown error'}")
    enc = tok([text], return_tensors="pt", truncation=True, max_length=256)
    enc = {k: v.to(DEVICE) for k, v in enc.items()}
    with torch.no_grad():  # inference only; no gradients needed
        out = m.generate(
            **enc,
            max_new_tokens=int(max_new_tokens),
            num_beams=4,
            length_penalty=0.9,
            no_repeat_ngram_size=3,
            early_stopping=True,
        )
    return tok.decode(out[0], skip_special_tokens=True)
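# Decoding sketch: beam search (num_beams=4) with no_repeat_ngram_size=3 and a mild
# length_penalty trades some latency for more fluent, less repetitive output than
# greedy decoding. Example call (assumes the model has finished loading):
#   translate_with_model("punya saya rumah tidak ada listrik", max_new_tokens=48)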
def _preload_thread():
    try:
        _load_model()
    except Exception:
        pass


if PRELOAD_MODEL:
    threading.Thread(target=_preload_thread, daemon=True).start()
@app.before_request
def _log_req():
    if request.path not in ("/health", "/ping", "/favicon.ico"):
        log.info("[REQ] %s %s", request.method, request.path)


@app.errorhandler(HTTPException)
def _http_err(e: HTTPException):
    return e


@app.errorhandler(Exception)
def _err(e):
    log.exception("Unhandled error")
    return "Internal Server Error", 500
# URL paths for the routes below follow their endpoint names (as used with url_for());
# only "/health", "/ping" and "/favicon.ico" appear literally elsewhere in this file.
@app.route("/diag")
def diag():
    import sys
    try:
        import torch, transformers, peft
        torch_v = torch.__version__
        tf_v = transformers.__version__
        peft_v = peft.__version__
    except Exception as e:
        torch_v = tf_v = peft_v = f"import error: {e}"
    return jsonify({
        "ok": True,
        "time": datetime.now(timezone.utc).isoformat(),
        "device": DEVICE,
        "base_model": BASE_MODEL_ID,
        "adapter": ADAPTER_ID or None,
        "model_ready": _MODEL_READY,
        "model_error": _MODEL_ERROR,
        "versions": {"python": sys.version, "torch": torch_v, "transformers": tf_v, "peft": peft_v},
        "preload": PRELOAD_MODEL,
        "prenorm": {"level": PRENORM_LEVEL, "debug": PRENORM_DEBUG},
        "public_app_url": PUBLIC_APP_URL or None,
    })
@app.route("/health")
def health():
    return jsonify({"ok": True, "time": datetime.now(timezone.utc).isoformat()})


@app.route("/ping")
def ping():
    return jsonify({"ok": True, "time": datetime.now(timezone.utc).isoformat()})
@app.route("/login", methods=["GET"])
def login_get():
    if session.get("uid"):
        return redirect(url_for("index"))
    return render_template("login.html")


@app.route("/login", methods=["POST"])
def login_post():
    email = (request.form.get("email") or "").strip().lower()
    pwd = request.form.get("password") or ""
    if not email or not pwd:
        flash("Isi email dan password", "error")
        return redirect(url_for("login_get"))
    with SessionLocal() as s:
        u = s.query(User).filter_by(email=email).first()
        if not u or not verify_password(u, pwd):
            flash("Email atau password salah", "error")
            return redirect(url_for("login_get"))
        session.permanent = True
        session["uid"], session["email"] = u.id, u.email
        return redirect(url_for("index"))
@app.route("/register", methods=["GET"])
def register_get():
    if session.get("uid"):
        return redirect(url_for("index"))
    return render_template("register.html")


@app.route("/register", methods=["POST"])
def register_post():
    email = (request.form.get("email") or "").strip().lower()
    pwd = request.form.get("password") or ""
    if not email or not pwd:
        flash("Isi email dan password", "error")
        return redirect(url_for("register_get"))
    with SessionLocal() as s:
        if s.query(User).filter_by(email=email).first():
            flash("Email sudah terdaftar", "error")
            return redirect(url_for("register_get"))
        u = User(email=email)
        set_password(u, pwd)
        s.add(u)
        s.commit()
    flash("Registrasi berhasil. Silakan login.", "success")
    return redirect(url_for("login_get"))
@app.route("/logout")
def logout():
    session.clear()
    return redirect(url_for("login_get"))
@app.route("/")
@login_required
def index():
    with SessionLocal() as s:
        uid = session["uid"]
        items = (s.query(Translation)
                 .filter(Translation.user_id == uid)
                 .order_by(Translation.id.desc())
                 .limit(10).all())
        recent = [{"src": it.src, "mt": it.mt, "created_at": it.created_at.strftime("%Y-%m-%d %H:%M")} for it in items]
    return render_template("index.html", logged_in=True, device=DEVICE, recent=recent)
@app.route("/about")
def about_page():
    return render_template("about.html")
@app.route("/api/history")
def api_history():
    if not session.get("uid"):
        return jsonify({"items": []})
    with SessionLocal() as s:
        uid = session["uid"]
        items = (s.query(Translation)
                 .filter(Translation.user_id == uid)
                 .order_by(Translation.id.desc())
                 .limit(10).all())
        out = [{"src": it.src, "mt": it.mt, "created_at": it.created_at.strftime("%Y-%m-%d %H:%M")} for it in items]
    return jsonify({"items": out})
@app.route("/api/translate", methods=["POST"])
def api_translate():
    if not session.get("uid"):
        return jsonify({"ok": False, "error": "Unauthorized"}), 401
    payload = request.get_json(silent=True) or {}
    text = (payload.get("text") or "").strip()
    max_new = int(payload.get("max_new_tokens", 48))
    if not text:
        return jsonify({"ok": False, "error": "Empty text"}), 400
    try:
        clean = prenorm(text)
        mt = f"[FAKE] {clean}" if FALLBACK_TRANSLATE else translate_with_model(clean, max_new_tokens=max_new)
        with SessionLocal() as s:
            s.add(Translation(user_id=session["uid"], src=text, mt=mt))
            s.commit()
        return jsonify({"ok": True, "mt": mt})
    except Exception as e:
        log.error("[API] translate error: %s", e)
        log.error(traceback.format_exc())
        return jsonify({"ok": False, "error": f"{type(e).__name__}: {e}"}), 500
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "7860")), debug=False)