hailey2024 committed on
Commit
7737fba
·
1 Parent(s): 9888ca2

use postgres

Browse files
Files changed (3) hide show
  1. src/allocation.py +39 -39
  2. src/database.py +156 -27
  3. src/eval_server.py +20 -20
src/allocation.py CHANGED
@@ -102,13 +102,13 @@ class AllocationValidator:
102
  """Check if users exist in this round (warn for new users)."""
103
  with db._connect() as conn:
104
  # Check if assignments table exists
105
- cur = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='assignments'")
106
- if not cur.fetchone():
107
  return # New schema not initialized, skip check
108
 
109
  for user in users:
110
  cur = conn.execute(
111
- 'SELECT COUNT(*) FROM assignments WHERE round_id = ? AND user_id = ?', (round_id, user)
112
  )
113
  count = cur.fetchone()[0]
114
  if count == 0:
@@ -240,7 +240,7 @@ class AllocationEngine:
240
  """
241
  SELECT item_id
242
  FROM assignments
243
- WHERE round_id = ? AND user_id = ? AND status = 'pending'
244
  ORDER BY assigned_at
245
  """,
246
  (self.round_id, from_user),
@@ -274,11 +274,11 @@ class AllocationEngine:
274
  # Execute release
275
  now = datetime.now(timezone.utc).isoformat()
276
  with self.db._connect() as conn, self.db._lock:
277
- placeholders = ','.join('?' * len(to_release))
278
  conn.execute(
279
  f"""
280
  DELETE FROM assignments
281
- WHERE round_id = ? AND user_id = ? AND item_id IN ({placeholders})
282
  """,
283
  (self.round_id, from_user, *to_release),
284
  )
@@ -290,7 +290,7 @@ class AllocationEngine:
290
  (round_id, operation, operator, from_source, to_target, amount_spec,
291
  affected_users, affected_count, item_ids_sample, item_ids_hash,
292
  dry_run, force, reason, created_at)
293
- VALUES (?, 'release', ?, ?, 'pool', ?, ?, ?, ?, ?, 0, 0, ?, ?)
294
  """,
295
  (
296
  self.round_id,
@@ -328,12 +328,12 @@ class AllocationEngine:
328
  """
329
  # 1. Collect all unique pending tasks from these users (avoid duplicates for redundancy > 1)
330
  with self.db._connect() as conn:
331
- placeholders_users = ','.join('?' * len(users))
332
  cur = conn.execute(
333
  f"""
334
  SELECT DISTINCT item_id
335
  FROM assignments
336
- WHERE round_id = ? AND user_id IN ({placeholders_users}) AND status = 'pending'
337
  """,
338
  (self.round_id, *users),
339
  )
@@ -351,7 +351,7 @@ class AllocationEngine:
351
  """
352
  SELECT redundancy_required
353
  FROM task_config
354
- WHERE round_id = ? AND item_id = ?
355
  """,
356
  (self.round_id, item_id),
357
  )
@@ -359,12 +359,12 @@ class AllocationEngine:
359
  redundancy_required = row[0] if row else 1
360
 
361
  # Count current assignments among rebalance users
362
- placeholders = ','.join('?' * len(users))
363
  cur = conn.execute(
364
  f"""
365
  SELECT COUNT(*)
366
  FROM assignments
367
- WHERE round_id = ? AND item_id = ? AND user_id IN ({placeholders}) AND status = 'pending'
368
  """,
369
  (self.round_id, item_id, *users),
370
  )
@@ -402,16 +402,16 @@ class AllocationEngine:
402
  user_available_tasks[user] = []
403
  continue
404
  # exclude items that the user already has
