1 # Copyright (C) 2013-2016 Martin Vejmelka, UC Denver
3 # Permission is hereby granted, free of charge, to any person obtaining a copy
4 # of this software and associated documentation files (the "Software"), to deal
5 # in the Software without restriction, including without limitation the rights
6 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
7 # of the Software, and to permit persons to whom the Software is furnished to do
8 # so, subject to the following conditions:
10 # The above copyright notice and this permission notice shall be included in all
11 # copies or substantial portions of the Software.
13 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
14 # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR
15 # A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
16 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from __future__ import absolute_import
from __future__ import print_function

from datetime import datetime, timedelta, timezone
from subprocess import Popen, call, check_output

from utils import to_esmf, to_utc, rm
def select_grib_source(start_time):
    """
    Select a GRIB source appropriate for the simulation start time.

    Recent start times can use a short-range forecast product; older start
    times must fall back to a reanalysis archive.

    :param start_time: timezone-aware UTC datetime of the simulation start
    :return: the GRIB source identifier string
    """
    # timezone-aware "now"; replaces the original utcnow().replace(tzinfo=pytz.UTC)
    now = datetime.now(timezone.utc)
    if now - start_time < timedelta(days=30):
        # NOTE(review): the tail of this function was lost in extraction;
        # 'NAM' (recent) / 'NARR' (archive) is the reconstruction — confirm against VCS.
        return 'NAM'
    else:
        return 'NARR'
def simulation_paths(sim_id, conf):
    """
    Get paths to simulation files.

    :param sim_id: the simulation id
    :param conf: configuration
    :return: dict with the log, job json, state json, run script and kml paths
    """
    # osp.join handles directory separators cleanly (original used '+' with '/')
    return {'log_path'   : osp.join(conf['logs_path'], sim_id + '.log'),
            'json_path'  : osp.join(conf['jobs_path'], sim_id + '.json'),
            'state_path' : osp.join(conf['sims_path'], sim_id + '.json'),
            'run_script' : osp.join(conf['jobs_path'], sim_id + '.sh'),
            'kml_path'   : osp.join(conf['jobs_path'], sim_id + '.kml')}
def cancel_simulation(sim_info, conf):
    """
    Cancel simulation job. Do not delete files.

    :param sim_info: the simulation json
    :param conf: configuration
    """
    cmd = osp.abspath(osp.join(conf['wrfxpy_path'], 'cleanup.sh'))
    job_id = sim_info['job_id']
    exe = [cmd, 'cancel', job_id]
    logging.debug('Calling ' + ' '.join(exe))
    # run the cleanup script and capture its output (bytes)
    out = check_output(exe)

    # record the cancel output in the simulation's own log file
    paths = simulation_paths(sim_info['id'], conf)
    log_path = paths['log_path']
    with open(log_path, 'wb') as f:
        # NOTE(review): the body of this 'with' was lost in extraction;
        # writing the captured output is the most plausible reconstruction — confirm against VCS.
        f.write(out)
def cleanup_sim_output(sim_info, conf):
    """ Cleanup simulation output.

    :param sim_info: the simulation json
    :param conf: configuration
    """
    cmd = osp.abspath(osp.join(conf['wrfxpy_path'], 'cleanup.sh'))
    job_id = sim_info['job_id']
    exe = [cmd, 'output', job_id]
    logging.debug('Calling ' + ' '.join(exe))
    # invoke without a shell: avoids quoting/injection issues if job_id
    # contains shell metacharacters (original used os.system(' '.join(exe)))
    call(exe)
def cleanup_sim_workspace(sim_info, conf):
    """ Cleanup simulation workspace.

    :param sim_info: the simulation json
    :param conf: configuration
    """
    cmd = osp.abspath(osp.join(conf['wrfxpy_path'], 'cleanup.sh'))
    job_id = sim_info['job_id']
    exe = [cmd, 'workspace', job_id]
    logging.debug('Calling ' + ' '.join(exe))
    # invoke without a shell: avoids quoting/injection issues if job_id
    # contains shell metacharacters (original used os.system(' '.join(exe)))
    call(exe)
