Spaces:
Sleeping
Sleeping
| """Evaluation server.""" | |
| import json | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| import gradio as gr | |
| from .allocation import AllocationEngine | |
| from .database import DB | |
| from .instuction_md import ( | |
| important_notes_instruction, | |
| login_instruction, | |
| nav_instruction, | |
| q1_instruction, | |
| q2_instruction, | |
| ) | |
| # Type alias for return tuples | |
| # Most functions return 8 elements (without user_state) | |
| ReturnTuple = tuple[ | |
| dict[str, Any], # img | |
| dict[str, Any], # label_md | |
| dict[str, Any], # words_checkbox | |
| dict[str, Any], # done_md | |
| dict[str, Any], # current_idx_md | |
| dict[str, Any], # progress_md | |
| str | None, # current_item_state | |
| dict[str, Any], # score | |
| ] | |
| # start_or_resume returns 9 elements (includes user_state) | |
| StartReturnTuple = tuple[ | |
| dict[str, Any], # img | |
| dict[str, Any], # label_md | |
| dict[str, Any], # words_checkbox | |
| dict[str, Any], # done_md | |
| dict[str, Any], # current_idx_md | |
| dict[str, Any], # progress_md | |
| dict[str, Any], # user_state | |
| str | None, # current_item_state | |
| str | None, # score | |
| ] | |
def _error_return(message: str, keep_current_item: bool = True, current_item_id: str | None = None) -> ReturnTuple:
    """Raise a warning toast and build the 8-slot "nothing changed" result.

    Only the status markdown slot receives a new value (the message itself);
    every other component slot is an empty update.

    Args:
        message: Text shown in the toast and written to the status markdown.
        keep_current_item: When True the item-id slot echoes ``current_item_id``;
            when False the slot is None.
        current_item_id: Item id to echo back (ignored unless ``keep_current_item``).

    Returns:
        Tuple in ReturnTuple order.
    """
    # Toast stays on screen for three seconds.
    gr.Warning(message, duration=3)

    item_slot = current_item_id if keep_current_item else None
    status_update = gr.update(value=message)
    return (
        gr.update(),  # img
        gr.update(),  # label_md
        gr.update(),  # words_checkbox
        status_update,  # done_md
        gr.update(),  # current_idx_md
        gr.update(),  # progress_md
        item_slot,  # current_item_state
        gr.update(),  # score
    )
def format_image_path(template: str, label: str, path: str) -> str:
    """Fill the ``{label}`` and ``{path}`` placeholders of ``template``."""
    substitutions = {'label': label, 'path': path}
    return template.format(**substitutions)
