##// END OF EJS Templates
use BaseIPythonApp.load_config, not Application.load_config
MinRK -
Show More
@@ -1,544 +1,540 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 23 import shutil
24 24 import sys
25 25
26 26 from subprocess import Popen, PIPE
27 27
28 28 from IPython.config.loader import PyFileConfigLoader, Config
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.config.application import Application
31 31 from IPython.core.crashhandler import CrashHandler
32 32 from IPython.core.newapplication import BaseIPythonApplication
33 33 from IPython.core import release
34 34 from IPython.utils.path import (
35 35 get_ipython_package_dir,
36 36 get_ipython_dir,
37 37 expand_path
38 38 )
39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module errors
43 43 #-----------------------------------------------------------------------------
44 44
45 45 class ClusterDirError(Exception):
46 46 pass
47 47
48 48
49 49 class PIDFileError(Exception):
50 50 pass
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Class for managing cluster directories
55 55 #-----------------------------------------------------------------------------
56 56
57 57 class ClusterDir(Configurable):
58 58 """An object to manage the cluster directory and its resources.
59 59
60 60 The cluster directory is used by :command:`ipengine`,
61 61 :command:`ipcontroller` and :command:`ipcluster` to manage the
62 62 configuration, logging and security of these applications.
63 63
64 64 This object knows how to find, create and manage these directories. This
65 65 should be used by any code that wants to handle cluster directories.
66 66 """
67 67
68 68 security_dir_name = Unicode('security')
69 69 log_dir_name = Unicode('log')
70 70 pid_dir_name = Unicode('pid')
71 71 security_dir = Unicode(u'')
72 72 log_dir = Unicode(u'')
73 73 pid_dir = Unicode(u'')
74 74
75 75 auto_create = Bool(False,
76 76 help="""Whether to automatically create the ClusterDirectory if it does
77 77 not exist""")
78 78 overwrite = Bool(False,
79 79 help="""Whether to overwrite existing config files""")
80 80 location = Unicode(u'', config=True,
81 81 help="""Set the cluster dir. This overrides the logic used by the
82 82 `profile` option.""",
83 83 )
84 84 profile = Unicode(u'default', config=True,
85 85 help="""The string name of the profile to be used. This determines the name
86 86 of the cluster dir as: cluster_<profile>. The default profile is named
87 87 'default'. The cluster directory is resolve this way if the
88 88 `cluster_dir` option is not used."""
89 89 )
90 90
91 91 _location_isset = Bool(False) # flag for detecting multiply set location
92 92 _new_dir = Bool(False) # flag for whether a new dir was created
93 93
94 94 def __init__(self, **kwargs):
95 95 # make sure auto_create,overwrite are set *before* location
96 96 for name in ('auto_create', 'overwrite'):
97 97 v = kwargs.pop(name, None)
98 98 if v is not None:
99 99 setattr(self, name, v)
100 100 super(ClusterDir, self).__init__(**kwargs)
101 101 if not self.location:
102 102 self._profile_changed('profile', 'default', self.profile)
103 103
104 104 def _location_changed(self, name, old, new):
105 105 if self._location_isset:
106 106 raise RuntimeError("Cannot set ClusterDir more than once.")
107 107 self._location_isset = True
108 108 if not os.path.isdir(new):
109 109 if self.auto_create:# or self.config.ClusterDir.auto_create:
110 110 os.makedirs(new)
111 111 self._new_dir = True
112 112 else:
113 113 raise ClusterDirError('Directory not found: %s' % new)
114 114
115 115 # ensure config files exist:
116 116 self.copy_all_config_files(overwrite=self.overwrite)
117 117 self.security_dir = os.path.join(new, self.security_dir_name)
118 118 self.log_dir = os.path.join(new, self.log_dir_name)
119 119 self.pid_dir = os.path.join(new, self.pid_dir_name)
120 120 self.check_dirs()
121 121
122 122 def _profile_changed(self, name, old, new):
123 123 if self._location_isset:
124 124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
125 125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
126 126
127 127 def _log_dir_changed(self, name, old, new):
128 128 self.check_log_dir()
129 129
130 130 def check_log_dir(self):
131 131 if not os.path.isdir(self.log_dir):
132 132 os.mkdir(self.log_dir)
133 133
134 134 def _security_dir_changed(self, name, old, new):
135 135 self.check_security_dir()
136 136
137 137 def check_security_dir(self):
138 138 if not os.path.isdir(self.security_dir):
139 139 os.mkdir(self.security_dir, 0700)
140 140 os.chmod(self.security_dir, 0700)
141 141
142 142 def _pid_dir_changed(self, name, old, new):
143 143 self.check_pid_dir()
144 144
145 145 def check_pid_dir(self):
146 146 if not os.path.isdir(self.pid_dir):
147 147 os.mkdir(self.pid_dir, 0700)
148 148 os.chmod(self.pid_dir, 0700)
149 149
150 150 def check_dirs(self):
151 151 self.check_security_dir()
152 152 self.check_log_dir()
153 153 self.check_pid_dir()
154 154
155 155 def copy_config_file(self, config_file, path=None, overwrite=False):
156 156 """Copy a default config file into the active cluster directory.
157 157
158 158 Default configuration files are kept in :mod:`IPython.config.default`.
159 159 This function moves these from that location to the working cluster
160 160 directory.
161 161 """
162 162 if path is None:
163 163 import IPython.config.default
164 164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
165 165 path = os.path.sep.join(path)
166 166 src = os.path.join(path, config_file)
167 167 dst = os.path.join(self.location, config_file)
168 168 if not os.path.isfile(dst) or overwrite:
169 169 shutil.copy(src, dst)
170 170
171 171 def copy_all_config_files(self, path=None, overwrite=False):
172 172 """Copy all config files into the active cluster directory."""
173 173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
174 174 u'ipcluster_config.py']:
175 175 self.copy_config_file(f, path=path, overwrite=overwrite)
176 176
177 177 @classmethod
178 178 def create_cluster_dir(csl, cluster_dir):
179 179 """Create a new cluster directory given a full path.
180 180
181 181 Parameters
182 182 ----------
183 183 cluster_dir : str
184 184 The full path to the cluster directory. If it does exist, it will
185 185 be used. If not, it will be created.
186 186 """
187 187 return ClusterDir(location=cluster_dir)
188 188
189 189 @classmethod
190 190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
191 191 """Create a cluster dir by profile name and path.
192 192
193 193 Parameters
194 194 ----------
195 195 path : str
196 196 The path (directory) to put the cluster directory in.
197 197 profile : str
198 198 The name of the profile. The name of the cluster directory will
199 199 be "cluster_<profile>".
200 200 """
201 201 if not os.path.isdir(path):
202 202 raise ClusterDirError('Directory not found: %s' % path)
203 203 cluster_dir = os.path.join(path, u'cluster_' + profile)
204 204 return ClusterDir(location=cluster_dir)
205 205
206 206 @classmethod
207 207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
208 208 """Find an existing cluster dir by profile name, return its ClusterDir.
209 209
210 210 This searches through a sequence of paths for a cluster dir. If it
211 211 is not found, a :class:`ClusterDirError` exception will be raised.
212 212
213 213 The search path algorithm is:
214 214 1. ``os.getcwd()``
215 215 2. ``ipython_dir``
216 216 3. The directories found in the ":" separated
217 217 :env:`IPCLUSTER_DIR_PATH` environment variable.
218 218
219 219 Parameters
220 220 ----------
221 221 ipython_dir : unicode or str
222 222 The IPython directory to use.
223 223 profile : unicode or str
224 224 The name of the profile. The name of the cluster directory
225 225 will be "cluster_<profile>".
226 226 """
227 227 dirname = u'cluster_' + profile
228 228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
229 229 if cluster_dir_paths:
230 230 cluster_dir_paths = cluster_dir_paths.split(':')
231 231 else:
232 232 cluster_dir_paths = []
233 233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
234 234 for p in paths:
235 235 cluster_dir = os.path.join(p, dirname)
236 236 if os.path.isdir(cluster_dir):
237 237 return ClusterDir(location=cluster_dir)
238 238 else:
239 239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
240 240
241 241 @classmethod
242 242 def find_cluster_dir(cls, cluster_dir):
243 243 """Find an existing cluster dir and return its ClusterDir.
244 244
245 245 This raises :class:`ClusterDirError` if the cluster directory doesn't exist.
246 246
247 247 Parameters
248 248 ----------
249 249 cluster_dir : unicode or str
250 250 The path of the cluster directory. This is expanded using
251 251 :func:`IPython.utils.path.expand_path`.
252 252 """
253 253 cluster_dir = expand_path(cluster_dir)
254 254 if not os.path.isdir(cluster_dir):
255 255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
256 256 return ClusterDir(location=cluster_dir)
257 257
258 258
259 259 #-----------------------------------------------------------------------------
260 260 # Crash handler for this application
261 261 #-----------------------------------------------------------------------------
262 262
263 263
264 264 _message_template = """\
265 265 Oops, $self.app_name crashed. We do our best to make it stable, but...
266 266
267 267 A crash report was automatically generated with the following information:
268 268 - A verbatim copy of the crash traceback.
269 269 - Data on your current $self.app_name configuration.
270 270
271 271 It was left in the file named:
272 272 \t'$self.crash_report_fname'
273 273 If you can email this file to the developers, the information in it will help
274 274 them in understanding and correcting the problem.
275 275
276 276 You can mail it to: $self.contact_name at $self.contact_email
277 277 with the subject '$self.app_name Crash Report'.
278 278
279 279 If you want to do it now, the following command will work (under Unix):
280 280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
281 281
282 282 To ensure accurate tracking of this issue, please file a report about it at:
283 283 $self.bug_tracker
284 284 """
285 285
286 286 class ClusterDirCrashHandler(CrashHandler):
287 287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
288 288
289 289 message_template = _message_template
290 290
291 291 def __init__(self, app):
292 292 contact_name = release.authors['Min'][0]
293 293 contact_email = release.authors['Min'][1]
294 294 bug_tracker = 'http://github.com/ipython/ipython/issues'
295 295 super(ClusterDirCrashHandler,self).__init__(
296 296 app, contact_name, contact_email, bug_tracker
297 297 )
298 298
299 299
300 300 #-----------------------------------------------------------------------------
301 301 # Main application
302 302 #-----------------------------------------------------------------------------
303 303 base_aliases = {
304 304 'profile' : "ClusterDir.profile",
305 305 'cluster_dir' : 'ClusterDir.location',
306 'auto_create' : 'ClusterDirApplication.auto_create',
307 306 'log_level' : 'ClusterApplication.log_level',
308 307 'work_dir' : 'ClusterApplication.work_dir',
309 308 'log_to_file' : 'ClusterApplication.log_to_file',
310 309 'clean_logs' : 'ClusterApplication.clean_logs',
311 310 'log_url' : 'ClusterApplication.log_url',
311 'config' : 'ClusterApplication.config_file',
312 312 }
313 313
314 314 base_flags = {
315 315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
316 316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
317 317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
318 318 }
319 319 for k,v in base_flags.iteritems():
320 320 base_flags[k] = (Config(v[0]),v[1])
321 321
322 322 class ClusterApplication(BaseIPythonApplication):
323 323 """An application that puts everything into a cluster directory.
324 324
325 325 Instead of looking for things in the ipython_dir, this type of application
326 326 will use its own private directory called the "cluster directory"
327 327 for things like config files, log files, etc.
328 328
329 329 The cluster directory is resolved as follows:
330 330
331 331 * If the ``cluster_dir`` option is given, it is used.
332 * If ``cluster_dir`` is not given, the application directory is
332 * If ``cluster_dir`` is not given, the application directory is
333 333 resolved using the profile name as ``cluster_<profile>``. The search
334 334 path for this directory is then i) cwd if it is found there
335 335 and ii) in ipython_dir otherwise.
336 336
337 337 The config file for the application is to be put in the cluster
338 338 dir and named the value of the ``config_file_name`` class attribute.
339 339 """
340 340
341 341 crash_handler_class = ClusterDirCrashHandler
342 342 auto_create_cluster_dir = Bool(True, config=True,
343 343 help="whether to create the cluster_dir if it doesn't exist")
344 344 cluster_dir = Instance(ClusterDir)
345 345 classes = [ClusterDir]
346 346
347 347 def _log_level_default(self):
348 348 # temporarily override default_log_level to INFO
349 349 return logging.INFO
350 350
351 351 work_dir = Unicode(os.getcwdu(), config=True,
352 352 help='Set the working dir for the process.'
353 353 )
354 354 def _work_dir_changed(self, name, old, new):
355 355 self.work_dir = unicode(expand_path(new))
356 356
357 357 log_to_file = Bool(config=True,
358 358 help="whether to log to a file")
359 359
360 clean_logs = Bool(False, shortname='--clean-logs', config=True,
360 clean_logs = Bool(False, config=True,
361 361 help="whether to cleanup old logfiles before starting")
362 362
363 log_url = Unicode('', shortname='--log-url', config=True,
363 log_url = Unicode('', config=True,
364 364 help="The ZMQ URL of the iplogger to aggregate logging.")
365 365
366 366 config_file = Unicode(u'', config=True,
367 help="""Path to ipcontroller configuration file. The default is to use
367 help="""Path to ip<appname> configuration file. The default is to use
368 368 <appname>_config.py, as found by cluster-dir."""
369 369 )
370 def _config_file_paths_default(self):
371 # don't include profile dir
372 return [ os.getcwdu(), self.ipython_dir ]
373
374 def _config_file_changed(self, name, old, new):
375 if os.pathsep in new:
376 path, new = new.rsplit(os.pathsep)
377 self.config_file_paths.insert(0, path)
378 self.config_file_name = new
379
380 config_file_name = Unicode('')
370 381
371 382 loop = Instance('zmq.eventloop.ioloop.IOLoop')
372 383 def _loop_default(self):
373 384 from zmq.eventloop.ioloop import IOLoop
374 385 return IOLoop.instance()
375 386
376 387 aliases = Dict(base_aliases)
377 388 flags = Dict(base_flags)
378 389
379 390 def init_clusterdir(self):
380 391 """This resolves the cluster directory.
381 392
382 393 This tries to find the cluster directory and if successful, it will
383 394 have done:
384 395 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
385 396 the application.
386 397 * Sets ``self.cluster_dir`` attribute of the application and config
387 398 objects.
388 399
389 400 The algorithm used for this is as follows:
390 401 1. Try ``Global.cluster_dir``.
391 402 2. Try using ``Global.profile``.
392 403 3. If both of these fail and ``self.auto_create_cluster_dir`` is
393 404 ``True``, then create the new cluster dir in the IPython directory.
394 405 4. If all fails, then raise :class:`ClusterDirError`.
395 406 """
396 407 try:
397 408 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
398 409 except ClusterDirError as e:
399 410 self.log.fatal("Error initializing cluster dir: %s"%e)
400 411 self.log.fatal("A cluster dir must be created before running this command.")
401 412 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
402 413 "information about creating and listing cluster dirs."
403 414 )
404 415 self.exit(1)
405 416
406 417 if self.cluster_dir._new_dir:
407 418 self.log.info('Creating new cluster dir: %s' % \
408 419 self.cluster_dir.location)
409 420 else:
410 421 self.log.info('Using existing cluster dir: %s' % \
411 422 self.cluster_dir.location)
423
424 # insert after cwd:
425 self.config_file_paths.insert(1, self.cluster_dir.location)
412 426
413 427 def initialize(self, argv=None):
414 428 """initialize the app"""
415 429 self.init_crash_handler()
416 430 self.parse_command_line(argv)
417 431 cl_config = self.config
418 432 self.init_clusterdir()
419 if self.config_file:
420 self.load_config_file(self.config_file)
421 elif self.default_config_file_name:
422 try:
423 self.load_config_file(self.default_config_file_name,
424 path=self.cluster_dir.location)
425 except IOError:
426 self.log.warn("Warning: Default config file not found")
433 self.load_config_file()
427 434 # command-line should *override* config file, but command-line is necessary
428 435 # to determine clusterdir, etc.
429 436 self.update_config(cl_config)
430 437 self.to_work_dir()
431 438 self.reinit_logging()
432 439
433 440 def to_work_dir(self):
434 441 wd = self.work_dir
435 442 if unicode(wd) != os.getcwdu():
436 443 os.chdir(wd)
437 444 self.log.info("Changing to working dir: %s" % wd)
438 445 # This is the working dir by now.
439 446 sys.path.insert(0, '')
440 447
441 def load_config_file(self, filename, path=None):
442 """Load a .py based config file by filename and path."""
443 # use config.application.Application.load_config
444 # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
445 return Application.load_config_file(self, filename, path=path)
446 #
447 # def load_default_config_file(self):
448 # """Load a .py based config file by filename and path."""
449 # return BaseIPythonApplication.load_config_file(self)
450
451 # disable URL-logging
452 448 def reinit_logging(self):
453 449 # Remove old log files
454 450 log_dir = self.cluster_dir.log_dir
455 451 if self.clean_logs:
456 452 for f in os.listdir(log_dir):
457 453 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
458 454 os.remove(os.path.join(log_dir, f))
459 455 if self.log_to_file:
460 456 # Start logging to the new log file
461 457 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
462 458 logfile = os.path.join(log_dir, log_filename)
463 459 open_log_file = open(logfile, 'w')
464 460 else:
465 461 open_log_file = None
466 462 if open_log_file is not None:
467 463 self.log.removeHandler(self._log_handler)
468 464 self._log_handler = logging.StreamHandler(open_log_file)
469 465 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
470 466 self._log_handler.setFormatter(self._log_formatter)
471 467 self.log.addHandler(self._log_handler)
472 468
473 469 def write_pid_file(self, overwrite=False):
474 470 """Create a .pid file in the pid_dir with my pid.
475 471
476 472 This must be called after pre_construct, which sets `self.pid_dir`.
477 473 This raises :exc:`PIDFileError` if the pid file exists already.
478 474 """
479 475 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
480 476 if os.path.isfile(pid_file):
481 477 pid = self.get_pid_from_file()
482 478 if not overwrite:
483 479 raise PIDFileError(
484 480 'The pid file [%s] already exists. \nThis could mean that this '
485 481 'server is already running with [pid=%s].' % (pid_file, pid)
486 482 )
487 483 with open(pid_file, 'w') as f:
488 484 self.log.info("Creating pid file: %s" % pid_file)
489 485 f.write(repr(os.getpid())+'\n')
490 486
491 487 def remove_pid_file(self):
492 488 """Remove the pid file.
493 489
494 490 This should be called at shutdown by registering a callback with
495 491 :func:`reactor.addSystemEventTrigger`. This needs to return
496 492 ``None``.
497 493 """
498 494 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
499 495 if os.path.isfile(pid_file):
500 496 try:
501 497 self.log.info("Removing pid file: %s" % pid_file)
502 498 os.remove(pid_file)
503 499 except:
504 500 self.log.warn("Error removing the pid file: %s" % pid_file)
505 501
506 502 def get_pid_from_file(self):
507 503 """Get the pid from the pid file.
508 504
509 505 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
510 506 """
511 507 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
512 508 if os.path.isfile(pid_file):
513 509 with open(pid_file, 'r') as f:
514 510 pid = int(f.read().strip())
515 511 return pid
516 512 else:
517 513 raise PIDFileError('pid file not found: %s' % pid_file)
518 514
519 515 def check_pid(self, pid):
520 516 if os.name == 'nt':
521 517 try:
522 518 import ctypes
523 519 # returns 0 if no such process (of ours) exists
524 520 # positive int otherwise
525 521 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
526 522 except Exception:
527 523 self.log.warn(
528 524 "Could not determine whether pid %i is running via `OpenProcess`. "
529 525 " Making the likely assumption that it is."%pid
530 526 )
531 527 return True
532 528 return bool(p)
533 529 else:
534 530 try:
535 531 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
536 532 output,_ = p.communicate()
537 533 except OSError:
538 534 self.log.warn(
539 535 "Could not determine whether pid %i is running via `ps x`. "
540 536 " Making the likely assumption that it is."%pid
541 537 )
542 538 return True
543 539 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
544 540 return pid in pids
@@ -1,542 +1,540 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 from subprocess import check_call, CalledProcessError, PIPE
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 28 from IPython.config.application import Application, boolean_flag
29 29 from IPython.config.loader import Config
30 30 from IPython.core.newapplication import BaseIPythonApplication
31 31 from IPython.utils.importstring import import_item
32 32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33 33
34 34 from IPython.parallel.apps.clusterdir import (
35 35 ClusterApplication, ClusterDirError, ClusterDir,
36 36 PIDFileError,
37 37 base_flags, base_aliases
38 38 )
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module level variables
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 default_config_file_name = u'ipcluster_config.py'
47 47
48 48
49 49 _description = """Start an IPython cluster for parallel computing.
50 50
51 51 An IPython cluster consists of 1 controller and 1 or more engines.
52 52 This command automates the startup of these processes using a wide
53 53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 55 local host simply do 'ipcluster start n=4'. For more complex usage
56 56 you will typically do 'ipcluster create profile=mycluster', then edit
57 57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 58 """
59 59
60 60
61 61 # Exit codes for ipcluster
62 62
63 63 # This will be the exit code if the ipcluster appears to be running because
64 64 # a .pid file exists
65 65 ALREADY_STARTED = 10
66 66
67 67
68 68 # This will be the exit code if ipcluster stop is run, but there is no .pid
69 69 # file to be found.
70 70 ALREADY_STOPPED = 11
71 71
72 72 # This will be the exit code if ipcluster engines is run, but there is no .pid
73 73 # file to be found.
74 74 NO_CLUSTER = 12
75 75
76 76
77 77 #-----------------------------------------------------------------------------
78 78 # Main application
79 79 #-----------------------------------------------------------------------------
80 80 start_help = """Start an IPython cluster for parallel computing
81 81
82 82 Start an ipython cluster by its profile name or cluster
83 83 directory. Cluster directories contain configuration, log and
84 84 security related files and are named using the convention
85 85 'cluster_<profile>' and should be creating using the 'start'
86 86 subcommand of 'ipcluster'. If your cluster directory is in
87 87 the cwd or the ipython directory, you can simply refer to it
88 88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 89 otherwise use the 'cluster_dir' option.
90 90 """
91 91 stop_help = """Stop a running IPython cluster
92 92
93 93 Stop a running ipython cluster by its profile name or cluster
94 94 directory. Cluster directories are named using the convention
95 95 'cluster_<profile>'. If your cluster directory is in
96 96 the cwd or the ipython directory, you can simply refer to it
97 97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 98 use the 'cluster_dir' option.
99 99 """
100 100 engines_help = """Start engines connected to an existing IPython cluster
101 101
102 102 Start one or more engines to connect to an existing Cluster
103 103 by profile name or cluster directory.
104 104 Cluster directories contain configuration, log and
105 105 security related files and are named using the convention
106 106 'cluster_<profile>' and should be creating using the 'start'
107 107 subcommand of 'ipcluster'. If your cluster directory is in
108 108 the cwd or the ipython directory, you can simply refer to it
109 109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 110 otherwise use the 'cluster_dir' option.
111 111 """
112 112 create_help = """Create an ipcluster profile by name
113 113
114 114 Create an ipython cluster directory by its profile name or
115 115 cluster directory path. Cluster directories contain
116 116 configuration, log and security related files and are named
117 117 using the convention 'cluster_<profile>'. By default they are
118 118 located in your ipython directory. Once created, you will
119 119 probably need to edit the configuration files in the cluster
120 120 directory to configure your cluster. Most users will create a
121 121 cluster directory by profile name,
122 122 `ipcluster create profile=mycluster`, which will put the directory
123 123 in `<ipython_dir>/cluster_mycluster`.
124 124 """
125 125 list_help = """List available cluster profiles
126 126
127 127 List all available clusters, by cluster directory, that can
128 128 be found in the current working directly or in the ipython
129 129 directory. Cluster directories are named using the convention
130 130 'cluster_<profile>'.
131 131 """
132 132
133 133
134 134 class IPClusterList(BaseIPythonApplication):
135 135 name = u'ipcluster-list'
136 136 description = list_help
137 137
138 138 # empty aliases
139 139 aliases=Dict()
140 140 flags = Dict(base_flags)
141 141
142 142 def _log_level_default(self):
143 143 return 20
144 144
145 145 def list_cluster_dirs(self):
146 146 # Find the search paths
147 147 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
148 148 if cluster_dir_paths:
149 149 cluster_dir_paths = cluster_dir_paths.split(':')
150 150 else:
151 151 cluster_dir_paths = []
152 152
153 153 ipython_dir = self.ipython_dir
154 154
155 155 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
156 156 paths = list(set(paths))
157 157
158 158 self.log.info('Searching for cluster dirs in paths: %r' % paths)
159 159 for path in paths:
160 160 files = os.listdir(path)
161 161 for f in files:
162 162 full_path = os.path.join(path, f)
163 163 if os.path.isdir(full_path) and f.startswith('cluster_'):
164 164 profile = full_path.split('_')[-1]
165 165 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 166 print start_cmd + " ==> " + full_path
167 167
168 168 def start(self):
169 169 self.list_cluster_dirs()
170 170
171 171 create_flags = {}
172 172 create_flags.update(base_flags)
173 173 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
174 174 "reset config files to defaults", "leave existing config files"))
175 175
176 176 class IPClusterCreate(ClusterApplication):
177 177 name = u'ipcluster'
178 178 description = create_help
179 179 auto_create_cluster_dir = Bool(True,
180 180 help="whether to create the cluster_dir if it doesn't exist")
181 default_config_file_name = default_config_file_name
181 config_file_name = Unicode(default_config_file_name)
182 182
183 183 reset = Bool(False, config=True,
184 184 help="Whether to reset config files as part of 'create'."
185 185 )
186 186
187 187 flags = Dict(create_flags)
188 188
189 189 aliases = Dict(dict(profile='ClusterDir.profile'))
190 190
191 191 classes = [ClusterDir]
192 192
193 193 def init_clusterdir(self):
194 194 super(IPClusterCreate, self).init_clusterdir()
195 195 self.log.info('Copying default config files to cluster directory '
196 196 '[overwrite=%r]' % (self.reset,))
197 197 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
198 198
199 199 def initialize(self, argv=None):
200 200 self.parse_command_line(argv)
201 201 self.init_clusterdir()
202 202
203 203 stop_aliases = dict(
204 204 signal='IPClusterStop.signal',
205 205 profile='ClusterDir.profile',
206 206 cluster_dir='ClusterDir.location',
207 207 )
208 208
209 209 class IPClusterStop(ClusterApplication):
210 210 name = u'ipcluster'
211 211 description = stop_help
212 212 auto_create_cluster_dir = Bool(False)
213 default_config_file_name = default_config_file_name
213 config_file_name = Unicode(default_config_file_name)
214 214
215 215 signal = Int(signal.SIGINT, config=True,
216 216 help="signal to use for stopping processes.")
217 217
218 218 aliases = Dict(stop_aliases)
219 219
220 220 def init_clusterdir(self):
221 221 try:
222 222 super(IPClusterStop, self).init_clusterdir()
223 223 except ClusterDirError as e:
224 224 self.log.fatal("Failed ClusterDir init: %s"%e)
225 225 self.exit(1)
226 226
227 227 def start(self):
228 228 """Start the app for the stop subcommand."""
229 229 try:
230 230 pid = self.get_pid_from_file()
231 231 except PIDFileError:
232 232 self.log.critical(
233 233 'Could not read pid file, cluster is probably not running.'
234 234 )
235 235 # Here I exit with an unusual exit status that other processes
236 236 # can watch for to learn how I exited.
237 237 self.remove_pid_file()
238 238 self.exit(ALREADY_STOPPED)
239 239
240 240 if not self.check_pid(pid):
241 241 self.log.critical(
242 242 'Cluster [pid=%r] is not running.' % pid
243 243 )
244 244 self.remove_pid_file()
245 245 # Here I exit with an unusual exit status that other processes
246 246 # can watch for to learn how I exited.
247 247 self.exit(ALREADY_STOPPED)
248 248
249 249 elif os.name=='posix':
250 250 sig = self.signal
251 251 self.log.info(
252 252 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
253 253 )
254 254 try:
255 255 os.kill(pid, sig)
256 256 except OSError:
257 257 self.log.error("Stopping cluster failed, assuming already dead.",
258 258 exc_info=True)
259 259 self.remove_pid_file()
260 260 elif os.name=='nt':
261 261 try:
262 262 # kill the whole tree
263 263 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
264 264 except (CalledProcessError, OSError):
265 265 self.log.error("Stopping cluster failed, assuming already dead.",
266 266 exc_info=True)
267 267 self.remove_pid_file()
268 268
269 269 engine_aliases = {}
270 270 engine_aliases.update(base_aliases)
271 271 engine_aliases.update(dict(
272 272 n='IPClusterEngines.n',
273 273 elauncher = 'IPClusterEngines.engine_launcher_class',
274 274 ))
275 275 class IPClusterEngines(ClusterApplication):
276 276
277 277 name = u'ipcluster'
278 278 description = engines_help
279 279 usage = None
280 default_config_file_name = default_config_file_name
280 config_file_name = Unicode(default_config_file_name)
281 281 default_log_level = logging.INFO
282 282 auto_create_cluster_dir = Bool(False)
283 283 classes = List()
284 284 def _classes_default(self):
285 285 from IPython.parallel.apps import launcher
286 286 launchers = launcher.all_launchers
287 287 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
288 288 return [ClusterDir]+eslaunchers
289 289
290 290 n = Int(2, config=True,
291 291 help="The number of engines to start.")
292 292
293 293 engine_launcher_class = Unicode('LocalEngineSetLauncher',
294 294 config=True,
295 295 help="The class for launching a set of Engines."
296 296 )
297 297 daemonize = Bool(False, config=True,
298 298 help='Daemonize the ipcluster program. This implies --log-to-file')
299 299
300 300 def _daemonize_changed(self, name, old, new):
301 301 if new:
302 302 self.log_to_file = True
303 303
304 304 aliases = Dict(engine_aliases)
305 305 # flags = Dict(flags)
306 306 _stopping = False
307 307
308 308 def initialize(self, argv=None):
309 309 super(IPClusterEngines, self).initialize(argv)
310 310 self.init_signal()
311 311 self.init_launchers()
312 312
313 313 def init_launchers(self):
314 314 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
315 315 self.engine_launcher.on_stop(lambda r: self.loop.stop())
316 316
317 317 def init_signal(self):
318 318 # Setup signals
319 319 signal.signal(signal.SIGINT, self.sigint_handler)
320 320
321 321 def build_launcher(self, clsname):
322 322 """import and instantiate a Launcher based on importstring"""
323 323 if '.' not in clsname:
324 324 # not a module, presume it's the raw name in apps.launcher
325 325 clsname = 'IPython.parallel.apps.launcher.'+clsname
326 326 # print repr(clsname)
327 327 klass = import_item(clsname)
328 328
329 329 launcher = klass(
330 330 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
331 331 )
332 332 return launcher
333 333
334 334 def start_engines(self):
335 335 self.log.info("Starting %i engines"%self.n)
336 336 self.engine_launcher.start(
337 337 self.n,
338 338 cluster_dir=self.cluster_dir.location
339 339 )
340 340
341 341 def stop_engines(self):
342 342 self.log.info("Stopping Engines...")
343 343 if self.engine_launcher.running:
344 344 d = self.engine_launcher.stop()
345 345 return d
346 346 else:
347 347 return None
348 348
349 349 def stop_launchers(self, r=None):
350 350 if not self._stopping:
351 351 self._stopping = True
352 352 self.log.error("IPython cluster: stopping")
353 353 self.stop_engines()
354 354 # Wait a few seconds to let things shut down.
355 355 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
356 356 dc.start()
357 357
358 358 def sigint_handler(self, signum, frame):
359 359 self.log.debug("SIGINT received, stopping launchers...")
360 360 self.stop_launchers()
361 361
362 362 def start_logging(self):
363 363 # Remove old log files of the controller and engine
364 364 if self.clean_logs:
365 365 log_dir = self.cluster_dir.log_dir
366 366 for f in os.listdir(log_dir):
367 367 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
368 368 os.remove(os.path.join(log_dir, f))
369 369 # This will remove old log files for ipcluster itself
370 370 # super(IPClusterApp, self).start_logging()
371 371
372 372 def start(self):
373 373 """Start the app for the engines subcommand."""
374 374 self.log.info("IPython cluster: started")
375 375 # First see if the cluster is already running
376 376
377 377 # Now log and daemonize
378 378 self.log.info(
379 379 'Starting engines with [daemon=%r]' % self.daemonize
380 380 )
381 381 # TODO: Get daemonize working on Windows or as a Windows Server.
382 382 if self.daemonize:
383 383 if os.name=='posix':
384 384 from twisted.scripts._twistd_unix import daemonize
385 385 daemonize()
386 386
387 387 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
388 388 dc.start()
389 389 # Now write the new pid file AFTER our new forked pid is active.
390 390 # self.write_pid_file()
391 391 try:
392 392 self.loop.start()
393 393 except KeyboardInterrupt:
394 394 pass
395 395 except zmq.ZMQError as e:
396 396 if e.errno == errno.EINTR:
397 397 pass
398 398 else:
399 399 raise
400 400
401 401 start_aliases = {}
402 402 start_aliases.update(engine_aliases)
403 403 start_aliases.update(dict(
404 404 delay='IPClusterStart.delay',
405 405 clean_logs='IPClusterStart.clean_logs',
406 406 ))
407 407
408 408 class IPClusterStart(IPClusterEngines):
409 409
410 410 name = u'ipcluster'
411 411 description = start_help
412 usage = None
413 default_config_file_name = default_config_file_name
414 412 default_log_level = logging.INFO
415 413 auto_create_cluster_dir = Bool(True, config=True,
416 414 help="whether to create the cluster_dir if it doesn't exist")
417 415 classes = List()
418 416 def _classes_default(self,):
419 417 from IPython.parallel.apps import launcher
420 418 return [ClusterDir]+launcher.all_launchers
421 419
422 420 clean_logs = Bool(True, config=True,
423 421 help="whether to cleanup old logs before starting")
424 422
425 423 delay = CFloat(1., config=True,
426 424 help="delay (in s) between starting the controller and the engines")
427 425
428 426 controller_launcher_class = Unicode('LocalControllerLauncher',
429 427 config=True,
430 428 help="The class for launching a Controller."
431 429 )
432 430 reset = Bool(False, config=True,
433 431 help="Whether to reset config files as part of '--create'."
434 432 )
435 433
436 434 # flags = Dict(flags)
437 435 aliases = Dict(start_aliases)
438 436
439 437 def init_launchers(self):
440 438 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
441 439 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
442 440 self.controller_launcher.on_stop(self.stop_launchers)
443 441
444 442 def start_controller(self):
445 443 self.controller_launcher.start(
446 444 cluster_dir=self.cluster_dir.location
447 445 )
448 446
449 447 def stop_controller(self):
450 448 # self.log.info("In stop_controller")
451 449 if self.controller_launcher and self.controller_launcher.running:
452 450 return self.controller_launcher.stop()
453 451
454 452 def stop_launchers(self, r=None):
455 453 if not self._stopping:
456 454 self.stop_controller()
457 455 super(IPClusterStart, self).stop_launchers()
458 456
459 457 def start(self):
460 458 """Start the app for the start subcommand."""
461 459 # First see if the cluster is already running
462 460 try:
463 461 pid = self.get_pid_from_file()
464 462 except PIDFileError:
465 463 pass
466 464 else:
467 465 if self.check_pid(pid):
468 466 self.log.critical(
469 467 'Cluster is already running with [pid=%s]. '
470 468 'use "ipcluster stop" to stop the cluster.' % pid
471 469 )
472 470 # Here I exit with an unusual exit status that other processes
473 471 # can watch for to learn how I exited.
474 472 self.exit(ALREADY_STARTED)
475 473 else:
476 474 self.remove_pid_file()
477 475
478 476
479 477 # Now log and daemonize
480 478 self.log.info(
481 479 'Starting ipcluster with [daemon=%r]' % self.daemonize
482 480 )
483 481 # TODO: Get daemonize working on Windows or as a Windows Server.
484 482 if self.daemonize:
485 483 if os.name=='posix':
486 484 from twisted.scripts._twistd_unix import daemonize
487 485 daemonize()
488 486
489 487 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
490 488 dc.start()
491 489 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
492 490 dc.start()
493 491 # Now write the new pid file AFTER our new forked pid is active.
494 492 self.write_pid_file()
495 493 try:
496 494 self.loop.start()
497 495 except KeyboardInterrupt:
498 496 pass
499 497 except zmq.ZMQError as e:
500 498 if e.errno == errno.EINTR:
501 499 pass
502 500 else:
503 501 raise
504 502 finally:
505 503 self.remove_pid_file()
506 504
507 505 base='IPython.parallel.apps.ipclusterapp.IPCluster'
508 506
509 507 class IPClusterApp(Application):
510 508 name = u'ipcluster'
511 509 description = _description
512 510
513 511 subcommands = {'create' : (base+'Create', create_help),
514 512 'list' : (base+'List', list_help),
515 513 'start' : (base+'Start', start_help),
516 514 'stop' : (base+'Stop', stop_help),
517 515 'engines' : (base+'Engines', engines_help),
518 516 }
519 517
520 518 # no aliases or flags for parent App
521 519 aliases = Dict()
522 520 flags = Dict()
523 521
524 522 def start(self):
525 523 if self.subapp is None:
526 524 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
527 525 print
528 526 self.print_subcommands()
529 527 self.exit(1)
530 528 else:
531 529 return self.subapp.start()
532 530
533 531 def launch_new_instance():
534 532 """Create and run the IPython cluster."""
535 533 app = IPClusterApp()
536 534 app.initialize()
537 535 app.start()
538 536
539 537
540 538 if __name__ == '__main__':
541 539 launch_new_instance()
542 540
@@ -1,405 +1,404 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 from multiprocessing import Process
29 29
30 30 import zmq
31 31 from zmq.devices import ProcessMonitoredQueue
32 32 from zmq.log.handlers import PUBHandler
33 33 from zmq.utils import jsonapi as json
34 34
35 35 from IPython.config.loader import Config
36 36
37 37 from IPython.parallel import factory
38 38
39 39 from IPython.parallel.apps.clusterdir import (
40 40 ClusterDir,
41 41 ClusterApplication,
42 42 base_flags
43 43 # ClusterDirConfigLoader
44 44 )
45 45 from IPython.utils.importstring import import_item
46 46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47 47
48 48 # from IPython.parallel.controller.controller import ControllerFactory
49 49 from IPython.parallel.streamsession import StreamSession
50 50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 51 from IPython.parallel.controller.hub import Hub, HubFactory
52 52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 54
55 55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56 56
57 57 # conditional import of MongoDB backend class
58 58
59 59 try:
60 60 from IPython.parallel.controller.mongodb import MongoDB
61 61 except ImportError:
62 62 maybe_mongo = []
63 63 else:
64 64 maybe_mongo = [MongoDB]
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Module level variables
69 69 #-----------------------------------------------------------------------------
70 70
71 71
72 72 #: The default config file name for this application
73 73 default_config_file_name = u'ipcontroller_config.py'
74 74
75 75
76 76 _description = """Start the IPython controller for parallel computing.
77 77
78 78 The IPython controller provides a gateway between the IPython engines and
79 79 clients. The controller needs to be started before the engines and can be
80 80 configured using command line options or using a cluster directory. Cluster
81 81 directories contain config, log and security files and are usually located in
82 82 your ipython directory and named as "cluster_<profile>". See the `profile`
83 83 and `cluster_dir` options for details.
84 84 """
85 85
86 86
87 87
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # The main application
91 91 #-----------------------------------------------------------------------------
92 92 flags = {}
93 93 flags.update(base_flags)
94 94 flags.update({
95 95 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
96 96 'Use threads instead of processes for the schedulers'),
97 97 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
98 98 'use the SQLiteDB backend'),
99 99 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
100 100 'use the MongoDB backend'),
101 101 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
102 102 'use the in-memory DictDB backend'),
103 103 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
104 104 'reuse existing json connection files')
105 105 })
106 106
107 107 flags.update()
108 108
109 109 class IPControllerApp(ClusterApplication):
110 110
111 111 name = u'ipcontroller'
112 112 description = _description
113 # command_line_loader = IPControllerAppConfigLoader
114 default_config_file_name = default_config_file_name
113 config_file_name = Unicode(default_config_file_name)
115 114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
116 115
117 116 auto_create_cluster_dir = Bool(True, config=True,
118 117 help="Whether to create cluster_dir if it exists.")
119 118 reuse_files = Bool(False, config=True,
120 119 help='Whether to reuse existing json connection files [default: False]'
121 120 )
122 121 secure = Bool(True, config=True,
123 122 help='Whether to use exec_keys for extra authentication [default: True]'
124 123 )
125 124 ssh_server = Unicode(u'', config=True,
126 125 help="""ssh url for clients to use when connecting to the Controller
127 126 processes. It should be of the form: [user@]server[:port]. The
128 127 Controller\'s listening addresses must be accessible from the ssh server""",
129 128 )
130 129 location = Unicode(u'', config=True,
131 130 help="""The external IP or domain name of the Controller, used for disambiguating
132 131 engine and client connections.""",
133 132 )
134 133 import_statements = List([], config=True,
135 134 help="import statements to be run at startup. Necessary in some environments"
136 135 )
137 136
138 137 use_threads = Bool(False, config=True,
139 138 help='Use threads instead of processes for the schedulers',
140 139 )
141 140
142 141 # internal
143 142 children = List()
144 143 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
145 144
146 145 def _use_threads_changed(self, name, old, new):
147 146 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
148 147
149 148 aliases = Dict(dict(
150 149 config = 'IPControllerApp.config_file',
151 150 # file = 'IPControllerApp.url_file',
152 151 log_level = 'IPControllerApp.log_level',
153 152 log_url = 'IPControllerApp.log_url',
154 153 reuse_files = 'IPControllerApp.reuse_files',
155 154 secure = 'IPControllerApp.secure',
156 155 ssh = 'IPControllerApp.ssh_server',
157 156 use_threads = 'IPControllerApp.use_threads',
158 157 import_statements = 'IPControllerApp.import_statements',
159 158 location = 'IPControllerApp.location',
160 159
161 160 ident = 'StreamSession.session',
162 161 user = 'StreamSession.username',
163 162 exec_key = 'StreamSession.keyfile',
164 163
165 164 url = 'HubFactory.url',
166 165 ip = 'HubFactory.ip',
167 166 transport = 'HubFactory.transport',
168 167 port = 'HubFactory.regport',
169 168
170 169 ping = 'HeartMonitor.period',
171 170
172 171 scheme = 'TaskScheduler.scheme_name',
173 172 hwm = 'TaskScheduler.hwm',
174 173
175 174
176 175 profile = "ClusterDir.profile",
177 176 cluster_dir = 'ClusterDir.location',
178 177
179 178 ))
180 179 flags = Dict(flags)
181 180
182 181
def save_connection_dict(self, fname, cdict):
    """Save a connection dict to a JSON file in the security directory.

    Parameters
    ----------
    fname : str
        File name, relative to ``self.cluster_dir.security_dir``.
    cdict : dict
        Connection info; must contain the keys ``'url'`` and ``'location'``.
        If ``cdict['location']`` is empty and ``cdict['url']`` is accepted by
        ``split_url``, it is filled in (in place) with the last address
        reported for this host by ``socket.gethostbyname_ex``.

    The file is opened with owner-only read/write permissions *before* any
    content is written, because the dict may carry the exec key.
    """
    url = cdict['url']
    location = cdict['location']
    if not location:
        try:
            # only used as a validity check; the parsed parts are not needed
            split_url(url)
        except AssertionError:
            pass
        else:
            location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
        cdict['location'] = location
    fname = os.path.join(self.cluster_dir.security_dir, fname)
    mode = stat.S_IRUSR | stat.S_IWUSR
    # Create the file already restricted instead of chmod-ing after the write,
    # so the contents are never readable by other users.
    fd = os.open(fname, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, 'w') as f:
        # os.open's mode only applies to newly created files; tighten a
        # pre-existing (now truncated) file before writing into it.
        os.chmod(fname, mode)
        f.write(json.dumps(cdict, indent=2))
200 199
201 200 def load_config_from_json(self):
202 201 """load config from existing json connector files."""
203 202 c = self.config
204 203 # load from engine config
205 204 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
206 205 cfg = json.loads(f.read())
207 206 key = c.StreamSession.key = cfg['exec_key']
208 207 xport,addr = cfg['url'].split('://')
209 208 c.HubFactory.engine_transport = xport
210 209 ip,ports = addr.split(':')
211 210 c.HubFactory.engine_ip = ip
212 211 c.HubFactory.regport = int(ports)
213 212 self.location = cfg['location']
214 213
215 214 # load client config
216 215 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
217 216 cfg = json.loads(f.read())
218 217 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
219 218 xport,addr = cfg['url'].split('://')
220 219 c.HubFactory.client_transport = xport
221 220 ip,ports = addr.split(':')
222 221 c.HubFactory.client_ip = ip
223 222 self.ssh_server = cfg['ssh']
224 223 assert int(ports) == c.HubFactory.regport, "regport mismatch"
225 224
def init_hub(self):
    """Construct the Hub factory and write the JSON connection files.

    Steps, in order:

    1. run the configured import statements;
    2. if ``reuse_files`` is set, try to reload connection info from the
       existing JSON files, falling back to fresh files on
       ``AssertionError``/``IOError``;
    3. otherwise pick the exec key: a new uuid4 string when ``secure`` is
       set, else the empty string;
    4. build ``HubFactory`` and call its ``init_hub``; on failure log the
       error and exit with status 1;
    5. unless files are being reused, save the client and engine
       connection dicts.
    """
    c = self.config

    self.do_import_statements()
    reusing = self.reuse_files
    if reusing:
        try:
            self.load_config_from_json()
        except (AssertionError, IOError):
            # files missing or inconsistent: generate new ones below
            reusing = False
    # check again, because reusing may have failed:
    if reusing:
        pass
    elif self.secure:
        key = str(uuid.uuid4())
        c.StreamSession.key = key
    else:
        key = c.StreamSession.key = ''

    try:
        self.factory = HubFactory(config=c, log=self.log)
        self.factory.init_hub()
    except Exception:
        self.log.error("Couldn't construct the Controller", exc_info=True)
        self.exit(1)

    if not reusing:
        # save to new json config files
        f = self.factory
        cdict = {'exec_key' : key,
                'ssh' : self.ssh_server,
                'url' : "%s://%s:%s" % (f.client_transport, f.client_ip, f.regport),
                'location' : self.location
                }
        self.save_connection_dict('ipcontroller-client.json', cdict)
        # Copy *after* saving so the engine dict inherits any 'location'
        # that save_connection_dict filled in, without aliasing cdict.
        edict = dict(cdict)
        # Engines must be given the engine-side address, matching what
        # save_urls writes and what load_config_from_json reads back.
        edict['url'] = "%s://%s:%s" % (f.engine_transport, f.engine_ip, f.regport)
        self.save_connection_dict('ipcontroller-engine.json', edict)
269 268
270 269 #
271 270 def init_schedulers(self):
272 271 children = self.children
273 272 mq = import_item(str(self.mq_class))
274 273
275 274 hub = self.factory
276 275 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
277 276 # IOPub relay (in a Process)
278 277 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
279 278 q.bind_in(hub.client_info['iopub'])
280 279 q.bind_out(hub.engine_info['iopub'])
281 280 q.setsockopt_out(zmq.SUBSCRIBE, '')
282 281 q.connect_mon(hub.monitor_url)
283 282 q.daemon=True
284 283 children.append(q)
285 284
286 285 # Multiplexer Queue (in a Process)
287 286 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
288 287 q.bind_in(hub.client_info['mux'])
289 288 q.setsockopt_in(zmq.IDENTITY, 'mux')
290 289 q.bind_out(hub.engine_info['mux'])
291 290 q.connect_mon(hub.monitor_url)
292 291 q.daemon=True
293 292 children.append(q)
294 293
295 294 # Control Queue (in a Process)
296 295 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
297 296 q.bind_in(hub.client_info['control'])
298 297 q.setsockopt_in(zmq.IDENTITY, 'control')
299 298 q.bind_out(hub.engine_info['control'])
300 299 q.connect_mon(hub.monitor_url)
301 300 q.daemon=True
302 301 children.append(q)
303 302 try:
304 303 scheme = self.config.TaskScheduler.scheme_name
305 304 except AttributeError:
306 305 scheme = TaskScheduler.scheme_name.get_default_value()
307 306 # Task Queue (in a Process)
308 307 if scheme == 'pure':
309 308 self.log.warn("task::using pure XREQ Task scheduler")
310 309 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
311 310 # q.setsockopt_out(zmq.HWM, hub.hwm)
312 311 q.bind_in(hub.client_info['task'][1])
313 312 q.setsockopt_in(zmq.IDENTITY, 'task')
314 313 q.bind_out(hub.engine_info['task'])
315 314 q.connect_mon(hub.monitor_url)
316 315 q.daemon=True
317 316 children.append(q)
318 317 elif scheme == 'none':
319 318 self.log.warn("task::using no Task scheduler")
320 319
321 320 else:
322 321 self.log.info("task::using Python %s Task scheduler"%scheme)
323 322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
324 323 hub.monitor_url, hub.client_info['notification'])
325 324 kwargs = dict(logname='scheduler', loglevel=self.log_level,
326 325 log_url = self.log_url, config=dict(self.config))
327 326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
328 327 q.daemon=True
329 328 children.append(q)
330 329
331 330
332 331 def save_urls(self):
333 332 """save the registration urls to files."""
334 333 c = self.config
335 334
336 335 sec_dir = self.cluster_dir.security_dir
337 336 cf = self.factory
338 337
339 338 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
340 339 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
341 340
342 341 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
343 342 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
344 343
345 344
def do_import_statements(self):
    """Execute each statement in ``self.import_statements``.

    Every statement is logged at INFO level before it runs. A statement that
    raises is logged at ERROR level (with traceback) and the remaining
    statements are still executed.
    """
    for s in self.import_statements:
        try:
            self.log.info("Executing statement: '%s'" % s)
            # NOTE(review): this runs arbitrary code taken from the
            # application's configuration; it must come from a trusted source.
            exec(s, globals(), locals())
        except Exception:
            self.log.error("Error running statement: %s" % s, exc_info=True)
354 353
355 354 def forward_logging(self):
356 355 if self.log_url:
357 356 self.log.info("Forwarding logging to %s"%self.log_url)
358 357 context = zmq.Context.instance()
359 358 lsock = context.socket(zmq.PUB)
360 359 lsock.connect(self.log_url)
361 360 handler = PUBHandler(lsock)
362 361 self.log.removeHandler(self._log_handler)
363 362 handler.root_topic = 'controller'
364 363 handler.setLevel(self.log_level)
365 364 self.log.addHandler(handler)
366 365 self._log_handler = handler
367 366 # #
368 367
369 368 def initialize(self, argv=None):
370 369 super(IPControllerApp, self).initialize(argv)
371 370 self.forward_logging()
372 371 self.init_hub()
373 372 self.init_schedulers()
374 373
375 374 def start(self):
376 375 # Start the subprocesses:
377 376 self.factory.start()
378 377 child_procs = []
379 378 for child in self.children:
380 379 child.start()
381 380 if isinstance(child, ProcessMonitoredQueue):
382 381 child_procs.append(child.launcher)
383 382 elif isinstance(child, Process):
384 383 child_procs.append(child)
385 384 if child_procs:
386 385 signal_children(child_procs)
387 386
388 387 self.write_pid_file(overwrite=True)
389 388
390 389 try:
391 390 self.factory.loop.start()
392 391 except KeyboardInterrupt:
393 392 self.log.critical("Interrupted, Exiting...\n")
394 393
395 394
396 395
397 396 def launch_new_instance():
398 397 """Create and run the IPython controller"""
399 398 app = IPControllerApp()
400 399 app.initialize()
401 400 app.start()
402 401
403 402
404 403 if __name__ == '__main__':
405 404 launch_new_instance()
@@ -1,277 +1,277 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 28 # ClusterDirConfigLoader
29 29 )
30 30 from IPython.zmq.log import EnginePUBHandler
31 31
32 32 from IPython.config.configurable import Configurable
33 33 from IPython.parallel.streamsession import StreamSession
34 34 from IPython.parallel.engine.engine import EngineFactory
35 35 from IPython.parallel.engine.streamkernel import Kernel
36 36 from IPython.parallel.util import disambiguate_url
37 37
38 38 from IPython.utils.importstring import import_item
39 39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
40 40
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # Module level variables
44 44 #-----------------------------------------------------------------------------
45 45
46 46 #: The default config file name for this application
47 47 default_config_file_name = u'ipengine_config.py'
48 48
49 49 _description = """Start an IPython engine for parallel computing.
50 50
51 51 IPython engines run in parallel and perform computations on behalf of a client
52 52 and controller. A controller needs to be started before the engines. The
53 53 engine can be configured using command line options or using a cluster
54 54 directory. Cluster directories contain config, log and security files and are
55 55 usually located in your ipython directory and named as "cluster_<profile>".
56 56 See the `profile` and `cluster_dir` options for details.
57 57 """
58 58
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # MPI configuration
62 62 #-----------------------------------------------------------------------------
63 63
64 64 mpi4py_init = """from mpi4py import MPI as mpi
65 65 mpi.size = mpi.COMM_WORLD.Get_size()
66 66 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 67 """
68 68
69 69
70 70 pytrilinos_init = """from PyTrilinos import Epetra
71 71 class SimpleStruct:
72 72 pass
73 73 mpi = SimpleStruct()
74 74 mpi.rank = 0
75 75 mpi.size = 0
76 76 """
77 77
class MPI(Configurable):
    """Configurable for MPI initialization"""
    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
        )

    def _use_changed(self, name, old, new):
        """Load the default init script for the new `use` value.

        Named and shaped like the other trait-change handlers in this code
        base (``_<trait>_changed(self, name, old, new)``) so that it is
        invoked when `use` is assigned. An explicitly set `init_script`
        is never overwritten.
        """
        # load default init script if it's not set
        if not self.init_script:
            self.init_script = self.default_inits.get(new, '')

    def _on_use_changed(self, old, new):
        """Backward-compatible entry point for the previous handler name."""
        self._use_changed('use', old, new)

    init_script = Unicode('', config=True,
        help="Initialization code for MPI")

    # maps a `use` value to the init script that is run for it
    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)
