# Pre-processing script

In [None]:
import os
data_path = '../data/CBN-layers/'
os.chdir(data_path)

from cng.utils import set_secrets, s3_client, s3_cp, to_pmtiles
s3 = s3_client()

 
import ibis
from ibis import _
import ibis.expr.datatypes as dt
con = ibis.duckdb.connect(extensions=["spatial"])

import geopandas as gpd
import duckdb

#### Helper functions

In [None]:
def get_url(folder, file, base_folder = 'CBN'):
 minio = 'https://minio.carlboettiger.info/'
 bucket = 'public-ca30x30'
 if base_folder is None:
 path = os.path.join(bucket,folder,file)
 else:
 path = os.path.join(bucket,base_folder,folder,file)
 url = minio+path
 return url


# usage: t.mutate(geom_valid = ST_MakeValid(t.geom))
@ibis.udf.scalar.builtin
def ST_MakeValid(geom) -> dt.geometry:
 ...

#### Variables

In [None]:
# CA Nature data 
ca_raw_parquet = "https://data.source.coop/cboettig/ca30x30/ca_areas.parquet"
# ca_raw_parquet = 'ca_areas.parquet'

# Boundary of CA, used to computed 'non-conserved' areas
ca_boundary_parquet = get_url('CA_Nature/2024/Preprocessing','ca_boundary.parquet',base_folder = None)

# newly protected areas 
newly_protected = get_url('Progress_data_new_protection/Newly_counted_lands','newly_counted_lands_2024.parquet')

# Ecoregions
ecoregions = get_url('Ecoregion','ACE_ecoregions.parquet')

# file to save non-conserved areas; costly operation so we save results 
ca_nonconserved_url = get_url('Progress_data_new_protection/Land_Status_Zone_Ecoregion_Counties','all_regions_reGAP_county_eco.parquet')

# temp file only of CA Nature data + non-conserved areas 
ca_base_parquet = "ca-30x30-base.parquet"
ca_temp_parquet = "ca-30x30-temp.parquet" 

# temp file used to compute metrics w/ data layers 
ca_temp_stats_parquet = "ca-30x30-stats-temp.parquet" 

#vector data 
ACE_rarerank_statewide = get_url('ACE_biodiversity/ACE_rarerank_statewide','ACE_rarerank_statewide.parquet')
ACE_rarerank_ecoregion = get_url('ACE_biodiversity/ACE_rarerank_ecoregion','ACE_rarerank_ecoregion.parquet')
ACE_biorank_statewide = get_url('ACE_biodiversity/ACE_biorank_statewide','ACE_biorank_statewide.parquet')
ACE_biorank_ecoregion = get_url('ACE_biodiversity/ACE_biorank_ecoregion','ACE_biorank_ecoregion.parquet')

ACE_amph_richness = get_url('ACE_biodiversity/ACE_amphibian_richness','ACE_amphibian_richness.parquet')
ACE_reptile_richness = get_url('ACE_biodiversity/ACE_reptile_richness','ACE_reptile_richness.parquet')
ACE_bird_richness = get_url('ACE_biodiversity/ACE_bird_richness','ACE_bird_richness.parquet')
ACE_mammal_richness = get_url('ACE_biodiversity/ACE_mammal_richness','ACE_mammal_richness.parquet')
ACE_rare_amphibian_richness = get_url('ACE_biodiversity/ACE_rare_amphibian_richness','ACE_rare_amphibian_richness.parquet')
ACE_rare_reptile_richness = get_url('ACE_biodiversity/ACE_rare_reptile_richness','ACE_rare_reptile_richness.parquet')
ACE_rare_bird_richness = get_url('ACE_biodiversity/ACE_rare_bird_richness','ACE_rare_bird_richness.parquet')
ACE_rare_mammal_richness = get_url('ACE_biodiversity/ACE_rare_mammal_richness','ACE_rare_mammal_richness.parquet')
ACE_endemic_amphibian_richness = get_url('ACE_biodiversity/ACE_endemic_amphibian_richness','ACE_endemic_amphibian_richness.parquet')
ACE_endemic_reptile_richness = get_url('ACE_biodiversity/ACE_endemic_reptile_richness','ACE_endemic_reptile_richness.parquet')
ACE_endemic_bird_richness = get_url('ACE_biodiversity/ACE_endemic_bird_richness','ACE_endemic_bird_richness.parquet')
ACE_endemic_mammal_richness = get_url('ACE_biodiversity/ACE_endemic_mammal_richness','ACE_endemic_mammal_richness.parquet')

wetlands = get_url('Freshwater_resources/Wetlands','CA_wetlands.parquet')
fire = get_url('Climate_risks/Historical_fire_perimeters','calfire_2023.parquet')
farmland = get_url('NBS_agriculture/Farmland','Farmland_2018.parquet')
grazing = get_url('NBS_agriculture/Lands_suitable_grazing','Grazing_land_2018.parquet')
DAC = get_url('Progress_data_new_protection/DAC','DAC_2022.parquet')
low_income = get_url('Progress_data_new_protection/Low_income_communities','low_income_CalEnviroScreen4.parquet')