def app_main(args) -> None:
    """Run the evaluation server.

    Opens the database, builds the Gradio Blocks UI (an instructions page and
    the main evaluation page), registers the event handlers and launches the app.

    Args:
        args: Options object. The visible code reads ``db_path``,
            ``auto_allo_num``, ``allowed_users``, ``round_id``, ``redundancy``
            and ``image_template`` from it.
    """
    # Set custom temp directory to avoid permission issues with shared /tmp/gradio
    # user_temp_dir = os.path.join(tempfile.gettempdir(), f'gradio_{os.getenv("USER", "user")}')
    # os.makedirs(user_temp_dir, exist_ok=True)
    # os.environ['GRADIO_TEMP_DIR'] = user_temp_dir
    db_path = args.db_path
    db = DB(db_path)
    # Get auto-allocation setting: number of tasks handed to a user who has none yet
    # (start_or_resume only allocates when this is > 0)
    auto_allo_num = args.auto_allo_num
    # Optional allow-list of user IDs; a falsy value disables the check in start_or_resume
    allowed_users = args.allowed_users
    # NOTE(review): the nesting of the layout below was reconstructed from the
    # statement order because this copy of the file lost its indentation.
    # Confirm it against the deployed layout.
    with gr.Blocks(title='Icon Evaluation', theme=gr.themes.Base()) as demo:
        # Per-session state: the logged-in user and the item currently displayed
        user_state = gr.State({'user_id': None})
        current_item_state = gr.State(None)  # current item_id
        # Instructions page (shown first)
        with gr.Column(visible=True) as instructions_page:
            # gr.Markdown(task_instructions)
            gr.Markdown('# Instructions')
            gr.Markdown(
                'Welcome to the Icon Evaluation Task! Please read the instructions first time you start the task.'
            )
            # Four-step walkthrough; each "Next Step" button selects the following step id
            with gr.Walkthrough(selected=0) as walkthrough:
                with gr.Step('Login', id=0):
                    gr.Markdown(login_instruction)
                    gr.HTML(
                        """<img src="https://image2url.com/images/1760534494987-2ee98f6a-8ec5-40fc-b48b-026fc57b8b00.png">"""
                    )
                    btn = gr.Button('Next Step')
                    btn.click(lambda: gr.Walkthrough(selected=1), outputs=walkthrough)
                with gr.Step('Question 1', id=1):
                    gr.Markdown(q1_instruction)
                    gr.HTML(
                        """<img src="https://image2url.com/images/1760534393840-1f609fc4-66a9-4033-9bca-6b6517ab2e4c.png">"""
                    )
                    btn = gr.Button('Next Step')
                    btn.click(lambda: gr.Walkthrough(selected=2), outputs=walkthrough)
                with gr.Step('Question 2', id=2):
                    gr.Markdown(q2_instruction)
                    gr.HTML(
                        """<img src="https://image2url.com/images/1760534411276-0aa6e148-935d-4901-97c6-2697ae26f52e.png">"""
                    )
                    btn = gr.Button('Next Step')
                    btn.click(lambda: gr.Walkthrough(selected=3), outputs=walkthrough)
                with gr.Step('Important Notes', id=3):
                    gr.Markdown(important_notes_instruction)
                    gr.HTML(
                        """<img src="https://image2url.com/images/1760534430736-43665629-d687-4fa9-8d13-8e577fbee25d.png">"""
                    )
                    gr.Markdown(nav_instruction)
                    gr.HTML(
                        """<img src="https://image2url.com/images/1760534445418-88e9259a-76f8-47ca-80df-7efe06c546c8.png">"""
                    )
            # NOTE(review): this button may originally have lived inside the last
            # walkthrough step rather than below the walkthrough; confirm.
            start_task_btn = gr.Button('🚀 Start Evaluation Task', variant='primary', size='lg')
        # Main evaluation interface (hidden initially)
        with gr.Column(visible=False) as main_page:
            with gr.Row():
                user_id_inp = gr.Textbox(label='User ID', placeholder='e.g.: alice_01', scale=2)
                start_btn = gr.Button('Start/Resume', variant='primary', scale=1)
            # Progress indicators (At: current index, Done: completed count)
            with gr.Row():
                current_idx_md = gr.Markdown('**At:** -/-', visible=False)
                progress_md = gr.Markdown('**Done:** -/-', visible=False)
            # Image display (disable download function)
            img = gr.Image(label='Image', type='filepath', height=256, show_download_button=False)
            # Collapsible task guide
            with gr.Accordion('Task Instructions (Click to expand)', open=False):
                gr.Markdown(q1_instruction + '\n' + q2_instruction)
            # Relevance score
            label_md = gr.Markdown(visible=False)
            score = gr.Radio(
                choices=['1', '2', '3', '4', '5'],
                label='1-5 points: the relevance of the image to the label',
                interactive=True,
            )
            # 10 words checkbox (choices are filled in when an item is loaded)
            words_checkbox = gr.CheckboxGroup(
                choices=[],
                label='10 words: Select the words that are NOT presented in the image or "ALL WORDS PRESENT"',
                interactive=True,
                visible=False,
            )
            # Action buttons
            with gr.Row():
                submit_btn = gr.Button('Submit and Next', variant='primary')
            # Navigation controls
            with gr.Row():
                jump_idx = gr.Number(label='Jump to the index (1-indexed)', precision=0)
                jump_btn = gr.Button('Jump')
                prev_btn = gr.Button('Prev')
                next_btn = gr.Button('Next')
            # Status messages
            done_md = gr.Markdown(visible=False)
        # Button to show main page: hides the instructions column and reveals the main column
        start_task_btn.click(
            fn=lambda: (gr.update(visible=False), gr.update(visible=True)),
            outputs=[instructions_page, main_page],
            queue=False,
        )
