Spaces:
Sleeping
Sleeping
| """Task allocation engine for dynamic workload management.""" | |
| import hashlib | |
| import json | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from .database import DB | |
class AllocationValidator:
    """Input validator for allocation operations.

    Every method is a ``@staticmethod``: none of them uses instance state, so
    they can be called either on the class or on an instance.
    """

    @staticmethod
    def parse_amount_spec(spec: str) -> dict[str, Any]:
        """Parse amount specification.

        Examples:
            'count:100' -> {'type': 'count', 'value': 100}
            'ratio:0.5' -> {'type': 'ratio', 'value': 0.5}
            'equal' -> {'type': 'equal'}
            'ratio:A:0.3,B:0.7' -> {'type': 'ratio_per_user', 'users': {'A': 0.3, 'B': 0.7}}
            'all' -> {'type': 'all'}

        Raises:
            ValueError: If the spec is malformed, a number cannot be parsed or
                is out of range (NaN included), a user appears twice in a
                per-user spec, or per-user ratios do not sum to 1.0.
        """
        if spec in ('equal', 'all'):
            return {'type': spec}

        if spec.startswith('count:'):
            try:
                count = int(spec[6:])
            except ValueError:
                raise ValueError(f'Invalid count in amount spec: {spec}') from None
            if count <= 0:
                raise ValueError(f'Count must be positive, got: {count}')
            return {'type': 'count', 'value': count}

        if spec.startswith('ratio:'):
            rest = spec[6:]

            if ':' not in rest:
                # Single ratio
                try:
                    ratio = float(rest)
                except ValueError:
                    raise ValueError(f'Invalid ratio in amount spec: {spec}') from None
                # A chained comparison is False for NaN, so NaN is rejected too.
                if not 0 < ratio <= 1:
                    raise ValueError(f'Ratio must be in (0, 1], got: {ratio}')
                return {'type': 'ratio', 'value': ratio}

            # Per-user ratios, format: ratio:A:0.3,B:0.3,C:0.4
            user_ratios: dict[str, float] = {}
            for part in rest.split(','):
                pieces = part.split(':')
                if len(pieces) != 2:
                    raise ValueError(f'Invalid per-user ratio {part!r} in amount spec: {spec}')
                user = pieces[0].strip()
                if not user:
                    raise ValueError(f'User ID cannot be empty in amount spec: {spec}')
                try:
                    ratio = float(pieces[1])
                except ValueError:
                    raise ValueError(f'Invalid ratio for user {user} in amount spec: {spec}') from None
                if not 0 <= ratio <= 1:
                    raise ValueError(f'Ratio must be in [0, 1], got: {ratio} for user {user}')
                if user in user_ratios:
                    # Silently keeping the last value would hide a typo in the spec.
                    raise ValueError(f'Duplicate user in amount spec: {user}')
                user_ratios[user] = ratio

            total = sum(user_ratios.values())
            if abs(total - 1.0) > 0.001:
                raise ValueError(f'User ratios must sum to 1.0, got: {total}')
            return {'type': 'ratio_per_user', 'users': user_ratios}

        raise ValueError(f'Invalid amount spec: {spec}')

    @staticmethod
    def parse_target(target: str) -> dict[str, Any]:
        """Parse target specification.

        Examples:
            'user:A' -> {'type': 'single', 'user': 'A'}
            'users:A,B,C' -> {'type': 'multiple', 'users': ['A', 'B', 'C']}

        Empty entries in a 'users:' list (stray commas) are dropped and
        duplicates are removed with a printed warning, keeping first-seen order.

        Raises:
            ValueError: If the prefix is unknown or no user ID remains.
        """
        if target.startswith('user:'):
            user = target[5:].strip()
            if not user:
                raise ValueError('User ID cannot be empty')
            return {'type': 'single', 'user': user}

        if target.startswith('users:'):
            names = [name.strip() for name in target[6:].split(',')]
            # ''.split(',') yields [''], so empties must be filtered before the
            # emptiness check can ever trigger.
            names = [name for name in names if name]
            if not names:
                raise ValueError('User list cannot be empty')
            users = list(dict.fromkeys(names))  # Remove duplicates, preserve order
            if len(users) != len(names):
                print('Warning: Duplicate users removed from target')
            return {'type': 'multiple', 'users': users}

        raise ValueError(f'Invalid target: {target}')

    @staticmethod
    def validate_no_self_overlap(from_users: list[str], to_users: list[str]) -> list[str]:
        """Remove self-overlapping users and return filtered list.

        Returns ``to_users`` without any user that also appears in
        ``from_users``; prints a warning when something was removed.
        """
        overlap = set(from_users) & set(to_users)
        if not overlap:
            return to_users
        print(f'Warning: Removing self-overlapping users: {overlap}')
        return [u for u in to_users if u not in overlap]

    @staticmethod
    def check_users_exist(db: 'DB', round_id: str, users: list[str]) -> None:
        """Check if users exist in this round (warn for new users).

        Prints an info line for every user without a row in ``assignments``
        for ``round_id``. Returns silently when the table does not exist.
        """
        with db._connect() as conn:
            # Check if assignments table exists
            cur = conn.execute("SELECT to_regclass('public.assignments')")
            if cur.fetchone()[0] is None:
                return  # New schema not initialized, skip check

            for user in users:
                cur = conn.execute(
                    'SELECT COUNT(*) FROM assignments WHERE round_id = %s AND user_id = %s', (round_id, user)
                )
                if cur.fetchone()[0] == 0:
                    print(f"Info: User '{user}' is new to this round")