# raster data
climate_zones = get_url('Climate_zones', 'climate_zones_10_processed.tif')
# habitat = get_url('Habitat', 'CWHR13_2022_processed.tif')
habitat = get_url('Habitat', 'fveg22_1_processed.tif')
plant_richness = get_url('Biodiversity_unique/Plant_richness', 'species_D_80percentile_processed.tif')
endemic_plant_richness = get_url('Biodiversity_unique/Rarityweighted_endemic_plant_richness', 'endemicspecies_E_80percentile_processed.tif')
resilient_conn_network = get_url('Connectivity_resilience/Resilient_connected_network_allcategories', 
 'rcn_wIntactBioCat_caOnly_2020-10-27_processed.tif')

# final files: conserved + non-conserved areas + data layers 
ca_parquet = "ca-30x30-cbn.parquet"
ca_geojson = "ca-30x30-cbn.geojson"
ca_pmtiles = "ca-30x30-cbn.pmtiles" 

# Step 1: Cleaning up "non-conserved" areas

#### Non-conserved areas need to match CA Nature schema when merging

In [None]:
%%time 
# match CA Nature schema 

non_conserved = (con.read_parquet(ca_nonconserved_url)
 .filter(_.reGAP == 0)
 .rename(county = "COUNTY_NAM", ecoregion = "CA_Ecoregi",acres = "Acres", gap_code = "reGAP")
 .mutate(id=ibis.row_number().over())
 .select( _.id, _.county, _.ecoregion, _.acres,_.geom, _.gap_code)
 .mutate(established = ibis.null(), name = ibis.literal("Non-Conserved Areas"),
 access_type = ibis.null(), manager = ibis.null(), manager_type = ibis.null(),
 easement = ibis.null(), type = ibis.literal("Land"),
 status = ibis.literal("non-conserved"),
 acres = _.acres.round(4)
 )
 .cast({"geom": "geometry", "established": "string", "gap_code": "int16", "status": "string","name": "string",
 "access_type": "string", "manager": "string", "manager_type": "string",
 "ecoregion": "string", "easement": "string", "id": "string", "type": "string",
 "acres":"float64"}) #match schema to CA Nature
 .mutate(geom = ST_MakeValid(_.geom))
 .drop_null(['geom'],how = "any")
 )

non_conserved.execute().set_crs('epsg:3310').to_parquet('ca_cbn_nonconserved_areas.parquet')
s3.fput_object("public-ca30x30", 'Preprocessing/ca_cbn_nonconserved_areas.parquet', 'ca_cbn_nonconserved_areas.parquet') 

# Step 2: Isolate the "newly protected" polygons

In [None]:
# negative buffer to account for overlapping boundaries. 
buffer = -30 #30m buffer 

tbl = (
 con.read_parquet(ca_raw_parquet)
 .cast({"SHAPE": "geometry"})
 .rename(geom = "SHAPE")
 .filter(_.reGAP < 3) # only gap 1 and 2 count towards 30x30
)


# polygons with release_year 2024 are a superset of release_year 2023. 
# use anti_join to isolate the objects that are in release_year 2024 but not release_year 2023 (aka newly established). 
tbl_2023 = tbl.filter(_.Release_Year == 2023).mutate(geom=_.geom.buffer(buffer)) 
tbl_2024 = tbl.filter(_.Release_Year == 2024)
intersects = tbl_2024.anti_join(tbl_2023, _.geom.intersects(tbl_2023.geom))

In [None]:
# buffer = 160 #0.1mile buffer 

# tbl_2024 = (
# con.read_parquet(ca_raw_parquet)
# .cast({"SHAPE": "geometry"})
# .rename(geom = "SHAPE")
# .filter(_.Release_Year == 2024)
# .mutate(geom=_.geom.buffer(buffer)) 
# )

# tbl_new = (
# con.read_parquet(newly_protected)
# )

# intersects = tbl_new.anti_join(tbl_2024, _.geom.intersects(tbl_2024.geom))

# Step 3: Join all protected lands + non-conserved areas 

In [None]:
# %%time
new2024 = intersects.select("OBJECTID").mutate(established = ibis.literal("2024")) # saving IDs to join on