405
- placeholders = ','.join('?' * len(all_pending))
406
  cur = conn.execute(
407
  f"""
408
  SELECT x.item_id
409
  FROM (
410
- SELECT ? AS round_id, ? AS user_id
411
  ) p
412
  JOIN (
413
  SELECT item_id FROM (
414
- VALUES {','.join(['(?)'] * len(all_pending))}
415
  ) AS v(item_id)
416
  ) AS x ON 1=1
417
  WHERE NOT EXISTS (
@@ -443,15 +443,15 @@ class AllocationEngine:
443
  # 7. Execute rebalance (delete old assignments, insert new ones)
444
  now = datetime.now(timezone.utc).isoformat()
445
  with self.db._connect() as conn, self.db._lock:
446
- conn.execute('BEGIN IMMEDIATE')
447
 
448
  try:
449
  # Delete all pending tasks from these users
450
- placeholders_users = ','.join('?' * len(users))
451
  conn.execute(
452
  f"""
453
  DELETE FROM assignments
454
- WHERE round_id = ? AND user_id IN ({placeholders_users}) AND status = 'pending'
455
  """,
456
  (self.round_id, *users),
457
  )
@@ -465,7 +465,7 @@ class AllocationEngine:
465
  """
466
  SELECT 1
467
  FROM assignments
468
- WHERE round_id = ? AND item_id = ? AND user_id = ?
469
  """,
470
  (self.round_id, item_id, user),
471
  )
@@ -478,7 +478,7 @@ class AllocationEngine:
478
  """
479
  SELECT COUNT(DISTINCT user_id)
480
  FROM assignments
481
- WHERE round_id = ? AND item_id = ?
482
  """,
483
  (self.round_id, item_id),
484
  )
@@ -489,7 +489,7 @@ class AllocationEngine:
489
  """
490
  INSERT INTO assignments
491
  (round_id, item_id, user_id, status, assigned_at, redundancy_slot)
492
- VALUES (?, ?, ?, 'pending', ?, ?)
493
  """,
494
  (self.round_id, item_id, user, now, next_slot),
495
  )
@@ -501,7 +501,7 @@ class AllocationEngine:
501
  (round_id, operation, operator, from_source, to_target, amount_spec,
502
  affected_users, affected_count, item_ids_sample, item_ids_hash,
503
  dry_run, force, reason, created_at)
504
- VALUES (?, 'rebalance', ?, ?, ?, ?, ?, ?, ?, ?, 0, 0, ?, ?)
505
  """,
506
  (
507
  self.round_id,
@@ -571,23 +571,23 @@ class AllocationEngine:
571
  FROM tasks t
572
  LEFT JOIN task_config tc
573
  ON t.round_id = tc.round_id AND t.item_id = tc.item_id
574
- WHERE t.round_id = ?
575
  -- This specific user doesn't have this task
576
  AND NOT EXISTS (
577
  SELECT 1
578
  FROM assignments a
579
  WHERE a.round_id = t.round_id
580
  AND a.item_id = t.item_id
581
- AND a.user_id = ?
582
  )
583
  -- Check redundancy limit (current assignments < required)
584
  AND (
585
  SELECT COUNT(DISTINCT a2.user_id)
586
  FROM assignments a2
587
  WHERE a2.round_id = t.round_id AND a2.item_id = t.item_id
588
- ) < COALESCE(tc.redundancy_required, ?)
589
  -- Check redundancy completion
590
- AND COALESCE(tc.redundancy_completed, 0) < COALESCE(tc.redundancy_required, ?)
591
  ORDER BY t.order_key
592
  """,
593
  (self.round_id, user, redundancy, redundancy),
@@ -605,9 +605,9 @@ class AllocationEngine:
605
  SELECT DISTINCT a1.item_id
606
  FROM assignments a1
607
  LEFT JOIN task_config tc
608
- ON a1.round_id = tc.round_id AND a1.item_id = a1.item_id
609
- WHERE a1.round_id = ?
610
- AND a1.user_id = ?
611
  AND a1.status IN {status_filter}
612
  -- This specific target user doesn't have this task
613
  AND NOT EXISTS (
@@ -615,16 +615,16 @@ class AllocationEngine:
615
  FROM assignments a2
616
  WHERE a2.round_id = a1.round_id
617
  AND a2.item_id = a1.item_id
618
- AND a2.user_id = ?
619
  )
620
  -- Check redundancy limit
621
  AND (
622
  SELECT COUNT(DISTINCT a3.user_id)
623
  FROM assignments a3
624
  WHERE a3.round_id = a1.round_id AND a3.item_id = a1.item_id
625
- ) < COALESCE(tc.redundancy_required, ?)
626
  -- Check redundancy completion
627
- AND COALESCE(tc.redundancy_completed, 0) < COALESCE(tc.redundancy_required, ?)
628
  ORDER BY a1.assigned_at
629
  """,
630
  (self.round_id, source_user_id, user, redundancy, redundancy),
@@ -825,17 +825,17 @@ class AllocationEngine:
825
  all_assigned_items = [iid for items in distribution.values() for iid in items]
826
 
827
  with self.db._connect() as conn, self.db._lock:
828
- conn.execute('BEGIN IMMEDIATE')
829
 
830
  try:
831
  # 1. If transferring from a user, delete their assignments (no skipped state)
832
  if from_source.startswith('user:'):
833
  source_user = from_source[5:]
834
- placeholders = ','.join('?' * len(all_assigned_items))
835
  conn.execute(
836
  f"""
837
  DELETE FROM assignments
838
- WHERE round_id = ? AND user_id = ? AND item_id IN ({placeholders})
839
  """,
840
  (self.round_id, source_user, *all_assigned_items),
841
  )
@@ -849,7 +849,7 @@ class AllocationEngine:
849
  """
850
  SELECT 1
851
  FROM assignments
852
- WHERE round_id = ? AND item_id = ? AND user_id = ?
853
  """,
854
  (self.round_id, item_id, user),
855
  )
@@ -862,7 +862,7 @@ class AllocationEngine:
862
  """
863
  SELECT COUNT(DISTINCT user_id)
864
  FROM assignments
865
- WHERE round_id = ? AND item_id = ?
866
  """,
867
  (self.round_id, item_id),
868
  )