class AllocationEngine:
    """Task allocation engine for managing workload distribution.

    Methods:
    - allocate(from_source, to_target, amount_spec, redundancy, force, dry_run, reason)
    - release(from_user, amount_spec, dry_run, reason)
    - rebalance(users, mode_spec, dry_run, reason)

    All SQL uses ``%s`` placeholders and is issued through ``db._connect()``;
    write operations additionally hold ``db._lock`` and run between explicit
    BEGIN / COMMIT statements.
    """

    def __init__(
        self,
        db: 'DB',
        round_id: str,
        operator: str = 'admin',
    ):
        """Initialize allocation engine.

        Args:
            db: Database instance
            round_id: Round ID
            operator: Operator name for audit logging
        """
        self.db = db
        self.round_id = round_id
        self.operator = operator

    def allocate(
        self,
        from_source: str,
        to_target: str,
        amount_spec: str,
        redundancy: int,
        force: bool = False,
        dry_run: bool = False,
        reason: str = '',
    ) -> dict[str, Any]:
        """Core allocation function.

        Redundancy design:
        - Each task needs N different annotators (redundancy parameter)
        - Each user can only annotate a task ONCE (enforced by PRIMARY KEY)
        - If a user already has a task, it cannot be assigned to them again
        - This may reduce the actual number of tasks allocated if users already
          have some of the source tasks
        - A task is never planned for more target users than it has free
          redundancy slots, even when several targets share the same pool

        Args:
            from_source: Source of tasks ('unassigned', 'user:A')
            to_target: Target users ('user:A', 'users:A,B,C')
            amount_spec: Amount specification ('count:100', 'ratio:0.5', 'equal')
            redundancy: Number of different people who should label each task
            force: Allow transferring in_progress tasks (default: False)
            dry_run: Preview only, don't execute (default: False)
            reason: Reason for this allocation (for audit)

        Returns:
            {
                'success': bool,
                'affected_users': list[str],
                'affected_count': int,
                'distribution': dict[str, int],  # user -> count
                'item_ids_sample': list[str],
                'error': str  # only if success=False
            }

        Raises:
            ValueError: If amount_spec, to_target or from_source is malformed.
        """
        # 1. Validate inputs. The helpers are called through the class so the
        #    calls work whether or not they are decorated as static methods.
        amount_parsed = AllocationValidator.parse_amount_spec(amount_spec)
        target_parsed = AllocationValidator.parse_target(to_target)

        # 2. Get target users
        target_users = self._get_target_users(target_parsed)
        if not target_users:
            return {'success': False, 'error': 'No target users specified', 'affected_count': 0}
        AllocationValidator.check_users_exist(self.db, self.round_id, target_users)

        # 3. Get available source tasks for EACH user plus the free slots per task
        user_available_tasks, capacity = self._query_available_items(from_source, target_users, redundancy, force)
        total_available = sum(len(tasks) for tasks in user_available_tasks.values())
        if total_available == 0:
            return {
                'success': False,
                'error': f'No available tasks from source {from_source} for target users '
                '(tasks may already be assigned or at redundancy limit)',
                'affected_count': 0,
            }

        # 4. Calculate distribution considering pools and remaining slots
        distribution = self._calculate_distribution_per_user(user_available_tasks, amount_parsed, capacity)

        # 5. Execute or preview
        if dry_run:
            return self._preview_allocation(distribution)
        return self._execute_allocation(from_source, to_target, amount_spec, distribution, force, reason)

    def release(self, from_user: str, amount_spec: str, dry_run: bool = False, reason: str = '') -> dict[str, Any]:
        """Release tasks from a user back to unassigned pool.

        The user's pending assignments are selected oldest first
        (``ORDER BY assigned_at``) and deleted.

        Args:
            from_user: User ID to release tasks from
            amount_spec: Amount to release ('count:50', 'ratio:0.3', 'all')
            dry_run: Preview only
            reason: Reason for release

        Returns:
            Result dictionary with success status. When the amount resolves to
            zero tasks nothing is written and ``affected_count`` is 0.
        """
        amount_parsed = AllocationValidator.parse_amount_spec(amount_spec)

        # Get user's pending tasks
        with self.db._connect() as conn:
            cur = conn.execute(
                """
                SELECT item_id
                FROM assignments
                WHERE round_id = %s AND user_id = %s AND status = 'pending'
                ORDER BY assigned_at
                """,
                (self.round_id, from_user),
            )
            pending_items = [row[0] for row in cur.fetchall()]
        if not pending_items:
            return {'success': False, 'error': 'No pending tasks to release'}

        # Calculate how many to release
        kind = amount_parsed['type']
        if kind == 'count':
            to_release = pending_items[: amount_parsed['value']]
        elif kind == 'ratio':
            to_release = pending_items[: int(len(pending_items) * amount_parsed['value'])]
        elif kind == 'all':
            to_release = pending_items
        else:
            return {'success': False, 'error': f'Invalid amount_spec for release: {amount_spec}'}

        outcome = {
            'success': True,
            'affected_users': [from_user],
            'affected_count': len(to_release),
            'item_ids_sample': to_release[:10],
        }
        if dry_run:
            return {**outcome, 'dry_run': True}
        if not to_release:
            # A ratio that rounds down to zero would otherwise build "IN ()",
            # which is a SQL syntax error.
            return outcome

        now = datetime.now(timezone.utc).isoformat()
        placeholders = ','.join(['%s'] * len(to_release))
        with self.db._connect() as conn, self.db._lock:
            conn.execute('BEGIN')
            try:
                # status = 'pending' guards against rows that left the pending
                # state after the selection above.
                conn.execute(
                    f"""
                    DELETE FROM assignments
                    WHERE round_id = %s AND user_id = %s AND status = 'pending'
                      AND item_id IN ({placeholders})
                    """,
                    (self.round_id, from_user, *to_release),
                )
                # Record log
                conn.execute(
                    """
                    INSERT INTO allocation_history
                    (round_id, operation, operator, from_source, to_target, amount_spec,
                     affected_users, affected_count, item_ids_sample, item_ids_hash,
                     dry_run, force, reason, created_at)
                    VALUES (%s, 'release', %s, %s, 'pool', %s, %s, %s, %s, %s, 0, 0, %s, %s)
                    """,
                    (
                        self.round_id,
                        self.operator,
                        f'user:{from_user}',
                        amount_spec,
                        json.dumps([from_user]),
                        len(to_release),
                        json.dumps(to_release[:10]),
                        self._compute_item_hash(to_release),
                        reason,
                        now,
                    ),
                )
                conn.execute('COMMIT')
            except Exception:
                conn.execute('ROLLBACK')
                raise
        return outcome

    def rebalance(self, users: list[str], mode_spec: str, dry_run: bool = False, reason: str = '') -> dict[str, Any]:
        """Rebalance pending tasks among selected users.

        Every pending assignment held by ``users`` is removed and re-created
        according to the requested split. The number of pending copies of each
        task inside the group is preserved, and a user never receives a task
        they already hold in a non-pending status.

        Args:
            users: List of user IDs to rebalance among
            mode_spec: Rebalance mode ('equal', 'ratio:A:0.3,B:0.7'). Users
                missing from a per-user ratio spec are treated as ratio 0.
            dry_run: Preview only
            reason: Reason for rebalance

        Returns:
            Result dictionary with success status. ``affected_count`` is the
            number of distinct tasks involved; ``distribution`` maps each user
            to the number of tasks planned for them.

        Raises:
            ValueError: If mode_spec cannot be parsed.
        """
        users = list(dict.fromkeys(users))
        if not users:
            # An empty list would build "IN ()" and divide by zero below.
            return {'success': False, 'error': 'No users specified for rebalance'}
        user_placeholders = ','.join(['%s'] * len(users))

        # 1. Collect the pending rows of the group; copies[item] is the number
        #    of group members currently holding that item as pending.
        copies: dict[str, int] = {}
        with self.db._connect() as conn:
            cur = conn.execute(
                f"""
                SELECT item_id
                FROM assignments
                WHERE round_id = %s AND user_id IN ({user_placeholders}) AND status = 'pending'
                ORDER BY item_id
                """,
                (self.round_id, *users),
            )
            for (item_id,) in cur.fetchall():
                copies[item_id] = copies.get(item_id, 0) + 1
        all_pending = list(copies)
        if not all_pending:
            return {'success': False, 'error': 'No pending tasks to rebalance'}

        # 2. Parse and validate the distribution mode
        amount_parsed = AllocationValidator.parse_amount_spec(mode_spec)
        if amount_parsed['type'] not in ('equal', 'ratio_per_user'):
            return {'success': False, 'error': f'Invalid mode_spec for rebalance: {mode_spec}'}
        if amount_parsed['type'] == 'ratio_per_user':
            outsiders = sorted(set(amount_parsed['users']) - set(users))
            if outsiders:
                return {'success': False, 'error': f'Ratio given for users outside the rebalance group: {outsiders}'}

        with self.db._connect() as conn:
            # 3. Tasks configured with redundancy > 1 (one query instead of one per task)
            cur = conn.execute(
                f"""
                SELECT tc.item_id
                FROM task_config tc
                WHERE tc.round_id = %s
                  AND tc.redundancy_required > 1
                  AND tc.item_id IN (
                      SELECT a.item_id
                      FROM assignments a
                      WHERE a.round_id = %s AND a.user_id IN ({user_placeholders}) AND a.status = 'pending'
                  )
                """,
                (self.round_id, self.round_id, *users),
            )
            flagged = {row[0] for row in cur.fetchall()}

            # 4. (user, item) pairs that must not be created: the user already
            #    holds the item in a non-pending status, and that row survives
            #    the DELETE in step 6.
            cur = conn.execute(
                f"""
                SELECT a.user_id, a.item_id
                FROM assignments a
                WHERE a.round_id = %s
                  AND a.user_id IN ({user_placeholders})
                  AND a.status <> 'pending'
                  AND EXISTS (
                      SELECT 1
                      FROM assignments p
                      WHERE p.round_id = a.round_id
                        AND p.item_id = a.item_id
                        AND p.user_id IN ({user_placeholders})
                        AND p.status = 'pending'
                  )
                """,
                (self.round_id, *users, *users),
            )
            blocked = {(row[0], row[1]) for row in cur.fetchall()}

        redundancy_gt1_tasks = [item_id for item_id in all_pending if item_id in flagged]
        if redundancy_gt1_tasks:
            print(
                f'Warning: {len(redundancy_gt1_tasks)} tasks have redundancy > 1. '
                'Rebalance keeps the number of pending copies of each task but may move them to other users. '
                f'Consider using allocate/release for fine control. Sample: {redundancy_gt1_tasks[:3]}'
            )

        # 5. Plan the new assignment of every pending copy
        try:
            distribution = self._plan_rebalance(users, copies, blocked, amount_parsed)
        except ValueError as exc:
            return {'success': False, 'error': str(exc)}

        outcome = {
            'success': True,
            'affected_users': users,
            'affected_count': len(all_pending),
            'distribution': {u: len(items) for u, items in distribution.items()},
        }
        if dry_run:
            return {**outcome, 'dry_run': True, 'item_ids_sample': all_pending[:10]}

        # 6. Execute rebalance (delete old assignments, insert new ones)
        # NOTE(review): the reads above run on a separate connection, so pending
        # rows created concurrently are deleted here without being re-planned.
        now = datetime.now(timezone.utc).isoformat()
        with self.db._connect() as conn, self.db._lock:
            conn.execute('BEGIN')
            try:
                conn.execute(
                    f"""
                    DELETE FROM assignments
                    WHERE round_id = %s AND user_id IN ({user_placeholders}) AND status = 'pending'
                    """,
                    (self.round_id, *users),
                )
                for user, item_ids in distribution.items():
                    for item_id in item_ids:
                        self._insert_assignment(conn, item_id, user, now)
                # Record log
                conn.execute(
                    """
                    INSERT INTO allocation_history
                    (round_id, operation, operator, from_source, to_target, amount_spec,
                     affected_users, affected_count, item_ids_sample, item_ids_hash,
                     dry_run, force, reason, created_at)
                    VALUES (%s, 'rebalance', %s, %s, %s, %s, %s, %s, %s, %s, 0, 0, %s, %s)
                    """,
                    (
                        self.round_id,
                        self.operator,
                        f'users:{",".join(users)}',
                        f'users:{",".join(users)}',
                        mode_spec,
                        json.dumps(users),
                        len(all_pending),
                        json.dumps(all_pending[:10]),
                        self._compute_item_hash(all_pending),
                        reason,
                        now,
                    ),
                )
                conn.execute('COMMIT')
            except Exception:
                conn.execute('ROLLBACK')
                raise
        return outcome

    # ========== Internal Methods ==========

    def _plan_rebalance(
        self,
        users: list[str],
        copies: dict[str, int],
        blocked: set[tuple[str, str]],
        amount_parsed: dict,
    ) -> dict[str, list[str]]:
        """Assign every pending copy of every task to a user (pure function).

        Args:
            users: Rebalance group, without duplicates and non-empty.
            copies: item_id -> number of pending copies to place.
            blocked: (user_id, item_id) pairs that must not be created.
            amount_parsed: Parsed mode of type 'equal' or 'ratio_per_user'.

        Returns:
            Dict mapping each user to the list of item_ids planned for them.
            Each copy is placed exactly once and no user gets an item twice.

        Raises:
            ValueError: If an item has fewer eligible users than copies.
        """
        total = sum(copies.values())
        if amount_parsed['type'] == 'equal':
            base, extra = divmod(total, len(users))
            # The first `extra` users absorb the remainder.
            quota = {user: base + (1 if i < extra else 0) for i, user in enumerate(users)}
        else:
            ratios = amount_parsed['users']
            quota = {user: int(total * ratios.get(user, 0)) for user in users}

        plan: dict[str, list[str]] = {user: [] for user in users}
        for item_id, n_copies in copies.items():
            candidates = [user for user in users if (user, item_id) not in blocked]
            if len(candidates) < n_copies:
                raise ValueError(f'Cannot place {n_copies} copies of task {item_id} among the selected users')
            # Users furthest below their quota come first; the sort is stable,
            # so ties follow the order of `users`. Quotas lost to rounding are
            # absorbed here because every copy is always placed.
            candidates.sort(key=lambda user: len(plan[user]) - quota[user])
            for user in candidates[:n_copies]:
                plan[user].append(item_id)
        return plan

    def _insert_assignment(self, conn: Any, item_id: str, user: str, now: str) -> bool:
        """Insert one pending assignment unless the user already has the task.

        The redundancy slot is the number of DISTINCT users that hold the task
        at the moment of the insert. Returns True when a row was inserted.
        """
        # Each user can only have ONE assignment per task (PRIMARY KEY)
        cur = conn.execute(
            """
            SELECT 1
            FROM assignments
            WHERE round_id = %s AND item_id = %s AND user_id = %s
            """,
            (self.round_id, item_id, user),
        )
        if cur.fetchone():
            return False

        cur = conn.execute(
            """
            SELECT COUNT(DISTINCT user_id)
            FROM assignments
            WHERE round_id = %s AND item_id = %s
            """,
            (self.round_id, item_id),
        )
        next_slot = cur.fetchone()[0]
        conn.execute(
            """
            INSERT INTO assignments
            (round_id, item_id, user_id, status, assigned_at, redundancy_slot)
            VALUES (%s, %s, %s, 'pending', %s, %s)
            """,
            (self.round_id, item_id, user, now, next_slot),
        )
        return True

    def _get_available_source_items(
        self, from_source: str, target_users: list[str], redundancy: int, force: bool
    ) -> dict[str, list[str]]:
        """Get available source tasks for EACH target user separately.

        For each user, filters at SQL level to return tasks that:
        1. Come from the specified source (unassigned pool or another user)
        2. Are not already assigned to THIS SPECIFIC USER
        3. Have not reached their redundancy limit

        Args:
            from_source: Source specification ('unassigned' or 'user:UserID')
            target_users: List of users who will receive these tasks
            redundancy: Number of different annotators required per task
            force: If True, allow transferring in_progress tasks (only for user source)

        Returns:
            Dictionary mapping user_id to list of available item_ids
            Example: {'Alice': ['task1', 'task2'], 'Bob': ['task1', 'task3'], ...}

        Raises:
            ValueError: If from_source is neither 'unassigned' nor 'user:<id>'.
        """
        pools, _capacity = self._query_available_items(from_source, target_users, redundancy, force)
        return pools

    def _query_available_items(
        self, from_source: str, target_users: list[str], redundancy: int, force: bool
    ) -> tuple[dict[str, list[str]], dict[str, int]]:
        """Return per-user pools and the number of free redundancy slots per task.

        The second element maps item_id to ``required - assigned``, where
        ``required`` is ``task_config.redundancy_required`` (falling back to
        ``redundancy``) and ``assigned`` is the number of distinct users that
        hold the task. For a user source the source user is not counted,
        because their assignment is deleted when the task is transferred.

        Raises:
            ValueError: If from_source is neither 'unassigned' nor 'user:<id>'.
        """
        if from_source == 'unassigned':
            source_user_id = None
        elif from_source.startswith('user:'):
            source_user_id = from_source[5:]
        else:
            raise ValueError(f'Invalid from_source: {from_source}')

        statuses = ('pending', 'in_progress') if force else ('pending',)
        status_placeholders = ','.join(['%s'] * len(statuses))

        pools: dict[str, list[str]] = {}
        capacity: dict[str, int] = {}
        with self.db._connect() as conn:
            # Query available tasks for EACH user separately
            for user in target_users:
                if source_user_id is None:
                    cur = conn.execute(
                        """
                        SELECT t.item_id,
                               COALESCE(tc.redundancy_required, %s) - (
                                   SELECT COUNT(DISTINCT a0.user_id)
                                   FROM assignments a0
                                   WHERE a0.round_id = t.round_id AND a0.item_id = t.item_id
                               ) AS remaining
                        FROM tasks t
                        LEFT JOIN task_config tc
                            ON t.round_id = tc.round_id AND t.item_id = tc.item_id
                        WHERE t.round_id = %s
                            -- This specific user doesn't have this task
                            AND NOT EXISTS (
                                SELECT 1
                                FROM assignments a
                                WHERE a.round_id = t.round_id
                                    AND a.item_id = t.item_id
                                    AND a.user_id = %s
                            )
                            -- Check redundancy limit (current assignments < required)
                            AND (
                                SELECT COUNT(DISTINCT a2.user_id)
                                FROM assignments a2
                                WHERE a2.round_id = t.round_id AND a2.item_id = t.item_id
                            ) < COALESCE(tc.redundancy_required, %s)
                            -- Check redundancy completion
                            AND COALESCE(tc.redundancy_completed, 0) < COALESCE(tc.redundancy_required, %s)
                        ORDER BY t.order_key
                        """,
                        (redundancy, self.round_id, user, redundancy, redundancy),
                    )
                else:
                    # No DISTINCT here: PostgreSQL rejects SELECT DISTINCT with an
                    # ORDER BY column that is not in the select list. Duplicates,
                    # if any, are removed in Python below.
                    cur = conn.execute(
                        f"""
                        SELECT a1.item_id,
                               COALESCE(tc.redundancy_required, %s) - (
                                   SELECT COUNT(DISTINCT a0.user_id)
                                   FROM assignments a0
                                   WHERE a0.round_id = a1.round_id
                                       AND a0.item_id = a1.item_id
                                       AND a0.user_id <> a1.user_id
                               ) AS remaining
                        FROM assignments a1
                        LEFT JOIN task_config tc
                            ON a1.round_id = tc.round_id AND tc.item_id = a1.item_id
                        WHERE a1.round_id = %s
                            AND a1.user_id = %s
                            AND a1.status IN ({status_placeholders})
                            -- This specific target user doesn't have this task
                            AND NOT EXISTS (
                                SELECT 1
                                FROM assignments a2
                                WHERE a2.round_id = a1.round_id
                                    AND a2.item_id = a1.item_id
                                    AND a2.user_id = %s
                            )
                            -- Check redundancy limit, not counting the source user
                            -- whose slot is freed by the transfer
                            AND (
                                SELECT COUNT(DISTINCT a3.user_id)
                                FROM assignments a3
                                WHERE a3.round_id = a1.round_id
                                    AND a3.item_id = a1.item_id
                                    AND a3.user_id <> a1.user_id
                            ) < COALESCE(tc.redundancy_required, %s)
                            -- Check redundancy completion
                            AND COALESCE(tc.redundancy_completed, 0) < COALESCE(tc.redundancy_required, %s)
                        ORDER BY a1.assigned_at
                        """,
                        (redundancy, self.round_id, source_user_id, *statuses, user, redundancy, redundancy),
                    )

                pool: list[str] = []
                seen: set[str] = set()
                for item_id, remaining in cur.fetchall():
                    if item_id in seen:
                        continue
                    seen.add(item_id)
                    pool.append(item_id)
                    capacity[item_id] = int(remaining)
                pools[user] = pool
        return pools, capacity

    def _get_target_users(self, target_parsed: dict) -> list[str]:
        """Get target user list from a parsed target ('single' or 'multiple')."""
        if target_parsed['type'] == 'single':
            return [target_parsed['user']]
        return target_parsed['users']

    def _calculate_distribution_per_user(
        self,
        user_available_tasks: dict[str, list[str]],
        amount_parsed: dict,
        capacity: 'dict[str, int] | None' = None,
    ) -> dict[str, list[str]]:
        """Calculate task distribution for each user based on their available tasks.

        Each user has their own pool of available tasks; tasks are taken from
        the front of that pool until the user's quota is reached.

        Quota per amount type:
        - 'count': the given value for every user
        - 'ratio': ``int(len(pool) * ratio)`` for every user
        - 'equal': total assignable slots divided (floor) by the number of users
        - 'ratio_per_user': ``int(total_slots * ratio)``; users are processed
          in sorted order and users without a pool get an empty list
        - 'all': the whole pool

        Args:
            user_available_tasks: Dict mapping user_id to list of available task IDs
                Example: {'Alice': ['task1', 'task2', 'task3'],
                         'Bob': ['task1', 'task4']}
            amount_parsed: Parsed amount specification
            capacity: Optional item_id -> number of users that may still
                receive the task. A task is handed out at most that many times
                across all users. Items missing from the mapping, or
                ``capacity=None``, are unlimited.

        Returns:
            Dict mapping user_id to list of assigned task IDs
            Example: {'Alice': ['task1', 'task2'], 'Bob': ['task1']}

        Raises:
            ValueError: If the amount type is unknown.
        """
        target_users = list(user_available_tasks)
        kind = amount_parsed['type']

        # How many pools each task appears in; with a capacity the number of
        # assignable slots per task is capped by its free slots.
        membership: dict[str, int] = {}
        for pool in user_available_tasks.values():
            for item_id in pool:
                membership[item_id] = membership.get(item_id, 0) + 1
        if capacity is None:
            total_slots = sum(membership.values())
        else:
            total_slots = sum(min(n, max(capacity.get(item_id, n), 0)) for item_id, n in membership.items())

        if kind == 'count':
            quotas = {user: amount_parsed['value'] for user in target_users}
        elif kind == 'ratio':
            ratio = amount_parsed['value']
            quotas = {user: int(len(user_available_tasks[user]) * ratio) for user in target_users}
        elif kind == 'equal':
            # Guard against an empty user mapping (division by zero).
            per_user = total_slots // len(target_users) if target_users else 0
            quotas = {user: per_user for user in target_users}
        elif kind == 'ratio_per_user':
            # Sort by user_id for deterministic behavior
            quotas = {user: int(total_slots * ratio) for user, ratio in sorted(amount_parsed['users'].items())}
        elif kind == 'all':
            quotas = {user: len(user_available_tasks[user]) for user in target_users}
        else:
            raise ValueError(f'Unknown amount type: {amount_parsed["type"]}')

        remaining = dict(capacity) if capacity is not None else {}
        distribution: dict[str, list[str]] = {}
        for user, quota in quotas.items():
            picked: list[str] = []
            for item_id in user_available_tasks.get(user, []):
                if len(picked) >= quota:
                    break
                left = remaining.get(item_id)
                if left is not None:
                    if left <= 0:
                        continue  # all free slots already planned for other users
                    remaining[item_id] = left - 1
                picked.append(item_id)
            distribution[user] = picked
        return distribution

    def _calculate_distribution(
        self, item_ids: list[str], target_users: list[str], amount_parsed: dict
    ) -> dict[str, list[str]]:
        """Calculate task distribution for each user (old method, kept for compatibility).

        Splits one shared list of items into disjoint slices.

        Returns:
            {'user_A': ['item_1', 'item_2'], 'user_B': ['item_3'], ...}

        Raises:
            ValueError: If the amount type is unknown.
        """
        n_items = len(item_ids)
        kind = amount_parsed['type']

        if kind == 'count':
            return self._distribute_equal(item_ids[: min(amount_parsed['value'], n_items)], target_users)
        if kind == 'ratio':
            return self._distribute_equal(item_ids[: int(n_items * amount_parsed['value'])], target_users)
        if kind in ('equal', 'all'):
            return self._distribute_equal(item_ids, target_users)
        if kind == 'ratio_per_user':
            # Sort by user_id for deterministic behavior
            ordered = sorted(amount_parsed['users'].items(), key=lambda pair: pair[0])
            distribution = {}
            start_idx = 0
            for i, (user, ratio) in enumerate(ordered):
                if i == len(ordered) - 1:
                    # Last user gets all remaining (avoid floating point errors)
                    distribution[user] = item_ids[start_idx:]
                else:
                    count = int(n_items * ratio)
                    distribution[user] = item_ids[start_idx : start_idx + count]
                    start_idx += count
            return distribution
        raise ValueError(f'Unknown amount type: {amount_parsed["type"]}')

    def _distribute_equal(self, item_ids: list[str], users: list[str]) -> dict[str, list[str]]:
        """Distribute tasks equally among users (deterministic remainder allocation).

        Users are de-duplicated and processed in sorted order; the first
        ``len(item_ids) % n_users`` of them receive one extra item. Returns an
        empty dict when no users are given.
        """
        ordered_users = sorted(set(users))
        if not ordered_users:
            return {}
        base_count, remainder = divmod(len(item_ids), len(ordered_users))

        distribution = {}
        start_idx = 0
        for i, user in enumerate(ordered_users):
            # First 'remainder' users get one extra item
            count = base_count + (1 if i < remainder else 0)
            distribution[user] = item_ids[start_idx : start_idx + count]
            start_idx += count
        return distribution

    def _compute_item_hash(self, item_ids: list[str]) -> str:
        """Compute hash of item_id list for audit.

        Returns the first 16 hex characters of the SHA-256 of the sorted,
        comma-joined IDs, so the result does not depend on input order.
        """
        content = ','.join(sorted(item_ids))
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def _preview_allocation(self, distribution: dict[str, list[str]]) -> dict[str, Any]:
        """Dry-run preview: summarize a planned distribution without touching the DB."""
        all_items = [item_id for items in distribution.values() for item_id in items]

        # Sample: first 2 items from each user, capped at 10 overall
        sample_ids = [item_id for items in distribution.values() for item_id in items[:2]][:10]

        return {
            'success': True,
            'dry_run': True,
            'affected_users': list(distribution.keys()),
            'affected_count': len(all_items),
            'distribution': {u: len(items) for u, items in distribution.items()},
            'item_ids_sample': sample_ids,
            'item_ids_hash': self._compute_item_hash(all_items),
        }

    def _execute_allocation(
        self,
        from_source: str,
        to_target: str,
        amount_spec: str,
        distribution: dict[str, list[str]],
        force: bool,
        reason: str,
    ) -> dict[str, Any]:
        """Execute actual allocation.

        In one transaction: removes the source user's rows for the planned
        items (user source only), inserts a pending assignment per planned
        (user, item) pair and writes one ``allocation_history`` row. Any error
        rolls the transaction back and is re-raised.
        """
        now = datetime.now(timezone.utc).isoformat()
        all_assigned_items = [item_id for items in distribution.values() for item_id in items]

        with self.db._connect() as conn, self.db._lock:
            conn.execute('BEGIN')
            try:
                # 1. If transferring from a user, delete their assignments (no skipped state).
                #    Skipped for an empty plan, which would otherwise build "IN ()".
                if from_source.startswith('user:') and all_assigned_items:
                    source_user = from_source[5:]
                    placeholders = ','.join(['%s'] * len(all_assigned_items))
                    conn.execute(
                        f"""
                        DELETE FROM assignments
                        WHERE round_id = %s AND user_id = %s AND item_id IN ({placeholders})
                        """,
                        (self.round_id, source_user, *all_assigned_items),
                    )

                # 2. Create assignments for target users
                for user, item_ids in distribution.items():
                    for item_id in item_ids:
                        self._insert_assignment(conn, item_id, user, now)

                # 3. Record log
                conn.execute(
                    """
                    INSERT INTO allocation_history
                    (round_id, operation, operator, from_source, to_target, amount_spec,
                     affected_users, affected_count, item_ids_sample, item_ids_hash,
                     dry_run, force, reason, created_at)
                    VALUES (%s, 'allocate', %s, %s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)
                    """,
                    (
                        self.round_id,
                        self.operator,
                        from_source,
                        to_target,
                        amount_spec,
                        json.dumps(list(distribution.keys())),
                        len(all_assigned_items),
                        json.dumps(all_assigned_items[:10]),
                        self._compute_item_hash(all_assigned_items),
                        1 if force else 0,
                        reason,
                        now,
                    ),
                )
                conn.execute('COMMIT')
            except Exception:
                conn.execute('ROLLBACK')
                raise

        return {
            'success': True,
            'affected_users': list(distribution.keys()),
            'affected_count': len(all_assigned_items),
            'distribution': {u: len(items) for u, items in distribution.items()},
            'item_ids_sample': all_assigned_items[:10],
        }