-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcompute_ecmwf_rapid.py
executable file
·252 lines (217 loc) · 10.5 KB
/
compute_ecmwf_rapid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/usr/bin/python
import csv
import datetime
import os
import re
from subprocess import Popen
import sys
from erfp_data_process_ubuntu_aws.CreateInflowFileFromECMWFRunoff import CreateInflowFileFromECMWFRunoff
from erfp_data_process_ubuntu_aws.make_CF_RAPID_output import convert_ecmwf_rapid_output_to_cf_compliant
#------------------------------------------------------------------------------
#functions
#------------------------------------------------------------------------------
def case_insensitive_file_search(directory, pattern):
"""
Looks for file with patter with case insensitive search
"""
try:
return os.path.join(directory,
[filename for filename in os.listdir(directory) \
if re.search(pattern, filename, re.IGNORECASE)][0])
except IndexError:
print pattern, "not found"
raise
def csv_to_list(csv_file, delimiter=','):
"""
Reads in a CSV file and returns the contents as list,
where every row is stored as a sublist, and each element
in the sublist represents 1 cell in the table.
"""
with open(csv_file, 'rb') as csv_con:
reader = csv.reader(csv_con, delimiter=delimiter)
return list(reader)
def generate_namelist_file(rapid_io_files_location, watershed, subbasin,
ensemble_number, forecast_date_timestep, init_flow = False):
"""
Generate RAPID namelist file with new input
"""
rapid_input_directory = os.path.join(rapid_io_files_location, "rapid_input")
watershed_namelist_file = os.path.join(rapid_io_files_location, 'rapid_namelist')
template_namelist_file = case_insensitive_file_search(os.path.join(rapid_io_files_location, 'erfp_data_process_ubuntu_aws'),
'rapid_namelist_template\.dat')
#get rapid connect info
rapid_connect_file = case_insensitive_file_search(rapid_input_directory, r'rapid_connect\.csv')
rapid_connect_table = csv_to_list(rapid_connect_file)
is_riv_tot = len(rapid_connect_table)
is_max_up = max([int(float(row[2])) for row in rapid_connect_table])
#get riv_bas_id info
riv_bas_id_file = case_insensitive_file_search(rapid_input_directory, r'riv_bas_id.*?\.csv')
riv_bas_id_table = csv_to_list(riv_bas_id_file)
is_riv_bas = len(riv_bas_id_table)
#default duration of 15 days
duration = 15*24*60*60
#default interval of 6 hrs
interval = 6*60*60
#if it is high res
if(int(ensemble_number) == 52):
#duration of 10 days
duration = 10*24*60*60
#interval of 3 hrs
#interval = 3*60*60
qinit_file = None
if(init_flow):
#check for qinit file
past_date = (datetime.datetime.strptime(forecast_date_timestep[:11],"%Y%m%d.%H") - \
datetime.timedelta(hours=12)).strftime("%Y%m%dt%H")
qinit_file = os.path.join(rapid_input_directory, 'Qinit_%s.csv' % past_date)
init_flow = qinit_file and os.path.exists(qinit_file)
if not init_flow:
print "Error:", qinit_file, "not found. Not initializing ..."
old_file = open(template_namelist_file)
new_file = open(watershed_namelist_file,'w')
for line in old_file:
if line.strip().startswith('BS_opt_Qinit'):
if (init_flow):
new_file.write('BS_opt_Qinit =.true.\n')
else:
new_file.write('BS_opt_Qinit =.false.\n')
elif line.strip().startswith('ZS_TauM'):
new_file.write('ZS_TauM =%s\n' % duration)
elif line.strip().startswith('ZS_dtM'):
new_file.write('ZS_dtM =%s\n' % 86400)
elif line.strip().startswith('ZS_TauR'):
new_file.write('ZS_TauR =%s\n' % interval)
elif line.strip().startswith('IS_riv_tot'):
new_file.write('IS_riv_tot =%s\n' % is_riv_tot)
elif line.strip().startswith('rapid_connect_file'):
new_file.write('rapid_connect_file =\'%s\'\n' % rapid_connect_file)
elif line.strip().startswith('IS_max_up'):
new_file.write('IS_max_up =%s\n' % is_max_up)
elif line.strip().startswith('Vlat_file'):
new_file.write('Vlat_file =\'%s\'\n' % os.path.join(rapid_io_files_location,
'm3_riv_bas_%s.nc' % ensemble_number))
elif line.strip().startswith('IS_riv_bas'):
new_file.write('IS_riv_bas =%s\n' % is_riv_bas)
elif line.strip().startswith('riv_bas_id_file'):
new_file.write('riv_bas_id_file =\'%s\'\n' % riv_bas_id_file)
elif line.strip().startswith('Qinit_file'):
if (init_flow):
new_file.write('Qinit_file =\'%s\'\n' % qinit_file)
else:
new_file.write('Qinit_file =\'\'\n')
elif line.strip().startswith('k_file'):
new_file.write('k_file =\'%s\'\n' % case_insensitive_file_search(rapid_input_directory,
r'k\.csv'))
elif line.strip().startswith('x_file'):
new_file.write('x_file =\'%s\'\n' % case_insensitive_file_search(rapid_input_directory,
r'x\.csv'))
elif line.strip().startswith('Qout_file'):
new_file.write('Qout_file =\'%s\'\n' % os.path.join(rapid_io_files_location,
'Qout_%s_%s_%s.nc' % (watershed.lower(),
subbasin.lower(),
ensemble_number)))
else:
new_file.write(line)
#close temp file
new_file.close()
old_file.close()
def run_RAPID_single_watershed(forecast, watershed, subbasin,
rapid_executable_location, node_path, init_flow):
"""
run RAPID on single watershed after ECMWF prepared
"""
forecast_split = os.path.basename(forecast).split(".")
ensemble_number = int(forecast_split[2])
forecast_date_timestep = ".".join(forecast_split[:2])
rapid_namelist_file = os.path.join(node_path,'rapid_namelist')
local_rapid_executable = os.path.join(node_path,'rapid')
#create link to RAPID
os.symlink(rapid_executable_location, local_rapid_executable)
time_start_rapid = datetime.datetime.utcnow()
#change the new RAPID namelist file
print "Updating namelist file for:", watershed, subbasin, ensemble_number
generate_namelist_file(node_path, watershed, subbasin, ensemble_number,
forecast_date_timestep, init_flow)
def rapid_cleanup(local_rapid_executable, rapid_namelist_file):
"""
Cleans up the rapid files generated by the process
"""
#remove rapid link
try:
os.unlink(local_rapid_executable)
os.remove(local_rapid_executable)
except OSError:
pass
#remove namelist file
try:
os.remove(rapid_namelist_file)
except OSError:
pass
#run RAPID
print "Running RAPID for:", subbasin, "Ensemble:", ensemble_number
try:
process = Popen([local_rapid_executable], shell=True)
process.communicate()
except Exception:
rapid_cleanup(local_rapid_executable, rapid_namelist_file)
raise
print "Time to run RAPID:",(datetime.datetime.utcnow()-time_start_rapid)
rapid_cleanup(local_rapid_executable, rapid_namelist_file)
#convert rapid output to be CF compliant
convert_ecmwf_rapid_output_to_cf_compliant(datetime.datetime.strptime(forecast_date_timestep[:11], "%Y%m%d.%H"),
node_path)
def process_upload_ECMWF_RAPID(ecmwf_forecast, watershed, subbasin,
rapid_executable_location, init_flow):
"""
prepare all ECMWF files for rapid
"""
node_path = os.path.dirname(os.path.realpath(__file__))
forecast_basename = os.path.basename(ecmwf_forecast)
forecast_split = forecast_basename.split(".")
forecast_date_timestep = ".".join(forecast_split[:2])
ensemble_number = int(forecast_split[2])
old_rapid_input_directory = os.path.join(node_path, "%s-%s" % (watershed, subbasin))
rapid_input_directory = os.path.join(node_path, "rapid_input")
#rename rapid input directory
os.rename(old_rapid_input_directory, rapid_input_directory)
inflow_file_name = 'm3_riv_bas_%s.nc' % ensemble_number
#determine weight table from resolution
if ensemble_number == 52:
weight_table_file = case_insensitive_file_search(rapid_input_directory,
r'weight_high_res.csv')
else:
weight_table_file = case_insensitive_file_search(rapid_input_directory,
r'weight_low_res.csv')
time_start_all = datetime.datetime.utcnow()
def remove_inflow_file(inflow_file_name):
"""
remove inflow file generated from ecmwf downscaling
"""
print "Cleaning up"
#remove inflow file
try:
os.remove(inflow_file_name)
except OSError:
pass
#RUN CALCULATIONS
try:
#prepare ECMWF file for RAPID
print "Running all ECMWF downscaling for watershed:", watershed, subbasin, \
forecast_date_timestep, ensemble_number
print "Converting ECMWF inflow"
#optional argument ... time interval?
RAPIDinflowECMWF_tool = CreateInflowFileFromECMWFRunoff()
RAPIDinflowECMWF_tool.execute(forecast_basename, weight_table_file, inflow_file_name)
time_finish_ecmwf = datetime.datetime.utcnow()
print "Time to convert ECMWF: %s" % (time_finish_ecmwf-time_start_all)
run_RAPID_single_watershed(forecast_basename, watershed, subbasin,
rapid_executable_location, node_path, init_flow)
except Exception:
remove_inflow_file(inflow_file_name)
raise
#CLEAN UP
remove_inflow_file(inflow_file_name)
time_stop_all = datetime.datetime.utcnow()
print "Total time to compute: %s" % (time_stop_all-time_start_all)
if __name__ == "__main__":
process_upload_ECMWF_RAPID(sys.argv[1],sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])