PapuaTranslate / app.py
amosnbn's picture
final1.2
25480c6
import os, re, json, codecs, pathlib, logging, threading, traceback, inspect, unicodedata
from datetime import datetime, timezone, timedelta
from functools import wraps
from flask import Flask, render_template, request, redirect, url_for, session, jsonify, flash
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.exceptions import HTTPException
from huggingface_hub import snapshot_download
# Logging is configured first so every later start-up step can report via `log`.
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
log = logging.getLogger("papua-app")

app = Flask(__name__, template_folder="frontend", static_folder="static")
# Trust exactly one proxy hop for the X-Forwarded-For / -Proto / -Host headers.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)

# The session cookie is signed with SECRET_KEY and carries the logged-in user
# id, so a publicly known fallback value would let anyone forge a session.
# When the variable is unset (or empty) fall back to a random per-process key:
# sessions then do not survive a restart and are not shared between worker
# processes. Set SECRET_KEY explicitly in any real deployment.
_secret_key = os.getenv("SECRET_KEY")
if not _secret_key:
    import secrets

    _secret_key = secrets.token_hex(32)
    log.warning(
        "[APP] SECRET_KEY is not set; using a random per-process key "
        "(sessions reset on restart)"
    )

app.config.update(
    SECRET_KEY=_secret_key,
    SESSION_COOKIE_NAME="hfspace_session",
    # SameSite=None together with Secure allows the cookie on cross-site
    # requests; presumably needed because the app is embedded - TODO confirm.
    SESSION_COOKIE_SAMESITE="None",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_PATH="/",
    PREFERRED_URL_SCHEME="https",
)
# Applies to sessions marked permanent (login_post sets session.permanent).
app.permanent_session_lifetime = timedelta(hours=8)

# Feature flags: "1", "true" or "yes" (case-insensitive) enable the option.
PRELOAD_MODEL = os.getenv("PRELOAD_MODEL", "true").lower() in ("1", "true", "yes")
FALLBACK_TRANSLATE = os.getenv("FALLBACK_TRANSLATE", "false").lower() in ("1", "true", "yes")
PUBLIC_APP_URL = os.getenv("PUBLIC_APP_URL", "").strip()
@app.context_processor
def inject_globals():
    """Make ``public_app_url`` available to every rendered template."""
    template_globals = {"public_app_url": PUBLIC_APP_URL}
    return template_globals
from sqlalchemy import create_engine, Column, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session, relationship
# Connection string: DATABASE_URL is preferred, DB_URL is the fallback name.
DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("DB_URL")
if DATABASE_URL:
    # Point bare Postgres schemes at the psycopg2 driver explicitly.
    if DATABASE_URL.startswith(("postgres://", "postgresql://")):
        DATABASE_URL = "postgresql+psycopg2://" + DATABASE_URL.split("://", 1)[1]
    # Require TLS unless the URL already carries an sslmode option.
    if DATABASE_URL.startswith("postgresql+psycopg2") and "sslmode=" not in DATABASE_URL:
        sep = "?" if "?" not in DATABASE_URL else "&"
        DATABASE_URL = DATABASE_URL + sep + "sslmode=require"
else:
    # Nothing configured: fall back to a SQLite file under /tmp.
    DATABASE_URL = "sqlite:////tmp/app.db"
    log.warning("[DB] DATABASE_URL tidak diset; pakai SQLite /tmp/app.db")