94 94
95 95
96 96 #-----------------------------------------------------------------------------
97 97 # Main application
98 98 #-----------------------------------------------------------------------------
99 99
100 100
101 101 class IPEngineApp(ClusterApplication):
102 102
103 103 app_name = Unicode(u'ipengine')
104 104 description = Unicode(_description)
105 default_config_file_name = default_config_file_name
105 config_file_name = Unicode(default_config_file_name)
106 106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
107 107
108 108 auto_create_cluster_dir = Bool(False,
109 109 help="whether to create the cluster_dir if it doesn't exist")
110 110
111 111 startup_script = Unicode(u'', config=True,
112 112 help='specify a script to be run at startup')
113 113 startup_command = Unicode('', config=True,
114 114 help='specify a command to be run at startup')
115 115
116 116 url_file = Unicode(u'', config=True,
117 117 help="""The full location of the file containing the connection information for
118 118 the controller. If this is not given, the file must be in the
119 119 security directory of the cluster directory. This location is
120 120 resolved using the `profile` or `cluster_dir` options.""",
121 121 )
122 122
123 123 url_file_name = Unicode(u'ipcontroller-engine.json')
124 124 log_url = Unicode('', config=True,
125 125 help="""The URL for the iploggerapp instance, for forwarding
126 126 logging to a central location.""")
127 127
128 128 aliases = Dict(dict(
129 129 config = 'IPEngineApp.config_file',
130 130 file = 'IPEngineApp.url_file',
131 131 c = 'IPEngineApp.startup_command',
132 132 s = 'IPEngineApp.startup_script',
133 133
134 134 ident = 'StreamSession.session',
135 135 user = 'StreamSession.username',
136 136 exec_key = 'StreamSession.keyfile',
137 137
138 138 url = 'EngineFactory.url',
139 139 ip = 'EngineFactory.ip',
140 140 transport = 'EngineFactory.transport',
141 141 port = 'EngineFactory.regport',
142 142 location = 'EngineFactory.location',
143 143
144 144 timeout = 'EngineFactory.timeout',
145 145
146 146 profile = "ClusterDir.profile",
147 147 cluster_dir = 'ClusterDir.location',
148 148
149 149 mpi = 'MPI.use',
150 150
151 151 log_level = 'IPEngineApp.log_level',
152 152 log_url = 'IPEngineApp.log_url'
153 153 ))
154 154
155 155 # def find_key_file(self):
156 156 # """Set the key file.
157 157 #
158 158 # Here we don't try to actually see if it exists for is valid as that
159 159 # is hadled by the connection logic.
160 160 # """
161 161 # config = self.master_config
162 162 # # Find the actual controller key file
163 163 # if not config.Global.key_file:
164 164 # try_this = os.path.join(
165 165 # config.Global.cluster_dir,
166 166 # config.Global.security_dir,
167 167 # config.Global.key_file_name
168 168 # )
169 169 # config.Global.key_file = try_this
170 170
def find_url_file(self):
    """Set the url file.

    If `url_file` is empty, point it at `url_file_name` inside the cluster
    directory's security dir. An explicitly given `url_file` is left as is.

    Here we don't try to actually see if it exists or is valid, as that
    is handled by the connection logic.
    """
    if not self.url_file:
        self.url_file = os.path.join(
            self.cluster_dir.security_dir,
            self.url_file_name
        )
