import os, re, json, codecs, pathlib, logging, threading, traceback, inspect, unicodedata from datetime import datetime, timezone, timedelta from functools import wraps from flask import Flask, render_template, request, redirect, url_for, session, jsonify, flash from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.exceptions import HTTPException from huggingface_hub import snapshot_download logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s") log = logging.getLogger("papua-app") app = Flask(__name__, template_folder="frontend", static_folder="static") app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) app.config.update( SECRET_KEY=os.getenv("SECRET_KEY", "dev-secret-change-me"), SESSION_COOKIE_NAME="hfspace_session", SESSION_COOKIE_SAMESITE="None", SESSION_COOKIE_SECURE=True, SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_PATH="/", PREFERRED_URL_SCHEME="https", ) app.permanent_session_lifetime = timedelta(hours=8) PRELOAD_MODEL = os.getenv("PRELOAD_MODEL", "true").lower() in ("1","true","yes") FALLBACK_TRANSLATE = os.getenv("FALLBACK_TRANSLATE", "false").lower() in ("1","true","yes") PUBLIC_APP_URL = os.getenv("PUBLIC_APP_URL", "").strip() @app.context_processor def inject_globals(): return {"public_app_url": PUBLIC_APP_URL} from sqlalchemy import create_engine, Column, Integer, Text, DateTime, ForeignKey, func from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session, relationship DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("DB_URL") if not DATABASE_URL: DATABASE_URL = "sqlite:////tmp/app.db" log.warning("[DB] DATABASE_URL tidak diset; pakai SQLite /tmp/app.db") else: if DATABASE_URL.startswith("postgres://"): DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql+psycopg2://", 1) elif DATABASE_URL.startswith("postgresql://"): DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg2://", 1) if DATABASE_URL.startswith("postgresql+psycopg2") and "sslmode=" not in DATABASE_URL: sep = "&" if "?" in DATABASE_URL else "?" DATABASE_URL = f"{DATABASE_URL}{sep}sslmode=require" engine = create_engine(DATABASE_URL, pool_pre_ping=True) SessionLocal = scoped_session(sessionmaker(bind=engine, autoflush=False, autocommit=False)) Base = declarative_base() class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) email = Column(Text, unique=True, nullable=False) pass_hash = Column(Text, nullable=False) created_at = Column(DateTime(timezone=True), server_default=func.now()) class Translation(Base): __tablename__ = "translations" id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey("users.id"), nullable=False) src = Column(Text, nullable=False) mt = Column(Text, nullable=False) created_at = Column(DateTime(timezone=True), server_default=func.now()) user = relationship("User") try: Base.metadata.create_all(engine) log.info("[DB] Ready") except Exception as e: log.exception("[DB] init error: %s", e) from werkzeug.security import generate_password_hash, check_password_hash def set_password(user: User, raw: str): user.pass_hash = generate_password_hash(raw) def verify_password(user: User, raw: str) -> bool: try: return check_password_hash(user.pass_hash, raw) except Exception: return False def login_required(fn): @wraps(fn) def _wrap(*args, **kwargs): if not session.get("uid"): return redirect(url_for("login_get")) return fn(*args, **kwargs) return _wrap PRENORM_LEVEL = os.getenv("PRENORM_LEVEL", "basic").lower() PRENORM_DEBUG = os.getenv("PRENORM_DEBUG", "0") == "1" WS_RE = re.compile("\s+") ELONG_RE = re.compile("([bcdfghjklmnpqrstvwxyz])\1{2,}", flags=re.IGNORECASE) PUNC_RE = re.compile("[^\w\s,.;:?!%()\-\—/]|_") MULTI_PUNC = re.compile("([,.;:?!])\1+") DASH_SPACES= re.compile("\s*([-—/])\s*") WHITELIST_KEEP_ELONG = {"papua", "koteka", "wamena", "sarmi", "sorong"} PAPUA_MAP = { "sa": "saya", "sy": "saya", "beta": "saya", "ko": "kamu", "kau": "kamu", "dong": "mereka", "kam": "kalian", "kamong": "kalian", "kitong": "kita", "kitorang": "kita", "torang": "kita", "tra": "tidak", "tr": "tidak", "trada": "tidak ada", "son": "tidak", "ndak": "tidak", "tid": "tidak", "mo": "mau", "su": "sudah", "uda": "sudah", "skarang": "sekarang", "td": "tadi", "tar": "nanti", "tarlah": "nanti", "pigi": "pergi", "pi": "pergi", "ma": "sama", "deng": "dengan", "dgn": "dengan", "kira2": "kira-kira", "bgmn": "bagaimana", "gmn": "bagaimana", "tau": "tahu", "tao": "tahu" } PRON_MAP = { "sa": "saya", "saya": "saya", "ko": "kamu", "kamu": "kamu", "dia": "dia", "dong": "mereka", "kam": "kalian", "kalian": "kalian", "kitong": "kita", "kitorang": "kita", "kita": "kita", "torang": "kita", } def _normalize_unicode(text: str) -> str: return unicodedata.normalize("NFKC", text) def _strip_emoji_and_noise(text: str) -> str: text = PUNC_RE.sub(" ", text) text = MULTI_PUNC.sub(r"\1", text) text = DASH_SPACES.sub(r" \1 ", text) return text def _normalize_ws(text: str) -> str: return WS_RE.sub(" ", text).strip() def _reduce_elongation(token: str) -> str: base = token.lower() if base in WHITELIST_KEEP_ELONG: return token return ELONG_RE.sub(r"\1\1", token) def _apply_papua_map(token: str) -> str: low = token.lower() return PAPUA_MAP.get(low, token) def _handle_pu_constructs(text: str) -> str: def repl(m): pron = m.group(1).lower() rest = m.group(2).strip() pron_std = PRON_MAP.get(pron, pron) return f"punya {pron_std} {rest}" return re.sub("\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s*pu\s+([^.,;:!?]+)", repl, text, flags=re.IGNORECASE) def _handle_mo_bigram(text: str) -> str: def repl(m): pron = m.group(1).lower() pron_std = PRON_MAP.get(pron, pron) return f"{pron_std} mau" return re.sub("\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s+mo\b", repl, text, flags=re.IGNORECASE) def _handle_negation_bigrams(text: str) -> str: def repl_pron(m): pron = m.group(1).lower() pron_std = PRON_MAP.get(pron, pron) return f"{pron_std} tidak" text = re.sub("\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s+(tra|ndak|son|tid)\b", repl_pron, text, flags=re.IGNORECASE) text = re.sub("\btra\s+ada\b", "tidak ada", text, flags=re.IGNORECASE) text = re.sub("\bndak\s+ada\b", "tidak ada", text, flags=re.IGNORECASE) text = re.sub("\btid\s+ada\b", "tidak ada", text, flags=re.IGNORECASE) text = re.sub("\bson\s+ada\b", "tidak ada", text, flags=re.IGNORECASE) text = re.sub("\btidak\s+tau\b", "tidak tahu", text, flags=re.IGNORECASE) return text def _token_level_ops(text: str, aggressive: bool) -> str: tokens = text.split() out = [] for t in tokens: t2 = _reduce_elongation(t) if aggressive else t t3 = _apply_papua_map(t2) out.append(t3) return " ".join(out) def papua_prenorm(inp: str, level: str = "basic", return_trace: bool = False): if level == "off": return (inp, {}) if return_trace else inp s1 = _normalize_unicode(inp) s2 = _strip_emoji_and_noise(s1) if level == "aggressive" else s1 s3 = _normalize_ws(s2) s3a = _handle_mo_bigram(s3) s3b = _handle_negation_bigrams(s3a) s4 = _handle_pu_constructs(s3b) s5 = _token_level_ops(s4, aggressive=(level == "aggressive")) s6 = _normalize_ws(s5) return (s6, {"final": s6}) if return_trace else s6 def prenorm(text: str) -> str: if PRENORM_DEBUG: out, tr = papua_prenorm(text, level=PRENORM_LEVEL, return_trace=True) log.info("[PRENORM][%s] %s -> %s | trace=%s", PRENORM_LEVEL, text, out, json.dumps(tr, ensure_ascii=False)) return out return papua_prenorm(text, level=PRENORM_LEVEL, return_trace=False) BASE_MODEL_ID = os.getenv("BASE_MODEL_ID", "amosnbn/cendol-mt5-base-inst") ADAPTER_ID = os.getenv("ADAPTER_ID", "amosnbn/papua-lora-ckpt-168") DEVICE = "cuda" if os.getenv("DEVICE", "cpu") == "cuda" else "cpu" TOK = None MODEL = None _MODEL_LOCK = threading.Lock() _MODEL_READY = False _MODEL_ERROR = None def _strip_bom_in_dir(root_dir: str): root = pathlib.Path(root_dir) for p in root.rglob("*.json"): try: with codecs.open(p, "r", encoding="utf-8-sig") as f: data = json.load(f) with open(p, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) log.info(f"[BOM] stripped: {p}") except Exception as e: log.warning(f"[BOM] skip {p}: {e}") def _sanitize_adapter_config(adapter_dir: str): try: from peft import LoraConfig sig = inspect.signature(LoraConfig.__init__) allowed = set(p.name for p in sig.parameters.values()) except Exception as e: log.warning(f"[SAN] Tidak bisa analisa LoraConfig: {e}") return cfg_path = pathlib.Path(adapter_dir) / "adapter_config.json" if not cfg_path.exists(): for alt in ("adapter_config.json","adapter_config_0.json","config.json"): c = pathlib.Path(adapter_dir) / alt if c.exists(): cfg_path = c break if not cfg_path.exists(): log.warning(f"[SAN] adapter_config tidak ditemukan di {adapter_dir}") return try: with codecs.open(cfg_path, "r", encoding="utf-8-sig") as f: cfg = json.load(f) cleaned = {k: v for k, v in cfg.items() if k in allowed} if set(cleaned.keys()) != set(cfg.keys()): with open(cfg_path, "w", encoding="utf-8") as f: json.dump(cleaned, f, ensure_ascii=False, indent=2) log.info("[SAN] adapter_config dibersihkan dari field tidak dikenal") except Exception as e: log.warning(f"[SAN] gagal baca/bersih adapter_config: {e}") def _load_model(): global TOK, MODEL, _MODEL_READY, _MODEL_ERROR try: log.info("[MODEL] downloading base=%s adapter=%s", BASE_MODEL_ID, ADAPTER_ID or "-") base_dir = snapshot_download( repo_id=BASE_MODEL_ID, local_dir="/tmp/hf_base", local_dir_use_symlinks=False, allow_patterns=None, ) _strip_bom_in_dir(base_dir) adapter_dir = None if ADAPTER_ID: adapter_dir = snapshot_download( repo_id=ADAPTER_ID, local_dir="/tmp/hf_adapter", local_dir_use_symlinks=False, allow_patterns=None, ) _strip_bom_in_dir(adapter_dir) _sanitize_adapter_config(adapter_dir) import torch from transformers import AutoTokenizer, AutoModelForSeq2SeqLM from peft import PeftModel TOK = AutoTokenizer.from_pretrained(base_dir) base = AutoModelForSeq2SeqLM.from_pretrained(base_dir) MODEL = PeftModel.from_pretrained(base, adapter_dir) if adapter_dir else base MODEL.eval().to(DEVICE) _MODEL_READY = True _MODEL_ERROR = None log.info("[MODEL] ready on %s", DEVICE) except Exception as e: _MODEL_READY = False _MODEL_ERROR = f"{type(e).__name__}: {e}" log.exception("[MODEL] load error") def get_model(): global MODEL if MODEL is None: with _MODEL_LOCK: if MODEL is None: _load_model() return TOK, MODEL def translate_with_model(text: str, max_new_tokens: int = 48) -> str: import torch tok, m = get_model() if not _MODEL_READY or m is None: raise RuntimeError(f"Model not ready: {_MODEL_ERROR or 'unknown error'}") enc = tok([text], return_tensors="pt", truncation=True, max_length=256) enc = {k: v.to(DEVICE) for k, v in enc.items()} out = m.generate( **enc, max_new_tokens=int(max_new_tokens), num_beams=4, length_penalty=0.9, no_repeat_ngram_size=3, early_stopping=True, ) return tok.decode(out[0], skip_special_tokens=True) def _preload_thread(): try: _load_model() except Exception: pass if PRELOAD_MODEL: threading.Thread(target=_preload_thread, daemon=True).start() @app.before_request def _log_req(): if request.path not in ("/health","/ping","/favicon.ico"): log.info("[REQ] %s %s", request.method, request.path) @app.errorhandler(HTTPException) def _http_err(e: HTTPException): return e @app.errorhandler(Exception) def _err(e): log.exception("Unhandled error") return "Internal Server Error", 500 @app.get("/diag") def diag(): import sys try: import torch, transformers, peft torch_v = torch.__version__ tf_v = transformers.__version__ peft_v = peft.__version__ except Exception as e: torch_v = tf_v = peft_v = f"import error: {e}" return jsonify({ "ok": True, "time": datetime.now(timezone.utc).isoformat(), "device": DEVICE, "base_model": BASE_MODEL_ID, "adapter": ADAPTER_ID or None, "model_ready": _MODEL_READY, "model_error": _MODEL_ERROR, "versions": {"python": sys.version, "torch": torch_v, "transformers": tf_v, "peft": peft_v}, "preload": PRELOAD_MODEL, "prenorm": {"level": PRENORM_LEVEL, "debug": PRENORM_DEBUG}, "public_app_url": PUBLIC_APP_URL or None }) @app.get("/health") def health(): return jsonify({"ok": True, "time": datetime.now(timezone.utc).isoformat()}) @app.get("/ping") def ping(): return jsonify({"ok": True, "time": datetime.now(timezone.utc).isoformat()}) @app.get("/login") def login_get(): if session.get("uid"): return redirect(url_for("index")) return render_template("login.html") @app.post("/login") def login_post(): email = (request.form.get("email") or "").strip().lower() pwd = request.form.get("password") or "" if not email or not pwd: flash("Isi email dan password", "error") return redirect(url_for("login_get")) with SessionLocal() as s: u = s.query(User).filter_by(email=email).first() if not u or not verify_password(u, pwd): flash("Email atau password salah", "error") return redirect(url_for("login_get")) session.permanent = True session["uid"], session["email"] = u.id, u.email return redirect(url_for("index")) @app.get("/register") def register_get(): if session.get("uid"): return redirect(url_for("index")) return render_template("register.html") @app.post("/register") def register_post(): email = (request.form.get("email") or "").strip().lower() pwd = (request.form.get("password") or "") if not email or not pwd: flash("Isi email dan password", "error") return redirect(url_for("register_get")) with SessionLocal() as s: if s.query(User).filter_by(email=email).first(): flash("Email sudah terdaftar", "error") return redirect(url_for("register_get")) u = User(email=email); set_password(u, pwd) s.add(u); s.commit() flash("Registrasi berhasil. Silakan login.", "success") return redirect(url_for("login_get")) @app.get("/logout") def logout(): session.clear() return redirect(url_for("login_get")) @app.get("/") @login_required def index(): with SessionLocal() as s: uid = session["uid"] items = (s.query(Translation) .filter(Translation.user_id == uid) .order_by(Translation.id.desc()) .limit(10).all()) recent = [{"src": it.src, "mt": it.mt, "created_at": it.created_at.strftime("%Y-%m-%d %H:%M")} for it in items] return render_template("index.html", logged_in=True, device=DEVICE, recent=recent) @app.get("/about") def about_page(): return render_template("about.html") @app.get("/history") def api_history(): if not session.get("uid"): return jsonify({"items": []}) with SessionLocal() as s: uid = session["uid"] items = (s.query(Translation) .filter(Translation.user_id == uid) .order_by(Translation.id.desc()) .limit(10).all()) out = [{"src": it.src, "mt": it.mt, "created_at": it.created_at.strftime("%Y-%m-%d %H:%M")} for it in items] return jsonify({"items": out}) @app.post("/translate") def api_translate(): if not session.get("uid"): return jsonify({"ok": False, "error": "Unauthorized"}), 401 payload = request.get_json(silent=True) or {} text = (payload.get("text") or "").strip() max_new = int(payload.get("max_new_tokens", 48)) if not text: return jsonify({"ok": False, "error": "Empty text"}), 400 try: clean = prenorm(text) mt = f"[FAKE] {clean}" if FALLBACK_TRANSLATE else translate_with_model(clean, max_new_tokens=max_new) with SessionLocal() as s: s.add(Translation(user_id=session["uid"], src=text, mt=mt)) s.commit() return jsonify({"ok": True, "mt": mt}) except Exception as e: log.error("[API] translate error: %s", e) log.error(traceback.format_exc()) return jsonify({"ok": False, "error": f"{type(e).__name__}: {e}"}), 500 if __name__ == "__main__": app.run(host="0.0.0.0", port=int(os.getenv("PORT", "7860")), debug=False)