engine = create_engine(DATABASE_URL, pool_pre_ping=True)
_session_factory = sessionmaker(autocommit=False, autoflush=False, bind=engine)
SessionLocal = scoped_session(_session_factory)
Base = declarative_base()
class User(Base):
    """ORM model for the ``users`` table (one row per account)."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # Login identifier; unique and required.
    email = Column(Text, unique=True, nullable=False)
    # Password hash as written by set_password(); the raw password is not stored.
    pass_hash = Column(Text, nullable=False)
    # Filled in by the database through a server-side default.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
class Translation(Base):
    """ORM model for the ``translations`` table: one stored request/result pair."""
    __tablename__ = "translations"
    id = Column(Integer, primary_key=True)
    # Owner of the row; references users.id.
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    # Source text as submitted and the translation that was returned.
    src = Column(Text, nullable=False)
    mt = Column(Text, nullable=False)
    # Filled in by the database through a server-side default.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    user = relationship("User")
# Create any missing tables at import time; a failure is logged, not raised,
# so the module still finishes importing.
try:
    Base.metadata.create_all(bind=engine)
except Exception as exc:
    log.exception("[DB] init error: %s", exc)
else:
    log.info("[DB] Ready")
from werkzeug.security import generate_password_hash, check_password_hash
def set_password(user: User, raw: str):
    """Hash *raw* and store the result on ``user.pass_hash``."""
    hashed = generate_password_hash(raw)
    user.pass_hash = hashed
def verify_password(user: User, raw: str) -> bool:
    """Return whether *raw* matches the hash stored on *user*.

    Any exception raised while checking is treated as a failed match.
    """
    try:
        matches = check_password_hash(user.pass_hash, raw)
    except Exception:
        return False
    return matches
def login_required(fn):
    """Decorator: run *fn* only when the session has a ``uid``.

    Requests without one are redirected to the ``login_get`` endpoint.
    """
    @wraps(fn)
    def _guarded(*args, **kwargs):
        if session.get("uid"):
            return fn(*args, **kwargs)
        return redirect(url_for("login_get"))

    return _guarded
# Pre-normalisation settings, read once from the environment at import time.
PRENORM_LEVEL = os.getenv("PRENORM_LEVEL", "basic").lower()
PRENORM_DEBUG = os.getenv("PRENORM_DEBUG", "0") == "1"

# Every pattern must be a raw string. In a plain literal Python turns "\1"
# into the control character U+0001, so the back-references in ELONG_RE and
# MULTI_PUNC would never match ordinary text.
WS_RE = re.compile(r"\s+")
# A consonant followed by two or more repeats of itself (three or more in a row).
ELONG_RE = re.compile(r"([bcdfghjklmnpqrstvwxyz])\1{2,}", flags=re.IGNORECASE)
# Anything that is not a word character, whitespace or listed punctuation;
# the underscore is matched explicitly because \w includes it.
PUNC_RE = re.compile(r"[^\w\s,.;:?!%()\-\—/]|_")
# The same punctuation mark repeated, e.g. "??" or "...".
MULTI_PUNC = re.compile(r"([,.;:?!])\1+")
# A hyphen, em dash or slash together with any surrounding whitespace.
DASH_SPACES = re.compile(r"\s*([-—/])\s*")
# Tokens that _reduce_elongation() returns untouched.
WHITELIST_KEEP_ELONG = {"papua", "koteka", "wamena", "sarmi", "sorong"}
# Token-level replacements used by _apply_papua_map(): each variant spelling
# maps to the standard form heading its group. Groups are listed in the order
# the keys should appear in the resulting dict.
PAPUA_MAP = {
    variant: standard
    for standard, variants in (
        ("saya", ("sa", "sy", "beta")),
        ("kamu", ("ko", "kau")),
        ("mereka", ("dong",)),
        ("kalian", ("kam", "kamong")),
        ("kita", ("kitong", "kitorang", "torang")),
        ("tidak", ("tra", "tr")),
        ("tidak ada", ("trada",)),
        ("tidak", ("son", "ndak", "tid")),
        ("mau", ("mo",)),
        ("sudah", ("su", "uda")),
        ("sekarang", ("skarang",)),
        ("tadi", ("td",)),
        ("nanti", ("tar", "tarlah")),
        ("pergi", ("pigi", "pi")),
        ("sama", ("ma",)),
        ("dengan", ("deng", "dgn")),
        ("kira-kira", ("kira2",)),
        ("bagaimana", ("bgmn", "gmn")),
        ("tahu", ("tau", "tao")),
    )
    for variant in variants
}
# Pronoun spellings (dialect and standard) mapped to the standard pronoun.
# Used by the phrase-level rewrite helpers via PRON_MAP.get().
PRON_MAP = {
    variant: standard
    for standard, variants in (
        ("saya", ("sa", "saya")),
        ("kamu", ("ko", "kamu")),
        ("dia", ("dia",)),
        ("mereka", ("dong",)),
        ("kalian", ("kam", "kalian")),
        ("kita", ("kitong", "kitorang", "kita", "torang")),
    )
    for variant in variants
}
def _normalize_unicode(text: str) -> str:
    """Return *text* converted to Unicode normalisation form NFKC."""
    normalized = unicodedata.normalize("NFKC", text)
    return normalized
def _strip_emoji_and_noise(text: str) -> str:
    """Apply the three module-level clean-up patterns in order.

    PUNC_RE matches become a single space, MULTI_PUNC matches are replaced
    by their first group, and DASH_SPACES matches by their first group with
    one space on each side.
    """
    without_symbols = PUNC_RE.sub(" ", text)
    single_marks = MULTI_PUNC.sub(r"\1", without_symbols)
    return DASH_SPACES.sub(r" \1 ", single_marks)
def _normalize_ws(text: str) -> str:
    """Replace every WS_RE match with one space and trim both ends."""
    collapsed = WS_RE.sub(" ", text)
    return collapsed.strip()
def _reduce_elongation(token: str) -> str:
    """Replace each ELONG_RE match in *token* with its captured letter doubled.

    Tokens whose lowercase form is in WHITELIST_KEEP_ELONG are returned
    unchanged.
    """
    if token.lower() not in WHITELIST_KEEP_ELONG:
        return ELONG_RE.sub(r"\1\1", token)
    return token
def _apply_papua_map(token: str) -> str:
    """Look up the lowercased *token* in PAPUA_MAP.

    Returns the mapped value, or *token* itself (original casing) when
    there is no entry.
    """
    try:
        return PAPUA_MAP[token.lower()]
    except KeyError:
        return token
def _handle_pu_constructs(text: str) -> str:
    """Rewrite ``<pronoun> pu <phrase>`` as ``punya <pronoun> <phrase>``.

    Matching is case-insensitive. The pronoun is standardised through
    PRON_MAP and the phrase runs up to the next ``. , ; : ! ?`` or the end
    of *text*; text that does not match is returned unchanged.
    """
    def repl(m):
        pron = m.group(1).lower()
        rest = m.group(2).strip()
        pron_std = PRON_MAP.get(pron, pron)
        return f"punya {pron_std} {rest}"

    # The pattern must be a raw string: in a plain literal "\b" is the
    # backspace character, so the rule could never match ordinary text.
    # NOTE(review): "\s*" also accepts the pronoun fused with "pu"
    # (e.g. "sapu ..."); confirm that is intended.
    return re.sub(
        r"\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s*pu\s+([^.,;:!?]+)",
        repl,
        text,
        flags=re.IGNORECASE,
    )
def _handle_mo_bigram(text: str) -> str:
    """Rewrite ``<pronoun> mo`` as ``<standard pronoun> mau``.

    Matching is case-insensitive and only fires when ``mo`` is a whole
    word; the pronoun is standardised through PRON_MAP.
    """
    def repl(m):
        pron = m.group(1).lower()
        pron_std = PRON_MAP.get(pron, pron)
        return f"{pron_std} mau"

    # Raw string on purpose: in a plain literal "\b" is the backspace
    # character, not a word boundary, and the rule would never match.
    return re.sub(
        r"\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s+mo\b",
        repl,
        text,
        flags=re.IGNORECASE,
    )
def _handle_negation_bigrams(text: str) -> str:
    """Standardise two-word negation phrases (case-insensitive).

    Applied in order:
      1. ``<pronoun> tra|ndak|son|tid`` -> ``<standard pronoun> tidak``
      2. ``tra|ndak|tid|son ada``       -> ``tidak ada``
      3. ``tidak tau``                  -> ``tidak tahu``
    """
    def repl_pron(m):
        pron = m.group(1).lower()
        pron_std = PRON_MAP.get(pron, pron)
        return f"{pron_std} tidak"

    # All patterns are raw strings: in a plain literal "\b" is the backspace
    # character, so none of these rules could match ordinary text.
    text = re.sub(
        r"\b(sa|saya|ko|kamu|dia|dong|kam|kalian|kitong|kitorang|kita|torang)\s+(tra|ndak|son|tid)\b",
        repl_pron,
        text,
        flags=re.IGNORECASE,
    )
    # One alternation covers the four "<negator> ada" variants; their
    # replacement text is identical, so a single pass gives the same result.
    text = re.sub(r"\b(?:tra|ndak|tid|son)\s+ada\b", "tidak ada", text, flags=re.IGNORECASE)
    text = re.sub(r"\btidak\s+tau\b", "tidak tahu", text, flags=re.IGNORECASE)
    return text
def _token_level_ops(text: str, aggressive: bool) -> str:
    """Run the per-token rewrites over whitespace-separated tokens.

    Each token goes through _reduce_elongation() (only when *aggressive*)
    and then _apply_papua_map(); results are re-joined with single spaces.
    """
    def convert(token):
        if aggressive:
            token = _reduce_elongation(token)
        return _apply_papua_map(token)

    return " ".join(convert(piece) for piece in text.split())
def papua_prenorm(inp: str, level: str = "basic", return_trace: bool = False):
    """Normalise *inp* before it is handed to the translator.

    level:
        "off"        - return *inp* untouched (the trace is ``{}``).
        "aggressive" - additionally run _strip_emoji_and_noise() and the
                       elongation step of _token_level_ops().
        other values - Unicode and whitespace normalisation, the phrase
                       rewrites and the token map.
    return_trace:
        When true, return ``(text, trace)`` where trace is
        ``{"final": text}``; otherwise return just the text.
    """
    if level == "off":
        result, trace = inp, {}
    else:
        aggressive = level == "aggressive"
        text = _normalize_unicode(inp)
        if aggressive:
            text = _strip_emoji_and_noise(text)
        text = _normalize_ws(text)
        # Phrase-level rewrites run before the token map, in this fixed order.
        for phrase_rule in (_handle_mo_bigram, _handle_negation_bigrams, _handle_pu_constructs):
            text = phrase_rule(text)
        text = _token_level_ops(text, aggressive=aggressive)
        result = _normalize_ws(text)
        trace = {"final": result}
    return (result, trace) if return_trace else result
def prenorm(text: str) -> str:
    """Run papua_prenorm() at the configured PRENORM_LEVEL.

    When PRENORM_DEBUG is on, the input, output and trace are also logged.
    """
    if not PRENORM_DEBUG:
        return papua_prenorm(text, level=PRENORM_LEVEL, return_trace=False)
    out, tr = papua_prenorm(text, level=PRENORM_LEVEL, return_trace=True)
    trace_json = json.dumps(tr, ensure_ascii=False)
    log.info("[PRENORM][%s] %s -> %s | trace=%s", PRENORM_LEVEL, text, out, trace_json)
    return out
# Repository ids for the base model and the adapter; both can be overridden
# through the environment.
BASE_MODEL_ID = os.environ.get("BASE_MODEL_ID", "amosnbn/cendol-mt5-base-inst")
ADAPTER_ID = os.environ.get("ADAPTER_ID", "amosnbn/papua-lora-ckpt-168")

# Only the exact value "cuda" selects the GPU; anything else means CPU.
DEVICE = "cpu"
if os.environ.get("DEVICE", "cpu") == "cuda":
    DEVICE = "cuda"

# Tokenizer and model are filled in by _load_model(); the flags below record
# the outcome of the most recent load attempt.
TOK = MODEL = None
_MODEL_LOCK = threading.Lock()
_MODEL_READY, _MODEL_ERROR = False, None
def _strip_bom_in_dir(root_dir: str):
    """Remove a leading UTF-8 byte-order mark from ``*.json`` files under *root_dir*.

    The search is recursive. Only files that actually start with a BOM are
    rewritten, and the remaining bytes are written back unchanged, so files
    without a BOM (including large ones) are never re-serialised. Files
    that cannot be read or written are skipped with a warning.
    """
    bom = codecs.BOM_UTF8
    for p in pathlib.Path(root_dir).rglob("*.json"):
        try:
            raw = p.read_bytes()
            if not raw.startswith(bom):
                continue
            p.write_bytes(raw[len(bom):])
            log.info("[BOM] stripped: %s", p)
        except OSError as e:
            log.warning("[BOM] skip %s: %s", p, e)
def _sanitize_adapter_config(adapter_dir: str):
    """Drop keys from the adapter config that ``LoraConfig.__init__`` does not accept.

    The allowed key set is read from the signature of ``LoraConfig.__init__``.
    The config file is looked up in *adapter_dir* as ``adapter_config.json``,
    then ``adapter_config_0.json``, then ``config.json``. The file is only
    rewritten when at least one key was removed. Every failure is logged as a
    warning and the function returns without raising.
    """
    try:
        from peft import LoraConfig
        init_params = inspect.signature(LoraConfig.__init__).parameters
        allowed = {param.name for param in init_params.values()}
    except Exception as e:
        log.warning(f"[SAN] Tidak bisa analisa LoraConfig: {e}")
        return

    adapter_root = pathlib.Path(adapter_dir)
    cfg_path = adapter_root / "adapter_config.json"
    if not cfg_path.exists():
        candidates = (
            adapter_root / name
            for name in ("adapter_config.json", "adapter_config_0.json", "config.json")
        )
        # Keep the default path when no candidate exists, so the check
        # below reports it as missing.
        cfg_path = next((c for c in candidates if c.exists()), cfg_path)
    if not cfg_path.exists():
        log.warning(f"[SAN] adapter_config tidak ditemukan di {adapter_dir}")
        return

    try:
        with codecs.open(cfg_path, "r", encoding="utf-8-sig") as f:
            cfg = json.load(f)
        cleaned = {k: v for k, v in cfg.items() if k in allowed}
        # cleaned is a subset of cfg, so a size difference means keys were dropped.
        if len(cleaned) != len(cfg):
            with open(cfg_path, "w", encoding="utf-8") as f:
                json.dump(cleaned, f, ensure_ascii=False, indent=2)
            log.info("[SAN] adapter_config dibersihkan dari field tidak dikenal")
    except Exception as e:
        log.warning(f"[SAN] gagal baca/bersih adapter_config: {e}")
def _load_model():
    """Download and load the tokenizer, base model and optional adapter.

    Steps: snapshot the base repo (and the adapter repo when ADAPTER_ID is
    set), strip BOMs from their JSON files, sanitise the adapter config, then
    build the model, switch it to eval mode and move it to DEVICE.

    The result is published through the module globals TOK, MODEL,
    _MODEL_READY and _MODEL_ERROR. Any ``Exception`` is logged and recorded in
    _MODEL_ERROR instead of being raised.
    """
    global TOK, MODEL, _MODEL_READY, _MODEL_ERROR
    try:
        log.info("[MODEL] downloading base=%s adapter=%s", BASE_MODEL_ID, ADAPTER_ID or "-")
        base_dir = snapshot_download(
            repo_id=BASE_MODEL_ID,
            local_dir="/tmp/hf_base",
            local_dir_use_symlinks=False,
            allow_patterns=None,
        )
        _strip_bom_in_dir(base_dir)

        adapter_dir = None
        if ADAPTER_ID:
            adapter_dir = snapshot_download(
                repo_id=ADAPTER_ID,
                local_dir="/tmp/hf_adapter",
                local_dir_use_symlinks=False,
                allow_patterns=None,
            )
            _strip_bom_in_dir(adapter_dir)
            _sanitize_adapter_config(adapter_dir)

        # Imported here rather than at module import time; a missing torch
        # install surfaces as an error recorded in _MODEL_ERROR.
        import torch  # noqa: F401
        from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
        from peft import PeftModel

        # Build everything in locals first. Publishing MODEL before eval()
        # and to(DEVICE) have finished would let a concurrent get_model()
        # caller see a non-None model while _MODEL_READY is still False.
        tokenizer = AutoTokenizer.from_pretrained(base_dir)
        model = AutoModelForSeq2SeqLM.from_pretrained(base_dir)
        if adapter_dir:
            model = PeftModel.from_pretrained(model, adapter_dir)
        model.eval().to(DEVICE)

        # MODEL is assigned last: anyone who observes it as non-None also
        # observes the tokenizer and the ready flag.
        TOK = tokenizer
        _MODEL_ERROR = None
        _MODEL_READY = True
        MODEL = model
        log.info("[MODEL] ready on %s", DEVICE)
    except Exception as e:
        _MODEL_READY = False
        _MODEL_ERROR = f"{type(e).__name__}: {e}"
        log.exception("[MODEL] load error")
def get_model():
    """Return ``(TOK, MODEL)``, loading the model first if it is still ``None``.

    The load runs under _MODEL_LOCK and MODEL is re-checked after acquiring
    it, so callers that waited on the lock do not load again. After a failed
    load the returned MODEL is still ``None``.
    """
    if MODEL is not None:
        return TOK, MODEL
    with _MODEL_LOCK:
        if MODEL is None:
            _load_model()
    return TOK, MODEL
def translate_with_model(text: str, max_new_tokens: int = 48) -> str:
    """Translate *text* with the loaded model and return the decoded string.

    The input is tokenised with truncation at 256 tokens, moved to DEVICE and
    decoded from the first generated sequence (4 beams).

    Raises:
        RuntimeError: if the model is not loaded or not marked ready.
    """
    import torch

    tok, m = get_model()
    if m is None or not _MODEL_READY:
        raise RuntimeError(f"Model not ready: {_MODEL_ERROR or 'unknown error'}")

    batch = tok([text], return_tensors="pt", truncation=True, max_length=256)
    on_device = {name: tensor.to(DEVICE) for name, tensor in batch.items()}
    generation_args = dict(
        max_new_tokens=int(max_new_tokens),
        num_beams=4,
        length_penalty=0.9,
        no_repeat_ngram_size=3,
        early_stopping=True,
    )
    generated = m.generate(**on_device, **generation_args)
    return tok.decode(generated[0], skip_special_tokens=True)
def _preload_thread():
    """Warm the model up in the background at start-up.

    Goes through get_model() instead of calling _load_model() directly, so
    the load runs under _MODEL_LOCK. A request that arrives while the preload
    is in progress then waits for it instead of starting a second load.
    """
    try:
        get_model()
    except Exception:
        # Logged instead of silently dropped; the thread has no caller to
        # report to.
        log.exception("[MODEL] preload failed")


if PRELOAD_MODEL:
    threading.Thread(target=_preload_thread, daemon=True).start()
@app.before_request
def _log_req():
    """Log method and path of each request, skipping the probe endpoints."""
    quiet_paths = ("/health", "/ping", "/favicon.ico")
    if request.path in quiet_paths:
        return
    log.info("[REQ] %s %s", request.method, request.path)
@app.errorhandler(HTTPException)
def _http_err(e: HTTPException):
    """Return the HTTPException itself as the response.

    Registered specifically for HTTPException; presumably so these keep
    their own status code instead of reaching the generic Exception handler
    - confirm against Flask's handler-resolution rules.
    """
    return e
@app.errorhandler(Exception)
def _err(e):
    """Log the active exception with its traceback and answer with a plain 500."""
    log.exception("Unhandled error")
    body, status = "Internal Server Error", 500
    return body, status
@app.get("/diag")
def diag():
    """Return a JSON snapshot of runtime configuration and model state.

    Includes library versions (or the import error text when torch,
    transformers or peft cannot be imported). No session check is performed
    in this handler.
    """
    import sys

    versions = {"python": sys.version}
    try:
        import torch, transformers, peft
        lib_versions = (torch.__version__, transformers.__version__, peft.__version__)
    except Exception as e:
        # One failure marks all three entries with the same message.
        lib_versions = (f"import error: {e}",) * 3
    versions.update(zip(("torch", "transformers", "peft"), lib_versions))

    report = {
        "ok": True,
        "time": datetime.now(timezone.utc).isoformat(),
        "device": DEVICE,
        "base_model": BASE_MODEL_ID,
        "adapter": ADAPTER_ID or None,
        "model_ready": _MODEL_READY,
        "model_error": _MODEL_ERROR,
        "versions": versions,
        "preload": PRELOAD_MODEL,
        "prenorm": {"level": PRENORM_LEVEL, "debug": PRENORM_DEBUG},
        "public_app_url": PUBLIC_APP_URL or None,
    }
    return jsonify(report)
@app.get("/health")
def health():
    """Probe endpoint: always returns ``ok`` with the current UTC time."""
    now_utc = datetime.now(timezone.utc)
    return jsonify({"ok": True, "time": now_utc.isoformat()})
@app.get("/ping")
def ping():
    """Probe endpoint with the same payload shape as ``/health``."""
    stamp = datetime.now(timezone.utc).isoformat()
    return jsonify(ok=True, time=stamp)
@app.get("/login")
def login_get():
    """Show the login form; a session that already has a ``uid`` goes to the index."""
    if not session.get("uid"):
        return render_template("login.html")
    return redirect(url_for("index"))
@app.post("/login")
def login_post():
    """Check the submitted credentials and start a session on success.

    The email is trimmed and lowercased before lookup. Missing fields or a
    failed check flash an error and redirect back to the login form.
    """
    form = request.form
    email = (form.get("email") or "").strip().lower()
    pwd = form.get("password") or ""
    if not (email and pwd):
        flash("Isi email dan password", "error")
        return redirect(url_for("login_get"))

    with SessionLocal() as s:
        u = s.query(User).filter_by(email=email).first()
        if u and verify_password(u, pwd):
            # Permanent sessions use app.permanent_session_lifetime.
            session.permanent = True
            session["uid"], session["email"] = u.id, u.email
            return redirect(url_for("index"))

    flash("Email atau password salah", "error")
    return redirect(url_for("login_get"))
@app.get("/register")
def register_get():
    """Show the registration form; a session that already has a ``uid`` goes to the index."""
    if not session.get("uid"):
        return render_template("register.html")
    return redirect(url_for("index"))
@app.post("/register")
def register_post():
    """Create a new account from the submitted email and password.

    The email is trimmed and lowercased. Missing fields or an email that is
    already registered flash an error and redirect back to the form; success
    flashes a confirmation and redirects to the login page.
    """
    # Local import: sqlalchemy is already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    email = (request.form.get("email") or "").strip().lower()
    pwd = request.form.get("password") or ""
    if not email or not pwd:
        flash("Isi email dan password", "error")
        return redirect(url_for("register_get"))

    with SessionLocal() as s:
        if s.query(User).filter_by(email=email).first():
            flash("Email sudah terdaftar", "error")
            return redirect(url_for("register_get"))
        u = User(email=email)
        set_password(u, pwd)
        s.add(u)
        try:
            s.commit()
        except IntegrityError:
            # Two concurrent registrations can both pass the lookup above;
            # the unique constraint on users.email rejects the loser here.
            # Report it like any other duplicate instead of a 500.
            s.rollback()
            flash("Email sudah terdaftar", "error")
            return redirect(url_for("register_get"))

    flash("Registrasi berhasil. Silakan login.", "success")
    return redirect(url_for("login_get"))
@app.get("/logout")
def logout():
    """Empty the session and send the browser to the login page."""
    session.clear()
    login_url = url_for("login_get")
    return redirect(login_url)
@app.get("/")
@login_required
def index():
    """Render the main page with the user's ten newest translations."""
    with SessionLocal() as s:
        uid = session["uid"]
        newest_first = (
            s.query(Translation)
            .filter(Translation.user_id == uid)
            .order_by(Translation.id.desc())
            .limit(10)
        )
        recent = []
        for it in newest_first.all():
            recent.append({
                "src": it.src,
                "mt": it.mt,
                "created_at": it.created_at.strftime("%Y-%m-%d %H:%M"),
            })
    return render_template("index.html", logged_in=True, device=DEVICE, recent=recent)