def delete_simulation(sim_info, conf):
    """
    Delete simulation from wrfxpy and all files.

    :param sim_info: the simulation json
    :param conf: configuration
    """
    cmd = osp.abspath(osp.join(conf['wrfxpy_path'], 'cleanup.sh'))
    job_id = sim_info['job_id']
    exe = [cmd, 'delete', job_id]
    logging.debug('Calling ' + ' '.join(exe))
    # invoke without a shell: avoids quoting/injection issues if job_id
    # contains shell metacharacters (original used os.system(' '.join(exe)))
    call(exe)
    # remove the web server's own files for the simulation as well
    delete_simulation_files(sim_info['id'], conf)
def delete_simulation_files(sim_id, conf):
    """
    Remove all files for given simulation.

    :param sim_id: the simulation id
    :param conf: configuration
    """
    # gather every path associated with the simulation, then remove them all
    file_paths = simulation_paths(sim_id, conf)
    rm(list(file_paths.values()))
def load_simulations(sims_path):
    """
    Load all simulations stored in the simulations/ directory.

    :param sims_path: path to jsons with simulation states
    :return: a dictionary mapping simulation id -> simulation info
    """
    logging.info('Loading simulation states from %s' % sims_path)
    files = glob.glob(sims_path + '/*.json')
    # NOTE(review): the accumulator and the loop/try scaffolding were lost in
    # extraction and are reconstructed here — confirm against VCS.
    simulations = {}
    for f in files:
        logging.info('load_simulations: loading file %s' % f)
        try:
            # close the file handle promptly (original used json.load(open(f)))
            with open(f) as fp:
                sim_info = json.load(fp)
            if 'job_id' not in sim_info:
                # older files do not have job_id, redo from the visualization link
                link = sim_info['visualization_link']
                sim_info['job_id'] = link[link.find('wfc-'):]
                logging.debug('Added missing job_id ' + sim_info['job_id'])
            sim_id = sim_info['id']
            simulations[sim_id] = sim_info
            logging.info('load_simulations: loaded simulation id %s' % sim_id)
        except Exception:
            # a corrupt state file must not take down the server: set it aside
            logging.error('load_simulations: failed to reload simulation %s' % f)
            os.rename(f, f + '.error')
    return simulations
