the-emergent-show / yt_chat.py
inventwithdean
add realtime yt chat support via pytchat
c48e85d
raw
history blame
4.82 kB
import logging
import os
import time

import pytchat
import requests

# from dotenv import load_dotenv
from guard import Guardian
from tv_crew import TVCrew

# load_dotenv()
# Base URL of the Unreal orchestrator service; the host class appends
# "/status", "/television" and "/tts" to it. os.getenv returns None when the
# variable is unset, so ORCHESTRATOR_URL must be exported before running.
unreal_orchestrator_url = os.getenv("ORCHESTRATOR_URL")
# Secret sent to the orchestrator in the "x-api-key" request header.
api_key_unreal = os.getenv("API_KEY_UNREAL")
# YouTube API key. Not referenced by the code visible in this file.
API_KEY = os.getenv("YOUTUBE_API_KEY")
# ID of the YouTube live stream whose chat is read.
VIDEO_ID = "xBVJIJQg1FY"
# Not referenced by the code visible in this file.
POLL_INTERVAL = 3 # seconds
# Live-chat reader, created at import time (importing this module therefore
# calls pytchat.create immediately). Consumed by
# StreamChatHost.chat_interaction_loop.
chat = pytchat.create(video_id=VIDEO_ID)
class StreamChatHost:
    """Talk-show host that replies to live-stream chat while in the lobby.

    For every chat item pulled from the module-level ``chat`` reader the host:

    1. asks the orchestrator (``GET /status``) whether it is in the lobby,
    2. moderates the viewer message with the guardian,
    3. lets the TV crew suggest an image (whose caption is moderated too),
    4. asks the language model for a short reply,
    5. re-checks the lobby state, then posts the image (``/television``) and
       the reply (``/tts``) to the orchestrator.

    Network problems are logged and the affected chat item is skipped; they
    never terminate the loop.
    """

    # History is reset once it holds more than this many entries
    # (system prompt included).
    MAX_HISTORY = 20

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        client,
        tv_crew: TVCrew,
        guardian: Guardian,
        *,
        request_timeout=30.0,
    ):
        """Store collaborators and build the initial conversation state.

        Args:
            client: Model client; must expose ``responses.create(model=...,
                input=...)`` returning an object with ``output_text``.
            tv_crew: Supplies ``add_context``, ``clear_context`` and
                ``suggest_image``.
            guardian: Supplies ``moderate_message``; the first element of its
                return value is treated as the "is safe" flag.
            request_timeout: Timeout in seconds (or a ``(connect, read)``
                tuple) passed to every orchestrator HTTP request so a stalled
                server cannot block the loop forever.
        """
        self.client = client
        self.guardian = guardian
        self.tv_crew = tv_crew
        self.model = "deepseek/deepseek-v3.2-exp"
        self.request_timeout = request_timeout
        system_prompt = (
            "You are the Host of a Talk Show, where users come and have a chat.\n"
            "There is a TV Display near you which the crew members use to display images based on your conversation.\n"
            "The TV is not in control of the guest. It is controlled by crew members and is only for graphic uplifting of the conversation.\n"
            "You will be told when TV shows something, otherwise, don't talk about it.\n"
            "Keep your responses short, like two or three sentences, and be witty."
        )
        self.system_message = {"role": "system", "content": system_prompt}
        self.messages = [self.system_message]
        self.headers = {"Content-Type": "application/json", "x-api-key": api_key_unreal}

    def chat_interaction_loop(self):
        """Checks if the Host is in lobby, then pulls chat messages from the Live Stream. The Host then replies to the message.

        Runs until the chat reader reports that it is no longer alive.
        """
        while chat.is_alive():
            for item in chat.get().sync_items():
                self._handle_chat_item(item)

    def _in_lobby(self):
        """Return True only if the orchestrator reports ``in_lobby``.

        Any transport error, timeout or malformed body is logged and treated
        as "not in lobby", so the caller simply skips the current chat item.
        """
        try:
            status = requests.get(
                f"{unreal_orchestrator_url}/status",
                headers=self.headers,
                timeout=self.request_timeout,
            )
            body = status.json()
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body on older requests versions.
            self._logger.exception("Could not read orchestrator status")
            return False
        if not isinstance(body, dict):
            self._logger.warning("Unexpected orchestrator status body: %r", body)
            return False
        return bool(body.get("in_lobby", False))

    def _post(self, endpoint, payload):
        """POST ``payload`` as JSON to the orchestrator; return True on success."""
        try:
            response = requests.post(
                url=f"{unreal_orchestrator_url}/{endpoint}",
                json=payload,
                headers=self.headers,
                timeout=self.request_timeout,
            )
        except requests.RequestException:
            self._logger.exception("POST /%s to orchestrator failed", endpoint)
            return False
        if not response.ok:
            self._logger.warning(
                "Orchestrator answered POST /%s with HTTP %s",
                endpoint,
                response.status_code,
            )
            return False
        return True

    def _handle_chat_item(self, item):
        """Process one chat item end to end (moderate, reply, publish)."""
        if not self._in_lobby():
            return

        # Start a fresh conversation once the history has grown too long.
        if len(self.messages) > self.MAX_HISTORY:
            self.messages = [self.system_message]
            self.tv_crew.clear_context()

        # NOTE(review): the unmatched "]" looks like a typo, but it is part of
        # the prompt text sent to the model, so it is left unchanged.
        user_message = f"{item.author.name}]- {item.message}"

        is_safe, _ = self.guardian.moderate_message(user_message)
        if not is_safe:
            return

        self.tv_crew.add_context(user_message)
        image_base64, caption = self.tv_crew.suggest_image()
        if image_base64:
            caption_is_safe, _ = self.guardian.moderate_message(caption)
            if caption_is_safe:
                user_message += f"[Crew Updated Television to: {caption}]\n"
            else:
                # Unsafe caption: drop the image and do not mention the TV.
                image_base64 = None

        user_turn = {"role": "user", "content": user_message}
        try:
            response = self.client.responses.create(
                model=self.model, input=[*self.messages, user_turn]
            )
            host_response = response.output_text
        except Exception:
            # Boundary with an external model client whose exception types
            # are not known here. Log and skip; the unanswered user turn is
            # deliberately NOT stored, so the history never contains a viewer
            # message (or a TV-update note) without a matching host reply.
            self._logger.exception("Error generating host response")
            return

        self.messages.append(user_turn)
        self.messages.append({"role": "assistant", "content": host_response})
        self.tv_crew.add_context(f"Host: {host_response}\n")

        # Generating the reply takes a while; make sure we're still in the
        # lobby before putting anything on screen.
        if not self._in_lobby():
            return

        if image_base64:
            self._post("television", {"base64": image_base64})
        self._post("tts", {"text": host_response, "is_host": True})