agentic-language-partner / src /app /conversation_core.py
mastefan's picture
Update src/app/conversation_core.py
8fc00ee verified
###############################################################
# conversation_core.py — Agentic Partner Core
###############################################################
import io
import re
from dataclasses import dataclass
from typing import List, Optional, Tuple
import numpy as np
from pydub import AudioSegment
import torch
from gtts import gTTS
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
pipeline,
)
from .config import get_user_dir
################################################################
# MODEL CONSTANTS
################################################################
QWEN_MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
# CEFR control hints
CONTROL_PROMPTS = {
"A1": "Use extremely short, simple sentences and very basic vocabulary.",
"A2": "Use simple sentences and common everyday vocabulary.",
"B1": "Use moderately complex sentences and conversational vocabulary.",
"B2": "Use natural, fluent sentences with richer vocabulary.",
"C1": "Use complex, advanced sentences with nuanced expressions.",
"C2": "Use highly sophisticated, near-native language and style.",
}
# spoken language → TTS language
GTTS_LANG = {
"english": "en",
"spanish": "es",
"german": "de",
"russian": "ru",
"japanese": "ja",
"chinese": "zh-cn",
"korean": "ko",
"french": "fr",
"italian": "it",
}
################################################################
# GLOBAL MODELS
################################################################
_QWEN_TOKENIZER = None
_QWEN_MODEL = None
_WHISPER_PIPE = None
def load_partner_lm():
"""Load Qwen conversational model once."""
global _QWEN_TOKENIZER, _QWEN_MODEL
if _QWEN_MODEL is not None:
return _QWEN_TOKENIZER, _QWEN_MODEL
print("[conversation_core] loading:", QWEN_MODEL_NAME)
tok = AutoTokenizer.from_pretrained(QWEN_MODEL_NAME, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
QWEN_MODEL_NAME,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device_map="auto",
trust_remote_code=True,
)
_QWEN_TOKENIZER = tok
_QWEN_MODEL = model
return tok, model
def load_whisper_pipe():
"""Load Whisper ASR pipeline once."""
global _WHISPER_PIPE
if _WHISPER_PIPE is not None:
return _WHISPER_PIPE
print("[conversation_core] loading Whisper pipeline…")
_WHISPER_PIPE = pipeline(
"automatic-speech-recognition",
model="openai/whisper-small",
device="cpu",
)
return _WHISPER_PIPE
################################################################
# DATA STRUCTURE
################################################################
@dataclass
class ConversationTurn:
role: str
text: str
################################################################
# CLEANING LM OUTPUT
################################################################
def clean_assistant_reply(text: str) -> str:
"""Remove meta junk, labels, identity statements."""
if not text:
return ""
# Remove "assistant:" echo
text = re.sub(r"(?i)\bassistant\s*:\s*", "", text)
text = re.sub(r"(?i)\buser\s*:\s*", "", text)
# Remove bullet lists (not desired in conversation)
text = re.sub(r"(?m)^\s*[-•*]\s+.*$", "", text)
text = re.sub(r"(?m)^\s*\d+\.\s+.*$", "", text)
# Remove identity claims
identity_patterns = [
r"(?i)i am an ai.*",
r"(?i)i am a large language model.*",
r"(?i)i was created.*",
r"(?i)my name is .*",
]
for p in identity_patterns:
text = re.sub(p, "", text)
text = re.sub(r"\s{2,}", " ", text)
return text.strip()
################################################################
# CONVERSATION MANAGER
################################################################
class ConversationManager:
def __init__(
self,
target_language="german",
native_language="english",
cefr_level="B1",
topic="general conversation",
):
self.target_language = target_language.lower()
self.native_language = native_language.lower()
self.cefr_level = cefr_level.upper()
self.topic = topic
self.history: List[ConversationTurn] = []
load_partner_lm()
load_whisper_pipe()
################################################################
# SYSTEM PROMPT
################################################################
def _build_system_prompt(self):
base = (
f"You are a friendly conversation partner speaking {self.target_language}. "
f"Reply ONLY in {self.target_language}. "
f"Adapt your language to CEFR level {self.cefr_level}. "
f"{CONTROL_PROMPTS.get(self.cefr_level, '')} "
f"Topic of conversation: {self.topic}. "
"Give 1–3 short natural sentences and ALWAYS end with 1 follow-up question. "
"Never mention AI, assistants, grammar explanations, or meta commentary."
)
return base
################################################################
# GENERATION
################################################################
def _generate_lm(self, user_text: str) -> str:
tok, model = load_partner_lm()
messages = [
{"role": "system", "content": self._build_system_prompt()},
{"role": "user", "content": user_text},
]
prompt = tok.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
enc = tok(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
out = model.generate(
**enc,
max_new_tokens=160,
temperature=0.8,
top_p=0.95,
repetition_penalty=1.15,
do_sample=True,
pad_token_id=tok.eos_token_id,
)
raw = tok.decode(out[0], skip_special_tokens=True)
# Remove echo
cleaned = clean_assistant_reply(raw)
return cleaned
################################################################
# PUBLIC REPLY API
################################################################
def reply(self, user_text: str, input_lang="german"):
self.history.append(ConversationTurn("user", user_text))
assistant_text = self._generate_lm(user_text)
self.history.append(ConversationTurn("assistant", assistant_text))
explanation = self._generate_explanation(assistant_text)
audio_bytes = self.text_to_speech(assistant_text)
return {
"reply_text": assistant_text,
"explanation": explanation,
"audio": audio_bytes,
}
################################################################
# SHORT EXPLANATION
################################################################
def _generate_explanation(self, assistant_text: str) -> str:
tok, model = load_partner_lm()
prompt = (
f"Rewrite the meaning of this {self.target_language} sentence "
f"in ONE short {self.native_language} sentence:\n{assistant_text}"
)
enc = tok(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
out = model.generate(
**enc,
max_new_tokens=40,
temperature=0.6,
top_p=0.9,
pad_token_id=tok.eos_token_id,
)
decoded = tok.decode(out[0], skip_special_tokens=True)
cleaned = decoded.replace(prompt, "").strip()
# keep only the first sentence
parts = re.split(r"(?<=[.!?])\s+", cleaned)
return parts[0].strip()
################################################################
# TRANSCRIPTION — SINGLE VALID VERSION
################################################################
def transcribe(self, audio_segment, spoken_lang=None):
"""Transcribe using Transformers Whisper."""
pipe = load_whisper_pipe()
audio = np.array(audio_segment.get_array_of_samples()).astype("float32")
audio = audio / max(np.max(np.abs(audio)), 1e-6)
result = pipe(audio)
text = result.get("text", "").strip()
return text, spoken_lang or "unknown", 1.0
################################################################
# TTS — gTTS
################################################################
def text_to_speech(self, text: str) -> Optional[bytes]:
if not text:
return None
try:
lang = GTTS_LANG.get(self.target_language, "en")
tts = gTTS(text=text, lang=lang)
buf = io.BytesIO()
tts.write_to_fp(buf)
return buf.getvalue()
except Exception:
return None
################################################################
# END OF FILE
################################################################