def create_simulation(info, conf, cluster):
    """
    Build a simulation JSON configuration based on profiles and execute
    the simulation using wrfxpy.

    :param info: the simulation info gathered from the build page
    :param conf: configuration
    :param cluster: a cluster object that conveys information about the computing environment
    :return: the simulation info object
    """
    now = datetime.utcnow()
    sim_id = 'from-web-%04d-%02d-%02d_%02d-%02d-%02d' % (now.year, now.month, now.day,
                                                         now.hour, now.minute, now.second)

    # get paths of all files created
    path = simulation_paths(sim_id, conf)
    log_path = path['log_path']
    json_path = path['json_path']
    run_script = path['run_script']

    # store simulation configuration
    profile = info['profile']
    print('profile = %s' % json.dumps(profile, indent=4, separators=(',', ': ')))
    sim_descr = info['description']

    # NOTE(review): the dict header was lost in extraction; 'id' is included
    # because other functions read sim_info['id'] — confirm against VCS.
    sim_info = {
        'id' : sim_id,
        'started_at' : to_esmf(datetime.now()),
        'description' : sim_descr,
        'profile' : info['profile'],
        'log_file' : log_path,
        'state' : make_initial_state()
    }

    # build a new job template
    template = osp.abspath(profile['template'])
    # close the template file promptly (original used json.load(open(template)))
    with open(template) as tf:
        cfg = json.load(tf)
    print('Job template %s:' % template)
    print(json.dumps(cfg, indent=4, separators=(',', ': ')))

    cfg['template'] = template
    cfg['profile'] = profile
    cfg['grid_code'] = sim_id
    # rescale requested node count to this cluster's processors-per-node
    cfg['num_nodes'] = np.floor(cfg['num_nodes']*cfg['ppn']/cluster.ppn)
    cfg['ppn'] = cluster.ppn
    sim_start = to_esmf(datetime.strptime(info['start_utc'], '%b %d, %Y %I:%M %p'))
    start_utc = to_utc(to_esmf(datetime.strptime(info['start_utc'], '%b %d, %Y %I:%M %p')))
    sim_end = to_esmf(datetime.strptime(info['end_utc'], '%b %d, %Y %I:%M %p'))
    end_utc = to_utc(to_esmf(datetime.strptime(info['end_utc'], '%b %d, %Y %I:%M %p')))
    sim_info['start_utc'] = sim_start
    cfg['start_utc'] = sim_start
    sim_info['end_utc'] = sim_end
    cfg['end_utc'] = sim_end
    if 'grib_source' not in cfg or cfg['grib_source'] == 'auto':
        cfg['grib_source'] = select_grib_source(start_utc)
        print('GRIB source not specified, selected %s from sim start time' % cfg['grib_source'])
    else:
        print('Using GRIB source %s from %s' % (cfg['grib_source'], profile['template']))

    # build wrfxpy job id and the visualization link
    job_id = 'wfc-%s-%s-%s' % (sim_id, to_esmf(start_utc), to_esmf(end_utc))
    sim_info['job_id'] = job_id
    sim_info['visualization_link'] = osp.join(conf['wrfxweb_url'], '?job_id=' + job_id)

    # place top-level domain
    domain_lat = float(info['domain_center_lat'])
    domain_lon = float(info['domain_center_lon'])
    cfg['domains']['1']['truelats'] = [domain_lat, domain_lat]
    cfg['domains']['1']['stand_lon'] = domain_lon
    cfg['domains']['1']['center_latlon'] = [domain_lat, domain_lon]

    # optional burn plot boundary: lat/lon lists arrive as "[...]" strings
    burn_plot_boundary = []
    if info['ignition_perimeter_lats'] != "[]":
        ign_line_lats = info['ignition_perimeter_lats'][1:-1].split(',')
        ign_line_lons = info['ignition_perimeter_lons'][1:-1].split(',')
        sim_info['ignition_perimeter_lats'] = ign_line_lats
        sim_info['ignition_perimeter_lons'] = ign_line_lons
        for i in range(len(ign_line_lats)):
            ign_line_lat = float(ign_line_lats[i])
            ign_line_lon = float(ign_line_lons[i])
            perimeter = [ign_line_lat, ign_line_lon]
            burn_plot_boundary.append(perimeter)
        cfg['burn_plot_boundary'] = burn_plot_boundary

    # NOTE(review): the initializer of 'ignitions' was lost in extraction;
    # an empty list is implied by the appends below — confirm against VCS.
    ignitions = []
    # setting the ignitions
    if info['ignition_line_lats'] != "[]":
        ign_line_lats = info['ignition_line_lats'][1:-1].split(',')
        ign_line_lons = info['ignition_line_lons'][1:-1].split(',')
        ign_line_ign_time_esmfs = [to_esmf(datetime.strptime(ign_time, '%b %d, %Y %I:%M %p'))
                                   for ign_time in info['ignition_line_ignition_times'][2:-2].split("\",\"")]
        ign_line_fc_hours = [int(fc_hour) for fc_hour in info['ignition_line_fc_hours'][1:-1].split(',')]
        sim_info['ignition_line_lats'] = ign_line_lats
        sim_info['ignition_line_lons'] = ign_line_lons
        sim_info['ignition_line_ignition_times'] = ign_line_ign_time_esmfs
        sim_info['ign_line_fc_hours'] = ign_line_fc_hours
        sim_info['ignition_line_info'] = [[float(lat), float(lon), time]
                                          for lat, lon, time in zip(ign_line_lats, ign_line_lons, ign_line_ign_time_esmfs)]
        for i in range(len(ign_line_lats)):
            ign_line_lat = float(ign_line_lats[i])
            ign_line_lon = float(ign_line_lons[i])
            ign_line_ign_time_esmf = ign_line_ign_time_esmfs[i]
            ign_line_fc_hour = ign_line_fc_hours[i]
            # NOTE(review): the dict braces were lost in extraction; additional
            # keys may have existed — confirm against VCS.
            ignition = {
                'latlon': [ign_line_lat, ign_line_lon],
                'time_utc': ign_line_ign_time_esmf,
                'duration_s': ign_line_fc_hour,
            }
            ignitions.append(ignition)

    if info['multiple_ignitions_lats'] != "[]":
        ign_lats = info['multiple_ignitions_lats'][1:-1].split(',')
        ign_lons = info['multiple_ignitions_lons'][1:-1].split(',')
        ign_time_esmfs = [to_esmf(datetime.strptime(ign_time, '%b %d, %Y %I:%M %p'))
                          for ign_time in info['multiple_ignitions_ignition_times'][2:-2].split("\",\"")]
        ign_fc_hours = [int(fc_hour) for fc_hour in info['multiple_ignitions_fc_hours'][1:-1].split(',')]
        sim_info['multiple_ignition_lats'] = ign_lats
        sim_info['multiple_ignition_lons'] = ign_lons
        sim_info['multiple_ignition_ignition_times'] = ign_time_esmfs
        sim_info['multiple_ignition_fc_hours'] = ign_fc_hours
        sim_info['multiple_ignition_info'] = [[float(lat), float(lon), time]
                                              for lat, lon, time in zip(ign_lats, ign_lons, ign_time_esmfs)]
        for i in range(len(ign_lats)):
            ign_line_lat = float(ign_lats[i])
            ign_line_lon = float(ign_lons[i])
            ign_time_esmf = ign_time_esmfs[i]
            ign_fc_hour = ign_fc_hours[i]
            ignition = {
                'latlon': [ign_line_lat, ign_line_lon],
                'time_utc': ign_time_esmf,
                'duration_s': ign_fc_hour,
            }
            ignitions.append(ignition)

    # NOTE(review): the guard around the domain assignment was lost in
    # extraction; the visible else-branch clearing cfg['ignitions'] implies it.
    if len(ignitions) > 0:
        # place all ignitions into the template's first ignition domain
        domain = list(cfg['ignitions'].keys())[0]
        cfg['ignitions'][domain] = ignitions
    else:
        cfg['ignitions'] = {}

    # switch on sending results to visualization server
    cfg['postproc']['shuttle'] = 'incremental'
    cfg['postproc']['description'] = sim_descr

    # close the job file promptly (original used json.dump(cfg, open(json_path,'w'), ...))
    with open(json_path, 'w') as jf:
        json.dump(cfg, jf, indent=4, separators=(',', ': '))

    print('Job configuration %s:' % json_path)
    print(json.dumps(cfg, indent=4, separators=(',', ': ')))

    # drop a shell script that will run the file
    with open(run_script, 'w') as f:
        f.write('#!/usr/bin/env bash\n')
        f.write('export PYTHONPATH=src\n')
        f.write('cd ' + conf['wrfxpy_path'] + '\n')
        f.write('LOG=' + osp.abspath(log_path) + '\n')
        f.write('./forecast.sh ' + osp.abspath(json_path) + ' &> $LOG \n')

    # make the run script executable
    st = os.stat(run_script)
    os.chmod(run_script, st.st_mode | stat.S_IEXEC)

    # execute the fire forecast and reroute into the log file provided
    print('Running %s' % run_script)
    proc = Popen(run_script, shell=True, stdin=None, stdout=None, stderr=None, close_fds=True)

    # NOTE(review): the tail of this function was lost in extraction; persisting
    # the simulation state and returning sim_info is the reconstruction — confirm.
    sim_info['pid'] = proc.pid
    with open(path['state_path'], 'w') as sf:
        json.dump(sim_info, sf, indent=4, separators=(',', ': '))
    return sim_info