@@ -872,7 +872,7 @@ class AllocationEngine:
872
  """
873
  INSERT INTO assignments
874
  (round_id, item_id, user_id, status, assigned_at, redundancy_slot)
875
- VALUES (?, ?, ?, 'pending', ?, ?)
876
  """,
877
  (self.round_id, item_id, user, now, next_slot),
878
  )
@@ -884,7 +884,7 @@ class AllocationEngine:
884
  (round_id, operation, operator, from_source, to_target, amount_spec,
885
  affected_users, affected_count, item_ids_sample, item_ids_hash,
886
  dry_run, force, reason, created_at)
887
- VALUES (?, 'allocate', ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?)
888
  """,
889
  (
890
  self.round_id,
 
102
  """Check if users exist in this round (warn for new users)."""
103
  with db._connect() as conn:
104
  # Check if assignments table exists
105
+ cur = conn.execute("SELECT to_regclass('public.assignments')")
106
+ if cur.fetchone()[0] is None:
107
  return # New schema not initialized, skip check
108
 
109
  for user in users:
110
  cur = conn.execute(
111
+ 'SELECT COUNT(*) FROM assignments WHERE round_id = %s AND user_id = %s', (round_id, user)
112
  )
113
  count = cur.fetchone()[0]
114
  if count == 0:
 
240
  """
241
  SELECT item_id
242
  FROM assignments
243
+ WHERE round_id = %s AND user_id = %s AND status = 'pending'
244
  ORDER BY assigned_at
245
  """,
246
  (self.round_id, from_user),
 
274
  # Execute release
275
  now = datetime.now(timezone.utc).isoformat()
276
  with self.db._connect() as conn, self.db._lock:
277
+ placeholders = ','.join(['%s'] * len(to_release))
278
  conn.execute(
279
  f"""
280
  DELETE FROM assignments
281
+ WHERE round_id = %s AND user_id = %s AND item_id IN ({placeholders})
282
  """,
283
  (self.round_id, from_user, *to_release),
284
  )
 
290
  (round_id, operation, operator, from_source, to_target, amount_spec,
291
  affected_users, affected_count, item_ids_sample, item_ids_hash,
292
  dry_run, force, reason, created_at)
293
+ VALUES (%s, 'release', %s, %s, 'pool', %s, %s, %s, %s, %s, 0, 0, %s, %s)
294
  """,
295
  (
296
  self.round_id,
 
328
  """
329
  # 1. Collect all unique pending tasks from these users (avoid duplicates for redundancy > 1)
330
  with self.db._connect() as conn:
331
+ placeholders_users = ','.join(['%s'] * len(users))
332
  cur = conn.execute(
333
  f"""
334
  SELECT DISTINCT item_id
335
  FROM assignments
336
+ WHERE round_id = %s AND user_id IN ({placeholders_users}) AND status = 'pending'
337
  """,
338
  (self.round_id, *users),
339
  )
 
351
  """
352
  SELECT redundancy_required
353
  FROM task_config
354
+ WHERE round_id = %s AND item_id = %s
355
  """,
356
  (self.round_id, item_id),
357
  )
 
359
  redundancy_required = row[0] if row else 1
360
 
361
  # Count current assignments among rebalance users
362
+ placeholders = ','.join(['%s'] * len(users))
363
  cur = conn.execute(
364
  f"""
365
  SELECT COUNT(*)
366
  FROM assignments
367
+ WHERE round_id = %s AND item_id = %s AND user_id IN ({placeholders}) AND status = 'pending'
368
  """,
369
  (self.round_id, item_id, *users),
370
  )
 
402
  user_available_tasks[user] = []
403
  continue
404
  # exclude items that the user already has
405
+ placeholders = ','.join(['%s'] * len(all_pending))
406
  cur = conn.execute(
407
  f"""
408
  SELECT x.item_id
409
  FROM (
410
+ SELECT %s AS round_id, %s AS user_id
411
  ) p
412
  JOIN (
413
  SELECT item_id FROM (
414
+ VALUES {','.join(['(%s)'] * len(all_pending))}
415
  ) AS v(item_id)
416
  ) AS x ON 1=1
417
  WHERE NOT EXISTS (
 
443
  # 7. Execute rebalance (delete old assignments, insert new ones)
444
  now = datetime.now(timezone.utc).isoformat()
445
  with self.db._connect() as conn, self.db._lock:
446
+ conn.execute('BEGIN')
447
 
448
  try:
449
  # Delete all pending tasks from these users
450
+ placeholders_users = ','.join(['%s'] * len(users))
451
  conn.execute(
452
  f"""
453
  DELETE FROM assignments
454
+ WHERE round_id = %s AND user_id IN ({placeholders_users}) AND status = 'pending'
455
  """,
456
  (self.round_id, *users),
457
  )
 
465
  """
466
  SELECT 1
467
  FROM assignments
468
+ WHERE round_id = %s AND item_id = %s AND user_id = %s
469
  """,
470
  (self.round_id, item_id, user),
471
  )
 
478
  """
479
  SELECT COUNT(DISTINCT user_id)
480
  FROM assignments
481
+ WHERE round_id = %s AND item_id = %s
482
  """,
483
  (self.round_id, item_id),
484
  )
 
