##// END OF EJS Templates
Refactor newparallel to use Config system...
MinRK -
Show More
This diff has been collapsed as it changes many lines, (549 lines changed) Show them Hide them
@@ -0,0 +1,549 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The IPython cluster directory
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 from __future__ import with_statement
19
20 import os
21 import shutil
22 import sys
23 import logging
24 import warnings
25
26 from IPython.config.loader import PyFileConfigLoader
27 from IPython.core.application import Application, BaseAppConfigLoader
28 from IPython.config.configurable import Configurable
29 from IPython.core.crashhandler import CrashHandler
30 from IPython.core import release
31 from IPython.utils.path import (
32 get_ipython_package_dir,
33 expand_path
34 )
35 from IPython.utils.traitlets import Unicode
36
37 #-----------------------------------------------------------------------------
38 # Warnings control
39 #-----------------------------------------------------------------------------
# Twisted generates annoying warnings with Python 2.6, as will do other code
# that imports 'sets' as of today
# Silence these globally so they don't pollute controller/engine logs.
warnings.filterwarnings('ignore', 'the sets module is deprecated',
                        DeprecationWarning )

# This one also comes from Twisted
warnings.filterwarnings('ignore', 'the sha module is deprecated',
                        DeprecationWarning)
48
49 #-----------------------------------------------------------------------------
50 # Module errors
51 #-----------------------------------------------------------------------------
52
class ClusterDirError(Exception):
    """Raised when a cluster directory cannot be found or created."""
    pass
55
56
class PIDFileError(Exception):
    """Raised on pid-file problems (missing file, or file already exists)."""
    pass
59
60
61 #-----------------------------------------------------------------------------
62 # Class for managing cluster directories
63 #-----------------------------------------------------------------------------
64
class ClusterDir(Configurable):
    """An object to manage the cluster directory and its resources.

    The cluster directory is used by :command:`ipengine`,
    :command:`ipcontroller` and :command:`ipcluster` to manage the
    configuration, logging and security of these applications.

    This object knows how to find, create and manage these directories. This
    should be used by any code that wants to handle cluster directories.
    """

    # sub-directory names inside the cluster dir
    security_dir_name = Unicode('security')
    log_dir_name = Unicode('log')
    pid_dir_name = Unicode('pid')
    # absolute paths, derived from ``location`` in _location_changed
    security_dir = Unicode(u'')
    log_dir = Unicode(u'')
    pid_dir = Unicode(u'')
    location = Unicode(u'')

    def __init__(self, location=u''):
        # passing location triggers _location_changed, which creates the dirs
        super(ClusterDir, self).__init__(location=location)

    def _location_changed(self, name, old, new):
        """Create the cluster dir and recompute/check all sub-directories."""
        if not os.path.isdir(new):
            os.makedirs(new)
        self.security_dir = os.path.join(new, self.security_dir_name)
        self.log_dir = os.path.join(new, self.log_dir_name)
        self.pid_dir = os.path.join(new, self.pid_dir_name)
        self.check_dirs()

    def _log_dir_changed(self, name, old, new):
        self.check_log_dir()

    def check_log_dir(self):
        """Create the log dir if it doesn't exist."""
        if not os.path.isdir(self.log_dir):
            os.mkdir(self.log_dir)

    def _security_dir_changed(self, name, old, new):
        self.check_security_dir()

    def check_security_dir(self):
        """Create the security dir, owner-access only (it holds keys)."""
        if not os.path.isdir(self.security_dir):
            os.mkdir(self.security_dir, 0o700)
        os.chmod(self.security_dir, 0o700)

    def _pid_dir_changed(self, name, old, new):
        self.check_pid_dir()

    def check_pid_dir(self):
        """Create the pid dir, owner-access only."""
        if not os.path.isdir(self.pid_dir):
            os.mkdir(self.pid_dir, 0o700)
        os.chmod(self.pid_dir, 0o700)

    def check_dirs(self):
        """Ensure all three sub-directories exist with correct permissions."""
        self.check_security_dir()
        self.check_log_dir()
        self.check_pid_dir()

    def load_config_file(self, filename):
        """Load a config file from the top level of the cluster dir.

        Parameters
        ----------
        filename : unicode or str
            The filename only of the config file that must be located in
            the top-level of the cluster directory.
        """
        loader = PyFileConfigLoader(filename, self.location)
        return loader.load_config()

    def copy_config_file(self, config_file, path=None, overwrite=False):
        """Copy a default config file into the active cluster directory.

        Default configuration files are kept in :mod:`IPython.config.default`.
        This function moves these from that location to the working cluster
        directory.  An existing file is only replaced when ``overwrite`` is
        True.
        """
        if path is None:
            import IPython.config.default
            path = IPython.config.default.__file__.split(os.path.sep)[:-1]
            path = os.path.sep.join(path)
        src = os.path.join(path, config_file)
        dst = os.path.join(self.location, config_file)
        if not os.path.isfile(dst) or overwrite:
            shutil.copy(src, dst)

    def copy_all_config_files(self, path=None, overwrite=False):
        """Copy all config files into the active cluster directory."""
        for f in [u'ipcontroller_config.py', u'ipengine_config.py',
                  u'ipcluster_config.py']:
            self.copy_config_file(f, path=path, overwrite=overwrite)

    @classmethod
    def create_cluster_dir(cls, cluster_dir):
        """Create a new cluster directory given a full path.

        Parameters
        ----------
        cluster_dir : str
            The full path to the cluster directory. If it does exist, it will
            be used. If not, it will be created.
        """
        # was ``csl`` (typo); renamed to the conventional ``cls``
        return ClusterDir(location=cluster_dir)

    @classmethod
    def create_cluster_dir_by_profile(cls, path, profile=u'default'):
        """Create a cluster dir by profile name and path.

        Parameters
        ----------
        path : str
            The path (directory) to put the cluster directory in.
        profile : str
            The name of the profile. The name of the cluster directory will
            be "clusterz_<profile>".
        """
        if not os.path.isdir(path):
            raise ClusterDirError('Directory not found: %s' % path)
        cluster_dir = os.path.join(path, u'clusterz_' + profile)
        return ClusterDir(location=cluster_dir)

    @classmethod
    def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
        """Find an existing cluster dir by profile name, return its ClusterDir.

        This searches through a sequence of paths for a cluster dir. If it
        is not found, a :class:`ClusterDirError` exception will be raised.

        The search path algorithm is:
        1. ``os.getcwd()``
        2. ``ipython_dir``
        3. The directories found in the ":" separated
           :env:`IPCLUSTER_DIR_PATH` environment variable.

        Parameters
        ----------
        ipython_dir : unicode or str
            The IPython directory to use.
        profile : unicode or str
            The name of the profile. The name of the cluster directory
            will be "clusterz_<profile>".
        """
        dirname = u'clusterz_' + profile
        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH', '')
        if cluster_dir_paths:
            cluster_dir_paths = cluster_dir_paths.split(':')
        else:
            cluster_dir_paths = []
        paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
        for p in paths:
            cluster_dir = os.path.join(p, dirname)
            if os.path.isdir(cluster_dir):
                return ClusterDir(location=cluster_dir)
        else:
            raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)

    @classmethod
    def find_cluster_dir(cls, cluster_dir):
        """Find/create a cluster dir and return its ClusterDir.

        This will create the cluster directory if it doesn't exist.

        Parameters
        ----------
        cluster_dir : unicode or str
            The path of the cluster directory. This is expanded using
            :func:`IPython.utils.genutils.expand_path`.
        """
        cluster_dir = expand_path(cluster_dir)
        if not os.path.isdir(cluster_dir):
            raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
        return ClusterDir(location=cluster_dir)