184 184 def init_engine(self):
185 185 # This is the working dir by now.
186 186 sys.path.insert(0, '')
187 187 config = self.config
188 188 # print config
189 189 self.find_url_file()
190 190
191 191 # if os.path.exists(config.Global.key_file) and config.Global.secure:
192 192 # config.SessionFactory.exec_key = config.Global.key_file
193 193 if os.path.exists(self.url_file):
194 194 with open(self.url_file) as f:
195 195 d = json.loads(f.read())
196 196 for k,v in d.iteritems():
197 197 if isinstance(v, unicode):
198 198 d[k] = v.encode()
199 199 if d['exec_key']:
200 200 config.StreamSession.key = d['exec_key']
201 201 d['url'] = disambiguate_url(d['url'], d['location'])
202 202 config.EngineFactory.url = d['url']
203 203 config.EngineFactory.location = d['location']
204 204
205 205 try:
206 206 exec_lines = config.Kernel.exec_lines
207 207 except AttributeError:
208 208 config.Kernel.exec_lines = []
209 209 exec_lines = config.Kernel.exec_lines
210 210
211 211 if self.startup_script:
212 212 enc = sys.getfilesystemencoding() or 'utf8'
213 213 cmd="execfile(%r)"%self.startup_script.encode(enc)
214 214 exec_lines.append(cmd)
215 215 if self.startup_command:
216 216 exec_lines.append(self.startup_command)
217 217
218 218 # Create the underlying shell class and Engine
219 219 # shell_class = import_item(self.master_config.Global.shell_class)
220 220 # print self.config
221 221 try:
222 222 self.engine = EngineFactory(config=config, log=self.log)
223 223 except:
224 224 self.log.error("Couldn't start the Engine", exc_info=True)
225 225 self.exit(1)
226 226
227 227 def forward_logging(self):
228 228 if self.log_url:
229 229 self.log.info("Forwarding logging to %s"%self.log_url)
230 230 context = self.engine.context
231 231 lsock = context.socket(zmq.PUB)
232 232 lsock.connect(self.log_url)
233 233 self.log.removeHandler(self._log_handler)
234 234 handler = EnginePUBHandler(self.engine, lsock)
235 235 handler.setLevel(self.log_level)
236 236 self.log.addHandler(handler)
237 237 self._log_handler = handler
238 238 #
def init_mpi(self):
    """Create the MPI configurable and run its init script, if any.

    The init script is executed in this module's global namespace, so a
    script that binds ``mpi`` makes it available as the module-level `mpi`.
    When there is no script, or the script raises, the module-level `mpi`
    is set to None; a failure is logged rather than silently dropped.
    """
    global mpi
    self.mpi = MPI(config=self.config)

    mpi_import_statement = self.mpi.init_script
    if mpi_import_statement:
        try:
            self.log.info("Initializing MPI:")
            self.log.info(mpi_import_statement)
            # NOTE(review): executes code taken from configuration; it must
            # come from a trusted source.
            exec(mpi_import_statement, globals())
        except Exception:
            self.log.error("MPI initialization failed, continuing without MPI",
                exc_info=True)
            mpi = None
    else:
        mpi = None