ca_merged = (con
 .read_parquet(ca_raw_parquet)
 .cast({"SHAPE": "geometry"})
 .mutate(area = _.SHAPE.area())
 .filter(_.Release_Year == 2024) # having both 2023 and 2024 is redudant since 2024 is the superset.
 .left_join(new2024, "OBJECTID") # newly established 2024 polygons 
 .mutate(established=_.established.fill_null("pre-2024")) 
 .rename(name = "cpad_PARK_NAME", access_type = "cpad_ACCESS_TYP", manager = "cpad_MNG_AGENCY",
 manager_type = "cpad_MNG_AG_LEV", id = "OBJECTID", type = "TYPE", 
 ecoregion = "CA_Ecoregion_Name", acres = "Acres", gap_code = "reGAP", geom = "SHAPE")
 .cast({"gap_code": "int16"})
 .cast({"id": "int64"})
 .mutate(manager = _.manager.substitute({"": "Unknown"})) 
 .mutate(manager_type = _.manager_type.substitute({"": "Unknown"}))
 .mutate(access_type = _.access_type.substitute({"": "Unknown Access"}))
 .mutate(name = _.name.substitute({"": "Unknown"}))
 .mutate(manager_type = _.manager_type.substitute({"Home Owners Association": "HOA"}))
 .mutate(easement=_.Easement.cast("string").substitute({"0": "False", "1": "True"}))
 .mutate(status=_.gap_code.cast("string")
 .substitute({"1": "30x30-conserved", "2": "30x30-conserved", "3": "other-conserved", 
 "4": "unknown"}))
 .select(_.established, _.gap_code, _.status, _.name, _.access_type, _.manager, _.manager_type,
 _.ecoregion, _.easement, _.acres, _.id, _.type, _.geom)
 )


#### Adding county data + non conserved areas

Non conserved data already has counties, so we join it after we add counties to cpad

In [None]:
%%time 
counties = con.read_parquet('../CA_counties.parquet')
# ca = con.read_parquet(ca_temp_parquet)

con.create_table("counties", counties.select("COUNTY_NAM","geom"), overwrite = True)
con.create_table("ca", ca_merged, overwrite = True)

# getting county name(s) for each protected area 
con.con.execute('''
CREATE TABLE counties_data AS
SELECT 
 ca.*, 
 counties.COUNTY_NAM AS county,
 ST_Intersection(ca.geom, counties.geom) AS geom
FROM ca
JOIN counties 
 ON ST_Intersects(ca.geom, counties.geom)
WHERE NOT ST_IsEmpty(ST_Intersection(ca.geom, counties.geom))
 AND ST_GeometryType(ST_Intersection(ca.geom, counties.geom)) IN ('POLYGON', 'MULTIPOLYGON');
''')

import string
from ibis import window, literal, row_number

win = window(group_by = "id", order_by="geom")

# add suffix to duplicate ids (caused by separating the areas by county)
def map_idx_to_letter(idx):
 case = idx.case()
 for i, letter in enumerate(string.ascii_lowercase[:26]):
 case = case.when(i, letter)
 return case.else_("").end()
 
all_data = (
 con.table("counties_data")
 .drop("geom")
 .rename(geom="geom_1")
 # # modify the ids for areas that span multiple counties 
 .mutate(
 geom = ST_MakeValid(_.geom),
 acres=_.geom.area() / 4046.8564224,
 id_count=_.id.count().over(win), # 
 idx= row_number().over(win) - 1,
 )
 # e.g. if id = 11 has 2 rows (bc it spans 2 counties), make each row 11a and 11b. 
 .mutate(
 id=_.id.cast("string") + (_.id_count > 1).ifelse(map_idx_to_letter(_.idx.cast("int")), "")
 )
 .drop("id_count", "idx")
 .drop_null(['geom'],how = "any")
 .union(non_conserved)
 .mutate(acres=_.acres.round(4))
)

gdf = all_data.execute()

gdf.set_crs("epsg:3310").to_parquet(ca_base_parquet)
s3.fput_object("public-ca30x30", 'CA_Nature/2024/Preprocessing/'+ca_base_parquet, ca_base_parquet) 

# Step 4: Compute metrics w/ data layers

#### Raster data

In [None]:
# getting the habitat name from pixel
import xml.etree.ElementTree as ET

def get_habitat_type(fieldname):
 aux_xml_path = 'fveg22_1_processed.tif.aux.xml'
 s3.fget_object('public-ca30x30','CBN/Habitat/'+aux_xml_path, aux_xml_path)
 tree = ET.parse(aux_xml_path)
 root = tree.find(".//GDALRasterAttributeTable")
 field_names = [f.find("Name").text for f in root.findall("FieldDefn")]
 val_i, name_i = field_names.index("Value"), field_names.index(fieldname)

 return {
 int(r.findall("F")[val_i].text): r.findall("F")[name_i].text
 for r in root.findall("Row")
 }

habitat_lookup = get_habitat_type("WHR13NAME")

In [None]:
# computing overlap 
def compute_overlap(stats, name):
 return [
 round(s["count"] / (s["count"] + s["nodata"]), 4)
 if (s["count"] + s["nodata"]) > 0 else 0
 for s in stats
 ]

