# NetCDF file processing and air pollution variable detection import os import zipfile import warnings import tempfile import numpy as np import pandas as pd import xarray as xr from pathlib import Path from datetime import datetime # India geographical bounds for coordinate trimming INDIA_BOUNDS = { 'lat_min': 6.0, # Southern tip (including southern islands) 'lat_max': 38.0, # Northern border (including Kashmir) 'lon_min': 68.0, # Western border 'lon_max': 98.0 # Eastern border (including Andaman & Nicobar) } # Imports from our Modules from constants import NETCDF_VARIABLES, AIR_POLLUTION_VARIABLES, PRESSURE_LEVELS warnings.filterwarnings('ignore') class NetCDFProcessor: def __init__(self, file_path): """ Initialize NetCDF processor Parameters: file_path (str): Path to NetCDF or ZIP file """ self.file_path = Path(file_path) self.dataset = None self.surface_dataset = None self.atmospheric_dataset = None self.detected_variables = {} def load_dataset(self): """Load NetCDF dataset from file or ZIP""" try: if self.file_path.suffix.lower() == '.zip': return self._load_from_zip() elif self.file_path.suffix.lower() == '.nc': return self._load_from_netcdf() else: raise ValueError("Unsupported file format. Use .nc or .zip files.") except Exception as e: raise Exception(f"Error loading dataset: {str(e)}") def _load_from_zip(self): """Load dataset from ZIP file (CAMS format)""" with zipfile.ZipFile(self.file_path, 'r') as zf: zip_contents = zf.namelist() # Look for surface and atmospheric data files surface_file = None atmospheric_file = None for file in zip_contents: if 'sfc' in file.lower() or 'surface' in file.lower(): surface_file = file elif 'plev' in file.lower() or 'pressure' in file.lower() or 'atmospheric' in file.lower(): atmospheric_file = file # Load surface data if available if surface_file: with zf.open(surface_file) as f: with tempfile.NamedTemporaryFile(suffix='.nc') as tmp: tmp.write(f.read()) tmp.flush() self.surface_dataset = xr.open_dataset(tmp.name, engine='netcdf4') print(f"Loaded surface data: {surface_file}") # Load atmospheric data if available if atmospheric_file: with zf.open(atmospheric_file) as f: with tempfile.NamedTemporaryFile(suffix='.nc') as tmp: tmp.write(f.read()) tmp.flush() self.atmospheric_dataset = xr.open_dataset(tmp.name, engine='netcdf4') print(f"Loaded atmospheric data: {atmospheric_file}") # If no specific files found, try to load the first .nc file if not surface_file and not atmospheric_file: nc_files = [f for f in zip_contents if f.endswith('.nc')] if nc_files: with zf.open(nc_files[0]) as f: with tempfile.NamedTemporaryFile(suffix='.nc') as tmp: tmp.write(f.read()) tmp.flush() self.dataset = xr.open_dataset(tmp.name, engine='netcdf4') print(f"Loaded dataset: {nc_files[0]}") else: raise ValueError("No NetCDF files found in ZIP") return True def _load_from_netcdf(self): """Load dataset from single NetCDF file""" self.dataset = xr.open_dataset(self.file_path) print(f"Loaded NetCDF file: {self.file_path.name}") return True def detect_variables(self): """Detect all supported variables (pollution, meteorological, etc.) in all loaded datasets""" self.detected_variables = {} # Check surface dataset if self.surface_dataset is not None: surface_vars = self._detect_variables_in_dataset(self.surface_dataset, 'surface') self.detected_variables.update(surface_vars) # Check atmospheric dataset if self.atmospheric_dataset is not None: atmo_vars = self._detect_variables_in_dataset(self.atmospheric_dataset, 'atmospheric') self.detected_variables.update(atmo_vars) # Check main dataset if no separate files if self.dataset is not None: main_vars = self._detect_variables_in_dataset(self.dataset, 'unknown') self.detected_variables.update(main_vars) return self.detected_variables def _detect_variables_in_dataset(self, dataset, dataset_type): """Detect all supported variables in a specific dataset""" detected = {} for var_name in dataset.data_vars: var_name_lower = var_name.lower() var_dims = list(dataset[var_name].dims) # Determine actual variable type based on dimensions (not just dictionary) actual_var_type = 'surface' if any(dim in ['level', 'plev', 'pressure_level', 'height'] for dim in [d.lower() for d in var_dims]): actual_var_type = 'atmospheric' # Check exact matches first in NETCDF_VARIABLES if var_name in NETCDF_VARIABLES: detected[var_name] = NETCDF_VARIABLES[var_name].copy() detected[var_name]['original_name'] = var_name detected[var_name]['dataset_type'] = dataset_type detected[var_name]['shape'] = dataset[var_name].shape detected[var_name]['dims'] = var_dims # Override type based on actual dimensions detected[var_name]['type'] = actual_var_type elif var_name_lower in NETCDF_VARIABLES: detected[var_name] = NETCDF_VARIABLES[var_name_lower].copy() detected[var_name]['original_name'] = var_name detected[var_name]['dataset_type'] = dataset_type detected[var_name]['shape'] = dataset[var_name].shape detected[var_name]['dims'] = var_dims # Override type based on actual dimensions detected[var_name]['type'] = actual_var_type else: # Auto-detect unknown variables by examining their attributes var_info = dataset[var_name] long_name = getattr(var_info, 'long_name', '').lower() standard_name = getattr(var_info, 'standard_name', '').lower() units = getattr(var_info, 'units', 'unknown') # Try to match against any known variable in NETCDF_VARIABLES by keywords matched = False for known_var, properties in NETCDF_VARIABLES.items(): if (known_var in var_name_lower or known_var in long_name or known_var in standard_name or properties['name'].lower() in var_name_lower or properties['name'].lower() in long_name): detected[var_name] = properties.copy() detected[var_name]['original_name'] = var_name detected[var_name]['dataset_type'] = dataset_type detected[var_name]['shape'] = dataset[var_name].shape detected[var_name]['dims'] = var_dims # Override type based on actual dimensions detected[var_name]['type'] = actual_var_type if units != 'unknown': detected[var_name]['units'] = units # Use actual units from file matched = True break # If still no match, create a generic entry for any 2D+ variable if not matched and len(dataset[var_name].dims) >= 2: # Check if it has lat/lon dimensions has_spatial = any(dim in ['lat', 'lon', 'latitude', 'longitude', 'x', 'y'] for dim in [d.lower() for d in var_dims]) if has_spatial: # Use the already determined variable type var_type = actual_var_type # Auto-determine color scheme based on variable name or units cmap = 'viridis' # default if 'temp' in var_name_lower or 'temperature' in long_name: cmap = 'RdYlBu' elif any(word in var_name_lower for word in ['wind', 'u', 'v']): cmap = 'coolwarm' elif any(word in var_name_lower for word in ['precip', 'rain', 'cloud', 'humid']): cmap = 'Blues' elif 'pressure' in var_name_lower or 'pressure' in long_name: cmap = 'RdYlBu' elif any(word in var_name_lower for word in ['radiation', 'solar']): cmap = 'YlOrRd' detected[var_name] = { 'units': units, 'name': long_name.title() if long_name else var_name.replace('_', ' ').title(), 'cmap': cmap, 'vmax_percentile': 95, 'type': var_type, 'original_name': var_name, 'dataset_type': dataset_type, 'shape': dataset[var_name].shape, 'dims': var_dims, 'auto_detected': True # Flag to indicate this was auto-detected } return detected def get_coordinates(self, dataset): """Get coordinate names from dataset""" coords = list(dataset.coords.keys()) # Find latitude coordinate lat_names = ['latitude', 'lat', 'y', 'Latitude', 'LATITUDE'] lat_coord = next((name for name in lat_names if name in coords), None) # Find longitude coordinate lon_names = ['longitude', 'lon', 'x', 'Longitude', 'LONGITUDE'] lon_coord = next((name for name in lon_names if name in coords), None) # Find time coordinate time_names = ['time', 'Time', 'TIME', 'forecast_reference_time'] time_coord = next((name for name in time_names if name in coords), None) # Find pressure/level coordinate level_names = ['pressure_level', 'plev', 'level', 'pressure', 'lev'] level_coord = next((name for name in level_names if name in coords), None) return { 'lat': lat_coord, 'lon': lon_coord, 'time': time_coord, 'level': level_coord } def format_timestamp(self, timestamp): """Format timestamp for display in plots""" try: if pd.isna(timestamp): return "Unknown Time" # Convert to pandas datetime if it isn't already if not isinstance(timestamp, pd.Timestamp): timestamp = pd.to_datetime(timestamp) # Format as "YYYY-MM-DD HH:MM" return timestamp.strftime('%Y-%m-%d %H:%M') except: return str(timestamp) def extract_data(self, variable_name, time_index=1, pressure_level=None): """ Extract data for a specific variable Parameters: variable_name (str): Name of the variable to extract time_index (int): Time index to extract (default: 0 for current time) pressure_level (float): Pressure level for atmospheric variables (default: surface level) Returns: tuple: (data_array, metadata) """ if variable_name not in self.detected_variables: raise ValueError(f"Variable {variable_name} not found in detected variables") var_info = self.detected_variables[variable_name] dataset_type = var_info['dataset_type'] # Determine which dataset to use if dataset_type == 'surface' and self.surface_dataset is not None: dataset = self.surface_dataset elif dataset_type == 'atmospheric' and self.atmospheric_dataset is not None: dataset = self.atmospheric_dataset elif self.dataset is not None: dataset = self.dataset else: raise ValueError(f"No suitable dataset found for variable {variable_name}") # Get the data variable data_var = dataset[variable_name] coords = self.get_coordinates(dataset) print(f"Coordinates: {coords}\n\n") # Handle different data shapes data_array = data_var print(f"Data array shape: {data_array.dims} \n\n") # Get timestamp information before extracting data selected_timestamp = None timestamp_str = "Unknown Time" # Handle time dimension if coords['time'] and coords['time'] in data_array.dims: # Get all available times available_times = pd.to_datetime(dataset[coords['time']].values) if time_index == -1: # Latest time time_index = len(available_times) - 1 # Ensure time_index is within bounds if 0 <= time_index < len(available_times): selected_timestamp = available_times[time_index] timestamp_str = self.format_timestamp(selected_timestamp) print(f"Time index: {time_index} selected - {timestamp_str}") data_array = data_array.isel({coords['time']: time_index}) else: print(f"Warning: time_index {time_index} out of bounds, using index 0") time_index = 0 selected_timestamp = available_times[time_index] timestamp_str = self.format_timestamp(selected_timestamp) data_array = data_array.isel({coords['time']: time_index}) # Handle pressure/level dimension for atmospheric variables actual_pressure = None if coords['level'] and coords['level'] in data_array.dims: if pressure_level is None: # Default to surface level (highest pressure) pressure_level = 1000 # Find closest pressure level pressure_values = dataset[coords['level']].values level_index = np.argmin(np.abs(pressure_values - pressure_level)) actual_pressure = pressure_values[level_index] data_array = data_array.isel({coords['level']: level_index}) print(f"Selected pressure level: {actual_pressure} hPa (requested: {pressure_level} hPa)") elif pressure_level is not None: # Store the requested pressure level even if no level dimension exists actual_pressure = pressure_level # Handle batch dimension (usually the first dimension for CAMS data) shape = data_array.shape if len(shape) == 4: # (batch, time, lat, lon) or similar data_array = data_array[0, -1] # Take first batch, latest time elif len(shape) == 3: # (batch, lat, lon) or (time, lat, lon) data_array = data_array[-1] # Take latest elif len(shape) == 5: # (batch, time, level, lat, lon) data_array = data_array[0, -1] # Already handled level above # Get coordinate arrays lats = dataset[coords['lat']].values lons = dataset[coords['lon']].values # Crop data to India region for better performance data_values, lats, lons = self._crop_to_india(data_array.values, lats, lons) # Convert units if necessary original_units = getattr(dataset[variable_name], 'units', '') data_values = self._convert_units(data_values, original_units, var_info['units']) metadata = { 'variable_name': variable_name, 'display_name': var_info['name'], 'units': var_info['units'], 'original_units': original_units, 'shape': data_values.shape, 'lats': lats, 'lons': lons, 'pressure_level': actual_pressure, 'time_index': time_index, 'timestamp': selected_timestamp, 'timestamp_str': timestamp_str, 'dataset_type': dataset_type } return data_values, metadata def _convert_units(self, data, original_units, target_units): """Convert data units for air pollution variables""" data_converted = data.copy() if original_units and target_units: orig_lower = original_units.lower() target_lower = target_units.lower() # kg/m³ to µg/m³ if 'kg' in orig_lower and 'µg' in target_lower: data_converted = data_converted * 1e9 print(f"Converting from {original_units} to {target_units} (×1e9)") # kg/m³ to mg/m³ elif 'kg' in orig_lower and 'mg' in target_lower: data_converted = data_converted * 1e6 print(f"Converting from {original_units} to {target_units} (×1e6)") # mol/m² conversions (keep as is) elif 'mol' in orig_lower: print(f"Units {original_units} kept as is") # No unit (dimensionless) - keep as is elif target_units == '': print("Dimensionless variable - no unit conversion") return data_converted def _crop_to_india(self, data_values, lats, lons): """ Crop data to India region to improve performance and focus visualization Parameters: data_values (np.ndarray): 2D data array (lat, lon) lats (np.ndarray): Latitude values lons (np.ndarray): Longitude values Returns: tuple: (cropped_data, cropped_lats, cropped_lons) """ # Use same India bounds as Aurora processor for consistency lat_min = INDIA_BOUNDS['lat_min'] - 2 # 6-2 = 4°N lat_max = INDIA_BOUNDS['lat_max'] + 2 # 38+2 = 40°N lon_min = INDIA_BOUNDS['lon_min'] - 2 # 68-2 = 66°E lon_max = INDIA_BOUNDS['lon_max'] + 2 # 97+2 = 99°E print(f"Original data shape: {data_values.shape}") print(f"Original lat range: {lats.min():.2f} to {lats.max():.2f}") print(f"Original lon range: {lons.min():.2f} to {lons.max():.2f}") # Find indices within India bounds lat_mask = (lats >= lat_min) & (lats <= lat_max) lon_mask = (lons >= lon_min) & (lons <= lon_max) # Get the indices lat_indices = np.where(lat_mask)[0] lon_indices = np.where(lon_mask)[0] if len(lat_indices) == 0 or len(lon_indices) == 0: print("Warning: No data found in India region, using full dataset") return data_values, lats, lons # Get the min and max indices to define the crop region lat_start, lat_end = lat_indices[0], lat_indices[-1] + 1 lon_start, lon_end = lon_indices[0], lon_indices[-1] + 1 # Crop the data if len(data_values.shape) == 2: # (lat, lon) cropped_data = data_values[lat_start:lat_end, lon_start:lon_end] else: print(f"Warning: Unexpected data shape {data_values.shape}, cropping first two dimensions") cropped_data = data_values[lat_start:lat_end, lon_start:lon_end] # Crop coordinates cropped_lats = lats[lat_start:lat_end] cropped_lons = lons[lon_start:lon_end] print(f"Cropped data shape: {cropped_data.shape}") print(f"Cropped lat range: {cropped_lats.min():.2f} to {cropped_lats.max():.2f}") print(f"Cropped lon range: {cropped_lons.min():.2f} to {cropped_lons.max():.2f}") print(f"Data reduction: {(1 - cropped_data.size / data_values.size) * 100:.1f}%") return cropped_data, cropped_lats, cropped_lons def get_available_times(self, variable_name): """Get available time steps for a variable""" if variable_name not in self.detected_variables: return [] var_info = self.detected_variables[variable_name] dataset_type = var_info['dataset_type'] # Determine which dataset to use if dataset_type == 'surface' and self.surface_dataset is not None: dataset = self.surface_dataset elif dataset_type == 'atmospheric' and self.atmospheric_dataset is not None: dataset = self.atmospheric_dataset elif self.dataset is not None: dataset = self.dataset else: return [] coords = self.get_coordinates(dataset) if coords['time'] and coords['time'] in dataset.dims: times = pd.to_datetime(dataset[coords['time']].values) print(f"Times: {times.to_list()}") return times.tolist() return [] def get_available_pressure_levels(self, variable_name): """Get available pressure levels for atmospheric variables""" if variable_name not in self.detected_variables: return [] var_info = self.detected_variables[variable_name] if var_info['type'] != 'atmospheric': return [] dataset_type = var_info['dataset_type'] # Determine which dataset to use if dataset_type == 'atmospheric' and self.atmospheric_dataset is not None: dataset = self.atmospheric_dataset elif self.dataset is not None: dataset = self.dataset else: return [] coords = self.get_coordinates(dataset) if coords['level'] and coords['level'] in dataset.dims: levels = dataset[coords['level']].values return levels.tolist() return PRESSURE_LEVELS # Default pressure levels def close(self): """Close all open datasets safely""" try: if self.dataset is not None: self.dataset.close() self.dataset = None except (RuntimeError, OSError): pass # Dataset already closed or invalid try: if self.surface_dataset is not None: self.surface_dataset.close() self.surface_dataset = None except (RuntimeError, OSError): pass # Dataset already closed or invalid try: if self.atmospheric_dataset is not None: self.atmospheric_dataset.close() self.atmospheric_dataset = None except (RuntimeError, OSError): pass # Dataset already closed or invalid class AuroraPredictionProcessor: def __init__(self, file_path): """ Initialize Aurora prediction processor for single NetCDF files with timestep data Parameters: file_path (str): Path to Aurora prediction NetCDF file """ self.file_path = Path(file_path) self.dataset = None self.detected_variables = {} def _trim_to_india_bounds(self, var, lats, lons): """ Trim data and coordinates to India geographical bounds to reduce computation Parameters: var (xarray.DataArray): Variable data lats (numpy.ndarray): Latitude coordinates lons (numpy.ndarray): Longitude coordinates Returns: tuple: (trimmed_var, trimmed_lats, trimmed_lons) """ # Find indices within India bounds lat_mask = (lats >= INDIA_BOUNDS['lat_min']) & (lats <= INDIA_BOUNDS['lat_max']) lon_mask = (lons >= INDIA_BOUNDS['lon_min']) & (lons <= INDIA_BOUNDS['lon_max']) lat_indices = np.where(lat_mask)[0] lon_indices = np.where(lon_mask)[0] if len(lat_indices) == 0 or len(lon_indices) == 0: # If no points in India bounds, return original data return var, lats, lons # Get min/max indices for slicing lat_start, lat_end = lat_indices[0], lat_indices[-1] + 1 lon_start, lon_end = lon_indices[0], lon_indices[-1] + 1 # Trim coordinates trimmed_lats = lats[lat_start:lat_end] trimmed_lons = lons[lon_start:lon_end] # Trim data - handle different dimension orders if var.ndim == 2: # (lat, lon) trimmed_var = var[lat_start:lat_end, lon_start:lon_end] elif var.ndim == 3 and 'latitude' in var.dims and 'longitude' in var.dims: # Find latitude and longitude dimension positions lat_dim_pos = var.dims.index('latitude') if 'latitude' in var.dims else var.dims.index('lat') lon_dim_pos = var.dims.index('longitude') if 'longitude' in var.dims else var.dims.index('lon') if lat_dim_pos == 1 and lon_dim_pos == 2: # (time/level, lat, lon) trimmed_var = var[:, lat_start:lat_end, lon_start:lon_end] elif lat_dim_pos == 0 and lon_dim_pos == 1: # (lat, lon, time/level) trimmed_var = var[lat_start:lat_end, lon_start:lon_end, :] else: # Fall back to original if dimension order is unexpected return var, lats, lons else: # For other dimensions or if trimming fails, return original return var, lats, lons return trimmed_var, trimmed_lats, trimmed_lons def load_dataset(self): """Load Aurora prediction NetCDF dataset""" try: self.dataset = xr.open_dataset(self.file_path) return True except Exception as e: raise Exception(f"Error loading Aurora prediction dataset: {str(e)}") def extract_variable_data(self, variable_name, pressure_level=None, step=0): """ Extract variable data from Aurora prediction file Parameters: variable_name (str): Name of the variable to extract pressure_level (float, optional): Pressure level for atmospheric variables step (int): Time step index (default: 0) Returns: tuple: (data_array, metadata_dict) """ if self.dataset is None: self.load_dataset() if variable_name not in self.dataset.data_vars: raise ValueError(f"Variable '{variable_name}' not found in dataset") var = self.dataset[variable_name] # Handle Aurora-specific dimensions # Aurora files have: (forecast_period, forecast_reference_time, [pressure_level], latitude, longitude) # First, squeeze singleton forecast_period dimension if 'forecast_period' in var.dims and var.sizes['forecast_period'] == 1: var = var.squeeze('forecast_period') # Handle forecast_reference_time - take the first one (index 0) if 'forecast_reference_time' in var.dims: var = var.isel(forecast_reference_time=0) # Handle step dimension if present (for backward compatibility) if 'step' in var.dims: if step >= var.sizes['step']: raise ValueError(f"Step {step} not available. Dataset has {var.sizes['step']} steps.") var = var.isel(step=step) # Handle pressure level dimension if present if pressure_level is not None and 'pressure_level' in var.dims: pressure_level = float(pressure_level) # Find closest pressure level available_levels = var.pressure_level.values closest_idx = np.argmin(np.abs(available_levels - pressure_level)) actual_level = available_levels[closest_idx] var = var.isel(pressure_level=closest_idx) pressure_info = f"at {actual_level:.0f} hPa" else: pressure_info = None # Handle different coordinate naming conventions if 'latitude' in self.dataset.coords: lats = self.dataset['latitude'].values lons = self.dataset['longitude'].values else: lats = self.dataset['lat'].values if 'lat' in self.dataset else self.dataset['latitude'].values lons = self.dataset['lon'].values if 'lon' in self.dataset else self.dataset['longitude'].values # Trim data and coordinates to India bounds to reduce computation var, lats, lons = self._trim_to_india_bounds(var, lats, lons) # Extract trimmed data data_values = var.values # Get variable information from constants import NETCDF_VARIABLES var_info = NETCDF_VARIABLES.get(variable_name, {}) display_name = var_info.get('name', variable_name.replace('_', ' ').title()) units = var.attrs.get('units', var_info.get('units', '')) # Prepare metadata metadata = { 'variable_name': variable_name, 'display_name': display_name, 'units': units, 'lats': lats, 'lons': lons, 'pressure_level': pressure_level if pressure_level else None, 'pressure_info': pressure_info, 'step': step, 'data_shape': data_values.shape, 'source': 'Aurora Prediction', 'file_path': str(self.file_path), } # Add timestamp information # For Aurora predictions, step represents the forecast step (12-hour intervals) hours_from_start = (step + 1) * 12 # Assuming 12-hour intervals metadata['timestamp_str'] = f"T+{hours_from_start}h (Step {step + 1})" return data_values, metadata def get_available_variables(self): """Get list of available variables categorized by type""" if self.dataset is None: self.load_dataset() surface_vars = [] atmospheric_vars = [] for var_name in self.dataset.data_vars: var = self.dataset[var_name] # Check if variable has pressure level dimension if 'pressure_level' in var.dims: atmospheric_vars.append(var_name) else: surface_vars.append(var_name) return { 'surface_vars': surface_vars, 'atmospheric_vars': atmospheric_vars } def get_available_pressure_levels(self): """Get available pressure levels""" if self.dataset is None: self.load_dataset() if 'pressure_level' in self.dataset.coords: return self.dataset.pressure_level.values.tolist() return [] def get_available_steps(self): """Get available time steps""" if self.dataset is None: self.load_dataset() if 'step' in self.dataset.dims: return list(range(self.dataset.sizes['step'])) return [0] def close(self): """Close the dataset safely""" try: if self.dataset is not None: self.dataset.close() self.dataset = None except (RuntimeError, OSError): pass # Dataset already closed or invalid def analyze_netcdf_file(file_path): """ Analyze NetCDF file structure and return summary Parameters: file_path (str): Path to NetCDF or ZIP file Returns: dict: Analysis summary """ processor = NetCDFProcessor(file_path) try: processor.load_dataset() detected_vars = processor.detect_variables() analysis = { 'success': True, 'file_path': str(file_path), 'detected_variables': detected_vars, 'total_variables': len(detected_vars), 'surface_variables': len([v for v in detected_vars.values() if v.get('type') == 'surface']), 'atmospheric_variables': len([v for v in detected_vars.values() if v.get('type') == 'atmospheric']), } # Get sample time information if detected_vars: sample_var = list(detected_vars.keys())[0] times = processor.get_available_times(sample_var) if times: analysis['time_range'] = { 'start': str(times[0]), 'end': str(times[-1]), 'count': len(times) } processor.close() return analysis except Exception as e: processor.close() return { 'success': False, 'error': str(e), 'file_path': str(file_path) }