def get_user_items_list(user_id: str) -> list[dict[str, Any]]:
    """Return the tasks assigned to ``user_id`` in the current round, sorted by ``order_key``.

    Each entry is a dict with the keys ``item_id``, ``label``, ``path``,
    ``words`` (decoded from its stored JSON text), ``order_key`` and ``status``.
    """
    query = """
        SELECT t.item_id, t.label, t.path, t.words, t.order_key, a.status
        FROM assignments a
        JOIN tasks t ON a.round_id = t.round_id AND a.item_id = t.item_id
        WHERE a.round_id = %s AND a.user_id = %s
        ORDER BY t.order_key
        """
    with db._connect() as conn:
        fetched = conn.execute(query, (args.round_id, user_id)).fetchall()

    # One dict per row; the column order matches the SELECT list above.
    return [
        {
            'item_id': item_id,
            'label': label,
            'path': path,
            'words': json.loads(words_json),
            'order_key': order_key,
            'status': status,
        }
        for item_id, label, path, words_json, order_key, status in fetched
    ]
def get_next_item(user_id: str) -> dict[str, Any] | None:
    """Return the first assigned item that is still pending or in progress, or None."""
    open_statuses = ('pending', 'in_progress')
    candidates = (entry for entry in get_user_items_list(user_id) if entry['status'] in open_statuses)
    return next(candidates, None)
def start_or_resume(user_id: str, state: dict[str, Any]) -> StartReturnTuple:
    """Log a user in and show the item they should work on next.

    Steps: validate the user ID, clean up expired leases, auto-allocate tasks to a
    user who has none (when ``auto_allo_num`` > 0), then either load the first
    pending/in-progress item (taking a 600-second lease on it) or, when nothing
    is left to do, show the last item together with a completion notice.

    Args:
        user_id: Raw text from the User ID box.
        state: Current session state; returned unchanged when validation fails.

    Returns:
        9-tuple: img, label_md, words_checkbox, done_md, current_idx_md,
        progress_md, state, current_item_state, score.
    """

    def _abort(
        toast: str,
        banner: str,
        session: dict[str, Any],
        *,
        duration: int = 3,
        hide_image: bool = False,
    ) -> StartReturnTuple:
        """Warn the user and return a result that clears the current item and score."""
        gr.Warning(toast, duration=duration)
        return (
            gr.update(visible=False) if hide_image else gr.update(),
            gr.update(),
            gr.update(),
            gr.update(visible=True, value=banner),
            gr.update(),
            gr.update(),
            session,
            None,
            None,
        )

    # Tolerate a missing value, and check for a blank ID *before* the allow-list
    # so that an empty box is reported as "missing" rather than "not allowed".
    user_id = (user_id or '').strip()
    if not user_id:
        return _abort('Please input the user ID', 'Please input the user ID.', state)
    if allowed_users and user_id not in allowed_users:
        return _abort('User ID not allowed', 'User ID not allowed', state)

    state = {'user_id': user_id}
    db.cleanup_expired_leases(args.round_id)
    items_list = get_user_items_list(user_id)

    # Auto-allocate for new users
    if not items_list and auto_allo_num > 0:
        engine = AllocationEngine(db, args.round_id, operator='auto_system')
        result = engine.allocate(
            from_source='unassigned',
            to_target=f'user:{user_id}',
            amount_spec=f'count:{auto_allo_num}',
            redundancy=args.redundancy,
            force=False,
            dry_run=False,
            reason='Auto-allocation for new user on first login',
        )
        if result['success']:
            allocated_count = result['affected_count']
            gr.Info(f'Welcome! You have {allocated_count} tasks to evaluate. Enjoy!', duration=5)
            items_list = get_user_items_list(user_id)
        else:
            error_msg = result.get('error', 'Failed to automatically allocate tasks')
            if 'No tasks available' in error_msg:
                gr.Warning('No tasks available. Stay tuned!', duration=5)
            else:
                # NOTE(review): the contact address in this message looks redacted
                # in this copy of the file; restore the real admin address.
                gr.Warning(
                    f'Failed to automatically allocate tasks: {error_msg}. '
                    'Please contact the admin([email protected]).',
                    duration=8,
                )

    if not items_list:
        no_tasks_msg = 'No tasks assigned to this user. Please contact admin.'
        return _abort(no_tasks_msg, no_tasks_msg, state, duration=5, hide_image=True)

    total = len(items_list)
    done_count = sum(1 for it in items_list if it['status'] == 'completed')

    # Pick the resume point from the list already in hand: this avoids a second
    # query and keeps the displayed index consistent with the chosen item.
    resume_pos = next(
        (pos for pos, it in enumerate(items_list) if it['status'] in ('pending', 'in_progress')),
        None,
    )

    if resume_pos is None:
        # Nothing left to do: show the last item and a completion notice.
        gr.Info('Congratulations! All tasks completed. Thank you for participating! ☺️', duration=10)
        (
            img_up,
            label_up,
            words_up_cb,
            done_up,
            idx_up,
            prog_up,
            item_id_val,
            score_up,
        ) = _load_item_helper(user_id, items_list[-1], items_list, total)
        # Same slots as the helper's 8-tuple, with the session state inserted
        # before the item id.
        return (
            img_up,
            label_up,
            words_up_cb,
            done_up,
            idx_up,
            prog_up,
            state,
            item_id_val,
            score_up,
        )

    nxt = items_list[resume_pos]
    item_id = nxt['item_id']

    # Mark the item in_progress and take a 10-minute lease on it.
    now_dt = datetime.now(timezone.utc)
    lease_until = (now_dt + timedelta(seconds=600)).isoformat()
    with db._connect() as conn, db._lock:
        conn.execute(
            """
            UPDATE assignments
            SET status = 'in_progress', started_at = %s, lease_until = %s
            WHERE round_id = %s AND item_id = %s AND user_id = %s
            """,
            (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id),
        )

    img_path = format_image_path(args.image_template, nxt['label'], nxt['path'])
    current_idx = resume_pos + 1  # the UI shows 1-based positions
    label_md_value = f'## Label: <span style="color: #00AA00; font-weight: bold;">{nxt["label"]}</span>'
    return (
        gr.update(value=img_path, visible=True),
        gr.update(visible=True, value=label_md_value),
        gr.update(visible=True, choices=nxt['words'] + ['ALL WORDS PRESENT'], value=[]),
        gr.update(visible=False),
        gr.update(visible=True, value=f'**At:** {current_idx}/{total}'),
        gr.update(visible=True, value=f'**Done:** {done_count}/{total}'),
        state,
        item_id,
        gr.update(value=None),
    )
