"""Evaluation server."""
import json
from datetime import datetime, timedelta, timezone
from typing import Any
import gradio as gr
from .allocation import AllocationEngine
from .database import DB
from .instuction_md import (
important_notes_instruction,
login_instruction,
nav_instruction,
q1_instruction,
q2_instruction,
)
# Type alias for the tuples returned by the submit/navigation callbacks and by
# _error_return: 8 elements, one per Gradio output, in the order the event
# handlers list their outputs (user_state is not part of it).
ReturnTuple = tuple[
    dict[str, Any],  # img
    dict[str, Any],  # label_md
    dict[str, Any],  # words_checkbox
    dict[str, Any],  # done_md
    dict[str, Any],  # current_idx_md
    dict[str, Any],  # progress_md
    str | None,  # current_item_state (an item_id, or None when no item is kept)
    dict[str, Any],  # score
]
# start_or_resume returns 9 elements (includes user_state)
StartReturnTuple = tuple[
dict[str, Any], # img
dict[str, Any], # label_md
dict[str, Any], # words_checkbox
dict[str, Any], # done_md
dict[str, Any], # current_idx_md
dict[str, Any], # progress_md
dict[str, Any], # user_state
str | None, # current_item_state
str | None, # score
]
def _error_return(message: str, keep_current_item: bool = True, current_item_id: str | None = None) -> ReturnTuple:
    """Report *message* to the user without touching the displayed item.

    A warning toast is raised and the same text is written into the status
    markdown. Only the markdown's value is set; its visibility is left as is.

    Args:
        message: Text for the toast and the status markdown.
        keep_current_item: When true, hand ``current_item_id`` back unchanged;
            when false, hand back None instead.
        current_item_id: Item ID to preserve (ignored unless ``keep_current_item``).

    Returns:
        An 8-element tuple in ReturnTuple order; every slot except the status
        markdown and the item ID is a no-op update.
    """
    # The toast is displayed for three seconds.
    gr.Warning(message, duration=3)

    surviving_item_id = None
    if keep_current_item:
        surviving_item_id = current_item_id

    image_up, label_up, words_up = gr.update(), gr.update(), gr.update()
    status_up = gr.update(value=message)
    index_up, progress_up = gr.update(), gr.update()
    score_up = gr.update()
    return (
        image_up,
        label_up,
        words_up,
        status_up,
        index_up,
        progress_up,
        surviving_item_id,
        score_up,
    )
def format_image_path(template: str, label: str, path: str) -> str:
    """Fill the ``{label}`` and ``{path}`` placeholders of *template*."""
    substitutions = {'label': label, 'path': path}
    return template.format(**substitutions)
