# %%
# Import necessary libraries
from moviepy.editor import VideoFileClip
import os
from PIL import Image
import numpy as np

def extract_frames(video, frame_dir, n_samples, start=-1, end=-1):
    # Save n_samples evenly spaced frames from [start, end] as numbered PNGs
    os.makedirs(frame_dir, exist_ok=True)
    if start == -1:
        start = 0
    if end == -1:
        end = video.duration
    duration = end - start
    interval = duration / n_samples
    for i in range(n_samples):
        frame_time = start + i * interval
        frame = video.get_frame(frame_time)
        frame_image = Image.fromarray(np.uint8(frame))
        frame_path = os.path.join(frame_dir, f"frame_{i+1}.png")
        frame_image.save(frame_path)

def extract_video_parts(video, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    # Extract audio
    audio_path = f"{out_dir}/audio.mp3"
    video.audio.write_audiofile(audio_path)
    # Extract 20 frames from the video
    extract_frames(video, f"{out_dir}/frames", 20)
    # Extract 20 frames from first 5 seconds
    extract_frames(video, f"{out_dir}/5s_frames", 20, start=0, end=5)

# %%
tags = []
with open("labels.txt", "r") as f:
    for line in f:
        tags.append(line.strip())
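
# labels.txt is expected to hold one candidate tag per line; its exact contents
# are not shown here, but short phrases like "dancing" or "cooking" (hypothetical
# examples) are the kind of input the embedding comparison below works best with.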
| # %% | |
| from transformers import AutoTokenizer, AutoModel | |
| import torch | |
| import torch.nn.functional as F | |
| # Load the tokenizer and model | |
| tokenizer = AutoTokenizer.from_pretrained('nomic-ai/nomic-embed-text-v1.5') | |
| text_model = AutoModel.from_pretrained('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True) | |
| text_model.eval() | |
| # Function to get embeddings for tags | |
| def get_tag_embeddings(tags): | |
| encoded_input = tokenizer(tags, padding=True, truncation=True, return_tensors='pt') | |
| with torch.no_grad(): | |
| model_output = text_model(**encoded_input) | |
| text_embeddings = F.normalize(model_output.last_hidden_state[:, 0], p=2, dim=1) | |
| return text_embeddings | |
| tag_embeddings = get_tag_embeddings(tags) | |
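
# Note: the nomic-embed-text-v1.5 model card recommends a task prefix on each
# input (e.g. "classification: <tag>"); the raw tags are embedded as-is here.
# Rows are L2-normalized, so a dot product is a cosine similarity. A quick
# sanity check (a sketch, safe to run once tags are loaded):
#
#   print(tag_embeddings.shape)                            # (len(tags), 768)
#   print((tag_embeddings[0] @ tag_embeddings[1]).item())  # similarity of first two tags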

# %%
from transformers import AutoImageProcessor, AutoModel
from PIL import Image
import os
from collections import Counter

processor = AutoImageProcessor.from_pretrained("nomic-ai/nomic-embed-vision-v1.5")
vision_model = AutoModel.from_pretrained("nomic-ai/nomic-embed-vision-v1.5", trust_remote_code=True)
vision_model.eval()

def get_frames(frame_dir):
    # Sort frames by their numeric suffix; os.listdir gives no ordering guarantee
    found_frames = [frame for frame in os.listdir(frame_dir) if frame.startswith("frame_")]
    frame_numbers = [int(frame.split("_")[-1].split(".")[0]) for frame in found_frames]
    frames = [Image.open(os.path.join(frame_dir, f"frame_{frame_no}.png")) for frame_no in sorted(frame_numbers)]
    return frames

def frames_to_embeddings(frames):
    inputs = processor(frames, return_tensors="pt")
    with torch.no_grad():
        img_emb = vision_model(**inputs).last_hidden_state
    img_embeddings = F.normalize(img_emb[:, 0], p=2, dim=1)
    return img_embeddings

def compute_similarities(img_embeddings, tag_embeddings):
    similarities = torch.matmul(img_embeddings, tag_embeddings.T)
    return similarities

def get_top_tags(similarities, tags):
    top_5_tags = similarities.topk(5).indices.tolist()
    return [tags[tag_idx] for tag_idx in top_5_tags]

def analyze_frames(frame_dir, tag_embeddings):
    # Note: uses the module-level `tags` list loaded above
    frames = get_frames(frame_dir)
    img_embeddings = frames_to_embeddings(frames)
    cosine_similarities = compute_similarities(img_embeddings, tag_embeddings)
    results = {
        "images": [],
        "summary": {}
    }
    summary = Counter()
    for i, img in enumerate(frames):
        top_5_tags = get_top_tags(cosine_similarities[i], tags)
        results["images"].append({"image": img.filename, "tags": top_5_tags})
        summary.update(top_5_tags)
    results["summary"]["tags"] = summary
    return results
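
# Result shape, sketched against the hypothetical "sample_parts" directory from above:
#
#   report = analyze_frames("sample_parts/frames", tag_embeddings)
#   report["images"][0]                        # {"image": ".../frame_1.png", "tags": [5 tag strings]}
#   report["summary"]["tags"].most_common(3)   # most frequent tags across all frames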

# %%
import openai

def transcribe(audio_path):
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    with open(audio_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
    return transcript.text
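
# Note: the hosted transcription endpoint caps uploads at 25 MB, so very long
# clips would need to be chunked or re-encoded at a lower bitrate first.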

# %%
# Load model directly
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification

audio_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
audio_feature_model = AutoModelForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
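
# This AST checkpoint is fine-tuned on AudioSet, so its logits rank roughly 527
# broad sound classes (speech, music genres, ambient noise, ...).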

# %%
from pydub import AudioSegment

def extract_audio_features(audio_path):
    # Decode the mp3 and convert to the 16 kHz mono audio the AST model expects
    audio = AudioSegment.from_file(audio_path, format="mp3")
    audio = audio.set_frame_rate(16000).set_channels(1)
    samples = np.array(audio.get_array_of_samples()).astype(np.float32)
    inputs = audio_extractor(samples, sampling_rate=16000, return_tensors="pt")
    with torch.no_grad():
        outputs = audio_feature_model(**inputs).logits
    # Return the three highest-scoring sound labels
    predicted_class_ids = outputs.topk(3).indices.tolist()[0]
    predicted_labels = [audio_feature_model.config.id2label[class_id] for class_id in predicted_class_ids]
    return predicted_labels
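
# Sketch (hypothetical path): extract_audio_features("sample_parts/audio.mp3")
# might return something like ["Music", "Speech", "Clapping"].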

# %%
import base64
from io import BytesIO

def base64_encode_image(image):
    # Downscale by half before encoding to keep the request payload small
    buffered = BytesIO()
    new_width = image.width // 2
    new_height = image.height // 2
    resized_image = image.resize((new_width, new_height), Image.LANCZOS)
    resized_image.save(buffered, format="JPEG")
    img_str = base64.b64encode(buffered.getvalue())
    return 'data:image/jpeg;base64,' + img_str.decode('utf-8')
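
# Quick check with a synthetic image (safe to run anywhere):
#
#   print(base64_encode_image(Image.new("RGB", (64, 64)))[:30])  # 'data:image/jpeg;base64,...'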

def ai_summary(transcript, frames, audio_description, extra_context=""):
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    messages = [
        {"role": "system", "content": "You are a social media content analysis bot trying to uncover trends about what makes a video distinct. Given the transcript, frames, and a description of the audio, give a short analysis of the video content and what makes it unique."},
        {"role": "user",
         "content": [{
             "type": "text",
             "text": f"Transcript: {transcript}\n\nAudio: {audio_description}\n\nExtra Context: {extra_context or 'n/a'}",
         },
         *[
             {
                 "type": "image_url",
                 "image_url": {"url": base64_encode_image(frame)},
             } for frame in frames
         ]
        ]}
    ]
    return client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
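
# Sending all 20 frames is token-heavy; a cheaper variant (a sketch, not used
# below) would pass a subset, e.g. ai_summary(transcript, frames[::4], audio_description).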

# %%
import gradio as gr

# %%
import uuid, shutil
import tempfile

def tiktok_analyze(video_path):
    results = {
        "overview": "",
        "ai_overview": "",
        "first_5s_analysis": "",
        "video_analysis": "",
        "transcript": "",
    }
    # Copy the upload into a temp working directory keyed by a fresh UUID
    video_id = str(uuid.uuid4())
    path_root = f"{tempfile.gettempdir()}/videos/{video_id}"
    os.makedirs(path_root, exist_ok=True)
    shutil.copy(video_path, f"{path_root}.mp4")

    video = VideoFileClip(f"{path_root}.mp4")
    extract_video_parts(video, f"{path_root}_parts")
    frames = get_frames(f"{path_root}_parts/frames")
    first_5s_analysis = analyze_frames(f"{path_root}_parts/5s_frames", tag_embeddings)
    whole_analysis = analyze_frames(f"{path_root}_parts/frames", tag_embeddings)
    audio_features = extract_audio_features(f"{path_root}_parts/audio.mp3")
    results["transcript"] = transcribe(f"{path_root}_parts/audio.mp3")
    ai_summary_response = ai_summary(results["transcript"], frames, ", ".join(audio_features)).choices[0].message.content

    # Rank themes by how often they appear in the per-frame top-5 lists
    major_themes = [tag for tag, _ in whole_analysis["summary"]["tags"].most_common(5)]
    first_5s_themes = [tag for tag, _ in first_5s_analysis["summary"]["tags"].most_common(5)]

    results["overview"] = f"""
## Overview

**duration:** {video.duration:.1f}s

**major themes:** {", ".join(major_themes)}

**audio:** {", ".join(audio_features)}
"""
    results["ai_overview"] = "# AI Summary\n" + ai_summary_response
    results["first_5s_analysis"] = f"Major themes: {', '.join(first_5s_themes)}"
    results["video_analysis"] = f"Major themes: {', '.join(major_themes)}"
    return [
        results["overview"],
        results["first_5s_analysis"],
        results["video_analysis"],
        results["ai_overview"],
        results["transcript"],
    ]

demo = gr.Interface(
    title="TikTok Content Analyzer",
    description="Start by uploading a video to analyze.",
    fn=tiktok_analyze,
    inputs="video",
    outputs=[
        gr.Markdown(label="Overview"),
        gr.Text(label="First 5s Content Analysis"),
        gr.Text(label="Content Analysis"),
        gr.Markdown(label="AI Summary"),
        gr.Text(label="Transcript"),
    ],
)
demo.launch()
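
# launch() binds locally by default; demo.launch(share=True) would expose a
# temporary public URL instead.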

# %%