Spaces:
Running
Running
| # Flask web application for CAMS air pollution visualization | |
| import os | |
| import json | |
| import traceback | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from werkzeug.utils import secure_filename | |
| from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_file | |
| # Import our custom modules | |
| from data_processor import NetCDFProcessor, analyze_netcdf_file | |
| from plot_generator import IndiaMapPlotter | |
| from interactive_plot_generator import InteractiveIndiaMapPlotter | |
| from cams_downloader import CAMSDownloader | |
| from constants import ALLOWED_EXTENSIONS, MAX_FILE_SIZE, COLOR_THEMES | |
| app = Flask(__name__) | |
| app.secret_key = 'your-secret-key-change-this-in-production' # Change this! | |
| app.config['DEBUG'] = False # Explicitly disable debug mode | |
| # Add JSON filter for templates | |
| import json | |
| app.jinja_env.filters['tojson'] = json.dumps | |
| # Configure upload settings | |
| app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE | |
| app.config['UPLOAD_FOLDER'] = 'uploads' | |
| # Initialize our services | |
| downloader = CAMSDownloader() | |
| plotter = IndiaMapPlotter() | |
| interactive_plotter = InteractiveIndiaMapPlotter() | |
| # Ensure directories exist | |
| for directory in ['uploads', 'downloads', 'plots', 'templates', 'static']: | |
| Path(directory).mkdir(exist_ok=True) | |
| def allowed_file(filename): | |
| """Check if file extension is allowed""" | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| def str_to_bool(value): | |
| """Convert string representation to boolean""" | |
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, str): | |
| return value.lower() in ('true', '1', 'yes', 'on') | |
| return bool(value) | |
| def index(): | |
| """Main page - file upload or date selection""" | |
| downloaded_files = downloader.list_downloaded_files() | |
| # List files in uploads and downloads/extracted | |
| upload_files = sorted( | |
| [f for f in Path(app.config['UPLOAD_FOLDER']).glob('*') if f.is_file()], | |
| key=lambda x: x.stat().st_mtime, reverse=True | |
| ) | |
| extracted_files = sorted( | |
| [f for f in Path('downloads/extracted').glob('*') if f.is_file()], | |
| key=lambda x: x.stat().st_mtime, reverse=True | |
| ) | |
| # Prepare for template: list of dicts with name and type | |
| recent_files = [ | |
| {'name': f.name, 'type': 'upload'} for f in upload_files | |
| ] + [ | |
| {'name': f.name, 'type': 'download'} for f in extracted_files | |
| ] | |
| current_date = datetime.now().strftime('%Y-%m-%d') | |
| return render_template( | |
| 'index.html', | |
| downloaded_files=downloaded_files, | |
| cds_ready=downloader.is_client_ready(), | |
| current_date=current_date, | |
| recent_files=recent_files | |
| ) | |
| def upload_file(): | |
| """Handle file upload""" | |
| if 'file' not in request.files: | |
| flash('No file selected', 'error') | |
| return redirect(request.url) | |
| file = request.files['file'] | |
| if file.filename == '': | |
| flash('No file selected', 'error') | |
| return redirect(request.url) | |
| if file and allowed_file(file.filename): | |
| try: | |
| filename = secure_filename(file.filename) | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| filename = f"{timestamp}_{filename}" | |
| filepath = Path(app.config['UPLOAD_FOLDER']) / filename | |
| file.save(str(filepath)) | |
| flash(f'File uploaded successfully: {filename}', 'success') | |
| return redirect(url_for('analyze_file', filename=filename)) | |
| except Exception as e: | |
| flash(f'Error uploading file: {str(e)}', 'error') | |
| return redirect(url_for('index')) | |
| else: | |
| flash('Invalid file type. Please upload .nc or .zip files.', 'error') | |
| return redirect(url_for('index')) | |
| def download_date(): | |
| """Handle date-based download""" | |
| date_str = request.form.get('date') | |
| if not date_str: | |
| flash('Please select a date', 'error') | |
| return redirect(url_for('index')) | |
| # --- Backend Validation Logic --- | |
| try: | |
| selected_date = datetime.strptime(date_str, '%Y-%m-%d') | |
| start_date = datetime(2015, 1, 1) | |
| end_date = datetime.now() | |
| if not (start_date <= selected_date <= end_date): | |
| flash(f'Invalid date. Please select a date between {start_date.strftime("%Y-%m-%d")} and today.', 'error') | |
| return redirect(url_for('index')) | |
| except ValueError: | |
| flash('Invalid date format. Please use YYYY-MM-DD.', 'error') | |
| return redirect(url_for('index')) | |
| # --- End of Validation Logic --- | |
| if not downloader.is_client_ready(): | |
| flash('CDS API not configured. Please check your environment variables or .cdsapirc file.', 'error') | |
| return redirect(url_for('index')) | |
| try: | |
| # Download CAMS data | |
| zip_path = downloader.download_cams_data(date_str) | |
| # Extract the files | |
| extracted_files = downloader.extract_cams_files(zip_path) | |
| flash(f'CAMS data downloaded successfully for {date_str}', 'success') | |
| # Analyze the extracted files | |
| if 'surface' in extracted_files: | |
| filename = Path(extracted_files['surface']).name | |
| return redirect(url_for('analyze_file', filename=filename, is_download='true')) | |
| elif 'atmospheric' in extracted_files: | |
| filename = Path(extracted_files['atmospheric']).name | |
| return redirect(url_for('analyze_file', filename=filename, is_download='true')) | |
| else: | |
| # Use the first available file | |
| first_file = list(extracted_files.values())[0] | |
| filename = Path(first_file).name | |
| return redirect(url_for('analyze_file', filename=filename, is_download='true')) | |
| except Exception as e: | |
| flash(f'Error downloading CAMS data: {str(e)}', 'error') | |
| return redirect(url_for('index')) | |
| def analyze_file(filename): | |
| """Analyze uploaded file and show variable selection""" | |
| is_download_param = request.args.get('is_download', 'false') | |
| is_download = str_to_bool(is_download_param) | |
| try: | |
| # Determine file path | |
| if is_download: | |
| file_path = Path('downloads/extracted') / filename | |
| else: | |
| file_path = Path(app.config['UPLOAD_FOLDER']) / filename | |
| if not file_path.exists(): | |
| flash('File not found', 'error') | |
| return redirect(url_for('index')) | |
| # Analyze the file | |
| analysis = analyze_netcdf_file(str(file_path)) | |
| if not analysis['success']: | |
| flash(f'Error analyzing file: {analysis["error"]}', 'error') | |
| return redirect(url_for('index')) | |
| if analysis['total_variables'] == 0: | |
| flash('No air pollution variables found in the file', 'warning') | |
| return redirect(url_for('index')) | |
| # Process variables for template | |
| variables = [] | |
| for var_name, var_info in analysis['detected_variables'].items(): | |
| variables.append({ | |
| 'name': var_name, | |
| 'display_name': var_info['name'], | |
| 'type': var_info['type'], | |
| 'units': var_info['units'], | |
| 'shape': var_info['shape'] | |
| }) | |
| return render_template('variables.html', | |
| filename=filename, | |
| variables=variables, | |
| color_themes=COLOR_THEMES, | |
| is_download=is_download) | |
| except Exception as e: | |
| flash(f'Error analyzing file: {str(e)}', 'error') | |
| return redirect(url_for('index')) | |
| def get_pressure_levels(filename, variable): | |
| """AJAX endpoint to get pressure levels for atmospheric variables""" | |
| try: | |
| is_download_param = request.args.get('is_download', 'false') | |
| is_download = str_to_bool(is_download_param) | |
| print(f"is_download: {is_download} (type: {type(is_download)})") | |
| # Determine file path | |
| if is_download: | |
| file_path = Path('downloads/extracted') / filename | |
| print("Using downloaded file path") | |
| else: | |
| file_path = Path(app.config['UPLOAD_FOLDER']) / filename | |
| print("Using upload file path") | |
| print(f"File path: {file_path}") | |
| processor = NetCDFProcessor(str(file_path)) | |
| processor.load_dataset() | |
| processor.detect_variables() | |
| pressure_levels = processor.get_available_pressure_levels(variable) | |
| processor.close() | |
| return jsonify({ | |
| 'success': True, | |
| 'pressure_levels': pressure_levels | |
| }) | |
| except Exception as e: | |
| return jsonify({ | |
| 'success': False, | |
| 'error': str(e) | |
| }) | |
| def get_available_times(filename, variable): | |
| """AJAX endpoint to get available timestamps for a variable""" | |
| try: | |
| is_download_param = request.args.get('is_download', 'false') | |
| is_download = str_to_bool(is_download_param) | |
| # Determine file path | |
| if is_download: | |
| file_path = Path('downloads/extracted') / filename | |
| else: | |
| file_path = Path(app.config['UPLOAD_FOLDER']) / filename | |
| processor = NetCDFProcessor(str(file_path)) | |
| processor.load_dataset() | |
| processor.detect_variables() | |
| available_times = processor.get_available_times(variable) | |
| processor.close() | |
| # Format times for display | |
| formatted_times = [] | |
| for i, time_val in enumerate(available_times): | |
| formatted_times.append({ | |
| 'index': i, | |
| 'value': str(time_val), | |
| 'display': time_val.strftime('%Y-%m-%d %H:%M') if hasattr(time_val, 'strftime') else str(time_val) | |
| }) | |
| return jsonify({ | |
| 'success': True, | |
| 'times': formatted_times | |
| }) | |
| except Exception as e: | |
| return jsonify({ | |
| 'success': False, | |
| 'error': str(e) | |
| }) | |
| def visualize(): | |
| """Generate and display the pollution map""" | |
| try: | |
| filename = request.form.get('filename') | |
| variable = request.form.get('variable') | |
| color_theme = request.form.get('color_theme', 'viridis') | |
| pressure_level = request.form.get('pressure_level') | |
| is_download_param = request.form.get('is_download', 'false') | |
| is_download = str_to_bool(is_download_param) | |
| if not filename or not variable: | |
| flash('Missing required parameters', 'error') | |
| return redirect(url_for('index')) | |
| # Determine file path | |
| if is_download: | |
| file_path = Path('downloads/extracted') / filename | |
| else: | |
| file_path = Path(app.config['UPLOAD_FOLDER']) / filename | |
| if not file_path.exists(): | |
| flash('File not found', 'error') | |
| return redirect(url_for('index')) | |
| # Process the data | |
| processor = NetCDFProcessor(str(file_path)) | |
| processor.load_dataset() | |
| processor.detect_variables() | |
| # Convert pressure level to float if provided | |
| pressure_level_val = None | |
| if pressure_level and pressure_level != 'None': | |
| try: | |
| pressure_level_val = float(pressure_level) | |
| except ValueError: | |
| pressure_level_val = None | |
| time_index_val = request.form.get('time_index') | |
| # Extract data | |
| data_values, metadata = processor.extract_data( | |
| variable, | |
| time_index = int(time_index_val) if time_index_val and time_index_val != 'None' else 0, | |
| pressure_level=pressure_level_val | |
| ) | |
| # Generate plot | |
| plot_path = plotter.create_india_map( | |
| data_values, | |
| metadata, | |
| color_theme=color_theme, | |
| save_plot=True | |
| ) | |
| processor.close() | |
| if plot_path: | |
| plot_filename = Path(plot_path).name | |
| # Prepare metadata for display | |
| plot_info = { | |
| 'variable': metadata.get('display_name', 'Unknown Variable'), | |
| 'units': metadata.get('units', ''), | |
| 'shape': str(metadata.get('shape', 'Unknown')), | |
| 'pressure_level': metadata.get('pressure_level'), | |
| 'color_theme': COLOR_THEMES.get(color_theme, color_theme), | |
| 'generated_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), | |
| 'data_range': { | |
| 'min': float(f"{data_values.min():.3f}") if hasattr(data_values, 'min') and not data_values.min() is None else 0, | |
| 'max': float(f"{data_values.max():.3f}") if hasattr(data_values, 'max') and not data_values.max() is None else 0, | |
| 'mean': float(f"{data_values.mean():.3f}") if hasattr(data_values, 'mean') and not data_values.mean() is None else 0 | |
| } | |
| } | |
| print(f"Plot info prepared: {plot_info}") | |
| return render_template('plot.html', | |
| plot_filename=plot_filename, | |
| plot_info=plot_info) | |
| else: | |
| flash('Error generating plot', 'error') | |
| return redirect(url_for('index')) | |
| except Exception as e: | |
| flash(f'Error creating visualization: {str(e)}', 'error') | |
| print(f"Full error: {traceback.format_exc()}") | |
| return redirect(url_for('index')) | |
| def visualize_interactive(): | |
| """Generate and display the interactive pollution map""" | |
| try: | |
| filename = request.form.get('filename') | |
| variable = request.form.get('variable') | |
| color_theme = request.form.get('color_theme', 'viridis') | |
| pressure_level = request.form.get('pressure_level') | |
| is_download_param = request.form.get('is_download', 'false') | |
| is_download = str_to_bool(is_download_param) | |
| if not filename or not variable: | |
| flash('Missing required parameters', 'error') | |
| return redirect(url_for('index')) | |
| # Determine file path | |
| if is_download: | |
| file_path = Path('downloads/extracted') / filename | |
| else: | |
| file_path = Path(app.config['UPLOAD_FOLDER']) / filename | |
| if not file_path.exists(): | |
| flash('File not found', 'error') | |
| return redirect(url_for('index')) | |
| # Process the data | |
| processor = NetCDFProcessor(str(file_path)) | |
| processor.load_dataset() | |
| processor.detect_variables() | |
| # Convert pressure level to float if provided | |
| pressure_level_val = None | |
| if pressure_level and pressure_level != 'None': | |
| try: | |
| pressure_level_val = float(pressure_level) | |
| except ValueError: | |
| pressure_level_val = None | |
| time_index_val = request.form.get('time_index') | |
| # Extract data | |
| data_values, metadata = processor.extract_data( | |
| variable, | |
| time_index = int(time_index_val) if time_index_val and time_index_val != 'None' else 0, | |
| pressure_level=pressure_level_val | |
| ) | |
| # Generate interactive plot (saved as JPG) | |
| plot_path = interactive_plotter.create_india_map( | |
| data_values, | |
| metadata, | |
| color_theme=color_theme, | |
| save_plot=True | |
| ) | |
| processor.close() | |
| if plot_path: | |
| plot_filename = Path(plot_path).name | |
| # Prepare metadata for display | |
| plot_info = { | |
| 'variable': metadata.get('display_name', 'Unknown Variable'), | |
| 'units': metadata.get('units', ''), | |
| 'shape': str(metadata.get('shape', 'Unknown')), | |
| 'pressure_level': metadata.get('pressure_level'), | |
| 'color_theme': COLOR_THEMES.get(color_theme, color_theme), | |
| 'generated_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), | |
| 'data_range': { | |
| 'min': float(f"{data_values.min():.3f}") if hasattr(data_values, 'min') and not data_values.min() is None else 0, | |
| 'max': float(f"{data_values.max():.3f}") if hasattr(data_values, 'max') and not data_values.max() is None else 0, | |
| 'mean': float(f"{data_values.mean():.3f}") if hasattr(data_values, 'mean') and not data_values.mean() is None else 0 | |
| }, | |
| 'is_interactive': True | |
| } | |
| return render_template('plot.html', | |
| plot_filename=plot_filename, | |
| plot_info=plot_info) | |
| else: | |
| flash('Error generating interactive plot', 'error') | |
| return redirect(url_for('index')) | |
| except Exception as e: | |
| flash(f'Error creating interactive visualization: {str(e)}', 'error') | |
| print(f"Full error: {traceback.format_exc()}") | |
| return redirect(url_for('index')) | |
| def serve_plot(filename): | |
| """Serve plot images""" | |
| try: | |
| plot_path = Path('plots') / filename | |
| if plot_path.exists(): | |
| # Determine mimetype based on file extension | |
| if filename.lower().endswith('.jpg') or filename.lower().endswith('.jpeg'): | |
| mimetype = 'image/jpeg' | |
| else: | |
| mimetype = 'image/png' | |
| return send_file(str(plot_path), mimetype=mimetype) | |
| else: | |
| flash('Plot not found', 'error') | |
| return redirect(url_for('index')) | |
| except Exception as e: | |
| flash(f'Error serving plot: {str(e)}', 'error') | |
| return redirect(url_for('index')) | |
| def cleanup(): | |
| """Clean up old files""" | |
| try: | |
| # Clean up old plots (older than 24 hours) | |
| cutoff_time = datetime.now() - timedelta(hours=24) | |
| cleaned_count = 0 | |
| for plot_file in Path('plots').glob('*.png'): | |
| if plot_file.stat().st_mtime < cutoff_time.timestamp(): | |
| plot_file.unlink() | |
| cleaned_count += 1 | |
| for plot_file in Path('plots').glob('*.jpg'): | |
| if plot_file.stat().st_mtime < cutoff_time.timestamp(): | |
| plot_file.unlink() | |
| cleaned_count += 1 | |
| flash(f'Cleaned up {cleaned_count} old plot files', 'success') | |
| return redirect(url_for('index')) | |
| except Exception as e: | |
| print(f"Error during cleanup: {str(e)}") | |
| flash('Error during cleanup', 'error') | |
| return redirect(url_for('index')) | |
| def health_check(): | |
| """Health check endpoint for monitoring""" | |
| return jsonify({ | |
| 'status': 'healthy', | |
| 'timestamp': datetime.now().isoformat(), | |
| 'cds_ready': downloader.is_client_ready() | |
| }) | |
| def too_large(e): | |
| """Handle file too large error""" | |
| flash('File too large. Maximum size is 500MB.', 'error') | |
| return redirect(url_for('index')) | |
| def not_found(e): | |
| """Handle 404 errors""" | |
| flash('Page not found', 'error') | |
| return redirect(url_for('index')) | |
| def server_error(e): | |
| """Handle server errors""" | |
| flash('An internal error occurred', 'error') | |
| return redirect(url_for('index')) | |
| if __name__ == '__main__': | |
| import os | |
| # Get port from environment variable (Hugging Face uses 7860) | |
| port = int(os.environ.get('PORT', 7860)) | |
| debug_mode = os.environ.get('FLASK_ENV', 'production') != 'production' | |
| print("π Starting CAMS Air Pollution Visualization App") | |
| print(f"π Available at: http://localhost:{port}") | |
| print("π§ CDS API Ready:", downloader.is_client_ready()) | |
| # Run the Flask app | |
| app.run(debug=debug_mode, host='0.0.0.0', port=port) |