253 253
254 254 def initialize(self, argv=None):
255 255 super(IPEngineApp, self).initialize(argv)
256 256 self.init_mpi()
257 257 self.init_engine()
258 258 self.forward_logging()
259 259
260 260 def start(self):
261 261 self.engine.start()
262 262 try:
263 263 self.engine.loop.start()
264 264 except KeyboardInterrupt:
265 265 self.log.critical("Engine Interrupted, shutting down...\n")
266 266
267 267
268 268 def launch_new_instance():
269 269 """Create and run the IPython engine"""
270 270 app = IPEngineApp()
271 271 app.initialize()
272 272 app.start()
273 273
274 274
275 275 if __name__ == '__main__':
276 276 launch_new_instance()
277 277
@@ -1,97 +1,97 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 from IPython.utils.traitlets import Bool, Dict
23 from IPython.utils.traitlets import Bool, Dict, Unicode
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 28 base_aliases
29 29 )
30 30 from IPython.parallel.apps.logwatcher import LogWatcher
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Module level variables
34 34 #-----------------------------------------------------------------------------
35 35
#: Name of the config file this application uses by default
default_config_file_name = u'iplogger_config.py'

_description = """Start an IPython logger for parallel computing.

IPython controllers and engines (and your own processes) can broadcast log messages
by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
logger can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "cluster_<profile>".
See the `profile` and `cluster_dir` options for details.
"""