def app_main(args) -> None:
"""Run the evaluation server.
Opens the database named by ``args.db_path``, reads the auto-allocation and
allow-list settings from ``args``, and builds the Gradio Blocks UI: an
instructions page (visible first) and the main evaluation page (hidden until
the start button is clicked). The callbacks are defined and wired further down.
"""
# Set custom temp directory to avoid permission issues with shared /tmp/gradio
# (currently disabled; kept for reference)
# user_temp_dir = os.path.join(tempfile.gettempdir(), f'gradio_{os.getenv("USER", "user")}')
# os.makedirs(user_temp_dir, exist_ok=True)
# os.environ['GRADIO_TEMP_DIR'] = user_temp_dir
db_path = args.db_path
# Single DB handle used by every callback defined later in this function
db = DB(db_path)
# Get auto-allocation setting (start_or_resume auto-allocates only when this is > 0)
auto_allo_num = args.auto_allo_num
# When truthy, start_or_resume rejects user IDs that are not in this collection
allowed_users = args.allowed_users
with gr.Blocks(title='Icon Evaluation', theme=gr.themes.Base()) as demo:
# State holding {'user_id': ...}; replaced by start_or_resume on a successful login
user_state = gr.State({'user_id': None})
current_item_state = gr.State(None) # current item_id
# Instructions page (shown first)
with gr.Column(visible=True) as instructions_page:
# gr.Markdown(task_instructions)
gr.Markdown('# Instructions')
gr.Markdown(
'Welcome to the Icon Evaluation Task! Please read the instructions first time you start the task.'
)
# Step-by-step walkthrough; each "Next Step" button selects the following step id
with gr.Walkthrough(selected=0) as walkthrough:
with gr.Step('Login', id=0):
gr.Markdown(login_instruction)
# The HTML payload is empty (a bare newline) in this file
gr.HTML(
"""
"""
)
btn = gr.Button('Next Step')
btn.click(lambda: gr.Walkthrough(selected=1), outputs=walkthrough)
with gr.Step('Question 1', id=1):
gr.Markdown(q1_instruction)
gr.HTML(
"""
"""
)
btn = gr.Button('Next Step')
btn.click(lambda: gr.Walkthrough(selected=2), outputs=walkthrough)
with gr.Step('Question 2', id=2):
gr.Markdown(q2_instruction)
gr.HTML(
"""
"""
)
btn = gr.Button('Next Step')
btn.click(lambda: gr.Walkthrough(selected=3), outputs=walkthrough)
with gr.Step('Important Notes', id=3):
gr.Markdown(important_notes_instruction)
gr.HTML(
"""
"""
)
gr.Markdown(nav_instruction)
gr.HTML(
"""
"""
)
# Clicking this button swaps the instructions page for the main page (wired below)
start_task_btn = gr.Button('🚀 Start Evaluation Task', variant='primary', size='lg')
# Main evaluation interface (hidden initially)
with gr.Column(visible=False) as main_page:
with gr.Row():
user_id_inp = gr.Textbox(label='User ID', placeholder='e.g.: alice_01', scale=2)
start_btn = gr.Button('Start/Resume', variant='primary', scale=1)
# Progress indicators (At: current index, Done: completed count)
# Both start hidden; the callbacks make them visible once an item is loaded
with gr.Row():
current_idx_md = gr.Markdown('**At:** -/-', visible=False)
progress_md = gr.Markdown('**Done:** -/-', visible=False)
# Image display (disable download function)
img = gr.Image(label='Image', type='filepath', height=256, show_download_button=False)
# Collapsible task guide
with gr.Accordion('Task Instructions (Click to expand)', open=False):
gr.Markdown(q1_instruction + '\n' + q2_instruction)
# Relevance score
# label_md shows the item's label; hidden until an item is loaded
label_md = gr.Markdown(visible=False)
score = gr.Radio(
choices=['1', '2', '3', '4', '5'],
label='1-5 points: the relevance of the image to the label',
interactive=True,
)
# 10 words checkbox
# Choices are filled per item by the callbacks, which append 'ALL WORDS PRESENT'
words_checkbox = gr.CheckboxGroup(
choices=[],
label='10 words: Select the words that are NOT presented in the image or "ALL WORDS PRESENT"',
interactive=True,
visible=False,
)
# Action buttons
with gr.Row():
submit_btn = gr.Button('Submit and Next', variant='primary')
# Navigation controls
with gr.Row():
# precision=0 restricts the jump target to whole numbers
jump_idx = gr.Number(label='Jump to the index (1-indexed)', precision=0)
jump_btn = gr.Button('Jump')
prev_btn = gr.Button('Prev')
next_btn = gr.Button('Next')
# Status messages
done_md = gr.Markdown(visible=False)
# Button to show main page
# Hides instructions_page and reveals main_page; registered with queue=False
start_task_btn.click(
fn=lambda: (gr.update(visible=False), gr.update(visible=True)),
outputs=[instructions_page, main_page],
queue=False,
)
def get_user_items_list(user_id: str) -> list[dict[str, Any]]:
    """Return the items assigned to *user_id* in round ``args.round_id``, sorted by ``order_key``.

    Each entry is a dict with the keys ``item_id``, ``label``, ``path``,
    ``words`` (the JSON-decoded ``tasks.words`` column), ``order_key`` and
    ``status`` (the assignment status).
    """
    assignment_query = """
        SELECT t.item_id, t.label, t.path, t.words, t.order_key, a.status
        FROM assignments a
        JOIN tasks t ON a.round_id = t.round_id AND a.item_id = t.item_id
        WHERE a.round_id = %s AND a.user_id = %s
        ORDER BY t.order_key
        """
    with db._connect() as conn:
        fetched = conn.execute(assignment_query, (args.round_id, user_id)).fetchall()

    # One dict per row; the column order matches the SELECT list above.
    return [
        {
            'item_id': item_id,
            'label': label,
            'path': path,
            'words': json.loads(words_json),
            'order_key': order_key,
            'status': status,
        }
        for item_id, label, path, words_json, order_key, status in fetched
    ]