In [None]:
%%time
from rasterstats import zonal_stats
import numpy as np
import warnings
warnings.filterwarnings("ignore", message="Warning: converting a masked element to nan") #ignoring warning 

rasters = [climate_zones, habitat, plant_richness, endemic_plant_richness, resilient_conn_network]
names = ['climate_zone','habitat_type','plant_richness','rarityweighted_endemic_plant_richness', 'resilient_connected_network']

gdf_stats = gpd.read_parquet(ca_base_parquet) # read in data if it's not already created 

for file,name in zip(rasters,names):
 if name in ['climate_zone','habitat_type','resilient_connected_network']:
 metric = "majority"
 else: 
 metric = ["count", "nodata"]
 raster_stats = zonal_stats(ca_base_parquet, file, stats = metric)
 if name in ['plant_richness','rarityweighted_endemic_plant_richness']:
 values = compute_overlap(raster_stats, name)
 else:
 values = [d[metric] for d in raster_stats]
 gdf_stats[name] = values

# getting the habitat name from the pixel
gdf_stats['habitat_type'] = gdf_stats['habitat_type'].map(habitat_lookup)
gdf_stats.to_parquet(ca_temp_stats_parquet) 


#### Vector data

In [None]:
def vector_vector_stats(base, data_layer):
 t1 = con.read_parquet(base).select(_.id, _.geom)
 t2 = con.read_parquet(data_layer).select(_.geom)

 expr = (t1
 .left_join(t2, t1.geom.intersects(t2.geom))
 .group_by(t1.id, t1.geom)
 .agg(overlap_fraction = (t1.geom.intersection(t2.geom).area() / t1.geom.area()) 
 .sum().coalesce(0).round(3) ) # overlap 
 )
 ibis.to_sql(expr)
 stats = expr.execute()
 return stats[['id','overlap_fraction']]

In [None]:
%%time

## this takes ~24 hours

names = ['ACE_rarerank_statewide', 'ACE_rarerank_ecoregion',
 'ACE_biorank_statewide', 'ACE_biorank_ecoregion',
 'ACE_amphibian_richness','ACE_reptile_richness',
 'ACE_bird_richness','ACE_mammal_richness',
 'ACE_rare_amphibian_richness','ACE_rare_reptile_richness',
 'ACE_rare_bird_richness','ACE_rare_mammal_richness',
 'ACE_endemic_amphibian_richness','ACE_endemic_reptile_richness',
 'ACE_endemic_bird_richness','ACE_endemic_mammal_richness',
 'wetlands','fire','farmland','grazing','DAC','low_income']

vectors = [ACE_rarerank_statewide, ACE_rarerank_ecoregion,
 ACE_biorank_statewide, ACE_biorank_ecoregion,
 ACE_amph_richness, ACE_reptile_richness,
 ACE_bird_richness, ACE_mammal_richness,
 ACE_rare_amphibian_richness, ACE_rare_reptile_richness,
 ACE_rare_bird_richness, ACE_rare_mammal_richness,
 ACE_endemic_amphibian_richness,
 ACE_endemic_reptile_richness,
 ACE_endemic_bird_richness,
 ACE_endemic_mammal_richness,
 wetlands, fire,
 farmland, grazing,
 DAC, low_income]


gdf_stats = gpd.read_parquet(ca_temp_stats_parquet) 

 # set the index to the col we are joining on for gpd.join()
gdf_stats = gdf_stats.set_index('id')

for file,name in zip(vectors,names):
 vector_stats = vector_vector_stats(ca_base_parquet, file) 
 vector_stats = vector_stats.rename(columns ={'overlap_fraction':name}) 

 # joining new zonal stats column with CA Nature data. 
 gdf_stats = gdf_stats.join(vector_stats.set_index('id'))
 # gdf_stats.to_parquet(name+'_v2.parquet') #save CA Nature + zonal stats 

gdf_stats = gdf_stats.reset_index()
gdf_stats = gdf_stats.to_crs("epsg:4326") # to make pmtiles, we need to switch to epsg:4326
gdf_stats.to_parquet(ca_parquet)


# Step 5: Upload file + Generate PMTiles

In [None]:
# upload parquet to minio 
s3_cp(ca_parquet, "s3://public-ca30x30/"+ca_parquet, "minio")

#to use PMTiles, need to convert to geojson
ca_geo = (con
 .read_parquet(ca_parquet)
 )

#can't go directly from parquet -> pmtiles, need to go parquet -> geojson -> pmtiles 
ca_geo.execute().to_file(ca_geojson) 
pmtiles = to_pmtiles(ca_geojson, ca_pmtiles, options = ['--extend-zooms-if-still-dropping'])

# upload pmtiles to minio
s3_cp(ca_pmtiles, "s3://public-ca30x30/"+ca_pmtiles, "minio")