def submit(
    user_state_val: dict[str, Any],
    current_item_id: str | None,
    score_val: str | None,
    words_not_present: list[str],
) -> ReturnTuple:
    """Validate and store the answer for the current item, then show the next one.

    Args:
        user_state_val: Session state; must carry a ``user_id``.
        current_item_id: Id of the item being answered.
        score_val: Selected relevance score ('1'-'5'), or None if unanswered.
        words_not_present: Ticked checkbox values; either a list of missing words
            or the single sentinel 'ALL WORDS PRESENT'. ``None`` is treated as
            an empty selection.

    Returns:
        8-tuple in ReturnTuple order: the next item's view, the last item's view
        when there is no next item, or an error result that leaves the view as is.
    """
    # Normalise once so the membership/length checks below cannot raise on None
    # (the original only guarded against None further down).
    selected_words = list(words_not_present or [])

    if not user_state_val or not user_state_val.get('user_id'):
        return _error_return('Please start/continue', current_item_id=current_item_id)
    if not current_item_id:
        return _error_return('No current item', current_item_id=current_item_id)
    # 'ALL WORDS PRESENT' is exclusive: it cannot be combined with specific words
    if 'ALL WORDS PRESENT' in selected_words and len(selected_words) > 1:
        return _error_return(
            'Please select either ALL WORDS PRESENT or specific words', current_item_id=current_item_id
        )
    # Both questions must be answered
    if score_val is None or not selected_words:
        return _error_return(
            'Please answer the 2 questions before submitting', current_item_id=current_item_id
        )

    user_id = user_state_val['user_id']

    # Look up the item's label and path so the stored answer carries the image path
    with db._connect() as conn:
        cur = conn.execute(
            'SELECT label, path FROM tasks WHERE round_id = %s AND item_id = %s',
            (args.round_id, current_item_id),
        )
        row = cur.fetchone()
    if not row:
        return _error_return('Item not found in database', current_item_id=current_item_id)
    label, path = row
    img_path = format_image_path(args.image_template, label, path)

    # The sentinel means "nothing is missing", so it is stored as an empty list
    filtered_words = [w for w in selected_words if w != 'ALL WORDS PRESENT']
    db.record_answer(args.round_id, user_id, current_item_id, label, img_path, int(score_val), filtered_words)

    # Mark the assignment completed and release its lease
    now = datetime.now(timezone.utc).isoformat()
    with db._connect() as conn, db._lock:
        conn.execute(
            """
            UPDATE assignments
            SET status = 'completed', completed_at = %s, lease_until = NULL
            WHERE round_id = %s AND item_id = %s AND user_id = %s
            """,
            (now, args.round_id, current_item_id, user_id),
        )
        # Recount how many assignments of this item are completed
        conn.execute(
            """
            UPDATE task_config
            SET redundancy_completed = (
                SELECT COUNT(*) FROM assignments
                WHERE round_id = %s AND item_id = %s AND status = 'completed'
            )
            WHERE round_id = %s AND item_id = %s
            """,
            (args.round_id, current_item_id, args.round_id, current_item_id),
        )

    # Re-read the list so statuses and counters reflect the update above
    items_list = get_user_items_list(user_id)
    total = len(items_list)
    done_count = sum(1 for it in items_list if it['status'] == 'completed')
    current_idx_in_list = next(
        (i for i, item in enumerate(items_list) if item['item_id'] == current_item_id), None
    )
    # (The periodic db.commit_and_push_db() auto-commit that used to run here is disabled.)

    if current_idx_in_list is None or current_idx_in_list >= total - 1:
        # No following item: show the completion notice and stay on the last item
        gr.Info('Congratulations! All tasks completed. Thank you for participating! ☺️', duration=10)
        if items_list:
            return _load_item_helper(user_id, items_list[-1], items_list, total)
        # Only reachable when the user has no assignments left at all
        gr.Info('All tasks completed. Thank you for participating!', duration=10)
        return (
            gr.update(visible=False),
            gr.update(),
            gr.update(),
            gr.update(visible=True, value='All tasks completed. Thank you for participating!'),
            gr.update(),
            gr.update(visible=True, value=f'**Done:** {done_count}/{total}'),
            current_item_id,
            gr.update(),
        )

    # Go to the next item in sequence (+2 converts the 0-based index of the
    # current item into the 1-based position of the next one)
    next_item = items_list[current_idx_in_list + 1]
    return _load_item_helper(user_id, next_item, items_list, current_idx_in_list + 2)