def get_next_item(user_id: str) -> dict[str, Any] | None:
    """Return the user's first item (in list order) that is pending or in progress, else None."""
    resumable_statuses = ('pending', 'in_progress')
    return next(
        (candidate for candidate in get_user_items_list(user_id) if candidate['status'] in resumable_statuses),
        None,
    )
def start_or_resume(user_id: str, state: dict[str, Any]) -> StartReturnTuple:
    """Log the user in and display the first item that still needs an answer.

    Steps: validate the ID, clean up expired leases, auto-allocate tasks for a
    user who has none (only when ``auto_allo_num`` > 0), then either load the
    first pending/in-progress item or, if every item is completed, load the last
    item together with a completion notice.

    Args:
        user_id: Raw text from the User ID box; surrounding whitespace is ignored.
        state: Current ``user_state`` value; handed back unchanged when the ID is rejected.

    Returns:
        img, label_md, words_checkbox, done_md, current_idx_md, progress_md,
        user_state, current_item_state, score.
    """

    def _early_exit(status_text: str, session: dict[str, Any], hide_image: bool = False) -> StartReturnTuple:
        """Build the 9-tuple for a path that loads no item (item ID and score are None)."""
        return (
            gr.update(visible=False) if hide_image else gr.update(),
            gr.update(),
            gr.update(),
            gr.update(visible=True, value=status_text),
            gr.update(),
            gr.update(),
            session,
            None,
            None,
        )

    # Tolerate a missing textbox value instead of failing on None.strip().
    user_id = (user_id or '').strip()

    # Check for a blank ID first: with an allow-list configured, a blank ID would
    # otherwise be reported as "not allowed" rather than as missing input.
    if not user_id:
        gr.Warning('Please input the user ID', duration=3)
        return _early_exit('Please input the user ID.', state)
    if allowed_users and user_id not in allowed_users:
        gr.Warning('User ID not allowed', duration=3)
        return _early_exit('User ID not allowed', state)

    state = {'user_id': user_id}
    db.cleanup_expired_leases(args.round_id)
    items_list = get_user_items_list(user_id)

    # Auto-allocate for new users
    if not items_list and auto_allo_num > 0:
        engine = AllocationEngine(db, args.round_id, operator='auto_system')
        result = engine.allocate(
            from_source='unassigned',
            to_target=f'user:{user_id}',
            amount_spec=f'count:{auto_allo_num}',
            redundancy=args.redundancy,
            force=False,
            dry_run=False,
            reason='Auto-allocation for new user on first login',
        )
        if result['success']:
            allocated_count = result['affected_count']
            gr.Info(f'Welcome! You have {allocated_count} tasks to evaluate. Enjoy!', duration=5)
            items_list = get_user_items_list(user_id)
        else:
            error_msg = result.get('error', 'Failed to automatically allocate tasks')
            if 'No tasks available' in error_msg:
                gr.Warning('No tasks available. Stay tuned!', duration=5)
            else:
                gr.Warning(
                    f'Failed to automatically allocate tasks: {error_msg}. '
                    'Please contact the admin(janeding@umich.edu).',
                    duration=8,
                )

    if not items_list:
        gr.Warning('No tasks assigned to this user. Please contact admin.', duration=5)
        return _early_exit('No tasks assigned to this user. Please contact admin.', state, hide_image=True)

    # From here on items_list is guaranteed to be non-empty.
    total = len(items_list)
    done_count = sum(1 for it in items_list if it['status'] == 'completed')

    # Locate the first resumable item in the list already fetched, rather than
    # querying the database a second time.
    resume_pos = next(
        (pos for pos, it in enumerate(items_list) if it['status'] in ('pending', 'in_progress')),
        None,
    )

    if resume_pos is None:  # all items are done
        # Show the last item (with its stored answer) plus a completion notice.
        gr.Info('Congratulations! All tasks completed. Thank you for participating! ☺️', duration=10)
        loaded = _load_item_helper(user_id, items_list[-1], items_list, total)
        # The helper returns the 8-element shape; user_state goes in before current_item_state.
        return (*loaded[:6], state, *loaded[6:])

    # Load the next pending item
    nxt = items_list[resume_pos]
    item_id = nxt['item_id']
    current_idx = resume_pos + 1  # 1-based position shown to the user

    # Update status to in_progress and acquire a lease of 600 seconds
    now_dt = datetime.now(timezone.utc)
    lease_until = (now_dt + timedelta(seconds=600)).isoformat()
    with db._connect() as conn, db._lock:
        conn.execute(
            """
            UPDATE assignments
            SET status = 'in_progress', started_at = %s, lease_until = %s
            WHERE round_id = %s AND item_id = %s AND user_id = %s
            """,
            (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id),
        )

    img_path = format_image_path(args.image_template, nxt['label'], nxt['path'])
    label_md_value = f'## Label: {nxt["label"]}'
    return (
        gr.update(value=img_path, visible=True),
        gr.update(visible=True, value=label_md_value),
        gr.update(visible=True, choices=nxt['words'] + ['ALL WORDS PRESENT'], value=[]),
        gr.update(visible=False),
        gr.update(visible=True, value=f'**At:** {current_idx}/{total}'),
        gr.update(visible=True, value=f'**Done:** {done_count}/{total}'),
        state,
        item_id,
        gr.update(value=None),
    )
