##// END OF EJS Templates
add ipcluster engines; fix ssh process shutdown
MinRK -
Show More
@@ -1,505 +1,580 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import re
19 19 import logging
20 20 import os
21 21 import signal
22 22 import logging
23 23
24 24 from zmq.eventloop import ioloop
25 25
26 26 from IPython.external.argparse import ArgumentParser, SUPPRESS
27 27 from IPython.utils.importstring import import_item
28 28 from IPython.zmq.parallel.clusterdir import (
29 29 ApplicationWithClusterDir, ClusterDirConfigLoader,
30 30 ClusterDirError, PIDFileError
31 31 )
32 32
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Module level variables
36 36 #-----------------------------------------------------------------------------
37 37
38 38
39 39 default_config_file_name = u'ipcluster_config.py'
40 40
41 41
42 42 _description = """\
43 43 Start an IPython cluster for parallel computing.\n\n
44 44
45 45 An IPython cluster consists of 1 controller and 1 or more engines.
46 46 This command automates the startup of these processes using a wide
47 47 range of startup methods (SSH, local processes, PBS, mpiexec,
48 48 Windows HPC Server 2008). To start a cluster with 4 engines on your
49 49 local host simply do 'ipclusterz start -n 4'. For more complex usage
50 50 you will typically do 'ipclusterz create -p mycluster', then edit
51 51 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
52 52 """
53 53
54 54
55 55 # Exit codes for ipcluster
56 56
57 57 # This will be the exit code if the ipcluster appears to be running because
58 58 # a .pid file exists
59 59 ALREADY_STARTED = 10
60 60
61 61
62 62 # This will be the exit code if ipcluster stop is run, but there is no .pid
63 63 # file to be found.
64 64 ALREADY_STOPPED = 11
65 65
66 # This will be the exit code if ipcluster engines is run, but there is no .pid
67 # file to be found.
68 NO_CLUSTER = 12
69
66 70
67 71 #-----------------------------------------------------------------------------
68 72 # Command line options
69 73 #-----------------------------------------------------------------------------
70 74
71 75
72 76 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
73 77
74 78 def _add_arguments(self):
75 79 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
76 80 # its defaults on self.parser. Instead, we will put those on
77 81 # default options on our subparsers.
78 82
79 83 # This has all the common options that all subcommands use
80 84 parent_parser1 = ArgumentParser(
81 85 add_help=False,
82 86 argument_default=SUPPRESS
83 87 )
84 88 self._add_ipython_dir(parent_parser1)
85 89 self._add_log_level(parent_parser1)
86 90
87 91 # This has all the common options that other subcommands use
88 92 parent_parser2 = ArgumentParser(
89 93 add_help=False,
90 94 argument_default=SUPPRESS
91 95 )
92 96 self._add_cluster_profile(parent_parser2)
93 97 self._add_cluster_dir(parent_parser2)
94 98 self._add_work_dir(parent_parser2)
95 99 paa = parent_parser2.add_argument
96 100 paa('--log-to-file',
97 101 action='store_true', dest='Global.log_to_file',
98 102 help='Log to a file in the log directory (default is stdout)')
99 103
100 104 # Create the object used to create the subparsers.
101 105 subparsers = self.parser.add_subparsers(
102 106 dest='Global.subcommand',
103 107 title='ipcluster subcommands',
104 108 description=
105 109 """ipcluster has a variety of subcommands. The general way of
106 110 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
107 111 on a particular subcommand do 'ipclusterz <cmd> -h'."""
108 112 # help="For more help, type 'ipclusterz <cmd> -h'",
109 113 )
110 114
111 115 # The "list" subcommand parser
112 116 parser_list = subparsers.add_parser(
113 117 'list',
114 118 parents=[parent_parser1],
115 119 argument_default=SUPPRESS,
116 120 help="List all clusters in cwd and ipython_dir.",
117 121 description=
118 122 """List all available clusters, by cluster directory, that can
119 123 be found in the current working directly or in the ipython
120 124 directory. Cluster directories are named using the convention
121 125 'cluster_<profile>'."""
122 126 )
123 127
124 128 # The "create" subcommand parser
125 129 parser_create = subparsers.add_parser(
126 130 'create',
127 131 parents=[parent_parser1, parent_parser2],
128 132 argument_default=SUPPRESS,
129 133 help="Create a new cluster directory.",
130 134 description=
131 135 """Create an ipython cluster directory by its profile name or
132 136 cluster directory path. Cluster directories contain
133 137 configuration, log and security related files and are named
134 138 using the convention 'cluster_<profile>'. By default they are
135 139 located in your ipython directory. Once created, you will
136 140 probably need to edit the configuration files in the cluster
137 141 directory to configure your cluster. Most users will create a
138 142 cluster directory by profile name,
139 143 'ipclusterz create -p mycluster', which will put the directory
140 144 in '<ipython_dir>/cluster_mycluster'.
141 145 """
142 146 )
143 147 paa = parser_create.add_argument
144 148 paa('--reset-config',
145 149 dest='Global.reset_config', action='store_true',
146 150 help=
147 151 """Recopy the default config files to the cluster directory.
148 152 You will lose any modifications you have made to these files.""")
149 153
150 154 # The "start" subcommand parser
151 155 parser_start = subparsers.add_parser(
152 156 'start',
153 157 parents=[parent_parser1, parent_parser2],
154 158 argument_default=SUPPRESS,
155 159 help="Start a cluster.",
156 160 description=
157 161 """Start an ipython cluster by its profile name or cluster
158 162 directory. Cluster directories contain configuration, log and
159 163 security related files and are named using the convention
160 164 'cluster_<profile>' and should be created using the 'create'
161 165 subcommand of 'ipcluster'. If your cluster directory is in
162 166 the cwd or the ipython directory, you can simply refer to it
163 167 using its profile name, 'ipclusterz start -n 4 -p <profile>`,
164 168 otherwise use the '--cluster-dir' option.
165 169 """
166 170 )
171
167 172 paa = parser_start.add_argument
168 173 paa('-n', '--number',
169 174 type=int, dest='Global.n',
170 175 help='The number of engines to start.',
171 176 metavar='Global.n')
172 177 paa('--clean-logs',
173 178 dest='Global.clean_logs', action='store_true',
173 178 help='Delete old log files before starting.')
175 180 paa('--no-clean-logs',
176 181 dest='Global.clean_logs', action='store_false',
176 181 help="Don't delete old log files before starting.")
178 183 paa('--daemon',
179 184 dest='Global.daemonize', action='store_true',
180 185 help='Daemonize the ipcluster program. This implies --log-to-file')
181 186 paa('--no-daemon',
182 187 dest='Global.daemonize', action='store_false',
182 187 help="Don't daemonize the ipcluster program.")
184 189 paa('--delay',
185 190 type=float, dest='Global.delay',
186 191 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
187 192
188 193 # The "stop" subcommand parser
189 194 parser_stop = subparsers.add_parser(
190 195 'stop',
191 196 parents=[parent_parser1, parent_parser2],
192 197 argument_default=SUPPRESS,
193 198 help="Stop a running cluster.",
194 199 description=
195 200 """Stop a running ipython cluster by its profile name or cluster
196 201 directory. Cluster directories are named using the convention
197 202 'cluster_<profile>'. If your cluster directory is in
198 203 the cwd or the ipython directory, you can simply refer to it
199 204 using its profile name, 'ipclusterz stop -p <profile>`, otherwise
200 205 use the '--cluster-dir' option.
201 206 """
202 207 )
203 208 paa = parser_stop.add_argument
204 209 paa('--signal',
205 210 dest='Global.signal', type=int,
206 211 help="The signal number to use in stopping the cluster (default=2).",
207 212 metavar="Global.signal")
208
213
214 # the "engines" subcommand parser
215 parser_engines = subparsers.add_parser(
216 'engines',
217 parents=[parent_parser1, parent_parser2],
218 argument_default=SUPPRESS,
219 help="Attach some engines to an existing controller or cluster.",
220 description=
221 """Start one or more engines to connect to an existing Cluster
222 by profile name or cluster directory.
223 Cluster directories contain configuration, log and
224 security related files and are named using the convention
225 'cluster_<profile>' and should be created using the 'create'
226 subcommand of 'ipcluster'. If your cluster directory is in
227 the cwd or the ipython directory, you can simply refer to it
228 using its profile name, 'ipclusterz engines -n 4 -p <profile>`,
229 otherwise use the '--cluster-dir' option.
230 """
231 )
232 paa = parser_engines.add_argument
233 paa('-n', '--number',
234 type=int, dest='Global.n',
235 help='The number of engines to start.',
236 metavar='Global.n')
237 paa('--daemon',
238 dest='Global.daemonize', action='store_true',
239 help='Daemonize the ipcluster program. This implies --log-to-file')
240 paa('--no-daemon',
241 dest='Global.daemonize', action='store_false',
241 help="Don't daemonize the ipcluster program.")
209 243
210 244 #-----------------------------------------------------------------------------
211 245 # Main application
212 246 #-----------------------------------------------------------------------------
213 247
214 248
215 249 class IPClusterApp(ApplicationWithClusterDir):
216 250
217 251 name = u'ipclusterz'
218 252 description = _description
219 253 usage = None
220 254 command_line_loader = IPClusterAppConfigLoader
221 255 default_config_file_name = default_config_file_name
222 256 default_log_level = logging.INFO
223 257 auto_create_cluster_dir = False
224 258
225 259 def create_default_config(self):
226 260 super(IPClusterApp, self).create_default_config()
227 261 self.default_config.Global.controller_launcher = \
228 262 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
229 263 self.default_config.Global.engine_launcher = \
230 264 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
231 265 self.default_config.Global.n = 2
232 266 self.default_config.Global.delay = 1
233 267 self.default_config.Global.reset_config = False
234 268 self.default_config.Global.clean_logs = True
235 self.default_config.Global.signal = 2
269 self.default_config.Global.signal = signal.SIGINT
236 270 self.default_config.Global.daemonize = False
237 271
238 272 def find_resources(self):
239 273 subcommand = self.command_line_config.Global.subcommand
240 274 if subcommand=='list':
241 275 self.list_cluster_dirs()
242 276 # Exit immediately because there is nothing left to do.
243 277 self.exit()
244 278 elif subcommand=='create':
245 279 self.auto_create_cluster_dir = True
246 280 super(IPClusterApp, self).find_resources()
247 281 elif subcommand=='start' or subcommand=='stop':
248 282 self.auto_create_cluster_dir = True
249 283 try:
250 284 super(IPClusterApp, self).find_resources()
251 285 except ClusterDirError:
252 286 raise ClusterDirError(
253 287 "Could not find a cluster directory. A cluster dir must "
254 288 "be created before running 'ipclusterz start'. Do "
255 289 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
256 290 "information about creating and listing cluster dirs."
257 291 )
292 elif subcommand=='engines':
293 self.auto_create_cluster_dir = False
294 try:
295 super(IPClusterApp, self).find_resources()
296 except ClusterDirError:
297 raise ClusterDirError(
298 "Could not find a cluster directory. A cluster dir must "
299 "be created before running 'ipclusterz start'. Do "
300 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
301 "information about creating and listing cluster dirs."
302 )
258 303
259 304 def list_cluster_dirs(self):
260 305 # Find the search paths
261 306 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
262 307 if cluster_dir_paths:
263 308 cluster_dir_paths = cluster_dir_paths.split(':')
264 309 else:
265 310 cluster_dir_paths = []
266 311 try:
267 312 ipython_dir = self.command_line_config.Global.ipython_dir
268 313 except AttributeError:
269 314 ipython_dir = self.default_config.Global.ipython_dir
270 315 paths = [os.getcwd(), ipython_dir] + \
271 316 cluster_dir_paths
272 317 paths = list(set(paths))
273 318
274 319 self.log.info('Searching for cluster dirs in paths: %r' % paths)
275 320 for path in paths:
276 321 files = os.listdir(path)
277 322 for f in files:
278 323 full_path = os.path.join(path, f)
279 324 if os.path.isdir(full_path) and f.startswith('cluster_'):
280 325 profile = full_path.split('_')[-1]
281 326 start_cmd = 'ipclusterz start -p %s -n 4' % profile
282 327 print start_cmd + " ==> " + full_path
283 328
284 329 def pre_construct(self):
285 330 # IPClusterApp.pre_construct() is where we cd to the working directory.
286 331 super(IPClusterApp, self).pre_construct()
287 332 config = self.master_config
288 333 try:
289 334 daemon = config.Global.daemonize
290 335 if daemon:
291 336 config.Global.log_to_file = True
292 337 except AttributeError:
293 338 pass
294 339
295 340 def construct(self):
296 341 config = self.master_config
297 342 subcmd = config.Global.subcommand
298 343 reset = config.Global.reset_config
299 344 if subcmd == 'list':
300 345 return
301 346 if subcmd == 'create':
302 347 self.log.info('Copying default config files to cluster directory '
303 348 '[overwrite=%r]' % (reset,))
304 349 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
305 350 if subcmd =='start':
306 351 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
307 352 self.start_logging()
308 353 self.loop = ioloop.IOLoop.instance()
309 354 # reactor.callWhenRunning(self.start_launchers)
310 355 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
311 356 dc.start()
357 if subcmd == 'engines':
358 self.start_logging()
359 self.loop = ioloop.IOLoop.instance()
360 # reactor.callWhenRunning(self.start_launchers)
361 engine_only = lambda : self.start_launchers(controller=False)
362 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
363 dc.start()
312 364
313 def start_launchers(self):
365 def start_launchers(self, controller=True):
314 366 config = self.master_config
315 367
316 368 # Create the launchers. In both cases, we set the work_dir of
317 369 # the launcher to the cluster_dir. This is where the launcher's
318 370 # subprocesses will be launched. It is not where the controller
319 371 # and engine will be launched.
372 if controller:
373 cl_class = import_item(config.Global.controller_launcher)
374 self.controller_launcher = cl_class(
375 work_dir=self.cluster_dir, config=config,
376 logname=self.log.name
377 )
378 # Setup the observing of stopping. If the controller dies, shut
379 # everything down as that will be completely fatal for the engines.
380 self.controller_launcher.on_stop(self.stop_launchers)
381 # But, we don't monitor the stopping of engines. An engine dying
382 # is just fine and in principle a user could start a new engine.
383 # Also, if we did monitor engine stopping, it is difficult to
384 # know what to do when only some engines die. Currently, the
385 # observing of engine stopping is inconsistent. Some launchers
386 # might trigger on a single engine stopping, other wait until
387 # all stop. TODO: think more about how to handle this.
388
389
320 390 el_class = import_item(config.Global.engine_launcher)
321 391 self.engine_launcher = el_class(
322 392 work_dir=self.cluster_dir, config=config, logname=self.log.name
323 393 )
324 cl_class = import_item(config.Global.controller_launcher)
325 self.controller_launcher = cl_class(
326 work_dir=self.cluster_dir, config=config,
327 logname=self.log.name
328 )
329 394
330 395 # Setup signals
331 396 signal.signal(signal.SIGINT, self.sigint_handler)
332 397
333 # Setup the observing of stopping. If the controller dies, shut
334 # everything down as that will be completely fatal for the engines.
335 self.controller_launcher.on_stop(self.stop_launchers)
336 # d1.addCallback(self.stop_launchers)
337 # But, we don't monitor the stopping of engines. An engine dying
338 # is just fine and in principle a user could start a new engine.
339 # Also, if we did monitor engine stopping, it is difficult to
340 # know what to do when only some engines die. Currently, the
341 # observing of engine stopping is inconsistent. Some launchers
342 # might trigger on a single engine stopping, other wait until
343 # all stop. TODO: think more about how to handle this.
344
345 398 # Start the controller and engines
346 399 self._stopping = False # Make sure stop_launchers is not called 2x.
347 d = self.start_controller()
348 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop)
400 if controller:
401 self.start_controller()
402 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
349 403 dc.start()
350 404 self.startup_message()
351 # d.addCallback(self.start_engines)
352 # d.addCallback(self.startup_message)
353 # If the controller or engines fail to start, stop everything
354 # d.addErrback(self.stop_launchers)
355 return d
356 405
357 406 def startup_message(self, r=None):
358 407 self.log.info("IPython cluster: started")
359 408 return r
360 409
361 410 def start_controller(self, r=None):
362 411 # self.log.info("In start_controller")
363 412 config = self.master_config
364 413 d = self.controller_launcher.start(
365 414 cluster_dir=config.Global.cluster_dir
366 415 )
367 416 return d
368 417
369 418 def start_engines(self, r=None):
370 419 # self.log.info("In start_engines")
371 420 config = self.master_config
421
372 422 d = self.engine_launcher.start(
373 423 config.Global.n,
374 424 cluster_dir=config.Global.cluster_dir
375 425 )
376 426 return d
377 427
378 428 def stop_controller(self, r=None):
379 429 # self.log.info("In stop_controller")
380 430 if self.controller_launcher.running:
381 431 return self.controller_launcher.stop()
382 432
383 433 def stop_engines(self, r=None):
384 434 # self.log.info("In stop_engines")
385 435 if self.engine_launcher.running:
386 436 d = self.engine_launcher.stop()
387 437 # d.addErrback(self.log_err)
388 438 return d
389 439 else:
390 440 return None
391 441
392 442 def log_err(self, f):
393 443 self.log.error(f.getTraceback())
394 444 return None
395 445
396 446 def stop_launchers(self, r=None):
397 447 if not self._stopping:
398 448 self._stopping = True
399 449 # if isinstance(r, failure.Failure):
400 450 # self.log.error('Unexpected error in ipcluster:')
401 451 # self.log.info(r.getTraceback())
402 452 self.log.error("IPython cluster: stopping")
403 453 # These return deferreds. We are not doing anything with them
404 454 # but we are holding refs to them as a reminder that they
405 455 # do return deferreds.
406 456 d1 = self.stop_engines()
407 457 d2 = self.stop_controller()
408 458 # Wait a few seconds to let things shut down.
409 459 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
410 460 dc.start()
411 461 # reactor.callLater(4.0, reactor.stop)
412 462
413 463 def sigint_handler(self, signum, frame):
414 464 self.stop_launchers()
415 465
416 466 def start_logging(self):
417 467 # Remove old log files of the controller and engine
418 468 if self.master_config.Global.clean_logs:
419 469 log_dir = self.master_config.Global.log_dir
420 470 for f in os.listdir(log_dir):
421 471 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
422 472 os.remove(os.path.join(log_dir, f))
423 473 # This will remove old log files for ipcluster itself
424 474 super(IPClusterApp, self).start_logging()
425 475
426 476 def start_app(self):
427 477 """Start the application, depending on what subcommand is used."""
428 478 subcmd = self.master_config.Global.subcommand
429 479 if subcmd=='create' or subcmd=='list':
430 480 return
431 481 elif subcmd=='start':
432 482 self.start_app_start()
433 483 elif subcmd=='stop':
434 484 self.start_app_stop()
485 elif subcmd=='engines':
486 self.start_app_engines()
435 487
436 488 def start_app_start(self):
437 489 """Start the app for the start subcommand."""
438 490 config = self.master_config
439 491 # First see if the cluster is already running
440 492 try:
441 493 pid = self.get_pid_from_file()
442 494 except PIDFileError:
443 495 pass
444 496 else:
445 497 self.log.critical(
446 498 'Cluster is already running with [pid=%s]. '
447 499 'use "ipclusterz stop" to stop the cluster.' % pid
448 500 )
449 501 # Here I exit with a unusual exit status that other processes
450 502 # can watch for to learn how I exited.
451 503 self.exit(ALREADY_STARTED)
452 504
453 505 # Now log and daemonize
454 506 self.log.info(
455 507 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
456 508 )
457 509 # TODO: Get daemonize working on Windows or as a Windows Server.
458 510 if config.Global.daemonize:
459 511 if os.name=='posix':
460 512 from twisted.scripts._twistd_unix import daemonize
461 513 daemonize()
462 514
463 515 # Now write the new pid file AFTER our new forked pid is active.
464 516 self.write_pid_file()
465 517 try:
466 518 self.loop.start()
467 519 except:
468 520 self.log.info("stopping...")
469 521 self.remove_pid_file()
470 522
523 def start_app_engines(self):
524 """Start the app for the start subcommand."""
525 config = self.master_config
526 # First see if the cluster is already running
527
528 # Now log and daemonize
529 self.log.info(
530 'Starting engines with [daemon=%r]' % config.Global.daemonize
531 )
532 # TODO: Get daemonize working on Windows or as a Windows Server.
533 if config.Global.daemonize:
534 if os.name=='posix':
535 from twisted.scripts._twistd_unix import daemonize
536 daemonize()
537
538 # Now write the new pid file AFTER our new forked pid is active.
539 # self.write_pid_file()
540 try:
541 self.loop.start()
542 except:
543 self.log.fatal("stopping...")
544 # self.remove_pid_file()
545
471 546 def start_app_stop(self):
472 547 """Start the app for the stop subcommand."""
473 548 config = self.master_config
474 549 try:
475 550 pid = self.get_pid_from_file()
476 551 except PIDFileError:
477 552 self.log.critical(
478 553 'Problem reading pid file, cluster is probably not running.'
479 554 )
480 555 # Here I exit with a unusual exit status that other processes
481 556 # can watch for to learn how I exited.
482 557 self.exit(ALREADY_STOPPED)
483 558 else:
484 559 if os.name=='posix':
485 560 sig = config.Global.signal
486 561 self.log.info(
487 562 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
488 563 )
489 564 os.kill(pid, sig)
490 565 elif os.name=='nt':
491 566 # As of right now, we don't support daemonize on Windows, so
492 567 # stop will not do anything. Minimally, it should clean up the
493 568 # old .pid files.
494 569 self.remove_pid_file()
495 570
496 571
497 572 def launch_new_instance():
498 573 """Create and run the IPython cluster."""
499 574 app = IPClusterApp()
500 575 app.start()
501 576
502 577
503 578 if __name__ == '__main__':
504 579 launch_new_instance()
505 580
@@ -1,835 +1,844 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21 import logging
22 22
23 23 from signal import SIGINT, SIGTERM
24 24 try:
25 25 from signal import SIGKILL
26 26 except ImportError:
27 27 SIGKILL=SIGTERM
28 28
29 29 from subprocess import Popen, PIPE, STDOUT
30 30 try:
31 from subprocess import check_open
31 from subprocess import check_output
32 32 except ImportError:
33 33 # pre-2.7:
34 34 from StringIO import StringIO
35 35
36 def check_open(*args, **kwargs):
36 def check_output(*args, **kwargs):
37 37 sio = StringIO()
38 38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 39 p = Popen(*args, **kwargs)
40 40 out,err = p.communicate()
41 41 return out
42 42
43 43 from zmq.eventloop import ioloop
44 44
45 45 from IPython.external import Itpl
46 46 # from IPython.config.configurable import Configurable
47 47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
48 48 from IPython.utils.path import get_ipython_module_path
49 49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
50 50
51 51 from factory import LoggingFactory
52 52
53 53 # load winhpcjob from IPython.kernel
54 54 try:
55 55 from IPython.kernel.winhpcjob import (
56 56 IPControllerTask, IPEngineTask,
57 57 IPControllerJob, IPEngineSetJob
58 58 )
59 59 except ImportError:
60 60 pass
61 61
62 62
63 63 #-----------------------------------------------------------------------------
64 64 # Paths to the kernel apps
65 65 #-----------------------------------------------------------------------------
66 66
67 67
68 68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
69 69 'IPython.zmq.parallel.ipclusterapp'
70 70 ))
71 71
72 72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
73 73 'IPython.zmq.parallel.ipengineapp'
74 74 ))
75 75
76 76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
77 77 'IPython.zmq.parallel.ipcontrollerapp'
78 78 ))
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Base launchers and errors
82 82 #-----------------------------------------------------------------------------
83 83
84 84
85 85 class LauncherError(Exception):
86 86 pass
87 87
88 88
89 89 class ProcessStateError(LauncherError):
90 90 pass
91 91
92 92
93 93 class UnknownStatus(LauncherError):
94 94 pass
95 95
96 96
97 97 class BaseLauncher(LoggingFactory):
98 98 """An asbtraction for starting, stopping and signaling a process."""
99 99
100 100 # In all of the launchers, the work_dir is where child processes will be
101 101 # run. This will usually be the cluster_dir, but may not be. any work_dir
102 102 # passed into the __init__ method will override the config value.
103 103 # This should not be used to set the work_dir for the actual engine
104 104 # and controller. Instead, use their own config files or the
105 105 # controller_args, engine_args attributes of the launchers to add
106 106 # the --work-dir option.
107 107 work_dir = Unicode(u'.')
108 108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
109 109 def _loop_default(self):
110 110 return ioloop.IOLoop.instance()
111 111
112 112 def __init__(self, work_dir=u'.', config=None, **kwargs):
113 113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
114 114 self.state = 'before' # can be before, running, after
115 115 self.stop_callbacks = []
116 116 self.start_data = None
117 117 self.stop_data = None
118 118
119 119 @property
120 120 def args(self):
121 121 """A list of cmd and args that will be used to start the process.
122 122
123 123 This is what is passed to :func:`spawnProcess` and the first element
124 124 will be the process name.
125 125 """
126 126 return self.find_args()
127 127
128 128 def find_args(self):
129 129 """The ``.args`` property calls this to find the args list.
130 130
131 131 Subclasses should implement this to construct the cmd and args.
132 132 """
133 133 raise NotImplementedError('find_args must be implemented in a subclass')
134 134
135 135 @property
136 136 def arg_str(self):
137 137 """The string form of the program arguments."""
138 138 return ' '.join(self.args)
139 139
140 140 @property
141 141 def running(self):
142 142 """Am I running."""
143 143 if self.state == 'running':
144 144 return True
145 145 else:
146 146 return False
147 147
148 148 def start(self):
149 149 """Start the process.
150 150
151 151 This must return a deferred that fires with information about the
152 152 process starting (like a pid, job id, etc.).
153 153 """
154 154 raise NotImplementedError('start must be implemented in a subclass')
155 155
156 156 def stop(self):
157 157 """Stop the process and notify observers of stopping.
158 158
159 159 This must return a deferred that fires with information about the
160 160 processing stopping, like errors that occur while the process is
161 161 attempting to be shut down. This deferred won't fire when the process
162 162 actually stops. To observe the actual process stopping, see
163 163 :func:`observe_stop`.
164 164 """
165 165 raise NotImplementedError('stop must be implemented in a subclass')
166 166
167 167 def on_stop(self, f):
168 168 """Get a deferred that will fire when the process stops.
169 169
170 170 The deferred will fire with data that contains information about
171 171 the exit status of the process.
172 172 """
173 173 if self.state=='after':
174 174 return f(self.stop_data)
175 175 else:
176 176 self.stop_callbacks.append(f)
177 177
178 178 def notify_start(self, data):
179 179 """Call this to trigger startup actions.
180 180
181 181 This logs the process startup and sets the state to 'running'. It is
182 182 a pass-through so it can be used as a callback.
183 183 """
184 184
185 185 self.log.info('Process %r started: %r' % (self.args[0], data))
186 186 self.start_data = data
187 187 self.state = 'running'
188 188 return data
189 189
190 190 def notify_stop(self, data):
191 191 """Call this to trigger process stop actions.
192 192
193 193 This logs the process stopping and sets the state to 'after'. Call
194 194 this to trigger all the deferreds from :func:`observe_stop`."""
195 195
196 196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
197 197 self.stop_data = data
198 198 self.state = 'after'
199 199 for i in range(len(self.stop_callbacks)):
200 200 d = self.stop_callbacks.pop()
201 201 d(data)
202 202 return data
203 203
204 204 def signal(self, sig):
205 205 """Signal the process.
206 206
207 207 Return a semi-meaningless deferred after signaling the process.
208 208
209 209 Parameters
210 210 ----------
211 211 sig : str or int
212 212 'KILL', 'INT', etc., or any signal number
213 213 """
214 214 raise NotImplementedError('signal must be implemented in a subclass')
215 215
216 216
217 217 #-----------------------------------------------------------------------------
218 218 # Local process launchers
219 219 #-----------------------------------------------------------------------------
220 220
221 221
222 222 class LocalProcessLauncher(BaseLauncher):
223 223 """Start and stop an external process in an asynchronous manner.
224 224
225 225 This will launch the external process with a working directory of
226 226 ``self.work_dir``.
227 227 """
228 228
229 229 # This is used to construct self.args, which is passed to
230 230 # spawnProcess.
231 231 cmd_and_args = List([])
232 232 poll_frequency = Int(100) # in ms
233 233
234 234 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 235 super(LocalProcessLauncher, self).__init__(
236 236 work_dir=work_dir, config=config, **kwargs
237 237 )
238 238 self.process = None
239 239 self.start_deferred = None
240 240 self.poller = None
241 241
242 242 def find_args(self):
243 243 return self.cmd_and_args
244 244
245 245 def start(self):
246 246 if self.state == 'before':
247 247 self.process = Popen(self.args,
248 248 stdout=PIPE,stderr=PIPE,stdin=PIPE,
249 249 env=os.environ,
250 250 cwd=self.work_dir
251 251 )
252 252
253 253 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
254 254 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
255 255 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
256 256 self.poller.start()
257 257 self.notify_start(self.process.pid)
258 258 else:
259 259 s = 'The process was already started and has state: %r' % self.state
260 260 raise ProcessStateError(s)
261 261
262 262 def stop(self):
263 263 return self.interrupt_then_kill()
264 264
265 265 def signal(self, sig):
266 266 if self.state == 'running':
267 267 self.process.send_signal(sig)
268 268
269 269 def interrupt_then_kill(self, delay=2.0):
270 270 """Send INT, wait a delay and then send KILL."""
271 271 self.signal(SIGINT)
272 272 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
273 273 self.killer.start()
274 274
275 275 # callbacks, etc:
276 276
277 277 def handle_stdout(self, fd, events):
278 278 line = self.process.stdout.readline()
279 279 # a stopped process will be readable but return empty strings
280 280 if line:
281 281 self.log.info(line[:-1])
282 282 else:
283 283 self.poll()
284 284
285 285 def handle_stderr(self, fd, events):
286 286 line = self.process.stderr.readline()
287 287 # a stopped process will be readable but return empty strings
288 288 if line:
289 289 self.log.error(line[:-1])
290 290 else:
291 291 self.poll()
292 292
293 293 def poll(self):
294 294 status = self.process.poll()
295 295 if status is not None:
296 296 self.poller.stop()
297 297 self.loop.remove_handler(self.process.stdout.fileno())
298 298 self.loop.remove_handler(self.process.stderr.fileno())
299 299 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
300 300 return status
301 301
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # argv for the ipcontroller entry point.
    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Extra command line arguments passed to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)

    def find_args(self):
        """Full argv: the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, cluster_dir):
        """Start the controller, pointing it at ``cluster_dir``."""
        self.cluster_dir = unicode(cluster_dir)
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
318 318
319 319
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # argv for the ipengine entry point.
    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Extra command line arguments passed to ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )

    def find_args(self):
        """Full argv: the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, cluster_dir):
        """Start the engine, pointing it at ``cluster_dir``."""
        self.cluster_dir = unicode(cluster_dir)
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        return super(LocalEngineLauncher, self).start()
337 337
338 338
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine; the set is
    considered stopped once every individual engine has stopped.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # launcher class used for each individual engine
    launcher_class = LocalEngineLauncher

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.launchers = {}   # index -> launcher, for engines still running
        self.stop_data = {}   # index -> stop data, for engines that exited

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        import copy  # hoisted: was imported once per loop iteration
        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
            # Deep-copy the engine args so one launcher's mutations
            # (e.g. --cluster-dir extension) don't leak into the others.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(cluster_dir)
            if i==0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        # the set itself has no argv; placeholder for logging
        return ['engine set']

    def signal(self, sig):
        """Forward ``sig`` to every engine still running."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Send INT to every engine, then KILL after ``delay`` seconds."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Per-engine stop callback: record the exit and finish the set.

        Replaced a leftover debug ``print`` with a proper log call.
        """
        self.log.debug("Engine stopped: %r" % data)
        pid = data['pid']
        # find which launcher owns the pid that just exited
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        # once all engines are gone, the whole set has stopped
        if not self.launchers:
            self.notify_stop(self.stop_data)