237
238
239 #-----------------------------------------------------------------------------
240 # Command line options
241 #-----------------------------------------------------------------------------
242
class ClusterDirConfigLoader(BaseAppConfigLoader):
    """Command-line loader adding the cluster-dir options shared by
    ipcontroller/ipengine/ipcluster."""

    def _add_cluster_profile(self, parser):
        paa = parser.add_argument
        paa('-p', '--profile',
            dest='Global.profile', type=unicode,
            help=
            """The string name of the profile to be used. This determines the name
            of the cluster dir as: clusterz_<profile>. The default profile is named
            'default'. The cluster directory is resolved this way if the
            --cluster-dir option is not used.""",
            metavar='Global.profile')

    def _add_cluster_dir(self, parser):
        paa = parser.add_argument
        paa('--cluster-dir',
            dest='Global.cluster_dir', type=unicode,
            help="""Set the cluster dir. This overrides the logic used by the
            --profile option.""",
            metavar='Global.cluster_dir')

    def _add_work_dir(self, parser):
        paa = parser.add_argument
        paa('--work-dir',
            dest='Global.work_dir', type=unicode,
            help='Set the working dir for the process.',
            metavar='Global.work_dir')

    def _add_clean_logs(self, parser):
        paa = parser.add_argument
        paa('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.')

    def _add_no_clean_logs(self, parser):
        paa = parser.add_argument
        paa('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.")

    def _add_arguments(self):
        # extend the base loader's arguments with the cluster-dir group
        super(ClusterDirConfigLoader, self)._add_arguments()
        self._add_cluster_profile(self.parser)
        self._add_cluster_dir(self.parser)
        self._add_work_dir(self.parser)
        self._add_clean_logs(self.parser)
        self._add_no_clean_logs(self.parser)
290
291
292 #-----------------------------------------------------------------------------
293 # Crash handler for this application
294 #-----------------------------------------------------------------------------
295
296
# Itpl-style crash-report template ($self.attr placeholders); rendered by the
# CrashHandler machinery when an unhandled exception escapes the application.
_message_template = """\
Oops, $self.app_name crashed. We do our best to make it stable, but...

A crash report was automatically generated with the following information:
  - A verbatim copy of the crash traceback.
  - Data on your current $self.app_name configuration.

It was left in the file named:
\t'$self.crash_report_fname'
If you can email this file to the developers, the information in it will help
them in understanding and correcting the problem.

You can mail it to: $self.contact_name at $self.contact_email
with the subject '$self.app_name Crash Report'.

If you want to do it now, the following command will work (under Unix):
mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname

To ensure accurate tracking of this issue, please file a report about it at:
$self.bug_tracker
"""
318
class ClusterDirCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    message_template = _message_template

    def __init__(self, app):
        # Brian is the designated contact for crash reports of this app.
        contact_name, contact_email = release.authors['Brian'][:2]
        bug_tracker = 'http://github.com/ipython/ipython/issues'
        super(ClusterDirCrashHandler, self).__init__(
            app, contact_name, contact_email, bug_tracker
        )
331
332
333 #-----------------------------------------------------------------------------
334 # Main application
335 #-----------------------------------------------------------------------------
336
class ApplicationWithClusterDir(Application):
    """An application that puts everything into a cluster directory.

    Instead of looking for things in the ipython_dir, this type of application
    will use its own private directory called the "cluster directory"
    for things like config files, log files, etc.

    The cluster directory is resolved as follows:

    * If the ``--cluster-dir`` option is given, it is used.
    * If ``--cluster-dir`` is not given, the application directory is
      resolved using the profile name as ``clusterz_<profile>``. The search
      path for this directory is then i) cwd if it is found there
      and ii) in ipython_dir otherwise.

    The config file for the application is to be put in the cluster
    dir and named the value of the ``config_file_name`` class attribute.
    """

    command_line_loader = ClusterDirConfigLoader
    crash_handler_class = ClusterDirCrashHandler
    auto_create_cluster_dir = True
    # temporarily override default_log_level to DEBUG
    default_log_level = logging.DEBUG

    def create_default_config(self):
        """Add this application's Global defaults to the default config."""
        super(ApplicationWithClusterDir, self).create_default_config()
        self.default_config.Global.profile = u'default'
        self.default_config.Global.cluster_dir = u''
        self.default_config.Global.work_dir = os.getcwd()
        self.default_config.Global.log_to_file = False
        self.default_config.Global.log_url = None
        self.default_config.Global.clean_logs = False

    def find_resources(self):
        """This resolves the cluster directory.

        This tries to find the cluster directory and if successful, it will
        have done:
        * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
          the application.
        * Sets ``self.cluster_dir`` attribute of the application and config
          objects.

        The algorithm used for this is as follows:
        1. Try ``Global.cluster_dir``.
        2. Try using ``Global.profile``.
        3. If both of these fail and ``self.auto_create_cluster_dir`` is
           ``True``, then create the new cluster dir in the IPython directory.
        4. If all fails, then raise :class:`ClusterDirError`.
        """
        # 1. explicit --cluster-dir (command line wins over defaults)
        try:
            cluster_dir = self.command_line_config.Global.cluster_dir
        except AttributeError:
            cluster_dir = self.default_config.Global.cluster_dir
        cluster_dir = expand_path(cluster_dir)
        try:
            self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
        except ClusterDirError:
            pass
        else:
            self.log.info('Using existing cluster dir: %s' %
                          self.cluster_dir_obj.location)
            self.finish_cluster_dir()
            return

        # 2. profile-based lookup (cwd, then ipython_dir, then env paths)
        try:
            self.profile = self.command_line_config.Global.profile
        except AttributeError:
            self.profile = self.default_config.Global.profile
        try:
            self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
                self.ipython_dir, self.profile)
        except ClusterDirError:
            pass
        else:
            self.log.info('Using existing cluster dir: %s' %
                          self.cluster_dir_obj.location)
            self.finish_cluster_dir()
            return

        # 3. auto-create, or 4. give up
        if self.auto_create_cluster_dir:
            self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
                self.ipython_dir, self.profile
            )
            self.log.info('Creating new cluster dir: %s' %
                          self.cluster_dir_obj.location)
            self.finish_cluster_dir()
        else:
            raise ClusterDirError('Could not find a valid cluster directory.')

    def finish_cluster_dir(self):
        """Record the resolved cluster dir on the app and both configs."""
        # Set the cluster directory
        self.cluster_dir = self.cluster_dir_obj.location

        # These have to be set because they could be different from the one
        # that we just computed. Because command line has the highest
        # priority, this will always end up in the master_config.
        self.default_config.Global.cluster_dir = self.cluster_dir
        self.command_line_config.Global.cluster_dir = self.cluster_dir

    def find_config_file_name(self):
        """Find the config file name for this application."""
        # For this type of Application it should be set as a class attribute.
        if not hasattr(self, 'default_config_file_name'):
            self.log.critical("No config filename found")
        else:
            self.config_file_name = self.default_config_file_name

    def find_config_file_paths(self):
        """Search for config files only in the cluster directory."""
        # We should NOT include IPython.config.default here as the default
        # config files are ALWAYS automatically moved to the cluster directory.
        self.config_file_paths = (self.cluster_dir,)

    def pre_construct(self):
        """Publish the dirs into the config and move to the working dir."""
        # The log and security dirs were set earlier, but here we put them
        # into the config and log them.
        config = self.master_config
        sdir = self.cluster_dir_obj.security_dir
        self.security_dir = config.Global.security_dir = sdir
        ldir = self.cluster_dir_obj.log_dir
        self.log_dir = config.Global.log_dir = ldir
        pdir = self.cluster_dir_obj.pid_dir
        self.pid_dir = config.Global.pid_dir = pdir
        self.log.info("Cluster directory set to: %s" % self.cluster_dir)
        config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
        # Change to the working directory. We do this just before construct
        # is called so all the components there have the right working dir.
        self.to_work_dir()

    def to_work_dir(self):
        """chdir into ``Global.work_dir`` if we are not already there."""
        wd = self.master_config.Global.work_dir
        if unicode(wd) != unicode(os.getcwd()):
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)

    def start_logging(self):
        """Configure logging to file/url/stdout per the master config."""
        # Remove old log files
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            for f in os.listdir(log_dir):
                if f.startswith(self.name + u'-') and f.endswith('.log'):
                    os.remove(os.path.join(log_dir, f))
        # Start logging to the new log file
        if self.master_config.Global.log_to_file:
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(self.log_dir, log_filename)
            open_log_file = open(logfile, 'w')
        elif self.master_config.Global.log_url:
            open_log_file = None
        else:
            open_log_file = sys.stdout
        logger = logging.getLogger()
        level = self.log_level
        self.log = logger
        # since we've reconnected the logger, we need to reconnect the log-level
        self.log_level = level
        # _log_handler is presumably created by the base Application;
        # avoid adding it twice.
        if open_log_file is not None and self._log_handler not in self.log.handlers:
            self.log.addHandler(self._log_handler)
        # log.startLogging(open_log_file)

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already and
        ``overwrite`` is False.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            pid = self.get_pid_from_file()
            if not overwrite:
                raise PIDFileError(
                    'The pid file [%s] already exists. \nThis could mean that this '
                    'server is already running with [pid=%s].' % (pid_file, pid)
                )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`. This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except Exception:
                # best-effort cleanup: log, never raise during shutdown
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                pid = int(f.read().strip())
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)
549
@@ -0,0 +1,149 b''
1 """Base config factories."""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2008-2009 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14
15 import os
16 import logging
17
18 import uuid
19
20 from zmq.eventloop.ioloop import IOLoop
21
22 from IPython.config.configurable import Configurable
23 from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
24 from IPython.utils.importstring import import_item
25
26 from IPython.zmq.parallel.entry_point import select_random_ports
27 import IPython.zmq.parallel.streamsession as ss
28
29 #-----------------------------------------------------------------------------
30 # Classes
31 #-----------------------------------------------------------------------------
32
33
class SessionFactory(Configurable):
    """The Base factory from which every factory in IPython.zmq.parallel inherits"""

    # 'json', 'pickle', or an import string for a custom packer function
    packer = Str('',config=True)
    # import string for the inverse of `packer`; ignored for json/pickle
    unpacker = Str('',config=True)
    # ZMQ/session identity; defaults to a random uuid
    ident = CStr('',config=True)
    def _ident_default(self):
        return str(uuid.uuid4())
    username = Str(os.environ.get('USER','username'),config=True)
    # path to a file containing an execution key (empty = no key)
    exec_key = CUnicode('',config=True)

    # not configurable:
    context = Instance('zmq.Context', (), {})
    session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        return IOLoop.instance()

    def __init__(self, **kwargs):
        super(SessionFactory, self).__init__(**kwargs)

        # empty exec_key means "no keyfile"
        keyfile = self.exec_key or None

        # set the packers:
        if not self.packer:
            # None lets StreamSession pick its own defaults
            packer_f = unpacker_f = None
        elif self.packer.lower() == 'json':
            packer_f = ss.json_packer
            unpacker_f = ss.json_unpacker
        elif self.packer.lower() == 'pickle':
            packer_f = ss.pickle_packer
            unpacker_f = ss.pickle_unpacker
        else:
            # arbitrary import strings, e.g. 'mymod.my_packer'
            packer_f = import_item(self.packer)
            unpacker_f = import_item(self.unpacker)

        # construct the session
        self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, keyfile=keyfile)
72
73
class RegistrationFactory(SessionFactory):
    """The Base Configurable for objects that involve registration."""

    url = Str('', config=True) # url takes precedence over ip,regport,transport
    transport = Str('tcp', config=True)
    ip = Str('127.0.0.1', config=True)
    regport = Instance(int, config=True)
    def _regport_default(self):
        return 10101
        # return select_random_ports(1)[0]

    def __init__(self, **kwargs):
        super(RegistrationFactory, self).__init__(**kwargs)
        # split a user-supplied url into transport/ip/regport, then keep
        # url and the individual traits in sync from here on
        self._propagate_url()
        self._rebuild_url()
        self.on_trait_change(self._propagate_url, 'url')
        self.on_trait_change(self._rebuild_url, 'ip')
        self.on_trait_change(self._rebuild_url, 'transport')
        self.on_trait_change(self._rebuild_url, 'regport')

    def _rebuild_url(self):
        """Recompose self.url from transport, ip and regport."""
        self.url = "%s://%s:%i"%(self.transport, self.ip, self.regport)

    def _propagate_url(self):
        """Ensure self.url contains full transport://interface:port"""
        if self.url:
            parts = self.url.split('://',1)
            if len(parts) == 2:
                self.transport, addr = parts
            else:
                # no transport given (e.g. '127.0.0.1:10101'); keep current
                # transport.  The original code crashed here with
                # AttributeError by calling split(':') on the list itself.
                addr = parts[0]
            addr = addr.split(':')
            self.ip = addr[0]
            # port is optional: 'tcp://127.0.0.1' leaves regport untouched
            if len(addr) > 1 and addr[1]:
                self.regport = int(addr[1])
107
108 #-----------------------------------------------------------------------------
109 # argparse argument extenders
110 #-----------------------------------------------------------------------------
111
112
def add_session_arguments(parser):
    """Attach the SessionFactory command-line options to *parser*."""
    parser.add_argument(
        '--ident',
        type=str, dest='SessionFactory.ident',
        metavar='identity',
        help='set the ZMQ and session identity [default: random uuid]')
    # paa('--execkey',
    #     type=str, dest='SessionFactory.exec_key',
    #     help='path to a file containing an execution key.',
    #     metavar='execkey')
    parser.add_argument(
        '--packer',
        type=str, dest='SessionFactory.packer',
        metavar='packer',
        help='method to serialize messages: {json,pickle} [default: json]')
    parser.add_argument(
        '--unpacker',
        type=str, dest='SessionFactory.unpacker',
        metavar='packer',
        help='inverse function of `packer`. Only necessary when using something other than json|pickle')
131
def add_registration_arguments(parser):
    """Attach the RegistrationFactory command-line options to *parser*."""
    paa = parser.add_argument
    paa('--ip',
        type=str, dest='RegistrationFactory.ip',
        help="The IP used for registration [default: localhost]",
        metavar='ip')
    paa('--transport',
        type=str, dest='RegistrationFactory.transport',
        help="The ZeroMQ transport used for registration [default: tcp]",
        metavar='transport')
    paa('--url',
        type=str, dest='RegistrationFactory.url',
        help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
        metavar='url')
    paa('--regport',
        type=int, dest='RegistrationFactory.regport',
        help="The port used for registration [default: 10101]",
        # was metavar='ip' (copy-paste from --ip); this is a port number
        metavar='port')
@@ -0,0 +1,322 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The IPython controller application.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 from __future__ import with_statement
19
20 import copy
21 import sys
22 import os
23 import logging
24 # from twisted.application import service
25 # from twisted.internet import reactor
26 # from twisted.python import log
27
28 import zmq
29 from zmq.log.handlers import PUBHandler
30
31 from IPython.config.loader import Config
32 from IPython.zmq.parallel import factory
33 from IPython.zmq.parallel.controller import ControllerFactory
34 from IPython.zmq.parallel.clusterdir import (
35 ApplicationWithClusterDir,
36 ClusterDirConfigLoader
37 )
38 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
39 from IPython.utils.traitlets import Instance, Unicode
40
41 from entry_point import generate_exec_key
42
43
44 #-----------------------------------------------------------------------------
45 # Module level variables
46 #-----------------------------------------------------------------------------
47
48
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


# --help description; also used by the application's arg parser.
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your .ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""

#-----------------------------------------------------------------------------
# Default interfaces
#-----------------------------------------------------------------------------

# The default client interfaces for FCClientServiceFactory.interfaces
default_client_interfaces = Config()
default_client_interfaces.Default.url_file = 'ipcontroller-client.url'

# Make this a dict we can pass to Config.__init__ for the default
# (deepcopy so later mutation of the Config doesn't leak into the default)
default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))



# The default engine interfaces for FCEngineServiceFactory.interfaces
default_engine_interfaces = Config()
default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'

# Make this a dict we can pass to Config.__init__ for the default
default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
82
83
84 #-----------------------------------------------------------------------------
85 # Service factories
86 #-----------------------------------------------------------------------------
87
88 #
89 # class FCClientServiceFactory(FCServiceFactory):
90 # """A Foolscap implementation of the client services."""
91 #
92 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
93 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
94 # allow_none=False, config=True)
95 #
96 #
97 # class FCEngineServiceFactory(FCServiceFactory):
98 # """A Foolscap implementation of the engine services."""
99 #
100 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
101 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
102 # allow_none=False, config=True)
103 #
104
105 #-----------------------------------------------------------------------------
106 # Command line options
107 #-----------------------------------------------------------------------------
108
109
110 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
111
def _add_arguments(self):
    """Add all ipcontroller-specific options on top of the cluster-dir ones."""
    super(IPControllerAppConfigLoader, self)._add_arguments()
    paa = self.parser.add_argument

    ## Hub Config:
    paa('--mongodb',
        dest='HubFactory.db_class', action='store_const',
        const='IPython.zmq.parallel.mongodb.MongoDB',
        help='Use MongoDB task storage [default: in-memory]')
    paa('--hb',
        type=int, dest='HubFactory.hb', nargs=2,
        help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
        'connections [default: random]',
        metavar='Hub.hb_ports')

    # Client config
    paa('--client-ip',
        type=str, dest='HubFactory.client_ip',
        help='The IP address or hostname the Hub will listen on for '
        'client connections. Both engine-ip and client-ip can be set simultaneously '
        'via --ip [default: loopback]',
        metavar='Hub.client_ip')
    paa('--client-transport',
        type=str, dest='HubFactory.client_transport',
        help='The ZeroMQ transport the Hub will use for '
        'client connections. Both engine-transport and client-transport can be set simultaneously '
        'via --transport [default: tcp]',
        metavar='Hub.client_transport')
    paa('--query',
        type=int, dest='HubFactory.query_port',
        help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
        metavar='Hub.query_port')
    paa('--notifier',
        type=int, dest='HubFactory.notifier_port',
        help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
        metavar='Hub.notifier_port')

    # Engine config
    paa('--engine-ip',
        type=str, dest='HubFactory.engine_ip',
        # fixed: the original concatenated to "...its schedulersengine-ip..."
        # (missing separator/word between the string fragments)
        help='The IP address or hostname the Hub will listen on for '
        'engine connections. This applies to the Hub and its schedulers. '
        'Both engine-ip and client-ip can be set simultaneously '
        'via --ip [default: loopback]',
        metavar='Hub.engine_ip')
    paa('--engine-transport',
        type=str, dest='HubFactory.engine_transport',
        # fixed: help said 'client connections' (copy-paste from --client-transport)
        help='The ZeroMQ transport the Hub will use for '
        'engine connections. Both engine-transport and client-transport can be set simultaneously '
        'via --transport [default: tcp]',
        metavar='Hub.engine_transport')

    # Scheduler config
    paa('--mux',
        type=int, dest='ControllerFactory.mux', nargs=2,
        help='The (2) ports the MUX scheduler will listen on for client,engine '
        'connections, respectively [default: random]',
        metavar='Scheduler.mux_ports')
    paa('--task',
        type=int, dest='ControllerFactory.task', nargs=2,
        help='The (2) ports the Task scheduler will listen on for client,engine '
        'connections, respectively [default: random]',
        metavar='Scheduler.task_ports')
    paa('--control',
        type=int, dest='ControllerFactory.control', nargs=2,
        help='The (2) ports the Control scheduler will listen on for client,engine '
        'connections, respectively [default: random]',
        metavar='Scheduler.control_ports')
    paa('--iopub',
        type=int, dest='ControllerFactory.iopub', nargs=2,
        help='The (2) ports the IOPub scheduler will listen on for client,engine '
        'connections, respectively [default: random]',
        metavar='Scheduler.iopub_ports')
    paa('--scheme',
        type=str, dest='ControllerFactory.scheme',
        choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
        help='select the task scheduler scheme [default: Python LRU]',
        metavar='Scheduler.scheme')
    paa('--usethreads',
        dest='ControllerFactory.usethreads', action="store_true",
        help='Use threads instead of processes for the schedulers',
        )

    ## Global config
    paa('--log-to-file',
        action='store_true', dest='Global.log_to_file',
        help='Log to a file in the log directory (default is stdout)')
    paa('--log-url',
        type=str, dest='Global.log_url',
        help='Broadcast logs to an iploggerz process [default: disabled]')
    paa('-r','--reuse-key',
        action='store_true', dest='Global.reuse_key',
        help='Try to reuse existing execution keys.')
    paa('--no-secure',
        action='store_false', dest='Global.secure',
        help='Turn off execution keys.')
    paa('--secure',
        action='store_true', dest='Global.secure',
        help='Turn on execution keys (default).')
    paa('--execkey',
        type=str, dest='Global.exec_key',
        help='path to a file containing an execution key.',
        metavar='keyfile')
    factory.add_session_arguments(self.parser)
    factory.add_registration_arguments(self.parser)
217
218
219 #-----------------------------------------------------------------------------
220 # The main application
221 #-----------------------------------------------------------------------------
222
223
224 class IPControllerApp(ApplicationWithClusterDir):
225
226 name = u'ipcontrollerz'
227 description = _description
228 command_line_loader = IPControllerAppConfigLoader
229 default_config_file_name = default_config_file_name
230 auto_create_cluster_dir = True
231
232 def create_default_config(self):
233 super(IPControllerApp, self).create_default_config()
234 # Don't set defaults for Global.secure or Global.reuse_furls
235 # as those are set in a component.
236 self.default_config.Global.import_statements = []
237 self.default_config.Global.clean_logs = True
238 self.default_config.Global.secure = False
239 self.default_config.Global.reuse_key = False
240 self.default_config.Global.exec_key = "exec_key.key"
241
242 def pre_construct(self):
243 super(IPControllerApp, self).pre_construct()
244 c = self.master_config
245 # The defaults for these are set in FCClientServiceFactory and
246 # FCEngineServiceFactory, so we only set them here if the global
247 # options have be set to override the class level defaults.
248
249 # if hasattr(c.Global, 'reuse_furls'):
250 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
251 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
252 # del c.Global.reuse_furls
253 # if hasattr(c.Global, 'secure'):
254 # c.FCClientServiceFactory.secure = c.Global.secure
255 # c.FCEngineServiceFactory.secure = c.Global.secure
256 # del c.Global.secure
257
258 def construct(self):
259 # This is the working dir by now.
260 sys.path.insert(0, '')
261 c = self.master_config
262
263 self.import_statements()
264
265 if c.Global.secure:
266 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
267 if not c.Global.reuse_key or not os.path.exists(keyfile):
268 generate_exec_key(keyfile)
269 c.SessionFactory.exec_key = keyfile
270 else:
271 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
272 if os.path.exists(keyfile):
273 os.remove(keyfile)
274 c.SessionFactory.exec_key = ''
275
276 try:
277 self.factory = ControllerFactory(config=c)
278 self.start_logging()
279 self.factory.construct()
280 except:
281 self.log.error("Couldn't construct the Controller", exc_info=True)
282 self.exit(1)
283
284
285 def import_statements(self):
286 statements = self.master_config.Global.import_statements
287 for s in statements:
288 try:
289 self.log.msg("Executing statement: '%s'" % s)
290 exec s in globals(), locals()
291 except:
292 self.log.msg("Error running statement: %s" % s)
293
294 # def start_logging(self):
295 # super(IPControllerApp, self).start_logging()
296 # if self.master_config.Global.log_url:
297 # context = self.factory.context
298 # lsock = context.socket(zmq.PUB)
299 # lsock.connect(self.master_config.Global.log_url)
300 # handler = PUBHandler(lsock)
301 # handler.root_topic = 'controller'
302 # handler.setLevel(self.log_level)
303 # self.log.addHandler(handler)
304 #
305 def start_app(self):
306 # Start the controller service.
307 self.factory.start()
308 self.write_pid_file(overwrite=True)
309 try:
310 self.factory.loop.start()
311 except KeyboardInterrupt:
312 self.log.critical("Interrupted, Exiting...\n")
313
314
315 def launch_new_instance():
316 """Create and run the IPython controller"""
317 app = IPControllerApp()
318 app.start()
319
320
321 if __name__ == '__main__':
322 launch_new_instance()
@@ -0,0 +1,261 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The IPython engine application
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import sys
20
21 import zmq
22 from zmq.eventloop import ioloop
23
24 from IPython.zmq.parallel.clusterdir import (
25 ApplicationWithClusterDir,
26 ClusterDirConfigLoader
27 )
28 from IPython.zmq.log import EnginePUBHandler
29
30 from IPython.zmq.parallel import factory
31 from IPython.zmq.parallel.engine import EngineFactory
32 from IPython.zmq.parallel.streamkernel import Kernel
33 from IPython.utils.importstring import import_item
34
35 #-----------------------------------------------------------------------------
36 # Module level variables
37 #-----------------------------------------------------------------------------
38
39 #: The default config file name for this application
40 default_config_file_name = u'ipengine_config.py'
41
42
43 mpi4py_init = """from mpi4py import MPI as mpi
44 mpi.size = mpi.COMM_WORLD.Get_size()
45 mpi.rank = mpi.COMM_WORLD.Get_rank()
46 """
47
48
49 pytrilinos_init = """from PyTrilinos import Epetra
50 class SimpleStruct:
51 pass
52 mpi = SimpleStruct()
53 mpi.rank = 0
54 mpi.size = 0
55 """
56
57
58 _description = """Start an IPython engine for parallel computing.\n\n
59
60 IPython engines run in parallel and perform computations on behalf of a client
61 and controller. A controller needs to be started before the engines. The
62 engine can be configured using command line options or using a cluster
63 directory. Cluster directories contain config, log and security files and are
64 usually located in your .ipython directory and named as "cluster_<profile>".
65 See the --profile and --cluster-dir options for details.
66 """
67
68 #-----------------------------------------------------------------------------
69 # Command line options
70 #-----------------------------------------------------------------------------
71
72
73 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
74
75 def _add_arguments(self):
76 super(IPEngineAppConfigLoader, self)._add_arguments()
77 paa = self.parser.add_argument
78 # Controller config
79 paa('--url-file',
80 type=unicode, dest='Global.url_file',
81 help='The full location of the file containing the FURL of the '
82 'controller. If this is not given, the FURL file must be in the '
83 'security directory of the cluster directory. This location is '
84 'resolved using the --profile and --app-dir options.',
85 metavar='Global.url_file')
86 # MPI
87 paa('--mpi',
88 type=str, dest='MPI.use',
89 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
90 metavar='MPI.use')
91 # Global config
92 paa('--log-to-file',
93 action='store_true', dest='Global.log_to_file',
94 help='Log to a file in the log directory (default is stdout)')
95 paa('--log-url',
96 dest='Global.log_url',
97 help="url of ZMQ logger, as started with iploggerz")
98 paa('--execkey',
99 type=str, dest='Global.exec_key',
100 help='path to a file containing an execution key.',
101 metavar='keyfile')
102 paa('--no-secure',
103 action='store_false', dest='Global.secure',
104 help='Turn off execution keys.')
105 paa('--secure',
106 action='store_true', dest='Global.secure',
107 help='Turn on execution keys (default).')
108 # init command
109 paa('-c',
110 type=str, dest='Global.extra_exec_lines',
111 help='specify a command to be run at startup')
112
113 factory.add_session_arguments(self.parser)
114 factory.add_registration_arguments(self.parser)
115
116
117 #-----------------------------------------------------------------------------
118 # Main application
119 #-----------------------------------------------------------------------------
120
121
122 class IPEngineApp(ApplicationWithClusterDir):
123
124 name = u'ipenginez'
125 description = _description
126 command_line_loader = IPEngineAppConfigLoader
127 default_config_file_name = default_config_file_name
128 auto_create_cluster_dir = True
129
130 def create_default_config(self):
131 super(IPEngineApp, self).create_default_config()
132
133 # The engine should not clean logs as we don't want to remove the
134 # active log files of other running engines.
135 self.default_config.Global.clean_logs = False
136 self.default_config.Global.secure = True
137
138 # Global config attributes
139 self.default_config.Global.exec_lines = []
140 self.default_config.Global.extra_exec_lines = ''
141
142 # Configuration related to the controller
143 # This must match the filename (path not included) that the controller
144 # used for the FURL file.
145 self.default_config.Global.url = u'tcp://localhost:10101'
146 # If given, this is the actual location of the controller's FURL file.
147 # If not, this is computed using the profile, app_dir and furl_file_name
148 self.default_config.Global.key_file_name = u'exec_key.key'
149 self.default_config.Global.key_file = u''
150
151 # MPI related config attributes
152 self.default_config.MPI.use = ''
153 self.default_config.MPI.mpi4py = mpi4py_init
154 self.default_config.MPI.pytrilinos = pytrilinos_init
155
156 def post_load_command_line_config(self):
157 pass
158
159 def pre_construct(self):
160 super(IPEngineApp, self).pre_construct()
161 # self.find_cont_url_file()
162 self.find_key_file()
163 if self.master_config.Global.extra_exec_lines:
164 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
165
166 def find_key_file(self):
167 """Set the key file.
168
169 Here we don't try to actually see if it exists for is valid as that
170 is hadled by the connection logic.
171 """
172 config = self.master_config
173 # Find the actual controller key file
174 if not config.Global.key_file:
175 try_this = os.path.join(
176 config.Global.cluster_dir,
177 config.Global.security_dir,
178 config.Global.key_file_name
179 )
180 config.Global.key_file = try_this
181
182 def construct(self):
183 # This is the working dir by now.
184 sys.path.insert(0, '')
185 config = self.master_config
186 if os.path.exists(config.Global.key_file) and config.Global.secure:
187 config.SessionFactory.exec_key = config.Global.key_file
188
189 config.Kernel.exec_lines = config.Global.exec_lines
190
191 self.start_mpi()
192
193 # Create the underlying shell class and EngineService
194 # shell_class = import_item(self.master_config.Global.shell_class)
195 try:
196 self.engine = EngineFactory(config=config)
197 except:
198 self.log.error("Couldn't start the Engine", exc_info=True)
199 self.exit(1)
200
201 self.start_logging()
202
203 # Create the service hierarchy
204 # self.main_service = service.MultiService()
205 # self.engine_service.setServiceParent(self.main_service)
206 # self.tub_service = Tub()
207 # self.tub_service.setServiceParent(self.main_service)
208 # # This needs to be called before the connection is initiated
209 # self.main_service.startService()
210
211 # This initiates the connection to the controller and calls
212 # register_engine to tell the controller we are ready to do work
213 # self.engine_connector = EngineConnector(self.tub_service)
214
215 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
216
217 # reactor.callWhenRunning(self.call_connect)
218
219
220 def start_logging(self):
221 super(IPEngineApp, self).start_logging()
222 if self.master_config.Global.log_url:
223 context = self.engine.context
224 lsock = context.socket(zmq.PUB)
225 lsock.connect(self.master_config.Global.log_url)
226 handler = EnginePUBHandler(self.engine, lsock)
227 handler.setLevel(self.log_level)
228 self.log.addHandler(handler)
229
230 def start_mpi(self):
231 global mpi
232 mpikey = self.master_config.MPI.use
233 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
234 if mpi_import_statement is not None:
235 try:
236 self.log.info("Initializing MPI:")
237 self.log.info(mpi_import_statement)
238 exec mpi_import_statement in globals()
239 except:
240 mpi = None
241 else:
242 mpi = None
243
244
245 def start_app(self):
246 self.engine.start()
247 try:
248 self.engine.loop.start()
249 except KeyboardInterrupt:
250 self.log.critical("Engine Interrupted, shutting down...\n")
251
252
253 def launch_new_instance():
254 """Create and run the IPython controller"""
255 app = IPEngineApp()
256 app.start()
257
258
259 if __name__ == '__main__':
260 launch_new_instance()
261
@@ -0,0 +1,131 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 A simple IPython logger application
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import sys
20
21 import zmq
22
23 from IPython.zmq.parallel.clusterdir import (
24 ApplicationWithClusterDir,
25 ClusterDirConfigLoader
26 )
27 from IPython.zmq.parallel.logwatcher import LogWatcher
28
29 #-----------------------------------------------------------------------------
30 # Module level variables
31 #-----------------------------------------------------------------------------
32
33 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
35
36 _description = """Start an IPython logger for parallel computing.\n\n
37
38 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
42 usually located in your .ipython directory and named as "cluster_<profile>".
43 See the --profile and --cluster-dir options for details.
44 """
45
46 #-----------------------------------------------------------------------------
47 # Command line options
48 #-----------------------------------------------------------------------------
49
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
56 # Controller config
57 paa('--url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
60 )
61 # MPI
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
65 metavar='topics')
66 # Global config
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
70
71
72 #-----------------------------------------------------------------------------
73 # Main application
74 #-----------------------------------------------------------------------------
75
76
77 class IPLoggerApp(ApplicationWithClusterDir):
78
79 name = u'iploggerz'
80 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
84
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
87
88 # The engine should not clean logs as we don't want to remove the
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
91
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
96
97 def post_load_command_line_config(self):
98 pass
99
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
102
103 def construct(self):
104 # This is the working dir by now.
105 sys.path.insert(0, '')
106
107 self.start_logging()
108
109 try:
110 self.watcher = LogWatcher(config=self.master_config)
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
114
115
116 def start_app(self):
117 try:
118 self.watcher.loop.start()
119 except KeyboardInterrupt:
120 self.log.critical("Logging Interrupted, shutting down...\n")
121
122
123 def launch_new_instance():
124 """Create and run the IPython LogWatcher"""
125 app = IPLoggerApp()
126 app.start()
127
128
129 if __name__ == '__main__':
130 launch_new_instance()
131
@@ -0,0 +1,92 b''
1 #!/usr/bin/env python
2 """A simple logger object that consolidates messages incoming from ipclusterz processes."""
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2011 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15
16 import sys
17 import logging
18
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
21 from IPython.config.configurable import Configurable
22 from IPython.utils.traitlets import Int, Str, Instance, List
23
24 #-----------------------------------------------------------------------------
25 # Classes
26 #-----------------------------------------------------------------------------
27
28
29 class LogWatcher(Configurable):
30 """A simple class that receives messages on a SUB socket, as published
31 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32
33 This can subscribe to multiple topics, but defaults to all topics.
34 """
35 # configurables
36 topics = List([''], config=True)
37 url = Str('tcp://127.0.0.1:20202', config=True)
38
39 # internals
40 context = Instance(zmq.Context, (), {})
41 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
42 loop = Instance('zmq.eventloop.ioloop.IOLoop')
43 def _loop_default(self):
44 return ioloop.IOLoop.instance()
45
46 def __init__(self, config=None):
47 super(LogWatcher, self).__init__(config=config)
48 s = self.context.socket(zmq.SUB)
49 s.bind(self.url)
50 self.stream = zmqstream.ZMQStream(s, self.loop)
51 self.subscribe()
52 self.on_trait_change(self.subscribe, 'topics')
53
54 self.stream.on_recv(self.log_message)
55
56 def subscribe(self):
57 """Update our SUB socket's subscriptions."""
58 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
59 for topic in self.topics:
60 logging.debug("Subscribing to: %r"%topic)
61 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
62
63 def _extract_level(self, topic_str):
64 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
65 topics = topic_str.split('.')
66 for idx,t in enumerate(topics):
67 level = getattr(logging, t, None)
68 if level is not None:
69 break
70
71 if level is None:
72 level = logging.INFO
73 else:
74 topics.pop(idx)
75
76 return level, '.'.join(topics)
77
78
79 def log_message(self, raw):
80 """receive and parse a message, then log it."""
81 if len(raw) != 2 or '.' not in raw[0]:
82 logging.error("Invalid log message: %s"%raw)
83 return
84 else:
85 topic, msg = raw
86 # don't newline, since log messages always newline:
87 topic,level_name = topic.rsplit('.',1)
88 level,topic = self._extract_level(topic)
89 if msg[-1] == '\n':
90 msg = msg[:-1]
91 logging.log(level, "[%s] %s" % (topic, msg))
92
@@ -60,7 +60,7 b' from .importstring import import_item'
60
60
61 ClassTypes = (ClassType, type)
61 ClassTypes = (ClassType, type)
62
62
63 SequenceTypes = (ListType, TupleType)
63 SequenceTypes = (ListType, TupleType, set, frozenset)
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # Basic classes
66 # Basic classes
@@ -1018,7 +1018,7 b' class List(Instance):'
1018 """An instance of a Python list."""
1018 """An instance of a Python list."""
1019
1019
1020 def __init__(self, default_value=None, allow_none=True, **metadata):
1020 def __init__(self, default_value=None, allow_none=True, **metadata):
1021 """Create a list trait type from a list or tuple.
1021 """Create a list trait type from a list, set, or tuple.
1022
1022
1023 The default value is created by doing ``list(default_value)``,
1023 The default value is created by doing ``list(default_value)``,
1024 which creates a copy of the ``default_value``.
1024 which creates a copy of the ``default_value``.
@@ -1034,6 +1034,26 b' class List(Instance):'
1034 allow_none=allow_none, **metadata)
1034 allow_none=allow_none, **metadata)
1035
1035
1036
1036
1037 class Set(Instance):
1038 """An instance of a Python set."""
1039
1040 def __init__(self, default_value=None, allow_none=True, **metadata):
1041 """Create a set trait type from a set, list, or tuple.
1042
1043 The default value is created by doing ``set(default_value)``,
1044 which creates a copy of the ``default_value``.
1045 """
1046 if default_value is None:
1047 args = ((),)
1048 elif isinstance(default_value, SequenceTypes):
1049 args = (default_value,)
1050 else:
1051 raise TypeError('default value of Set was %s' % default_value)
1052
1053 super(Set,self).__init__(klass=set, args=args,
1054 allow_none=allow_none, **metadata)
1055
1056
1037 class Dict(Instance):
1057 class Dict(Instance):
1038 """An instance of a Python dict."""
1058 """An instance of a Python dict."""
1039
1059
@@ -16,6 +16,7 b' and monitors traffic through the various queues.'
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import os
18 import os
19 import sys
19 import time
20 import time
20 import logging
21 import logging
21 from multiprocessing import Process
22 from multiprocessing import Process
@@ -23,12 +24,13 b' from multiprocessing import Process'
23 import zmq
24 import zmq
24 from zmq.eventloop import ioloop
25 from zmq.eventloop import ioloop
25 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.devices import ProcessMonitoredQueue
27 # from zmq.devices import ProcessMonitoredQueue
27
28
28 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
29 from IPython.zmq.entry_point import bind_port
32 from IPython.zmq.entry_point import bind_port
30
33
31 from hub import Hub
32 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
34 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
33 connect_logger, parse_url, signal_children, generate_exec_key,
35 connect_logger, parse_url, signal_children, generate_exec_key,
34 local_logger)
36 local_logger)
@@ -37,6 +39,7 b' from entry_point import (make_base_argument_parser, select_random_ports, split_p'
37 import streamsession as session
39 import streamsession as session
38 import heartmonitor
40 import heartmonitor
39 from scheduler import launch_scheduler
41 from scheduler import launch_scheduler
42 from hub import Hub, HubFactory
40
43
41 from dictdb import DictDB
44 from dictdb import DictDB
42 try:
45 try:
@@ -81,8 +84,84 b' def make_argument_parser():'
81 help='Manually specify the session id.')
84 help='Manually specify the session id.')
82
85
83 return parser
86 return parser
87
88 class ControllerFactory(HubFactory):
89 """Configurable for setting up a Hub and Schedulers."""
90
91 scheme = Str('pure', config=True)
92 usethreads = Bool(False, config=True)
93
94 # internal
95 children = List()
96 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
97
98 def _update_mq(self):
99 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
100
101 def __init__(self, **kwargs):
102 super(ControllerFactory, self).__init__(**kwargs)
103 self.subconstructors.append(self.construct_schedulers)
104 self._update_mq()
105 self.on_trait_change(self._update_mq, 'usethreads')
106
107 def start(self):
108 super(ControllerFactory, self).start()
109 for child in self.children:
110 child.start()
111 if not self.usethreads:
112 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
113
114
115 def construct_schedulers(self):
116 children = self.children
117 mq = import_item(self.mq_class)
118
119 # IOPub relay (in a Process)
120 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
121 q.bind_in(self.client_addrs['iopub'])
122 q.bind_out(self.engine_addrs['iopub'])
123 q.setsockopt_out(zmq.SUBSCRIBE, '')
124 q.connect_mon(self.monitor_url)
125 q.daemon=True
126 children.append(q)
127
128 # Multiplexer Queue (in a Process)
129 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
130 q.bind_in(self.client_addrs['mux'])
131 q.bind_out(self.engine_addrs['mux'])
132 q.connect_mon(self.monitor_url)
133 q.daemon=True
134 children.append(q)
135
136 # Control Queue (in a Process)
137 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
138 q.bind_in(self.client_addrs['control'])
139 q.bind_out(self.engine_addrs['control'])
140 q.connect_mon(self.monitor_url)
141 q.daemon=True
142 children.append(q)
143 # Task Queue (in a Process)
144 if self.scheme == 'pure':
145 logging.warn("task::using pure XREQ Task scheduler")
146 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
147 q.bind_in(self.client_addrs['task'])
148 q.bind_out(self.engine_addrs['task'])
149 q.connect_mon(self.monitor_url)
150 q.daemon=True
151 children.append(q)
152 elif self.scheme == 'none':
153 logging.warn("task::using no Task scheduler")
154
155 else:
156 logging.warn("task::using Python %s Task scheduler"%self.scheme)
157 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
158 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
159 q.daemon=True
160 children.append(q)
161
84
162
85 def main(argv=None):
163 def main(argv=None):
164 """DO NOT USE ME ANYMORE"""
86
165
87 parser = make_argument_parser()
166 parser = make_argument_parser()
88
167
@@ -256,9 +335,10 b' def main(argv=None):'
256
335
257 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
336 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
258 dc.start()
337 dc.start()
259 loop.start()
338 try:
260
339 loop.start()
261
340 except KeyboardInterrupt:
341 print ("interrupted, exiting...", file=sys.__stderr__)
262
342
263
343
264 if __name__ == '__main__':
344 if __name__ == '__main__':
@@ -75,7 +75,11 b' class CompositeFilter(object):'
75 return False
75 return False
76 return True
76 return True
77
77
78 class DictDB(object):
78 class BaseDB(object):
79 """Empty Parent class so traitlets work on DB."""
80 pass
81
82 class DictDB(BaseDB):
79 """Basic in-memory dict-based object for saving Task Records.
83 """Basic in-memory dict-based object for saving Task Records.
80
84
81 This is the first object to present the DB interface
85 This is the first object to present the DB interface
@@ -16,43 +16,49 b' from zmq.eventloop import ioloop, zmqstream'
16
16
17 # internal
17 # internal
18 from IPython.config.configurable import Configurable
18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Str, Dict
19 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type
20 # from IPython.utils.localinterfaces import LOCALHOST
20 # from IPython.utils.localinterfaces import LOCALHOST
21
21
22 from factory import RegistrationFactory
23
22 from streamsession import Message, StreamSession
24 from streamsession import Message, StreamSession
23 from streamkernel import Kernel, make_kernel
25 from streamkernel import Kernel, make_kernel
24 import heartmonitor
26 import heartmonitor
25 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
27 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
26 local_logger)
28 local_logger)
27 # import taskthread
29 # import taskthread
28 logger = logging.getLogger()
29
30
30 def printer(*msg):
31 def printer(*msg):
31 # print (logger.handlers, file=sys.__stdout__)
32 # print (logging.handlers, file=sys.__stdout__)
32 logger.info(str(msg))
33 logging.info(str(msg))
33
34
34 class Engine(Configurable):
35 class EngineFactory(RegistrationFactory):
35 """IPython engine"""
36 """IPython engine"""
36
37
37 kernel=None
38 id=None
39
40 # configurables:
38 # configurables:
41 context=Instance(zmq.Context)
39 user_ns=Dict(config=True)
42 loop=Instance(ioloop.IOLoop)
40 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
43 session=Instance(StreamSession)
41 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
44 ident=Str()
42
45 registrar=Instance(zmqstream.ZMQStream)
43 # not configurable:
46 user_ns=Dict()
44 id=Int(allow_none=True)
45 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
46 kernel=Instance(Kernel)
47
47
48
48 def __init__(self, **kwargs):
49 def __init__(self, **kwargs):
49 super(Engine, self).__init__(**kwargs)
50 super(EngineFactory, self).__init__(**kwargs)
50 if not self.ident:
51 ctx = self.context
51 self.ident = str(uuid.uuid4())
52
52 self.registrar.on_send(printer)
53 reg = ctx.socket(zmq.PAIR)
54 reg.setsockopt(zmq.IDENTITY, self.ident)
55 reg.connect(self.url)
56 self.registrar = zmqstream.ZMQStream(reg, self.loop)
53
57
54 def register(self):
58 def register(self):
59 """send the registration_request"""
55
60
61 logging.info("registering")
56 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
62 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
57 self.registrar.on_recv(self.complete_registration)
63 self.registrar.on_recv(self.complete_registration)
58 # print (self.session.key)
64 # print (self.session.key)
@@ -60,50 +66,87 b' class Engine(Configurable):'
60
66
61 def complete_registration(self, msg):
67 def complete_registration(self, msg):
62 # print msg
68 # print msg
69 ctx = self.context
70 loop = self.loop
71 identity = self.ident
72 print (identity)
73
63 idents,msg = self.session.feed_identities(msg)
74 idents,msg = self.session.feed_identities(msg)
64 msg = Message(self.session.unpack_message(msg))
75 msg = Message(self.session.unpack_message(msg))
76
65 if msg.content.status == 'ok':
77 if msg.content.status == 'ok':
66 self.id = int(msg.content.id)
78 self.id = int(msg.content.id)
67 self.session.username = 'engine-%i'%self.id
79
80 # create Shell Streams (MUX, Task, etc.):
68 queue_addr = msg.content.mux
81 queue_addr = msg.content.mux
69 shell_addrs = [ str(queue_addr) ]
82 shell_addrs = [ str(queue_addr) ]
70 control_addr = str(msg.content.control)
71 task_addr = msg.content.task
83 task_addr = msg.content.task
72 iopub_addr = msg.content.iopub
73 if task_addr:
84 if task_addr:
74 shell_addrs.append(str(task_addr))
85 shell_addrs.append(str(task_addr))
86 shell_streams = []
87 for addr in shell_addrs:
88 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
89 stream.setsockopt(zmq.IDENTITY, identity)
90 stream.connect(addr)
91 shell_streams.append(stream)
92
93 # control stream:
94 control_addr = str(msg.content.control)
95 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
96 control_stream.setsockopt(zmq.IDENTITY, identity)
97 control_stream.connect(control_addr)
75
98
99 # create iopub stream:
100 iopub_addr = msg.content.iopub
101 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
102 iopub_stream.setsockopt(zmq.IDENTITY, identity)
103 iopub_stream.connect(iopub_addr)
104
105 # launch heartbeat
76 hb_addrs = msg.content.heartbeat
106 hb_addrs = msg.content.heartbeat
107 # print (hb_addrs)
108
109 # # Redirect input streams and set a display hook.
110 # if self.out_stream_factory:
111 # sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
112 # sys.stdout.topic = 'engine.%i.stdout'%self.id
113 # sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
114 # sys.stderr.topic = 'engine.%i.stderr'%self.id
115 # if self.display_hook_factory:
116 # sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
117 # sys.displayhook.topic = 'engine.%i.pyout'%self.id
118
77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
119 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
78 k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
120 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
79 hb_addrs, client_addr=None, loop=self.loop,
121 control_stream=control_stream,
80 context=self.context, key=self.session.key)[-1]
122 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
81 self.kernel = k
123 user_ns = self.user_ns, config=self.config)
82 if self.user_ns is not None:
124 self.kernel.start()
83 self.user_ns.update(self.kernel.user_ns)
125
84 self.kernel.user_ns = self.user_ns
126 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
127 heart.start()
128
85
129
86 else:
130 else:
87 logger.error("Registration Failed: %s"%msg)
131 logging.error("Registration Failed: %s"%msg)
88 raise Exception("Registration Failed: %s"%msg)
132 raise Exception("Registration Failed: %s"%msg)
89
133
90 logger.info("completed registration with id %i"%self.id)
134 logging.info("Completed registration with id %i"%self.id)
91
135
92 # logger.info(str(msg))
93
136
94 def unregister(self):
137 def unregister(self):
95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
138 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
96 time.sleep(1)
139 time.sleep(1)
97 sys.exit(0)
140 sys.exit(0)
98
141
99 def start(self):
142 def start(self):
100 logger.info("registering")
143 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
101 self.register()
144 dc.start()
145
102
146
103
104
147
105 def main(argv=None, user_ns=None):
148 def main(argv=None, user_ns=None):
106
149 """DO NOT USE ME ANYMORE"""
107 parser = make_base_argument_parser()
150 parser = make_base_argument_parser()
108
151
109 args = parser.parse_args(argv)
152 args = parser.parse_args(argv)
@@ -137,7 +180,10 b' def main(argv=None, user_ns=None):'
137
180
138 dc = ioloop.DelayedCallback(e.start, 0, loop)
181 dc = ioloop.DelayedCallback(e.start, 0, loop)
139 dc.start()
182 dc.start()
140 loop.start()
183 try:
184 loop.start()
185 except KeyboardInterrupt:
186 print ("interrupted, exiting...", file=sys.__stderr__)
141
187
142 # Execution as a script
188 # Execution as a script
143 if __name__ == '__main__':
189 if __name__ == '__main__':
@@ -1,5 +1,9 b''
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3
4 ************
5 NOTE: Most of this module has been deprecated by moving to Configurables
6 ************
3 """
7 """
4
8
5 # Standard library imports.
9 # Standard library imports.
@@ -33,17 +37,24 b' def split_ports(s, n):'
33 raise ValueError
37 raise ValueError
34 return ports
38 return ports
35
39
40 _random_ports = set()
41
36 def select_random_ports(n):
42 def select_random_ports(n):
37 """Selects and return n random ports that are available."""
43 """Selects and return n random ports that are available."""
38 ports = []
44 ports = []
39 for i in xrange(n):
45 for i in xrange(n):
40 sock = socket.socket()
46 sock = socket.socket()
41 sock.bind(('', 0))
47 sock.bind(('', 0))
48 while sock.getsockname()[1] in _random_ports:
49 sock.close()
50 sock = socket.socket()
51 sock.bind(('', 0))
42 ports.append(sock)
52 ports.append(sock)
43 for i, sock in enumerate(ports):
53 for i, sock in enumerate(ports):
44 port = sock.getsockname()[1]
54 port = sock.getsockname()[1]
45 sock.close()
55 sock.close()
46 ports[i] = port
56 ports[i] = port
57 _random_ports.add(port)
47 return ports
58 return ports
48
59
49 def parse_url(args):
60 def parse_url(args):
@@ -61,8 +72,11 b' def parse_url(args):'
61 def signal_children(children):
72 def signal_children(children):
62 """Relay interupt/term signals to children, for more solid process cleanup."""
73 """Relay interupt/term signals to children, for more solid process cleanup."""
63 def terminate_children(sig, frame):
74 def terminate_children(sig, frame):
75 logging.critical("Got signal %i, terminating children..."%sig)
64 for child in children:
76 for child in children:
65 child.terminate()
77 child.terminate()
78
79 sys.exit(sig != SIGINT)
66 # sys.exit(sig)
80 # sys.exit(sig)
67 for sig in (SIGINT, SIGABRT, SIGTERM):
81 for sig in (SIGINT, SIGABRT, SIGTERM):
68 signal(sig, terminate_children)
82 signal(sig, terminate_children)
@@ -72,7 +86,7 b' def generate_exec_key(keyfile):'
72 newkey = str(uuid.uuid4())
86 newkey = str(uuid.uuid4())
73 with open(keyfile, 'w') as f:
87 with open(keyfile, 'w') as f:
74 # f.write('ipython-key ')
88 # f.write('ipython-key ')
75 f.write(newkey)
89 f.write(newkey+'\n')
76 # set user-only RW permissions (0600)
90 # set user-only RW permissions (0600)
77 # this will have no effect on Windows
91 # this will have no effect on Windows
78 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
92 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
@@ -115,18 +129,24 b' def integer_loglevel(loglevel):'
115 return loglevel
129 return loglevel
116
130
117 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
131 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
132 logger = logging.getLogger()
133 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
134 # don't add a second PUBHandler
135 return
118 loglevel = integer_loglevel(loglevel)
136 loglevel = integer_loglevel(loglevel)
119 lsock = context.socket(zmq.PUB)
137 lsock = context.socket(zmq.PUB)
120 lsock.connect(iface)
138 lsock.connect(iface)
121 handler = handlers.PUBHandler(lsock)
139 handler = handlers.PUBHandler(lsock)
122 handler.setLevel(loglevel)
140 handler.setLevel(loglevel)
123 handler.root_topic = root
141 handler.root_topic = root
124 logger = logging.getLogger()
125 logger.addHandler(handler)
142 logger.addHandler(handler)
126 logger.setLevel(loglevel)
143 logger.setLevel(loglevel)
127
144
128 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
145 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
129 logger = logging.getLogger()
146 logger = logging.getLogger()
147 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
148 # don't add a second PUBHandler
149 return
130 loglevel = integer_loglevel(loglevel)
150 loglevel = integer_loglevel(loglevel)
131 lsock = context.socket(zmq.PUB)
151 lsock = context.socket(zmq.PUB)
132 lsock.connect(iface)
152 lsock.connect(iface)
@@ -138,8 +158,8 b' def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):'
138 def local_logger(loglevel=logging.DEBUG):
158 def local_logger(loglevel=logging.DEBUG):
139 loglevel = integer_loglevel(loglevel)
159 loglevel = integer_loglevel(loglevel)
140 logger = logging.getLogger()
160 logger = logging.getLogger()
141 if logger.handlers:
161 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
142 # if there are any handlers, skip the hookup
162 # don't add a second StreamHandler
143 return
163 return
144 handler = logging.StreamHandler()
164 handler = logging.StreamHandler()
145 handler.setLevel(loglevel)
165 handler.setLevel(loglevel)
@@ -13,8 +13,6 b' import zmq'
13 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.devices import ProcessDevice,ThreadDevice
14 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
15
15
16 logger = logging.getLogger()
17
18 class Heart(object):
16 class Heart(object):
19 """A basic heart object for responding to a HeartMonitor.
17 """A basic heart object for responding to a HeartMonitor.
20 This is a simple wrapper with defaults for the most common
18 This is a simple wrapper with defaults for the most common
@@ -78,12 +76,12 b' class HeartMonitor(object):'
78
76
79 def add_new_heart_handler(self, handler):
77 def add_new_heart_handler(self, handler):
80 """add a new handler for new hearts"""
78 """add a new handler for new hearts"""
81 logger.debug("heartbeat::new_heart_handler: %s"%handler)
79 logging.debug("heartbeat::new_heart_handler: %s"%handler)
82 self._new_handlers.add(handler)
80 self._new_handlers.add(handler)
83
81
84 def add_heart_failure_handler(self, handler):
82 def add_heart_failure_handler(self, handler):
85 """add a new handler for heart failure"""
83 """add a new handler for heart failure"""
86 logger.debug("heartbeat::new heart failure handler: %s"%handler)
84 logging.debug("heartbeat::new heart failure handler: %s"%handler)
87 self._failure_handlers.add(handler)
85 self._failure_handlers.add(handler)
88
86
89 def beat(self):
87 def beat(self):
@@ -93,7 +91,7 b' class HeartMonitor(object):'
93 toc = time.time()
91 toc = time.time()
94 self.lifetime += toc-self.tic
92 self.lifetime += toc-self.tic
95 self.tic = toc
93 self.tic = toc
96 # logger.debug("heartbeat::%s"%self.lifetime)
94 # logging.debug("heartbeat::%s"%self.lifetime)
97 goodhearts = self.hearts.intersection(self.responses)
95 goodhearts = self.hearts.intersection(self.responses)
98 missed_beats = self.hearts.difference(goodhearts)
96 missed_beats = self.hearts.difference(goodhearts)
99 heartfailures = self.on_probation.intersection(missed_beats)
97 heartfailures = self.on_probation.intersection(missed_beats)
@@ -103,7 +101,7 b' class HeartMonitor(object):'
103 self.on_probation = missed_beats.intersection(self.hearts)
101 self.on_probation = missed_beats.intersection(self.hearts)
104 self.responses = set()
102 self.responses = set()
105 # print self.on_probation, self.hearts
103 # print self.on_probation, self.hearts
106 # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
104 # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
107 self.pingstream.send(str(self.lifetime))
105 self.pingstream.send(str(self.lifetime))
108
106
109 def handle_new_heart(self, heart):
107 def handle_new_heart(self, heart):
@@ -111,7 +109,7 b' class HeartMonitor(object):'
111 for handler in self._new_handlers:
109 for handler in self._new_handlers:
112 handler(heart)
110 handler(heart)
113 else:
111 else:
114 logger.info("heartbeat::yay, got new heart %s!"%heart)
112 logging.info("heartbeat::yay, got new heart %s!"%heart)
115 self.hearts.add(heart)
113 self.hearts.add(heart)
116
114
117 def handle_heart_failure(self, heart):
115 def handle_heart_failure(self, heart):
@@ -120,11 +118,10 b' class HeartMonitor(object):'
120 try:
118 try:
121 handler(heart)
119 handler(heart)
122 except Exception as e:
120 except Exception as e:
123 print (e)
121 logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
124 logger.error("heartbeat::Bad Handler! %s"%handler)
125 pass
122 pass
126 else:
123 else:
127 logger.info("heartbeat::Heart %s failed :("%heart)
124 logging.info("heartbeat::Heart %s failed :("%heart)
128 self.hearts.remove(heart)
125 self.hearts.remove(heart)
129
126
130
127
@@ -132,14 +129,14 b' class HeartMonitor(object):'
132 "a heart just beat"
129 "a heart just beat"
133 if msg[1] == str(self.lifetime):
130 if msg[1] == str(self.lifetime):
134 delta = time.time()-self.tic
131 delta = time.time()-self.tic
135 # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
132 # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
136 self.responses.add(msg[0])
133 self.responses.add(msg[0])
137 elif msg[1] == str(self.last_ping):
134 elif msg[1] == str(self.last_ping):
138 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
135 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
139 logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
136 logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
140 self.responses.add(msg[0])
137 self.responses.add(msg[0])
141 else:
138 else:
142 logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
139 logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
143 (msg[1],self.lifetime))
140 (msg[1],self.lifetime))
144
141
145
142
@@ -21,12 +21,16 b' import time'
21 import logging
21 import logging
22
22
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 from zmq.eventloop import ioloop
25 from zmq.eventloop.zmqstream import ZMQStream
25
26
26 # internal:
27 # internal:
27 from IPython.config.configurable import Configurable
28 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
29 # from IPython.zmq.log import logger # a Logger object
30 from IPython.utils.importstring import import_item
31
32 from entry_point import select_random_ports
33 from factory import RegistrationFactory
30
34
31 from streamsession import Message, wrap_exception, ISO8601
35 from streamsession import Message, wrap_exception, ISO8601
32 from heartmonitor import HeartMonitor
36 from heartmonitor import HeartMonitor
@@ -43,8 +47,6 b' else:'
43 # Code
47 # Code
44 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
45
49
46 logger = logging.getLogger()
47
48 def _passer(*args, **kwargs):
50 def _passer(*args, **kwargs):
49 return
51 return
50
52
@@ -92,13 +94,179 b' class EngineConnector(HasTraits):'
92 control=Str()
94 control=Str()
93 registration=Str()
95 registration=Str()
94 heartbeat=Str()
96 heartbeat=Str()
95 pending=Instance(set)
97 pending=Set()
96
98
97 def __init__(self, **kwargs):
99 def __init__(self, **kwargs):
98 super(EngineConnector, self).__init__(**kwargs)
100 super(EngineConnector, self).__init__(**kwargs)
99 logger.info("engine::Engine Connected: %i"%self.id)
101 logging.info("engine::Engine Connected: %i"%self.id)
102
103 class HubFactory(RegistrationFactory):
104 """The Configurable for setting up a Hub."""
105
106 # port-pairs for monitoredqueues:
107 hb = Instance(list, config=True)
108 def _hb_default(self):
109 return select_random_ports(2)
110
111 mux = Instance(list, config=True)
112 def _mux_default(self):
113 return select_random_ports(2)
114
115 task = Instance(list, config=True)
116 def _task_default(self):
117 return select_random_ports(2)
118
119 control = Instance(list, config=True)
120 def _control_default(self):
121 return select_random_ports(2)
122
123 iopub = Instance(list, config=True)
124 def _iopub_default(self):
125 return select_random_ports(2)
126
127 # single ports:
128 mon_port = Instance(int, config=True)
129 def _mon_port_default(self):
130 return select_random_ports(1)[0]
131
132 query_port = Instance(int, config=True)
133 def _query_port_default(self):
134 return select_random_ports(1)[0]
135
136 notifier_port = Instance(int, config=True)
137 def _notifier_port_default(self):
138 return select_random_ports(1)[0]
139
140 ping = Int(1000, config=True) # ping frequency
141
142 engine_ip = Str('127.0.0.1', config=True)
143 engine_transport = Str('tcp', config=True)
144
145 client_ip = Str('127.0.0.1', config=True)
146 client_transport = Str('tcp', config=True)
147
148 monitor_ip = Str('127.0.0.1', config=True)
149 monitor_transport = Str('tcp', config=True)
150
151 monitor_url = Str('')
152
153 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
154
155 # not configurable
156 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
157 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
158 subconstructors = List()
159 _constructed = Bool(False)
160
161 def _update_monitor_url(self):
162 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
163
164 def _sync_ips(self):
165 self.engine_ip = self.ip
166 self.client_ip = self.ip
167 self.monitor_ip = self.ip
168 self._update_monitor_url()
169
170 def _sync_transports(self):
171 self.engine_transport = self.transport
172 self.client_transport = self.transport
173 self.monitor_transport = self.transport
174 self._update_monitor_url()
175
176 def __init__(self, **kwargs):
177 super(HubFactory, self).__init__(**kwargs)
178 self._update_monitor_url()
179 self.on_trait_change(self._sync_ips, 'ip')
180 self.on_trait_change(self._sync_transports, 'transport')
181 self.subconstructors.append(self.construct_hub)
182
183
184 def construct(self):
185 assert not self._constructed, "already constructed!"
186
187 for subc in self.subconstructors:
188 subc()
189
190 self._constructed = True
191
192
193 def start(self):
194 assert self._constructed, "must be constructed by self.construct() first!"
195 self.heartmonitor.start()
196 logging.info("Heartmonitor started")
197
198 def construct_hub(self):
199 """construct"""
200 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
201 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
202
203 ctx = self.context
204 loop = self.loop
205
206 # Registrar socket
207 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
208 reg.bind(client_iface % self.regport)
209 logging.info("Hub listening on %s for registration."%(client_iface%self.regport))
210 if self.client_ip != self.engine_ip:
211 reg.bind(engine_iface % self.regport)
212 logging.info("Hub listening on %s for registration."%(engine_iface%self.regport))
213
214 ### Engine connections ###
215
216 # heartbeat
217 hpub = ctx.socket(zmq.PUB)
218 hpub.bind(engine_iface % self.hb[0])
219 hrep = ctx.socket(zmq.XREP)
220 hrep.bind(engine_iface % self.hb[1])
221
222 self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping)
223
224 ### Client connections ###
225 # Clientele socket
226 c = ZMQStream(ctx.socket(zmq.XREP), loop)
227 c.bind(client_iface%self.query_port)
228 # Notifier socket
229 n = ZMQStream(ctx.socket(zmq.PUB), loop)
230 n.bind(client_iface%self.notifier_port)
231
232 ### build and launch the queues ###
233
234 # monitor socket
235 sub = ctx.socket(zmq.SUB)
236 sub.setsockopt(zmq.SUBSCRIBE, "")
237 sub.bind(self.monitor_url)
238 sub = ZMQStream(sub, loop)
100
239
101 class Hub(Configurable):
240 # connect the db
241 self.db = import_item(self.db_class)()
242 time.sleep(.25)
243
244 # build connection dicts
245 self.engine_addrs = {
246 'control' : engine_iface%self.control[1],
247 'mux': engine_iface%self.mux[1],
248 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
249 'task' : engine_iface%self.task[1],
250 'iopub' : engine_iface%self.iopub[1],
251 # 'monitor' : engine_iface%self.mon_port,
252 }
253
254 self.client_addrs = {
255 'control' : client_iface%self.control[0],
256 'query': client_iface%self.query_port,
257 'mux': client_iface%self.mux[0],
258 'task' : client_iface%self.task[0],
259 'iopub' : client_iface%self.iopub[0],
260 'notification': client_iface%self.notifier_port
261 }
262 logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
263 logging.debug("hub::Hub client addrs: %s"%self.client_addrs)
264 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
265 registrar=reg, clientele=c, notifier=n, db=self.db,
266 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs)
267
268
269 class Hub(HasTraits):
102 """The IPython Controller Hub with 0MQ connections
270 """The IPython Controller Hub with 0MQ connections
103
271
104 Parameters
272 Parameters
@@ -120,25 +288,29 b' class Hub(Configurable):'
120 to the queues.
288 to the queues.
121 """
289 """
122 # internal data structures:
290 # internal data structures:
123 ids=None # engine IDs
291 ids=Set() # engine IDs
124 keytable=None
292 keytable=Dict()
125 engines=None
293 by_ident=Dict()
126 clients=None
294 engines=Dict()
127 hearts=None
295 clients=Dict()
128 pending=None
296 hearts=Dict()
129 tasks=None
297 pending=Set()
130 completed=None
298 queues=Dict() # pending msg_ids keyed by engine_id
299 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
300 completed=Dict() # completed msg_ids keyed by engine_id
301 all_completed=Set() # completed msg_ids keyed by engine_id
131 # mia=None
302 # mia=None
132 incoming_registrations=None
303 incoming_registrations=Dict()
133 registration_timeout=None
304 registration_timeout=Int()
305 _idcounter=Int(0)
134
306
135 # objects from constructor:
307 # objects from constructor:
136 loop=Instance(ioloop.IOLoop)
308 loop=Instance(ioloop.IOLoop)
137 registrar=Instance(zmqstream.ZMQStream)
309 registrar=Instance(ZMQStream)
138 clientele=Instance(zmqstream.ZMQStream)
310 clientele=Instance(ZMQStream)
139 monitor=Instance(zmqstream.ZMQStream)
311 monitor=Instance(ZMQStream)
140 heartmonitor=Instance(HeartMonitor)
312 heartmonitor=Instance(HeartMonitor)
141 notifier=Instance(zmqstream.ZMQStream)
313 notifier=Instance(ZMQStream)
142 db=Instance(object)
314 db=Instance(object)
143 client_addrs=Dict()
315 client_addrs=Dict()
144 engine_addrs=Dict()
316 engine_addrs=Dict()
@@ -163,21 +335,22 b' class Hub(Configurable):'
163
335
164 super(Hub, self).__init__(**kwargs)
336 super(Hub, self).__init__(**kwargs)
165 self.ids = set()
337 self.ids = set()
166 self.keytable={}
338 self.pending = set()
167 self.incoming_registrations={}
339 # self.keytable={}
168 self.engines = {}
340 # self.incoming_registrations={}
169 self.by_ident = {}
341 # self.engines = {}
170 self.clients = {}
342 # self.by_ident = {}
171 self.hearts = {}
343 # self.clients = {}
344 # self.hearts = {}
172 # self.mia = set()
345 # self.mia = set()
173 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
346 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
174 # this is the stuff that will move to DB:
347 # this is the stuff that will move to DB:
175 self.pending = set() # pending messages, keyed by msg_id
348 # self.pending = set() # pending messages, keyed by msg_id
176 self.queues = {} # pending msg_ids keyed by engine_id
349 # self.queues = {} # pending msg_ids keyed by engine_id
177 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
350 # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
178 self.completed = {} # completed msg_ids keyed by engine_id
351 # self.completed = {} # completed msg_ids keyed by engine_id
179 self.all_completed = set()
352 # self.all_completed = set()
180 self._idcounter = 0
353 # self._idcounter = 0
181 # self.sockets = {}
354 # self.sockets = {}
182 # self.loop = loop
355 # self.loop = loop
183 # self.session = session
356 # self.session = session
@@ -232,7 +405,7 b' class Hub(Configurable):'
232 'connection_request': self.connection_request,
405 'connection_request': self.connection_request,
233 }
406 }
234
407
235 logger.info("controller::created controller")
408 logging.info("hub::created hub")
236
409
237 @property
410 @property
238 def _next_id(self):
411 def _next_id(self):
@@ -281,7 +454,7 b' class Hub(Configurable):'
281 try:
454 try:
282 msg = self.session.unpack_message(msg[1:], content=True)
455 msg = self.session.unpack_message(msg[1:], content=True)
283 except:
456 except:
284 logger.error("client::Invalid Message %s"%msg, exc_info=True)
457 logging.error("client::Invalid Message %s"%msg, exc_info=True)
285 return False
458 return False
286
459
287 msg_type = msg.get('msg_type', None)
460 msg_type = msg.get('msg_type', None)
@@ -298,15 +471,15 b' class Hub(Configurable):'
298
471
299 def dispatch_register_request(self, msg):
472 def dispatch_register_request(self, msg):
300 """"""
473 """"""
301 logger.debug("registration::dispatch_register_request(%s)"%msg)
474 logging.debug("registration::dispatch_register_request(%s)"%msg)
302 idents,msg = self.session.feed_identities(msg)
475 idents,msg = self.session.feed_identities(msg)
303 if not idents:
476 if not idents:
304 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
477 logging.error("Bad Queue Message: %s"%msg, exc_info=True)
305 return
478 return
306 try:
479 try:
307 msg = self.session.unpack_message(msg,content=True)
480 msg = self.session.unpack_message(msg,content=True)
308 except:
481 except:
309 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
482 logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
310 return
483 return
311
484
312 msg_type = msg['msg_type']
485 msg_type = msg['msg_type']
@@ -314,53 +487,53 b' class Hub(Configurable):'
314
487
315 handler = self.registrar_handlers.get(msg_type, None)
488 handler = self.registrar_handlers.get(msg_type, None)
316 if handler is None:
489 if handler is None:
317 logger.error("registration::got bad registration message: %s"%msg)
490 logging.error("registration::got bad registration message: %s"%msg)
318 else:
491 else:
319 handler(idents, msg)
492 handler(idents, msg)
320
493
321 def dispatch_monitor_traffic(self, msg):
494 def dispatch_monitor_traffic(self, msg):
322 """all ME and Task queue messages come through here, as well as
495 """all ME and Task queue messages come through here, as well as
323 IOPub traffic."""
496 IOPub traffic."""
324 logger.debug("monitor traffic: %s"%msg[:2])
497 logging.debug("monitor traffic: %s"%msg[:2])
325 switch = msg[0]
498 switch = msg[0]
326 idents, msg = self.session.feed_identities(msg[1:])
499 idents, msg = self.session.feed_identities(msg[1:])
327 if not idents:
500 if not idents:
328 logger.error("Bad Monitor Message: %s"%msg)
501 logging.error("Bad Monitor Message: %s"%msg)
329 return
502 return
330 handler = self.monitor_handlers.get(switch, None)
503 handler = self.monitor_handlers.get(switch, None)
331 if handler is not None:
504 if handler is not None:
332 handler(idents, msg)
505 handler(idents, msg)
333 else:
506 else:
334 logger.error("Invalid monitor topic: %s"%switch)
507 logging.error("Invalid monitor topic: %s"%switch)
335
508
336
509
337 def dispatch_client_msg(self, msg):
510 def dispatch_client_msg(self, msg):
338 """Route messages from clients"""
511 """Route messages from clients"""
339 idents, msg = self.session.feed_identities(msg)
512 idents, msg = self.session.feed_identities(msg)
340 if not idents:
513 if not idents:
341 logger.error("Bad Client Message: %s"%msg)
514 logging.error("Bad Client Message: %s"%msg)
342 return
515 return
343 client_id = idents[0]
516 client_id = idents[0]
344 try:
517 try:
345 msg = self.session.unpack_message(msg, content=True)
518 msg = self.session.unpack_message(msg, content=True)
346 except:
519 except:
347 content = wrap_exception()
520 content = wrap_exception()
348 logger.error("Bad Client Message: %s"%msg, exc_info=True)
521 logging.error("Bad Client Message: %s"%msg, exc_info=True)
349 self.session.send(self.clientele, "controller_error", ident=client_id,
522 self.session.send(self.clientele, "hub_error", ident=client_id,
350 content=content)
523 content=content)
351 return
524 return
352
525
353 # print client_id, header, parent, content
526 # print client_id, header, parent, content
354 #switch on message type:
527 #switch on message type:
355 msg_type = msg['msg_type']
528 msg_type = msg['msg_type']
356 logger.info("client:: client %s requested %s"%(client_id, msg_type))
529 logging.info("client:: client %s requested %s"%(client_id, msg_type))
357 handler = self.client_handlers.get(msg_type, None)
530 handler = self.client_handlers.get(msg_type, None)
358 try:
531 try:
359 assert handler is not None, "Bad Message Type: %s"%msg_type
532 assert handler is not None, "Bad Message Type: %s"%msg_type
360 except:
533 except:
361 content = wrap_exception()
534 content = wrap_exception()
362 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
535 logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
363 self.session.send(self.clientele, "controller_error", ident=client_id,
536 self.session.send(self.clientele, "hub_error", ident=client_id,
364 content=content)
537 content=content)
365 return
538 return
366 else:
539 else:
@@ -380,9 +553,9 b' class Hub(Configurable):'
380 """handler to attach to heartbeater.
553 """handler to attach to heartbeater.
381 Called when a new heart starts to beat.
554 Called when a new heart starts to beat.
382 Triggers completion of registration."""
555 Triggers completion of registration."""
383 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
556 logging.debug("heartbeat::handle_new_heart(%r)"%heart)
384 if heart not in self.incoming_registrations:
557 if heart not in self.incoming_registrations:
385 logger.info("heartbeat::ignoring new heart: %r"%heart)
558 logging.info("heartbeat::ignoring new heart: %r"%heart)
386 else:
559 else:
387 self.finish_registration(heart)
560 self.finish_registration(heart)
388
561
@@ -391,11 +564,11 b' class Hub(Configurable):'
391 """handler to attach to heartbeater.
564 """handler to attach to heartbeater.
392 called when a previously registered heart fails to respond to beat request.
565 called when a previously registered heart fails to respond to beat request.
393 triggers unregistration"""
566 triggers unregistration"""
394 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
567 logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
395 eid = self.hearts.get(heart, None)
568 eid = self.hearts.get(heart, None)
396 queue = self.engines[eid].queue
569 queue = self.engines[eid].queue
397 if eid is None:
570 if eid is None:
398 logger.info("heartbeat::ignoring heart failure %r"%heart)
571 logging.info("heartbeat::ignoring heart failure %r"%heart)
399 else:
572 else:
400 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
573 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
401
574
@@ -403,19 +576,19 b' class Hub(Configurable):'
403
576
404 def save_queue_request(self, idents, msg):
577 def save_queue_request(self, idents, msg):
405 if len(idents) < 2:
578 if len(idents) < 2:
406 logger.error("invalid identity prefix: %s"%idents)
579 logging.error("invalid identity prefix: %s"%idents)
407 return
580 return
408 queue_id, client_id = idents[:2]
581 queue_id, client_id = idents[:2]
409 try:
582 try:
410 msg = self.session.unpack_message(msg, content=False)
583 msg = self.session.unpack_message(msg, content=False)
411 except:
584 except:
412 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
585 logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
413 return
586 return
414
587
415 eid = self.by_ident.get(queue_id, None)
588 eid = self.by_ident.get(queue_id, None)
416 if eid is None:
589 if eid is None:
417 logger.error("queue::target %r not registered"%queue_id)
590 logging.error("queue::target %r not registered"%queue_id)
418 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
591 logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
419 return
592 return
420
593
421 header = msg['header']
594 header = msg['header']
@@ -432,21 +605,21 b' class Hub(Configurable):'
432
605
433 def save_queue_result(self, idents, msg):
606 def save_queue_result(self, idents, msg):
434 if len(idents) < 2:
607 if len(idents) < 2:
435 logger.error("invalid identity prefix: %s"%idents)
608 logging.error("invalid identity prefix: %s"%idents)
436 return
609 return
437
610
438 client_id, queue_id = idents[:2]
611 client_id, queue_id = idents[:2]
439 try:
612 try:
440 msg = self.session.unpack_message(msg, content=False)
613 msg = self.session.unpack_message(msg, content=False)
441 except:
614 except:
442 logger.error("queue::engine %r sent invalid message to %r: %s"%(
615 logging.error("queue::engine %r sent invalid message to %r: %s"%(
443 queue_id,client_id, msg), exc_info=True)
616 queue_id,client_id, msg), exc_info=True)
444 return
617 return
445
618
446 eid = self.by_ident.get(queue_id, None)
619 eid = self.by_ident.get(queue_id, None)
447 if eid is None:
620 if eid is None:
448 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
621 logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
449 logger.debug("queue:: %s"%msg[2:])
622 logging.debug("queue:: %s"%msg[2:])
450 return
623 return
451
624
452 parent = msg['parent_header']
625 parent = msg['parent_header']
@@ -475,7 +648,7 b' class Hub(Configurable):'
475 result['result_buffers'] = msg['buffers']
648 result['result_buffers'] = msg['buffers']
476 self.db.update_record(msg_id, result)
649 self.db.update_record(msg_id, result)
477 else:
650 else:
478 logger.debug("queue:: unknown msg finished %s"%msg_id)
651 logging.debug("queue:: unknown msg finished %s"%msg_id)
479
652
480 #--------------------- Task Queue Traffic ------------------------------
653 #--------------------- Task Queue Traffic ------------------------------
481
654
@@ -486,7 +659,7 b' class Hub(Configurable):'
486 try:
659 try:
487 msg = self.session.unpack_message(msg, content=False)
660 msg = self.session.unpack_message(msg, content=False)
488 except:
661 except:
489 logger.error("task::client %r sent invalid task message: %s"%(
662 logging.error("task::client %r sent invalid task message: %s"%(
490 client_id, msg), exc_info=True)
663 client_id, msg), exc_info=True)
491 return
664 return
492 record = init_record(msg)
665 record = init_record(msg)
@@ -505,7 +678,7 b' class Hub(Configurable):'
505 try:
678 try:
506 msg = self.session.unpack_message(msg, content=False)
679 msg = self.session.unpack_message(msg, content=False)
507 except:
680 except:
508 logger.error("task::invalid task result message send to %r: %s"%(
681 logging.error("task::invalid task result message send to %r: %s"%(
509 client_id, msg), exc_info=True)
682 client_id, msg), exc_info=True)
510 raise
683 raise
511 return
684 return
@@ -513,7 +686,7 b' class Hub(Configurable):'
513 parent = msg['parent_header']
686 parent = msg['parent_header']
514 if not parent:
687 if not parent:
515 # print msg
688 # print msg
516 logger.warn("Task %r had no parent!"%msg)
689 logging.warn("Task %r had no parent!"%msg)
517 return
690 return
518 msg_id = parent['msg_id']
691 msg_id = parent['msg_id']
519
692
@@ -546,13 +719,13 b' class Hub(Configurable):'
546 self.db.update_record(msg_id, result)
719 self.db.update_record(msg_id, result)
547
720
548 else:
721 else:
549 logger.debug("task::unknown task %s finished"%msg_id)
722 logging.debug("task::unknown task %s finished"%msg_id)
550
723
551 def save_task_destination(self, idents, msg):
724 def save_task_destination(self, idents, msg):
552 try:
725 try:
553 msg = self.session.unpack_message(msg, content=True)
726 msg = self.session.unpack_message(msg, content=True)
554 except:
727 except:
555 logger.error("task::invalid task tracking message", exc_info=True)
728 logging.error("task::invalid task tracking message", exc_info=True)
556 return
729 return
557 content = msg['content']
730 content = msg['content']
558 print (content)
731 print (content)
@@ -560,11 +733,11 b' class Hub(Configurable):'
560 engine_uuid = content['engine_id']
733 engine_uuid = content['engine_id']
561 eid = self.by_ident[engine_uuid]
734 eid = self.by_ident[engine_uuid]
562
735
563 logger.info("task::task %s arrived on %s"%(msg_id, eid))
736 logging.info("task::task %s arrived on %s"%(msg_id, eid))
564 # if msg_id in self.mia:
737 # if msg_id in self.mia:
565 # self.mia.remove(msg_id)
738 # self.mia.remove(msg_id)
566 # else:
739 # else:
567 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
740 # logging.debug("task::task %s not listed as MIA?!"%(msg_id))
568
741
569 self.tasks[eid].append(msg_id)
742 self.tasks[eid].append(msg_id)
570 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
743 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -585,12 +758,12 b' class Hub(Configurable):'
585 try:
758 try:
586 msg = self.session.unpack_message(msg, content=True)
759 msg = self.session.unpack_message(msg, content=True)
587 except:
760 except:
588 logger.error("iopub::invalid IOPub message", exc_info=True)
761 logging.error("iopub::invalid IOPub message", exc_info=True)
589 return
762 return
590
763
591 parent = msg['parent_header']
764 parent = msg['parent_header']
592 if not parent:
765 if not parent:
593 logger.error("iopub::invalid IOPub message: %s"%msg)
766 logging.error("iopub::invalid IOPub message: %s"%msg)
594 return
767 return
595 msg_id = parent['msg_id']
768 msg_id = parent['msg_id']
596 msg_type = msg['msg_type']
769 msg_type = msg['msg_type']
@@ -600,7 +773,7 b' class Hub(Configurable):'
600 try:
773 try:
601 rec = self.db.get_record(msg_id)
774 rec = self.db.get_record(msg_id)
602 except:
775 except:
603 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
776 logging.error("iopub::IOPub message has invalid parent", exc_info=True)
604 return
777 return
605 # stream
778 # stream
606 d = {}
779 d = {}
@@ -624,7 +797,7 b' class Hub(Configurable):'
624
797
625 def connection_request(self, client_id, msg):
798 def connection_request(self, client_id, msg):
626 """Reply with connection addresses for clients."""
799 """Reply with connection addresses for clients."""
627 logger.info("client::client %s connected"%client_id)
800 logging.info("client::client %s connected"%client_id)
628 content = dict(status='ok')
801 content = dict(status='ok')
629 content.update(self.client_addrs)
802 content.update(self.client_addrs)
630 jsonable = {}
803 jsonable = {}
@@ -639,14 +812,14 b' class Hub(Configurable):'
639 try:
812 try:
640 queue = content['queue']
813 queue = content['queue']
641 except KeyError:
814 except KeyError:
642 logger.error("registration::queue not specified", exc_info=True)
815 logging.error("registration::queue not specified", exc_info=True)
643 return
816 return
644 heart = content.get('heartbeat', None)
817 heart = content.get('heartbeat', None)
645 """register a new engine, and create the socket(s) necessary"""
818 """register a new engine, and create the socket(s) necessary"""
646 eid = self._next_id
819 eid = self._next_id
647 # print (eid, queue, reg, heart)
820 # print (eid, queue, reg, heart)
648
821
649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
822 logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
650
823
651 content = dict(id=eid,status='ok')
824 content = dict(id=eid,status='ok')
652 content.update(self.engine_addrs)
825 content.update(self.engine_addrs)
@@ -656,12 +829,12 b' class Hub(Configurable):'
656 raise KeyError("queue_id %r in use"%queue)
829 raise KeyError("queue_id %r in use"%queue)
657 except:
830 except:
658 content = wrap_exception()
831 content = wrap_exception()
659 logger.error("queue_id %r in use"%queue, exc_info=True)
832 logging.error("queue_id %r in use"%queue, exc_info=True)
660 elif heart in self.hearts: # need to check unique hearts?
833 elif heart in self.hearts: # need to check unique hearts?
661 try:
834 try:
662 raise KeyError("heart_id %r in use"%heart)
835 raise KeyError("heart_id %r in use"%heart)
663 except:
836 except:
664 logger.error("heart_id %r in use"%heart, exc_info=True)
837 logging.error("heart_id %r in use"%heart, exc_info=True)
665 content = wrap_exception()
838 content = wrap_exception()
666 else:
839 else:
667 for h, pack in self.incoming_registrations.iteritems():
840 for h, pack in self.incoming_registrations.iteritems():
@@ -669,14 +842,14 b' class Hub(Configurable):'
669 try:
842 try:
670 raise KeyError("heart_id %r in use"%heart)
843 raise KeyError("heart_id %r in use"%heart)
671 except:
844 except:
672 logger.error("heart_id %r in use"%heart, exc_info=True)
845 logging.error("heart_id %r in use"%heart, exc_info=True)
673 content = wrap_exception()
846 content = wrap_exception()
674 break
847 break
675 elif queue == pack[1]:
848 elif queue == pack[1]:
676 try:
849 try:
677 raise KeyError("queue_id %r in use"%queue)
850 raise KeyError("queue_id %r in use"%queue)
678 except:
851 except:
679 logger.error("queue_id %r in use"%queue, exc_info=True)
852 logging.error("queue_id %r in use"%queue, exc_info=True)
680 content = wrap_exception()
853 content = wrap_exception()
681 break
854 break
682
855
@@ -695,7 +868,7 b' class Hub(Configurable):'
695 dc.start()
868 dc.start()
696 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
869 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
697 else:
870 else:
698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
871 logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
699 return eid
872 return eid
700
873
701 def unregister_engine(self, ident, msg):
874 def unregister_engine(self, ident, msg):
@@ -703,9 +876,9 b' class Hub(Configurable):'
703 try:
876 try:
704 eid = msg['content']['id']
877 eid = msg['content']['id']
705 except:
878 except:
706 logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
879 logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
707 return
880 return
708 logger.info("registration::unregister_engine(%s)"%eid)
881 logging.info("registration::unregister_engine(%s)"%eid)
709 content=dict(id=eid, queue=self.engines[eid].queue)
882 content=dict(id=eid, queue=self.engines[eid].queue)
710 self.ids.remove(eid)
883 self.ids.remove(eid)
711 self.keytable.pop(eid)
884 self.keytable.pop(eid)
@@ -726,9 +899,9 b' class Hub(Configurable):'
726 try:
899 try:
727 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
900 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
728 except KeyError:
901 except KeyError:
729 logger.error("registration::tried to finish nonexistant registration", exc_info=True)
902 logging.error("registration::tried to finish nonexistant registration", exc_info=True)
730 return
903 return
731 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
904 logging.info("registration::finished registering engine %i:%r"%(eid,queue))
732 if purge is not None:
905 if purge is not None:
733 purge.stop()
906 purge.stop()
734 control = queue
907 control = queue
@@ -748,7 +921,7 b' class Hub(Configurable):'
748 def _purge_stalled_registration(self, heart):
921 def _purge_stalled_registration(self, heart):
749 if heart in self.incoming_registrations:
922 if heart in self.incoming_registrations:
750 eid = self.incoming_registrations.pop(heart)[0]
923 eid = self.incoming_registrations.pop(heart)[0]
751 logger.info("registration::purging stalled registration: %i"%eid)
924 logging.info("registration::purging stalled registration: %i"%eid)
752 else:
925 else:
753 pass
926 pass
754
927
@@ -769,7 +942,7 b' class Hub(Configurable):'
769 dc.start()
942 dc.start()
770
943
771 def _shutdown(self):
944 def _shutdown(self):
772 logger.info("controller::controller shutting down.")
945 logging.info("hub::hub shutting down.")
773 time.sleep(0.1)
946 time.sleep(0.1)
774 sys.exit(0)
947 sys.exit(0)
775
948
@@ -781,7 +954,7 b' class Hub(Configurable):'
781 targets = self._validate_targets(targets)
954 targets = self._validate_targets(targets)
782 except:
955 except:
783 content = wrap_exception()
956 content = wrap_exception()
784 self.session.send(self.clientele, "controller_error",
957 self.session.send(self.clientele, "hub_error",
785 content=content, ident=client_id)
958 content=content, ident=client_id)
786 return
959 return
787
960
@@ -805,7 +978,7 b' class Hub(Configurable):'
805 targets = self._validate_targets(targets)
978 targets = self._validate_targets(targets)
806 except:
979 except:
807 content = wrap_exception()
980 content = wrap_exception()
808 self.session.send(self.clientele, "controller_error",
981 self.session.send(self.clientele, "hub_error",
809 content=content, ident=client_id)
982 content=content, ident=client_id)
810 return
983 return
811 verbose = content.get('verbose', False)
984 verbose = content.get('verbose', False)
@@ -4,8 +4,7 b' import sys,os'
4 import time
4 import time
5 from subprocess import Popen, PIPE
5 from subprocess import Popen, PIPE
6
6
7 from entry_point import parse_url
7 from IPython.external.argparse import ArgumentParser, SUPPRESS
8 from controller import make_argument_parser
9
8
10 def _filter_arg(flag, args):
9 def _filter_arg(flag, args):
11 filtered = []
10 filtered = []
@@ -48,7 +47,7 b' def strip_args(flags, args=sys.argv[1:]):'
48
47
49 def launch_process(mod, args):
48 def launch_process(mod, args):
50 """Launch a controller or engine in a subprocess."""
49 """Launch a controller or engine in a subprocess."""
51 code = "from IPython.zmq.parallel.%s import main;main()"%mod
50 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
52 arguments = [ sys.executable, '-c', code ] + args
51 arguments = [ sys.executable, '-c', code ] + args
53 blackholew = file(os.devnull, 'w')
52 blackholew = file(os.devnull, 'w')
54 blackholer = file(os.devnull, 'r')
53 blackholer = file(os.devnull, 'r')
@@ -57,17 +56,13 b' def launch_process(mod, args):'
57 return proc
56 return proc
58
57
59 def main():
58 def main():
60 parser = make_argument_parser()
59 parser = ArgumentParser(argument_default=SUPPRESS)
61 parser.add_argument('--n', '-n', type=int, default=1,
60 parser.add_argument('--n', '-n', type=int, default=1,
62 help="The number of engines to start.")
61 help="The number of engines to start.")
63 args = parser.parse_args()
62 ns,args = parser.parse_known_args()
64 parse_url(args)
63 n = ns.n
65
66 controller_args = strip_args([('--n','-n')])
67 engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
68 '--transport','--loglevel','--packer', '--execkey'])+['--ident']
69
64
70 controller = launch_process('controller', controller_args)
65 controller = launch_process('ipcontrollerapp', args)
71 for i in range(10):
66 for i in range(10):
72 time.sleep(.1)
67 time.sleep(.1)
73 if controller.poll() is not None:
68 if controller.poll() is not None:
@@ -75,9 +70,9 b' def main():'
75 print (controller.stderr.read())
70 print (controller.stderr.read())
76 sys.exit(255)
71 sys.exit(255)
77
72
78 print("Launched Controller at %s"%args.url)
73 print("Launched Controller")
79 engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
74 engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
80 print("%i Engines started"%args.n)
75 print("%i Engines started"%n)
81
76
82 def wait_quietly(p):
77 def wait_quietly(p):
83 try:
78 try:
@@ -10,10 +10,11 b' from datetime import datetime'
10
10
11 from pymongo import Connection
11 from pymongo import Connection
12
12
13 from dictdb import BaseDB
13 #----------------------
14 #----------------------
14 # MongoDB class
15 # MongoDB class
15 #----------------------
16 #----------------------
16 class MongoDB(object):
17 class MongoDB(BaseDB):
17 """MongoDB TaskRecord backend."""
18 """MongoDB TaskRecord backend."""
18 def __init__(self, session_uuid, *args, **kwargs):
19 def __init__(self, session_uuid, *args, **kwargs):
19 self._connection = Connection(*args, **kwargs)
20 self._connection = Connection(*args, **kwargs)
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream'
25 # local imports
25 # local imports
26 from IPython.external.decorator import decorator
26 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance
28 from IPython.utils.traitlets import Instance, Dict, List, Set
29
29
30 from client import Client
30 from client import Client
31 from dependency import Dependency
31 from dependency import Dependency
@@ -33,12 +33,10 b' import streamsession as ss'
33 from entry_point import connect_logger, local_logger
33 from entry_point import connect_logger, local_logger
34
34
35
35
36 logger = logging.getLogger()
37
38 @decorator
36 @decorator
39 def logged(f,self,*args,**kwargs):
37 def logged(f,self,*args,**kwargs):
40 # print ("#--------------------")
38 # print ("#--------------------")
41 logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
39 logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 # print ("#--")
40 # print ("#--")
43 return f(self,*args, **kwargs)
41 return f(self,*args, **kwargs)
44
42
@@ -115,7 +113,7 b' class TaskScheduler(Configurable):'
115
113
116 """
114 """
117
115
118 # configurables:
116 # input arguments:
119 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
117 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
120 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
118 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
121 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
119 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
@@ -124,30 +122,22 b' class TaskScheduler(Configurable):'
124 io_loop = Instance(ioloop.IOLoop)
122 io_loop = Instance(ioloop.IOLoop)
125
123
126 # internals:
124 # internals:
127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
125 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
126 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
129 pending = None # dict by engine_uuid of submitted tasks
127 pending = Dict() # dict by engine_uuid of submitted tasks
130 completed = None # dict by engine_uuid of completed tasks
128 completed = Dict() # dict by engine_uuid of completed tasks
131 clients = None # dict by msg_id for who submitted the task
129 clients = Dict() # dict by msg_id for who submitted the task
132 targets = None # list of target IDENTs
130 targets = List() # list of target IDENTs
133 loads = None # list of engine loads
131 loads = List() # list of engine loads
134 all_done = None # set of all completed tasks
132 all_done = Set() # set of all completed tasks
135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
133 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
134 session = Instance(ss.StreamSession)
136
135
137
136
138 def __init__(self, **kwargs):
137 def __init__(self, **kwargs):
139 super(TaskScheduler, self).__init__(**kwargs)
138 super(TaskScheduler, self).__init__(**kwargs)
140
139
141 self.session = ss.StreamSession(username="TaskScheduler")
140 self.session = ss.StreamSession(username="TaskScheduler")
142 self.dependencies = {}
143 self.depending = {}
144 self.completed = {}
145 self.pending = {}
146 self.all_done = set()
147 self.blacklist = {}
148
149 self.targets = []
150 self.loads = []
151
141
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
142 self.engine_stream.on_recv(self.dispatch_result, copy=False)
153 self._notification_handlers = dict(
143 self._notification_handlers = dict(
@@ -155,7 +145,7 b' class TaskScheduler(Configurable):'
155 unregistration_notification = self._unregister_engine
145 unregistration_notification = self._unregister_engine
156 )
146 )
157 self.notifier_stream.on_recv(self.dispatch_notification)
147 self.notifier_stream.on_recv(self.dispatch_notification)
158 logger.info("Scheduler started...%r"%self)
148 logging.info("Scheduler started...%r"%self)
159
149
160 def resume_receiving(self):
150 def resume_receiving(self):
161 """Resume accepting jobs."""
151 """Resume accepting jobs."""
@@ -182,7 +172,7 b' class TaskScheduler(Configurable):'
182 try:
172 try:
183 handler(str(msg['content']['queue']))
173 handler(str(msg['content']['queue']))
184 except KeyError:
174 except KeyError:
185 logger.error("task::Invalid notification msg: %s"%msg)
175 logging.error("task::Invalid notification msg: %s"%msg)
186
176
187 @logged
177 @logged
188 def _register_engine(self, uid):
178 def _register_engine(self, uid):
@@ -232,7 +222,7 b' class TaskScheduler(Configurable):'
232 try:
222 try:
233 idents, msg = self.session.feed_identities(raw_msg, copy=False)
223 idents, msg = self.session.feed_identities(raw_msg, copy=False)
234 except Exception as e:
224 except Exception as e:
235 logger.error("task::Invaid msg: %s"%msg)
225 logging.error("task::Invaid msg: %s"%msg)
236 return
226 return
237
227
238 # send to monitor
228 # send to monitor
@@ -318,7 +308,7 b' class TaskScheduler(Configurable):'
318 try:
308 try:
319 idents,msg = self.session.feed_identities(raw_msg, copy=False)
309 idents,msg = self.session.feed_identities(raw_msg, copy=False)
320 except Exception as e:
310 except Exception as e:
321 logger.error("task::Invaid result: %s"%msg)
311 logging.error("task::Invaid result: %s"%msg)
322 return
312 return
323 msg = self.session.unpack_message(msg, content=False, copy=False)
313 msg = self.session.unpack_message(msg, content=False, copy=False)
324 header = msg['header']
314 header = msg['header']
@@ -404,8 +394,6 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle'
404 ctx = zmq.Context()
394 ctx = zmq.Context()
405 loop = ioloop.IOLoop()
395 loop = ioloop.IOLoop()
406
396
407 scheme = globals().get(scheme)
408
409 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
397 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
410 ins.bind(in_addr)
398 ins.bind(in_addr)
411 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
399 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
@@ -416,6 +404,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle'
416 nots.setsockopt(zmq.SUBSCRIBE, '')
404 nots.setsockopt(zmq.SUBSCRIBE, '')
417 nots.connect(not_addr)
405 nots.connect(not_addr)
418
406
407 scheme = globals().get(scheme, None)
419 # setup logging
408 # setup logging
420 if log_addr:
409 if log_addr:
421 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
410 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
@@ -426,7 +415,10 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle'
426 mon_stream=mons,notifier_stream=nots,
415 mon_stream=mons,notifier_stream=nots,
427 scheme=scheme,io_loop=loop)
416 scheme=scheme,io_loop=loop)
428
417
429 loop.start()
418 try:
419 loop.start()
420 except KeyboardInterrupt:
421 print ("interrupted, exiting...", file=sys.__stderr__)
430
422
431
423
432 if __name__ == '__main__':
424 if __name__ == '__main__':
@@ -26,69 +26,97 b' from zmq.eventloop import ioloop, zmqstream'
26
26
27 # Local imports.
27 # Local imports.
28 from IPython.core import ultratb
28 from IPython.core import ultratb
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int, Dict, Set, Str
30 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.completer import KernelCompleter
31 from IPython.zmq.iostream import OutStream
31 from IPython.zmq.iostream import OutStream
32 from IPython.zmq.displayhook import DisplayHook
32 from IPython.zmq.displayhook import DisplayHook
33
33
34
34 from factory import SessionFactory
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 unpack_apply_message, ISO8601, wrap_exception
36 unpack_apply_message, ISO8601, wrap_exception
37 from dependency import UnmetDependency
37 from dependency import UnmetDependency
38 import heartmonitor
38 import heartmonitor
39 from client import Client
39 from client import Client
40
40
41 logger = logging.getLogger()
42
43 def printer(*args):
41 def printer(*args):
44 pprint(args, stream=sys.__stdout__)
42 pprint(args, stream=sys.__stdout__)
45
43
44
45 class _Passer:
46 """Empty class that implements `send()` that does nothing."""
47 def send(self, *args, **kwargs):
48 pass
49 send_multipart = send
50
51
46 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
47 # Main kernel class
53 # Main kernel class
48 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
49
55
50 class Kernel(HasTraits):
56 class Kernel(SessionFactory):
51
57
52 #---------------------------------------------------------------------------
58 #---------------------------------------------------------------------------
53 # Kernel interface
59 # Kernel interface
54 #---------------------------------------------------------------------------
60 #---------------------------------------------------------------------------
55
61
56 id = Int(-1)
62 # kwargs:
57 session = Instance(StreamSession)
63 int_id = Int(-1, config=True)
58 shell_streams = List()
64 user_ns = Dict(config=True)
65 exec_lines = List(config=True)
66
59 control_stream = Instance(zmqstream.ZMQStream)
67 control_stream = Instance(zmqstream.ZMQStream)
60 task_stream = Instance(zmqstream.ZMQStream)
68 task_stream = Instance(zmqstream.ZMQStream)
61 iopub_stream = Instance(zmqstream.ZMQStream)
69 iopub_stream = Instance(zmqstream.ZMQStream)
62 client = Instance(Client)
70 client = Instance('IPython.zmq.parallel.client.Client')
63 loop = Instance(ioloop.IOLoop)
71
72 # internals
73 shell_streams = List()
74 compiler = Instance(CommandCompiler, (), {})
75 completer = Instance(KernelCompleter)
76
77 aborted = Set()
78 shell_handlers = Dict()
79 control_handlers = Dict()
80
81 def _set_prefix(self):
82 self.prefix = "engine.%s"%self.int_id
83
84 def _connect_completer(self):
85 self.completer = KernelCompleter(self.user_ns)
64
86
65 def __init__(self, **kwargs):
87 def __init__(self, **kwargs):
66 super(Kernel, self).__init__(**kwargs)
88 super(Kernel, self).__init__(**kwargs)
67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
89 self._set_prefix()
68 self.prefix = 'engine.%s'%self.id
90 self._connect_completer()
69 logger.root_topic = self.prefix
91
70 self.user_ns = {}
92 self.on_trait_change(self._set_prefix, 'id')
71 self.history = []
93 self.on_trait_change(self._connect_completer, 'user_ns')
72 self.compiler = CommandCompiler()
73 self.completer = KernelCompleter(self.user_ns)
74 self.aborted = set()
75
94
76 # Build dict of handlers for message types
95 # Build dict of handlers for message types
77 self.shell_handlers = {}
78 self.control_handlers = {}
79 for msg_type in ['execute_request', 'complete_request', 'apply_request',
96 for msg_type in ['execute_request', 'complete_request', 'apply_request',
80 'clear_request']:
97 'clear_request']:
81 self.shell_handlers[msg_type] = getattr(self, msg_type)
98 self.shell_handlers[msg_type] = getattr(self, msg_type)
82
99
83 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
100 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
84 self.control_handlers[msg_type] = getattr(self, msg_type)
101 self.control_handlers[msg_type] = getattr(self, msg_type)
85
102
103 self._initial_exec_lines()
86
104
87 def _wrap_exception(self, method=None):
105 def _wrap_exception(self, method=None):
88 e_info = dict(engineid=self.identity, method=method)
106 e_info = dict(engineid=self.ident, method=method)
89 content=wrap_exception(e_info)
107 content=wrap_exception(e_info)
90 return content
108 return content
91
109
110 def _initial_exec_lines(self):
111 s = _Passer()
112 content = dict(silent=True, user_variable=[],user_expressions=[])
113 for line in self.exec_lines:
114 logging.debug("executing initialization: %s"%line)
115 content.update({'code':line})
116 msg = self.session.msg('execute_request', content)
117 self.execute_request(s, [], msg)
118
119
92 #-------------------- control handlers -----------------------------
120 #-------------------- control handlers -----------------------------
93 def abort_queues(self):
121 def abort_queues(self):
94 for stream in self.shell_streams:
122 for stream in self.shell_streams:
@@ -112,8 +140,8 b' class Kernel(HasTraits):'
112
140
113 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
141 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
114 # msg = self.reply_socket.recv_json()
142 # msg = self.reply_socket.recv_json()
115 logger.info("Aborting:")
143 logging.info("Aborting:")
116 logger.info(str(msg))
144 logging.info(str(msg))
117 msg_type = msg['msg_type']
145 msg_type = msg['msg_type']
118 reply_type = msg_type.split('_')[0] + '_reply'
146 reply_type = msg_type.split('_')[0] + '_reply'
119 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
147 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -121,7 +149,7 b' class Kernel(HasTraits):'
121 # self.reply_socket.send_json(reply_msg)
149 # self.reply_socket.send_json(reply_msg)
122 reply_msg = self.session.send(stream, reply_type,
150 reply_msg = self.session.send(stream, reply_type,
123 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
151 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
124 logger.debug(str(reply_msg))
152 logging.debug(str(reply_msg))
125 # We need to wait a bit for requests to come in. This can probably
153 # We need to wait a bit for requests to come in. This can probably
126 # be set shorter for true asynchronous clients.
154 # be set shorter for true asynchronous clients.
127 time.sleep(0.05)
155 time.sleep(0.05)
@@ -139,7 +167,7 b' class Kernel(HasTraits):'
139 content = dict(status='ok')
167 content = dict(status='ok')
140 reply_msg = self.session.send(stream, 'abort_reply', content=content,
168 reply_msg = self.session.send(stream, 'abort_reply', content=content,
141 parent=parent, ident=ident)[0]
169 parent=parent, ident=ident)[0]
142 logger(Message(reply_msg), file=sys.__stdout__)
170 logging.debug(str(reply_msg))
143
171
144 def shutdown_request(self, stream, ident, parent):
172 def shutdown_request(self, stream, ident, parent):
145 """kill ourself. This should really be handled in an external process"""
173 """kill ourself. This should really be handled in an external process"""
@@ -164,7 +192,7 b' class Kernel(HasTraits):'
164 try:
192 try:
165 msg = self.session.unpack_message(msg, content=True, copy=False)
193 msg = self.session.unpack_message(msg, content=True, copy=False)
166 except:
194 except:
167 logger.error("Invalid Message", exc_info=True)
195 logging.error("Invalid Message", exc_info=True)
168 return
196 return
169
197
170 header = msg['header']
198 header = msg['header']
@@ -172,7 +200,7 b' class Kernel(HasTraits):'
172
200
173 handler = self.control_handlers.get(msg['msg_type'], None)
201 handler = self.control_handlers.get(msg['msg_type'], None)
174 if handler is None:
202 if handler is None:
175 logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
203 logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
176 else:
204 else:
177 handler(self.control_stream, idents, msg)
205 handler(self.control_stream, idents, msg)
178
206
@@ -210,15 +238,15 b' class Kernel(HasTraits):'
210 self.user_ns = {}
238 self.user_ns = {}
211 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
239 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
212 content = dict(status='ok'))
240 content = dict(status='ok'))
241 self._initial_exec_lines()
213
242
214 def execute_request(self, stream, ident, parent):
243 def execute_request(self, stream, ident, parent):
244 logging.debug('execute request %s'%parent)
215 try:
245 try:
216 code = parent[u'content'][u'code']
246 code = parent[u'content'][u'code']
217 except:
247 except:
218 logger.error("Got bad msg: %s"%parent, exc_info=True)
248 logging.error("Got bad msg: %s"%parent, exc_info=True)
219 return
249 return
220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
221 # self.iopub_stream.send(pyin_msg)
222 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
223 ident='%s.pyin'%self.prefix)
251 ident='%s.pyin'%self.prefix)
224 started = datetime.now().strftime(ISO8601)
252 started = datetime.now().strftime(ISO8601)
@@ -243,7 +271,7 b' class Kernel(HasTraits):'
243 # self.reply_socket.send_json(reply_msg)
271 # self.reply_socket.send_json(reply_msg)
244 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
272 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
245 ident=ident, subheader = dict(started=started))
273 ident=ident, subheader = dict(started=started))
246 logger.debug(str(reply_msg))
274 logging.debug(str(reply_msg))
247 if reply_msg['content']['status'] == u'error':
275 if reply_msg['content']['status'] == u'error':
248 self.abort_queues()
276 self.abort_queues()
249
277
@@ -265,12 +293,12 b' class Kernel(HasTraits):'
265 msg_id = parent['header']['msg_id']
293 msg_id = parent['header']['msg_id']
266 bound = content.get('bound', False)
294 bound = content.get('bound', False)
267 except:
295 except:
268 logger.error("Got bad msg: %s"%parent, exc_info=True)
296 logging.error("Got bad msg: %s"%parent, exc_info=True)
269 return
297 return
270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
298 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
271 # self.iopub_stream.send(pyin_msg)
299 # self.iopub_stream.send(pyin_msg)
272 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
300 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
273 sub = {'dependencies_met' : True, 'engine' : self.identity,
301 sub = {'dependencies_met' : True, 'engine' : self.ident,
274 'started': datetime.now().strftime(ISO8601)}
302 'started': datetime.now().strftime(ISO8601)}
275 try:
303 try:
276 # allow for not overriding displayhook
304 # allow for not overriding displayhook
@@ -341,7 +369,7 b' class Kernel(HasTraits):'
341 try:
369 try:
342 msg = self.session.unpack_message(msg, content=True, copy=False)
370 msg = self.session.unpack_message(msg, content=True, copy=False)
343 except:
371 except:
344 logger.error("Invalid Message", exc_info=True)
372 logging.error("Invalid Message", exc_info=True)
345 return
373 return
346
374
347
375
@@ -356,7 +384,7 b' class Kernel(HasTraits):'
356 return
384 return
357 handler = self.shell_handlers.get(msg['msg_type'], None)
385 handler = self.shell_handlers.get(msg['msg_type'], None)
358 if handler is None:
386 if handler is None:
359 logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
387 logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
360 else:
388 else:
361 handler(stream, idents, msg)
389 handler(stream, idents, msg)
362
390
@@ -372,8 +400,9 b' class Kernel(HasTraits):'
372 return dispatcher
400 return dispatcher
373
401
374 for s in self.shell_streams:
402 for s in self.shell_streams:
403 # s.on_recv(printer)
375 s.on_recv(make_dispatcher(s), copy=False)
404 s.on_recv(make_dispatcher(s), copy=False)
376 s.on_err(printer)
405 # s.on_err(printer)
377
406
378 if self.iopub_stream:
407 if self.iopub_stream:
379 self.iopub_stream.on_err(printer)
408 self.iopub_stream.on_err(printer)
@@ -403,7 +432,7 b' class Kernel(HasTraits):'
403 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
432 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
404 client_addr=None, loop=None, context=None, key=None,
433 client_addr=None, loop=None, context=None, key=None,
405 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
434 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
406
435 """NO LONGER IN USE"""
407 # create loop, context, and session:
436 # create loop, context, and session:
408 if loop is None:
437 if loop is None:
409 loop = ioloop.IOLoop.instance()
438 loop = ioloop.IOLoop.instance()
@@ -453,7 +482,7 b' def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addr'
453 else:
482 else:
454 client = None
483 client = None
455
484
456 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
485 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
457 shell_streams=shell_streams, iopub_stream=iopub_stream,
486 shell_streams=shell_streams, iopub_stream=iopub_stream,
458 client=client, loop=loop)
487 client=client, loop=loop)
459 kernel.start()
488 kernel.start()
@@ -51,12 +51,18 b' def squash_unicode(obj):'
51 obj = obj.encode('utf8')
51 obj = obj.encode('utf8')
52 return obj
52 return obj
53
53
54 json_packer = jsonapi.dumps
55 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
56
57 pickle_packer = lambda o: pickle.dumps(o,-1)
58 pickle_unpacker = pickle.loads
59
54 if use_json:
60 if use_json:
55 default_packer = jsonapi.dumps
61 default_packer = json_packer
56 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
62 default_unpacker = json_unpacker
57 else:
63 else:
58 default_packer = lambda o: pickle.dumps(o,-1)
64 default_packer = pickle_packer
59 default_unpacker = pickle.loads
65 default_unpacker = pickle_unpacker
60
66
61
67
62 DELIM="<IDS|MSG>"
68 DELIM="<IDS|MSG>"
@@ -215,8 +215,9 b" if 'setuptools' in sys.modules:"
215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
217 'pycolor = IPython.utils.PyColorize:main',
217 'pycolor = IPython.utils.PyColorize:main',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
218 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance',
219 'ipenginez = IPython.zmq.parallel.engine:main',
219 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance',
220 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance',
220 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
221 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
221 'iptest = IPython.testing.iptest:main',
222 'iptest = IPython.testing.iptest:main',
222 'irunner = IPython.lib.irunner:main'
223 'irunner = IPython.lib.irunner:main'
General Comments 0
You need to be logged in to leave comments. Login now