sputnik-view / app.py
Serg4451D's picture
Create app.py
175b9c8 verified
import os
import re
import io
import json
import math
import datetime as dt
from typing import Optional, Tuple, Dict
import requests
import numpy as np
import cv2
from mgrs import MGRS
from PIL import Image
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
# --------------------------
# Константы и хелперы
# --------------------------
S2_BASE = "https://sentinel-s2-l1c.s3.amazonaws.com/tiles"
UA = {"User-Agent": "s2-ndvi-gradio-tool/1.0 (+https://example.com)"}
# Sentinel-2 тайлы 10м — 10980 px (около 109.8 км), 100 км поле внутри с ~490 px отступом
TILE_SIZE_10M = 10980
GRID_100KM_M = 100_000
MARGIN_PX_10M = (TILE_SIZE_10M - GRID_100KM_M // 10) // 2 # ~490
# --------------------------
# Геокодирование / парсинг
# --------------------------
def parse_latlon(text: str) -> Optional[Tuple[float, float]]:
if not text:
return None
m = re.match(r"^\s*([+-]?\d+(?:\.\d+)?)\s*[,; ]\s*([+-]?\d+(?:\.\d+)?)\s*$", text)
if not m:
return None
lat = float(m.group(1))
lon = float(m.group(2))
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
return None
return lat, lon
def geocode_nominatim(query: str, lang: str = "ru") -> Optional[Tuple[float, float, str]]:
url = "https://nominatim.openstreetmap.org/search"
params = {"q": query, "format": "jsonv2", "limit": 1, "accept-language": lang}
try:
r = requests.get(url, params=params, headers=UA, timeout=20)
if r.status_code != 200:
return None
data = r.json()
if not data:
return None
lat = float(data[0]["lat"])
lon = float(data[0]["lon"])
display = data[0].get("display_name", query)
return lat, lon, display
except Exception:
return None
# --------------------------
# MGRS / индексация тайла
# --------------------------
def latlon_to_mgrs(lat: float, lon: float) -> Dict[str, any]:
"""
Возвращает словарь с полями:
- zone (int), band (str), square (str) — компоненты пути S3
- easting_m, northing_m — координаты внутри 100 км квадрата (метры)
- mgrs_str — полный MGRS (с 5+5 цифрами)
"""
m = MGRS()
s = m.toMGRS(lat, lon, MGRSPrecision=5) # Пример: '31UDQ4825111980'
# Найдём границу цифр/букв
i = 0
while i < len(s) and s[i].isdigit():
i += 1
zone = int(s[:i])
band = s[i]
square = s[i + 1:i + 3]
digits = s[i + 3:]
e_str = digits[:5]
n_str = digits[5:10]
easting_m = int(e_str)
northing_m = int(n_str)
return {
"zone": zone,
"band": band,
"square": square,
"easting_m": easting_m,
"northing_m": northing_m,
"mgrs_str": s
}
def build_s2_tile_prefix(zone: int, band: str, square: str, d: dt.date, seq: int) -> str:
# В AWS путь вида: /tiles/{utm_zone}/{lat_band}/{grid_square}/{YYYY}/{M}/{D}/{sequence}/
return f"{S2_BASE}/{zone}/{band}/{square}/{d.year}/{d.month}/{d.day}/{seq}"
def url_band(zone: int, band: str, square: str, d: dt.date, seq: int, band_name: str) -> str:
# band_name: 'B04', 'B08', 'TCI', 'QA60' и т.д.
return f"{build_s2_tile_prefix(zone, band, square, d, seq)}/{band_name}.jp2"
def url_preview(zone: int, band: str, square: str, d: dt.date, seq: int) -> str:
return f"{build_s2_tile_prefix(zone, band, square, d, seq)}/preview.jpg"
def head_exists(url: str) -> bool:
try:
r = requests.head(url, headers=UA, timeout=15)
return r.status_code == 200
except Exception:
return False
def find_sequence_with_data(zone: int, band: str, square: str, d: dt.date, test_band="B04") -> Optional[int]:
# Пробуем seq 0..3 пока найдём объект
for seq in range(4):
if head_exists(url_band(zone, band, square, d, seq, test_band)):
return seq
return None
# --------------------------
# Загрузка и декодирование JP2
# --------------------------
def http_get(url: str) -> Optional[bytes]:
try:
r = requests.get(url, headers=UA, timeout=60)
if r.status_code != 200:
return None
return r.content
except Exception:
return None
def decode_jp2_to_np(jp2_bytes: bytes, flags=cv2.IMREAD_UNCHANGED):
# Возвращает numpy массив (H, W) или (H, W, C); 16-bit для каналов, 8/16-bit для TCI
arr = np.frombuffer(jp2_bytes, dtype=np.uint8)
img = cv2.imdecode(arr, flags)
return img
# --------------------------
# Геометрия ROI внутри тайла
# --------------------------
def pixel_from_mgrs(easting_m: int, northing_m: int) -> Tuple[int, int]:
"""
Перевод положения внутри 100 км квадрата (метры от юго-западного угла)
в пиксели 10 м данных тайла (с учётом ~490 px отступа).
Возвращает (col_x, row_y).
"""
x_px = MARGIN_PX_10M + int(round(easting_m / 10.0))
# y: сверху 0, внутри квадрата от южной границы -> переворачиваем
y_px = MARGIN_PX_10M + int(round((GRID_100KM_M - northing_m) / 10.0))
# страхуемся в пределах допустимого
x_px = max(0, min(TILE_SIZE_10M - 1, x_px))
y_px = max(0, min(TILE_SIZE_10M - 1, y_px))
return x_px, y_px
def crop_roi(img: np.ndarray, cx: int, cy: int, rad_px: int) -> np.ndarray:
h, w = img.shape[:2]
x0 = max(0, cx - rad_px)
x1 = min(w, cx + rad_px)
y0 = max(0, cy - rad_px)
y1 = min(h, cy + rad_px)
return img[y0:y1, x0:x1].copy(), (x0, y0, x1, y1)
# --------------------------
# NDVI и отрисовка
# --------------------------
def compute_ndvi(b08_10m: np.ndarray, b04_10m: np.ndarray, qa60_10m: Optional[np.ndarray] = None) -> Tuple[np.ndarray, Optional[np.ndarray]]:
b08 = b08_10m.astype(np.float32)
b04 = b04_10m.astype(np.float32)
denom = (b08 + b04)
ndvi = np.where(denom > 0, (b08 - b04) / denom, np.nan)
cloud_mask = None
if qa60_10m is not None:
# QA60: биты 10 (opaque clouds) и 11 (cirrus)
qa = qa60_10m.astype(np.uint16)
clouds = (((qa >> 10) & 1) | ((qa >> 11) & 1)) > 0
ndvi = np.where(clouds, np.nan, ndvi)
cloud_mask = clouds
return ndvi, cloud_mask
def colorize_ndvi(ndvi: np.ndarray, cloud_mask: Optional[np.ndarray] = None,
vmin: float = -0.2, vmax: float = 0.9) -> np.ndarray:
x = ndvi.copy()
# Отдельно отметим NaN (облака/нет данных)
nan_mask = np.isnan(x)
x = np.clip(x, vmin, vmax)
norm = (x - vmin) / (vmax - vmin + 1e-9)
cmap = plt.get_cmap("RdYlGn") # красный->желтый->зеленый
rgb = (cmap(norm)[..., :3] * 255).astype(np.uint8)
# Заштрихуем NaN серым
if cloud_mask is not None:
nan_mask = nan_mask | cloud_mask
rgb[nan_mask] = np.array([180, 180, 180], dtype=np.uint8)
return rgb
def stretch_16_to_8(img: np.ndarray, p_low=2, p_high=98) -> np.ndarray:
# Клип и линейная растяжка для лучшего контраста
if img.ndim == 2:
lo, hi = np.percentile(img[img > 0], [p_low, p_high]) if np.any(img > 0) else (0, 1)
hi = max(hi, lo + 1)
out = (np.clip(img, lo, hi) - lo) / (hi - lo + 1e-6)
return (out * 255).astype(np.uint8)
else:
out = np.zeros_like(img, dtype=np.uint8)
for c in range(img.shape[2]):
chan = img[..., c]
if np.any(chan > 0):
lo, hi = np.percentile(chan[chan > 0], [p_low, p_high])
hi = max(hi, lo + 1)
out[..., c] = ((np.clip(chan, lo, hi) - lo) / (hi - lo + 1e-6) * 255).astype(np.uint8)
return out
# --------------------------
# Основная логика анализа
# --------------------------
def analyze_vegetation(
address_or_coords: str,
date_str: str,
radius_m: int,
) -> Tuple[np.ndarray, np.ndarray, str]:
# 1) Получаем координаты
latlon = parse_latlon(address_or_coords.strip())
resolved_label = None
if latlon is None:
geo = geocode_nominatim(address_or_coords.strip())
if geo is None:
raise gr.Error("Не удалось геокодировать адрес или распознать координаты. Попробуйте формат '55.75, 37.61' или уточните адрес.")
lat, lon, resolved_label = geo
else:
lat, lon = latlon
resolved_label = f"Координаты: {lat:.6f}, {lon:.6f}"
# 2) MGRS тайл + положение
tile = latlon_to_mgrs(lat, lon)
zone = tile["zone"]
band = tile["band"]
square = tile["square"]
easting_m = tile["easting_m"]
northing_m = tile["northing_m"]
mgrs_str = tile["mgrs_str"]
cx, cy = pixel_from_mgrs(easting_m, northing_m)
rad_px = max(5, int(radius_m / 10)) # 10 м/пикс
# 3) Дата и последовательность
if date_str:
try:
d = dt.datetime.strptime(date_str.strip(), "%Y-%m-%d").date()
except Exception:
raise gr.Error("Дата должна быть в формате ГГГГ-ММ-ДД (например, 2024-07-15).")
else:
d = dt.date.today()
seq = find_sequence_with_data(zone, band, square, d, test_band="B04")
if seq is None:
# Попробуем отмотать назад до 10 дней
found = False
for back in range(1, 11):
dd = d - dt.timedelta(days=back)
seq_try = find_sequence_with_data(zone, band, square, dd, test_band="B04")
if seq_try is not None:
d = dd
seq = seq_try
found = True
break
if not found:
raise gr.Error("Не нашёл подходящих сцен Sentinel-2 для этой локации и последних 10 дней.")
# 4) Скачиваем каналы: B04 (red), B08 (nir), QA60 (облака), TCI (true color) либо B02/B03/B04
url_b04 = url_band(zone, band, square, d, seq, "B04")
url_b08 = url_band(zone, band, square, d, seq, "B08")
url_qa60 = url_band(zone, band, square, d, seq, "QA60")
url_tci = url_band(zone, band, square, d, seq, "TCI")
b04_bytes = http_get(url_b04)
b08_bytes = http_get(url_b08)
if b04_bytes is None or b08_bytes is None:
raise gr.Error("Не удалось скачать каналы B04/B08. Попробуйте другую дату.")
qa60_bytes = http_get(url_qa60) # может быть None
tci_bytes = http_get(url_tci) # может быть None
# 5) Декодирование
b04 = decode_jp2_to_np(b04_bytes, cv2.IMREAD_UNCHANGED) # (10980, 10980), uint16
b08 = decode_jp2_to_np(b08_bytes, cv2.IMREAD_UNCHANGED)
if b04 is None or b08 is None or b04.shape != b08.shape:
raise gr.Error("Ошибка чтения JP2 или несовпадение размеров каналов B04/B08.")
# QA60 обычно 60м (1830x1830) — апсемплим до 10м
qa10m = None
cloud_pct_roi = None
if qa60_bytes is not None:
qa = decode_jp2_to_np(qa60_bytes, cv2.IMREAD_UNCHANGED)
if qa is not None:
qa10m = cv2.resize(qa, (TILE_SIZE_10M, TILE_SIZE_10M), interpolation=cv2.INTER_NEAREST)
# 6) Кадрируем ROI
b04_roi, (x0, y0, x1, y1) = crop_roi(b04, cx, cy, rad_px)
b08_roi, _ = crop_roi(b08, cx, cy, rad_px)
qa_roi = None
if qa10m is not None:
qa_roi, _ = crop_roi(qa10m, cx, cy, rad_px)
# 7) NDVI + маска облаков
ndvi_roi, cloud_mask = compute_ndvi(b08_roi, b04_roi, qa_roi)
# 8) Статистика
valid = ~np.isnan(ndvi_roi)
if np.any(valid):
mean_ndvi = float(np.nanmean(ndvi_roi))
med_ndvi = float(np.nanmedian(ndvi_roi))
pct_veg_good = float(np.nanmean(ndvi_roi > 0.5) * 100.0)
pct_veg_med = float(np.nanmean((ndvi_roi > 0.3) & (ndvi_roi <= 0.5)) * 100.0)
else:
mean_ndvi = float("nan")
med_ndvi = float("nan")
pct_veg_good = 0.0
pct_veg_med = 0.0
if cloud_mask is not None:
cloud_pct_roi = float(np.mean(cloud_mask) * 100.0)
else:
cloud_pct_roi = None
# 9) Рендер NDVI
ndvi_rgb = colorize_ndvi(ndvi_roi, cloud_mask)
# 10) True color ROI
# a) Сначала пытаемся TCI
tci_roi_rgb = None
if tci_bytes is not None:
tci = decode_jp2_to_np(tci_bytes, cv2.IMREAD_UNCHANGED)
if tci is not None:
if tci.ndim == 3:
# cv2 читает BGR — переведём в RGB
if tci.shape[2] == 3:
tci = cv2.cvtColor(tci, cv2.COLOR_BGR2RGB)
tci_roi, _ = crop_roi(tci, cx, cy, rad_px)
tci_roi_rgb = stretch_16_to_8(tci_roi)
else:
# бывает моно — не используем
tci_roi_rgb = None
# b) Если TCI нет — соберём из B02/B03/B04
if tci_roi_rgb is None:
url_b02 = url_band(zone, band, square, d, seq, "B02")
url_b03 = url_band(zone, band, square, d, seq, "B03")
b02_bytes = http_get(url_b02)
b03_bytes = http_get(url_b03)
if b02_bytes is not None and b03_bytes is not None:
b02 = decode_jp2_to_np(b02_bytes, cv2.IMREAD_UNCHANGED)
b03 = decode_jp2_to_np(b03_bytes, cv2.IMREAD_UNCHANGED)
if b02 is not None and b03 is not None and b02.shape == b03.shape == b04.shape:
b02_roi, _ = crop_roi(b02, cx, cy, rad_px)
b03_roi, _ = crop_roi(b03, cx, cy, rad_px)
rgb16 = np.dstack([b04_roi, b03_roi, b02_roi])
tci_roi_rgb = stretch_16_to_8(rgb16)
# если не получилось — сделаем псевдоцвет из B08/B04
if tci_roi_rgb is None:
pseudo = np.dstack([b08_roi, b04_roi, b04_roi])
tci_roi_rgb = stretch_16_to_8(pseudo)
# 11) Текстовый вывод/диагностика
url_used_preview = url_preview(zone, band, square, d, seq)
info_lines = []
info_lines.append(f"Адрес/точка: {resolved_label}")
info_lines.append(f"MGRS тайл: {mgrs_str} (zone={zone}, band={band}, square={square})")
info_lines.append(f"Дата съёмки: {d.isoformat()} (seq={seq})")
if cloud_pct_roi is not None:
info_lines.append(f"Облака в ROI: {cloud_pct_roi:.1f}%")
if not np.isnan(mean_ndvi):
info_lines.append(f"Средний NDVI (ROI): {mean_ndvi:.3f}")
info_lines.append(f"Медианный NDVI (ROI): {med_ndvi:.3f}")
info_lines.append(f"Доля пикселей NDVI>0.5: {pct_veg_good:.1f}%")
info_lines.append(f"Доля пикселей 0.3<NDVI≤0.5: {pct_veg_med:.1f}%")
# Грубая классификация
if mean_ndvi < 0.25:
concl = "Низкая растительность/стресс или открытая почва."
elif mean_ndvi < 0.45:
concl = "Умеренная растительность."
else:
concl = "Хорошее состояние растительности."
info_lines.append(f"Вывод: {concl}")
else:
info_lines.append("Недостаточно валидных пикселей для оценки (возможно, облака).")
info_lines.append("")
info_lines.append("Служебное:")
info_lines.append(f"Пример ссылки на предпросмотр: {url_used_preview}")
summary_md = " \n".join(info_lines)
# 12) Возвращаем изображения как numpy (RGB) + текст
return tci_roi_rgb, ndvi_rgb, summary_md
# --------------------------
# Gradio UI
# --------------------------
with gr.Blocks(title="Sentinel-2 NDVI по адресу") as demo:
gr.Markdown(
"""
# NDVI по адресу/координатам (Sentinel-2 L1C, AWS)
Введите адрес (или координаты в формате `lat, lon`), при желании дату и радиус.
Инструмент скачивает данные напрямую из публичного бакета AWS без ключей.
"""
)
with gr.Row():
address_in = gr.Textbox(label="Адрес или координаты (lat, lon)", placeholder="Например: Москва, Тверская 1 или 55.75, 37.61", lines=1)
date_in = gr.Textbox(label="Дата (ГГГГ-ММ-ДД) — опционально", placeholder="например: 2025-07-01 (пусто = сегодня/последние дни)")
radius_in = gr.Slider(label="Радиус области интереса (м)", minimum=100, maximum=5000, step=50, value=1000)
run_btn = gr.Button("Анализировать NDVI")
with gr.Row():
tci_out = gr.Image(label="True Color (ROI)", type="numpy")
ndvi_out = gr.Image(label="NDVI (ROI)", type="numpy")
summary_out = gr.Markdown()
def on_click(addr, dstr, rad):
return analyze_vegetation(addr, dstr, int(rad))
run_btn.click(fn=on_click, inputs=[address_in, date_in, radius_in], outputs=[tci_out, ndvi_out, summary_out])
if __name__ == "__main__":
# Запуск локально: http://127.0.0.1:7860
demo.launch()