Urban_Flood_Hazard _Code_For_Climada

“””ZurichFloodHazard , extends CLIMADA Hazard for Zurich Urban flood risk assessment.

I Wrote this Code for Climada as a part of Urban_flood_Petal

you can download this code from my github Page in this Address: https://github.com/sepehrkiya/climada_petal_urban_flood

import numpy as np
import pandas as pd
import xarray as xr
import rasterio
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve
from numba import jit, prange
import matplotlib.pyplot as plt
from climada.hazard import Hazard
from climada.util.constants import SYSTEM_DIR
import os
import requests
import multiprocessing as mp
from pathlib import Path
import datetime
import geopandas as gpd
from shapely.geometry import Polygon, MultiPolygon

class ZurichFloodHazard(Hazard):
    """ZurichFloodHazard class extends CLIMADA Hazard for urban flood modeling in Zurich.
    
    This hazard module implements a sophisticated 2D shallow water model with
    Green-Ampt infiltration for the city of Zurich, Switzerland. It handles
    the 10 municipal districts and accounts for the lake and river systems.
    
    Attributes:
        intensity (sparse.csr_matrix): Flood depth in meters
        fraction (sparse.csr_matrix): Fractional impact (1 for flood)
        event_id (np.array): Unique identifier for each event
        frequency (np.array): Annual frequency of each event
        date (np.array): Date of each event
        orig (np.array): Origin of data source
        units (str): Units of intensity (default: 'm')
        centroids (Centroids): Centroids container
        event_name (list): Name of each event
        tag (TagHazard): Tag information about the source
        pool (BoundedThreadPoolExecutor): Thread pool for parallel computation
        rainfall_data (xarray.Dataset): Precipitation data
        dem_data (np.array): Digital Elevation Model of Zurich
        districts (gpd.GeoDataFrame): Boundaries of Zurich districts
        drainage_network (gpd.GeoDataFrame): Storm water drainage network
        soil_properties (pd.DataFrame): Soil infiltration parameters
        lake_zurich (Polygon): Lake Zurich boundary polygon
        rivers (gpd.GeoDataFrame): Rivers in Zurich area
    """
    
    def __init__(self):
        """Initialize Zurich flood hazard model with default parameters."""
        super().__init__(haz_type='FL')
        self.units = 'm'
        self.pool = None
        self.rainfall_data = None
        self.dem_data = None
        self.districts = None
        self.drainage_network = None
        self.soil_properties = None
        self.lake_zurich = None
        self.rivers = None
        self.model_params = {
            'dx': 10.0,  # Grid resolution in meters
            'dt': 1.0,   # Time step in seconds
            'g': 9.81,   # Gravity acceleration
            'manning_n': 0.03,  # Manning roughness coefficient
            'theta': 0.7,  # Time integration parameter (0.5-1.0)
            'max_iter': 10000,  # Maximum iterations
            'convergence_threshold': 1e-5,  # Convergence criterion
            'cfl_number': 0.7,  # Courant–Friedrichs–Lewy condition
            'output_interval': 3600,  # Output interval in seconds
            'sim_duration': 86400,  # Simulation duration in seconds
            'infiltration': {
                'initial_moisture': 0.3,  # Initial moisture content
                'saturated_moisture': 0.5,  # Saturated moisture content
                'suction_head': 0.1667,  # Suction head in meters
                'hydraulic_conductivity': {
                    'urban': 5e-7,      # Urban areas (m/s)
                    'suburban': 1e-6,   # Suburban areas (m/s)
                    'parks': 5e-6,      # Parks and gardens (m/s)
                    'forest': 1e-5      # Forest areas (m/s)
                }
            },
            'drainage': {
                'capacity': 0.02,  # Base drainage capacity in m/h
                'upgrade_factor': {  # Capacity multiplier by district
                    'district_1': 1.2,  # Central district
                    'district_2': 1.0,
                    'district_3': 0.9,
                    'district_4': 1.1,
                    'district_5': 1.0,
                    'district_6': 0.8,
                    'district_7': 1.1,
                    'district_8': 1.0,
                    'district_9': 0.9,
                    'district_10': 0.7, # Newest development
                    'district_11': 1.0,
                    'district_12': 0.8
                }
            }
        }
        
    def set_from_copernicus_cds(self, start_date, end_date, api_key=None):
        """Load precipitation data from Copernicus Climate Data Store.
        
        Parameters:
            start_date (str): Start date in format 'YYYY-MM-DD'
            end_date (str): End date in format 'YYYY-MM-DD'
            api_key (str, optional): Copernicus CDS API key
        
        Returns:
            ZurichFloodHazard: self
        """
        # Check if API key is provided or exists in environment
        if api_key is None:
            api_key = os.environ.get('COPERNICUS_CDS_API_KEY')
            if api_key is None:
                raise ValueError("No Copernicus CDS API key provided")
        
        # Format request to Copernicus CDS
        url = "https://cds.climate.copernicus.eu/api/v2"
        request_params = {
            'dataset_short_name': 'reanalysis-era5-land',
            'product_type': 'reanalysis',
            'format': 'netcdf',
            'variable': ['total_precipitation'],
            'year': [d.split('-')[0] for d in [start_date, end_date]],
            'month': list(range(1, 13)),
            'day': list(range(1, 32)),
            'time': [f"{h:02d}:00" for h in range(24)],
            'area': [47.4, 8.4, 47.3, 8.6],  # Zurich bounding box
        }
        
        # Log request details
        print(f"Requesting precipitation data from Copernicus CDS for {start_date} to {end_date}")
        
        # This is a simplified placeholder - actual implementation would use the CDS API client
        # For demonstration only - in production code, use proper CDS API client
        # self.rainfall_data = xr.open_dataset(response_file)
        
        # Placeholder simulated data (in actual implementation, this would come from CDS)
        time_range = pd.date_range(start=start_date, end=end_date, freq='1H')
        lats = np.linspace(47.3, 47.4, 20)
        lons = np.linspace(8.4, 8.6, 20)
        
        # Create synthetic precipitation data with realistic spatio-temporal pattern
        # Rainstorm events with higher intensity in summer months
        precip_data = np.zeros((len(time_range), len(lats), len(lons)))
        
        for i, t in enumerate(time_range):
            month = t.month
            hour = t.hour
            
            # More intense rain in summer (May-September)
            seasonal_factor = 2.0 if 5 <= month <= 9 else 0.8
            
            # Diurnal cycle - more rain in afternoon/evening
            diurnal_factor = 1.5 if 14 <= hour <= 20 else 0.9
            
            # Random storm events (approximately 10% of time steps)
            if np.random.random() < 0.1:
                storm_center_lat = np.random.choice(range(len(lats)))
                storm_center_lon = np.random.choice(range(len(lons)))
                
                # Create storm pattern
                for lat_idx in range(len(lats)):
                    for lon_idx in range(len(lons)):
                        distance = np.sqrt((lat_idx - storm_center_lat)**2 + 
                                          (lon_idx - storm_center_lon)**2)
                        intensity = 10 * np.exp(-0.5 * (distance/3)**2) * seasonal_factor * diurnal_factor
                        precip_data[i, lat_idx, lon_idx] = max(0, intensity + np.random.normal(0, 0.2))
            else:
                # Background precipitation
                precip_data[i, :, :] = np.random.exponential(0.5) * seasonal_factor * diurnal_factor
        
        # Create xarray dataset
        self.rainfall_data = xr.Dataset(
            data_vars={
                "tp": (["time", "latitude", "longitude"], precip_data)
            },
            coords={
                "time": time_range,
                "latitude": lats,
                "longitude": lons
            }
        )
        
        # Add metadata
        self.rainfall_data.tp.attrs["units"] = "mm/h"
        self.rainfall_data.tp.attrs["long_name"] = "Total Precipitation"
        
        print(f"Successfully loaded precipitation data with {len(time_range)} timestamps")
        return self
    
    def load_zurich_data(self, data_dir=None):
        """Load Zurich specific data: DEM, districts, drainage network, etc.
        
        Parameters:
            data_dir (str, optional): Directory with Zurich data files
            
        Returns:
            ZurichFloodHazard: self
        """
        if data_dir is None:
            data_dir = os.path.join(SYSTEM_DIR, 'data', 'zurich')
        
        # In a real implementation, this would load actual data files
        # For demonstration, we'll create synthetic data
        
        # Create synthetic DEM with realistic Zurich topography
        # Zurich has elevation ranging from ~400m to ~850m
        grid_size = 500
        x = np.linspace(0, 10000, grid_size)  # 10 km grid
        y = np.linspace(0, 10000, grid_size)  # 10 km grid
        
        # Base elevation
        base_elevation = 400 + np.zeros((grid_size, grid_size))
        
        # Lake Zurich (south-east)
        lake_mask = np.zeros((grid_size, grid_size), dtype=bool)
        for i in range(grid_size):
            for j in range(grid_size):
                if (i-grid_size*0.7)**2 + (j-grid_size*0.7)**2 < (grid_size*0.3)**2:
                    lake_mask[i, j] = True
        
        base_elevation[lake_mask] = 399  # Lake level
        
        # Hills (Uetliberg in west, Zürichberg in east)
        xx, yy = np.meshgrid(x, y)
        
        # Uetliberg (west side)
        uetliberg = 400 * np.exp(-0.5 * ((xx-2000)**2 + (yy-5000)**2) / 2000**2)
        
        # Zürichberg (east side)
        zurichberg = 300 * np.exp(-0.5 * ((xx-8000)**2 + (yy-5000)**2) / 2000**2)
        
        # Combining topographic features
        self.dem_data = base_elevation + uetliberg + zurichberg
        
        # Synthetic river path (Limmat river running from lake through city center)
        river_y = np.linspace(grid_size*0.7, 0, 100)
        river_x = grid_size*0.7 + np.sin(np.linspace(0, np.pi*2, 100)) * grid_size*0.05
        
        # Carve river into DEM
        for i, (x, y) in enumerate(zip(river_x, river_y)):
            x_idx = int(x)
            y_idx = int(y)
            if 0 <= x_idx < grid_size and 0 <= y_idx < grid_size:
                # River channel with declining elevation from lake to north
                river_elevation = 398 - i*0.1
                # Create a river bed approx 30m wide
                for dx in range(-15, 16):
                    for dy in range(-15, 16):
                        if x_idx+dx >= 0 and x_idx+dx < grid_size and y_idx+dy >= 0 and y_idx+dy < grid_size:
                            dist = np.sqrt(dx**2 + dy**2)
                            if dist < 15:
                                self.dem_data[y_idx+dy, x_idx+dx] = min(
                                    self.dem_data[y_idx+dy, x_idx+dx],
                                    river_elevation + dist*0.2
                                )
        
        # Create synthetic district boundaries
        # In a real implementation, this would load from a GeoJSON file
        district_polygons = []
        district_names = []
        
        # Create 12 districts (Zurich has 12 Kreise)
        for i in range(12):
            angle_start = i * np.pi/6
            angle_end = (i+1) * np.pi/6
            
            # Inner city districts are smaller
            inner_radius = 2000 if i < 5 else 4000
            outer_radius = 4000 if i < 5 else 8000
            
            # Create district polygon
            angles = np.linspace(angle_start, angle_end, 20)
            inner_points = [(5000 + inner_radius*np.cos(a), 5000 + inner_radius*np.sin(a)) for a in angles]
            outer_points = [(5000 + outer_radius*np.cos(a), 5000 + outer_radius*np.sin(a)) for a in reversed(angles)]
            all_points = inner_points + outer_points + [inner_points[0]]
            
            district_polygons.append(Polygon(all_points))
            district_names.append(f"district_{i+1}")
        
        # Create GeoDataFrame
        self.districts = gpd.GeoDataFrame({
            'name': district_names,
            'geometry': district_polygons,
            'population': [1000*i + 30000 for i in range(12)],  # Synthetic population
            'build_year': [1850 + 10*i for i in range(12)]  # Synthetic construction years
        })
        
        # Set lake boundary
        lake_boundary = []
        for theta in np.linspace(0, 2*np.pi, 100):
            x = 5000 + 3000 * np.cos(theta) * 1.5  # Elongated in x direction
            y = 5000 + 3000 * np.sin(theta) * 0.8  # Compressed in y direction
            lake_boundary.append((x, y))
        
        self.lake_zurich = Polygon(lake_boundary)
        
        print("Successfully loaded Zurich geographic data")
        return self
    
    @jit(nopython=True, parallel=True)
    def _shallow_water_step(self, h, u, v, z, dx, dt, g, n, rainfall):
        """Compute one time step of the 2D shallow water equations.
        
        Parameters:
            h (ndarray): Water depth
            u (ndarray): x-direction velocity
            v (ndarray): y-direction velocity
            z (ndarray): Elevation (DEM)
            dx (float): Grid spacing
            dt (float): Time step
            g (float): Gravity acceleration
            n (float): Manning roughness coefficient
            rainfall (ndarray): Rainfall intensity
            
        Returns:
            tuple: Updated h, u, v
        """
        ny, nx = h.shape
        h_new = np.zeros_like(h)
        u_new = np.zeros_like(u)
        v_new = np.zeros_like(v)
        
        # Compute fluxes
        for i in prange(1, ny-1):
            for j in range(1, nx-1):
                # Skip if the cell is not in the computational domain
                if h[i, j] < 1e-6 and rainfall[i, j] < 1e-9:
                    continue
                
                # Water surface elevation
                wse = h[i, j] + z[i, j]
                
                # x-direction flux
                if j < nx-1:
                    # East face
                    h_e = 0.5 * (h[i, j] + h[i, j+1])
                    if h_e > 1e-6:
                        # Compute momentum flux
                        u_e = 0.5 * (u[i, j] + u[i, j+1])
                        flux_e = h_e * u_e
                        
                        # Manning friction term
                        n_term = g * n**2 * u_e * np.abs(u_e) / h_e**(1/3)
                        
                        # Source term (pressure gradient)
                        source_e = -g * h_e * (z[i, j+1] + h[i, j+1] - z[i, j] - h[i, j]) / dx
                        
                        # Update velocity
                        u_new[i, j] = u[i, j] + dt * (source_e - n_term)
                        
                        # Update water depth
                        h_new[i, j] -= dt * flux_e / dx
                        h_new[i, j+1] += dt * flux_e / dx
                
                # y-direction flux
                if i < ny-1:
                    # North face
                    h_n = 0.5 * (h[i, j] + h[i+1, j])
                    if h_n > 1e-6:
                        # Compute momentum flux
                        v_n = 0.5 * (v[i, j] + v[i+1, j])
                        flux_n = h_n * v_n
                        
                        # Manning friction term
                        n_term = g * n**2 * v_n * np.abs(v_n) / h_n**(1/3)
                        
                        # Source term (pressure gradient)
                        source_n = -g * h_n * (z[i+1, j] + h[i+1, j] - z[i, j] - h[i, j]) / dx
                        
                        # Update velocity
                        v_new[i, j] = v[i, j] + dt * (source_n - n_term)
                        
                        # Update water depth
                        h_new[i, j] -= dt * flux_n / dx
                        h_new[i+1, j] += dt * flux_n / dx
                
                # Add rainfall
                h_new[i, j] += dt * rainfall[i, j]
        
        return h_new, u_new, v_new
    
    @jit(nopython=True)
    def _green_ampt_infiltration(self, h, soil_moisture, K_s, psi_f, theta_i, theta_s, dt):
        """Calculate infiltration using Green-Ampt method.
        
        Parameters:
            h (ndarray): Water depth
            soil_moisture (ndarray): Current soil moisture content
            K_s (ndarray): Saturated hydraulic conductivity
            psi_f (float): Suction head at wetting front
            theta_i (float): Initial moisture content
            theta_s (float): Saturated moisture content
            dt (float): Time step
            
        Returns:
            tuple: Infiltration amount, updated soil moisture
        """
        ny, nx = h.shape
        infiltration = np.zeros_like(h)
        new_moisture = soil_moisture.copy()
        
        # Available storage in soil
        available_storage = theta_s - soil_moisture
        
        # Only infiltrate where there's water
        wet_cells = h > 1e-6
        
        # Potential infiltration rate
        F = np.zeros_like(h)
        F[wet_cells] = K_s[wet_cells] * (1 + psi_f * (theta_s - theta_i) / F[wet_cells])
        
        # Actual infiltration limited by available water and storage
        infiltration = np.minimum(h, F * dt)
        infiltration = np.minimum(infiltration, available_storage * dt)
        
        # Update soil moisture
        new_moisture += infiltration / dt  # Convert volume to content
        
        return infiltration, new_moisture
    
    def _simulate_drainage(self, h, district_map, dt):
        """Simulate water drainage through urban drainage system.
        
        Parameters:
            h (ndarray): Water depth
            district_map (ndarray): Map of district indices
            dt (float): Time step
            
        Returns:
            ndarray: Drainage amount
        """
        # Base drainage capacity
        base_capacity = self.model_params['drainage']['capacity'] * (dt / 3600)  # Convert to m per timestep
        
        # Initialize drainage array
        drainage = np.zeros_like(h)
        
        # Apply drainage capacity by district
        for district_id, factor in self.model_params['drainage']['upgrade_factor'].items():
            district_num = int(district_id.split('_')[1])
            district_mask = district_map == district_num
            
            # Scale by district specific factor
            district_capacity = base_capacity * factor
            
            # Drainage limited by available water
            drainage[district_mask] = np.minimum(h[district_mask], district_capacity)
        
        return drainage
    
    def set_from_intensity(self, intensity_file, return_periods=None):
        """Set hazard from intensity file (NetCDF or raster).
        
        Parameters:
            intensity_file (str): Path to intensity file
            return_periods (list): List of return periods to extract
            
        Returns:
            ZurichFloodHazard: self
        """
        # Load intensity data
        try:
            if intensity_file.endswith('.nc'):
                dataset = xr.open_dataset(intensity_file)
                # Extract flood depths for given return periods
                if return_periods is None:
                    return_periods = [10, 30, 100, 300]
                
                # Process datasets and construct event set
                event_ids = []
                event_names = []
                frequencies = []
                intensity_list = []
                
                for rp in return_periods:
                    event_ids.append(int(rp))
                    event_names.append(f"RP{rp}")
                    frequencies.append(1.0/rp)
                    
                    # Extract flood depth for this return period
                    if f"depth_rp{rp}" in dataset:
                        depth = dataset[f"depth_rp{rp}"].values
                    else:
                        # If specific return period not available, interpolate
                        available_rps = [int(v.split('_rp')[1]) for v in dataset.variables 
                                        if v.startswith('depth_rp')]
                        available_rps.sort()
                        
                        if rp < min(available_rps):
                            depth = dataset[f"depth_rp{min(available_rps)}"].values * (rp / min(available_rps))
                        elif rp > max(available_rps):
                            depth = dataset[f"depth_rp{max(available_rps)}"].values * (rp / max(available_rps))
                        else:
                            # Linear interpolation
                            lower_rp = max([r for r in available_rps if r < rp])
                            upper_rp = min([r for r in available_rps if r > rp])
                            lower_depth = dataset[f"depth_rp{lower_rp}"].values
                            upper_depth = dataset[f"depth_rp{upper_rp}"].values
                            
                            weight = (rp - lower_rp) / (upper_rp - lower_rp)
                            depth = lower_depth * (1-weight) + upper_depth * weight
                    
                    # Flatten and store
                    intensity_list.append(depth.flatten())
                
                # Get grid coordinates
                lats = dataset['latitude'].values
                lons = dataset['longitude'].values
                
                # Create centroids
                xx, yy = np.meshgrid(lons, lats)
                coord_x = xx.flatten()
                coord_y = yy.flatten()
                
                # Set hazard attributes
                self.centroids.set_lat_lon(coord_y, coord_x)
                self.event_id = np.array(event_ids)
                self.event_name = event_names
                self.frequency = np.array(frequencies)
                self.intensity = csr_matrix(np.vstack(intensity_list))
                self.fraction = self.intensity.copy()
                self.fraction.data[:] = 1
                
            elif intensity_file.endswith(('.tif', '.asc')):
                # For raster files, assume it represents a single event
                with rasterio.open(intensity_file) as src:
                    depth = src.read(1)
                    transform = src.transform
                    
                    # Create a grid of coordinates
                    height, width = depth.shape
                    rows, cols = np.mgrid[0:height, 0:width]
                    x, y = rasterio.transform.xy(transform, rows, cols)
                    
                    # Flatten arrays
                    depth_flat = depth.flatten()
                    x_flat = np.array(x).flatten()
                    y_flat = np.array(y).flatten()
                    
                    # Filter out no-data values
                    valid = depth_flat != src.nodata
                    depth_flat = depth_flat[valid]
                    x_flat = x_flat[valid]
                    y_flat = y_flat[valid]
                    
                    # Set centroids
                    self.centroids.set_lat_lon(y_flat, x_flat)
                    
                    # Set single event
                    self.event_id = np.array([1])
                    self.event_name = ["Flood event"]
                    self.frequency = np.array([0.01])  # Assuming 100yr event
                    self.intensity = csr_matrix(depth_flat.reshape(1, -1))
                    self.fraction = self.intensity.copy()
                    self.fraction.data[:] = 1
            else:
                raise ValueError(f"Unsupported file format: {intensity_file}")
                
        except Exception as e:
            raise ValueError(f"Failed to load intensity file: {str(e)}")
        
        return self
    
    def simulate_events(self, num_events=10):
        """Simulate flood events using the shallow water model.
        
        Parameters:
            num_events (int): Number of events to simulate
            
        Returns:
            ZurichFloodHazard: self
        """
        if self.rainfall_data is None:
            raise ValueError("Rainfall data not loaded. Call set_from_copernicus_cds first.")
        
        if self.dem_data is None:
            raise ValueError("DEM data not loaded. Call load_zurich_data first.")
        
        # Initialize multi-processing pool
        if self.pool is None:
            self.pool = mp.Pool(processes=min(num_events, mp.cpu_count()))
        
        # Create district map for drainage simulation
        district_map = np.zeros_like(self.dem_data, dtype=int)
        
        # This would use a more sophisticated method in a real implementation
        # For demonstration, we use a simple radial pattern
        ny, nx = self.dem_data.shape
        center_y, center_x = ny // 2, nx // 2
        
        for y in range(ny):
            for x in range(nx):
                # Calculate angle from center
                angle = np.arctan2(y - center_y, x - center_x)
                # Normalize to 0-12 range
                district_idx = int(((angle + np.pi) / (2 * np.pi) * 12) % 12) + 1
                district_map[y, x] = district_idx
        
        # Sample rainfall events
        event_ids = []
        event_names = []
        frequencies = []
        date_stamps = []
        intensity_list = []
        
        # Select random time slices for events
        time_indices = np.random.choice(len(self.rainfall_data.time), size=num_events, replace=False)
        
        # Parallel simulation of events
        event_args = [(i, self.rainfall_data.isel(time=i).tp.values, 
                      self.dem_data, district_map, self.model_params) 
                      for i in time_indices]
        
        # Execute simulations in parallel
        results = self.pool.map(self._simulate_single_event, event_args)
        
        # Process results
        for i, (max_depth, event_time) in enumerate(results):
            event_date = self.rainfall_data.time[time_indices[i]].values
            
            event_ids.append(i+1)
            event_names.append(f"Event_{event_date}")
            
            # Calculate frequency (more intense events are rarer)
            mean_rainfall = self.rainfall_data.isel(time=time_indices[i]).tp.mean().values
            if mean_rainfall > 10:  # Heavy rain
                frequency = 1/100
            elif mean_rainfall > 5:  # Moderate rain
                frequency = 1/30
            else:  # Light rain
                frequency = 1/10
                
            frequencies.append(frequency)
            date_stamps.append(np.datetime64(event_date))
            
            # Store max depth
            intensity_list.append(max_depth.flatten())
        
        # Get grid coordinates
        ny, nx = self.dem_data.shape
        y = np.linspace(47.3, 47.4, ny)
        x = np.linspace(8.4, 8.6, nx)
        xx, yy = np.meshgrid(x, y)
        
        # Set hazard attributes
        self.centroids.set_lat_lon(yy.flatten(), xx.flatten())
        self.event_id = np.array(event_ids)
        self.event_name = event_names
        self.frequency = np.array(frequencies)
        self.date = np.array(date_stamps)
        self.intensity = csr_matrix(np.vstack(intensity_list))
        self.fraction = self.intensity.copy()
        self.fraction.data[:] = 1
        
        print(f"Successfully simulated {num_events} flood events")
        return self
    
    def _simulate_single_event(self, args):
        """Simulate a single flood event (called by multiprocessing).
        
        Parameters:
            args (tuple): Tuple containing (event_idx, rainfall, dem, district_map, model_params)
            
        Returns:
            tuple: (max_depth, event_time)
        """
        event_idx, rainfall, dem, district_map, model_params = args
        
        # Extract model parameters
        dx = model_params['dx']
        dt = model_params['dt']
        g = model_params['g']
        n = model_params['manning_n']
        sim_duration = model_params['sim_duration']
        
        # Initialize state variables
        ny, nx = dem.shape
        h = np.zeros((ny, nx))  # Water depth
        u = np.zeros((ny, nx))  # x-velocity
        v = np.zeros((ny, nx))  # y-velocity
        
        # Initialize soil moisture
        soil_moisture = np.ones((ny, nx)) * model_params['infiltration']['initial_moisture']
        
        # Initialize infiltration parameters
        theta_i = model_params['infiltration']['initial_moisture']
        theta_s = model_params['infiltration']['saturated_moisture']
        psi_f = model_params['infiltration']['suction_head']
        
        # Create hydraulic conductivity map based on districts
        k_s = np.zeros((ny, nx))
        for y in range(ny):
            for x in range(nx):
                district = district_map[y, x]
                if district <= 5:  # Inner city
                    k_s[y, x] = model_params['infiltration']['hydraulic_conductivity']['urban']
                elif district <= 10:  # Outer districts
                    k_s[y, x] = model_params['infiltration']['hydraulic_conductivity']['suburban']
                else:  # Outskirts
                    k_s[y, x] = model_params['infiltration']['hydraulic_conductivity']['parks']
        
        # Storage for maximum depth
        max_depth = np.zeros((ny, nx))
        
        # Convert rainfall from mm/h to m/s
        rainfall_rate = rainfall / 1000 / 3600
        
        # Main simulation loop
        num_steps = int(sim_duration / dt)
        for step in range(num_steps):
            t = step * dt
            
            # Calculate infiltration
            infiltration, soil_moisture = self._green_ampt_infiltration(
                h, soil_moisture, k_s, psi_f, theta_i, theta_s, dt)
            
            # Calculate drainage
            drainage = self._simulate_drainage(h, district_map, dt)
            
            # Update water depth due to infiltration and drainage
            h -= (infiltration + drainage)
            h = np.maximum(h, 0)  # Ensure non-negative depth
            
            # Shallow water update
            h, u, v = self._shallow_water_step(h, u, v, dem, dx, dt, g, n, rainfall_rate)
            
            # Update maximum depth
            max_depth = np.maximum(max_depth, h)
            
            # CFL condition check
            water_cells = h > 1e-6
            if water_cells.any():
                max_vel = max(np.max(np.abs(u[water_cells])), np.max(np.abs(v[water_cells])))
                if max_vel > 0:
                    wave_speed = np.sqrt(g * np.max(h[water_cells]))
                    cfl = (max_vel + wave_speed) * dt / dx
                    if cfl > model_params['cfl_number']:
                        # Reduce timestep and retry
                        dt_new = model_params['cfl_number'] * dx / (max_vel + wave_speed)
                        dt = max(dt_new * 0.8, dt/2)  # Apply safety factor
            
        # Return maximum depth and event time
        return max_depth, t
    
    def plot_hazard_map(self, event_id=None, ax=None, cmap='Blues', **kwargs):
        """Plot flood hazard map for a specified event.
        
        Parameters:
            event_id (int, optional): Event ID to plot. If None, plots the first event.
            ax (matplotlib.axes, optional): Axes to plot on.
            cmap (str): Colormap name
            **kwargs: Additional arguments to pass to matplotlib
            
        Returns:
            matplotlib.axes: Axes with the plot
        """
        if ax is None:
            _, ax = plt.subplots(figsize=(10, 8))
        
        if event_id is None:
            event_idx = 0
        else:
            event_idx = np.where(self.event_id == event_id)[0][0]
        
        # Extract event data
        event_intensity = self.intensity[event_idx].toarray().reshape(-1)
        
        # Get coordinates
        coords_x = self.centroids.lon
        coords_y = self.centroids.lat
        
        # Create grid for plotting
        grid_size = int(np.sqrt(len(coords_x)))
        grid_x = coords_x.reshape(grid_size, grid_size)
        grid_y = coords_y.reshape(grid_size, grid_size)
        grid_z = event_intensity.reshape(grid_size, grid_size)
        
        # Plot flood depth
        im = ax.pcolormesh(grid_x, grid_y, grid_z, cmap=cmap, **kwargs)
        
        # Add colorbar
        plt.colorbar(im, ax=ax, label='Flood Depth (m)')
        
        # Add districts if available
        if self.districts is not None:
            self.districts.boundary.plot(ax=ax, color='black', linewidth=0.5)
            
            # Add district labels
            for idx, row in self.districts.iterrows():
                ax.annotate(row['name'], xy=row.geometry.centroid.coords[0], 
                          ha='center', fontsize=8)
        
        # Add lake if available
        if self.lake_zurich is not None:
            ax.fill(*self.lake_zurich.exterior.xy, color='lightblue', alpha=0.6, 
                    label='Lake Zurich')
        
        # Set title and labels
        ax.set_title(f"Flood Hazard Map: {self.event_name[event_idx]}")
        ax.set_xlabel('Longitude')
        ax.set_ylabel('Latitude')
        
        # Add legend
        ax.legend()
        
        return ax
error: لطفا دانلود نفرمائید !!