def _load_item_helper(
    user_id: str, item: dict[str, Any], items_list: list[dict[str, Any]], current_idx: int
) -> tuple:
    """Build the 8-slot view for ``item`` and update its lease where applicable.

    A completed item is shown with its stored answer. A pending item is switched
    to in_progress with a fresh 600-second lease; an in_progress item only has
    its lease extended. Any other status is displayed without a database write.

    Args:
        user_id: Owner of the assignment.
        item: Entry from ``items_list`` to display.
        items_list: All of the user's items (used for the counters).
        current_idx: 1-based position shown in the "At:" indicator.

    Raises:
        ValueError: If the item is marked completed but no answer is stored.
    """
    item_id = item['item_id']
    status = item['status']
    total = len(items_list)
    done_count = sum(1 for entry in items_list if entry['status'] == 'completed')

    if status == 'completed':
        # No lease needed; pre-fill the form with the recorded answer
        stored = db.get_answer(args.round_id, user_id, item_id)
        if not stored:
            raise ValueError(f'Item {item_id} is marked as completed but has no answer in database')
        raw_score, missing_words = stored
        s_val = str(int(raw_score))
        # An empty "missing" list is displayed as the sentinel choice
        words_cb_val = missing_words if missing_words else ['ALL WORDS PRESENT']
    else:
        s_val, words_cb_val = None, []
        if status in ('pending', 'in_progress'):
            now_dt = datetime.now(timezone.utc)
            lease_until = (now_dt + timedelta(seconds=600)).isoformat()
            if status == 'pending':
                # First visit: start the item and take the lease
                statement = """
                    UPDATE assignments
                    SET status = 'in_progress', started_at = %s, lease_until = %s
                    WHERE round_id = %s AND item_id = %s AND user_id = %s
                    """
                params = (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id)
            else:
                # Already started: just push the lease expiry forward
                statement = """
                    UPDATE assignments
                    SET lease_until = %s
                    WHERE round_id = %s AND item_id = %s AND user_id = %s
                    """
                params = (lease_until, args.round_id, item_id, user_id)
            with db._connect() as conn, db._lock:
                conn.execute(statement, params)

    img_path = format_image_path(args.image_template, item['label'], item['path'])
    label_md_value = f'## Label: <span style="color: #00AA00; font-weight: bold;">{item["label"]}</span>'
    word_choices = item['words'] + ['ALL WORDS PRESENT']
    return (
        gr.update(value=img_path, visible=True),
        gr.update(visible=True, value=label_md_value),
        gr.update(visible=True, choices=word_choices, value=words_cb_val),
        gr.update(visible=False),
        gr.update(visible=True, value=f'**At:** {current_idx}/{total}'),
        gr.update(visible=True, value=f'**Done:** {done_count}/{total}'),
        item_id,
        gr.update(value=s_val),
    )
