Spaces:
Runtime error
Runtime error
| # app.py | |
| import os, glob, uuid, random, json, pymysql, gradio as gr | |
| from typing import List, Dict, Any | |
# --- Configuration; most values can be overridden through the environment ---

# Directory scanned for the poll images.
IMAGE_DIR = os.getenv("IMAGE_DIR", "assets")
# Optional JSON file listing the sentiment labels offered to the user.
SENTIMENTS_PATH = os.getenv(
    "SENTIMENTS_PATH",
    os.path.join(IMAGE_DIR, "sentiments.json"),
)
# Lower-case file suffixes accepted as images.
IMG_EXTS = (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif")
# poll_type value used to filter poll_response rows when aggregating.
POLL_TYPE = "sentiment"
# Labels used when no usable sentiments file is available.
DEFAULT_SENTIMENTS = [
    "Très négatif",
    "Négatif",
    "Neutre",
    "Positif",
    "Très positif",
]

# MySQL connection settings.
DB_HOST = os.getenv("DB_HOST")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
# Boolean switches: only the exact (case-insensitive) string "true" enables them.
DB_SSL = os.getenv("DB_SSL", "false").lower() == "true"
ALLOW_DDL = os.getenv("ALLOW_DDL", "false").lower() == "true"
def db_connect():
    """Open a new PyMySQL connection from the module-level DB_* settings.

    The connection is created with the utf8mb4 charset and autocommit
    enabled.  When DB_SSL is true an ``ssl`` argument is passed as well.

    Returns:
        Whatever ``pymysql.connect`` returns for these parameters.
    """
    params = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "charset": "utf8mb4",
        "autocommit": True,
    }
    if DB_SSL:
        # NOTE(review): a non-empty mapping without certificate settings;
        # presumably only meant to switch TLS on -- confirm against the
        # PyMySQL documentation for the `ssl` argument.
        params["ssl"] = {"ssl": {}}
    return pymysql.connect(**params)
def ensure_tables():
    """Create the ``poll_response`` table if it does not exist yet.

    Does nothing unless ALLOW_DDL is true.  The connection is always closed,
    even when the CREATE TABLE statement fails.

    Raises:
        Any exception raised by ``db_connect`` or by executing the DDL.
    """
    if not ALLOW_DDL:
        return
    sql = (
        "CREATE TABLE IF NOT EXISTS poll_response ("
        "id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, "
        "poll_type VARCHAR(32) NOT NULL, "
        "session_uuid CHAR(36) NOT NULL, "
        "item_index INT NOT NULL, "
        "item_file VARCHAR(255) NOT NULL, "
        "choice VARCHAR(64) NOT NULL, "
        "user_agent VARCHAR(255) DEFAULT NULL, "
        "ip_hash CHAR(64) DEFAULT NULL, "
        "created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
        "PRIMARY KEY (id), "
        "KEY idx_poll_item (poll_type, item_index), "
        "KEY idx_created (created_at)"
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;"
    )
    conn = db_connect()
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
    finally:
        # Release the connection even if the DDL statement raised.
        conn.close()