@app.get("/about")
def about_page():
    """Render the ``about.html`` template; no session check is done here."""
    page = render_template("about.html")
    return page
@app.get("/history")
def api_history():
    """Return the user's ten newest translations as JSON.

    A session without a ``uid`` gets ``{"items": []}`` rather than an error.
    """
    uid = session.get("uid")
    if not uid:
        return jsonify({"items": []})

    with SessionLocal() as s:
        rows = (
            s.query(Translation)
            .filter(Translation.user_id == uid)
            .order_by(Translation.id.desc())
            .limit(10)
            .all()
        )
        out = []
        for row in rows:
            out.append({
                "src": row.src,
                "mt": row.mt,
                "created_at": row.created_at.strftime("%Y-%m-%d %H:%M"),
            })
    return jsonify({"items": out})
@app.post("/translate")
def api_translate():
    """Translate the posted text and store the result for the current user.

    Expects a JSON object with ``text`` (required) and ``max_new_tokens``
    (optional, default 48, clamped to 1..512).

    Responses:
        200 ``{"ok": true, "mt": ...}`` on success
        400 for empty or non-string text, or an unparsable ``max_new_tokens``
        401 when the session has no ``uid``
        500 when normalisation, translation or the database write fails
    """
    if not session.get("uid"):
        return jsonify({"ok": False, "error": "Unauthorized"}), 401

    # The body comes from the client: it may be missing, invalid JSON, or
    # valid JSON that is not an object (e.g. a list).
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    raw_text = payload.get("text")
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        return jsonify({"ok": False, "error": "Empty text"}), 400

    raw_max = payload.get("max_new_tokens")
    if raw_max is None:
        raw_max = 48
    try:
        max_new = int(raw_max)
    except (TypeError, ValueError, OverflowError):
        return jsonify({"ok": False, "error": "Invalid max_new_tokens"}), 400
    # Bound the generation length so one request cannot demand an arbitrarily
    # long decode.
    max_new = max(1, min(max_new, 512))

    try:
        clean = prenorm(text)
        mt = f"[FAKE] {clean}" if FALLBACK_TRANSLATE else translate_with_model(clean, max_new_tokens=max_new)
        with SessionLocal() as s:
            s.add(Translation(user_id=session["uid"], src=text, mt=mt))
            s.commit()
        return jsonify({"ok": True, "mt": mt})
    except Exception as e:
        # log.exception records the traceback along with the message.
        log.exception("[API] translate error: %s", e)
        return jsonify({"ok": False, "error": f"{type(e).__name__}: {e}"}), 500
if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port taken from PORT
    # (7860 when unset), with the debugger off.
    listen_port = int(os.getenv("PORT", "7860"))
    app.run(host="0.0.0.0", port=listen_port, debug=False)