Show More
@@ -454,7 +454,8 b' class ApplicationWithClusterDir(Application):' | |||
|
454 | 454 | """Remove the pid file. |
|
455 | 455 | |
|
456 | 456 | This should be called at shutdown by registering a callback with |
|
457 | :func:`reactor.addSystemEventTrigger`. | |
|
457 | :func:`reactor.addSystemEventTrigger`. This needs to return | |
|
458 | ``None``. | |
|
458 | 459 | """ |
|
459 | 460 | pid_file = os.path.join(self.pid_dir, self.name + u'.pid') |
|
460 | 461 | if os.path.isfile(pid_file): |
@@ -463,7 +464,6 b' class ApplicationWithClusterDir(Application):' | |||
|
463 | 464 | os.remove(pid_file) |
|
464 | 465 | except: |
|
465 | 466 | self.log.warn("Error removing the pid file: %s" % pid_file) |
|
466 | raise | |
|
467 | 467 | |
|
468 | 468 | def get_pid_from_file(self): |
|
469 | 469 | """Get the pid from the pid file. |
@@ -32,8 +32,8 b' from IPython.kernel.clusterdir import (' | |||
|
32 | 32 | ApplicationWithClusterDir, ClusterDirError, PIDFileError |
|
33 | 33 | ) |
|
34 | 34 | |
|
35 | from twisted.internet import reactor | |
|
36 | from twisted.python import log | |
|
35 | from twisted.internet import reactor, defer | |
|
36 | from twisted.python import log, failure | |
|
37 | 37 | |
|
38 | 38 | |
|
39 | 39 | #----------------------------------------------------------------------------- |
@@ -283,31 +283,43 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
283 | 283 | ) |
|
284 | 284 | |
|
285 | 285 | # Setup signals |
|
286 |
signal.signal(signal.SIGINT, self.s |
|
|
286 | signal.signal(signal.SIGINT, self.sigint_handler) | |
|
287 | 287 | |
|
288 | # Setup the observing of stopping | |
|
288 | # Setup the observing of stopping. If the controller dies, shut | |
|
289 | # everything down as that will be completely fatal for the engines. | |
|
289 | 290 | d1 = self.controller_launcher.observe_stop() |
|
290 |
d1.addCallback(self.stop_ |
|
|
291 | d1.addErrback(self.err_and_stop) | |
|
292 | # If this triggers, just let them die | |
|
293 | # d2 = self.engine_launcher.observe_stop() | |
|
291 | d1.addCallback(self.stop_launchers) | |
|
292 | # But, we don't monitor the stopping of engines. An engine dying | |
|
293 | # is just fine and in principle a user could start a new engine. | |
|
294 | # Also, if we did monitor engine stopping, it is difficult to | |
|
295 | # know what to do when only some engines die. Currently, the | |
|
296 | # observing of engine stopping is inconsistent. Some launchers | |
|
297 | might trigger on a single engine stopping, others wait until | |
|
298 | # all stop. TODO: think more about how to handle this. | |
|
294 | 299 | |
|
295 | 300 | # Start the controller and engines |
|
301 | self._stopping = False # Make sure stop_launchers is not called 2x. | |
|
302 | d = self.start_controller() | |
|
303 | d.addCallback(self.start_engines) | |
|
304 | d.addCallback(self.startup_message) | |
|
305 | # If the controller or engines fail to start, stop everything | |
|
306 | d.addErrback(self.stop_launchers) | |
|
307 | return d | |
|
308 | ||
|
309 | def startup_message(self, r=None): | |
|
310 | log.msg("IPython cluster: started") | |
|
311 | return r | |
|
312 | ||
|
313 | def start_controller(self, r=None): | |
|
314 | # log.msg("In start_controller") | |
|
315 | config = self.master_config | |
|
296 | 316 | d = self.controller_launcher.start( |
|
297 | 317 | cluster_dir=config.Global.cluster_dir |
|
298 | 318 | ) |
|
299 | d.addCallback(lambda _: self.start_engines()) | |
|
300 | d.addErrback(self.err_and_stop) | |
|
301 | ||
|
302 | def err_and_stop(self, f): | |
|
303 | log.msg('Unexpected error in ipcluster:') | |
|
304 | log.err(f) | |
|
305 | reactor.stop() | |
|
306 | ||
|
307 | def stop_engines(self, r): | |
|
308 | return self.engine_launcher.stop() | |
|
319 | return d | |
|
309 | 320 | |
|
310 | def start_engines(self): | |
|
321 | def start_engines(self, r=None): | |
|
322 | # log.msg("In start_engines") | |
|
311 | 323 | config = self.master_config |
|
312 | 324 | d = self.engine_launcher.start( |
|
313 | 325 | config.Global.n, |
@@ -315,25 +327,55 b' class IPClusterApp(ApplicationWithClusterDir):' | |||
|
315 | 327 | ) |
|
316 | 328 | return d |
|
317 | 329 | |
|
318 |
def stop_l |
|
|
319 |
log.msg(" |
|
|
320 | d1 = self.engine_launcher.stop() | |
|
321 |
d |
|
|
322 | # d1.addCallback(lambda _: self.controller_launcher.stop) | |
|
323 | d1.addErrback(self.err_and_stop) | |
|
324 | d2.addErrback(self.err_and_stop) | |
|
325 | reactor.callLater(2.0, reactor.stop) | |
|
330 | def stop_controller(self, r=None): | |
|
331 | # log.msg("In stop_controller") | |
|
332 | if self.controller_launcher.running: | |
|
333 | d = self.controller_launcher.stop() | |
|
334 | d.addErrback(self.log_err) | |
|
335 | return d | |
|
336 | else: | |
|
337 | return defer.succeed(None) | |
|
338 | ||
|
339 | def stop_engines(self, r=None): | |
|
340 | # log.msg("In stop_engines") | |
|
341 | if self.engine_launcher.running: | |
|
342 | d = self.engine_launcher.stop() | |
|
343 | d.addErrback(self.log_err) | |
|
344 | return d | |
|
345 | else: | |
|
346 | return defer.succeed(None) | |
|
347 | ||
|
348 | def log_err(self, f): | |
|
349 | log.msg(f.getTraceback()) | |
|
350 | return None | |
|
351 | ||
|
352 | def stop_launchers(self, r=None): | |
|
353 | if not self._stopping: | |
|
354 | self._stopping = True | |
|
355 | if isinstance(r, failure.Failure): | |
|
356 | log.msg('Unexpected error in ipcluster:') | |
|
357 | log.msg(r.getTraceback()) | |
|
358 | log.msg("IPython cluster: stopping") | |
|
359 | d= self.stop_engines() | |
|
360 | d2 = self.stop_controller() | |
|
361 | # Wait a few seconds to let things shut down. | |
|
362 | reactor.callLater(3.0, reactor.stop) | |
|
363 | ||
|
364 | def sigint_handler(self, signum, frame): | |
|
365 | self.stop_launchers() | |
|
326 | 366 | |
|
327 | 367 | def start_logging(self): |
|
328 | # Remove old log files | |
|
368 | # Remove old log files of the controller and engine | |
|
329 | 369 | if self.master_config.Global.clean_logs: |
|
330 | 370 | log_dir = self.master_config.Global.log_dir |
|
331 | 371 | for f in os.listdir(log_dir): |
|
332 |
if f.startswith('ipengine' + '-') |
|
|
372 | if f.startswith('ipengine' + '-'): | |
|
373 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): | |
|
333 | 374 | os.remove(os.path.join(log_dir, f)) |
|
334 | for f in os.listdir(log_dir): | |
|
335 | if f.startswith('ipcontroller' + '-') and f.endswith('.log'): | |
|
375 | if f.startswith('ipcontroller' + '-'): | |
|
376 | if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'): | |
|
336 | 377 | os.remove(os.path.join(log_dir, f)) |
|
378 | # This will remove old log files for ipcluster itself | |
|
337 | 379 | super(IPClusterApp, self).start_logging() |
|
338 | 380 | |
|
339 | 381 | def start_app(self): |
@@ -252,8 +252,8 b' class IPControllerTask(WinHPCTask):' | |||
|
252 | 252 | controller_cmd = List(['ipcontroller.exe'], config=True) |
|
253 | 253 | controller_args = List(['--log-to-file', '--log-level', '40'], config=True) |
|
254 | 254 | # I don't want these to be configurable |
|
255 |
std_out_file_path = CStr( |
|
|
256 |
std_err_file_path = CStr( |
|
|
255 | std_out_file_path = CStr('', config=False) | |
|
256 | std_err_file_path = CStr('', config=False) | |
|
257 | 257 | min_cores = Int(1, config=False) |
|
258 | 258 | max_cores = Int(1, config=False) |
|
259 | 259 | min_sockets = Int(1, config=False) |
@@ -263,6 +263,12 b' class IPControllerTask(WinHPCTask):' | |||
|
263 | 263 | unit_type = Str("Core", config=False) |
|
264 | 264 | work_directory = CStr('', config=False) |
|
265 | 265 | |
|
266 | def __init__(self, parent, name=None, config=None): | |
|
267 | super(IPControllerTask, self).__init__(parent, name, config) | |
|
268 | the_uuid = uuid.uuid1() | |
|
269 | self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid) | |
|
270 | self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid) | |
|
271 | ||
|
266 | 272 | @property |
|
267 | 273 | def command_line(self): |
|
268 | 274 | return ' '.join(self.controller_cmd + self.controller_args) |
@@ -274,8 +280,8 b' class IPEngineTask(WinHPCTask):' | |||
|
274 | 280 | engine_cmd = List(['ipengine.exe'], config=True) |
|
275 | 281 | engine_args = List(['--log-to-file', '--log-level', '40'], config=True) |
|
276 | 282 | # I don't want these to be configurable |
|
277 |
std_out_file_path = CStr( |
|
|
278 |
std_err_file_path = CStr( |
|
|
283 | std_out_file_path = CStr('', config=False) | |
|
284 | std_err_file_path = CStr('', config=False) | |
|
279 | 285 | min_cores = Int(1, config=False) |
|
280 | 286 | max_cores = Int(1, config=False) |
|
281 | 287 | min_sockets = Int(1, config=False) |
@@ -285,6 +291,12 b' class IPEngineTask(WinHPCTask):' | |||
|
285 | 291 | unit_type = Str("Core", config=False) |
|
286 | 292 | work_directory = CStr('', config=False) |
|
287 | 293 | |
|
294 | def __init__(self, parent, name=None, config=None): | |
|
295 | super(IPEngineTask,self).__init__(parent, name, config) | |
|
296 | the_uuid = uuid.uuid1() | |
|
297 | self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid) | |
|
298 | self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid) | |
|
299 | ||
|
288 | 300 | @property |
|
289 | 301 | def command_line(self): |
|
290 | 302 | return ' '.join(self.engine_cmd + self.engine_args) |
General Comments 0
You need to be logged in to leave comments.
Login now