import os
import sys
import traceback
import uuid
from typing import List
from datetime import datetime, timezone, timedelta

from dotenv import load_dotenv
from fastapi import Request
from fastapi.routing import APIRoute
import httpx
import json
from unittest.mock import MagicMock, patch

load_dotenv()
import io
import time

import fakeredis

# this file is to test litellm/proxy
sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path
import asyncio
import logging

import pytest

from litellm.proxy.db.db_transaction_queue.pod_lock_manager import PodLockManager
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.proxy.management_endpoints.internal_user_endpoints import (
    new_user,
    user_info,
    user_update,
)
from litellm.proxy.auth.auth_checks import get_key_object
from litellm.proxy.management_endpoints.key_management_endpoints import (
    delete_key_fn,
    generate_key_fn,
    generate_key_helper_fn,
    info_key_fn,
    list_keys,
    regenerate_key_fn,
    update_key_fn,
)
from litellm.proxy.management_endpoints.team_endpoints import (
    new_team,
    team_info,
    update_team,
)
from litellm.proxy.proxy_server import (
    LitellmUserRoles,
    audio_transcriptions,
    chat_completion,
    completion,
    embeddings,
    model_list,
    moderations,
    user_api_key_auth,
)
from litellm.proxy.management_endpoints.customer_endpoints import (
    new_end_user,
)
from litellm.proxy.spend_tracking.spend_management_endpoints import (
    global_spend,
    spend_key_fn,
    spend_user_fn,
    view_spend_logs,
)
from litellm.proxy.utils import PrismaClient, ProxyLogging, hash_token, update_spend

verbose_proxy_logger.setLevel(level=logging.DEBUG)

from starlette.datastructures import URL

from litellm.caching.caching import DualCache, RedisCache
from litellm.proxy._types import (
    DynamoDBArgs,
    GenerateKeyRequest,
    KeyRequest,
    LiteLLM_UpperboundKeyGenerateParams,
    NewCustomerRequest,
    NewTeamRequest,
    NewUserRequest,
    ProxyErrorTypes,
    ProxyException,
    UpdateKeyRequest,
    UpdateTeamRequest,
    UpdateUserRequest,
    UserAPIKeyAuth,
)

proxy_logging_obj = ProxyLogging(user_api_key_cache=DualCache())

request_data = {
    "model": "azure-gpt-3.5",
    "messages": [
        {"role": "user", "content": "this is my new test. respond in 50 lines"}
    ],
}


@pytest.fixture
def prisma_client():
    from litellm.proxy.proxy_cli import append_query_params

    ### add connection pool + pool timeout args
    params = {"connection_limit": 100, "pool_timeout": 60}
    database_url = os.getenv("DATABASE_URL")
    modified_url = append_query_params(database_url, params)
    os.environ["DATABASE_URL"] = modified_url

    # Instantiate the Prisma client against the modified DATABASE_URL
    prisma_client = PrismaClient(
        database_url=os.environ["DATABASE_URL"], proxy_logging_obj=proxy_logging_obj
    )

    # Reset proxy-server state that individual tests rely on
    litellm.proxy.proxy_server.litellm_proxy_budget_name = (
        f"litellm-proxy-budget-{time.time()}"
    )
    litellm.proxy.proxy_server.user_custom_key_generate = None

    return prisma_client


async def setup_db_connection(prisma_client):
    setattr(litellm.proxy.proxy_server, "prisma_client", prisma_client)
    setattr(litellm.proxy.proxy_server, "master_key", "sk-1234")
    await litellm.proxy.proxy_server.prisma_client.connect()
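

# NOTE (illustrative sketch, not part of the original tests): the `prisma_client`
# fixture and `setup_db_connection` helper above are typically combined in
# DB-backed tests roughly as below. The test name and body here are hypothetical
# placeholders, shown only to clarify how the two helpers fit together.
#
# @pytest.mark.asyncio
# async def test_some_db_backed_flow(prisma_client):
#     await setup_db_connection(prisma_client)
#     # ... call a management endpoint, then assert against the DB ...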


@pytest.mark.asyncio
async def test_pod_lock_acquisition_when_no_active_lock():
    """Test if a pod can acquire a lock when no lock is active"""
    cronjob_id = str(uuid.uuid4())

    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )
    lock_manager = PodLockManager(redis_cache=global_redis_cache)

    # Attempt to acquire lock
    result = await lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )
    assert result == True, "Pod should be able to acquire lock when no lock exists"

    # Verify in Redis that the lock record holds this pod's id
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    print("lock_record=", lock_record)
    assert lock_record == lock_manager.pod_id
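

# Sketch (assumption, not the PodLockManager implementation): acquiring the lock
# is expected to behave like an atomic Redis `SET key value NX EX ttl` -- the
# first writer wins and later writers see the existing owner. fakeredis is used
# so the sketch runs without a real Redis server; it is not collected by pytest
# (no `test_` prefix).
async def _sketch_set_nx_lock_primitive():
    fake_redis = fakeredis.FakeAsyncRedis()
    lock_key = f"cronjob_lock:{uuid.uuid4()}"

    # First pod: SET NX succeeds and owns the key for `ex` seconds
    acquired = await fake_redis.set(lock_key, "pod-1", nx=True, ex=60)
    assert acquired

    # Second pod: SET NX fails because the key already exists
    second_acquired = await fake_redis.set(lock_key, "pod-2", nx=True, ex=60)
    assert not second_acquired

    # The stored value still identifies the first owner
    assert await fake_redis.get(lock_key) == b"pod-1"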


@pytest.mark.asyncio
async def test_pod_lock_acquisition_after_completion():
    """Test if a new pod can acquire lock after previous pod completes"""
    cronjob_id = str(uuid.uuid4())

    # First pod acquires and releases lock
    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )
    first_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    await first_lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )
    await first_lock_manager.release_lock(
        cronjob_id=cronjob_id,
    )

    # Second pod attempts to acquire lock
    second_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    result = await second_lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )
    assert result == True, "Second pod should acquire lock after first pod releases it"

    # Verify in Redis
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    assert lock_record == second_lock_manager.pod_id


@pytest.mark.asyncio
async def test_pod_lock_acquisition_after_expiry():
    """Test if a new pod can acquire lock after previous pod's lock expires"""
    cronjob_id = str(uuid.uuid4())

    # First pod acquires lock
    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )
    first_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    await first_lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )

    # Release the lock from the first pod (stands in for the lock going away)
    await first_lock_manager.release_lock(
        cronjob_id=cronjob_id,
    )

    # Second pod attempts to acquire lock
    second_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    result = await second_lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )
    assert (
        result == True
    ), "Second pod should acquire lock after first pod's lock expires"

    # Verify in Redis
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    assert lock_record == second_lock_manager.pod_id


@pytest.mark.asyncio
async def test_pod_lock_release():
    """Test if a pod can successfully release its lock"""
    cronjob_id = str(uuid.uuid4())

    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )
    lock_manager = PodLockManager(redis_cache=global_redis_cache)

    # Acquire and then release lock
    await lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )
    await lock_manager.release_lock(
        cronjob_id=cronjob_id,
    )

    # Verify in Redis that the lock key is gone
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    assert lock_record is None
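

# Sketch (hypothetical usage, mirroring the acquire/release API exercised above):
# a cron-style worker would typically guard its critical section and always
# release in `finally` so the lock is not held past a failed run. `do_cron_work`
# is a placeholder callable, not an existing litellm function.
async def _sketch_guarded_cron_run(global_redis_cache, cronjob_id, do_cron_work):
    lock_manager = PodLockManager(redis_cache=global_redis_cache)
    if not await lock_manager.acquire_lock(cronjob_id=cronjob_id):
        return  # another pod holds the lock; skip this run
    try:
        await do_cron_work()
    finally:
        await lock_manager.release_lock(cronjob_id=cronjob_id)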


@pytest.mark.asyncio
async def test_concurrent_lock_acquisition():
    """Test that only one pod can acquire the lock when multiple pods try simultaneously"""
    cronjob_id = str(uuid.uuid4())

    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )

    # Create multiple lock managers simulating different pods
    lock_manager1 = PodLockManager(redis_cache=global_redis_cache)
    lock_manager2 = PodLockManager(redis_cache=global_redis_cache)
    lock_manager3 = PodLockManager(redis_cache=global_redis_cache)

    # Try to acquire locks concurrently
    results = await asyncio.gather(
        lock_manager1.acquire_lock(
            cronjob_id=cronjob_id,
        ),
        lock_manager2.acquire_lock(
            cronjob_id=cronjob_id,
        ),
        lock_manager3.acquire_lock(
            cronjob_id=cronjob_id,
        ),
    )

    # Only one should succeed
    print("all results=", results)
    assert sum(results) == 1, "Only one pod should acquire the lock"

    # Verify in Redis that the winner's pod_id is stored
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    assert lock_record in [
        lock_manager1.pod_id,
        lock_manager2.pod_id,
        lock_manager3.pod_id,
    ]
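

# Sketch (assumption about the underlying mechanism): the "only one pod wins"
# behaviour asserted above falls out of `SET NX` being atomic even when the
# attempts are issued concurrently. Runnable against fakeredis; not collected
# by pytest.
async def _sketch_concurrent_set_nx():
    fake_redis = fakeredis.FakeAsyncRedis()
    lock_key = f"cronjob_lock:{uuid.uuid4()}"
    attempts = [
        fake_redis.set(lock_key, f"pod-{i}", nx=True, ex=60) for i in range(3)
    ]
    results = await asyncio.gather(*attempts)
    # Exactly one SET NX succeeds; the others return None
    assert sum(bool(r) for r in results) == 1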


@pytest.mark.asyncio
async def test_lock_acquisition_with_expired_ttl():
    """Test that a pod can acquire a lock when the existing lock's TTL has expired"""
    cronjob_id = str(uuid.uuid4())

    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )
    first_lock_manager = PodLockManager(redis_cache=global_redis_cache)

    # First pod acquires lock with a very short TTL to simulate expiration
    short_ttl = 1  # 1 second
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    await global_redis_cache.async_set_cache(
        lock_key,
        first_lock_manager.pod_id,
        ttl=short_ttl,
    )

    # Wait for the lock to expire
    await asyncio.sleep(short_ttl + 0.5)  # Wait slightly longer than the TTL

    # Second pod tries to acquire without an explicit release by the first pod
    second_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    result = await second_lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )
    assert result == True, "Should acquire lock when existing lock has expired TTL"

    # Verify in Redis
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    print("lock_record=", lock_record)
    assert lock_record == second_lock_manager.pod_id
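

# Sketch (illustrative, using fakeredis so no live Redis is needed): the expiry
# behaviour the test above relies on -- a key written with a short TTL is gone
# after that TTL, so a later GET sees nothing and a new lock can be written.
# Not collected by pytest.
async def _sketch_ttl_expiry():
    fake_redis = fakeredis.FakeAsyncRedis()
    lock_key = f"cronjob_lock:{uuid.uuid4()}"
    await fake_redis.set(lock_key, "pod-1", ex=1)  # 1 second TTL
    assert await fake_redis.pttl(lock_key) > 0  # TTL is tracked in milliseconds
    await asyncio.sleep(1.5)  # wait past the TTL
    assert await fake_redis.get(lock_key) is None  # key has expired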


@pytest.mark.asyncio
async def test_release_expired_lock():
    """Test that a pod cannot release a lock that has been taken over by another pod"""
    cronjob_id = str(uuid.uuid4())

    global_redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )

    # First pod acquires lock with a very short TTL
    first_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    short_ttl = 1  # 1 second
    lock_key = PodLockManager.get_redis_lock_key(cronjob_id)
    await global_redis_cache.async_set_cache(
        lock_key,
        first_lock_manager.pod_id,
        ttl=short_ttl,
    )

    # Wait for the lock to expire
    await asyncio.sleep(short_ttl + 0.5)  # Wait slightly longer than the TTL

    # Second pod acquires the lock
    second_lock_manager = PodLockManager(redis_cache=global_redis_cache)
    await second_lock_manager.acquire_lock(
        cronjob_id=cronjob_id,
    )

    # First pod attempts to release what it thinks is still its lock
    await first_lock_manager.release_lock(
        cronjob_id=cronjob_id,
    )

    # Verify that the second pod's lock is still active
    lock_record = await global_redis_cache.async_get_cache(lock_key)
    assert lock_record == second_lock_manager.pod_id
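

# Sketch (assumption about why the first pod's release above is a no-op): a safe
# release has to check ownership before deleting, e.g. with an atomic Lua
# compare-and-delete. This is one common way to implement it, not necessarily
# how PodLockManager does it internally. Running it against fakeredis requires
# its optional Lua support (`pip install "fakeredis[lua]"`).
_COMPARE_AND_DELETE_LUA = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""


async def _sketch_owner_checked_release():
    fake_redis = fakeredis.FakeAsyncRedis()
    lock_key = f"cronjob_lock:{uuid.uuid4()}"
    await fake_redis.set(lock_key, "pod-2", ex=60)  # pod-2 currently owns the lock
    # pod-1 tries to release with its own id -> nothing is deleted
    deleted = await fake_redis.eval(_COMPARE_AND_DELETE_LUA, 1, lock_key, "pod-1")
    assert deleted == 0
    assert await fake_redis.get(lock_key) == b"pod-2"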


@pytest.mark.asyncio
async def test_e2e_size_of_redis_buffer():
    """
    Ensure that all elements from the Redis queue get flushed to the DB.

    The goal is to ensure the Redis buffer does not grow without bound.
    """
    from litellm.proxy.db.db_spend_update_writer import DBSpendUpdateWriter
    from litellm.proxy.db.db_transaction_queue.base_update_queue import BaseUpdateQueue
    from litellm.caching import RedisCache
    import uuid

    redis_cache = RedisCache(
        host=os.getenv("REDIS_HOST"),
        port=os.getenv("REDIS_PORT"),
        password=os.getenv("REDIS_PASSWORD"),
    )
    fake_redis_client = fakeredis.FakeAsyncRedis()
    redis_cache.redis_async_client = fake_redis_client
    setattr(litellm.proxy.proxy_server, "use_redis_transaction_buffer", True)
    db_writer = DBSpendUpdateWriter(redis_cache=redis_cache)

    # Collect all the update queues the writer initialized
    initialized_queues: List[BaseUpdateQueue] = []
    for attr in dir(db_writer):
        if isinstance(getattr(db_writer, attr), BaseUpdateQueue):
            initialized_queues.append(getattr(db_writer, attr))

    # Add mock data to each queue
    new_keys_added = []
    for queue in initialized_queues:
        key = f"test_key_{queue.__class__.__name__}_{uuid.uuid4()}"
        new_keys_added.append(key)
        await queue.add_update(
            {
                key: {
                    "spend": 1.0,
                    "entity_id": "test_entity_id",
                    "entity_type": "user",
                    "api_key": "test_api_key",
                    "model": "test_model",
                    "custom_llm_provider": "test_custom_llm_provider",
                    "date": "2025-01-01",
                    "prompt_tokens": 100,
                    "completion_tokens": 100,
                    "total_tokens": 200,
                    "response_cost": 1.0,
                    "api_requests": 1,
                    "successful_requests": 1,
                    "failed_requests": 0,
                }
            }
        )
    print("initialized_queues=", initialized_queues)
    print("new_keys_added=", new_keys_added)

    # Check the size of each queue
    for queue in initialized_queues:
        assert queue.update_queue.qsize() == 1, (
            f"Queue {queue.__class__.__name__} was not initialized with mock data. "
            f"Expected size 1, got {queue.update_queue.qsize()}"
        )

    # Flush from in-memory -> Redis -> DB
    with patch(
        "litellm.proxy.db.db_spend_update_writer.PodLockManager.acquire_lock",
        return_value=True,
    ):
        await db_writer._commit_spend_updates_to_db_with_redis(
            prisma_client=MagicMock(),
            n_retry_times=3,
            proxy_logging_obj=MagicMock(),
        )

    # Verify no keys are left behind in Redis after the flush
    keys = await fake_redis_client.keys("*")
    print("keys found after flushing to DB:", keys)
    assert len(keys) == 0, f"Expected Redis to be empty, but found keys: {keys}"
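

# Sketch (illustrative, simplified model of the buffering the test above checks):
# spend updates sit in an in-memory asyncio.Queue, get pushed into a Redis list,
# and are then popped in bulk so nothing is left behind in Redis. The queue and
# key names here are placeholders, not the writer's real internals.
async def _sketch_flush_in_memory_buffer_through_redis():
    fake_redis = fakeredis.FakeAsyncRedis()
    in_memory_queue: asyncio.Queue = asyncio.Queue()
    redis_buffer_key = "spend_update_buffer"

    # 1. updates accumulate in memory
    for i in range(3):
        await in_memory_queue.put({"entity_id": f"user-{i}", "spend": 1.0})

    # 2. drain the in-memory queue into a Redis list
    while not in_memory_queue.empty():
        update = await in_memory_queue.get()
        await fake_redis.rpush(redis_buffer_key, json.dumps(update))

    # 3. read everything out of Redis (this is what would be written to the DB)
    raw_updates = await fake_redis.lrange(redis_buffer_key, 0, -1)
    await fake_redis.delete(redis_buffer_key)
    updates = [json.loads(u) for u in raw_updates]
    assert len(updates) == 3

    # 4. Redis is left empty, which is what the test above asserts
    assert await fake_redis.keys("*") == []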