| import time | |
| import logging | |
| from typing import List, Dict, Optional, Union | |
| from core.providers.base import LLMProvider | |
| from utils.config import config | |
logger = logging.getLogger(__name__)

# The OpenAI SDK is an optional dependency: importing this module must not
# fail when it is missing. Availability is recorded in a flag and `OpenAI`
# is bound to None so the name always exists at module level.
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None
    OPENAI_SDK_AVAILABLE = False
else:
    OPENAI_SDK_AVAILABLE = True
class OpenAIProvider(LLMProvider):
    """LLM provider backed by the OpenAI chat-completions API.

    Public entry points (`generate`, `stream_generate`) never raise: they
    run the matching `_..._impl` method through the inherited
    `_retry_with_backoff` helper and return ``None`` if that ultimately
    fails, after logging the error.
    """

    # Sampling settings shared by the blocking and streaming code paths.
    # Kept as class attributes so they are defined once and a subclass can
    # override them without re-implementing the request logic.
    DEFAULT_MAX_TOKENS = 500
    DEFAULT_TEMPERATURE = 0.7

    def __init__(self, model_name: str, timeout: int = 30, max_retries: int = 3):
        """Create the provider and its OpenAI client.

        Args:
            model_name: Model identifier sent with every completion request.
            timeout: Request timeout in seconds; handed to the base class
                and to the OpenAI client.
            max_retries: Handed to the base class (presumably consumed by
                `_retry_with_backoff` -- confirm in `LLMProvider`).

        Raises:
            ImportError: If the `openai` package is not installed.
            ValueError: If no OpenAI API key is configured.
        """
        super().__init__(model_name, timeout, max_retries)
        if not OPENAI_SDK_AVAILABLE:
            raise ImportError("OpenAI provider requires 'openai' package")
        if not config.openai_api_key:
            raise ValueError("OPENAI_API_KEY not set - required for OpenAI provider")
        # Forward the timeout so it actually bounds each HTTP request;
        # otherwise the SDK's own (much longer) default applies and the
        # constructor argument has no effect on the client.
        self.client = OpenAI(api_key=config.openai_api_key, timeout=timeout)

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate a response synchronously.

        Returns:
            The completion text, or ``None`` if generation failed.
        """
        try:
            return self._retry_with_backoff(self._generate_impl, prompt, conversation_history)
        except Exception as e:
            # Deliberate best-effort boundary: report and signal failure
            # with None instead of propagating to the caller.
            logger.error("OpenAI generation failed: %s", e)
            return None

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate a response via the streaming API.

        Returns:
            The list of non-empty text fragments received, in arrival
            order, or ``None`` if generation failed. The fragments are
            collected before returning; nothing is yielded incrementally.
        """
        try:
            return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history)
        except Exception as e:
            logger.error("OpenAI stream generation failed: %s", e)
            return None

    def validate_model(self) -> bool:
        """Return True if `model_name` appears in the account's model list.

        Any error while listing models is logged as a warning and treated
        as "not available" (returns False).
        """
        try:
            models = self.client.models.list()
            return any(model.id == self.model_name for model in models.data)
        except Exception as e:
            logger.warning("OpenAI model validation failed: %s", e)
            return False

    def _create_completion(self, conversation_history: List[Dict], stream: bool = False):
        """Issue one chat-completions request with the shared settings.

        `stream=True` is only included in the request when asked for, so
        the blocking call is sent exactly as before.
        """
        request = {
            "model": self.model_name,
            "messages": conversation_history,
            "max_tokens": self.DEFAULT_MAX_TOKENS,
            "temperature": self.DEFAULT_TEMPERATURE,
        }
        if stream:
            request["stream"] = True
        return self.client.chat.completions.create(**request)

    def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Perform one blocking completion request and return its text.

        NOTE(review): `prompt` is not sent to the API; only
        `conversation_history` is. Presumably callers already include the
        prompt as the last history entry -- confirm against callers.
        """
        response = self._create_completion(conversation_history)
        # NOTE(review): the SDK types `message.content` as optional, so
        # this may return None despite the annotation -- confirm whether
        # callers need to distinguish that from a failure.
        return response.choices[0].message.content

    def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Perform one streaming completion request and collect its text.

        NOTE(review): as in `_generate_impl`, `prompt` is not sent to the
        API -- confirm that callers put it in `conversation_history`.

        Returns:
            The non-empty `delta.content` fragments in arrival order.
        """
        response = self._create_completion(conversation_history, stream=True)
        chunks: List[str] = []
        for chunk in response:
            # A stream chunk can arrive with an empty `choices` list (for
            # example a usage-only chunk). Indexing [0] on it would raise
            # IndexError and throw away the text received so far.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if content:
                chunks.append(content)
        return chunks