#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
# Start from a copy of the shared base aliases and add the two LogWatcher
# options on top, so base_aliases itself is never modified.
aliases = dict(
    base_aliases,
    url='LogWatcher.url',
    topics='LogWatcher.topics',
)
56 56
class IPLoggerApp(ClusterApplication):
    """Cluster application that runs a single ``LogWatcher``.

    ``initialize`` builds the watcher from the application's config and
    ``start`` runs the watcher's event loop until it is interrupted.
    """

    name = u'iploggerz'
    description = _description
    # NOTE(review): the next two assignments look like the before/after pair
    # of a diff hunk (plain attribute replaced by the `config_file_name`
    # trait) -- confirm whether the plain attribute should remain.
    default_config_file_name = default_config_file_name
    config_file_name = Unicode(default_config_file_name)
    auto_create_cluster_dir = Bool(False)

    classes = [LogWatcher, ClusterDir]
    aliases = Dict(aliases)

    def init_watcher(self):
        """Create ``self.watcher`` and log the URL it listens on.

        The watcher is given this application's config and logs under this
        application's logger name.  If constructing it raises, the traceback
        is logged and ``self.exit(1)`` is called.
        """
        try:
            log_watcher = LogWatcher(config=self.config, logname=self.log.name)
            self.watcher = log_watcher
        except:
            self.log.error("Couldn't start the LogWatcher", exc_info=True)
            self.exit(1)
        self.log.info("Listening for log messages on %r"%self.watcher.url)

    def initialize(self, argv=None):
        """Run the base-class initialization, then create the watcher."""
        super(IPLoggerApp, self).initialize(argv)
        self.init_watcher()

    def start(self):
        """Start the watcher and run its event loop.

        A ``KeyboardInterrupt`` raised while the loop is running is caught
        and reported through ``self.log.critical``.
        """
        log_watcher = self.watcher
        log_watcher.start()
        try:
            log_watcher.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Logging Interrupted, shutting down...\n")
86 86
87 87
def launch_new_instance():
    """Build an IPLoggerApp, initialize it, and start it."""
    logger_app = IPLoggerApp()
    logger_app.initialize()
    logger_app.start()


# Allow the module to be executed directly as a script.
if __name__ == '__main__':
    launch_new_instance()
97 97
General Comments 0
You need to be logged in to leave comments. Login now