ipcluster implemented with new subcommands
MinRK
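As a quick illustration of the new flat 'name=value' option style that the reworked applications use (see the alias tables such as base_aliases and the updated help strings in the diffs below, e.g. 'ipcluster start n=4 profile=mycluster'), here is a minimal, self-contained Python sketch of routing such arguments through an alias table into per-class config sections. The parse_kv_args helper and its names are hypothetical stand-ins, not the actual IPython.config loader machinery.

    # Hypothetical sketch: route key=value command-line arguments through an
    # alias table into a per-class config dict, mirroring base_aliases below.
    def parse_kv_args(argv, aliases):
        config = {}
        for arg in argv:
            if '=' not in arg:
                continue                     # subcommands and flags handled elsewhere
            key, value = arg.split('=', 1)
            if key not in aliases:
                continue                     # unknown names ignored in this sketch
            cls, trait = aliases[key].rsplit('.', 1)
            config.setdefault(cls, {})[trait] = value
        return config

    aliases = {'profile': 'ClusterDir.profile', 'n': 'IPClusterEngines.n'}
    print(parse_kv_args(['start', 'profile=mycluster', 'n=4'], aliases))
    # -> {'ClusterDir': {'profile': 'mycluster'}, 'IPClusterEngines': {'n': '4'}}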
@@ -1,492 +1,525 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 23 import shutil
24 24 import sys
25 25
26 26 from subprocess import Popen, PIPE
27 27
28 28 from IPython.config.loader import PyFileConfigLoader, Config
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.config.application import Application
31 31 from IPython.core.crashhandler import CrashHandler
32 32 from IPython.core.newapplication import BaseIPythonApplication
33 33 from IPython.core import release
34 34 from IPython.utils.path import (
35 35 get_ipython_package_dir,
36 36 get_ipython_dir,
37 37 expand_path
38 38 )
39 39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module errors
43 43 #-----------------------------------------------------------------------------
44 44
45 45 class ClusterDirError(Exception):
46 46 pass
47 47
48 48
49 49 class PIDFileError(Exception):
50 50 pass
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Class for managing cluster directories
55 55 #-----------------------------------------------------------------------------
56 56
57 57 class ClusterDir(Configurable):
58 58 """An object to manage the cluster directory and its resources.
59 59
60 60 The cluster directory is used by :command:`ipengine`,
61 61 :command:`ipcontroller` and :command:`ipcluster` to manage the
62 62 configuration, logging and security of these applications.
63 63
64 64 This object knows how to find, create and manage these directories. This
65 65 should be used by any code that wants to handle cluster directories.
66 66 """
67 67
68 68 security_dir_name = Unicode('security')
69 69 log_dir_name = Unicode('log')
70 70 pid_dir_name = Unicode('pid')
71 71 security_dir = Unicode(u'')
72 72 log_dir = Unicode(u'')
73 73 pid_dir = Unicode(u'')
74 74
75 75 location = Unicode(u'', config=True,
76 76 help="""Set the cluster dir. This overrides the logic used by the
77 77 `profile` option.""",
78 78 )
79 79 profile = Unicode(u'default',
80 80 help="""The string name of the profile to be used. This determines the name
81 81 of the cluster dir as: cluster_<profile>. The default profile is named
82 82 'default'. The cluster directory is resolved this way if the
83 83 `cluster_dir` option is not used.""", config=True
84 84 )
85 auto_create = Bool(False,
86 help="""Whether to automatically create the ClusterDirectory if it does
87 not exist""")
88 overwrite = Bool(False,
89 help="""Whether to overwrite existing config files""")
85 90
86 91 _location_isset = Bool(False) # flag for detecting multiply set location
87 92 _new_dir = Bool(False) # flag for whether a new dir was created
88 93
89 94 def __init__(self, **kwargs):
90 95 super(ClusterDir, self).__init__(**kwargs)
91 96 if not self.location:
92 97 self._profile_changed('profile', 'default', self.profile)
93 98
94 99 def _location_changed(self, name, old, new):
95 100 if self._location_isset:
96 101 raise RuntimeError("Cannot set ClusterDir more than once.")
97 102 self._location_isset = True
98 103 if not os.path.isdir(new):
99 os.makedirs(new)
100 self._new_dir = True
104 if self.auto_create:
105 os.makedirs(new)
106 self._new_dir = True
107 else:
108 raise ClusterDirError('Directory not found: %s' % new)
109
101 110 # ensure config files exist:
102 self.copy_all_config_files(overwrite=False)
111 self.copy_all_config_files(overwrite=self.overwrite)
103 112 self.security_dir = os.path.join(new, self.security_dir_name)
104 113 self.log_dir = os.path.join(new, self.log_dir_name)
105 114 self.pid_dir = os.path.join(new, self.pid_dir_name)
106 115 self.check_dirs()
107 116
108 117 def _profile_changed(self, name, old, new):
109 118 if self._location_isset:
110 119 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
111 120 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
112 121
113 122 def _log_dir_changed(self, name, old, new):
114 123 self.check_log_dir()
115 124
116 125 def check_log_dir(self):
117 126 if not os.path.isdir(self.log_dir):
118 127 os.mkdir(self.log_dir)
119 128
120 129 def _security_dir_changed(self, name, old, new):
121 130 self.check_security_dir()
122 131
123 132 def check_security_dir(self):
124 133 if not os.path.isdir(self.security_dir):
125 134 os.mkdir(self.security_dir, 0700)
126 135 os.chmod(self.security_dir, 0700)
127 136
128 137 def _pid_dir_changed(self, name, old, new):
129 138 self.check_pid_dir()
130 139
131 140 def check_pid_dir(self):
132 141 if not os.path.isdir(self.pid_dir):
133 142 os.mkdir(self.pid_dir, 0700)
134 143 os.chmod(self.pid_dir, 0700)
135 144
136 145 def check_dirs(self):
137 146 self.check_security_dir()
138 147 self.check_log_dir()
139 148 self.check_pid_dir()
140 149
141 150 def copy_config_file(self, config_file, path=None, overwrite=False):
142 151 """Copy a default config file into the active cluster directory.
143 152
144 153 Default configuration files are kept in :mod:`IPython.config.default`.
145 154 This function moves these from that location to the working cluster
146 155 directory.
147 156 """
148 157 if path is None:
149 158 import IPython.config.default
150 159 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
151 160 path = os.path.sep.join(path)
152 161 src = os.path.join(path, config_file)
153 162 dst = os.path.join(self.location, config_file)
154 163 if not os.path.isfile(dst) or overwrite:
155 164 shutil.copy(src, dst)
156 165
157 166 def copy_all_config_files(self, path=None, overwrite=False):
158 167 """Copy all config files into the active cluster directory."""
159 168 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
160 169 u'ipcluster_config.py']:
161 170 self.copy_config_file(f, path=path, overwrite=overwrite)
162 171
163 172 @classmethod
164 173 def create_cluster_dir(csl, cluster_dir):
165 174 """Create a new cluster directory given a full path.
166 175
167 176 Parameters
168 177 ----------
169 178 cluster_dir : str
170 179 The full path to the cluster directory. If it does exist, it will
171 180 be used. If not, it will be created.
172 181 """
173 182 return ClusterDir(location=cluster_dir)
174 183
175 184 @classmethod
176 185 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
177 186 """Create a cluster dir by profile name and path.
178 187
179 188 Parameters
180 189 ----------
181 190 path : str
182 191 The path (directory) to put the cluster directory in.
183 192 profile : str
184 193 The name of the profile. The name of the cluster directory will
185 194 be "cluster_<profile>".
186 195 """
187 196 if not os.path.isdir(path):
188 197 raise ClusterDirError('Directory not found: %s' % path)
189 198 cluster_dir = os.path.join(path, u'cluster_' + profile)
190 199 return ClusterDir(location=cluster_dir)
191 200
192 201 @classmethod
193 202 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
194 203 """Find an existing cluster dir by profile name, return its ClusterDir.
195 204
196 205 This searches through a sequence of paths for a cluster dir. If it
197 206 is not found, a :class:`ClusterDirError` exception will be raised.
198 207
199 208 The search path algorithm is:
200 209 1. ``os.getcwd()``
201 210 2. ``ipython_dir``
202 211 3. The directories found in the ":" separated
203 212 :env:`IPCLUSTER_DIR_PATH` environment variable.
204 213
205 214 Parameters
206 215 ----------
207 216 ipython_dir : unicode or str
208 217 The IPython directory to use.
209 218 profile : unicode or str
210 219 The name of the profile. The name of the cluster directory
211 220 will be "cluster_<profile>".
212 221 """
213 222 dirname = u'cluster_' + profile
214 223 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
215 224 if cluster_dir_paths:
216 225 cluster_dir_paths = cluster_dir_paths.split(':')
217 226 else:
218 227 cluster_dir_paths = []
219 228 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
220 229 for p in paths:
221 230 cluster_dir = os.path.join(p, dirname)
222 231 if os.path.isdir(cluster_dir):
223 232 return ClusterDir(location=cluster_dir)
224 233 else:
225 234 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
226 235
227 236 @classmethod
228 237 def find_cluster_dir(cls, cluster_dir):
229 238 """Find/create a cluster dir and return its ClusterDir.
230 239
231 240 This will create the cluster directory if it doesn't exist.
232 241
233 242 Parameters
234 243 ----------
235 244 cluster_dir : unicode or str
236 245 The path of the cluster directory. This is expanded using
237 246 :func:`IPython.utils.genutils.expand_path`.
238 247 """
239 248 cluster_dir = expand_path(cluster_dir)
240 249 if not os.path.isdir(cluster_dir):
241 250 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
242 251 return ClusterDir(location=cluster_dir)
243 252
244 253
245 254 #-----------------------------------------------------------------------------
246 255 # Crash handler for this application
247 256 #-----------------------------------------------------------------------------
248 257
249 258
250 259 _message_template = """\
251 260 Oops, $self.app_name crashed. We do our best to make it stable, but...
252 261
253 262 A crash report was automatically generated with the following information:
254 263 - A verbatim copy of the crash traceback.
255 264 - Data on your current $self.app_name configuration.
256 265
257 266 It was left in the file named:
258 267 \t'$self.crash_report_fname'
259 268 If you can email this file to the developers, the information in it will help
260 269 them in understanding and correcting the problem.
261 270
262 271 You can mail it to: $self.contact_name at $self.contact_email
263 272 with the subject '$self.app_name Crash Report'.
264 273
265 274 If you want to do it now, the following command will work (under Unix):
266 275 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
267 276
268 277 To ensure accurate tracking of this issue, please file a report about it at:
269 278 $self.bug_tracker
270 279 """
271 280
272 281 class ClusterDirCrashHandler(CrashHandler):
273 282 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
274 283
275 284 message_template = _message_template
276 285
277 286 def __init__(self, app):
278 287 contact_name = release.authors['Min'][0]
279 288 contact_email = release.authors['Min'][1]
280 289 bug_tracker = 'http://github.com/ipython/ipython/issues'
281 290 super(ClusterDirCrashHandler,self).__init__(
282 291 app, contact_name, contact_email, bug_tracker
283 292 )
284 293
285 294
286 295 #-----------------------------------------------------------------------------
287 296 # Main application
288 297 #-----------------------------------------------------------------------------
289 298 base_aliases = {
290 299 'profile' : "ClusterDir.profile",
291 300 'cluster_dir' : 'ClusterDir.location',
292 'log_level' : 'Application.log_level',
293 'work_dir' : 'ClusterDirApplicaiton.work_dir',
294 'log_to_file' : 'ClusterDirApplicaiton.log_to_file',
295 'clean_logs' : 'ClusterDirApplicaiton.clean_logs',
296 'log_url' : 'ClusterDirApplicaiton.log_url',
301 'auto_create' : 'ClusterDirApplication.auto_create',
302 'log_level' : 'ClusterApplication.log_level',
303 'work_dir' : 'ClusterApplication.work_dir',
304 'log_to_file' : 'ClusterApplication.log_to_file',
305 'clean_logs' : 'ClusterApplication.clean_logs',
306 'log_url' : 'ClusterApplication.log_url',
297 307 }
298 308
299 309 base_flags = {
300 'debug' : ( {"Application" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
301 'clean-logs' : ( {"ClusterDirApplication" : {"clean_logs" : True}}, "cleanup old logfiles"),
302 'log-to-file' : ( {"ClusterDirApplication" : {"log_to_file" : True}}, "log to a file")
310 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
311 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
312 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
303 313 }
304 314 for k,v in base_flags.iteritems():
305 315 base_flags[k] = (Config(v[0]),v[1])
306 316
307 class ClusterDirApplication(BaseIPythonApplication):
317 class ClusterApplication(BaseIPythonApplication):
308 318 """An application that puts everything into a cluster directory.
309 319
310 320 Instead of looking for things in the ipython_dir, this type of application
311 321 will use its own private directory called the "cluster directory"
312 322 for things like config files, log files, etc.
313 323
314 324 The cluster directory is resolved as follows:
315 325
316 326 * If the ``--cluster-dir`` option is given, it is used.
317 327 * If ``--cluster-dir`` is not given, the application directory is
318 328 resolved using the profile name as ``cluster_<profile>``. The search
319 329 path for this directory is then i) cwd if it is found there
320 330 and ii) in ipython_dir otherwise.
321 331
322 332 The config file for the application is to be put in the cluster
323 333 dir and named the value of the ``config_file_name`` class attribute.
324 334 """
325 335
326 336 crash_handler_class = ClusterDirCrashHandler
327 337 auto_create_cluster_dir = Bool(True, config=True,
328 338 help="whether to create the cluster_dir if it doesn't exist")
329 # temporarily override default_log_level to INFO
330 default_log_level = logging.INFO
331 339 cluster_dir = Instance(ClusterDir)
340 classes = [ClusterDir]
341
342 def _log_level_default(self):
343 # temporarily override default_log_level to INFO
344 return logging.INFO
332 345
333 346 work_dir = Unicode(os.getcwdu(), config=True,
334 347 help='Set the working dir for the process.'
335 348 )
336 349 def _work_dir_changed(self, name, old, new):
337 350 self.work_dir = unicode(expand_path(new))
338 351
339 352 log_to_file = Bool(config=True,
340 353 help="whether to log to a file")
341 354
342 clean_logs = Bool(True, shortname='--clean-logs', config=True,
355 clean_logs = Bool(False, shortname='--clean-logs', config=True,
343 356 help="whether to cleanup old logfiles before starting")
344 357
345 358 log_url = CStr('', shortname='--log-url', config=True,
346 359 help="The ZMQ URL of the iplooger to aggregate logging.")
347 360
348 361 config_file = Unicode(u'', config=True,
349 362 help="""Path to ipcontroller configuration file. The default is to use
350 363 <appname>_config.py, as found by cluster-dir."""
351 364 )
365
366 loop = Instance('zmq.eventloop.ioloop.IOLoop')
367 def _loop_default(self):
368 from zmq.eventloop.ioloop import IOLoop
369 return IOLoop.instance()
352 370
353 371 aliases = Dict(base_aliases)
354 372 flags = Dict(base_flags)
355 373
356 374 def init_clusterdir(self):
357 375 """This resolves the cluster directory.
358 376
359 377 This tries to find the cluster directory and if successful, it will
360 378 have done:
361 379 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
362 380 the application.
363 381 * Sets ``self.cluster_dir`` attribute of the application and config
364 382 objects.
365 383
366 384 The algorithm used for this is as follows:
367 385 1. Try ``Global.cluster_dir``.
368 386 2. Try using ``Global.profile``.
369 387 3. If both of these fail and ``self.auto_create_cluster_dir`` is
370 388 ``True``, then create the new cluster dir in the IPython directory.
371 389 4. If all fails, then raise :class:`ClusterDirError`.
372 390 """
373 self.cluster_dir = ClusterDir(config=self.config)
391 self.cluster_dir = ClusterDir(config=self.config, auto_create=self.auto_create_cluster_dir)
374 392 if self.cluster_dir._new_dir:
375 393 self.log.info('Creating new cluster dir: %s' % \
376 394 self.cluster_dir.location)
377 395 else:
378 396 self.log.info('Using existing cluster dir: %s' % \
379 397 self.cluster_dir.location)
380
398
399 def initialize(self, argv=None):
400 """initialize the app"""
401 self.parse_command_line(argv)
402 cl_config = self.config
403 self.init_clusterdir()
404 if self.config_file:
405 self.load_config_file(self.config_file)
406 else:
407 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
408 # command-line should *override* config file, but command-line is necessary
409 # to determine clusterdir, etc.
410 self.update_config(cl_config)
411 self.reinit_logging()
412
413 self.to_work_dir()
414
381 415 def to_work_dir(self):
382 416 wd = self.work_dir
383 417 if unicode(wd) != os.getcwdu():
384 418 os.chdir(wd)
385 419 self.log.info("Changing to working dir: %s" % wd)
386 420
387 421 def load_config_file(self, filename, path=None):
388 422 """Load a .py based config file by filename and path."""
423 # use config.application.Application.load_config_file
424 # instead of the inflexible
425 # core.newapplication.BaseIPythonApplication.load_config_file
389 426 return Application.load_config_file(self, filename, path=path)
390 427 #
391 428 # def load_default_config_file(self):
392 429 # """Load a .py based config file by filename and path."""
393 430 # return BaseIPythonApplication.load_config_file(self)
394 431
395 432 # disable URL-logging
396 # def init_logging(self):
397 # # Remove old log files
398 # if self.master_config.Global.clean_logs:
399 # log_dir = self.master_config.Global.log_dir
400 # for f in os.listdir(log_dir):
401 # if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
402 # # if f.startswith(self.name + u'-') and f.endswith('.log'):
403 # os.remove(os.path.join(log_dir, f))
404 # # Start logging to the new log file
405 # if self.master_config.Global.log_to_file:
406 # log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
407 # logfile = os.path.join(self.log_dir, log_filename)
408 # open_log_file = open(logfile, 'w')
409 # elif self.master_config.Global.log_url:
410 # open_log_file = None
411 # else:
412 # open_log_file = sys.stdout
413 # if open_log_file is not None:
414 # self.log.removeHandler(self._log_handler)
415 # self._log_handler = logging.StreamHandler(open_log_file)
416 # self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
417 # self._log_handler.setFormatter(self._log_formatter)
418 # self.log.addHandler(self._log_handler)
419 # # log.startLogging(open_log_file)
433 def reinit_logging(self):
434 # Remove old log files
435 log_dir = self.cluster_dir.log_dir
436 if self.clean_logs:
437 for f in os.listdir(log_dir):
438 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
439 os.remove(os.path.join(log_dir, f))
440 if self.log_to_file:
441 # Start logging to the new log file
442 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
443 logfile = os.path.join(log_dir, log_filename)
444 open_log_file = open(logfile, 'w')
445 else:
446 open_log_file = None
447 if open_log_file is not None:
448 self.log.removeHandler(self._log_handler)
449 self._log_handler = logging.StreamHandler(open_log_file)
450 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
451 self._log_handler.setFormatter(self._log_formatter)
452 self.log.addHandler(self._log_handler)
420 453
421 454 def write_pid_file(self, overwrite=False):
422 455 """Create a .pid file in the pid_dir with my pid.
423 456
424 457 This must be called after pre_construct, which sets `self.pid_dir`.
425 458 This raises :exc:`PIDFileError` if the pid file exists already.
426 459 """
427 460 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
428 461 if os.path.isfile(pid_file):
429 462 pid = self.get_pid_from_file()
430 463 if not overwrite:
431 464 raise PIDFileError(
432 465 'The pid file [%s] already exists. \nThis could mean that this '
433 466 'server is already running with [pid=%s].' % (pid_file, pid)
434 467 )
435 468 with open(pid_file, 'w') as f:
436 469 self.log.info("Creating pid file: %s" % pid_file)
437 470 f.write(repr(os.getpid())+'\n')
438 471
439 472 def remove_pid_file(self):
440 473 """Remove the pid file.
441 474
442 475 This should be called at shutdown by registering a callback with
443 476 :func:`reactor.addSystemEventTrigger`. This needs to return
444 477 ``None``.
445 478 """
446 479 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
447 480 if os.path.isfile(pid_file):
448 481 try:
449 482 self.log.info("Removing pid file: %s" % pid_file)
450 483 os.remove(pid_file)
451 484 except:
452 485 self.log.warn("Error removing the pid file: %s" % pid_file)
453 486
454 487 def get_pid_from_file(self):
455 488 """Get the pid from the pid file.
456 489
457 490 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
458 491 """
459 492 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
460 493 if os.path.isfile(pid_file):
461 494 with open(pid_file, 'r') as f:
462 495 pid = int(f.read().strip())
463 496 return pid
464 497 else:
465 498 raise PIDFileError('pid file not found: %s' % pid_file)
466 499
467 500 def check_pid(self, pid):
468 501 if os.name == 'nt':
469 502 try:
470 503 import ctypes
471 504 # returns 0 if no such process (of ours) exists
472 505 # positive int otherwise
473 506 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
474 507 except Exception:
475 508 self.log.warn(
476 509 "Could not determine whether pid %i is running via `OpenProcess`. "
477 510 " Making the likely assumption that it is."%pid
478 511 )
479 512 return True
480 513 return bool(p)
481 514 else:
482 515 try:
483 516 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
484 517 output,_ = p.communicate()
485 518 except OSError:
486 519 self.log.warn(
487 520 "Could not determine whether pid %i is running via `ps x`. "
488 521 " Making the likely assumption that it is."%pid
489 522 )
490 523 return True
491 524 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
492 525 return pid in pids
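The write_pid_file, get_pid_from_file and check_pid helpers at the end of the clusterdir diff above implement a small pid-file lifecycle: record the pid on start, read it back on stop, and scan the process table to see whether that pid is still alive. Below is a stripped-down, standard-library-only sketch of the same idea, covering only the POSIX branch; the file path and function names are illustrative, not the application's actual layout (the real code keeps its pid file under the cluster directory's pid/ subdirectory).

    import os
    import re
    from subprocess import Popen, PIPE

    PID_FILE = '/tmp/ipcluster-demo.pid'    # hypothetical location for this sketch

    def write_pid_file(path=PID_FILE):
        # Record our pid so a later 'stop' invocation can find and signal us.
        with open(path, 'w') as f:
            f.write(repr(os.getpid()) + '\n')

    def get_pid_from_file(path=PID_FILE):
        with open(path) as f:
            return int(f.read().strip())

    def check_pid(pid):
        # POSIX-only liveness check, mirroring the 'ps x' scan used above.
        output, _ = Popen(['ps', 'x'], stdout=PIPE, stderr=PIPE).communicate()
        pids = [int(m) for m in re.findall(r'^\s*(\d+)', output.decode(), re.MULTILINE)]
        return pid in pids

    write_pid_file()
    print(check_pid(get_pid_from_file()))   # True while this process is alive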
@@ -1,550 +1,542 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 from subprocess import check_call, CalledProcessError, PIPE
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 from IPython.config.application import Application, boolean_flag
28 29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
29 31 from IPython.utils.importstring import import_item
30 32 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
31 33
32 34 from IPython.parallel.apps.clusterdir import (
33 ClusterDirApplication, ClusterDirError,
35 ClusterApplication, ClusterDirError, ClusterDir,
34 36 PIDFileError,
35 base_flags,
37 base_flags, base_aliases
36 38 )
37 39
38 40
39 41 #-----------------------------------------------------------------------------
40 42 # Module level variables
41 43 #-----------------------------------------------------------------------------
42 44
43 45
44 46 default_config_file_name = u'ipcluster_config.py'
45 47
46 48
47 49 _description = """\
48 50 Start an IPython cluster for parallel computing.\n\n
49 51
50 52 An IPython cluster consists of 1 controller and 1 or more engines.
51 53 This command automates the startup of these processes using a wide
52 54 range of startup methods (SSH, local processes, PBS, mpiexec,
53 55 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 56 local host simply do 'ipcluster start n=4'. For more complex usage
55 you will typically do 'ipcluster --create profile=mycluster', then edit
56 configuration files, followed by 'ipcluster --start -p mycluster -n 4'.
57 you will typically do 'ipcluster create profile=mycluster', then edit
58 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 59 """
58 60
59 61
60 62 # Exit codes for ipcluster
61 63
62 64 # This will be the exit code if the ipcluster appears to be running because
63 65 # a .pid file exists
64 66 ALREADY_STARTED = 10
65 67
66 68
67 69 # This will be the exit code if ipcluster stop is run, but there is not .pid
68 70 # file to be found.
69 71 ALREADY_STOPPED = 11
70 72
71 73 # This will be the exit code if ipcluster engines is run, but there is not .pid
72 74 # file to be found.
73 75 NO_CLUSTER = 12
74 76
75 77
76 78 #-----------------------------------------------------------------------------
77 79 # Main application
78 80 #-----------------------------------------------------------------------------
79 start_help = """Start an ipython cluster by its profile name or cluster
80 directory. Cluster directories contain configuration, log and
81 security related files and are named using the convention
82 'cluster_<profile>' and should be creating using the 'start'
83 subcommand of 'ipcluster'. If your cluster directory is in
84 the cwd or the ipython directory, you can simply refer to it
85 using its profile name, 'ipcluster start -n 4 -p <profile>`,
86 otherwise use the '--cluster-dir' option.
87 """
88 stop_help = """Stop a running ipython cluster by its profile name or cluster
89 directory. Cluster directories are named using the convention
90 'cluster_<profile>'. If your cluster directory is in
91 the cwd or the ipython directory, you can simply refer to it
92 using its profile name, 'ipcluster stop -p <profile>`, otherwise
93 use the '--cluster-dir' option.
94 """
95 engines_help = """Start one or more engines to connect to an existing Cluster
96 by profile name or cluster directory.
97 Cluster directories contain configuration, log and
98 security related files and are named using the convention
99 'cluster_<profile>' and should be creating using the 'start'
100 subcommand of 'ipcluster'. If your cluster directory is in
101 the cwd or the ipython directory, you can simply refer to it
102 using its profile name, 'ipcluster --engines -n 4 -p <profile>`,
103 otherwise use the 'cluster_dir' option.
104 """
105 create_help = """Create an ipython cluster directory by its profile name or
106 cluster directory path. Cluster directories contain
107 configuration, log and security related files and are named
108 using the convention 'cluster_<profile>'. By default they are
109 located in your ipython directory. Once created, you will
110 probably need to edit the configuration files in the cluster
111 directory to configure your cluster. Most users will create a
112 cluster directory by profile name,
113 'ipcluster create -p mycluster', which will put the directory
114 in '<ipython_dir>/cluster_mycluster'.
115 """
81 start_help = """
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
85 'cluster_<profile>' and should be created using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'cluster_dir' option.
90 """
91 stop_help = """
92 Stop a running ipython cluster by its profile name or cluster
93 directory. Cluster directories are named using the convention
94 'cluster_<profile>'. If your cluster directory is in
95 the cwd or the ipython directory, you can simply refer to it
96 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 use the 'cluster_dir' option.
98 """
99 engines_help = """
100 Start one or more engines to connect to an existing Cluster
101 by profile name or cluster directory.
102 Cluster directories contain configuration, log and
103 security related files and are named using the convention
104 'cluster_<profile>' and should be created using the 'start'
105 subcommand of 'ipcluster'. If your cluster directory is in
106 the cwd or the ipython directory, you can simply refer to it
107 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
108 otherwise use the 'cluster_dir' option.
109 """
110 create_help = """
111 Create an ipython cluster directory by its profile name or
112 cluster directory path. Cluster directories contain
113 configuration, log and security related files and are named
114 using the convention 'cluster_<profile>'. By default they are
115 located in your ipython directory. Once created, you will
116 probably need to edit the configuration files in the cluster
117 directory to configure your cluster. Most users will create a
118 cluster directory by profile name,
119 `ipcluster create profile=mycluster`, which will put the directory
120 in `<ipython_dir>/cluster_mycluster`.
121 """
116 122 list_help = """List all available clusters, by cluster directory, that can
117 be found in the current working directly or in the ipython
118 directory. Cluster directories are named using the convention
119 'cluster_<profile>'."""
120
121
122 flags = {}
123 flags.update(base_flags)
124 flags.update({
125 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help),
126 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help),
127 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help),
128 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help),
129 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help),
130
131 })
132
133 class IPClusterApp(ClusterDirApplication):
134
135 name = u'ipcluster'
136 description = _description
137 usage = None
138 default_config_file_name = default_config_file_name
139 default_log_level = logging.INFO
140 auto_create_cluster_dir = False
141 classes = List()
142 def _classes_default(self,):
143 from IPython.parallel.apps import launcher
144 return launcher.all_launchers
145
146 n = Int(0, config=True,
147 help="The number of engines to start.")
148 signal = Int(signal.SIGINT, config=True,
149 help="signal to use for stopping. [default: SIGINT]")
150 delay = CFloat(1., config=True,
151 help="delay (in s) between starting the controller and the engines")
152
153 subcommand = Str('', config=True,
154 help="""ipcluster has a variety of subcommands. The general way of
155 running ipcluster is 'ipcluster --<cmd> [options]'."""
156 )
157
158 controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher',
159 config=True,
160 help="The class for launching a Controller."
161 )
162 engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher',
163 config=True,
164 help="The class for launching Engines."
165 )
166 reset = Bool(False, config=True,
167 help="Whether to reset config files as part of '--create'."
168 )
169 daemonize = Bool(False, config=True,
170 help='Daemonize the ipcluster program. This implies --log-to-file')
171
172 def _daemonize_changed(self, name, old, new):
173 if new:
174 self.log_to_file = True
175
176 def _n_changed(self, name, old, new):
177 # propagate n all over the place...
178 # TODO make this clean
179 # ensure all classes are covered.
180 self.config.LocalEngineSetLauncher.n=new
181 self.config.MPIExecEngineSetLauncher.n=new
182 self.config.SSHEngineSetLauncher.n=new
183 self.config.PBSEngineSetLauncher.n=new
184 self.config.SGEEngineSetLauncher.n=new
185 self.config.WinHPEngineSetLauncher.n=new
186
187 aliases = Dict(dict(
188 n='IPClusterApp.n',
189 signal = 'IPClusterApp.signal',
190 delay = 'IPClusterApp.delay',
191 clauncher = 'IPClusterApp.controller_launcher_class',
192 elauncher = 'IPClusterApp.engine_launcher_class',
193 ))
194 flags = Dict(flags)
123 be found in the current working directory or in the ipython
124 directory. Cluster directories are named using the convention
125 'cluster_<profile>'.
126 """
195 127
196 def init_clusterdir(self):
197 subcommand = self.subcommand
198 if subcommand=='list':
199 self.list_cluster_dirs()
200 self.exit(0)
201 if subcommand=='create':
202 reset = self.reset_config
203 self.auto_create_cluster_dir = True
204 super(IPClusterApp, self).init_clusterdir()
205 self.log.info('Copying default config files to cluster directory '
206 '[overwrite=%r]' % (reset,))
207 self.cluster_dir.copy_all_config_files(overwrite=reset)
208 elif subcommand=='start' or subcommand=='stop':
209 self.auto_create_cluster_dir = True
210 try:
211 super(IPClusterApp, self).init_clusterdir()
212 except ClusterDirError:
213 raise ClusterDirError(
214 "Could not find a cluster directory. A cluster dir must "
215 "be created before running 'ipcluster start'. Do "
216 "'ipcluster create -h' or 'ipcluster list -h' for more "
217 "information about creating and listing cluster dirs."
218 )
219 elif subcommand=='engines':
220 self.auto_create_cluster_dir = False
221 try:
222 super(IPClusterApp, self).init_clusterdir()
223 except ClusterDirError:
224 raise ClusterDirError(
225 "Could not find a cluster directory. A cluster dir must "
226 "be created before running 'ipcluster start'. Do "
227 "'ipcluster create -h' or 'ipcluster list -h' for more "
228 "information about creating and listing cluster dirs."
229 )
230 128
129 class IPClusterList(BaseIPythonApplication):
130 name = u'ipcluster-list'
131 description = list_help
132
133 # empty aliases
134 aliases=Dict()
135 flags = Dict(base_flags)
136
137 def _log_level_default(self):
138 return 20
139
231 140 def list_cluster_dirs(self):
232 141 # Find the search paths
233 142 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
234 143 if cluster_dir_paths:
235 144 cluster_dir_paths = cluster_dir_paths.split(':')
236 145 else:
237 146 cluster_dir_paths = []
238 try:
239 ipython_dir = self.ipython_dir
240 except AttributeError:
241 ipython_dir = self.ipython_dir
242 paths = [os.getcwd(), ipython_dir] + \
243 cluster_dir_paths
147
148 ipython_dir = self.ipython_dir
149
150 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
244 151 paths = list(set(paths))
245 152
246 153 self.log.info('Searching for cluster dirs in paths: %r' % paths)
247 154 for path in paths:
248 155 files = os.listdir(path)
249 156 for f in files:
250 157 full_path = os.path.join(path, f)
251 158 if os.path.isdir(full_path) and f.startswith('cluster_'):
252 159 profile = full_path.split('_')[-1]
253 start_cmd = 'ipcluster --start profile=%s n=4' % profile
160 start_cmd = 'ipcluster start profile=%s n=4' % profile
254 161 print start_cmd + " ==> " + full_path
162
163 def start(self):
164 self.list_cluster_dirs()
255 165
256 def init_launchers(self):
257 config = self.config
258 subcmd = self.subcommand
259 if subcmd =='start':
260 self.start_logging()
261 self.loop = ioloop.IOLoop.instance()
262 # reactor.callWhenRunning(self.start_launchers)
263 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
264 dc.start()
265 if subcmd == 'engines':
266 self.start_logging()
267 self.loop = ioloop.IOLoop.instance()
268 # reactor.callWhenRunning(self.start_launchers)
269 engine_only = lambda : self.start_launchers(controller=False)
270 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
271 dc.start()
166 create_flags = {}
167 create_flags.update(base_flags)
168 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
169 "reset config files to defaults", "leave existing config files"))
170
171 class IPClusterCreate(ClusterApplication):
172 name = u'ipcluster'
173 description = create_help
174 auto_create_cluster_dir = Bool(True,
175 help="whether to create the cluster_dir if it doesn't exist")
176 default_config_file_name = default_config_file_name
177
178 reset = Bool(False, config=True,
179 help="Whether to reset config files as part of 'create'."
180 )
181
182 flags = Dict(create_flags)
183
184 aliases = Dict(dict(profile='ClusterDir.profile'))
185
186 classes = [ClusterDir]
187
188 def init_clusterdir(self):
189 super(IPClusterCreate, self).init_clusterdir()
190 self.log.info('Copying default config files to cluster directory '
191 '[overwrite=%r]' % (self.reset,))
192 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
193
194 def initialize(self, argv=None):
195 self.parse_command_line(argv)
196 self.init_clusterdir()
197
198 stop_aliases = dict(
199 signal='IPClusterStop.signal',
200 profile='ClusterDir.profile',
201 cluster_dir='ClusterDir.location',
202 )
203
204 class IPClusterStop(ClusterApplication):
205 name = u'ipcluster'
206 description = stop_help
207 auto_create_cluster_dir = Bool(False,
208 help="whether to create the cluster_dir if it doesn't exist")
209 default_config_file_name = default_config_file_name
210
211 signal = Int(signal.SIGINT, config=True,
212 help="signal to use for stopping processes.")
213
214 aliases = Dict(stop_aliases)
272 215
273 def start_launchers(self, controller=True):
274 config = self.config
275
276 # Create the launchers. In both bases, we set the work_dir of
277 # the launcher to the cluster_dir. This is where the launcher's
278 # subprocesses will be launched. It is not where the controller
279 # and engine will be launched.
280 if controller:
281 clsname = self.controller_launcher_class
282 if '.' not in clsname:
283 clsname = 'IPython.parallel.apps.launcher.'+clsname
284 cl_class = import_item(clsname)
285 self.controller_launcher = cl_class(
286 work_dir=self.cluster_dir.location, config=config,
287 logname=self.log.name
216 def start(self):
217 """Start the app for the stop subcommand."""
218 try:
219 pid = self.get_pid_from_file()
220 except PIDFileError:
221 self.log.critical(
222 'Could not read pid file, cluster is probably not running.'
288 223 )
289 # Setup the observing of stopping. If the controller dies, shut
290 # everything down as that will be completely fatal for the engines.
291 self.controller_launcher.on_stop(self.stop_launchers)
292 # But, we don't monitor the stopping of engines. An engine dying
293 # is just fine and in principle a user could start a new engine.
294 # Also, if we did monitor engine stopping, it is difficult to
295 # know what to do when only some engines die. Currently, the
296 # observing of engine stopping is inconsistent. Some launchers
297 # might trigger on a single engine stopping, other wait until
298 # all stop. TODO: think more about how to handle this.
299 else:
300 self.controller_launcher = None
224 # Here I exit with an unusual exit status that other processes
225 # can watch for to learn how I exited.
226 self.remove_pid_file()
227 self.exit(ALREADY_STOPPED)
301 228
302 clsname = self.engine_launcher_class
303 if '.' not in clsname:
304 # not a module, presume it's the raw name in apps.launcher
305 clsname = 'IPython.parallel.apps.launcher.'+clsname
306 print repr(clsname)
307 el_class = import_item(clsname)
229 if not self.check_pid(pid):
230 self.log.critical(
231 'Cluster [pid=%r] is not running.' % pid
232 )
233 self.remove_pid_file()
234 # Here I exit with an unusual exit status that other processes
235 # can watch for to learn how I exited.
236 self.exit(ALREADY_STOPPED)
237
238 elif os.name=='posix':
239 sig = self.signal
240 self.log.info(
241 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
242 )
243 try:
244 os.kill(pid, sig)
245 except OSError:
246 self.log.error("Stopping cluster failed, assuming already dead.",
247 exc_info=True)
248 self.remove_pid_file()
249 elif os.name=='nt':
250 try:
251 # kill the whole tree
252 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
253 except (CalledProcessError, OSError):
254 self.log.error("Stopping cluster failed, assuming already dead.",
255 exc_info=True)
256 self.remove_pid_file()
257
258 engine_aliases = {}
259 engine_aliases.update(base_aliases)
260 engine_aliases.update(dict(
261 n='IPClusterEngines.n',
262 elauncher = 'IPClusterEngines.engine_launcher_class',
263 ))
264 class IPClusterEngines(ClusterApplication):
265
266 name = u'ipcluster'
267 description = engines_help
268 usage = None
269 default_config_file_name = default_config_file_name
270 default_log_level = logging.INFO
271 auto_create_cluster_dir = Bool(False)
272 classes = List()
273 def _classes_default(self):
274 from IPython.parallel.apps import launcher
275 launchers = launcher.all_launchers
276 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
277 return [ClusterDir]+eslaunchers
278
279 n = Int(2, config=True,
280 help="The number of engines to start.")
308 281
309 self.engine_launcher = el_class(
310 work_dir=self.cluster_dir.location, config=config, logname=self.log.name
282 engine_launcher_class = Str('LocalEngineSetLauncher',
283 config=True,
284 help="The class for launching a set of Engines."
311 285 )
286 daemonize = Bool(False, config=True,
287 help='Daemonize the ipcluster program. This implies --log-to-file')
312 288
289 def _daemonize_changed(self, name, old, new):
290 if new:
291 self.log_to_file = True
292
293 aliases = Dict(engine_aliases)
294 # flags = Dict(flags)
295 _stopping = False
296
297 def initialize(self, argv=None):
298 super(IPClusterEngines, self).initialize(argv)
299 self.init_signal()
300 self.init_launchers()
301
302 def init_launchers(self):
303 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
304 self.engine_launcher.on_stop(lambda r: self.loop.stop())
305
306 def init_signal(self):
313 307 # Setup signals
314 308 signal.signal(signal.SIGINT, self.sigint_handler)
309
310 def build_launcher(self, clsname):
311 """import and instantiate a Launcher based on importstring"""
312 if '.' not in clsname:
313 # not a module, presume it's the raw name in apps.launcher
314 clsname = 'IPython.parallel.apps.launcher.'+clsname
315 # print repr(clsname)
316 klass = import_item(clsname)
315 317
316 # Start the controller and engines
317 self._stopping = False # Make sure stop_launchers is not called 2x.
318 if controller:
319 self.start_controller()
320 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop)
321 dc.start()
322 self.startup_message()
323
324 def startup_message(self, r=None):
325 self.log.info("IPython cluster: started")
326 return r
327
328 def start_controller(self, r=None):
329 # self.log.info("In start_controller")
330 config = self.config
331 d = self.controller_launcher.start(
332 cluster_dir=self.cluster_dir.location
318 launcher = klass(
319 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
333 320 )
334 return d
335
336 def start_engines(self, r=None):
337 # self.log.info("In start_engines")
338 config = self.config
339
340 d = self.engine_launcher.start(
321 return launcher
322
323 def start_engines(self):
324 self.log.info("Starting %i engines"%self.n)
325 self.engine_launcher.start(
341 326 self.n,
342 327 cluster_dir=self.cluster_dir.location
343 328 )
344 return d
345 329
346 def stop_controller(self, r=None):
347 # self.log.info("In stop_controller")
348 if self.controller_launcher and self.controller_launcher.running:
349 return self.controller_launcher.stop()
350
351 def stop_engines(self, r=None):
352 # self.log.info("In stop_engines")
330 def stop_engines(self):
331 self.log.info("Stopping Engines...")
353 332 if self.engine_launcher.running:
354 333 d = self.engine_launcher.stop()
355 # d.addErrback(self.log_err)
356 334 return d
357 335 else:
358 336 return None
359 337
360 def log_err(self, f):
361 self.log.error(f.getTraceback())
362 return None
363
364 338 def stop_launchers(self, r=None):
365 339 if not self._stopping:
366 340 self._stopping = True
367 # if isinstance(r, failure.Failure):
368 # self.log.error('Unexpected error in ipcluster:')
369 # self.log.info(r.getTraceback())
370 341 self.log.error("IPython cluster: stopping")
371 # These return deferreds. We are not doing anything with them
372 # but we are holding refs to them as a reminder that they
373 # do return deferreds.
374 d1 = self.stop_engines()
375 d2 = self.stop_controller()
342 self.stop_engines()
376 343 # Wait a few seconds to let things shut down.
377 344 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
378 345 dc.start()
379 # reactor.callLater(4.0, reactor.stop)
380 346
381 347 def sigint_handler(self, signum, frame):
348 self.log.debug("SIGINT received, stopping launchers...")
382 349 self.stop_launchers()
383 350
384 351 def start_logging(self):
385 352 # Remove old log files of the controller and engine
386 353 if self.clean_logs:
387 354 log_dir = self.cluster_dir.log_dir
388 355 for f in os.listdir(log_dir):
389 356 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
390 357 os.remove(os.path.join(log_dir, f))
391 358 # This will remove old log files for ipcluster itself
392 359 # super(IPClusterApp, self).start_logging()
393 360
394 361 def start(self):
395 """Start the application, depending on what subcommand is used."""
396 subcmd = self.subcommand
397 if subcmd=='create':
398 # init_clusterdir step completed create action
399 return
400 elif subcmd=='start':
401 self.start_app_start()
402 elif subcmd=='stop':
403 self.start_app_stop()
404 elif subcmd=='engines':
405 self.start_app_engines()
406 else:
407 self.log.fatal("one command of '--start', '--stop', '--list', '--create', '--engines'"
408 " must be specified")
409 self.exit(-1)
362 """Start the app for the engines subcommand."""
363 self.log.info("IPython cluster: started")
364 # First see if the cluster is already running
365
366 # Now log and daemonize
367 self.log.info(
368 'Starting engines with [daemon=%r]' % self.daemonize
369 )
370 # TODO: Get daemonize working on Windows or as a Windows Server.
371 if self.daemonize:
372 if os.name=='posix':
373 from twisted.scripts._twistd_unix import daemonize
374 daemonize()
375
376 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
377 dc.start()
378 # Now write the new pid file AFTER our new forked pid is active.
379 # self.write_pid_file()
380 try:
381 self.loop.start()
382 except KeyboardInterrupt:
383 pass
384 except zmq.ZMQError as e:
385 if e.errno == errno.EINTR:
386 pass
387 else:
388 raise
389
390 start_aliases = {}
391 start_aliases.update(engine_aliases)
392 start_aliases.update(dict(
393 delay='IPClusterStart.delay',
394 clean_logs='IPClusterStart.clean_logs',
395 ))
396
397 class IPClusterStart(IPClusterEngines):
398
399 name = u'ipcluster'
400 description = start_help
401 usage = None
402 default_config_file_name = default_config_file_name
403 default_log_level = logging.INFO
404 auto_create_cluster_dir = Bool(True, config=True,
405 help="whether to create the cluster_dir if it doesn't exist")
406 classes = List()
407 def _classes_default(self,):
408 from IPython.parallel.apps import launcher
409 return [ClusterDir]+launcher.all_launchers
410
411 clean_logs = Bool(True, config=True,
412 help="whether to cleanup old logs before starting")
413
414 delay = CFloat(1., config=True,
415 help="delay (in s) between starting the controller and the engines")
416
417 controller_launcher_class = Str('LocalControllerLauncher',
418 config=True,
419 help="The class for launching a Controller."
420 )
421 reset = Bool(False, config=True,
422 help="Whether to reset config files as part of '--create'."
423 )
424
425 # flags = Dict(flags)
426 aliases = Dict(start_aliases)
427
428 def init_clusterdir(self):
429 try:
430 super(IPClusterStart, self).init_clusterdir()
431 except ClusterDirError:
432 raise ClusterDirError(
433 "Could not find a cluster directory. A cluster dir must "
434 "be created before running 'ipcluster start'. Do "
435 "'ipcluster create -h' or 'ipcluster list -h' for more "
436 "information about creating and listing cluster dirs."
437 )
438
439 def init_launchers(self):
440 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
441 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
442 self.controller_launcher.on_stop(self.stop_launchers)
443
444 def start_controller(self):
445 self.controller_launcher.start(
446 cluster_dir=self.cluster_dir.location
447 )
448
449 def stop_controller(self):
450 # self.log.info("In stop_controller")
451 if self.controller_launcher and self.controller_launcher.running:
452 return self.controller_launcher.stop()
453
454 def stop_launchers(self, r=None):
455 if not self._stopping:
456 self.stop_controller()
457 super(IPClusterStart, self).stop_launchers()
410 458
411 def start_app_start(self):
459 def start(self):
412 460 """Start the app for the start subcommand."""
413 config = self.config
414 461 # First see if the cluster is already running
415 462 try:
416 463 pid = self.get_pid_from_file()
417 464 except PIDFileError:
418 465 pass
419 466 else:
420 467 if self.check_pid(pid):
421 468 self.log.critical(
422 469 'Cluster is already running with [pid=%s]. '
423 470 'use "ipcluster stop" to stop the cluster.' % pid
424 471 )
425 472 # Here I exit with an unusual exit status that other processes
426 473 # can watch for to learn how I exited.
427 474 self.exit(ALREADY_STARTED)
428 475 else:
429 476 self.remove_pid_file()
430 477
431 478
432 479 # Now log and daemonize
433 480 self.log.info(
434 481 'Starting ipcluster with [daemon=%r]' % self.daemonize
435 482 )
436 483 # TODO: Get daemonize working on Windows or as a Windows Server.
437 484 if self.daemonize:
438 485 if os.name=='posix':
439 486 from twisted.scripts._twistd_unix import daemonize
440 487 daemonize()
441 488
489 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
490 dc.start()
491 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
492 dc.start()
442 493 # Now write the new pid file AFTER our new forked pid is active.
443 494 self.write_pid_file()
444 495 try:
445 496 self.loop.start()
446 497 except KeyboardInterrupt:
447 498 pass
448 499 except zmq.ZMQError as e:
449 500 if e.errno == errno.EINTR:
450 501 pass
451 502 else:
452 503 raise
453 504 finally:
454 505 self.remove_pid_file()
455 506
456 def start_app_engines(self):
457 """Start the app for the start subcommand."""
458 config = self.config
459 # First see if the cluster is already running
460
461 # Now log and daemonize
462 self.log.info(
463 'Starting engines with [daemon=%r]' % self.daemonize
464 )
465 # TODO: Get daemonize working on Windows or as a Windows Server.
466 if self.daemonize:
467 if os.name=='posix':
468 from twisted.scripts._twistd_unix import daemonize
469 daemonize()
507 base='IPython.parallel.apps.ipclusterapp.IPCluster'
470 508
471 # Now write the new pid file AFTER our new forked pid is active.
472 # self.write_pid_file()
473 try:
474 self.loop.start()
475 except KeyboardInterrupt:
476 pass
477 except zmq.ZMQError as e:
478 if e.errno == errno.EINTR:
479 pass
480 else:
481 raise
482 # self.remove_pid_file()
509 class IPClusterApp(Application):
510 name = u'ipcluster'
511 description = _description
483 512
484 def start_app_stop(self):
485 """Start the app for the stop subcommand."""
486 config = self.config
487 try:
488 pid = self.get_pid_from_file()
489 except PIDFileError:
490 self.log.critical(
491 'Could not read pid file, cluster is probably not running.'
492 )
493 # Here I exit with a unusual exit status that other processes
494 # can watch for to learn how I existed.
495 self.remove_pid_file()
496 self.exit(ALREADY_STOPPED)
497
498 if not self.check_pid(pid):
499 self.log.critical(
500 'Cluster [pid=%r] is not running.' % pid
501 )
502 self.remove_pid_file()
503 # Here I exit with a unusual exit status that other processes
504 # can watch for to learn how I existed.
505 self.exit(ALREADY_STOPPED)
506
507 elif os.name=='posix':
508 sig = self.signal
509 self.log.info(
510 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
511 )
512 try:
513 os.kill(pid, sig)
514 except OSError:
515 self.log.error("Stopping cluster failed, assuming already dead.",
516 exc_info=True)
517 self.remove_pid_file()
518 elif os.name=='nt':
519 try:
520 # kill the whole tree
521 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
522 except (CalledProcessError, OSError):
523 self.log.error("Stopping cluster failed, assuming already dead.",
524 exc_info=True)
525 self.remove_pid_file()
526
513 subcommands = {'create' : (base+'Create', create_help),
514 'list' : (base+'List', list_help),
515 'start' : (base+'Start', start_help),
516 'stop' : (base+'Stop', stop_help),
517 'engines' : (base+'Engines', engines_help),
518 }
519
520 # no aliases or flags for parent App
521 aliases = Dict()
522 flags = Dict()
523
524 def start(self):
525 if self.subapp is None:
526 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
527 print
528 self.print_subcommands()
529 self.exit(1)
530 else:
531 return self.subapp.start()
527 532
528 533 def launch_new_instance():
529 534 """Create and run the IPython cluster."""
530 535 app = IPClusterApp()
531 app.parse_command_line()
532 cl_config = app.config
533 app.init_clusterdir()
534 if app.config_file:
535 app.load_config_file(app.config_file)
536 else:
537 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
538 # command-line should *override* config file, but command-line is necessary
539 # to determine clusterdir, etc.
540 app.update_config(cl_config)
541
542 app.to_work_dir()
543 app.init_launchers()
544
536 app.initialize()
545 537 app.start()
546 538
547 539
548 540 if __name__ == '__main__':
549 541 launch_new_instance()
550 542
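The rewritten ipclusterapp above replaces the old '--start/--stop/--create/--engines/--list' flag handling with real subcommands, each delegated to its own application class through the subcommands table near the end of the diff (IPClusterStart, IPClusterStop, IPClusterEngines, IPClusterCreate, IPClusterList). The dependency-free sketch below shows the same dispatch shape; the handler functions and names are hypothetical stand-ins rather than the actual IPython.config Application API.

    import sys

    def cmd_start(argv):
        print('would start a cluster with arguments: %r' % (argv,))

    def cmd_stop(argv):
        print('would stop a running cluster: %r' % (argv,))

    # name -> (handler, help string), mirroring the subcommands table above
    SUBCOMMANDS = {
        'start': (cmd_start, 'Start an ipython cluster by profile or cluster dir.'),
        'stop':  (cmd_stop,  'Stop a running ipython cluster.'),
    }

    def main(argv=None):
        argv = sys.argv[1:] if argv is None else argv
        if not argv or argv[0] not in SUBCOMMANDS:
            print('No subcommand specified! Must specify one of: %s' % sorted(SUBCOMMANDS))
            return 1
        handler, _help = SUBCOMMANDS[argv[0]]
        return handler(argv[1:])

    if __name__ == '__main__':
        sys.exit(main())

Keeping the subcommand mapping as data, as the commit does, keeps per-subcommand aliases and flags out of the parent application.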
@@ -1,408 +1,400 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 from multiprocessing import Process
29 29
30 30 import zmq
31 31 from zmq.devices import ProcessMonitoredQueue
32 32 from zmq.log.handlers import PUBHandler
33 33 from zmq.utils import jsonapi as json
34 34
35 35 from IPython.config.loader import Config
36 36
37 37 from IPython.parallel import factory
38 38
39 39 from IPython.parallel.apps.clusterdir import (
40 40 ClusterDir,
41 ClusterDirApplication,
41 ClusterApplication,
42 42 base_flags
43 43 # ClusterDirConfigLoader
44 44 )
45 45 from IPython.utils.importstring import import_item
46 46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
47 47
48 48 # from IPython.parallel.controller.controller import ControllerFactory
49 49 from IPython.parallel.streamsession import StreamSession
50 50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 51 from IPython.parallel.controller.hub import Hub, HubFactory
52 52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 54
55 55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56 56
57 57 # conditional import of MongoDB backend class
58 58
59 59 try:
60 60 from IPython.parallel.controller.mongodb import MongoDB
61 61 except ImportError:
62 62 maybe_mongo = []
63 63 else:
64 64 maybe_mongo = [MongoDB]
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Module level variables
69 69 #-----------------------------------------------------------------------------
70 70
71 71
72 72 #: The default config file name for this application
73 73 default_config_file_name = u'ipcontroller_config.py'
74 74
75 75
76 76 _description = """Start the IPython controller for parallel computing.
77 77
78 78 The IPython controller provides a gateway between the IPython engines and
79 79 clients. The controller needs to be started before the engines and can be
80 80 configured using command line options or using a cluster directory. Cluster
81 81 directories contain config, log and security files and are usually located in
82 82 your ipython directory and named "cluster_<profile>". See the --profile
83 83 and --cluster-dir options for details.
84 84 """
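Once the controller and some engines are up, clients talk to it through the connection files it writes into the cluster directory's security folder. A minimal client-side sketch, assuming the default profile so that Client() can find ipcontroller-client.json on its own:

import os
from IPython.parallel import Client

rc = Client()                      # assumed to locate cluster_default/security/ipcontroller-client.json
print rc.ids                       # engine ids currently registered with the controller
view = rc[:]                       # a DirectView on all engines
print view.apply_sync(os.getpid)   # run a function on every engine and collect the results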
85 85
86 86
87 87
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # The main application
91 91 #-----------------------------------------------------------------------------
92 92 flags = {}
93 93 flags.update(base_flags)
94 94 flags.update({
95 95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
96 96 'Use threads instead of processes for the schedulers'),
97 97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 98 'use the SQLiteDB backend'),
99 99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 100 'use the MongoDB backend'),
101 101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 102 'use the in-memory DictDB backend'),
103 103 })
104 104
105 105 flags.update()
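Each of these flags is shorthand for a small Config fragment, so the same effect can be had from a config file or from code. A sketch of the equivalences, with the values copied from the flags dict above:

from IPython.config.loader import Config

c = Config()
# what the --mongodb flag expands to:
c.HubFactory.db_class = 'IPython.parallel.controller.mongodb.MongoDB'
# what the --usethreads flag expands to:
c.IPControllerApp.usethreads = True
# passing config=c to IPControllerApp is assumed to be equivalent to the command-line flags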
106 106
107 class IPControllerApp(ClusterDirApplication):
107 class IPControllerApp(ClusterApplication):
108 108
109 109 name = u'ipcontroller'
110 110 description = _description
111 111 # command_line_loader = IPControllerAppConfigLoader
112 112 default_config_file_name = default_config_file_name
113 113 auto_create_cluster_dir = True
114 114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
115 115
116 116 reuse_files = Bool(False, config=True,
117 117 help='Whether to reuse existing json connection files [default: False]'
118 118 )
119 119 secure = Bool(True, config=True,
120 120 help='Whether to use exec_keys for extra authentication [default: True]'
121 121 )
122 122 ssh_server = Unicode(u'', config=True,
123 123 help="""ssh url for clients to use when connecting to the Controller
124 124 processes. It should be of the form: [user@]server[:port]. The
125 125 Controller\'s listening addresses must be accessible from the ssh server""",
126 126 )
127 127 location = Unicode(u'', config=True,
128 128 help="""The external IP or domain name of the Controller, used for disambiguating
129 129 engine and client connections.""",
130 130 )
131 131 import_statements = List([], config=True,
132 132 help="import statements to be run at startup. Necessary in some environments"
133 133 )
134 134
135 135 usethreads = Bool(False, config=True,
136 136 help='Use threads instead of processes for the schedulers',
137 137 )
138 138
139 139 # internal
140 140 children = List()
141 141 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142 142
143 143 def _usethreads_changed(self, name, old, new):
144 144 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
145 145
146 146 aliases = Dict(dict(
147 147 config = 'IPControllerApp.config_file',
148 148 # file = 'IPControllerApp.url_file',
149 149 log_level = 'IPControllerApp.log_level',
150 150 reuse_files = 'IPControllerApp.reuse_files',
151 151 secure = 'IPControllerApp.secure',
152 152 ssh = 'IPControllerApp.ssh_server',
153 153 usethreads = 'IPControllerApp.usethreads',
154 154 import_statements = 'IPControllerApp.import_statements',
155 155 location = 'IPControllerApp.location',
156 156
157 157 ident = 'StreamSession.session',
158 158 user = 'StreamSession.username',
159 159 exec_key = 'StreamSession.keyfile',
160 160
161 161 url = 'HubFactory.url',
162 162 ip = 'HubFactory.ip',
163 163 transport = 'HubFactory.transport',
164 164 port = 'HubFactory.regport',
165 165
166 166 ping = 'HeartMonitor.period',
167 167
168 168 scheme = 'TaskScheduler.scheme_name',
169 169 hwm = 'TaskScheduler.hwm',
170 170
171 171
172 172 profile = "ClusterDir.profile",
173 173 cluster_dir = 'ClusterDir.location',
174 174
175 175 ))
176 176 flags = Dict(flags)
177 177
178 178
179 179 def save_connection_dict(self, fname, cdict):
180 180 """save a connection dict to json file."""
181 181 c = self.config
182 182 url = cdict['url']
183 183 location = cdict['location']
184 184 if not location:
185 185 try:
186 186 proto,ip,port = split_url(url)
187 187 except AssertionError:
188 188 pass
189 189 else:
190 190 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
191 191 cdict['location'] = location
192 192 fname = os.path.join(self.cluster_dir.security_dir, fname)
193 193 with open(fname, 'w') as f:
194 194 f.write(json.dumps(cdict, indent=2))
195 195 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
196 196
197 197 def load_config_from_json(self):
198 198 """load config from existing json connector files."""
199 199 c = self.config
200 200 # load from engine config
201 201 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
202 202 cfg = json.loads(f.read())
203 203 key = c.StreamSession.key = cfg['exec_key']
204 204 xport,addr = cfg['url'].split('://')
205 205 c.HubFactory.engine_transport = xport
206 206 ip,ports = addr.split(':')
207 207 c.HubFactory.engine_ip = ip
208 208 c.HubFactory.regport = int(ports)
209 209 self.location = cfg['location']
210 210
211 211 # load client config
212 212 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
213 213 cfg = json.loads(f.read())
214 214 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
215 215 xport,addr = cfg['url'].split('://')
216 216 c.HubFactory.client_transport = xport
217 217 ip,ports = addr.split(':')
218 218 c.HubFactory.client_ip = ip
219 219 self.ssh_server = cfg['ssh']
220 220 assert int(ports) == c.HubFactory.regport, "regport mismatch"
221 221
222 222 def init_hub(self):
223 223 # This is the working dir by now.
224 224 sys.path.insert(0, '')
225 225 c = self.config
226 226
227 227 self.do_import_statements()
228 228 reusing = self.reuse_files
229 229 if reusing:
230 230 try:
231 231 self.load_config_from_json()
232 232 except (AssertionError,IOError):
233 233 reusing=False
234 234 # check again, because reusing may have failed:
235 235 if reusing:
236 236 pass
237 237 elif self.secure:
238 238 key = str(uuid.uuid4())
239 239 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
240 240 # with open(keyfile, 'w') as f:
241 241 # f.write(key)
242 242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 243 c.StreamSession.key = key
244 244 else:
245 245 key = c.StreamSession.key = ''
246 246
247 247 try:
248 248 self.factory = HubFactory(config=c, log=self.log)
249 249 # self.start_logging()
250 250 self.factory.init_hub()
251 251 except:
252 252 self.log.error("Couldn't construct the Controller", exc_info=True)
253 253 self.exit(1)
254 254
255 255 if not reusing:
256 256 # save to new json config files
257 257 f = self.factory
258 258 cdict = {'exec_key' : key,
259 259 'ssh' : self.ssh_server,
260 260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
261 261 'location' : self.location
262 262 }
263 263 self.save_connection_dict('ipcontroller-client.json', cdict)
264 264 edict = cdict
265 265 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
266 266 self.save_connection_dict('ipcontroller-engine.json', edict)
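init_hub ends by writing two JSON connection files into the cluster's security directory: ipcontroller-client.json for clients and ipcontroller-engine.json for engines, each carrying the exec_key, ssh, url and location fields assembled above. A sketch of reading one back; the path shown is only illustrative of the default cluster_<profile> layout:

import json, os

fname = os.path.expanduser('~/.ipython/cluster_default/security/ipcontroller-client.json')
with open(fname) as f:
    cdict = json.load(f)
print cdict['url']              # e.g. tcp://127.0.0.1:10101, the registration endpoint
print cdict['location']         # external IP/hostname used to disambiguate the url
print bool(cdict['exec_key'])   # non-empty when the controller was started in secure mode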
267 267
268 268 #
269 269 def init_schedulers(self):
270 270 children = self.children
271 271 mq = import_item(self.mq_class)
272 272
273 273 hub = self.factory
274 274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
275 275 # IOPub relay (in a Process)
276 276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 277 q.bind_in(hub.client_info['iopub'])
278 278 q.bind_out(hub.engine_info['iopub'])
279 279 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 280 q.connect_mon(hub.monitor_url)
281 281 q.daemon=True
282 282 children.append(q)
283 283
284 284 # Multiplexer Queue (in a Process)
285 285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 286 q.bind_in(hub.client_info['mux'])
287 287 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 288 q.bind_out(hub.engine_info['mux'])
289 289 q.connect_mon(hub.monitor_url)
290 290 q.daemon=True
291 291 children.append(q)
292 292
293 293 # Control Queue (in a Process)
294 294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 295 q.bind_in(hub.client_info['control'])
296 296 q.setsockopt_in(zmq.IDENTITY, 'control')
297 297 q.bind_out(hub.engine_info['control'])
298 298 q.connect_mon(hub.monitor_url)
299 299 q.daemon=True
300 300 children.append(q)
301 301 try:
302 302 scheme = self.config.TaskScheduler.scheme_name
303 303 except AttributeError:
304 304 scheme = TaskScheduler.scheme_name.get_default_value()
305 305 # Task Queue (in a Process)
306 306 if scheme == 'pure':
307 307 self.log.warn("task::using pure XREQ Task scheduler")
308 308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 309 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 310 q.bind_in(hub.client_info['task'][1])
311 311 q.setsockopt_in(zmq.IDENTITY, 'task')
312 312 q.bind_out(hub.engine_info['task'])
313 313 q.connect_mon(hub.monitor_url)
314 314 q.daemon=True
315 315 children.append(q)
316 316 elif scheme == 'none':
317 317 self.log.warn("task::using no Task scheduler")
318 318
319 319 else:
320 320 self.log.info("task::using Python %s Task scheduler"%scheme)
321 321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 322 hub.monitor_url, hub.client_info['notification'])
323 323 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
324 324 config=dict(self.config))
325 325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 326 q.daemon=True
327 327 children.append(q)
328 328
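Each relay built above is a pyzmq MonitoredQueue device: a bidirectional queue between a client-facing socket and an engine-facing socket, with every message also copied to a monitor socket that feeds the Hub. A stripped-down sketch of the same pattern outside the app; the endpoints are illustrative:

import zmq
from zmq.devices import ProcessMonitoredQueue

# XREP <-> XREP queue monitored on a PUB socket, matching the 'mux' relay above
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in('tcp://127.0.0.1:10201')       # clients connect to this side
q.bind_out('tcp://127.0.0.1:10202')      # engines connect to this side
q.connect_mon('tcp://127.0.0.1:10203')   # monitor traffic goes to the Hub
q.daemon = True
q.start()                                # runs in its own process, like the children above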
329 329
330 330 def save_urls(self):
331 331 """save the registration urls to files."""
332 332 c = self.config
333 333
334 334 sec_dir = self.cluster_dir.security_dir
335 335 cf = self.factory
336 336
337 337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
338 338 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
339 339
340 340 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
341 341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
342 342
343 343
344 344 def do_import_statements(self):
345 345 statements = self.import_statements
346 346 for s in statements:
347 347 try:
348 348 self.log.info("Executing statement: '%s'" % s)
349 349 exec s in globals(), locals()
350 350 except:
351 351 self.log.error("Error running statement: %s" % s)
352 352
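do_import_statements simply execs each configured statement in the controller process at startup, which is how extra modules (e.g. for custom serialization) get loaded. A hedged example of setting the option in ipcontroller_config.py, where get_config() is provided by the config-file machinery; the module names are illustrative:

c = get_config()
c.IPControllerApp.import_statements = [
    'import numpy',
    'from my_project import register_serializers',   # illustrative project-specific import
]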
353 353 # def start_logging(self):
354 354 # super(IPControllerApp, self).start_logging()
355 355 # if self.config.Global.log_url:
356 356 # context = self.factory.context
357 357 # lsock = context.socket(zmq.PUB)
358 358 # lsock.connect(self.config.Global.log_url)
359 359 # handler = PUBHandler(lsock)
360 360 # handler.root_topic = 'controller'
361 361 # handler.setLevel(self.log_level)
362 362 # self.log.addHandler(handler)
363 363 # #
364
365 def initialize(self, argv=None):
366 super(IPControllerApp, self).initialize(argv)
367 self.init_hub()
368 self.init_schedulers()
369
364 370 def start(self):
365 371 # Start the subprocesses:
366 372 self.factory.start()
367 373 child_procs = []
368 374 for child in self.children:
369 375 child.start()
370 376 if isinstance(child, ProcessMonitoredQueue):
371 377 child_procs.append(child.launcher)
372 378 elif isinstance(child, Process):
373 379 child_procs.append(child)
374 380 if child_procs:
375 381 signal_children(child_procs)
376 382
377 383 self.write_pid_file(overwrite=True)
378 384
379 385 try:
380 386 self.factory.loop.start()
381 387 except KeyboardInterrupt:
382 388 self.log.critical("Interrupted, Exiting...\n")
389
383 390
384 391
385 392 def launch_new_instance():
386 393 """Create and run the IPython controller"""
387 394 app = IPControllerApp()
388 app.parse_command_line()
389 cl_config = app.config
390 # app.load_config_file()
391 app.init_clusterdir()
392 if app.config_file:
393 app.load_config_file(app.config_file)
394 else:
395 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
396 # command-line should *override* config file, but command-line is necessary
397 # to determine clusterdir, etc.
398 app.update_config(cl_config)
399
400 app.to_work_dir()
401 app.init_hub()
402 app.init_schedulers()
403
395 app.initialize()
404 396 app.start()
405 397
406 398
407 399 if __name__ == '__main__':
408 400 launch_new_instance()
@@ -1,301 +1,290 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 ClusterDirApplication,
26 ClusterApplication,
27 27 ClusterDir,
28 28 base_aliases,
29 29 # ClusterDirConfigLoader
30 30 )
31 31 from IPython.zmq.log import EnginePUBHandler
32 32
33 33 from IPython.config.configurable import Configurable
34 34 from IPython.parallel.streamsession import StreamSession
35 35 from IPython.parallel.engine.engine import EngineFactory
36 36 from IPython.parallel.engine.streamkernel import Kernel
37 37 from IPython.parallel.util import disambiguate_url
38 38
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
41 41
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Module level variables
45 45 #-----------------------------------------------------------------------------
46 46
47 47 #: The default config file name for this application
48 48 default_config_file_name = u'ipengine_config.py'
49 49
50 50 _description = """Start an IPython engine for parallel computing.\n\n
51 51
52 52 IPython engines run in parallel and perform computations on behalf of a client
53 53 and controller. A controller needs to be started before the engines. The
54 54 engine can be configured using command line options or using a cluster
55 55 directory. Cluster directories contain config, log and security files and are
56 56 usually located in your ipython directory and named as "cluster_<profile>".
57 57 See the `profile` and `cluster_dir` options for details.
58 58 """
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # MPI configuration
63 63 #-----------------------------------------------------------------------------
64 64
65 65 mpi4py_init = """from mpi4py import MPI as mpi
66 66 mpi.size = mpi.COMM_WORLD.Get_size()
67 67 mpi.rank = mpi.COMM_WORLD.Get_rank()
68 68 """
69 69
70 70
71 71 pytrilinos_init = """from PyTrilinos import Epetra
72 72 class SimpleStruct:
73 73 pass
74 74 mpi = SimpleStruct()
75 75 mpi.rank = 0
76 76 mpi.size = 0
77 77 """
78 78
79 79 class MPI(Configurable):
80 80 """Configurable for MPI initialization"""
81 81 use = Str('', config=True,
82 82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 83 )
84 84
85 85 def _on_use_changed(self, old, new):
86 86 # load default init script if it's not set
87 87 if not self.init_script:
88 88 self.init_script = self.default_inits.get(new, '')
89 89
90 90 init_script = Str('', config=True,
91 91 help="Initialization code for MPI")
92 92
93 93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 94 config=True)
95 95
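The MPI configurable picks one of the init snippets above based on its use trait: setting use triggers _on_use_changed, which is assumed to install the matching default init_script if none was given explicitly. A short example for ipengine_config.py:

c = get_config()
c.MPI.use = 'mpi4py'      # expected to load the mpi4py_init snippet shown above
# or supply a custom initialization snippet directly:
# c.MPI.init_script = "from mpi4py import MPI as mpi"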
96 96
97 97 #-----------------------------------------------------------------------------
98 98 # Main application
99 99 #-----------------------------------------------------------------------------
100 100
101 101
102 class IPEngineApp(ClusterDirApplication):
102 class IPEngineApp(ClusterApplication):
103 103
104 104 app_name = Unicode(u'ipengine')
105 105 description = Unicode(_description)
106 106 default_config_file_name = default_config_file_name
107 107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108 108
109 auto_create_cluster_dir = Bool(False, config=True,
110 help="whether to create the cluster_dir if it doesn't exist")
111
109 112 startup_script = Unicode(u'', config=True,
110 113 help='specify a script to be run at startup')
111 114 startup_command = Str('', config=True,
112 115 help='specify a command to be run at startup')
113 116
114 117 url_file = Unicode(u'', config=True,
115 118 help="""The full location of the file containing the connection information for
116 119 the controller. If this is not given, the file must be in the
117 120 security directory of the cluster directory. This location is
118 121 resolved using the `profile` or `cluster_dir` options.""",
119 122 )
120 123
121 124 url_file_name = Unicode(u'ipcontroller-engine.json')
122 125
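The traits above cover the most common engine options: where to find the controller's connection file and what to run once the engine is up. A hedged example for ipengine_config.py; the paths and commands are illustrative:

c = get_config()
c.IPEngineApp.url_file = u'/path/to/cluster_default/security/ipcontroller-engine.json'
c.IPEngineApp.startup_script = u'/path/to/engine_setup.py'   # later wrapped in execfile() by init_engine
c.IPEngineApp.startup_command = 'import numpy as np'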
123 126 aliases = Dict(dict(
124 127 config = 'IPEngineApp.config_file',
125 128 file = 'IPEngineApp.url_file',
126 129 c = 'IPEngineApp.startup_command',
127 130 s = 'IPEngineApp.startup_script',
128 131
129 132 ident = 'StreamSession.session',
130 133 user = 'StreamSession.username',
131 134 exec_key = 'StreamSession.keyfile',
132 135
133 136 url = 'EngineFactory.url',
134 137 ip = 'EngineFactory.ip',
135 138 transport = 'EngineFactory.transport',
136 139 port = 'EngineFactory.regport',
137 140 location = 'EngineFactory.location',
138 141
139 142 timeout = 'EngineFactory.timeout',
140 143
141 144 profile = "ClusterDir.profile",
142 145 cluster_dir = 'ClusterDir.location',
143 146
144 147 mpi = 'MPI.use',
145 148
146 149 log_level = 'IPEngineApp.log_level',
147 150 ))
148 151
149 152 # def find_key_file(self):
150 153 # """Set the key file.
151 154 #
152 155 # Here we don't try to check whether it exists or is valid, as that
153 156 # is handled by the connection logic.
154 157 # """
155 158 # config = self.master_config
156 159 # # Find the actual controller key file
157 160 # if not config.Global.key_file:
158 161 # try_this = os.path.join(
159 162 # config.Global.cluster_dir,
160 163 # config.Global.security_dir,
161 164 # config.Global.key_file_name
162 165 # )
163 166 # config.Global.key_file = try_this
164 167
165 168 def find_url_file(self):
166 169 """Set the url file.
167 170
168 171 Here we don't try to check whether it exists or is valid, as that
169 172 is handled by the connection logic.
170 173 """
171 174 config = self.config
172 175 # Find the actual controller key file
173 176 if not self.url_file:
174 177 self.url_file = os.path.join(
175 178 self.cluster_dir.security_dir,
176 179 self.url_file_name
177 180 )
178 181
179 182 def init_engine(self):
180 183 # This is the working dir by now.
181 184 sys.path.insert(0, '')
182 185 config = self.config
183 186 # print config
184 187 self.find_url_file()
185 188
186 189 # if os.path.exists(config.Global.key_file) and config.Global.secure:
187 190 # config.SessionFactory.exec_key = config.Global.key_file
188 191 if os.path.exists(self.url_file):
189 192 with open(self.url_file) as f:
190 193 d = json.loads(f.read())
191 194 for k,v in d.iteritems():
192 195 if isinstance(v, unicode):
193 196 d[k] = v.encode()
194 197 if d['exec_key']:
195 198 config.StreamSession.key = d['exec_key']
196 199 d['url'] = disambiguate_url(d['url'], d['location'])
197 200 config.EngineFactory.url = d['url']
198 201 config.EngineFactory.location = d['location']
199 202
200 203 try:
201 204 exec_lines = config.Kernel.exec_lines
202 205 except AttributeError:
203 206 config.Kernel.exec_lines = []
204 207 exec_lines = config.Kernel.exec_lines
205 208
206 209 if self.startup_script:
207 210 enc = sys.getfilesystemencoding() or 'utf8'
208 211 cmd="execfile(%r)"%self.startup_script.encode(enc)
209 212 exec_lines.append(cmd)
210 213 if self.startup_command:
211 214 exec_lines.append(self.startup_command)
212 215
213 216 # Create the underlying shell class and Engine
214 217 # shell_class = import_item(self.master_config.Global.shell_class)
215 218 # print self.config
216 219 try:
217 220 self.engine = EngineFactory(config=config, log=self.log)
218 221 except:
219 222 self.log.error("Couldn't start the Engine", exc_info=True)
220 223 self.exit(1)
221 224
222 225 # self.start_logging()
223 226
224 227 # Create the service hierarchy
225 228 # self.main_service = service.MultiService()
226 229 # self.engine_service.setServiceParent(self.main_service)
227 230 # self.tub_service = Tub()
228 231 # self.tub_service.setServiceParent(self.main_service)
229 232 # # This needs to be called before the connection is initiated
230 233 # self.main_service.startService()
231 234
232 235 # This initiates the connection to the controller and calls
233 236 # register_engine to tell the controller we are ready to do work
234 237 # self.engine_connector = EngineConnector(self.tub_service)
235 238
236 239 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
237 240
238 241 # reactor.callWhenRunning(self.call_connect)
239 242
240 243 # def start_logging(self):
241 244 # super(IPEngineApp, self).start_logging()
242 245 # if self.master_config.Global.log_url:
243 246 # context = self.engine.context
244 247 # lsock = context.socket(zmq.PUB)
245 248 # lsock.connect(self.master_config.Global.log_url)
246 249 # handler = EnginePUBHandler(self.engine, lsock)
247 250 # handler.setLevel(self.log_level)
248 251 # self.log.addHandler(handler)
249 252 #
250 253 def init_mpi(self):
251 254 global mpi
252 255 self.mpi = MPI(config=self.config)
253 256
254 257 mpi_import_statement = self.mpi.init_script
255 258 if mpi_import_statement:
256 259 try:
257 260 self.log.info("Initializing MPI:")
258 261 self.log.info(mpi_import_statement)
259 262 exec mpi_import_statement in globals()
260 263 except:
261 264 mpi = None
262 265 else:
263 266 mpi = None
264 267
265
268 def initialize(self, argv=None):
269 super(IPEngineApp, self).initialize(argv)
270 self.init_mpi()
271 self.init_engine()
272
266 273 def start(self):
267 274 self.engine.start()
268 275 try:
269 276 self.engine.loop.start()
270 277 except KeyboardInterrupt:
271 278 self.log.critical("Engine Interrupted, shutting down...\n")
272 279
273 280
274 281 def launch_new_instance():
275 282 """Create and run the IPython engine"""
276 283 app = IPEngineApp()
277 app.parse_command_line()
278 cl_config = app.config
279 app.init_clusterdir()
280 # app.load_config_file()
281 # print app.config
282 if app.config_file:
283 app.load_config_file(app.config_file)
284 else:
285 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
286
287 # command-line should *override* config file, but command-line is necessary
288 # to determine clusterdir, etc.
289 app.update_config(cl_config)
290
291 # print app.config
292 app.to_work_dir()
293 app.init_mpi()
294 app.init_engine()
295 print app.config
284 app.initialize()
296 285 app.start()
297 286
298 287
299 288 if __name__ == '__main__':
300 289 launch_new_instance()
301 290
@@ -1,132 +1,132 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 23 from IPython.parallel.apps.clusterdir import (
24 ClusterDirApplication,
24 ClusterApplication,
25 25 ClusterDirConfigLoader
26 26 )
27 27 from IPython.parallel.apps.logwatcher import LogWatcher
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Module level variables
31 31 #-----------------------------------------------------------------------------
32 32
33 33 #: The default config file name for this application
34 34 default_config_file_name = u'iplogger_config.py'
35 35
36 36 _description = """Start an IPython logger for parallel computing.\n\n
37 37
38 38 IPython controllers and engines (and your own processes) can broadcast log messages
39 39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 40 logger can be configured using command line options or using a cluster
41 41 directory. Cluster directories contain config, log and security files and are
42 42 usually located in your ipython directory and named "cluster_<profile>".
43 43 See the --profile and --cluster-dir options for details.
44 44 """
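As the description says, any process can feed this logger by attaching a zmq PUBHandler to the standard logging module. A minimal sketch; the url is illustrative and must match whatever LogWatcher.url is configured to listen on:

import logging
import zmq
from zmq.log.handlers import PUBHandler

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.connect('tcp://127.0.0.1:20202')   # wherever the LogWatcher is listening

handler = PUBHandler(sock)
handler.root_topic = 'myapp'            # becomes the zmq topic prefix for these records

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("this record is broadcast to the iplogger under the 'myapp' topic")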
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Command line options
48 48 #-----------------------------------------------------------------------------
49 49
50 50
51 51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52 52
53 53 def _add_arguments(self):
54 54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 55 paa = self.parser.add_argument
56 56 # Controller config
57 57 paa('--url',
58 58 type=str, dest='LogWatcher.url',
59 59 help='The url the LogWatcher will listen on',
60 60 )
61 61 # MPI
62 62 paa('--topics',
63 63 type=str, dest='LogWatcher.topics', nargs='+',
64 64 help='What topics to subscribe to',
65 65 metavar='topics')
66 66 # Global config
67 67 paa('--log-to-file',
68 68 action='store_true', dest='Global.log_to_file',
69 69 help='Log to a file in the log directory (default is stdout)')
70 70
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Main application
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 class IPLoggerApp(ClusterDirApplication):
77 class IPLoggerApp(ClusterApplication):
78 78
79 79 name = u'iplogger'
80 80 description = _description
81 81 command_line_loader = IPLoggerAppConfigLoader
82 82 default_config_file_name = default_config_file_name
83 83 auto_create_cluster_dir = True
84 84
85 85 def create_default_config(self):
86 86 super(IPLoggerApp, self).create_default_config()
87 87
88 88 # The engine should not clean logs as we don't want to remove the
89 89 # active log files of other running engines.
90 90 self.default_config.Global.clean_logs = False
91 91
92 92 # If given, this is the actual location of the logger's URL file.
93 93 # If not, this is computed using the profile, app_dir and furl_file_name
94 94 self.default_config.Global.url_file_name = u'iplogger.url'
95 95 self.default_config.Global.url_file = u''
96 96
97 97 def post_load_command_line_config(self):
98 98 pass
99 99
100 100 def pre_construct(self):
101 101 super(IPLoggerApp, self).pre_construct()
102 102
103 103 def construct(self):
104 104 # This is the working dir by now.
105 105 sys.path.insert(0, '')
106 106
107 107 self.start_logging()
108 108
109 109 try:
110 110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 111 except:
112 112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 113 self.exit(1)
114 114
115 115
116 116 def start_app(self):
117 117 try:
118 118 self.watcher.start()
119 119 self.watcher.loop.start()
120 120 except KeyboardInterrupt:
121 121 self.log.critical("Logging Interrupted, shutting down...\n")
122 122
123 123
124 124 def launch_new_instance():
125 125 """Create and run the IPython LogWatcher"""
126 126 app = IPLoggerApp()
127 127 app.start()
128 128
129 129
130 130 if __name__ == '__main__':
131 131 launch_new_instance()
132 132