HMP / agents /_not_used /peer_sync.py
GitHub Action
Sync from GitHub with Git LFS
4d94786
raw
history blame
34.1 kB
# agent/peer_sync.py
import socket
import json
import time
import threading
import select
import netifaces
import re
import ipaddress
import asyncio
import dateutil.parser
from datetime import datetime, timezone as UTC
from tools.storage import Storage
storage = Storage()
# ---------------------------
# Конфигурация (будем пересчитывать после bootstrap)
# ---------------------------
my_id = storage.get_config_value("agent_id")
agent_name = storage.get_config_value("agent_name", "unknown")
# placeholders — реальные значения пересчитаются в start_sync()
local_addresses = []
global_addresses = []
all_addresses = []
local_ports = []
print(f"[PeerSync] (init) my_id={my_id} name={agent_name}")
# ---------------------------
# Загрузка bootstrap
# ---------------------------
def load_bootstrap_peers(filename="bootstrap.txt"):
try:
with open(filename, "r", encoding="utf-8") as f:
lines = f.readlines()
except FileNotFoundError:
print(f"[Bootstrap] File {filename} not found")
return
for line in lines:
line = line.strip()
if not line or line.startswith("#"):
continue
# Разделяем строку на ключ:значение по ";"
parts = [p.strip() for p in line.split(";") if p.strip()]
data = {}
for part in parts:
if ":" not in part:
continue
key, val = part.split(":", 1)
key = key.strip().upper()
val = val.strip()
if val.startswith('"') and val.endswith('"'):
val = val[1:-1].replace("\\n", "\n")
data[key] = val
# Проверка обязательных полей
did = data.get("DID")
pubkey = data.get("KEY")
addresses_json = data.get("ADDRESS")
name = data.get("NAME")
if not did or not pubkey or not addresses_json:
print(f"[Bootstrap] Missing required fields in line: {line}")
continue
# Парсим адреса
try:
addresses = json.loads(addresses_json)
except Exception as e:
print(f"[Bootstrap] Failed to parse JSON addresses: {line} ({e})")
continue
# Расширяем any:// в tcp/udp и приводим к формату адресов
expanded_addresses = []
for addr in addresses:
if isinstance(addr, dict):
# старый формат с address/pow → конвертим
if "address" in addr:
addr_str = addr["address"]
if addr_str.startswith("any://"):
hostport = addr_str[len("any://"):]
variants = [f"tcp://{hostport}", f"udp://{hostport}"]
else:
variants = [addr_str]
for v in variants:
expanded_addresses.append({
"addr": v,
"nonce": addr.get("pow", {}).get("nonce"),
"pow_hash": addr.get("pow", {}).get("hash"),
"difficulty": addr.get("pow", {}).get("difficulty"),
"datetime": addr.get("datetime", "")
})
# уже новый формат → оставляем как есть
elif "addr" in addr:
expanded_addresses.append(addr)
elif isinstance(addr, str):
if addr.startswith("any://"):
hostport = addr[len("any://"):]
expanded_addresses.extend([
{"addr": f"tcp://{hostport}", "nonce": None, "pow_hash": None, "difficulty": None, "datetime": ""},
{"addr": f"udp://{hostport}", "nonce": None, "pow_hash": None, "difficulty": None, "datetime": ""}
])
else:
expanded_addresses.append({
"addr": addr,
"nonce": None,
"pow_hash": None,
"difficulty": None,
"datetime": ""
})
# Сохраняем в storage
print(f"[DEBUG] Saving peer {did} with addresses:")
for a in expanded_addresses:
print(a)
storage.add_or_update_peer(
peer_id=did,
name=name,
addresses=expanded_addresses,
source="bootstrap",
status="offline",
pubkey=pubkey,
capabilities=None,
heard_from=None
)
print(f"[Bootstrap] Loaded peer {did} -> {expanded_addresses}")
# ---------------------------
# UDP Discovery
# ---------------------------
def udp_discovery():
import dateutil.parser # для парсинга ISO datetime
DISCOVERY_INTERVAL = 30
try:
# --- Создаём слушающие сокеты один раз (на основе текущих локальных адресов в storage) ---
listen_sockets = []
cfg_local_addresses = storage.get_config_value("local_addresses", [])
print(f"[UDP Discovery] Local addresses (init): {cfg_local_addresses}")
for entry in cfg_local_addresses:
addr_str = entry.get("addr") if isinstance(entry, dict) else entry
if not addr_str:
continue
proto, hostport = addr_str.split("://", 1)
host, port = storage.parse_hostport(hostport)
if not port or proto.lower() != "udp":
continue
# IPv4
if not host.startswith("["):
try:
sock4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock4.bind(("", port))
listen_sockets.append(sock4)
print(f"[UDP Discovery] Listening IPv4 on *:{port}")
except Exception as e:
print(f"[UDP Discovery] IPv4 bind failed on port {port}: {e}")
# IPv6
else:
try:
sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
sock6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock6.bind(("::", port))
listen_sockets.append(sock6)
print(f"[UDP Discovery] Listening IPv6 on [::]:{port}")
except Exception as e:
print(f"[UDP Discovery] IPv6 bind failed on port {port}: {e}")
except Exception as init_e:
print(f"[UDP Discovery] init error: {init_e}")
return
# --- Основной цикл ---
while True:
try:
agent_pubkey = storage.get_config_value("agent_pubkey")
# Приём входящих пакетов
if listen_sockets:
rlist, _, _ = select.select(listen_sockets, [], [], 0.5)
for sock in rlist:
try:
data, addr = sock.recvfrom(2048)
print(f"[UDP Discovery] RAW from {addr}: {data!r}")
try:
msg = json.loads(data.decode("utf-8"))
except Exception as e:
print(f"[UDP Discovery] JSON decode error from {addr}: {e}")
continue
peer_id = msg.get("id")
if peer_id == my_id:
continue
name = msg.get("name", "unknown")
addresses = msg.get("addresses", []) or []
valid_addresses = []
for a in addresses:
addr_str = a.get("addr")
nonce = a.get("nonce")
pow_hash = a.get("pow_hash")
difficulty = a.get("difficulty")
dt = a.get("datetime")
pubkey = a.get("pubkey")
if not addr_str:
continue
# нормализуем адрес (везде используем единый формат)
addr_norm = storage.normalize_address(addr_str)
if not addr_norm:
print(f"[UDP Discovery] Can't normalize addr {addr_str}, skip")
continue
# Проверка PoW — только если есть все поля
if nonce is not None and pow_hash and difficulty is not None:
if not pubkey:
print(f"[UDP Discovery] Peer {peer_id} addr {addr_norm} missing pubkey, skip PoW check")
continue
ok = storage.verify_pow(peer_id, pubkey, addr_norm, nonce, pow_hash, dt, difficulty)
print(f"[UDP Discovery] Verify PoW for {addr_norm} = {ok}")
if not ok:
continue
# Проверка datetime
existing = storage.get_peer_address(peer_id, addr_norm)
try:
existing_dt = dateutil.parser.isoparse(existing.get("datetime")) if existing else None
dt_obj = dateutil.parser.isoparse(dt) if dt else None
except Exception as e:
print(f"[UDP Discovery] datetime parse error: {e}")
continue
if existing_dt and dt_obj and existing_dt >= dt_obj:
print(f"[UDP Discovery] Skip {addr_norm}: old datetime {dt}")
continue
# if all checks OK, ensure we keep canonical form in the stored dict
a_copy = dict(a)
a_copy["addr"] = addr_norm
valid_addresses.append(a_copy)
if valid_addresses:
storage.add_or_update_peer(
peer_id=peer_id,
name=name,
addresses=valid_addresses,
source="discovery",
status="online"
)
print(f"[UDP Discovery] Accepted peer {peer_id} ({addr}), {len(valid_addresses)} addresses")
except Exception as e:
print(f"[UDP Discovery] receive error: {e}")
# --- Отправка broadcast/multicast ---
# берем текущие локальные адреса из storage (актуально)
cfg_local_addresses = storage.get_config_value("local_addresses", [])
valid_local_addresses = []
for a in cfg_local_addresses:
addr_str = a.get("addr") if isinstance(a, dict) else a
nonce = a.get("nonce") if isinstance(a, dict) else None
pow_hash = a.get("pow_hash") if isinstance(a, dict) else None
difficulty = a.get("difficulty") if isinstance(a, dict) else None
dt = a.get("datetime") if isinstance(a, dict) else None
# prefer explicit pubkey per-address, otherwise agent_pubkey
addr_pubkey = a.get("pubkey") if isinstance(a, dict) else None
pubkey_used = addr_pubkey or agent_pubkey
if not addr_str:
continue
addr_norm = storage.normalize_address(addr_str)
if not addr_norm:
print(f"[UDP Discovery] Can't normalize local addr {addr_str}, skip")
continue
# Проверка PoW только если есть необходимые поля
if nonce is not None and pow_hash and difficulty is not None:
if not pubkey_used:
# если у агента нет pubkey в конфигах, не делаем жёсткую проверку PoW,
# потому что невозможно подтвердить — логируем и пропускаем проверку
print(f"[UDP Discovery] No pubkey for self addr {addr_norm}, skipping PoW self-check (will broadcast anyway)")
ok = True
else:
ok = storage.verify_pow(my_id, pubkey_used, addr_norm, nonce, pow_hash, dt, difficulty)
print(f"[UDP Discovery] Self-check PoW for {addr_norm} = {ok}")
if not ok:
print(f"[UDP Discovery] Self addr {addr_norm} failed PoW, skip")
continue
# attach pubkey for broadcast so receivers can verify
a_copy = dict(a) if isinstance(a, dict) else {"addr": addr_str}
a_copy["addr"] = addr_norm
if "pubkey" not in a_copy and agent_pubkey:
a_copy["pubkey"] = agent_pubkey
valid_local_addresses.append(a_copy)
msg_data = json.dumps({
"id": my_id,
"name": agent_name,
"addresses": valid_local_addresses
}).encode("utf-8")
print(f"[UDP Discovery] Broadcasting: {msg_data}")
for entry in valid_local_addresses:
addr_str = entry.get("addr")
proto, hostport = addr_str.split("://", 1)
host, port = storage.parse_hostport(hostport)
if not port or proto.lower() != "udp":
continue
# IPv4 broadcast
if not host.startswith("["):
for iface in netifaces.interfaces():
addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, [])
for a in addrs:
if "broadcast" in a:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
print(f"[UDP Discovery] Sending broadcast -> {a['broadcast']}:{port}")
sock.sendto(msg_data, (a["broadcast"], port))
sock.close()
except Exception as e:
print(f"[UDP Discovery] Broadcast error {a['broadcast']}:{port}: {e}")
# IPv6 multicast ff02::1
else:
for iface in netifaces.interfaces():
ifaddrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET6, [])
for a in ifaddrs:
addr_ipv6 = a.get("addr")
if not addr_ipv6:
continue
multicast_addr = f"ff02::1%{iface}" if addr_ipv6.startswith("fe80:") else "ff02::1"
try:
sock6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, socket.if_nametoindex(iface))
sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
print(f"[UDP Discovery] Sending multicast -> {multicast_addr}:{port}")
sock6.sendto(msg_data, (multicast_addr, port))
sock6.close()
except Exception as e:
print(f"[UDP Discovery] Multicast error {multicast_addr}:{port}: {e}")
time.sleep(DISCOVERY_INTERVAL)
except Exception as main_e:
print(f"[UDP Discovery] main loop error: {main_e}")
time.sleep(DISCOVERY_INTERVAL)
# ---------------------------
# TCP Peer Exchange (исходящие)
# ---------------------------
def tcp_peer_exchange():
import dateutil.parser # для корректного парсинга ISO datetime
PEER_EXCHANGE_INTERVAL = 20 # секунды для отладки
while True:
# получаем свежий список пиров из БД
peers = storage.get_known_peers(my_id, limit=50)
print(f"[PeerExchange] Checking {len(peers)} peers (raw DB)...")
for peer in peers:
peer_id = peer["id"] if isinstance(peer, dict) else peer[0]
addresses_json = peer["addresses"] if isinstance(peer, dict) else peer[1]
if peer_id == my_id:
continue
try:
addr_list = json.loads(addresses_json)
except Exception as e:
print(f"[PeerExchange] JSON decode error for peer {peer_id}: {e}")
addr_list = []
for addr_entry in addr_list:
addr_str = addr_entry.get("addr")
nonce = addr_entry.get("nonce")
pow_hash = addr_entry.get("pow_hash")
difficulty = addr_entry.get("difficulty")
dt = addr_entry.get("datetime")
pubkey = addr_entry.get("pubkey")
# нормализация адреса (включая скобки IPv6 и т.п.)
norm = storage.normalize_address(addr_str)
if not norm:
continue
# Проверка PoW
if nonce is not None and pow_hash and difficulty is not None:
if not pubkey:
print(f"[PeerExchange] Peer {peer_id} addr {norm} missing pubkey, skip PoW check")
continue
ok = storage.verify_pow(peer_id, pubkey, norm, nonce, pow_hash, dt, difficulty)
print(f"[PeerExchange] Verify PoW for {peer_id}@{norm} = {ok}")
if not ok:
continue
# Проверка datetime с использованием dateutil
existing = storage.get_peer_address(peer_id, norm)
try:
existing_dt = dateutil.parser.isoparse(existing.get("datetime")) if existing else None
dt_obj = dateutil.parser.isoparse(dt) if dt else None
except Exception as e:
print(f"[PeerExchange] datetime parse error for {norm}: {e}")
continue
if existing_dt and dt_obj and existing_dt >= dt_obj:
print(f"[PeerExchange] Skip {norm}: old datetime {dt}")
continue
# Парсим host и port
proto, hostport = norm.split("://", 1)
if proto not in ["tcp", "any"]:
continue
host, port = storage.parse_hostport(hostport)
if not host or not port:
continue
print(f"[PeerExchange] Trying {peer_id} at {host}:{port} (proto={proto})")
try:
# IPv6 link-local
if storage.is_ipv6(host) and host.startswith("fe80:"):
scope_id = storage.get_ipv6_scope(host)
if scope_id is None:
print(f"[PeerExchange] Skipping {host}, no scope_id")
continue
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.settimeout(3)
sock.connect((host, port, 0, scope_id))
else:
sock = socket.socket(socket.AF_INET6 if storage.is_ipv6(host) else socket.AF_INET,
socket.SOCK_STREAM)
sock.settimeout(3)
sock.connect((host, port))
# Получаем актуальные адреса на момент отправки (не использовать stale all_addresses)
cfg_local_addresses = storage.get_config_value("local_addresses", [])
cfg_global_addresses = storage.get_config_value("global_addresses", [])
cur_all_addresses = cfg_local_addresses + cfg_global_addresses
# Отправка handshake — фильтр адресов для public/private
if storage.is_private(host):
send_addresses = cur_all_addresses
else:
# фильтруем только публичные
send_addresses = []
for a in cur_all_addresses:
a_addr = a.get("addr") if isinstance(a, dict) else a
try:
host_only = storage.parse_hostport(a_addr.split("://", 1)[1])[0]
if is_public(host_only):
send_addresses.append(a)
except Exception:
continue
handshake = {
"type": "PEER_EXCHANGE_REQUEST",
"id": my_id,
"name": agent_name,
"addresses": send_addresses,
}
raw_handshake = json.dumps(handshake).encode("utf-8")
print(f"[PeerExchange] Sending handshake -> {host}:{port}: {raw_handshake}")
sock.sendall(raw_handshake)
# Читаем ответ
data = sock.recv(64 * 1024)
sock.close()
if not data:
print(f"[PeerExchange] No data from {host}:{port}")
continue
print(f"[PeerExchange] RAW recv from {host}:{port}: {data!r}")
try:
peers_recv = json.loads(data.decode("utf-8"))
print(f"[PeerExchange] Parsed recv from {host}:{port}: {peers_recv}")
for p in peers_recv:
new_addrs = []
for a in p.get("addresses", []):
try:
existing_addr = storage.get_peer_address(p["id"], a.get("addr"))
existing_dt = dateutil.parser.isoparse(existing_addr.get("datetime")) if existing_addr else None
dt_obj = dateutil.parser.isoparse(a.get("datetime")) if a.get("datetime") else None
if existing_addr is None or (existing_dt and dt_obj and existing_dt < dt_obj) or existing_dt is None:
new_addrs.append(a)
else:
print(f"[PeerExchange] Ignored old {a.get('addr')} from {p['id']}")
except Exception as e:
print(f"[PeerExchange] Error parsing datetime for {a.get('addr')}: {e}")
continue
if new_addrs:
storage.add_or_update_peer(
p["id"],
p.get("name", "unknown"),
new_addrs,
source="peer_exchange",
status="online"
)
print(f"[PeerExchange] Stored {len(new_addrs)} new addrs for peer {p['id']}")
print(f"[PeerExchange] Received {len(peers_recv)} peers from {host}:{port}")
except Exception as e:
print(f"[PeerExchange] Decode error from {host}:{port}: {e}")
continue
break
except Exception as e:
print(f"[PeerExchange] Connection to {host}:{port} failed: {e}")
continue
time.sleep(PEER_EXCHANGE_INTERVAL)
# ---------------------------
# TCP Listener (входящие)
# ---------------------------
def tcp_listener():
# получаем локальные порты в момент старта listener'а
listen_sockets = []
cfg_local_ports = storage.get_local_ports()
print(f"[TCP Listener] binding to local ports: {cfg_local_ports}")
for port in cfg_local_ports:
for family, addr_str in [(socket.AF_INET, ""), (socket.AF_INET6, "::")]:
try:
sock = socket.socket(family, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((addr_str, port))
sock.listen(5)
listen_sockets.append(sock)
proto_str = "IPv6" if family == socket.AF_INET6 else "IPv4"
print(f"[TCP Listener] Listening {proto_str} on {addr_str}:{port}")
except Exception as e:
print(f"[TCP Listener] {proto_str} bind failed on port {port}: {e}")
while True:
if not listen_sockets:
time.sleep(1)
continue
rlist, _, _ = select.select(listen_sockets, [], [], 1)
for s in rlist:
try:
conn, addr = s.accept()
data = conn.recv(64 * 1024)
if not data:
print(f"[TCP Listener] Empty data from {addr}, closing")
conn.close()
continue
print(f"[TCP Listener] RAW recv from {addr}: {data!r}")
try:
msg = json.loads(data.decode("utf-8"))
print(f"[TCP Listener] Decoded JSON from {addr}: {msg}")
except Exception as e:
print(f"[TCP Listener] JSON decode error from {addr}: {e}")
conn.close()
continue
if msg.get("type") == "PEER_EXCHANGE_REQUEST":
peer_id = msg.get("id") or f"did:hmp:{addr[0]}:{addr[1]}"
peer_name = msg.get("name", "unknown")
peer_addrs = msg.get("addresses", []) or []
valid_addrs = []
for a in peer_addrs:
addr_value = a.get("addr")
nonce = a.get("nonce")
pow_hash = a.get("pow_hash")
difficulty = a.get("difficulty")
dt = a.get("datetime")
pubkey = a.get("pubkey")
if not addr_value:
print(f"[TCP Listener] Skip addr (no addr field): {a}")
continue
addr_norm = storage.normalize_address(addr_value)
if not addr_norm:
print(f"[TCP Listener] Can't normalize {addr_value}, skip")
continue
if nonce is None or not pow_hash or not pubkey:
print(f"[TCP Listener] Skip addr (incomplete): {a}")
continue
ok = storage.verify_pow(peer_id, pubkey, addr_norm, nonce, pow_hash, dt, difficulty)
print(f"[TCP Listener] Verify PoW for {addr_norm} = {ok}")
if not ok:
continue
existing = storage.get_peer_address(peer_id, addr_norm)
try:
existing_dt = dateutil.parser.isoparse(existing.get("datetime")) if existing else None
dt_obj = dateutil.parser.isoparse(dt) if dt else None
except Exception as e:
print(f"[TCP Listener] datetime parse error for {addr_norm}: {e}")
continue
if existing_dt and dt_obj and existing_dt >= dt_obj:
print(f"[TCP Listener] Skip old addr {addr_norm} (dt={dt})")
continue
a_copy = dict(a)
a_copy["addr"] = addr_norm
valid_addrs.append(a_copy)
if valid_addrs:
storage.add_or_update_peer(
peer_id=peer_id,
name=peer_name,
addresses=valid_addrs,
source="incoming",
status="online"
)
print(f"[TCP Listener] Stored {len(valid_addrs)} addrs for peer {peer_id}")
else:
print(f"[TCP Listener] No valid addrs from {peer_id}")
print(f"[TCP Listener] Handshake from {peer_id} ({addr}) -> name={peer_name}")
# Готовим список пиров для ответа
is_lan = storage.is_private(addr[0])
peers_list = []
for peer in storage.get_known_peers(my_id, limit=50):
peer_id_local = peer["id"]
try:
addresses = json.loads(peer["addresses"])
except:
addresses = []
updated_addresses = []
for a in addresses:
try:
proto, hostport = a["addr"].split("://", 1)
host, port = storage.parse_hostport(hostport)
if not host or not port:
continue
if not is_lan and not is_public(host):
continue
if storage.is_ipv6(host) and host.startswith("fe80:"):
scope_id = storage.get_ipv6_scope(host)
if scope_id:
host = f"{host}%{scope_id}"
updated_addresses.append({
"addr": f"{proto}://{host}:{port}"
})
except Exception:
continue
peers_list.append({
"id": peer_id_local,
"addresses": updated_addresses
})
print(f"[TCP Listener] Sending {len(peers_list)} peers back to {peer_id}")
conn.sendall(json.dumps(peers_list).encode("utf-8"))
conn.close()
except Exception as e:
print(f"[TCP Listener] Connection handling error: {e}")
# ---------------------------
# Запуск потоков
# ---------------------------
def start_sync(bootstrap_file="bootstrap.txt"):
# 1) загрузка bootstrap
load_bootstrap_peers(bootstrap_file)
# 2) пересчитать локальные config-derived переменные (теперь bootstrap и config загружены)
global local_addresses, global_addresses, all_addresses, local_ports
local_addresses = storage.get_config_value("local_addresses", [])
global_addresses = storage.get_config_value("global_addresses", [])
all_addresses = local_addresses + global_addresses
local_ports = storage.get_local_ports()
print(f"[PeerSync] Local ports (after bootstrap): {local_ports}")
print(f"[PeerSync] Local addresses (after bootstrap): {local_addresses}")
# 3) добавить себя в таблицу peers (чтобы pubkey, адреса были в БД и др. части кода могли их читать)
agent_pubkey = storage.get_config_value("agent_pubkey")
# подготавливаем адреса в том же формате, который add_or_update_peer ожидает
self_addrs = []
for a in all_addresses:
if isinstance(a, dict):
self_addrs.append(a)
else:
self_addrs.append({"addr": a, "nonce": None, "pow_hash": None, "difficulty": None, "datetime": ""})
try:
storage.add_or_update_peer(
peer_id=my_id,
name=agent_name,
addresses=self_addrs,
source="self",
status="online",
pubkey=agent_pubkey,
capabilities=None,
heard_from=None
)
print(f"[PeerSync] Registered self {my_id} in agent_peers (pubkey present: {bool(agent_pubkey)})")
except Exception as e:
print(f"[PeerSync] Failed to register self in agent_peers: {e}")
# 4) старт потоков
threading.Thread(target=udp_discovery, daemon=True).start()
threading.Thread(target=tcp_peer_exchange, daemon=True).start()
threading.Thread(target=tcp_listener, daemon=True).start()