def write_kml(ign_lat, ign_lon, time, path):
    """
    Write a KML file containing a single timestamped perimeter polygon.

    :param ign_lat: list of perimeter latitudes
    :param ign_lon: list of perimeter longitudes (parallel to ign_lat)
    :param time: timestamp string used in the polygon name and KML timestamp
    :param path: output path of the KML file
    """
    # note: parameter 'time' shadows the stdlib module name; kept for interface compatibility
    color = simplekml.Color.red
    kml = simplekml.Kml()
    kml.document.name = "Perimeters"
    multipoly = kml.newmultigeometry(name='PERIM_' + time)
    # assemble the outer boundary as (lon, lat) pairs, the order KML expects
    # NOTE(review): the initializer of 'outer' was lost in extraction;
    # an empty list is implied by the append below — confirm against VCS.
    outer = []
    for n, lat in enumerate(ign_lat):
        outer.append((ign_lon[n], lat))
    multipoly.newpolygon(outerboundaryis=outer)
    multipoly.timestamp.when = time
    # fully transparent fill ('00' alpha prefix), opaque colored outline
    polycolor = '00' + color[2:]
    multipoly.style.polystyle.color = polycolor
    multipoly.style.linestyle.color = color
    # NOTE(review): the tail of this function was lost in extraction; saving to
    # 'path' is the only plausible use of the argument — confirm against VCS.
    kml.save(path)