def submit(
    user_state_val: dict[str, Any],
    current_item_id: str | None,
    score_val: str | None,
    words_not_present: list[str],
) -> ReturnTuple:
    """Validate and store the answer for the current item, then move on.

    The answer is recorded, the assignment is marked completed and the task's
    ``redundancy_completed`` counter is recomputed. Afterwards the next item in
    list order is loaded; if the current item was the last one (or can no longer
    be found in the user's list), the last item is shown with a completion notice.

    Args:
        user_state_val: Session state; must contain a non-empty ``user_id``.
        current_item_id: ID of the item being answered.
        score_val: Selected relevance score ('1'..'5'), or None if unanswered.
        words_not_present: Checked words, or the single entry 'ALL WORDS PRESENT'.
            A missing value (None) is treated as an empty selection.

    Returns:
        An 8-element tuple in ReturnTuple order.
    """
    if not user_state_val or not user_state_val.get('user_id'):
        return _error_return('Please start/continue', current_item_id=current_item_id)
    if not current_item_id:
        return _error_return('No current item', current_item_id=current_item_id)

    # Normalise once so the membership/length checks below cannot fail on None.
    selected_words = list(words_not_present or [])

    # 'ALL WORDS PRESENT' is exclusive: no other word may be selected with it
    if 'ALL WORDS PRESENT' in selected_words and len(selected_words) > 1:
        return _error_return(
            'Please select either ALL WORDS PRESENT or specific words', current_item_id=current_item_id
        )
    # check 2 questions are answered
    if score_val is None or not selected_words:
        return _error_return(
            'Please answer the 2 questions before submitting', current_item_id=current_item_id
        )

    user_id = user_state_val['user_id']

    # Get item info
    with db._connect() as conn:
        cur = conn.execute(
            'SELECT label, path FROM tasks WHERE round_id = %s AND item_id = %s',
            (args.round_id, current_item_id),
        )
        row = cur.fetchone()
    if not row:
        return _error_return('Item not found in database', current_item_id=current_item_id)
    label, path = row
    img_path = format_image_path(args.image_template, label, path)

    # 'ALL WORDS PRESENT' means no word is missing, so it is stored as an empty list
    filtered_words = [w for w in selected_words if w != 'ALL WORDS PRESENT']
    db.record_answer(args.round_id, user_id, current_item_id, label, img_path, int(score_val), filtered_words)

    # Update assignment status to completed and release the lease
    now = datetime.now(timezone.utc).isoformat()
    with db._connect() as conn, db._lock:
        conn.execute(
            """
            UPDATE assignments
            SET status = 'completed', completed_at = %s, lease_until = NULL
            WHERE round_id = %s AND item_id = %s AND user_id = %s
            """,
            (now, args.round_id, current_item_id, user_id),
        )
        # update redundancy_completed from the number of completed assignments
        conn.execute(
            """
            UPDATE task_config
            SET redundancy_completed = (
                SELECT COUNT(*) FROM assignments
                WHERE round_id = %s AND item_id = %s AND status = 'completed'
            )
            WHERE round_id = %s AND item_id = %s
            """,
            (args.round_id, current_item_id, args.round_id, current_item_id),
        )

    # Re-read the list so statuses reflect the update above.
    items_list = get_user_items_list(user_id)
    total = len(items_list)
    done_count = sum(1 for it in items_list if it['status'] == 'completed')

    # Find current index and go to the next item in order
    current_idx_in_list = next(
        (i for i, item in enumerate(items_list) if item['item_id'] == current_item_id), None
    )

    # NOTE: a periodic db.commit_and_push_db() hook (every args.auto_commit items)
    # used to run at this point; it is currently disabled.

    if current_idx_in_list is None or current_idx_in_list >= total - 1:
        # This was the last item - show completion message but stay on the last item
        gr.Info('Congratulations! All tasks completed. Thank you for participating! ☺️', duration=10)
        if items_list:
            # Reload the last item so it is displayed with its stored answer
            return _load_item_helper(user_id, items_list[-1], items_list, total)
        # Defensive fallback: the user has no items left at all. The toast above
        # has already been shown, so only the status text is updated here.
        return (
            gr.update(visible=False),
            gr.update(),
            gr.update(),
            gr.update(visible=True, value='All tasks completed. Thank you for participating!'),
            gr.update(),
            gr.update(visible=True, value=f'**Done:** {done_count}/{total}'),
            current_item_id,
            gr.update(),
        )

    # Go to the next item in sequence (the helper takes a 1-based display index)
    next_item = items_list[current_idx_in_list + 1]
    return _load_item_helper(user_id, next_item, items_list, current_idx_in_list + 2)