def _find_item_by_index(user_state_val: dict[str, Any], index1: int) -> str | None:
    """Translate a 1-based position in the user's item list into an item id.

    Returns None when no user is logged in or the position is out of range.
    """
    user_id = (user_state_val or {}).get('user_id')
    if not user_id:
        return None
    assigned = get_user_items_list(user_id)
    if 1 <= index1 <= len(assigned):
        return assigned[index1 - 1]['item_id']
    return None
def _load_item(user_state_val: dict[str, Any], item_id: str) -> ReturnTuple:
    """Show the user's item with the given id, or an error result if it cannot be shown."""
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', keep_current_item=False)

    user_id = user_state_val['user_id']
    assigned = get_user_items_list(user_id)

    # Single pass: find the first matching entry together with its 1-based position
    for position, entry in enumerate(assigned, start=1):
        if entry['item_id'] == item_id:
            return _load_item_helper(user_id, entry, assigned, position)
    return _error_return('Item not found in your assignments', keep_current_item=False)
def jump_to(
    user_state_val: dict[str, Any], index_number: float | None, current_item_id: str | None
) -> ReturnTuple:
    """Jump to the item at a 1-based index.

    The target must be a completed item or lie no further than the first
    non-completed item; anything beyond that is refused.
    """
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', current_item_id=current_item_id)
    if index_number is None:
        return _error_return('Please input the index', current_item_id=current_item_id)

    requested = int(index_number)
    target_item_id = _find_item_by_index(user_state_val, requested)
    if not target_item_id:
        return _error_return('Index out of range', current_item_id=current_item_id)

    assigned = get_user_items_list(user_state_val['user_id'])
    target = next((entry for entry in assigned if entry['item_id'] == target_item_id), None)
    if not target:
        return _error_return('Target item not found', current_item_id=current_item_id)

    # 0-based position of the first item that is not completed yet
    # (equals len(assigned) when everything is completed)
    frontier = len(assigned)
    for position, entry in enumerate(assigned):
        if entry['status'] != 'completed':
            frontier = position
            break

    reachable = target['status'] == 'completed' or requested - 1 <= frontier
    if not reachable:
        return _error_return(
            'Can only jump to completed items or the first pending item', current_item_id=current_item_id
        )
    return _load_item(user_state_val, target_item_id)