def parse_error(state, line):
    """
    Find the tool that created the error in line and update state.

    :param line: the line that created the error
    :param state: the state dictionary containing state of each tool
    """
    tools = ['geogrid', 'ingest', 'ungrib', 'metgrid', 'real', 'wrf', 'output']
    # NOTE(review): the loop header and body were lost in extraction; the
    # visible condition implies: mark as failed the tool named in the line
    # and every tool that had not yet completed — confirm against VCS.
    for t in tools:
        if t in line.lower() or state[t] in ['waiting', 'running']:
            state[t] = 'failed'
def parse_time(line):
    """
    All logging lines begin with a timestamp, parse it and return the datetime it represents.

    Example: 2016-04-14 14:50:33,325
    :param line: the log line containing time
    :return: naive datetime
    """
    # Fix: must be '%Y-%m-%d %H:%M:%S' — the original '%y-%m-%d %h:%M:%S'
    # used 2-digit-year %y and the invalid directive %h, so it could never
    # parse the documented example above.
    return datetime.strptime(line[:19], '%Y-%m-%d %H:%M:%S')
def make_initial_state():
    """
    Create an initial state dictionary with every pipeline stage 'waiting'.

    :return: dict mapping each tool name to its initial state
    """
    # 'real' and 'wrf' are included because get_simulation_state and
    # parse_error read and write state['real'] and state['wrf'].
    return { 'geogrid' : 'waiting',
             'ingest'  : 'waiting',
             'ungrib'  : 'waiting',
             'metgrid' : 'waiting',
             'real'    : 'waiting',
             'wrf'     : 'waiting',
             'output'  : 'waiting' }
def get_simulation_state(path):
    """
    Identify the state of the computation for each subcomponent
    from the output log. If the log does not exist, return default state.

    :param path: the path to the log file
    :return: the state dictionary
    """
    state = make_initial_state()

    # for some reason the "with open(path) as f" started just quietly exiting
    # when the file does not exist, hence the explicit try around open()
    try:
        # NOTE(review): the open/loop scaffolding and the surrounding
        # try/except were lost in extraction; reconstructed — confirm against VCS.
        with open(path) as f:
            for line in f:
                if 'ERROR' in line:
                    parse_error(state, line)
                if 'subprocess.CalledProcessError' in line:
                    parse_error(state, line)
                if 'WRF completion detected' in line:
                    state['wrf'] = 'complete'
                if 'running GEOGRID' in line:
                    state['geogrid'] = 'running'
                elif 'GEOGRID complete' in line:
                    state['geogrid'] = 'complete'
                elif 'retrieving GRIB files' in line:
                    state['ingest'] = 'running'
                elif 'running UNGRIB' in line:
                    state['ingest'] = 'complete'
                    state['ungrib'] = 'running'
                elif 'UNGRIB complete' in line:
                    state['ingest'] = 'complete'
                    state['ungrib'] = 'complete'
                elif 'running METGRID' in line:
                    state['metgrid'] = 'running'
                elif 'METGRID complete' in line:
                    state['metgrid'] = 'complete'
                elif 'running REAL' in line:
                    state['metgrid'] = 'complete'
                    state['real'] = 'running'
                elif 'submitting WRF' in line:
                    state['real'] = 'complete'
                    state['wrf'] = 'submit'
                elif 'Detected rsl.error.0000' in line:
                    state['wrf'] = 'running'
                elif 'SHUTTLE operations completed' in line:
                    state['output'] = 'available'
                if 'Cancelled' in line:
                    state['wrf'] = 'cancelled'

        # an early-stage failure makes all downstream stages fail too
        if state['geogrid'] == 'failed' or \
           state['ungrib'] == 'failed':
            state['metgrid'] = 'failed'
            state['real'] = 'failed'
            state['wrf'] = 'failed'
            state['output'] = 'failed'
    except IOError:
        print("Cannot open file %s" % path)
    return state