rework logging connections
MinRK
@@ -1,537 +1,536 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import shutil
22 22 import sys
23 23 import logging
24 24 import warnings
25 25
26 26 from IPython.config.loader import PyFileConfigLoader
27 27 from IPython.core.application import Application, BaseAppConfigLoader
28 28 from IPython.config.configurable import Configurable
29 29 from IPython.core.crashhandler import CrashHandler
30 30 from IPython.core import release
31 31 from IPython.utils.path import (
32 32 get_ipython_package_dir,
33 33 expand_path
34 34 )
35 35 from IPython.utils.traitlets import Unicode
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Module errors
39 39 #-----------------------------------------------------------------------------
40 40
41 41 class ClusterDirError(Exception):
42 42 pass
43 43
44 44
45 45 class PIDFileError(Exception):
46 46 pass
47 47
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Class for managing cluster directories
51 51 #-----------------------------------------------------------------------------
52 52
53 53 class ClusterDir(Configurable):
54 54 """An object to manage the cluster directory and its resources.
55 55
56 56 The cluster directory is used by :command:`ipengine`,
57 57     :command:`ipcontroller` and :command:`ipcluster` to manage the
58 58 configuration, logging and security of these applications.
59 59
60 60 This object knows how to find, create and manage these directories. This
61 61     should be used by any code that wants to handle cluster directories.
62 62 """
63 63
64 64 security_dir_name = Unicode('security')
65 65 log_dir_name = Unicode('log')
66 66 pid_dir_name = Unicode('pid')
67 67 security_dir = Unicode(u'')
68 68 log_dir = Unicode(u'')
69 69 pid_dir = Unicode(u'')
70 70 location = Unicode(u'')
71 71
72 72 def __init__(self, location=u''):
73 73 super(ClusterDir, self).__init__(location=location)
74 74
75 75 def _location_changed(self, name, old, new):
76 76 if not os.path.isdir(new):
77 77 os.makedirs(new)
78 78 self.security_dir = os.path.join(new, self.security_dir_name)
79 79 self.log_dir = os.path.join(new, self.log_dir_name)
80 80 self.pid_dir = os.path.join(new, self.pid_dir_name)
81 81 self.check_dirs()
82 82
83 83 def _log_dir_changed(self, name, old, new):
84 84 self.check_log_dir()
85 85
86 86 def check_log_dir(self):
87 87 if not os.path.isdir(self.log_dir):
88 88 os.mkdir(self.log_dir)
89 89
90 90 def _security_dir_changed(self, name, old, new):
91 91 self.check_security_dir()
92 92
93 93 def check_security_dir(self):
94 94 if not os.path.isdir(self.security_dir):
95 95 os.mkdir(self.security_dir, 0700)
96 96 os.chmod(self.security_dir, 0700)
97 97
98 98 def _pid_dir_changed(self, name, old, new):
99 99 self.check_pid_dir()
100 100
101 101 def check_pid_dir(self):
102 102 if not os.path.isdir(self.pid_dir):
103 103 os.mkdir(self.pid_dir, 0700)
104 104 os.chmod(self.pid_dir, 0700)
105 105
106 106 def check_dirs(self):
107 107 self.check_security_dir()
108 108 self.check_log_dir()
109 109 self.check_pid_dir()
110 110
111 111 def load_config_file(self, filename):
112 112 """Load a config file from the top level of the cluster dir.
113 113
114 114 Parameters
115 115 ----------
116 116 filename : unicode or str
117 117 The filename only of the config file that must be located in
118 118 the top-level of the cluster directory.
119 119 """
120 120 loader = PyFileConfigLoader(filename, self.location)
121 121 return loader.load_config()
122 122
123 123 def copy_config_file(self, config_file, path=None, overwrite=False):
124 124 """Copy a default config file into the active cluster directory.
125 125
126 126 Default configuration files are kept in :mod:`IPython.config.default`.
127 127 This function moves these from that location to the working cluster
128 128 directory.
129 129 """
130 130 if path is None:
131 131 import IPython.config.default
132 132 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
133 133 path = os.path.sep.join(path)
134 134 src = os.path.join(path, config_file)
135 135 dst = os.path.join(self.location, config_file)
136 136 if not os.path.isfile(dst) or overwrite:
137 137 shutil.copy(src, dst)
138 138
139 139 def copy_all_config_files(self, path=None, overwrite=False):
140 140 """Copy all config files into the active cluster directory."""
141 141 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
142 142 u'ipcluster_config.py']:
143 143 self.copy_config_file(f, path=path, overwrite=overwrite)
144 144
145 145 @classmethod
146 146     def create_cluster_dir(cls, cluster_dir):
147 147 """Create a new cluster directory given a full path.
148 148
149 149 Parameters
150 150 ----------
151 151 cluster_dir : str
152 152 The full path to the cluster directory. If it does exist, it will
153 153 be used. If not, it will be created.
154 154 """
155 155 return ClusterDir(location=cluster_dir)
156 156
157 157 @classmethod
158 158 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
159 159 """Create a cluster dir by profile name and path.
160 160
161 161 Parameters
162 162 ----------
163 163 path : str
164 164 The path (directory) to put the cluster directory in.
165 165 profile : str
166 166 The name of the profile. The name of the cluster directory will
167 167 be "clusterz_<profile>".
168 168 """
169 169 if not os.path.isdir(path):
170 170 raise ClusterDirError('Directory not found: %s' % path)
171 171 cluster_dir = os.path.join(path, u'clusterz_' + profile)
172 172 return ClusterDir(location=cluster_dir)
173 173
174 174 @classmethod
175 175 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
176 176 """Find an existing cluster dir by profile name, return its ClusterDir.
177 177
178 178 This searches through a sequence of paths for a cluster dir. If it
179 179 is not found, a :class:`ClusterDirError` exception will be raised.
180 180
181 181 The search path algorithm is:
182 182 1. ``os.getcwd()``
183 183 2. ``ipython_dir``
184 184 3. The directories found in the ":" separated
185 185 :env:`IPCLUSTER_DIR_PATH` environment variable.
186 186
187 187 Parameters
188 188 ----------
189 189 ipython_dir : unicode or str
190 190 The IPython directory to use.
191 191 profile : unicode or str
192 192 The name of the profile. The name of the cluster directory
193 193 will be "clusterz_<profile>".
194 194 """
195 195 dirname = u'clusterz_' + profile
196 196 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
197 197 if cluster_dir_paths:
198 198 cluster_dir_paths = cluster_dir_paths.split(':')
199 199 else:
200 200 cluster_dir_paths = []
201 201 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
202 202 for p in paths:
203 203 cluster_dir = os.path.join(p, dirname)
204 204 if os.path.isdir(cluster_dir):
205 205 return ClusterDir(location=cluster_dir)
206 206 else:
207 207 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
208 208
209 209 @classmethod
210 210 def find_cluster_dir(cls, cluster_dir):
211 211 """Find/create a cluster dir and return its ClusterDir.
212 212
213 213 This will create the cluster directory if it doesn't exist.
214 214
215 215 Parameters
216 216 ----------
217 217 cluster_dir : unicode or str
218 218 The path of the cluster directory. This is expanded using
219 219             :func:`IPython.utils.path.expand_path`.
220 220 """
221 221 cluster_dir = expand_path(cluster_dir)
222 222 if not os.path.isdir(cluster_dir):
223 223 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
224 224 return ClusterDir(location=cluster_dir)
225 225
226 226
227 227 #-----------------------------------------------------------------------------
228 228 # Command line options
229 229 #-----------------------------------------------------------------------------
230 230
231 231 class ClusterDirConfigLoader(BaseAppConfigLoader):
232 232
233 233 def _add_cluster_profile(self, parser):
234 234 paa = parser.add_argument
235 235 paa('-p', '--profile',
236 236 dest='Global.profile',type=unicode,
237 237 help=
238 238 """The string name of the profile to be used. This determines the name
239 239             of the cluster dir as: clusterz_<profile>. The default profile is named
240 240             'default'. The cluster directory is resolved this way if the
241 241 --cluster-dir option is not used.""",
242 242 metavar='Global.profile')
243 243
244 244 def _add_cluster_dir(self, parser):
245 245 paa = parser.add_argument
246 246 paa('--cluster-dir',
247 247 dest='Global.cluster_dir',type=unicode,
248 248 help="""Set the cluster dir. This overrides the logic used by the
249 249 --profile option.""",
250 250 metavar='Global.cluster_dir')
251 251
252 252 def _add_work_dir(self, parser):
253 253 paa = parser.add_argument
254 254 paa('--work-dir',
255 255 dest='Global.work_dir',type=unicode,
256 256 help='Set the working dir for the process.',
257 257 metavar='Global.work_dir')
258 258
259 259 def _add_clean_logs(self, parser):
260 260 paa = parser.add_argument
261 261 paa('--clean-logs',
262 262 dest='Global.clean_logs', action='store_true',
263 263             help='Delete old log files before starting.')
264 264
265 265 def _add_no_clean_logs(self, parser):
266 266 paa = parser.add_argument
267 267 paa('--no-clean-logs',
268 268 dest='Global.clean_logs', action='store_false',
269 269 help="Don't Delete old log flies before starting.")
270 270
271 271 def _add_arguments(self):
272 272 super(ClusterDirConfigLoader, self)._add_arguments()
273 273 self._add_cluster_profile(self.parser)
274 274 self._add_cluster_dir(self.parser)
275 275 self._add_work_dir(self.parser)
276 276 self._add_clean_logs(self.parser)
277 277 self._add_no_clean_logs(self.parser)
278 278
279 279
280 280 #-----------------------------------------------------------------------------
281 281 # Crash handler for this application
282 282 #-----------------------------------------------------------------------------
283 283
284 284
285 285 _message_template = """\
286 286 Oops, $self.app_name crashed. We do our best to make it stable, but...
287 287
288 288 A crash report was automatically generated with the following information:
289 289 - A verbatim copy of the crash traceback.
290 290 - Data on your current $self.app_name configuration.
291 291
292 292 It was left in the file named:
293 293 \t'$self.crash_report_fname'
294 294 If you can email this file to the developers, the information in it will help
295 295 them in understanding and correcting the problem.
296 296
297 297 You can mail it to: $self.contact_name at $self.contact_email
298 298 with the subject '$self.app_name Crash Report'.
299 299
300 300 If you want to do it now, the following command will work (under Unix):
301 301 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
302 302
303 303 To ensure accurate tracking of this issue, please file a report about it at:
304 304 $self.bug_tracker
305 305 """
306 306
307 307 class ClusterDirCrashHandler(CrashHandler):
308 308 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
309 309
310 310 message_template = _message_template
311 311
312 312 def __init__(self, app):
313 313 contact_name = release.authors['Brian'][0]
314 314 contact_email = release.authors['Brian'][1]
315 315 bug_tracker = 'http://github.com/ipython/ipython/issues'
316 316 super(ClusterDirCrashHandler,self).__init__(
317 317 app, contact_name, contact_email, bug_tracker
318 318 )
319 319
320 320
321 321 #-----------------------------------------------------------------------------
322 322 # Main application
323 323 #-----------------------------------------------------------------------------
324 324
325 325 class ApplicationWithClusterDir(Application):
326 326 """An application that puts everything into a cluster directory.
327 327
328 328 Instead of looking for things in the ipython_dir, this type of application
329 329 will use its own private directory called the "cluster directory"
330 330 for things like config files, log files, etc.
331 331
332 332 The cluster directory is resolved as follows:
333 333
334 334 * If the ``--cluster-dir`` option is given, it is used.
335 335 * If ``--cluster-dir`` is not given, the application directory is
336 336       resolved using the profile name as ``clusterz_<profile>``. The search
337 337 path for this directory is then i) cwd if it is found there
338 338 and ii) in ipython_dir otherwise.
339 339
340 340 The config file for the application is to be put in the cluster
341 341 dir and named the value of the ``config_file_name`` class attribute.
342 342 """
343 343
344 344 command_line_loader = ClusterDirConfigLoader
345 345 crash_handler_class = ClusterDirCrashHandler
346 346 auto_create_cluster_dir = True
347 347 # temporarily override default_log_level to DEBUG
348 348 default_log_level = logging.DEBUG
349 349
350 350 def create_default_config(self):
351 351 super(ApplicationWithClusterDir, self).create_default_config()
352 352 self.default_config.Global.profile = u'default'
353 353 self.default_config.Global.cluster_dir = u''
354 354 self.default_config.Global.work_dir = os.getcwd()
355 355 self.default_config.Global.log_to_file = False
356 356 self.default_config.Global.log_url = None
357 357 self.default_config.Global.clean_logs = False
358 358
359 359 def find_resources(self):
360 360 """This resolves the cluster directory.
361 361
362 362 This tries to find the cluster directory and if successful, it will
363 363 have done:
364 364 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
365 365 the application.
366 366 * Sets ``self.cluster_dir`` attribute of the application and config
367 367 objects.
368 368
369 369 The algorithm used for this is as follows:
370 370 1. Try ``Global.cluster_dir``.
371 371 2. Try using ``Global.profile``.
372 372 3. If both of these fail and ``self.auto_create_cluster_dir`` is
373 373 ``True``, then create the new cluster dir in the IPython directory.
374 374 4. If all fails, then raise :class:`ClusterDirError`.
375 375 """
376 376
377 377 try:
378 378 cluster_dir = self.command_line_config.Global.cluster_dir
379 379 except AttributeError:
380 380 cluster_dir = self.default_config.Global.cluster_dir
381 381 cluster_dir = expand_path(cluster_dir)
382 382 try:
383 383 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
384 384 except ClusterDirError:
385 385 pass
386 386 else:
387 387 self.log.info('Using existing cluster dir: %s' % \
388 388 self.cluster_dir_obj.location
389 389 )
390 390 self.finish_cluster_dir()
391 391 return
392 392
393 393 try:
394 394 self.profile = self.command_line_config.Global.profile
395 395 except AttributeError:
396 396 self.profile = self.default_config.Global.profile
397 397 try:
398 398 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
399 399 self.ipython_dir, self.profile)
400 400 except ClusterDirError:
401 401 pass
402 402 else:
403 403 self.log.info('Using existing cluster dir: %s' % \
404 404 self.cluster_dir_obj.location
405 405 )
406 406 self.finish_cluster_dir()
407 407 return
408 408
409 409 if self.auto_create_cluster_dir:
410 410 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
411 411 self.ipython_dir, self.profile
412 412 )
413 413 self.log.info('Creating new cluster dir: %s' % \
414 414 self.cluster_dir_obj.location
415 415 )
416 416 self.finish_cluster_dir()
417 417 else:
418 418 raise ClusterDirError('Could not find a valid cluster directory.')
419 419
420 420 def finish_cluster_dir(self):
421 421 # Set the cluster directory
422 422 self.cluster_dir = self.cluster_dir_obj.location
423 423
424 424 # These have to be set because they could be different from the one
425 425 # that we just computed. Because command line has the highest
426 426 # priority, this will always end up in the master_config.
427 427 self.default_config.Global.cluster_dir = self.cluster_dir
428 428 self.command_line_config.Global.cluster_dir = self.cluster_dir
429 429
430 430 def find_config_file_name(self):
431 431 """Find the config file name for this application."""
432 432 # For this type of Application it should be set as a class attribute.
433 433 if not hasattr(self, 'default_config_file_name'):
434 434 self.log.critical("No config filename found")
435 435 else:
436 436 self.config_file_name = self.default_config_file_name
437 437
438 438 def find_config_file_paths(self):
439 439         # Set the search path to the cluster directory. We should NOT
440 440 # include IPython.config.default here as the default config files
441 441 # are ALWAYS automatically moved to the cluster directory.
442 442 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
443 443 self.config_file_paths = (self.cluster_dir,)
444 444
445 445 def pre_construct(self):
446 446 # The log and security dirs were set earlier, but here we put them
447 447 # into the config and log them.
448 448 config = self.master_config
449 449 sdir = self.cluster_dir_obj.security_dir
450 450 self.security_dir = config.Global.security_dir = sdir
451 451 ldir = self.cluster_dir_obj.log_dir
452 452 self.log_dir = config.Global.log_dir = ldir
453 453 pdir = self.cluster_dir_obj.pid_dir
454 454 self.pid_dir = config.Global.pid_dir = pdir
455 455 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
456 456 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
457 457 # Change to the working directory. We do this just before construct
458 458 # is called so all the components there have the right working dir.
459 459 self.to_work_dir()
460 460
461 461 def to_work_dir(self):
462 462 wd = self.master_config.Global.work_dir
463 463 if unicode(wd) != unicode(os.getcwd()):
464 464 os.chdir(wd)
465 465 self.log.info("Changing to working dir: %s" % wd)
466 466
467 467 def start_logging(self):
468 468 # Remove old log files
469 469 if self.master_config.Global.clean_logs:
470 470 log_dir = self.master_config.Global.log_dir
471 471 for f in os.listdir(log_dir):
472 472 if f.startswith(self.name + u'-') and f.endswith('.log'):
473 473 os.remove(os.path.join(log_dir, f))
474 474 # Start logging to the new log file
475 475 if self.master_config.Global.log_to_file:
476 476 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
477 477 logfile = os.path.join(self.log_dir, log_filename)
478 478 open_log_file = open(logfile, 'w')
479 479 elif self.master_config.Global.log_url:
480 480 open_log_file = None
481 481 else:
482 482 open_log_file = sys.stdout
483 logger = logging.getLogger()
484 level = self.log_level
485 self.log = logger
486 # since we've reconnected the logger, we need to reconnect the log-level
487 self.log_level = level
488 if open_log_file is not None and self._log_handler not in self.log.handlers:
483 if open_log_file is not None:
484 self.log.removeHandler(self._log_handler)
485 self._log_handler = logging.StreamHandler(open_log_file)
486 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
487 self._log_handler.setFormatter(self._log_formatter)
489 488 self.log.addHandler(self._log_handler)
490 489 # log.startLogging(open_log_file)
491 490
492 491 def write_pid_file(self, overwrite=False):
493 492 """Create a .pid file in the pid_dir with my pid.
494 493
495 494 This must be called after pre_construct, which sets `self.pid_dir`.
496 495 This raises :exc:`PIDFileError` if the pid file exists already.
497 496 """
498 497 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
499 498 if os.path.isfile(pid_file):
500 499 pid = self.get_pid_from_file()
501 500 if not overwrite:
502 501 raise PIDFileError(
503 502 'The pid file [%s] already exists. \nThis could mean that this '
504 503 'server is already running with [pid=%s].' % (pid_file, pid)
505 504 )
506 505 with open(pid_file, 'w') as f:
507 506 self.log.info("Creating pid file: %s" % pid_file)
508 507 f.write(repr(os.getpid())+'\n')
509 508
510 509 def remove_pid_file(self):
511 510 """Remove the pid file.
512 511
513 512 This should be called at shutdown by registering a callback with
514 513 :func:`reactor.addSystemEventTrigger`. This needs to return
515 514 ``None``.
516 515 """
517 516 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
518 517 if os.path.isfile(pid_file):
519 518 try:
520 519 self.log.info("Removing pid file: %s" % pid_file)
521 520 os.remove(pid_file)
522 521 except:
523 522 self.log.warn("Error removing the pid file: %s" % pid_file)
524 523
525 524 def get_pid_from_file(self):
526 525 """Get the pid from the pid file.
527 526
528 527 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
529 528 """
530 529 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
531 530 if os.path.isfile(pid_file):
532 531 with open(pid_file, 'r') as f:
533 532 pid = int(f.read().strip())
534 533 return pid
535 534 else:
536 535 raise PIDFileError('pid file not found: %s' % pid_file)
537 536
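
The reworked `start_logging` above swaps the stream handler on the application's own named logger instead of rebinding the root logger: the previous handler is removed before a new `StreamHandler` is attached, so repeated calls never stack duplicate handlers. A minimal standalone sketch of that pattern (the logger name and streams are illustrative, not from this commit):

    import logging
    import sys

    log = logging.getLogger('ipcluster')            # illustrative name
    _log_handler = logging.StreamHandler(sys.stdout)
    log.addHandler(_log_handler)

    def restart_logging(open_log_file):
        """Point the logger at a new stream without stacking handlers."""
        global _log_handler
        log.removeHandler(_log_handler)             # drop the old handler first
        _log_handler = logging.StreamHandler(open_log_file)
        _log_handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
        log.addHandler(_log_handler)                # exactly one handler remains

    restart_logging(sys.stderr)
    log.warning("log output now goes to stderr")
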
@@ -1,110 +1,108 @@
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is a collection of one Hub and several Schedulers.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 import logging
18 18 from multiprocessing import Process
19 19
20 20 import zmq
21 21
22 22 # internal:
23 23 from IPython.utils.importstring import import_item
24 24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
25 25
26 26 from entry_point import signal_children
27 27
28 28
29 29 from scheduler import launch_scheduler
30 30 from hub import Hub, HubFactory
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Configurable
34 34 #-----------------------------------------------------------------------------
35 35
36 36
37 37 class ControllerFactory(HubFactory):
38 38 """Configurable for setting up a Hub and Schedulers."""
39 39
40 40 scheme = Str('pure', config=True)
41 41 usethreads = Bool(False, config=True)
42 42
43 43 # internal
44 44 children = List()
45 45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
46 46
47 def _update_mq(self):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
47 def _usethreads_changed(self, name, old, new):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
49 49
50 50 def __init__(self, **kwargs):
51 51 super(ControllerFactory, self).__init__(**kwargs)
52 52 self.subconstructors.append(self.construct_schedulers)
53 self._update_mq()
54 self.on_trait_change(self._update_mq, 'usethreads')
55 53
56 54 def start(self):
57 55 super(ControllerFactory, self).start()
58 56 for child in self.children:
59 57 child.start()
60 58 if not self.usethreads:
61 59 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
62 60
63 61
64 62 def construct_schedulers(self):
65 63 children = self.children
66 64 mq = import_item(self.mq_class)
67 65
68 66 # IOPub relay (in a Process)
69 67 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
70 68 q.bind_in(self.client_addrs['iopub'])
71 69 q.bind_out(self.engine_addrs['iopub'])
72 70 q.setsockopt_out(zmq.SUBSCRIBE, '')
73 71 q.connect_mon(self.monitor_url)
74 72 q.daemon=True
75 73 children.append(q)
76 74
77 75 # Multiplexer Queue (in a Process)
78 76 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
79 77 q.bind_in(self.client_addrs['mux'])
80 78 q.bind_out(self.engine_addrs['mux'])
81 79 q.connect_mon(self.monitor_url)
82 80 q.daemon=True
83 81 children.append(q)
84 82
85 83 # Control Queue (in a Process)
86 84 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
87 85 q.bind_in(self.client_addrs['control'])
88 86 q.bind_out(self.engine_addrs['control'])
89 87 q.connect_mon(self.monitor_url)
90 88 q.daemon=True
91 89 children.append(q)
92 90 # Task Queue (in a Process)
93 91 if self.scheme == 'pure':
94 logging.warn("task::using pure XREQ Task scheduler")
92 self.log.warn("task::using pure XREQ Task scheduler")
95 93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
96 94 q.bind_in(self.client_addrs['task'])
97 95 q.bind_out(self.engine_addrs['task'])
98 96 q.connect_mon(self.monitor_url)
99 97 q.daemon=True
100 98 children.append(q)
101 99 elif self.scheme == 'none':
102 logging.warn("task::using no Task scheduler")
100 self.log.warn("task::using no Task scheduler")
103 101
104 102 else:
105 logging.warn("task::using Python %s Task scheduler"%self.scheme)
103 self.log.warn("task::using Python %s Task scheduler"%self.scheme)
106 104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
107 105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
108 106 q.daemon=True
109 107 children.append(q)
110 108
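
The scheduler hunk above also replaces a manually registered `on_trait_change` callback with a statically named `_usethreads_changed` method, which traitlets discovers and wires up automatically. A hedged sketch of the convention, assuming the `IPython.utils.traitlets` API of this era (the class name is illustrative):

    from IPython.config.configurable import Configurable
    from IPython.utils.traitlets import Bool, Str

    class QueueFactory(Configurable):
        usethreads = Bool(False, config=True)
        mq_class = Str('zmq.devices.ProcessMonitoredQueue')

        # traitlets calls any method named _<trait>_changed when the trait
        # is assigned, so no on_trait_change() bookkeeping is needed.
        def _usethreads_changed(self, name, old, new):
            self.mq_class = 'zmq.devices.%sMonitoredQueue' % (
                'Thread' if new else 'Process')
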
@@ -1,141 +1,140 @@
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 It handles registration, etc., and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 9 import uuid
10 10 import logging
11 11 from pprint import pprint
12 12
13 13 import zmq
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 16 # internal
17 17 from IPython.config.configurable import Configurable
18 18 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type
19 19 # from IPython.utils.localinterfaces import LOCALHOST
20 20
21 21 from factory import RegistrationFactory
22 22
23 23 from streamsession import Message
24 24 from streamkernel import Kernel
25 25 import heartmonitor
26 26
27 27 def printer(*msg):
28 # print (logging.handlers, file=sys.__stdout__)
29 logging.info(str(msg))
28 # print (self.log.handlers, file=sys.__stdout__)
29 self.log.info(str(msg))
30 30
31 31 class EngineFactory(RegistrationFactory):
32 32 """IPython engine"""
33 33
34 34 # configurables:
35 35 user_ns=Dict(config=True)
36 36 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
37 37 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
38 38
39 39 # not configurable:
40 40 id=Int(allow_none=True)
41 41 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
42 42 kernel=Instance(Kernel)
43 43
44 44
45 45 def __init__(self, **kwargs):
46 46 super(EngineFactory, self).__init__(**kwargs)
47 47 ctx = self.context
48 48
49 49 reg = ctx.socket(zmq.PAIR)
50 50 reg.setsockopt(zmq.IDENTITY, self.ident)
51 51 reg.connect(self.url)
52 52 self.registrar = zmqstream.ZMQStream(reg, self.loop)
53 53
54 54 def register(self):
55 55 """send the registration_request"""
56 56
57 logging.info("registering")
57 self.log.info("registering")
58 58 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
59 59 self.registrar.on_recv(self.complete_registration)
60 60 # print (self.session.key)
61 61 self.session.send(self.registrar, "registration_request",content=content)
62 62
63 63 def complete_registration(self, msg):
64 64 # print msg
65 65 ctx = self.context
66 66 loop = self.loop
67 67 identity = self.ident
68 68 print (identity)
69 69
70 70 idents,msg = self.session.feed_identities(msg)
71 71 msg = Message(self.session.unpack_message(msg))
72 72
73 73 if msg.content.status == 'ok':
74 74 self.id = int(msg.content.id)
75 75
76 76 # create Shell Streams (MUX, Task, etc.):
77 77 queue_addr = msg.content.mux
78 78 shell_addrs = [ str(queue_addr) ]
79 79 task_addr = msg.content.task
80 80 if task_addr:
81 81 shell_addrs.append(str(task_addr))
82 82 shell_streams = []
83 83 for addr in shell_addrs:
84 84 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
85 85 stream.setsockopt(zmq.IDENTITY, identity)
86 86 stream.connect(addr)
87 87 shell_streams.append(stream)
88 88
89 89 # control stream:
90 90 control_addr = str(msg.content.control)
91 91 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
92 92 control_stream.setsockopt(zmq.IDENTITY, identity)
93 93 control_stream.connect(control_addr)
94 94
95 95 # create iopub stream:
96 96 iopub_addr = msg.content.iopub
97 97 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
98 98 iopub_stream.setsockopt(zmq.IDENTITY, identity)
99 99 iopub_stream.connect(iopub_addr)
100 100
101 101 # launch heartbeat
102 102 hb_addrs = msg.content.heartbeat
103 103 # print (hb_addrs)
104 104
105 105 # # Redirect input streams and set a display hook.
106 106 if self.out_stream_factory:
107 107 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
108 108 sys.stdout.topic = 'engine.%i.stdout'%self.id
109 109 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
110 110 sys.stderr.topic = 'engine.%i.stderr'%self.id
111 111 if self.display_hook_factory:
112 112 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
113 113 sys.displayhook.topic = 'engine.%i.pyout'%self.id
114 114
115 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
116 control_stream=control_stream,
117 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
118 user_ns = self.user_ns, config=self.config)
115 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
116 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
117 loop=loop, user_ns = self.user_ns, logname=self.log.name)
119 118 self.kernel.start()
120 119
121 120 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
122 121 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
123 122 heart.start()
124 123
125 124
126 125 else:
127 logging.error("Registration Failed: %s"%msg)
126 self.log.error("Registration Failed: %s"%msg)
128 127 raise Exception("Registration Failed: %s"%msg)
129 128
130 logging.info("Completed registration with id %i"%self.id)
129 self.log.info("Completed registration with id %i"%self.id)
131 130
132 131
133 132 def unregister(self):
134 133 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
135 134 time.sleep(1)
136 135 sys.exit(0)
137 136
138 137 def start(self):
139 138 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
140 139 dc.start()
141 140
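
Note how the engine now forwards `logname=self.log.name` when constructing the Kernel. Because `logging.getLogger` returns the same `Logger` instance for a given name, a child that re-attaches by name shares the parent's handlers without holding a reference to the parent object. A small stdlib-only sketch (the class and logger names are hypothetical):

    import logging

    class Child(object):
        def __init__(self, logname='ZMQ'):
            # re-attach to the parent's logger purely by name
            self.log = logging.getLogger(logname)

    parent_log = logging.getLogger('ipengine.0')    # hypothetical name
    child = Child(logname=parent_log.name)
    assert child.log is parent_log                  # one shared Logger
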
@@ -1,118 +1,118 @@
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3
4 4 ************
5 5 NOTE: Most of this module has been deprecated by moving to Configurables
6 6 ************
7 7 """
8 8
9 9 # Standard library imports.
10 10 import logging
11 11 import atexit
12 12 import sys
13 13 import os
14 14 import stat
15 15 import socket
16 16 from subprocess import Popen, PIPE
17 17 from signal import signal, SIGINT, SIGABRT, SIGTERM
18 18 try:
19 19 from signal import SIGKILL
20 20 except ImportError:
21 21 SIGKILL=None
22 22
23 23 # System library imports.
24 24 import zmq
25 25 from zmq.log import handlers
26 26 # Local imports.
27 27 from IPython.core.ultratb import FormattedTB
28 28 from IPython.external.argparse import ArgumentParser
29 29 from IPython.zmq.log import EnginePUBHandler
30 30
31 31 _random_ports = set()
32 32
33 33 def select_random_ports(n):
34 34 """Selects and return n random ports that are available."""
35 35 ports = []
36 36 for i in xrange(n):
37 37 sock = socket.socket()
38 38 sock.bind(('', 0))
39 39 while sock.getsockname()[1] in _random_ports:
40 40 sock.close()
41 41 sock = socket.socket()
42 42 sock.bind(('', 0))
43 43 ports.append(sock)
44 44 for i, sock in enumerate(ports):
45 45 port = sock.getsockname()[1]
46 46 sock.close()
47 47 ports[i] = port
48 48 _random_ports.add(port)
49 49 return ports
50 50
51 51 def signal_children(children):
52 52 """Relay interupt/term signals to children, for more solid process cleanup."""
53 53 def terminate_children(sig, frame):
54 54 logging.critical("Got signal %i, terminating children..."%sig)
55 55 for child in children:
56 56 child.terminate()
57 57
58 58 sys.exit(sig != SIGINT)
59 59 # sys.exit(sig)
60 60 for sig in (SIGINT, SIGABRT, SIGTERM):
61 61 signal(sig, terminate_children)
62 62
63 63 def generate_exec_key(keyfile):
64 64 import uuid
65 65 newkey = str(uuid.uuid4())
66 66 with open(keyfile, 'w') as f:
67 67 # f.write('ipython-key ')
68 68 f.write(newkey+'\n')
69 69 # set user-only RW permissions (0600)
70 70 # this will have no effect on Windows
71 71 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
72 72
73 73
74 74 def integer_loglevel(loglevel):
75 75 try:
76 76 loglevel = int(loglevel)
77 77 except ValueError:
78 78 if isinstance(loglevel, str):
79 79 loglevel = getattr(logging, loglevel)
80 80 return loglevel
81 81
82 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
83 logger = logging.getLogger()
82 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
83 logger = logging.getLogger(logname)
84 84 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
85 85 # don't add a second PUBHandler
86 86 return
87 87 loglevel = integer_loglevel(loglevel)
88 88 lsock = context.socket(zmq.PUB)
89 89 lsock.connect(iface)
90 90 handler = handlers.PUBHandler(lsock)
91 91 handler.setLevel(loglevel)
92 92 handler.root_topic = root
93 93 logger.addHandler(handler)
94 94 logger.setLevel(loglevel)
95 95
96 96 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
97 97 logger = logging.getLogger()
98 98 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
99 99 # don't add a second PUBHandler
100 100 return
101 101 loglevel = integer_loglevel(loglevel)
102 102 lsock = context.socket(zmq.PUB)
103 103 lsock.connect(iface)
104 104 handler = EnginePUBHandler(engine, lsock)
105 105 handler.setLevel(loglevel)
106 106 logger.addHandler(handler)
107 107 logger.setLevel(loglevel)
108 108
109 def local_logger(loglevel=logging.DEBUG):
109 def local_logger(logname, loglevel=logging.DEBUG):
110 110 loglevel = integer_loglevel(loglevel)
111 logger = logging.getLogger()
111 logger = logging.getLogger(logname)
112 112 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
113 113 # don't add a second StreamHandler
114 114 return
115 115 handler = logging.StreamHandler()
116 116 handler.setLevel(loglevel)
117 117 logger.addHandler(handler)
118 118 logger.setLevel(loglevel)
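
With the new signatures, callers of `connect_logger` and `local_logger` choose the logger by name instead of always configuring the root logger. A hedged usage sketch (the interface address and logger name are illustrative):

    import logging

    import zmq

    from IPython.zmq.parallel.entry_point import connect_logger

    context = zmq.Context()
    # attach a PUBHandler to the named logger; calling this twice is a
    # no-op because connect_logger checks for an existing PUBHandler
    connect_logger('ipcontroller', context, 'tcp://127.0.0.1:20202',
                   root='controller', loglevel=logging.DEBUG)
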
@@ -1,149 +1,155 @@
1 1 """Base config factories."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2008-2009 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14
15 15 import os
16 16 import logging
17 17
18 18 import uuid
19 19
20 20 from zmq.eventloop.ioloop import IOLoop
21 21
22 22 from IPython.config.configurable import Configurable
23 23 from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
24 24 from IPython.utils.importstring import import_item
25 25
26 26 from IPython.zmq.parallel.entry_point import select_random_ports
27 27 import IPython.zmq.parallel.streamsession as ss
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Classes
31 31 #-----------------------------------------------------------------------------
32 class LoggingFactory(Configurable):
33 """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait."""
34 log = Instance('logging.Logger', ('ZMQ', logging.WARN))
35 logname = CStr('ZMQ')
36 def _logname_changed(self, name, old, new):
37 self.log = logging.getLogger(new)
38
32 39
33
34 class SessionFactory(Configurable):
40 class SessionFactory(LoggingFactory):
35 41 """The Base factory from which every factory in IPython.zmq.parallel inherits"""
36 42
37 43 packer = Str('',config=True)
38 44 unpacker = Str('',config=True)
39 45 ident = CStr('',config=True)
40 46 def _ident_default(self):
41 47 return str(uuid.uuid4())
42 48 username = Str(os.environ.get('USER','username'),config=True)
43 49 exec_key = CUnicode('',config=True)
44
45 50 # not configurable:
46 51 context = Instance('zmq.Context', (), {})
47 52 session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
48 loop = Instance('zmq.eventloop.ioloop.IOLoop')
53 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
49 54 def _loop_default(self):
50 55 return IOLoop.instance()
51 56
57
52 58 def __init__(self, **kwargs):
53 59 super(SessionFactory, self).__init__(**kwargs)
54 60
55 61 keyfile = self.exec_key or None
56 62
57 63 # set the packers:
58 64 if not self.packer:
59 65 packer_f = unpacker_f = None
60 66 elif self.packer.lower() == 'json':
61 67 packer_f = ss.json_packer
62 68 unpacker_f = ss.json_unpacker
63 69 elif self.packer.lower() == 'pickle':
64 70 packer_f = ss.pickle_packer
65 71 unpacker_f = ss.pickle_unpacker
66 72 else:
67 73 packer_f = import_item(self.packer)
68 74 unpacker_f = import_item(self.unpacker)
69 75
70 76 # construct the session
71 77 self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, keyfile=keyfile)
72 78
73 79
74 80 class RegistrationFactory(SessionFactory):
75 81 """The Base Configurable for objects that involve registration."""
76 82
77 83 url = Str('', config=True) # url takes precedence over ip,regport,transport
78 84 transport = Str('tcp', config=True)
79 85 ip = Str('127.0.0.1', config=True)
80 86 regport = Instance(int, config=True)
81 87 def _regport_default(self):
82 88 return 10101
83 89 # return select_random_ports(1)[0]
84 90
85 91 def __init__(self, **kwargs):
86 92 super(RegistrationFactory, self).__init__(**kwargs)
87 93 self._propagate_url()
88 94 self._rebuild_url()
89 95 self.on_trait_change(self._propagate_url, 'url')
90 96 self.on_trait_change(self._rebuild_url, 'ip')
91 97 self.on_trait_change(self._rebuild_url, 'transport')
92 98 self.on_trait_change(self._rebuild_url, 'regport')
93 99
94 100 def _rebuild_url(self):
95 101 self.url = "%s://%s:%i"%(self.transport, self.ip, self.regport)
96 102
97 103 def _propagate_url(self):
98 104 """Ensure self.url contains full transport://interface:port"""
99 105 if self.url:
100 106 iface = self.url.split('://',1)
101 107 if len(iface) == 2:
102 108 self.transport,iface = iface
103 109 iface = iface.split(':')
104 110 self.ip = iface[0]
105 111 if iface[1]:
106 112 self.regport = int(iface[1])
107 113
108 114 #-----------------------------------------------------------------------------
109 115 # argparse argument extenders
110 116 #-----------------------------------------------------------------------------
111 117
112 118
113 119 def add_session_arguments(parser):
114 120 paa = parser.add_argument
115 121 paa('--ident',
116 122 type=str, dest='SessionFactory.ident',
117 123 help='set the ZMQ and session identity [default: random uuid]',
118 124 metavar='identity')
119 125 # paa('--execkey',
120 126 # type=str, dest='SessionFactory.exec_key',
121 127 # help='path to a file containing an execution key.',
122 128 # metavar='execkey')
123 129 paa('--packer',
124 130 type=str, dest='SessionFactory.packer',
125 131 help='method to serialize messages: {json,pickle} [default: json]',
126 132 metavar='packer')
127 133 paa('--unpacker',
128 134 type=str, dest='SessionFactory.unpacker',
129 135 help='inverse function of `packer`. Only necessary when using something other than json|pickle',
130 136 metavar='packer')
131 137
132 138 def add_registration_arguments(parser):
133 139 paa = parser.add_argument
134 140 paa('--ip',
135 141 type=str, dest='RegistrationFactory.ip',
136 142 help="The IP used for registration [default: localhost]",
137 143 metavar='ip')
138 144 paa('--transport',
139 145 type=str, dest='RegistrationFactory.transport',
140 146 help="The ZeroMQ transport used for registration [default: tcp]",
141 147 metavar='transport')
142 148 paa('--url',
143 149 type=str, dest='RegistrationFactory.url',
144 150 help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
145 151 metavar='url')
146 152 paa('--regport',
147 153 type=int, dest='RegistrationFactory.regport',
148 154 help="The port used for registration [default: 10101]",
149 155         metavar='port')
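
The new `LoggingFactory` is the piece the rest of this commit builds on: every subclass gains a `log` attribute that tracks the `logname` trait, so components can be pointed at a shared named logger at construction time. A hedged usage sketch (the logger names are illustrative):

    # Assigning logname, including via constructor keywords, fires
    # _logname_changed and rebinds self.log to the named logger.
    f = SessionFactory(logname='ipcontroller')
    f.log.info('messages go to the ipcontroller logger')
    f.logname = 'ipcontroller.session'    # rebind to a sub-logger
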
@@ -1,156 +1,158 @@
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5 """
6 6
7 7 from __future__ import print_function
8 8 import time
9 9 import uuid
10 10 import logging
11 11
12 12 import zmq
13 13 from zmq.devices import ProcessDevice,ThreadDevice
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
17 from factory import LoggingFactory
18
16 19 class Heart(object):
17 20 """A basic heart object for responding to a HeartMonitor.
18 21 This is a simple wrapper with defaults for the most common
19 22 Device model for responding to heartbeats.
20 23
21 24 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
22 25 SUB/XREQ for in/out.
23 26
24 27 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
25 28 device=None
26 29 id=None
27 30 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
28 31 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
29 32 self.device.daemon=True
30 33 self.device.connect_in(in_addr)
31 34 self.device.connect_out(out_addr)
32 35 if in_type == zmq.SUB:
33 36 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
34 37 if heart_id is None:
35 38 heart_id = str(uuid.uuid4())
36 39 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
37 40 self.id = heart_id
38 41
39 42 def start(self):
40 43 return self.device.start()
41 44
42 class HeartMonitor(object):
45 class HeartMonitor(LoggingFactory):
43 46 """A basic HeartMonitor class
44 47 pingstream: a PUB stream
45 48 pongstream: an XREP stream
46 49 period: the period of the heartbeat in milliseconds"""
47 loop=None
48 pingstream=None
49 pongstream=None
50 period=None
51 hearts=None
52 on_probation=None
53 last_ping=None
54 # debug=False
55 50
56 def __init__(self, loop, pingstream, pongstream, period=1000):
57 self.loop = loop
58 self.period = period
51 period=CFloat(1000, config=True) # in milliseconds
52
53 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
54 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
55 loop = Instance('zmq.eventloop.ioloop.IOLoop')
56 def _loop_default(self):
57 return ioloop.IOLoop.instance()
58 debug=Bool(False)
59
60 # not settable:
61 hearts=Set()
62 responses=Set()
63 on_probation=Set()
64 last_ping=CFloat(0)
65 _new_handlers = Set()
66 _failure_handlers = Set()
67 lifetime = CFloat(0)
68 tic = CFloat(0)
69
70 def __init__(self, **kwargs):
71 super(HeartMonitor, self).__init__(**kwargs)
59 72
60 self.pingstream = pingstream
61 self.pongstream = pongstream
62 73 self.pongstream.on_recv(self.handle_pong)
63
64 self.hearts = set()
65 self.responses = set()
66 self.on_probation = set()
67 self.lifetime = 0
68 self.tic = time.time()
69
70 self._new_handlers = set()
71 self._failure_handlers = set()
72 74
73 75 def start(self):
74 76 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
75 77 self.caller.start()
76 78
77 79 def add_new_heart_handler(self, handler):
78 80 """add a new handler for new hearts"""
79 logging.debug("heartbeat::new_heart_handler: %s"%handler)
81 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
80 82 self._new_handlers.add(handler)
81 83
82 84 def add_heart_failure_handler(self, handler):
83 85 """add a new handler for heart failure"""
84 logging.debug("heartbeat::new heart failure handler: %s"%handler)
86 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
85 87 self._failure_handlers.add(handler)
86 88
87 89 def beat(self):
88 90 self.pongstream.flush()
89 91 self.last_ping = self.lifetime
90 92
91 93 toc = time.time()
92 94 self.lifetime += toc-self.tic
93 95 self.tic = toc
94 # logging.debug("heartbeat::%s"%self.lifetime)
96 # self.log.debug("heartbeat::%s"%self.lifetime)
95 97 goodhearts = self.hearts.intersection(self.responses)
96 98 missed_beats = self.hearts.difference(goodhearts)
97 99 heartfailures = self.on_probation.intersection(missed_beats)
98 100 newhearts = self.responses.difference(goodhearts)
99 101 map(self.handle_new_heart, newhearts)
100 102 map(self.handle_heart_failure, heartfailures)
101 103 self.on_probation = missed_beats.intersection(self.hearts)
102 104 self.responses = set()
103 105 # print self.on_probation, self.hearts
104 # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
106 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
105 107 self.pingstream.send(str(self.lifetime))
106 108
107 109 def handle_new_heart(self, heart):
108 110 if self._new_handlers:
109 111 for handler in self._new_handlers:
110 112 handler(heart)
111 113 else:
112 logging.info("heartbeat::yay, got new heart %s!"%heart)
114 self.log.info("heartbeat::yay, got new heart %s!"%heart)
113 115 self.hearts.add(heart)
114 116
115 117 def handle_heart_failure(self, heart):
116 118 if self._failure_handlers:
117 119 for handler in self._failure_handlers:
118 120 try:
119 121 handler(heart)
120 122 except Exception as e:
121 logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
123 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
122 124 pass
123 125 else:
124 logging.info("heartbeat::Heart %s failed :("%heart)
126 self.log.info("heartbeat::Heart %s failed :("%heart)
125 127 self.hearts.remove(heart)
126 128
127 129
128 130 def handle_pong(self, msg):
129 131 "a heart just beat"
130 132 if msg[1] == str(self.lifetime):
131 133 delta = time.time()-self.tic
132 # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
134 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
133 135 self.responses.add(msg[0])
134 136 elif msg[1] == str(self.last_ping):
135 137 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
136 logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
138 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
137 139 self.responses.add(msg[0])
138 140 else:
139 logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
141 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
140 142 (msg[1],self.lifetime))
141 143
142 144
143 145 if __name__ == '__main__':
144 146 loop = ioloop.IOLoop.instance()
145 147 context = zmq.Context()
146 148 pub = context.socket(zmq.PUB)
147 149 pub.bind('tcp://127.0.0.1:5555')
148 150 xrep = context.socket(zmq.XREP)
149 151 xrep.bind('tcp://127.0.0.1:5556')
150 152
151 153 outstream = zmqstream.ZMQStream(pub, loop)
152 154 instream = zmqstream.ZMQStream(xrep, loop)
153 155
154 156     hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
155 157
156 158 loop.start()
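
Since `HeartMonitor` is now trait-based, observers hook in through its two registration methods rather than by subclassing. A short hedged sketch, which in the `__main__` demo above would go just before `loop.start()` (the handler bodies are illustrative):

    hb.add_new_heart_handler(
        lambda heart: hb.log.info("new heart: %s" % heart))
    hb.add_heart_failure_handler(
        lambda heart: hb.log.warn("lost heart: %s" % heart))
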
@@ -1,1047 +1,1045 @@
1 1 #!/usr/bin/env python
2 2 """The IPython Controller Hub with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 from datetime import datetime
20 20 import time
21 21 import logging
22 22
23 23 import zmq
24 24 from zmq.eventloop import ioloop
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 26
27 27 # internal:
28 28 from IPython.config.configurable import Configurable
29 29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
30 30 from IPython.utils.importstring import import_item
31 31
32 32 from entry_point import select_random_ports
33 from factory import RegistrationFactory
33 from factory import RegistrationFactory, LoggingFactory
34 34
35 35 from streamsession import Message, wrap_exception, ISO8601
36 36 from heartmonitor import HeartMonitor
37 37 from util import validate_url_container
38 38
39 39 try:
40 40 from pymongo.binary import Binary
41 41 except ImportError:
42 42 MongoDB=None
43 43 else:
44 44 from mongodb import MongoDB
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Code
48 48 #-----------------------------------------------------------------------------
49 49
50 50 def _passer(*args, **kwargs):
51 51 return
52 52
53 53 def _printer(*args, **kwargs):
54 54 print (args)
55 55 print (kwargs)
56 56
57 57 def init_record(msg):
58 58 """Initialize a TaskRecord based on a request."""
59 59 header = msg['header']
60 60 return {
61 61 'msg_id' : header['msg_id'],
62 62 'header' : header,
63 63 'content': msg['content'],
64 64 'buffers': msg['buffers'],
65 65 'submitted': datetime.strptime(header['date'], ISO8601),
66 66 'client_uuid' : None,
67 67 'engine_uuid' : None,
68 68 'started': None,
69 69 'completed': None,
70 70 'resubmitted': None,
71 71 'result_header' : None,
72 72 'result_content' : None,
73 73 'result_buffers' : None,
74 74 'queue' : None,
75 75 'pyin' : None,
76 76 'pyout': None,
77 77 'pyerr': None,
78 78 'stdout': '',
79 79 'stderr': '',
80 80 }
81 81
82 82
83 83 class EngineConnector(HasTraits):
84 84 """A simple object for accessing the various zmq connections of an object.
85 85 Attributes are:
86 86 id (int): engine ID
87 87 uuid (str): uuid (unused?)
88 88 queue (str): identity of queue's XREQ socket
89 89 registration (str): identity of registration XREQ socket
90 90 heartbeat (str): identity of heartbeat XREQ socket
91 91 """
92 92 id=Int(0)
93 93 queue=Str()
94 94 control=Str()
95 95 registration=Str()
96 96 heartbeat=Str()
97 97 pending=Set()
98
99 def __init__(self, **kwargs):
100 super(EngineConnector, self).__init__(**kwargs)
101 logging.info("engine::Engine Connected: %i"%self.id)
102 98
103 99 class HubFactory(RegistrationFactory):
104 100 """The Configurable for setting up a Hub."""
105 101
106 102 # port-pairs for monitoredqueues:
107 103 hb = Instance(list, config=True)
108 104 def _hb_default(self):
109 105 return select_random_ports(2)
110 106
111 107 mux = Instance(list, config=True)
112 108 def _mux_default(self):
113 109 return select_random_ports(2)
114 110
115 111 task = Instance(list, config=True)
116 112 def _task_default(self):
117 113 return select_random_ports(2)
118 114
119 115 control = Instance(list, config=True)
120 116 def _control_default(self):
121 117 return select_random_ports(2)
122 118
123 119 iopub = Instance(list, config=True)
124 120 def _iopub_default(self):
125 121 return select_random_ports(2)
126 122
127 123 # single ports:
128 124 mon_port = Instance(int, config=True)
129 125 def _mon_port_default(self):
130 126 return select_random_ports(1)[0]
131 127
132 128 query_port = Instance(int, config=True)
133 129 def _query_port_default(self):
134 130 return select_random_ports(1)[0]
135 131
136 132 notifier_port = Instance(int, config=True)
137 133 def _notifier_port_default(self):
138 134 return select_random_ports(1)[0]
139 135
140 136 ping = Int(1000, config=True) # ping frequency
141 137
142 138 engine_ip = Str('127.0.0.1', config=True)
143 139 engine_transport = Str('tcp', config=True)
144 140
145 141 client_ip = Str('127.0.0.1', config=True)
146 142 client_transport = Str('tcp', config=True)
147 143
148 144 monitor_ip = Str('127.0.0.1', config=True)
149 145 monitor_transport = Str('tcp', config=True)
150 146
151 147 monitor_url = Str('')
152 148
153 149 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
154 150
155 151 # not configurable
156 152 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
157 153 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
158 154 subconstructors = List()
159 155 _constructed = Bool(False)
160 156
161 157 def _ip_changed(self, name, old, new):
162 158 self.engine_ip = new
163 159 self.client_ip = new
164 160 self.monitor_ip = new
165 161 self._update_monitor_url()
166 162
167 163 def _update_monitor_url(self):
168 164 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
169 165
170 166 def _transport_changed(self, name, old, new):
171 167 self.engine_transport = new
172 168 self.client_transport = new
173 169 self.monitor_transport = new
174 170 self._update_monitor_url()
175 171
176 172 def __init__(self, **kwargs):
177 173 super(HubFactory, self).__init__(**kwargs)
178 174 self._update_monitor_url()
179 175 # self.on_trait_change(self._sync_ips, 'ip')
180 176 # self.on_trait_change(self._sync_transports, 'transport')
181 177 self.subconstructors.append(self.construct_hub)
182 178
183 179
184 180 def construct(self):
185 181 assert not self._constructed, "already constructed!"
186 182
187 183 for subc in self.subconstructors:
188 184 subc()
189 185
190 186 self._constructed = True
191 187
192 188
193 189 def start(self):
194 190 assert self._constructed, "must be constructed by self.construct() first!"
195 191 self.heartmonitor.start()
196 logging.info("Heartmonitor started")
192 self.log.info("Heartmonitor started")
197 193
198 194 def construct_hub(self):
199 195 """construct"""
200 196 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
201 197 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
202 198
203 199 ctx = self.context
204 200 loop = self.loop
205 201
206 202 # Registrar socket
207 203 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
208 204 reg.bind(client_iface % self.regport)
209 logging.info("Hub listening on %s for registration."%(client_iface%self.regport))
205 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
210 206 if self.client_ip != self.engine_ip:
211 207 reg.bind(engine_iface % self.regport)
212 logging.info("Hub listening on %s for registration."%(engine_iface%self.regport))
208 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
213 209
214 210 ### Engine connections ###
215 211
216 212 # heartbeat
217 213 hpub = ctx.socket(zmq.PUB)
218 214 hpub.bind(engine_iface % self.hb[0])
219 215 hrep = ctx.socket(zmq.XREP)
220 216 hrep.bind(engine_iface % self.hb[1])
221
222 self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping)
217 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
218 period=self.ping, logname=self.log.name)
223 219
224 220 ### Client connections ###
225 221 # Clientele socket
226 222 c = ZMQStream(ctx.socket(zmq.XREP), loop)
227 223 c.bind(client_iface%self.query_port)
228 224 # Notifier socket
229 225 n = ZMQStream(ctx.socket(zmq.PUB), loop)
230 226 n.bind(client_iface%self.notifier_port)
231 227
232 228 ### build and launch the queues ###
233 229
234 230 # monitor socket
235 231 sub = ctx.socket(zmq.SUB)
236 232 sub.setsockopt(zmq.SUBSCRIBE, "")
237 233 sub.bind(self.monitor_url)
238 234 sub = ZMQStream(sub, loop)
239 235
240 236 # connect the db
241 237 self.db = import_item(self.db_class)()
242 238 time.sleep(.25)
243 239
244 240 # build connection dicts
245 241 self.engine_addrs = {
246 242 'control' : engine_iface%self.control[1],
247 243 'mux': engine_iface%self.mux[1],
248 244 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
249 245 'task' : engine_iface%self.task[1],
250 246 'iopub' : engine_iface%self.iopub[1],
251 247 # 'monitor' : engine_iface%self.mon_port,
252 248 }
253 249
254 250 self.client_addrs = {
255 251 'control' : client_iface%self.control[0],
256 252 'query': client_iface%self.query_port,
257 253 'mux': client_iface%self.mux[0],
258 254 'task' : client_iface%self.task[0],
259 255 'iopub' : client_iface%self.iopub[0],
260 256 'notification': client_iface%self.notifier_port
261 257 }
262 logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
263 logging.debug("hub::Hub client addrs: %s"%self.client_addrs)
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
264 260 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
265 261 registrar=reg, clientele=c, notifier=n, db=self.db,
266 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs)
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
263 logname=self.log.name)
267 264
268 265
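For orientation, a minimal sketch of how a HubFactory might be driven; only construct() and start() appear in this diff, so the constructor arguments and the explicit IOLoop handling below are assumptions rather than verbatim API:

    # hypothetical driver for HubFactory (trait kwargs and loop setup are assumed)
    from zmq.eventloop import ioloop

    factory = HubFactory(ip='127.0.0.1', transport='tcp')  # fans out to engine/client/monitor traits
    factory.construct()  # runs each registered subconstructor, i.e. construct_hub above
    factory.start()      # starts the heartmonitor
    ioloop.IOLoop.instance().start()  # serve until interrupted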
269 class Hub(HasTraits):
266 class Hub(LoggingFactory):
270 267 """The IPython Controller Hub with 0MQ connections
271 268
272 269 Parameters
273 270 ==========
274 271 loop: zmq IOLoop instance
275 272 session: StreamSession object
276 273 <removed> context: zmq context for creating new connections (?)
277 274 queue: ZMQStream for monitoring the command queue (SUB)
278 275 registrar: ZMQStream for engine registration requests (XREP)
279 276 heartbeat: HeartMonitor object checking the pulse of the engines
280 277 clientele: ZMQStream for client connections (XREP)
281 278 not used for jobs, only query/control commands
282 279 notifier: ZMQStream for broadcasting engine registration changes (PUB)
283 280 db: connection to db for out of memory logging of commands
284 281 NotImplemented
285 282 engine_addrs: dict of zmq connection information for engines to connect
286 283 to the queues.
287 284 client_addrs: dict of zmq connection information for clients to connect
288 285 to the queues.
289 286 """
290 287 # internal data structures:
291 288 ids=Set() # engine IDs
292 289 keytable=Dict()
293 290 by_ident=Dict()
294 291 engines=Dict()
295 292 clients=Dict()
296 293 hearts=Dict()
297 294 pending=Set()
298 295 queues=Dict() # pending msg_ids keyed by engine_id
299 296 tasks=Dict() # pending msg_ids submitted as tasks, keyed by engine_id
300 297 completed=Dict() # completed msg_ids keyed by engine_id
301 298 all_completed=Set() # set of all completed msg_ids
302 299 # mia=None
303 300 incoming_registrations=Dict()
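# incoming_registrations maps heart ident -> (eid, queue_id, registration_ident,
# DelayedCallback or None); filled in register_engine, drained in finish_registration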
304 301 registration_timeout=Int()
305 302 _idcounter=Int(0)
306 303
307 304 # objects from constructor:
308 305 loop=Instance(ioloop.IOLoop)
309 306 registrar=Instance(ZMQStream)
310 307 clientele=Instance(ZMQStream)
311 308 monitor=Instance(ZMQStream)
312 309 heartmonitor=Instance(HeartMonitor)
313 310 notifier=Instance(ZMQStream)
314 311 db=Instance(object)
315 312 client_addrs=Dict()
316 313 engine_addrs=Dict()
317 314
318 315
319 316 def __init__(self, **kwargs):
320 317 """
321 318 # universal:
322 319 loop: IOLoop for creating future connections
323 320 session: streamsession for sending serialized data
324 321 # engine:
325 322 queue: ZMQStream for monitoring queue messages
326 323 registrar: ZMQStream for engine registration
327 324 heartbeat: HeartMonitor object for tracking engines
328 325 # client:
329 326 clientele: ZMQStream for client connections
330 327 # extra:
331 328 db: ZMQStream for db connection (NotImplemented)
332 329 engine_addrs: zmq address/protocol dict for engine connections
333 330 client_addrs: zmq address/protocol dict for client connections
334 331 """
335 332
336 333 super(Hub, self).__init__(**kwargs)
337 334 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
338 335
339 336 # validate connection dicts:
340 337 validate_url_container(self.client_addrs)
341 338 validate_url_container(self.engine_addrs)
342 339
343 340 # register our callbacks
344 341 self.registrar.on_recv(self.dispatch_register_request)
345 342 self.clientele.on_recv(self.dispatch_client_msg)
346 343 self.monitor.on_recv(self.dispatch_monitor_traffic)
347 344
348 345 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
349 346 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
350 347
351 348 self.monitor_handlers = { 'in' : self.save_queue_request,
352 349 'out': self.save_queue_result,
353 350 'intask': self.save_task_request,
354 351 'outtask': self.save_task_result,
355 352 'tracktask': self.save_task_destination,
356 353 'incontrol': _passer,
357 354 'outcontrol': _passer,
358 355 'iopub': self.save_iopub_message,
359 356 }
360 357
361 358 self.client_handlers = {'queue_request': self.queue_status,
362 359 'result_request': self.get_results,
363 360 'purge_request': self.purge_results,
364 361 'load_request': self.check_load,
365 362 'resubmit_request': self.resubmit_task,
366 363 'shutdown_request': self.shutdown_request,
367 364 }
368 365
369 366 self.registrar_handlers = {'registration_request' : self.register_engine,
370 367 'unregistration_request' : self.unregister_engine,
371 368 'connection_request': self.connection_request,
372 369 }
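# three dispatch tables mapping a msg_type (or monitor topic) to a bound handler;
# the dispatch_* methods below log and drop anything without an entry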
373 370
374 logging.info("hub::created hub")
371 self.log.info("hub::created hub")
375 372
376 373 @property
377 374 def _next_id(self):
378 375 """gemerate a new ID.
379 376
380 377 No longer reuse old ids, just count from 0."""
381 378 newid = self._idcounter
382 379 self._idcounter += 1
383 380 return newid
384 381 # newid = 0
385 382 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
386 383 # # print newid, self.ids, self.incoming_registrations
387 384 # while newid in self.ids or newid in incoming:
388 385 # newid += 1
389 386 # return newid
390 387
391 388 #-----------------------------------------------------------------------------
392 389 # message validation
393 390 #-----------------------------------------------------------------------------
394 391
395 392 def _validate_targets(self, targets):
396 393 """turn any valid targets argument into a list of integer ids"""
397 394 if targets is None:
398 395 # default to all
399 396 targets = self.ids
400 397
401 398 if isinstance(targets, (int,str,unicode)):
402 399 # only one target specified
403 400 targets = [targets]
404 401 _targets = []
405 402 for t in targets:
406 403 # map raw identities to ids
407 404 if isinstance(t, (str,unicode)):
408 405 t = self.by_ident.get(t, t)
409 406 _targets.append(t)
410 407 targets = _targets
411 408 bad_targets = [ t for t in targets if t not in self.ids ]
412 409 if bad_targets:
413 410 raise IndexError("No Such Engine: %r"%bad_targets)
414 411 if not targets:
415 412 raise IndexError("No Engines Registered")
416 413 return targets
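# examples: _validate_targets(None) -> every registered id;
# _validate_targets('<queue-uuid>') -> [eid] via the by_ident lookup;
# unknown or empty targets raise IndexError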
417 414
418 415 def _validate_client_msg(self, msg):
419 416 """validates and unpacks headers of a message. Returns False if invalid,
420 417 (ident, header, parent, content)"""
421 418 client_id = msg[0]
422 419 try:
423 420 msg = self.session.unpack_message(msg[1:], content=True)
424 421 except:
425 logging.error("client::Invalid Message %s"%msg, exc_info=True)
422 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
426 423 return False
427 424
428 425 msg_type = msg.get('msg_type', None)
429 426 if msg_type is None:
430 427 return False
431 428 header = msg.get('header')
432 429 # session doesn't handle split content for now:
433 430 return client_id, msg
434 431
435 432
436 433 #-----------------------------------------------------------------------------
437 434 # dispatch methods (1 per stream)
438 435 #-----------------------------------------------------------------------------
439 436
440 437 def dispatch_register_request(self, msg):
441 438 """"""
442 logging.debug("registration::dispatch_register_request(%s)"%msg)
439 self.log.debug("registration::dispatch_register_request(%s)"%msg)
443 440 idents,msg = self.session.feed_identities(msg)
444 441 if not idents:
445 logging.error("Bad Queue Message: %s"%msg, exc_info=True)
442 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
446 443 return
447 444 try:
448 445 msg = self.session.unpack_message(msg,content=True)
449 446 except:
450 logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
447 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
451 448 return
452 449
453 450 msg_type = msg['msg_type']
454 451 content = msg['content']
455 452
456 453 handler = self.registrar_handlers.get(msg_type, None)
457 454 if handler is None:
458 logging.error("registration::got bad registration message: %s"%msg)
455 self.log.error("registration::got bad registration message: %s"%msg)
459 456 else:
460 457 handler(idents, msg)
461 458
462 459 def dispatch_monitor_traffic(self, msg):
463 460 """all ME and Task queue messages come through here, as well as
464 461 IOPub traffic."""
465 logging.debug("monitor traffic: %s"%msg[:2])
462 self.log.debug("monitor traffic: %s"%msg[:2])
466 463 switch = msg[0]
467 464 idents, msg = self.session.feed_identities(msg[1:])
468 465 if not idents:
469 logging.error("Bad Monitor Message: %s"%msg)
466 self.log.error("Bad Monitor Message: %s"%msg)
470 467 return
471 468 handler = self.monitor_handlers.get(switch, None)
472 469 if handler is not None:
473 470 handler(idents, msg)
474 471 else:
475 logging.error("Invalid monitor topic: %s"%switch)
472 self.log.error("Invalid monitor topic: %s"%switch)
476 473
477 474
478 475 def dispatch_client_msg(self, msg):
479 476 """Route messages from clients"""
480 477 idents, msg = self.session.feed_identities(msg)
481 478 if not idents:
482 logging.error("Bad Client Message: %s"%msg)
479 self.log.error("Bad Client Message: %s"%msg)
483 480 return
484 481 client_id = idents[0]
485 482 try:
486 483 msg = self.session.unpack_message(msg, content=True)
487 484 except:
488 485 content = wrap_exception()
489 logging.error("Bad Client Message: %s"%msg, exc_info=True)
486 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
490 487 self.session.send(self.clientele, "hub_error", ident=client_id,
491 488 content=content)
492 489 return
493 490
494 491 # print client_id, header, parent, content
495 492 #switch on message type:
496 493 msg_type = msg['msg_type']
497 logging.info("client:: client %s requested %s"%(client_id, msg_type))
494 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
498 495 handler = self.client_handlers.get(msg_type, None)
499 496 try:
500 497 assert handler is not None, "Bad Message Type: %s"%msg_type
501 498 except:
502 499 content = wrap_exception()
503 logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
500 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
504 501 self.session.send(self.clientele, "hub_error", ident=client_id,
505 502 content=content)
506 503 return
507 504 else:
508 505 handler(client_id, msg)
509 506
510 507 def dispatch_db(self, msg):
511 508 """"""
512 509 raise NotImplementedError
513 510
514 511 #---------------------------------------------------------------------------
515 512 # handler methods (1 per event)
516 513 #---------------------------------------------------------------------------
517 514
518 515 #----------------------- Heartbeat --------------------------------------
519 516
520 517 def handle_new_heart(self, heart):
521 518 """handler to attach to heartbeater.
522 519 Called when a new heart starts to beat.
523 520 Triggers completion of registration."""
524 logging.debug("heartbeat::handle_new_heart(%r)"%heart)
521 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 522 if heart not in self.incoming_registrations:
526 logging.info("heartbeat::ignoring new heart: %r"%heart)
523 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 524 else:
528 525 self.finish_registration(heart)
529 526
530 527
531 528 def handle_heart_failure(self, heart):
532 529 """handler to attach to heartbeater.
533 530 called when a previously registered heart fails to respond to beat request.
534 531 triggers unregistration"""
535 logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
532 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 533 eid = self.hearts.get(heart, None)
537 534 if eid is None:
538 logging.info("heartbeat::ignoring heart failure %r"%heart)
535 self.log.info("heartbeat::ignoring heart failure %r"%heart)
539 536 else:
540 537 queue = self.engines[eid].queue
541 538 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
542 539
543 540 #----------------------- MUX Queue Traffic ------------------------------
544 541
545 542 def save_queue_request(self, idents, msg):
546 543 if len(idents) < 2:
547 logging.error("invalid identity prefix: %s"%idents)
544 self.log.error("invalid identity prefix: %s"%idents)
548 545 return
549 546 queue_id, client_id = idents[:2]
550 547 try:
551 548 msg = self.session.unpack_message(msg, content=False)
552 549 except:
553 logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
550 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
554 551 return
555 552
556 553 eid = self.by_ident.get(queue_id, None)
557 554 if eid is None:
558 logging.error("queue::target %r not registered"%queue_id)
559 logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
555 self.log.error("queue::target %r not registered"%queue_id)
556 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
560 557 return
561 558
562 559 header = msg['header']
563 560 msg_id = header['msg_id']
564 561 record = init_record(msg)
565 562 record['engine_uuid'] = queue_id
566 563 record['client_uuid'] = client_id
567 564 record['queue'] = 'mux'
568 565 if MongoDB is not None and isinstance(self.db, MongoDB):
569 566 record['buffers'] = map(Binary, record['buffers'])
570 567 self.pending.add(msg_id)
571 568 self.queues[eid].append(msg_id)
572 569 self.db.add_record(msg_id, record)
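# the msg_id is now tracked twice: globally in self.pending and per-engine in
# self.queues[eid]; save_queue_result moves it to completed on reply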
573 570
574 571 def save_queue_result(self, idents, msg):
575 572 if len(idents) < 2:
576 logging.error("invalid identity prefix: %s"%idents)
573 self.log.error("invalid identity prefix: %s"%idents)
577 574 return
578 575
579 576 client_id, queue_id = idents[:2]
580 577 try:
581 578 msg = self.session.unpack_message(msg, content=False)
582 579 except:
583 logging.error("queue::engine %r sent invalid message to %r: %s"%(
580 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
584 581 queue_id,client_id, msg), exc_info=True)
585 582 return
586 583
587 584 eid = self.by_ident.get(queue_id, None)
588 585 if eid is None:
589 logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
590 logging.debug("queue:: %s"%msg[2:])
586 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
587 self.log.debug("queue:: %s"%msg[2:])
591 588 return
592 589
593 590 parent = msg['parent_header']
594 591 if not parent:
595 592 return
596 593 msg_id = parent['msg_id']
597 594 if msg_id in self.pending:
598 595 self.pending.remove(msg_id)
599 596 self.all_completed.add(msg_id)
600 597 self.queues[eid].remove(msg_id)
601 598 self.completed[eid].append(msg_id)
602 599 rheader = msg['header']
603 600 completed = datetime.strptime(rheader['date'], ISO8601)
604 601 started = rheader.get('started', None)
605 602 if started is not None:
606 603 started = datetime.strptime(started, ISO8601)
607 604 result = {
608 605 'result_header' : rheader,
609 606 'result_content': msg['content'],
610 607 'started' : started,
611 608 'completed' : completed
612 609 }
613 610 if MongoDB is not None and isinstance(self.db, MongoDB):
614 611 result['result_buffers'] = map(Binary, msg['buffers'])
615 612 else:
616 613 result['result_buffers'] = msg['buffers']
617 614 self.db.update_record(msg_id, result)
618 615 else:
619 logging.debug("queue:: unknown msg finished %s"%msg_id)
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
620 617
621 618 #--------------------- Task Queue Traffic ------------------------------
622 619
623 620 def save_task_request(self, idents, msg):
624 621 """Save the submission of a task."""
625 622 client_id = idents[0]
626 623
627 624 try:
628 625 msg = self.session.unpack_message(msg, content=False)
629 626 except:
630 logging.error("task::client %r sent invalid task message: %s"%(
627 self.log.error("task::client %r sent invalid task message: %s"%(
631 628 client_id, msg), exc_info=True)
632 629 return
633 630 record = init_record(msg)
634 631 if MongoDB is not None and isinstance(self.db, MongoDB):
635 632 record['buffers'] = map(Binary, record['buffers'])
636 633 record['client_uuid'] = client_id
637 634 record['queue'] = 'task'
638 635 header = msg['header']
639 636 msg_id = header['msg_id']
640 637 self.pending.add(msg_id)
641 638 self.db.add_record(msg_id, record)
642 639
643 640 def save_task_result(self, idents, msg):
644 641 """save the result of a completed task."""
645 642 client_id = idents[0]
646 643 try:
647 644 msg = self.session.unpack_message(msg, content=False)
648 645 except:
649 logging.error("task::invalid task result message send to %r: %s"%(
646 self.log.error("task::invalid task result message send to %r: %s"%(
650 647 client_id, msg), exc_info=True)
651 648 raise
652 649 return
653 650
654 651 parent = msg['parent_header']
655 652 if not parent:
656 653 # print msg
657 logging.warn("Task %r had no parent!"%msg)
654 self.log.warn("Task %r had no parent!"%msg)
658 655 return
659 656 msg_id = parent['msg_id']
660 657
661 658 header = msg['header']
662 659 engine_uuid = header.get('engine', None)
663 660 eid = self.by_ident.get(engine_uuid, None)
664 661
665 662 if msg_id in self.pending:
666 663 self.pending.remove(msg_id)
667 664 self.all_completed.add(msg_id)
668 665 if eid is not None:
669 666 self.completed[eid].append(msg_id)
670 667 if msg_id in self.tasks[eid]:
671 668 self.tasks[eid].remove(msg_id)
672 669 completed = datetime.strptime(header['date'], ISO8601)
673 670 started = header.get('started', None)
674 671 if started is not None:
675 672 started = datetime.strptime(started, ISO8601)
676 673 result = {
677 674 'result_header' : header,
678 675 'result_content': msg['content'],
679 676 'started' : started,
680 677 'completed' : completed,
681 678 'engine_uuid': engine_uuid
682 679 }
683 680 if MongoDB is not None and isinstance(self.db, MongoDB):
684 681 result['result_buffers'] = map(Binary, msg['buffers'])
685 682 else:
686 683 result['result_buffers'] = msg['buffers']
687 684 self.db.update_record(msg_id, result)
688 685
689 686 else:
690 logging.debug("task::unknown task %s finished"%msg_id)
687 self.log.debug("task::unknown task %s finished"%msg_id)
691 688
692 689 def save_task_destination(self, idents, msg):
693 690 try:
694 691 msg = self.session.unpack_message(msg, content=True)
695 692 except:
696 logging.error("task::invalid task tracking message", exc_info=True)
693 self.log.error("task::invalid task tracking message", exc_info=True)
697 694 return
698 695 content = msg['content']
699 696 print (content)
700 697 msg_id = content['msg_id']
701 698 engine_uuid = content['engine_id']
702 699 eid = self.by_ident[engine_uuid]
703 700
704 logging.info("task::task %s arrived on %s"%(msg_id, eid))
701 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
705 702 # if msg_id in self.mia:
706 703 # self.mia.remove(msg_id)
707 704 # else:
708 # logging.debug("task::task %s not listed as MIA?!"%(msg_id))
705 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
709 706
710 707 self.tasks[eid].append(msg_id)
711 708 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
712 709 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
713 710
714 711 def mia_task_request(self, idents, msg):
715 712 raise NotImplementedError
716 713 client_id = idents[0]
717 714 # content = dict(mia=self.mia,status='ok')
718 715 # self.session.send('mia_reply', content=content, idents=client_id)
719 716
720 717
721 718 #--------------------- IOPub Traffic ------------------------------
722 719
723 720 def save_iopub_message(self, topics, msg):
724 721 """save an iopub message into the db"""
725 722 print (topics)
726 723 try:
727 724 msg = self.session.unpack_message(msg, content=True)
728 725 except:
729 logging.error("iopub::invalid IOPub message", exc_info=True)
726 self.log.error("iopub::invalid IOPub message", exc_info=True)
730 727 return
731 728
732 729 parent = msg['parent_header']
733 730 if not parent:
734 logging.error("iopub::invalid IOPub message: %s"%msg)
731 self.log.error("iopub::invalid IOPub message: %s"%msg)
735 732 return
736 733 msg_id = parent['msg_id']
737 734 msg_type = msg['msg_type']
738 735 content = msg['content']
739 736
740 737 # ensure msg_id is in db
741 738 try:
742 739 rec = self.db.get_record(msg_id)
743 740 except:
744 logging.error("iopub::IOPub message has invalid parent", exc_info=True)
741 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
745 742 return
746 743 # stream
747 744 d = {}
748 745 if msg_type == 'stream':
749 746 name = content['name']
750 747 s = rec[name] or ''
751 748 d[name] = s + content['data']
752 749
753 750 elif msg_type == 'pyerr':
754 751 d['pyerr'] = content
755 752 else:
756 753 d[msg_type] = content['data']
757 754
758 755 self.db.update_record(msg_id, d)
759 756
760 757
761 758
762 759 #-------------------------------------------------------------------------
763 760 # Registration requests
764 761 #-------------------------------------------------------------------------
765 762
766 763 def connection_request(self, client_id, msg):
767 764 """Reply with connection addresses for clients."""
768 logging.info("client::client %s connected"%client_id)
765 self.log.info("client::client %s connected"%client_id)
769 766 content = dict(status='ok')
770 767 content.update(self.client_addrs)
771 768 jsonable = {}
772 769 for k,v in self.keytable.iteritems():
773 770 jsonable[str(k)] = v
774 771 content['engines'] = jsonable
775 772 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
776 773
777 774 def register_engine(self, reg, msg):
778 775 """Register a new engine."""
779 776 content = msg['content']
780 777 try:
781 778 queue = content['queue']
782 779 except KeyError:
783 logging.error("registration::queue not specified", exc_info=True)
780 self.log.error("registration::queue not specified", exc_info=True)
784 781 return
785 782 heart = content.get('heartbeat', None)
786 783 """register a new engine, and create the socket(s) necessary"""
787 784 eid = self._next_id
788 785 # print (eid, queue, reg, heart)
789 786
790 logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
787 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
791 788
792 789 content = dict(id=eid,status='ok')
793 790 content.update(self.engine_addrs)
794 791 # check if requesting available IDs:
795 792 if queue in self.by_ident:
796 793 try:
797 794 raise KeyError("queue_id %r in use"%queue)
798 795 except:
799 796 content = wrap_exception()
800 logging.error("queue_id %r in use"%queue, exc_info=True)
797 self.log.error("queue_id %r in use"%queue, exc_info=True)
801 798 elif heart in self.hearts: # need to check unique hearts?
802 799 try:
803 800 raise KeyError("heart_id %r in use"%heart)
804 801 except:
805 logging.error("heart_id %r in use"%heart, exc_info=True)
802 self.log.error("heart_id %r in use"%heart, exc_info=True)
806 803 content = wrap_exception()
807 804 else:
808 805 for h, pack in self.incoming_registrations.iteritems():
809 806 if heart == h:
810 807 try:
811 808 raise KeyError("heart_id %r in use"%heart)
812 809 except:
813 logging.error("heart_id %r in use"%heart, exc_info=True)
810 self.log.error("heart_id %r in use"%heart, exc_info=True)
814 811 content = wrap_exception()
815 812 break
816 813 elif queue == pack[1]:
817 814 try:
818 815 raise KeyError("queue_id %r in use"%queue)
819 816 except:
820 logging.error("queue_id %r in use"%queue, exc_info=True)
817 self.log.error("queue_id %r in use"%queue, exc_info=True)
821 818 content = wrap_exception()
822 819 break
823 820
824 821 msg = self.session.send(self.registrar, "registration_reply",
825 822 content=content,
826 823 ident=reg)
827 824
828 825 if content['status'] == 'ok':
829 826 if heart in self.heartmonitor.hearts:
830 827 # already beating
831 828 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
832 829 self.finish_registration(heart)
833 830 else:
834 831 purge = lambda : self._purge_stalled_registration(heart)
835 832 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
836 833 dc.start()
837 834 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
838 835 else:
839 logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
836 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
840 837 return eid
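# from here registration either completes in finish_registration (on the first
# heartbeat) or is dropped by _purge_stalled_registration after registration_timeout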
841 838
842 839 def unregister_engine(self, ident, msg):
843 840 """Unregister an engine that explicitly requested to leave."""
844 841 try:
845 842 eid = msg['content']['id']
846 843 except:
847 logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
844 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
848 845 return
849 logging.info("registration::unregister_engine(%s)"%eid)
846 self.log.info("registration::unregister_engine(%s)"%eid)
850 847 content=dict(id=eid, queue=self.engines[eid].queue)
851 848 self.ids.remove(eid)
852 849 self.keytable.pop(eid)
853 850 ec = self.engines.pop(eid)
854 851 self.hearts.pop(ec.heartbeat)
855 852 self.by_ident.pop(ec.queue)
856 853 self.completed.pop(eid)
857 854 for msg_id in self.queues.pop(eid):
858 855 msg = self.pending.remove(msg_id)
859 856 ############## TODO: HANDLE IT ################
860 857
861 858 if self.notifier:
862 859 self.session.send(self.notifier, "unregistration_notification", content=content)
863 860
864 861 def finish_registration(self, heart):
865 862 """Second half of engine registration, called after our HeartMonitor
866 863 has received a beat from the Engine's Heart."""
867 864 try:
868 865 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
869 866 except KeyError:
870 logging.error("registration::tried to finish nonexistant registration", exc_info=True)
867 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
871 868 return
872 logging.info("registration::finished registering engine %i:%r"%(eid,queue))
869 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
873 870 if purge is not None:
874 871 purge.stop()
875 872 control = queue
876 873 self.ids.add(eid)
877 874 self.keytable[eid] = queue
878 875 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
879 876 control=control, heartbeat=heart)
880 877 self.by_ident[queue] = eid
881 878 self.queues[eid] = list()
882 879 self.tasks[eid] = list()
883 880 self.completed[eid] = list()
884 881 self.hearts[heart] = eid
885 882 content = dict(id=eid, queue=self.engines[eid].queue)
886 883 if self.notifier:
887 884 self.session.send(self.notifier, "registration_notification", content=content)
885 self.log.info("engine::Engine Connected: %i"%eid)
888 886
889 887 def _purge_stalled_registration(self, heart):
890 888 if heart in self.incoming_registrations:
891 889 eid = self.incoming_registrations.pop(heart)[0]
892 logging.info("registration::purging stalled registration: %i"%eid)
890 self.log.info("registration::purging stalled registration: %i"%eid)
893 891 else:
894 892 pass
895 893
896 894 #-------------------------------------------------------------------------
897 895 # Client Requests
898 896 #-------------------------------------------------------------------------
899 897
900 898 def shutdown_request(self, client_id, msg):
901 899 """handle shutdown request."""
902 900 # s = self.context.socket(zmq.XREQ)
903 901 # s.connect(self.client_connections['mux'])
904 902 # time.sleep(0.1)
905 903 # for eid,ec in self.engines.iteritems():
906 904 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
907 905 # time.sleep(1)
908 906 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
909 907 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
910 908 dc.start()
911 909
912 910 def _shutdown(self):
913 logging.info("hub::hub shutting down.")
911 self.log.info("hub::hub shutting down.")
914 912 time.sleep(0.1)
915 913 sys.exit(0)
916 914
917 915
918 916 def check_load(self, client_id, msg):
919 917 content = msg['content']
920 918 try:
921 919 targets = content['targets']
922 920 targets = self._validate_targets(targets)
923 921 except:
924 922 content = wrap_exception()
925 923 self.session.send(self.clientele, "hub_error",
926 924 content=content, ident=client_id)
927 925 return
928 926
929 927 content = dict(status='ok')
930 928 # loads = {}
931 929 for t in targets:
932 930 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
933 931 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
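# each engine's reported load is len(queues[t]) + len(tasks[t]):
# pending MUX jobs plus pending task jobs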
934 932
935 933
936 934 def queue_status(self, client_id, msg):
937 935 """Return the Queue status of one or more targets.
938 936 if verbose: return the msg_ids
939 937 else: return the count of each type.
940 938 keys: queue (pending MUX jobs)
941 939 tasks (pending Task jobs)
942 940 completed (finished jobs from both queues)"""
943 941 content = msg['content']
944 942 targets = content['targets']
945 943 try:
946 944 targets = self._validate_targets(targets)
947 945 except:
948 946 content = wrap_exception()
949 947 self.session.send(self.clientele, "hub_error",
950 948 content=content, ident=client_id)
951 949 return
952 950 verbose = content.get('verbose', False)
953 951 content = dict(status='ok')
954 952 for t in targets:
955 953 queue = self.queues[t]
956 954 completed = self.completed[t]
957 955 tasks = self.tasks[t]
958 956 if not verbose:
959 957 queue = len(queue)
960 958 completed = len(completed)
961 959 tasks = len(tasks)
962 960 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
963 961 # pending
964 962 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
965 963
966 964 def purge_results(self, client_id, msg):
967 965 """Purge results from memory. This method is more valuable before we move
968 966 to a DB based message storage mechanism."""
969 967 content = msg['content']
970 968 msg_ids = content.get('msg_ids', [])
971 969 reply = dict(status='ok')
972 970 if msg_ids == 'all':
973 971 self.db.drop_matching_records(dict(completed={'$ne':None}))
974 972 else:
975 973 for msg_id in msg_ids:
976 974 if msg_id in self.all_completed:
977 975 self.db.drop_record(msg_id)
978 976 else:
979 977 if msg_id in self.pending:
980 978 try:
981 979 raise IndexError("msg pending: %r"%msg_id)
982 980 except:
983 981 reply = wrap_exception()
984 982 else:
985 983 try:
986 984 raise IndexError("No such msg: %r"%msg_id)
987 985 except:
988 986 reply = wrap_exception()
989 987 break
990 988 eids = content.get('engine_ids', [])
991 989 for eid in eids:
992 990 if eid not in self.engines:
993 991 try:
994 992 raise IndexError("No such engine: %i"%eid)
995 993 except:
996 994 reply = wrap_exception()
997 995 break
998 996 msg_ids = self.completed.pop(eid)
999 997 uid = self.engines[eid].queue
1000 998 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1001 999
1002 1000 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1003 1001
1004 1002 def resubmit_task(self, client_id, msg, buffers):
1005 1003 """Resubmit a task."""
1006 1004 raise NotImplementedError
1007 1005
1008 1006 def get_results(self, client_id, msg):
1009 1007 """Get the result of 1 or more messages."""
1010 1008 content = msg['content']
1011 1009 msg_ids = sorted(set(content['msg_ids']))
1012 1010 statusonly = content.get('status_only', False)
1013 1011 pending = []
1014 1012 completed = []
1015 1013 content = dict(status='ok')
1016 1014 content['pending'] = pending
1017 1015 content['completed'] = completed
1018 1016 buffers = []
1019 1017 if not statusonly:
1020 1018 content['results'] = {}
1021 1019 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1022 1020 for msg_id in msg_ids:
1023 1021 if msg_id in self.pending:
1024 1022 pending.append(msg_id)
1025 1023 elif msg_id in self.all_completed:
1026 1024 completed.append(msg_id)
1027 1025 if not statusonly:
1028 1026 rec = records[msg_id]
1029 1027 io_dict = {}
1030 1028 for key in 'pyin pyout pyerr stdout stderr'.split():
1031 1029 io_dict[key] = rec[key]
1032 1030 content[msg_id] = { 'result_content': rec['result_content'],
1033 1031 'header': rec['header'],
1034 1032 'result_header' : rec['result_header'],
1035 1033 'io' : io_dict,
1036 1034 }
1037 1035 buffers.extend(map(str, rec['result_buffers']))
1038 1036 else:
1039 1037 try:
1040 1038 raise KeyError('No such message: '+msg_id)
1041 1039 except:
1042 1040 content = wrap_exception()
1043 1041 break
1044 1042 self.session.send(self.clientele, "result_reply", content=content,
1045 1043 parent=msg, ident=client_id,
1046 1044 buffers=buffers)
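# result buffers ride along as raw zmq frames (buffers=...), keeping binary
# data out of the serialized reply content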
1047 1045
@@ -1,502 +1,503 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import signal
22 22
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 26 from IPython.utils.importstring import import_item
27 27 from IPython.zmq.parallel.clusterdir import (
28 28 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 29 ClusterDirError, PIDFileError
30 30 )
31 31
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Module level variables
35 35 #-----------------------------------------------------------------------------
36 36
37 37
38 38 default_config_file_name = u'ipcluster_config.py'
39 39
40 40
41 41 _description = """\
42 42 Start an IPython cluster for parallel computing.\n\n
43 43
44 44 An IPython cluster consists of 1 controller and 1 or more engines.
45 45 This command automates the startup of these processes using a wide
46 46 range of startup methods (SSH, local processes, PBS, mpiexec,
47 47 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 48 local host simply do 'ipcluster start -n 4'. For more complex usage
49 49 you will typically do 'ipcluster create -p mycluster', then edit
50 50 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
51 51 """
52 52
53 53
54 54 # Exit codes for ipcluster
55 55
56 56 # This will be the exit code if the ipcluster appears to be running because
57 57 # a .pid file exists
58 58 ALREADY_STARTED = 10
59 59
60 60
61 61 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 62 # file to be found.
63 63 ALREADY_STOPPED = 11
64 64
65 65
66 66 #-----------------------------------------------------------------------------
67 67 # Command line options
68 68 #-----------------------------------------------------------------------------
69 69
70 70
71 71 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
72 72
73 73 def _add_arguments(self):
74 74 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
75 75 # its defaults on self.parser. Instead, we will put those on
76 76 # default options on our subparsers.
77 77
78 78 # This has all the common options that all subcommands use
79 79 parent_parser1 = ArgumentParser(
80 80 add_help=False,
81 81 argument_default=SUPPRESS
82 82 )
83 83 self._add_ipython_dir(parent_parser1)
84 84 self._add_log_level(parent_parser1)
85 85
86 86 # This has all the common options that other subcommands use
87 87 parent_parser2 = ArgumentParser(
88 88 add_help=False,
89 89 argument_default=SUPPRESS
90 90 )
91 91 self._add_cluster_profile(parent_parser2)
92 92 self._add_cluster_dir(parent_parser2)
93 93 self._add_work_dir(parent_parser2)
94 94 paa = parent_parser2.add_argument
95 95 paa('--log-to-file',
96 96 action='store_true', dest='Global.log_to_file',
97 97 help='Log to a file in the log directory (default is stdout)')
98 98
99 99 # Create the object used to create the subparsers.
100 100 subparsers = self.parser.add_subparsers(
101 101 dest='Global.subcommand',
102 102 title='ipcluster subcommands',
103 103 description=
104 104 """ipcluster has a variety of subcommands. The general way of
105 105 running ipcluster is 'ipcluster <cmd> [options]'. To get help
106 106 on a particular subcommand do 'ipcluster <cmd> -h'."""
107 107 # help="For more help, type 'ipcluster <cmd> -h'",
108 108 )
109 109
110 110 # The "list" subcommand parser
111 111 parser_list = subparsers.add_parser(
112 112 'list',
113 113 parents=[parent_parser1],
114 114 argument_default=SUPPRESS,
115 115 help="List all clusters in cwd and ipython_dir.",
116 116 description=
117 117 """List all available clusters, by cluster directory, that can
118 118 be found in the current working directly or in the ipython
119 119 directory. Cluster directories are named using the convention
120 120 'cluster_<profile>'."""
121 121 )
122 122
123 123 # The "create" subcommand parser
124 124 parser_create = subparsers.add_parser(
125 125 'create',
126 126 parents=[parent_parser1, parent_parser2],
127 127 argument_default=SUPPRESS,
128 128 help="Create a new cluster directory.",
129 129 description=
130 130 """Create an ipython cluster directory by its profile name or
131 131 cluster directory path. Cluster directories contain
132 132 configuration, log and security related files and are named
133 133 using the convention 'cluster_<profile>'. By default they are
134 134 located in your ipython directory. Once created, you will
135 135 probably need to edit the configuration files in the cluster
136 136 directory to configure your cluster. Most users will create a
137 137 cluster directory by profile name,
138 138 'ipcluster create -p mycluster', which will put the directory
139 139 in '<ipython_dir>/cluster_mycluster'.
140 140 """
141 141 )
142 142 paa = parser_create.add_argument
143 143 paa('--reset-config',
144 144 dest='Global.reset_config', action='store_true',
145 145 help=
146 146 """Recopy the default config files to the cluster directory.
147 147 You will lose any modifications you have made to these files.""")
148 148
149 149 # The "start" subcommand parser
150 150 parser_start = subparsers.add_parser(
151 151 'start',
152 152 parents=[parent_parser1, parent_parser2],
153 153 argument_default=SUPPRESS,
154 154 help="Start a cluster.",
155 155 description=
156 156 """Start an ipython cluster by its profile name or cluster
157 157 directory. Cluster directories contain configuration, log and
158 158 security related files and are named using the convention
159 159 'cluster_<profile>' and should be creating using the 'start'
160 160 subcommand of 'ipcluster'. If your cluster directory is in
161 161 the cwd or the ipython directory, you can simply refer to it
162 162 using its profile name, 'ipcluster start -n 4 -p <profile>`,
163 163 otherwise use the '--cluster-dir' option.
164 164 """
165 165 )
166 166 paa = parser_start.add_argument
167 167 paa('-n', '--number',
168 168 type=int, dest='Global.n',
169 169 help='The number of engines to start.',
170 170 metavar='Global.n')
171 171 paa('--clean-logs',
172 172 dest='Global.clean_logs', action='store_true',
173 173 help='Delete old log files before starting.')
174 174 paa('--no-clean-logs',
175 175 dest='Global.clean_logs', action='store_false',
176 176 help="Don't delete old log flies before starting.")
177 177 paa('--daemon',
178 178 dest='Global.daemonize', action='store_true',
179 179 help='Daemonize the ipcluster program. This implies --log-to-file')
180 180 paa('--no-daemon',
181 181 dest='Global.daemonize', action='store_false',
182 182 help="Dont't daemonize the ipcluster program.")
183 183
184 184 # The "stop" subcommand parser
185 185 parser_stop = subparsers.add_parser(
186 186 'stop',
187 187 parents=[parent_parser1, parent_parser2],
188 188 argument_default=SUPPRESS,
189 189 help="Stop a running cluster.",
190 190 description=
191 191 """Stop a running ipython cluster by its profile name or cluster
192 192 directory. Cluster directories are named using the convention
193 193 'cluster_<profile>'. If your cluster directory is in
194 194 the cwd or the ipython directory, you can simply refer to it
195 195 using its profile name, 'ipcluster stop -p <profile>', otherwise
196 196 use the '--cluster-dir' option.
197 197 """
198 198 )
199 199 paa = parser_stop.add_argument
200 200 paa('--signal',
201 201 dest='Global.signal', type=int,
202 202 help="The signal number to use in stopping the cluster (default=2).",
203 203 metavar="Global.signal")
204 204
205 205
206 206 #-----------------------------------------------------------------------------
207 207 # Main application
208 208 #-----------------------------------------------------------------------------
209 209
210 210
211 211 class IPClusterApp(ApplicationWithClusterDir):
212 212
213 213 name = u'ipclusterz'
214 214 description = _description
215 215 usage = None
216 216 command_line_loader = IPClusterAppConfigLoader
217 217 default_config_file_name = default_config_file_name
218 218 default_log_level = logging.INFO
219 219 auto_create_cluster_dir = False
220 220
221 221 def create_default_config(self):
222 222 super(IPClusterApp, self).create_default_config()
223 223 self.default_config.Global.controller_launcher = \
224 224 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
225 225 self.default_config.Global.engine_launcher = \
226 226 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
227 227 self.default_config.Global.n = 2
228 228 self.default_config.Global.reset_config = False
229 229 self.default_config.Global.clean_logs = True
230 230 self.default_config.Global.signal = 2
231 231 self.default_config.Global.daemonize = False
232 232
233 233 def find_resources(self):
234 234 subcommand = self.command_line_config.Global.subcommand
235 235 if subcommand=='list':
236 236 self.list_cluster_dirs()
237 237 # Exit immediately because there is nothing left to do.
238 238 self.exit()
239 239 elif subcommand=='create':
240 240 self.auto_create_cluster_dir = True
241 241 super(IPClusterApp, self).find_resources()
242 242 elif subcommand=='start' or subcommand=='stop':
243 243 self.auto_create_cluster_dir = True
244 244 try:
245 245 super(IPClusterApp, self).find_resources()
246 246 except ClusterDirError:
247 247 raise ClusterDirError(
248 248 "Could not find a cluster directory. A cluster dir must "
249 249 "be created before running 'ipcluster start'. Do "
250 250 "'ipcluster create -h' or 'ipcluster list -h' for more "
251 251 "information about creating and listing cluster dirs."
252 252 )
253 253
254 254 def list_cluster_dirs(self):
255 255 # Find the search paths
256 256 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
257 257 if cluster_dir_paths:
258 258 cluster_dir_paths = cluster_dir_paths.split(':')
259 259 else:
260 260 cluster_dir_paths = []
261 261 try:
262 262 ipython_dir = self.command_line_config.Global.ipython_dir
263 263 except AttributeError:
264 264 ipython_dir = self.default_config.Global.ipython_dir
265 265 paths = [os.getcwd(), ipython_dir] + \
266 266 cluster_dir_paths
267 267 paths = list(set(paths))
268 268
269 269 self.log.info('Searching for cluster dirs in paths: %r' % paths)
270 270 for path in paths:
271 271 files = os.listdir(path)
272 272 for f in files:
273 273 full_path = os.path.join(path, f)
274 274 if os.path.isdir(full_path) and f.startswith('cluster_'):
275 275 profile = full_path.split('_')[-1]
276 276 start_cmd = 'ipcluster start -p %s -n 4' % profile
277 277 print start_cmd + " ==> " + full_path
278 278
279 279 def pre_construct(self):
280 280 # IPClusterApp.pre_construct() is where we cd to the working directory.
281 281 super(IPClusterApp, self).pre_construct()
282 282 config = self.master_config
283 283 try:
284 284 daemon = config.Global.daemonize
285 285 if daemon:
286 286 config.Global.log_to_file = True
287 287 except AttributeError:
288 288 pass
289 289
290 290 def construct(self):
291 291 config = self.master_config
292 292 subcmd = config.Global.subcommand
293 293 reset = config.Global.reset_config
294 294 if subcmd == 'list':
295 295 return
296 296 if subcmd == 'create':
297 297 self.log.info('Copying default config files to cluster directory '
298 298 '[overwrite=%r]' % (reset,))
299 299 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
300 300 if subcmd =='start':
301 301 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
302 302 self.start_logging()
303 303 self.loop = ioloop.IOLoop.instance()
304 304 # reactor.callWhenRunning(self.start_launchers)
305 305 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
306 306 dc.start()
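# mirrors the commented-out callWhenRunning above: a DelayedCallback with a
# 0 ms delay runs start_launchers as soon as the loop starts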
307 307
308 308 def start_launchers(self):
309 309 config = self.master_config
310 310
311 311 # Create the launchers. In both cases, we set the work_dir of
312 312 # the launcher to the cluster_dir. This is where the launcher's
313 313 # subprocesses will be launched. It is not where the controller
314 314 # and engine will be launched.
315 315 el_class = import_item(config.Global.engine_launcher)
316 316 self.engine_launcher = el_class(
317 work_dir=self.cluster_dir, config=config
317 work_dir=self.cluster_dir, config=config, logname=self.log.name
318 318 )
319 319 cl_class = import_item(config.Global.controller_launcher)
320 320 self.controller_launcher = cl_class(
321 work_dir=self.cluster_dir, config=config
321 work_dir=self.cluster_dir, config=config,
322 logname=self.log.name
322 323 )
323 324
324 325 # Setup signals
325 326 signal.signal(signal.SIGINT, self.sigint_handler)
326 327
327 328 # Setup the observing of stopping. If the controller dies, shut
328 329 # everything down as that will be completely fatal for the engines.
329 330 self.controller_launcher.on_stop(self.stop_launchers)
330 331 # d1.addCallback(self.stop_launchers)
331 332 # But, we don't monitor the stopping of engines. An engine dying
332 333 # is just fine and in principle a user could start a new engine.
333 334 # Also, if we did monitor engine stopping, it is difficult to
334 335 # know what to do when only some engines die. Currently, the
335 336 # observing of engine stopping is inconsistent. Some launchers
336 337 # might trigger on a single engine stopping, others wait until
337 338 # all stop. TODO: think more about how to handle this.
338 339
339 340 # Start the controller and engines
340 341 self._stopping = False # Make sure stop_launchers is not called 2x.
341 342 d = self.start_controller()
342 343 self.start_engines()
343 344 self.startup_message()
344 345 # d.addCallback(self.start_engines)
345 346 # d.addCallback(self.startup_message)
346 347 # If the controller or engines fail to start, stop everything
347 348 # d.addErrback(self.stop_launchers)
348 349 return d
349 350
350 351 def startup_message(self, r=None):
351 logging.info("IPython cluster: started")
352 self.log.info("IPython cluster: started")
352 353 return r
353 354
354 355 def start_controller(self, r=None):
355 # logging.info("In start_controller")
356 # self.log.info("In start_controller")
356 357 config = self.master_config
357 358 d = self.controller_launcher.start(
358 359 cluster_dir=config.Global.cluster_dir
359 360 )
360 361 return d
361 362
362 363 def start_engines(self, r=None):
363 # logging.info("In start_engines")
364 # self.log.info("In start_engines")
364 365 config = self.master_config
365 366 d = self.engine_launcher.start(
366 367 config.Global.n,
367 368 cluster_dir=config.Global.cluster_dir
368 369 )
369 370 return d
370 371
371 372 def stop_controller(self, r=None):
372 # logging.info("In stop_controller")
373 # self.log.info("In stop_controller")
373 374 if self.controller_launcher.running:
374 375 return self.controller_launcher.stop()
375 376
376 377 def stop_engines(self, r=None):
377 # logging.info("In stop_engines")
378 # self.log.info("In stop_engines")
378 379 if self.engine_launcher.running:
379 380 d = self.engine_launcher.stop()
380 381 # d.addErrback(self.log_err)
381 382 return d
382 383 else:
383 384 return None
384 385
385 386 def log_err(self, f):
386 logging.error(f.getTraceback())
387 self.log.error(f.getTraceback())
387 388 return None
388 389
389 390 def stop_launchers(self, r=None):
390 391 if not self._stopping:
391 392 self._stopping = True
392 393 # if isinstance(r, failure.Failure):
393 # logging.error('Unexpected error in ipcluster:')
394 # logging.info(r.getTraceback())
395 logging.error("IPython cluster: stopping")
394 # self.log.error('Unexpected error in ipcluster:')
395 # self.log.info(r.getTraceback())
396 self.log.error("IPython cluster: stopping")
396 397 # These return deferreds. We are not doing anything with them
397 398 # but we are holding refs to them as a reminder that they
398 399 # do return deferreds.
399 400 d1 = self.stop_engines()
400 401 d2 = self.stop_controller()
401 402 # Wait a few seconds to let things shut down.
402 403 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
403 404 dc.start()
404 405 # reactor.callLater(4.0, reactor.stop)
405 406
406 407 def sigint_handler(self, signum, frame):
407 408 self.stop_launchers()
408 409
409 410 def start_logging(self):
410 411 # Remove old log files of the controller and engine
411 412 if self.master_config.Global.clean_logs:
412 413 log_dir = self.master_config.Global.log_dir
413 414 for f in os.listdir(log_dir):
414 415 if f.startswith('ipengine' + '-'):
415 416 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
416 417 os.remove(os.path.join(log_dir, f))
417 418 if f.startswith('ipcontroller' + '-'):
418 419 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
419 420 os.remove(os.path.join(log_dir, f))
420 421 # This will remove old log files for ipcluster itself
421 422 super(IPClusterApp, self).start_logging()
422 423
423 424 def start_app(self):
424 425 """Start the application, depending on what subcommand is used."""
425 426 subcmd = self.master_config.Global.subcommand
426 427 if subcmd=='create' or subcmd=='list':
427 428 return
428 429 elif subcmd=='start':
429 430 self.start_app_start()
430 431 elif subcmd=='stop':
431 432 self.start_app_stop()
432 433
433 434 def start_app_start(self):
434 435 """Start the app for the start subcommand."""
435 436 config = self.master_config
436 437 # First see if the cluster is already running
437 438 try:
438 439 pid = self.get_pid_from_file()
439 440 except PIDFileError:
440 441 pass
441 442 else:
442 443 self.log.critical(
443 444 'Cluster is already running with [pid=%s]. '
444 445 'use "ipcluster stop" to stop the cluster.' % pid
445 446 )
446 447 # Here I exit with an unusual exit status that other processes
447 448 # can watch for to learn how I exited.
448 449 self.exit(ALREADY_STARTED)
449 450
450 451 # Now log and daemonize
451 452 self.log.info(
452 453 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
453 454 )
454 455 # TODO: Get daemonize working on Windows or as a Windows Server.
455 456 if config.Global.daemonize:
456 457 if os.name=='posix':
457 458 from twisted.scripts._twistd_unix import daemonize
458 459 daemonize()
459 460
460 461 # Now write the new pid file AFTER our new forked pid is active.
461 462 self.write_pid_file()
462 463 try:
463 464 self.loop.start()
464 465 except:
465 logging.info("stopping...")
466 self.log.info("stopping...")
466 467 self.remove_pid_file()
467 468
468 469 def start_app_stop(self):
469 470 """Start the app for the stop subcommand."""
470 471 config = self.master_config
471 472 try:
472 473 pid = self.get_pid_from_file()
473 474 except PIDFileError:
474 475 self.log.critical(
475 476 'Problem reading pid file, cluster is probably not running.'
476 477 )
477 478 # Here I exit with an unusual exit status that other processes
478 479 # can watch for to learn how I exited.
479 480 self.exit(ALREADY_STOPPED)
480 481 else:
481 482 if os.name=='posix':
482 483 sig = config.Global.signal
483 484 self.log.info(
484 485 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
485 486 )
486 487 os.kill(pid, sig)
487 488 elif os.name=='nt':
488 489 # As of right now, we don't support daemonize on Windows, so
489 490 # stop will not do anything. Minimally, it should clean up the
490 491 # old .pid files.
491 492 self.remove_pid_file()
492 493
493 494
494 495 def launch_new_instance():
495 496 """Create and run the IPython cluster."""
496 497 app = IPClusterApp()
497 498 app.start()
498 499
499 500
500 501 if __name__ == '__main__':
501 502 launch_new_instance()
502 503
@@ -1,340 +1,340 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import sys
22 22 import os
23 23 import logging
24 24 # from twisted.application import service
25 25 # from twisted.internet import reactor
26 26 # from twisted.python import log
27 27
28 28 import zmq
29 29 from zmq.log.handlers import PUBHandler
30 30
31 31 from IPython.config.loader import Config
32 32 from IPython.zmq.parallel import factory
33 33 from IPython.zmq.parallel.controller import ControllerFactory
34 34 from IPython.zmq.parallel.clusterdir import (
35 35 ApplicationWithClusterDir,
36 36 ClusterDirConfigLoader
37 37 )
38 38 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
39 39 from IPython.utils.traitlets import Instance, Unicode
40 40
41 41 from entry_point import generate_exec_key
42 42
43 43
44 44 #-----------------------------------------------------------------------------
45 45 # Module level variables
46 46 #-----------------------------------------------------------------------------
47 47
48 48
49 49 #: The default config file name for this application
50 50 default_config_file_name = u'ipcontroller_config.py'
51 51
52 52
53 53 _description = """Start the IPython controller for parallel computing.
54 54
55 55 The IPython controller provides a gateway between the IPython engines and
56 56 clients. The controller needs to be started before the engines and can be
57 57 configured using command line options or using a cluster directory. Cluster
58 58 directories contain config, log and security files and are usually located in
59 59 your .ipython directory and named as "cluster_<profile>". See the --profile
60 60 and --cluster-dir options for details.
61 61 """
62 62
63 63 #-----------------------------------------------------------------------------
64 64 # Default interfaces
65 65 #-----------------------------------------------------------------------------
66 66
67 67 # The default client interfaces for FCClientServiceFactory.interfaces
68 68 default_client_interfaces = Config()
69 69 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
70 70
71 71 # Make this a dict we can pass to Config.__init__ for the default
72 72 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
73 73
74 74
75 75
76 76 # The default engine interfaces for FCEngineServiceFactory.interfaces
77 77 default_engine_interfaces = Config()
78 78 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
79 79
80 80 # Make this a dict we can pass to Config.__init__ for the default
81 81 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
82 82
83 83
84 84 #-----------------------------------------------------------------------------
85 85 # Service factories
86 86 #-----------------------------------------------------------------------------
87 87
88 88 #
89 89 # class FCClientServiceFactory(FCServiceFactory):
90 90 # """A Foolscap implementation of the client services."""
91 91 #
92 92 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
93 93 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
94 94 # allow_none=False, config=True)
95 95 #
96 96 #
97 97 # class FCEngineServiceFactory(FCServiceFactory):
98 98 # """A Foolscap implementation of the engine services."""
99 99 #
100 100 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
101 101 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
102 102 # allow_none=False, config=True)
103 103 #
104 104
105 105 #-----------------------------------------------------------------------------
106 106 # Command line options
107 107 #-----------------------------------------------------------------------------
108 108
109 109
110 110 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
111 111
112 112 def _add_arguments(self):
113 113 super(IPControllerAppConfigLoader, self)._add_arguments()
114 114 paa = self.parser.add_argument
115 115
116 116 ## Hub Config:
117 117 paa('--mongodb',
118 118 dest='HubFactory.db_class', action='store_const',
119 119 const='IPython.zmq.parallel.mongodb.MongoDB',
120 120 help='Use MongoDB task storage [default: in-memory]')
121 121 paa('--hb',
122 122 type=int, dest='HubFactory.hb', nargs=2,
123 123 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
124 124 'connections [default: random]',
125 125 metavar='Hub.hb_ports')
126 126 paa('--ping',
127 127 type=int, dest='HubFactory.ping',
128 128 help='The frequency at which the Hub pings the engines for heartbeats '
129 129 ' (in ms) [default: 100]',
130 130 metavar='Hub.ping')
131 131
132 132 # Client config
133 133 paa('--client-ip',
134 134 type=str, dest='HubFactory.client_ip',
135 135 help='The IP address or hostname the Hub will listen on for '
136 136 'client connections. Both engine-ip and client-ip can be set simultaneously '
137 137 'via --ip [default: loopback]',
138 138 metavar='Hub.client_ip')
139 139 paa('--client-transport',
140 140 type=str, dest='HubFactory.client_transport',
141 141 help='The ZeroMQ transport the Hub will use for '
142 142 'client connections. Both engine-transport and client-transport can be set simultaneously '
143 143 'via --transport [default: tcp]',
144 144 metavar='Hub.client_transport')
145 145 paa('--query',
146 146 type=int, dest='HubFactory.query_port',
147 147 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
148 148 metavar='Hub.query_port')
149 149 paa('--notifier',
150 150 type=int, dest='HubFactory.notifier_port',
151 151 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
152 152 metavar='Hub.notifier_port')
153 153
154 154 # Engine config
155 155 paa('--engine-ip',
156 156 type=str, dest='HubFactory.engine_ip',
157 157 help='The IP address or hostname the Hub will listen on for '
158 158 'engine connections. This applies to the Hub and its schedulers. '
159 159 'Both engine-ip and client-ip can be set simultaneously '
160 160 'via --ip [default: loopback]',
161 161 metavar='Hub.engine_ip')
162 162 paa('--engine-transport',
163 163 type=str, dest='HubFactory.engine_transport',
164 164 help='The ZeroMQ transport the Hub will use for '
165 165 'engine connections. Both engine-transport and client-transport can be set simultaneously '
166 166 'via --transport [default: tcp]',
167 167 metavar='Hub.engine_transport')
168 168
169 169 # Scheduler config
170 170 paa('--mux',
171 171 type=int, dest='ControllerFactory.mux', nargs=2,
172 172 help='The (2) ports the MUX scheduler will listen on for client,engine '
173 173 'connections, respectively [default: random]',
174 174 metavar='Scheduler.mux_ports')
175 175 paa('--task',
176 176 type=int, dest='ControllerFactory.task', nargs=2,
177 177 help='The (2) ports the Task scheduler will listen on for client,engine '
178 178 'connections, respectively [default: random]',
179 179 metavar='Scheduler.task_ports')
180 180 paa('--control',
181 181 type=int, dest='ControllerFactory.control', nargs=2,
182 182 help='The (2) ports the Control scheduler will listen on for client,engine '
183 183 'connections, respectively [default: random]',
184 184 metavar='Scheduler.control_ports')
185 185 paa('--iopub',
186 186 type=int, dest='ControllerFactory.iopub', nargs=2,
187 187 help='The (2) ports the IOPub scheduler will listen on for client,engine '
188 188 'connections, respectively [default: random]',
189 189 metavar='Scheduler.iopub_ports')
190 190 paa('--scheme',
191 191 type=str, dest='ControllerFactory.scheme',
192 192 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
193 193 help='select the task scheduler scheme [default: Python LRU]',
194 194 metavar='Scheduler.scheme')
195 195 paa('--usethreads',
196 196 dest='ControllerFactory.usethreads', action="store_true",
197 197 help='Use threads instead of processes for the schedulers',
198 198 )
199 199
200 200 ## Global config
201 201 paa('--log-to-file',
202 202 action='store_true', dest='Global.log_to_file',
203 203 help='Log to a file in the log directory (default is stdout)')
204 204 paa('--log-url',
205 205 type=str, dest='Global.log_url',
206 206 help='Broadcast logs to an iploggerz process [default: disabled]')
207 207 paa('-r','--reuse-key',
208 208 action='store_true', dest='Global.reuse_key',
209 209 help='Try to reuse existing execution keys.')
210 210 paa('--no-secure',
211 211 action='store_false', dest='Global.secure',
212 212 help='Turn off execution keys (default).')
213 213 paa('--secure',
214 214 action='store_true', dest='Global.secure',
215 215 help='Turn on execution keys.')
216 216 paa('--execkey',
217 217 type=str, dest='Global.exec_key',
218 218 help='path to a file containing an execution key.',
219 219 metavar='keyfile')
220 220 factory.add_session_arguments(self.parser)
221 221 factory.add_registration_arguments(self.parser)
222 222
223 223
224 224 #-----------------------------------------------------------------------------
225 225 # The main application
226 226 #-----------------------------------------------------------------------------
227 227
228 228
229 229 class IPControllerApp(ApplicationWithClusterDir):
230 230
231 231 name = u'ipcontrollerz'
232 232 description = _description
233 233 command_line_loader = IPControllerAppConfigLoader
234 234 default_config_file_name = default_config_file_name
235 235 auto_create_cluster_dir = True
236 236
237 237 def create_default_config(self):
238 238 super(IPControllerApp, self).create_default_config()
239 239 # Note: defaults for Global.secure and Global.reuse_key are set here,
240 240 # and may be overridden by command line options or config files.
241 241 self.default_config.Global.import_statements = []
242 242 self.default_config.Global.clean_logs = True
243 243 self.default_config.Global.secure = False
244 244 self.default_config.Global.reuse_key = False
245 245 self.default_config.Global.exec_key = "exec_key.key"
246 246
247 247 def pre_construct(self):
248 248 super(IPControllerApp, self).pre_construct()
249 249 c = self.master_config
250 250 # The defaults for these are set in FCClientServiceFactory and
251 251 # FCEngineServiceFactory, so we only set them here if the global
252 252 # options have been set to override the class level defaults.
253 253
254 254 # if hasattr(c.Global, 'reuse_furls'):
255 255 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
256 256 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
257 257 # del c.Global.reuse_furls
258 258 # if hasattr(c.Global, 'secure'):
259 259 # c.FCClientServiceFactory.secure = c.Global.secure
260 260 # c.FCEngineServiceFactory.secure = c.Global.secure
261 261 # del c.Global.secure
262 262
263 263 def construct(self):
264 264 # This is the working dir by now.
265 265 sys.path.insert(0, '')
266 266 c = self.master_config
267 267
268 268 self.import_statements()
269 269
270 270 if c.Global.secure:
271 271 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
272 272 if not c.Global.reuse_key or not os.path.exists(keyfile):
273 273 generate_exec_key(keyfile)
274 274 c.SessionFactory.exec_key = keyfile
275 275 else:
276 276 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
277 277 if os.path.exists(keyfile):
278 278 os.remove(keyfile)
279 279 c.SessionFactory.exec_key = ''
280 280
281 281 try:
282 self.factory = ControllerFactory(config=c)
282 self.factory = ControllerFactory(config=c, logname=self.log.name)
283 283 self.start_logging()
284 284 self.factory.construct()
285 285 except:
286 286 self.log.error("Couldn't construct the Controller", exc_info=True)
287 287 self.exit(1)
288 288
289 289 def save_urls(self):
290 290 """save the registration urls to files."""
291 291 c = self.master_config
292 292
293 293 sec_dir = c.Global.security_dir
294 294 cf = self.factory
295 295
296 296 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
297 297 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
298 298
299 299 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
300 300 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
301 301
302 302
303 303 def import_statements(self):
304 304 statements = self.master_config.Global.import_statements
305 305 for s in statements:
306 306 try:
307 307 self.log.info("Executing statement: '%s'" % s)
308 308 exec s in globals(), locals()
309 309 except:
310 310 self.log.error("Error running statement: %s" % s)
311 311
312 312 def start_logging(self):
313 313 super(IPControllerApp, self).start_logging()
314 314 if self.master_config.Global.log_url:
315 315 context = self.factory.context
316 316 lsock = context.socket(zmq.PUB)
317 317 lsock.connect(self.master_config.Global.log_url)
318 318 handler = PUBHandler(lsock)
319 319 handler.root_topic = 'controller'
320 320 handler.setLevel(self.log_level)
321 321 self.log.addHandler(handler)
322 322 #
323 323 def start_app(self):
324 324 # Start the subprocesses:
325 325 self.factory.start()
326 326 self.write_pid_file(overwrite=True)
327 327 try:
328 328 self.factory.loop.start()
329 329 except KeyboardInterrupt:
330 330 self.log.critical("Interrupted, Exiting...\n")
331 331
332 332
333 333 def launch_new_instance():
334 334 """Create and run the IPython controller"""
335 335 app = IPControllerApp()
336 336 app.start()
337 337
338 338
339 339 if __name__ == '__main__':
340 340 launch_new_instance()
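The start_logging method here is the heart of the "rework logging connections" change: instead of writing to the root logger, the controller attaches a zmq PUBHandler to its own named logger, so records are broadcast to an iploggerz process. A minimal sketch of the same wiring for any process, with an illustrative log_url (the real one comes from Global.log_url):

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    def connect_log_publisher(logger, log_url, topic='controller', level=logging.INFO):
        # Connect a PUB socket to wherever the log watcher is bound,
        # and forward this logger's records over it.
        context = zmq.Context.instance()
        lsock = context.socket(zmq.PUB)
        lsock.connect(log_url)      # e.g. 'tcp://127.0.0.1:20202' (illustrative)
        handler = PUBHandler(lsock)
        handler.root_topic = topic  # records go out under '<topic>.<levelname>'
        handler.setLevel(level)
        logger.addHandler(handler)
        return handler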
@@ -1,261 +1,261 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22 from zmq.eventloop import ioloop
23 23
24 24 from IPython.zmq.parallel.clusterdir import (
25 25 ApplicationWithClusterDir,
26 26 ClusterDirConfigLoader
27 27 )
28 28 from IPython.zmq.log import EnginePUBHandler
29 29
30 30 from IPython.zmq.parallel import factory
31 31 from IPython.zmq.parallel.engine import EngineFactory
32 32 from IPython.zmq.parallel.streamkernel import Kernel
33 33 from IPython.utils.importstring import import_item
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Module level variables
37 37 #-----------------------------------------------------------------------------
38 38
39 39 #: The default config file name for this application
40 40 default_config_file_name = u'ipengine_config.py'
41 41
42 42
43 43 mpi4py_init = """from mpi4py import MPI as mpi
44 44 mpi.size = mpi.COMM_WORLD.Get_size()
45 45 mpi.rank = mpi.COMM_WORLD.Get_rank()
46 46 """
47 47
48 48
49 49 pytrilinos_init = """from PyTrilinos import Epetra
50 50 class SimpleStruct:
51 51 pass
52 52 mpi = SimpleStruct()
53 53 mpi.rank = 0
54 54 mpi.size = 0
55 55 """
56 56
57 57
58 58 _description = """Start an IPython engine for parallel computing.\n\n
59 59
60 60 IPython engines run in parallel and perform computations on behalf of a client
61 61 and controller. A controller needs to be started before the engines. The
62 62 engine can be configured using command line options or using a cluster
63 63 directory. Cluster directories contain config, log and security files and are
64 64 usually located in your .ipython directory and named "cluster_<profile>".
65 65 See the --profile and --cluster-dir options for details.
66 66 """
67 67
68 68 #-----------------------------------------------------------------------------
69 69 # Command line options
70 70 #-----------------------------------------------------------------------------
71 71
72 72
73 73 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
74 74
75 75 def _add_arguments(self):
76 76 super(IPEngineAppConfigLoader, self)._add_arguments()
77 77 paa = self.parser.add_argument
78 78 # Controller config
79 79 paa('--url-file',
80 80 type=unicode, dest='Global.url_file',
81 81 help='The full location of the file containing the connection URL of the '
82 82 'controller. If this is not given, the URL file must be in the '
83 83 'security directory of the cluster directory. This location is '
84 84 'resolved using the --profile and --app-dir options.',
85 85 metavar='Global.url_file')
86 86 # MPI
87 87 paa('--mpi',
88 88 type=str, dest='MPI.use',
89 89 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
90 90 metavar='MPI.use')
91 91 # Global config
92 92 paa('--log-to-file',
93 93 action='store_true', dest='Global.log_to_file',
94 94 help='Log to a file in the log directory (default is stdout)')
95 95 paa('--log-url',
96 96 dest='Global.log_url',
97 97 help="url of ZMQ logger, as started with iploggerz")
98 98 paa('--execkey',
99 99 type=str, dest='Global.exec_key',
100 100 help='path to a file containing an execution key.',
101 101 metavar='keyfile')
102 102 paa('--no-secure',
103 103 action='store_false', dest='Global.secure',
104 104 help='Turn off execution keys.')
105 105 paa('--secure',
106 106 action='store_true', dest='Global.secure',
107 107 help='Turn on execution keys (default).')
108 108 # init command
109 109 paa('-c',
110 110 type=str, dest='Global.extra_exec_lines',
111 111 help='specify a command to be run at startup')
112 112
113 113 factory.add_session_arguments(self.parser)
114 114 factory.add_registration_arguments(self.parser)
115 115
116 116
117 117 #-----------------------------------------------------------------------------
118 118 # Main application
119 119 #-----------------------------------------------------------------------------
120 120
121 121
122 122 class IPEngineApp(ApplicationWithClusterDir):
123 123
124 124 name = u'ipenginez'
125 125 description = _description
126 126 command_line_loader = IPEngineAppConfigLoader
127 127 default_config_file_name = default_config_file_name
128 128 auto_create_cluster_dir = True
129 129
130 130 def create_default_config(self):
131 131 super(IPEngineApp, self).create_default_config()
132 132
133 133 # The engine should not clean logs as we don't want to remove the
134 134 # active log files of other running engines.
135 135 self.default_config.Global.clean_logs = False
136 136 self.default_config.Global.secure = True
137 137
138 138 # Global config attributes
139 139 self.default_config.Global.exec_lines = []
140 140 self.default_config.Global.extra_exec_lines = ''
141 141
142 142 # Configuration related to the controller
143 143 # This must match the filename (path not included) that the controller
144 144 # used for the key file.
145 145 self.default_config.Global.url = u'tcp://localhost:10101'
146 146 # If given, this is the actual location of the controller's key file.
147 147 # If not, this is computed using the profile, app_dir and key_file_name
148 148 self.default_config.Global.key_file_name = u'exec_key.key'
149 149 self.default_config.Global.key_file = u''
150 150
151 151 # MPI related config attributes
152 152 self.default_config.MPI.use = ''
153 153 self.default_config.MPI.mpi4py = mpi4py_init
154 154 self.default_config.MPI.pytrilinos = pytrilinos_init
155 155
156 156 def post_load_command_line_config(self):
157 157 pass
158 158
159 159 def pre_construct(self):
160 160 super(IPEngineApp, self).pre_construct()
161 161 # self.find_cont_url_file()
162 162 self.find_key_file()
163 163 if self.master_config.Global.extra_exec_lines:
164 164 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
165 165
166 166 def find_key_file(self):
167 167 """Set the key file.
168 168
169 169 Here we don't try to check whether it exists or is valid, as that
170 170 is handled by the connection logic.
171 171 """
172 172 config = self.master_config
173 173 # Find the actual controller key file
174 174 if not config.Global.key_file:
175 175 try_this = os.path.join(
176 176 config.Global.cluster_dir,
177 177 config.Global.security_dir,
178 178 config.Global.key_file_name
179 179 )
180 180 config.Global.key_file = try_this
181 181
182 182 def construct(self):
183 183 # This is the working dir by now.
184 184 sys.path.insert(0, '')
185 185 config = self.master_config
186 186 if os.path.exists(config.Global.key_file) and config.Global.secure:
187 187 config.SessionFactory.exec_key = config.Global.key_file
188 188
189 189 config.Kernel.exec_lines = config.Global.exec_lines
190 190
191 191 self.start_mpi()
192 192
193 193 # Create the underlying shell class and EngineService
194 194 # shell_class = import_item(self.master_config.Global.shell_class)
195 195 try:
196 self.engine = EngineFactory(config=config)
196 self.engine = EngineFactory(config=config, logname=self.log.name)
197 197 except:
198 198 self.log.error("Couldn't start the Engine", exc_info=True)
199 199 self.exit(1)
200 200
201 201 self.start_logging()
202 202
203 203 # Create the service hierarchy
204 204 # self.main_service = service.MultiService()
205 205 # self.engine_service.setServiceParent(self.main_service)
206 206 # self.tub_service = Tub()
207 207 # self.tub_service.setServiceParent(self.main_service)
208 208 # # This needs to be called before the connection is initiated
209 209 # self.main_service.startService()
210 210
211 211 # This initiates the connection to the controller and calls
212 212 # register_engine to tell the controller we are ready to do work
213 213 # self.engine_connector = EngineConnector(self.tub_service)
214 214
215 215 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
216 216
217 217 # reactor.callWhenRunning(self.call_connect)
218 218
219 219
220 220 def start_logging(self):
221 221 super(IPEngineApp, self).start_logging()
222 222 if self.master_config.Global.log_url:
223 223 context = self.engine.context
224 224 lsock = context.socket(zmq.PUB)
225 225 lsock.connect(self.master_config.Global.log_url)
226 226 handler = EnginePUBHandler(self.engine, lsock)
227 227 handler.setLevel(self.log_level)
228 228 self.log.addHandler(handler)
229 229
230 230 def start_mpi(self):
231 231 global mpi
232 232 mpikey = self.master_config.MPI.use
233 233 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
234 234 if mpi_import_statement is not None:
235 235 try:
236 236 self.log.info("Initializing MPI:")
237 237 self.log.info(mpi_import_statement)
238 238 exec mpi_import_statement in globals()
239 239 except:
240 240 mpi = None
241 241 else:
242 242 mpi = None
243 243
244 244
245 245 def start_app(self):
246 246 self.engine.start()
247 247 try:
248 248 self.engine.loop.start()
249 249 except KeyboardInterrupt:
250 250 self.log.critical("Engine Interrupted, shutting down...\n")
251 251
252 252
253 253 def launch_new_instance():
254 254 """Create and run the IPython controller"""
255 255 app = IPEngineApp()
256 256 app.start()
257 257
258 258
259 259 if __name__ == '__main__':
260 260 launch_new_instance()
261 261
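The start_mpi method shows how the MPI.use key selects one of the init strings defined near the top of the file (mpi4py_init, pytrilinos_init) and execs it at startup, leaving behind an mpi object with .rank and .size, or None on failure. A condensed sketch of that lookup-and-exec pattern, using the strings shown in the hunk above:

    # Sketch only: mpi4py_init / pytrilinos_init are the strings defined above.
    mpi_init_snippets = {
        'mpi4py': mpi4py_init,
        'pytrilinos': pytrilinos_init,
    }

    def init_mpi(mpikey):
        """Return an mpi object with .rank/.size, or None if init fails."""
        statement = mpi_init_snippets.get(mpikey)
        if statement is None:
            return None
        try:
            ns = {}
            exec statement in ns  # binds ns['mpi']
            return ns.get('mpi')
        except Exception:
            return None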
@@ -1,131 +1,132 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 23 from IPython.zmq.parallel.clusterdir import (
24 24 ApplicationWithClusterDir,
25 25 ClusterDirConfigLoader
26 26 )
27 27 from IPython.zmq.parallel.logwatcher import LogWatcher
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Module level variables
31 31 #-----------------------------------------------------------------------------
32 32
33 33 #: The default config file name for this application
34 34 default_config_file_name = u'iplogger_config.py'
35 35
36 36 _description = """Start an IPython logger for parallel computing.\n\n
37 37
38 38 IPython controllers and engines (and your own processes) can broadcast log messages
39 39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 40 logger can be configured using command line options or using a cluster
41 41 directory. Cluster directories contain config, log and security files and are
42 42 usually located in your .ipython directory and named "cluster_<profile>".
43 43 See the --profile and --cluster-dir options for details.
44 44 """
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Command line options
48 48 #-----------------------------------------------------------------------------
49 49
50 50
51 51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52 52
53 53 def _add_arguments(self):
54 54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 55 paa = self.parser.add_argument
56 56 # LogWatcher config
57 57 paa('--url',
58 58 type=str, dest='LogWatcher.url',
59 59 help='The url the LogWatcher will listen on',
60 60 )
61 61 # Topics
62 62 paa('--topics',
63 63 type=str, dest='LogWatcher.topics', nargs='+',
64 64 help='What topics to subscribe to',
65 65 metavar='topics')
66 66 # Global config
67 67 paa('--log-to-file',
68 68 action='store_true', dest='Global.log_to_file',
69 69 help='Log to a file in the log directory (default is stdout)')
70 70
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Main application
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 class IPLoggerApp(ApplicationWithClusterDir):
78 78
79 79 name = u'iploggerz'
80 80 description = _description
81 81 command_line_loader = IPLoggerAppConfigLoader
82 82 default_config_file_name = default_config_file_name
83 83 auto_create_cluster_dir = True
84 84
85 85 def create_default_config(self):
86 86 super(IPLoggerApp, self).create_default_config()
87 87
88 88 # The logger should not clean logs as we don't want to remove the
89 89 # active log files of other running processes.
90 90 self.default_config.Global.clean_logs = False
91 91
92 92 # If given, this is the actual location of the logger's URL file.
93 93 # If not, this is computed using the profile, app_dir and url_file_name
94 94 self.default_config.Global.url_file_name = u'iplogger.url'
95 95 self.default_config.Global.url_file = u''
96 96
97 97 def post_load_command_line_config(self):
98 98 pass
99 99
100 100 def pre_construct(self):
101 101 super(IPLoggerApp, self).pre_construct()
102 102
103 103 def construct(self):
104 104 # This is the working dir by now.
105 105 sys.path.insert(0, '')
106 106
107 107 self.start_logging()
108 108
109 109 try:
110 self.watcher = LogWatcher(config=self.master_config)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 111 except:
112 112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 113 self.exit(1)
114 114
115 115
116 116 def start_app(self):
117 117 try:
118 self.watcher.start()
118 119 self.watcher.loop.start()
119 120 except KeyboardInterrupt:
120 121 self.log.critical("Logging Interrupted, shutting down...\n")
121 122
122 123
123 124 def launch_new_instance():
124 125 """Create and run the IPython LogWatcher"""
125 126 app = IPLoggerApp()
126 127 app.start()
127 128
128 129
129 130 if __name__ == '__main__':
130 131 launch_new_instance()
131 132
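On the receiving side, the LogWatcher (constructed above but defined in logwatcher.py) is essentially a SUB socket bound at LogWatcher.url that collects whatever the PUBHandlers publish. A rough sketch of the idea, assuming PUBHandler's two-frame (topic, message) wire format with the level name as the trailing topic segment; the URL default is illustrative:

    import logging
    import zmq

    def watch_logs(url='tcp://127.0.0.1:20202', topics=('',)):
        # Bind a SUB socket; controllers/engines connect() their PUB sockets here.
        context = zmq.Context.instance()
        sock = context.socket(zmq.SUB)
        sock.bind(url)
        for topic in topics:
            sock.setsockopt(zmq.SUBSCRIBE, topic)  # '' subscribes to everything
        while True:
            parts = sock.recv_multipart()
            topic, message = parts[0], parts[-1]
            # Topics look like 'controller.INFO' or 'engine.1.ERROR';
            # map the trailing segment back onto a logging level.
            level = getattr(logging, topic.rsplit('.', 1)[-1], logging.INFO)
            logging.log(level, "%s | %s", topic, message.rstrip())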
@@ -1,824 +1,825 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21 import logging
22 22
23 23 from signal import SIGINT, SIGTERM
24 24 try:
25 25 from signal import SIGKILL
26 26 except ImportError:
27 27 SIGKILL=SIGTERM
28 28
29 29 from subprocess import Popen, PIPE
30 30
31 31 from zmq.eventloop import ioloop
32 32
33 from IPython.config.configurable import Configurable
33 # from IPython.config.configurable import Configurable
34 34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 35 from IPython.utils.path import get_ipython_module_path
36 36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37 37
38 from factory import LoggingFactory
38 39 # from IPython.kernel.winhpcjob import (
39 40 # IPControllerTask, IPEngineTask,
40 41 # IPControllerJob, IPEngineSetJob
41 42 # )
42 43
43 44
44 45 #-----------------------------------------------------------------------------
45 46 # Paths to the kernel apps
46 47 #-----------------------------------------------------------------------------
47 48
48 49
49 50 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
50 51 'IPython.zmq.parallel.ipclusterapp'
51 52 ))
52 53
53 54 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
54 55 'IPython.zmq.parallel.ipengineapp'
55 56 ))
56 57
57 58 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
58 59 'IPython.zmq.parallel.ipcontrollerapp'
59 60 ))
60 61
61 62 #-----------------------------------------------------------------------------
62 63 # Base launchers and errors
63 64 #-----------------------------------------------------------------------------
64 65
65 66
66 67 class LauncherError(Exception):
67 68 pass
68 69
69 70
70 71 class ProcessStateError(LauncherError):
71 72 pass
72 73
73 74
74 75 class UnknownStatus(LauncherError):
75 76 pass
76 77
77 78
78 class BaseLauncher(Configurable):
79 class BaseLauncher(LoggingFactory):
79 80 """An asbtraction for starting, stopping and signaling a process."""
80 81
81 82 # In all of the launchers, the work_dir is where child processes will be
82 83 # run. This will usually be the cluster_dir, but may not be. Any work_dir
83 84 # passed into the __init__ method will override the config value.
84 85 # This should not be used to set the work_dir for the actual engine
85 86 # and controller. Instead, use their own config files or the
86 87 # controller_args, engine_args attributes of the launchers to add
87 88 # the --work-dir option.
88 89 work_dir = Unicode(u'.')
89 90 loop = Instance('zmq.eventloop.ioloop.IOLoop')
90 91 def _loop_default(self):
91 92 return ioloop.IOLoop.instance()
92 93
93 def __init__(self, work_dir=u'.', config=None):
94 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config)
94 def __init__(self, work_dir=u'.', config=None, **kwargs):
95 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
95 96 self.state = 'before' # can be before, running, after
96 97 self.stop_callbacks = []
97 98 self.start_data = None
98 99 self.stop_data = None
99 100
100 101 @property
101 102 def args(self):
102 103 """A list of cmd and args that will be used to start the process.
103 104
104 105 This is what is passed to :func:`spawnProcess` and the first element
105 106 will be the process name.
106 107 """
107 108 return self.find_args()
108 109
109 110 def find_args(self):
110 111 """The ``.args`` property calls this to find the args list.
111 112
112 113 Subclasses should implement this to construct the cmd and args.
113 114 """
114 115 raise NotImplementedError('find_args must be implemented in a subclass')
115 116
116 117 @property
117 118 def arg_str(self):
118 119 """The string form of the program arguments."""
119 120 return ' '.join(self.args)
120 121
121 122 @property
122 123 def running(self):
123 124 """Am I running."""
124 125 if self.state == 'running':
125 126 return True
126 127 else:
127 128 return False
128 129
129 130 def start(self):
130 131 """Start the process.
131 132
132 133 This must return information about the
133 134 process starting (like a pid, job id, etc.).
134 135 """
135 136 raise NotImplementedError('start must be implemented in a subclass')
136 137
137 138 def stop(self):
138 139 """Stop the process and notify observers of stopping.
139 140
140 141 This must return information about the
141 142 process stopping, like errors that occur while the process is
142 143 being shut down. This call won't wait for the process
143 144 to actually stop. To observe the actual process stopping, see
144 145 :func:`on_stop`.
145 146 """
146 147 raise NotImplementedError('stop must be implemented in a subclass')
147 148
148 149 def on_stop(self, f):
149 150 """Get a deferred that will fire when the process stops.
150 151
151 152 The deferred will fire with data that contains information about
152 153 the exit status of the process.
153 154 """
154 155 if self.state=='after':
155 156 return f(self.stop_data)
156 157 else:
157 158 self.stop_callbacks.append(f)
158 159
159 160 def notify_start(self, data):
160 161 """Call this to trigger startup actions.
161 162
162 163 This logs the process startup and sets the state to 'running'. It is
163 164 a pass-through so it can be used as a callback.
164 165 """
165 166
166 logging.info('Process %r started: %r' % (self.args[0], data))
167 self.log.info('Process %r started: %r' % (self.args[0], data))
167 168 self.start_data = data
168 169 self.state = 'running'
169 170 return data
170 171
171 172 def notify_stop(self, data):
172 173 """Call this to trigger process stop actions.
173 174
174 175 This logs the process stopping and sets the state to 'after'. Call
175 176 this to trigger all the callbacks registered with :func:`on_stop`."""
176 177
177 logging.info('Process %r stopped: %r' % (self.args[0], data))
178 self.log.info('Process %r stopped: %r' % (self.args[0], data))
178 179 self.stop_data = data
179 180 self.state = 'after'
180 181 for i in range(len(self.stop_callbacks)):
181 182 d = self.stop_callbacks.pop()
182 183 d(data)
183 184 return data
184 185
185 186 def signal(self, sig):
186 187 """Signal the process.
187 188
188 189 There is no meaningful return value after signaling the process.
189 190
190 191 Parameters
191 192 ----------
192 193 sig : str or int
193 194 'KILL', 'INT', etc., or any signal number
194 195 """
195 196 raise NotImplementedError('signal must be implemented in a subclass')
196 197
197 198
198 199 #-----------------------------------------------------------------------------
199 200 # Local process launchers
200 201 #-----------------------------------------------------------------------------
201 202
202 203
203 204 class LocalProcessLauncher(BaseLauncher):
204 205 """Start and stop an external process in an asynchronous manner.
205 206
206 207 This will launch the external process with a working directory of
207 208 ``self.work_dir``.
208 209 """
209 210
210 211 # This is used to construct self.args, which is passed to
211 212 # Popen.
212 213 cmd_and_args = List([])
213 214 poll_frequency = Int(100) # in ms
214 215
215 def __init__(self, work_dir=u'.', config=None):
216 def __init__(self, work_dir=u'.', config=None, **kwargs):
216 217 super(LocalProcessLauncher, self).__init__(
217 work_dir=work_dir, config=config
218 work_dir=work_dir, config=config, **kwargs
218 219 )
219 220 self.process = None
220 221 self.start_deferred = None
221 222 self.poller = None
222 223
223 224 def find_args(self):
224 225 return self.cmd_and_args
225 226
226 227 def start(self):
227 228 if self.state == 'before':
228 229 self.process = Popen(self.args,
229 230 stdout=PIPE,stderr=PIPE,stdin=PIPE,
230 231 env=os.environ,
231 232 cwd=self.work_dir
232 233 )
233 234
234 235 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
235 236 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
236 237 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
237 238 self.poller.start()
238 239 self.notify_start(self.process.pid)
239 240 else:
240 241 s = 'The process was already started and has state: %r' % self.state
241 242 raise ProcessStateError(s)
242 243
243 244 def stop(self):
244 245 return self.interrupt_then_kill()
245 246
246 247 def signal(self, sig):
247 248 if self.state == 'running':
248 249 self.process.send_signal(sig)
249 250
250 251 def interrupt_then_kill(self, delay=2.0):
251 252 """Send INT, wait a delay and then send KILL."""
252 253 self.signal(SIGINT)
253 254 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
254 255 self.killer.start()
255 256
256 257 # callbacks, etc:
257 258
258 259 def handle_stdout(self, fd, events):
259 260 line = self.process.stdout.readline()
260 261 # a stopped process will be readable but return empty strings
261 262 if line:
262 logging.info(line[:-1])
263 self.log.info(line[:-1])
263 264 else:
264 265 self.poll()
265 266
266 267 def handle_stderr(self, fd, events):
267 268 line = self.process.stderr.readline()
268 269 # a stopped process will be readable but return empty strings
269 270 if line:
270 logging.error(line[:-1])
271 self.log.error(line[:-1])
271 272 else:
272 273 self.poll()
273 274
274 275 def poll(self):
275 276 status = self.process.poll()
276 277 if status is not None:
277 278 self.poller.stop()
278 279 self.loop.remove_handler(self.process.stdout.fileno())
279 280 self.loop.remove_handler(self.process.stderr.fileno())
280 281 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
281 282 return status
282 283
283 284 class LocalControllerLauncher(LocalProcessLauncher):
284 285 """Launch a controller as a regular external process."""
285 286
286 287 controller_cmd = List(ipcontroller_cmd_argv, config=True)
287 288 # Command line arguments to ipcontroller.
288 289 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
289 290
290 291 def find_args(self):
291 292 return self.controller_cmd + self.controller_args
292 293
293 294 def start(self, cluster_dir):
294 295 """Start the controller by cluster_dir."""
295 296 self.controller_args.extend(['--cluster-dir', cluster_dir])
296 297 self.cluster_dir = unicode(cluster_dir)
297 logging.info("Starting LocalControllerLauncher: %r" % self.args)
298 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
298 299 return super(LocalControllerLauncher, self).start()
299 300
300 301
301 302 class LocalEngineLauncher(LocalProcessLauncher):
302 303 """Launch a single engine as a regular externall process."""
303 304
304 305 engine_cmd = List(ipengine_cmd_argv, config=True)
305 306 # Command line arguments for ipengine.
306 307 engine_args = List(
307 308 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
308 309 )
309 310
310 311 def find_args(self):
311 312 return self.engine_cmd + self.engine_args
312 313
313 314 def start(self, cluster_dir):
314 315 """Start the engine by cluster_dir."""
315 316 self.engine_args.extend(['--cluster-dir', cluster_dir])
316 317 self.cluster_dir = unicode(cluster_dir)
317 318 return super(LocalEngineLauncher, self).start()
318 319
319 320
320 321 class LocalEngineSetLauncher(BaseLauncher):
321 322 """Launch a set of engines as regular external processes."""
322 323
323 324 # Command line arguments for ipengine.
324 325 engine_args = List(
325 326 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
326 327 )
327 328 # launcher class
328 329 launcher_class = LocalEngineLauncher
329 330
330 def __init__(self, work_dir=u'.', config=None):
331 def __init__(self, work_dir=u'.', config=None, **kwargs):
331 332 super(LocalEngineSetLauncher, self).__init__(
332 work_dir=work_dir, config=config
333 work_dir=work_dir, config=config, **kwargs
333 334 )
334 335 self.launchers = {}
335 336 self.stop_data = {}
336 337
337 338 def start(self, n, cluster_dir):
338 339 """Start n engines by profile or cluster_dir."""
339 340 self.cluster_dir = unicode(cluster_dir)
340 341 dlist = []
341 342 for i in range(n):
342 el = self.launcher_class(work_dir=self.work_dir, config=self.config)
343 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
343 344 # Copy the engine args over to each engine launcher.
344 345 import copy
345 346 el.engine_args = copy.deepcopy(self.engine_args)
346 347 el.on_stop(self._notice_engine_stopped)
347 348 d = el.start(cluster_dir)
348 349 if i==0:
349 logging.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 351 self.launchers[i] = el
351 352 dlist.append(d)
352 353 self.notify_start(dlist)
353 354 # The consumeErrors here could be dangerous
354 355 # dfinal = gatherBoth(dlist, consumeErrors=True)
355 356 # dfinal.addCallback(self.notify_start)
356 357 return dlist
357 358
358 359 def find_args(self):
359 360 return ['engine set']
360 361
361 362 def signal(self, sig):
362 363 dlist = []
363 364 for el in self.launchers.itervalues():
364 365 d = el.signal(sig)
365 366 dlist.append(d)
366 367 # dfinal = gatherBoth(dlist, consumeErrors=True)
367 368 return dlist
368 369
369 370 def interrupt_then_kill(self, delay=1.0):
370 371 dlist = []
371 372 for el in self.launchers.itervalues():
372 373 d = el.interrupt_then_kill(delay)
373 374 dlist.append(d)
374 375 # dfinal = gatherBoth(dlist, consumeErrors=True)
375 376 return dlist
376 377
377 378 def stop(self):
378 379 return self.interrupt_then_kill()
379 380
380 381 def _notice_engine_stopped(self, data):
381 382 print "notice", data
382 383 pid = data['pid']
383 384 for idx,el in self.launchers.iteritems():
384 385 if el.process.pid == pid:
385 386 break
386 387 self.launchers.pop(idx)
387 388 self.stop_data[idx] = data
388 389 if not self.launchers:
389 390 self.notify_stop(self.stop_data)
390 391
391 392
392 393 #-----------------------------------------------------------------------------
393 394 # MPIExec launchers
394 395 #-----------------------------------------------------------------------------
395 396
396 397
397 398 class MPIExecLauncher(LocalProcessLauncher):
398 399 """Launch an external process using mpiexec."""
399 400
400 401 # The mpiexec command to use in starting the process.
401 402 mpi_cmd = List(['mpiexec'], config=True)
402 403 # The command line arguments to pass to mpiexec.
403 404 mpi_args = List([], config=True)
404 405 # The program to start using mpiexec.
405 406 program = List(['date'], config=True)
406 407 # The command line arguments to the program.
407 408 program_args = List([], config=True)
408 409 # The number of instances of the program to start.
409 410 n = Int(1, config=True)
410 411
411 412 def find_args(self):
412 413 """Build self.args using all the fields."""
413 414 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
414 415 self.program + self.program_args
415 416
416 417 def start(self, n):
417 418 """Start n instances of the program using mpiexec."""
418 419 self.n = n
419 420 return super(MPIExecLauncher, self).start()
420 421
421 422
422 423 class MPIExecControllerLauncher(MPIExecLauncher):
423 424 """Launch a controller using mpiexec."""
424 425
425 426 controller_cmd = List(ipcontroller_cmd_argv, config=True)
426 427 # Command line arguments to ipcontroller.
427 428 controller_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
428 429 n = Int(1, config=False)
429 430
430 431 def start(self, cluster_dir):
431 432 """Start the controller by cluster_dir."""
432 433 self.controller_args.extend(['--cluster-dir', cluster_dir])
433 434 self.cluster_dir = unicode(cluster_dir)
434 logging.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 436 return super(MPIExecControllerLauncher, self).start(1)
436 437
437 438 def find_args(self):
438 439 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
439 440 self.controller_cmd + self.controller_args
440 441
441 442
442 443 class MPIExecEngineSetLauncher(MPIExecLauncher):
443 444
444 445 engine_cmd = List(ipengine_cmd_argv, config=True)
445 446 # Command line arguments for ipengine.
446 447 engine_args = List(
447 448 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
448 449 )
449 450 n = Int(1, config=True)
450 451
451 452 def start(self, n, cluster_dir):
452 453 """Start n engines by profile or cluster_dir."""
453 454 self.engine_args.extend(['--cluster-dir', cluster_dir])
454 455 self.cluster_dir = unicode(cluster_dir)
455 456 self.n = n
456 logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 458 return super(MPIExecEngineSetLauncher, self).start(n)
458 459
459 460 def find_args(self):
460 461 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
461 462 self.engine_cmd + self.engine_args
462 463
463 464
464 465 #-----------------------------------------------------------------------------
465 466 # SSH launchers
466 467 #-----------------------------------------------------------------------------
467 468
468 469 # TODO: Get SSH Launcher working again.
469 470
470 471 class SSHLauncher(LocalProcessLauncher):
471 472 """A minimal launcher for ssh.
472 473
473 474 To be useful this will probably have to be extended to use the ``sshx``
474 475 idea for environment variables. There could be other things this needs
475 476 as well.
476 477 """
477 478
478 479 ssh_cmd = List(['ssh'], config=True)
479 480 ssh_args = List([], config=True)
480 481 program = List(['date'], config=True)
481 482 program_args = List([], config=True)
482 483 hostname = Str('', config=True)
483 484 user = Str(os.environ.get('USER','username'), config=True)
484 485 location = Str('')
485 486
486 487 def _hostname_changed(self, name, old, new):
487 488 self.location = '%s@%s' % (self.user, new)
488 489
489 490 def _user_changed(self, name, old, new):
490 491 self.location = '%s@%s' % (new, self.hostname)
491 492
492 493 def find_args(self):
493 494 return self.ssh_cmd + self.ssh_args + [self.location] + \
494 495 self.program + self.program_args
495 496
496 497 def start(self, cluster_dir, hostname=None, user=None):
497 498 if hostname is not None:
498 499 self.hostname = hostname
499 500 if user is not None:
500 501 self.user = user
501 502 return super(SSHLauncher, self).start()
502 503
503 504
504 505 class SSHControllerLauncher(SSHLauncher):
505 506
506 507 program = List(ipcontroller_cmd_argv, config=True)
507 508 # Command line arguments to ipcontroller.
508 509 program_args = List(['--log-to-file','--log-level', str(logging.ERROR)], config=True)
509 510
510 511
511 512 class SSHEngineLauncher(SSHLauncher):
512 513 program = List(ipengine_cmd_argv, config=True)
513 514 # Command line arguments for ipengine.
514 515 program_args = List(
515 516 ['--log-to-file','--log-level', str(logging.ERROR)], config=True
516 517 )
517 518
518 519 class SSHEngineSetLauncher(LocalEngineSetLauncher):
519 520 launcher_class = SSHEngineLauncher
520 521
521 522
522 523 #-----------------------------------------------------------------------------
523 524 # Windows HPC Server 2008 scheduler launchers
524 525 #-----------------------------------------------------------------------------
525 526
526 527
527 528 # # This is only used on Windows.
528 529 # def find_job_cmd():
529 530 # if os.name=='nt':
530 531 # try:
531 532 # return find_cmd('job')
532 533 # except FindCmdError:
533 534 # return 'job'
534 535 # else:
535 536 # return 'job'
536 537 #
537 538 #
538 539 # class WindowsHPCLauncher(BaseLauncher):
539 540 #
540 541 # # A regular expression used to get the job id from the output of the
541 542 # # submit_command.
542 543 # job_id_regexp = Str(r'\d+', config=True)
543 544 # # The filename of the instantiated job script.
544 545 # job_file_name = Unicode(u'ipython_job.xml', config=True)
545 546 # # The full path to the instantiated job script. This gets made dynamically
546 547 # # by combining the work_dir with the job_file_name.
547 548 # job_file = Unicode(u'')
548 549 # # The hostname of the scheduler to submit the job to
549 550 # scheduler = Str('', config=True)
550 551 # job_cmd = Str(find_job_cmd(), config=True)
551 552 #
552 553 # def __init__(self, work_dir=u'.', config=None):
553 554 # super(WindowsHPCLauncher, self).__init__(
554 555 # work_dir=work_dir, config=config
555 556 # )
556 557 #
557 558 # @property
558 559 # def job_file(self):
559 560 # return os.path.join(self.work_dir, self.job_file_name)
560 561 #
561 562 # def write_job_file(self, n):
562 563 # raise NotImplementedError("Implement write_job_file in a subclass.")
563 564 #
564 565 # def find_args(self):
565 566 # return ['job.exe']
566 567 #
567 568 # def parse_job_id(self, output):
568 569 # """Take the output of the submit command and return the job id."""
569 570 # m = re.search(self.job_id_regexp, output)
570 571 # if m is not None:
571 572 # job_id = m.group()
572 573 # else:
573 574 # raise LauncherError("Job id couldn't be determined: %s" % output)
574 575 # self.job_id = job_id
575 # logging.info('Job started with job id: %r' % job_id)
576 # self.log.info('Job started with job id: %r' % job_id)
576 577 # return job_id
577 578 #
578 579 # @inlineCallbacks
579 580 # def start(self, n):
580 581 # """Start n copies of the process using the Win HPC job scheduler."""
581 582 # self.write_job_file(n)
582 583 # args = [
583 584 # 'submit',
584 585 # '/jobfile:%s' % self.job_file,
585 586 # '/scheduler:%s' % self.scheduler
586 587 # ]
587 # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 589 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 590 # output = yield getProcessOutput(str(self.job_cmd),
590 591 # [str(a) for a in args],
591 592 # env=dict((str(k),str(v)) for k,v in os.environ.items()),
592 593 # path=self.work_dir
593 594 # )
594 595 # job_id = self.parse_job_id(output)
595 596 # self.notify_start(job_id)
596 597 # defer.returnValue(job_id)
597 598 #
598 599 # @inlineCallbacks
599 600 # def stop(self):
600 601 # args = [
601 602 # 'cancel',
602 603 # self.job_id,
603 604 # '/scheduler:%s' % self.scheduler
604 605 # ]
605 # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 607 # try:
607 608 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
608 609 # output = yield getProcessOutput(str(self.job_cmd),
609 610 # [str(a) for a in args],
610 611 # env=dict((str(k),str(v)) for k,v in os.environ.iteritems()),
611 612 # path=self.work_dir
612 613 # )
613 614 # except:
614 615 # output = 'The job already appears to be stoppped: %r' % self.job_id
615 616 # self.notify_stop(output) # Pass the output of the kill cmd
616 617 # defer.returnValue(output)
617 618 #
618 619 #
619 620 # class WindowsHPCControllerLauncher(WindowsHPCLauncher):
620 621 #
621 622 # job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
622 623 # extra_args = List([], config=False)
623 624 #
624 625 # def write_job_file(self, n):
625 626 # job = IPControllerJob(config=self.config)
626 627 #
627 628 # t = IPControllerTask(config=self.config)
628 629 # # The tasks work directory is *not* the actual work directory of
629 630 # # the controller. It is used as the base path for the stdout/stderr
630 631 # # files that the scheduler redirects to.
631 632 # t.work_directory = self.cluster_dir
632 633 # # Add the --cluster-dir and from self.start().
633 634 # t.controller_args.extend(self.extra_args)
634 635 # job.add_task(t)
635 636 #
636 # logging.info("Writing job description file: %s" % self.job_file)
637 # self.log.info("Writing job description file: %s" % self.job_file)
637 638 # job.write(self.job_file)
638 639 #
639 640 # @property
640 641 # def job_file(self):
641 642 # return os.path.join(self.cluster_dir, self.job_file_name)
642 643 #
643 644 # def start(self, cluster_dir):
644 645 # """Start the controller by cluster_dir."""
645 646 # self.extra_args = ['--cluster-dir', cluster_dir]
646 647 # self.cluster_dir = unicode(cluster_dir)
647 648 # return super(WindowsHPCControllerLauncher, self).start(1)
648 649 #
649 650 #
650 651 # class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
651 652 #
652 653 # job_file_name = Unicode(u'ipengineset_job.xml', config=True)
653 654 # extra_args = List([], config=False)
654 655 #
655 656 # def write_job_file(self, n):
656 657 # job = IPEngineSetJob(config=self.config)
657 658 #
658 659 # for i in range(n):
659 660 # t = IPEngineTask(config=self.config)
660 661 # # The tasks work directory is *not* the actual work directory of
661 662 # # the engine. It is used as the base path for the stdout/stderr
662 663 # # files that the scheduler redirects to.
663 664 # t.work_directory = self.cluster_dir
664 665 # # Add the --cluster-dir and from self.start().
665 666 # t.engine_args.extend(self.extra_args)
666 667 # job.add_task(t)
667 668 #
668 # logging.info("Writing job description file: %s" % self.job_file)
669 # self.log.info("Writing job description file: %s" % self.job_file)
669 670 # job.write(self.job_file)
670 671 #
671 672 # @property
672 673 # def job_file(self):
673 674 # return os.path.join(self.cluster_dir, self.job_file_name)
674 675 #
675 676 # def start(self, n, cluster_dir):
676 677 # """Start the controller by cluster_dir."""
677 678 # self.extra_args = ['--cluster-dir', cluster_dir]
678 679 # self.cluster_dir = unicode(cluster_dir)
679 680 # return super(WindowsHPCEngineSetLauncher, self).start(n)
680 681 #
681 682 #
682 683 # #-----------------------------------------------------------------------------
683 684 # # Batch (PBS) system launchers
684 685 # #-----------------------------------------------------------------------------
685 686 #
686 687 # # TODO: Get PBS launcher working again.
687 688 #
688 689 # class BatchSystemLauncher(BaseLauncher):
689 690 # """Launch an external process using a batch system.
690 691 #
691 692 # This class is designed to work with UNIX batch systems like PBS, LSF,
692 693 # GridEngine, etc. The overall model is that there are different commands
693 694 # like qsub, qdel, etc. that handle the starting and stopping of the process.
694 695 #
695 696 # This class also has the notion of a batch script. The ``batch_template``
696 697 # attribute can be set to a string that is a template for the batch script.
697 698 # This template is instantiated using Itpl. Thus the template can use
698 699 # ${n} for the number of instances. Subclasses can add additional variables
699 700 # to the template dict.
700 701 # """
701 702 #
702 703 # # Subclasses must fill these in. See PBSEngineSet
703 704 # # The name of the command line program used to submit jobs.
704 705 # submit_command = Str('', config=True)
705 706 # # The name of the command line program used to delete jobs.
706 707 # delete_command = Str('', config=True)
707 708 # # A regular expression used to get the job id from the output of the
708 709 # # submit_command.
709 710 # job_id_regexp = Str('', config=True)
710 711 # # The string that is the batch script template itself.
711 712 # batch_template = Str('', config=True)
712 713 # # The filename of the instantiated batch script.
713 714 # batch_file_name = Unicode(u'batch_script', config=True)
714 715 # # The full path to the instantiated batch script.
715 716 # batch_file = Unicode(u'')
716 717 #
717 718 # def __init__(self, work_dir=u'.', config=None):
718 719 # super(BatchSystemLauncher, self).__init__(
719 720 # work_dir=work_dir, config=config
720 721 # )
721 722 # self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
722 723 # self.context = {}
723 724 #
724 725 # def parse_job_id(self, output):
725 726 # """Take the output of the submit command and return the job id."""
726 727 # m = re.match(self.job_id_regexp, output)
727 728 # if m is not None:
728 729 # job_id = m.group()
729 730 # else:
730 731 # raise LauncherError("Job id couldn't be determined: %s" % output)
731 732 # self.job_id = job_id
732 # logging.info('Job started with job id: %r' % job_id)
733 # self.log.info('Job started with job id: %r' % job_id)
733 734 # return job_id
734 735 #
735 736 # def write_batch_script(self, n):
736 737 # """Instantiate and write the batch script to the work_dir."""
737 738 # self.context['n'] = n
738 739 # script_as_string = Itpl.itplns(self.batch_template, self.context)
739 # logging.info('Writing instantiated batch script: %s' % self.batch_file)
740 # self.log.info('Writing instantiated batch script: %s' % self.batch_file)
740 741 # f = open(self.batch_file, 'w')
741 742 # f.write(script_as_string)
742 743 # f.close()
743 744 #
744 745 # @inlineCallbacks
745 746 # def start(self, n):
746 747 # """Start n copies of the process using a batch system."""
747 748 # self.write_batch_script(n)
748 749 # output = yield getProcessOutput(self.submit_command,
749 750 # [self.batch_file], env=os.environ)
750 751 # job_id = self.parse_job_id(output)
751 752 # self.notify_start(job_id)
752 753 # defer.returnValue(job_id)
753 754 #
754 755 # @inlineCallbacks
755 756 # def stop(self):
756 757 # output = yield getProcessOutput(self.delete_command,
757 758 # [self.job_id], env=os.environ
758 759 # )
759 760 # self.notify_stop(output) # Pass the output of the kill cmd
760 761 # defer.returnValue(output)
761 762 #
762 763 #
763 764 # class PBSLauncher(BatchSystemLauncher):
764 765 # """A BatchSystemLauncher subclass for PBS."""
765 766 #
766 767 # submit_command = Str('qsub', config=True)
767 768 # delete_command = Str('qdel', config=True)
768 769 # job_id_regexp = Str(r'\d+', config=True)
769 770 # batch_template = Str('', config=True)
770 771 # batch_file_name = Unicode(u'pbs_batch_script', config=True)
771 772 # batch_file = Unicode(u'')
772 773 #
773 774 #
774 775 # class PBSControllerLauncher(PBSLauncher):
775 776 # """Launch a controller using PBS."""
776 777 #
777 778 # batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
778 779 #
779 780 # def start(self, cluster_dir):
780 781 # """Start the controller by profile or cluster_dir."""
781 782 # # Here we save profile and cluster_dir in the context so they
782 783 # # can be used in the batch script template as ${profile} and
783 784 # # ${cluster_dir}
784 785 # self.context['cluster_dir'] = cluster_dir
785 786 # self.cluster_dir = unicode(cluster_dir)
786 # logging.info("Starting PBSControllerLauncher: %r" % self.args)
787 # self.log.info("Starting PBSControllerLauncher: %r" % self.args)
787 788 # return super(PBSControllerLauncher, self).start(1)
788 789 #
789 790 #
790 791 # class PBSEngineSetLauncher(PBSLauncher):
791 792 #
792 793 # batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
793 794 #
794 795 # def start(self, n, cluster_dir):
795 796 # """Start n engines by profile or cluster_dir."""
796 797 # self.program_args.extend(['--cluster-dir', cluster_dir])
797 798 # self.cluster_dir = unicode(cluster_dir)
798 # logging.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 800 # return super(PBSEngineSetLauncher, self).start(n)
800 801
801 802
802 803 #-----------------------------------------------------------------------------
803 804 # A launcher for ipcluster itself!
804 805 #-----------------------------------------------------------------------------
805 806
806 807
807 808 class IPClusterLauncher(LocalProcessLauncher):
808 809 """Launch the ipcluster program in an external process."""
809 810
810 811 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
811 812 # Command line arguments to pass to ipcluster.
812 813 ipcluster_args = List(
813 814 ['--clean-logs', '--log-to-file', '--log-level', str(logging.ERROR)], config=True)
814 815 ipcluster_subcommand = Str('start')
815 816 ipcluster_n = Int(2)
816 817
817 818 def find_args(self):
818 819 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
819 820 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
820 821
821 822 def start(self):
822 logging.info("Starting ipcluster: %r" % self.args)
823 self.log.info("Starting ipcluster: %r" % self.args)
823 824 return super(IPClusterLauncher, self).start()
824 825
@@ -1,92 +1,97 b''
1 1 #!/usr/bin/env python
2 2 """A simple logger object that consolidates messages incoming from ipclusterz processes."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15
16 16 import sys
17 17 import logging
18 18
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 from IPython.config.configurable import Configurable
22 21 from IPython.utils.traitlets import Int, Str, Instance, List
23 22
23 from factory import LoggingFactory
24
24 25 #-----------------------------------------------------------------------------
25 26 # Classes
26 27 #-----------------------------------------------------------------------------
27 28
28 29
29 class LogWatcher(Configurable):
30 class LogWatcher(LoggingFactory):
30 31 """A simple class that receives messages on a SUB socket, as published
31 32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32 33
33 34 This can subscribe to multiple topics, but defaults to all topics.
34 35 """
35 36 # configurables
36 37 topics = List([''], config=True)
37 38 url = Str('tcp://127.0.0.1:20202', config=True)
38 39
39 40 # internals
40 41 context = Instance(zmq.Context, (), {})
41 42 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
42 43 loop = Instance('zmq.eventloop.ioloop.IOLoop')
43 44 def _loop_default(self):
44 45 return ioloop.IOLoop.instance()
45 46
46 def __init__(self, config=None):
47 super(LogWatcher, self).__init__(config=config)
47 def __init__(self, **kwargs):
48 super(LogWatcher, self).__init__(**kwargs)
48 49 s = self.context.socket(zmq.SUB)
49 50 s.bind(self.url)
50 51 self.stream = zmqstream.ZMQStream(s, self.loop)
51 52 self.subscribe()
52 53 self.on_trait_change(self.subscribe, 'topics')
53
54
55 def start(self):
54 56 self.stream.on_recv(self.log_message)
55 57
58 def stop(self):
59 self.stream.stop_on_recv()
60
56 61 def subscribe(self):
57 62 """Update our SUB socket's subscriptions."""
58 63 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
59 64 for topic in self.topics:
60 logging.debug("Subscribing to: %r"%topic)
65 self.log.debug("Subscribing to: %r"%topic)
61 66 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
62 67
63 68 def _extract_level(self, topic_str):
64 69 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
65 70 topics = topic_str.split('.')
66 71 for idx,t in enumerate(topics):
67 72 level = getattr(logging, t, None)
68 73 if level is not None:
69 74 break
70 75
71 76 if level is None:
72 77 level = logging.INFO
73 78 else:
74 79 topics.pop(idx)
75 80
76 81 return level, '.'.join(topics)
77 82
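A standalone check of the topic parsing above (a copy of the method body, for illustration only):

    import logging

    def extract_level(topic_str):
        topics = topic_str.split('.')
        level = None
        for idx, t in enumerate(topics):
            level = getattr(logging, t, None)
            if level is not None:
                break
        if level is None:
            level = logging.INFO
        else:
            topics.pop(idx)
        return level, '.'.join(topics)

    assert extract_level('engine.0.INFO.extra') == (logging.INFO, 'engine.0.extra')
    assert extract_level('engine.0') == (logging.INFO, 'engine.0')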
78 83
79 84 def log_message(self, raw):
80 85 """receive and parse a message, then log it."""
81 86 if len(raw) != 2 or '.' not in raw[0]:
82 logging.error("Invalid log message: %s"%raw)
87 self.log.error("Invalid log message: %s"%raw)
83 88 return
84 89 else:
85 90 topic, msg = raw
86 91 # don't newline, since log messages always newline:
87 92 topic,level_name = topic.rsplit('.',1)
88 93 level,topic = self._extract_level(topic)
89 94 if msg[-1] == '\n':
90 95 msg = msg[:-1]
91 96 logging.log(level, "[%s] %s" % (topic, msg))
92 97
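For reference, the publishing side that feeds this watcher typically attaches pyzmq's PUBHandler to a stdlib logger; a minimal sketch, assuming the default url above:

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.connect('tcp://127.0.0.1:20202')   # the LogWatcher binds; publishers connect

    handler = PUBHandler(pub)
    handler.root_topic = 'engine.0'        # topics become 'engine.0.INFO', etc.
    logger = logging.getLogger('engine')
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info('engine is up')            # arrives at the watcher under 'engine.0.INFO'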
@@ -1,509 +1,511 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7
8 8 #----------------------------------------------------------------------
9 9 # Imports
10 10 #----------------------------------------------------------------------
11 11
12 12 from __future__ import print_function
13 from random import randint,random
13 import sys
14 14 import logging
15 from random import randint,random
15 16 from types import FunctionType
16 17
17 18 try:
18 19 import numpy
19 20 except ImportError:
20 21 numpy = None
21 22
22 23 import zmq
23 24 from zmq.eventloop import ioloop, zmqstream
24 25
25 26 # local imports
26 27 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 # from IPython.config.configurable import Configurable
28 29 from IPython.utils.traitlets import Instance, Dict, List, Set
29 30
30 31 import error
31 32 from client import Client
32 33 from dependency import Dependency
33 34 import streamsession as ss
34 35 from entry_point import connect_logger, local_logger
36 from factory import LoggingFactory
35 37
36 38
37 39 @decorator
38 40 def logged(f,self,*args,**kwargs):
39 41 # print ("#--------------------")
40 logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 43 # print ("#--")
42 44 return f(self,*args, **kwargs)
43 45
44 46 #----------------------------------------------------------------------
45 47 # Chooser functions
46 48 #----------------------------------------------------------------------
47 49
48 50 def plainrandom(loads):
49 51 """Plain random pick."""
50 52 n = len(loads)
51 53 return randint(0,n-1)
52 54
53 55 def lru(loads):
54 56 """Always pick the front of the line.
55 57
56 58 The content of `loads` is ignored.
57 59
58 60 Assumes LRU ordering of loads, with oldest first.
59 61 """
60 62 return 0
61 63
62 64 def twobin(loads):
63 65 """Pick two at random, use the LRU of the two.
64 66
65 67 The content of loads is ignored.
66 68
67 69 Assumes LRU ordering of loads, with oldest first.
68 70 """
69 71 n = len(loads)
70 72 a = randint(0,n-1)
71 73 b = randint(0,n-1)
72 74 return min(a,b)
73 75
74 76 def weighted(loads):
75 77 """Pick two at random using inverse load as weight.
76 78
77 79 Return the less loaded of the two.
78 80 """
79 81 # weight 0 a million times more than 1:
80 82 weights = 1./(1e-6+numpy.array(loads))
81 83 sums = weights.cumsum()
82 84 t = sums[-1]
83 85 x = random()*t
84 86 y = random()*t
85 87 idx = 0
86 88 idy = 0
87 89 while sums[idx] < x:
88 90 idx += 1
89 91 while sums[idy] < y:
90 92 idy += 1
91 93 if weights[idy] > weights[idx]:
92 94 return idy
93 95 else:
94 96 return idx
95 97
96 98 def leastload(loads):
97 99 """Always choose the lowest load.
98 100
99 101 If the lowest load occurs more than once, the first
99 101 occurrence will be used. If loads has LRU ordering, this means
101 103 the LRU of those with the lowest load is chosen.
102 104 """
103 105 return loads.index(min(loads))
104 106
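A quick sketch of how the deterministic choosers treat the same LRU-ordered load list (plainrandom and twobin are randomized, so they are not asserted here):

    # one-line copies of the deterministic choosers above, for illustration
    lru = lambda loads: 0
    leastload = lambda loads: loads.index(min(loads))

    loads = [2, 0, 1, 0]            # hypothetical outstanding-job counts, oldest first
    assert lru(loads) == 0          # ignores loads, always picks the oldest engine
    assert leastload(loads) == 1    # first index holding the minimum load
    # weighted(loads) needs numpy and picks probabilistically, strongly
    # favoring the idle engines (indices 1 and 3 here).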
105 107 #---------------------------------------------------------------------
106 108 # Classes
107 109 #---------------------------------------------------------------------
108 110 # store empty default dependency:
109 111 MET = Dependency([])
110 112
111 class TaskScheduler(Configurable):
113 class TaskScheduler(LoggingFactory):
112 114 """Python TaskScheduler object.
113 115
114 116 This is the simplest object that supports msg_id based
115 117 DAG dependencies. *Only* task msg_ids are checked, not
116 118 msg_ids of jobs submitted via the MUX queue.
117 119
118 120 """
119 121
120 122 # input arguments:
121 123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
122 124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
123 125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
124 126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
125 127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
126 128 io_loop = Instance(ioloop.IOLoop)
127 129
128 130 # internals:
129 131 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
130 132 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
131 133 pending = Dict() # dict by engine_uuid of submitted tasks
132 134 completed = Dict() # dict by engine_uuid of completed tasks
133 135 failed = Dict() # dict by engine_uuid of failed tasks
134 136 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
135 137 clients = Dict() # dict by msg_id for who submitted the task
136 138 targets = List() # list of target IDENTs
137 139 loads = List() # list of engine loads
138 140 all_completed = Set() # set of all completed tasks
139 141 all_failed = Set() # set of all failed tasks
140 142 all_done = Set() # set of all finished tasks=union(completed,failed)
141 143 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
142 144 session = Instance(ss.StreamSession)
143 145
144 146
145 147 def __init__(self, **kwargs):
146 148 super(TaskScheduler, self).__init__(**kwargs)
147 149
148 150 self.session = ss.StreamSession(username="TaskScheduler")
149 151
150 152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
151 153 self._notification_handlers = dict(
152 154 registration_notification = self._register_engine,
153 155 unregistration_notification = self._unregister_engine
154 156 )
155 157 self.notifier_stream.on_recv(self.dispatch_notification)
156 logging.info("Scheduler started...%r"%self)
158 self.log.info("Scheduler started...%r"%self)
157 159
158 160 def resume_receiving(self):
159 161 """Resume accepting jobs."""
160 162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
161 163
162 164 def stop_receiving(self):
163 165 """Stop accepting jobs while there are no engines.
164 166 Leave them in the ZMQ queue."""
165 167 self.client_stream.on_recv(None)
166 168
167 169 #-----------------------------------------------------------------------
168 170 # [Un]Registration Handling
169 171 #-----------------------------------------------------------------------
170 172
171 173 def dispatch_notification(self, msg):
172 174 """dispatch register/unregister events."""
173 175 idents,msg = self.session.feed_identities(msg)
174 176 msg = self.session.unpack_message(msg)
175 177 msg_type = msg['msg_type']
176 178 handler = self._notification_handlers.get(msg_type, None)
177 179 if handler is None:
178 180 raise Exception("Unhandled message type: %s"%msg_type)
179 181 else:
180 182 try:
181 183 handler(str(msg['content']['queue']))
182 184 except KeyError:
183 logging.error("task::Invalid notification msg: %s"%msg)
185 self.log.error("task::Invalid notification msg: %s"%msg)
184 186
185 187 @logged
186 188 def _register_engine(self, uid):
187 189 """New engine with ident `uid` became available."""
188 190 # head of the line:
189 191 self.targets.insert(0,uid)
190 192 self.loads.insert(0,0)
191 193 # initialize sets
192 194 self.completed[uid] = set()
193 195 self.failed[uid] = set()
194 196 self.pending[uid] = {}
195 197 if len(self.targets) == 1:
196 198 self.resume_receiving()
197 199
198 200 def _unregister_engine(self, uid):
199 201 """Existing engine with ident `uid` became unavailable."""
200 202 if len(self.targets) == 1:
201 203 # this was our only engine
202 204 self.stop_receiving()
203 205
204 206 # handle any potentially finished tasks:
205 207 self.engine_stream.flush()
206 208
207 209 self.completed.pop(uid)
208 210 self.failed.pop(uid)
209 211 # don't pop destinations, because it might be used later
210 212 # map(self.destinations.pop, self.completed.pop(uid))
211 213 # map(self.destinations.pop, self.failed.pop(uid))
212 214
213 215 lost = self.pending.pop(uid)
214 216
215 217 idx = self.targets.index(uid)
216 218 self.targets.pop(idx)
217 219 self.loads.pop(idx)
218 220
219 221 self.handle_stranded_tasks(lost)
220 222
221 223 def handle_stranded_tasks(self, lost):
222 224 """Deal with jobs resident in an engine that died."""
223 225 # TODO: resubmit the tasks?
224 226 for msg_id in lost:
225 227 pass
226 228
227 229
228 230 #-----------------------------------------------------------------------
229 231 # Job Submission
230 232 #-----------------------------------------------------------------------
231 233 @logged
232 234 def dispatch_submission(self, raw_msg):
233 235 """Dispatch job submission to appropriate handlers."""
234 236 # ensure targets up to date:
235 237 self.notifier_stream.flush()
236 238 try:
237 239 idents, msg = self.session.feed_identities(raw_msg, copy=False)
238 240 except Exception as e:
239 logging.error("task::Invaid msg: %s"%msg)
241 self.log.error("task::Invalid msg: %s"%raw_msg)
240 242 return
241 243
242 244 # send to monitor
243 245 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
244 246
245 247 msg = self.session.unpack_message(msg, content=False, copy=False)
246 248 header = msg['header']
247 249 msg_id = header['msg_id']
248 250
249 251 # time dependencies
250 252 after = Dependency(header.get('after', []))
251 253 if after.mode == 'all':
252 254 after.difference_update(self.all_completed)
253 255 if not after.success_only:
254 256 after.difference_update(self.all_failed)
255 257 if after.check(self.all_completed, self.all_failed):
256 258 # recast as empty set, if `after` already met,
257 259 # to prevent unnecessary set comparisons
258 260 after = MET
259 261
260 262 # location dependencies
261 263 follow = Dependency(header.get('follow', []))
262 264
263 265 # check if unreachable:
264 266 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
265 267 self.depending[msg_id] = [raw_msg,MET,MET]
266 268 return self.fail_unreachable(msg_id)
267 269
268 270 if after.check(self.all_completed, self.all_failed):
269 271 # time deps already met, try to run
270 272 if not self.maybe_run(msg_id, raw_msg, follow):
271 273 # can't run yet
272 274 self.save_unmet(msg_id, raw_msg, after, follow)
273 275 else:
274 276 self.save_unmet(msg_id, raw_msg, after, follow)
275 277
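The 'after' and 'follow' fields read above are plain collections of msg_ids carried in the task header; a hypothetical example of what a submission carries:

    header = {
        'msg_id' : 'c0ffee-01',             # hypothetical ids throughout
        'after'  : ['aaaa-00', 'aaaa-01'],  # time deps: run once these finish
        'follow' : ['bbbb-07'],             # location dep: run where this one ran
    }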
276 278 @logged
277 279 def fail_unreachable(self, msg_id):
278 280 """a message has become unreachable"""
279 281 if msg_id not in self.depending:
280 logging.error("msg %r already failed!"%msg_id)
282 self.log.error("msg %r already failed!"%msg_id)
281 283 return
282 284 raw_msg, after, follow = self.depending.pop(msg_id)
283 285 for mid in follow.union(after):
284 286 if mid in self.dependencies:
285 287 self.dependencies[mid].remove(msg_id)
286 288
287 289 idents,msg = self.session.feed_identities(raw_msg, copy=False)
288 290 msg = self.session.unpack_message(msg, copy=False, content=False)
289 291 header = msg['header']
290 292
291 293 try:
292 294 raise error.ImpossibleDependency()
293 295 except:
294 296 content = ss.wrap_exception()
295 297
296 298 self.all_done.add(msg_id)
297 299 self.all_failed.add(msg_id)
298 300
299 301 msg = self.session.send(self.client_stream, 'apply_reply', content,
300 302 parent=header, ident=idents)
301 303 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
302 304
303 305 self.update_dependencies(msg_id, success=False)
304 306
305 307 @logged
306 308 def maybe_run(self, msg_id, raw_msg, follow=None):
307 309 """check location dependencies, and run if they are met."""
308 310
309 311 if follow:
310 312 def can_run(idx):
311 313 target = self.targets[idx]
312 314 return target not in self.blacklist.get(msg_id, []) and\
313 315 follow.check(self.completed[target], self.failed[target])
314 316
315 317 indices = filter(can_run, range(len(self.targets)))
316 318 if not indices:
317 319 # TODO evaluate unmeetable follow dependencies
318 320 if follow.mode == 'all':
319 321 dests = set()
320 322 relevant = self.all_completed if follow.success_only else self.all_done
321 323 for m in follow.intersection(relevant):
322 324 dests.add(self.destinations[m])
323 325 if len(dests) > 1:
324 326 self.fail_unreachable(msg_id)
325 327
326 328
327 329 return False
328 330 else:
329 331 indices = None
330 332
331 333 self.submit_task(msg_id, raw_msg, indices)
332 334 return True
333 335
334 336 @logged
335 337 def save_unmet(self, msg_id, raw_msg, after, follow):
336 338 """Save a message for later submission when its dependencies are met."""
337 339 self.depending[msg_id] = [raw_msg,after,follow]
338 340 # track the ids in follow or after, but not those already finished
339 341 for dep_id in after.union(follow).difference(self.all_done):
340 342 if dep_id not in self.dependencies:
341 343 self.dependencies[dep_id] = set()
342 344 self.dependencies[dep_id].add(msg_id)
343 345
344 346 @logged
345 347 def submit_task(self, msg_id, msg, follow=None, indices=None):
346 348 """Submit a task to any of a subset of our targets."""
347 349 if indices:
348 350 loads = [self.loads[i] for i in indices]
349 351 else:
350 352 loads = self.loads
351 353 idx = self.scheme(loads)
352 354 if indices:
353 355 idx = indices[idx]
354 356 target = self.targets[idx]
355 357 # print (target, map(str, msg[:3]))
356 358 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
357 359 self.engine_stream.send_multipart(msg, copy=False)
358 360 self.add_job(idx)
359 361 self.pending[target][msg_id] = (msg, follow)
360 362 content = dict(msg_id=msg_id, engine_id=target)
361 363 self.session.send(self.mon_stream, 'task_destination', content=content,
362 364 ident=['tracktask',self.session.session])
363 365
364 366 #-----------------------------------------------------------------------
365 367 # Result Handling
366 368 #-----------------------------------------------------------------------
367 369 @logged
368 370 def dispatch_result(self, raw_msg):
369 371 try:
370 372 idents,msg = self.session.feed_identities(raw_msg, copy=False)
371 373 except Exception as e:
372 logging.error("task::Invaid result: %s"%msg)
374 self.log.error("task::Invalid result: %s"%raw_msg)
373 375 return
374 376 msg = self.session.unpack_message(msg, content=False, copy=False)
375 377 header = msg['header']
376 378 if header.get('dependencies_met', True):
377 379 success = (header['status'] == 'ok')
378 380 self.handle_result(idents, msg['parent_header'], raw_msg, success)
379 381 # send to Hub monitor
380 382 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
381 383 else:
382 384 self.handle_unmet_dependency(idents, msg['parent_header'])
383 385
384 386 @logged
385 387 def handle_result(self, idents, parent, raw_msg, success=True):
386 388 # first, relay result to client
387 389 engine = idents[0]
388 390 client = idents[1]
389 391 # swap_ids for XREP-XREP mirror
390 392 raw_msg[:2] = [client,engine]
391 393 # print (map(str, raw_msg[:4]))
392 394 self.client_stream.send_multipart(raw_msg, copy=False)
393 395 # now, update our data structures
394 396 msg_id = parent['msg_id']
395 397 self.pending[engine].pop(msg_id)
396 398 if success:
397 399 self.completed[engine].add(msg_id)
398 400 self.all_completed.add(msg_id)
399 401 else:
400 402 self.failed[engine].add(msg_id)
401 403 self.all_failed.add(msg_id)
402 404 self.all_done.add(msg_id)
403 405 self.destinations[msg_id] = engine
404 406
405 407 self.update_dependencies(msg_id, success)
406 408
407 409 @logged
408 410 def handle_unmet_dependency(self, idents, parent):
409 411 engine = idents[0]
410 412 msg_id = parent['msg_id']
411 413 if msg_id not in self.blacklist:
412 414 self.blacklist[msg_id] = set()
413 415 self.blacklist[msg_id].add(engine)
414 416 raw_msg,follow = self.pending[engine].pop(msg_id)
415 417 if not self.maybe_run(msg_id, raw_msg, follow):
416 418 # resubmit failed, put it back in our dependency tree
417 419 self.save_unmet(msg_id, raw_msg, MET, follow)
418 420 pass
419 421
420 422 @logged
421 423 def update_dependencies(self, dep_id, success=True):
422 424 """dep_id just finished. Update our dependency
422 424 table and submit any jobs that just became runnable."""
424 426 # print ("\n\n***********")
425 427 # pprint (dep_id)
426 428 # pprint (self.dependencies)
427 429 # pprint (self.depending)
428 430 # pprint (self.all_completed)
429 431 # pprint (self.all_failed)
430 432 # print ("\n\n***********\n\n")
431 433 if dep_id not in self.dependencies:
432 434 return
433 435 jobs = self.dependencies.pop(dep_id)
434 436
435 437 for msg_id in jobs:
436 438 raw_msg, after, follow = self.depending[msg_id]
437 439 # if dep_id in after:
438 440 # if after.mode == 'all' and (success or not after.success_only):
439 441 # after.remove(dep_id)
440 442
441 443 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
442 444 self.fail_unreachable(msg_id)
443 445
444 446 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
445 447 self.depending[msg_id][1] = MET
446 448 if self.maybe_run(msg_id, raw_msg, follow):
447 449
448 450 self.depending.pop(msg_id)
449 451 for mid in follow.union(after):
450 452 if mid in self.dependencies:
451 453 self.dependencies[mid].remove(msg_id)
452 454
453 455 #----------------------------------------------------------------------
454 456 # methods to be overridden by subclasses
455 457 #----------------------------------------------------------------------
456 458
457 459 def add_job(self, idx):
458 460 """Called after self.targets[idx] just got the job with header.
459 461 Override with subclasses. The default ordering is simple LRU.
460 462 The default loads are the number of outstanding jobs."""
461 463 self.loads[idx] += 1
462 464 for lis in (self.targets, self.loads):
463 465 lis.append(lis.pop(idx))
464 466
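The pop/append pair in add_job is what maintains the LRU ordering the chooser functions rely on; a standalone sketch:

    targets = ['e0', 'e1', 'e2']   # hypothetical engine idents
    loads = [0, 0, 0]

    def add_job(idx):
        loads[idx] += 1
        for lis in (targets, loads):
            lis.append(lis.pop(idx))   # rotate the chosen engine to the back

    add_job(0)
    assert targets == ['e1', 'e2', 'e0']
    assert loads == [0, 0, 1]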
465 467
466 468 def finish_job(self, idx):
467 469 """Called after self.targets[idx] just finished a job.
468 470 Override with subclasses."""
469 471 self.loads[idx] -= 1
470 472
471 473
472 474
473 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
475 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
474 476 from zmq.eventloop import ioloop
475 477 from zmq.eventloop.zmqstream import ZMQStream
476 478
477 479 ctx = zmq.Context()
478 480 loop = ioloop.IOLoop()
479 481
480 482 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
481 483 ins.bind(in_addr)
482 484 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
483 485 outs.bind(out_addr)
484 486 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
485 487 mons.connect(mon_addr)
486 488 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
487 489 nots.setsockopt(zmq.SUBSCRIBE, '')
488 490 nots.connect(not_addr)
489 491
490 492 scheme = globals().get(scheme, None)
491 493 # setup logging
492 494 if log_addr:
493 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
495 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
494 496 else:
495 local_logger(loglevel)
497 local_logger(logname, loglevel)
496 498
497 499 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
498 500 mon_stream=mons,notifier_stream=nots,
499 scheme=scheme,io_loop=loop)
501 scheme=scheme,io_loop=loop, logname=logname)
500 502
501 503 try:
502 504 loop.start()
503 505 except KeyboardInterrupt:
504 506 print ("interrupted, exiting...", file=sys.__stderr__)
505 507
506 508
507 509 if __name__ == '__main__':
508 510 iface = 'tcp://127.0.0.1:%i'
509 511 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -1,483 +1,483 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Imports
8 8 #-----------------------------------------------------------------------------
9 9
10 10 # Standard library imports.
11 11 from __future__ import print_function
12 12 import __builtin__
13 13 from code import CommandCompiler
14 14 import os
15 15 import sys
16 16 import time
17 17 import traceback
18 18 import logging
19 19 from datetime import datetime
20 20 from signal import SIGTERM, SIGKILL
21 21 from pprint import pprint
22 22
23 23 # System library imports.
24 24 import zmq
25 25 from zmq.eventloop import ioloop, zmqstream
26 26
27 27 # Local imports.
28 28 from IPython.core import ultratb
29 29 from IPython.utils.traitlets import HasTraits, Instance, List, Int, Dict, Set, Str
30 30 from IPython.zmq.completer import KernelCompleter
31 31 from IPython.zmq.iostream import OutStream
32 32 from IPython.zmq.displayhook import DisplayHook
33 33
34 34 from factory import SessionFactory
35 35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 36 unpack_apply_message, ISO8601, wrap_exception
37 37 import heartmonitor
38 38 from client import Client
39 39
40 40 def printer(*args):
41 41 pprint(args, stream=sys.__stdout__)
42 42
43 43
44 44 class _Passer:
45 45 """Empty class that implements `send()` that does nothing."""
46 46 def send(self, *args, **kwargs):
47 47 pass
48 48 send_multipart = send
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Main kernel class
53 53 #-----------------------------------------------------------------------------
54 54
55 55 class Kernel(SessionFactory):
56 56
57 57 #---------------------------------------------------------------------------
58 58 # Kernel interface
59 59 #---------------------------------------------------------------------------
60 60
61 61 # kwargs:
62 62 int_id = Int(-1, config=True)
63 63 user_ns = Dict(config=True)
64 64 exec_lines = List(config=True)
65 65
66 66 control_stream = Instance(zmqstream.ZMQStream)
67 67 task_stream = Instance(zmqstream.ZMQStream)
68 68 iopub_stream = Instance(zmqstream.ZMQStream)
69 69 client = Instance('IPython.zmq.parallel.client.Client')
70 70
71 71 # internals
72 72 shell_streams = List()
73 73 compiler = Instance(CommandCompiler, (), {})
74 74 completer = Instance(KernelCompleter)
75 75
76 76 aborted = Set()
77 77 shell_handlers = Dict()
78 78 control_handlers = Dict()
79 79
80 80 def _set_prefix(self):
81 81 self.prefix = "engine.%s"%self.int_id
82 82
83 83 def _connect_completer(self):
84 84 self.completer = KernelCompleter(self.user_ns)
85 85
86 86 def __init__(self, **kwargs):
87 87 super(Kernel, self).__init__(**kwargs)
88 88 self._set_prefix()
89 89 self._connect_completer()
90 90
91 91 self.on_trait_change(self._set_prefix, 'id')
92 92 self.on_trait_change(self._connect_completer, 'user_ns')
93 93
94 94 # Build dict of handlers for message types
95 95 for msg_type in ['execute_request', 'complete_request', 'apply_request',
96 96 'clear_request']:
97 97 self.shell_handlers[msg_type] = getattr(self, msg_type)
98 98
99 99 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
100 100 self.control_handlers[msg_type] = getattr(self, msg_type)
101 101
102 102 self._initial_exec_lines()
103 103
104 104 def _wrap_exception(self, method=None):
105 105 e_info = dict(engineid=self.ident, method=method)
106 106 content=wrap_exception(e_info)
107 107 return content
108 108
109 109 def _initial_exec_lines(self):
110 110 s = _Passer()
111 111 content = dict(silent=True, user_variable=[],user_expressions=[])
112 112 for line in self.exec_lines:
113 logging.debug("executing initialization: %s"%line)
113 self.log.debug("executing initialization: %s"%line)
114 114 content.update({'code':line})
115 115 msg = self.session.msg('execute_request', content)
116 116 self.execute_request(s, [], msg)
117 117
118 118
119 119 #-------------------- control handlers -----------------------------
120 120 def abort_queues(self):
121 121 for stream in self.shell_streams:
122 122 if stream:
123 123 self.abort_queue(stream)
124 124
125 125 def abort_queue(self, stream):
126 126 while True:
127 127 try:
128 128 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
129 129 except zmq.ZMQError as e:
130 130 if e.errno == zmq.EAGAIN:
131 131 break
132 132 else:
133 133 return
134 134 else:
135 135 if msg is None:
136 136 return
137 137 else:
138 138 idents,msg = msg
139 139
140 140 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
141 141 # msg = self.reply_socket.recv_json()
142 logging.info("Aborting:")
143 logging.info(str(msg))
142 self.log.info("Aborting:")
143 self.log.info(str(msg))
144 144 msg_type = msg['msg_type']
145 145 reply_type = msg_type.split('_')[0] + '_reply'
146 146 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
147 147 # self.reply_socket.send(ident,zmq.SNDMORE)
148 148 # self.reply_socket.send_json(reply_msg)
149 149 reply_msg = self.session.send(stream, reply_type,
150 150 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
151 logging.debug(str(reply_msg))
151 self.log.debug(str(reply_msg))
152 152 # We need to wait a bit for requests to come in. This can probably
153 153 # be set shorter for true asynchronous clients.
154 154 time.sleep(0.05)
155 155
156 156 def abort_request(self, stream, ident, parent):
157 157 """abort a specifig msg by id"""
158 158 msg_ids = parent['content'].get('msg_ids', None)
159 159 if isinstance(msg_ids, basestring):
160 160 msg_ids = [msg_ids]
161 161 if not msg_ids:
162 162 self.abort_queues()
163 163 for mid in msg_ids:
164 164 self.aborted.add(str(mid))
165 165
166 166 content = dict(status='ok')
167 167 reply_msg = self.session.send(stream, 'abort_reply', content=content,
168 168 parent=parent, ident=ident)[0]
169 logging.debug(str(reply_msg))
169 self.log.debug(str(reply_msg))
170 170
171 171 def shutdown_request(self, stream, ident, parent):
172 172 """kill ourself. This should really be handled in an external process"""
173 173 try:
174 174 self.abort_queues()
175 175 except:
176 176 content = self._wrap_exception('shutdown')
177 177 else:
178 178 content = dict(parent['content'])
179 179 content['status'] = 'ok'
180 180 msg = self.session.send(stream, 'shutdown_reply',
181 181 content=content, parent=parent, ident=ident)
182 182 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
183 183 # content, parent, ident)
184 184 # print >> sys.__stdout__, msg
185 185 # time.sleep(0.2)
186 186 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
187 187 dc.start()
188 188
189 189 def dispatch_control(self, msg):
190 190 idents,msg = self.session.feed_identities(msg, copy=False)
191 191 try:
192 192 msg = self.session.unpack_message(msg, content=True, copy=False)
193 193 except:
194 logging.error("Invalid Message", exc_info=True)
194 self.log.error("Invalid Message", exc_info=True)
195 195 return
196 196
197 197 header = msg['header']
198 198 msg_id = header['msg_id']
199 199
200 200 handler = self.control_handlers.get(msg['msg_type'], None)
201 201 if handler is None:
202 logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
202 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
203 203 else:
204 204 handler(self.control_stream, idents, msg)
205 205
206 206
207 207 #-------------------- queue helpers ------------------------------
208 208
209 209 def check_dependencies(self, dependencies):
210 210 if not dependencies:
211 211 return True
212 212 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
213 213 anyorall = dependencies[0]
214 214 dependencies = dependencies[1]
215 215 else:
216 216 anyorall = 'all'
217 217 results = self.client.get_results(dependencies,status_only=True)
218 218 if results['status'] != 'ok':
219 219 return False
220 220
221 221 if anyorall == 'any':
222 222 if not results['completed']:
223 223 return False
224 224 else:
225 225 if results['pending']:
226 226 return False
227 227
228 228 return True
229 229
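check_dependencies accepts either a plain list of msg_ids (treated as 'all') or an explicit two-element form; hypothetical examples of both:

    # every listed msg_id must be complete (implicit 'all'):
    deps_all = ['aaaa-00', 'bbbb-01', 'cccc-02']
    # any single completion suffices:
    deps_any = ['any', ['aaaa-00', 'bbbb-01']]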
230 230 def check_aborted(self, msg_id):
231 231 return msg_id in self.aborted
232 232
233 233 #-------------------- queue handlers -----------------------------
234 234
235 235 def clear_request(self, stream, idents, parent):
236 236 """Clear our namespace."""
237 237 self.user_ns = {}
238 238 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
239 239 content = dict(status='ok'))
240 240 self._initial_exec_lines()
241 241
242 242 def execute_request(self, stream, ident, parent):
243 logging.debug('execute request %s'%parent)
243 self.log.debug('execute request %s'%parent)
244 244 try:
245 245 code = parent[u'content'][u'code']
246 246 except:
247 logging.error("Got bad msg: %s"%parent, exc_info=True)
247 self.log.error("Got bad msg: %s"%parent, exc_info=True)
248 248 return
249 249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 250 ident='%s.pyin'%self.prefix)
251 251 started = datetime.now().strftime(ISO8601)
252 252 try:
253 253 comp_code = self.compiler(code, '<zmq-kernel>')
254 254 # allow for not overriding displayhook
255 255 if hasattr(sys.displayhook, 'set_parent'):
256 256 sys.displayhook.set_parent(parent)
257 257 sys.stdout.set_parent(parent)
258 258 sys.stderr.set_parent(parent)
259 259 exec comp_code in self.user_ns, self.user_ns
260 260 except:
261 261 exc_content = self._wrap_exception('execute')
262 262 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
263 263 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
264 264 ident='%s.pyerr'%self.prefix)
265 265 reply_content = exc_content
266 266 else:
267 267 reply_content = {'status' : 'ok'}
268 268
269 269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
270 270 ident=ident, subheader = dict(started=started))
271 logging.debug(str(reply_msg))
271 self.log.debug(str(reply_msg))
272 272 if reply_msg['content']['status'] == u'error':
273 273 self.abort_queues()
274 274
275 275 def complete_request(self, stream, ident, parent):
276 276 matches = {'matches' : self.complete(parent),
277 277 'status' : 'ok'}
278 278 completion_msg = self.session.send(stream, 'complete_reply',
279 279 matches, parent, ident)
280 280 # print >> sys.__stdout__, completion_msg
281 281
282 282 def complete(self, msg):
283 283 return self.completer.complete(msg.content.line, msg.content.text)
284 284
285 285 def apply_request(self, stream, ident, parent):
286 286 # print (parent)
287 287 try:
288 288 content = parent[u'content']
289 289 bufs = parent[u'buffers']
290 290 msg_id = parent['header']['msg_id']
291 291 bound = content.get('bound', False)
292 292 except:
293 logging.error("Got bad msg: %s"%parent, exc_info=True)
293 self.log.error("Got bad msg: %s"%parent, exc_info=True)
294 294 return
295 295 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
296 296 # self.iopub_stream.send(pyin_msg)
297 297 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
298 298 sub = {'dependencies_met' : True, 'engine' : self.ident,
299 299 'started': datetime.now().strftime(ISO8601)}
300 300 try:
301 301 # allow for not overriding displayhook
302 302 if hasattr(sys.displayhook, 'set_parent'):
303 303 sys.displayhook.set_parent(parent)
304 304 sys.stdout.set_parent(parent)
305 305 sys.stderr.set_parent(parent)
306 306 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
307 307 if bound:
308 308 working = self.user_ns
309 309 suffix = str(msg_id).replace("-","")
310 310 prefix = "_"
311 311
312 312 else:
313 313 working = dict()
314 314 suffix = prefix = "_" # prevent keyword collisions with lambda
315 315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
316 316 # if f.fun
317 317 fname = getattr(f, '__name__', 'f')
318 318
319 319 fname = prefix+fname.strip('<>')+suffix
320 320 argname = prefix+"args"+suffix
321 321 kwargname = prefix+"kwargs"+suffix
322 322 resultname = prefix+"result"+suffix
323 323
324 324 ns = { fname : f, argname : args, kwargname : kwargs }
325 325 # print ns
326 326 working.update(ns)
327 327 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
328 328 exec code in working, working
329 329 result = working.get(resultname)
330 330 # clear the namespace
331 331 if bound:
332 332 for key in ns.iterkeys():
333 333 self.user_ns.pop(key)
334 334 else:
335 335 del working
336 336
337 337 packed_result,buf = serialize_object(result)
338 338 result_buf = [packed_result]+buf
339 339 except:
340 340 exc_content = self._wrap_exception('apply')
341 341 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
342 342 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
343 343 ident='%s.pyerr'%self.prefix)
344 344 reply_content = exc_content
345 345 result_buf = []
346 346
347 347 if exc_content['ename'] == 'UnmetDependency':
348 348 sub['dependencies_met'] = False
349 349 else:
350 350 reply_content = {'status' : 'ok'}
351 351
352 352 # put 'ok'/'error' status in header, for scheduler introspection:
353 353 sub['status'] = reply_content['status']
354 354
355 355 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
356 356 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
357 357
358 358 # if reply_msg['content']['status'] == u'error':
359 359 # self.abort_queues()
360 360
361 361 def dispatch_queue(self, stream, msg):
362 362 self.control_stream.flush()
363 363 idents,msg = self.session.feed_identities(msg, copy=False)
364 364 try:
365 365 msg = self.session.unpack_message(msg, content=True, copy=False)
366 366 except:
367 logging.error("Invalid Message", exc_info=True)
367 self.log.error("Invalid Message", exc_info=True)
368 368 return
369 369
370 370
371 371 header = msg['header']
372 372 msg_id = header['msg_id']
373 373 if self.check_aborted(msg_id):
374 374 self.aborted.remove(msg_id)
375 375 # is it safe to assume a msg_id will not be resubmitted?
376 376 reply_type = msg['msg_type'].split('_')[0] + '_reply'
377 377 reply_msg = self.session.send(stream, reply_type,
378 378 content={'status' : 'aborted'}, parent=msg, ident=idents)
379 379 return
380 380 handler = self.shell_handlers.get(msg['msg_type'], None)
381 381 if handler is None:
382 logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
382 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
383 383 else:
384 384 handler(stream, idents, msg)
385 385
386 386 def start(self):
387 387 #### stream mode:
388 388 if self.control_stream:
389 389 self.control_stream.on_recv(self.dispatch_control, copy=False)
390 390 self.control_stream.on_err(printer)
391 391
392 392 def make_dispatcher(stream):
393 393 def dispatcher(msg):
394 394 return self.dispatch_queue(stream, msg)
395 395 return dispatcher
396 396
397 397 for s in self.shell_streams:
398 398 s.on_recv(make_dispatcher(s), copy=False)
399 399 s.on_err(printer)
400 400
401 401 if self.iopub_stream:
402 402 self.iopub_stream.on_err(printer)
403 403
404 404 #### while True mode:
405 405 # while True:
406 406 # idle = True
407 407 # try:
408 408 # msg = self.shell_stream.socket.recv_multipart(
409 409 # zmq.NOBLOCK, copy=False)
410 410 # except zmq.ZMQError, e:
411 411 # if e.errno != zmq.EAGAIN:
412 412 # raise e
413 413 # else:
414 414 # idle=False
415 415 # self.dispatch_queue(self.shell_stream, msg)
416 416 #
417 417 # if not self.task_stream.empty():
418 418 # idle=False
419 419 # msg = self.task_stream.recv_multipart()
420 420 # self.dispatch_queue(self.task_stream, msg)
421 421 # if idle:
422 422 # # don't busywait
423 423 # time.sleep(1e-3)
424 424
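make_dispatcher above is a closure factory rather than a bare lambda in the loop because Python closures bind variables late; a standalone sketch of the pitfall it avoids:

    streams = ['mux', 'task']   # stand-ins for the ZMQStream objects

    # buggy: every callback sees the final value of s
    buggy = [lambda msg: (s, msg) for s in streams]
    assert buggy[0]('m')[0] == 'task'

    # correct: the factory freezes each stream in its own scope
    def make_dispatcher(stream):
        def dispatcher(msg):
            return (stream, msg)
        return dispatcher

    good = [make_dispatcher(s) for s in streams]
    assert good[0]('m')[0] == 'mux'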
425 425 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
426 426 client_addr=None, loop=None, context=None, key=None,
427 427 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
428 428 """NO LONGER IN USE"""
429 429 # create loop, context, and session:
430 430 if loop is None:
431 431 loop = ioloop.IOLoop.instance()
432 432 if context is None:
433 433 context = zmq.Context()
434 434 c = context
435 435 session = StreamSession(key=key)
436 436 # print (session.key)
437 437 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
438 438
439 439 # create Control Stream
440 440 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
441 441 control_stream.setsockopt(zmq.IDENTITY, identity)
442 442 control_stream.connect(control_addr)
443 443
444 444 # create Shell Streams (MUX, Task, etc.):
445 445 shell_streams = []
446 446 for addr in shell_addrs:
447 447 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
448 448 stream.setsockopt(zmq.IDENTITY, identity)
449 449 stream.connect(addr)
450 450 shell_streams.append(stream)
451 451
452 452 # create iopub stream:
453 453 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
454 454 iopub_stream.setsockopt(zmq.IDENTITY, identity)
455 455 iopub_stream.connect(iopub_addr)
456 456
457 457 # Redirect input streams and set a display hook.
458 458 if out_stream_factory:
459 459 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
460 460 sys.stdout.topic = 'engine.%i.stdout'%int_id
461 461 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
462 462 sys.stderr.topic = 'engine.%i.stderr'%int_id
463 463 if display_hook_factory:
464 464 sys.displayhook = display_hook_factory(session, iopub_stream)
465 465 sys.displayhook.topic = 'engine.%i.pyout'%int_id
466 466
467 467
468 468 # launch heartbeat
469 469 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
470 470 heart.start()
471 471
472 472 # create (optional) Client
473 473 if client_addr:
474 474 client = Client(client_addr, username=identity)
475 475 else:
476 476 client = None
477 477
478 478 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
479 479 shell_streams=shell_streams, iopub_stream=iopub_stream,
480 480 client=client, loop=loop)
481 481 kernel.start()
482 482 return loop, c, kernel
483 483
@@ -1,295 +1,295 b''
1 1 """Basic ssh tunneling utilities."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2008-2010 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import os,sys, atexit
19 19 from multiprocessing import Process
20 20 from getpass import getpass, getuser
21 21 import warnings
22 22
23 23 try:
24 24 with warnings.catch_warnings():
25 25 warnings.simplefilter('ignore', DeprecationWarning)
26 26 import paramiko
27 27 except ImportError:
28 28 paramiko = None
29 29 else:
30 30 from forward import forward_tunnel
31 31
32 32 try:
33 33 from IPython.external import pexpect
34 34 except ImportError:
35 35 pexpect = None
36 36
37 37 from IPython.zmq.parallel.entry_point import select_random_ports
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Code
41 41 #-----------------------------------------------------------------------------
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Check for passwordless login
45 45 #-----------------------------------------------------------------------------
46 46
47 47 def try_passwordless_ssh(server, keyfile, paramiko=None):
48 48 """Attempt to make an ssh connection without a password.
49 49 This is mainly used for requiring password input only once
50 50 when many tunnels may be connected to the same server.
51 51
52 52 If paramiko is None, the default for the platform is chosen.
53 53 """
54 54 if paramiko is None:
55 55 paramiko = sys.platform == 'win32'
56 56 if not paramiko:
57 57 f = _try_passwordless_openssh
58 58 else:
59 59 f = _try_passwordless_paramiko
60 60 return f(server, keyfile)
61 61
62 62 def _try_passwordless_openssh(server, keyfile):
63 63 """Try passwordless login with shell ssh command."""
64 64 if pexpect is None:
65 65 raise ImportError("pexpect unavailable, use paramiko")
66 66 cmd = 'ssh -f '+ server
67 67 if keyfile:
68 68 cmd += ' -i ' + keyfile
69 69 cmd += ' exit'
70 70 p = pexpect.spawn(cmd)
71 71 while True:
72 72 try:
73 73 p.expect('[Pp]assword:', timeout=.1)
74 74 except pexpect.TIMEOUT:
75 75 continue
76 76 except pexpect.EOF:
77 77 return True
78 78 else:
79 79 return False
80 80
81 81 def _try_passwordless_paramiko(server, keyfile):
82 82 """Try passwordless login with paramiko."""
83 83 if paramiko is None:
84 84 raise ImportError("paramiko unavailable, use openssh")
85 85 username, server, port = _split_server(server)
86 86 client = paramiko.SSHClient()
87 87 client.load_system_host_keys()
88 88 client.set_missing_host_key_policy(paramiko.WarningPolicy())
89 89 try:
90 90 client.connect(server, port, username=username, key_filename=keyfile,
91 91 look_for_keys=True)
92 92 except paramiko.AuthenticationException:
93 93 return False
94 94 else:
95 95 client.close()
96 96 return True
97 97
98 98
99 99 def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramiko=None):
100 100 """Connect a socket to an address via an ssh tunnel.
101 101
102 102 This is a wrapper for socket.connect(addr), when addr is not accessible
103 103 from the local machine. It simply creates an ssh tunnel using the remaining args,
104 104 and calls socket.connect('tcp://localhost:lport') where lport is the randomly
105 105 selected local port of the tunnel.
106 106
107 107 """
108 108 lport = select_random_ports(1)[0]
109 109 transport, addr = addr.split('://')
110 110 ip,rport = addr.split(':')
111 111 rport = int(rport)
112 112 if paramiko is None:
113 113 paramiko = sys.platform == 'win32'
114 114 if paramiko:
115 115 tunnelf = paramiko_tunnel
116 116 else:
117 117 tunnelf = openssh_tunnel
118 118 tunnel = tunnelf(lport, rport, server, remoteip=ip, keyfile=keyfile, password=password)
119 119 socket.connect('tcp://127.0.0.1:%i'%lport)
120 120 return tunnel
121 121
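A hedged usage sketch of tunnel_connection, assuming it is importable from this module (the address and server are hypothetical):

    import zmq

    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    # connect to 10.0.0.5:12345 as seen from login.example.com,
    # via a freshly created ssh tunnel on a random local port:
    tunnel_connection(sock, 'tcp://10.0.0.5:12345', 'user@login.example.com')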
122 122 def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, password=None, timeout=15):
123 123 """Create an ssh tunnel using command-line ssh that connects port lport
124 124 on this machine to localhost:rport on server. The tunnel
125 125 will automatically close when not in use, remaining open
126 126 for a minimum of timeout seconds for an initial connection.
127 127
128 128 This creates a tunnel redirecting `localhost:lport` to `remoteip:rport`,
129 129 as seen from `server`.
130 130
131 131 keyfile and password may be specified, but ssh config is checked for defaults.
132 132
133 133 Parameters
134 134 ----------
135 135
136 136 lport : int
137 137 local port for connecting to the tunnel from this machine.
138 138 rport : int
139 139 port on the remote machine to connect to.
140 140 server : str
141 141 The ssh server to connect to. The full ssh server string will be parsed.
142 142 user@server:port
143 143 remoteip : str [Default: 127.0.0.1]
144 144 The remote ip, specifying the destination of the tunnel.
145 145 Default is localhost, which means that the tunnel would redirect
146 146 localhost:lport on this machine to localhost:rport on the *server*.
147 147
148 148 keyfile : str; path to private key file
149 149 This specifies a key to be used in ssh login, default None.
150 150 Regular default ssh keys will be used without specifying this argument.
151 151 password : str;
152 152 Your ssh password to the ssh server. Note that if this is left None,
153 153 you will be prompted for it if passwordless key based login is unavailable.
154 154
155 155 """
156 156 if pexpect is None:
157 157 raise ImportError("pexpect unavailable, use paramiko_tunnel")
158 158 ssh="ssh "
159 159 if keyfile:
160 160 ssh += "-i " + keyfile
161 cmd = ssh + " -f -L 127.0.0.1:%i:127.0.0.1:%i %s sleep %i"%(lport, rport, server, timeout)
161 cmd = ssh + " -f -L 127.0.0.1:%i:%s:%i %s sleep %i"%(lport, remoteip, rport, server, timeout)
162 162 tunnel = pexpect.spawn(cmd)
163 163 failed = False
164 164 while True:
165 165 try:
166 166 tunnel.expect('[Pp]assword:', timeout=.1)
167 167 except pexpect.TIMEOUT:
168 168 continue
169 169 except pexpect.EOF:
170 170 if tunnel.exitstatus:
171 171 print (tunnel.exitstatus)
172 172 print (tunnel.before)
173 173 print (tunnel.after)
174 174 raise RuntimeError("tunnel '%s' failed to start"%(cmd))
175 175 else:
176 176 return tunnel.pid
177 177 else:
178 178 if failed:
179 179 print("Password rejected, try again")
180 180 password=None
181 181 if password is None:
182 182 password = getpass("%s's password: "%(server))
183 183 tunnel.sendline(password)
184 184 failed = True
185 185
186 186 def _split_server(server):
187 187 if '@' in server:
188 188 username,server = server.split('@', 1)
189 189 else:
190 190 username = getuser()
191 191 if ':' in server:
192 192 server, port = server.split(':')
193 193 port = int(port)
194 194 else:
195 195 port = 22
196 196 return username, server, port
197 197
198 198 def paramiko_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, password=None, timeout=15):
199 199 """launch a tunner with paramiko in a subprocess. This should only be used
200 200 when shell ssh is unavailable (e.g. Windows).
201 201
202 202 This creates a tunnel redirecting `localhost:lport` to `remoteip:rport`,
203 203 as seen from `server`.
204 204
205 205 If you are familiar with ssh tunnels, this creates the tunnel:
206 206
207 207 ssh server -L localhost:lport:remoteip:rport
208 208
209 209 keyfile and password may be specified, but ssh config is checked for defaults.
210 210
211 211
212 212 Parameters
213 213 ----------
214 214
215 215 lport : int
216 216 local port for connecting to the tunnel from this machine.
217 217 rport : int
218 218 port on the remote machine to connect to.
219 219 server : str
220 220 The ssh server to connect to. The full ssh server string will be parsed.
221 221 user@server:port
222 222 remoteip : str [Default: 127.0.0.1]
223 223 The remote ip, specifying the destination of the tunnel.
224 224 Default is localhost, which means that the tunnel would redirect
225 225 localhost:lport on this machine to localhost:rport on the *server*.
226 226
227 227 keyfile : str; path to private key file
228 228 This specifies a key to be used in ssh login, default None.
229 229 Regular default ssh keys will be used without specifying this argument.
230 230 password : str;
231 231 Your ssh password to the ssh server. Note that if this is left None,
232 232 you will be prompted for it if passwordless key based login is unavailable.
233 233
234 234 """
235 235 if paramiko is None:
236 236 raise ImportError("Paramiko not available")
237 237
238 238 if password is None:
239 239 if not _check_passwordless_paramiko(server, keyfile):
240 240 password = getpass("%s's password: "%(server))
241 241
242 242 p = Process(target=_paramiko_tunnel,
243 243 args=(lport, rport, server, remoteip),
244 244 kwargs=dict(keyfile=keyfile, password=password))
245 245 p.daemon=False
246 246 p.start()
247 247 atexit.register(_shutdown_process, p)
248 248 return p
249 249
250 250 def _shutdown_process(p):
251 251 if p.isalive():
252 252 p.terminate()
253 253
254 254 def _paramiko_tunnel(lport, rport, server, remoteip, keyfile=None, password=None):
255 255 """Function for actually starting a paramiko tunnel, to be passed
256 256 to multiprocessing.Process(target=this), and not called directly.
257 257 """
258 258 username, server, port = _split_server(server)
259 259 client = paramiko.SSHClient()
260 260 client.load_system_host_keys()
261 261 client.set_missing_host_key_policy(paramiko.WarningPolicy())
262 262
263 263 try:
264 264 client.connect(server, port, username=username, key_filename=keyfile,
265 265 look_for_keys=True, password=password)
266 266 # except paramiko.AuthenticationException:
267 267 # if password is None:
268 268 # password = getpass("%s@%s's password: "%(username, server))
269 269 # client.connect(server, port, username=username, password=password)
270 270 # else:
271 271 # raise
272 272 except Exception as e:
273 273 print ('*** Failed to connect to %s:%d: %r' % (server, port, e))
274 274 sys.exit(1)
275 275
276 276 # print ('Now forwarding port %d to %s:%d ...' % (lport, server, rport))
277 277
278 278 try:
279 279 forward_tunnel(lport, remoteip, rport, client.get_transport())
280 280 except KeyboardInterrupt:
281 281 print ('SIGINT: Port forwarding stopped cleanly')
282 282 sys.exit(0)
283 283 except Exception as e:
284 284 print ("Port forwarding stopped uncleanly: %s"%e)
285 285 sys.exit(255)
286 286
287 287 if sys.platform == 'win32':
288 288 ssh_tunnel = paramiko_tunnel
289 289 else:
290 290 ssh_tunnel = openssh_tunnel
291 291
292 292
293 293 __all__ = ['tunnel_connection', 'ssh_tunnel', 'openssh_tunnel', 'paramiko_tunnel', 'try_passwordless_ssh']
294 294
295 295
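Since ssh_tunnel is aliased per platform above, callers can stay platform-agnostic; a hedged usage sketch (ports, server, and keyfile are hypothetical):

    lport, rport = 10101, 12345
    handle = ssh_tunnel(lport, rport, 'user@login.example.com',
                        keyfile='/home/user/.ssh/id_rsa')
    # openssh_tunnel returns the pexpect child's pid;
    # paramiko_tunnel returns a multiprocessing.Process.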