489
  """
490
  INSERT INTO assignments
491
  (round_id, item_id, user_id, status, assigned_at, redundancy_slot)
492
+ VALUES (%s, %s, %s, 'pending', %s, %s)
493
  """,
494
  (self.round_id, item_id, user, now, next_slot),
495
  )
 
501
  (round_id, operation, operator, from_source, to_target, amount_spec,
502
  affected_users, affected_count, item_ids_sample, item_ids_hash,
503
  dry_run, force, reason, created_at)
504
+ VALUES (%s, 'rebalance', %s, %s, %s, %s, %s, %s, %s, %s, 0, 0, %s, %s)
505
  """,
506
  (
507
  self.round_id,
 
571
  FROM tasks t
572
  LEFT JOIN task_config tc
573
  ON t.round_id = tc.round_id AND t.item_id = tc.item_id
574
+ WHERE t.round_id = %s
575
  -- This specific user doesn't have this task
576
  AND NOT EXISTS (
577
  SELECT 1
578
  FROM assignments a
579
  WHERE a.round_id = t.round_id
580
  AND a.item_id = t.item_id
581
+ AND a.user_id = %s
582
  )
583
  -- Check redundancy limit (current assignments < required)
584
  AND (
585
  SELECT COUNT(DISTINCT a2.user_id)
586
  FROM assignments a2
587
  WHERE a2.round_id = t.round_id AND a2.item_id = t.item_id
588
+ ) < COALESCE(tc.redundancy_required, %s)
589
  -- Check redundancy completion
590
+ AND COALESCE(tc.redundancy_completed, 0) < COALESCE(tc.redundancy_required, %s)
591
  ORDER BY t.order_key
592
  """,
593
  (self.round_id, user, redundancy, redundancy),
 
605
  SELECT DISTINCT a1.item_id
606
  FROM assignments a1
607
  LEFT JOIN task_config tc
608
+ ON a1.round_id = tc.round_id AND tc.item_id = a1.item_id
609
+ WHERE a1.round_id = %s
610
+ AND a1.user_id = %s
611
  AND a1.status IN {status_filter}
612
  -- This specific target user doesn't have this task
613
  AND NOT EXISTS (
 
615
  FROM assignments a2
616
  WHERE a2.round_id = a1.round_id
617
  AND a2.item_id = a1.item_id
618
+ AND a2.user_id = %s
619
  )
620
  -- Check redundancy limit
621
  AND (
622
  SELECT COUNT(DISTINCT a3.user_id)
623
  FROM assignments a3
624
  WHERE a3.round_id = a1.round_id AND a3.item_id = a1.item_id
625
+ ) < COALESCE(tc.redundancy_required, %s)
626
  -- Check redundancy completion
627
+ AND COALESCE(tc.redundancy_completed, 0) < COALESCE(tc.redundancy_required, %s)
628
  ORDER BY a1.assigned_at
629
  """,
630
  (self.round_id, source_user_id, user, redundancy, redundancy),
 
825
  all_assigned_items = [iid for items in distribution.values() for iid in items]
826
 
827
  with self.db._connect() as conn, self.db._lock:
828
+ conn.execute('BEGIN')
829
 
830
  try:
831
  # 1. If transferring from a user, delete their assignments (no skipped state)
832
  if from_source.startswith('user:'):
833
  source_user = from_source[5:]
834
+ placeholders = ','.join(['%s'] * len(all_assigned_items))
835
  conn.execute(
836
  f"""
837
  DELETE FROM assignments
838
+ WHERE round_id = %s AND user_id = %s AND item_id IN ({placeholders})
839
  """,
840
  (self.round_id, source_user, *all_assigned_items),
841
  )
 
849
  """