409 409
410 410
411 411 #-----------------------------------------------------------------------------
412 412 # MPIExec launchers
413 413 #-----------------------------------------------------------------------------
414 414
415 415
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields.

        ``self.n`` is an Int trait and must be stringified here: the argv
        list handed to Popen may contain only strings.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
439 439
440 440
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
    # a controller is always exactly one process
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build argv; ``self.n`` must be stringified for Popen."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
459 459
460 460
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines as a single mpiexec job."""

    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Build argv; ``self.n`` must be stringified for Popen."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.engine_cmd + self.engine_args
481 481
482 482
483 483 #-----------------------------------------------------------------------------
484 484 # SSH launchers
485 485 #-----------------------------------------------------------------------------
486 486
487 487 # TODO: Get SSH Launcher working again.
488 488
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    # -tt forces remote tty allocation, so the remote process dies when
    # the ssh connection is closed.
    ssh_args = List(['-tt'], config=True)
    # argv of the program to run on the remote host
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str(os.environ.get('USER','username'), config=True)
    # 'user@hostname' target, kept in sync by the change handlers below
    location = Str('')

    def _hostname_changed(self, name, old, new):
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, cluster_dir, hostname=None, user=None):
        """Start the remote process via ssh.

        ``hostname``/``user`` override the configured traits when given.
        (Two leftover debug ``print`` statements were removed here.)
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()

    def signal(self, sig):
        if self.state == 'running':
            # A remote process can't be signaled directly; send the
            # escaped ssh connection-closer, which (with -tt) tears down
            # the remote session and its children.
            self.process.stdin.write('~.')
            self.process.stdin.flush()
