"""
Source code for Global Multihazard Transport Risk Analysis (GMTRA)
Functions to perform the exposure analysis.
Copyright (C) 2019 Elco Koks. All versions released under the GNU Affero General Public License v3.0 license.
"""
import os
import pandas
import geopandas
pandas.options.mode.chained_assignment = None
from gmtra.utils import load_config,total_length_risk,exposed_length_risk
pandas.set_option('chained_assignment',None)
[docs]def regional_roads(n,prot_lookup,data_path):
"""
Function to get summarized exposure values for each region for all road assets.
Arguments:
*n* : the index ID of a region in the specified shapefile with all the regions.
*prot_lookup* : dictionary with dike design standards for a region.
*data_path* : file path to location of all data.
Returns:
*dataframe* : a pandas DataFrame with exposure statistics.
"""
# specify the file path where all data is located.
data_path = load_config()['paths']['data']
# load shapefile with unique information for each region
global_regions = geopandas.read_file(os.path.join(data_path,'input_data','global_regions_v2.shp'))
# grab the row of the region from the global region shapefile
region = global_regions.iloc[n]
print('{} started!'.format(region.GID_2))
try:
# load ID and income group for the region
ID = region.GID_2
wbincome = region.wbincome
# specify all unique hazard abbrevations
hazards = ['EQ','Cyc','PU','FU','CF']
collect_risks = []
# load regional statistics
reg_stats = pandas.read_csv(os.path.join(data_path,'road_stats','{}_stats.csv'.format(ID)))
# loop over all hazards
for hazard in hazards:
# read exposure data
df= pandas.read_feather(os.path.join(data_path,'output_{}_full'.format(hazard),'{}_{}.ft'.format(ID,hazard)))
# correct for protection standards for fluval and coastal flooding
if (hazard == 'FU') | (hazard == 'CF'):
prot_stand = prot_lookup[ID]
no_floods= [x for x in [x for x in df.columns if ('val' in x)] if prot_stand > int(x.split('-')[1])]
df[no_floods] = 0
# correct for (assumed) design standards for surface flooding
if (hazard == 'PU'):
if wbincome == 'HIC':
df.loc[df.road_type.isin(['primary','secondary']),['val_PU-5','val_PU-10','val_PU-20','val_PU-50']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_PU-5','val_PU-10','val_PU-20']] = 0
elif wbincome == 'UMC':
df.loc[df.road_type.isin(['primary','secondary']),['val_PU-5','val_PU-10','val_PU-20']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_PU-5','val_PU-10']] = 0
else:
df.loc[df.road_type.isin(['primary','secondary']),['val_PU-5','val_PU-10']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_PU-5','val_PU-10']] = 0
# correct for (assumed) design standards for river flooding
if (hazard == 'FU'):
if wbincome == 'HIC':
df.loc[df.road_type.isin(['primary','secondary']),['val_FU-5','val_FU-10','val_FU-20','val_FU-50']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_FU-5','val_FU-10','val_FU-20',]] = 0
elif wbincome == 'UMC':
df.loc[df.road_type.isin(['primary','secondary']),['val_FU-5','val_FU-10','val_FU-20']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_FU-5','val_FU-10']] = 0
else:
df.loc[df.road_type.isin(['primary','secondary']),['val_FU-5','val_FU-10']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_FU-5','val_FU-10']] = 0
# correct for (assumed) design standards for coastal flooding
if (hazard == 'CF'):
if wbincome == 'HIC':
df.loc[df.road_type.isin(['primary','secondary']),['val_CF-10','val_CF-20','val_CF-50']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_CF-10','val_CF-20']] = 0
elif wbincome == 'UMC':
df.loc[df.road_type.isin(['primary','secondary']),['val_CF-10','val_CF-20']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_CF-10']] = 0
else:
df.loc[df.road_type.isin(['primary','secondary']),['val_CF-10']] = 0
df.loc[df.road_type.isin(['tertiary''track']),['val_CF-10']] = 0
if hazard == 'EQ':
reg_df = df.copy()
elif hazard != 'EQ':
reg_df = reg_df.merge(df[[x for x in df.columns if ('val_' in x) | ('length_' in x)]+['osm_id']],left_on='osm_id',right_on='osm_id')
# something went wrong in the order of the azard maps, correct that here.
if hazard == 'EQ':
event_list = ['EQ_rp250','EQ_rp475','EQ_rp975','EQ_rp1500','EQ_rp2475'] #
RPS = [1/250,1/475,1/975,1/1500,1/2475]
cat_list = [1,2,3,4]
bins = [-1,92,180,340,650,2000]
df = df.rename({'val_EQ_rp250':'val_EQ_rp475',
'val_EQ_rp475':'val_EQ_rp1500',
'val_EQ_rp975':'val_EQ_rp250',
'val_EQ_rp1500':'val_EQ_rp2475',
'val_EQ_rp2475':'val_EQ_rp975',
'length_EQ_rp250':'length_EQ_rp475',
'length_EQ_rp475':'length_EQ_rp1500',
'length_EQ_rp975':'length_EQ_rp250',
'length_EQ_rp1500':'length_EQ_rp2475',
'length_EQ_rp2475':'length_EQ_rp975',}, axis='columns')
elif hazard == 'Cyc':
event_list = ['Cyc_rp50','Cyc_rp100','Cyc_rp250','Cyc_rp500','Cyc_rp1000']
RPS = [1/50,1/100,1/250,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,154,178,209,252,1000]
df = df.rename({'val_Cyc_rp100':'val_Cyc_rp1000',
'val_Cyc_rp500':'val_Cyc_rp100',
'val_Cyc_rp1000':'val_Cyc_rp500',
'length_Cyc_rp100':'length_Cyc_rp1000',
'length_Cyc_rp500':'length_Cyc_rp100',
'length_Cyc_rp1000':'length_Cyc_rp500'}, axis='columns')
elif hazard == 'FU':
event_list = ['FU-5', 'FU-10', 'FU-20', 'FU-50', 'FU-75', 'FU-100', 'FU-200', 'FU-250','FU-500', 'FU-1000']
RPS = [1/5,1/10,1/20,1/50,1/75,1/100,1/200,1/250,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,25,50,100,200,2000]
elif hazard == 'PU':
event_list = ['PU-5', 'PU-10', 'PU-20', 'PU-50', 'PU-75', 'PU-100', 'PU-200', 'PU-250','PU-500', 'PU-1000']
RPS = [1/5,1/10,1/20,1/50,1/75,1/100,1/200,1/250,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,25,50,100,200,2000]
elif hazard == 'CF':
event_list = ['CF-10', 'CF-20', 'CF-50', 'CF-100', 'CF-200', 'CF-500', 'CF-1000']
RPS = [1/10,1/20,1/50,1/100,1/200,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,25,50,100,200,2000]
# calculate the annual kilometers of total possible roads for each asset
reg_stats[hazard] = reg_stats.apply(lambda x: total_length_risk(x,RPS),axis=1)
# bin this into the four risk categories, as specified in the Supplementary Materials of Koks et al. (2019)
for event in event_list:
reg_df['binned_{}'.format(event)] = pandas.cut(reg_df['val_{}'.format(event)], bins=bins, labels=[0]+cat_list)
get_all_cats = []
# calculate the annual exposed kilometers of road per risk category per asset type
for cat in cat_list[:]:
get_all_events = []
for event in event_list:
event_sep = reg_df.loc[reg_df['binned_{}'.format(event)] == cat][['length_{}'.format(event),'country','region','continent','road_type']]
cont_out = pandas.DataFrame(event_sep.groupby(['continent','country','region','road_type'])['length_{}'.format(event)].sum())
get_all_events.append(cont_out)
cat_df = pandas.concat(get_all_events,axis=1)
cat_df = cat_df.fillna(0)
if len(cat_df) == 0:
cat_df = pandas.DataFrame(columns = list(cat_df.columns)+['risk_{}_{}'.format(cat,hazard)],index=df.groupby(['continent','country','region','road_type']).sum().index).fillna(0)
else:
cat_df['risk_{}_{}'.format(cat,hazard)] = cat_df.apply(lambda x: exposed_length_risk(x,hazard,RPS),axis=1)
cat_df.loc[cat_df['risk_{}_{}'.format(cat,hazard)] < 0] = 0
cat_df.reset_index(inplace=True)
get_all_cats.append(cat_df.groupby(['continent','country','region','road_type']).sum()['risk_{}_{}'.format(cat,hazard)])
collect_risks.append(pandas.concat(get_all_cats,axis=1).fillna(0))
# return results to be saved in one big file for all regions combined
return (pandas.concat(collect_risks,axis=1).fillna(0))
except Exception as e:
print('Failed to finish {} because of {}!'.format(region.GID_2,e))
[docs]def regional_railway(n,prot_lookup,data_path):
"""
Function to get summarized exposure values for each region for all railway assets.
Arguments:
*n* : the index ID of a region in the specified shapefile with all the regions.
*prot_lookup* : dictionary with dike design standards for a region.
*data_path* : file path to location of all data.
Returns:
*dataframe* : a pandas DataFrame with exposure statistics.
"""
# specify the file path where all data is located.
data_path = load_config()['paths']['data']
# load shapefile with unique information for each region
global_regions = geopandas.read_file(os.path.join(data_path,'input_data','global_regions_v2.shp'))
# grab the row of the region from the global region shapefile
region = global_regions.iloc[n]
print('{} started!'.format(region.GID_2))
try:
# load ID and income group for the region
ID = region.GID_2
wbincome = region.wbincome
# specify all unique hazard abbrevations
hazards = ['EQ','Cyc','PU','FU','CF']
collect_risks = []
# load regional statistics
reg_stats = pandas.read_csv(os.path.join(data_path,'railway_stats','{}_stats.csv'.format(ID)))
# loop over all hazards
for hazard in hazards:
try:
# read exposure data
df= pandas.read_feather(os.path.join(data_path,'output_{}_rail_full'.format(hazard),'{}_{}.ft'.format(ID,hazard)))
except:
continue
# correct for protection standards for fluval and coastal flooding
if (hazard == 'FU') | (hazard == 'CF'):
prot_stand = prot_lookup[ID]
no_floods= [x for x in [x for x in df.columns if ('val' in x)] if prot_stand > int(x.split('-')[1])]
df[no_floods] = 0
# correct for (assumed) design standards for surface flooding
if (hazard == 'PU'):
if wbincome == 'HIC':
df.loc[:,['val_PU-5','val_PU-10','val_PU-20','val_PU-50']] = 0
elif wbincome == 'UMC':
df.loc[:,['val_PU-5','val_PU-10','val_PU-20']] = 0
else:
df.loc[:,['val_PU-5','val_PU-10',]] = 0
# correct for (assumed) design standards for river flooding
if (hazard == 'FU'):
if wbincome == 'HIC':
df.loc[:,['val_FU-5','val_FU-10','val_FU-20','val_FU-50']] = 0
elif wbincome == 'UMC':
df.loc[:,['val_FU-5','val_FU-10','val_FU-20',]] = 0
else:
df.loc[:,['val_FU-5','val_FU-10']] = 0
# correct for (assumed) design standards for coastal flooding
if (hazard == 'CF'):
if wbincome == 'HIC':
df.loc[:,['val_CF-10','val_CF-20','val_CF-50',]] = 0
elif wbincome == 'UMC':
df.loc[:,['val_CF-10','val_CF-20']] = 0
else:
df.loc[:,['val_CF-10']] = 0
if hazard == 'EQ':
reg_df = df.copy()
elif hazard != 'EQ':
reg_df = reg_df.merge(df[[x for x in df.columns if ('val_' in x) | ('length_' in x)]+['osm_id']],left_on='osm_id',right_on='osm_id')
# something went wrong in the order of the azard maps, correct that here.
if hazard == 'EQ':
event_list = ['EQ_rp250','EQ_rp475','EQ_rp975','EQ_rp1500','EQ_rp2475'] #
RPS = [1/250,1/475,1/975,1/1500,1/2475]
cat_list = [1,2,3,4]
bins = [-1,92,180,340,650,2000]
df = df.rename({'val_EQ_rp250':'val_EQ_rp475',
'val_EQ_rp475':'val_EQ_rp1500',
'val_EQ_rp975':'val_EQ_rp250',
'val_EQ_rp1500':'val_EQ_rp2475',
'val_EQ_rp2475':'val_EQ_rp975',
'length_EQ_rp250':'length_EQ_rp475',
'length_EQ_rp475':'length_EQ_rp1500',
'length_EQ_rp975':'length_EQ_rp250',
'length_EQ_rp1500':'length_EQ_rp2475',
'length_EQ_rp2475':'length_EQ_rp975',}, axis='columns')
elif hazard == 'Cyc':
event_list = ['Cyc_rp50','Cyc_rp100','Cyc_rp250','Cyc_rp500','Cyc_rp1000']
RPS = [1/50,1/100,1/250,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,154,178,209,252,1000]
df = df.rename({'val_Cyc_rp100':'val_Cyc_rp1000',
'val_Cyc_rp500':'val_Cyc_rp100',
'val_Cyc_rp1000':'val_Cyc_rp500',
'length_Cyc_rp100':'length_Cyc_rp1000',
'length_Cyc_rp500':'length_Cyc_rp100',
'length_Cyc_rp1000':'length_Cyc_rp500'}, axis='columns')
elif hazard == 'FU':
event_list = ['FU-5', 'FU-10', 'FU-20', 'FU-50', 'FU-75', 'FU-100', 'FU-200', 'FU-250','FU-500', 'FU-1000']
RPS = [1/5,1/10,1/20,1/50,1/75,1/100,1/200,1/250,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,25,50,100,200,2000]
elif hazard == 'PU':
event_list = ['PU-5', 'PU-10', 'PU-20', 'PU-50', 'PU-75', 'PU-100', 'PU-200', 'PU-250','PU-500', 'PU-1000']
RPS = [1/5,1/10,1/20,1/50,1/75,1/100,1/200,1/250,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,25,50,100,200,2000]
elif hazard == 'CF':
event_list = ['CF-10', 'CF-20', 'CF-50', 'CF-100', 'CF-200', 'CF-500', 'CF-1000']
RPS = [1/10,1/20,1/50,1/100,1/200,1/500,1/1000]
cat_list = [1,2,3,4]
bins = [-1,25,50,100,200,2000]
# calculate the annual kilometers of total possible roads for each asset
reg_stats[hazard] = reg_stats.apply(lambda x: total_length_risk(x,RPS),axis=1)
# bin this into the four risk categories, as specified in the Supplementary Materials of Koks et al. (2019)
for event in event_list:
reg_df['binned_{}'.format(event)] = pandas.cut(reg_df['val_{}'.format(event)], bins=bins, labels=[0]+cat_list)
get_all_cats = []
# calculate the annual exposed kilometers of road per risk category per asset type
for cat in cat_list[:]:
get_all_events = []
for event in event_list:
event_sep = reg_df.loc[reg_df['binned_{}'.format(event)] == cat][['length_{}'.format(event),'country','region','continent','infra_type']]
cont_out = pandas.DataFrame(event_sep.groupby(['continent','country','region','infra_type'])['length_{}'.format(event)].sum())
get_all_events.append(cont_out)
cat_df = pandas.concat(get_all_events,axis=1)
cat_df = cat_df.fillna(0)
if len(cat_df) == 0:
cat_df = pandas.DataFrame(columns = list(cat_df.columns)+['risk_{}_{}'.format(cat,hazard)],index=df.groupby(['continent','country','region','infra_type']).sum().index).fillna(0)
else:
cat_df['risk_{}_{}'.format(cat,hazard)] = cat_df.apply(lambda x: exposed_length_risk(x,hazard,RPS),axis=1)
cat_df.loc[cat_df['risk_{}_{}'.format(cat,hazard)] < 0] = 0
cat_df.reset_index(inplace=True)
get_all_cats.append(cat_df.groupby(['continent','country','region','infra_type']).sum()['risk_{}_{}'.format(cat,hazard)])
collect_risks.append(pandas.concat(get_all_cats,axis=1).fillna(0))
# return results to be saved in one big file for all regions combined
return (pandas.concat(collect_risks,axis=1).fillna(0))
except Exception as e:
print('Failed to finish {} because of {}!'.format(region.GID_2,e))