"""Evaluation server.""" import json from datetime import datetime, timedelta, timezone from typing import Any import gradio as gr from .allocation import AllocationEngine from .database import DB from .instuction_md import ( important_notes_instruction, login_instruction, nav_instruction, q1_instruction, q2_instruction, ) # Type alias for return tuples # Most functions return 8 elements (without user_state) ReturnTuple = tuple[ dict[str, Any], # img dict[str, Any], # label_md dict[str, Any], # words_checkbox dict[str, Any], # done_md dict[str, Any], # current_idx_md dict[str, Any], # progress_md str | None, # current_item_state dict[str, Any], # score ] # start_or_resume returns 9 elements (includes user_state) StartReturnTuple = tuple[ dict[str, Any], # img dict[str, Any], # label_md dict[str, Any], # words_checkbox dict[str, Any], # done_md dict[str, Any], # current_idx_md dict[str, Any], # progress_md dict[str, Any], # user_state str | None, # current_item_state str | None, # score ] def _error_return(message: str, keep_current_item: bool = True, current_item_id: str | None = None) -> ReturnTuple: """Create a standard error return tuple with notification. Args: message: Error message to display keep_current_item: If True, preserve current_item_id; if False, set to None current_item_id: The current item ID (only used if keep_current_item is True) Returns: Standard 8-element return tuple with error message """ # Show warning notification (3 seconds) gr.Warning(message, duration=3) return ( gr.update(), gr.update(), gr.update(), gr.update(value=message), gr.update(), gr.update(), current_item_id if keep_current_item else None, gr.update(), ) def format_image_path(template: str, label: str, path: str) -> str: """Replace the template with the label and path.""" return template.format(label=label, path=path) def app_main(args) -> None: """Run the evaluation server.""" # Set custom temp directory to avoid permission issues with shared /tmp/gradio # user_temp_dir = os.path.join(tempfile.gettempdir(), f'gradio_{os.getenv("USER", "user")}') # os.makedirs(user_temp_dir, exist_ok=True) # os.environ['GRADIO_TEMP_DIR'] = user_temp_dir db_path = args.db_path db = DB(db_path) # Get auto-allocation setting auto_allo_num = args.auto_allo_num allowed_users = args.allowed_users with gr.Blocks(title='Icon Evaluation', theme=gr.themes.Base()) as demo: user_state = gr.State({'user_id': None}) current_item_state = gr.State(None) # current item_id # Instructions page (shown first) with gr.Column(visible=True) as instructions_page: # gr.Markdown(task_instructions) gr.Markdown('# Instructions') gr.Markdown( 'Welcome to the Icon Evaluation Task! Please read the instructions first time you start the task.' ) with gr.Walkthrough(selected=0) as walkthrough: with gr.Step('Login', id=0): gr.Markdown(login_instruction) gr.HTML( """""" ) btn = gr.Button('Next Step') btn.click(lambda: gr.Walkthrough(selected=1), outputs=walkthrough) with gr.Step('Question 1', id=1): gr.Markdown(q1_instruction) gr.HTML( """""" ) btn = gr.Button('Next Step') btn.click(lambda: gr.Walkthrough(selected=2), outputs=walkthrough) with gr.Step('Question 2', id=2): gr.Markdown(q2_instruction) gr.HTML( """""" ) btn = gr.Button('Next Step') btn.click(lambda: gr.Walkthrough(selected=3), outputs=walkthrough) with gr.Step('Important Notes', id=3): gr.Markdown(important_notes_instruction) gr.HTML( """""" ) gr.Markdown(nav_instruction) gr.HTML( """""" ) start_task_btn = gr.Button('🚀 Start Evaluation Task', variant='primary', size='lg') # Main evaluation interface (hidden initially) with gr.Column(visible=False) as main_page: with gr.Row(): user_id_inp = gr.Textbox(label='User ID', placeholder='e.g.: alice_01', scale=2) start_btn = gr.Button('Start/Resume', variant='primary', scale=1) # Progress indicators (At: current index, Done: completed count) with gr.Row(): current_idx_md = gr.Markdown('**At:** -/-', visible=False) progress_md = gr.Markdown('**Done:** -/-', visible=False) # Image display (disable download function) img = gr.Image(label='Image', type='filepath', height=256, show_download_button=False) # Collapsible task guide with gr.Accordion('Task Instructions (Click to expand)', open=False): gr.Markdown(q1_instruction + '\n' + q2_instruction) # Relevance score label_md = gr.Markdown(visible=False) score = gr.Radio( choices=['1', '2', '3', '4', '5'], label='1-5 points: the relevance of the image to the label', interactive=True, ) # 10 words checkbox words_checkbox = gr.CheckboxGroup( choices=[], label='10 words: Select the words that are NOT presented in the image or "ALL WORDS PRESENT"', interactive=True, visible=False, ) # Action buttons with gr.Row(): submit_btn = gr.Button('Submit and Next', variant='primary') # Navigation controls with gr.Row(): jump_idx = gr.Number(label='Jump to the index (1-indexed)', precision=0) jump_btn = gr.Button('Jump') prev_btn = gr.Button('Prev') next_btn = gr.Button('Next') # Status messages done_md = gr.Markdown(visible=False) # Button to show main page start_task_btn.click( fn=lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[instructions_page, main_page], queue=False, ) def get_user_items_list(user_id: str) -> list[dict[str, Any]]: """Get ordered list of items assigned to user.""" with db._connect() as conn: cur = conn.execute( """ SELECT t.item_id, t.label, t.path, t.words, t.order_key, a.status FROM assignments a JOIN tasks t ON a.round_id = t.round_id AND a.item_id = t.item_id WHERE a.round_id = %s AND a.user_id = %s ORDER BY t.order_key """, (args.round_id, user_id), ) rows = cur.fetchall() items_list = [] for row in rows: item_id, label, path, words_json, order_key, status = row items_list.append( { 'item_id': item_id, 'label': label, 'path': path, 'words': json.loads(words_json), 'order_key': order_key, 'status': status, } ) return items_list def get_next_item(user_id: str) -> dict[str, Any] | None: """Get the next pending or in_progress item for user to resume.""" items_list = get_user_items_list(user_id) for item in items_list: if item['status'] in ('pending', 'in_progress'): return item return None def start_or_resume(user_id: str, state: dict[str, Any]) -> StartReturnTuple: """Start or resume the evaluation. Return: img, label_md, words_checkbox, done_md, current_idx_md, progress_md, state, current_item_state, score """ user_id = user_id.strip() if allowed_users and user_id not in allowed_users: gr.Warning('User ID not allowed', duration=3) return ( gr.update(), gr.update(), gr.update(), gr.update(visible=True, value='User ID not allowed'), gr.update(), gr.update(), state, None, None, ) if not user_id or user_id == '': gr.Warning('Please input the user ID', duration=3) return ( gr.update(), gr.update(), gr.update(), gr.update(visible=True, value='Please input the user ID.'), gr.update(), gr.update(), state, None, None, ) state = {'user_id': user_id} db.cleanup_expired_leases(args.round_id) items_list = get_user_items_list(user_id) # Auto-allocate for new users if not items_list and auto_allo_num > 0: engine = AllocationEngine(db, args.round_id, operator='auto_system') result = engine.allocate( from_source='unassigned', to_target=f'user:{user_id}', amount_spec=f'count:{auto_allo_num}', redundancy=args.redundancy, force=False, dry_run=False, reason='Auto-allocation for new user on first login', ) if result['success']: allocated_count = result['affected_count'] gr.Info(f'Welcome! You have {allocated_count} tasks to evaluate. Enjoy!', duration=5) items_list = get_user_items_list(user_id) else: error_msg = result.get('error', 'Failed to automatically allocate tasks') if 'No tasks available' in error_msg: gr.Warning('No tasks available. Stay tuned!', duration=5) else: gr.Warning( f'Failed to automatically allocate tasks: {error_msg}. ' 'Please contact the admin(janeding@umich.edu).', duration=8, ) if not items_list: gr.Warning('No tasks assigned to this user. Please contact admin.', duration=5) return ( gr.update(visible=False), gr.update(), gr.update(), gr.update(visible=True, value='No tasks assigned to this user. Please contact admin.'), gr.update(), gr.update(), state, None, None, ) total = len(items_list) completed_items = [it for it in items_list if it['status'] == 'completed'] done_count = len(completed_items) nxt = get_next_item(user_id) if nxt is None: # all items are done # Load the last completed item and show completion message gr.Info('Congratulations! All tasks completed. Thank you for participating! ☺️', duration=10) last_item = items_list[-1] if items_list else None if last_item: # Get the 8-element result from helper and insert user_state ( img_up, label_up, words_up_cb, done_up, idx_up, prog_up, item_id_val, score_up, ) = _load_item_helper(user_id, last_item, items_list, len(items_list)) # Return 9 elements with user_state inserted at position 7 return ( img_up, label_up, words_up_cb, done_up, idx_up, prog_up, state, item_id_val, score_up, ) else: return ( gr.update(visible=False), gr.update(), gr.update(), gr.update(visible=True, value='All tasks completed. Thank you for participating!'), gr.update(), gr.update(visible=True, value=f'**Done:** {done_count}/{total}'), state, None, None, ) # Load the next pending item item_id = nxt['item_id'] # Update status to in_progress and acquire lease now_dt = datetime.now(timezone.utc) lease_until = (now_dt + timedelta(seconds=600)).isoformat() with db._connect() as conn, db._lock: conn.execute( """ UPDATE assignments SET status = 'in_progress', started_at = %s, lease_until = %s WHERE round_id = %s AND item_id = %s AND user_id = %s """, (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id), ) img_path = format_image_path(args.image_template, nxt['label'], nxt['path']) current_idx = next((i + 1 for i, it in enumerate(items_list) if it['item_id'] == item_id), 1) label_md_value = f'## Label: {nxt["label"]}' return ( gr.update(value=img_path, visible=True), gr.update(visible=True, value=label_md_value), gr.update(visible=True, choices=nxt['words'] + ['ALL WORDS PRESENT'], value=[]), gr.update(visible=False), gr.update(visible=True, value=f'**At:** {current_idx}/{total}'), gr.update(visible=True, value=f'**Done:** {done_count}/{total}'), state, item_id, gr.update(value=None), ) def submit( user_state_val: dict[str, Any], current_item_id: str | None, score_val: str | None, words_not_present: list[str], ) -> ReturnTuple: """Submit the answer and move to the next item.""" if not user_state_val or not user_state_val.get('user_id'): return _error_return('Please start/continue', current_item_id=current_item_id) if not current_item_id: return _error_return('No current item', current_item_id=current_item_id) # check if user selected 'ALL WORDS PRESENT', no other words should be selected if 'ALL WORDS PRESENT' in words_not_present and len(words_not_present) > 1: return _error_return( 'Please select either ALL WORDS PRESENT or specific words', current_item_id=current_item_id ) # check 2 questions are answered if score_val is None or len(words_not_present) == 0: return _error_return( 'Please answer the 2 questions before submitting', current_item_id=current_item_id ) user_id = user_state_val['user_id'] # Get item info with db._connect() as conn: cur = conn.execute( 'SELECT label, path FROM tasks WHERE round_id = %s AND item_id = %s', (args.round_id, current_item_id), ) row = cur.fetchone() if not row: return _error_return('Item not found in database', current_item_id=current_item_id) label, path = row img_path = format_image_path(args.image_template, label, path) # Filter out 'ALL WORDS PRESENT' from the selected words - if selected, means no words are missing filtered_words = [w for w in (words_not_present or []) if w != 'ALL WORDS PRESENT'] # Record answer (filtered_words is the list of words not in the image, empty if 'ALL PRESENT' was selected) db.record_answer(args.round_id, user_id, current_item_id, label, img_path, int(score_val), filtered_words) # Update assignment status to completed now = datetime.now(timezone.utc).isoformat() with db._connect() as conn, db._lock: conn.execute( """ UPDATE assignments SET status = 'completed', completed_at = %s, lease_until = NULL WHERE round_id = %s AND item_id = %s AND user_id = %s """, (now, args.round_id, current_item_id, user_id), ) # update redundancy_completed conn.execute( """ UPDATE task_config SET redundancy_completed = ( SELECT COUNT(*) FROM assignments WHERE round_id = %s AND item_id = %s AND status = 'completed' ) WHERE round_id = %s AND item_id = %s """, (args.round_id, current_item_id, args.round_id, current_item_id), ) items_list = get_user_items_list(user_id) total = len(items_list) completed_items = [it for it in items_list if it['status'] == 'completed'] done_count = len(completed_items) # Find current index and go to the next item in order current_idx_in_list = next( (i for i, item in enumerate(items_list) if item['item_id'] == current_item_id), None ) # # db auto commit # if ( # args.auto_commit > 0 # and current_idx_in_list is not None # and (current_idx_in_list + 1) % args.auto_commit == 0 # ): # try: # db.commit_and_push_db() # except Exception as e: # print(f'Failed to commit and push the database: {e}') if current_idx_in_list is None or current_idx_in_list >= len(items_list) - 1: # This was the last item - show completion message but stay on current item gr.Info('Congratulations! All tasks completed. Thank you for participating! ☺️', duration=10) # Reload the current (last) item to show it with answers last_item = items_list[-1] if items_list else None if last_item: return _load_item_helper(user_id, last_item, items_list, len(items_list)) else: # This should not happen (items_list empty after completion check) gr.Info('All tasks completed. Thank you for participating!', duration=10) return ( gr.update(visible=False), gr.update(), gr.update(), gr.update(visible=True, value='All tasks completed. Thank you for participating!'), gr.update(), gr.update(visible=True, value=f'**Done:** {done_count}/{total}'), current_item_id, gr.update(), ) # Go to the next item in sequence next_item = items_list[current_idx_in_list + 1] return _load_item_helper(user_id, next_item, items_list, current_idx_in_list + 2) def _load_item_helper( user_id: str, item: dict[str, Any], items_list: list[dict[str, Any]], current_idx: int ) -> tuple: """Load an item and return Gradio updates.""" item_id = item['item_id'] total = len(items_list) completed_items = [it for it in items_list if it['status'] == 'completed'] done_count = len(completed_items) # Handle different status: completed vs pending/in_progress if item['status'] == 'completed': # Completed item: no lease needed, just load existing answer existing = db.get_answer(args.round_id, user_id, item_id) if existing: s, words_not_present_list = existing s_val = str(int(s)) # If no words are missing (empty list), show 'ALL WORDS PRESENT' as selected words_cb_val = words_not_present_list if words_not_present_list else ['ALL WORDS PRESENT'] else: # This should not happen: completed item without answer raise ValueError(f'Item {item_id} is marked as completed but has no answer in database') else: # Pending or in_progress: acquire lease if item['status'] == 'pending': # Update to in_progress now_dt = datetime.now(timezone.utc) lease_until = (now_dt + timedelta(seconds=600)).isoformat() with db._connect() as conn, db._lock: conn.execute( """ UPDATE assignments SET status = 'in_progress', started_at = %s, lease_until = %s WHERE round_id = %s AND item_id = %s AND user_id = %s """, (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id), ) elif item['status'] == 'in_progress': # Renew lease now_dt = datetime.now(timezone.utc) lease_until = (now_dt + timedelta(seconds=600)).isoformat() with db._connect() as conn, db._lock: conn.execute( """ UPDATE assignments SET lease_until = %s WHERE round_id = %s AND item_id = %s AND user_id = %s """, (lease_until, args.round_id, item_id, user_id), ) s_val, words_cb_val = None, [] img_path = format_image_path(args.image_template, item['label'], item['path']) label_md_value = f'## Label: {item["label"]}' return ( gr.update(value=img_path, visible=True), gr.update(visible=True, value=label_md_value), gr.update(visible=True, choices=item['words'] + ['ALL WORDS PRESENT'], value=words_cb_val), gr.update(visible=False), gr.update(visible=True, value=f'**At:** {current_idx}/{total}'), gr.update(visible=True, value=f'**Done:** {done_count}/{total}'), item_id, gr.update(value=s_val), ) def _find_item_by_index(user_state_val: dict[str, Any], index1: int) -> str | None: """Get the item id by index in a user's assigned items.""" if not user_state_val or not user_state_val.get('user_id'): return None user_id = user_state_val['user_id'] items_list = get_user_items_list(user_id) if index1 < 1 or index1 > len(items_list): return None return items_list[index1 - 1]['item_id'] def _load_item(user_state_val: dict[str, Any], item_id: str) -> ReturnTuple: """Load and display an item.""" if not user_state_val or not user_state_val.get('user_id'): return _error_return('Please start/continue', keep_current_item=False) user_id = user_state_val['user_id'] items_list = get_user_items_list(user_id) # Find the item item = next((it for it in items_list if it['item_id'] == item_id), None) if not item: return _error_return('Item not found in your assignments', keep_current_item=False) current_idx = next((i + 1 for i, it in enumerate(items_list) if it['item_id'] == item_id), 1) return _load_item_helper(user_id, item, items_list, current_idx) def jump_to( user_state_val: dict[str, Any], index_number: float | None, current_item_id: str | None ) -> ReturnTuple: """Jump to the item by index (only completed items).""" if not user_state_val or not user_state_val.get('user_id'): return _error_return('Please start/continue', current_item_id=current_item_id) if index_number is None: return _error_return('Please input the index', current_item_id=current_item_id) target_index = int(index_number) target_item_id = _find_item_by_index(user_state_val, target_index) if not target_item_id: return _error_return('Index out of range', current_item_id=current_item_id) # Check jump constraints user_id = user_state_val['user_id'] items_list = get_user_items_list(user_id) target_item = next((it for it in items_list if it['item_id'] == target_item_id), None) if not target_item: return _error_return('Target item not found', current_item_id=current_item_id) # Find the first non-completed item index first_pending_idx = next( (i for i, it in enumerate(items_list) if it['status'] != 'completed'), len(items_list) ) # Allow jump to: completed items OR the first non-completed item (but not beyond) if target_item['status'] != 'completed' and target_index - 1 > first_pending_idx: return _error_return( 'Can only jump to completed items or the first pending item', current_item_id=current_item_id ) return _load_item(user_state_val, target_item_id) def handle_prev(user_state_val: dict[str, Any], current_item_id: str | None) -> ReturnTuple: """Handle Prev button: navigate to previous item (completed items only).""" if not user_state_val or not user_state_val.get('user_id'): return _error_return('Please start/continue', current_item_id=current_item_id) user_id = user_state_val['user_id'] items_list = get_user_items_list(user_id) if not items_list: return _error_return('No items assigned', keep_current_item=False) if current_item_id is None: return _error_return('No current item', current_item_id=current_item_id) current_idx = next((i for i, it in enumerate(items_list) if it['item_id'] == current_item_id), None) if current_idx is None: return _error_return('Current item not found in your assignments', current_item_id=current_item_id) if current_idx == 0: return _error_return('Already at the first item', current_item_id=current_item_id) target_idx = current_idx - 1 target_item = items_list[target_idx] # Only allow navigating to completed items if target_item['status'] != 'completed': return _error_return( 'Can only navigate to completed items using Prev', current_item_id=current_item_id ) return _load_item(user_state_val, target_item['item_id']) def handle_next( user_state_val: dict[str, Any], current_item_id: str | None, score_val: str | None, words_not_present: list[str], ) -> ReturnTuple: """Navigate to next item. Rules: - If current item is NOT completed, require submission first - If current item is completed, allow free navigation """ if not user_state_val or not user_state_val.get('user_id'): return _error_return('Please start/continue', current_item_id=current_item_id) user_id = user_state_val['user_id'] items_list = get_user_items_list(user_id) if not items_list: return _error_return('No items assigned', keep_current_item=False) if current_item_id is None: return _error_return('No current item', current_item_id=current_item_id) current_idx = next((i for i, it in enumerate(items_list) if it['item_id'] == current_item_id), None) if current_idx is None: return _error_return('Current item not found in your assignments', current_item_id=current_item_id) if current_idx >= len(items_list) - 1: return _error_return('Already at the last item', current_item_id=current_item_id) current_item = items_list[current_idx] current_is_completed = current_item['status'] == 'completed' # If current item is NOT completed, require submission if not current_is_completed: return _error_return( 'Please submit your answer before moving to the next item', current_item_id=current_item_id ) # Move to next item target_idx = current_idx + 1 target_item = items_list[target_idx] return _load_item(user_state_val, target_item['item_id']) start_btn.click( fn=start_or_resume, inputs=[user_id_inp, user_state], outputs=[ img, label_md, words_checkbox, done_md, current_idx_md, progress_md, user_state, current_item_state, score, ], queue=True, ) submit_btn.click( fn=submit, inputs=[user_state, current_item_state, score, words_checkbox], outputs=[ img, label_md, words_checkbox, done_md, current_idx_md, progress_md, current_item_state, score, ], queue=True, ) jump_btn.click( fn=jump_to, inputs=[user_state, jump_idx, current_item_state], outputs=[ img, label_md, words_checkbox, done_md, current_idx_md, progress_md, current_item_state, score, ], queue=True, ) prev_btn.click( fn=handle_prev, inputs=[user_state, current_item_state], outputs=[ img, label_md, words_checkbox, done_md, current_idx_md, progress_md, current_item_state, score, ], queue=True, ) next_btn.click( fn=handle_next, inputs=[user_state, current_item_state, score, words_checkbox], outputs=[ img, label_md, words_checkbox, done_md, current_idx_md, progress_md, current_item_state, score, ], queue=True, ) # Extract image directory from template to add to allowed_paths # e.g., "/path/to/img/{label}/{path}" -> "/path/to/img" image_dir = args.image_template.split('{')[0].rstrip('/') if image_dir: allowed_paths = [image_dir] else: allowed_paths = None print(f'Allowed paths: {allowed_paths}') demo.queue(max_size=256).launch( ssr_mode=False, allowed_paths=allowed_paths, )