529
521 530
522 531
class SSHControllerLauncher(SSHLauncher):
    """Launch a controller on a remote host via ssh."""

    # The remote program: argv for ipcontroller.
    program = List(ipcontroller_cmd_argv, config=True)
    # Arguments appended to the remote ipcontroller command line.
    program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
528 537
529 538
class SSHEngineLauncher(SSHLauncher):
    """Launch a single engine on a remote host via ssh."""

    # The remote program: argv for ipengine.
    program = List(ipengine_cmd_argv, config=True)
    # Arguments appended to the remote ipengine command line.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
536 545
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines on remote hosts via ssh."""

    # each engine in the set is started through an SSHEngineLauncher
    launcher_class = SSHEngineLauncher
539 548
540 549
541 550 #-----------------------------------------------------------------------------
542 551 # Windows HPC Server 2008 scheduler launchers
543 552 #-----------------------------------------------------------------------------
544 553
545 554
546 555 # This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command.

    On Windows, try to resolve the full path with ``find_cmd``, falling
    back to the bare name if it cannot be found.  On other platforms the
    bare name is returned unconditionally (it is only used on Windows).
    """
    if os.name != 'nt':
        return 'job'
    try:
        return find_cmd('job')
    except FindCmdError:
        return 'job'
555 564
556 565
class WindowsHPCLauncher(BaseLauncher):
    """Base class for launchers submitting jobs to Windows HPC Server 2008.

    Subclasses must implement :meth:`write_job_file`; :meth:`start` then
    submits the written job file with the ``job`` command and records the
    scheduler-assigned job id.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # NOTE: a ``job_file = Unicode(u'')`` trait used to be declared here,
    # but it was dead code -- immediately shadowed by the ``job_file``
    # property below -- so it has been removed.
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        # made dynamically by combining work_dir with job_file_name
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        # placeholder argv for logging; the real command is built in start()
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        return job_id

    def stop(self):
        """Cancel the submitted job via the scheduler."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # best-effort cancel: the job may already be gone
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        return output
633 642
634 643
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Start an ipcontroller job via the Windows HPC scheduler."""

    # Job description file written into the cluster directory.
    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra ipcontroller arguments, filled in by start().
    extra_args = List([], config=False)

    @property
    def job_file(self):
        # the job file lives in the cluster dir, not the work dir
        return os.path.join(self.cluster_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the single-task controller job description file."""
        job = IPControllerJob(config=self.config)

        task = IPControllerTask(config=self.config)
        # The task's work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Append the --cluster-dir arguments collected in start().
        task.controller_args.extend(self.extra_args)
        job.add_task(task)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