850
  SELECT 1
851
  FROM assignments
852
+ WHERE round_id = %s AND item_id = %s AND user_id = %s
853
  """,
854
  (self.round_id, item_id, user),
855
  )
 
862
  """
863
  SELECT COUNT(DISTINCT user_id)
864
  FROM assignments
865
+ WHERE round_id = %s AND item_id = %s
866
  """,
867
  (self.round_id, item_id),
868
  )
 
872
  """
873
  INSERT INTO assignments
874
  (round_id, item_id, user_id, status, assigned_at, redundancy_slot)
875
+ VALUES (%s, %s, %s, 'pending', %s, %s)
876
  """,
877
  (self.round_id, item_id, user, now, next_slot),
878
  )
 
884
  (round_id, operation, operator, from_source, to_target, amount_spec,
885
  affected_users, affected_count, item_ids_sample, item_ids_hash,
886
  dry_run, force, reason, created_at)
887
+ VALUES (%s, 'allocate', %s, %s, %s, %s, %s, %s, %s, %s, 0, %s, %s, %s)
888
  """,
889
  (
890
  self.round_id,
src/database.py CHANGED
@@ -2,13 +2,14 @@
2
 
3
  import json
4
  import os
5
- import sqlite3
6
  import subprocess
7
  import threading
8
  from contextlib import contextmanager
9
  from datetime import datetime, timezone
10
  from typing import Optional
11
 
 
 
12
  from huggingface_hub import CommitOperationAdd, HfApi
13
 
14
  REQUIRED_TABLES = ('tasks', 'assignments', 'task_config', 'answers', 'allocation_history')
@@ -22,35 +23,158 @@ class DB:
22
  """Database for the evaluation."""
23
 
24
  def __init__(self, db_path: str, schema_path: Optional[str] = None, verify_only: bool = True):
25
- """Initialize the database."""
26
- os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
27
- self.db_path = db_path
 
 
 
28
  self._lock = threading.Lock()
29
 
30
- with self._connect() as conn:
31
- if schema_path is not None and os.path.exists(schema_path):
32
- with open(schema_path, encoding='utf-8') as f:
33
- schema_sql = f.read()
34
- conn.executescript(schema_sql)
35
- elif verify_only:
 
 
 
 
 
 
 
 
36
  missing = []
37
  for t in REQUIRED_TABLES:
38
- cur = conn.execute('SELECT name FROM sqlite_master WHERE type="table" AND name=?', (t,))
39
- if cur.fetchone() is None:
 
40
  missing.append(t)
41
  if missing:
42
  raise FileNotFoundError(f'Missing required tables: {", ".join(missing)}')
43
- else:
44
- raise FileNotFoundError(f'SQL schema file not found at: {schema_path or "None"}')
45
 
46
  @contextmanager
47
  def _connect(self):
48
- conn = sqlite3.connect(self.db_path, timeout=30, isolation_level=None) # autocommit
49
- conn.execute('PRAGMA foreign_keys=ON;')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  try:
51
- yield conn
52
  finally:
53
- conn.close()
 
 
 
 
 
54
 
55
  # ========== Answer Recording ==========
56
 
@@ -68,9 +192,15 @@ class DB:
68
  now = _get_time_now()
69
  with self._connect() as conn, self._lock:
70
  conn.execute(
71
- 'INSERT OR REPLACE INTO answers(round_id, user_id, item_id, label, image_path, '
72
- 'score, words_not_present, answered_at) '
73
- 'VALUES(?,?,?,?,?,?,?,?)',
 
 
 
 
 
 
74
  (
75
  round_id,
76
  user_id,
@@ -87,7 +217,7 @@ class DB:
87
  """Get all the answered item ids of a user in a round."""
88
  with self._connect() as conn:
89
  cur = conn.execute(
90
- 'SELECT item_id FROM answers WHERE round_id=? AND user_id=?',
91
  (round_id, user_id),
92
  )
93
  return {r[0] for r in cur.fetchall()} # fetchall returns a list of tuples
@@ -102,7 +232,7 @@ class DB:
102
  """
103
  with self._connect() as conn:
104
  cur = conn.execute(
105
- 'SELECT score, words_not_present FROM answers WHERE round_id=? AND user_id=? AND item_id=?',
106
  (round_id, user_id, item_id),
107
  )
108
  row = cur.fetchone()
@@ -125,9 +255,9 @@ class DB:
125
  """
126
  UPDATE assignments
127
  SET status = 'pending', lease_until = NULL, started_at = NULL
128
- WHERE round_id = ?
129
- AND status = 'in_progress'
130
- AND lease_until < ?
131
  """,
132
  (round_id, now),
133
  )
@@ -139,7 +269,6 @@ class DB:
139
  space_id = os.getenv('SPACE_ID', None)
140
  if not space_id:
141
  return
142
-
143
  try:
144
  repo_root = subprocess.run(
145
  ['git', 'rev-parse', '--show-toplevel'],
 
2
 
3
  import json
4
  import os
 
5
  import subprocess
6
  import threading
7
  from contextlib import contextmanager
8
  from datetime import datetime, timezone
9
  from typing import Optional
10
 
11
+ import psycopg2
12
+ import psycopg2.extras
13
  from huggingface_hub import CommitOperationAdd, HfApi
14
 
15
  REQUIRED_TABLES = ('tasks', 'assignments', 'task_config', 'answers', 'allocation_history')
 
23
  """Database for the evaluation."""
24
 
25
  def __init__(self, db_path: str, schema_path: Optional[str] = None, verify_only: bool = True):
26
+ """Initialize the database.
27
+
28
+ Note: db_path/schema_path kept for compatibility; Postgres uses env vars.
29
+ Required env vars: PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE
30
+ """
31
+ self.db_path = db_path # kept for compatibility
32
  self._lock = threading.Lock()
33
 
34
+ # Validate environment variables early
35
+ self._pg_dsn = dict(
36
+ host=os.environ.get('PGHOST'),
37
+ port=int(os.environ.get('PGPORT')) if os.environ.get('PGPORT') else None,
38
+ user=os.environ.get('PGUSER'),
39
+ password=os.environ.get('PGPASSWORD'),
40
+ database=os.environ.get('PGDATABASE'),
41
+ )
42
+ if not all([self._pg_dsn['host'], self._pg_dsn['user'], self._pg_dsn['password'], self._pg_dsn['database']]):
43
+ raise OSError('Missing PostgreSQL env vars: PGHOST, PGUSER, PGPASSWORD, PGDATABASE')
44
+
45
+ # Verify required tables exist
46
+ if verify_only:
47
+ with self._connect() as conn:
48
  missing = []
49
  for t in REQUIRED_TABLES:
50
+ cur = conn.execute('SELECT to_regclass(%s)', (f'public.{t}',))
51
+ exists = cur.fetchone()[0]
52
+ if exists is None:
53
  missing.append(t)
54
  if missing:
55
  raise FileNotFoundError(f'Missing required tables: {", ".join(missing)}')
 
 
56
 
57
  @contextmanager
58
  def _connect(self):
59
+ """Context manager yielding a lightweight wrapper with conn.execute(...).
60
+
61
+ - Uses psycopg2 with autocommit by default (emulates sqlite autocommit usage).
62
+ - Supports BEGIN/COMMIT/ROLLBACK issued via execute for backward compatibility.
63
+ """
64
+
65
+ class _CursorWrapper:
66
+ def __init__(self, cursor):
67
+ self._cursor = cursor
68
+
69
+ def fetchone(self):
70
+ return self._cursor.fetchone()
71
+
72
+ def fetchall(self):
73
+ return self._cursor.fetchall()
74
+
75
+ @property
76
+ def rowcount(self):
77
+ return self._cursor.rowcount
78
+
79
+ class _ConnWrapper:
80
+ def __init__(self, real_conn):
81
+ self._conn = real_conn
82
+
83
+ def execute(self, sql: str, params: tuple | list | None = None):
84
+ sql_strip = (sql or '').strip().upper()
85
+ # Map transaction control statements
86
+ if sql_strip.startswith('BEGIN'):
87
+ self._conn.autocommit = False
88
+ with self._conn.cursor() as c:
89
+ c.execute('BEGIN;')
90
+ # Return dummy wrapper
91
+ return _CursorWrapper(c)
92
+ if sql_strip == 'COMMIT':
93
+ try:
94
+ self._conn.commit()
95
+ finally:
96
+ self._conn.autocommit = True
97
+ # Return a cursor with no data
98
+ with self._conn.cursor() as c:
99
+ return _CursorWrapper(c)
100
+ if sql_strip == 'ROLLBACK':
101
+ try:
102
+ self._conn.rollback()
103
+ finally:
104
+ self._conn.autocommit = True
105
+ with self._conn.cursor() as c:
106
+ return _CursorWrapper(c)
107
+
108
+ # Regular statement
109
+ with self._conn.cursor() as c:
110
+ c.execute(sql, params or ())
111
+ # For SELECTs, we need a cursor able to fetch after context ends.
112
+ # Use named cursor copy via RealDictCursor? Simpler: re-execute on a new cursor kept open.
113
+ # Instead, materialize results if it's a query returning rows.
114
+ if c.description is not None:
115
+ rows = c.fetchall()
116
+
117
+ # Create a lightweight object mimicking cursor
118
+ class _Mat:
119
+ def __init__(self, rows):
120
+ self._rows = rows
121
+
122
+ def fetchone(self):
123
+ return self._rows[0] if self._rows else None
124
+
125
+ def fetchall(self):
126
+ return list(self._rows)
127
+
128
+ @property
129
+ def rowcount(self):
130
+ return len(self._rows)
131
+
132
+ return _Mat(rows)
133
+ else:
134
+ # Non-SELECT: return wrapper with rowcount captured
135
+ rc = c.rowcount
136
+
137
+ class _Affect:
138
+ def __init__(self, rc):
139
+ self._rc = rc
140
+
141
+ def fetchone(self):
142
+ return None
143
+
144
+ def fetchall(self):
145
+ return []
146
+
147
+ @property
148
+ def rowcount(self):
149
+ return self._rc
150
+
151
+ return _Affect(rc)
152
+
153
+ def __enter__(self):
154
+ return self
155
+
156
+ def __exit__(self, exc_type, exc, tb):
157
+ # If a transaction is open and no explicit commit/rollback occurred, rollback on error
158
+ if not self._conn.autocommit:
159
+ if exc is None:
160
+ self._conn.commit()
161
+ else:
162
+ self._conn.rollback()
163
+ self._conn.close()
164
+
165
+ # Open a new psycopg2 connection per context
166
+ conn = psycopg2.connect(**{k: v for k, v in self._pg_dsn.items() if v is not None})
167
+ conn.autocommit = True
168
+ wrapper = _ConnWrapper(conn)
169
  try:
170
+ yield wrapper
171
  finally:
172
+ # _ConnWrapper.__exit__ handles close when used with "with". If not, ensure close here.
173
+ try:
174
+ if not conn.closed:
175
+ conn.close()
176
+ except Exception:
177
+ pass
178
 
179
  # ========== Answer Recording ==========
180
 
 
192
  now = _get_time_now()
193
  with self._connect() as conn, self._lock:
194
  conn.execute(
195
+ (
196
+ 'INSERT INTO answers (round_id, user_id, item_id, label, image_path, '
197
+ 'score, words_not_present, answered_at) '
198
+ 'VALUES (%s, %s, %s, %s, %s, %s, %s, %s) '
199
+ 'ON CONFLICT (round_id, user_id, item_id) DO UPDATE SET '
200
+ 'score = EXCLUDED.score, '
201
+ 'words_not_present = EXCLUDED.words_not_present, '
202
+ 'answered_at = EXCLUDED.answered_at'
203
+ ),
204
  (
205
  round_id,
206
  user_id,
 
217
  """Get all the answered item ids of a user in a round."""
218
  with self._connect() as conn:
219
  cur = conn.execute(
220
+ 'SELECT item_id FROM answers WHERE round_id=%s AND user_id=%s',
221
  (round_id, user_id),
222
  )
223
  return {r[0] for r in cur.fetchall()} # fetchall returns a list of tuples
 
232
  """
233
  with self._connect() as conn:
234
  cur = conn.execute(
235
+ 'SELECT score, words_not_present FROM answers WHERE round_id=%s AND user_id=%s AND item_id=%s',
236
  (round_id, user_id, item_id),
237
  )
238
  row = cur.fetchone()
 
255
  """
256
  UPDATE assignments
257
  SET status = 'pending', lease_until = NULL, started_at = NULL
258
+ WHERE round_id = %s
259
+ AND status = 'in_progress'
260
+ AND lease_until < %s
261
  """,
262
  (round_id, now),
263
  )
 
269
  space_id = os.getenv('SPACE_ID', None)
270
  if not space_id:
271
  return
 
272
  try:
273
  repo_root = subprocess.run(
274
  ['git', 'rev-parse', '--show-toplevel'],
src/eval_server.py CHANGED
@@ -211,7 +211,7 @@ def app_main(args) -> None:
211
  SELECT t.item_id, t.label, t.path, t.words, t.order_key, a.status
212
  FROM assignments a
213
  JOIN tasks t ON a.round_id = t.round_id AND a.item_id = t.item_id
214
- WHERE a.round_id = ? AND a.user_id = ?
215
  ORDER BY t.order_key
216
  """,
217
  (args.round_id, user_id),
@@ -428,7 +428,7 @@ def app_main(args) -> None:
428
  # Get item info
429
  with db._connect() as conn:
430
  cur = conn.execute(
431
- 'SELECT label, path FROM tasks WHERE round_id = ? AND item_id = ?',
432
  (args.round_id, current_item_id),
433
  )
434
  row = cur.fetchone()
@@ -450,8 +450,8 @@ def app_main(args) -> None:
450
  conn.execute(
451
  """
452
  UPDATE assignments
453
- SET status = 'completed', completed_at = ?, lease_until = NULL
454
- WHERE round_id = ? AND item_id = ? AND user_id = ?
455
  """,
456
  (now, args.round_id, current_item_id, user_id),
457
  )
@@ -462,9 +462,9 @@ def app_main(args) -> None:
462
  UPDATE task_config
463
  SET redundancy_completed = (
464
  SELECT COUNT(*) FROM assignments
465
- WHERE round_id = ? AND item_id = ? AND status = 'completed'
466
  )
467
- WHERE round_id = ? AND item_id = ?
468
  """,
469
  (args.round_id, current_item_id, args.round_id, current_item_id),
470
  )
@@ -479,16 +479,16 @@ def app_main(args) -> None:
479
  (i for i, item in enumerate(items_list) if item['item_id'] == current_item_id), None
480
  )
481
 
482
- # db auto commit
483
- if (
484
- args.auto_commit > 0
485
- and current_idx_in_list is not None
486
- and (current_idx_in_list + 1) % args.auto_commit == 0
487
- ):
488
- try:
489
- db.commit_and_push_db()
490
- except Exception as e:
491
- print(f'Failed to commit and push the database: {e}')
492
 
493
  if current_idx_in_list is None or current_idx_in_list >= len(items_list) - 1:
494
  # This was the last item - show completion message but stay on current item
@@ -546,8 +546,8 @@ def app_main(args) -> None:
546
  conn.execute(
547
  """
548
  UPDATE assignments
549
- SET status = 'in_progress', started_at = ?, lease_until = ?
550
- WHERE round_id = ? AND item_id = ? AND user_id = ?
551
  """,
552
  (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id),
553
  )
@@ -559,8 +559,8 @@ def app_main(args) -> None:
559
  conn.execute(
560
  """
561
  UPDATE assignments
562
- SET lease_until = ?
563
- WHERE round_id = ? AND item_id = ? AND user_id = ?
564
  """,
565
  (lease_until, args.round_id, item_id, user_id),
566
  )
 
211
  SELECT t.item_id, t.label, t.path, t.words, t.order_key, a.status
212
  FROM assignments a
213
  JOIN tasks t ON a.round_id = t.round_id AND a.item_id = t.item_id
214
+ WHERE a.round_id = %s AND a.user_id = %s
215
  ORDER BY t.order_key
216
  """,
217
  (args.round_id, user_id),
 
428
  # Get item info
429
  with db._connect() as conn:
430
  cur = conn.execute(
431
+ 'SELECT label, path FROM tasks WHERE round_id = %s AND item_id = %s',
432
  (args.round_id, current_item_id),
433
  )
434
  row = cur.fetchone()
 
450
  conn.execute(
451
  """
452
  UPDATE assignments
453
+ SET status = 'completed', completed_at = %s, lease_until = NULL
454
+ WHERE round_id = %s AND item_id = %s AND user_id = %s
455
  """,
456
  (now, args.round_id, current_item_id, user_id),
457
  )
 
462
  UPDATE task_config
463
  SET redundancy_completed = (
464
  SELECT COUNT(*) FROM assignments
465
+ WHERE round_id = %s AND item_id = %s AND status = 'completed'
466
  )
467
+ WHERE round_id = %s AND item_id = %s
468
  """,
469
  (args.round_id, current_item_id, args.round_id, current_item_id),
470
  )
 
479
  (i for i, item in enumerate(items_list) if item['item_id'] == current_item_id), None
480
  )
481
 
482
+ # # db auto commit
483
+ # if (
484
+ # args.auto_commit > 0
485
+ # and current_idx_in_list is not None
486
+ # and (current_idx_in_list + 1) % args.auto_commit == 0
487
+ # ):
488
+ # try:
489
+ # db.commit_and_push_db()
490
+ # except Exception as e:
491
+ # print(f'Failed to commit and push the database: {e}')
492
 
493
  if current_idx_in_list is None or current_idx_in_list >= len(items_list) - 1:
494
  # This was the last item - show completion message but stay on current item
 
546
  conn.execute(
547
  """
548
  UPDATE assignments
549
+ SET status = 'in_progress', started_at = %s, lease_until = %s
550
+ WHERE round_id = %s AND item_id = %s AND user_id = %s
551
  """,
552
  (now_dt.isoformat(), lease_until, args.round_id, item_id, user_id),
553
  )
 
559
  conn.execute(
560
  """
561
  UPDATE assignments
562
+ SET lease_until = %s
563
+ WHERE round_id = %s AND item_id = %s AND user_id = %s
564
  """,
565
  (lease_until, args.round_id, item_id, user_id),
566
  )