from minio.error import S3Error
from cng.utils import *  # provides to_pmtiles, among other helpers
import zipfile
import os
import subprocess
import geopandas as gpd
import ibis
from ibis import _
import rasterio
from rasterio.transform import xy
from shapely.geometry import Point
import numpy as np
from pyproj import Transformer


def info(folder, file, base_folder, bucket="public-ca30x30"):
    """
    Build the minio bucket/path pair used to upload/download data.
    """
    if (folder is None) and (base_folder is None):
        path = file
    else:
        path = os.path.join(base_folder, folder, file)
        # path = os.path.join(folder, file)
    return bucket, path


def download(s3, folder, file, file_name=None, base_folder="CBN/"):
    """
    Download a file from minio.
    """
    if not file_name:
        file_name = file
    bucket, path = info(folder, file, base_folder)
    s3.fget_object(bucket, path, file_name)
    return


def upload(s3, folder, file, base_folder="CBN/"):
    """
    Upload a file to minio.
    """
    bucket, path = info(folder, file, base_folder)
    s3.fput_object(bucket, path, file)
    return


def unzip(s3, folder, file, base_folder="CBN/"):
    """
    Download and extract a zip archive.
    """
    # base_folder must be passed by keyword; positionally it would be
    # interpreted as download()'s file_name argument.
    download(s3, folder, file, base_folder=base_folder)
    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall()
    return


def process_vector(s3, folder, file, file_name=None, gdf=None, crs="EPSG:4326", base_folder="CBN/"):
    """
    Driver function to process vectors.
    """
    if gdf is None:
        gdf = gpd.read_file(file)
    if gdf.crs != crs:
        gdf = gdf.to_crs(crs)
    if gdf.geometry.name != 'geom':
        gdf = gdf.rename_geometry('geom')
    if file_name:
        file = file_name
    # upload_parquet(folder, file, gdf)
    name, ext = os.path.splitext(file)
    parquet_file = f"{name}.parquet"
    gdf.to_parquet(parquet_file)
    upload(s3, folder, parquet_file, base_folder)
    # return gdf.drop('geom', axis=1).columns.to_list()
    return


def process_raster(s3, folder, file, file_name=None, base_folder="CBN/"):
    """
    Driver function to process rasters.
    """
    if file_name:
        file = file_name
    name, ext = os.path.splitext(file)
    output_file = f"{name}_processed{ext}"
    output_cog_file = f"{name}_processed_COG{ext}"
    output_vector_file = f"{name}_processed.parquet"

    # Reproject raster
    if not exists_on_s3(s3, folder, output_file, base_folder):
        output_file = reproject_raster(file)
        upload(s3, folder, output_file, base_folder)
    else:
        print(f"{output_file} already exists on S3, skipping reprojection/upload.")

    # Make COG (assumes the reprojected file is available locally)
    if not exists_on_s3(s3, folder, output_cog_file, base_folder):
        output_cog_file = make_cog(output_file)
        upload(s3, folder, output_cog_file, base_folder)
    else:
        print(f"{output_cog_file} already exists on S3, skipping COG conversion/upload.")

    # # Vectorize raster
    # if not exists_on_s3(s3, folder, output_vector_file, base_folder):
    #     output_vector_file, cols = make_vector(output_file)
    #     upload(s3, folder, output_vector_file, base_folder)
    # else:
    #     print(f"{output_vector_file} already exists on S3, skipping vectorization/upload.")
    #     # We still need column names
    #     gdf = gpd.read_parquet(output_vector_file)
    #     cols = gdf.drop('geom', axis=1).columns.to_list()
    # return cols
    return


def reproject_raster(input_file, crs="EPSG:3310"):
    """
    Reproject a raster with gdalwarp.
    """
    suffix = '_processed'
    name, ext = os.path.splitext(input_file)
    output_file = f"{name}{suffix}{ext}"
    command = [
        "gdalwarp",
        "-t_srs", crs,
        input_file,
        output_file,
    ]
    try:
        subprocess.run(command, check=True)
        print("Reprojection successful!")
    except subprocess.CalledProcessError as e:
        print(f"Error occurred during reprojection: {e}")
    return output_file


def make_cog(input_file, crs="EPSG:4326"):
    """
    Convert a TIF to a Cloud Optimized GeoTIFF (COG).
    """
    suffix = '_COG'
    name, ext = os.path.splitext(input_file)
    output_file = f"{name}{suffix}{ext}"
    command = [
        "gdalwarp",
        "-t_srs", crs,
        "-of", "COG",
        input_file,
        output_file,
    ]
    try:
        subprocess.run(command, check=True)
        print("Successful!")
    except subprocess.CalledProcessError as e:
        print(f"Error occurred during processing: {e}")
    return output_file


# def make_vector(input_file, crs="EPSG:4326"):
#     """
#     Converting rasters to vector formats in order to convert to h3
#     """
#     name, ext = os.path.splitext(input_file)
#     output_file = f"{name}.parquet"
#     with rasterio.open(input_file) as src:
#         band = src.read(1)  # read first band
#         mask = band != src.nodata  # mask out nodata
#         rows, cols = np.where(mask)
#         x, y = rasterio.transform.xy(src.transform, rows, cols, offset="center")
#         # reproject
#         if src.crs and src.crs.to_string() != crs:
#             transformer = Transformer.from_crs(src.crs, crs, always_xy=True)
#             x, y = transformer.transform(x, y)
#             crs_out = crs
#         else:
#             crs_out = src.crs
#     gdf = gpd.GeoDataFrame(
#         {"value": band[rows, cols]},
#         geometry=[Point(xy) for xy in zip(x, y)],
#         crs=crs_out
#     )
#     gdf.rename_geometry('geom', inplace=True)
#     gdf['id'] = np.arange(len(gdf))
#     gdf.to_parquet(output_file)
#     return output_file, gdf.drop('geom', axis=1).columns.to_list()


def filter_raster(s3, folder, file, percentile):
    """
    Helper function to filter rasters, keeping only cells at or above
    the given percentile.
    """
    with rasterio.open(file) as src:
        data = src.read(1)  # Read the first band
        profile = src.profile
        # mask no data values
        masked_data = np.ma.masked_equal(data, src.nodata)
        # compute percentile/threshold
        p = np.percentile(masked_data.compressed(), percentile)
        filtered = np.full_like(data, src.nodata)
        keep = (masked_data >= p).filled(False)
        filtered[keep] = masked_data[keep]
        profile.update(dtype=rasterio.float64, nodata=src.nodata)
    name, ext = os.path.splitext(file)
    new_file = f"{name}_{percentile}percentile{ext}"
    with rasterio.open(new_file, "w", **profile) as dst:
        # cast to match the dtype declared in the updated profile
        dst.write(filtered.astype(rasterio.float64), 1)
    process_raster(s3, folder, new_file)
    # return cols
    return


def convert_pmtiles(con, s3, folder, file, base_folder="CBN/", current_crs='epsg:3310'):
    """
    Convert to PMTiles with tippecanoe.
    """
    print('converting pmtiles')
    name, ext = os.path.splitext(file)
    if ext != '.geojson':
        if current_crs != 'epsg:4326':
            data = (con.read_parquet(file)
                    .execute()
                    .set_crs(current_crs)
                    .to_crs('epsg:4326'))
        else:
            data = con.read_parquet(file).execute().set_crs(current_crs)
        data.to_file(name + '.geojson')
    # to_pmtiles comes from cng.utils (star import above)
    to_pmtiles(name + '.geojson', name + '.pmtiles',
               options=['--extend-zooms-if-still-dropping', '--drop-densest-as-needed'])
    upload(s3, folder, name + '.pmtiles', base_folder)
    return


def exists_on_s3(s3, folder, file, base_folder="CBN/"):
    """
    Check if a file exists on S3.
    """
    bucket, path = info(folder, file, base_folder)
    try:
        s3.stat_object(bucket, path)
        return True
    except S3Error:
        return False
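

# Example end-to-end usage (a sketch under stated assumptions, not part of the
# module): the minio endpoint, credentials, folder names, and file names below
# are hypothetical placeholders.
#
# from minio import Minio
# import ibis
#
# s3 = Minio("minio.example.org", access_key="...", secret_key="...")
# con = ibis.duckdb.connect()
#
# # Vector workflow: fetch and extract a zipped shapefile, standardize its CRS
# # and geometry column, upload it as GeoParquet, then publish PMTiles.
# unzip(s3, "habitat", "habitat.zip")
# process_vector(s3, "habitat", "habitat.shp")
# convert_pmtiles(con, s3, "habitat", "habitat.parquet", current_crs="epsg:4326")
#
# # Raster workflow: fetch a GeoTIFF, reproject it and produce a COG, then keep
# # only cells at or above the 90th percentile and process that result as well.
# download(s3, "canopy", "canopy.tif")
# process_raster(s3, "canopy", "canopy.tif")
# filter_raster(s3, "canopy", "canopy.tif", 90)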