Refactor newparallel to use Config system...
MinRK
@@ -0,0 +1,549 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The IPython cluster directory
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 from __future__ import with_statement
19
20 import os
21 import shutil
22 import sys
23 import logging
24 import warnings
25
26 from IPython.config.loader import PyFileConfigLoader
27 from IPython.core.application import Application, BaseAppConfigLoader
28 from IPython.config.configurable import Configurable
29 from IPython.core.crashhandler import CrashHandler
30 from IPython.core import release
31 from IPython.utils.path import (
32 get_ipython_package_dir,
33 expand_path
34 )
35 from IPython.utils.traitlets import Unicode
36
37 #-----------------------------------------------------------------------------
38 # Warnings control
39 #-----------------------------------------------------------------------------
40 # Twisted generates annoying warnings with Python 2.6, as does any other code
41 # that still imports the deprecated 'sets' module
42 warnings.filterwarnings('ignore', 'the sets module is deprecated',
43 DeprecationWarning )
44
45 # This one also comes from Twisted
46 warnings.filterwarnings('ignore', 'the sha module is deprecated',
47 DeprecationWarning)
48
49 #-----------------------------------------------------------------------------
50 # Module errors
51 #-----------------------------------------------------------------------------
52
53 class ClusterDirError(Exception):
54 pass
55
56
57 class PIDFileError(Exception):
58 pass
59
60
61 #-----------------------------------------------------------------------------
62 # Class for managing cluster directories
63 #-----------------------------------------------------------------------------
64
65 class ClusterDir(Configurable):
66 """An object to manage the cluster directory and its resources.
67
68 The cluster directory is used by :command:`ipengine`,
69 :command:`ipcontroller` and :command:`ipcluster` to manage the
70 configuration, logging and security of these applications.
71
72 This object knows how to find, create and manage these directories. This
73 should be used by any code that wants to handle cluster directories.
74 """
75
76 security_dir_name = Unicode('security')
77 log_dir_name = Unicode('log')
78 pid_dir_name = Unicode('pid')
79 security_dir = Unicode(u'')
80 log_dir = Unicode(u'')
81 pid_dir = Unicode(u'')
82 location = Unicode(u'')
83
84 def __init__(self, location=u''):
85 super(ClusterDir, self).__init__(location=location)
86
87 def _location_changed(self, name, old, new):
88 if not os.path.isdir(new):
89 os.makedirs(new)
90 self.security_dir = os.path.join(new, self.security_dir_name)
91 self.log_dir = os.path.join(new, self.log_dir_name)
92 self.pid_dir = os.path.join(new, self.pid_dir_name)
93 self.check_dirs()
94
95 def _log_dir_changed(self, name, old, new):
96 self.check_log_dir()
97
98 def check_log_dir(self):
99 if not os.path.isdir(self.log_dir):
100 os.mkdir(self.log_dir)
101
102 def _security_dir_changed(self, name, old, new):
103 self.check_security_dir()
104
105 def check_security_dir(self):
106 if not os.path.isdir(self.security_dir):
107 os.mkdir(self.security_dir, 0700)
108 os.chmod(self.security_dir, 0700)
109
110 def _pid_dir_changed(self, name, old, new):
111 self.check_pid_dir()
112
113 def check_pid_dir(self):
114 if not os.path.isdir(self.pid_dir):
115 os.mkdir(self.pid_dir, 0700)
116 os.chmod(self.pid_dir, 0700)
117
118 def check_dirs(self):
119 self.check_security_dir()
120 self.check_log_dir()
121 self.check_pid_dir()
122
123 def load_config_file(self, filename):
124 """Load a config file from the top level of the cluster dir.
125
126 Parameters
127 ----------
128 filename : unicode or str
129 The filename only of the config file that must be located in
130 the top-level of the cluster directory.
131 """
132 loader = PyFileConfigLoader(filename, self.location)
133 return loader.load_config()
134
135 def copy_config_file(self, config_file, path=None, overwrite=False):
136 """Copy a default config file into the active cluster directory.
137
138 Default configuration files are kept in :mod:`IPython.config.default`.
139 This function copies them from that location to the working cluster
140 directory.
141 """
142 if path is None:
143 import IPython.config.default
144 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
145 path = os.path.sep.join(path)
146 src = os.path.join(path, config_file)
147 dst = os.path.join(self.location, config_file)
148 if not os.path.isfile(dst) or overwrite:
149 shutil.copy(src, dst)
150
151 def copy_all_config_files(self, path=None, overwrite=False):
152 """Copy all config files into the active cluster directory."""
153 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
154 u'ipcluster_config.py']:
155 self.copy_config_file(f, path=path, overwrite=overwrite)
156
157 @classmethod
158 def create_cluster_dir(cls, cluster_dir):
159 """Create a new cluster directory given a full path.
160
161 Parameters
162 ----------
163 cluster_dir : str
164 The full path to the cluster directory. If it does exist, it will
165 be used. If not, it will be created.
166 """
167 return ClusterDir(location=cluster_dir)
168
169 @classmethod
170 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
171 """Create a cluster dir by profile name and path.
172
173 Parameters
174 ----------
175 path : str
176 The path (directory) to put the cluster directory in.
177 profile : str
178 The name of the profile. The name of the cluster directory will
179 be "clusterz_<profile>".
180 """
181 if not os.path.isdir(path):
182 raise ClusterDirError('Directory not found: %s' % path)
183 cluster_dir = os.path.join(path, u'clusterz_' + profile)
184 return ClusterDir(location=cluster_dir)
185
186 @classmethod
187 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
188 """Find an existing cluster dir by profile name, return its ClusterDir.
189
190 This searches through a sequence of paths for a cluster dir. If it
191 is not found, a :class:`ClusterDirError` exception will be raised.
192
193 The search path algorithm (restated in the sketch after this class) is:
194 1. ``os.getcwd()``
195 2. ``ipython_dir``
196 3. The directories found in the ":" separated
197 :env:`IPCLUSTER_DIR_PATH` environment variable.
198
199 Parameters
200 ----------
201 ipython_dir : unicode or str
202 The IPython directory to use.
203 profile : unicode or str
204 The name of the profile. The name of the cluster directory
205 will be "clusterz_<profile>".
206 """
207 dirname = u'clusterz_' + profile
208 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
209 if cluster_dir_paths:
210 cluster_dir_paths = cluster_dir_paths.split(':')
211 else:
212 cluster_dir_paths = []
213 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
214 for p in paths:
215 cluster_dir = os.path.join(p, dirname)
216 if os.path.isdir(cluster_dir):
217 return ClusterDir(location=cluster_dir)
218 else:
219 raise ClusterDirError('Cluster directory not found in search paths: %s' % dirname)
220
221 @classmethod
222 def find_cluster_dir(cls, cluster_dir):
223 """Find/create a cluster dir and return its ClusterDir.
224
225 This will create the cluster directory if it doesn't exist.
226
227 Parameters
228 ----------
229 cluster_dir : unicode or str
230 The path of the cluster directory. This is expanded using
231 :func:`IPython.utils.genutils.expand_path`.
232 """
233 cluster_dir = expand_path(cluster_dir)
234 if not os.path.isdir(cluster_dir):
235 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
236 return ClusterDir(location=cluster_dir)
237
238
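A minimal standalone sketch of the search order implemented by `find_cluster_dir_by_profile` above (`ipython_dir` is a caller-supplied path; this mirrors the code, it is not part of the patch):

    import os

    def candidate_cluster_dirs(ipython_dir, profile='default'):
        """List cluster-dir candidates in the documented order:
        cwd, then ipython_dir, then IPCLUSTER_DIR_PATH entries."""
        dirname = 'clusterz_' + profile
        extra = os.environ.get('IPCLUSTER_DIR_PATH', '')
        paths = [os.getcwd(), ipython_dir] + (extra.split(':') if extra else [])
        return [os.path.join(p, dirname) for p in paths]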
239 #-----------------------------------------------------------------------------
240 # Command line options
241 #-----------------------------------------------------------------------------
242
243 class ClusterDirConfigLoader(BaseAppConfigLoader):
244
245 def _add_cluster_profile(self, parser):
246 paa = parser.add_argument
247 paa('-p', '--profile',
248 dest='Global.profile',type=unicode,
249 help=
250 """The string name of the profile to be used. This determines the name
251 of the cluster dir as: clusterz_<profile>. The default profile is named
252 'default'. The cluster directory is resolved this way if the
253 --cluster-dir option is not used.""",
254 metavar='Global.profile')
255
256 def _add_cluster_dir(self, parser):
257 paa = parser.add_argument
258 paa('--cluster-dir',
259 dest='Global.cluster_dir',type=unicode,
260 help="""Set the cluster dir. This overrides the logic used by the
261 --profile option.""",
262 metavar='Global.cluster_dir')
263
264 def _add_work_dir(self, parser):
265 paa = parser.add_argument
266 paa('--work-dir',
267 dest='Global.work_dir',type=unicode,
268 help='Set the working dir for the process.',
269 metavar='Global.work_dir')
270
271 def _add_clean_logs(self, parser):
272 paa = parser.add_argument
273 paa('--clean-logs',
274 dest='Global.clean_logs', action='store_true',
275 help='Delete old log files before starting.')
276
277 def _add_no_clean_logs(self, parser):
278 paa = parser.add_argument
279 paa('--no-clean-logs',
280 dest='Global.clean_logs', action='store_false',
281 help="Don't Delete old log flies before starting.")
282
283 def _add_arguments(self):
284 super(ClusterDirConfigLoader, self)._add_arguments()
285 self._add_cluster_profile(self.parser)
286 self._add_cluster_dir(self.parser)
287 self._add_work_dir(self.parser)
288 self._add_clean_logs(self.parser)
289 self._add_no_clean_logs(self.parser)
290
291
292 #-----------------------------------------------------------------------------
293 # Crash handler for this application
294 #-----------------------------------------------------------------------------
295
296
297 _message_template = """\
298 Oops, $self.app_name crashed. We do our best to make it stable, but...
299
300 A crash report was automatically generated with the following information:
301 - A verbatim copy of the crash traceback.
302 - Data on your current $self.app_name configuration.
303
304 It was left in the file named:
305 \t'$self.crash_report_fname'
306 If you can email this file to the developers, the information in it will help
307 them in understanding and correcting the problem.
308
309 You can mail it to: $self.contact_name at $self.contact_email
310 with the subject '$self.app_name Crash Report'.
311
312 If you want to do it now, the following command will work (under Unix):
313 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
314
315 To ensure accurate tracking of this issue, please file a report about it at:
316 $self.bug_tracker
317 """
318
319 class ClusterDirCrashHandler(CrashHandler):
320 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
321
322 message_template = _message_template
323
324 def __init__(self, app):
325 contact_name = release.authors['Brian'][0]
326 contact_email = release.authors['Brian'][1]
327 bug_tracker = 'http://github.com/ipython/ipython/issues'
328 super(ClusterDirCrashHandler,self).__init__(
329 app, contact_name, contact_email, bug_tracker
330 )
331
332
333 #-----------------------------------------------------------------------------
334 # Main application
335 #-----------------------------------------------------------------------------
336
337 class ApplicationWithClusterDir(Application):
338 """An application that puts everything into a cluster directory.
339
340 Instead of looking for things in the ipython_dir, this type of application
341 will use its own private directory called the "cluster directory"
342 for things like config files, log files, etc.
343
344 The cluster directory is resolved as follows:
345
346 * If the ``--cluster-dir`` option is given, it is used.
347 * If ``--cluster-dir`` is not given, the application directory is
348 resolved using the profile name as ``clusterz_<profile>``. The search
349 path for this directory is then i) the cwd, if it is found there,
350 and ii) ipython_dir otherwise.
351
352 The config file for the application is to be put in the cluster
353 dir and named the value of the ``config_file_name`` class attribute.
354 """
355
356 command_line_loader = ClusterDirConfigLoader
357 crash_handler_class = ClusterDirCrashHandler
358 auto_create_cluster_dir = True
359 # temporarily override default_log_level to DEBUG
360 default_log_level = logging.DEBUG
361
362 def create_default_config(self):
363 super(ApplicationWithClusterDir, self).create_default_config()
364 self.default_config.Global.profile = u'default'
365 self.default_config.Global.cluster_dir = u''
366 self.default_config.Global.work_dir = os.getcwd()
367 self.default_config.Global.log_to_file = False
368 self.default_config.Global.log_url = None
369 self.default_config.Global.clean_logs = False
370
371 def find_resources(self):
372 """This resolves the cluster directory.
373
374 This tries to find the cluster directory and, if successful, will have:
375
376 * set ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
377 the application.
378 * set the ``self.cluster_dir`` attribute of the application and config
379 objects.
380
381 The algorithm used for this is as follows:
382 1. Try ``Global.cluster_dir``.
383 2. Try using ``Global.profile``.
384 3. If both of these fail and ``self.auto_create_cluster_dir`` is
385 ``True``, then create the new cluster dir in the IPython directory.
386 4. If all fails, then raise :class:`ClusterDirError`.
387 """
388
389 try:
390 cluster_dir = self.command_line_config.Global.cluster_dir
391 except AttributeError:
392 cluster_dir = self.default_config.Global.cluster_dir
393 cluster_dir = expand_path(cluster_dir)
394 try:
395 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
396 except ClusterDirError:
397 pass
398 else:
399 self.log.info('Using existing cluster dir: %s' % \
400 self.cluster_dir_obj.location
401 )
402 self.finish_cluster_dir()
403 return
404
405 try:
406 self.profile = self.command_line_config.Global.profile
407 except AttributeError:
408 self.profile = self.default_config.Global.profile
409 try:
410 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 self.ipython_dir, self.profile)
412 except ClusterDirError:
413 pass
414 else:
415 self.log.info('Using existing cluster dir: %s' % \
416 self.cluster_dir_obj.location
417 )
418 self.finish_cluster_dir()
419 return
420
421 if self.auto_create_cluster_dir:
422 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
423 self.ipython_dir, self.profile
424 )
425 self.log.info('Creating new cluster dir: %s' % \
426 self.cluster_dir_obj.location
427 )
428 self.finish_cluster_dir()
429 else:
430 raise ClusterDirError('Could not find a valid cluster directory.')
431
432 def finish_cluster_dir(self):
433 # Set the cluster directory
434 self.cluster_dir = self.cluster_dir_obj.location
435
436 # These have to be set because they could be different from the one
437 # that we just computed. Because command line has the highest
438 # priority, this will always end up in the master_config.
439 self.default_config.Global.cluster_dir = self.cluster_dir
440 self.command_line_config.Global.cluster_dir = self.cluster_dir
441
442 def find_config_file_name(self):
443 """Find the config file name for this application."""
444 # For this type of Application it should be set as a class attribute.
445 if not hasattr(self, 'default_config_file_name'):
446 self.log.critical("No config filename found")
447 else:
448 self.config_file_name = self.default_config_file_name
449
450 def find_config_file_paths(self):
451 # Set the search path to the cluster directory. We should NOT
452 # include IPython.config.default here as the default config files
453 # are ALWAYS automatically moved to the cluster directory.
454 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default') # currently unused
455 self.config_file_paths = (self.cluster_dir,)
456
457 def pre_construct(self):
458 # The log and security dirs were set earlier, but here we put them
459 # into the config and log them.
460 config = self.master_config
461 sdir = self.cluster_dir_obj.security_dir
462 self.security_dir = config.Global.security_dir = sdir
463 ldir = self.cluster_dir_obj.log_dir
464 self.log_dir = config.Global.log_dir = ldir
465 pdir = self.cluster_dir_obj.pid_dir
466 self.pid_dir = config.Global.pid_dir = pdir
467 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
468 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
469 # Change to the working directory. We do this just before construct
470 # is called so all the components there have the right working dir.
471 self.to_work_dir()
472
473 def to_work_dir(self):
474 wd = self.master_config.Global.work_dir
475 if unicode(wd) != unicode(os.getcwd()):
476 os.chdir(wd)
477 self.log.info("Changing to working dir: %s" % wd)
478
479 def start_logging(self):
480 # Remove old log files
481 if self.master_config.Global.clean_logs:
482 log_dir = self.master_config.Global.log_dir
483 for f in os.listdir(log_dir):
484 if f.startswith(self.name + u'-') and f.endswith('.log'):
485 os.remove(os.path.join(log_dir, f))
486 # Start logging to the new log file
487 if self.master_config.Global.log_to_file:
488 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
489 logfile = os.path.join(self.log_dir, log_filename)
490 open_log_file = open(logfile, 'w')
491 elif self.master_config.Global.log_url:
492 open_log_file = None
493 else:
494 open_log_file = sys.stdout
495 logger = logging.getLogger()
496 level = self.log_level
497 self.log = logger
498 # since we've reconnected the logger, we need to reconnect the log-level
499 self.log_level = level
500 if open_log_file is not None and self._log_handler not in self.log.handlers:
501 self.log.addHandler(self._log_handler)
502 # log.startLogging(open_log_file)
503
504 def write_pid_file(self, overwrite=False):
505 """Create a .pid file in the pid_dir with my pid.
506
507 This must be called after pre_construct, which sets `self.pid_dir`.
508 This raises :exc:`PIDFileError` if the pid file exists already.
509 """
510 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
511 if os.path.isfile(pid_file):
512 pid = self.get_pid_from_file()
513 if not overwrite:
514 raise PIDFileError(
515 'The pid file [%s] already exists. \nThis could mean that this '
516 'server is already running with [pid=%s].' % (pid_file, pid)
517 )
518 with open(pid_file, 'w') as f:
519 self.log.info("Creating pid file: %s" % pid_file)
520 f.write(repr(os.getpid())+'\n')
521
522 def remove_pid_file(self):
523 """Remove the pid file.
524
525 This should be called at shutdown by registering a callback with
526 :func:`reactor.addSystemEventTrigger`. This needs to return
527 ``None``.
528 """
529 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
530 if os.path.isfile(pid_file):
531 try:
532 self.log.info("Removing pid file: %s" % pid_file)
533 os.remove(pid_file)
534 except:
535 self.log.warn("Error removing the pid file: %s" % pid_file)
536
537 def get_pid_from_file(self):
538 """Get the pid from the pid file.
539
540 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
541 """
542 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
543 if os.path.isfile(pid_file):
544 with open(pid_file, 'r') as f:
545 pid = int(f.read().strip())
546 return pid
547 else:
548 raise PIDFileError('pid file not found: %s' % pid_file)
549
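The pid-file protocol above (write on start, refuse to clobber unless `overwrite` is passed, read back to detect a running server) can be exercised standalone; a minimal sketch reusing the `PIDFileError` defined in this file:

    import os

    def write_pid(pid_dir, name, overwrite=False):
        pid_file = os.path.join(pid_dir, name + u'.pid')
        if os.path.isfile(pid_file) and not overwrite:
            raise PIDFileError('The pid file [%s] already exists.' % pid_file)
        with open(pid_file, 'w') as f:
            f.write(repr(os.getpid()) + '\n')

    def read_pid(pid_dir, name):
        pid_file = os.path.join(pid_dir, name + u'.pid')
        if not os.path.isfile(pid_file):
            raise PIDFileError('pid file not found: %s' % pid_file)
        with open(pid_file, 'r') as f:
            return int(f.read().strip())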
@@ -0,0 +1,149 b''
1 """Base config factories."""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2008-2009 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
9
10 #-----------------------------------------------------------------------------
11 # Imports
12 #-----------------------------------------------------------------------------
13
14
15 import os
16 import logging
17
18 import uuid
19
20 from zmq.eventloop.ioloop import IOLoop
21
22 from IPython.config.configurable import Configurable
23 from IPython.utils.traitlets import Str, Int, Instance, CUnicode, CStr
24 from IPython.utils.importstring import import_item
25
26 from IPython.zmq.parallel.entry_point import select_random_ports
27 import IPython.zmq.parallel.streamsession as ss
28
29 #-----------------------------------------------------------------------------
30 # Classes
31 #-----------------------------------------------------------------------------
32
33
34 class SessionFactory(Configurable):
35 """The Base factory from which every factory in IPython.zmq.parallel inherits"""
36
37 packer = Str('',config=True)
38 unpacker = Str('',config=True)
39 ident = CStr('',config=True)
40 def _ident_default(self):
41 return str(uuid.uuid4())
42 username = Str(os.environ.get('USER','username'),config=True)
43 exec_key = CUnicode('',config=True)
44
45 # not configurable:
46 context = Instance('zmq.Context', (), {})
47 session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
48 loop = Instance('zmq.eventloop.ioloop.IOLoop')
49 def _loop_default(self):
50 return IOLoop.instance()
51
52 def __init__(self, **kwargs):
53 super(SessionFactory, self).__init__(**kwargs)
54
55 keyfile = self.exec_key or None
56
57 # set the packers:
58 if not self.packer:
59 packer_f = unpacker_f = None
60 elif self.packer.lower() == 'json':
61 packer_f = ss.json_packer
62 unpacker_f = ss.json_unpacker
63 elif self.packer.lower() == 'pickle':
64 packer_f = ss.pickle_packer
65 unpacker_f = ss.pickle_unpacker
66 else:
67 packer_f = import_item(self.packer)
68 unpacker_f = import_item(self.unpacker)
69
70 # construct the session
71 self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, keyfile=keyfile)
72
73
74 class RegistrationFactory(SessionFactory):
75 """The Base Configurable for objects that involve registration."""
76
77 url = Str('', config=True) # url takes precedence over ip,regport,transport
78 transport = Str('tcp', config=True)
79 ip = Str('127.0.0.1', config=True)
80 regport = Instance(int, config=True)
81 def _regport_default(self):
82 return 10101
83 # return select_random_ports(1)[0]
84
85 def __init__(self, **kwargs):
86 super(RegistrationFactory, self).__init__(**kwargs)
87 self._propagate_url()
88 self._rebuild_url()
89 self.on_trait_change(self._propagate_url, 'url')
90 self.on_trait_change(self._rebuild_url, 'ip')
91 self.on_trait_change(self._rebuild_url, 'transport')
92 self.on_trait_change(self._rebuild_url, 'regport')
93
94 def _rebuild_url(self):
95 self.url = "%s://%s:%i"%(self.transport, self.ip, self.regport)
96
97 def _propagate_url(self):
98 """Ensure self.url contains full transport://interface:port"""
99 if self.url:
100 iface = self.url.split('://',1)
101 if len(iface) == 2:
102 self.transport,iface = iface
103 iface = iface.split(':')
104 self.ip = iface[0]
105 if len(iface) > 1 and iface[1]:
106 self.regport = int(iface[1])
107
108 #-----------------------------------------------------------------------------
109 # argparse argument extenders
110 #-----------------------------------------------------------------------------
111
112
113 def add_session_arguments(parser):
114 paa = parser.add_argument
115 paa('--ident',
116 type=str, dest='SessionFactory.ident',
117 help='set the ZMQ and session identity [default: random uuid]',
118 metavar='identity')
119 # paa('--execkey',
120 # type=str, dest='SessionFactory.exec_key',
121 # help='path to a file containing an execution key.',
122 # metavar='execkey')
123 paa('--packer',
124 type=str, dest='SessionFactory.packer',
125 help='method to serialize messages: {json,pickle} [default: json]',
126 metavar='packer')
127 paa('--unpacker',
128 type=str, dest='SessionFactory.unpacker',
129 help='inverse function of `packer`. Only necessary when using something other than json|pickle',
130 metavar='unpacker')
131
132 def add_registration_arguments(parser):
133 paa = parser.add_argument
134 paa('--ip',
135 type=str, dest='RegistrationFactory.ip',
136 help="The IP used for registration [default: localhost]",
137 metavar='ip')
138 paa('--transport',
139 type=str, dest='RegistrationFactory.transport',
140 help="The ZeroMQ transport used for registration [default: tcp]",
141 metavar='transport')
142 paa('--url',
143 type=str, dest='RegistrationFactory.url',
144 help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
145 metavar='url')
146 paa('--regport',
147 type=int, dest='RegistrationFactory.regport',
148 help="The port used for registration [default: 10101]",
149 metavar='regport')
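How `_propagate_url` and `_rebuild_url` keep the `url` trait and its component traits consistent can be seen in isolation; the same string round-trip, without the traitlets machinery:

    # split a full url into its pieces, as _propagate_url does
    transport, rest = 'tcp://127.0.0.1:10101'.split('://', 1)
    ip, port = rest.split(':')
    assert (transport, ip, int(port)) == ('tcp', '127.0.0.1', 10101)

    # and rebuild it, as _rebuild_url does
    url = "%s://%s:%i" % (transport, ip, int(port))
    assert url == 'tcp://127.0.0.1:10101'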
@@ -0,0 +1,322 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The IPython controller application.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 from __future__ import with_statement
19
20 import copy
21 import sys
22 import os
23 import logging
24 # from twisted.application import service
25 # from twisted.internet import reactor
26 # from twisted.python import log
27
28 import zmq
29 from zmq.log.handlers import PUBHandler
30
31 from IPython.config.loader import Config
32 from IPython.zmq.parallel import factory
33 from IPython.zmq.parallel.controller import ControllerFactory
34 from IPython.zmq.parallel.clusterdir import (
35 ApplicationWithClusterDir,
36 ClusterDirConfigLoader
37 )
38 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
39 from IPython.utils.traitlets import Instance, Unicode
40
41 from entry_point import generate_exec_key
42
43
44 #-----------------------------------------------------------------------------
45 # Module level variables
46 #-----------------------------------------------------------------------------
47
48
49 #: The default config file name for this application
50 default_config_file_name = u'ipcontroller_config.py'
51
52
53 _description = """Start the IPython controller for parallel computing.
54
55 The IPython controller provides a gateway between the IPython engines and
56 clients. The controller needs to be started before the engines and can be
57 configured using command line options or using a cluster directory. Cluster
58 directories contain config, log and security files and are usually located in
59 your .ipython directory and named "clusterz_<profile>". See the --profile
60 and --cluster-dir options for details.
61 """
62
63 #-----------------------------------------------------------------------------
64 # Default interfaces
65 #-----------------------------------------------------------------------------
66
67 # The default client interfaces for FCClientServiceFactory.interfaces
68 default_client_interfaces = Config()
69 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
70
71 # Make this a dict we can pass to Config.__init__ for the default
72 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
73
74
75
76 # The default engine interfaces for FCEngineServiceFactory.interfaces
77 default_engine_interfaces = Config()
78 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
79
80 # Make this a dict we can pass to Config.__init__ for the default
81 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
82
83
84 #-----------------------------------------------------------------------------
85 # Service factories
86 #-----------------------------------------------------------------------------
87
88 #
89 # class FCClientServiceFactory(FCServiceFactory):
90 # """A Foolscap implementation of the client services."""
91 #
92 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
93 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
94 # allow_none=False, config=True)
95 #
96 #
97 # class FCEngineServiceFactory(FCServiceFactory):
98 # """A Foolscap implementation of the engine services."""
99 #
100 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
101 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
102 # allow_none=False, config=True)
103 #
104
105 #-----------------------------------------------------------------------------
106 # Command line options
107 #-----------------------------------------------------------------------------
108
109
110 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
111
112 def _add_arguments(self):
113 super(IPControllerAppConfigLoader, self)._add_arguments()
114 paa = self.parser.add_argument
115
116 ## Hub Config:
117 paa('--mongodb',
118 dest='HubFactory.db_class', action='store_const',
119 const='IPython.zmq.parallel.mongodb.MongoDB',
120 help='Use MongoDB task storage [default: in-memory]')
121 paa('--hb',
122 type=int, dest='HubFactory.hb', nargs=2,
123 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
124 'connections [default: random]',
125 metavar='Hub.hb_ports')
126
127 # Client config
128 paa('--client-ip',
129 type=str, dest='HubFactory.client_ip',
130 help='The IP address or hostname the Hub will listen on for '
131 'client connections. Both engine-ip and client-ip can be set simultaneously '
132 'via --ip [default: loopback]',
133 metavar='Hub.client_ip')
134 paa('--client-transport',
135 type=str, dest='HubFactory.client_transport',
136 help='The ZeroMQ transport the Hub will use for '
137 'client connections. Both engine-transport and client-transport can be set simultaneously '
138 'via --transport [default: tcp]',
139 metavar='Hub.client_transport')
140 paa('--query',
141 type=int, dest='HubFactory.query_port',
142 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
143 metavar='Hub.query_port')
144 paa('--notifier',
145 type=int, dest='HubFactory.notifier_port',
146 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
147 metavar='Hub.notifier_port')
148
149 # Engine config
150 paa('--engine-ip',
151 type=str, dest='HubFactory.engine_ip',
152 help='The IP address or hostname the Hub will listen on for '
153 'engine connections. This applies to the Hub and its schedulers. Both '
154 'engine-ip and client-ip can be set simultaneously '
155 'via --ip [default: loopback]',
156 metavar='Hub.engine_ip')
157 paa('--engine-transport',
158 type=str, dest='HubFactory.engine_transport',
159 help='The ZeroMQ transport the Hub will use for '
160 'engine connections. Both engine-transport and client-transport can be set simultaneously '
161 'via --transport [default: tcp]',
162 metavar='Hub.engine_transport')
163
164 # Scheduler config
165 paa('--mux',
166 type=int, dest='ControllerFactory.mux', nargs=2,
167 help='The (2) ports the MUX scheduler will listen on for client,engine '
168 'connections, respectively [default: random]',
169 metavar='Scheduler.mux_ports')
170 paa('--task',
171 type=int, dest='ControllerFactory.task', nargs=2,
172 help='The (2) ports the Task scheduler will listen on for client,engine '
173 'connections, respectively [default: random]',
174 metavar='Scheduler.task_ports')
175 paa('--control',
176 type=int, dest='ControllerFactory.control', nargs=2,
177 help='The (2) ports the Control scheduler will listen on for client,engine '
178 'connections, respectively [default: random]',
179 metavar='Scheduler.control_ports')
180 paa('--iopub',
181 type=int, dest='ControllerFactory.iopub', nargs=2,
182 help='The (2) ports the IOPub scheduler will listen on for client,engine '
183 'connections, respectively [default: random]',
184 metavar='Scheduler.iopub_ports')
185 paa('--scheme',
186 type=str, dest='ControllerFactory.scheme',
187 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
188 help='select the task scheduler scheme [default: Python LRU]',
189 metavar='Scheduler.scheme')
190 paa('--usethreads',
191 dest='ControllerFactory.usethreads', action="store_true",
192 help='Use threads instead of processes for the schedulers',
193 )
194
195 ## Global config
196 paa('--log-to-file',
197 action='store_true', dest='Global.log_to_file',
198 help='Log to a file in the log directory (default is stdout)')
199 paa('--log-url',
200 type=str, dest='Global.log_url',
201 help='Broadcast logs to an iploggerz process [default: disabled]')
202 paa('-r','--reuse-key',
203 action='store_true', dest='Global.reuse_key',
204 help='Try to reuse existing execution keys.')
205 paa('--no-secure',
206 action='store_false', dest='Global.secure',
207 help='Turn off execution keys.')
208 paa('--secure',
209 action='store_true', dest='Global.secure',
210 help='Turn on execution keys (default).')
211 paa('--execkey',
212 type=str, dest='Global.exec_key',
213 help='path to a file containing an execution key.',
214 metavar='keyfile')
215 factory.add_session_arguments(self.parser)
216 factory.add_registration_arguments(self.parser)
217
218
219 #-----------------------------------------------------------------------------
220 # The main application
221 #-----------------------------------------------------------------------------
222
223
224 class IPControllerApp(ApplicationWithClusterDir):
225
226 name = u'ipcontrollerz'
227 description = _description
228 command_line_loader = IPControllerAppConfigLoader
229 default_config_file_name = default_config_file_name
230 auto_create_cluster_dir = True
231
232 def create_default_config(self):
233 super(IPControllerApp, self).create_default_config()
234 # Defaults for Global.secure and Global.reuse_key are set here and
235 # consumed below in construct().
236 self.default_config.Global.import_statements = []
237 self.default_config.Global.clean_logs = True
238 self.default_config.Global.secure = False
239 self.default_config.Global.reuse_key = False
240 self.default_config.Global.exec_key = "exec_key.key"
241
242 def pre_construct(self):
243 super(IPControllerApp, self).pre_construct()
244 c = self.master_config
245 # The defaults for these are set in FCClientServiceFactory and
246 # FCEngineServiceFactory, so we only set them here if the global
247 # options have be set to override the class level defaults.
248
249 # if hasattr(c.Global, 'reuse_furls'):
250 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
251 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
252 # del c.Global.reuse_furls
253 # if hasattr(c.Global, 'secure'):
254 # c.FCClientServiceFactory.secure = c.Global.secure
255 # c.FCEngineServiceFactory.secure = c.Global.secure
256 # del c.Global.secure
257
258 def construct(self):
259 # This is the working dir by now.
260 sys.path.insert(0, '')
261 c = self.master_config
262
263 self.import_statements()
264
265 if c.Global.secure:
266 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
267 if not c.Global.reuse_key or not os.path.exists(keyfile):
268 generate_exec_key(keyfile)
269 c.SessionFactory.exec_key = keyfile
270 else:
271 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
272 if os.path.exists(keyfile):
273 os.remove(keyfile)
274 c.SessionFactory.exec_key = ''
275
276 try:
277 self.factory = ControllerFactory(config=c)
278 self.start_logging()
279 self.factory.construct()
280 except:
281 self.log.error("Couldn't construct the Controller", exc_info=True)
282 self.exit(1)
283
284
285 def import_statements(self):
286 statements = self.master_config.Global.import_statements
287 for s in statements:
288 try:
289 self.log.msg("Executing statement: '%s'" % s)
290 exec s in globals(), locals()
291 except:
292 self.log.msg("Error running statement: %s" % s)
293
294 # def start_logging(self):
295 # super(IPControllerApp, self).start_logging()
296 # if self.master_config.Global.log_url:
297 # context = self.factory.context
298 # lsock = context.socket(zmq.PUB)
299 # lsock.connect(self.master_config.Global.log_url)
300 # handler = PUBHandler(lsock)
301 # handler.root_topic = 'controller'
302 # handler.setLevel(self.log_level)
303 # self.log.addHandler(handler)
304 #
305 def start_app(self):
306 # Start the controller service.
307 self.factory.start()
308 self.write_pid_file(overwrite=True)
309 try:
310 self.factory.loop.start()
311 except KeyboardInterrupt:
312 self.log.critical("Interrupted, Exiting...\n")
313
314
315 def launch_new_instance():
316 """Create and run the IPython controller"""
317 app = IPControllerApp()
318 app.start()
319
320
321 if __name__ == '__main__':
322 launch_new_instance()
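The config loaders above lean on argparse dest strings of the form 'Section.attr'; the idea, sketched with a plain dict rather than IPython's Config class (the real mapping lives in IPython.config.loader, this is just an illustration):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--hb', type=int, dest='HubFactory.hb', nargs=2)
    args = parser.parse_args(['--hb', '10201', '10202'])

    config = {}
    for dest, value in vars(args).items():
        if value is None:
            continue  # unset options contribute nothing
        section, attr = dest.split('.', 1)
        config.setdefault(section, {})[attr] = value
    assert config == {'HubFactory': {'hb': [10201, 10202]}}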
@@ -0,0 +1,261 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The IPython engine application
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import sys
20
21 import zmq
22 from zmq.eventloop import ioloop
23
24 from IPython.zmq.parallel.clusterdir import (
25 ApplicationWithClusterDir,
26 ClusterDirConfigLoader
27 )
28 from IPython.zmq.log import EnginePUBHandler
29
30 from IPython.zmq.parallel import factory
31 from IPython.zmq.parallel.engine import EngineFactory
32 from IPython.zmq.parallel.streamkernel import Kernel
33 from IPython.utils.importstring import import_item
34
35 #-----------------------------------------------------------------------------
36 # Module level variables
37 #-----------------------------------------------------------------------------
38
39 #: The default config file name for this application
40 default_config_file_name = u'ipengine_config.py'
41
42
43 mpi4py_init = """from mpi4py import MPI as mpi
44 mpi.size = mpi.COMM_WORLD.Get_size()
45 mpi.rank = mpi.COMM_WORLD.Get_rank()
46 """
47
48
49 pytrilinos_init = """from PyTrilinos import Epetra
50 class SimpleStruct:
51 pass
52 mpi = SimpleStruct()
53 mpi.rank = 0
54 mpi.size = 0
55 """
56
57
58 _description = """Start an IPython engine for parallel computing.\n\n
59
60 IPython engines run in parallel and perform computations on behalf of a client
61 and controller. A controller needs to be started before the engines. The
62 engine can be configured using command line options or using a cluster
63 directory. Cluster directories contain config, log and security files and are
64 usually located in your .ipython directory and named "clusterz_<profile>".
65 See the --profile and --cluster-dir options for details.
66 """
67
68 #-----------------------------------------------------------------------------
69 # Command line options
70 #-----------------------------------------------------------------------------
71
72
73 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
74
75 def _add_arguments(self):
76 super(IPEngineAppConfigLoader, self)._add_arguments()
77 paa = self.parser.add_argument
78 # Controller config
79 paa('--url-file',
80 type=unicode, dest='Global.url_file',
81 help='The full location of the file containing the connection URL of the '
82 'controller. If this is not given, the URL file must be in the '
83 'security directory of the cluster directory. This location is '
84 'resolved using the --profile and --cluster-dir options.',
85 metavar='Global.url_file')
86 # MPI
87 paa('--mpi',
88 type=str, dest='MPI.use',
89 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
90 metavar='MPI.use')
91 # Global config
92 paa('--log-to-file',
93 action='store_true', dest='Global.log_to_file',
94 help='Log to a file in the log directory (default is stdout)')
95 paa('--log-url',
96 dest='Global.log_url',
97 help="url of ZMQ logger, as started with iploggerz")
98 paa('--execkey',
99 type=str, dest='Global.exec_key',
100 help='path to a file containing an execution key.',
101 metavar='keyfile')
102 paa('--no-secure',
103 action='store_false', dest='Global.secure',
104 help='Turn off execution keys.')
105 paa('--secure',
106 action='store_true', dest='Global.secure',
107 help='Turn on execution keys (default).')
108 # init command
109 paa('-c',
110 type=str, dest='Global.extra_exec_lines',
111 help='specify a command to be run at startup')
112
113 factory.add_session_arguments(self.parser)
114 factory.add_registration_arguments(self.parser)
115
116
117 #-----------------------------------------------------------------------------
118 # Main application
119 #-----------------------------------------------------------------------------
120
121
122 class IPEngineApp(ApplicationWithClusterDir):
123
124 name = u'ipenginez'
125 description = _description
126 command_line_loader = IPEngineAppConfigLoader
127 default_config_file_name = default_config_file_name
128 auto_create_cluster_dir = True
129
130 def create_default_config(self):
131 super(IPEngineApp, self).create_default_config()
132
133 # The engine should not clean logs as we don't want to remove the
134 # active log files of other running engines.
135 self.default_config.Global.clean_logs = False
136 self.default_config.Global.secure = True
137
138 # Global config attributes
139 self.default_config.Global.exec_lines = []
140 self.default_config.Global.extra_exec_lines = ''
141
142 # Configuration related to the controller
143 # The default registration URL of the controller.
144 self.default_config.Global.url = u'tcp://localhost:10101'
145 # key_file_name must match the filename (path not included) that the
146 # controller used for its execution key. If key_file is not given, its
147 # location is computed from cluster_dir, security_dir and key_file_name.
148 self.default_config.Global.key_file_name = u'exec_key.key'
149 self.default_config.Global.key_file = u''
150
151 # MPI related config attributes
152 self.default_config.MPI.use = ''
153 self.default_config.MPI.mpi4py = mpi4py_init
154 self.default_config.MPI.pytrilinos = pytrilinos_init
155
156 def post_load_command_line_config(self):
157 pass
158
159 def pre_construct(self):
160 super(IPEngineApp, self).pre_construct()
161 # self.find_cont_url_file()
162 self.find_key_file()
163 if self.master_config.Global.extra_exec_lines:
164 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
165
166 def find_key_file(self):
167 """Set the key file.
168
169 Here we don't try to see if it exists or is valid, as that is
170 handled by the connection logic.
171 """
172 config = self.master_config
173 # Find the actual controller key file
174 if not config.Global.key_file:
175 try_this = os.path.join(
176 config.Global.cluster_dir,
177 config.Global.security_dir,
178 config.Global.key_file_name
179 )
180 config.Global.key_file = try_this
181
182 def construct(self):
183 # This is the working dir by now.
184 sys.path.insert(0, '')
185 config = self.master_config
186 if os.path.exists(config.Global.key_file) and config.Global.secure:
187 config.SessionFactory.exec_key = config.Global.key_file
188
189 config.Kernel.exec_lines = config.Global.exec_lines
190
191 self.start_mpi()
192
193 # Create the underlying shell class and EngineService
194 # shell_class = import_item(self.master_config.Global.shell_class)
195 try:
196 self.engine = EngineFactory(config=config)
197 except:
198 self.log.error("Couldn't start the Engine", exc_info=True)
199 self.exit(1)
200
201 self.start_logging()
202
203 # Create the service hierarchy
204 # self.main_service = service.MultiService()
205 # self.engine_service.setServiceParent(self.main_service)
206 # self.tub_service = Tub()
207 # self.tub_service.setServiceParent(self.main_service)
208 # # This needs to be called before the connection is initiated
209 # self.main_service.startService()
210
211 # This initiates the connection to the controller and calls
212 # register_engine to tell the controller we are ready to do work
213 # self.engine_connector = EngineConnector(self.tub_service)
214
215 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
216
217 # reactor.callWhenRunning(self.call_connect)
218
219
220 def start_logging(self):
221 super(IPEngineApp, self).start_logging()
222 if self.master_config.Global.log_url:
223 context = self.engine.context
224 lsock = context.socket(zmq.PUB)
225 lsock.connect(self.master_config.Global.log_url)
226 handler = EnginePUBHandler(self.engine, lsock)
227 handler.setLevel(self.log_level)
228 self.log.addHandler(handler)
229
230 def start_mpi(self):
231 global mpi
232 mpikey = self.master_config.MPI.use
233 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
234 if mpi_import_statement is not None:
235 try:
236 self.log.info("Initializing MPI:")
237 self.log.info(mpi_import_statement)
238 exec mpi_import_statement in globals()
239 except:
240 mpi = None
241 else:
242 mpi = None
243
244
245 def start_app(self):
246 self.engine.start()
247 try:
248 self.engine.loop.start()
249 except KeyboardInterrupt:
250 self.log.critical("Engine Interrupted, shutting down...\n")
251
252
253 def launch_new_instance():
254 """Create and run the IPython controller"""
255 app = IPEngineApp()
256 app.start()
257
258
259 if __name__ == '__main__':
260 launch_new_instance()
261
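start_mpi above dispatches on MPI.use and execs the matching init snippet into the module globals; the same mechanism, standalone (mpi4py_init and pytrilinos_init are the snippet strings defined in this file; each still requires its library to be installed):

    def init_mpi(use, snippets):
        """Exec the chosen init snippet, returning the resulting 'mpi' global."""
        snippet = snippets.get(use)
        if snippet is None:
            return None
        exec snippet in globals()   # Python 2 exec statement, as in start_mpi
        return globals().get('mpi')

    # e.g. init_mpi('mpi4py', {'mpi4py': mpi4py_init}) attaches rank/size to mpi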
@@ -0,0 +1,131 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 A simple IPython logger application
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import sys
20
21 import zmq
22
23 from IPython.zmq.parallel.clusterdir import (
24 ApplicationWithClusterDir,
25 ClusterDirConfigLoader
26 )
27 from IPython.zmq.parallel.logwatcher import LogWatcher
28
29 #-----------------------------------------------------------------------------
30 # Module level variables
31 #-----------------------------------------------------------------------------
32
33 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
35
36 _description = """Start an IPython logger for parallel computing.\n\n
37
38 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
42 usually located in your .ipython directory and named "clusterz_<profile>".
43 See the --profile and --cluster-dir options for details.
44 """
45
46 #-----------------------------------------------------------------------------
47 # Command line options
48 #-----------------------------------------------------------------------------
49
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
56 # Controller config
57 paa('--url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
60 )
61 # Topics
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
65 metavar='topics')
66 # Global config
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
70
71
72 #-----------------------------------------------------------------------------
73 # Main application
74 #-----------------------------------------------------------------------------
75
76
77 class IPLoggerApp(ApplicationWithClusterDir):
78
79 name = u'iploggerz'
80 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
84
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
87
88 # The logger should not clean logs as we don't want to remove the
89 # active log files of other running processes.
90 self.default_config.Global.clean_logs = False
91
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and url_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
96
97 def post_load_command_line_config(self):
98 pass
99
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
102
103 def construct(self):
104 # This is the working dir by now.
105 sys.path.insert(0, '')
106
107 self.start_logging()
108
109 try:
110 self.watcher = LogWatcher(config=self.master_config)
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
114
115
116 def start_app(self):
117 try:
118 self.watcher.loop.start()
119 except KeyboardInterrupt:
120 self.log.critical("Logging Interrupted, shutting down...\n")
121
122
123 def launch_new_instance():
124 """Create and run the IPython LogWatcher"""
125 app = IPLoggerApp()
126 app.start()
127
128
129 if __name__ == '__main__':
130 launch_new_instance()
131
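Any process can feed this watcher by pointing a zmq PUBHandler at its url; a minimal sketch, assuming a LogWatcher is running on its default url:

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.connect('tcp://127.0.0.1:20202')   # LogWatcher.url default

    handler = PUBHandler(sock)
    handler.root_topic = 'myapp'            # topic prefix the watcher filters on
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.INFO)
    logging.info("hello from a monitored process")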
@@ -0,0 +1,92 b''
1 #!/usr/bin/env python
2 """A simple logger object that consolidates messages incoming from ipclusterz processes."""
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2011 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15
16 import sys
17 import logging
18
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
21 from IPython.config.configurable import Configurable
22 from IPython.utils.traitlets import Int, Str, Instance, List
23
24 #-----------------------------------------------------------------------------
25 # Classes
26 #-----------------------------------------------------------------------------
27
28
29 class LogWatcher(Configurable):
30 """A simple class that receives messages on a SUB socket, as published
31 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32
33 This can subscribe to multiple topics, but defaults to all topics.
34 """
35 # configurables
36 topics = List([''], config=True)
37 url = Str('tcp://127.0.0.1:20202', config=True)
38
39 # internals
40 context = Instance(zmq.Context, (), {})
41 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
42 loop = Instance('zmq.eventloop.ioloop.IOLoop')
43 def _loop_default(self):
44 return ioloop.IOLoop.instance()
45
46 def __init__(self, config=None):
47 super(LogWatcher, self).__init__(config=config)
48 s = self.context.socket(zmq.SUB)
49 s.bind(self.url)
50 self.stream = zmqstream.ZMQStream(s, self.loop)
51 self.subscribe()
52 self.on_trait_change(self.subscribe, 'topics')
53
54 self.stream.on_recv(self.log_message)
55
56 def subscribe(self):
57 """Update our SUB socket's subscriptions."""
58 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
59 for topic in self.topics:
60 logging.debug("Subscribing to: %r"%topic)
61 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
62
63 def _extract_level(self, topic_str):
64 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
65 topics = topic_str.split('.')
66 for idx,t in enumerate(topics):
67 level = getattr(logging, t, None)
68 if level is not None:
69 break
70
71 if level is None:
72 level = logging.INFO
73 else:
74 topics.pop(idx)
75
76 return level, '.'.join(topics)
77
78
79 def log_message(self, raw):
80 """receive and parse a message, then log it."""
81 if len(raw) != 2 or '.' not in raw[0]:
82 logging.error("Invalid log message: %s"%raw)
83 return
84 else:
85 topic, msg = raw
86 # extract the logging level from the topic, e.g. 'engine.0.INFO'
87 level,topic = self._extract_level(topic)
88 # don't newline, since log messages always newline:
89 if msg[-1] == '\n':
90 msg = msg[:-1]
91 logging.log(level, "[%s] %s" % (topic, msg))
92
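The topic convention the watcher expects is '<source>.<LEVEL>[.<extra>]'; the level extraction, restated standalone (with an isinstance guard added so non-level module attributes such as logging.log are never matched, a hardening the getattr/None test above does not have):

    import logging

    def extract_level(topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')."""
        topics = topic_str.split('.')
        for idx, t in enumerate(topics):
            level = getattr(logging, t, None)
            if isinstance(level, int):
                topics.pop(idx)
                return level, '.'.join(topics)
        return logging.INFO, topic_str   # no level found: default to INFO

    assert extract_level('engine.0.INFO.extra') == (logging.INFO, 'engine.0.extra')
    assert extract_level('no.level.here') == (logging.INFO, 'no.level.here')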
@@ -1,1075 +1,1095 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A lightweight Traits like module.
5 5
6 6 This is designed to provide a lightweight, simple, pure Python version of
7 7 many of the capabilities of enthought.traits. This includes:
8 8
9 9 * Validation
10 10 * Type specification with defaults
11 11 * Static and dynamic notification
12 12 * Basic predefined types
13 13 * An API that is similar to enthought.traits
14 14
15 15 We don't support:
16 16
17 17 * Delegation
18 18 * Automatic GUI generation
19 19 * A full set of trait types. Most importantly, we don't provide container
20 20 traits (list, dict, tuple) that can trigger notifications if their
21 21 contents change.
22 22 * API compatibility with enthought.traits
23 23
24 24 There are also some important differences in our design:
25 25
26 26 * enthought.traits does not validate default values. We do.
27 27
28 28 We chose to create this module because we need these capabilities, but
29 29 we need them to be pure Python so they work in all Python implementations,
30 30 including Jython and IronPython.
31 31
32 32 Authors:
33 33
34 34 * Brian Granger
35 35 * Enthought, Inc. Some of the code in this file comes from enthought.traits
36 36 and is licensed under the BSD license. Also, many of the ideas also come
37 37 from enthought.traits even though our implementation is very different.
38 38 """
39 39
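A small demonstration of the validation and dynamic-notification features listed above; it assumes the HasTraits and Int names this module defines further down (outside this hunk):

    from IPython.utils.traitlets import HasTraits, Int, TraitError

    class Counter(HasTraits):
        count = Int(0)

    def report(name, old, new):
        print "count changed: %r -> %r" % (old, new)

    c = Counter()
    c.on_trait_change(report, 'count')
    c.count = 3            # fires report: count changed: 0 -> 3
    try:
        c.count = 'many'   # rejected: Int validates assignments
    except TraitError, e:
        print e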
40 40 #-----------------------------------------------------------------------------
41 41 # Copyright (C) 2008-2009 The IPython Development Team
42 42 #
43 43 # Distributed under the terms of the BSD License. The full license is in
44 44 # the file COPYING, distributed as part of this software.
45 45 #-----------------------------------------------------------------------------
46 46
47 47 #-----------------------------------------------------------------------------
48 48 # Imports
49 49 #-----------------------------------------------------------------------------
50 50
51 51
52 52 import inspect
53 53 import sys
54 54 import types
55 55 from types import (
56 56 InstanceType, ClassType, FunctionType,
57 57 ListType, TupleType
58 58 )
59 59 from .importstring import import_item
60 60
61 61 ClassTypes = (ClassType, type)
62 62
63 SequenceTypes = (ListType, TupleType)
63 SequenceTypes = (ListType, TupleType, set, frozenset)
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # Basic classes
67 67 #-----------------------------------------------------------------------------
68 68
69 69
70 70 class NoDefaultSpecified ( object ): pass
71 71 NoDefaultSpecified = NoDefaultSpecified()
72 72
73 73
74 74 class Undefined ( object ): pass
75 75 Undefined = Undefined()
76 76
77 77 class TraitError(Exception):
78 78 pass
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Utilities
82 82 #-----------------------------------------------------------------------------
83 83
84 84
85 85 def class_of ( object ):
86 86 """ Returns a string containing the class name of an object with the
87 87 correct indefinite article ('a' or 'an') preceding it (e.g., 'an Image',
88 88 'a PlotValue').
89 89 """
90 90 if isinstance( object, basestring ):
91 91 return add_article( object )
92 92
93 93 return add_article( object.__class__.__name__ )
94 94
95 95
96 96 def add_article ( name ):
97 97 """ Returns a string containing the correct indefinite article ('a' or 'an')
98 98 prefixed to the specified string.
99 99 """
100 100 if name[:1].lower() in 'aeiou':
101 101 return 'an ' + name
102 102
103 103 return 'a ' + name
104 104
105 105
106 106 def repr_type(obj):
107 107 """ Return a string representation of a value and its type for readable
108 108 error messages.
109 109 """
110 110 the_type = type(obj)
111 111 if the_type is InstanceType:
112 112 # Old-style class.
113 113 the_type = obj.__class__
114 114 msg = '%r %r' % (obj, the_type)
115 115 return msg
116 116
117 117
118 118 def parse_notifier_name(name):
119 119 """Convert the name argument to a list of names.
120 120
121 121 Examples
122 122 --------
123 123
124 124 >>> parse_notifier_name('a')
125 125 ['a']
126 126 >>> parse_notifier_name(['a','b'])
127 127 ['a', 'b']
128 128 >>> parse_notifier_name(None)
129 129 ['anytrait']
130 130 """
131 131 if isinstance(name, str):
132 132 return [name]
133 133 elif name is None:
134 134 return ['anytrait']
135 135 elif isinstance(name, (list, tuple)):
136 136 for n in name:
137 137 assert isinstance(n, str), "names must be strings"
138 138 return name
139 139
140 140
141 141 class _SimpleTest:
142 142 def __init__ ( self, value ): self.value = value
143 143 def __call__ ( self, test ):
144 144 return test == self.value
145 145 def __repr__(self):
146 146 return "<SimpleTest(%r)" % self.value
147 147 def __str__(self):
148 148 return self.__repr__()
149 149
150 150
151 151 def getmembers(object, predicate=None):
152 152 """A safe version of inspect.getmembers that handles missing attributes.
153 153
154 154 This is useful when there are descriptor based attributes that for
155 155 some reason raise AttributeError even though they exist. This happens
156 156 in zope.interface with the __provides__ attribute.
157 157 """
158 158 results = []
159 159 for key in dir(object):
160 160 try:
161 161 value = getattr(object, key)
162 162 except AttributeError:
163 163 pass
164 164 else:
165 165 if not predicate or predicate(value):
166 166 results.append((key, value))
167 167 results.sort()
168 168 return results
169 169
170 170
171 171 #-----------------------------------------------------------------------------
172 172 # Base TraitType for all traits
173 173 #-----------------------------------------------------------------------------
174 174
175 175
176 176 class TraitType(object):
177 177 """A base class for all trait descriptors.
178 178
179 179 Notes
180 180 -----
181 181 Our implementation of traits is based on Python's descriptor
182 182 protocol. This class is the base class for all such descriptors. The
183 183 only magic we use is a custom metaclass for the main :class:`HasTraits`
184 184 class that does the following:
185 185
186 186 1. Sets the :attr:`name` attribute of every :class:`TraitType`
187 187 instance in the class dict to the name of the attribute.
188 188 2. Sets the :attr:`this_class` attribute of every :class:`TraitType`
189 189 instance in the class dict to the *class* that declared the trait.
190 190 This is used by the :class:`This` trait to allow subclasses to
191 191 accept superclasses for :class:`This` values.
192 192 """
193 193
194 194
195 195 metadata = {}
196 196 default_value = Undefined
197 197 info_text = 'any value'
198 198
199 199 def __init__(self, default_value=NoDefaultSpecified, **metadata):
200 200 """Create a TraitType.
201 201 """
202 202 if default_value is not NoDefaultSpecified:
203 203 self.default_value = default_value
204 204
205 205 if len(metadata) > 0:
206 206 if len(self.metadata) > 0:
207 207 self._metadata = self.metadata.copy()
208 208 self._metadata.update(metadata)
209 209 else:
210 210 self._metadata = metadata
211 211 else:
212 212 self._metadata = self.metadata
213 213
214 214 self.init()
215 215
216 216 def init(self):
217 217 pass
218 218
219 219 def get_default_value(self):
220 220 """Create a new instance of the default value."""
221 221 return self.default_value
222 222
223 223 def instance_init(self, obj):
224 224 """This is called by :meth:`HasTraits.__new__` to finish init'ing.
225 225
226 226 Some stages of initialization must be delayed until the parent
227 227 :class:`HasTraits` instance has been created. This method is
228 228 called in :meth:`HasTraits.__new__` after the instance has been
229 229 created.
230 230
231 231 This method triggers the creation and validation of default values,
232 232 and also things like the resolution of class names given as strings
233 233 in :class:`Type` and :class:`Instance`.
234 234
235 235 Parameters
236 236 ----------
237 237 obj : :class:`HasTraits` instance
238 238 The parent :class:`HasTraits` instance that has just been
239 239 created.
240 240 """
241 241 self.set_default_value(obj)
242 242
243 243 def set_default_value(self, obj):
244 244 """Set the default value on a per instance basis.
245 245
246 246 This method is called by :meth:`instance_init` to create and
247 247 validate the default value. The creation and validation of
248 248 default values must be delayed until the parent :class:`HasTraits`
249 249 class has been instantiated.
250 250 """
251 251 # Check for a deferred initializer defined in the same class as the
252 252 # trait declaration or above.
253 253 mro = type(obj).mro()
254 254 meth_name = '_%s_default' % self.name
255 255 for cls in mro[:mro.index(self.this_class)+1]:
256 256 if meth_name in cls.__dict__:
257 257 break
258 258 else:
259 259 # We didn't find one. Do static initialization.
260 260 dv = self.get_default_value()
261 261 newdv = self._validate(obj, dv)
262 262 obj._trait_values[self.name] = newdv
263 263 return
264 264 # Complete the dynamic initialization.
265 265 obj._trait_dyn_inits[self.name] = cls.__dict__[meth_name]
266 266
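The deferred-initializer lookup above means a subclass can compute a trait's default lazily by defining a `_<name>_default` method. A minimal sketch of that mechanism (class and values invented for illustration; the import path follows this package's layout):

    from IPython.utils.traitlets import HasTraits, Int

    class Worker(HasTraits):
        # static default is 0, but the method below overrides it lazily
        nthreads = Int(0)

        def _nthreads_default(self):
            # called once per instance, the first time the trait is read
            return 4

    w = Worker()
    print(w.nthreads)   # 4 -- the dynamic initializer won, and was validated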
267 267 def __get__(self, obj, cls=None):
268 268 """Get the value of the trait by self.name for the instance.
269 269
270 270 Default values are instantiated when :meth:`HasTraits.__new__`
271 271 is called. Thus by the time this method gets called either the
272 272 default value or a user-defined value (set via :meth:`__set__`)
273 273 is in the :class:`HasTraits` instance.
274 274 """
275 275 if obj is None:
276 276 return self
277 277 else:
278 278 try:
279 279 value = obj._trait_values[self.name]
280 280 except KeyError:
281 281 # Check for a dynamic initializer.
282 282 if self.name in obj._trait_dyn_inits:
283 283 value = obj._trait_dyn_inits[self.name](obj)
284 284 # FIXME: Do we really validate here?
285 285 value = self._validate(obj, value)
286 286 obj._trait_values[self.name] = value
287 287 return value
288 288 else:
289 289 raise TraitError('Unexpected error in TraitType: '
290 290 'both default value and dynamic initializer are '
291 291 'absent.')
292 292 except Exception:
293 293 # HasTraits should call set_default_value to populate
294 294 # this. So this should never be reached.
295 295 raise TraitError('Unexpected error in TraitType: '
296 296 'default value not set properly')
297 297 else:
298 298 return value
299 299
300 300 def __set__(self, obj, value):
301 301 new_value = self._validate(obj, value)
302 302 old_value = self.__get__(obj)
303 303 if old_value != new_value:
304 304 obj._trait_values[self.name] = new_value
305 305 obj._notify_trait(self.name, old_value, new_value)
306 306
307 307 def _validate(self, obj, value):
308 308 if hasattr(self, 'validate'):
309 309 return self.validate(obj, value)
310 310 elif hasattr(self, 'is_valid_for'):
311 311 valid = self.is_valid_for(value)
312 312 if valid:
313 313 return value
314 314 else:
315 315 raise TraitError('invalid value for type: %r' % value)
316 316 elif hasattr(self, 'value_for'):
317 317 return self.value_for(value)
318 318 else:
319 319 return value
320 320
321 321 def info(self):
322 322 return self.info_text
323 323
324 324 def error(self, obj, value):
325 325 if obj is not None:
326 326 e = "The '%s' trait of %s instance must be %s, but a value of %s was specified." \
327 327 % (self.name, class_of(obj),
328 328 self.info(), repr_type(value))
329 329 else:
330 330 e = "The '%s' trait must be %s, but a value of %r was specified." \
331 331 % (self.name, self.info(), repr_type(value))
332 332 raise TraitError(e)
333 333
334 334 def get_metadata(self, key):
335 335 return getattr(self, '_metadata', {}).get(key, None)
336 336
337 337 def set_metadata(self, key, value):
338 338 getattr(self, '_metadata', {})[key] = value
339 339
340 340
341 341 #-----------------------------------------------------------------------------
342 342 # The HasTraits implementation
343 343 #-----------------------------------------------------------------------------
344 344
345 345
346 346 class MetaHasTraits(type):
347 347 """A metaclass for HasTraits.
348 348
349 349 This metaclass makes sure that any TraitType class attributes are
350 350 instantiated and sets their name attribute.
351 351 """
352 352
353 353 def __new__(mcls, name, bases, classdict):
354 354 """Create the HasTraits class.
355 355
356 356 This instantiates all TraitTypes in the class dict and sets their
357 357 :attr:`name` attribute.
358 358 """
359 359 # print "MetaHasTraitlets (mcls, name): ", mcls, name
360 360 # print "MetaHasTraitlets (bases): ", bases
361 361 # print "MetaHasTraitlets (classdict): ", classdict
362 362 for k,v in classdict.iteritems():
363 363 if isinstance(v, TraitType):
364 364 v.name = k
365 365 elif inspect.isclass(v):
366 366 if issubclass(v, TraitType):
367 367 vinst = v()
368 368 vinst.name = k
369 369 classdict[k] = vinst
370 370 return super(MetaHasTraits, mcls).__new__(mcls, name, bases, classdict)
371 371
372 372 def __init__(cls, name, bases, classdict):
373 373 """Finish initializing the HasTraits class.
374 374
375 375 This sets the :attr:`this_class` attribute of each TraitType in the
376 376 class dict to the newly created class ``cls``.
377 377 """
378 378 for k, v in classdict.iteritems():
379 379 if isinstance(v, TraitType):
380 380 v.this_class = cls
381 381 super(MetaHasTraits, cls).__init__(name, bases, classdict)
382 382
383 383 class HasTraits(object):
384 384
385 385 __metaclass__ = MetaHasTraits
386 386
387 387 def __new__(cls, **kw):
388 388 # This is needed because in Python 2.6 object.__new__ only accepts
389 389 # the cls argument.
390 390 new_meth = super(HasTraits, cls).__new__
391 391 if new_meth is object.__new__:
392 392 inst = new_meth(cls)
393 393 else:
394 394 inst = new_meth(cls, **kw)
395 395 inst._trait_values = {}
396 396 inst._trait_notifiers = {}
397 397 inst._trait_dyn_inits = {}
398 398 # Here we tell all the TraitType instances to set their default
399 399 # values on the instance.
400 400 for key in dir(cls):
401 401 # Some descriptors raise AttributeError like zope.interface's
402 402 # __provides__ attributes even though they exist. This causes
403 403 # AttributeErrors even though they are listed in dir(cls).
404 404 try:
405 405 value = getattr(cls, key)
406 406 except AttributeError:
407 407 pass
408 408 else:
409 409 if isinstance(value, TraitType):
410 410 value.instance_init(inst)
411 411
412 412 return inst
413 413
414 414 def __init__(self, **kw):
415 415 # Allow trait values to be set using keyword arguments.
416 416 # We need to use setattr for this to trigger validation and
417 417 # notifications.
418 418 for key, value in kw.iteritems():
419 419 setattr(self, key, value)
420 420
421 421 def _notify_trait(self, name, old_value, new_value):
422 422
423 423 # First dynamic ones
424 424 callables = list(self._trait_notifiers.get(name,[]))
425 425 more_callables = self._trait_notifiers.get('anytrait',[])
426 426 callables.extend(more_callables) # extend the copy, not the stored list
427 427
428 428 # Now static ones
429 429 try:
430 430 cb = getattr(self, '_%s_changed' % name)
431 431 except:
432 432 pass
433 433 else:
434 434 callables.append(cb)
435 435
436 436 # Call them all now
437 437 for c in callables:
438 438 # Traits catches and logs errors here. I allow them to raise
439 439 if callable(c):
440 440 argspec = inspect.getargspec(c)
441 441 nargs = len(argspec[0])
442 442 # Bound methods have an additional 'self' argument
443 443 # I don't know how to treat unbound methods, but they
444 444 # can't really be used for callbacks.
445 445 if isinstance(c, types.MethodType):
446 446 offset = -1
447 447 else:
448 448 offset = 0
449 449 if nargs + offset == 0:
450 450 c()
451 451 elif nargs + offset == 1:
452 452 c(name)
453 453 elif nargs + offset == 2:
454 454 c(name, new_value)
455 455 elif nargs + offset == 3:
456 456 c(name, old_value, new_value)
457 457 else:
458 458 raise TraitError('a trait changed callback '
459 459 'must have 0-3 arguments.')
460 460 else:
461 461 raise TraitError('a trait changed callback '
462 462 'must be callable.')
463 463
464 464
465 465 def _add_notifiers(self, handler, name):
466 466 if not self._trait_notifiers.has_key(name):
467 467 nlist = []
468 468 self._trait_notifiers[name] = nlist
469 469 else:
470 470 nlist = self._trait_notifiers[name]
471 471 if handler not in nlist:
472 472 nlist.append(handler)
473 473
474 474 def _remove_notifiers(self, handler, name):
475 475 if self._trait_notifiers.has_key(name):
476 476 nlist = self._trait_notifiers[name]
477 477 try:
478 478 index = nlist.index(handler)
479 479 except ValueError:
480 480 pass
481 481 else:
482 482 del nlist[index]
483 483
484 484 def on_trait_change(self, handler, name=None, remove=False):
485 485 """Setup a handler to be called when a trait changes.
486 486
487 487 This is used to setup dynamic notifications of trait changes.
488 488
489 489 Static handlers can be created by creating methods on a HasTraits
490 490 subclass with the naming convention '_[traitname]_changed'. Thus,
491 491 to create a static handler for the trait 'a', create the method
492 492 _a_changed(self, name, old, new) (fewer arguments can be used, see
493 493 below).
494 494
495 495 Parameters
496 496 ----------
497 497 handler : callable
498 498 A callable that is called when a trait changes. Its
499 499 signature can be handler(), handler(name), handler(name, new)
500 500 or handler(name, old, new).
501 501 name : list, str, None
502 502 If None, the handler will apply to all traits. If a list
503 503 of str, handler will apply to all names in the list. If a
504 504 str, the handler will apply just to that name.
505 505 remove : bool
506 506 If False (the default), then install the handler. If True
507 507 then uninstall it.
508 508 """
509 509 if remove:
510 510 names = parse_notifier_name(name)
511 511 for n in names:
512 512 self._remove_notifiers(handler, n)
513 513 else:
514 514 names = parse_notifier_name(name)
515 515 for n in names:
516 516 self._add_notifiers(handler, n)
517 517
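Since `_notify_trait` dispatches on the handler's argument count, both static and dynamic observers can use any of the 0-3 argument signatures. A small sketch (names invented for illustration):

    from IPython.utils.traitlets import HasTraits, Int

    class Engine(HasTraits):
        port = Int(0)

        def _port_changed(self, name, old, new):
            # static handler, found by the _<name>_changed convention
            print('static: %s %r -> %r' % (name, old, new))

    def watcher(name, new):
        # dynamic handler; two arguments means it receives (name, new)
        print('dynamic: %s is now %r' % (name, new))

    e = Engine()
    e.on_trait_change(watcher, 'port')
    e.port = 5   # fires both the static and the dynamic handler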
518 518 def trait_names(self, **metadata):
519 519 """Get a list of all the names of this classes traits."""
520 520 return self.traits(**metadata).keys()
521 521
522 522 def traits(self, **metadata):
523 523 """Get a list of all the traits of this class.
524 524
525 525 The TraitTypes returned don't know anything about the values
526 526 that the various HasTrait's instances are holding.
527 527
528 528 This follows the same algorithm as traits does and does not allow
529 529 for any simple way of specifying merely that a metadata name
530 530 exists, but has any value. This is because get_metadata returns
531 531 None if a metadata key doesn't exist.
532 532 """
533 533 traits = dict([memb for memb in getmembers(self.__class__) if \
534 534 isinstance(memb[1], TraitType)])
535 535
536 536 if len(metadata) == 0:
537 537 return traits
538 538
539 539 for meta_name, meta_eval in metadata.items():
540 540 if type(meta_eval) is not FunctionType:
541 541 metadata[meta_name] = _SimpleTest(meta_eval)
542 542
543 543 result = {}
544 544 for name, trait in traits.items():
545 545 for meta_name, meta_eval in metadata.items():
546 546 if not meta_eval(trait.get_metadata(meta_name)):
547 547 break
548 548 else:
549 549 result[name] = trait
550 550
551 551 return result
552 552
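This metadata filtering is what the Config system hooks into: traits tagged `config=True` can be selected with a single query. A brief sketch (class invented for illustration):

    from IPython.utils.traitlets import HasTraits, Int, Str

    class App(HasTraits):
        host = Str('localhost', config=True)
        port = Int(0, config=True)
        secret = Str('')   # no metadata, so excluded by the query below

    a = App()
    print(sorted(a.traits(config=True).keys()))   # ['host', 'port']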
553 553 def trait_metadata(self, traitname, key):
554 554 """Get metadata values for trait by key."""
555 555 try:
556 556 trait = getattr(self.__class__, traitname)
557 557 except AttributeError:
558 558 raise TraitError("Class %s does not have a trait named %s" %
559 559 (self.__class__.__name__, traitname))
560 560 else:
561 561 return trait.get_metadata(key)
562 562
563 563 #-----------------------------------------------------------------------------
564 564 # Actual TraitTypes implementations/subclasses
565 565 #-----------------------------------------------------------------------------
566 566
567 567 #-----------------------------------------------------------------------------
568 568 # TraitTypes subclasses for handling classes and instances of classes
569 569 #-----------------------------------------------------------------------------
570 570
571 571
572 572 class ClassBasedTraitType(TraitType):
573 573 """A trait with error reporting for Type, Instance and This."""
574 574
575 575 def error(self, obj, value):
576 576 kind = type(value)
577 577 if kind is InstanceType:
578 578 msg = 'class %s' % value.__class__.__name__
579 579 else:
580 580 msg = '%s (i.e. %s)' % ( str( kind )[1:-1], repr( value ) )
581 581
582 582 super(ClassBasedTraitType, self).error(obj, msg)
583 583
584 584
585 585 class Type(ClassBasedTraitType):
586 586 """A trait whose value must be a subclass of a specified class."""
587 587
588 588 def __init__ (self, default_value=None, klass=None, allow_none=True, **metadata ):
589 589 """Construct a Type trait
590 590
591 591 A Type trait specifies that its values must be subclasses of
592 592 a particular class.
593 593
594 594 If only ``default_value`` is given, it is used for the ``klass`` as
595 595 well.
596 596
597 597 Parameters
598 598 ----------
599 599 default_value : class, str or None
600 600 The default value must be a subclass of klass. If an str,
601 601 the str must be a fully specified class name, like 'foo.bar.Bah'.
602 602 The string is resolved into a real class when the parent
603 603 :class:`HasTraits` class is instantiated.
604 604 klass : class, str, None
605 605 Values of this trait must be a subclass of klass. The klass
606 606 may be specified in a string like: 'foo.bar.MyClass'.
607 607 The string is resolved into a real class when the parent
608 608 :class:`HasTraits` class is instantiated.
609 609 allow_none : boolean
610 610 Indicates whether None is allowed as an assignable value. Even if
611 611 ``False``, the default value may be ``None``.
612 612 """
613 613 if default_value is None:
614 614 if klass is None:
615 615 klass = object
616 616 elif klass is None:
617 617 klass = default_value
618 618
619 619 if not (inspect.isclass(klass) or isinstance(klass, basestring)):
620 620 raise TraitError("A Type trait must specify a class.")
621 621
622 622 self.klass = klass
623 623 self._allow_none = allow_none
624 624
625 625 super(Type, self).__init__(default_value, **metadata)
626 626
627 627 def validate(self, obj, value):
628 628 """Validates that the value is a valid object instance."""
629 629 try:
630 630 if issubclass(value, self.klass):
631 631 return value
632 632 except:
633 633 if (value is None) and (self._allow_none):
634 634 return value
635 635
636 636 self.error(obj, value)
637 637
638 638 def info(self):
639 639 """ Returns a description of the trait."""
640 640 if isinstance(self.klass, basestring):
641 641 klass = self.klass
642 642 else:
643 643 klass = self.klass.__name__
644 644 result = 'a subclass of ' + klass
645 645 if self._allow_none:
646 646 return result + ' or None'
647 647 return result
648 648
649 649 def instance_init(self, obj):
650 650 self._resolve_classes()
651 651 super(Type, self).instance_init(obj)
652 652
653 653 def _resolve_classes(self):
654 654 if isinstance(self.klass, basestring):
655 655 self.klass = import_item(self.klass)
656 656 if isinstance(self.default_value, basestring):
657 657 self.default_value = import_item(self.default_value)
658 658
659 659 def get_default_value(self):
660 660 return self.default_value
661 661
662 662
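A quick sketch of Type in use, including the subclass check (classes invented for illustration; string class names such as 'foo.bar.Bah' resolve via import_item at instance creation, as described above):

    from IPython.utils.traitlets import HasTraits, Type, TraitError

    class Base(object): pass
    class Derived(Base): pass

    class Registry(HasTraits):
        # default_value doubles as klass when klass is omitted
        factory = Type(Base)

    r = Registry()
    r.factory = Derived          # ok: a subclass of Base
    try:
        r.factory = 42           # not a class at all -> TraitError
    except TraitError as e:
        print(e)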
663 663 class DefaultValueGenerator(object):
664 664 """A class for generating new default value instances."""
665 665
666 666 def __init__(self, *args, **kw):
667 667 self.args = args
668 668 self.kw = kw
669 669
670 670 def generate(self, klass):
671 671 return klass(*self.args, **self.kw)
672 672
673 673
674 674 class Instance(ClassBasedTraitType):
675 675 """A trait whose value must be an instance of a specified class.
676 676
677 677 The value can also be an instance of a subclass of the specified class.
678 678 """
679 679
680 680 def __init__(self, klass=None, args=None, kw=None,
681 681 allow_none=True, **metadata ):
682 682 """Construct an Instance trait.
683 683
684 684 This trait allows values that are instances of a particular
685 685 class or its subclasses. Our implementation is quite different
686 686 from that of enthought.traits as we don't allow instances to be used
687 687 for klass and we handle the ``args`` and ``kw`` arguments differently.
688 688
689 689 Parameters
690 690 ----------
691 691 klass : class, str
692 692 The class that forms the basis for the trait. Class names
693 693 can also be specified as strings, like 'foo.bar.Bar'.
694 694 args : tuple
695 695 Positional arguments for generating the default value.
696 696 kw : dict
697 697 Keyword arguments for generating the default value.
698 698 allow_none : bool
699 699 Indicates whether None is allowed as a value.
700 700
701 701 Default Value
702 702 -------------
703 703 If both ``args`` and ``kw`` are None, then the default value is None.
704 704 If ``args`` is a tuple and ``kw`` is a dict, then the default is
705 705 created as ``klass(*args, **kw)``. If either ``args`` or ``kw`` is
706 706 None (but not both), that None is replaced by ``()`` or ``{}``.
707 707 """
708 708
709 709 self._allow_none = allow_none
710 710
711 711 if (klass is None) or (not (inspect.isclass(klass) or isinstance(klass, basestring))):
712 712 raise TraitError('The klass argument must be a class'
713 713 '; you gave: %r' % klass)
714 714 self.klass = klass
715 715
716 716 # self.klass is a class, so handle default_value
717 717 if args is None and kw is None:
718 718 default_value = None
719 719 else:
720 720 if args is None:
721 721 # kw is not None
722 722 args = ()
723 723 elif kw is None:
724 724 # args is not None
725 725 kw = {}
726 726
727 727 if not isinstance(kw, dict):
728 728 raise TraitError("The 'kw' argument must be a dict or None.")
729 729 if not isinstance(args, tuple):
730 730 raise TraitError("The 'args' argument must be a tuple or None.")
731 731
732 732 default_value = DefaultValueGenerator(*args, **kw)
733 733
734 734 super(Instance, self).__init__(default_value, **metadata)
735 735
736 736 def validate(self, obj, value):
737 737 if value is None:
738 738 if self._allow_none:
739 739 return value
740 740 self.error(obj, value)
741 741
742 742 if isinstance(value, self.klass):
743 743 return value
744 744 else:
745 745 self.error(obj, value)
746 746
747 747 def info(self):
748 748 if isinstance(self.klass, basestring):
749 749 klass = self.klass
750 750 else:
751 751 klass = self.klass.__name__
752 752 result = class_of(klass)
753 753 if self._allow_none:
754 754 return result + ' or None'
755 755
756 756 return result
757 757
758 758 def instance_init(self, obj):
759 759 self._resolve_classes()
760 760 super(Instance, self).instance_init(obj)
761 761
762 762 def _resolve_classes(self):
763 763 if isinstance(self.klass, basestring):
764 764 self.klass = import_item(self.klass)
765 765
766 766 def get_default_value(self):
767 767 """Instantiate a default value instance.
768 768
769 769 This is called when the containing HasTraits classes'
770 770 :meth:`__new__` method is called to ensure that a unique instance
771 771 is created for each HasTraits instance.
772 772 """
773 773 dv = self.default_value
774 774 if isinstance(dv, DefaultValueGenerator):
775 775 return dv.generate(self.klass)
776 776 else:
777 777 return dv
778 778
779 779
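The DefaultValueGenerator indirection exists so that each HasTraits instance gets a *fresh* default, built only when the instance is created. A sketch (classes invented for illustration):

    from IPython.utils.traitlets import HasTraits, Instance

    class Widget(object):
        def __init__(self, size=1):
            self.size = size

    class Panel(HasTraits):
        # default is generated per instance as Widget(size=3)
        w = Instance(Widget, kw=dict(size=3))

    p1, p2 = Panel(), Panel()
    print(p1.w.size)      # 3
    print(p1.w is p2.w)   # False -- no shared mutable default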
780 780 class This(ClassBasedTraitType):
781 781 """A trait for instances of the class containing this trait.
782 782
783 783 Because of how and when class bodies are executed, the ``This``
784 784 trait can only have a default value of None. Because of this, and
785 785 because we always validate default values, ``allow_none`` is *always* true.
786 786 """
787 787
788 788 info_text = 'an instance of the same type as the receiver or None'
789 789
790 790 def __init__(self, **metadata):
791 791 super(This, self).__init__(None, **metadata)
792 792
793 793 def validate(self, obj, value):
794 794 # What if value is a superclass of obj.__class__? This is
795 795 # complicated if it was the superclass that defined the This
796 796 # trait.
797 797 if isinstance(value, self.this_class) or (value is None):
798 798 return value
799 799 else:
800 800 self.error(obj, value)
801 801
802 802
803 803 #-----------------------------------------------------------------------------
804 804 # Basic TraitTypes implementations/subclasses
805 805 #-----------------------------------------------------------------------------
806 806
807 807
808 808 class Any(TraitType):
809 809 default_value = None
810 810 info_text = 'any value'
811 811
812 812
813 813 class Int(TraitType):
814 814 """A integer trait."""
815 815
816 816 default_value = 0
817 817 info_text = 'an integer'
818 818
819 819 def validate(self, obj, value):
820 820 if isinstance(value, int):
821 821 return value
822 822 self.error(obj, value)
823 823
824 824 class CInt(Int):
825 825 """A casting version of the int trait."""
826 826
827 827 def validate(self, obj, value):
828 828 try:
829 829 return int(value)
830 830 except:
831 831 self.error(obj, value)
832 832
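Each numeric and string trait in this section comes in a strict flavor and a casting `C*` flavor; the casting version accepts anything the corresponding builtin constructor accepts. For instance:

    from IPython.utils.traitlets import HasTraits, Int, CInt, TraitError

    class Job(HasTraits):
        n = Int(0)     # strict: only real ints validate
        cn = CInt(0)   # casting: runs int(value) first

    j = Job()
    j.cn = '42'        # cast to the int 42
    print(j.cn + 1)    # 43
    try:
        j.n = '42'     # strict Int rejects strings
    except TraitError as e:
        print(e)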
833 833
834 834 class Long(TraitType):
835 835 """A long integer trait."""
836 836
837 837 default_value = 0L
838 838 info_text = 'a long'
839 839
840 840 def validate(self, obj, value):
841 841 if isinstance(value, long):
842 842 return value
843 843 if isinstance(value, int):
844 844 return long(value)
845 845 self.error(obj, value)
846 846
847 847
848 848 class CLong(Long):
849 849 """A casting version of the long integer trait."""
850 850
851 851 def validate(self, obj, value):
852 852 try:
853 853 return long(value)
854 854 except:
855 855 self.error(obj, value)
856 856
857 857
858 858 class Float(TraitType):
859 859 """A float trait."""
860 860
861 861 default_value = 0.0
862 862 info_text = 'a float'
863 863
864 864 def validate(self, obj, value):
865 865 if isinstance(value, float):
866 866 return value
867 867 if isinstance(value, int):
868 868 return float(value)
869 869 self.error(obj, value)
870 870
871 871
872 872 class CFloat(Float):
873 873 """A casting version of the float trait."""
874 874
875 875 def validate(self, obj, value):
876 876 try:
877 877 return float(value)
878 878 except:
879 879 self.error(obj, value)
880 880
881 881 class Complex(TraitType):
882 882 """A trait for complex numbers."""
883 883
884 884 default_value = 0.0 + 0.0j
885 885 info_text = 'a complex number'
886 886
887 887 def validate(self, obj, value):
888 888 if isinstance(value, complex):
889 889 return value
890 890 if isinstance(value, (float, int)):
891 891 return complex(value)
892 892 self.error(obj, value)
893 893
894 894
895 895 class CComplex(Complex):
896 896 """A casting version of the complex number trait."""
897 897
898 898 def validate (self, obj, value):
899 899 try:
900 900 return complex(value)
901 901 except:
902 902 self.error(obj, value)
903 903
904 904
905 905 class Str(TraitType):
906 906 """A trait for strings."""
907 907
908 908 default_value = ''
909 909 info_text = 'a string'
910 910
911 911 def validate(self, obj, value):
912 912 if isinstance(value, str):
913 913 return value
914 914 self.error(obj, value)
915 915
916 916
917 917 class CStr(Str):
918 918 """A casting version of the string trait."""
919 919
920 920 def validate(self, obj, value):
921 921 try:
922 922 return str(value)
923 923 except:
924 924 try:
925 925 return unicode(value)
926 926 except:
927 927 self.error(obj, value)
928 928
929 929
930 930 class Unicode(TraitType):
931 931 """A trait for unicode strings."""
932 932
933 933 default_value = u''
934 934 info_text = 'a unicode string'
935 935
936 936 def validate(self, obj, value):
937 937 if isinstance(value, unicode):
938 938 return value
939 939 if isinstance(value, str):
940 940 return unicode(value)
941 941 self.error(obj, value)
942 942
943 943
944 944 class CUnicode(Unicode):
945 945 """A casting version of the unicode trait."""
946 946
947 947 def validate(self, obj, value):
948 948 try:
949 949 return unicode(value)
950 950 except:
951 951 self.error(obj, value)
952 952
953 953
954 954 class Bool(TraitType):
955 955 """A boolean (True, False) trait."""
956 956
957 957 default_value = False
958 958 info_text = 'a boolean'
959 959
960 960 def validate(self, obj, value):
961 961 if isinstance(value, bool):
962 962 return value
963 963 self.error(obj, value)
964 964
965 965
966 966 class CBool(Bool):
967 967 """A casting version of the boolean trait."""
968 968
969 969 def validate(self, obj, value):
970 970 try:
971 971 return bool(value)
972 972 except:
973 973 self.error(obj, value)
974 974
975 975
976 976 class Enum(TraitType):
977 977 """An enum that whose value must be in a given sequence."""
978 978
979 979 def __init__(self, values, default_value=None, allow_none=True, **metadata):
980 980 self.values = values
981 981 self._allow_none = allow_none
982 982 super(Enum, self).__init__(default_value, **metadata)
983 983
984 984 def validate(self, obj, value):
985 985 if value is None:
986 986 if self._allow_none:
987 987 return value
988 988
989 989 if value in self.values:
990 990 return value
991 991 self.error(obj, value)
992 992
993 993 def info(self):
994 994 """ Returns a description of the trait."""
995 995 result = 'any of ' + repr(self.values)
996 996 if self._allow_none:
997 997 return result + ' or None'
998 998 return result
999 999
1000 1000 class CaselessStrEnum(Enum):
1001 1001 """An enum of strings that are caseless in validate."""
1002 1002
1003 1003 def validate(self, obj, value):
1004 1004 if value is None:
1005 1005 if self._allow_none:
1006 1006 return value
1007 1007
1008 1008 if not isinstance(value, str):
1009 1009 self.error(obj, value)
1010 1010
1011 1011 for v in self.values:
1012 1012 if v.lower() == value.lower():
1013 1013 return v
1014 1014 self.error(obj, value)
1015 1015
1016 1016
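A sketch of the two enum flavors (values invented); note that CaselessStrEnum stores the canonical casing from ``values``, not the value as typed:

    from IPython.utils.traitlets import HasTraits, Enum, CaselessStrEnum

    class Task(HasTraits):
        scheme = Enum(('lru', 'weighted'), default_value='lru')
        mode = CaselessStrEnum(('Pure', 'Threaded'), default_value='Pure')

    t = Task()
    t.mode = 'THREADED'
    print(t.mode)      # 'Threaded' -- matched caselessly, canonical form stored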
1017 1017 class List(Instance):
1018 1018 """An instance of a Python list."""
1019 1019
1020 1020 def __init__(self, default_value=None, allow_none=True, **metadata):
1021 """Create a list trait type from a list or tuple.
1021 """Create a list trait type from a list, set, or tuple.
1022 1022
1023 1023 The default value is created by doing ``list(default_value)``,
1024 1024 which creates a copy of the ``default_value``.
1025 1025 """
1026 1026 if default_value is None:
1027 1027 args = ((),)
1028 1028 elif isinstance(default_value, SequenceTypes):
1029 1029 args = (default_value,)
1030 1030 else:
1031 1031 raise TypeError('default value of List was %s' % default_value)
1032 1032
1033 1033 super(List,self).__init__(klass=list, args=args,
1034 1034 allow_none=allow_none, **metadata)
1035 1035
1036 1036
1037 class Set(Instance):
1038 """An instance of a Python set."""
1039
1040 def __init__(self, default_value=None, allow_none=True, **metadata):
1041 """Create a set trait type from a set, list, or tuple.
1042
1043 The default value is created by doing ``set(default_value)``,
1044 which creates a copy of the ``default_value``.
1045 """
1046 if default_value is None:
1047 args = ((),)
1048 elif isinstance(default_value, SequenceTypes):
1049 args = (default_value,)
1050 else:
1051 raise TypeError('default value of Set was %s' % default_value)
1052
1053 super(Set,self).__init__(klass=set, args=args,
1054 allow_none=allow_none, **metadata)
1055
1056
1037 1057 class Dict(Instance):
1038 1058 """An instance of a Python dict."""
1039 1059
1040 1060 def __init__(self, default_value=None, allow_none=True, **metadata):
1041 1061 """Create a dict trait type from a dict.
1042 1062
1043 1063 The default value is created by doing ``dict(default_value)``,
1044 1064 which creates a copy of the ``default_value``.
1045 1065 """
1046 1066 if default_value is None:
1047 1067 args = ((),)
1048 1068 elif isinstance(default_value, dict):
1049 1069 args = (default_value,)
1050 1070 elif isinstance(default_value, SequenceTypes):
1051 1071 args = (default_value,)
1052 1072 else:
1053 1073 raise TypeError('default value of Dict was %s' % default_value)
1054 1074
1055 1075 super(Dict,self).__init__(klass=dict, args=args,
1056 1076 allow_none=allow_none, **metadata)
1057 1077
1058 1078
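Because List, the new Set, and Dict all route through Instance with a copying constructor, container defaults are never shared between instances. A sketch:

    from IPython.utils.traitlets import HasTraits, List, Set, Dict

    class Cluster(HasTraits):
        engines = List([])             # each instance gets list(default)
        tags = Set(('a', 'b'))         # the new Set trait; copied via set()
        env = Dict(dict(PATH='/bin'))  # copied via dict()

    c1, c2 = Cluster(), Cluster()
    c1.engines.append(1)
    print(c2.engines)    # [] -- c1's append did not leak into c2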
1059 1079 class TCPAddress(TraitType):
1060 1080 """A trait for an (ip, port) tuple.
1061 1081
1062 1082 This allows for both IPv4 IP addresses as well as hostnames.
1063 1083 """
1064 1084
1065 1085 default_value = ('127.0.0.1', 0)
1066 1086 info_text = 'an (ip, port) tuple'
1067 1087
1068 1088 def validate(self, obj, value):
1069 1089 if isinstance(value, tuple):
1070 1090 if len(value) == 2:
1071 1091 if isinstance(value[0], basestring) and isinstance(value[1], int):
1072 1092 port = value[1]
1073 1093 if port >= 0 and port <= 65535:
1074 1094 return value
1075 1095 self.error(obj, value)
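TCPAddress in use (values invented); the port bounds check is the part most callers trip over:

    from IPython.utils.traitlets import HasTraits, TCPAddress, TraitError

    class Endpoint(HasTraits):
        addr = TCPAddress()   # defaults to ('127.0.0.1', 0)

    ep = Endpoint()
    ep.addr = ('myhost.example.com', 5555)   # hostnames are accepted
    try:
        ep.addr = ('localhost', 70000)       # port > 65535 -> TraitError
    except TraitError as e:
        print(e)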
@@ -1,265 +1,345 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import os
19 import sys
19 20 import time
20 21 import logging
21 22 from multiprocessing import Process
22 23
23 24 import zmq
24 25 from zmq.eventloop import ioloop
25 26 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.devices import ProcessMonitoredQueue
27 # from zmq.devices import ProcessMonitoredQueue
27 28
28 29 # internal:
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
29 32 from IPython.zmq.entry_point import bind_port
30 33
31 from hub import Hub
32 34 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
33 35 connect_logger, parse_url, signal_children, generate_exec_key,
34 36 local_logger)
35 37
36 38
37 39 import streamsession as session
38 40 import heartmonitor
39 41 from scheduler import launch_scheduler
42 from hub import Hub, HubFactory
40 43
41 44 from dictdb import DictDB
42 45 try:
43 46 import pymongo
44 47 except ImportError:
45 48 MongoDB=None
46 49 else:
47 50 from mongodb import MongoDB
48 51
49 52 #-------------------------------------------------------------------------
50 53 # Entry Point
51 54 #-------------------------------------------------------------------------
52 55
53 56 def make_argument_parser():
54 57 """Make an argument parser"""
55 58 parser = make_base_argument_parser()
56 59
57 60 parser.add_argument('--client', type=int, metavar='PORT', default=0,
58 61 help='set the XREP port for clients [default: random]')
59 62 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
60 63 help='set the PUB socket for registration notification [default: random]')
61 64 parser.add_argument('--hb', type=str, metavar='PORTS',
62 65 help='set the 2 ports for heartbeats [default: random]')
63 66 parser.add_argument('--ping', type=int, default=100,
64 67 help='set the heartbeat period in ms [default: 100]')
65 68 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
66 69 help='set the SUB port for queue monitoring [default: random]')
67 70 parser.add_argument('--mux', type=str, metavar='PORTS',
68 71 help='set the XREP ports for the MUX queue [default: random]')
69 72 parser.add_argument('--task', type=str, metavar='PORTS',
70 73 help='set the XREP/XREQ ports for the task queue [default: random]')
71 74 parser.add_argument('--control', type=str, metavar='PORTS',
72 75 help='set the XREP ports for the control queue [default: random]')
73 76 parser.add_argument('--iopub', type=str, metavar='PORTS',
74 77 help='set the PUB/SUB ports for the iopub relay [default: random]')
75 78 parser.add_argument('--scheduler', type=str, default='lru',
76 79 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
77 80 help='select the task scheduler [default: Python LRU]')
78 81 parser.add_argument('--mongodb', action='store_true',
79 82 help='Use MongoDB task storage [default: in-memory]')
80 83 parser.add_argument('--session', type=str, default=None,
81 84 help='Manually specify the session id.')
82 85
83 86 return parser
87
88 class ControllerFactory(HubFactory):
89 """Configurable for setting up a Hub and Schedulers."""
90
91 scheme = Str('pure', config=True)
92 usethreads = Bool(False, config=True)
93
94 # internal
95 children = List()
96 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
97
98 def _update_mq(self):
99 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
100
101 def __init__(self, **kwargs):
102 super(ControllerFactory, self).__init__(**kwargs)
103 self.subconstructors.append(self.construct_schedulers)
104 self._update_mq()
105 self.on_trait_change(self._update_mq, 'usethreads')
106
107 def start(self):
108 super(ControllerFactory, self).start()
109 for child in self.children:
110 child.start()
111 if not self.usethreads:
112 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
113
114
115 def construct_schedulers(self):
116 children = self.children
117 mq = import_item(self.mq_class)
118
119 # IOPub relay (in a Process)
120 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
121 q.bind_in(self.client_addrs['iopub'])
122 q.bind_out(self.engine_addrs['iopub'])
123 q.setsockopt_out(zmq.SUBSCRIBE, '')
124 q.connect_mon(self.monitor_url)
125 q.daemon=True
126 children.append(q)
127
128 # Multiplexer Queue (in a Process)
129 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
130 q.bind_in(self.client_addrs['mux'])
131 q.bind_out(self.engine_addrs['mux'])
132 q.connect_mon(self.monitor_url)
133 q.daemon=True
134 children.append(q)
135
136 # Control Queue (in a Process)
137 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
138 q.bind_in(self.client_addrs['control'])
139 q.bind_out(self.engine_addrs['control'])
140 q.connect_mon(self.monitor_url)
141 q.daemon=True
142 children.append(q)
143 # Task Queue (in a Process)
144 if self.scheme == 'pure':
145 logging.warn("task::using pure XREQ Task scheduler")
146 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
147 q.bind_in(self.client_addrs['task'])
148 q.bind_out(self.engine_addrs['task'])
149 q.connect_mon(self.monitor_url)
150 q.daemon=True
151 children.append(q)
152 elif self.scheme == 'none':
153 logging.warn("task::using no Task scheduler")
154
155 else:
156 logging.warn("task::using Python %s Task scheduler"%self.scheme)
157 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
158 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
159 q.daemon=True
160 children.append(q)
161
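A hedged sketch of how the config=True traits above are meant to be driven, assuming the standard Configurable behavior of reading values from a Config section named after the class, and assuming the factory base class invokes the registered subconstructors before start():

    from IPython.config.loader import Config
    from controller import ControllerFactory   # this module

    c = Config()
    c.ControllerFactory.scheme = 'weighted'  # pure, lru, plainrandom, weighted, twobin, leastload
    c.ControllerFactory.usethreads = True    # ThreadMonitoredQueue instead of Process

    cf = ControllerFactory(config=c)   # traits tagged config=True pick these up
    cf.start()                         # launches the hub and scheduler children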
84 162
85 163 def main(argv=None):
164 """DO NOT USE ME ANYMORE"""
86 165
87 166 parser = make_argument_parser()
88 167
89 168 args = parser.parse_args(argv)
90 169 parse_url(args)
91 170
92 171 iface="%s://%s"%(args.transport,args.ip)+':%i'
93 172
94 173 random_ports = 0
95 174 if args.hb:
96 175 hb = split_ports(args.hb, 2)
97 176 else:
98 177 hb = select_random_ports(2)
99 178 if args.mux:
100 179 mux = split_ports(args.mux, 2)
101 180 else:
102 181 mux = None
103 182 random_ports += 2
104 183 if args.iopub:
105 184 iopub = split_ports(args.iopub, 2)
106 185 else:
107 186 iopub = None
108 187 random_ports += 2
109 188 if args.task:
110 189 task = split_ports(args.task, 2)
111 190 else:
112 191 task = None
113 192 random_ports += 2
114 193 if args.control:
115 194 control = split_ports(args.control, 2)
116 195 else:
117 196 control = None
118 197 random_ports += 2
119 198
120 199 ctx = zmq.Context()
121 200 loop = ioloop.IOLoop.instance()
122 201
123 202
124 203 # Registrar socket
125 204 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
126 205 regport = bind_port(reg, args.ip, args.regport)
127 206
128 207 ### Engine connections ###
129 208
130 209 # heartbeat
131 210 hpub = ctx.socket(zmq.PUB)
132 211 bind_port(hpub, args.ip, hb[0])
133 212 hrep = ctx.socket(zmq.XREP)
134 213 bind_port(hrep, args.ip, hb[1])
135 214
136 215 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
137 216 hmon.start()
138 217
139 218 ### Client connections ###
140 219 # Clientele socket
141 220 c = ZMQStream(ctx.socket(zmq.XREP), loop)
142 221 cport = bind_port(c, args.ip, args.client)
143 222 # Notifier socket
144 223 n = ZMQStream(ctx.socket(zmq.PUB), loop)
145 224 nport = bind_port(n, args.ip, args.notice)
146 225
147 226 ### Key File ###
148 227 if args.execkey and not os.path.isfile(args.execkey):
149 228 generate_exec_key(args.execkey)
150 229
151 230 thesession = session.StreamSession(username=args.ident or "controller",
152 231 keyfile=args.execkey, session=args.session)
153 232
154 233 ### build and launch the queues ###
155 234
156 235 # monitor socket
157 236 sub = ctx.socket(zmq.SUB)
158 237 sub.setsockopt(zmq.SUBSCRIBE, "")
159 238 monport = bind_port(sub, args.ip, args.monitor)
160 239 sub = ZMQStream(sub, loop)
161 240
162 241 ports = select_random_ports(random_ports)
163 242 children = []
164 243
165 244 # IOPub relay (in a Process)
166 245 if not iopub:
167 246 iopub = (ports.pop(),ports.pop())
168 247 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
169 248 q.bind_in(iface%iopub[1])
170 249 q.bind_out(iface%iopub[0])
171 250 q.setsockopt_in(zmq.SUBSCRIBE, '')
172 251 q.connect_mon(iface%monport)
173 252 q.daemon=True
174 253 q.start()
175 254 children.append(q.launcher)
176 255
177 256 # Multiplexer Queue (in a Process)
178 257 if not mux:
179 258 mux = (ports.pop(),ports.pop())
180 259 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
181 260 q.bind_in(iface%mux[0])
182 261 q.bind_out(iface%mux[1])
183 262 q.connect_mon(iface%monport)
184 263 q.daemon=True
185 264 q.start()
186 265 children.append(q.launcher)
187 266
188 267 # Control Queue (in a Process)
189 268 if not control:
190 269 control = (ports.pop(),ports.pop())
191 270 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
192 271 q.bind_in(iface%control[0])
193 272 q.bind_out(iface%control[1])
194 273 q.connect_mon(iface%monport)
195 274 q.daemon=True
196 275 q.start()
197 276 children.append(q.launcher)
198 277 # Task Queue (in a Process)
199 278 if not task:
200 279 task = (ports.pop(),ports.pop())
201 280 if args.scheduler == 'pure':
202 281 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
203 282 q.bind_in(iface%task[0])
204 283 q.bind_out(iface%task[1])
205 284 q.connect_mon(iface%monport)
206 285 q.daemon=True
207 286 q.start()
208 287 children.append(q.launcher)
209 288 else:
210 289 log_addr = iface%args.logport if args.logport else None
211 290 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
212 291 log_addr, args.loglevel, args.scheduler)
213 292 print (sargs)
214 293 q = Process(target=launch_scheduler, args=sargs)
215 294 q.daemon=True
216 295 q.start()
217 296 children.append(q)
218 297
219 298 if args.mongodb:
220 299 from mongodb import MongoDB
221 300 db = MongoDB(thesession.session)
222 301 else:
223 302 db = DictDB()
224 303 time.sleep(.25)
225 304
226 305 # build connection dicts
227 306 engine_addrs = {
228 307 'control' : iface%control[1],
229 308 'mux': iface%mux[1],
230 309 'heartbeat': (iface%hb[0], iface%hb[1]),
231 310 'task' : iface%task[1],
232 311 'iopub' : iface%iopub[1],
233 312 'monitor' : iface%monport,
234 313 }
235 314
236 315 client_addrs = {
237 316 'control' : iface%control[0],
238 317 'query': iface%cport,
239 318 'mux': iface%mux[0],
240 319 'task' : iface%task[0],
241 320 'iopub' : iface%iopub[0],
242 321 'notification': iface%nport
243 322 }
244 323
245 324 # setup logging
246 325 if args.logport:
247 326 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
248 327 else:
249 328 local_logger(args.loglevel)
250 329
251 330 # register relay of signals to the children
252 331 signal_children(children)
253 332 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
254 333 registrar=reg, clientele=c, notifier=n, db=db,
255 334 engine_addrs=engine_addrs, client_addrs=client_addrs)
256 335
257 336 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
258 337 dc.start()
259 loop.start()
260
261
338 try:
339 loop.start()
340 except KeyboardInterrupt:
341 print ("interrupted, exiting...", file=sys.__stderr__)
262 342
263 343
264 344 if __name__ == '__main__':
265 345 main()
@@ -1,148 +1,152 b''
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and is implemented with dicts.
3 3
4 4 TaskRecords are dicts of the form:
5 5 {
6 6 'msg_id' : str(uuid),
7 7 'client_uuid' : str(uuid),
8 8 'engine_uuid' : str(uuid) or None,
9 9 'header' : dict(header),
10 10 'content': dict(content),
11 11 'buffers': list(buffers),
12 12 'submitted': datetime,
13 13 'started': datetime or None,
14 14 'completed': datetime or None,
15 15 'resubmitted': datetime or None,
16 16 'result_header' : dict(header) or None,
17 17 'result_content' : dict(content) or None,
18 18 'result_buffers' : list(buffers) or None,
19 19 }
20 20 With this info, many of the special categories of tasks can be defined by query:
21 21
22 22 pending: completed is None
23 23 client's outstanding: client_uuid = uuid && completed is None
24 24 MIA: arrived is None (and completed is None)
25 25 etc.
26 26
27 27 EngineRecords are dicts of the form:
28 28 {
29 29 'eid' : int(id),
30 30 'uuid': str(uuid)
31 31 }
32 32 This may be extended, but this is all it contains currently.
33 33
34 34 We support a subset of mongodb operators:
35 35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 36 """
37 37 #-----------------------------------------------------------------------------
38 38 # Copyright (C) 2010 The IPython Development Team
39 39 #
40 40 # Distributed under the terms of the BSD License. The full license is in
41 41 # the file COPYING, distributed as part of this software.
42 42 #-----------------------------------------------------------------------------
43 43
44 44
45 45 from datetime import datetime
46 46
47 47 filters = {
48 48 '$eq' : lambda a,b: a==b,
49 49 '$lt' : lambda a,b: a < b,
50 50 '$gt' : lambda a,b: a > b,
51 51 '$lte': lambda a,b: a <= b,
52 52 '$gte': lambda a,b: a >= b,
53 53 '$ne' : lambda a,b: not a==b,
54 54 '$in' : lambda a,b: a in b,
55 55 '$nin': lambda a,b: a not in b,
56 56 '$all' : lambda a,b: all([ a in bb for bb in b ]),
57 57 '$mod': lambda a,b: a%b[0] == b[1],
58 58 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
59 59 }
60 60
61 61
62 62 class CompositeFilter(object):
63 63 """Composite filter for matching multiple properties."""
64 64
65 65 def __init__(self, dikt):
66 66 self.tests = []
67 67 self.values = []
68 68 for key, value in dikt.iteritems():
69 69 self.tests.append(filters[key])
70 70 self.values.append(value)
71 71
72 72 def __call__(self, value):
73 73 for test,check in zip(self.tests, self.values):
74 74 if not test(value, check):
75 75 return False
76 76 return True
77 77
78 class DictDB(object):
78 class BaseDB(object):
79 """Empty Parent class so traitlets work on DB."""
80 pass
81
82 class DictDB(BaseDB):
79 83 """Basic in-memory dict-based object for saving Task Records.
80 84
81 85 This is the first object to present the DB interface
82 86 for logging tasks out of memory.
83 87
84 88 The interface is based on MongoDB, so adding a MongoDB
85 89 backend should be straightforward.
86 90 """
87 91 _records = None
88 92
89 93 def __init__(self):
90 94 self._records = dict()
91 95
92 96 def _match_one(self, rec, tests):
93 97 """Check if a specific record matches tests."""
94 98 for key,test in tests.iteritems():
95 99 if not test(rec.get(key, None)):
96 100 return False
97 101 return True
98 102
99 103 def _match(self, check, id_only=True):
100 104 """Find all the matches for a check dict."""
101 105 matches = {}
102 106 tests = {}
103 107 for k,v in check.iteritems():
104 108 if isinstance(v, dict):
105 109 tests[k] = CompositeFilter(v)
106 110 else:
107 111 tests[k] = lambda o, v=v: o==v # bind v now; a plain closure would see only the last v
108 112
109 113 for msg_id, rec in self._records.iteritems():
110 114 if self._match_one(rec, tests):
111 115 matches[msg_id] = rec
112 116 if id_only:
113 117 return matches.keys()
114 118 else:
115 119 return matches
116 120
117 121
118 122 def add_record(self, msg_id, rec):
119 123 """Add a new Task Record, by msg_id."""
120 124 if self._records.has_key(msg_id):
121 125 raise KeyError("Already have msg_id %r"%(msg_id))
122 126 self._records[msg_id] = rec
123 127
124 128 def get_record(self, msg_id):
125 129 """Get a specific Task Record, by msg_id."""
126 130 if not self._records.has_key(msg_id):
127 131 raise KeyError("No such msg_id %r"%(msg_id))
128 132 return self._records[msg_id]
129 133
130 134 def update_record(self, msg_id, rec):
131 135 """Update the data in an existing record."""
132 136 self._records[msg_id].update(rec)
133 137
134 138 def drop_matching_records(self, check):
135 139 """Remove a record from the DB."""
136 140 matches = self._match(check, id_only=True)
137 141 for m in matches:
138 142 del self._records[m]
139 143
140 144 def drop_record(self, msg_id):
141 145 """Remove a record from the DB."""
142 146 del self._records[msg_id]
143 147
144 148
145 149 def find_records(self, check, id_only=False):
146 150 """Find records matching a query dict."""
147 151 matches = self._match(check, id_only)
148 152 return matches No newline at end of file
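Putting the operator table and _match together, the query categories listed in the module docstring look like this in practice (msg_ids and field values invented for illustration):

    from datetime import datetime
    from dictdb import DictDB   # the module above

    db = DictDB()
    db.add_record('m1', dict(client_uuid='c1', completed=None,
                             submitted=datetime.now()))
    db.add_record('m2', dict(client_uuid='c1', completed=datetime.now()))

    # pending: completed is None
    print(db.find_records({'completed': None}, id_only=True))           # ['m1']
    # already finished, via a mongodb-style operator:
    print(db.find_records({'completed': {'$ne': None}}, id_only=True))  # ['m2']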
@@ -1,144 +1,190 b''
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 It handles registration, etc., and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 9 import traceback
10 10 import uuid
11 11 import logging
12 12 from pprint import pprint
13 13
14 14 import zmq
15 15 from zmq.eventloop import ioloop, zmqstream
16 16
17 17 # internal
18 18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Str, Dict
19 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type
20 20 # from IPython.utils.localinterfaces import LOCALHOST
21 21
22 from factory import RegistrationFactory
23
22 24 from streamsession import Message, StreamSession
23 25 from streamkernel import Kernel, make_kernel
24 26 import heartmonitor
25 27 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
26 28 local_logger)
27 29 # import taskthread
28 logger = logging.getLogger()
29 30
30 31 def printer(*msg):
31 # print (logger.handlers, file=sys.__stdout__)
32 logger.info(str(msg))
32 # print (logging.handlers, file=sys.__stdout__)
33 logging.info(str(msg))
33 34
34 class Engine(Configurable):
35 class EngineFactory(RegistrationFactory):
35 36 """IPython engine"""
36 37
37 kernel=None
38 id=None
39
40 38 # configurables:
41 context=Instance(zmq.Context)
42 loop=Instance(ioloop.IOLoop)
43 session=Instance(StreamSession)
44 ident=Str()
45 registrar=Instance(zmqstream.ZMQStream)
46 user_ns=Dict()
39 user_ns=Dict(config=True)
40 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
41 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
42
43 # not configurable:
44 id=Int(allow_none=True)
45 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
46 kernel=Instance(Kernel)
47
47 48
48 49 def __init__(self, **kwargs):
49 super(Engine, self).__init__(**kwargs)
50 if not self.ident:
51 self.ident = str(uuid.uuid4())
52 self.registrar.on_send(printer)
50 super(EngineFactory, self).__init__(**kwargs)
51 ctx = self.context
52
53 reg = ctx.socket(zmq.PAIR)
54 reg.setsockopt(zmq.IDENTITY, self.ident)
55 reg.connect(self.url)
56 self.registrar = zmqstream.ZMQStream(reg, self.loop)
53 57
54 58 def register(self):
59 """send the registration_request"""
55 60
61 logging.info("registering")
56 62 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
57 63 self.registrar.on_recv(self.complete_registration)
58 64 # print (self.session.key)
59 65 self.session.send(self.registrar, "registration_request",content=content)
60 66
61 67 def complete_registration(self, msg):
62 68 # print msg
69 ctx = self.context
70 loop = self.loop
71 identity = self.ident
72 print (identity)
73
63 74 idents,msg = self.session.feed_identities(msg)
64 75 msg = Message(self.session.unpack_message(msg))
76
65 77 if msg.content.status == 'ok':
66 78 self.id = int(msg.content.id)
67 self.session.username = 'engine-%i'%self.id
79
80 # create Shell Streams (MUX, Task, etc.):
68 81 queue_addr = msg.content.mux
69 82 shell_addrs = [ str(queue_addr) ]
70 control_addr = str(msg.content.control)
71 83 task_addr = msg.content.task
72 iopub_addr = msg.content.iopub
73 84 if task_addr:
74 85 shell_addrs.append(str(task_addr))
86 shell_streams = []
87 for addr in shell_addrs:
88 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
89 stream.setsockopt(zmq.IDENTITY, identity)
90 stream.connect(addr)
91 shell_streams.append(stream)
92
93 # control stream:
94 control_addr = str(msg.content.control)
95 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
96 control_stream.setsockopt(zmq.IDENTITY, identity)
97 control_stream.connect(control_addr)
75 98
99 # create iopub stream:
100 iopub_addr = msg.content.iopub
101 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
102 iopub_stream.setsockopt(zmq.IDENTITY, identity)
103 iopub_stream.connect(iopub_addr)
104
105 # launch heartbeat
76 106 hb_addrs = msg.content.heartbeat
107 # print (hb_addrs)
108
109 # # Redirect input streams and set a display hook.
110 # if self.out_stream_factory:
111 # sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
112 # sys.stdout.topic = 'engine.%i.stdout'%self.id
113 # sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
114 # sys.stderr.topic = 'engine.%i.stderr'%self.id
115 # if self.display_hook_factory:
116 # sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
117 # sys.displayhook.topic = 'engine.%i.pyout'%self.id
118
77 119 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
78 k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
79 hb_addrs, client_addr=None, loop=self.loop,
80 context=self.context, key=self.session.key)[-1]
81 self.kernel = k
82 if self.user_ns is not None:
83 self.user_ns.update(self.kernel.user_ns)
84 self.kernel.user_ns = self.user_ns
120 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
121 control_stream=control_stream,
122 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
123 user_ns = self.user_ns, config=self.config)
124 self.kernel.start()
125
126 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
127 heart.start()
128
85 129
86 130 else:
87 logger.error("Registration Failed: %s"%msg)
131 logging.error("Registration Failed: %s"%msg)
88 132 raise Exception("Registration Failed: %s"%msg)
89 133
90 logger.info("completed registration with id %i"%self.id)
91
92 # logger.info(str(msg))
134 logging.info("Completed registration with id %i"%self.id)
135
93 136
94 137 def unregister(self):
95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
138 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
96 139 time.sleep(1)
97 140 sys.exit(0)
98 141
99 142 def start(self):
100 logger.info("registering")
101 self.register()
143 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
144 dc.start()
145
102 146
103
104 147
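# Editorial sketch (not part of this changeset): the identity-tagged stream
# pattern used by complete_registration above. Names ctx, loop, identity, and
# addrs stand in for the engine's attributes; only zmq/zmqstream as already
# imported in this module are assumed.
def _connect_identified_streams(ctx, loop, identity, addrs):
    """Return one PAIR ZMQStream per address, each carrying the engine's
    zmq IDENTITY so the controller can route replies back to it."""
    streams = []
    for addr in addrs:
        stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
        stream.setsockopt(zmq.IDENTITY, identity)
        stream.connect(addr)
        streams.append(stream)
    return streams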
105 148 def main(argv=None, user_ns=None):
106
149 """DO NOT USE ME ANYMORE"""
107 150 parser = make_base_argument_parser()
108 151
109 152 args = parser.parse_args(argv)
110 153
111 154 parse_url(args)
112 155
113 156 iface="%s://%s"%(args.transport,args.ip)+':%i'
114 157
115 158 loop = ioloop.IOLoop.instance()
116 159 session = StreamSession(keyfile=args.execkey)
117 160 # print (session.key)
118 161 ctx = zmq.Context()
119 162
120 163 # setup logging
121 164
122 165 reg_conn = iface % args.regport
123 166 print (reg_conn, file=sys.__stdout__)
124 167 print ("Starting the engine...", file=sys.__stderr__)
125 168
126 169 reg = ctx.socket(zmq.PAIR)
127 170 reg.connect(reg_conn)
128 171 reg = zmqstream.ZMQStream(reg, loop)
129 172
130 173 e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
131 174 ident=args.ident or '', user_ns=user_ns)
132 175 if args.logport:
133 176 print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
134 177 connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
135 178 else:
136 179 local_logger(args.loglevel)
137 180
138 181 dc = ioloop.DelayedCallback(e.start, 0, loop)
139 182 dc.start()
140 loop.start()
183 try:
184 loop.start()
185 except KeyboardInterrupt:
186 print ("interrupted, exiting...", file=sys.__stderr__)
141 187
142 188 # Execution as a script
143 189 if __name__ == '__main__':
144 190 main()
@@ -1,147 +1,167 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3
4 ************
5 NOTE: Most of this module has been deprecated by moving to Configurables
6 ************
3 7 """
4 8
5 9 # Standard library imports.
6 10 import logging
7 11 import atexit
8 12 import sys
9 13 import os
10 14 import stat
11 15 import socket
12 16 from subprocess import Popen, PIPE
13 17 from signal import signal, SIGINT, SIGABRT, SIGTERM
14 18 try:
15 19 from signal import SIGKILL
16 20 except ImportError:
17 21 SIGKILL=None
18 22
19 23 # System library imports.
20 24 import zmq
21 25 from zmq.log import handlers
22 26 # Local imports.
23 27 from IPython.core.ultratb import FormattedTB
24 28 from IPython.external.argparse import ArgumentParser
25 29 from IPython.zmq.log import EnginePUBHandler
26 30
27 31 def split_ports(s, n):
28 32 """Parser helper for multiport strings"""
29 33 if not s:
30 34 return tuple([0]*n)
31 35 ports = map(int, s.split(','))
32 36 if len(ports) != n:
33 37 raise ValueError("expected %i ports, got %i"%(n, len(ports)))
34 38 return ports
35 39
40 _random_ports = set()
41
36 42 def select_random_ports(n):
37 43 """Selects and return n random ports that are available."""
38 44 ports = []
39 45 for i in xrange(n):
40 46 sock = socket.socket()
41 47 sock.bind(('', 0))
48 while sock.getsockname()[1] in _random_ports:
49 sock.close()
50 sock = socket.socket()
51 sock.bind(('', 0))
42 52 ports.append(sock)
43 53 for i, sock in enumerate(ports):
44 54 port = sock.getsockname()[1]
45 55 sock.close()
46 56 ports[i] = port
57 _random_ports.add(port)
47 58 return ports
48 59
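# Editorial usage sketch: the actual port values are whatever the OS hands
# out, but repeated calls never return a port already in _random_ports.
mux_ports = select_random_ports(2)   # e.g. [54321, 54322]
hb_ports = select_random_ports(2)    # guaranteed disjoint from mux_ports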
49 60 def parse_url(args):
50 61 """Ensure args.url contains full transport://interface:port"""
51 62 if args.url:
52 63 iface = args.url.split('://',1)
53 64 if len(iface) == 2:
54 65 args.transport,iface = iface
55 66 iface = iface.split(':')
56 67 args.ip = iface[0]
57 68 if iface[1]:
58 69 args.regport = int(iface[1])
59 70 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
60 71
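# Editorial sketch of what parse_url normalizes, using a bare Namespace in
# place of real parsed args (attribute names match the parser below):
from IPython.external.argparse import Namespace
_args = Namespace(url='tcp://10.0.0.5:10201', transport='tcp',
                  ip='127.0.0.1', regport=10101)
parse_url(_args)  # afterwards: _args.ip == '10.0.0.5', _args.regport == 10201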
61 72 def signal_children(children):
62 73 """Relay interupt/term signals to children, for more solid process cleanup."""
63 74 def terminate_children(sig, frame):
75 logging.critical("Got signal %i, terminating children..."%sig)
64 76 for child in children:
65 77 child.terminate()
78
79 sys.exit(sig != SIGINT)
66 80 # sys.exit(sig)
67 81 for sig in (SIGINT, SIGABRT, SIGTERM):
68 82 signal(sig, terminate_children)
69 83
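# Editorial usage sketch: install the relay right after spawning children so
# SIGINT/SIGTERM on the parent also terminates the subprocesses. The child
# command below is purely illustrative.
_children = [Popen([sys.executable, '-c', 'import time; time.sleep(3600)'])]
signal_children(_children)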
70 84 def generate_exec_key(keyfile):
71 85 import uuid
72 86 newkey = str(uuid.uuid4())
73 87 with open(keyfile, 'w') as f:
74 88 # f.write('ipython-key ')
75 f.write(newkey)
89 f.write(newkey+'\n')
76 90 # set user-only RW permissions (0600)
77 91 # this will have no effect on Windows
78 92 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
79 93
80 94
81 95 def make_base_argument_parser():
82 96 """ Creates an ArgumentParser for the generic arguments supported by all
83 97 ipcluster entry points.
84 98 """
85 99
86 100 parser = ArgumentParser()
87 101 parser.add_argument('--ip', type=str, default='127.0.0.1',
88 102 help='set the controller\'s IP address [default: local]')
89 103 parser.add_argument('--transport', type=str, default='tcp',
90 104 help='set the transport to use [default: tcp]')
91 105 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
92 106 help='set the XREP port for registration [default: 10101]')
93 107 parser.add_argument('--logport', type=int, metavar='PORT', default=0,
94 108 help='set the PUB port for remote logging [default: log to stdout]')
95 109 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
96 110 help='set the log level [default: INFO]')
97 111 parser.add_argument('--ident', type=str,
98 112 help='set the ZMQ identity [default: random]')
99 113 parser.add_argument('--packer', type=str, default='json',
100 114 choices=['json','pickle'],
101 115 help='set the message format method [default: json]')
102 116 parser.add_argument('--url', type=str,
103 117 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
104 118 parser.add_argument('--execkey', type=str,
105 119 help="File containing key for authenticating requests.")
106 120
107 121 return parser
108 122
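# Editorial usage sketch of the shared flags defined above:
_parser = make_base_argument_parser()
_opts = _parser.parse_args(['--ip', '0.0.0.0', '--regport', '10201',
                            '--packer', 'pickle'])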
109 123 def integer_loglevel(loglevel):
110 124 try:
111 125 loglevel = int(loglevel)
112 126 except ValueError:
113 127 if isinstance(loglevel, str):
114 128 loglevel = getattr(logging, loglevel)
115 129 return loglevel
116 130
117 131 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
132 logger = logging.getLogger()
133 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
134 # don't add a second PUBHandler
135 return
118 136 loglevel = integer_loglevel(loglevel)
119 137 lsock = context.socket(zmq.PUB)
120 138 lsock.connect(iface)
121 139 handler = handlers.PUBHandler(lsock)
122 140 handler.setLevel(loglevel)
123 141 handler.root_topic = root
124 logger = logging.getLogger()
125 142 logger.addHandler(handler)
126 143 logger.setLevel(loglevel)
127 144
128 145 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
129 146 logger = logging.getLogger()
147 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
148 # don't add a second PUBHandler
149 return
130 150 loglevel = integer_loglevel(loglevel)
131 151 lsock = context.socket(zmq.PUB)
132 152 lsock.connect(iface)
133 153 handler = EnginePUBHandler(engine, lsock)
134 154 handler.setLevel(loglevel)
135 155 logger.addHandler(handler)
136 156 logger.setLevel(loglevel)
137 157
138 158 def local_logger(loglevel=logging.DEBUG):
139 159 loglevel = integer_loglevel(loglevel)
140 160 logger = logging.getLogger()
141 if logger.handlers:
142 # if there are any handlers, skip the hookup
161 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
162 # don't add a second StreamHandler
143 163 return
144 164 handler = logging.StreamHandler()
145 165 handler.setLevel(loglevel)
146 166 logger.addHandler(handler)
147 167 logger.setLevel(loglevel)
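# Editorial usage sketch: hook the root logger up exactly once; the
# PUBHandler/StreamHandler checks above make repeated calls no-ops.
_ctx = zmq.Context()
connect_logger(_ctx, 'tcp://127.0.0.1:20202', root='ip', loglevel='DEBUG')
connect_logger(_ctx, 'tcp://127.0.0.1:20202')  # returns early, no duplicate
local_logger('INFO')                           # likewise guarded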
@@ -1,159 +1,156 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5 """
6 6
7 7 from __future__ import print_function
8 8 import time
9 9 import uuid
10 10 import logging
11 11
12 12 import zmq
13 13 from zmq.devices import ProcessDevice,ThreadDevice
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 logger = logging.getLogger()
17
18 16 class Heart(object):
19 17 """A basic heart object for responding to a HeartMonitor.
20 18 This is a simple wrapper with defaults for the most common
21 19 Device model for responding to heartbeats.
22 20
23 21 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
24 22 SUB/XREQ for in/out.
25 23
26 24 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
27 25 device=None
28 26 id=None
29 27 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
30 28 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
31 29 self.device.daemon=True
32 30 self.device.connect_in(in_addr)
33 31 self.device.connect_out(out_addr)
34 32 if in_type == zmq.SUB:
35 33 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
36 34 if heart_id is None:
37 35 heart_id = str(uuid.uuid4())
38 36 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
39 37 self.id = heart_id
40 38
41 39 def start(self):
42 40 return self.device.start()
43 41
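# Editorial usage sketch, matching the __main__ ports at the bottom of this
# file: a heart that answers monitor pings from a daemon thread.
_heart = Heart('tcp://127.0.0.1:5555', 'tcp://127.0.0.1:5556',
               heart_id='engine-0')
_heart.start()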
44 42 class HeartMonitor(object):
45 43 """A basic HeartMonitor class
46 44 pingstream: a PUB stream
47 45 pongstream: an XREP stream
48 46 period: the period of the heartbeat in milliseconds"""
49 47 loop=None
50 48 pingstream=None
51 49 pongstream=None
52 50 period=None
53 51 hearts=None
54 52 on_probation=None
55 53 last_ping=None
56 54 # debug=False
57 55
58 56 def __init__(self, loop, pingstream, pongstream, period=1000):
59 57 self.loop = loop
60 58 self.period = period
61 59
62 60 self.pingstream = pingstream
63 61 self.pongstream = pongstream
64 62 self.pongstream.on_recv(self.handle_pong)
65 63
66 64 self.hearts = set()
67 65 self.responses = set()
68 66 self.on_probation = set()
69 67 self.lifetime = 0
70 68 self.tic = time.time()
71 69
72 70 self._new_handlers = set()
73 71 self._failure_handlers = set()
74 72
75 73 def start(self):
76 74 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
77 75 self.caller.start()
78 76
79 77 def add_new_heart_handler(self, handler):
80 78 """add a new handler for new hearts"""
81 logger.debug("heartbeat::new_heart_handler: %s"%handler)
79 logging.debug("heartbeat::new_heart_handler: %s"%handler)
82 80 self._new_handlers.add(handler)
83 81
84 82 def add_heart_failure_handler(self, handler):
85 83 """add a new handler for heart failure"""
86 logger.debug("heartbeat::new heart failure handler: %s"%handler)
84 logging.debug("heartbeat::new heart failure handler: %s"%handler)
87 85 self._failure_handlers.add(handler)
88 86
89 87 def beat(self):
90 88 self.pongstream.flush()
91 89 self.last_ping = self.lifetime
92 90
93 91 toc = time.time()
94 92 self.lifetime += toc-self.tic
95 93 self.tic = toc
96 # logger.debug("heartbeat::%s"%self.lifetime)
94 # logging.debug("heartbeat::%s"%self.lifetime)
97 95 goodhearts = self.hearts.intersection(self.responses)
98 96 missed_beats = self.hearts.difference(goodhearts)
99 97 heartfailures = self.on_probation.intersection(missed_beats)
100 98 newhearts = self.responses.difference(goodhearts)
101 99 map(self.handle_new_heart, newhearts)
102 100 map(self.handle_heart_failure, heartfailures)
103 101 self.on_probation = missed_beats.intersection(self.hearts)
104 102 self.responses = set()
105 103 # print self.on_probation, self.hearts
106 # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
104 # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
107 105 self.pingstream.send(str(self.lifetime))
108 106
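# Editorial worked example of one beat's bookkeeping (ids illustrative):
hearts, responses = set('abc'), set('acd')
goodhearts = hearts & responses       # {'a', 'c'}: still healthy
missed_beats = hearts - goodhearts    # {'b'}: on probation next beat
newhearts = responses - goodhearts    # {'d'}: triggers handle_new_heart
# 'b' only fails if it was already in on_probation from the previous beat.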
109 107 def handle_new_heart(self, heart):
110 108 if self._new_handlers:
111 109 for handler in self._new_handlers:
112 110 handler(heart)
113 111 else:
114 logger.info("heartbeat::yay, got new heart %s!"%heart)
112 logging.info("heartbeat::yay, got new heart %s!"%heart)
115 113 self.hearts.add(heart)
116 114
117 115 def handle_heart_failure(self, heart):
118 116 if self._failure_handlers:
119 117 for handler in self._failure_handlers:
120 118 try:
121 119 handler(heart)
122 120 except Exception as e:
123 print (e)
124 logger.error("heartbeat::Bad Handler! %s"%handler)
121 logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
125 122 pass
126 123 else:
127 logger.info("heartbeat::Heart %s failed :("%heart)
124 logging.info("heartbeat::Heart %s failed :("%heart)
128 125 self.hearts.remove(heart)
129 126
130 127
131 128 def handle_pong(self, msg):
132 129 "a heart just beat"
133 130 if msg[1] == str(self.lifetime):
134 131 delta = time.time()-self.tic
135 # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
132 # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
136 133 self.responses.add(msg[0])
137 134 elif msg[1] == str(self.last_ping):
138 135 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
139 logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
136 logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
140 137 self.responses.add(msg[0])
141 138 else:
142 logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
139 logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
143 140 (msg[1],self.lifetime))
144 141
145 142
146 143 if __name__ == '__main__':
147 144 loop = ioloop.IOLoop.instance()
148 145 context = zmq.Context()
149 146 pub = context.socket(zmq.PUB)
150 147 pub.bind('tcp://127.0.0.1:5555')
151 148 xrep = context.socket(zmq.XREP)
152 149 xrep.bind('tcp://127.0.0.1:5556')
153 150
154 151 outstream = zmqstream.ZMQStream(pub, loop)
155 152 instream = zmqstream.ZMQStream(xrep, loop)
156 153
157 154 hb = HeartMonitor(loop, outstream, instream)
158 155
159 156 loop.start()
@@ -1,906 +1,1079 b''
1 1 #!/usr/bin/env python
2 2 """The IPython Controller Hub with 0MQ
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 from datetime import datetime
20 20 import time
21 21 import logging
22 22
23 23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 from zmq.eventloop import ioloop
25 from zmq.eventloop.zmqstream import ZMQStream
25 26
26 27 # internal:
27 28 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
29 # from IPython.zmq.log import logger # a Logger object
29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
30 from IPython.utils.importstring import import_item
31
32 from entry_point import select_random_ports
33 from factory import RegistrationFactory
30 34
31 35 from streamsession import Message, wrap_exception, ISO8601
32 36 from heartmonitor import HeartMonitor
33 37 from util import validate_url_container
34 38
35 39 try:
36 40 from pymongo.binary import Binary
37 41 except ImportError:
38 42 MongoDB=None
39 43 else:
40 44 from mongodb import MongoDB
41 45
42 46 #-----------------------------------------------------------------------------
43 47 # Code
44 48 #-----------------------------------------------------------------------------
45 49
46 logger = logging.getLogger()
47
48 50 def _passer(*args, **kwargs):
49 51 return
50 52
51 53 def _printer(*args, **kwargs):
52 54 print (args)
53 55 print (kwargs)
54 56
55 57 def init_record(msg):
56 58 """Initialize a TaskRecord based on a request."""
57 59 header = msg['header']
58 60 return {
59 61 'msg_id' : header['msg_id'],
60 62 'header' : header,
61 63 'content': msg['content'],
62 64 'buffers': msg['buffers'],
63 65 'submitted': datetime.strptime(header['date'], ISO8601),
64 66 'client_uuid' : None,
65 67 'engine_uuid' : None,
66 68 'started': None,
67 69 'completed': None,
68 70 'resubmitted': None,
69 71 'result_header' : None,
70 72 'result_content' : None,
71 73 'result_buffers' : None,
72 74 'queue' : None,
73 75 'pyin' : None,
74 76 'pyout': None,
75 77 'pyerr': None,
76 78 'stdout': '',
77 79 'stderr': '',
78 80 }
79 81
80 82
81 83 class EngineConnector(HasTraits):
82 84 """A simple object for accessing the various zmq connections of an object.
83 85 Attributes are:
84 86 id (int): engine ID
85 87 uuid (str): uuid (unused?)
86 88 queue (str): identity of queue's XREQ socket
87 89 registration (str): identity of registration XREQ socket
88 90 heartbeat (str): identity of heartbeat XREQ socket
89 91 """
90 92 id=Int(0)
91 93 queue=Str()
92 94 control=Str()
93 95 registration=Str()
94 96 heartbeat=Str()
95 pending=Instance(set)
97 pending=Set()
96 98
97 99 def __init__(self, **kwargs):
98 100 super(EngineConnector, self).__init__(**kwargs)
99 logger.info("engine::Engine Connected: %i"%self.id)
101 logging.info("engine::Engine Connected: %i"%self.id)
102
103 class HubFactory(RegistrationFactory):
104 """The Configurable for setting up a Hub."""
105
106 # port-pairs for monitored queues:
107 hb = Instance(list, config=True)
108 def _hb_default(self):
109 return select_random_ports(2)
110
111 mux = Instance(list, config=True)
112 def _mux_default(self):
113 return select_random_ports(2)
114
115 task = Instance(list, config=True)
116 def _task_default(self):
117 return select_random_ports(2)
118
119 control = Instance(list, config=True)
120 def _control_default(self):
121 return select_random_ports(2)
122
123 iopub = Instance(list, config=True)
124 def _iopub_default(self):
125 return select_random_ports(2)
126
127 # single ports:
128 mon_port = Instance(int, config=True)
129 def _mon_port_default(self):
130 return select_random_ports(1)[0]
131
132 query_port = Instance(int, config=True)
133 def _query_port_default(self):
134 return select_random_ports(1)[0]
135
136 notifier_port = Instance(int, config=True)
137 def _notifier_port_default(self):
138 return select_random_ports(1)[0]
139
140 ping = Int(1000, config=True) # ping frequency
141
142 engine_ip = Str('127.0.0.1', config=True)
143 engine_transport = Str('tcp', config=True)
144
145 client_ip = Str('127.0.0.1', config=True)
146 client_transport = Str('tcp', config=True)
147
148 monitor_ip = Str('127.0.0.1', config=True)
149 monitor_transport = Str('tcp', config=True)
150
151 monitor_url = Str('')
152
153 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
154
155 # not configurable
156 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
157 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
158 subconstructors = List()
159 _constructed = Bool(False)
160
161 def _update_monitor_url(self):
162 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
163
164 def _sync_ips(self):
165 self.engine_ip = self.ip
166 self.client_ip = self.ip
167 self.monitor_ip = self.ip
168 self._update_monitor_url()
169
170 def _sync_transports(self):
171 self.engine_transport = self.transport
172 self.client_transport = self.transport
173 self.monitor_transport = self.transport
174 self._update_monitor_url()
175
176 def __init__(self, **kwargs):
177 super(HubFactory, self).__init__(**kwargs)
178 self._update_monitor_url()
179 self.on_trait_change(self._sync_ips, 'ip')
180 self.on_trait_change(self._sync_transports, 'transport')
181 self.subconstructors.append(self.construct_hub)
182
183
184 def construct(self):
185 assert not self._constructed, "already constructed!"
186
187 for subc in self.subconstructors:
188 subc()
189
190 self._constructed = True
191
192
193 def start(self):
194 assert self._constructed, "must be constructed by self.construct() first!"
195 self.heartmonitor.start()
196 logging.info("Heartmonitor started")
197
198 def construct_hub(self):
199 """construct"""
200 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
201 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
202
203 ctx = self.context
204 loop = self.loop
205
206 # Registrar socket
207 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
208 reg.bind(client_iface % self.regport)
209 logging.info("Hub listening on %s for registration."%(client_iface%self.regport))
210 if self.client_ip != self.engine_ip:
211 reg.bind(engine_iface % self.regport)
212 logging.info("Hub listening on %s for registration."%(engine_iface%self.regport))
213
214 ### Engine connections ###
215
216 # heartbeat
217 hpub = ctx.socket(zmq.PUB)
218 hpub.bind(engine_iface % self.hb[0])
219 hrep = ctx.socket(zmq.XREP)
220 hrep.bind(engine_iface % self.hb[1])
221
222 self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping)
223
224 ### Client connections ###
225 # Clientele socket
226 c = ZMQStream(ctx.socket(zmq.XREP), loop)
227 c.bind(client_iface%self.query_port)
228 # Notifier socket
229 n = ZMQStream(ctx.socket(zmq.PUB), loop)
230 n.bind(client_iface%self.notifier_port)
231
232 ### build and launch the queues ###
233
234 # monitor socket
235 sub = ctx.socket(zmq.SUB)
236 sub.setsockopt(zmq.SUBSCRIBE, "")
237 sub.bind(self.monitor_url)
238 sub = ZMQStream(sub, loop)
100 239
101 class Hub(Configurable):
240 # connect the db
241 self.db = import_item(self.db_class)()
242 time.sleep(.25)
243
244 # build connection dicts
245 self.engine_addrs = {
246 'control' : engine_iface%self.control[1],
247 'mux': engine_iface%self.mux[1],
248 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
249 'task' : engine_iface%self.task[1],
250 'iopub' : engine_iface%self.iopub[1],
251 # 'monitor' : engine_iface%self.mon_port,
252 }
253
254 self.client_addrs = {
255 'control' : client_iface%self.control[0],
256 'query': client_iface%self.query_port,
257 'mux': client_iface%self.mux[0],
258 'task' : client_iface%self.task[0],
259 'iopub' : client_iface%self.iopub[0],
260 'notification': client_iface%self.notifier_port
261 }
262 logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
263 logging.debug("hub::Hub client addrs: %s"%self.client_addrs)
264 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
265 registrar=reg, clientele=c, notifier=n, db=self.db,
266 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs)
267
268
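# Editorial sketch: overriding the randomized ports through the Config
# system. Section/attribute names follow the traits declared above; passing
# config= to the constructor is assumed Configurable behavior, and regport
# is assumed to come from RegistrationFactory.
from IPython.config.loader import Config
_cfg = Config()
_cfg.HubFactory.mux = [10110, 10111]
_cfg.HubFactory.ping = 2000
_hubfactory = HubFactory(config=_cfg)
_hubfactory.construct()  # runs construct_hub via subconstructors
_hubfactory.start()      # starts the heartmonitor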
269 class Hub(HasTraits):
102 270 """The IPython Controller Hub with 0MQ connections
103 271
104 272 Parameters
105 273 ==========
106 274 loop: zmq IOLoop instance
107 275 session: StreamSession object
108 276 <removed> context: zmq context for creating new connections (?)
109 277 queue: ZMQStream for monitoring the command queue (SUB)
110 278 registrar: ZMQStream for engine registration requests (XREP)
111 279 heartbeat: HeartMonitor object checking the pulse of the engines
112 280 clientele: ZMQStream for client connections (XREP)
113 281 not used for jobs, only query/control commands
114 282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
115 283 db: connection to db for out of memory logging of commands
116 284 NotImplemented
117 285 engine_addrs: dict of zmq connection information for engines to connect
118 286 to the queues.
119 287 client_addrs: dict of zmq connection information for clients to connect
120 288 to the queues.
121 289 """
122 290 # internal data structures:
123 ids=None # engine IDs
124 keytable=None
125 engines=None
126 clients=None
127 hearts=None
128 pending=None
129 tasks=None
130 completed=None
291 ids=Set() # engine IDs
292 keytable=Dict()
293 by_ident=Dict()
294 engines=Dict()
295 clients=Dict()
296 hearts=Dict()
297 pending=Set()
298 queues=Dict() # pending msg_ids keyed by engine_id
299 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
300 completed=Dict() # completed msg_ids keyed by engine_id
301 all_completed=Set() # set of all completed msg_ids
131 302 # mia=None
132 incoming_registrations=None
133 registration_timeout=None
303 incoming_registrations=Dict()
304 registration_timeout=Int()
305 _idcounter=Int(0)
134 306
135 307 # objects from constructor:
136 308 loop=Instance(ioloop.IOLoop)
137 registrar=Instance(zmqstream.ZMQStream)
138 clientele=Instance(zmqstream.ZMQStream)
139 monitor=Instance(zmqstream.ZMQStream)
309 registrar=Instance(ZMQStream)
310 clientele=Instance(ZMQStream)
311 monitor=Instance(ZMQStream)
140 312 heartmonitor=Instance(HeartMonitor)
141 notifier=Instance(zmqstream.ZMQStream)
313 notifier=Instance(ZMQStream)
142 314 db=Instance(object)
143 315 client_addrs=Dict()
144 316 engine_addrs=Dict()
145 317
146 318
147 319 def __init__(self, **kwargs):
148 320 """
149 321 # universal:
150 322 loop: IOLoop for creating future connections
151 323 session: streamsession for sending serialized data
152 324 # engine:
153 325 queue: ZMQStream for monitoring queue messages
154 326 registrar: ZMQStream for engine registration
155 327 heartbeat: HeartMonitor object for tracking engines
156 328 # client:
157 329 clientele: ZMQStream for client connections
158 330 # extra:
159 331 db: ZMQStream for db connection (NotImplemented)
160 332 engine_addrs: zmq address/protocol dict for engine connections
161 333 client_addrs: zmq address/protocol dict for client connections
162 334 """
163 335
164 336 super(Hub, self).__init__(**kwargs)
165 337 self.ids = set()
166 self.keytable={}
167 self.incoming_registrations={}
168 self.engines = {}
169 self.by_ident = {}
170 self.clients = {}
171 self.hearts = {}
338 self.pending = set()
339 # self.keytable={}
340 # self.incoming_registrations={}
341 # self.engines = {}
342 # self.by_ident = {}
343 # self.clients = {}
344 # self.hearts = {}
172 345 # self.mia = set()
173 346 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
174 347 # this is the stuff that will move to DB:
175 self.pending = set() # pending messages, keyed by msg_id
176 self.queues = {} # pending msg_ids keyed by engine_id
177 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
178 self.completed = {} # completed msg_ids keyed by engine_id
179 self.all_completed = set()
180 self._idcounter = 0
348 # self.pending = set() # pending messages, keyed by msg_id
349 # self.queues = {} # pending msg_ids keyed by engine_id
350 # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
351 # self.completed = {} # completed msg_ids keyed by engine_id
352 # self.all_completed = set()
353 # self._idcounter = 0
181 354 # self.sockets = {}
182 355 # self.loop = loop
183 356 # self.session = session
184 357 # self.registrar = registrar
185 358 # self.clientele = clientele
186 359 # self.queue = queue
187 360 # self.heartmonitor = heartbeat
188 361 # self.notifier = notifier
189 362 # self.db = db
190 363
191 364 # validate connection dicts:
192 365 # self.client_addrs = client_addrs
193 366 validate_url_container(self.client_addrs)
194 367
195 368 # assert isinstance(self.client_addrs['queue'], str)
196 369 # assert isinstance(self.client_addrs['control'], str)
197 370 # self.hb_addrs = hb_addrs
198 371 validate_url_container(self.engine_addrs)
199 372 # self.engine_addrs = engine_addrs
200 373 # assert isinstance(self.engine_addrs['queue'], str)
201 374 # assert isinstance(self.engine_addrs['control'], str)
202 375 # assert len(engine_addrs['heartbeat']) == 2
203 376
204 377 # register our callbacks
205 378 self.registrar.on_recv(self.dispatch_register_request)
206 379 self.clientele.on_recv(self.dispatch_client_msg)
207 380 self.monitor.on_recv(self.dispatch_monitor_traffic)
208 381
209 382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
210 383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
211 384
212 385 self.monitor_handlers = { 'in' : self.save_queue_request,
213 386 'out': self.save_queue_result,
214 387 'intask': self.save_task_request,
215 388 'outtask': self.save_task_result,
216 389 'tracktask': self.save_task_destination,
217 390 'incontrol': _passer,
218 391 'outcontrol': _passer,
219 392 'iopub': self.save_iopub_message,
220 393 }
221 394
222 395 self.client_handlers = {'queue_request': self.queue_status,
223 396 'result_request': self.get_results,
224 397 'purge_request': self.purge_results,
225 398 'load_request': self.check_load,
226 399 'resubmit_request': self.resubmit_task,
227 400 'shutdown_request': self.shutdown_request,
228 401 }
229 402
230 403 self.registrar_handlers = {'registration_request' : self.register_engine,
231 404 'unregistration_request' : self.unregister_engine,
232 405 'connection_request': self.connection_request,
233 406 }
234 407
235 logger.info("controller::created controller")
408 logging.info("hub::created hub")
236 409
237 410 @property
238 411 def _next_id(self):
239 412 """gemerate a new ID"""
240 413 newid = self._idcounter
241 414 self._idcounter += 1
242 415 return newid
243 416 # newid = 0
244 417 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
245 418 # # print newid, self.ids, self.incoming_registrations
246 419 # while newid in self.ids or newid in incoming:
247 420 # newid += 1
248 421 # return newid
249 422
250 423 #-----------------------------------------------------------------------------
251 424 # message validation
252 425 #-----------------------------------------------------------------------------
253 426
254 427 def _validate_targets(self, targets):
255 428 """turn any valid targets argument into a list of integer ids"""
256 429 if targets is None:
257 430 # default to all
258 431 targets = self.ids
259 432
260 433 if isinstance(targets, (int,str,unicode)):
261 434 # only one target specified
262 435 targets = [targets]
263 436 _targets = []
264 437 for t in targets:
265 438 # map raw identities to ids
266 439 if isinstance(t, (str,unicode)):
267 440 t = self.by_ident.get(t, t)
268 441 _targets.append(t)
269 442 targets = _targets
270 443 bad_targets = [ t for t in targets if t not in self.ids ]
271 444 if bad_targets:
272 445 raise IndexError("No Such Engine: %r"%bad_targets)
273 446 if not targets:
274 447 raise IndexError("No Engines Registered")
275 448 return targets
276 449
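# Editorial sketch of the normalization above (assuming engine 0 is
# registered with queue identity 'engine-0' in by_ident, and hub is a
# Hub instance):
#   hub._validate_targets(None)        -> list of all registered ids
#   hub._validate_targets(0)           -> [0]
#   hub._validate_targets('engine-0')  -> [0], via the by_ident lookup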
277 450 def _validate_client_msg(self, msg):
278 451 """validates and unpacks headers of a message. Returns False if invalid,
279 452 (ident, header, parent, content)"""
280 453 client_id = msg[0]
281 454 try:
282 455 msg = self.session.unpack_message(msg[1:], content=True)
283 456 except:
284 logger.error("client::Invalid Message %s"%msg, exc_info=True)
457 logging.error("client::Invalid Message %s"%msg, exc_info=True)
285 458 return False
286 459
287 460 msg_type = msg.get('msg_type', None)
288 461 if msg_type is None:
289 462 return False
290 463 header = msg.get('header')
291 464 # session doesn't handle split content for now:
292 465 return client_id, msg
293 466
294 467
295 468 #-----------------------------------------------------------------------------
296 469 # dispatch methods (1 per stream)
297 470 #-----------------------------------------------------------------------------
298 471
299 472 def dispatch_register_request(self, msg):
300 473 """"""
301 logger.debug("registration::dispatch_register_request(%s)"%msg)
474 logging.debug("registration::dispatch_register_request(%s)"%msg)
302 475 idents,msg = self.session.feed_identities(msg)
303 476 if not idents:
304 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
477 logging.error("Bad Queue Message: %s"%msg, exc_info=True)
305 478 return
306 479 try:
307 480 msg = self.session.unpack_message(msg,content=True)
308 481 except:
309 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
482 logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
310 483 return
311 484
312 485 msg_type = msg['msg_type']
313 486 content = msg['content']
314 487
315 488 handler = self.registrar_handlers.get(msg_type, None)
316 489 if handler is None:
317 logger.error("registration::got bad registration message: %s"%msg)
490 logging.error("registration::got bad registration message: %s"%msg)
318 491 else:
319 492 handler(idents, msg)
320 493
321 494 def dispatch_monitor_traffic(self, msg):
322 495 """all ME and Task queue messages come through here, as well as
323 496 IOPub traffic."""
324 logger.debug("monitor traffic: %s"%msg[:2])
497 logging.debug("monitor traffic: %s"%msg[:2])
325 498 switch = msg[0]
326 499 idents, msg = self.session.feed_identities(msg[1:])
327 500 if not idents:
328 logger.error("Bad Monitor Message: %s"%msg)
501 logging.error("Bad Monitor Message: %s"%msg)
329 502 return
330 503 handler = self.monitor_handlers.get(switch, None)
331 504 if handler is not None:
332 505 handler(idents, msg)
333 506 else:
334 logger.error("Invalid monitor topic: %s"%switch)
507 logging.error("Invalid monitor topic: %s"%switch)
335 508
336 509
337 510 def dispatch_client_msg(self, msg):
338 511 """Route messages from clients"""
339 512 idents, msg = self.session.feed_identities(msg)
340 513 if not idents:
341 logger.error("Bad Client Message: %s"%msg)
514 logging.error("Bad Client Message: %s"%msg)
342 515 return
343 516 client_id = idents[0]
344 517 try:
345 518 msg = self.session.unpack_message(msg, content=True)
346 519 except:
347 520 content = wrap_exception()
348 logger.error("Bad Client Message: %s"%msg, exc_info=True)
349 self.session.send(self.clientele, "controller_error", ident=client_id,
521 logging.error("Bad Client Message: %s"%msg, exc_info=True)
522 self.session.send(self.clientele, "hub_error", ident=client_id,
350 523 content=content)
351 524 return
352 525
353 526 # print client_id, header, parent, content
354 527 #switch on message type:
355 528 msg_type = msg['msg_type']
356 logger.info("client:: client %s requested %s"%(client_id, msg_type))
529 logging.info("client:: client %s requested %s"%(client_id, msg_type))
357 530 handler = self.client_handlers.get(msg_type, None)
358 531 try:
359 532 assert handler is not None, "Bad Message Type: %s"%msg_type
360 533 except:
361 534 content = wrap_exception()
362 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
363 self.session.send(self.clientele, "controller_error", ident=client_id,
535 logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
536 self.session.send(self.clientele, "hub_error", ident=client_id,
364 537 content=content)
365 538 return
366 539 else:
367 540 handler(client_id, msg)
368 541
369 542 def dispatch_db(self, msg):
370 543 """"""
371 544 raise NotImplementedError
372 545
373 546 #---------------------------------------------------------------------------
374 547 # handler methods (1 per event)
375 548 #---------------------------------------------------------------------------
376 549
377 550 #----------------------- Heartbeat --------------------------------------
378 551
379 552 def handle_new_heart(self, heart):
380 553 """handler to attach to heartbeater.
381 554 Called when a new heart starts to beat.
382 555 Triggers completion of registration."""
383 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
556 logging.debug("heartbeat::handle_new_heart(%r)"%heart)
384 557 if heart not in self.incoming_registrations:
385 logger.info("heartbeat::ignoring new heart: %r"%heart)
558 logging.info("heartbeat::ignoring new heart: %r"%heart)
386 559 else:
387 560 self.finish_registration(heart)
388 561
389 562
390 563 def handle_heart_failure(self, heart):
391 564 """handler to attach to heartbeater.
392 565 called when a previously registered heart fails to respond to beat request.
393 566 triggers unregistration"""
394 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
567 logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
395 568 eid = self.hearts.get(heart, None)
397 570 if eid is None:
398 logger.info("heartbeat::ignoring heart failure %r"%heart)
571 logging.info("heartbeat::ignoring heart failure %r"%heart)
399 572 else:
396 569 queue = self.engines[eid].queue
400 573 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
401 574
402 575 #----------------------- MUX Queue Traffic ------------------------------
403 576
404 577 def save_queue_request(self, idents, msg):
405 578 if len(idents) < 2:
406 logger.error("invalid identity prefix: %s"%idents)
579 logging.error("invalid identity prefix: %s"%idents)
407 580 return
408 581 queue_id, client_id = idents[:2]
409 582 try:
410 583 msg = self.session.unpack_message(msg, content=False)
411 584 except:
412 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
585 logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
413 586 return
414 587
415 588 eid = self.by_ident.get(queue_id, None)
416 589 if eid is None:
417 logger.error("queue::target %r not registered"%queue_id)
418 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
590 logging.error("queue::target %r not registered"%queue_id)
591 logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
419 592 return
420 593
421 594 header = msg['header']
422 595 msg_id = header['msg_id']
423 596 record = init_record(msg)
424 597 record['engine_uuid'] = queue_id
425 598 record['client_uuid'] = client_id
426 599 record['queue'] = 'mux'
427 600 if MongoDB is not None and isinstance(self.db, MongoDB):
428 601 record['buffers'] = map(Binary, record['buffers'])
429 602 self.pending.add(msg_id)
430 603 self.queues[eid].append(msg_id)
431 604 self.db.add_record(msg_id, record)
432 605
433 606 def save_queue_result(self, idents, msg):
434 607 if len(idents) < 2:
435 logger.error("invalid identity prefix: %s"%idents)
608 logging.error("invalid identity prefix: %s"%idents)
436 609 return
437 610
438 611 client_id, queue_id = idents[:2]
439 612 try:
440 613 msg = self.session.unpack_message(msg, content=False)
441 614 except:
442 logger.error("queue::engine %r sent invalid message to %r: %s"%(
615 logging.error("queue::engine %r sent invalid message to %r: %s"%(
443 616 queue_id,client_id, msg), exc_info=True)
444 617 return
445 618
446 619 eid = self.by_ident.get(queue_id, None)
447 620 if eid is None:
448 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
449 logger.debug("queue:: %s"%msg[2:])
621 logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
622 logging.debug("queue:: %s"%msg[2:])
450 623 return
451 624
452 625 parent = msg['parent_header']
453 626 if not parent:
454 627 return
455 628 msg_id = parent['msg_id']
456 629 if msg_id in self.pending:
457 630 self.pending.remove(msg_id)
458 631 self.all_completed.add(msg_id)
459 632 self.queues[eid].remove(msg_id)
460 633 self.completed[eid].append(msg_id)
461 634 rheader = msg['header']
462 635 completed = datetime.strptime(rheader['date'], ISO8601)
463 636 started = rheader.get('started', None)
464 637 if started is not None:
465 638 started = datetime.strptime(started, ISO8601)
466 639 result = {
467 640 'result_header' : rheader,
468 641 'result_content': msg['content'],
469 642 'started' : started,
470 643 'completed' : completed
471 644 }
472 645 if MongoDB is not None and isinstance(self.db, MongoDB):
473 646 result['result_buffers'] = map(Binary, msg['buffers'])
474 647 else:
475 648 result['result_buffers'] = msg['buffers']
476 649 self.db.update_record(msg_id, result)
477 650 else:
478 logger.debug("queue:: unknown msg finished %s"%msg_id)
651 logging.debug("queue:: unknown msg finished %s"%msg_id)
479 652
480 653 #--------------------- Task Queue Traffic ------------------------------
481 654
482 655 def save_task_request(self, idents, msg):
483 656 """Save the submission of a task."""
484 657 client_id = idents[0]
485 658
486 659 try:
487 660 msg = self.session.unpack_message(msg, content=False)
488 661 except:
489 logger.error("task::client %r sent invalid task message: %s"%(
662 logging.error("task::client %r sent invalid task message: %s"%(
490 663 client_id, msg), exc_info=True)
491 664 return
492 665 record = init_record(msg)
493 666 if MongoDB is not None and isinstance(self.db, MongoDB):
494 667 record['buffers'] = map(Binary, record['buffers'])
495 668 record['client_uuid'] = client_id
496 669 record['queue'] = 'task'
497 670 header = msg['header']
498 671 msg_id = header['msg_id']
499 672 self.pending.add(msg_id)
500 673 self.db.add_record(msg_id, record)
501 674
502 675 def save_task_result(self, idents, msg):
503 676 """save the result of a completed task."""
504 677 client_id = idents[0]
505 678 try:
506 679 msg = self.session.unpack_message(msg, content=False)
507 680 except:
508 logger.error("task::invalid task result message send to %r: %s"%(
681 logging.error("task::invalid task result message send to %r: %s"%(
509 682 client_id, msg), exc_info=True)
510 683 raise
511 684 return
512 685
513 686 parent = msg['parent_header']
514 687 if not parent:
515 688 # print msg
516 logger.warn("Task %r had no parent!"%msg)
689 logging.warn("Task %r had no parent!"%msg)
517 690 return
518 691 msg_id = parent['msg_id']
519 692
520 693 header = msg['header']
521 694 engine_uuid = header.get('engine', None)
522 695 eid = self.by_ident.get(engine_uuid, None)
523 696
524 697 if msg_id in self.pending:
525 698 self.pending.remove(msg_id)
526 699 self.all_completed.add(msg_id)
527 700 if eid is not None:
528 701 self.completed[eid].append(msg_id)
529 702 if msg_id in self.tasks[eid]:
530 703 self.tasks[eid].remove(msg_id)
531 704 completed = datetime.strptime(header['date'], ISO8601)
532 705 started = header.get('started', None)
533 706 if started is not None:
534 707 started = datetime.strptime(started, ISO8601)
535 708 result = {
536 709 'result_header' : header,
537 710 'result_content': msg['content'],
538 711 'started' : started,
539 712 'completed' : completed,
540 713 'engine_uuid': engine_uuid
541 714 }
542 715 if MongoDB is not None and isinstance(self.db, MongoDB):
543 716 result['result_buffers'] = map(Binary, msg['buffers'])
544 717 else:
545 718 result['result_buffers'] = msg['buffers']
546 719 self.db.update_record(msg_id, result)
547 720
548 721 else:
549 logger.debug("task::unknown task %s finished"%msg_id)
722 logging.debug("task::unknown task %s finished"%msg_id)
550 723
551 724 def save_task_destination(self, idents, msg):
552 725 try:
553 726 msg = self.session.unpack_message(msg, content=True)
554 727 except:
555 logger.error("task::invalid task tracking message", exc_info=True)
728 logging.error("task::invalid task tracking message", exc_info=True)
556 729 return
557 730 content = msg['content']
558 731 print (content)
559 732 msg_id = content['msg_id']
560 733 engine_uuid = content['engine_id']
561 734 eid = self.by_ident[engine_uuid]
562 735
563 logger.info("task::task %s arrived on %s"%(msg_id, eid))
736 logging.info("task::task %s arrived on %s"%(msg_id, eid))
564 737 # if msg_id in self.mia:
565 738 # self.mia.remove(msg_id)
566 739 # else:
567 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
740 # logging.debug("task::task %s not listed as MIA?!"%(msg_id))
568 741
569 742 self.tasks[eid].append(msg_id)
570 743 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
571 744 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
572 745
573 746 def mia_task_request(self, idents, msg):
574 747 raise NotImplementedError
575 748 client_id = idents[0]
576 749 # content = dict(mia=self.mia,status='ok')
577 750 # self.session.send('mia_reply', content=content, idents=client_id)
578 751
579 752
580 753 #--------------------- IOPub Traffic ------------------------------
581 754
582 755 def save_iopub_message(self, topics, msg):
583 756 """save an iopub message into the db"""
584 757 print (topics)
585 758 try:
586 759 msg = self.session.unpack_message(msg, content=True)
587 760 except:
588 logger.error("iopub::invalid IOPub message", exc_info=True)
761 logging.error("iopub::invalid IOPub message", exc_info=True)
589 762 return
590 763
591 764 parent = msg['parent_header']
592 765 if not parent:
593 logger.error("iopub::invalid IOPub message: %s"%msg)
766 logging.error("iopub::invalid IOPub message: %s"%msg)
594 767 return
595 768 msg_id = parent['msg_id']
596 769 msg_type = msg['msg_type']
597 770 content = msg['content']
598 771
599 772 # ensure msg_id is in db
600 773 try:
601 774 rec = self.db.get_record(msg_id)
602 775 except:
603 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
776 logging.error("iopub::IOPub message has invalid parent", exc_info=True)
604 777 return
605 778 # stream
606 779 d = {}
607 780 if msg_type == 'stream':
608 781 name = content['name']
609 782 s = rec[name] or ''
610 783 d[name] = s + content['data']
611 784
612 785 elif msg_type == 'pyerr':
613 786 d['pyerr'] = content
614 787 else:
615 788 d[msg_type] = content['data']
616 789
617 790 self.db.update_record(msg_id, d)
618 791
619 792
620 793
621 794 #-------------------------------------------------------------------------
622 795 # Registration requests
623 796 #-------------------------------------------------------------------------
624 797
625 798 def connection_request(self, client_id, msg):
626 799 """Reply with connection addresses for clients."""
627 logger.info("client::client %s connected"%client_id)
800 logging.info("client::client %s connected"%client_id)
628 801 content = dict(status='ok')
629 802 content.update(self.client_addrs)
630 803 jsonable = {}
631 804 for k,v in self.keytable.iteritems():
632 805 jsonable[str(k)] = v
633 806 content['engines'] = jsonable
634 807 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
635 808
636 809 def register_engine(self, reg, msg):
637 810 """Register a new engine."""
638 811 content = msg['content']
639 812 try:
640 813 queue = content['queue']
641 814 except KeyError:
642 logger.error("registration::queue not specified", exc_info=True)
815 logging.error("registration::queue not specified", exc_info=True)
643 816 return
644 817 heart = content.get('heartbeat', None)
645 818 """register a new engine, and create the socket(s) necessary"""
646 819 eid = self._next_id
647 820 # print (eid, queue, reg, heart)
648 821
649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
822 logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
650 823
651 824 content = dict(id=eid,status='ok')
652 825 content.update(self.engine_addrs)
653 826 # check if requesting available IDs:
654 827 if queue in self.by_ident:
655 828 try:
656 829 raise KeyError("queue_id %r in use"%queue)
657 830 except:
658 831 content = wrap_exception()
659 logger.error("queue_id %r in use"%queue, exc_info=True)
832 logging.error("queue_id %r in use"%queue, exc_info=True)
660 833 elif heart in self.hearts: # need to check unique hearts?
661 834 try:
662 835 raise KeyError("heart_id %r in use"%heart)
663 836 except:
664 logger.error("heart_id %r in use"%heart, exc_info=True)
837 logging.error("heart_id %r in use"%heart, exc_info=True)
665 838 content = wrap_exception()
666 839 else:
667 840 for h, pack in self.incoming_registrations.iteritems():
668 841 if heart == h:
669 842 try:
670 843 raise KeyError("heart_id %r in use"%heart)
671 844 except:
672 logger.error("heart_id %r in use"%heart, exc_info=True)
845 logging.error("heart_id %r in use"%heart, exc_info=True)
673 846 content = wrap_exception()
674 847 break
675 848 elif queue == pack[1]:
676 849 try:
677 850 raise KeyError("queue_id %r in use"%queue)
678 851 except:
679 logger.error("queue_id %r in use"%queue, exc_info=True)
852 logging.error("queue_id %r in use"%queue, exc_info=True)
680 853 content = wrap_exception()
681 854 break
682 855
683 856 msg = self.session.send(self.registrar, "registration_reply",
684 857 content=content,
685 858 ident=reg)
686 859
687 860 if content['status'] == 'ok':
688 861 if heart in self.heartmonitor.hearts:
689 862 # already beating
690 863 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
691 864 self.finish_registration(heart)
692 865 else:
693 866 purge = lambda : self._purge_stalled_registration(heart)
694 867 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
695 868 dc.start()
696 869 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
697 870 else:
698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
871 logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
699 872 return eid
700 873
701 874 def unregister_engine(self, ident, msg):
702 875 """Unregister an engine that explicitly requested to leave."""
703 876 try:
704 877 eid = msg['content']['id']
705 878 except:
706 logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
879 logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
707 880 return
708 logger.info("registration::unregister_engine(%s)"%eid)
881 logging.info("registration::unregister_engine(%s)"%eid)
709 882 content=dict(id=eid, queue=self.engines[eid].queue)
710 883 self.ids.remove(eid)
711 884 self.keytable.pop(eid)
712 885 ec = self.engines.pop(eid)
713 886 self.hearts.pop(ec.heartbeat)
714 887 self.by_ident.pop(ec.queue)
715 888 self.completed.pop(eid)
716 889 for msg_id in self.queues.pop(eid):
717 890 msg = self.pending.remove(msg_id)
718 891 ############## TODO: HANDLE IT ################
719 892
720 893 if self.notifier:
721 894 self.session.send(self.notifier, "unregistration_notification", content=content)
722 895
723 896 def finish_registration(self, heart):
724 897 """Second half of engine registration, called after our HeartMonitor
725 898 has received a beat from the Engine's Heart."""
726 899 try:
727 900 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
728 901 except KeyError:
729 logger.error("registration::tried to finish nonexistant registration", exc_info=True)
902 logging.error("registration::tried to finish nonexistant registration", exc_info=True)
730 903 return
731 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
904 logging.info("registration::finished registering engine %i:%r"%(eid,queue))
732 905 if purge is not None:
733 906 purge.stop()
734 907 control = queue
735 908 self.ids.add(eid)
736 909 self.keytable[eid] = queue
737 910 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
738 911 control=control, heartbeat=heart)
739 912 self.by_ident[queue] = eid
740 913 self.queues[eid] = list()
741 914 self.tasks[eid] = list()
742 915 self.completed[eid] = list()
743 916 self.hearts[heart] = eid
744 917 content = dict(id=eid, queue=self.engines[eid].queue)
745 918 if self.notifier:
746 919 self.session.send(self.notifier, "registration_notification", content=content)
747 920
748 921 def _purge_stalled_registration(self, heart):
749 922 if heart in self.incoming_registrations:
750 923 eid = self.incoming_registrations.pop(heart)[0]
751 logger.info("registration::purging stalled registration: %i"%eid)
924 logging.info("registration::purging stalled registration: %i"%eid)
752 925 else:
753 926 pass
754 927
755 928 #-------------------------------------------------------------------------
756 929 # Client Requests
757 930 #-------------------------------------------------------------------------
758 931
759 932 def shutdown_request(self, client_id, msg):
760 933 """handle shutdown request."""
761 934 # s = self.context.socket(zmq.XREQ)
762 935 # s.connect(self.client_connections['mux'])
763 936 # time.sleep(0.1)
764 937 # for eid,ec in self.engines.iteritems():
765 938 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
766 939 # time.sleep(1)
767 940 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
768 941 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
769 942 dc.start()
770 943
771 944 def _shutdown(self):
772 logger.info("controller::controller shutting down.")
945 logging.info("hub::hub shutting down.")
773 946 time.sleep(0.1)
774 947 sys.exit(0)
775 948
776 949
777 950 def check_load(self, client_id, msg):
778 951 content = msg['content']
779 952 try:
780 953 targets = content['targets']
781 954 targets = self._validate_targets(targets)
782 955 except:
783 956 content = wrap_exception()
784 self.session.send(self.clientele, "controller_error",
957 self.session.send(self.clientele, "hub_error",
785 958 content=content, ident=client_id)
786 959 return
787 960
788 961 content = dict(status='ok')
789 962 # loads = {}
790 963 for t in targets:
791 964 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
792 965 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
793 966
794 967
795 968 def queue_status(self, client_id, msg):
796 969 """Return the Queue status of one or more targets.
797 970 if verbose: return the msg_ids
798 971 else: return len of each type.
799 972 keys: queue (pending MUX jobs)
800 973 tasks (pending Task jobs)
801 974 completed (finished jobs from both queues)"""
802 975 content = msg['content']
803 976 targets = content['targets']
804 977 try:
805 978 targets = self._validate_targets(targets)
806 979 except:
807 980 content = wrap_exception()
808 self.session.send(self.clientele, "controller_error",
981 self.session.send(self.clientele, "hub_error",
809 982 content=content, ident=client_id)
810 983 return
811 984 verbose = content.get('verbose', False)
812 985 content = dict(status='ok')
813 986 for t in targets:
814 987 queue = self.queues[t]
815 988 completed = self.completed[t]
816 989 tasks = self.tasks[t]
817 990 if not verbose:
818 991 queue = len(queue)
819 992 completed = len(completed)
820 993 tasks = len(tasks)
821 994 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
822 995 # pending
823 996 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
824 997
825 998 def purge_results(self, client_id, msg):
826 999 """Purge results from memory. This method is more valuable before we move
827 1000 to a DB based message storage mechanism."""
828 1001 content = msg['content']
829 1002 msg_ids = content.get('msg_ids', [])
830 1003 reply = dict(status='ok')
831 1004 if msg_ids == 'all':
832 1005 self.db.drop_matching_records(dict(completed={'$ne':None}))
833 1006 else:
834 1007 for msg_id in msg_ids:
835 1008 if msg_id in self.all_completed:
836 1009 self.db.drop_record(msg_id)
837 1010 else:
838 1011 if msg_id in self.pending:
839 1012 try:
840 1013 raise IndexError("msg pending: %r"%msg_id)
841 1014 except:
842 1015 reply = wrap_exception()
843 1016 else:
844 1017 try:
845 1018 raise IndexError("No such msg: %r"%msg_id)
846 1019 except:
847 1020 reply = wrap_exception()
848 1021 break
849 1022 eids = content.get('engine_ids', [])
850 1023 for eid in eids:
851 1024 if eid not in self.engines:
852 1025 try:
853 1026 raise IndexError("No such engine: %i"%eid)
854 1027 except:
855 1028 reply = wrap_exception()
856 1029 break
857 1030 msg_ids = self.completed.pop(eid)
858 1031 uid = self.engines[eid].queue
859 1032 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
860 1033
861 1034 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
862 1035
863 1036 def resubmit_task(self, client_id, msg, buffers):
864 1037 """Resubmit a task."""
865 1038 raise NotImplementedError
866 1039
867 1040 def get_results(self, client_id, msg):
868 1041 """Get the result of 1 or more messages."""
869 1042 content = msg['content']
870 1043 msg_ids = sorted(set(content['msg_ids']))
871 1044 statusonly = content.get('status_only', False)
872 1045 pending = []
873 1046 completed = []
874 1047 content = dict(status='ok')
875 1048 content['pending'] = pending
876 1049 content['completed'] = completed
877 1050 buffers = []
878 1051 if not statusonly:
879 1052 content['results'] = {}
880 1053 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
881 1054 for msg_id in msg_ids:
882 1055 if msg_id in self.pending:
883 1056 pending.append(msg_id)
884 1057 elif msg_id in self.all_completed:
885 1058 completed.append(msg_id)
886 1059 if not statusonly:
887 1060 rec = records[msg_id]
888 1061 io_dict = {}
889 1062 for key in 'pyin pyout pyerr stdout stderr'.split():
890 1063 io_dict[key] = rec[key]
891 1064 content[msg_id] = { 'result_content': rec['result_content'],
892 1065 'header': rec['header'],
893 1066 'result_header' : rec['result_header'],
894 1067 'io' : io_dict,
895 1068 }
896 1069 buffers.extend(map(str, rec['result_buffers']))
897 1070 else:
898 1071 try:
899 1072 raise KeyError('No such message: '+msg_id)
900 1073 except:
901 1074 content = wrap_exception()
902 1075 break
903 1076 self.session.send(self.clientele, "result_reply", content=content,
904 1077 parent=msg, ident=client_id,
905 1078 buffers=buffers)
906 1079
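For reference, the non-verbose `queue_status` reply assembled by the handler above has this shape (a sketch; the engine ids and counts are illustrative, not taken from the source):

    reply = {
        'status': 'ok',
        '0': {'queue': 2, 'completed': 10, 'tasks': 1},   # per-engine counts
        '1': {'queue': 0, 'completed': 12, 'tasks': 3},
    }
    # with verbose=True the handler sends the msg_id lists themselves
    # instead of their lengths
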
@@ -1,93 +1,88 b''
1 1 #!/usr/bin/env python
2 2 from __future__ import print_function
3 3 import sys,os
4 4 import time
5 5 from subprocess import Popen, PIPE
6 6
7 from entry_point import parse_url
8 from controller import make_argument_parser
7 from IPython.external.argparse import ArgumentParser, SUPPRESS
9 8
10 9 def _filter_arg(flag, args):
11 10 filtered = []
12 11 if flag in args:
13 12 filtered.append(flag)
14 13 idx = args.index(flag)
15 14 if len(args) > idx+1:
16 15 if not args[idx+1].startswith('-'):
17 16 filtered.append(args[idx+1])
18 17 return filtered
19 18
20 19 def filter_args(flags, args=sys.argv[1:]):
21 20 filtered = []
22 21 for flag in flags:
23 22 if isinstance(flag, (list,tuple)):
24 23 for f in flag:
25 24 filtered.extend(_filter_arg(f, args))
26 25 else:
27 26 filtered.extend(_filter_arg(flag, args))
28 27 return filtered
29 28
30 29 def _strip_arg(flag, args):
31 30 while flag in args:
32 31 idx = args.index(flag)
33 32 args.pop(idx)
34 33 if len(args) > idx:
35 34 if not args[idx].startswith('-'):
36 35 args.pop(idx)
37 36
38 37 def strip_args(flags, args=sys.argv[1:]):
39 38 args = list(args)
40 39 for flag in flags:
41 40 if isinstance(flag, (list,tuple)):
42 41 for f in flag:
43 42 _strip_arg(f, args)
44 43 else:
45 44 _strip_arg(flag, args)
46 45 return args
47 46
48 47
49 48 def launch_process(mod, args):
50 49 """Launch a controller or engine in a subprocess."""
51 code = "from IPython.zmq.parallel.%s import main;main()"%mod
50 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
52 51 arguments = [ sys.executable, '-c', code ] + args
53 52 blackholew = file(os.devnull, 'w')
54 53 blackholer = file(os.devnull, 'r')
55 54
56 55 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
57 56 return proc
58 57
59 58 def main():
60 parser = make_argument_parser()
59 parser = ArgumentParser(argument_default=SUPPRESS)
61 60 parser.add_argument('--n', '-n', type=int, default=1,
62 61 help="The number of engines to start.")
63 args = parser.parse_args()
64 parse_url(args)
65
66 controller_args = strip_args([('--n','-n')])
67 engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
68 '--transport','--loglevel','--packer', '--execkey'])+['--ident']
62 ns,args = parser.parse_known_args()
63 n = ns.n
69 64
70 controller = launch_process('controller', controller_args)
65 controller = launch_process('ipcontrollerapp', args)
71 66 for i in range(10):
72 67 time.sleep(.1)
73 68 if controller.poll() is not None:
74 69 print("Controller failed to launch:")
75 70 print (controller.stderr.read())
76 71 sys.exit(255)
77 72
78 print("Launched Controller at %s"%args.url)
79 engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
80 print("%i Engines started"%args.n)
73 print("Launched Controller")
74 engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
75 print("%i Engines started"%n)
81 76
82 77 def wait_quietly(p):
83 78 try:
84 79 p.wait()
85 80 except KeyboardInterrupt:
86 81 pass
87 82
88 83 wait_quietly(controller)
89 84 map(wait_quietly, engines)
90 85 print ("Engines cleaned up.")
91 86
92 87 if __name__ == '__main__':
93 88 main()
\ No newline at end of file
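
The launcher above replaces the old filter_args/strip_args bookkeeping with argparse's parse_known_args: it consumes only -n itself and forwards everything else verbatim to the controller and engine apps. A minimal standalone sketch of that split, using the stdlib argparse that IPython.external.argparse mirrors:

    from argparse import ArgumentParser, SUPPRESS

    parser = ArgumentParser(argument_default=SUPPRESS)
    parser.add_argument('--n', '-n', type=int, default=1,
                        help="The number of engines to start.")
    ns, rest = parser.parse_known_args(['-n', '4', '--ip', '127.0.0.1'])
    assert ns.n == 4
    assert rest == ['--ip', '127.0.0.1']   # passed through untouched
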
@@ -1,60 +1,61 b''
1 1 """A TaskRecord backend using mongodb"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 from datetime import datetime
10 10
11 11 from pymongo import Connection
12 12
13 from dictdb import BaseDB
13 14 #----------------------
14 15 # MongoDB class
15 16 #----------------------
16 class MongoDB(object):
17 class MongoDB(BaseDB):
17 18 """MongoDB TaskRecord backend."""
18 19 def __init__(self, session_uuid, *args, **kwargs):
19 20 self._connection = Connection(*args, **kwargs)
20 21 self._db = self._connection[session_uuid]
21 22 self._records = self._db['task_records']
22 23 self._table = {}
23 24
24 25
25 26 def add_record(self, msg_id, rec):
26 27 """Add a new Task Record, by msg_id."""
27 28 # print rec
28 29 obj_id = self._records.insert(rec)
29 30 self._table[msg_id] = obj_id
30 31
31 32 def get_record(self, msg_id):
32 33 """Get a specific Task Record, by msg_id."""
33 34 return self._records.find_one(self._table[msg_id])
34 35
35 36 def update_record(self, msg_id, rec):
36 37 """Update the data in an existing record."""
37 38 obj_id = self._table[msg_id]
38 39 self._records.update({'_id':obj_id}, {'$set': rec})
39 40
40 41 def drop_matching_records(self, check):
41 42 """Remove a record from the DB."""
42 43 self._records.remove(check)
43 44
44 45 def drop_record(self, msg_id):
45 46 """Remove a record from the DB."""
46 47 obj_id = self._table.pop(msg_id)
47 48 self._records.remove(obj_id)
48 49
49 50 def find_records(self, check, id_only=False):
50 51 """Find records matching a query dict."""
51 52 matches = list(self._records.find(check))
52 53 if id_only:
53 54 return [ rec['msg_id'] for rec in matches ]
54 55 else:
55 56 data = {}
56 57 for rec in matches:
57 58 data[rec['msg_id']] = rec
58 59 return data
59 60
60 61
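A short usage sketch of the backend above (assumes a mongod running on localhost, which Connection() targets by default; the ids and fields are illustrative):

    db = MongoDB('my-session-uuid')
    db.add_record('msg-1', {'msg_id': 'msg-1', 'completed': None})
    db.update_record('msg-1', {'completed': datetime.now()})
    rec = db.get_record('msg-1')
    # drop everything that has finished, as the hub's purge does:
    db.drop_matching_records({'completed': {'$ne': None}})
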
@@ -1,434 +1,426 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7
8 8 #----------------------------------------------------------------------
9 9 # Imports
10 10 #----------------------------------------------------------------------
11 11
12 12 from __future__ import print_function
13 13 from random import randint,random
14 14 import logging
15 15 from types import FunctionType
16 16
17 17 try:
18 18 import numpy
19 19 except ImportError:
20 20 numpy = None
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 # local imports
26 26 from IPython.external.decorator import decorator
27 27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance
28 from IPython.utils.traitlets import Instance, Dict, List, Set
29 29
30 30 from client import Client
31 31 from dependency import Dependency
32 32 import streamsession as ss
33 33 from entry_point import connect_logger, local_logger
34 34
35 35
36 logger = logging.getLogger()
37
38 36 @decorator
39 37 def logged(f,self,*args,**kwargs):
40 38 # print ("#--------------------")
41 logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
39 logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 40 # print ("#--")
43 41 return f(self,*args, **kwargs)
44 42
45 43 #----------------------------------------------------------------------
46 44 # Chooser functions
47 45 #----------------------------------------------------------------------
48 46
49 47 def plainrandom(loads):
50 48 """Plain random pick."""
51 49 n = len(loads)
52 50 return randint(0,n-1)
53 51
54 52 def lru(loads):
55 53 """Always pick the front of the line.
56 54
57 55 The content of `loads` is ignored.
58 56
59 57 Assumes LRU ordering of loads, with oldest first.
60 58 """
61 59 return 0
62 60
63 61 def twobin(loads):
64 62 """Pick two at random, use the LRU of the two.
65 63
66 64 The content of loads is ignored.
67 65
68 66 Assumes LRU ordering of loads, with oldest first.
69 67 """
70 68 n = len(loads)
71 69 a = randint(0,n-1)
72 70 b = randint(0,n-1)
73 71 return min(a,b)
74 72
75 73 def weighted(loads):
76 74 """Pick two at random using inverse load as weight.
77 75
78 76 Return the less loaded of the two.
79 77 """
80 78 # weight 0 a million times more than 1:
81 79 weights = 1./(1e-6+numpy.array(loads))
82 80 sums = weights.cumsum()
83 81 t = sums[-1]
84 82 x = random()*t
85 83 y = random()*t
86 84 idx = 0
87 85 idy = 0
88 86 while sums[idx] < x:
89 87 idx += 1
90 88 while sums[idy] < y:
91 89 idy += 1
92 90 if weights[idy] > weights[idx]:
93 91 return idy
94 92 else:
95 93 return idx
96 94
97 95 def leastload(loads):
98 96 """Always choose the lowest load.
99 97
100 98 If the lowest load occurs more than once, the first
101 99 occurrence will be used. If loads has LRU ordering, this means
102 100 the LRU of those with the lowest load is chosen.
103 101 """
104 102 return loads.index(min(loads))
105 103
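Each chooser takes the current `loads` list (one entry per engine, LRU-ordered with the oldest first) and returns an index into it. A quick illustration, assuming the functions above are in scope and using a made-up loads list:

    loads = [3, 0, 1, 2]     # engine at index 0 is least-recently used
    lru(loads)               # -> 0: ignores the values entirely
    leastload(loads)         # -> 1: the engine with no outstanding jobs
    plainrandom(loads)       # -> uniform over 0..3
    twobin(loads)            # -> the smaller of two random indices
    # weighted(loads) requires numpy: it draws two indices with probability
    # proportional to 1/load and returns the less loaded of the pair
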
106 104 #---------------------------------------------------------------------
107 105 # Classes
108 106 #---------------------------------------------------------------------
109 107 class TaskScheduler(Configurable):
110 108 """Python TaskScheduler object.
111 109
112 110 This is the simplest object that supports msg_id based
113 111 DAG dependencies. *Only* task msg_ids are checked, not
114 112 msg_ids of jobs submitted via the MUX queue.
115 113
116 114 """
117 115
118 # configurables:
116 # input arguments:
119 117 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
120 118 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
121 119 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
122 120 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
123 121 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
124 122 io_loop = Instance(ioloop.IOLoop)
125 123
126 124 # internals:
127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
129 pending = None # dict by engine_uuid of submitted tasks
130 completed = None # dict by engine_uuid of completed tasks
131 clients = None # dict by msg_id for who submitted the task
132 targets = None # list of target IDENTs
133 loads = None # list of engine loads
134 all_done = None # set of all completed tasks
135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
125 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
126 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
127 pending = Dict() # dict by engine_uuid of submitted tasks
128 completed = Dict() # dict by engine_uuid of completed tasks
129 clients = Dict() # dict by msg_id for who submitted the task
130 targets = List() # list of target IDENTs
131 loads = List() # list of engine loads
132 all_done = Set() # set of all completed tasks
133 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
134 session = Instance(ss.StreamSession)
136 135
137 136
138 137 def __init__(self, **kwargs):
139 138 super(TaskScheduler, self).__init__(**kwargs)
140 139
141 140 self.session = ss.StreamSession(username="TaskScheduler")
142 self.dependencies = {}
143 self.depending = {}
144 self.completed = {}
145 self.pending = {}
146 self.all_done = set()
147 self.blacklist = {}
148
149 self.targets = []
150 self.loads = []
151 141
152 142 self.engine_stream.on_recv(self.dispatch_result, copy=False)
153 143 self._notification_handlers = dict(
154 144 registration_notification = self._register_engine,
155 145 unregistration_notification = self._unregister_engine
156 146 )
157 147 self.notifier_stream.on_recv(self.dispatch_notification)
158 logger.info("Scheduler started...%r"%self)
148 logging.info("Scheduler started...%r"%self)
159 149
160 150 def resume_receiving(self):
161 151 """Resume accepting jobs."""
162 152 self.client_stream.on_recv(self.dispatch_submission, copy=False)
163 153
164 154 def stop_receiving(self):
165 155 """Stop accepting jobs while there are no engines.
166 156 Leave them in the ZMQ queue."""
167 157 self.client_stream.on_recv(None)
168 158
169 159 #-----------------------------------------------------------------------
170 160 # [Un]Registration Handling
171 161 #-----------------------------------------------------------------------
172 162
173 163 def dispatch_notification(self, msg):
174 164 """dispatch register/unregister events."""
175 165 idents,msg = self.session.feed_identities(msg)
176 166 msg = self.session.unpack_message(msg)
177 167 msg_type = msg['msg_type']
178 168 handler = self._notification_handlers.get(msg_type, None)
179 169 if handler is None:
180 170 raise Exception("Unhandled message type: %s"%msg_type)
181 171 else:
182 172 try:
183 173 handler(str(msg['content']['queue']))
184 174 except KeyError:
185 logger.error("task::Invalid notification msg: %s"%msg)
175 logging.error("task::Invalid notification msg: %s"%msg)
186 176
187 177 @logged
188 178 def _register_engine(self, uid):
189 179 """New engine with ident `uid` became available."""
190 180 # head of the line:
191 181 self.targets.insert(0,uid)
192 182 self.loads.insert(0,0)
193 183 # initialize sets
194 184 self.completed[uid] = set()
195 185 self.pending[uid] = {}
196 186 if len(self.targets) == 1:
197 187 self.resume_receiving()
198 188
199 189 def _unregister_engine(self, uid):
200 190 """Existing engine with ident `uid` became unavailable."""
201 191 if len(self.targets) == 1:
202 192 # this was our only engine
203 193 self.stop_receiving()
204 194
205 195 # handle any potentially finished tasks:
206 196 self.engine_stream.flush()
207 197
208 198 self.completed.pop(uid)
209 199 lost = self.pending.pop(uid)
210 200
211 201 idx = self.targets.index(uid)
212 202 self.targets.pop(idx)
213 203 self.loads.pop(idx)
214 204
215 205 self.handle_stranded_tasks(lost)
216 206
217 207 def handle_stranded_tasks(self, lost):
218 208 """Deal with jobs resident in an engine that died."""
219 209 # TODO: resubmit the tasks?
220 210 for msg_id in lost:
221 211 pass
222 212
223 213
224 214 #-----------------------------------------------------------------------
225 215 # Job Submission
226 216 #-----------------------------------------------------------------------
227 217 @logged
228 218 def dispatch_submission(self, raw_msg):
229 219 """Dispatch job submission to appropriate handlers."""
230 220 # ensure targets up to date:
231 221 self.notifier_stream.flush()
232 222 try:
233 223 idents, msg = self.session.feed_identities(raw_msg, copy=False)
234 224 except Exception as e:
235 logger.error("task::Invaid msg: %s"%msg)
225 logging.error("task::Invalid msg: %s"%raw_msg)
236 226 return
237 227
238 228 # send to monitor
239 229 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
240 230
241 231 msg = self.session.unpack_message(msg, content=False, copy=False)
242 232 header = msg['header']
243 233 msg_id = header['msg_id']
244 234
245 235 # time dependencies
246 236 after = Dependency(header.get('after', []))
247 237 if after.mode == 'all':
248 238 after.difference_update(self.all_done)
249 239 if after.check(self.all_done):
250 240 # recast as empty set, if `after` already met,
251 241 # to prevent unnecessary set comparisons
252 242 after = Dependency([])
253 243
254 244 # location dependencies
255 245 follow = Dependency(header.get('follow', []))
256 246 if len(after) == 0:
257 247 # time deps already met, try to run
258 248 if not self.maybe_run(msg_id, raw_msg, follow):
259 249 # can't run yet
260 250 self.save_unmet(msg_id, raw_msg, after, follow)
261 251 else:
262 252 self.save_unmet(msg_id, raw_msg, after, follow)
263 253
264 254 @logged
265 255 def maybe_run(self, msg_id, raw_msg, follow=None):
266 256 """check location dependencies, and run if they are met."""
267 257
268 258 if follow:
269 259 def can_run(idx):
270 260 target = self.targets[idx]
271 261 return target not in self.blacklist.get(msg_id, []) and\
272 262 follow.check(self.completed[target])
273 263
274 264 indices = filter(can_run, range(len(self.targets)))
275 265 if not indices:
276 266 return False
277 267 else:
278 268 indices = None
279 269
280 270 self.submit_task(msg_id, raw_msg, indices)
281 271 return True
282 272
283 273 @logged
284 274 def save_unmet(self, msg_id, msg, after, follow):
285 275 """Save a message for later submission when its dependencies are met."""
286 276 self.depending[msg_id] = (msg_id,msg,after,follow)
287 277 # track the ids in both follow/after, but not those already completed
288 278 for dep_id in after.union(follow).difference(self.all_done):
289 279 if dep_id not in self.dependencies:
290 280 self.dependencies[dep_id] = set()
291 281 self.dependencies[dep_id].add(msg_id)
292 282
293 283 @logged
294 284 def submit_task(self, msg_id, msg, follow=None, indices=None):
295 285 """Submit a task to any of a subset of our targets."""
296 286 if indices:
297 287 loads = [self.loads[i] for i in indices]
298 288 else:
299 289 loads = self.loads
300 290 idx = self.scheme(loads)
301 291 if indices:
302 292 idx = indices[idx]
303 293 target = self.targets[idx]
304 294 # print (target, map(str, msg[:3]))
305 295 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
306 296 self.engine_stream.send_multipart(msg, copy=False)
307 297 self.add_job(idx)
308 298 self.pending[target][msg_id] = (msg, follow)
309 299 content = dict(msg_id=msg_id, engine_id=target)
310 300 self.session.send(self.mon_stream, 'task_destination', content=content,
311 301 ident=['tracktask',self.session.session])
312 302
313 303 #-----------------------------------------------------------------------
314 304 # Result Handling
315 305 #-----------------------------------------------------------------------
316 306 @logged
317 307 def dispatch_result(self, raw_msg):
318 308 try:
319 309 idents,msg = self.session.feed_identities(raw_msg, copy=False)
320 310 except Exception as e:
321 logger.error("task::Invaid result: %s"%msg)
311 logging.error("task::Invalid result: %s"%raw_msg)
322 312 return
323 313 msg = self.session.unpack_message(msg, content=False, copy=False)
324 314 header = msg['header']
325 315 if header.get('dependencies_met', True):
326 316 self.handle_result_success(idents, msg['parent_header'], raw_msg)
327 317 # send to monitor
328 318 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
329 319 else:
330 320 self.handle_unmet_dependency(idents, msg['parent_header'])
331 321
332 322 @logged
333 323 def handle_result_success(self, idents, parent, raw_msg):
334 324 # first, relay result to client
335 325 engine = idents[0]
336 326 client = idents[1]
337 327 # swap_ids for XREP-XREP mirror
338 328 raw_msg[:2] = [client,engine]
339 329 # print (map(str, raw_msg[:4]))
340 330 self.client_stream.send_multipart(raw_msg, copy=False)
341 331 # now, update our data structures
342 332 msg_id = parent['msg_id']
343 333 self.pending[engine].pop(msg_id)
344 334 self.completed[engine].add(msg_id)
345 335 self.all_done.add(msg_id)
346 336
347 337 self.update_dependencies(msg_id)
348 338
349 339 @logged
350 340 def handle_unmet_dependency(self, idents, parent):
351 341 engine = idents[0]
352 342 msg_id = parent['msg_id']
353 343 if msg_id not in self.blacklist:
354 344 self.blacklist[msg_id] = set()
355 345 self.blacklist[msg_id].add(engine)
356 346 raw_msg,follow = self.pending[engine].pop(msg_id)
357 347 if not self.maybe_run(msg_id, raw_msg, follow):
358 348 # resubmit failed, put it back in our dependency tree
359 349 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
360 350 pass
361 351 @logged
362 352 def update_dependencies(self, dep_id):
363 353 """dep_id just finished. Update our dependency
364 354 table and submit any jobs that just became runnable."""
365 355
366 356 if dep_id not in self.dependencies:
367 357 return
368 358 jobs = self.dependencies.pop(dep_id)
369 359 for job in jobs:
370 360 msg_id, raw_msg, after, follow = self.depending[job]
371 361 if dep_id in after:
372 362 after.remove(dep_id)
373 363 if not after: # time deps met, maybe run
374 364 if self.maybe_run(msg_id, raw_msg, follow):
375 365 self.depending.pop(job)
376 366 for mid in follow:
377 367 if mid in self.dependencies:
378 368 self.dependencies[mid].remove(msg_id)
379 369
380 370 #----------------------------------------------------------------------
381 371 # methods to be overridden by subclasses
382 372 #----------------------------------------------------------------------
383 373
384 374 def add_job(self, idx):
385 375 """Called after self.targets[idx] just got the job with header.
386 376 Override with subclasses. The default ordering is simple LRU.
387 377 The default loads are the number of outstanding jobs."""
388 378 self.loads[idx] += 1
389 379 for lis in (self.targets, self.loads):
390 380 lis.append(lis.pop(idx))
391 381
392 382
393 383 def finish_job(self, idx):
394 384 """Called after self.targets[idx] just finished a job.
395 385 Override with subclasses."""
396 386 self.loads[idx] -= 1
397 387
398 388
399 389
400 390 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
401 391 from zmq.eventloop import ioloop
402 392 from zmq.eventloop.zmqstream import ZMQStream
403 393
404 394 ctx = zmq.Context()
405 395 loop = ioloop.IOLoop()
406 396
407 scheme = globals().get(scheme)
408
409 397 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
410 398 ins.bind(in_addr)
411 399 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
412 400 outs.bind(out_addr)
413 401 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
414 402 mons.connect(mon_addr)
415 403 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
416 404 nots.setsockopt(zmq.SUBSCRIBE, '')
417 405 nots.connect(not_addr)
418 406
407 scheme = globals().get(scheme, None)
419 408 # setup logging
420 409 if log_addr:
421 410 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
422 411 else:
423 412 local_logger(loglevel)
424 413
425 414 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
426 415 mon_stream=mons,notifier_stream=nots,
427 416 scheme=scheme,io_loop=loop)
428 417
429 loop.start()
418 try:
419 loop.start()
420 except KeyboardInterrupt:
421 print ("interrupted, exiting...", file=sys.__stderr__) # bug: 'sys' is never imported at module level in this file
430 422
431 423
432 424 if __name__ == '__main__':
433 425 iface = 'tcp://127.0.0.1:%i'
434 426 launch_scheduler(iface%12345,iface%12346,iface%12347,iface%12348)
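
Since `launch_scheduler` resolves its `scheme` argument by name through `globals().get(scheme, None)`, only the chooser names defined in this module work through that path; a custom policy can instead be handed straight to `TaskScheduler` as a function. A hedged sketch (the keyword arguments mirror the traits above; the streams are assumed to be set up as in `launch_scheduler`):

    def most_recent(loads):
        # toy scheme: always pick the most-recently-used engine
        return len(loads) - 1

    # scheduler = TaskScheduler(scheme=most_recent,
    #                           client_stream=ins, engine_stream=outs,
    #                           mon_stream=mons, notifier_stream=nots,
    #                           io_loop=loop)
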
@@ -1,461 +1,490 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Imports
8 8 #-----------------------------------------------------------------------------
9 9
10 10 # Standard library imports.
11 11 from __future__ import print_function
12 12 import __builtin__
13 13 from code import CommandCompiler
14 14 import os
15 15 import sys
16 16 import time
17 17 import traceback
18 18 import logging
19 19 from datetime import datetime
20 20 from signal import SIGTERM, SIGKILL
21 21 from pprint import pprint
22 22
23 23 # System library imports.
24 24 import zmq
25 25 from zmq.eventloop import ioloop, zmqstream
26 26
27 27 # Local imports.
28 28 from IPython.core import ultratb
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int, Dict, Set, Str
30 30 from IPython.zmq.completer import KernelCompleter
31 31 from IPython.zmq.iostream import OutStream
32 32 from IPython.zmq.displayhook import DisplayHook
33 33
34
34 from factory import SessionFactory
35 35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 36 unpack_apply_message, ISO8601, wrap_exception
37 37 from dependency import UnmetDependency
38 38 import heartmonitor
39 39 from client import Client
40 40
41 logger = logging.getLogger()
42
43 41 def printer(*args):
44 42 pprint(args, stream=sys.__stdout__)
45 43
44
45 class _Passer:
46 """Empty class that implements `send()` that does nothing."""
47 def send(self, *args, **kwargs):
48 pass
49 send_multipart = send
50
51
46 52 #-----------------------------------------------------------------------------
47 53 # Main kernel class
48 54 #-----------------------------------------------------------------------------
49 55
50 class Kernel(HasTraits):
56 class Kernel(SessionFactory):
51 57
52 58 #---------------------------------------------------------------------------
53 59 # Kernel interface
54 60 #---------------------------------------------------------------------------
55
56 id = Int(-1)
57 session = Instance(StreamSession)
58 shell_streams = List()
61
62 # kwargs:
63 int_id = Int(-1, config=True)
64 user_ns = Dict(config=True)
65 exec_lines = List(config=True)
66
59 67 control_stream = Instance(zmqstream.ZMQStream)
60 68 task_stream = Instance(zmqstream.ZMQStream)
61 69 iopub_stream = Instance(zmqstream.ZMQStream)
62 client = Instance(Client)
63 loop = Instance(ioloop.IOLoop)
70 client = Instance('IPython.zmq.parallel.client.Client')
71
72 # internals
73 shell_streams = List()
74 compiler = Instance(CommandCompiler, (), {})
75 completer = Instance(KernelCompleter)
76
77 aborted = Set()
78 shell_handlers = Dict()
79 control_handlers = Dict()
80
81 def _set_prefix(self):
82 self.prefix = "engine.%s"%self.int_id
83
84 def _connect_completer(self):
85 self.completer = KernelCompleter(self.user_ns)
64 86
65 87 def __init__(self, **kwargs):
66 88 super(Kernel, self).__init__(**kwargs)
67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
68 self.prefix = 'engine.%s'%self.id
69 logger.root_topic = self.prefix
70 self.user_ns = {}
71 self.history = []
72 self.compiler = CommandCompiler()
73 self.completer = KernelCompleter(self.user_ns)
74 self.aborted = set()
89 self._set_prefix()
90 self._connect_completer()
91
92 self.on_trait_change(self._set_prefix, 'int_id')
93 self.on_trait_change(self._connect_completer, 'user_ns')
75 94
76 95 # Build dict of handlers for message types
77 self.shell_handlers = {}
78 self.control_handlers = {}
79 96 for msg_type in ['execute_request', 'complete_request', 'apply_request',
80 97 'clear_request']:
81 98 self.shell_handlers[msg_type] = getattr(self, msg_type)
82 99
83 100 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
84 101 self.control_handlers[msg_type] = getattr(self, msg_type)
85
102
103 self._initial_exec_lines()
86 104
87 105 def _wrap_exception(self, method=None):
88 e_info = dict(engineid=self.identity, method=method)
106 e_info = dict(engineid=self.ident, method=method)
89 107 content=wrap_exception(e_info)
90 108 return content
91 109
110 def _initial_exec_lines(self):
111 s = _Passer()
112 content = dict(silent=True, user_variables=[], user_expressions=[])
113 for line in self.exec_lines:
114 logging.debug("executing initialization: %s"%line)
115 content.update({'code':line})
116 msg = self.session.msg('execute_request', content)
117 self.execute_request(s, [], msg)
118
119
92 120 #-------------------- control handlers -----------------------------
93 121 def abort_queues(self):
94 122 for stream in self.shell_streams:
95 123 if stream:
96 124 self.abort_queue(stream)
97 125
98 126 def abort_queue(self, stream):
99 127 while True:
100 128 try:
101 129 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
102 130 except zmq.ZMQError as e:
103 131 if e.errno == zmq.EAGAIN:
104 132 break
105 133 else:
106 134 return
107 135 else:
108 136 if msg is None:
109 137 return
110 138 else:
111 139 idents,msg = msg
112 140
113 141 # assert self.reply_socket.rcvmore(), "Unexpected missing message part."
114 142 # msg = self.reply_socket.recv_json()
115 logger.info("Aborting:")
116 logger.info(str(msg))
143 logging.info("Aborting:")
144 logging.info(str(msg))
117 145 msg_type = msg['msg_type']
118 146 reply_type = msg_type.split('_')[0] + '_reply'
119 147 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
120 148 # self.reply_socket.send(ident,zmq.SNDMORE)
121 149 # self.reply_socket.send_json(reply_msg)
122 150 reply_msg = self.session.send(stream, reply_type,
123 151 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
124 logger.debug(str(reply_msg))
152 logging.debug(str(reply_msg))
125 153 # We need to wait a bit for requests to come in. This can probably
126 154 # be set shorter for true asynchronous clients.
127 155 time.sleep(0.05)
128 156
129 157 def abort_request(self, stream, ident, parent):
130 158 """abort a specific msg by id"""
131 159 msg_ids = parent['content'].get('msg_ids', None)
132 160 if isinstance(msg_ids, basestring):
133 161 msg_ids = [msg_ids]
134 162 if not msg_ids:
135 163 self.abort_queues()
136 164 for mid in msg_ids:
137 165 self.aborted.add(str(mid))
138 166
139 167 content = dict(status='ok')
140 168 reply_msg = self.session.send(stream, 'abort_reply', content=content,
141 169 parent=parent, ident=ident)[0]
142 logger(Message(reply_msg), file=sys.__stdout__)
170 logging.debug(str(reply_msg))
143 171
144 172 def shutdown_request(self, stream, ident, parent):
145 173 """kill ourself. This should really be handled in an external process"""
146 174 try:
147 175 self.abort_queues()
148 176 except:
149 177 content = self._wrap_exception('shutdown')
150 178 else:
151 179 content = dict(parent['content'])
152 180 content['status'] = 'ok'
153 181 msg = self.session.send(stream, 'shutdown_reply',
154 182 content=content, parent=parent, ident=ident)
155 183 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
156 184 # content, parent, ident)
157 185 # print >> sys.__stdout__, msg
158 186 # time.sleep(0.2)
159 187 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
160 188 dc.start()
161 189
162 190 def dispatch_control(self, msg):
163 191 idents,msg = self.session.feed_identities(msg, copy=False)
164 192 try:
165 193 msg = self.session.unpack_message(msg, content=True, copy=False)
166 194 except:
167 logger.error("Invalid Message", exc_info=True)
195 logging.error("Invalid Message", exc_info=True)
168 196 return
169 197
170 198 header = msg['header']
171 199 msg_id = header['msg_id']
172 200
173 201 handler = self.control_handlers.get(msg['msg_type'], None)
174 202 if handler is None:
175 logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
203 logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
176 204 else:
177 205 handler(self.control_stream, idents, msg)
178 206
179 207
180 208 #-------------------- queue helpers ------------------------------
181 209
182 210 def check_dependencies(self, dependencies):
183 211 if not dependencies:
184 212 return True
185 213 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
186 214 anyorall = dependencies[0]
187 215 dependencies = dependencies[1]
188 216 else:
189 217 anyorall = 'all'
190 218 results = self.client.get_results(dependencies,status_only=True)
191 219 if results['status'] != 'ok':
192 220 return False
193 221
194 222 if anyorall == 'any':
195 223 if not results['completed']:
196 224 return False
197 225 else:
198 226 if results['pending']:
199 227 return False
200 228
201 229 return True
202 230
203 231 def check_aborted(self, msg_id):
204 232 return msg_id in self.aborted
205 233
206 234 #-------------------- queue handlers -----------------------------
207 235
208 236 def clear_request(self, stream, idents, parent):
209 237 """Clear our namespace."""
210 238 self.user_ns = {}
211 239 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
212 240 content = dict(status='ok'))
241 self._initial_exec_lines()
213 242
214 243 def execute_request(self, stream, ident, parent):
244 logging.debug('execute request %s'%parent)
215 245 try:
216 246 code = parent[u'content'][u'code']
217 247 except:
218 logger.error("Got bad msg: %s"%parent, exc_info=True)
248 logging.error("Got bad msg: %s"%parent, exc_info=True)
219 249 return
220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
221 # self.iopub_stream.send(pyin_msg)
222 250 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
223 251 ident='%s.pyin'%self.prefix)
224 252 started = datetime.now().strftime(ISO8601)
225 253 try:
226 254 comp_code = self.compiler(code, '<zmq-kernel>')
227 255 # allow for not overriding displayhook
228 256 if hasattr(sys.displayhook, 'set_parent'):
229 257 sys.displayhook.set_parent(parent)
230 258 sys.stdout.set_parent(parent)
231 259 sys.stderr.set_parent(parent)
232 260 exec comp_code in self.user_ns, self.user_ns
233 261 except:
234 262 exc_content = self._wrap_exception('execute')
235 263 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
236 264 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
237 265 ident='%s.pyerr'%self.prefix)
238 266 reply_content = exc_content
239 267 else:
240 268 reply_content = {'status' : 'ok'}
241 269 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
242 270 # self.reply_socket.send(ident, zmq.SNDMORE)
243 271 # self.reply_socket.send_json(reply_msg)
244 272 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
245 273 ident=ident, subheader = dict(started=started))
246 logger.debug(str(reply_msg))
274 logging.debug(str(reply_msg))
247 275 if reply_msg['content']['status'] == u'error':
248 276 self.abort_queues()
249 277
250 278 def complete_request(self, stream, ident, parent):
251 279 matches = {'matches' : self.complete(parent),
252 280 'status' : 'ok'}
253 281 completion_msg = self.session.send(stream, 'complete_reply',
254 282 matches, parent, ident)
255 283 # print >> sys.__stdout__, completion_msg
256 284
257 285 def complete(self, msg):
258 286 return self.completer.complete(msg.content.line, msg.content.text)
259 287
260 288 def apply_request(self, stream, ident, parent):
261 289 # print (parent)
262 290 try:
263 291 content = parent[u'content']
264 292 bufs = parent[u'buffers']
265 293 msg_id = parent['header']['msg_id']
266 294 bound = content.get('bound', False)
267 295 except:
268 logger.error("Got bad msg: %s"%parent, exc_info=True)
296 logging.error("Got bad msg: %s"%parent, exc_info=True)
269 297 return
270 298 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
271 299 # self.iopub_stream.send(pyin_msg)
272 300 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
273 sub = {'dependencies_met' : True, 'engine' : self.identity,
301 sub = {'dependencies_met' : True, 'engine' : self.ident,
274 302 'started': datetime.now().strftime(ISO8601)}
275 303 try:
276 304 # allow for not overriding displayhook
277 305 if hasattr(sys.displayhook, 'set_parent'):
278 306 sys.displayhook.set_parent(parent)
279 307 sys.stdout.set_parent(parent)
280 308 sys.stderr.set_parent(parent)
281 309 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
282 310 if bound:
283 311 working = self.user_ns
284 312 suffix = str(msg_id).replace("-","")
285 313 prefix = "_"
286 314
287 315 else:
288 316 working = dict()
289 317 suffix = prefix = "_" # prevent keyword collisions with lambda
290 318 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
291 319 # if f.fun
292 320 if hasattr(f, 'func_name'):
293 321 fname = f.func_name
294 322 else:
295 323 fname = f.__name__
296 324
297 325 fname = prefix+fname.strip('<>')+suffix
298 326 argname = prefix+"args"+suffix
299 327 kwargname = prefix+"kwargs"+suffix
300 328 resultname = prefix+"result"+suffix
301 329
302 330 ns = { fname : f, argname : args, kwargname : kwargs }
303 331 # print ns
304 332 working.update(ns)
305 333 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
306 334 exec code in working, working
307 335 result = working.get(resultname)
308 336 # clear the namespace
309 337 if bound:
310 338 for key in ns.iterkeys():
311 339 self.user_ns.pop(key)
312 340 else:
313 341 del working
314 342
315 343 packed_result,buf = serialize_object(result)
316 344 result_buf = [packed_result]+buf
317 345 except:
318 346 exc_content = self._wrap_exception('apply')
319 347 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
320 348 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
321 349 ident='%s.pyerr'%self.prefix)
322 350 reply_content = exc_content
323 351 result_buf = []
324 352
325 353 if exc_content['ename'] == UnmetDependency.__name__:
326 354 sub['dependencies_met'] = False
327 355 else:
328 356 reply_content = {'status' : 'ok'}
329 357 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
330 358 # self.reply_socket.send(ident, zmq.SNDMORE)
331 359 # self.reply_socket.send_json(reply_msg)
332 360 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
333 361 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
334 362 # print(Message(reply_msg), file=sys.__stdout__)
335 363 # if reply_msg['content']['status'] == u'error':
336 364 # self.abort_queues()
337 365
338 366 def dispatch_queue(self, stream, msg):
339 367 self.control_stream.flush()
340 368 idents,msg = self.session.feed_identities(msg, copy=False)
341 369 try:
342 370 msg = self.session.unpack_message(msg, content=True, copy=False)
343 371 except:
344 logger.error("Invalid Message", exc_info=True)
372 logging.error("Invalid Message", exc_info=True)
345 373 return
346 374
347 375
348 376 header = msg['header']
349 377 msg_id = header['msg_id']
350 378 if self.check_aborted(msg_id):
351 379 self.aborted.remove(msg_id)
352 380 # is it safe to assume a msg_id will not be resubmitted?
353 381 reply_type = msg['msg_type'].split('_')[0] + '_reply'
354 382 reply_msg = self.session.send(stream, reply_type,
355 383 content={'status' : 'aborted'}, parent=msg, ident=idents)
356 384 return
357 385 handler = self.shell_handlers.get(msg['msg_type'], None)
358 386 if handler is None:
359 logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
387 logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
360 388 else:
361 389 handler(stream, idents, msg)
362 390
363 391 def start(self):
364 392 #### stream mode:
365 393 if self.control_stream:
366 394 self.control_stream.on_recv(self.dispatch_control, copy=False)
367 395 self.control_stream.on_err(printer)
368 396
369 397 def make_dispatcher(stream):
370 398 def dispatcher(msg):
371 399 return self.dispatch_queue(stream, msg)
372 400 return dispatcher
373 401
374 402 for s in self.shell_streams:
403 # s.on_recv(printer)
375 404 s.on_recv(make_dispatcher(s), copy=False)
376 s.on_err(printer)
405 # s.on_err(printer)
377 406
378 407 if self.iopub_stream:
379 408 self.iopub_stream.on_err(printer)
380 409 # self.iopub_stream.on_send(printer)
381 410
382 411 #### while True mode:
383 412 # while True:
384 413 # idle = True
385 414 # try:
386 415 # msg = self.shell_stream.socket.recv_multipart(
387 416 # zmq.NOBLOCK, copy=False)
388 417 # except zmq.ZMQError, e:
389 418 # if e.errno != zmq.EAGAIN:
390 419 # raise e
391 420 # else:
392 421 # idle=False
393 422 # self.dispatch_queue(self.shell_stream, msg)
394 423 #
395 424 # if not self.task_stream.empty():
396 425 # idle=False
397 426 # msg = self.task_stream.recv_multipart()
398 427 # self.dispatch_queue(self.task_stream, msg)
399 428 # if idle:
400 429 # # don't busywait
401 430 # time.sleep(1e-3)
402 431
403 432 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
404 433 client_addr=None, loop=None, context=None, key=None,
405 434 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
406
435 """NO LONGER IN USE"""
407 436 # create loop, context, and session:
408 437 if loop is None:
409 438 loop = ioloop.IOLoop.instance()
410 439 if context is None:
411 440 context = zmq.Context()
412 441 c = context
413 442 session = StreamSession(key=key)
414 443 # print (session.key)
415 444 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
416 445
417 446 # create Control Stream
418 447 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
419 448 control_stream.setsockopt(zmq.IDENTITY, identity)
420 449 control_stream.connect(control_addr)
421 450
422 451 # create Shell Streams (MUX, Task, etc.):
423 452 shell_streams = []
424 453 for addr in shell_addrs:
425 454 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
426 455 stream.setsockopt(zmq.IDENTITY, identity)
427 456 stream.connect(addr)
428 457 shell_streams.append(stream)
429 458
430 459 # create iopub stream:
431 460 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
432 461 iopub_stream.setsockopt(zmq.IDENTITY, identity)
433 462 iopub_stream.connect(iopub_addr)
434 463
435 464 # Redirect input streams and set a display hook.
436 465 if out_stream_factory:
437 466 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
438 467 sys.stdout.topic = 'engine.%i.stdout'%int_id
439 468 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
440 469 sys.stderr.topic = 'engine.%i.stderr'%int_id
441 470 if display_hook_factory:
442 471 sys.displayhook = display_hook_factory(session, iopub_stream)
443 472 sys.displayhook.topic = 'engine.%i.pyout'%int_id
444 473
445 474
446 475 # launch heartbeat
447 476 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
448 477 heart.start()
449 478
450 479 # create (optional) Client
451 480 if client_addr:
452 481 client = Client(client_addr, username=identity)
453 482 else:
454 483 client = None
455 484
456 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
485 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
457 486 shell_streams=shell_streams, iopub_stream=iopub_stream,
458 487 client=client, loop=loop)
459 488 kernel.start()
460 489 return loop, c, kernel
461 490
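The heart of `apply_request` is an exec-based call: f, args, and kwargs are bound under collision-proof mangled names in a working namespace, the call is executed as generated source, and the result is popped back out. A standalone Python 2 sketch of just that trick (the suffix here is illustrative; the real handler derives it from the msg_id):

    def run_in_ns(f, args, kwargs, working):
        suffix = '_12ab34'   # per-message unique suffix in apply_request
        fname = '_' + f.__name__.strip('<>') + suffix
        argname = '_args' + suffix
        kwargname = '_kwargs' + suffix
        resultname = '_result' + suffix
        working.update({fname: f, argname: args, kwargname: kwargs})
        code = "%s=%s(*%s,**%s)" % (resultname, fname, argname, kwargname)
        exec code in working, working   # Python 2 exec, as in the kernel
        return working.pop(resultname)

    assert run_in_ns(max, (1, 5, 3), {}, {}) == 5
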
@@ -1,543 +1,549 b''
1 1 #!/usr/bin/env python
2 2 """edited session.py to work with streams, and move msg_type to the header
3 3 """
4 4
5 5
6 6 import os
7 7 import sys
8 8 import traceback
9 9 import pprint
10 10 import uuid
11 11 from datetime import datetime
12 12
13 13 import zmq
14 14 from zmq.utils import jsonapi
15 15 from zmq.eventloop.zmqstream import ZMQStream
16 16
17 17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
18 18 from IPython.utils.newserialized import serialize, unserialize
19 19
20 20 from IPython.zmq.parallel.error import RemoteError
21 21
22 22 try:
23 23 import cPickle
24 24 pickle = cPickle
25 25 except:
26 26 cPickle = None
27 27 import pickle
28 28
29 29 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
30 30 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
31 31 if json_name in ('jsonlib', 'jsonlib2'):
32 32 use_json = True
33 33 elif json_name:
34 34 if cPickle is None:
35 35 use_json = True
36 36 else:
37 37 use_json = False
38 38 else:
39 39 use_json = False
40 40
41 41 def squash_unicode(obj):
42 42 if isinstance(obj,dict):
43 43 for key in obj.keys():
44 44 obj[key] = squash_unicode(obj[key])
45 45 if isinstance(key, unicode):
46 46 obj[squash_unicode(key)] = obj.pop(key)
47 47 elif isinstance(obj, list):
48 48 for i,v in enumerate(obj):
49 49 obj[i] = squash_unicode(v)
50 50 elif isinstance(obj, unicode):
51 51 obj = obj.encode('utf8')
52 52 return obj
53 53
54 json_packer = jsonapi.dumps
55 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
56
57 pickle_packer = lambda o: pickle.dumps(o,-1)
58 pickle_unpacker = pickle.loads
59
54 60 if use_json:
55 default_packer = jsonapi.dumps
56 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
61 default_packer = json_packer
62 default_unpacker = json_unpacker
57 63 else:
58 default_packer = lambda o: pickle.dumps(o,-1)
59 default_unpacker = pickle.loads
64 default_packer = pickle_packer
65 default_unpacker = pickle_unpacker
60 66
61 67
62 68 DELIM="<IDS|MSG>"
63 69 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
64 70
65 71 def wrap_exception(engine_info={}):
66 72 etype, evalue, tb = sys.exc_info()
67 73 stb = traceback.format_exception(etype, evalue, tb)
68 74 exc_content = {
69 75 'status' : 'error',
70 76 'traceback' : stb,
71 77 'ename' : unicode(etype.__name__),
72 78 'evalue' : unicode(evalue),
73 79 'engine_info' : engine_info
74 80 }
75 81 return exc_content
76 82
77 83 def unwrap_exception(content):
78 84 err = RemoteError(content['ename'], content['evalue'],
79 85 ''.join(content['traceback']),
80 86 content.get('engine_info', {}))
81 87 return err
82 88
83 89
84 90 class Message(object):
85 91 """A simple message object that maps dict keys to attributes.
86 92
87 93 A Message can be created from a dict and a dict from a Message instance
88 94 simply by calling dict(msg_obj)."""
89 95
90 96 def __init__(self, msg_dict):
91 97 dct = self.__dict__
92 98 for k, v in dict(msg_dict).iteritems():
93 99 if isinstance(v, dict):
94 100 v = Message(v)
95 101 dct[k] = v
96 102
97 103 # Having this iterator lets dict(msg_obj) work out of the box.
98 104 def __iter__(self):
99 105 return iter(self.__dict__.iteritems())
100 106
101 107 def __repr__(self):
102 108 return repr(self.__dict__)
103 109
104 110 def __str__(self):
105 111 return pprint.pformat(self.__dict__)
106 112
107 113 def __contains__(self, k):
108 114 return k in self.__dict__
109 115
110 116 def __getitem__(self, k):
111 117 return self.__dict__[k]
112 118
113 119
114 120 def msg_header(msg_id, msg_type, username, session):
115 121 date=datetime.now().strftime(ISO8601)
116 122 return locals()
117 123
118 124 def extract_header(msg_or_header):
119 125 """Given a message or header, return the header."""
120 126 if not msg_or_header:
121 127 return {}
122 128 try:
123 129 # See if msg_or_header is the entire message.
124 130 h = msg_or_header['header']
125 131 except KeyError:
126 132 try:
127 133 # See if msg_or_header is just the header
128 134 h = msg_or_header['msg_id']
129 135 except KeyError:
130 136 raise
131 137 else:
132 138 h = msg_or_header
133 139 if not isinstance(h, dict):
134 140 h = dict(h)
135 141 return h
136 142
137 143 def rekey(dikt):
138 144 """Rekey a dict that has been forced to use str keys where there should be
139 145 ints by json. This belongs in the jsonutil added by fperez."""
140 146 for k in dikt.iterkeys():
141 147 if isinstance(k, str):
142 148 ik=fk=None
143 149 try:
144 150 ik = int(k)
145 151 except ValueError:
146 152 try:
147 153 fk = float(k)
148 154 except ValueError:
149 155 continue
150 156 if ik is not None:
151 157 nk = ik
152 158 else:
153 159 nk = fk
154 160 if nk in dikt:
155 161 raise KeyError("already have key %r"%nk)
156 162 dikt[nk] = dikt.pop(k)
157 163 return dikt
158 164
159 165 def serialize_object(obj, threshold=64e-6):
160 166 """Serialize an object into a list of sendable buffers.
161 167
162 168 Parameters
163 169 ----------
164 170
165 171 obj : object
166 172 The object to be serialized
167 173 threshold : float
168 174 The threshold for not double-pickling the content.
169 175
170 176
171 177 Returns
172 178 -------
173 179 ('pmd', [bufs]) :
174 180 where pmd is the pickled metadata wrapper,
175 181 bufs is a list of data buffers
176 182 """
177 183 databuffers = []
178 184 if isinstance(obj, (list, tuple)):
179 185 clist = canSequence(obj)
180 186 slist = map(serialize, clist)
181 187 for s in slist:
182 188 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
183 189 databuffers.append(s.getData())
184 190 s.data = None
185 191 return pickle.dumps(slist,-1), databuffers
186 192 elif isinstance(obj, dict):
187 193 sobj = {}
188 194 for k in sorted(obj.iterkeys()):
189 195 s = serialize(can(obj[k]))
190 196 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
191 197 databuffers.append(s.getData())
192 198 s.data = None
193 199 sobj[k] = s
194 200 return pickle.dumps(sobj,-1),databuffers
195 201 else:
196 202 s = serialize(can(obj))
197 203 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
198 204 databuffers.append(s.getData())
199 205 s.data = None
200 206 return pickle.dumps(s,-1),databuffers
201 207
202 208
203 209 def unserialize_object(bufs):
204 210 """reconstruct an object serialized by serialize_object from data buffers."""
205 211 bufs = list(bufs)
206 212 sobj = pickle.loads(bufs.pop(0))
207 213 if isinstance(sobj, (list, tuple)):
208 214 for s in sobj:
209 215 if s.data is None:
210 216 s.data = bufs.pop(0)
211 217 return uncanSequence(map(unserialize, sobj)), bufs
212 218 elif isinstance(sobj, dict):
213 219 newobj = {}
214 220 for k in sorted(sobj.iterkeys()):
215 221 s = sobj[k]
216 222 if s.data is None:
217 223 s.data = bufs.pop(0)
218 224 newobj[k] = uncan(unserialize(s))
219 225 return newobj, bufs
220 226 else:
221 227 if sobj.data is None:
222 228 sobj.data = bufs.pop(0)
223 229 return uncan(unserialize(sobj)), bufs
224 230
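serialize_object and unserialize_object form a round trip: buffers larger than the threshold travel as separate frames and are reattached on the receiving side. A sketch of the intended round trip within this module (Python 2; the payload is small, so everything stays inline):

    pmd, bufs = serialize_object({'a': range(3)})
    obj, remaining = unserialize_object([pmd] + bufs)
    assert obj == {'a': range(3)}
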
225 231 def pack_apply_message(f, args, kwargs, threshold=64e-6):
226 232 """pack up a function, args, and kwargs to be sent over the wire
227 233 as a series of buffers. Any object whose data is larger than `threshold`
228 234 will not have its data copied (currently only numpy arrays support zero-copy)"""
229 235 msg = [pickle.dumps(can(f),-1)]
230 236 databuffers = [] # for large objects
231 237 sargs, bufs = serialize_object(args,threshold)
232 238 msg.append(sargs)
233 239 databuffers.extend(bufs)
234 240 skwargs, bufs = serialize_object(kwargs,threshold)
235 241 msg.append(skwargs)
236 242 databuffers.extend(bufs)
237 243 msg.extend(databuffers)
238 244 return msg
239 245
240 246 def unpack_apply_message(bufs, g=None, copy=True):
241 247 """unpack f,args,kwargs from buffers packed by pack_apply_message()
242 248 Returns: original f,args,kwargs"""
243 249 bufs = list(bufs) # allow us to pop
244 250 assert len(bufs) >= 3, "not enough buffers!"
245 251 if not copy:
246 252 for i in range(3):
247 253 bufs[i] = bufs[i].bytes
248 254 cf = pickle.loads(bufs.pop(0))
249 255 sargs = list(pickle.loads(bufs.pop(0)))
250 256 skwargs = dict(pickle.loads(bufs.pop(0)))
251 257 # print sargs, skwargs
252 258 f = uncan(cf, g)
253 259 for sa in sargs:
254 260 if sa.data is None:
255 261 m = bufs.pop(0)
256 262 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
257 263 if copy:
258 264 sa.data = buffer(m)
259 265 else:
260 266 sa.data = m.buffer
261 267 else:
262 268 if copy:
263 269 sa.data = m
264 270 else:
265 271 sa.data = m.bytes
266 272
267 273 args = uncanSequence(map(unserialize, sargs), g)
268 274 kwargs = {}
269 275 for k in sorted(skwargs.iterkeys()):
270 276 sa = skwargs[k]
271 277 if sa.data is None:
272 278 sa.data = bufs.pop(0)
273 279 kwargs[k] = uncan(unserialize(sa), g)
274 280
275 281 return f,args,kwargs
276 282
277 283 class StreamSession(object):
278 284 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
279 285 debug=False
280 286 key=None
281 287
282 288 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
283 289 if username is None:
284 290 username = os.environ.get('USER','username')
285 291 self.username = username
286 292 if session is None:
287 293 self.session = str(uuid.uuid4())
288 294 else:
289 295 self.session = session
290 296 self.msg_id = str(uuid.uuid4())
291 297 if packer is None:
292 298 self.pack = default_packer
293 299 else:
294 300 if not callable(packer):
295 301 raise TypeError("packer must be callable, not %s"%type(packer))
296 302 self.pack = packer
297 303
298 304 if unpacker is None:
299 305 self.unpack = default_unpacker
300 306 else:
301 307 if not callable(unpacker):
302 308 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
303 309 self.unpack = unpacker
304 310
305 311 if key is not None and keyfile is not None:
306 312 raise TypeError("Must specify key OR keyfile, not both")
307 313 if keyfile is not None:
308 314 with open(keyfile) as f:
309 315 self.key = f.read().strip()
310 316 else:
311 317 self.key = key
312 318 # print key, keyfile, self.key
313 319 self.none = self.pack({})
314 320
315 321 def msg_header(self, msg_type):
316 322 h = msg_header(self.msg_id, msg_type, self.username, self.session)
317 323 self.msg_id = str(uuid.uuid4())
318 324 return h
319 325
320 326 def msg(self, msg_type, content=None, parent=None, subheader=None):
321 327 msg = {}
322 328 msg['header'] = self.msg_header(msg_type)
323 329 msg['msg_id'] = msg['header']['msg_id']
324 330 msg['parent_header'] = {} if parent is None else extract_header(parent)
325 331 msg['msg_type'] = msg_type
326 332 msg['content'] = {} if content is None else content
327 333 sub = {} if subheader is None else subheader
328 334 msg['header'].update(sub)
329 335 return msg
330 336
331 337 def check_key(self, msg_or_header):
332 338 """Check that a message's header has the right key"""
333 339 if self.key is None:
334 340 return True
335 341 header = extract_header(msg_or_header)
336 342 return header.get('key', None) == self.key
337 343
338 344
339 345 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
340 346 """Build and send a message via stream or socket.
341 347
342 348 Parameters
343 349 ----------
344 350
345 351 stream : zmq.Socket or ZMQStream
346 352 the socket-like object used to send the data
347 353 msg_or_type : str or Message/dict
348 354 Normally, msg_or_type will be a msg_type unless a message is being sent more
349 355 than once.
350 356
351 357 Returns
352 358 -------
353 359 (msg,sent) : tuple
354 360 msg : Message
355 361 the nice wrapped dict-like object containing the headers
356 362
357 363 """
358 364 if isinstance(msg_or_type, (Message, dict)):
359 365 # we got a Message, not a msg_type
360 366 # don't build a new Message
361 367 msg = msg_or_type
362 368 content = msg['content']
363 369 else:
364 370 msg = self.msg(msg_or_type, content, parent, subheader)
365 371 buffers = [] if buffers is None else buffers
366 372 to_send = []
367 373 if isinstance(ident, list):
368 374 # accept list of idents
369 375 to_send.extend(ident)
370 376 elif ident is not None:
371 377 to_send.append(ident)
372 378 to_send.append(DELIM)
373 379 if self.key is not None:
374 380 to_send.append(self.key)
375 381 to_send.append(self.pack(msg['header']))
376 382 to_send.append(self.pack(msg['parent_header']))
377 383
378 384 if content is None:
379 385 content = self.none
380 386 elif isinstance(content, dict):
381 387 content = self.pack(content)
382 388 elif isinstance(content, str):
383 389 # content is already packed, as in a relayed message
384 390 pass
385 391 else:
386 392 raise TypeError("Content incorrect type: %s"%type(content))
387 393 to_send.append(content)
388 394 flag = 0
389 395 if buffers:
390 396 flag = zmq.SNDMORE
391 397 stream.send_multipart(to_send, flag, copy=False)
392 398 for b in buffers[:-1]:
393 399 stream.send(b, flag, copy=False)
394 400 if buffers:
395 401 stream.send(buffers[-1], copy=False)
396 402 omsg = Message(msg)
397 403 if self.debug:
398 404 pprint.pprint(omsg)
399 405 pprint.pprint(to_send)
400 406 pprint.pprint(buffers)
401 407 return omsg
402 408
403 409 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
404 410 """Send a raw message via ident path.
405 411
406 412 Parameters
407 413 ----------
408 414 msg : list of sendable buffers"""
409 415 to_send = []
410 416 if isinstance(ident, str):
411 417 ident = [ident]
412 418 if ident is not None:
413 419 to_send.extend(ident)
414 420 to_send.append(DELIM)
415 421 if self.key is not None:
416 422 to_send.append(self.key)
417 423 to_send.extend(msg)
418 424 stream.send_multipart(to_send, flags, copy=copy) # send the assembled to_send list, not the bare msg
419 425
420 426 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
421 427 """receives and unpacks a message
422 428 returns [idents], msg"""
423 429 if isinstance(socket, ZMQStream):
424 430 socket = socket.socket
425 431 try:
426 432 msg = socket.recv_multipart(mode)
427 433 except zmq.ZMQError as e:
428 434 if e.errno == zmq.EAGAIN:
429 435 # We can convert EAGAIN to None as we know in this case
430 436 # recv_json won't return None.
431 437 return None
432 438 else:
433 439 raise
434 440 # return an actual Message object
435 441 # determine the number of idents by trying to unpack them.
436 442 # this is terrible:
437 443 idents, msg = self.feed_identities(msg, copy)
438 444 try:
439 445 return idents, self.unpack_message(msg, content=content, copy=copy)
440 446 except Exception as e:
441 447 print (idents, msg)
442 448 # TODO: handle it
443 449 raise e
444 450
445 451 def feed_identities(self, msg, copy=True):
446 452 """feed until DELIM is reached, then return the prefix as idents and remainder as
447 453 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
448 454
449 455 Parameters
450 456 ----------
451 457 msg : a list of Message or bytes objects
452 458 the message to be split
453 459 copy : bool
454 460 flag determining whether the arguments are bytes or Messages
455 461
456 462 Returns
457 463 -------
458 464 (idents,msg) : two lists
459 465 idents will always be a list of bytes - the identity prefix
460 466 msg will be a list of bytes or Messages, unchanged from input
461 467 msg should be unpackable via self.unpack_message at this point.
462 468 """
463 469 msg = list(msg)
464 470 idents = []
465 471 while len(msg) > 3:
466 472 if copy:
467 473 s = msg[0]
468 474 else:
469 475 s = msg[0].bytes
470 476 if s == DELIM:
471 477 msg.pop(0)
472 478 break
473 479 else:
474 480 idents.append(s)
475 481 msg.pop(0)
476 482
477 483 return idents, msg
478 484
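Concretely, for the copy=True case (a sketch; header_bytes, parent_bytes and content_bytes are placeholder names for already-packed frames):

    raw = [b'client-1', b'router-7', DELIM,
           header_bytes, parent_bytes, content_bytes]
    idents, rest = session.feed_identities(raw)
    # idents == [b'client-1', b'router-7']
    # rest   == [header_bytes, parent_bytes, content_bytes]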
479 485 def unpack_message(self, msg, content=True, copy=True):
480 486 """Return a message object from the format
481 487 sent by self.send.
482 488
483 489 Parameters
484 490 ----------
485 491
486 492 content : bool (True)
487 493 whether to unpack the content dict (True),
488 494 or leave it serialized (False)
489 495
490 496 copy : bool (True)
491 497 whether to return the bytes (True),
492 498 or the non-copying Message object in each place (False)
493 499
494 500 """
495 501 ikey = int(self.key is not None)
496 502 minlen = 3 + ikey
497 503 if len(msg) < minlen:
498 504 raise TypeError("malformed message, must have at least %i elements"%minlen)
499 505 message = {}
500 506 if not copy:
501 507 for i in range(minlen):
502 508 msg[i] = msg[i].bytes
503 509 if ikey:
504 510 if self.key != msg[0]:
505 511 raise KeyError("Invalid Session Key: %s"%msg[0])
506 512 message['header'] = self.unpack(msg[ikey+0])
507 513 message['msg_type'] = message['header']['msg_type']
508 514 message['parent_header'] = self.unpack(msg[ikey+1])
509 515 if content:
510 516 message['content'] = self.unpack(msg[ikey+2])
511 517 else:
512 518 message['content'] = msg[ikey+2]
513 519
514 520 # message['buffers'] = msg[3:]
515 521 # else:
516 522 # message['header'] = self.unpack(msg[0].bytes)
517 523 # message['msg_type'] = message['header']['msg_type']
518 524 # message['parent_header'] = self.unpack(msg[1].bytes)
519 525 # if content:
520 526 # message['content'] = self.unpack(msg[2].bytes)
521 527 # else:
522 528 # message['content'] = msg[2].bytes
523 529
524 530 message['buffers'] = msg[ikey+3:]  # [ m.buffer for m in msg[3:] ]
525 531 return message
526 532
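A round-trip sketch tying pack and unpack_message together, assuming a Session instance session with no key configured (so ikey is 0 and minlen is 3), and assuming Session.msg records msg_type in the header, as unpack_message expects:

    msg = session.msg('apply_request', content=dict(a=1))
    wire = [session.pack(msg['header']),
            session.pack(msg['parent_header']),
            session.pack(msg['content'])]
    unpacked = session.unpack_message(wire)
    assert unpacked['msg_type'] == 'apply_request'
    assert unpacked['content']['a'] == 1
    assert unpacked['buffers'] == []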
527 533
528 534
529 535 def test_msg2obj():
530 536 am = dict(x=1)
531 537 ao = Message(am)
532 538 assert ao.x == am['x']
533 539
534 540 am['y'] = dict(z=1)
535 541 ao = Message(am)
536 542 assert ao.y.z == am['y']['z']
537 543
538 544 k1, k2 = 'y', 'z'
539 545 assert ao[k1][k2] == am[k1][k2]
540 546
541 547 am2 = dict(ao)
542 548 assert am['x'] == am2['x']
543 549 assert am['y']['z'] == am2['y']['z']
@@ -1,250 +1,251 b''
1 1 #!/usr/bin/env python
2 2 # -*- coding: utf-8 -*-
3 3 """Setup script for IPython.
4 4
5 5 Under Posix environments it works like a typical setup.py script.
6 6 Under Windows, the command sdist is not supported, since IPython
7 7 requires utilities which are not available under Windows."""
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (c) 2008-2010, IPython Development Team.
11 11 # Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
12 12 # Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
13 13 # Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
14 14 #
15 15 # Distributed under the terms of the Modified BSD License.
16 16 #
17 17 # The full license is in the file COPYING.txt, distributed with this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Minimal Python version sanity check
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import sys
25 25
26 26 # This check is also made in IPython/__init__, don't forget to update both when
27 27 # changing Python version requirements.
28 28 if sys.version_info[:2] < (2, 6):
29 29 error = """\
30 30 ERROR: 'IPython requires Python Version 2.6 or above.'
31 31 Exiting."""
32 32 print >> sys.stderr, error
33 33 sys.exit(1)
34 34
35 35 # At least we're on the python version we need, move on.
36 36
37 37 #-------------------------------------------------------------------------------
38 38 # Imports
39 39 #-------------------------------------------------------------------------------
40 40
41 41 # Stdlib imports
42 42 import os
43 43 import shutil
44 44
45 45 from glob import glob
46 46
47 47 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
48 48 # update it when the contents of directories change.
49 49 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
50 50
51 51 from distutils.core import setup
52 52
53 53 # Our own imports
54 54 from IPython.utils.path import target_update
55 55
56 56 from setupbase import (
57 57 setup_args,
58 58 find_packages,
59 59 find_package_data,
60 60 find_scripts,
61 61 find_data_files,
62 62 check_for_dependencies,
63 63 record_commit_info,
64 64 )
65 65
66 66 isfile = os.path.isfile
67 67 pjoin = os.path.join
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Function definitions
71 71 #-----------------------------------------------------------------------------
72 72
73 73 def cleanup():
74 74 """Clean up the junk left around by the build process"""
75 75 if "develop" not in sys.argv:
76 76 try:
77 77 shutil.rmtree('ipython.egg-info')
78 78 except:
79 79 try:
80 80 os.unlink('ipython.egg-info')
81 81 except:
82 82 pass
83 83
84 84 #-------------------------------------------------------------------------------
85 85 # Handle OS specific things
86 86 #-------------------------------------------------------------------------------
87 87
88 88 if os.name == 'posix':
89 89 os_name = 'posix'
90 90 elif os.name in ['nt','dos']:
91 91 os_name = 'windows'
92 92 else:
93 93 print 'Unsupported operating system:',os.name
94 94 sys.exit(1)
95 95
96 96 # Under Windows, 'sdist' has not been supported. Now that the docs build with
97 97 # Sphinx it might work, but let's not turn it on until someone confirms that it
98 98 # actually works.
99 99 if os_name == 'windows' and 'sdist' in sys.argv:
100 100 print 'The sdist command is not available under Windows. Exiting.'
101 101 sys.exit(1)
102 102
103 103 #-------------------------------------------------------------------------------
104 104 # Things related to the IPython documentation
105 105 #-------------------------------------------------------------------------------
106 106
107 107 # update the manuals when building a source dist
108 108 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
109 109 import textwrap
110 110
111 111 # List of things to be updated. Each entry is a triplet of args for
112 112 # target_update()
113 113 to_update = [
114 114 # FIXME - Disabled for now: we need to redo an automatic way
115 115 # of generating the magic info inside the rst.
116 116 #('docs/magic.tex',
117 117 #['IPython/Magic.py'],
118 118 #"cd doc && ./update_magic.sh" ),
119 119
120 120 ('docs/man/ipcluster.1.gz',
121 121 ['docs/man/ipcluster.1'],
122 122 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
123 123
124 124 ('docs/man/ipcontroller.1.gz',
125 125 ['docs/man/ipcontroller.1'],
126 126 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
127 127
128 128 ('docs/man/ipengine.1.gz',
129 129 ['docs/man/ipengine.1'],
130 130 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
131 131
132 132 ('docs/man/ipython.1.gz',
133 133 ['docs/man/ipython.1'],
134 134 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
135 135
136 136 ('docs/man/ipython-wx.1.gz',
137 137 ['docs/man/ipython-wx.1'],
138 138 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
139 139
140 140 ('docs/man/ipythonx.1.gz',
141 141 ['docs/man/ipythonx.1'],
142 142 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
143 143
144 144 ('docs/man/irunner.1.gz',
145 145 ['docs/man/irunner.1'],
146 146 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
147 147
148 148 ('docs/man/pycolor.1.gz',
149 149 ['docs/man/pycolor.1'],
150 150 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
151 151 ]
152 152
153 153 # Only build the docs if sphinx is present
154 154 try:
155 155 import sphinx
156 156 except ImportError:
157 157 pass
158 158 else:
159 159 # The Makefile calls the do_sphinx scripts to build html and pdf, so
160 160 # just one target is enough to cover all manual generation
161 161
162 162 # First, compute all the dependencies that can force us to rebuild the
163 163 # docs. Start with the main release file that contains metadata
164 164 docdeps = ['IPython/core/release.py']
165 165 # Include all the reST sources
166 166 pjoin = os.path.join
167 167 for dirpath,dirnames,filenames in os.walk('docs/source'):
168 168 if os.path.split(dirpath)[-1] in ['_static','_templates']:
169 169 continue
170 170 docdeps += [ pjoin(dirpath,f) for f in filenames
171 171 if f.endswith('.txt') ]
172 172 # and the examples
173 173 for dirpath,dirnames,filenames in os.walk('docs/example'):
174 174 docdeps += [ pjoin(dirpath,f) for f in filenames
175 175 if not f.endswith('~') ]
176 176 # then, make them all dependencies for the main PDF (the html will get
177 177 # auto-generated as well).
178 178 to_update.append(
179 179 ('docs/dist/ipython.pdf',
180 180 docdeps,
181 181 "cd docs && make dist")
182 182 )
183 183
184 184 [ target_update(*t) for t in to_update ]
185 185
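target_update(target, deps, cmd) applies each triplet make-style: it runs cmd only when target is missing or older than one of its dependencies. A rough sketch of that idea (not the actual IPython.utils.path implementation):

    import os

    def target_update_sketch(target, deps, cmd):
        # Rebuild `target` if it is missing or older than any dependency.
        if not os.path.exists(target):
            outdated = True
        else:
            target_time = os.path.getmtime(target)
            outdated = any(os.path.getmtime(d) > target_time for d in deps)
        if outdated:
            os.system(cmd)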
186 186 #---------------------------------------------------------------------------
187 187 # Find all the packages, package data, scripts and data_files
188 188 #---------------------------------------------------------------------------
189 189
190 190 packages = find_packages()
191 191 package_data = find_package_data()
192 192 scripts = find_scripts()
193 193 data_files = find_data_files()
194 194
195 195 #---------------------------------------------------------------------------
196 196 # Handle dependencies and setuptools specific things
197 197 #---------------------------------------------------------------------------
198 198
199 199 # For some commands, use setuptools. Note that we do NOT list install here!
200 200 # If you want a setuptools-enhanced install, just run 'setupegg.py install'
201 201 if len(set(('develop', 'sdist', 'release', 'bdist_egg', 'bdist_rpm',
202 202 'bdist', 'bdist_dumb', 'bdist_wininst', 'install_egg_info',
203 203 'build_sphinx', 'egg_info', 'easy_install', 'upload',
204 204 )).intersection(sys.argv)) > 0:
205 205 import setuptools
206 206
207 207 # This dict is used for passing extra arguments that are
208 208 # setuptools-specific to setup()
209 209 setuptools_extra_args = {}
210 210
211 211 if 'setuptools' in sys.modules:
212 212 setuptools_extra_args['zip_safe'] = False
213 213 setuptools_extra_args['entry_points'] = {
214 214 'console_scripts': [
215 215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
216 216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
217 217 'pycolor = IPython.utils.PyColorize:main',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
219 'ipenginez = IPython.zmq.parallel.engine:main',
218 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance',
219 'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance',
220 'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance',
220 221 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
221 222 'iptest = IPython.testing.iptest:main',
222 223 'irunner = IPython.lib.irunner:main'
223 224 ]
224 225 }
225 226 setup_args['extras_require'] = dict(
226 227 doc='Sphinx>=0.3',
227 228 test='nose>=0.10.1',
228 229 security='pyOpenSSL>=0.6'
229 230 )
230 231 else:
231 232 # If we are running without setuptools, call this function which will
232 233 # check for dependencies and inform the user what is needed. This is
233 234 # just to make life easy for users.
234 235 check_for_dependencies()
235 236
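Each console_scripts entry above has the form 'name = package.module:callable'; at install time setuptools generates a small launcher script per entry. Roughly (a sketch of the shape, not the generated code verbatim):

    #!/usr/bin/env python
    # Approximate wrapper for the entry point
    # 'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance'
    import sys
    from IPython.zmq.parallel.ipcontrollerapp import launch_new_instance
    if __name__ == '__main__':
        sys.exit(launch_new_instance())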
236 237 #---------------------------------------------------------------------------
237 238 # Do the actual setup now
238 239 #---------------------------------------------------------------------------
239 240
240 241 setup_args['cmdclass'] = {'build_py': record_commit_info('IPython')}
241 242 setup_args['packages'] = packages
242 243 setup_args['package_data'] = package_data
243 244 setup_args['scripts'] = scripts
244 245 setup_args['data_files'] = data_files
245 246 setup_args.update(setuptools_extra_args)
246 247
247 248
248 249 if __name__ == '__main__':
249 250 setup(**setup_args)
250 251 cleanup()