restore auto_create behavior
MinRK
@@ -1,525 +1,539 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 23 import shutil
24 24 import sys
25 25
26 26 from subprocess import Popen, PIPE
27 27
28 28 from IPython.config.loader import PyFileConfigLoader, Config
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.config.application import Application
31 31 from IPython.core.crashhandler import CrashHandler
32 32 from IPython.core.newapplication import BaseIPythonApplication
33 33 from IPython.core import release
34 34 from IPython.utils.path import (
35 35 get_ipython_package_dir,
36 36 get_ipython_dir,
37 37 expand_path
38 38 )
39 39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module errors
43 43 #-----------------------------------------------------------------------------
44 44
45 45 class ClusterDirError(Exception):
46 46 pass
47 47
48 48
49 49 class PIDFileError(Exception):
50 50 pass
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Class for managing cluster directories
55 55 #-----------------------------------------------------------------------------
56 56
57 57 class ClusterDir(Configurable):
58 58 """An object to manage the cluster directory and its resources.
59 59
60 60 The cluster directory is used by :command:`ipengine`,
61 61 :command:`ipcontroller` and :command:`ipcluster` to manage the
62 62 configuration, logging and security of these applications.
63 63
64 64 This object knows how to find, create and manage these directories. This
65 65 should be used by any code that wants to handle cluster directories.
66 66 """
67 67
68 68 security_dir_name = Unicode('security')
69 69 log_dir_name = Unicode('log')
70 70 pid_dir_name = Unicode('pid')
71 71 security_dir = Unicode(u'')
72 72 log_dir = Unicode(u'')
73 73 pid_dir = Unicode(u'')
74 74
75 auto_create = Bool(False,
76 help="""Whether to automatically create the ClusterDirectory if it does
77 not exist""")
78 overwrite = Bool(False,
79 help="""Whether to overwrite existing config files""")
75 80 location = Unicode(u'', config=True,
76 81 help="""Set the cluster dir. This overrides the logic used by the
77 82 `profile` option.""",
78 83 )
79 profile = Unicode(u'default',
84 profile = Unicode(u'default', config=True,
80 85 help="""The string name of the profile to be used. This determines the name
81 86 of the cluster dir as: cluster_<profile>. The default profile is named
82 87 'default'. The cluster directory is resolved this way if the
83 `cluster_dir` option is not used.""", config=True
88 `cluster_dir` option is not used."""
84 89 )
85 auto_create = Bool(False,
86 help="""Whether to automatically create the ClusterDirectory if it does
87 not exist""")
88 overwrite = Bool(False,
89 help="""Whether to overwrite existing config files""")
90 90
91 91 _location_isset = Bool(False) # flag for detecting multiply set location
92 92 _new_dir = Bool(False) # flag for whether a new dir was created
93 93
94 94 def __init__(self, **kwargs):
95 # make sure auto_create,overwrite are set *before* location
96 for name in ('auto_create', 'overwrite'):
97 v = kwargs.pop(name, None)
98 if v is not None:
99 setattr(self, name, v)
95 100 super(ClusterDir, self).__init__(**kwargs)
96 101 if not self.location:
97 102 self._profile_changed('profile', 'default', self.profile)
98 103
99 104 def _location_changed(self, name, old, new):
100 105 if self._location_isset:
101 106 raise RuntimeError("Cannot set ClusterDir more than once.")
102 107 self._location_isset = True
103 108 if not os.path.isdir(new):
104 if self.auto_create:
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
105 110 os.makedirs(new)
106 111 self._new_dir = True
107 112 else:
108 113 raise ClusterDirError('Directory not found: %s' % new)
109 114
110 115 # ensure config files exist:
111 116 self.copy_all_config_files(overwrite=self.overwrite)
112 117 self.security_dir = os.path.join(new, self.security_dir_name)
113 118 self.log_dir = os.path.join(new, self.log_dir_name)
114 119 self.pid_dir = os.path.join(new, self.pid_dir_name)
115 120 self.check_dirs()
116 121
117 122 def _profile_changed(self, name, old, new):
118 123 if self._location_isset:
119 124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
120 125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
121 126
122 127 def _log_dir_changed(self, name, old, new):
123 128 self.check_log_dir()
124 129
125 130 def check_log_dir(self):
126 131 if not os.path.isdir(self.log_dir):
127 132 os.mkdir(self.log_dir)
128 133
129 134 def _security_dir_changed(self, name, old, new):
130 135 self.check_security_dir()
131 136
132 137 def check_security_dir(self):
133 138 if not os.path.isdir(self.security_dir):
134 139 os.mkdir(self.security_dir, 0700)
135 140 os.chmod(self.security_dir, 0700)
136 141
137 142 def _pid_dir_changed(self, name, old, new):
138 143 self.check_pid_dir()
139 144
140 145 def check_pid_dir(self):
141 146 if not os.path.isdir(self.pid_dir):
142 147 os.mkdir(self.pid_dir, 0700)
143 148 os.chmod(self.pid_dir, 0700)
144 149
145 150 def check_dirs(self):
146 151 self.check_security_dir()
147 152 self.check_log_dir()
148 153 self.check_pid_dir()
149 154
150 155 def copy_config_file(self, config_file, path=None, overwrite=False):
151 156 """Copy a default config file into the active cluster directory.
152 157
153 158 Default configuration files are kept in :mod:`IPython.config.default`.
154 159 This function moves these from that location to the working cluster
155 160 directory.
156 161 """
157 162 if path is None:
158 163 import IPython.config.default
159 164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
160 165 path = os.path.sep.join(path)
161 166 src = os.path.join(path, config_file)
162 167 dst = os.path.join(self.location, config_file)
163 168 if not os.path.isfile(dst) or overwrite:
164 169 shutil.copy(src, dst)
165 170
166 171 def copy_all_config_files(self, path=None, overwrite=False):
167 172 """Copy all config files into the active cluster directory."""
168 173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
169 174 u'ipcluster_config.py']:
170 175 self.copy_config_file(f, path=path, overwrite=overwrite)
171 176
172 177 @classmethod
173 178 def create_cluster_dir(cls, cluster_dir):
174 179 """Create a new cluster directory given a full path.
175 180
176 181 Parameters
177 182 ----------
178 183 cluster_dir : str
179 184 The full path to the cluster directory. If it does exist, it will
180 185 be used. If not, it will be created.
181 186 """
182 187 return ClusterDir(location=cluster_dir)
183 188
184 189 @classmethod
185 190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
186 191 """Create a cluster dir by profile name and path.
187 192
188 193 Parameters
189 194 ----------
190 195 path : str
191 196 The path (directory) to put the cluster directory in.
192 197 profile : str
193 198 The name of the profile. The name of the cluster directory will
194 199 be "cluster_<profile>".
195 200 """
196 201 if not os.path.isdir(path):
197 202 raise ClusterDirError('Directory not found: %s' % path)
198 203 cluster_dir = os.path.join(path, u'cluster_' + profile)
199 204 return ClusterDir(location=cluster_dir)
200 205
201 206 @classmethod
202 207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
203 208 """Find an existing cluster dir by profile name, return its ClusterDir.
204 209
205 210 This searches through a sequence of paths for a cluster dir. If it
206 211 is not found, a :class:`ClusterDirError` exception will be raised.
207 212
208 213 The search path algorithm is:
209 214 1. ``os.getcwd()``
210 215 2. ``ipython_dir``
211 216 3. The directories found in the ":" separated
212 217 :env:`IPCLUSTER_DIR_PATH` environment variable.
213 218
214 219 Parameters
215 220 ----------
216 221 ipython_dir : unicode or str
217 222 The IPython directory to use.
218 223 profile : unicode or str
219 224 The name of the profile. The name of the cluster directory
220 225 will be "cluster_<profile>".
221 226 """
222 227 dirname = u'cluster_' + profile
223 228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
224 229 if cluster_dir_paths:
225 230 cluster_dir_paths = cluster_dir_paths.split(':')
226 231 else:
227 232 cluster_dir_paths = []
228 233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
229 234 for p in paths:
230 235 cluster_dir = os.path.join(p, dirname)
231 236 if os.path.isdir(cluster_dir):
232 237 return ClusterDir(location=cluster_dir)
233 238 else:
234 239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
235 240
236 241 @classmethod
237 242 def find_cluster_dir(cls, cluster_dir):
238 243 """Find/create a cluster dir and return its ClusterDir.
239 244
240 245 This will create the cluster directory if it doesn't exist.
241 246
242 247 Parameters
243 248 ----------
244 249 cluster_dir : unicode or str
245 250 The path of the cluster directory. This is expanded using
246 251 :func:`IPython.utils.genutils.expand_path`.
247 252 """
248 253 cluster_dir = expand_path(cluster_dir)
249 254 if not os.path.isdir(cluster_dir):
250 255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
251 256 return ClusterDir(location=cluster_dir)
252 257
253 258
254 259 #-----------------------------------------------------------------------------
255 260 # Crash handler for this application
256 261 #-----------------------------------------------------------------------------
257 262
258 263
259 264 _message_template = """\
260 265 Oops, $self.app_name crashed. We do our best to make it stable, but...
261 266
262 267 A crash report was automatically generated with the following information:
263 268 - A verbatim copy of the crash traceback.
264 269 - Data on your current $self.app_name configuration.
265 270
266 271 It was left in the file named:
267 272 \t'$self.crash_report_fname'
268 273 If you can email this file to the developers, the information in it will help
269 274 them in understanding and correcting the problem.
270 275
271 276 You can mail it to: $self.contact_name at $self.contact_email
272 277 with the subject '$self.app_name Crash Report'.
273 278
274 279 If you want to do it now, the following command will work (under Unix):
275 280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
276 281
277 282 To ensure accurate tracking of this issue, please file a report about it at:
278 283 $self.bug_tracker
279 284 """
280 285
281 286 class ClusterDirCrashHandler(CrashHandler):
282 287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
283 288
284 289 message_template = _message_template
285 290
286 291 def __init__(self, app):
287 292 contact_name = release.authors['Min'][0]
288 293 contact_email = release.authors['Min'][1]
289 294 bug_tracker = 'http://github.com/ipython/ipython/issues'
290 295 super(ClusterDirCrashHandler,self).__init__(
291 296 app, contact_name, contact_email, bug_tracker
292 297 )
293 298
294 299
295 300 #-----------------------------------------------------------------------------
296 301 # Main application
297 302 #-----------------------------------------------------------------------------
298 303 base_aliases = {
299 304 'profile' : "ClusterDir.profile",
300 305 'cluster_dir' : 'ClusterDir.location',
301 306 'auto_create' : 'ClusterApplication.auto_create_cluster_dir',
302 307 'log_level' : 'ClusterApplication.log_level',
303 308 'work_dir' : 'ClusterApplication.work_dir',
304 309 'log_to_file' : 'ClusterApplication.log_to_file',
305 310 'clean_logs' : 'ClusterApplication.clean_logs',
306 311 'log_url' : 'ClusterApplication.log_url',
307 312 }
308 313
309 314 base_flags = {
310 315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
311 316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
312 317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
313 318 }
314 319 for k,v in base_flags.iteritems():
315 320 base_flags[k] = (Config(v[0]),v[1])
316 321
317 322 class ClusterApplication(BaseIPythonApplication):
318 323 """An application that puts everything into a cluster directory.
319 324
320 325 Instead of looking for things in the ipython_dir, this type of application
321 326 will use its own private directory called the "cluster directory"
322 327 for things like config files, log files, etc.
323 328
324 329 The cluster directory is resolved as follows:
325 330
326 331 * If the ``--cluster-dir`` option is given, it is used.
327 332 * If ``--cluster-dir`` is not given, the application directory is
328 333 resolved using the profile name as ``cluster_<profile>``. The search
329 334 path for this directory is then i) cwd if it is found there
330 335 and ii) in ipython_dir otherwise.
331 336
332 337 The config file for the application is to be put in the cluster
333 338 dir and named the value of the ``config_file_name`` class attribute.
334 339 """
335 340
336 341 crash_handler_class = ClusterDirCrashHandler
337 342 auto_create_cluster_dir = Bool(True, config=True,
338 343 help="whether to create the cluster_dir if it doesn't exist")
339 344 cluster_dir = Instance(ClusterDir)
340 345 classes = [ClusterDir]
341 346
342 347 def _log_level_default(self):
343 348 # temporarily override default_log_level to INFO
344 349 return logging.INFO
345 350
346 351 work_dir = Unicode(os.getcwdu(), config=True,
347 352 help='Set the working dir for the process.'
348 353 )
349 354 def _work_dir_changed(self, name, old, new):
350 355 self.work_dir = unicode(expand_path(new))
351 356
352 357 log_to_file = Bool(config=True,
353 358 help="whether to log to a file")
354 359
355 360 clean_logs = Bool(False, shortname='--clean-logs', config=True,
356 361 help="whether to cleanup old logfiles before starting")
357 362
358 363 log_url = CStr('', shortname='--log-url', config=True,
359 364 help="The ZMQ URL of the iplogger to aggregate logging.")
360 365
361 366 config_file = Unicode(u'', config=True,
362 367 help="""Path to ipcontroller configuration file. The default is to use
363 368 <appname>_config.py, as found by cluster-dir."""
364 369 )
365 370
366 371 loop = Instance('zmq.eventloop.ioloop.IOLoop')
367 372 def _loop_default(self):
368 373 from zmq.eventloop.ioloop import IOLoop
369 374 return IOLoop.instance()
370 375
371 376 aliases = Dict(base_aliases)
372 377 flags = Dict(base_flags)
373 378
374 379 def init_clusterdir(self):
375 380 """This resolves the cluster directory.
376 381
377 382 This tries to find the cluster directory and if successful, it will
378 383 have done:
379 384 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
380 385 the application.
381 386 * Sets ``self.cluster_dir`` attribute of the application and config
382 387 objects.
383 388
384 389 The algorithm used for this is as follows:
385 390 1. Try ``Global.cluster_dir``.
386 391 2. Try using ``Global.profile``.
387 392 3. If both of these fail and ``self.auto_create_cluster_dir`` is
388 393 ``True``, then create the new cluster dir in the IPython directory.
389 394 4. If all fails, then raise :class:`ClusterDirError`.
390 395 """
391 self.cluster_dir = ClusterDir(config=self.config, auto_create=self.auto_create_cluster_dir)
396 try:
397 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
398 except ClusterDirError as e:
399 self.log.fatal("Error initializing cluster dir: %s"%e)
400 self.log.fatal("A cluster dir must be created before running this command.")
401 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
402 "information about creating and listing cluster dirs."
403 )
404 self.exit(1)
405
392 406 if self.cluster_dir._new_dir:
393 407 self.log.info('Creating new cluster dir: %s' % \
394 408 self.cluster_dir.location)
395 409 else:
396 410 self.log.info('Using existing cluster dir: %s' % \
397 411 self.cluster_dir.location)
398 412
399 413 def initialize(self, argv=None):
400 414 """initialize the app"""
415 self.init_crash_handler()
401 416 self.parse_command_line(argv)
402 417 cl_config = self.config
403 418 self.init_clusterdir()
404 419 if self.config_file:
405 420 self.load_config_file(self.config_file)
406 421 else:
407 422 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
408 423 # command-line should *override* config file, but command-line is necessary
409 424 # to determine clusterdir, etc.
410 425 self.update_config(cl_config)
411 426 self.reinit_logging()
412 427
413 428 self.to_work_dir()
414 429
415 430 def to_work_dir(self):
416 431 wd = self.work_dir
417 432 if unicode(wd) != os.getcwdu():
418 433 os.chdir(wd)
419 434 self.log.info("Changing to working dir: %s" % wd)
420 435
421 436 def load_config_file(self, filename, path=None):
422 437 """Load a .py based config file by filename and path."""
423 438 # use config.application.Application.load_config
424 # instead of inflexible
425 # core.newapplication.BaseIPythonApplication.load_config
439 # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
426 440 return Application.load_config_file(self, filename, path=path)
427 441 #
428 442 # def load_default_config_file(self):
429 443 # """Load a .py based config file by filename and path."""
430 444 # return BaseIPythonApplication.load_config_file(self)
431 445
432 446 # disable URL-logging
433 447 def reinit_logging(self):
434 448 # Remove old log files
435 449 log_dir = self.cluster_dir.log_dir
436 450 if self.clean_logs:
437 451 for f in os.listdir(log_dir):
438 452 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
439 453 os.remove(os.path.join(log_dir, f))
440 454 if self.log_to_file:
441 455 # Start logging to the new log file
442 456 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
443 457 logfile = os.path.join(log_dir, log_filename)
444 458 open_log_file = open(logfile, 'w')
445 459 else:
446 460 open_log_file = None
447 461 if open_log_file is not None:
448 462 self.log.removeHandler(self._log_handler)
449 463 self._log_handler = logging.StreamHandler(open_log_file)
450 464 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
451 465 self._log_handler.setFormatter(self._log_formatter)
452 466 self.log.addHandler(self._log_handler)
453 467
454 468 def write_pid_file(self, overwrite=False):
455 469 """Create a .pid file in the pid_dir with my pid.
456 470
457 471 This must be called after pre_construct, which sets `self.pid_dir`.
458 472 This raises :exc:`PIDFileError` if the pid file exists already.
459 473 """
460 474 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
461 475 if os.path.isfile(pid_file):
462 476 pid = self.get_pid_from_file()
463 477 if not overwrite:
464 478 raise PIDFileError(
465 479 'The pid file [%s] already exists. \nThis could mean that this '
466 480 'server is already running with [pid=%s].' % (pid_file, pid)
467 481 )
468 482 with open(pid_file, 'w') as f:
469 483 self.log.info("Creating pid file: %s" % pid_file)
470 484 f.write(repr(os.getpid())+'\n')
471 485
472 486 def remove_pid_file(self):
473 487 """Remove the pid file.
474 488
475 489 This should be called at shutdown by registering a callback with
476 490 :func:`reactor.addSystemEventTrigger`. This needs to return
477 491 ``None``.
478 492 """
479 493 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
480 494 if os.path.isfile(pid_file):
481 495 try:
482 496 self.log.info("Removing pid file: %s" % pid_file)
483 497 os.remove(pid_file)
484 498 except:
485 499 self.log.warn("Error removing the pid file: %s" % pid_file)
486 500
487 501 def get_pid_from_file(self):
488 502 """Get the pid from the pid file.
489 503
490 504 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
491 505 """
492 506 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
493 507 if os.path.isfile(pid_file):
494 508 with open(pid_file, 'r') as f:
495 509 pid = int(f.read().strip())
496 510 return pid
497 511 else:
498 512 raise PIDFileError('pid file not found: %s' % pid_file)
499 513
500 514 def check_pid(self, pid):
501 515 if os.name == 'nt':
502 516 try:
503 517 import ctypes
504 518 # returns 0 if no such process (of ours) exists
505 519 # positive int otherwise
506 520 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
507 521 except Exception:
508 522 self.log.warn(
509 523 "Could not determine whether pid %i is running via `OpenProcess`. "
510 524 " Making the likely assumption that it is."%pid
511 525 )
512 526 return True
513 527 return bool(p)
514 528 else:
515 529 try:
516 530 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
517 531 output,_ = p.communicate()
518 532 except OSError:
519 533 self.log.warn(
520 534 "Could not determine whether pid %i is running via `ps x`. "
521 535 " Making the likely assumption that it is."%pid
522 536 )
523 537 return True
524 538 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
525 539 return pid in pids
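Editorial note on the change above: a minimal usage sketch, not part of the diff, of the restored auto_create behavior. Because __init__ now applies auto_create and overwrite before the location trait fires, a missing cluster directory is created instead of raising ClusterDirError. The profile name and paths below are illustrative.

from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError

# auto_create is popped and applied first, so it is already True when
# _location_changed runs and os.makedirs() is allowed to fire.
cd = ClusterDir(auto_create=True, profile=u'default')
print cd.location                         # typically <ipython_dir>/cluster_default
print cd.security_dir, cd.log_dir, cd.pid_dir

# Without auto_create, pointing at a missing directory is still an error:
try:
    ClusterDir(location=u'/tmp/no-such-cluster-dir')
except ClusterDirError as e:
    print "not created:", e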
@@ -1,542 +1,537 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 from subprocess import check_call, CalledProcessError, PIPE
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 28 from IPython.config.application import Application, boolean_flag
29 29 from IPython.config.loader import Config
30 30 from IPython.core.newapplication import BaseIPythonApplication
31 31 from IPython.utils.importstring import import_item
32 32 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
33 33
34 34 from IPython.parallel.apps.clusterdir import (
35 35 ClusterApplication, ClusterDirError, ClusterDir,
36 36 PIDFileError,
37 37 base_flags, base_aliases
38 38 )
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module level variables
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 default_config_file_name = u'ipcluster_config.py'
47 47
48 48
49 49 _description = """\
50 50 Start an IPython cluster for parallel computing.\n\n
51 51
52 52 An IPython cluster consists of 1 controller and 1 or more engines.
53 53 This command automates the startup of these processes using a wide
54 54 range of startup methods (SSH, local processes, PBS, mpiexec,
55 55 Windows HPC Server 2008). To start a cluster with 4 engines on your
56 56 local host simply do 'ipcluster start n=4'. For more complex usage
57 57 you will typically do 'ipcluster create profile=mycluster', then edit
58 58 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
59 59 """
60 60
61 61
62 62 # Exit codes for ipcluster
63 63
64 64 # This will be the exit code if the ipcluster appears to be running because
65 65 # a .pid file exists
66 66 ALREADY_STARTED = 10
67 67
68 68
69 69 # This will be the exit code if ipcluster stop is run, but there is no .pid
70 70 # file to be found.
71 71 ALREADY_STOPPED = 11
72 72
73 73 # This will be the exit code if ipcluster engines is run, but there is no .pid
74 74 # file to be found.
75 75 NO_CLUSTER = 12
76 76
77 77
78 78 #-----------------------------------------------------------------------------
79 79 # Main application
80 80 #-----------------------------------------------------------------------------
81 81 start_help = """
82 82 Start an ipython cluster by its profile name or cluster
83 83 directory. Cluster directories contain configuration, log and
84 84 security related files and are named using the convention
85 85 'cluster_<profile>' and should be created using the 'start'
86 86 subcommand of 'ipcluster'. If your cluster directory is in
87 87 the cwd or the ipython directory, you can simply refer to it
88 88 using its profile name, 'ipcluster start n=4 profile=<profile>',
89 89 otherwise use the 'cluster_dir' option.
90 90 """
91 91 stop_help = """
92 92 Stop a running ipython cluster by its profile name or cluster
93 93 directory. Cluster directories are named using the convention
94 94 'cluster_<profile>'. If your cluster directory is in
95 95 the cwd or the ipython directory, you can simply refer to it
96 96 using its profile name, 'ipcluster stop profile=<profile>', otherwise
97 97 use the 'cluster_dir' option.
98 98 """
99 99 engines_help = """
100 100 Start one or more engines to connect to an existing Cluster
101 101 by profile name or cluster directory.
102 102 Cluster directories contain configuration, log and
103 103 security related files and are named using the convention
104 104 'cluster_<profile>' and should be created using the 'start'
105 105 subcommand of 'ipcluster'. If your cluster directory is in
106 106 the cwd or the ipython directory, you can simply refer to it
107 107 using its profile name, 'ipcluster engines n=4 profile=<profile>',
108 108 otherwise use the 'cluster_dir' option.
109 109 """
110 110 create_help = """
111 111 Create an ipython cluster directory by its profile name or
112 112 cluster directory path. Cluster directories contain
113 113 configuration, log and security related files and are named
114 114 using the convention 'cluster_<profile>'. By default they are
115 115 located in your ipython directory. Once created, you will
116 116 probably need to edit the configuration files in the cluster
117 117 directory to configure your cluster. Most users will create a
118 118 cluster directory by profile name,
119 119 `ipcluster create profile=mycluster`, which will put the directory
120 120 in `<ipython_dir>/cluster_mycluster`.
121 121 """
122 122 list_help = """List all available clusters, by cluster directory, that can
123 123 be found in the current working directory or in the ipython
124 124 directory. Cluster directories are named using the convention
125 125 'cluster_<profile>'.
126 126 """
127 127
128 128
129 129 class IPClusterList(BaseIPythonApplication):
130 130 name = u'ipcluster-list'
131 131 description = list_help
132 132
133 133 # empty aliases
134 134 aliases=Dict()
135 135 flags = Dict(base_flags)
136 136
137 137 def _log_level_default(self):
138 138 return 20
139 139
140 140 def list_cluster_dirs(self):
141 141 # Find the search paths
142 142 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
143 143 if cluster_dir_paths:
144 144 cluster_dir_paths = cluster_dir_paths.split(':')
145 145 else:
146 146 cluster_dir_paths = []
147 147
148 148 ipython_dir = self.ipython_dir
149 149
150 150 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
151 151 paths = list(set(paths))
152 152
153 153 self.log.info('Searching for cluster dirs in paths: %r' % paths)
154 154 for path in paths:
155 155 files = os.listdir(path)
156 156 for f in files:
157 157 full_path = os.path.join(path, f)
158 158 if os.path.isdir(full_path) and f.startswith('cluster_'):
159 159 profile = full_path.split('_')[-1]
160 160 start_cmd = 'ipcluster start profile=%s n=4' % profile
161 161 print start_cmd + " ==> " + full_path
162 162
163 163 def start(self):
164 164 self.list_cluster_dirs()
165 165
166 166 create_flags = {}
167 167 create_flags.update(base_flags)
168 168 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
169 169 "reset config files to defaults", "leave existing config files"))
170 170
171 171 class IPClusterCreate(ClusterApplication):
172 172 name = u'ipcluster'
173 173 description = create_help
174 174 auto_create_cluster_dir = Bool(True,
175 175 help="whether to create the cluster_dir if it doesn't exist")
176 176 default_config_file_name = default_config_file_name
177 177
178 178 reset = Bool(False, config=True,
179 179 help="Whether to reset config files as part of 'create'."
180 180 )
181 181
182 182 flags = Dict(create_flags)
183 183
184 184 aliases = Dict(dict(profile='ClusterDir.profile'))
185 185
186 186 classes = [ClusterDir]
187 187
188 188 def init_clusterdir(self):
189 189 super(IPClusterCreate, self).init_clusterdir()
190 190 self.log.info('Copying default config files to cluster directory '
191 191 '[overwrite=%r]' % (self.reset,))
192 192 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
193 193
194 194 def initialize(self, argv=None):
195 195 self.parse_command_line(argv)
196 196 self.init_clusterdir()
197 197
198 198 stop_aliases = dict(
199 199 signal='IPClusterStop.signal',
200 200 profile='ClusterDir.profile',
201 201 cluster_dir='ClusterDir.location',
202 202 )
203 203
204 204 class IPClusterStop(ClusterApplication):
205 205 name = u'ipcluster'
206 206 description = stop_help
207 auto_create_cluster_dir = Bool(False,
208 help="whether to create the cluster_dir if it doesn't exist")
207 auto_create_cluster_dir = Bool(False)
209 208 default_config_file_name = default_config_file_name
210 209
211 210 signal = Int(signal.SIGINT, config=True,
212 211 help="signal to use for stopping processes.")
213 212
214 213 aliases = Dict(stop_aliases)
215
214
215 def init_clusterdir(self):
216 try:
217 super(IPClusterStop, self).init_clusterdir()
218 except ClusterDirError as e:
219 self.log.fatal("Failed ClusterDir init: %s"%e)
220 self.exit(1)
221
216 222 def start(self):
217 223 """Start the app for the stop subcommand."""
218 224 try:
219 225 pid = self.get_pid_from_file()
220 226 except PIDFileError:
221 227 self.log.critical(
222 228 'Could not read pid file, cluster is probably not running.'
223 229 )
224 230 # Here I exit with an unusual exit status that other processes
225 231 # can watch for to learn how I exited.
226 232 self.remove_pid_file()
227 233 self.exit(ALREADY_STOPPED)
228 234
229 235 if not self.check_pid(pid):
230 236 self.log.critical(
231 237 'Cluster [pid=%r] is not running.' % pid
232 238 )
233 239 self.remove_pid_file()
234 240 # Here I exit with an unusual exit status that other processes
235 241 # can watch for to learn how I exited.
236 242 self.exit(ALREADY_STOPPED)
237 243
238 244 elif os.name=='posix':
239 245 sig = self.signal
240 246 self.log.info(
241 247 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
242 248 )
243 249 try:
244 250 os.kill(pid, sig)
245 251 except OSError:
246 252 self.log.error("Stopping cluster failed, assuming already dead.",
247 253 exc_info=True)
248 254 self.remove_pid_file()
249 255 elif os.name=='nt':
250 256 try:
251 257 # kill the whole tree
252 258 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
253 259 except (CalledProcessError, OSError):
254 260 self.log.error("Stopping cluster failed, assuming already dead.",
255 261 exc_info=True)
256 262 self.remove_pid_file()
257 263
258 264 engine_aliases = {}
259 265 engine_aliases.update(base_aliases)
260 266 engine_aliases.update(dict(
261 267 n='IPClusterEngines.n',
262 268 elauncher = 'IPClusterEngines.engine_launcher_class',
263 269 ))
264 270 class IPClusterEngines(ClusterApplication):
265 271
266 272 name = u'ipcluster'
267 273 description = engines_help
268 274 usage = None
269 275 default_config_file_name = default_config_file_name
270 276 default_log_level = logging.INFO
271 277 auto_create_cluster_dir = Bool(False)
272 278 classes = List()
273 279 def _classes_default(self):
274 280 from IPython.parallel.apps import launcher
275 281 launchers = launcher.all_launchers
276 282 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
277 283 return [ClusterDir]+eslaunchers
278 284
279 285 n = Int(2, config=True,
280 286 help="The number of engines to start.")
281 287
282 288 engine_launcher_class = Str('LocalEngineSetLauncher',
283 289 config=True,
284 290 help="The class for launching a set of Engines."
285 291 )
286 292 daemonize = Bool(False, config=True,
287 293 help='Daemonize the ipcluster program. This implies --log-to-file')
288 294
289 295 def _daemonize_changed(self, name, old, new):
290 296 if new:
291 297 self.log_to_file = True
292 298
293 299 aliases = Dict(engine_aliases)
294 300 # flags = Dict(flags)
295 301 _stopping = False
296 302
297 303 def initialize(self, argv=None):
298 304 super(IPClusterEngines, self).initialize(argv)
299 305 self.init_signal()
300 306 self.init_launchers()
301 307
302 308 def init_launchers(self):
303 309 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
304 310 self.engine_launcher.on_stop(lambda r: self.loop.stop())
305 311
306 312 def init_signal(self):
307 313 # Setup signals
308 314 signal.signal(signal.SIGINT, self.sigint_handler)
309 315
310 316 def build_launcher(self, clsname):
311 317 """import and instantiate a Launcher based on importstring"""
312 318 if '.' not in clsname:
313 319 # not a module, presume it's the raw name in apps.launcher
314 320 clsname = 'IPython.parallel.apps.launcher.'+clsname
315 321 # print repr(clsname)
316 322 klass = import_item(clsname)
317 323
318 324 launcher = klass(
319 325 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
320 326 )
321 327 return launcher
322 328
323 329 def start_engines(self):
324 330 self.log.info("Starting %i engines"%self.n)
325 331 self.engine_launcher.start(
326 332 self.n,
327 333 cluster_dir=self.cluster_dir.location
328 334 )
329 335
330 336 def stop_engines(self):
331 337 self.log.info("Stopping Engines...")
332 338 if self.engine_launcher.running:
333 339 d = self.engine_launcher.stop()
334 340 return d
335 341 else:
336 342 return None
337 343
338 344 def stop_launchers(self, r=None):
339 345 if not self._stopping:
340 346 self._stopping = True
341 347 self.log.error("IPython cluster: stopping")
342 348 self.stop_engines()
343 349 # Wait a few seconds to let things shut down.
344 350 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
345 351 dc.start()
346 352
347 353 def sigint_handler(self, signum, frame):
348 354 self.log.debug("SIGINT received, stopping launchers...")
349 355 self.stop_launchers()
350 356
351 357 def start_logging(self):
352 358 # Remove old log files of the controller and engine
353 359 if self.clean_logs:
354 360 log_dir = self.cluster_dir.log_dir
355 361 for f in os.listdir(log_dir):
356 362 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
357 363 os.remove(os.path.join(log_dir, f))
358 364 # This will remove old log files for ipcluster itself
359 365 # super(IPClusterApp, self).start_logging()
360 366
361 367 def start(self):
362 368 """Start the app for the engines subcommand."""
363 369 self.log.info("IPython cluster: started")
364 370 # First see if the cluster is already running
365 371
366 372 # Now log and daemonize
367 373 self.log.info(
368 374 'Starting engines with [daemon=%r]' % self.daemonize
369 375 )
370 376 # TODO: Get daemonize working on Windows or as a Windows Server.
371 377 if self.daemonize:
372 378 if os.name=='posix':
373 379 from twisted.scripts._twistd_unix import daemonize
374 380 daemonize()
375 381
376 382 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
377 383 dc.start()
378 384 # Now write the new pid file AFTER our new forked pid is active.
379 385 # self.write_pid_file()
380 386 try:
381 387 self.loop.start()
382 388 except KeyboardInterrupt:
383 389 pass
384 390 except zmq.ZMQError as e:
385 391 if e.errno == errno.EINTR:
386 392 pass
387 393 else:
388 394 raise
389 395
390 396 start_aliases = {}
391 397 start_aliases.update(engine_aliases)
392 398 start_aliases.update(dict(
393 399 delay='IPClusterStart.delay',
394 400 clean_logs='IPClusterStart.clean_logs',
395 401 ))
396 402
397 403 class IPClusterStart(IPClusterEngines):
398 404
399 405 name = u'ipcluster'
400 406 description = start_help
401 407 usage = None
402 408 default_config_file_name = default_config_file_name
403 409 default_log_level = logging.INFO
404 410 auto_create_cluster_dir = Bool(True, config=True,
405 411 help="whether to create the cluster_dir if it doesn't exist")
406 412 classes = List()
407 413 def _classes_default(self,):
408 414 from IPython.parallel.apps import launcher
409 415 return [ClusterDir]+launcher.all_launchers
410 416
411 417 clean_logs = Bool(True, config=True,
412 418 help="whether to cleanup old logs before starting")
413 419
414 420 delay = CFloat(1., config=True,
415 421 help="delay (in s) between starting the controller and the engines")
416 422
417 423 controller_launcher_class = Str('LocalControllerLauncher',
418 424 config=True,
419 425 help="The class for launching a Controller."
420 426 )
421 427 reset = Bool(False, config=True,
422 428 help="Whether to reset config files as part of '--create'."
423 429 )
424 430
425 431 # flags = Dict(flags)
426 432 aliases = Dict(start_aliases)
427 433
428 def init_clusterdir(self):
429 try:
430 super(IPClusterStart, self).init_clusterdir()
431 except ClusterDirError:
432 raise ClusterDirError(
433 "Could not find a cluster directory. A cluster dir must "
434 "be created before running 'ipcluster start'. Do "
435 "'ipcluster create -h' or 'ipcluster list -h' for more "
436 "information about creating and listing cluster dirs."
437 )
438
439 434 def init_launchers(self):
440 435 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
441 436 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
442 437 self.controller_launcher.on_stop(self.stop_launchers)
443 438
444 439 def start_controller(self):
445 440 self.controller_launcher.start(
446 441 cluster_dir=self.cluster_dir.location
447 442 )
448 443
449 444 def stop_controller(self):
450 445 # self.log.info("In stop_controller")
451 446 if self.controller_launcher and self.controller_launcher.running:
452 447 return self.controller_launcher.stop()
453 448
454 449 def stop_launchers(self, r=None):
455 450 if not self._stopping:
456 451 self.stop_controller()
457 452 super(IPClusterStart, self).stop_launchers()
458 453
459 454 def start(self):
460 455 """Start the app for the start subcommand."""
461 456 # First see if the cluster is already running
462 457 try:
463 458 pid = self.get_pid_from_file()
464 459 except PIDFileError:
465 460 pass
466 461 else:
467 462 if self.check_pid(pid):
468 463 self.log.critical(
469 464 'Cluster is already running with [pid=%s]. '
470 465 'use "ipcluster stop" to stop the cluster.' % pid
471 466 )
472 467 # Here I exit with an unusual exit status that other processes
473 468 # can watch for to learn how I exited.
474 469 self.exit(ALREADY_STARTED)
475 470 else:
476 471 self.remove_pid_file()
477 472
478 473
479 474 # Now log and daemonize
480 475 self.log.info(
481 476 'Starting ipcluster with [daemon=%r]' % self.daemonize
482 477 )
483 478 # TODO: Get daemonize working on Windows or as a Windows Server.
484 479 if self.daemonize:
485 480 if os.name=='posix':
486 481 from twisted.scripts._twistd_unix import daemonize
487 482 daemonize()
488 483
489 484 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
490 485 dc.start()
491 486 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
492 487 dc.start()
493 488 # Now write the new pid file AFTER our new forked pid is active.
494 489 self.write_pid_file()
495 490 try:
496 491 self.loop.start()
497 492 except KeyboardInterrupt:
498 493 pass
499 494 except zmq.ZMQError as e:
500 495 if e.errno == errno.EINTR:
501 496 pass
502 497 else:
503 498 raise
504 499 finally:
505 500 self.remove_pid_file()
506 501
507 502 base='IPython.parallel.apps.ipclusterapp.IPCluster'
508 503
509 504 class IPClusterApp(Application):
510 505 name = u'ipcluster'
511 506 description = _description
512 507
513 508 subcommands = {'create' : (base+'Create', create_help),
514 509 'list' : (base+'List', list_help),
515 510 'start' : (base+'Start', start_help),
516 511 'stop' : (base+'Stop', stop_help),
517 512 'engines' : (base+'Engines', engines_help),
518 513 }
519 514
520 515 # no aliases or flags for parent App
521 516 aliases = Dict()
522 517 flags = Dict()
523 518
524 519 def start(self):
525 520 if self.subapp is None:
526 521 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
527 522 print
528 523 self.print_subcommands()
529 524 self.exit(1)
530 525 else:
531 526 return self.subapp.start()
532 527
533 528 def launch_new_instance():
534 529 """Create and run the IPython cluster."""
535 530 app = IPClusterApp()
536 531 app.initialize()
537 532 app.start()
538 533
539 534
540 535 if __name__ == '__main__':
541 536 launch_new_instance()
542 537
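A hedged sketch, not part of the diff, of how an external wrapper might interpret the module-level exit codes defined above (ALREADY_STARTED, ALREADY_STOPPED). It assumes an ipcluster entry point on PATH and a hypothetical 'mycluster' profile.

import subprocess
from IPython.parallel.apps.ipclusterapp import ALREADY_STARTED, ALREADY_STOPPED

# 'ipcluster stop' exits with ALREADY_STOPPED when no usable pid file is found
rc = subprocess.call(['ipcluster', 'stop', 'profile=mycluster'])
if rc == ALREADY_STOPPED:
    print "cluster 'mycluster' was not running"

# 'ipcluster start' exits with ALREADY_STARTED when a live pid file already exists
# (without daemonize, a successful start blocks until the cluster shuts down)
rc = subprocess.call(['ipcluster', 'start', 'n=2', 'profile=mycluster'])
if rc == ALREADY_STARTED:
    print "cluster 'mycluster' is already running; stop it first"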
@@ -1,400 +1,401 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 from multiprocessing import Process
29 29
30 30 import zmq
31 31 from zmq.devices import ProcessMonitoredQueue
32 32 from zmq.log.handlers import PUBHandler
33 33 from zmq.utils import jsonapi as json
34 34
35 35 from IPython.config.loader import Config
36 36
37 37 from IPython.parallel import factory
38 38
39 39 from IPython.parallel.apps.clusterdir import (
40 40 ClusterDir,
41 41 ClusterApplication,
42 42 base_flags
43 43 # ClusterDirConfigLoader
44 44 )
45 45 from IPython.utils.importstring import import_item
46 46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
47 47
48 48 # from IPython.parallel.controller.controller import ControllerFactory
49 49 from IPython.parallel.streamsession import StreamSession
50 50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 51 from IPython.parallel.controller.hub import Hub, HubFactory
52 52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 54
55 55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56 56
57 57 # conditional import of MongoDB backend class
58 58
59 59 try:
60 60 from IPython.parallel.controller.mongodb import MongoDB
61 61 except ImportError:
62 62 maybe_mongo = []
63 63 else:
64 64 maybe_mongo = [MongoDB]
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Module level variables
69 69 #-----------------------------------------------------------------------------
70 70
71 71
72 72 #: The default config file name for this application
73 73 default_config_file_name = u'ipcontroller_config.py'
74 74
75 75
76 76 _description = """Start the IPython controller for parallel computing.
77 77
78 78 The IPython controller provides a gateway between the IPython engines and
79 79 clients. The controller needs to be started before the engines and can be
80 80 configured using command line options or using a cluster directory. Cluster
81 81 directories contain config, log and security files and are usually located in
82 82 your ipython directory and named as "cluster_<profile>". See the --profile
83 83 and --cluster-dir options for details.
84 84 """
85 85
86 86
87 87
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # The main application
91 91 #-----------------------------------------------------------------------------
92 92 flags = {}
93 93 flags.update(base_flags)
94 94 flags.update({
95 95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
96 96 'Use threads instead of processes for the schedulers'),
97 97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 98 'use the SQLiteDB backend'),
99 99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 100 'use the MongoDB backend'),
101 101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 102 'use the in-memory DictDB backend'),
103 103 })
104 104
105 105 flags.update()
106 106
107 107 class IPControllerApp(ClusterApplication):
108 108
109 109 name = u'ipcontroller'
110 110 description = _description
111 111 # command_line_loader = IPControllerAppConfigLoader
112 112 default_config_file_name = default_config_file_name
113 auto_create_cluster_dir = True
114 113 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
115 114
115 auto_create_cluster_dir = Bool(True, config=True,
116 help="Whether to create cluster_dir if it doesn't exist.")
116 117 reuse_files = Bool(False, config=True,
117 118 help='Whether to reuse existing json connection files [default: False]'
118 119 )
119 120 secure = Bool(True, config=True,
120 121 help='Whether to use exec_keys for extra authentication [default: True]'
121 122 )
122 123 ssh_server = Unicode(u'', config=True,
123 124 help="""ssh url for clients to use when connecting to the Controller
124 125 processes. It should be of the form: [user@]server[:port]. The
125 126 Controller\'s listening addresses must be accessible from the ssh server""",
126 127 )
127 128 location = Unicode(u'', config=True,
128 129 help="""The external IP or domain name of the Controller, used for disambiguating
129 130 engine and client connections.""",
130 131 )
131 132 import_statements = List([], config=True,
132 133 help="import statements to be run at startup. Necessary in some environments"
133 134 )
134 135
135 136 usethreads = Bool(False, config=True,
136 137 help='Use threads instead of processes for the schedulers',
137 138 )
138 139
139 140 # internal
140 141 children = List()
141 142 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142 143
143 144 def _usethreads_changed(self, name, old, new):
144 145 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
145 146
146 147 aliases = Dict(dict(
147 148 config = 'IPControllerApp.config_file',
148 149 # file = 'IPControllerApp.url_file',
149 150 log_level = 'IPControllerApp.log_level',
150 151 reuse_files = 'IPControllerApp.reuse_files',
151 152 secure = 'IPControllerApp.secure',
152 153 ssh = 'IPControllerApp.ssh_server',
153 154 usethreads = 'IPControllerApp.usethreads',
154 155 import_statements = 'IPControllerApp.import_statements',
155 156 location = 'IPControllerApp.location',
156 157
157 158 ident = 'StreamSession.session',
158 159 user = 'StreamSession.username',
159 160 exec_key = 'StreamSession.keyfile',
160 161
161 162 url = 'HubFactory.url',
162 163 ip = 'HubFactory.ip',
163 164 transport = 'HubFactory.transport',
164 165 port = 'HubFactory.regport',
165 166
166 167 ping = 'HeartMonitor.period',
167 168
168 169 scheme = 'TaskScheduler.scheme_name',
169 170 hwm = 'TaskScheduler.hwm',
170 171
171 172
172 173 profile = "ClusterDir.profile",
173 174 cluster_dir = 'ClusterDir.location',
174 175
175 176 ))
176 177 flags = Dict(flags)
177 178
178 179
179 180 def save_connection_dict(self, fname, cdict):
180 181 """save a connection dict to json file."""
181 182 c = self.config
182 183 url = cdict['url']
183 184 location = cdict['location']
184 185 if not location:
185 186 try:
186 187 proto,ip,port = split_url(url)
187 188 except AssertionError:
188 189 pass
189 190 else:
190 191 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
191 192 cdict['location'] = location
192 193 fname = os.path.join(self.cluster_dir.security_dir, fname)
193 194 with open(fname, 'w') as f:
194 195 f.write(json.dumps(cdict, indent=2))
195 196 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
196 197
197 198 def load_config_from_json(self):
198 199 """load config from existing json connector files."""
199 200 c = self.config
200 201 # load from engine config
201 202 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
202 203 cfg = json.loads(f.read())
203 204 key = c.StreamSession.key = cfg['exec_key']
204 205 xport,addr = cfg['url'].split('://')
205 206 c.HubFactory.engine_transport = xport
206 207 ip,ports = addr.split(':')
207 208 c.HubFactory.engine_ip = ip
208 209 c.HubFactory.regport = int(ports)
209 210 self.location = cfg['location']
210 211
211 212 # load client config
212 213 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
213 214 cfg = json.loads(f.read())
214 215 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
215 216 xport,addr = cfg['url'].split('://')
216 217 c.HubFactory.client_transport = xport
217 218 ip,ports = addr.split(':')
218 219 c.HubFactory.client_ip = ip
219 220 self.ssh_server = cfg['ssh']
220 221 assert int(ports) == c.HubFactory.regport, "regport mismatch"
221 222
222 223 def init_hub(self):
223 224 # This is the working dir by now.
224 225 sys.path.insert(0, '')
225 226 c = self.config
226 227
227 228 self.do_import_statements()
228 229 reusing = self.reuse_files
229 230 if reusing:
230 231 try:
231 232 self.load_config_from_json()
232 233 except (AssertionError,IOError):
233 234 reusing=False
234 235 # check again, because reusing may have failed:
235 236 if reusing:
236 237 pass
237 238 elif self.secure:
238 239 key = str(uuid.uuid4())
239 240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
240 241 # with open(keyfile, 'w') as f:
241 242 # f.write(key)
242 243 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 244 c.StreamSession.key = key
244 245 else:
245 246 key = c.StreamSession.key = ''
246 247
247 248 try:
248 249 self.factory = HubFactory(config=c, log=self.log)
249 250 # self.start_logging()
250 251 self.factory.init_hub()
251 252 except:
252 253 self.log.error("Couldn't construct the Controller", exc_info=True)
253 254 self.exit(1)
254 255
255 256 if not reusing:
256 257 # save to new json config files
257 258 f = self.factory
258 259 cdict = {'exec_key' : key,
259 260 'ssh' : self.ssh_server,
260 261 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
261 262 'location' : self.location
262 263 }
263 264 self.save_connection_dict('ipcontroller-client.json', cdict)
264 265 edict = cdict
265 266 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
266 267 self.save_connection_dict('ipcontroller-engine.json', edict)
267 268
268 269 #
269 270 def init_schedulers(self):
270 271 children = self.children
271 272 mq = import_item(self.mq_class)
272 273
273 274 hub = self.factory
274 275 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
275 276 # IOPub relay (in a Process)
276 277 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 278 q.bind_in(hub.client_info['iopub'])
278 279 q.bind_out(hub.engine_info['iopub'])
279 280 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 281 q.connect_mon(hub.monitor_url)
281 282 q.daemon=True
282 283 children.append(q)
283 284
284 285 # Multiplexer Queue (in a Process)
285 286 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 287 q.bind_in(hub.client_info['mux'])
287 288 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 289 q.bind_out(hub.engine_info['mux'])
289 290 q.connect_mon(hub.monitor_url)
290 291 q.daemon=True
291 292 children.append(q)
292 293
293 294 # Control Queue (in a Process)
294 295 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 296 q.bind_in(hub.client_info['control'])
296 297 q.setsockopt_in(zmq.IDENTITY, 'control')
297 298 q.bind_out(hub.engine_info['control'])
298 299 q.connect_mon(hub.monitor_url)
299 300 q.daemon=True
300 301 children.append(q)
301 302 try:
302 303 scheme = self.config.TaskScheduler.scheme_name
303 304 except AttributeError:
304 305 scheme = TaskScheduler.scheme_name.get_default_value()
305 306 # Task Queue (in a Process)
306 307 if scheme == 'pure':
307 308 self.log.warn("task::using pure XREQ Task scheduler")
308 309 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 310 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 311 q.bind_in(hub.client_info['task'][1])
311 312 q.setsockopt_in(zmq.IDENTITY, 'task')
312 313 q.bind_out(hub.engine_info['task'])
313 314 q.connect_mon(hub.monitor_url)
314 315 q.daemon=True
315 316 children.append(q)
316 317 elif scheme == 'none':
317 318 self.log.warn("task::using no Task scheduler")
318 319
319 320 else:
320 321 self.log.info("task::using Python %s Task scheduler"%scheme)
321 322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 323 hub.monitor_url, hub.client_info['notification'])
323 324 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
324 325 config=dict(self.config))
325 326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 327 q.daemon=True
327 328 children.append(q)
328 329
329 330
330 331 def save_urls(self):
331 332 """save the registration urls to files."""
332 333 c = self.config
333 334
334 335 sec_dir = self.cluster_dir.security_dir
335 336 cf = self.factory
336 337
337 338 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
338 339 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
339 340
340 341 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
341 342 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
342 343
343 344
344 345 def do_import_statements(self):
345 346 statements = self.import_statements
346 347 for s in statements:
347 348 try:
348 349 self.log.info("Executing statement: '%s'" % s)
349 350 exec s in globals(), locals()
350 351 except:
351 352 self.log.error("Error running statement: %s" % s)
352 353
353 354 # def start_logging(self):
354 355 # super(IPControllerApp, self).start_logging()
355 356 # if self.config.Global.log_url:
356 357 # context = self.factory.context
357 358 # lsock = context.socket(zmq.PUB)
358 359 # lsock.connect(self.config.Global.log_url)
359 360 # handler = PUBHandler(lsock)
360 361 # handler.root_topic = 'controller'
361 362 # handler.setLevel(self.log_level)
362 363 # self.log.addHandler(handler)
363 364 # #
364 365
365 366 def initialize(self, argv=None):
366 367 super(IPControllerApp, self).initialize(argv)
367 368 self.init_hub()
368 369 self.init_schedulers()
369 370
370 371 def start(self):
371 372 # Start the subprocesses:
372 373 self.factory.start()
373 374 child_procs = []
374 375 for child in self.children:
375 376 child.start()
376 377 if isinstance(child, ProcessMonitoredQueue):
377 378 child_procs.append(child.launcher)
378 379 elif isinstance(child, Process):
379 380 child_procs.append(child)
380 381 if child_procs:
381 382 signal_children(child_procs)
382 383
383 384 self.write_pid_file(overwrite=True)
384 385
385 386 try:
386 387 self.factory.loop.start()
387 388 except KeyboardInterrupt:
388 389 self.log.critical("Interrupted, Exiting...\n")
389 390
390 391
391 392
392 393 def launch_new_instance():
393 394 """Create and run the IPython controller"""
394 395 app = IPControllerApp()
395 396 app.initialize()
396 397 app.start()
397 398
398 399
399 400 if __name__ == '__main__':
400 401 launch_new_instance()
@@ -1,290 +1,289 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 28 base_aliases,
29 29 # ClusterDirConfigLoader
30 30 )
31 31 from IPython.zmq.log import EnginePUBHandler
32 32
33 33 from IPython.config.configurable import Configurable
34 34 from IPython.parallel.streamsession import StreamSession
35 35 from IPython.parallel.engine.engine import EngineFactory
36 36 from IPython.parallel.engine.streamkernel import Kernel
37 37 from IPython.parallel.util import disambiguate_url
38 38
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
41 41
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Module level variables
45 45 #-----------------------------------------------------------------------------
46 46
47 47 #: The default config file name for this application
48 48 default_config_file_name = u'ipengine_config.py'
49 49
50 50 _description = """Start an IPython engine for parallel computing.\n\n
51 51
52 52 IPython engines run in parallel and perform computations on behalf of a client
53 53 and controller. A controller needs to be started before the engines. The
54 54 engine can be configured using command line options or using a cluster
55 55 directory. Cluster directories contain config, log and security files and are
56 56 usually located in your ipython directory and named as "cluster_<profile>".
57 57 See the `profile` and `cluster_dir` options for details.
58 58 """
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # MPI configuration
63 63 #-----------------------------------------------------------------------------
64 64
65 65 mpi4py_init = """from mpi4py import MPI as mpi
66 66 mpi.size = mpi.COMM_WORLD.Get_size()
67 67 mpi.rank = mpi.COMM_WORLD.Get_rank()
68 68 """
69 69
70 70
71 71 pytrilinos_init = """from PyTrilinos import Epetra
72 72 class SimpleStruct:
73 73 pass
74 74 mpi = SimpleStruct()
75 75 mpi.rank = 0
76 76 mpi.size = 0
77 77 """
78 78
79 79 class MPI(Configurable):
80 80 """Configurable for MPI initialization"""
81 81 use = Str('', config=True,
82 82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 83 )
84 84
85 85 def _on_use_changed(self, old, new):
86 86 # load default init script if it's not set
87 87 if not self.init_script:
88 88 self.init_script = self.default_inits.get(new, '')
89 89
90 90 init_script = Str('', config=True,
91 91 help="Initialization code for MPI")
92 92
93 93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 94 config=True)
95 95
96 96
97 97 #-----------------------------------------------------------------------------
98 98 # Main application
99 99 #-----------------------------------------------------------------------------
100 100
101 101
102 102 class IPEngineApp(ClusterApplication):
103 103
104 104 app_name = Unicode(u'ipengine')
105 105 description = Unicode(_description)
106 106 default_config_file_name = default_config_file_name
107 107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108 108
109 auto_create_cluster_dir = Bool(False, config=True,
109 auto_create_cluster_dir = Bool(False,
110 110 help="whether to create the cluster_dir if it doesn't exist")
111 111
112 112 startup_script = Unicode(u'', config=True,
113 113 help='specify a script to be run at startup')
114 114 startup_command = Str('', config=True,
115 115 help='specify a command to be run at startup')
116 116
117 117 url_file = Unicode(u'', config=True,
118 118 help="""The full location of the file containing the connection information for
119 119 the controller. If this is not given, the file must be in the
120 120 security directory of the cluster directory. This location is
121 121 resolved using the `profile` or `cluster_dir` options.""",
122 122 )
123 123
124 124 url_file_name = Unicode(u'ipcontroller-engine.json')
125 125
126 126 aliases = Dict(dict(
127 127 config = 'IPEngineApp.config_file',
128 128 file = 'IPEngineApp.url_file',
129 129 c = 'IPEngineApp.startup_command',
130 130 s = 'IPEngineApp.startup_script',
131 131
132 132 ident = 'StreamSession.session',
133 133 user = 'StreamSession.username',
134 134 exec_key = 'StreamSession.keyfile',
135 135
136 136 url = 'EngineFactory.url',
137 137 ip = 'EngineFactory.ip',
138 138 transport = 'EngineFactory.transport',
139 139 port = 'EngineFactory.regport',
140 140 location = 'EngineFactory.location',
141 141
142 142 timeout = 'EngineFactory.timeout',
143 143
144 144 profile = "ClusterDir.profile",
145 145 cluster_dir = 'ClusterDir.location',
146 146
147 147 mpi = 'MPI.use',
148 148
149 149 log_level = 'IPEngineApp.log_level',
150 150 ))
151 151
152 152 # def find_key_file(self):
153 153 # """Set the key file.
154 154 #
155 155 #         Here we don't try to actually check whether it exists or is valid, as
156 156 #         that is handled by the connection logic.
157 157 # """
158 158 # config = self.master_config
159 159 # # Find the actual controller key file
160 160 # if not config.Global.key_file:
161 161 # try_this = os.path.join(
162 162 # config.Global.cluster_dir,
163 163 # config.Global.security_dir,
164 164 # config.Global.key_file_name
165 165 # )
166 166 # config.Global.key_file = try_this
167 167
168 168 def find_url_file(self):
169 169 """Set the key file.
170 170
171 171 Here we don't try to actually see if it exists for is valid as that
172 172 is hadled by the connection logic.
173 173 """
174 174 config = self.config
175 175 # Find the actual controller key file
176 176 if not self.url_file:
177 177 self.url_file = os.path.join(
178 178 self.cluster_dir.security_dir,
179 179 self.url_file_name
180 180 )
181
182 181 def init_engine(self):
183 182 # This is the working dir by now.
184 183 sys.path.insert(0, '')
185 184 config = self.config
186 185 # print config
187 186 self.find_url_file()
188 187
189 188 # if os.path.exists(config.Global.key_file) and config.Global.secure:
190 189 # config.SessionFactory.exec_key = config.Global.key_file
191 190 if os.path.exists(self.url_file):
192 191 with open(self.url_file) as f:
193 192 d = json.loads(f.read())
194 193 for k,v in d.iteritems():
195 194 if isinstance(v, unicode):
196 195 d[k] = v.encode()
197 196 if d['exec_key']:
198 197 config.StreamSession.key = d['exec_key']
199 198 d['url'] = disambiguate_url(d['url'], d['location'])
200 199 config.EngineFactory.url = d['url']
201 200 config.EngineFactory.location = d['location']
202 201
203 202 try:
204 203 exec_lines = config.Kernel.exec_lines
205 204 except AttributeError:
206 205 config.Kernel.exec_lines = []
207 206 exec_lines = config.Kernel.exec_lines
208 207
209 208 if self.startup_script:
210 209 enc = sys.getfilesystemencoding() or 'utf8'
211 210 cmd="execfile(%r)"%self.startup_script.encode(enc)
212 211 exec_lines.append(cmd)
213 212 if self.startup_command:
214 213 exec_lines.append(self.startup_command)
215 214
216 215 # Create the underlying shell class and Engine
217 216 # shell_class = import_item(self.master_config.Global.shell_class)
218 217 # print self.config
219 218 try:
220 219 self.engine = EngineFactory(config=config, log=self.log)
221 220 except:
222 221 self.log.error("Couldn't start the Engine", exc_info=True)
223 222 self.exit(1)
224 223
225 224 # self.start_logging()
226 225
227 226 # Create the service hierarchy
228 227 # self.main_service = service.MultiService()
229 228 # self.engine_service.setServiceParent(self.main_service)
230 229 # self.tub_service = Tub()
231 230 # self.tub_service.setServiceParent(self.main_service)
232 231 # # This needs to be called before the connection is initiated
233 232 # self.main_service.startService()
234 233
235 234 # This initiates the connection to the controller and calls
236 235 # register_engine to tell the controller we are ready to do work
237 236 # self.engine_connector = EngineConnector(self.tub_service)
238 237
239 238 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
240 239
241 240 # reactor.callWhenRunning(self.call_connect)
242 241
243 242 # def start_logging(self):
244 243 # super(IPEngineApp, self).start_logging()
245 244 # if self.master_config.Global.log_url:
246 245 # context = self.engine.context
247 246 # lsock = context.socket(zmq.PUB)
248 247 # lsock.connect(self.master_config.Global.log_url)
249 248 # handler = EnginePUBHandler(self.engine, lsock)
250 249 # handler.setLevel(self.log_level)
251 250 # self.log.addHandler(handler)
252 251 #
253 252 def init_mpi(self):
254 253 global mpi
255 254 self.mpi = MPI(config=self.config)
256 255
257 256 mpi_import_statement = self.mpi.init_script
258 257 if mpi_import_statement:
259 258 try:
260 259 self.log.info("Initializing MPI:")
261 260 self.log.info(mpi_import_statement)
262 261 exec mpi_import_statement in globals()
263 262 except:
264 263 mpi = None
265 264 else:
266 265 mpi = None
267 266
268 267 def initialize(self, argv=None):
269 268 super(IPEngineApp, self).initialize(argv)
270 269 self.init_mpi()
271 270 self.init_engine()
272 271
273 272 def start(self):
274 273 self.engine.start()
275 274 try:
276 275 self.engine.loop.start()
277 276 except KeyboardInterrupt:
278 277 self.log.critical("Engine Interrupted, shutting down...\n")
279 278
280 279
281 280 def launch_new_instance():
282 281 """Create and run the IPython engine"""
283 282 app = IPEngineApp()
284 283 app.initialize()
285 284 app.start()
286 285
287 286
288 287 if __name__ == '__main__':
289 288 launch_new_instance()
290 289
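For reference, init_engine() above only reads the 'exec_key', 'url' and 'location' keys from the connection file. A minimal sketch of what such an ipcontroller-engine.json might contain, with made-up values:

    # Illustrative connection info for the engine; keys match what
    # init_engine() reads above, values are hypothetical.
    import json
    info = {
        'exec_key': '',                    # empty -> no StreamSession key is set
        'url': 'tcp://127.0.0.1:10101',    # registration url, run through disambiguate_url
        'location': '127.0.0.1',           # controller host
    }
    print json.dumps(info)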