def _load_item_helper(
    user_id: str, item: dict[str, Any], items_list: list[dict[str, Any]], current_idx: int
) -> tuple:
    """Build the Gradio updates that display *item*.

    A completed item is shown with its stored answer. A pending item is moved to
    ``in_progress`` with a fresh 600-second lease; an in-progress item only gets
    its lease renewed. Any other status is displayed without touching the lease.

    Args:
        user_id: Owner of the assignment.
        item: Entry from ``items_list`` to display.
        items_list: The user's full item list (used for the totals).
        current_idx: 1-based position shown in the "At" indicator.

    Returns:
        An 8-element tuple: img, label_md, words_checkbox, done_md,
        current_idx_md, progress_md, item_id, score.

    Raises:
        ValueError: If the item is completed but no answer is stored for it.
    """
    item_id = item['item_id']
    status = item['status']
    total = len(items_list)
    done_count = sum(1 for entry in items_list if entry['status'] == 'completed')

    if status == 'completed':
        # No lease needed: just restore the stored answer.
        existing = db.get_answer(args.round_id, user_id, item_id)
        if not existing:
            # This should not happen: completed item without answer
            raise ValueError(f'Item {item_id} is marked as completed but has no answer in database')
        stored_score, stored_missing = existing
        s_val = str(int(stored_score))
        # An empty "missing words" list is displayed as 'ALL WORDS PRESENT' being ticked.
        words_cb_val = stored_missing if stored_missing else ['ALL WORDS PRESENT']
    else:
        if status in ('pending', 'in_progress'):
            now_dt = datetime.now(timezone.utc)
            lease_until = (now_dt + timedelta(seconds=600)).isoformat()
            if status == 'pending':
                # First visit: mark as started and take the lease.
                lease_sql = """
                    UPDATE assignments
                    SET status = 'in_progress', started_at = %s, lease_until = %s
                    WHERE round_id = %s AND item_id = %s AND user_id = %s
                    """
                lease_params = (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id)
            else:
                # Already started: only push the lease expiry forward.
                lease_sql = """
                    UPDATE assignments
                    SET lease_until = %s
                    WHERE round_id = %s AND item_id = %s AND user_id = %s
                    """
                lease_params = (lease_until, args.round_id, item_id, user_id)
            with db._connect() as conn, db._lock:
                conn.execute(lease_sql, lease_params)
        # Unanswered item: start with a blank form.
        s_val, words_cb_val = None, []

    img_path = format_image_path(args.image_template, item['label'], item['path'])
    label_md_value = f'## Label: {item["label"]}'
    word_choices = item['words'] + ['ALL WORDS PRESENT']
    return (
        gr.update(value=img_path, visible=True),
        gr.update(visible=True, value=label_md_value),
        gr.update(visible=True, choices=word_choices, value=words_cb_val),
        gr.update(visible=False),
        gr.update(visible=True, value=f'**At:** {current_idx}/{total}'),
        gr.update(visible=True, value=f'**Done:** {done_count}/{total}'),
        item_id,
        gr.update(value=s_val),
    )