664 673
665 674
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Start a set of ipengine jobs via the Windows HPC scheduler."""

    # Job description file written into the cluster directory.
    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Extra ipengine arguments, filled in by start().
    extra_args = List([], config=False)

    @property
    def job_file(self):
        # the job file lives in the cluster dir, not the work dir
        return os.path.join(self.cluster_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write a job description containing one task per engine."""
        job = IPEngineSetJob(config=self.config)

        for _ in range(n):
            task = IPEngineTask(config=self.config)
            # The task's work directory is *not* the actual work directory
            # of the engine. It is used as the base path for the
            # stdout/stderr files that the scheduler redirects to.
            task.work_directory = self.cluster_dir
            # Append the --cluster-dir arguments collected in start().
            task.engine_args.extend(self.extra_args)
            job.add_task(task)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
696 705
697 706
698 707 #-----------------------------------------------------------------------------
699 708 # Batch (PBS) system launchers
700 709 #-----------------------------------------------------------------------------
701 710
702 711 # TODO: Get PBS launcher working again.
703 712
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl. Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
        # template-substitution namespace; subclasses add entries
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)
        # try/finally guarantees the file is closed even if write() fails
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        # check_output captures stdout itself and forbids a ``stdout``
        # keyword (it raises ValueError); redirect stderr instead.
        output = check_output([self.submit_command, self.batch_file],
                              env=os.environ, stderr=STDOUT)
        job_id = self.parse_job_id(output)
        return job_id

    def stop(self):
        output = check_output([self.delete_command, self.job_id],
                              env=os.environ, stderr=STDOUT)
        self.notify_stop(output)  # Pass the output of the kill cmd
        return output
