“””ZurichFlood Exposure , extends CLIMADA Exposure for Zurich Urban flood risk assessment.
I Wrote this Code for Climada as a part of Urban_flood_Petal
you can download this code from my github Page in this Address: https://github.com/sepehrkiya/climada_petal_urban_flood
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon
import matplotlib.pyplot as plt
import os
from climada.entity.exposures import Exposures
from climada.util.constants import SYSTEM_DIR
import rasterio
from rasterio.features import shapes
from affine import Affine
import requests
import json
from scipy.interpolate import griddata
class ZurichFloodExposure(Exposures):
"""ZurichFloodExposure class extends CLIMADA Exposures for Zurich flood exposure.
This class manages exposure data for flood risk assessment in Zurich,
including building stock, infrastructure, and critical facilities.
Attributes:
gdf (GeoDataFrame): GeoPandas DataFrame with exposure data
admin_boundaries (GeoDataFrame): Administrative boundaries of Zurich
building_types (dict): Building type classification and parameters
infrastructure_types (dict): Infrastructure type classification
construction_years (dict): Construction years by district
districts (GeoDataFrame): Districts of Zurich
building_footprints (GeoDataFrame): Building footprints
"""
def __init__(self):
"""Initialize Zurich flood exposure with default parameters."""
super().__init__()
self.admin_boundaries = None
self.building_types = {
'residential': {
'max_floors': 8,
'avg_value_per_sqm': 10000, # CHF
'avg_floor_area': 120, # m²
'proportion': 0.7
},
'commercial': {
'max_floors': 10,
'avg_value_per_sqm': 15000, # CHF
'avg_floor_area': 500, # m²
'proportion': 0.15
},
'industrial': {
'max_floors': 3,
'avg_value_per_sqm': 8000, # CHF
'avg_floor_area': 2000, # m²
'proportion': 0.05
},
'public': {
'max_floors': 5,
'avg_value_per_sqm': 12000, # CHF
'avg_floor_area': 1000, # m²
'proportion': 0.1
}
}
self.infrastructure_types = {
'road': {
'primary': {'value_per_km': 10000000, 'width': 15},
'secondary': {'value_per_km': 5000000, 'width': 10},
'tertiary': {'value_per_km': 2000000, 'width': 8},
'residential': {'value_per_km': 1000000, 'width': 6}
},
'railway': {'value_per_km': 20000000, 'width': 5},
'bridge': {'value_per_unit': 50000000},
'tunnel': {'value_per_km': 100000000},
'utility': {
'electricity': {'value_per_km': 5000000},
'water': {'value_per_km': 3000000},
'gas': {'value_per_km': 4000000}
}
}
self.construction_years = {
'district_1': {'mean': 1890, 'std': 30}, # Old town
'district_2': {'mean': 1910, 'std': 20},
'district_3': {'mean': 1930, 'std': 25},
'district_4': {'mean': 1950, 'std': 30},
'district_5': {'mean': 1970, 'std': 25},
'district_6': {'mean': 1990, 'std': 20},
'district_7': {'mean': 1960, 'std': 30},
'district_8': {'mean': 1980, 'std': 25},
'district_9': {'mean': 2000, 'std': 15},
'district_10': {'mean': 2010, 'std': 10}, # Newest development
'district_11': {'mean': 1970, 'std': 30},
'district_12': {'mean': 1950, 'std': 35}
}
self.districts = None
self.building_footprints = None
def set_from_openstreetmap(self, bounds=None):
"""Load exposure data from OpenStreetMap for Zurich.
Parameters:
bounds (tuple, optional): Bounding box (min_lon, min_lat, max_lon, max_lat)
Returns:
ZurichFloodExposure: self
"""
# Default bounds for Zurich
if bounds is None:
bounds = (8.4, 47.3, 8.6, 47.4)
min_lon, min_lat, max_lon, max_lat = bounds
# This would use the Overpass API in a real implementation
# For demonstration, we'll create synthetic data
# Create districts (synthetic data)
if self.districts is None:
self.districts = self._create_synthetic_districts()
# Generate building footprints
building_geometries = []
building_attributes = []
# Number of buildings to generate (scaled for demonstration)
num_buildings = 5000
for i in range(num_buildings):
# Random point within bounds
lon = min_lon + (max_lon - min_lon) * np.random.random()
lat = min_lat + (max_lat - min_lat) * np.random.random()
point = Point(lon, lat)
# Find which district the point is in
district = None
district_id = None
for idx, dist in self.districts.iterrows():
if dist.geometry.contains(point):
district = dist.name
district_id = idx + 1
break
if district is None:
continue # Skip if not in any district
# Building type based on proportions
r = np.random.random()
cum_prop = 0
building_type = None
for btype, params in self.building_types.items():
cum_prop += params['proportion']
if r <= cum_prop:
building_type = btype
break
# Building size and shape
width = np.random.normal(20, 5) if building_type != 'industrial' else np.random.normal(50, 15)
length = np.random.normal(20, 5) if building_type != 'industrial' else np.random.normal(50, 15)
rotation = np.random.random() * np.pi
# Create building polygon
corners = []
for dx, dy in [(-width/2, -length/2), (width/2, -length/2),
(width/2, length/2), (-width/2, length/2)]:
rotated_x = dx * np.cos(rotation) - dy * np.sin(rotation)
rotated_y = dx * np.sin(rotation) + dy * np.cos(rotation)
corner_lon = lon + rotated_x * 0.00001 # Approximate conversion to degrees
corner_lat = lat + rotated_y * 0.00001
corners.append((corner_lon, corner_lat))
building_geometry = Polygon(corners)
# Number of floors
max_floors = self.building_types[building_type]['max_floors']
floors = np.random.randint(1, max_floors + 1)
# Building value
value_per_sqm = self.building_types[building_type]['avg_value_per_sqm']
floor_area = self.building_types[building_type]['avg_floor_area']
building_value = value_per_sqm * floor_area * floors
# Construction year based on district
if district_id in range(1, 13):
district_key = f'district_{district_id}'
mean_year = self.construction_years[district_key]['mean']
std_year = self.construction_years[district_key]['std']
year = int(np.random.normal(mean_year, std_year))
year = max(1800, min(2023, year)) # Constrain to realistic range
else:
year = np.random.randint(1900, 2023)
# Building attributes
building_attributes.append({
'id': i + 1,
'type': building_type,
'floors': floors,
'area': floor_area * floors,
'value': building_value,
'year': year,
'district': district,
'district_id': district_id
})
building_geometries.append(building_geometry)
# Create GeoDataFrame
self.building_footprints = gpd.GeoDataFrame(
building_attributes,
geometry=building_geometries,
crs="EPSG:4326"
)
# Generate roads
road_geometries = []
road_attributes = []
# Generate some main roads
for district_id in range(1, 13):
# Get district center
district = self.districts.iloc[district_id-1]
center = district.geometry.centroid
# Generate roads radiating from center
for angle in np.linspace(0, 2*np.pi, 5):
road_type = 'primary' if angle < np.pi/2 else 'secondary'
# Road length
length = np.random.normal(0.02, 0.005) # In degrees
# Road endpoint
end_lon = center.x + length * np.cos(angle)
end_lat = center.y + length * np.sin(angle)
# Create road line
road_geom = [Point(center.x, center.y), Point(end_lon, end_lat)]
road_attributes.append({
'id': len(road_attributes) + 1,
'type': 'road',
'subtype': road_type,
'value': length * self.infrastructure_types['road'][road_type]['value_per_km'] * 100, # km to degree conversion
'district_id': district_id
})
road_geometries.append(road_geom)
# Add infrastructure data to building data
# For simplicity, we'll combine all data into a single GeoDataFrame
all_geometries = building_geometries + road_geometries
all_attributes = building_attributes + road_attributes
# Create final GeoDataFrame
self.gdf = gpd.GeoDataFrame(
all_attributes,
geometry=all_geometries,
crs="EPSG:4326"
)
# Convert to CLIMADA format
self.set_gdf(self.gdf)
# Override value column with building or infrastructure value
self.value = np.array([item.get('value', 0) for item in all_attributes])
print(f"Loaded {len(building_geometries)} buildings and {len(road_geometries)} roads")
return self
def set_from_census(self, census_file=None):
"""Set exposure from census data.
Parameters:
census_file (str, optional): Path to census data file
Returns:
ZurichFloodExposure: self
"""
# In a real implementation, this would load actual census data
# For demonstration, we'll use synthetic data based on districts
if self.districts is None:
self.districts = self._create_synthetic_districts()
# Generate population data by district
district_data = []
for idx, district in self.districts.iterrows():
district_id = idx + 1
district_name = district.name
# Population density depends on district (higher in central districts)
base_density = 15000 # persons/km²
density_factor = 1.0
if district_id <= 5: # Central districts
density_factor = 1.5
elif district_id <= 10: # Middle districts
density_factor = 1.0
else: # Outer districts
density_factor = 0.7
density = base_density * density_factor
# Calculate area in km²
area = district.geometry.area * 111**2 * np.cos(np.radians(47.4)) # Approximate conversion
# Calculate population
population = int(density * area)
# Calculate household statistics
avg_household_size = np.random.normal(2.1, 0.2)
num_households = int(population / avg_household_size)
# Calculate income and wealth statistics
if district_id <= 3: # High-income central districts
avg_income = np.random.normal(120000, 20000)
avg_wealth = avg_income * np.random.uniform(8, 12)
elif district_id <= 8: # Middle-income districts
avg_income = np.random.normal(90000, 15000)
avg_wealth = avg_income * np.random.uniform(6, 10)
else: # Lower-income outer districts
avg_income = np.random.normal(70000, 10000)
avg_wealth = avg_income * np.random.uniform(4, 8)
district_data.append({
'district_id': district_id,
'district_name': district_name,
'population': population,
'density': density,
'households': num_households,
'avg_household_size': avg_household_size,
'avg_income': avg_income,
'avg_wealth': avg_wealth,
'geometry': district.geometry
})
# Create census GeoDataFrame
census_gdf = gpd.GeoDataFrame(district_data, geometry='geometry', crs="EPSG:4326")
# If we have building footprints, distribute census data to buildings
if self.building_footprints is not None:
# Filter residential buildings
residential = self.building_footprints[self.building_footprints.type == 'residential']
# Calculate population per building by district
for idx, row in census_gdf.iterrows():
district_id = row['district_id']
district_pop = row['population']
# Get buildings in this district
district_buildings = residential[residential.district_id == district_id]
if len(district_buildings) > 0:
# Distribute population proportionally to building area
total_area = district_buildings.area.sum()
for bidx, building in district_buildings.iterrows():
# Calculate building population based on area proportion
building_pop = district_pop * (building.area / total_area)
# Update building data
self.building_footprints.at[bidx, 'population'] = building_pop
# Calculate building economic value based on occupants
avg_wealth = row['avg_wealth']
self.building_footprints.at[bidx, 'content_value'] = building_pop * avg_wealth * 0.3
# Store census data
self.census_data = census_gdf
print(f"Loaded census data for {len(census_gdf)} districts with total population of {census_gdf.population.sum():.0f}")
return self
def set_elevation_data(self, dem_data):
"""Set elevation data for exposure points.
Parameters:
dem_data (ndarray): Digital Elevation Model data
Returns:
ZurichFloodExposure: self
"""
if dem_data is None:
raise ValueError("DEM data is required")
# Get elevation for each exposure point
if hasattr(self, 'gdf') and 'geometry' in self.gdf.columns:
# Create coordinate grid for DEM data
ny, nx = dem_data.shape
lats = np.linspace(47.3, 47.4, ny)
lons = np.linspace(8.4, 8.6, nx)
grid_x, grid_y = np.meshgrid(lons, lats)
# Get coordinates of exposure points
point_coords = []
for geom in self.gdf.geometry:
if hasattr(geom, 'centroid'):
point_coords.append((geom.centroid.x, geom.centroid.y))
elif isinstance(geom, list) and all(isinstance(p, Point) for p in geom):
# For lines (e.g., roads), use the midpoint
mid_idx = len(geom) // 2
point_coords.append((geom[mid_idx].x, geom[mid_idx].y))
# Extract x and y coordinates
points_x = np.array([p[0] for p in point_coords])
points_y = np.array([p[1] for p in point_coords])
# Interpolate elevation at exposure points
elevations = griddata((grid_x.flatten(), grid_y.flatten()),
dem_data.flatten(),
(points_x, points_y),
method='linear')
# Add elevation to GeoDataFrame
self.gdf['elevation'] = elevations
# Update CLIMADA exposure attributes
self.elevation = elevations
print(f"Added elevation data to {len(elevations)} exposure points")
return self
def calculate_floor_height(self):
"""Calculate first floor height based on building type and age.
Returns:
ZurichFloodExposure: self
"""
if 'type' in self.gdf.columns and 'year' in self.gdf.columns:
# Create floor height array
floor_heights = np.zeros(len(self.gdf))
for i, (_, row) in enumerate(self.gdf.iterrows()):
if row.get('type') in self.building_types:
# Base height depends on building type
if row['type'] == 'residential':
base_height = 0.5 # 50 cm above ground
elif row['type'] == 'commercial':
base_height = 0.2 # 20 cm above ground (for accessibility)
elif row['type'] == 'industrial':
base_height = 0.3 # 30 cm above ground
else: # public
base_height = 0.4 # 40 cm above ground
# Adjust for construction year
year = row['year']
if year < 1950:
# Older buildings often have higher first floors
year_factor = 0.5 # Additional 50 cm
elif year < 1980:
year_factor = 0.3 # Additional 30 cm
elif year < 2000:
year_factor = 0.1 # Additional 10 cm
else:
year_factor = 0.0 # Modern buildings follow standard
# Check if building is in flood-prone district
district_id = row.get('district_id')
district_factor = 0.0
if district_id in [3, 4, 9]: # Districts with historical flooding
district_factor = 0.2 # Additional 20 cm
# Total floor height
floor_heights[i] = base_height + year_factor + district_factor
# Add floor height to GeoDataFrame
self.gdf['floor_height'] = floor_heights
# Add to CLIMADA exposure attributes
self.floor_height = floor_heights
print(f"Calculated floor heights for {len(floor_heights)} buildings")
return self
def assign_vulnerability_classes(self):
"""Assign vulnerability classes based on building characteristics.
Returns:
ZurichFloodExposure: self
"""
if 'type' in self.gdf.columns and 'year' in self.gdf.columns:
# Initialize vulnerability class column
vuln_classes = np.zeros(len(self.gdf), dtype=int)
for i, (_, row) in enumerate(self.gdf.iterrows()):
if 'type' in row and row['type'] in self.building_types:
building_type = row['type']
year = row.get('year', 2000)
# Base vulnerability class (1-5, with 5 being most vulnerable)
if building_type == 'residential':
base_class = 3
elif building_type == 'commercial':
base_class = 4
elif building_type == 'industrial':
base_class = 5
else: # public
base_class = 2
# Modify based on construction year
if year < 1950:
year_modifier = 1 # Older buildings more vulnerable
elif year < 1980:
year_modifier = 0
elif year < 2000:
year_modifier = -1
else:
year_modifier = -2 # Modern buildings less vulnerable
# Final class (constrained to 1-5 range)
vuln_class = max(1, min(5, base_class + year_modifier))
vuln_classes[i] = vuln_class
else:
# For infrastructure
if row.get('type') == 'road':
if row.get('subtype') == 'primary':
vuln_classes[i] = 2
else:
vuln_classes[i] = 3
else:
vuln_classes[i] = 4
# Add vulnerability class to GeoDataFrame
self.gdf['vuln_class'] = vuln_classes
# Add to CLIMADA exposure attributes
self.vulnerability_class = vuln_classes
print(f"Assigned vulnerability classes to {len(vuln_classes)} assets")
return self
def _create_synthetic_districts(self):
"""Create synthetic districts for Zurich.
Returns:
GeoDataFrame: Synthetic district boundaries
"""
# Create concentric districts from center
center_x, center_y = 8.54, 47.37 # Approximate center of Zurich
district_polygons = []
district_names = []
# Create 12 districts (Zurich has 12 Kreise)
for i in range(12):
angle_start = i * np.pi/6
angle_end = (i+1) * np.pi/6
# Inner city districts are smaller
inner_radius = 0.02 if i < 5 else 0.04
outer_radius = 0.04 if i < 5 else 0.08
# Create district polygon
angles = np.linspace(angle_start, angle_end, 20)
inner_points = [(center_x + inner_radius*np.cos(a), center_y + inner_radius*np.sin(a)) for a in angles]
outer_points = [(center_x + outer_radius*np.cos(a), center_y + outer_radius*np.sin(a)) for a in reversed(angles)]
all_points = inner_points + outer_points + [inner_points[0]]
district_polygons.append(Polygon(all_points))
district_names.append(f"District {i+1}")
# Create GeoDataFrame
districts = gpd.GeoDataFrame({
'name': district_names,
'geometry': district_polygons
}, crs="EPSG:4326")
return districts
def set_from_raster_population(self, population_raster=None):
"""Set exposure from a population density raster.
Parameters:
population_raster (str): Path to population density raster
Returns:
ZurichFloodExposure: self
"""
# In a real implementation, this would load actual population data
# For demonstration, we'll create synthetic data
if self.districts is None:
self.districts = self._create_synthetic_districts()
# Create a synthetic population density grid
grid_size = 100
lons = np.linspace(8.4, 8.6, grid_size)
lats = np.linspace(47.3, 47.4, grid_size)
xx, yy = np.meshgrid(lons, lats)
# Initialize population grid
population = np.zeros((grid_size, grid_size))
# Create population density based on distance from center
center_x, center_y = 8.54, 47.37
for i in range(grid_size):
for j in range(grid_size):
dist = np.sqrt((xx[i, j] - center_x)**2 + (yy[i, j] - center_y)**2)
# Higher density in city center, decreasing outward
if dist < 0.02: # Inner city
base_density = np.random.normal(20000, 5000)
elif dist < 0.04: # Middle districts
base_density = np.random.normal(10000, 3000)
elif dist < 0.08: # Outer districts
base_density = np.random.normal(5000, 2000)
else: # Outskirts
base_density = np.random.normal(2000, 1000)
# Convert to number of people in cell
cell_area = 0.002 * 0.001 * 111**2 * np.cos(np.radians(47.4)) # Approximate km²
cell_population = base_density * cell_area
population[i, j] = max(0, cell_population)
# Convert population grid to points
points = []
values = []
ids = []
for i in range(grid_size):
for j in range(grid_size):
if population[i, j] > 0:
points.append(Point(xx[i, j], yy[i, j]))
values.append(population[i, j])
ids.append(len(points))
# Create GeoDataFrame
population_gdf = gpd.GeoDataFrame({
'id': ids,
'population': values,
'geometry': points
}, crs="EPSG:4326")
# Calculate exposure value based on population
# Assuming 500,000 CHF per person (total assets)
population_gdf['value'] = population_gdf['population'] * 500000
# Assign to districts
district_ids = []
for _, point in population_gdf.iterrows():
district_id = 0
for idx, dist in self.districts.iterrows():
if dist.geometry.contains(point.geometry):
district_id = idx + 1
break
district_ids.append(district_id)
population_gdf['district_id'] = district_ids
# Set as exposure
self.gdf = population_gdf
self.set_gdf(population_gdf)
# Override value
self.value = np.array(values)
print(f"Created population-based exposure with {len(points)} points and "
f"total population of {sum(values):.0f}")
return self
def plot_exposure(self, column=None, ax=None, **kwargs):
"""Plot exposure data.
Parameters:
column (str, optional): Column to plot
ax (matplotlib.axes, optional): Axes to plot on
**kwargs: Additional arguments to pass to plot
Returns:
matplotlib.axes: Axes with plot
"""
if ax is None:
_, ax = plt.subplots(figsize=(10, 8))
# Plot districts as background
if self.districts is not None:
self.districts.boundary.plot(ax=ax, color='black', linewidth=0.5)
# Plot exposure
if column is None:
column = 'value'
if column in self.gdf.columns:
# Normalize values for color scaling
norm_values = self.gdf[column] / self.gdf[column].max()
# Plot each geometry with color based on value
for idx, row in self.gdf.iterrows():
geom = row.geometry
value = norm_values.iloc[idx]
if hasattr(geom, 'exterior'):
# Polygon
ax.fill(*geom.exterior.xy, alpha=0.6, color=plt.cm.viridis(value))
elif isinstance(geom, Point):
# Point - size based on value
ax.scatter(geom.x, geom.y, s=value*100, color=plt.cm.viridis(value), alpha=0.6)
elif isinstance(geom, list) and all(isinstance(p, Point) for p in geom):
# Line
x = [p.x for p in geom]
y = [p.y for p in geom]
ax.plot(x, y, color=plt.cm.viridis(value), linewidth=2+value*3, alpha=0.6)
# Add colorbar
sm = plt.cm.ScalarMappable(cmap=plt.cm.viridis)
sm.set_array([])
plt.colorbar(sm, ax=ax, label=column)
# Set title and labels
ax.set_title(f"Zurich Exposure: {column}")
ax.set_xlabel('Longitude')
ax.set_ylabel('Latitude')
return ax