# backend.py - FINAL VERSION
import sqlite3
import threading
import time
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from huggingface_hub import whoami, HfApi
from datasets import load_dataset
from transformers import AutoTokenizer
import psutil
import os
import shutil
from werkzeug.security import generate_password_hash, check_password_hash
DB_PATH = "llm_kitchen.db"
training_queue = []
active_runs = set()
active_users = set()
scheduler_lock = threading.Lock()
RUN_TIMEOUT = 48 * 3600
MAX_RAM_PER_RUN_GB = 1.5
# ------------------------------ DATABASE ------------------------------
def init_db():
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    cursor = conn.cursor()
    cursor.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS training_runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            arch_type TEXT NOT NULL,
            num_layers INTEGER NOT NULL,
            learning_rate REAL NOT NULL,
            epochs INTEGER NOT NULL,
            batch_size INTEGER NOT NULL,
            status TEXT DEFAULT 'queued',
            logs TEXT DEFAULT '',
            started_at DATETIME,
            completed_at DATETIME,
            FOREIGN KEY (user_id) REFERENCES users(id)
        );
    """)
    conn.close()

init_db()
def db_query(query, params=()):
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    cursor = conn.cursor()
    cursor.execute(query, params)
    res = cursor.fetchall()
    conn.commit()
    last_id = cursor.lastrowid
    conn.close()
    return res, last_id

def get_user_by_username(username):
    rows, _ = db_query("SELECT id, password_hash FROM users WHERE username = ?", (username,))
    return rows[0] if rows else None
# ------------------------------ AUTHENTICATION ------------------------------
def signup_user(username, password):
    if not username or not password:
        return None, "Username and password cannot be empty."
    if get_user_by_username(username):
        return None, "Username already exists. Please choose another."
    password_hash = generate_password_hash(password)
    _, user_id = db_query(
        "INSERT INTO users (username, password_hash) VALUES (?, ?)",
        (username, password_hash),
    )
    return user_id, f"Welcome, {username}! Your account is ready. Please log in."

def login_user(username, password):
    user = get_user_by_username(username)
    if user and check_password_hash(user[1], password):
        return user[0], f"Welcome back, {username}!"
    return None, "Invalid username or password."
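
# Illustrative only: a minimal sketch of how a frontend callback might drive
# the two auth helpers above. `_demo_auth_flow` and its credentials are
# hypothetical; nothing in this module calls it.
def _demo_auth_flow():
    user_id, msg = signup_user("alice", "hunter2")
    if user_id is None:  # e.g. the username is taken, so try logging in instead
        user_id, msg = login_user("alice", "hunter2")
    return user_id, msg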
# ------------------------------ TRAINING QUEUE & SCHEDULER ------------------------------
def ram_available():
    return (psutil.virtual_memory().available / (1024**3)) >= MAX_RAM_PER_RUN_GB

def queue_training_run(user_id, config):
    _, run_id = db_query(
        "INSERT INTO training_runs (user_id, arch_type, num_layers, learning_rate, epochs, batch_size) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (user_id, config['arch_type'], config['num_layers'], config['learning_rate'],
         config['epochs'], config['batch_size']),
    )
    training_queue.append({"run_id": run_id, "user_id": user_id, **config})
    start_training_if_free()
    return run_id
def start_training_if_free():
    with scheduler_lock:
        for job in list(training_queue):
            if not ram_available():
                log_update("MemoryWarning: Not enough RAM for new runs. Waiting.", -1)
                break
            if job["user_id"] in active_users:
                continue  # one active run per user at a time
            log_update(f"Scheduler: Starting run #{job['run_id']} for user #{job['user_id']}", -1)
            active_runs.add(job["run_id"])
            active_users.add(job["user_id"])
            training_queue.remove(job)
            update_run_status(job["run_id"], "running")
            log_update("🍳 Starting kitchen process...", job["run_id"])
            thread = threading.Thread(target=run_training_job, args=(job,))
            thread.start()
            threading.Timer(RUN_TIMEOUT, kill_run_timeout, args=[job]).start()
def kill_run_timeout(job):
    run_id, user_id = job["run_id"], job["user_id"]
    with scheduler_lock:
        if run_id in active_runs:
            log_update(f"Run {run_id}: 💥 48-HOUR TIMEOUT. Terminating.", run_id)
            update_run_status(run_id, "timeout")
            active_runs.discard(run_id)
            active_users.discard(user_id)
    # Kick the scheduler only after releasing the lock: scheduler_lock is not
    # re-entrant, so calling start_training_if_free() inside the `with` block
    # would deadlock.
    start_training_if_free()
def get_user_runs(user_id):
    rows, _ = db_query(
        "SELECT id, arch_type, num_layers, status, started_at FROM training_runs WHERE user_id = ? ORDER BY id DESC",
        (user_id,),
    )
    return rows

def get_run_logs(user_id, run_id):
    """Securely fetches logs by checking ownership (user_id)."""
    rows, _ = db_query("SELECT logs, status FROM training_runs WHERE id = ? AND user_id = ?", (run_id, user_id))
    return rows[0] if rows else ("", "unknown")
def update_run_status(run_id, status):
    if status == 'running':
        db_query("UPDATE training_runs SET status = ?, started_at = CURRENT_TIMESTAMP WHERE id = ?", (status, run_id))
    elif status in ['completed', 'failed', 'timeout']:
        db_query("UPDATE training_runs SET status = ?, completed_at = CURRENT_TIMESTAMP WHERE id = ?", (status, run_id))
    else:
        db_query("UPDATE training_runs SET status = ? WHERE id = ?", (status, run_id))
def log_update(message, run_id):
    timestamp = time.strftime("%H:%M:%S")
    full_msg = f"[{timestamp}] {message}"
    print(full_msg)
    if run_id > 0:  # run_id <= 0 marks a scheduler-level message with no DB row
        db_query("UPDATE training_runs SET logs = logs || ? || ? WHERE id = ?", ('\n', full_msg, run_id))
# ------------------------------ MODELS & TRAINING ------------------------------
class CNNLanguageModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=128, num_layers=4):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        layers, in_ch = [], embed_dim
        for _ in range(num_layers):
            # Channel width doubles each layer; sequence length is preserved (kernel 3, padding 1).
            layers.extend([nn.Conv1d(in_ch, in_ch * 2, kernel_size=3, padding=1), nn.ReLU()])
            in_ch *= 2
        self.convs, self.fc = nn.Sequential(*layers), nn.Linear(in_ch, vocab_size)

    def forward(self, x, labels=None):
        x = self.embedding(x).transpose(1, 2)  # (B, T, C) -> (B, C, T) for Conv1d
        x = self.convs(x).transpose(1, 2)      # back to (B, T, C)
        logits = self.fc(x)
        loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None
        return {"loss": loss, "logits": logits}
class RNNLanguageModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_layers=2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, labels=None):
        x = self.embedding(x)
        output, _ = self.rnn(x)
        logits = self.fc(output)
        loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None
        return {"loss": loss, "logits": logits}
class TransformerLanguageModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=128, num_heads=4, num_layers=3):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # No causal mask is applied, so attention here is bidirectional.
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embed_dim, vocab_size)

    def forward(self, x, labels=None):
        x = self.embedding(x)
        x = self.transformer(x)
        logits = self.fc(x)
        loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None
        return {"loss": loss, "logits": logits}
def get_model(arch_type, vocab_size, num_layers):
    models = {"cnn": CNNLanguageModel, "rnn": RNNLanguageModel, "transformer": TransformerLanguageModel}
    return models[arch_type](vocab_size, num_layers=num_layers)
class TextDataset(Dataset):
    def __init__(self, tokenized_data):
        self.data = tokenized_data["input_ids"]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Labels are the input ids themselves; there is no next-token shift here.
        return {"input_ids": torch.tensor(self.data[idx]), "labels": torch.tensor(self.data[idx])}
def run_training_job(job):
    run_id, user_id = job["run_id"], job["user_id"]
    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        log_update(f"🚀 Device = {device}", run_id)
        tokenizer = AutoTokenizer.from_pretrained("gpt2")
        tokenizer.pad_token = tokenizer.eos_token
        tokenizer_save_path = f"./runs/{run_id}/tokenizer"
        os.makedirs(tokenizer_save_path, exist_ok=True)
        tokenizer.save_pretrained(tokenizer_save_path)
        model = get_model(job["arch_type"], len(tokenizer), job["num_layers"]).to(device)
        log_update(f"🧱 Model: {job['arch_type']} x{job['num_layers']} layers", run_id)
        dataset = load_dataset("voidful/reasoning_gemini_300k", split="train[:5000]")
        tokenized_dataset = dataset.map(
            lambda ex: tokenizer(
                [q + " " + a for q, a in zip(ex["message"], ex["answer"])],
                truncation=True, padding="max_length", max_length=128,
            ),
            batched=True,
            remove_columns=dataset.column_names,
        )
        train_loader = DataLoader(TextDataset(tokenized_dataset), batch_size=job["batch_size"], shuffle=True)
        optimizer = torch.optim.AdamW(model.parameters(), lr=job["learning_rate"])
        model.train()
        log_update(f"▶️ Starting training for {job['epochs']} epochs...", run_id)
        for epoch in range(job["epochs"]):
            for step, batch in enumerate(train_loader):
                input_ids = batch["input_ids"].to(device)
                labels = batch["labels"].to(device)
                optimizer.zero_grad()
                outputs = model(input_ids, labels=labels)
                loss = outputs["loss"]
                loss.backward()
                optimizer.step()
                if step % 50 == 0:
                    log_update(f"Epoch {epoch+1} | Step {step} | Loss: {loss.item():.4f}", run_id)
            log_update(f"✅ Epoch {epoch+1} completed.", run_id)
        model_path = f"./runs/{run_id}"
        os.makedirs(model_path, exist_ok=True)
        torch.save(model.state_dict(), f"{model_path}/pytorch_model.bin")
    except Exception as e:
        log_update(f"💥 FAILED - {str(e)}", run_id)
        update_run_status(run_id, "failed")
    else:
        log_update("🎉 Cooking complete!", run_id)
        update_run_status(run_id, "completed")
    finally:
        with scheduler_lock:
            active_runs.discard(run_id)
            active_users.discard(user_id)
        # Re-run the scheduler only after releasing the lock; scheduler_lock is
        # not re-entrant, so calling this inside the `with` block would deadlock.
        start_training_if_free()
def run_inference(run_id, prompt):
    model_path = f"./runs/{run_id}/pytorch_model.bin"
    tokenizer_path = f"./runs/{run_id}/tokenizer"
    if not (os.path.exists(model_path) and os.path.exists(tokenizer_path)):
        return "ModelError: Files not found."
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
    rows, _ = db_query("SELECT arch_type, num_layers FROM training_runs WHERE id = ?", (run_id,))
    if not rows:
        return "ModelError: Run not found."
    arch_type, num_layers = rows[0]
    model = get_model(arch_type, len(tokenizer), num_layers)
    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    model.eval()
    inputs = tokenizer(prompt, return_tensors="pt")
    input_ids = inputs.input_ids
    with torch.no_grad():
        outputs = model(input_ids)
    logits = outputs["logits"]
    # Single forward pass with a greedy argmax per position (not autoregressive decoding).
    generated_ids = torch.argmax(logits, dim=-1)
    return f"🧑‍🍳 Model says:\n{tokenizer.decode(generated_ids[0], skip_special_tokens=True)}"
def publish_run_to_hub(run_id, hf_token, repo_name, user_description=""):
    try:
        user_info = whoami(token=hf_token)
        hf_username = user_info['name']
    except Exception as e:
        raise ValueError(f"Invalid Hugging Face Token. Error: {e}")
    final_repo_name = f"{hf_username}/{repo_name}"
    local_dir = f"./runs/{run_id}/hub_upload"
    shutil.rmtree(local_dir, ignore_errors=True)
    os.makedirs(local_dir, exist_ok=True)
    shutil.copy(f"./runs/{run_id}/pytorch_model.bin", f"{local_dir}/pytorch_model.bin")
    shutil.copytree(f"./runs/{run_id}/tokenizer", f"{local_dir}/tokenizer", dirs_exist_ok=True)
    readme_content = user_description.strip() or f"# Model from LLM Kitchen - Run #{run_id}"
    with open(f"{local_dir}/README.md", "w") as f:
        f.write(readme_content)
    api = HfApi()
    repo_url = api.create_repo(repo_id=final_repo_name, token=hf_token, exist_ok=True).repo_id
    api.upload_folder(folder_path=local_dir, repo_id=repo_url, token=hf_token)
    return f"https://huggingface.co/{repo_url}"