def handle_prev(user_state_val: dict[str, Any], current_item_id: str | None) -> ReturnTuple:
    """Step back one item; only a completed predecessor may be opened."""
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', current_item_id=current_item_id)

    assigned = get_user_items_list(user_state_val['user_id'])
    if not assigned:
        return _error_return('No items assigned', keep_current_item=False)
    if current_item_id is None:
        return _error_return('No current item', current_item_id=current_item_id)

    # Locate the displayed item in the user's ordered list
    position = None
    for pos, entry in enumerate(assigned):
        if entry['item_id'] == current_item_id:
            position = pos
            break
    if position is None:
        return _error_return('Current item not found in your assignments', current_item_id=current_item_id)
    if position == 0:
        return _error_return('Already at the first item', current_item_id=current_item_id)

    previous = assigned[position - 1]
    if previous['status'] == 'completed':
        return _load_item(user_state_val, previous['item_id'])
    return _error_return(
        'Can only navigate to completed items using Prev', current_item_id=current_item_id
    )
def handle_next(
    user_state_val: dict[str, Any],
    current_item_id: str | None,
    score_val: str | None,
    words_not_present: list[str],
) -> ReturnTuple:
    """Step forward one item.

    Moving on is only allowed once the displayed item is completed; otherwise
    the user is told to submit first. ``score_val`` and ``words_not_present``
    are accepted because the button passes them in, but they are not read here.
    """
    if not (user_state_val and user_state_val.get('user_id')):
        return _error_return('Please start/continue', current_item_id=current_item_id)

    assigned = get_user_items_list(user_state_val['user_id'])
    if not assigned:
        return _error_return('No items assigned', keep_current_item=False)
    if current_item_id is None:
        return _error_return('No current item', current_item_id=current_item_id)

    # Locate the displayed item in the user's ordered list
    position = None
    for pos, entry in enumerate(assigned):
        if entry['item_id'] == current_item_id:
            position = pos
            break
    if position is None:
        return _error_return('Current item not found in your assignments', current_item_id=current_item_id)

    last_position = len(assigned) - 1
    if position >= last_position:
        return _error_return('Already at the last item', current_item_id=current_item_id)

    if assigned[position]['status'] != 'completed':
        return _error_return(
            'Please submit your answer before moving to the next item', current_item_id=current_item_id
        )

    following = assigned[position + 1]
    return _load_item(user_state_val, following['item_id'])
# Components refreshed by every handler except start_or_resume, in ReturnTuple order.
item_view_outputs = [
    img,
    label_md,
    words_checkbox,
    done_md,
    current_idx_md,
    progress_md,
    current_item_state,
    score,
]

# Start/Resume additionally writes the session state (inserted before the item id).
start_btn.click(
    fn=start_or_resume,
    inputs=[user_id_inp, user_state],
    outputs=[
        img,
        label_md,
        words_checkbox,
        done_md,
        current_idx_md,
        progress_md,
        user_state,
        current_item_state,
        score,
    ],
    queue=True,
)

# Remaining buttons, registered in the same order as before: submit, jump, prev, next.
for trigger, handler, handler_inputs in (
    (submit_btn, submit, [user_state, current_item_state, score, words_checkbox]),
    (jump_btn, jump_to, [user_state, jump_idx, current_item_state]),
    (prev_btn, handle_prev, [user_state, current_item_state]),
    (next_btn, handle_next, [user_state, current_item_state, score, words_checkbox]),
):
    # Each listener gets its own copy of the outputs list.
    trigger.click(
        fn=handler,
        inputs=handler_inputs,
        outputs=list(item_view_outputs),
        queue=True,
    )
# Everything in the image template before its first placeholder is the path
# Gradio is allowed to serve files from,
# e.g. "/path/to/img/{label}/{path}" -> "/path/to/img".
# NOTE(review): a template whose first placeholder sits mid-component
# (".../prefix_{label}") yields a prefix that is not a directory; confirm
# templates always place "{" right after a "/".
template_prefix = args.image_template.split('{')[0].rstrip('/')
allowed_paths = [template_prefix] if template_prefix else None
print(f'Allowed paths: {allowed_paths}')

# Queue up to 256 pending events, then start the server.
queued_app = demo.queue(max_size=256)
queued_app.launch(
    ssr_mode=False,
    allowed_paths=allowed_paths,
)