def load_sentiments():
    """Return the sentiment labels offered in the poll.

    Reads SENTIMENTS_PATH as JSON.  Two layouts are accepted: a top-level
    list of labels, or an object whose ``"sentiments"`` key holds that list.
    Every label is converted to ``str``.

    Falls back to a copy of DEFAULT_SENTIMENTS when the file is missing,
    unreadable, not in one of the accepted layouts, or contains an empty
    list (an empty list would leave the dropdowns with nothing to choose).

    Returns:
        A new, non-empty list of label strings.
    """
    try:
        with open(SENTIMENTS_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # The file is optional: a missing file is not worth a warning.
        return list(DEFAULT_SENTIMENTS)
    except Exception as e:
        # Deliberately best-effort: a broken file must not stop the app.
        print("[warn] sentiments.json illisible:", e)
        return list(DEFAULT_SENTIMENTS)

    if isinstance(data, dict):
        data = data.get("sentiments")
    if isinstance(data, list) and data:
        return [str(x) for x in data]
    # Hand out a copy so callers cannot mutate the module-level default.
    return list(DEFAULT_SENTIMENTS)
def load_images() -> List[Dict[str, Any]]:
    """List the image files found directly inside IMAGE_DIR.

    An entry is kept when its lower-cased path ends with one of IMG_EXTS.
    Results are ordered by path.

    Returns:
        One dict per image with keys ``"path"`` (the globbed path) and
        ``"file"`` (its base name).

    Raises:
        RuntimeError: if no entry matches.
    """
    pattern = os.path.join(IMAGE_DIR, "*")
    images = [
        {"path": path, "file": os.path.basename(path)}
        for path in sorted(glob.glob(pattern))
        if path.lower().endswith(IMG_EXTS)
    ]
    if not images:
        raise RuntimeError(f"Aucune image trouvée dans {IMAGE_DIR}")
    return images
def aggregate(conn, items, sentiments):
    """Build the per-image vote table shown on the results page.

    Votes are matched to images by ``item_file``.  The stored ``item_index``
    is the position the image had on screen when the vote was cast, and that
    position changes whenever a user shuffles the order, so it cannot be used
    to attribute votes to an image.

    All counts are fetched with a single grouped query instead of one query
    per image.

    Args:
        conn: Open database connection; only ``conn.cursor()`` is used.
        items: Sequence of dicts with a ``"file"`` key, in display order.
        sentiments: Labels that become the count columns, in order.

    Returns:
        A list with one row per item:
        ``[position, file, total, count_for_sentiment_1, ...]`` where
        ``position`` starts at 1 and ``total`` includes every stored choice
        for that file, even ones not listed in ``sentiments``.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT item_file, choice, COUNT(*) FROM poll_response "
            "WHERE poll_type=%s GROUP BY item_file, choice",
            (POLL_TYPE,),
        )
        fetched = cur.fetchall() or []

    # file name -> {choice -> count}
    counts_by_file = {}
    for item_file, choice, count in fetched:
        counts_by_file.setdefault(item_file, {})[choice] = count

    rows = []
    for position, item in enumerate(items, start=1):
        counts = counts_by_file.get(item["file"], {})
        row = [position, item["file"], sum(counts.values())]
        row.extend(counts.get(label, 0) for label in sentiments)
        rows.append(row)
    return rows
def app():
    """Build the Gradio Blocks UI for the sentiment poll.

    The page has two groups that are shown alternately:

    * the poll: a grid of images (four per row), each with a dropdown of
      sentiment labels, plus shuffle / reset / submit buttons and a warning
      area;
    * the results: a dataframe with the aggregated counts and a button to go
      back to the poll.

    Startup work done here: ``ensure_tables()``, ``load_sentiments()`` and
    ``load_images()``; any exception they raise propagates to the caller.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    ensure_tables()
    sentiments = load_sentiments()
    items = load_images()
    headers = ["#", "Fichier", "Total"] + sentiments
    n_items = len(items)
    per_row = 4

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Poll Sentiments")
        gr.Markdown("Choisissez un sentiment pour chaque image, validez, puis consultez les agrégats globaux.")
        state_items = gr.State(items)
        state_sent = gr.State(sentiments)
        # Starts empty and is filled on the first submit of each browser
        # session; a UUID generated here would be shared by every visitor.
        session_id = gr.State(None)

        with gr.Group(visible=True) as poll_group:
            with gr.Row():
                btn_shuffle = gr.Button("🔀 Mélanger l’ordre")
                btn_reset = gr.Button("♻️ Réinitialiser les choix")
                btn_submit = gr.Button("✅ Valider mes choix", variant="primary")
            warn = gr.Markdown(visible=False)
            imgs = []
            dds = []
            with gr.Column():
                for row_start in range(0, n_items, per_row):
                    with gr.Row():
                        for idx in range(row_start, min(row_start + per_row, n_items)):
                            with gr.Column():
                                imgs.append(gr.Image(value=items[idx]["path"], label=f"Image {idx+1}", interactive=False))
                                dds.append(gr.Dropdown(choices=sentiments, label="Votre choix"))

        with gr.Group(visible=False) as result_group:
            gr.Markdown("## Agrégats (tous les usagers)")
            df = gr.Dataframe(headers=headers, interactive=False)
            btn_back = gr.Button("↩️ Revenir au poll")

        def on_shuffle(state, sentiments):
            """Shuffle the images, clear every choice and hide the warning."""
            shuffled = list(state)
            random.shuffle(shuffled)
            image_updates = [
                gr.update(value=item["path"], label=f"Image {pos}")
                for pos, item in enumerate(shuffled, start=1)
            ]
            dropdown_updates = [gr.update(value=None, choices=sentiments) for _ in shuffled]
            return [*image_updates, *dropdown_updates, gr.update(value="", visible=False), shuffled]

        btn_shuffle.click(on_shuffle, inputs=[state_items, state_sent], outputs=[*imgs, *dds, warn, state_items])

        def on_reset():
            """Clear every dropdown and hide the warning."""
            return [gr.update(value=None) for _ in dds] + [gr.update(value="", visible=False)]

        btn_reset.click(on_reset, inputs=None, outputs=[*dds, warn])

        def on_submit(*args):
            """Validate the choices, store them and switch to the results.

            Returns one value per output, in the order
            (poll_group, df, result_group, warn, session_id).
            """
            # Inputs arrive positionally: one value per dropdown, followed by
            # the three State values.
            *vals, state, sentiments, sess = args

            missing = sum(1 for v in vals if v in (None, ""))
            if missing:
                return (
                    gr.update(visible=True),
                    gr.update(),
                    gr.update(visible=False),
                    gr.update(value=f"❗ {missing} choix manquent.", visible=True),
                    sess,
                )

            # One UUID per browser session, created lazily and kept in State.
            sess = sess or str(uuid.uuid4())
            params = [
                (POLL_TYPE, sess, pos, state[pos - 1]["file"], choice)
                for pos, choice in enumerate(vals, start=1)
            ]
            conn = db_connect()
            try:
                with conn.cursor() as cur:
                    cur.executemany(
                        "INSERT INTO poll_response (poll_type,session_uuid,item_index,item_file,choice) VALUES (%s,%s,%s,%s,%s)",
                        params,
                    )
                rows = aggregate(conn, state, sentiments)
            finally:
                # Release the connection even if a statement raised.
                conn.close()
            return (
                gr.update(visible=False),
                gr.update(value=rows),
                gr.update(visible=True),
                gr.update(value="", visible=False),
                sess,
            )

        btn_submit.click(
            on_submit,
            inputs=[*dds, state_items, state_sent, session_id],
            outputs=[poll_group, df, result_group, warn, session_id],
        )
        btn_back.click(
            lambda: (gr.update(visible=True), gr.update(visible=False)),
            inputs=None,
            outputs=[poll_group, result_group],
        )
    return demo
# Built at import time so the module exposes `demo` even when it is imported
# rather than run as a script.
demo = app()

if __name__ == "__main__":
    demo.launch()