def _find_item_by_index(user_state_val: dict[str, Any], index1: int) -> str | None:
    """Translate a 1-based position in the user's item list into an item ID.

    Returns None when no user is logged in or when *index1* falls outside the list.
    """
    logged_in_user = user_state_val.get('user_id') if user_state_val else None
    if not logged_in_user:
        return None
    assigned = get_user_items_list(logged_in_user)
    out_of_range = index1 < 1 or index1 > len(assigned)
    return None if out_of_range else assigned[index1 - 1]['item_id']
def _load_item(user_state_val: dict[str, Any], item_id: str) -> ReturnTuple:
    """Display the item with ID *item_id* from the logged-in user's list.

    Returns an error tuple (with the current item cleared) when no user is logged
    in or the item is not among the user's assignments.
    """
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', keep_current_item=False)

    user_id = user_state_val['user_id']
    items_list = get_user_items_list(user_id)

    # A single pass yields both the item and its 1-based position.
    for position, candidate in enumerate(items_list, start=1):
        if candidate['item_id'] == item_id:
            return _load_item_helper(user_id, candidate, items_list, position)
    return _error_return('Item not found in your assignments', keep_current_item=False)
def jump_to(
    user_state_val: dict[str, Any], index_number: float | None, current_item_id: str | None
) -> ReturnTuple:
    """Jump to the item at a 1-based index.

    Allowed targets are completed items and items up to (and including) the
    first non-completed one; anything further ahead is rejected.
    """

    def _reject(reason: str) -> ReturnTuple:
        """Report *reason* while keeping the current item."""
        return _error_return(reason, current_item_id=current_item_id)

    if not user_state_val or not user_state_val.get('user_id'):
        return _reject('Please start/continue')
    if index_number is None:
        return _reject('Please input the index')

    target_index = int(index_number)
    target_item_id = _find_item_by_index(user_state_val, target_index)
    if not target_item_id:
        return _reject('Index out of range')

    # Check jump constraints against the user's current list.
    items_list = get_user_items_list(user_state_val['user_id'])
    target_item = None
    for entry in items_list:
        if entry['item_id'] == target_item_id:
            target_item = entry
            break
    if not target_item:
        return _reject('Target item not found')

    # 0-based position of the first non-completed item (len(items_list) if none).
    first_open_pos = len(items_list)
    for pos, entry in enumerate(items_list):
        if entry['status'] != 'completed':
            first_open_pos = pos
            break

    beyond_first_open = target_index - 1 > first_open_pos
    if beyond_first_open and target_item['status'] != 'completed':
        return _reject('Can only jump to completed items or the first pending item')
    return _load_item(user_state_val, target_item_id)