772 781
773 782
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # qsub submits jobs, qdel cancels them.
    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # PBS job ids are the leading run of digits in qsub's output.
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
783 792
784 793
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        # Save cluster_dir in the context so the batch script template
        # can reference it as ${cluster_dir}.
        self.cluster_dir = unicode(cluster_dir)
        self.context['cluster_dir'] = cluster_dir
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
799 808
800 809
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        # BatchSystemLauncher has no ``program_args`` attribute, so the
        # previous ``self.program_args.extend(...)`` raised AttributeError.
        # Pass cluster_dir through the template context instead (available
        # as ${cluster_dir}), matching PBSControllerLauncher.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
811 820
812 821
813 822 #-----------------------------------------------------------------------------
814 823 # A launcher for ipcluster itself!
815 824 #-----------------------------------------------------------------------------
816 825
817 826
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # argv for the ipcluster entry point.
    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Extra command line arguments passed through to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
    # Which ipcluster subcommand to run.
    ipcluster_subcommand = Str('start')
    # How many engines the child ipcluster should start.
    ipcluster_n = Int(2)

    def find_args(self):
        """Assemble argv: command, subcommand, -n <count>, extra args."""
        argv = self.ipcluster_cmd + [self.ipcluster_subcommand]
        argv.extend(['-n', repr(self.ipcluster_n)])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
835 844
General Comments 0
You need to be logged in to leave comments. Login now