scrub twisted/deferred references from launchers...
MinRK
@@ -0,0 +1,26 b''
1 """daemonize function from twisted.scripts._twistd_unix."""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (c) Twisted Matrix Laboratories.
5 # See Twisted's LICENSE for details.
6 # http://twistedmatrix.com/
7 #-----------------------------------------------------------------------------
8
9 import os, errno
10
11 def daemonize():
12 # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
13 if os.fork(): # launch child and...
14 os._exit(0) # kill off parent
15 os.setsid()
16 if os.fork(): # launch child and...
17 os._exit(0) # kill off parent again.
18 null = os.open('/dev/null', os.O_RDWR)
19 for i in range(3):
20 try:
21 os.dup2(null, i)
22 except OSError, e:
23 if e.errno != errno.EBADF:
24 raise
25 os.close(null)
26
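The function above is a verbatim copy of Twisted's double-fork daemonizer, vendored so the launchers no longer import from twisted.scripts._twistd_unix (see the ipclusterapp changes below). A minimal usage sketch, with a made-up pid-file path standing in for real work; ipcluster itself calls daemonize() right before starting its zmq event loop:

    import os, time
    from IPython.utils.daemonize import daemonize

    if os.name == 'posix':          # the double fork only makes sense on POSIX
        daemonize()                 # detach from the terminal, redirect stdio to /dev/null

    # only the surviving grandchild reaches this point, running in the background
    with open('/tmp/daemonize_example.pid', 'w') as f:   # hypothetical path, for illustration
        f.write(str(os.getpid()))
    time.sleep(30)                  # a real application would run its event loop here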
@@ -1,527 +1,526 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag
35 35 from IPython.config.loader import Config
36 36 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
37 from IPython.utils.daemonize import daemonize
37 38 from IPython.utils.importstring import import_item
38 39 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
39 40
40 41 from IPython.parallel.apps.baseapp import (
41 42 BaseParallelApplication,
42 43 PIDFileError,
43 44 base_flags, base_aliases
44 45 )
45 46
46 47
47 48 #-----------------------------------------------------------------------------
48 49 # Module level variables
49 50 #-----------------------------------------------------------------------------
50 51
51 52
52 53 default_config_file_name = u'ipcluster_config.py'
53 54
54 55
55 56 _description = """Start an IPython cluster for parallel computing.
56 57
57 58 An IPython cluster consists of 1 controller and 1 or more engines.
58 59 This command automates the startup of these processes using a wide
59 60 range of startup methods (SSH, local processes, PBS, mpiexec,
60 61 Windows HPC Server 2008). To start a cluster with 4 engines on your
61 62 local host simply do 'ipcluster start n=4'. For more complex usage
62 63 you will typically do 'ipcluster create profile=mycluster', then edit
63 64 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
64 65 """
65 66
66 67
67 68 # Exit codes for ipcluster
68 69
69 70 # This will be the exit code if the ipcluster appears to be running because
70 71 # a .pid file exists
71 72 ALREADY_STARTED = 10
72 73
73 74
74 75 # This will be the exit code if ipcluster stop is run, but there is no .pid
75 76 # file to be found.
76 77 ALREADY_STOPPED = 11
77 78
78 79 # This will be the exit code if ipcluster engines is run, but there is no .pid
79 80 # file to be found.
80 81 NO_CLUSTER = 12
81 82
82 83
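The three constants above are the whole of ipcluster's exit-code protocol: wrapper scripts can branch on them instead of parsing log output. A small sketch of such a watcher (the command line is a typical invocation from the help text, not something this changeset adds):

    import subprocess

    ALREADY_STARTED, ALREADY_STOPPED, NO_CLUSTER = 10, 11, 12   # mirrors ipclusterapp

    rc = subprocess.call(['ipcluster', 'start', 'n=4', 'profile=mycluster'])
    if rc == ALREADY_STARTED:
        print "a pid file exists; the cluster appears to be running already"
    elif rc == 0:
        print "ipcluster exited cleanly"
    else:
        print "ipcluster exited with status %i" % rc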
83 84 #-----------------------------------------------------------------------------
84 85 # Main application
85 86 #-----------------------------------------------------------------------------
86 87 start_help = """Start an IPython cluster for parallel computing
87 88
88 89 Start an ipython cluster by its profile name or cluster
89 90 directory. Cluster directories contain configuration, log and
90 91 security related files and are named using the convention
91 92 'cluster_<profile>' and should be created using the 'start'
92 93 subcommand of 'ipcluster'. If your cluster directory is in
93 94 the cwd or the ipython directory, you can simply refer to it
94 95 using its profile name, 'ipcluster start n=4 profile=<profile>`,
95 96 otherwise use the 'profile_dir' option.
96 97 """
97 98 stop_help = """Stop a running IPython cluster
98 99
99 100 Stop a running ipython cluster by its profile name or cluster
100 101 directory. Cluster directories are named using the convention
101 102 'cluster_<profile>'. If your cluster directory is in
102 103 the cwd or the ipython directory, you can simply refer to it
103 104 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
104 105 use the 'profile_dir' option.
105 106 """
106 107 engines_help = """Start engines connected to an existing IPython cluster
107 108
108 109 Start one or more engines to connect to an existing Cluster
109 110 by profile name or cluster directory.
110 111 Cluster directories contain configuration, log and
111 112 security related files and are named using the convention
112 113 'cluster_<profile>' and should be created using the 'start'
113 114 subcommand of 'ipcluster'. If your cluster directory is in
114 115 the cwd or the ipython directory, you can simply refer to it
115 116 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
116 117 otherwise use the 'profile_dir' option.
117 118 """
118 119 create_help = """Create an ipcluster profile by name
119 120
120 121 Create an ipython cluster directory by its profile name or
121 122 cluster directory path. Cluster directories contain
122 123 configuration, log and security related files and are named
123 124 using the convention 'cluster_<profile>'. By default they are
124 125 located in your ipython directory. Once created, you will
125 126 probably need to edit the configuration files in the cluster
126 127 directory to configure your cluster. Most users will create a
127 128 cluster directory by profile name,
128 129 `ipcluster create profile=mycluster`, which will put the directory
129 130 in `<ipython_dir>/cluster_mycluster`.
130 131 """
131 132 list_help = """List available cluster profiles
132 133
133 134 List all available clusters, by cluster directory, that can
134 135 be found in the current working directory or in the ipython
135 136 directory. Cluster directories are named using the convention
136 137 'cluster_<profile>'.
137 138 """
138 139
139 140
140 141 class IPClusterList(BaseIPythonApplication):
141 142 name = u'ipcluster-list'
142 143 description = list_help
143 144
144 145 # empty aliases
145 146 aliases=Dict()
146 147 flags = Dict(base_flags)
147 148
148 149 def _log_level_default(self):
149 150 return 20
150 151
151 152 def list_profile_dirs(self):
152 153 # Find the search paths
153 154 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
154 155 if profile_dir_paths:
155 156 profile_dir_paths = profile_dir_paths.split(':')
156 157 else:
157 158 profile_dir_paths = []
158 159
159 160 ipython_dir = self.ipython_dir
160 161
161 162 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
162 163 paths = list(set(paths))
163 164
164 165 self.log.info('Searching for cluster profiles in paths: %r' % paths)
165 166 for path in paths:
166 167 files = os.listdir(path)
167 168 for f in files:
168 169 full_path = os.path.join(path, f)
169 170 if os.path.isdir(full_path) and f.startswith('profile_') and \
170 171 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
171 172 profile = f.split('_')[-1]
172 173 start_cmd = 'ipcluster start profile=%s n=4' % profile
173 174 print start_cmd + " ==> " + full_path
174 175
175 176 def start(self):
176 177 self.list_profile_dirs()
177 178
178 179
179 180 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
180 181
181 182 create_flags = {}
182 183 create_flags.update(base_flags)
183 184 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
184 185 "reset config files to defaults", "leave existing config files"))
185 186
186 187 class IPClusterCreate(BaseParallelApplication):
187 188 name = u'ipcluster-create'
188 189 description = create_help
189 190 auto_create = Bool(True)
190 191 config_file_name = Unicode(default_config_file_name)
191 192
192 193 flags = Dict(create_flags)
193 194
194 195 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
195 196
196 197 classes = [ProfileDir]
197 198
198 199
199 200 stop_aliases = dict(
200 201 signal='IPClusterStop.signal',
201 202 profile='BaseIPythonApplication.profile',
202 203 profile_dir='ProfileDir.location',
203 204 )
204 205
205 206 class IPClusterStop(BaseParallelApplication):
206 207 name = u'ipcluster'
207 208 description = stop_help
208 209 config_file_name = Unicode(default_config_file_name)
209 210
210 211 signal = Int(signal.SIGINT, config=True,
211 212 help="signal to use for stopping processes.")
212 213
213 214 aliases = Dict(stop_aliases)
214 215
215 216 def start(self):
216 217 """Start the app for the stop subcommand."""
217 218 try:
218 219 pid = self.get_pid_from_file()
219 220 except PIDFileError:
220 221 self.log.critical(
221 222 'Could not read pid file, cluster is probably not running.'
222 223 )
223 224 # Here I exit with an unusual exit status that other processes
224 225 # can watch for to learn how I exited.
225 226 self.remove_pid_file()
226 227 self.exit(ALREADY_STOPPED)
227 228
228 229 if not self.check_pid(pid):
229 230 self.log.critical(
230 231 'Cluster [pid=%r] is not running.' % pid
231 232 )
232 233 self.remove_pid_file()
233 234 # Here I exit with an unusual exit status that other processes
234 235 # can watch for to learn how I exited.
235 236 self.exit(ALREADY_STOPPED)
236 237
237 238 elif os.name=='posix':
238 239 sig = self.signal
239 240 self.log.info(
240 241 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
241 242 )
242 243 try:
243 244 os.kill(pid, sig)
244 245 except OSError:
245 246 self.log.error("Stopping cluster failed, assuming already dead.",
246 247 exc_info=True)
247 248 self.remove_pid_file()
248 249 elif os.name=='nt':
249 250 try:
250 251 # kill the whole tree
251 252 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
252 253 except (CalledProcessError, OSError):
253 254 self.log.error("Stopping cluster failed, assuming already dead.",
254 255 exc_info=True)
255 256 self.remove_pid_file()
256 257
257 258 engine_aliases = {}
258 259 engine_aliases.update(base_aliases)
259 260 engine_aliases.update(dict(
260 261 n='IPClusterEngines.n',
261 262 elauncher = 'IPClusterEngines.engine_launcher_class',
262 263 ))
263 264 class IPClusterEngines(BaseParallelApplication):
264 265
265 266 name = u'ipcluster'
266 267 description = engines_help
267 268 usage = None
268 269 config_file_name = Unicode(default_config_file_name)
269 270 default_log_level = logging.INFO
270 271 classes = List()
271 272 def _classes_default(self):
272 273 from IPython.parallel.apps import launcher
273 274 launchers = launcher.all_launchers
274 275 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
275 276 return [ProfileDir]+eslaunchers
276 277
277 278 n = Int(2, config=True,
278 279 help="The number of engines to start.")
279 280
280 281 engine_launcher_class = Unicode('LocalEngineSetLauncher',
281 282 config=True,
282 283 help="The class for launching a set of Engines."
283 284 )
284 285 daemonize = Bool(False, config=True,
285 286 help='Daemonize the ipcluster program. This implies --log-to-file')
286 287
287 288 def _daemonize_changed(self, name, old, new):
288 289 if new:
289 290 self.log_to_file = True
290 291
291 292 aliases = Dict(engine_aliases)
292 293 # flags = Dict(flags)
293 294 _stopping = False
294 295
295 296 def initialize(self, argv=None):
296 297 super(IPClusterEngines, self).initialize(argv)
297 298 self.init_signal()
298 299 self.init_launchers()
299 300
300 301 def init_launchers(self):
301 302 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
302 303 self.engine_launcher.on_stop(lambda r: self.loop.stop())
303 304
304 305 def init_signal(self):
305 306 # Setup signals
306 307 signal.signal(signal.SIGINT, self.sigint_handler)
307 308
308 309 def build_launcher(self, clsname):
309 310 """import and instantiate a Launcher based on importstring"""
310 311 if '.' not in clsname:
311 312 # not a module, presume it's the raw name in apps.launcher
312 313 clsname = 'IPython.parallel.apps.launcher.'+clsname
313 314 # print repr(clsname)
314 315 klass = import_item(clsname)
315 316
316 317 launcher = klass(
317 318 work_dir=self.profile_dir.location, config=self.config, log=self.log
318 319 )
319 320 return launcher
320 321
321 322 def start_engines(self):
322 323 self.log.info("Starting %i engines"%self.n)
323 324 self.engine_launcher.start(
324 325 self.n,
325 326 self.profile_dir.location
326 327 )
327 328
328 329 def stop_engines(self):
329 330 self.log.info("Stopping Engines...")
330 331 if self.engine_launcher.running:
331 332 d = self.engine_launcher.stop()
332 333 return d
333 334 else:
334 335 return None
335 336
336 337 def stop_launchers(self, r=None):
337 338 if not self._stopping:
338 339 self._stopping = True
339 340 self.log.error("IPython cluster: stopping")
340 341 self.stop_engines()
341 342 # Wait a few seconds to let things shut down.
342 343 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
343 344 dc.start()
344 345
345 346 def sigint_handler(self, signum, frame):
346 347 self.log.debug("SIGINT received, stopping launchers...")
347 348 self.stop_launchers()
348 349
349 350 def start_logging(self):
350 351 # Remove old log files of the controller and engine
351 352 if self.clean_logs:
352 353 log_dir = self.profile_dir.log_dir
353 354 for f in os.listdir(log_dir):
354 355 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
355 356 os.remove(os.path.join(log_dir, f))
356 357 # This will remove old log files for ipcluster itself
357 358 # super(IPBaseParallelApplication, self).start_logging()
358 359
359 360 def start(self):
360 361 """Start the app for the engines subcommand."""
361 362 self.log.info("IPython cluster: started")
362 363 # First see if the cluster is already running
363 364
364 365 # Now log and daemonize
365 366 self.log.info(
366 367 'Starting engines with [daemon=%r]' % self.daemonize
367 368 )
368 369 # TODO: Get daemonize working on Windows or as a Windows Server.
369 370 if self.daemonize:
370 371 if os.name=='posix':
371 from twisted.scripts._twistd_unix import daemonize
372 372 daemonize()
373 373
374 374 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
375 375 dc.start()
376 376 # Now write the new pid file AFTER our new forked pid is active.
377 377 # self.write_pid_file()
378 378 try:
379 379 self.loop.start()
380 380 except KeyboardInterrupt:
381 381 pass
382 382 except zmq.ZMQError as e:
383 383 if e.errno == errno.EINTR:
384 384 pass
385 385 else:
386 386 raise
387 387
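Since build_launcher() above prefixes a bare class name with 'IPython.parallel.apps.launcher.', the engine_launcher_class trait accepts either a short name or a full import string. A hedged sketch of the corresponding lines in a profile's ipcluster_config.py (the custom dotted path is hypothetical):

    c = get_config()

    # a bare name resolves inside IPython.parallel.apps.launcher
    c.IPClusterEngines.engine_launcher_class = 'MPIExecEngineSetLauncher'

    # a dotted path imports a launcher from anywhere on sys.path (hypothetical module)
    # c.IPClusterEngines.engine_launcher_class = 'mypkg.launchers.MyEngineSetLauncher'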
388 388 start_aliases = {}
389 389 start_aliases.update(engine_aliases)
390 390 start_aliases.update(dict(
391 391 delay='IPClusterStart.delay',
392 392 clean_logs='IPClusterStart.clean_logs',
393 393 ))
394 394
395 395 class IPClusterStart(IPClusterEngines):
396 396
397 397 name = u'ipcluster'
398 398 description = start_help
399 399 default_log_level = logging.INFO
400 400 auto_create = Bool(True, config=True,
401 401 help="whether to create the profile_dir if it doesn't exist")
402 402 classes = List()
403 403 def _classes_default(self,):
404 404 from IPython.parallel.apps import launcher
405 405 return [ProfileDir]+launcher.all_launchers
406 406
407 407 clean_logs = Bool(True, config=True,
408 408 help="whether to cleanup old logs before starting")
409 409
410 410 delay = CFloat(1., config=True,
411 411 help="delay (in s) between starting the controller and the engines")
412 412
413 413 controller_launcher_class = Unicode('LocalControllerLauncher',
414 414 config=True,
415 415 help="The class for launching a Controller."
416 416 )
417 417 reset = Bool(False, config=True,
418 418 help="Whether to reset config files as part of '--create'."
419 419 )
420 420
421 421 # flags = Dict(flags)
422 422 aliases = Dict(start_aliases)
423 423
424 424 def init_launchers(self):
425 425 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
426 426 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
427 427 self.controller_launcher.on_stop(self.stop_launchers)
428 428
429 429 def start_controller(self):
430 430 self.controller_launcher.start(
431 431 self.profile_dir.location
432 432 )
433 433
434 434 def stop_controller(self):
435 435 # self.log.info("In stop_controller")
436 436 if self.controller_launcher and self.controller_launcher.running:
437 437 return self.controller_launcher.stop()
438 438
439 439 def stop_launchers(self, r=None):
440 440 if not self._stopping:
441 441 self.stop_controller()
442 442 super(IPClusterStart, self).stop_launchers()
443 443
444 444 def start(self):
445 445 """Start the app for the start subcommand."""
446 446 # First see if the cluster is already running
447 447 try:
448 448 pid = self.get_pid_from_file()
449 449 except PIDFileError:
450 450 pass
451 451 else:
452 452 if self.check_pid(pid):
453 453 self.log.critical(
454 454 'Cluster is already running with [pid=%s]. '
455 455 'use "ipcluster stop" to stop the cluster.' % pid
456 456 )
457 457 # Here I exit with an unusual exit status that other processes
458 458 # can watch for to learn how I exited.
459 459 self.exit(ALREADY_STARTED)
460 460 else:
461 461 self.remove_pid_file()
462 462
463 463
464 464 # Now log and daemonize
465 465 self.log.info(
466 466 'Starting ipcluster with [daemon=%r]' % self.daemonize
467 467 )
468 468 # TODO: Get daemonize working on Windows or as a Windows Server.
469 469 if self.daemonize:
470 470 if os.name=='posix':
471 from twisted.scripts._twistd_unix import daemonize
472 471 daemonize()
473 472
474 473 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
475 474 dc.start()
476 475 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
477 476 dc.start()
478 477 # Now write the new pid file AFTER our new forked pid is active.
479 478 self.write_pid_file()
480 479 try:
481 480 self.loop.start()
482 481 except KeyboardInterrupt:
483 482 pass
484 483 except zmq.ZMQError as e:
485 484 if e.errno == errno.EINTR:
486 485 pass
487 486 else:
488 487 raise
489 488 finally:
490 489 self.remove_pid_file()
491 490
492 491 base='IPython.parallel.apps.ipclusterapp.IPCluster'
493 492
494 493 class IPBaseParallelApplication(Application):
495 494 name = u'ipcluster'
496 495 description = _description
497 496
498 497 subcommands = {'create' : (base+'Create', create_help),
499 498 'list' : (base+'List', list_help),
500 499 'start' : (base+'Start', start_help),
501 500 'stop' : (base+'Stop', stop_help),
502 501 'engines' : (base+'Engines', engines_help),
503 502 }
504 503
505 504 # no aliases or flags for parent App
506 505 aliases = Dict()
507 506 flags = Dict()
508 507
509 508 def start(self):
510 509 if self.subapp is None:
511 510 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
512 511 print
513 512 self.print_subcommands()
514 513 self.exit(1)
515 514 else:
516 515 return self.subapp.start()
517 516
518 517 def launch_new_instance():
519 518 """Create and run the IPython cluster."""
520 519 app = IPBaseParallelApplication.instance()
521 520 app.initialize()
522 521 app.start()
523 522
524 523
525 524 if __name__ == '__main__':
526 525 launch_new_instance()
527 526
@@ -1,1074 +1,1063 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 import copy
24 24 import logging
25 25 import os
26 26 import re
27 27 import stat
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows >= 2.7, 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 60 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
61 61 from IPython.utils.path import get_ipython_module_path
62 62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 63
64 64 from .win32support import forward_read_events
65 65
66 66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67 67
68 68 WINDOWS = os.name == 'nt'
69 69
70 70 #-----------------------------------------------------------------------------
71 71 # Paths to the kernel apps
72 72 #-----------------------------------------------------------------------------
73 73
74 74
75 75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 76 'IPython.parallel.apps.ipclusterapp'
77 77 ))
78 78
79 79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 80 'IPython.parallel.apps.ipengineapp'
81 81 ))
82 82
83 83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 84 'IPython.parallel.apps.ipcontrollerapp'
85 85 ))
86 86
87 87 #-----------------------------------------------------------------------------
88 88 # Base launchers and errors
89 89 #-----------------------------------------------------------------------------
90 90
91 91
92 92 class LauncherError(Exception):
93 93 pass
94 94
95 95
96 96 class ProcessStateError(LauncherError):
97 97 pass
98 98
99 99
100 100 class UnknownStatus(LauncherError):
101 101 pass
102 102
103 103
104 104 class BaseLauncher(LoggingConfigurable):
105 105 """An abstraction for starting, stopping and signaling a process."""
106 106
107 107 # In all of the launchers, the work_dir is where child processes will be
108 108 # run. This will usually be the profile_dir, but may not be. any work_dir
109 109 # passed into the __init__ method will override the config value.
110 110 # This should not be used to set the work_dir for the actual engine
111 111 # and controller. Instead, use their own config files or the
112 112 # controller_args, engine_args attributes of the launchers to add
113 113 # the work_dir option.
114 114 work_dir = Unicode(u'.')
115 115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116 116
117 117 start_data = Any()
118 118 stop_data = Any()
119 119
120 120 def _loop_default(self):
121 121 return ioloop.IOLoop.instance()
122 122
123 123 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 125 self.state = 'before' # can be before, running, after
126 126 self.stop_callbacks = []
127 127 self.start_data = None
128 128 self.stop_data = None
129 129
130 130 @property
131 131 def args(self):
132 132 """A list of cmd and args that will be used to start the process.
133 133
134 134 This is what is used to launch the process and the first element
135 135 will be the process name.
136 136 """
137 137 return self.find_args()
138 138
139 139 def find_args(self):
140 140 """The ``.args`` property calls this to find the args list.
141 141
142 142 Subclasses should implement this to construct the cmd and args.
143 143 """
144 144 raise NotImplementedError('find_args must be implemented in a subclass')
145 145
146 146 @property
147 147 def arg_str(self):
148 148 """The string form of the program arguments."""
149 149 return ' '.join(self.args)
150 150
151 151 @property
152 152 def running(self):
153 153 """Am I running."""
154 154 if self.state == 'running':
155 155 return True
156 156 else:
157 157 return False
158 158
159 159 def start(self):
160 """Start the process.
161
162 This must return a deferred that fires with information about the
163 process starting (like a pid, job id, etc.).
164 """
160 """Start the process."""
165 161 raise NotImplementedError('start must be implemented in a subclass')
166 162
167 163 def stop(self):
168 164 """Stop the process and notify observers of stopping.
169 165
170 This must return a deferred that fires with information about the
171 processing stopping, like errors that occur while the process is
172 attempting to be shut down. This deferred won't fire when the process
173 actually stops. To observe the actual process stopping, see
174 :func:`observe_stop`.
166 This method will return None immediately.
167 To observe the actual process stopping, see :meth:`on_stop`.
175 168 """
176 169 raise NotImplementedError('stop must be implemented in a subclass')
177 170
178 171 def on_stop(self, f):
179 """Get a deferred that will fire when the process stops.
180
181 The deferred will fire with data that contains information about
182 the exit status of the process.
172 """Register a callback to be called with this Launcher's stop_data
173 when the process actually finishes.
183 174 """
184 175 if self.state=='after':
185 176 return f(self.stop_data)
186 177 else:
187 178 self.stop_callbacks.append(f)
188 179
189 180 def notify_start(self, data):
190 181 """Call this to trigger startup actions.
191 182
192 183 This logs the process startup and sets the state to 'running'. It is
193 184 a pass-through so it can be used as a callback.
194 185 """
195 186
196 187 self.log.info('Process %r started: %r' % (self.args[0], data))
197 188 self.start_data = data
198 189 self.state = 'running'
199 190 return data
200 191
201 192 def notify_stop(self, data):
202 193 """Call this to trigger process stop actions.
203 194
204 195 This logs the process stopping and sets the state to 'after'. Call
205 this to trigger all the deferreds from :func:`observe_stop`."""
196 this to trigger callbacks registered via :meth:`on_stop`."""
206 197
207 198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
208 199 self.stop_data = data
209 200 self.state = 'after'
210 201 for i in range(len(self.stop_callbacks)):
211 202 d = self.stop_callbacks.pop()
212 203 d(data)
213 204 return data
214 205
215 206 def signal(self, sig):
216 207 """Signal the process.
217 208
218 Return a semi-meaningless deferred after signaling the process.
219
220 209 Parameters
221 210 ----------
222 211 sig : str or int
223 212 'KILL', 'INT', etc., or any signal number
224 213 """
225 214 raise NotImplementedError('signal must be implemented in a subclass')
226 215
227 216
228 217 #-----------------------------------------------------------------------------
229 218 # Local process launchers
230 219 #-----------------------------------------------------------------------------
231 220
232 221
233 222 class LocalProcessLauncher(BaseLauncher):
234 223 """Start and stop an external process in an asynchronous manner.
235 224
236 225 This will launch the external process with a working directory of
237 226 ``self.work_dir``.
238 227 """
239 228
240 229 # This is used to construct self.args, which is passed to
241 230 # Popen.
242 231 cmd_and_args = List([])
243 232 poll_frequency = Int(100) # in ms
244 233
245 234 def __init__(self, work_dir=u'.', config=None, **kwargs):
246 235 super(LocalProcessLauncher, self).__init__(
247 236 work_dir=work_dir, config=config, **kwargs
248 237 )
249 238 self.process = None
250 self.start_deferred = None
251 239 self.poller = None
252 240
253 241 def find_args(self):
254 242 return self.cmd_and_args
255 243
256 244 def start(self):
257 245 if self.state == 'before':
258 246 self.process = Popen(self.args,
259 247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
260 248 env=os.environ,
261 249 cwd=self.work_dir
262 250 )
263 251 if WINDOWS:
264 252 self.stdout = forward_read_events(self.process.stdout)
265 253 self.stderr = forward_read_events(self.process.stderr)
266 254 else:
267 255 self.stdout = self.process.stdout.fileno()
268 256 self.stderr = self.process.stderr.fileno()
269 257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
270 258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
271 259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
272 260 self.poller.start()
273 261 self.notify_start(self.process.pid)
274 262 else:
275 263 s = 'The process was already started and has state: %r' % self.state
276 264 raise ProcessStateError(s)
277 265
278 266 def stop(self):
279 267 return self.interrupt_then_kill()
280 268
281 269 def signal(self, sig):
282 270 if self.state == 'running':
283 271 if WINDOWS and sig != SIGINT:
284 272 # use Windows tree-kill for better child cleanup
285 273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
286 274 else:
287 275 self.process.send_signal(sig)
288 276
289 277 def interrupt_then_kill(self, delay=2.0):
290 278 """Send INT, wait a delay and then send KILL."""
291 279 try:
292 280 self.signal(SIGINT)
293 281 except Exception:
294 282 self.log.debug("interrupt failed")
295 283 pass
296 284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
297 285 self.killer.start()
298 286
299 287 # callbacks, etc:
300 288
301 289 def handle_stdout(self, fd, events):
302 290 if WINDOWS:
303 291 line = self.stdout.recv()
304 292 else:
305 293 line = self.process.stdout.readline()
306 294 # a stopped process will be readable but return empty strings
307 295 if line:
308 296 self.log.info(line[:-1])
309 297 else:
310 298 self.poll()
311 299
312 300 def handle_stderr(self, fd, events):
313 301 if WINDOWS:
314 302 line = self.stderr.recv()
315 303 else:
316 304 line = self.process.stderr.readline()
317 305 # a stopped process will be readable but return empty strings
318 306 if line:
319 307 self.log.error(line[:-1])
320 308 else:
321 309 self.poll()
322 310
323 311 def poll(self):
324 312 status = self.process.poll()
325 313 if status is not None:
326 314 self.poller.stop()
327 315 self.loop.remove_handler(self.stdout)
328 316 self.loop.remove_handler(self.stderr)
329 317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
330 318 return status
331 319
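With the deferreds scrubbed out, the launcher API is now: start() returns immediately, and observers register plain callables via on_stop(), which are invoked with stop_data once poll() notices that the process has exited. A minimal sketch driving LocalProcessLauncher by hand (the command and the callback are illustrative, not part of this changeset):

    from zmq.eventloop import ioloop
    from IPython.parallel.apps.launcher import LocalProcessLauncher

    loop = ioloop.IOLoop.instance()

    launcher = LocalProcessLauncher(work_dir=u'.')
    launcher.cmd_and_args = ['sleep', '5']          # illustrative child process

    def announce_stop(data):
        # data is the dict built in poll(): {'exit_code': ..., 'pid': ...}
        print "process %(pid)s exited with code %(exit_code)s" % data
        loop.stop()

    launcher.on_stop(announce_stop)                 # replaces the old deferred chain
    launcher.start()
    loop.start()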
332 320 class LocalControllerLauncher(LocalProcessLauncher):
333 321 """Launch a controller as a regular external process."""
334 322
335 323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
336 324 help="""Popen command to launch ipcontroller.""")
337 325 # Command line arguments to ipcontroller.
338 326 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
339 327 help="""command-line args to pass to ipcontroller""")
340 328
341 329 def find_args(self):
342 330 return self.controller_cmd + self.controller_args
343 331
344 332 def start(self, profile_dir):
345 333 """Start the controller by profile_dir."""
346 334 self.controller_args.extend(['profile_dir=%s'%profile_dir])
347 335 self.profile_dir = unicode(profile_dir)
348 336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
349 337 return super(LocalControllerLauncher, self).start()
350 338
351 339
352 340 class LocalEngineLauncher(LocalProcessLauncher):
353 341 """Launch a single engine as a regular external process."""
354 342
355 343 engine_cmd = List(ipengine_cmd_argv, config=True,
356 344 help="""command to launch the Engine.""")
357 345 # Command line arguments for ipengine.
358 346 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
359 347 help="command-line arguments to pass to ipengine"
360 348 )
361 349
362 350 def find_args(self):
363 351 return self.engine_cmd + self.engine_args
364 352
365 353 def start(self, profile_dir):
366 354 """Start the engine by profile_dir."""
367 355 self.engine_args.extend(['profile_dir=%s'%profile_dir])
368 356 self.profile_dir = unicode(profile_dir)
369 357 return super(LocalEngineLauncher, self).start()
370 358
371 359
372 360 class LocalEngineSetLauncher(BaseLauncher):
373 361 """Launch a set of engines as regular external processes."""
374 362
375 363 # Command line arguments for ipengine.
376 364 engine_args = List(
377 365 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
378 366 help="command-line arguments to pass to ipengine"
379 367 )
380 368 # launcher class
381 369 launcher_class = LocalEngineLauncher
382 370
383 371 launchers = Dict()
384 372 stop_data = Dict()
385 373
386 374 def __init__(self, work_dir=u'.', config=None, **kwargs):
387 375 super(LocalEngineSetLauncher, self).__init__(
388 376 work_dir=work_dir, config=config, **kwargs
389 377 )
390 378 self.stop_data = {}
391 379
392 380 def start(self, n, profile_dir):
393 381 """Start n engines by profile or profile_dir."""
394 382 self.profile_dir = unicode(profile_dir)
395 383 dlist = []
396 384 for i in range(n):
397 385 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
398 386 # Copy the engine args over to each engine launcher.
399 387 el.engine_args = copy.deepcopy(self.engine_args)
400 388 el.on_stop(self._notice_engine_stopped)
401 389 d = el.start(profile_dir)
402 390 if i==0:
403 391 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
404 392 self.launchers[i] = el
405 393 dlist.append(d)
406 394 self.notify_start(dlist)
407 395 # The consumeErrors here could be dangerous
408 396 # dfinal = gatherBoth(dlist, consumeErrors=True)
409 397 # dfinal.addCallback(self.notify_start)
410 398 return dlist
411 399
412 400 def find_args(self):
413 401 return ['engine set']
414 402
415 403 def signal(self, sig):
416 404 dlist = []
417 405 for el in self.launchers.itervalues():
418 406 d = el.signal(sig)
419 407 dlist.append(d)
420 408 # dfinal = gatherBoth(dlist, consumeErrors=True)
421 409 return dlist
422 410
423 411 def interrupt_then_kill(self, delay=1.0):
424 412 dlist = []
425 413 for el in self.launchers.itervalues():
426 414 d = el.interrupt_then_kill(delay)
427 415 dlist.append(d)
428 416 # dfinal = gatherBoth(dlist, consumeErrors=True)
429 417 return dlist
430 418
431 419 def stop(self):
432 420 return self.interrupt_then_kill()
433 421
434 422 def _notice_engine_stopped(self, data):
435 423 pid = data['pid']
436 424 for idx,el in self.launchers.iteritems():
437 425 if el.process.pid == pid:
438 426 break
439 427 self.launchers.pop(idx)
440 428 self.stop_data[idx] = data
441 429 if not self.launchers:
442 430 self.notify_stop(self.stop_data)
443 431
444 432
445 433 #-----------------------------------------------------------------------------
446 434 # MPIExec launchers
447 435 #-----------------------------------------------------------------------------
448 436
449 437
450 438 class MPIExecLauncher(LocalProcessLauncher):
451 439 """Launch an external process using mpiexec."""
452 440
453 441 mpi_cmd = List(['mpiexec'], config=True,
454 442 help="The mpiexec command to use in starting the process."
455 443 )
456 444 mpi_args = List([], config=True,
457 445 help="The command line arguments to pass to mpiexec."
458 446 )
459 447 program = List(['date'], config=True,
460 448 help="The program to start via mpiexec.")
461 449 program_args = List([], config=True,
462 450 help="The command line arguments to the program."
463 451 )
464 452 n = Int(1)
465 453
466 454 def find_args(self):
467 455 """Build self.args using all the fields."""
468 456 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
469 457 self.program + self.program_args
470 458
471 459 def start(self, n):
472 460 """Start n instances of the program using mpiexec."""
473 461 self.n = n
474 462 return super(MPIExecLauncher, self).start()
475 463
476 464
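find_args() in MPIExecLauncher simply concatenates the configurable pieces into a single argv, so the effect of the traits is easy to inspect without starting anything (values below are illustrative):

    from IPython.parallel.apps.launcher import MPIExecLauncher

    ml = MPIExecLauncher(work_dir=u'.')
    ml.n = 4
    ml.mpi_args = ['--host', 'node1,node2']         # illustrative extra mpiexec arguments
    ml.program = ['hostname']

    print ml.args
    # ['mpiexec', '-n', '4', '--host', 'node1,node2', 'hostname']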
477 465 class MPIExecControllerLauncher(MPIExecLauncher):
478 466 """Launch a controller using mpiexec."""
479 467
480 468 controller_cmd = List(ipcontroller_cmd_argv, config=True,
481 469 help="Popen command to launch the Controller"
482 470 )
483 471 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
484 472 help="Command line arguments to pass to ipcontroller."
485 473 )
486 474 n = Int(1)
487 475
488 476 def start(self, profile_dir):
489 477 """Start the controller by profile_dir."""
490 478 self.controller_args.extend(['profile_dir=%s'%profile_dir])
491 479 self.profile_dir = unicode(profile_dir)
492 480 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
493 481 return super(MPIExecControllerLauncher, self).start(1)
494 482
495 483 def find_args(self):
496 484 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
497 485 self.controller_cmd + self.controller_args
498 486
499 487
500 488 class MPIExecEngineSetLauncher(MPIExecLauncher):
501 489
502 490 program = List(ipengine_cmd_argv, config=True,
503 491 help="Popen command for ipengine"
504 492 )
505 493 program_args = List(
506 494 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
507 495 help="Command line arguments for ipengine."
508 496 )
509 497 n = Int(1)
510 498
511 499 def start(self, n, profile_dir):
512 500 """Start n engines by profile or profile_dir."""
513 501 self.program_args.extend(['profile_dir=%s'%profile_dir])
514 502 self.profile_dir = unicode(profile_dir)
515 503 self.n = n
516 504 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
517 505 return super(MPIExecEngineSetLauncher, self).start(n)
518 506
519 507 #-----------------------------------------------------------------------------
520 508 # SSH launchers
521 509 #-----------------------------------------------------------------------------
522 510
523 # TODO: Get SSH Launcher working again.
511 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
524 512
525 513 class SSHLauncher(LocalProcessLauncher):
526 514 """A minimal launcher for ssh.
527 515
528 516 To be useful this will probably have to be extended to use the ``sshx``
529 517 idea for environment variables. There could be other things this needs
530 518 as well.
531 519 """
532 520
533 521 ssh_cmd = List(['ssh'], config=True,
534 522 help="command for starting ssh")
535 523 ssh_args = List(['-tt'], config=True,
536 524 help="args to pass to ssh")
537 525 program = List(['date'], config=True,
538 526 help="Program to launch via ssh")
539 527 program_args = List([], config=True,
540 528 help="args to pass to remote program")
541 529 hostname = Unicode('', config=True,
542 530 help="hostname on which to launch the program")
543 531 user = Unicode('', config=True,
544 532 help="username for ssh")
545 533 location = Unicode('', config=True,
546 534 help="user@hostname location for ssh in one setting")
547 535
548 536 def _hostname_changed(self, name, old, new):
549 537 if self.user:
550 538 self.location = u'%s@%s' % (self.user, new)
551 539 else:
552 540 self.location = new
553 541
554 542 def _user_changed(self, name, old, new):
555 543 self.location = u'%s@%s' % (new, self.hostname)
556 544
557 545 def find_args(self):
558 546 return self.ssh_cmd + self.ssh_args + [self.location] + \
559 547 self.program + self.program_args
560 548
561 549 def start(self, profile_dir, hostname=None, user=None):
562 550 self.profile_dir = unicode(profile_dir)
563 551 if hostname is not None:
564 552 self.hostname = hostname
565 553 if user is not None:
566 554 self.user = user
567 555
568 556 return super(SSHLauncher, self).start()
569 557
570 558 def signal(self, sig):
571 559 if self.state == 'running':
572 560 # send escaped ssh connection-closer
573 561 self.process.stdin.write('~.')
574 562 self.process.stdin.flush()
575 563
576 564
577 565
578 566 class SSHControllerLauncher(SSHLauncher):
579 567
580 568 program = List(ipcontroller_cmd_argv, config=True,
581 569 help="remote ipcontroller command.")
582 570 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
583 571 help="Command line arguments to ipcontroller.")
584 572
585 573
586 574 class SSHEngineLauncher(SSHLauncher):
587 575 program = List(ipengine_cmd_argv, config=True,
588 576 help="remote ipengine command.")
589 577 # Command line arguments for ipengine.
590 578 program_args = List(
591 579 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
592 580 help="Command line arguments to ipengine."
593 581 )
594 582
595 583 class SSHEngineSetLauncher(LocalEngineSetLauncher):
596 584 launcher_class = SSHEngineLauncher
597 585 engines = Dict(config=True,
598 586 help="""dict of engines to launch. This is a dict by hostname of ints,
599 587 corresponding to the number of engines to start on that host.""")
600 588
601 589 def start(self, n, profile_dir):
602 590 """Start engines by profile or profile_dir.
603 591 `n` is ignored, and the `engines` config property is used instead.
604 592 """
605 593
606 594 self.profile_dir = unicode(profile_dir)
607 595 dlist = []
608 596 for host, n in self.engines.iteritems():
609 597 if isinstance(n, (tuple, list)):
610 598 n, args = n
611 599 else:
612 600 args = copy.deepcopy(self.engine_args)
613 601
614 602 if '@' in host:
615 603 user,host = host.split('@',1)
616 604 else:
617 605 user=None
618 606 for i in range(n):
619 607 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
620 608
621 609 # Copy the engine args over to each engine launcher.
623 611 el.program_args = args
624 612 el.on_stop(self._notice_engine_stopped)
625 613 d = el.start(profile_dir, user=user, hostname=host)
626 614 if i==0:
627 615 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
628 616 self.launchers[host+str(i)] = el
629 617 dlist.append(d)
630 618 self.notify_start(dlist)
631 619 return dlist
632 620
633 621
634 622
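The engines dict is what actually drives SSHEngineSetLauncher: keys are hostnames (optionally user@host), and each value is either an engine count or an (n, engine_args) pair, as handled in start() above. A typical profile configuration might look like this (hostnames are made up):

    # in ipcluster_config.py
    c = get_config()
    c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'
    c.SSHEngineSetLauncher.engines = {
        'node1.example.com'    : 2,                                       # 2 engines, default args
        'me@node2.example.com' : (4, ['--log-to-file', 'log_level=20']),  # 4 engines, custom args
    }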
635 623 #-----------------------------------------------------------------------------
636 624 # Windows HPC Server 2008 scheduler launchers
637 625 #-----------------------------------------------------------------------------
638 626
639 627
640 628 # This is only used on Windows.
641 629 def find_job_cmd():
642 630 if WINDOWS:
643 631 try:
644 632 return find_cmd('job')
645 633 except (FindCmdError, ImportError):
646 634 # ImportError will be raised if win32api is not installed
647 635 return 'job'
648 636 else:
649 637 return 'job'
650 638
651 639
652 640 class WindowsHPCLauncher(BaseLauncher):
653 641
654 642 job_id_regexp = Unicode(r'\d+', config=True,
655 643 help="""A regular expression used to get the job id from the output of the
656 644 submit_command. """
657 645 )
658 646 job_file_name = Unicode(u'ipython_job.xml', config=True,
659 647 help="The filename of the instantiated job script.")
660 648 # The full path to the instantiated job script. This gets made dynamically
661 649 # by combining the work_dir with the job_file_name.
662 650 job_file = Unicode(u'')
663 651 scheduler = Unicode('', config=True,
664 652 help="The hostname of the scheduler to submit the job to.")
665 653 job_cmd = Unicode(find_job_cmd(), config=True,
666 654 help="The command for submitting jobs.")
667 655
668 656 def __init__(self, work_dir=u'.', config=None, **kwargs):
669 657 super(WindowsHPCLauncher, self).__init__(
670 658 work_dir=work_dir, config=config, **kwargs
671 659 )
672 660
673 661 @property
674 662 def job_file(self):
675 663 return os.path.join(self.work_dir, self.job_file_name)
676 664
677 665 def write_job_file(self, n):
678 666 raise NotImplementedError("Implement write_job_file in a subclass.")
679 667
680 668 def find_args(self):
681 669 return [u'job.exe']
682 670
683 671 def parse_job_id(self, output):
684 672 """Take the output of the submit command and return the job id."""
685 673 m = re.search(self.job_id_regexp, output)
686 674 if m is not None:
687 675 job_id = m.group()
688 676 else:
689 677 raise LauncherError("Job id couldn't be determined: %s" % output)
690 678 self.job_id = job_id
691 679 self.log.info('Job started with job id: %r' % job_id)
692 680 return job_id
693 681
694 682 def start(self, n):
695 683 """Start n copies of the process using the Win HPC job scheduler."""
696 684 self.write_job_file(n)
697 685 args = [
698 686 'submit',
699 687 '/jobfile:%s' % self.job_file,
700 688 '/scheduler:%s' % self.scheduler
701 689 ]
702 690 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
703 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
691
704 692 output = check_output([self.job_cmd]+args,
705 693 env=os.environ,
706 694 cwd=self.work_dir,
707 695 stderr=STDOUT
708 696 )
709 697 job_id = self.parse_job_id(output)
710 698 self.notify_start(job_id)
711 699 return job_id
712 700
713 701 def stop(self):
714 702 args = [
715 703 'cancel',
716 704 self.job_id,
717 705 '/scheduler:%s' % self.scheduler
718 706 ]
719 707 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
720 708 try:
721 709 output = check_output([self.job_cmd]+args,
722 710 env=os.environ,
723 711 cwd=self.work_dir,
724 712 stderr=STDOUT
725 713 )
726 714 except:
727 715 output = 'The job already appears to be stopped: %r' % self.job_id
728 716 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
729 717 return output
730 718
731 719
732 720 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
733 721
734 722 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
735 723 help="WinHPC xml job file.")
736 724 extra_args = List([], config=False,
737 725 help="extra args to pass to ipcontroller")
738 726
739 727 def write_job_file(self, n):
740 728 job = IPControllerJob(config=self.config)
741 729
742 730 t = IPControllerTask(config=self.config)
743 731 # The task's work directory is *not* the actual work directory of
744 732 # the controller. It is used as the base path for the stdout/stderr
745 733 # files that the scheduler redirects to.
746 734 t.work_directory = self.profile_dir
747 735 # Add the profile_dir and extra args from self.start().
748 736 t.controller_args.extend(self.extra_args)
749 737 job.add_task(t)
750 738
751 739 self.log.info("Writing job description file: %s" % self.job_file)
752 740 job.write(self.job_file)
753 741
754 742 @property
755 743 def job_file(self):
756 744 return os.path.join(self.profile_dir, self.job_file_name)
757 745
758 746 def start(self, profile_dir):
759 747 """Start the controller by profile_dir."""
760 748 self.extra_args = ['profile_dir=%s'%profile_dir]
761 749 self.profile_dir = unicode(profile_dir)
762 750 return super(WindowsHPCControllerLauncher, self).start(1)
763 751
764 752
765 753 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
766 754
767 755 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
768 756 help="jobfile for ipengines job")
769 757 extra_args = List([], config=False,
770 758 help="extra args to pass to ipengine")
771 759
772 760 def write_job_file(self, n):
773 761 job = IPEngineSetJob(config=self.config)
774 762
775 763 for i in range(n):
776 764 t = IPEngineTask(config=self.config)
777 765 # The task's work directory is *not* the actual work directory of
778 766 # the engine. It is used as the base path for the stdout/stderr
779 767 # files that the scheduler redirects to.
780 768 t.work_directory = self.profile_dir
781 769 # Add the profile_dir and extra args from self.start().
782 770 t.engine_args.extend(self.extra_args)
783 771 job.add_task(t)
784 772
785 773 self.log.info("Writing job description file: %s" % self.job_file)
786 774 job.write(self.job_file)
787 775
788 776 @property
789 777 def job_file(self):
790 778 return os.path.join(self.profile_dir, self.job_file_name)
791 779
792 780 def start(self, n, profile_dir):
793 781 """Start n engines by profile_dir."""
794 782 self.extra_args = ['profile_dir=%s'%profile_dir]
795 783 self.profile_dir = unicode(profile_dir)
796 784 return super(WindowsHPCEngineSetLauncher, self).start(n)
797 785
798 786
799 787 #-----------------------------------------------------------------------------
800 788 # Batch (PBS) system launchers
801 789 #-----------------------------------------------------------------------------
802 790
803 791 class BatchSystemLauncher(BaseLauncher):
804 792 """Launch an external process using a batch system.
805 793
806 794 This class is designed to work with UNIX batch systems like PBS, LSF,
807 795 GridEngine, etc. The overall model is that there are different commands
808 796 like qsub, qdel, etc. that handle the starting and stopping of the process.
809 797
810 798 This class also has the notion of a batch script. The ``batch_template``
811 799 attribute can be set to a string that is a template for the batch script.
812 800 This template is instantiated using string formatting. Thus the template can
813 801 use {n} for the number of instances. Subclasses can add additional variables
814 802 to the template dict.
815 803 """
816 804
817 805 # Subclasses must fill these in. See PBSEngineSet
818 806 submit_command = List([''], config=True,
819 807 help="The name of the command line program used to submit jobs.")
820 808 delete_command = List([''], config=True,
821 809 help="The name of the command line program used to delete jobs.")
822 810 job_id_regexp = Unicode('', config=True,
823 811 help="""A regular expression used to get the job id from the output of the
824 812 submit_command.""")
825 813 batch_template = Unicode('', config=True,
826 814 help="The string that is the batch script template itself.")
827 815 batch_template_file = Unicode(u'', config=True,
828 816 help="The file that contains the batch template.")
829 817 batch_file_name = Unicode(u'batch_script', config=True,
830 818 help="The filename of the instantiated batch script.")
831 819 queue = Unicode(u'', config=True,
832 820 help="The PBS Queue.")
833 821
834 822 # not configurable, override in subclasses
835 823 # PBS Job Array regex
836 824 job_array_regexp = Unicode('')
837 825 job_array_template = Unicode('')
838 826 # PBS Queue regex
839 827 queue_regexp = Unicode('')
840 828 queue_template = Unicode('')
841 829 # The default batch template, override in subclasses
842 830 default_template = Unicode('')
843 831 # The full path to the instantiated batch script.
844 832 batch_file = Unicode(u'')
845 833 # the format dict used with batch_template:
846 834 context = Dict()
847 835 # the Formatter instance for rendering the templates:
848 836 formatter = Instance(EvalFormatter, (), {})
849 837
850 838
851 839 def find_args(self):
852 840 return self.submit_command + [self.batch_file]
853 841
854 842 def __init__(self, work_dir=u'.', config=None, **kwargs):
855 843 super(BatchSystemLauncher, self).__init__(
856 844 work_dir=work_dir, config=config, **kwargs
857 845 )
858 846 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
859 847
860 848 def parse_job_id(self, output):
861 849 """Take the output of the submit command and return the job id."""
862 850 m = re.search(self.job_id_regexp, output)
863 851 if m is not None:
864 852 job_id = m.group()
865 853 else:
866 854 raise LauncherError("Job id couldn't be determined: %s" % output)
867 855 self.job_id = job_id
868 856 self.log.info('Job submitted with job id: %r' % job_id)
869 857 return job_id
870 858
871 859 def write_batch_script(self, n):
872 860 """Instantiate and write the batch script to the work_dir."""
873 861 self.context['n'] = n
874 862 self.context['queue'] = self.queue
875 863 # first priority is batch_template if set
876 864 if self.batch_template_file and not self.batch_template:
877 865 # second priority is batch_template_file
878 866 with open(self.batch_template_file) as f:
879 867 self.batch_template = f.read()
880 868 if not self.batch_template:
881 869 # third (last) priority is default_template
882 870 self.batch_template = self.default_template
883 871
884 872 regex = re.compile(self.job_array_regexp)
885 873 # print regex.search(self.batch_template)
886 874 if not regex.search(self.batch_template):
887 875 self.log.info("adding job array settings to batch script")
888 876 firstline, rest = self.batch_template.split('\n',1)
889 877 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
890 878
891 879 regex = re.compile(self.queue_regexp)
892 880 # print regex.search(self.batch_template)
893 881 if self.queue and not regex.search(self.batch_template):
894 882 self.log.info("adding PBS queue settings to batch script")
895 883 firstline, rest = self.batch_template.split('\n',1)
896 884 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
897 885
898 886 script_as_string = self.formatter.format(self.batch_template, **self.context)
899 887 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
900 888
901 889 with open(self.batch_file, 'w') as f:
902 890 f.write(script_as_string)
903 891 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904 892
905 893 def start(self, n, profile_dir):
906 894 """Start n copies of the process using a batch system."""
907 895 # Here we save profile_dir in the context so they
908 896 # can be used in the batch script template as {profile_dir}
909 897 self.context['profile_dir'] = profile_dir
910 898 self.profile_dir = unicode(profile_dir)
911 899 self.write_batch_script(n)
912 900 output = check_output(self.args, env=os.environ)
913 901
914 902 job_id = self.parse_job_id(output)
915 903 self.notify_start(job_id)
916 904 return job_id
917 905
918 906 def stop(self):
919 907 output = check_output(self.delete_command+[self.job_id], env=os.environ)
920 908 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
921 909 return output
922 910
923 911
924 912 class PBSLauncher(BatchSystemLauncher):
925 913 """A BatchSystemLauncher subclass for PBS."""
926 914
927 915 submit_command = List(['qsub'], config=True,
928 916 help="The PBS submit command ['qsub']")
929 917 delete_command = List(['qdel'], config=True,
930 918 help="The PBS delete command ['qdel']")
931 919 job_id_regexp = Unicode(r'\d+', config=True,
932 920 help="Regular expression for identifying the job ID [r'\d+']")
933 921
934 922 batch_file = Unicode(u'')
935 923 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
936 924 job_array_template = Unicode('#PBS -t 1-{n}')
937 925 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
938 926 queue_template = Unicode('#PBS -q {queue}')
939 927
940 928
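The batch templates are ordinary new-style format strings: write_batch_script() fills {n}, {queue} and {profile_dir} from the context dict (via EvalFormatter, which for simple field names behaves like str.format) and writes the result to the work_dir. A sketch of the expansion for a PBS-style engine template (the queue name and profile path are made up, and the command is abbreviated to 'ipengine'):

    template = """#!/bin/sh
    #PBS -V
    #PBS -t 1-{n}
    #PBS -q {queue}
    #PBS -N ipengine
    ipengine profile_dir={profile_dir}
    """
    context = dict(n=8, queue='batch', profile_dir='/home/me/.ipython/profile_mycluster')
    print template.format(**context)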
941 929 class PBSControllerLauncher(PBSLauncher):
942 930 """Launch a controller using PBS."""
943 931
944 932 batch_file_name = Unicode(u'pbs_controller', config=True,
945 933 help="batch file name for the controller job.")
946 934 default_template= Unicode("""#!/bin/sh
947 935 #PBS -V
948 936 #PBS -N ipcontroller
949 937 %s --log-to-file profile_dir={profile_dir}
950 938 """%(' '.join(ipcontroller_cmd_argv)))
951 939
952 940 def start(self, profile_dir):
953 941 """Start the controller by profile or profile_dir."""
954 942 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 943 return super(PBSControllerLauncher, self).start(1, profile_dir)
956 944
957 945
958 946 class PBSEngineSetLauncher(PBSLauncher):
959 947 """Launch Engines using PBS"""
960 948 batch_file_name = Unicode(u'pbs_engines', config=True,
961 949 help="batch file name for the engine(s) job.")
962 950 default_template= Unicode(u"""#!/bin/sh
963 951 #PBS -V
964 952 #PBS -N ipengine
965 953 %s profile_dir={profile_dir}
966 954 """%(' '.join(ipengine_cmd_argv)))
967 955
968 956 def start(self, n, profile_dir):
969 957 """Start n engines by profile or profile_dir."""
970 958 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 959 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
972 960
973 961 #SGE is very similar to PBS
974 962
975 963 class SGELauncher(PBSLauncher):
976 964 """Sun GridEngine is a PBS clone with slightly different syntax"""
977 965 job_array_regexp = Unicode('#\$\W+\-t')
978 966 job_array_template = Unicode('#$ -t 1-{n}')
979 967 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 968 queue_template = Unicode('#$ -q {queue}')
981 969
982 970 class SGEControllerLauncher(SGELauncher):
983 971 """Launch a controller using SGE."""
984 972
985 973 batch_file_name = Unicode(u'sge_controller', config=True,
986 974 help="batch file name for the ipcontroller job.")
987 975 default_template= Unicode(u"""#$ -V
988 976 #$ -S /bin/sh
989 977 #$ -N ipcontroller
990 978 %s --log-to-file profile_dir={profile_dir}
991 979 """%(' '.join(ipcontroller_cmd_argv)))
992 980
993 981 def start(self, profile_dir):
994 982 """Start the controller by profile or profile_dir."""
995 983 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
996 984 return super(SGEControllerLauncher, self).start(1, profile_dir)
997 985
998 986 class SGEEngineSetLauncher(SGELauncher):
999 987 """Launch Engines with SGE"""
1000 988 batch_file_name = Unicode(u'sge_engines', config=True,
1001 989 help="batch file name for the engine(s) job.")
1002 990 default_template = Unicode("""#$ -V
1003 991 #$ -S /bin/sh
1004 992 #$ -N ipengine
1005 993 %s profile_dir={profile_dir}
1006 994 """%(' '.join(ipengine_cmd_argv)))
1007 995
1008 996 def start(self, n, profile_dir):
1009 997 """Start n engines by profile or profile_dir."""
1010 998 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 999 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1012 1000
1013 1001
1014 1002 #-----------------------------------------------------------------------------
1015 1003 # A launcher for ipcluster itself!
1016 1004 #-----------------------------------------------------------------------------
1017 1005
1018 1006
1019 1007 class IPClusterLauncher(LocalProcessLauncher):
1020 1008 """Launch the ipcluster program in an external process."""
1021 1009
1022 1010 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1023 1011 help="Popen command for ipcluster")
1024 1012 ipcluster_args = List(
1025 1013 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1026 1014 help="Command line arguments to pass to ipcluster.")
1027 1015 ipcluster_subcommand = Unicode('start')
1028 1016 ipcluster_n = Int(2)
1029 1017
1030 1018 def find_args(self):
1031 1019 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1032 1020 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1033 1021
1034 1022 def start(self):
1035 1023 self.log.info("Starting ipcluster: %r" % self.args)
1036 1024 return super(IPClusterLauncher, self).start()
1037 1025
1038 1026 #-----------------------------------------------------------------------------
1039 1027 # Collections of launchers
1040 1028 #-----------------------------------------------------------------------------
1041 1029
1042 1030 local_launchers = [
1043 1031 LocalControllerLauncher,
1044 1032 LocalEngineLauncher,
1045 1033 LocalEngineSetLauncher,
1046 1034 ]
1047 1035 mpi_launchers = [
1048 1036 MPIExecLauncher,
1049 1037 MPIExecControllerLauncher,
1050 1038 MPIExecEngineSetLauncher,
1051 1039 ]
1052 1040 ssh_launchers = [
1053 1041 SSHLauncher,
1054 1042 SSHControllerLauncher,
1055 1043 SSHEngineLauncher,
1056 1044 SSHEngineSetLauncher,
1057 1045 ]
1058 1046 winhpc_launchers = [
1059 1047 WindowsHPCLauncher,
1060 1048 WindowsHPCControllerLauncher,
1061 1049 WindowsHPCEngineSetLauncher,
1062 1050 ]
1063 1051 pbs_launchers = [
1064 1052 PBSLauncher,
1065 1053 PBSControllerLauncher,
1066 1054 PBSEngineSetLauncher,
1067 1055 ]
1068 1056 sge_launchers = [
1069 1057 SGELauncher,
1070 1058 SGEControllerLauncher,
1071 1059 SGEEngineSetLauncher,
1072 1060 ]
1073 1061 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1074 1062 + pbs_launchers + sge_launchers
1063