def handle_prev(user_state_val: dict[str, Any], current_item_id: str | None) -> ReturnTuple:
    """Prev button: step back one item, provided that item is completed."""
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', current_item_id=current_item_id)

    items_list = get_user_items_list(user_state_val['user_id'])
    if not items_list:
        return _error_return('No items assigned', keep_current_item=False)
    if current_item_id is None:
        return _error_return('No current item', current_item_id=current_item_id)

    # 0-based position of the item currently on screen.
    matches = [pos for pos, entry in enumerate(items_list) if entry['item_id'] == current_item_id]
    if not matches:
        return _error_return('Current item not found in your assignments', current_item_id=current_item_id)
    here = matches[0]
    if here == 0:
        return _error_return('Already at the first item', current_item_id=current_item_id)

    previous_item = items_list[here - 1]
    # Only completed items may be revisited this way.
    if previous_item['status'] == 'completed':
        return _load_item(user_state_val, previous_item['item_id'])
    return _error_return('Can only navigate to completed items using Prev', current_item_id=current_item_id)
def handle_next(
    user_state_val: dict[str, Any],
    current_item_id: str | None,
    score_val: str | None,
    words_not_present: list[str],
) -> ReturnTuple:
    """Next button: step forward one item.

    Moving on is only allowed once the current item is completed; otherwise the
    user is asked to submit first. ``score_val`` and ``words_not_present`` are
    accepted because the button is wired with them, but are not read here.
    """
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', current_item_id=current_item_id)

    items_list = get_user_items_list(user_state_val['user_id'])
    if not items_list:
        return _error_return('No items assigned', keep_current_item=False)
    if current_item_id is None:
        return _error_return('No current item', current_item_id=current_item_id)

    # 0-based position of the item currently on screen.
    matches = [pos for pos, entry in enumerate(items_list) if entry['item_id'] == current_item_id]
    if not matches:
        return _error_return('Current item not found in your assignments', current_item_id=current_item_id)
    here = matches[0]
    if here >= len(items_list) - 1:
        return _error_return('Already at the last item', current_item_id=current_item_id)

    # An unanswered item has to be submitted before leaving it.
    if items_list[here]['status'] != 'completed':
        return _error_return(
            'Please submit your answer before moving to the next item', current_item_id=current_item_id
        )

    following_item = items_list[here + 1]
    return _load_item(user_state_val, following_item['item_id'])
# Outputs updated by every callback that returns the 8-element shape, in tuple order.
item_outputs = [
    img,
    label_md,
    words_checkbox,
    done_md,
    current_idx_md,
    progress_md,
    current_item_state,
    score,
]

# start_or_resume returns one extra value: user_state, placed before current_item_state.
start_btn.click(
    fn=start_or_resume,
    inputs=[user_id_inp, user_state],
    outputs=[*item_outputs[:6], user_state, *item_outputs[6:]],
    queue=True,
)

# The remaining buttons differ only in callback and inputs; they are registered
# in the order submit, jump, prev, next.
for trigger, callback, callback_inputs in (
    (submit_btn, submit, [user_state, current_item_state, score, words_checkbox]),
    (jump_btn, jump_to, [user_state, jump_idx, current_item_state]),
    (prev_btn, handle_prev, [user_state, current_item_state]),
    (next_btn, handle_next, [user_state, current_item_state, score, words_checkbox]),
):
    trigger.click(
        fn=callback,
        inputs=callback_inputs,
        outputs=list(item_outputs),  # each listener gets its own list object
        queue=True,
    )
# Pass the fixed directory prefix of the image template to Gradio as an allowed path,
# e.g. "/path/to/img/{label}/{path}" -> "/path/to/img".
# NOTE(review): this takes everything before the first "{", so it assumes the first
# placeholder starts a path component — confirm for templates like ".../img_{label}/...".
template_prefix, _, _ = args.image_template.partition('{')
image_dir = template_prefix.rstrip('/')
# An empty prefix means there is nothing to allow explicitly.
allowed_paths = [image_dir] if image_dir else None
print(f'Allowed paths: {allowed_paths}')

queued_demo = demo.queue(max_size=256)
queued_demo.launch(ssr_mode=False, allowed_paths=allowed_paths)